"Fossies" - the Fresh Open Source Software Archive 
Member "salt-3002.2/salt/pillar/__init__.py" (18 Nov 2020, 49735 Bytes) of package /linux/misc/salt-3002.2.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively, you can view or download the uninterpreted source code file here.
For more information about "__init__.py" see the
Fossies "Dox" file reference documentation and the latest
Fossies "Diffs" side-by-side code changes report:
3002.1_vs_3002.2.
1 """
2 Render the pillar data
3 """
4
5
6 import collections
7 import copy
8 import fnmatch
9 import inspect
10 import logging
11 import os
12 import sys
13 import traceback
14
15 import salt.ext.tornado.gen
16 import salt.fileclient
17 import salt.loader
18 import salt.minion
19 import salt.transport.client
20 import salt.utils.args
21 import salt.utils.cache
22 import salt.utils.crypt
23 import salt.utils.data
24 import salt.utils.dictupdate
25 import salt.utils.url
26 from salt.exceptions import SaltClientError
27 from salt.ext import six
28 from salt.template import compile_template
29
30 # Even though dictupdate is imported, invoking salt.utils.dictupdate.merge here
31 # causes an UnboundLocalError. This should be investigated and fixed, but until
32 # then, leave the import directly below this comment intact.
33 from salt.utils.dictupdate import merge
34 from salt.utils.odict import OrderedDict
35 from salt.version import __version__
36
37 log = logging.getLogger(__name__)
38
39
def get_pillar(
    opts,
    grains,
    minion_id,
    saltenv=None,
    ext=None,
    funcs=None,
    pillar_override=None,
    pillarenv=None,
    extra_minion_data=None,
):
    """
    Return the correct pillar driver based on the file_client option

    :param opts: The configuration dictionary. ``file_client`` and
        ``pillar_cache`` must be present; ``master_type`` and
        ``use_master_when_local`` are consulted when set.
    :param grains: The grains handed to the pillar driver.
    :param minion_id: The id of the minion the pillar is compiled for.
    :param saltenv: Optional salt environment handed to the driver.
    :param ext: Optional on-demand external pillar configuration.
    :param funcs: Optional pre-loaded execution functions, handed to the
        driver as ``functions``.
    :param pillar_override: Optional pillar data handed to the driver.
    :param pillarenv: Optional pillar environment handed to the driver.
    :param extra_minion_data: Optional extra data handed to the driver.
    :return: A ``PillarCache`` when ``opts["pillar_cache"]`` is truthy,
        otherwise a ``RemotePillar`` or ``Pillar`` instance.
    """
    # When file_client is 'local' this makes the minion masterless
    # but sometimes we want the minion to read its files from the local
    # filesystem instead of asking for them from the master, but still
    # get commands from the master.
    # To enable this functionality set file_client=local and
    # use_master_when_local=True in the minion config. Then here we override
    # the file client to be 'remote' for getting pillar. If we don't do this
    # then the minion never sends the event that the master uses to update
    # its minion_data_cache. If the master doesn't update the minion_data_cache
    # then the SSE salt-master plugin won't see any grains for those minions.
    file_client = opts["file_client"]
    if opts.get("master_type") == "disable" and file_client == "remote":
        file_client = "local"
    elif file_client == "local" and opts.get("use_master_when_local"):
        file_client = "remote"

    ptype = {"remote": RemotePillar, "local": Pillar}.get(file_client, Pillar)
    # If we're caching, run through the cache system first
    log.debug("Determining pillar cache")
    if opts["pillar_cache"]:
        log.debug("get_pillar using pillar cache with ext: %s", ext)
        return PillarCache(
            opts,
            grains,
            minion_id,
            saltenv,
            ext=ext,
            functions=funcs,
            pillar_override=pillar_override,
            pillarenv=pillarenv,
            # Forward the extra data just like the non-cached branch does so
            # that the caller's value is not silently dropped.
            extra_minion_data=extra_minion_data,
        )
    return ptype(
        opts,
        grains,
        minion_id,
        saltenv,
        ext,
        functions=funcs,
        pillar_override=pillar_override,
        pillarenv=pillarenv,
        extra_minion_data=extra_minion_data,
    )
96
97
98 # TODO: migrate everyone to this one!
def get_async_pillar(
    opts,
    grains,
    minion_id,
    saltenv=None,
    ext=None,
    funcs=None,
    pillar_override=None,
    pillarenv=None,
    extra_minion_data=None,
):
    """
    Pick the asynchronous pillar driver matching the ``file_client`` option
    and return an instance of it.

    A ``remote`` file client is treated as ``local`` when ``master_type`` is
    ``disable``; a ``local`` file client is treated as ``remote`` when
    ``use_master_when_local`` is set.
    """
    client_kind = opts["file_client"]
    if client_kind == "remote" and opts.get("master_type") == "disable":
        client_kind = "local"
    elif client_kind == "local" and opts.get("use_master_when_local"):
        client_kind = "remote"

    # Anything that is not "remote" falls back to the local driver.
    drivers = {"remote": AsyncRemotePillar, "local": AsyncPillar}
    driver = drivers.get(client_kind, AsyncPillar)

    driver_kwargs = {
        "functions": funcs,
        "pillar_override": pillar_override,
        "pillarenv": pillarenv,
        "extra_minion_data": extra_minion_data,
    }
    return driver(opts, grains, minion_id, saltenv, ext, **driver_kwargs)
132
133
class RemotePillarMixin:
    """
    Common remote pillar functionality
    """

    def get_ext_pillar_extra_minion_data(self, opts):
        """
        Returns the extra data from the minion's opts dict (the config file).

        This data will be passed to external pillar functions.

        :param opts: The configuration dictionary. When it contains
            ``pass_to_ext_pillars`` that value must be a list of
            colon-delimited key paths into ``opts``.
        :return: A dict holding the selected subtrees of ``opts``, keeping the
            nesting of the original configuration. Empty when nothing is
            configured or none of the paths resolve.
        :raises SaltClientError: If ``pass_to_ext_pillars`` is not a list.
        """

        def get_subconfig(opts_key):
            """
            Returns a dict containing the opts key subtree, while maintaining
            the opts structure
            """
            ret_dict = aux_dict = {}
            config_val = opts
            subkeys = opts_key.split(":")
            # Build an empty dict with the opts path
            for subkey in subkeys[:-1]:
                aux_dict[subkey] = {}
                aux_dict = aux_dict[subkey]
                if not config_val.get(subkey):
                    # The subkey is not in the config
                    return {}
                config_val = config_val[subkey]
            if subkeys[-1] not in config_val:
                return {}
            aux_dict[subkeys[-1]] = config_val[subkeys[-1]]
            return ret_dict

        extra_data = {}
        if "pass_to_ext_pillars" in opts:
            if not isinstance(opts["pass_to_ext_pillars"], list):
                # No exception is being handled here, so log a plain error
                # rather than log.exception(), which would append a
                # meaningless "NoneType: None" traceback to the message.
                log.error("'pass_to_ext_pillars' config is malformed.")
                raise SaltClientError("'pass_to_ext_pillars' config is malformed.")
            for key in opts["pass_to_ext_pillars"]:
                salt.utils.dictupdate.update(
                    extra_data,
                    get_subconfig(key),
                    recursive_update=True,
                    merge_lists=True,
                )
        log.trace("ext_pillar_extra_data = %s", extra_data)
        return extra_data
181
182
class AsyncRemotePillar(RemotePillarMixin):
    """
    Get the pillar from the master
    """

    def __init__(
        self,
        opts,
        grains,
        minion_id,
        saltenv,
        ext=None,
        functions=None,
        pillar_override=None,
        pillarenv=None,
        extra_minion_data=None,
    ):
        """
        Store the request parameters and open the async request channel.

        ``opts`` is kept by reference and updated in place with ``saltenv``
        (always) and ``pillarenv`` (when given). ``functions`` is accepted for
        signature parity with the other pillar drivers but is not used here.
        Non-dict ``pillar_override`` / ``extra_minion_data`` values are
        replaced by an empty dict and an error is logged.
        """
        self.opts = opts
        self.opts["saltenv"] = saltenv
        self.ext = ext
        self.grains = grains
        self.minion_id = minion_id
        self.channel = salt.transport.client.AsyncReqChannel.factory(opts)
        if pillarenv is not None:
            self.opts["pillarenv"] = pillarenv
        self.pillar_override = pillar_override or {}
        if not isinstance(self.pillar_override, dict):
            self.pillar_override = {}
            log.error("Pillar data must be a dictionary")
        self.extra_minion_data = extra_minion_data or {}
        if not isinstance(self.extra_minion_data, dict):
            self.extra_minion_data = {}
            log.error("Extra minion data must be a dictionary")
        # Fold in the data selected through the 'pass_to_ext_pillars' option
        salt.utils.dictupdate.update(
            self.extra_minion_data,
            self.get_ext_pillar_extra_minion_data(opts),
            recursive_update=True,
            merge_lists=True,
        )
        self._closing = False

    @salt.ext.tornado.gen.coroutine
    def compile_pillar(self):
        """
        Return a future which will contain the pillar data from the master

        :raises SaltClientError: If the transfer fails or the master returns
            something that is not a dictionary.
        """
        load = {
            "id": self.minion_id,
            "grains": self.grains,
            "saltenv": self.opts["saltenv"],
            "pillarenv": self.opts["pillarenv"],
            "pillar_override": self.pillar_override,
            "extra_minion_data": self.extra_minion_data,
            "ver": "2",
            "cmd": "_pillar",
        }
        if self.ext:
            load["ext"] = self.ext
        try:
            ret_pillar = yield self.channel.crypted_transfer_decode_dictentry(
                load, dictkey="pillar",
            )
        except Exception:  # pylint: disable=broad-except
            log.exception("Exception getting pillar:")
            raise SaltClientError("Exception getting pillar.")

        if not isinstance(ret_pillar, dict):
            msg = (
                "Got a bad pillar from master, type {}, expecting dict: {}"
            ).format(type(ret_pillar).__name__, ret_pillar)
            log.error(msg)
            # raise an exception! Pillar isn't empty, we can't sync it!
            raise SaltClientError(msg)
        raise salt.ext.tornado.gen.Return(ret_pillar)

    def destroy(self):
        """
        Close the request channel.

        Safe to call repeatedly. Also safe on an instance whose ``__init__``
        did not run to completion: ``__del__`` is still invoked for such
        objects, and the attributes read here may not exist yet.
        """
        if getattr(self, "_closing", False):
            return

        self._closing = True
        channel = getattr(self, "channel", None)
        if channel is not None:
            channel.close()

    # pylint: disable=W1701
    def __del__(self):
        self.destroy()

    # pylint: enable=W1701
270
271
class RemotePillar(RemotePillarMixin):
    """
    Get the pillar from the master
    """

    def __init__(
        self,
        opts,
        grains,
        minion_id,
        saltenv,
        ext=None,
        functions=None,
        pillar_override=None,
        pillarenv=None,
        extra_minion_data=None,
    ):
        """
        Store the request parameters and open the request channel.

        ``opts`` is kept by reference and updated in place with ``saltenv``
        (always) and ``pillarenv`` (when given). ``functions`` is accepted for
        signature parity with the other pillar drivers but is not used here.
        Non-dict ``pillar_override`` / ``extra_minion_data`` values are
        replaced by an empty dict and an error is logged.
        """
        self.opts = opts
        self.opts["saltenv"] = saltenv
        self.ext = ext
        self.grains = grains
        self.minion_id = minion_id
        self.channel = salt.transport.client.ReqChannel.factory(opts)
        if pillarenv is not None:
            self.opts["pillarenv"] = pillarenv
        self.pillar_override = pillar_override or {}
        if not isinstance(self.pillar_override, dict):
            self.pillar_override = {}
            log.error("Pillar data must be a dictionary")
        self.extra_minion_data = extra_minion_data or {}
        if not isinstance(self.extra_minion_data, dict):
            self.extra_minion_data = {}
            log.error("Extra minion data must be a dictionary")
        # Fold in the data selected through the 'pass_to_ext_pillars' option
        salt.utils.dictupdate.update(
            self.extra_minion_data,
            self.get_ext_pillar_extra_minion_data(opts),
            recursive_update=True,
            merge_lists=True,
        )
        self._closing = False

    def compile_pillar(self):
        """
        Return the pillar data from the master

        An empty dict is returned (and an error logged) when the master's
        answer is not a dictionary.
        """
        load = {
            "id": self.minion_id,
            "grains": self.grains,
            "saltenv": self.opts["saltenv"],
            "pillarenv": self.opts["pillarenv"],
            "pillar_override": self.pillar_override,
            "extra_minion_data": self.extra_minion_data,
            "ver": "2",
            "cmd": "_pillar",
        }
        if self.ext:
            load["ext"] = self.ext
        ret_pillar = self.channel.crypted_transfer_decode_dictentry(
            load, dictkey="pillar",
        )

        if not isinstance(ret_pillar, dict):
            log.error(
                "Got a bad pillar from master, type %s, expecting dict: %s",
                type(ret_pillar).__name__,
                ret_pillar,
            )
            return {}
        return ret_pillar

    def destroy(self):
        """
        Close the request channel.

        Safe to call repeatedly. Also safe on an instance whose ``__init__``
        did not run to completion: ``__del__`` is still invoked for such
        objects, and the attributes read here may not exist yet.
        """
        if getattr(self, "_closing", False):
            return

        self._closing = True
        channel = getattr(self, "channel", None)
        if channel is not None:
            channel.close()

    # pylint: disable=W1701
    def __del__(self):
        self.destroy()

    # pylint: enable=W1701
354
355
class PillarCache:
    """
    Return a cached pillar if it exists, otherwise cache it.

    Pillar caches are structured in two dimensions: minion_id with a dict of
    pillar environments. Each environment contains a pillar dict

    Example data structure:

    ```
    {'minion_1':
        {'base': {'pillar_key_1': 'pillar_val_1'}}
    }
    ```
    """

    # TODO ABC?
    def __init__(
        self,
        opts,
        grains,
        minion_id,
        saltenv,
        ext=None,
        functions=None,
        pillar_override=None,
        pillarenv=None,
        extra_minion_data=None,
    ):
        """
        Remember everything needed to build a ``Pillar`` on a cache miss and
        create the cache backend named by ``opts["pillar_cache_backend"]``.

        ``saltenv`` defaults to ``"base"`` when ``None`` is given.
        """
        # Yes, we need all of these because we need to route to the Pillar object
        # if we have no cache. This is another refactor target.

        # Go ahead and assign these because they may be needed later
        self.opts = opts
        self.grains = grains
        self.minion_id = minion_id
        self.ext = ext
        self.functions = functions
        self.pillar_override = pillar_override
        self.pillarenv = pillarenv
        # Kept so that a cache miss compiles the pillar with the same extra
        # data the caller supplied.
        self.extra_minion_data = extra_minion_data

        if saltenv is None:
            self.saltenv = "base"
        else:
            self.saltenv = saltenv

        # Determine caching backend
        self.cache = salt.utils.cache.CacheFactory.factory(
            self.opts["pillar_cache_backend"],
            self.opts["pillar_cache_ttl"],
            minion_cache_path=self._minion_cache_path(minion_id),
        )

    def _minion_cache_path(self, minion_id):
        """
        Return the path to the cache file for the minion.

        Used only for disk-based backends
        """
        return os.path.join(self.opts["cachedir"], "pillar_cache", minion_id)

    def fetch_pillar(self):
        """
        In the event of a cache miss, we need to incur the overhead of caching
        a new pillar.

        :return: The pillar compiled by a freshly built ``Pillar`` object.
        """
        log.debug("Pillar cache getting external pillar with ext: %s", self.ext)
        fresh_pillar = Pillar(
            self.opts,
            self.grains,
            self.minion_id,
            self.saltenv,
            ext=self.ext,
            functions=self.functions,
            pillar_override=self.pillar_override,
            pillarenv=self.pillarenv,
            # getattr keeps instances that were built without the attribute
            # (e.g. by a subclass overriding __init__) working.
            extra_minion_data=getattr(self, "extra_minion_data", None),
        )
        return fresh_pillar.compile_pillar()

    def compile_pillar(self, *args, **kwargs):  # Will likely just be pillar_dirs
        """
        Return the pillar for this minion and pillarenv, serving it from the
        cache when present and compiling and storing it otherwise.

        Positional and keyword arguments are accepted but ignored.
        """
        log.debug(
            "Scanning pillar cache for information about minion %s and pillarenv %s",
            self.minion_id,
            self.pillarenv,
        )
        # Only used for the debug output below
        if self.opts["pillar_cache_backend"] == "memory":
            cache_dict = self.cache
        else:
            cache_dict = self.cache._dict

        log.debug("Scanning cache: %s", cache_dict)
        # Check the cache!
        if self.minion_id in self.cache:  # Keyed by minion_id
            # TODO Compare grains, etc?
            if self.pillarenv in self.cache[self.minion_id]:
                # We have a cache hit! Send it back.
                log.debug(
                    "Pillar cache hit for minion %s and pillarenv %s",
                    self.minion_id,
                    self.pillarenv,
                )
                return self.cache[self.minion_id][self.pillarenv]
            else:
                # We found the minion but not the env. Store it.
                fresh_pillar = self.fetch_pillar()

                # Re-assign the whole per-minion entry so the backend sees
                # the update.
                minion_cache = self.cache[self.minion_id]
                minion_cache[self.pillarenv] = fresh_pillar
                self.cache[self.minion_id] = minion_cache

                log.debug(
                    "Pillar cache miss for pillarenv %s for minion %s",
                    self.pillarenv,
                    self.minion_id,
                )
                return fresh_pillar
        else:
            # We haven't seen this minion yet in the cache. Store it.
            fresh_pillar = self.fetch_pillar()
            self.cache[self.minion_id] = {self.pillarenv: fresh_pillar}
            log.debug("Pillar cache miss for minion %s", self.minion_id)
            log.debug("Current pillar cache: %s", cache_dict)  # FIXME hack!
            return fresh_pillar
478
479
480 class Pillar:
481 """
482 Read over the pillar top files and render the pillar data
483 """
484
def __init__(
    self,
    opts,
    grains,
    minion_id,
    saltenv,
    ext=None,
    functions=None,
    pillar_override=None,
    pillarenv=None,
    extra_minion_data=None,
):
    """
    Prepare everything needed to compile the pillar locally: a private copy
    of the options, a file client, the available sls lists, and the
    execution-module, matcher, renderer and ext_pillar loaders.

    Note that the incoming ``opts`` may be modified: ``pillarenv`` is set
    from ``saltenv`` when ``pillarenv_from_saltenv`` is enabled and no
    pillarenv was given, and ``grains`` is stored for a purely local file
    client.
    """
    self.minion_id = minion_id
    self.ext = ext
    if pillarenv is None and opts.get("pillarenv_from_saltenv", False):
        opts["pillarenv"] = saltenv
    # use the local file client
    self.opts = self.__gen_opts(opts, grains, saltenv=saltenv, pillarenv=pillarenv)
    self.saltenv = saltenv
    self.client = salt.fileclient.get_file_client(self.opts, True)
    self.avail = self.__gather_avail()

    masterless = opts.get("file_client", "") == "local" and not opts.get(
        "use_master_when_local", False
    )
    if masterless:
        opts["grains"] = grains

    if functions is not None:
        self.functions = functions
    else:
        # No functions were handed in, so load them ourselves
        loader_utils = salt.loader.utils(opts)
        loader_opts = opts if opts.get("file_client", "") == "local" else self.opts
        self.functions = salt.loader.minion_mods(loader_opts, utils=loader_utils)

    self.opts["minion_id"] = minion_id
    self.matchers = salt.loader.matchers(self.opts)
    self.rend = salt.loader.render(self.opts, self.functions)

    ext_pillar_opts = copy.deepcopy(self.opts)
    # Keep the incoming opts ID intact, ie, the master id
    if "id" in opts:
        ext_pillar_opts["id"] = opts["id"]

    # An unset or empty strategy falls back to "smart"
    self.merge_strategy = opts.get("pillar_source_merging_strategy") or "smart"

    self.ext_pillars = salt.loader.pillars(ext_pillar_opts, self.functions)
    self.ignored_pillars = {}

    self.pillar_override = pillar_override or {}
    if not isinstance(self.pillar_override, dict):
        self.pillar_override = {}
        log.error("Pillar data must be a dictionary")

    self.extra_minion_data = extra_minion_data or {}
    if not isinstance(self.extra_minion_data, dict):
        self.extra_minion_data = {}
        log.error("Extra minion data must be a dictionary")

    self._closing = False
545
def __valid_on_demand_ext_pillar(self, opts):
    """
    Tell whether every module named in the on-demand external pillar
    (``self.ext``) is listed in the ``on_demand_ext_pillar`` option.

    Returns ``False`` (after logging an error) when ``self.ext`` is not a
    dictionary, when the option cannot be searched, or when at least one
    requested module is not allowed.
    """
    if not isinstance(self.ext, dict):
        log.error("On-demand pillar %s is not formatted as a dictionary", self.ext)
        return False

    allowed = opts.get("on_demand_ext_pillar", [])
    rejected = set()
    try:
        for module_name in self.ext:
            if module_name not in allowed:
                rejected.add(module_name)
    except TypeError:
        # Prevent traceback when on_demand_ext_pillar option is malformed
        log.error(
            "The 'on_demand_ext_pillar' configuration option is "
            "malformed, it should be a list of ext_pillar module names"
        )
        return False

    if not rejected:
        return True

    log.error(
        "The following ext_pillar modules are not allowed for "
        "on-demand pillar data: %s. Valid on-demand ext_pillar "
        "modules are: %s. The valid modules can be adjusted by "
        "setting the 'on_demand_ext_pillar' config option.",
        ", ".join(sorted(rejected)),
        ", ".join(allowed),
    )
    return False
576
def __gather_avail(self):
    """
    Build a mapping of each known environment to the sls names the file
    client lists for it.
    """
    return {env: self.client.list_states(env) for env in self._get_envs()}
585
def __gen_opts(self, opts_in, grains, saltenv=None, ext=None, pillarenv=None):
    """
    Produce a deep copy of ``opts_in`` adjusted for use with the local file
    client. The input dictionary itself is left untouched.

    The ``ext`` parameter is accepted but not used; the on-demand external
    pillar is taken from ``self.ext``.
    """
    opts = copy.deepcopy(opts_in)
    opts["file_client"] = "local"
    opts["grains"] = grains if grains else {}

    # Allow minion/CLI saltenv/pillarenv to take precedence over master
    if saltenv is None:
        saltenv = opts.get("saltenv")
    opts["saltenv"] = saltenv
    if pillarenv is None:
        pillarenv = opts.get("pillarenv")
    opts["pillarenv"] = pillarenv

    opts["id"] = self.minion_id

    # Anything that is not already a salt:// URL gets converted into one;
    # a leading slash is dropped first.
    top_path = opts["state_top"]
    if not top_path.startswith("salt://"):
        if top_path.startswith("/"):
            top_path = top_path[1:]
        opts["state_top"] = salt.utils.url.create(top_path)

    if self.ext and self.__valid_on_demand_ext_pillar(opts):
        opts.setdefault("ext_pillar", []).append(self.ext)

    pillar_roots = opts["pillar_roots"]
    if "__env__" in pillar_roots:
        env = opts.get("pillarenv") or opts.get("saltenv") or "base"
        if env in pillar_roots:
            log.debug(
                "pillar_roots __env__ ignored (environment '%s' found in pillar_roots)",
                env,
            )
            pillar_roots.pop("__env__")
        else:
            log.debug(
                "pillar environment '%s' maps to __env__ pillar_roots directory",
                env,
            )
            pillar_roots[env] = pillar_roots.pop("__env__")
    return opts
628
def _get_envs(self):
    """
    Return the set of pillar environments: always ``base``, plus every key
    of the ``pillar_roots`` option when that option is present.
    """
    envs = {"base"}
    if "pillar_roots" in self.opts:
        for env in list(self.opts["pillar_roots"]):
            envs.add(env)
    return envs
637
def get_tops(self):
    """
    Render the pillar top file of every relevant environment, followed by
    the top files they include.

    Returns a ``(tops, errors)`` tuple: ``tops`` maps an environment to the
    list of rendered top data, ``errors`` is a list of error strings.
    """
    tops = collections.defaultdict(list)
    include = collections.defaultdict(list)
    done = collections.defaultdict(list)
    errors = []

    def render_top(path, env):
        # All top files are rendered with the same renderer settings
        return compile_template(
            path,
            self.rend,
            self.opts["renderer"],
            self.opts["renderer_blacklist"],
            self.opts["renderer_whitelist"],
            saltenv=env,
            _pillar_rend=True,
        )

    # Gather initial top files
    try:
        saltenvs = set()
        if not self.opts["pillarenv"]:
            saltenvs = self._get_envs()
            if self.opts.get("pillar_source_merging_strategy", None) == "none":
                saltenvs.intersection_update({self.saltenv or "base"})
        elif self.opts["pillarenv"] in self.opts["pillar_roots"]:
            saltenvs.add(self.opts["pillarenv"])
        else:
            # If the specified pillarenv is not present in the available
            # pillar environments, do not cache the pillar top file.
            log.debug(
                "pillarenv '%s' not found in the configured pillar "
                "environments (%s)",
                self.opts["pillarenv"],
                ", ".join(self.opts["pillar_roots"]),
            )

        for saltenv in saltenvs:
            top_path = self.client.cache_file(self.opts["state_top"], saltenv)
            if top_path:
                tops[saltenv].append(render_top(top_path, saltenv))
    except Exception as exc:  # pylint: disable=broad-except
        errors.append(
            "Rendering Primary Top file failed, render error:\n{}".format(exc)
        )
        log.exception("Pillar rendering failed for minion %s", self.minion_id)

    # Search initial top files for includes
    for saltenv, rendered_tops in tops.items():
        for rendered in rendered_tops:
            if "include" in rendered:
                for sls in rendered["include"]:
                    include[saltenv].append(sls)
                rendered.pop("include")

    # Go through the includes and pull out the extra tops and add them
    while include:
        handled = []
        for saltenv, states in include.items():
            handled.append(saltenv)
            for sls in states:
                if sls in done[saltenv]:
                    continue
                try:
                    tops[saltenv].append(
                        render_top(
                            self.client.get_state(sls, saltenv).get("dest", False),
                            saltenv,
                        )
                    )
                except Exception as exc:  # pylint: disable=broad-except
                    errors.append(
                        "Rendering Top file {} failed, render error:\n{}".format(
                            sls, exc
                        )
                    )
                done[saltenv].append(sls)
        for saltenv in handled:
            include.pop(saltenv, None)

    return tops, errors
728
def merge_tops(self, tops):
    """
    Cleanly merge the top files

    :param tops: Mapping of environment to a list of rendered top file data,
        as returned by ``get_tops``.
    :return: Whatever ``sort_top_targets`` returns for the merged data. The
        merged data maps each environment to its targets, and each target to
        a list holding the matcher dictionaries followed by the sls names.

    Side effect: sls names of targets flagged with ``ignore_missing`` are
    appended to ``self.ignored_pillars`` for their environment.
    """
    top = collections.defaultdict(OrderedDict)
    orders = collections.defaultdict(OrderedDict)
    for ctops in tops.values():
        for ctop in ctops:
            for saltenv, targets in ctop.items():
                if saltenv == "include":
                    continue
                for tgt in targets:
                    matches = []
                    states = OrderedDict()
                    orders[saltenv][tgt] = 0
                    ignore_missing = False
                    for comp in ctop[saltenv][tgt]:
                        if isinstance(comp, dict):
                            if "match" in comp:
                                matches.append(comp)
                            if "order" in comp:
                                order = comp["order"]
                                if not isinstance(order, int):
                                    try:
                                        order = int(order)
                                    except (TypeError, ValueError):
                                        # int() raises TypeError for values
                                        # such as None or a list. Treat those
                                        # like any other unparsable order so
                                        # that one bad value does not abort
                                        # the merge of the whole top.
                                        order = 0
                                orders[saltenv][tgt] = order
                            if comp.get("ignore_missing", False):
                                ignore_missing = True
                        if isinstance(comp, str):
                            # OrderedDict used as an ordered set of sls names
                            states[comp] = True
                    if ignore_missing:
                        self.ignored_pillars.setdefault(saltenv, []).extend(
                            states.keys()
                        )
                    top[saltenv][tgt] = matches
                    top[saltenv][tgt].extend(states)
    return self.sort_top_targets(top, orders)
768
def sort_top_targets(self, top, orders):
    """
    Return the merged top data with the targets of every environment
    re-inserted in ascending order of their ``orders`` value.
    """
    sorted_top = collections.defaultdict(OrderedDict)
    for saltenv, targets in top.items():
        # Bind the environment as a default so the key function does not
        # depend on the loop variable.
        ranked = sorted(targets, key=lambda tgt, env=saltenv: orders[env][tgt])
        env_sorted = sorted_top[saltenv]
        for tgt in ranked:
            env_sorted[tgt] = targets[tgt]
    return sorted_top
781
def get_top(self):
    """
    Gather and merge the top files.

    Returns a ``(merged_tops, errors)`` tuple. When merging raises a
    ``TypeError`` the merged data is an empty ``OrderedDict`` and a generic
    error message is added to ``errors``.
    """
    tops, errors = self.get_tops()
    try:
        return self.merge_tops(tops), errors
    except TypeError:
        errors.append("Error encountered while rendering pillar top file.")
        return OrderedDict(), errors
793
def top_matches(self, top, reload=False):
    """
    Walk the top high data and collect, per environment, the sls names of
    every target the ``confirm_top`` matcher accepts.

    Returns:
    {'saltenv': ['state1', 'state2', ...]}

    reload
        Reload the matcher loader
    """
    if reload:
        self.matchers = salt.loader.matchers(self.opts)

    matches = {}
    for saltenv, body in top.items():
        # With a pillarenv configured, only that environment is considered
        if self.opts["pillarenv"] and saltenv != self.opts["pillarenv"]:
            continue
        for match, data in body.items():
            confirmed = self.matchers["confirm_top.confirm_top"](
                match, data, self.opts.get("nodegroups", {}),
            )
            if not confirmed:
                continue
            env_matches = matches.setdefault(saltenv, [])
            for item in data:
                # Only strings are sls names; duplicates are dropped
                if isinstance(item, str) and item not in env_matches:
                    env_matches.append(item)
    return matches
824
def render_pstate(self, sls, saltenv, mods, defaults=None):
    """
    Collect a single pillar sls file and render it

    :param sls: Name of the pillar sls to render.
    :param saltenv: Environment the sls is looked up in.
    :param mods: Set of sls names that were already handled; ``sls`` is
        added to it once rendering has been attempted.
    :param defaults: Optional dict passed to the renderer as extra keyword
        arguments.
    :return: A ``(state, mods, errors)`` tuple. ``state`` is the rendered
        (and include-merged) data, or ``None`` when the sls was skipped or
        could not be rendered; ``errors`` is a list of error strings.
    """
    if defaults is None:
        defaults = {}
    err = ""
    errors = []
    state_data = self.client.get_state(sls, saltenv)
    fn_ = state_data.get("dest", False)
    if not fn_:
        # The file client did not report a destination for this sls
        if sls in self.ignored_pillars.get(saltenv, []):
            log.debug(
                "Skipping ignored and missing SLS '%s' in " "environment '%s'",
                sls,
                saltenv,
            )
            return None, mods, errors
        elif self.opts["pillar_roots"].get(saltenv):
            # The environment has pillar roots, so a missing sls is an
            # error. Note that there is no return here: execution continues
            # with the rendering attempt below.
            msg = (
                "Specified SLS '{}' in environment '{}' is not"
                " available on the salt master"
            ).format(sls, saltenv)
            log.error(msg)
            errors.append(msg)
        else:
            msg = (
                "Specified SLS '{}' in environment '{}' was not "
                "found. ".format(sls, saltenv)
            )
            if self.opts.get("__git_pillar", False) is True:
                msg += (
                    "This is likely caused by a git_pillar top file "
                    "containing an environment other than the one for the "
                    "branch in which it resides. Each git_pillar "
                    "branch/tag must have its own top file."
                )
            else:
                msg += (
                    "This could be because SLS '{0}' is in an "
                    "environment other than '{1}', but '{1}' is "
                    "included in that environment's Pillar top file. It "
                    "could also be due to environment '{1}' not being "
                    "defined in 'pillar_roots'.".format(sls, saltenv)
                )
            log.debug(msg)
            # return state, mods, errors
            return None, mods, errors
    state = None
    try:
        state = compile_template(
            fn_,
            self.rend,
            self.opts["renderer"],
            self.opts["renderer_blacklist"],
            self.opts["renderer_whitelist"],
            saltenv,
            sls,
            _pillar_rend=True,
            **defaults
        )
    except Exception as exc:  # pylint: disable=broad-except
        msg = "Rendering SLS '{}' failed, render error:\n{}".format(sls, exc)
        log.critical(msg, exc_info=True)
        # With pillar_safe_render_error enabled (the default used here) the
        # returned error omits the exception text, which is only logged.
        if self.opts.get("pillar_safe_render_error", True):
            errors.append(
                "Rendering SLS '{}' failed. Please see master log for "
                "details.".format(sls)
            )
        else:
            errors.append(msg)
    mods.add(sls)
    nstate = None
    if state:
        if not isinstance(state, dict):
            msg = "SLS '{}' does not render to a dictionary".format(sls)
            log.error(msg)
            errors.append(msg)
        else:
            if "include" in state:
                if not isinstance(state["include"], list):
                    msg = (
                        "Include Declaration in SLS '{}' is not "
                        "formed as a list".format(sls)
                    )
                    log.error(msg)
                    errors.append(msg)
                else:
                    # render included state(s)
                    include_states = []
                    for sub_sls in state.pop("include"):
                        # An include entry is either a plain sls name or a
                        # single-key dict {name: {"defaults": ..., "key": ...}}
                        # NOTE(review): `defaults` is only reassigned for
                        # dict-style entries, so a later plain-string entry
                        # reuses whatever value it currently holds -- confirm
                        # this is intended.
                        if isinstance(sub_sls, dict):
                            sub_sls, v = next(iter(sub_sls.items()))
                            defaults = v.get("defaults", {})
                            key = v.get("key", None)
                        else:
                            key = None
                        try:
                            # Match the include (leading dots stripped,
                            # slashes turned into dots) against the sls
                            # names available in this environment
                            matched_pstates = fnmatch.filter(
                                self.avail[saltenv],
                                sub_sls.lstrip(".").replace("/", "."),
                            )
                            if sub_sls.startswith("."):
                                # Relative include: resolve it against the
                                # including sls. For an init.sls the sls name
                                # itself is the package, otherwise its parent.
                                if state_data.get("source", "").endswith(
                                    "/init.sls"
                                ):
                                    include_parts = sls.split(".")
                                else:
                                    include_parts = sls.split(".")[:-1]
                                sub_sls = ".".join(include_parts + [sub_sls[1:]])
                            matches = fnmatch.filter(self.avail[saltenv], sub_sls,)
                            matched_pstates.extend(matches)
                        except KeyError:
                            # self.avail has no entry for this environment
                            errors.extend(
                                [
                                    "No matching pillar environment for environment "
                                    "'{}' found".format(saltenv)
                                ]
                            )
                            matched_pstates = [sub_sls]
                        # If matched_pstates is empty, set to sub_sls
                        if len(matched_pstates) < 1:
                            matched_pstates = [sub_sls]
                        for m_sub_sls in matched_pstates:
                            # Each sls is rendered at most once per `mods` set
                            if m_sub_sls not in mods:
                                nstate, mods, err = self.render_pstate(
                                    m_sub_sls, saltenv, mods, defaults
                                )
                                if nstate:
                                    if key:
                                        # If key is x:y, convert it to {x: {y: nstate}}
                                        for key_fragment in reversed(
                                            key.split(":")
                                        ):
                                            nstate = {key_fragment: nstate}
                                    if not self.opts.get(
                                        "pillar_includes_override_sls", False
                                    ):
                                        # Collected now, merged after the loop
                                        include_states.append(nstate)
                                    else:
                                        # Merge the include onto the
                                        # including sls right away
                                        state = merge(
                                            state,
                                            nstate,
                                            self.merge_strategy,
                                            self.opts.get("renderer", "yaml"),
                                            self.opts.get(
                                                "pillar_merge_lists", False
                                            ),
                                        )
                                if err:
                                    errors += err
                    if not self.opts.get("pillar_includes_override_sls", False):
                        # merge included state(s) with the current state
                        # merged last to ensure that its values are
                        # authoritative.
                        include_states.append(state)
                        state = None
                        for s in include_states:
                            if state is None:
                                state = s
                            else:
                                state = merge(
                                    state,
                                    s,
                                    self.merge_strategy,
                                    self.opts.get("renderer", "yaml"),
                                    self.opts.get("pillar_merge_lists", False),
                                )
    return state, mods, errors
994
def render_pillar(self, matches, errors=None):
    """
    Render every sls named in ``matches`` and merge the results on top of
    a copy of the pillar override.

    ``matches`` maps an environment to a list of sls names or glob
    patterns. Returns a ``(pillar, errors)`` tuple; a list passed in as
    ``errors`` is extended in place.
    """
    if errors is None:
        errors = []
    pillar = copy.copy(self.pillar_override)

    for saltenv, pstates in matches.items():
        mods = set()

        # Expand the patterns into the list of sls files to render; a
        # pattern without any match is kept as a literal name.
        pstatefiles = []
        for sls_match in pstates:
            expanded = []
            try:
                expanded = fnmatch.filter(self.avail[saltenv], sls_match)
            except KeyError:
                errors.append(
                    "No matching pillar environment for environment "
                    "'{}' found".format(saltenv)
                )
            if expanded:
                pstatefiles.extend(expanded)
            else:
                pstatefiles.append(sls_match)

        for sls in pstatefiles:
            pstate, mods, err = self.render_pstate(sls, saltenv, mods)

            if err:
                errors += err

            if pstate is None:
                continue
            if not isinstance(pstate, dict):
                log.error(
                    "The rendered pillar sls file, '%s' state did "
                    "not return the expected data format. This is "
                    "a sign of a malformed pillar sls file. Returned "
                    "errors: %s",
                    sls,
                    ", ".join(["'{}'".format(e) for e in errors]),
                )
                continue
            pillar = merge(
                pillar,
                pstate,
                self.merge_strategy,
                self.opts.get("renderer", "yaml"),
                self.opts.get("pillar_merge_lists", False),
            )

    return pillar, errors
1048
def _external_pillar_data(self, pillar, val, key):
    """
    Call the external pillar function registered under ``key`` and return
    its result.

    ``val`` is the configured value for that external pillar: a dict is
    expanded into keyword arguments, a list into positional arguments, and
    anything else is passed as a single positional argument.
    ``extra_minion_data`` is only passed when the function's argument list
    names it and the data is non-empty.
    """
    ext_func = self.ext_pillars[key]
    accepted_args = salt.utils.args.get_function_argspec(ext_func).args

    extra_kwargs = {}
    if "extra_minion_data" in accepted_args and self.extra_minion_data:
        extra_kwargs["extra_minion_data"] = self.extra_minion_data

    if isinstance(val, dict):
        return ext_func(self.minion_id, pillar, **extra_kwargs, **val)
    if isinstance(val, list):
        return ext_func(self.minion_id, pillar, *val, **extra_kwargs)
    return ext_func(self.minion_id, pillar, val, **extra_kwargs)
1087
def ext_pillar(self, pillar, errors=None):
    """
    Render the external pillar data

    Runs the external pillars listed in the ``ext_pillar`` option, in order,
    and merges what they return into ``pillar``.

    :param pillar: pillar data compiled so far; it is also handed to every
        external pillar function.
    :param errors: optional list that error messages are appended to. A new
        list is created when it is omitted.
    :return: a ``(pillar, errors)`` tuple. When an entry of ``ext_pillar``
        is not a dict, the pillar half of the tuple is an empty dict.
    """
    if errors is None:
        errors = []
    try:
        # Make sure that on-demand git_pillar is fetched before we try to
        # compile the pillar data. git_pillar will fetch a remote when
        # the git ext_pillar() func is run, but only for masterless.
        if self.ext and "git" in self.ext and self.opts.get("__role") != "minion":
            # Avoid circular import
            # NOTE: these function-scope imports make ``salt`` a local name
            # for this whole method, so the rest of the body must keep using
            # the module-level ``merge`` alias instead of
            # ``salt.utils.dictupdate.merge``.
            import salt.utils.gitfs
            import salt.pillar.git_pillar

            git_pillar = salt.utils.gitfs.GitPillar(
                self.opts,
                self.ext["git"],
                per_remote_overrides=salt.pillar.git_pillar.PER_REMOTE_OVERRIDES,
                per_remote_only=salt.pillar.git_pillar.PER_REMOTE_ONLY,
                global_only=salt.pillar.git_pillar.GLOBAL_ONLY,
            )
            git_pillar.fetch_remotes()
    except TypeError:
        # Handle malformed ext_pillar
        pass
    if "ext_pillar" not in self.opts:
        return pillar, errors
    if not isinstance(self.opts["ext_pillar"], list):
        errors.append('The "ext_pillar" option is malformed')
        log.critical(errors[-1])
        return pillar, errors
    ext = None
    # Bring in CLI pillar data
    if self.pillar_override:
        pillar = merge(
            pillar,
            self.pillar_override,
            self.merge_strategy,
            self.opts.get("renderer", "yaml"),
            self.opts.get("pillar_merge_lists", False),
        )

    for run in self.opts["ext_pillar"]:
        if not isinstance(run, dict):
            errors.append('The "ext_pillar" option is malformed')
            log.critical(errors[-1])
            return {}, errors
        # The exclusion check looks at the first key only. An empty mapping
        # has no first key; the default keeps next() from raising
        # StopIteration, and the loop below then has nothing to run.
        if next(iter(run), None) in self.opts.get("exclude_ext_pillar", []):
            continue
        for key, val in run.items():
            if key not in self.ext_pillars:
                log.critical(
                    "Specified ext_pillar interface %s is unavailable", key
                )
                continue
            try:
                ext = self._external_pillar_data(pillar, val, key)
            except Exception as exc:  # pylint: disable=broad-except
                # A failing external pillar is recorded and skipped so the
                # remaining ones still get a chance to run.
                errors.append("Failed to load ext_pillar {}: {}".format(key, exc))
                log.error(
                    "Exception caught loading ext_pillar '%s':\n%s",
                    key,
                    "".join(traceback.format_tb(exc.__traceback__)),
                )
        if ext:
            pillar = merge(
                pillar,
                ext,
                self.merge_strategy,
                self.opts.get("renderer", "yaml"),
                self.opts.get("pillar_merge_lists", False),
            )
            # Reset so this result is not merged again for a later entry.
            ext = None
    return pillar, errors
1165
def compile_pillar(self, ext=True):
    """
    Compile the complete pillar dictionary and return it

    The top file is resolved and the matching pillar sls files are rendered.
    When ``ext`` is true the external pillars are run as well: before the
    sls render if the ``ext_pillar_first`` option is set, after it
    otherwise. Master opts (``pillar_opts``), pre-existing ``opts["pillar"]``
    data (``ssh_merge_pillar``) and ``self.pillar_override`` are then layered
    in, and finally the configured keys are decrypted. Any error messages
    gathered on the way are stored under the ``_errors`` key.
    """

    def combine(base, overlay):
        # Every merge in this method uses the same strategy and options.
        return merge(
            base,
            overlay,
            self.merge_strategy,
            self.opts.get("renderer", "yaml"),
            self.opts.get("pillar_merge_lists", False),
        )

    top, top_errors = self.get_top()

    if ext and self.opts.get("ext_pillar_first", False):
        # External pillar goes first, so its data is in opts["pillar"] by
        # the time the renderers are reloaded and the sls files rendered.
        self.opts["pillar"], errors = self.ext_pillar(self.pillar_override)
        self.rend = salt.loader.render(self.opts, self.functions)
        matches = self.top_matches(top, reload=True)
        pillar, errors = self.render_pillar(matches, errors=errors)
        pillar = combine(self.opts["pillar"], pillar)
    else:
        matches = self.top_matches(top)
        pillar, errors = self.render_pillar(matches)
        if ext:
            pillar, errors = self.ext_pillar(pillar, errors=errors)
    errors.extend(top_errors)

    if self.opts.get("pillar_opts", False):
        # Expose a copy of the opts (minus grains) under the "master" key.
        master_opts = dict(self.opts)
        master_opts.pop("grains", None)
        master_opts["saltversion"] = __version__
        pillar["master"] = master_opts

    if "pillar" in self.opts and self.opts.get("ssh_merge_pillar", False):
        pillar = combine(self.opts["pillar"], pillar)

    for error in errors:
        log.critical("Pillar render error: %s", error)
    if errors:
        pillar["_errors"] = errors

    if self.pillar_override:
        pillar = combine(pillar, self.pillar_override)

    decrypt_errors = self.decrypt_pillar(pillar)
    if decrypt_errors:
        pillar.setdefault("_errors", []).extend(decrypt_errors)
    return pillar
1224
def decrypt_pillar(self, pillar):
    """
    Decrypt the pillar items named by the ``decrypt_pillar`` option

    The option is either a dict or a list that gets repacked into one; it
    maps a delimited pillar key to the renderer to use for it (falling back
    to ``decrypt_pillar_default`` when no renderer is given). Keys missing
    from ``pillar`` are skipped. ``pillar`` is modified in place.

    Returns a list of error messages, empty if nothing went wrong or nothing
    is configured.
    """
    errors = []
    targets = self.opts.get("decrypt_pillar")
    if not targets:
        # Nothing configured, nothing to decrypt.
        return errors

    if not isinstance(targets, dict):
        targets = salt.utils.data.repack_dictlist(targets)
    if not targets:
        errors.append("decrypt_pillar config option is malformed")

    for key, rend in targets.items():
        delimiter = self.opts["decrypt_pillar_delimiter"]
        value = salt.utils.data.traverse_dict(
            pillar, key, default=None, delimiter=delimiter
        )
        if value is None:
            log.debug("Pillar key %s not present", key)
            continue

        # Hashability is used as the test for "immutable".
        try:
            hash(value)
        except TypeError:
            immutable = False
        else:
            immutable = True

        try:
            ret = salt.utils.crypt.decrypt(
                value,
                rend or self.opts["decrypt_pillar_default"],
                renderers=self.rend,
                opts=self.opts,
                valid_rend=self.opts["decrypt_pillar_renderers"],
            )
            if immutable:
                # An immutable value cannot be changed through the reference
                # we hold, so look up its parent container and store the
                # decrypted data under the child key there.
                # NOTE(review): for unhashable values the return is not
                # written back -- presumably decrypt() updates them in
                # place; confirm against salt.utils.crypt.
                parent, _, child = key.rpartition(delimiter)
                if parent:
                    container = salt.utils.data.traverse_dict(
                        pillar, parent, default=None, delimiter=delimiter
                    )
                else:
                    # Top-level key: the parent is the pillar dict itself.
                    container = pillar
                if container is not None:
                    container[child] = ret
        except Exception as exc:  # pylint: disable=broad-except
            msg = "Failed to decrypt pillar key '{}': {}".format(key, exc)
            errors.append(msg)
            log.error(msg, exc_info=True)
    return errors
1287
def destroy(self):
    """
    Flag this object as closing; calling it again has no further effect

    The method is here so that this class offers the same ``destroy()``
    API as RemotePillar.
    """
    if not self._closing:
        self._closing = True
1295
# pylint: disable=W1701
def __del__(self):
    """
    Finalizer: hand off to ``destroy()``
    """
    self.destroy()

# pylint: enable=W1701
1301
1302
1303 # TODO: actually migrate from Pillar to AsyncPillar to allow for futures in
1304 # ext_pillar etc.
class AsyncPillar(Pillar):
    """
    Pillar variant whose ``compile_pillar`` is wrapped as a tornado coroutine
    """

    @salt.ext.tornado.gen.coroutine
    def compile_pillar(self, ext=True):
        """
        Compile the pillar with the inherited implementation and deliver the
        result through ``tornado.gen.Return``
        """
        raise salt.ext.tornado.gen.Return(super().compile_pillar(ext=ext))