"Fossies" - the Fresh Open Source Software Archive

Member "salt-3002.2/salt/returners/elasticsearch_return.py" (18 Nov 2020, 12452 Bytes) of package /linux/misc/salt-3002.2.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "elasticsearch_return.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 3002.1_vs_3002.2.

    1 """
    2 Return data to an elasticsearch server for indexing.
    3 
    4 :maintainer:    Jurnell Cockhren <jurnell.cockhren@sophicware.com>, Arnold Bechtoldt <mail@arnoldbechtoldt.com>
    5 :maturity:      New
    6 :depends:       `elasticsearch-py <https://elasticsearch-py.readthedocs.io/en/latest/>`_
    7 :platform:      all
    8 
    9 To enable this returner the elasticsearch python client must be installed
   10 on the desired minions (all or some subset).
   11 
   12 Please see documentation of :mod:`elasticsearch execution module <salt.modules.elasticsearch>`
   13 for a valid connection configuration.
   14 
   15 .. warning::
   16 
   17         The index that you wish to store documents will be created by Elasticsearch automatically if
   18         doesn't exist yet. It is highly recommended to create predefined index templates with appropriate mapping(s)
   19         that will be used by Elasticsearch upon index creation. Otherwise you will have problems as described in #20826.
   20 
   21 To use the returner per salt call:
   22 
   23 .. code-block:: bash
   24 
   25     salt '*' test.ping --return elasticsearch
   26 
   27 In order to have the returner apply to all minions:
   28 
   29 .. code-block:: yaml
   30 
   31     ext_job_cache: elasticsearch
   32 
   33 Minion configuration:
   34     debug_returner_payload': False
   35         Output the payload being posted to the log file in debug mode
   36 
   37     doc_type: 'default'
   38         Document type to use for normal return messages
   39 
   40     functions_blacklist
   41         Optional list of functions that should not be returned to elasticsearch
   42 
   43     index_date: False
   44         Use a dated index (e.g. <index>-2016.11.29)
   45 
   46     master_event_index: 'salt-master-event-cache'
   47         Index to use when returning master events
   48 
   49     master_event_doc_type: 'efault'
   50         Document type to use got master events
   51 
   52     master_job_cache_index: 'salt-master-job-cache'
   53         Index to use for master job cache
   54 
   55     master_job_cache_doc_type: 'default'
   56         Document type to use for master job cache
   57 
   58     number_of_shards: 1
   59         Number of shards to use for the indexes
   60 
   61     number_of_replicas: 0
   62         Number of replicas to use for the indexes
   63 
   64     NOTE: The following options are valid for 'state.apply', 'state.sls' and 'state.highstate' functions only.
   65 
   66     states_count: False
   67         Count the number of states which succeeded or failed and return it in top-level item called 'counts'.
   68         States reporting None (i.e. changes would be made but it ran in test mode) are counted as successes.
   69     states_order_output: False
   70         Prefix the state UID (e.g. file_|-yum_configured_|-/etc/yum.conf_|-managed) with a zero-padded version
   71         of the '__run_num__' value to allow for easier sorting. Also store the state function (i.e. file.managed)
   72         into a new key '_func'. Change the index to be '<index>-ordered' (e.g. salt-state_apply-ordered).
   73     states_single_index: False
   74         Store results for state.apply, state.sls and state.highstate in the salt-state_apply index
   75         (or -ordered/-<date>) indexes if enabled
   76 
   77 .. code-block:: yaml
   78 
   79     elasticsearch:
   80         hosts:
   81           - "10.10.10.10:9200"
   82           - "10.10.10.11:9200"
   83           - "10.10.10.12:9200"
   84         index_date: True
   85         number_of_shards: 5
   86         number_of_replicas: 1
   87         debug_returner_payload: True
   88         states_count: True
   89         states_order_output: True
   90         states_single_index: True
   91         functions_blacklist:
   92           - test.ping
   93           - saltutil.find_job
   94 """
   95 
   96 
   97 import datetime
   98 import logging
   99 import uuid
  100 from datetime import timedelta, tzinfo
  101 
  102 import salt.returners
  103 import salt.utils.jid
  104 import salt.utils.json
  105 
  106 __virtualname__ = "elasticsearch"
  107 
  108 log = logging.getLogger(__name__)
  109 
  110 STATE_FUNCTIONS = {
  111     "state.apply": "state_apply",
  112     "state.highstate": "state_apply",
  113     "state.sls": "state_apply",
  114 }
  115 
  116 
  117 def __virtual__():
  118     if "elasticsearch.index_exists" not in __salt__:
  119         return (
  120             False,
  121             "Elasticsearch module not availble.  Check that the elasticsearch library is installed.",
  122         )
  123     return __virtualname__
  124 
  125 
  126 def _get_options(ret=None):
  127     """
  128     Get the returner options from salt.
  129     """
  130 
  131     defaults = {
  132         "debug_returner_payload": False,
  133         "doc_type": "default",
  134         "functions_blacklist": [],
  135         "index_date": False,
  136         "master_event_index": "salt-master-event-cache",
  137         "master_event_doc_type": "default",
  138         "master_job_cache_index": "salt-master-job-cache",
  139         "master_job_cache_doc_type": "default",
  140         "number_of_shards": 1,
  141         "number_of_replicas": 0,
  142         "states_order_output": False,
  143         "states_count": False,
  144         "states_single_index": False,
  145     }
  146 
  147     attrs = {
  148         "debug_returner_payload": "debug_returner_payload",
  149         "doc_type": "doc_type",
  150         "functions_blacklist": "functions_blacklist",
  151         "index_date": "index_date",
  152         "master_event_index": "master_event_index",
  153         "master_event_doc_type": "master_event_doc_type",
  154         "master_job_cache_index": "master_job_cache_index",
  155         "master_job_cache_doc_type": "master_job_cache_doc_type",
  156         "number_of_shards": "number_of_shards",
  157         "number_of_replicas": "number_of_replicas",
  158         "states_count": "states_count",
  159         "states_order_output": "states_order_output",
  160         "states_single_index": "states_single_index",
  161     }
  162 
  163     _options = salt.returners.get_returner_options(
  164         __virtualname__,
  165         ret,
  166         attrs,
  167         __salt__=__salt__,
  168         __opts__=__opts__,
  169         defaults=defaults,
  170     )
  171     return _options
  172 
  173 
  174 def _ensure_index(index):
  175     index_exists = __salt__["elasticsearch.index_exists"](index)
  176     if not index_exists:
  177         options = _get_options()
  178 
  179         index_definition = {
  180             "settings": {
  181                 "number_of_shards": options["number_of_shards"],
  182                 "number_of_replicas": options["number_of_replicas"],
  183             }
  184         }
  185         __salt__["elasticsearch.index_create"]("{}-v1".format(index), index_definition)
  186         __salt__["elasticsearch.alias_create"]("{}-v1".format(index), index)
  187 
  188 
  189 def _convert_keys(data):
  190     if isinstance(data, dict):
  191         new_data = {}
  192         for k, sub_data in data.items():
  193             if "." in k:
  194                 new_data["_orig_key"] = k
  195                 k = k.replace(".", "_")
  196             new_data[k] = _convert_keys(sub_data)
  197     elif isinstance(data, list):
  198         new_data = []
  199         for item in data:
  200             new_data.append(_convert_keys(item))
  201     else:
  202         return data
  203 
  204     return new_data
  205 
  206 
  207 def returner(ret):
  208     """
  209     Process the return from Salt
  210     """
  211 
  212     job_fun = ret["fun"]
  213     job_fun_escaped = job_fun.replace(".", "_")
  214     job_id = ret["jid"]
  215     job_retcode = ret.get("retcode", 1)
  216     job_success = True if not job_retcode else False
  217 
  218     options = _get_options(ret)
  219 
  220     if job_fun in options["functions_blacklist"]:
  221         log.info(
  222             "Won't push new data to Elasticsearch, job with jid=%s and "
  223             "function=%s which is in the user-defined list of ignored "
  224             "functions",
  225             job_id,
  226             job_fun,
  227         )
  228         return
  229     if ret.get("data", None) is None and ret.get("return") is None:
  230         log.info(
  231             "Won't push new data to Elasticsearch, job with jid=%s was "
  232             "not successful",
  233             job_id,
  234         )
  235         return
  236 
  237     # Build the index name
  238     if options["states_single_index"] and job_fun in STATE_FUNCTIONS:
  239         index = "salt-{}".format(STATE_FUNCTIONS[job_fun])
  240     else:
  241         index = "salt-{}".format(job_fun_escaped)
  242 
  243     if options["index_date"]:
  244         index = "{}-{}".format(index, datetime.date.today().strftime("%Y.%m.%d"))
  245 
  246     counts = {}
  247 
  248     # Do some special processing for state returns
  249     if job_fun in STATE_FUNCTIONS:
  250         # Init the state counts
  251         if options["states_count"]:
  252             counts = {
  253                 "succeeded": 0,
  254                 "failed": 0,
  255             }
  256 
  257         # Prepend each state execution key in ret['return'] with a zero-padded
  258         # version of the '__run_num__' field allowing the states to be ordered
  259         # more easily. Change the index to be
  260         # index to be '<index>-ordered' so as not to clash with the unsorted
  261         # index data format
  262         if options["states_order_output"] and isinstance(ret["return"], dict):
  263             index = "{}-ordered".format(index)
  264             max_chars = len(str(len(ret["return"])))
  265 
  266             for uid, data in ret["return"].items():
  267                 # Skip keys we've already prefixed
  268                 if uid.startswith(tuple("0123456789")):
  269                     continue
  270 
  271                 # Store the function being called as it's a useful key to search
  272                 decoded_uid = uid.split("_|-")
  273                 ret["return"][uid]["_func"] = "{}.{}".format(
  274                     decoded_uid[0], decoded_uid[-1]
  275                 )
  276 
  277                 # Prefix the key with the run order so it can be sorted
  278                 new_uid = "{}_|-{}".format(
  279                     str(data["__run_num__"]).zfill(max_chars), uid,
  280                 )
  281 
  282                 ret["return"][new_uid] = ret["return"].pop(uid)
  283 
  284         # Catch a state output that has failed and where the error message is
  285         # not in a dict as expected. This prevents elasticsearch from
  286         # complaining about a mapping error
  287         elif not isinstance(ret["return"], dict):
  288             ret["return"] = {"return": ret["return"]}
  289 
  290         # Need to count state successes and failures
  291         if options["states_count"]:
  292             for state_data in ret["return"].values():
  293                 if state_data["result"] is False:
  294                     counts["failed"] += 1
  295                 else:
  296                     counts["succeeded"] += 1
  297 
  298     # Ensure the index exists
  299     _ensure_index(index)
  300 
  301     # Build the payload
  302     class UTC(tzinfo):
  303         def utcoffset(self, dt):
  304             return timedelta(0)
  305 
  306         def tzname(self, dt):
  307             return "UTC"
  308 
  309         def dst(self, dt):
  310             return timedelta(0)
  311 
  312     utc = UTC()
  313     data = {
  314         "@timestamp": datetime.datetime.now(utc).isoformat(),
  315         "success": job_success,
  316         "retcode": job_retcode,
  317         "minion": ret["id"],
  318         "fun": job_fun,
  319         "jid": job_id,
  320         "counts": counts,
  321         "data": _convert_keys(ret["return"]),
  322     }
  323 
  324     if options["debug_returner_payload"]:
  325         log.debug("elasicsearch payload: %s", data)
  326 
  327     # Post the payload
  328     ret = __salt__["elasticsearch.document_create"](
  329         index=index, doc_type=options["doc_type"], body=salt.utils.json.dumps(data)
  330     )
  331 
  332 
  333 def event_return(events):
  334     """
  335     Return events to Elasticsearch
  336 
  337     Requires that the `event_return` configuration be set in master config.
  338     """
  339     options = _get_options()
  340 
  341     index = options["master_event_index"]
  342     doc_type = options["master_event_doc_type"]
  343 
  344     if options["index_date"]:
  345         index = "{}-{}".format(index, datetime.date.today().strftime("%Y.%m.%d"))
  346 
  347     _ensure_index(index)
  348 
  349     for event in events:
  350         data = {"tag": event.get("tag", ""), "data": event.get("data", "")}
  351 
  352     ret = __salt__["elasticsearch.document_create"](
  353         index=index,
  354         doc_type=doc_type,
  355         id=uuid.uuid4(),
  356         body=salt.utils.json.dumps(data),
  357     )
  358 
  359 
  360 def prep_jid(nocache=False, passed_jid=None):  # pylint: disable=unused-argument
  361     """
  362     Do any work necessary to prepare a JID, including sending a custom id
  363     """
  364     return passed_jid if passed_jid is not None else salt.utils.jid.gen_jid(__opts__)
  365 
  366 
  367 def save_load(jid, load, minions=None):
  368     """
  369     Save the load to the specified jid id
  370 
  371     .. versionadded:: 2015.8.1
  372     """
  373     options = _get_options()
  374 
  375     index = options["master_job_cache_index"]
  376     doc_type = options["master_job_cache_doc_type"]
  377 
  378     _ensure_index(index)
  379 
  380     data = {
  381         "jid": jid,
  382         "load": load,
  383     }
  384 
  385     ret = __salt__["elasticsearch.document_create"](
  386         index=index, doc_type=doc_type, id=jid, body=salt.utils.json.dumps(data)
  387     )
  388 
  389 
  390 def get_load(jid):
  391     """
  392     Return the load data that marks a specified jid
  393 
  394     .. versionadded:: 2015.8.1
  395     """
  396     options = _get_options()
  397 
  398     index = options["master_job_cache_index"]
  399     doc_type = options["master_job_cache_doc_type"]
  400 
  401     data = __salt__["elasticsearch.document_get"](
  402         index=index, id=jid, doc_type=doc_type
  403     )
  404     if data:
  405         return salt.utils.json.loads(data)
  406     return {}