elasticsearch_return.py (salt-3002.1) | : | elasticsearch_return.py (salt-3002.2) | ||
---|---|---|---|---|
# -*- coding: utf-8 -*- | ||||
""" | """ | |||
Return data to an elasticsearch server for indexing. | Return data to an elasticsearch server for indexing. | |||
:maintainer: Jurnell Cockhren <jurnell.cockhren@sophicware.com>, Arnold Becht oldt <mail@arnoldbechtoldt.com> | :maintainer: Jurnell Cockhren <jurnell.cockhren@sophicware.com>, Arnold Becht oldt <mail@arnoldbechtoldt.com> | |||
:maturity: New | :maturity: New | |||
:depends: `elasticsearch-py <https://elasticsearch-py.readthedocs.io/en/la test/>`_ | :depends: `elasticsearch-py <https://elasticsearch-py.readthedocs.io/en/la test/>`_ | |||
:platform: all | :platform: all | |||
To enable this returner the elasticsearch python client must be installed | To enable this returner the elasticsearch python client must be installed | |||
on the desired minions (all or some subset). | on the desired minions (all or some subset). | |||
skipping to change at line 97 | skipping to change at line 96 | |||
number_of_replicas: 1 | number_of_replicas: 1 | |||
debug_returner_payload: True | debug_returner_payload: True | |||
states_count: True | states_count: True | |||
states_order_output: True | states_order_output: True | |||
states_single_index: True | states_single_index: True | |||
functions_blacklist: | functions_blacklist: | |||
- test.ping | - test.ping | |||
- saltutil.find_job | - saltutil.find_job | |||
""" | """ | |||
# Import Python libs | ||||
from __future__ import absolute_import, print_function, unicode_literals | ||||
import datetime | import datetime | |||
import logging | import logging | |||
import uuid | import uuid | |||
from datetime import timedelta, tzinfo | from datetime import timedelta, tzinfo | |||
# Import Salt libs | ||||
import salt.returners | import salt.returners | |||
import salt.utils.jid | import salt.utils.jid | |||
import salt.utils.json | import salt.utils.json | |||
# Import 3rd-party libs | ||||
from salt.ext import six | ||||
__virtualname__ = "elasticsearch" | __virtualname__ = "elasticsearch" | |||
log = logging.getLogger(__name__) | log = logging.getLogger(__name__) | |||
STATE_FUNCTIONS = { | STATE_FUNCTIONS = { | |||
"state.apply": "state_apply", | "state.apply": "state_apply", | |||
"state.highstate": "state_apply", | "state.highstate": "state_apply", | |||
"state.sls": "state_apply", | "state.sls": "state_apply", | |||
} | } | |||
def __virtual__(): | def __virtual__(): | |||
if "elasticsearch.index_exists" not in __salt__: | ||||
return ( | ||||
False, | ||||
"Elasticsearch module not availble. Check that the elasticsearch li | ||||
brary is installed.", | ||||
) | ||||
return __virtualname__ | return __virtualname__ | |||
def _get_options(ret=None): | def _get_options(ret=None): | |||
""" | """ | |||
Get the returner options from salt. | Get the returner options from salt. | |||
""" | """ | |||
defaults = { | defaults = { | |||
"debug_returner_payload": False, | "debug_returner_payload": False, | |||
"doc_type": "default", | "doc_type": "default", | |||
skipping to change at line 184 | skipping to change at line 181 | |||
index_exists = __salt__["elasticsearch.index_exists"](index) | index_exists = __salt__["elasticsearch.index_exists"](index) | |||
if not index_exists: | if not index_exists: | |||
options = _get_options() | options = _get_options() | |||
index_definition = { | index_definition = { | |||
"settings": { | "settings": { | |||
"number_of_shards": options["number_of_shards"], | "number_of_shards": options["number_of_shards"], | |||
"number_of_replicas": options["number_of_replicas"], | "number_of_replicas": options["number_of_replicas"], | |||
} | } | |||
} | } | |||
__salt__["elasticsearch.index_create"]("{0}-v1".format(index), index_def | __salt__["elasticsearch.index_create"]("{}-v1".format(index), index_defi | |||
inition) | nition) | |||
__salt__["elasticsearch.alias_create"]("{0}-v1".format(index), index) | __salt__["elasticsearch.alias_create"]("{}-v1".format(index), index) | |||
def _convert_keys(data): | def _convert_keys(data): | |||
if isinstance(data, dict): | if isinstance(data, dict): | |||
new_data = {} | new_data = {} | |||
for k, sub_data in data.items(): | for k, sub_data in data.items(): | |||
if "." in k: | if "." in k: | |||
new_data["_orig_key"] = k | new_data["_orig_key"] = k | |||
k = k.replace(".", "_") | k = k.replace(".", "_") | |||
new_data[k] = _convert_keys(sub_data) | new_data[k] = _convert_keys(sub_data) | |||
elif isinstance(data, list): | elif isinstance(data, list): | |||
skipping to change at line 236 | skipping to change at line 233 | |||
if ret.get("data", None) is None and ret.get("return") is None: | if ret.get("data", None) is None and ret.get("return") is None: | |||
log.info( | log.info( | |||
"Won't push new data to Elasticsearch, job with jid=%s was " | "Won't push new data to Elasticsearch, job with jid=%s was " | |||
"not successful", | "not successful", | |||
job_id, | job_id, | |||
) | ) | |||
return | return | |||
# Build the index name | # Build the index name | |||
if options["states_single_index"] and job_fun in STATE_FUNCTIONS: | if options["states_single_index"] and job_fun in STATE_FUNCTIONS: | |||
index = "salt-{0}".format(STATE_FUNCTIONS[job_fun]) | index = "salt-{}".format(STATE_FUNCTIONS[job_fun]) | |||
else: | else: | |||
index = "salt-{0}".format(job_fun_escaped) | index = "salt-{}".format(job_fun_escaped) | |||
if options["index_date"]: | if options["index_date"]: | |||
index = "{0}-{1}".format(index, datetime.date.today().strftime("%Y.%m.%d ")) | index = "{}-{}".format(index, datetime.date.today().strftime("%Y.%m.%d") ) | |||
counts = {} | counts = {} | |||
# Do some special processing for state returns | # Do some special processing for state returns | |||
if job_fun in STATE_FUNCTIONS: | if job_fun in STATE_FUNCTIONS: | |||
# Init the state counts | # Init the state counts | |||
if options["states_count"]: | if options["states_count"]: | |||
counts = { | counts = { | |||
"succeeded": 0, | "succeeded": 0, | |||
"failed": 0, | "failed": 0, | |||
} | } | |||
# Prepend each state execution key in ret['return'] with a zero-padded | # Prepend each state execution key in ret['return'] with a zero-padded | |||
# version of the '__run_num__' field allowing the states to be ordered | # version of the '__run_num__' field allowing the states to be ordered | |||
# more easily. Change the index to be | # more easily. Change the index to be | |||
# index to be '<index>-ordered' so as not to clash with the unsorted | # index to be '<index>-ordered' so as not to clash with the unsorted | |||
# index data format | # index data format | |||
if options["states_order_output"] and isinstance(ret["return"], dict): | if options["states_order_output"] and isinstance(ret["return"], dict): | |||
index = "{0}-ordered".format(index) | index = "{}-ordered".format(index) | |||
max_chars = len(six.text_type(len(ret["return"]))) | max_chars = len(str(len(ret["return"]))) | |||
for uid, data in six.iteritems(ret["return"]): | for uid, data in ret["return"].items(): | |||
# Skip keys we've already prefixed | # Skip keys we've already prefixed | |||
if uid.startswith(tuple("0123456789")): | if uid.startswith(tuple("0123456789")): | |||
continue | continue | |||
# Store the function being called as it's a useful key to search | # Store the function being called as it's a useful key to search | |||
decoded_uid = uid.split("_|-") | decoded_uid = uid.split("_|-") | |||
ret["return"][uid]["_func"] = "{0}.{1}".format( | ret["return"][uid]["_func"] = "{}.{}".format( | |||
decoded_uid[0], decoded_uid[-1] | decoded_uid[0], decoded_uid[-1] | |||
) | ) | |||
# Prefix the key with the run order so it can be sorted | # Prefix the key with the run order so it can be sorted | |||
new_uid = "{0}_|-{1}".format( | new_uid = "{}_|-{}".format( | |||
six.text_type(data["__run_num__"]).zfill(max_chars), uid, | str(data["__run_num__"]).zfill(max_chars), uid, | |||
) | ) | |||
ret["return"][new_uid] = ret["return"].pop(uid) | ret["return"][new_uid] = ret["return"].pop(uid) | |||
# Catch a state output that has failed and where the error message is | # Catch a state output that has failed and where the error message is | |||
# not in a dict as expected. This prevents elasticsearch from | # not in a dict as expected. This prevents elasticsearch from | |||
# complaining about a mapping error | # complaining about a mapping error | |||
elif not isinstance(ret["return"], dict): | elif not isinstance(ret["return"], dict): | |||
ret["return"] = {"return": ret["return"]} | ret["return"] = {"return": ret["return"]} | |||
skipping to change at line 341 | skipping to change at line 338 | |||
Return events to Elasticsearch | Return events to Elasticsearch | |||
Requires that the `event_return` configuration be set in master config. | Requires that the `event_return` configuration be set in master config. | |||
""" | """ | |||
options = _get_options() | options = _get_options() | |||
index = options["master_event_index"] | index = options["master_event_index"] | |||
doc_type = options["master_event_doc_type"] | doc_type = options["master_event_doc_type"] | |||
if options["index_date"]: | if options["index_date"]: | |||
index = "{0}-{1}".format(index, datetime.date.today().strftime("%Y.%m.%d ")) | index = "{}-{}".format(index, datetime.date.today().strftime("%Y.%m.%d") ) | |||
_ensure_index(index) | _ensure_index(index) | |||
for event in events: | for event in events: | |||
data = {"tag": event.get("tag", ""), "data": event.get("data", "")} | data = {"tag": event.get("tag", ""), "data": event.get("data", "")} | |||
ret = __salt__["elasticsearch.document_create"]( | ret = __salt__["elasticsearch.document_create"]( | |||
index=index, | index=index, | |||
doc_type=doc_type, | doc_type=doc_type, | |||
id=uuid.uuid4(), | id=uuid.uuid4(), | |||
End of changes. 14 change blocks. | ||||
21 lines changed or deleted | 19 lines changed or added |