"Fossies" - the Fresh Open Source Software Archive 
Member "salt-3002.2/salt/returners/elasticsearch_return.py" (18 Nov 2020, 12452 Bytes) of package /linux/misc/salt-3002.2.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively you can here
view or
download the uninterpreted source code file.
For more information about "elasticsearch_return.py" see the
Fossies "Dox" file reference documentation and the latest
Fossies "Diffs" side-by-side code changes report:
3002.1_vs_3002.2.
1 """
2 Return data to an elasticsearch server for indexing.
3
4 :maintainer: Jurnell Cockhren <jurnell.cockhren@sophicware.com>, Arnold Bechtoldt <mail@arnoldbechtoldt.com>
5 :maturity: New
6 :depends: `elasticsearch-py <https://elasticsearch-py.readthedocs.io/en/latest/>`_
7 :platform: all
8
9 To enable this returner the elasticsearch python client must be installed
10 on the desired minions (all or some subset).
11
12 Please see documentation of :mod:`elasticsearch execution module <salt.modules.elasticsearch>`
13 for a valid connection configuration.
14
15 .. warning::
16
17 The index in which you wish to store documents will be created by Elasticsearch automatically if
18 it doesn't exist yet. It is highly recommended to create predefined index templates with appropriate mapping(s)
19 that will be used by Elasticsearch upon index creation. Otherwise you will have problems as described in #20826.
20
21 To use the returner per salt call:
22
23 .. code-block:: bash
24
25 salt '*' test.ping --return elasticsearch
26
27 In order to have the returner apply to all minions:
28
29 .. code-block:: yaml
30
31 ext_job_cache: elasticsearch
32
33 Minion configuration:
34 debug_returner_payload: False
35 Output the payload being posted to the log file in debug mode
36
37 doc_type: 'default'
38 Document type to use for normal return messages
39
40 functions_blacklist
41 Optional list of functions that should not be returned to elasticsearch
42
43 index_date: False
44 Use a dated index (e.g. <index>-2016.11.29)
45
46 master_event_index: 'salt-master-event-cache'
47 Index to use when returning master events
48
49 master_event_doc_type: 'default'
50 Document type to use for master events
51
52 master_job_cache_index: 'salt-master-job-cache'
53 Index to use for master job cache
54
55 master_job_cache_doc_type: 'default'
56 Document type to use for master job cache
57
58 number_of_shards: 1
59 Number of shards to use for the indexes
60
61 number_of_replicas: 0
62 Number of replicas to use for the indexes
63
64 NOTE: The following options are valid for 'state.apply', 'state.sls' and 'state.highstate' functions only.
65
66 states_count: False
67 Count the number of states which succeeded or failed and return it in top-level item called 'counts'.
68 States reporting None (i.e. changes would be made but it ran in test mode) are counted as successes.
69 states_order_output: False
70 Prefix the state UID (e.g. file_|-yum_configured_|-/etc/yum.conf_|-managed) with a zero-padded version
71 of the '__run_num__' value to allow for easier sorting. Also store the state function (i.e. file.managed)
72 into a new key '_func'. Change the index to be '<index>-ordered' (e.g. salt-state_apply-ordered).
73 states_single_index: False
74 Store results for state.apply, state.sls and state.highstate in the salt-state_apply index
75 (or -ordered/-<date>) indexes if enabled
76
77 .. code-block:: yaml
78
79 elasticsearch:
80 hosts:
81 - "10.10.10.10:9200"
82 - "10.10.10.11:9200"
83 - "10.10.10.12:9200"
84 index_date: True
85 number_of_shards: 5
86 number_of_replicas: 1
87 debug_returner_payload: True
88 states_count: True
89 states_order_output: True
90 states_single_index: True
91 functions_blacklist:
92 - test.ping
93 - saltutil.find_job
94 """
95
96
97 import datetime
98 import logging
99 import uuid
100 from datetime import timedelta, tzinfo
101
102 import salt.returners
103 import salt.utils.jid
104 import salt.utils.json
105
# Name returned by __virtual__() and used as the key when looking up this
# returner's options.
__virtualname__ = "elasticsearch"

log = logging.getLogger(__name__)

# Every state-run function shares one index suffix; returner() uses this
# mapping both to detect state runs and to build the single-index name.
STATE_FUNCTIONS = dict.fromkeys(
    ("state.apply", "state.highstate", "state.sls"), "state_apply"
)
115
116
def __virtual__():
    """
    Decide whether this returner can be loaded.

    Returns ``__virtualname__`` when ``elasticsearch.index_exists`` is present
    in ``__salt__``; otherwise returns a ``(False, reason)`` tuple.
    """
    if "elasticsearch.index_exists" in __salt__:
        return __virtualname__

    reason = "Elasticsearch module not availble. Check that the elasticsearch library is installed."
    return (False, reason)
124
125
def _get_options(ret=None):
    """
    Collect the options for this returner.

    Builds the default values and an identity attribute mapping (every option
    is looked up under its own name), then delegates to
    ``salt.returners.get_returner_options``.

    ret
        Optional job return dictionary, passed straight through to
        ``get_returner_options``.

    Returns whatever ``get_returner_options`` returns.
    """
    defaults = dict(
        debug_returner_payload=False,
        doc_type="default",
        functions_blacklist=[],
        index_date=False,
        master_event_index="salt-master-event-cache",
        master_event_doc_type="default",
        master_job_cache_index="salt-master-job-cache",
        master_job_cache_doc_type="default",
        number_of_shards=1,
        number_of_replicas=0,
        states_order_output=False,
        states_count=False,
        states_single_index=False,
    )

    # Each option name maps to itself in the configuration.
    option_names = (
        "debug_returner_payload",
        "doc_type",
        "functions_blacklist",
        "index_date",
        "master_event_index",
        "master_event_doc_type",
        "master_job_cache_index",
        "master_job_cache_doc_type",
        "number_of_shards",
        "number_of_replicas",
        "states_count",
        "states_order_output",
        "states_single_index",
    )
    attrs = {name: name for name in option_names}

    return salt.returners.get_returner_options(
        __virtualname__,
        ret,
        attrs,
        __salt__=__salt__,
        __opts__=__opts__,
        defaults=defaults,
    )
172
173
def _ensure_index(index):
    """
    Make sure ``index`` is available before documents are written to it.

    When ``elasticsearch.index_exists`` reports that ``index`` is missing, an
    index named ``<index>-v1`` is created with the configured shard and
    replica counts, and ``elasticsearch.alias_create`` is called with that
    name and ``index``. Nothing happens when the index already exists.
    """
    if __salt__["elasticsearch.index_exists"](index):
        return

    opts = _get_options()
    settings = {
        "number_of_shards": opts["number_of_shards"],
        "number_of_replicas": opts["number_of_replicas"],
    }
    versioned_name = "{}-v1".format(index)

    __salt__["elasticsearch.index_create"](versioned_name, {"settings": settings})
    __salt__["elasticsearch.alias_create"](versioned_name, index)
187
188
189 def _convert_keys(data):
190 if isinstance(data, dict):
191 new_data = {}
192 for k, sub_data in data.items():
193 if "." in k:
194 new_data["_orig_key"] = k
195 k = k.replace(".", "_")
196 new_data[k] = _convert_keys(sub_data)
197 elif isinstance(data, list):
198 new_data = []
199 for item in data:
200 new_data.append(_convert_keys(item))
201 else:
202 return data
203
204 return new_data
205
206
def returner(ret):
    """
    Process the return from Salt and index it into Elasticsearch.

    ret
        Job return dictionary. ``fun``, ``jid`` and ``id`` are required;
        ``retcode`` defaults to ``1`` (treated as failure) when absent.

    Nothing is sent when the function is listed in ``functions_blacklist`` or
    when both ``ret["data"]`` and ``ret["return"]`` are missing/``None``.

    For the functions in ``STATE_FUNCTIONS`` the return is post-processed
    according to the ``states_*`` options. With ``states_order_output``
    enabled, ``ret["return"]`` is modified in place: each state key is
    prefixed with its zero-padded ``__run_num__`` and gets a ``_func`` entry.
    """

    job_fun = ret["fun"]
    job_fun_escaped = job_fun.replace(".", "_")
    job_id = ret["jid"]
    job_retcode = ret.get("retcode", 1)
    job_success = not job_retcode

    options = _get_options(ret)

    if job_fun in options["functions_blacklist"]:
        log.info(
            "Won't push new data to Elasticsearch, job with jid=%s and "
            "function=%s which is in the user-defined list of ignored "
            "functions",
            job_id,
            job_fun,
        )
        return
    if ret.get("data", None) is None and ret.get("return") is None:
        log.info(
            "Won't push new data to Elasticsearch, job with jid=%s was "
            "not successful",
            job_id,
        )
        return

    # Build the index name
    if options["states_single_index"] and job_fun in STATE_FUNCTIONS:
        index = "salt-{}".format(STATE_FUNCTIONS[job_fun])
    else:
        index = "salt-{}".format(job_fun_escaped)

    if options["index_date"]:
        index = "{}-{}".format(index, datetime.date.today().strftime("%Y.%m.%d"))

    counts = {}

    # Do some special processing for state returns
    if job_fun in STATE_FUNCTIONS:
        # Init the state counts
        if options["states_count"]:
            counts = {
                "succeeded": 0,
                "failed": 0,
            }

        # Prepend each state execution key in ret['return'] with a zero-padded
        # version of the '__run_num__' field allowing the states to be ordered
        # more easily. Change the index to be '<index>-ordered' so as not to
        # clash with the unsorted index data format
        if options["states_order_output"] and isinstance(ret["return"], dict):
            index = "{}-ordered".format(index)
            max_chars = len(str(len(ret["return"])))

            # Iterate over a snapshot: keys are popped and re-inserted below,
            # and mutating a dict while iterating its live view raises
            # RuntimeError on recent CPython versions.
            for uid, data in list(ret["return"].items()):
                # Skip keys that already carry a numeric prefix
                if uid.startswith(tuple("0123456789")):
                    continue

                # Store the function being called as it's a useful key to search
                decoded_uid = uid.split("_|-")
                data["_func"] = "{}.{}".format(decoded_uid[0], decoded_uid[-1])

                # Prefix the key with the run order so it can be sorted
                new_uid = "{}_|-{}".format(
                    str(data["__run_num__"]).zfill(max_chars), uid,
                )

                ret["return"][new_uid] = ret["return"].pop(uid)

        # Catch a state output that has failed and where the error message is
        # not in a dict as expected. This prevents elasticsearch from
        # complaining about a mapping error
        elif not isinstance(ret["return"], dict):
            ret["return"] = {"return": ret["return"]}

        # Need to count state successes and failures
        if options["states_count"]:
            for state_data in ret["return"].values():
                # A non-dict entry is the wrapped error output from the branch
                # above; it has no "result" key, so count it as a failure
                # instead of crashing on the subscript.
                if not isinstance(state_data, dict) or state_data["result"] is False:
                    counts["failed"] += 1
                else:
                    counts["succeeded"] += 1

    # Ensure the index exists
    _ensure_index(index)

    # Build the payload; the timestamp is timezone-aware UTC
    data = {
        "@timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "success": job_success,
        "retcode": job_retcode,
        "minion": ret["id"],
        "fun": job_fun,
        "jid": job_id,
        "counts": counts,
        "data": _convert_keys(ret["return"]),
    }

    if options["debug_returner_payload"]:
        log.debug("elasicsearch payload: %s", data)

    # Post the payload
    __salt__["elasticsearch.document_create"](
        index=index, doc_type=options["doc_type"], body=salt.utils.json.dumps(data)
    )
331
332
def event_return(events):
    """
    Return events to Elasticsearch

    Requires that the `event_return` configuration be set in master config.

    Each event is stored as its own document (with a random UUID as id) made
    of the event's ``tag`` and ``data``; either falls back to an empty string
    when missing. The target index is ``master_event_index``, suffixed with
    today's date when ``index_date`` is enabled.
    """
    opts = _get_options()

    target_index = opts["master_event_index"]
    doc_type = opts["master_event_doc_type"]

    if opts["index_date"]:
        date_suffix = datetime.date.today().strftime("%Y.%m.%d")
        target_index = "{}-{}".format(target_index, date_suffix)

    _ensure_index(target_index)

    for event in events:
        payload = {"tag": event.get("tag", ""), "data": event.get("data", "")}
        __salt__["elasticsearch.document_create"](
            index=target_index,
            doc_type=doc_type,
            id=uuid.uuid4(),
            body=salt.utils.json.dumps(payload),
        )
358
359
def prep_jid(nocache=False, passed_jid=None):  # pylint: disable=unused-argument
    """
    Do any work necessary to prepare a JID, including sending a custom id

    Returns ``passed_jid`` unchanged whenever it is not ``None``; otherwise a
    new jid is generated with ``salt.utils.jid.gen_jid``. ``nocache`` is
    accepted but not used.
    """
    if passed_jid is None:
        return salt.utils.jid.gen_jid(__opts__)
    return passed_jid
365
366
def save_load(jid, load, minions=None):
    """
    Save the load to the specified jid id

    The document ``{"jid": jid, "load": load}`` is written to the
    ``master_job_cache_index`` index using ``jid`` as the document id.
    ``minions`` is accepted but not used.

    .. versionadded:: 2015.8.1
    """
    opts = _get_options()
    cache_index = opts["master_job_cache_index"]
    cache_doc_type = opts["master_job_cache_doc_type"]

    _ensure_index(cache_index)

    document = {"jid": jid, "load": load}
    __salt__["elasticsearch.document_create"](
        index=cache_index,
        doc_type=cache_doc_type,
        id=jid,
        body=salt.utils.json.dumps(document),
    )
388
389
def get_load(jid):
    """
    Return the load data that marks a specified jid

    Looks the document up by ``jid`` in the ``master_job_cache_index`` index.
    Returns ``{}`` when nothing (or a falsy value) comes back. A string or
    bytes result is decoded as JSON; any other result is returned unchanged.

    .. versionadded:: 2015.8.1
    """
    options = _get_options()

    index = options["master_job_cache_index"]
    doc_type = options["master_job_cache_doc_type"]

    data = __salt__["elasticsearch.document_get"](
        index=index, id=jid, doc_type=doc_type
    )
    if not data:
        return {}

    # Only serialized JSON needs decoding; passing an already-parsed object to
    # json.loads raises TypeError.
    # NOTE(review): document_get is defined outside this file and presumably
    # returns the client's parsed response -- confirm against that module.
    if isinstance(data, (str, bytes, bytearray)):
        return salt.utils.json.loads(data)
    return data