"Fossies" - the Fresh Open Source Software Archive

Member "salt-3002.2/salt/pillar/__init__.py" (18 Nov 2020, 49735 Bytes) of package /linux/misc/salt-3002.2.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "__init__.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 3002.1_vs_3002.2.

    1 """
    2 Render the pillar data
    3 """
    4 
    5 
    6 import collections
    7 import copy
    8 import fnmatch
    9 import inspect
   10 import logging
   11 import os
   12 import sys
   13 import traceback
   14 
   15 import salt.ext.tornado.gen
   16 import salt.fileclient
   17 import salt.loader
   18 import salt.minion
   19 import salt.transport.client
   20 import salt.utils.args
   21 import salt.utils.cache
   22 import salt.utils.crypt
   23 import salt.utils.data
   24 import salt.utils.dictupdate
   25 import salt.utils.url
   26 from salt.exceptions import SaltClientError
   27 from salt.ext import six
   28 from salt.template import compile_template
   29 
   30 # Even though dictupdate is imported, invoking salt.utils.dictupdate.merge here
   31 # causes an UnboundLocalError. This should be investigated and fixed, but until
   32 # then, leave the import directly below this comment intact.
   33 from salt.utils.dictupdate import merge
   34 from salt.utils.odict import OrderedDict
   35 from salt.version import __version__
   36 
   37 log = logging.getLogger(__name__)
   38 
   39 
   40 def get_pillar(
   41     opts,
   42     grains,
   43     minion_id,
   44     saltenv=None,
   45     ext=None,
   46     funcs=None,
   47     pillar_override=None,
   48     pillarenv=None,
   49     extra_minion_data=None,
   50 ):
   51     """
   52     Return the correct pillar driver based on the file_client option
   53     """
   54     # When file_client is 'local' this makes the minion masterless
   55     # but sometimes we want the minion to read its files from the local
   56     # filesystem instead of asking for them from the master, but still
   57     # get commands from the master.
   58     # To enable this functionality set file_client=local and
   59     # use_master_when_local=True in the minion config.  Then here we override
   60     # the file client to be 'remote' for getting pillar.  If we don't do this
   61     # then the minion never sends the event that the master uses to update
   62     # its minion_data_cache.  If the master doesn't update the minion_data_cache
   63     # then the SSE salt-master plugin won't see any grains for those minions.
   64     file_client = opts["file_client"]
   65     if opts.get("master_type") == "disable" and file_client == "remote":
   66         file_client = "local"
   67     elif file_client == "local" and opts.get("use_master_when_local"):
   68         file_client = "remote"
   69 
   70     ptype = {"remote": RemotePillar, "local": Pillar}.get(file_client, Pillar)
   71     # If local pillar and we're caching, run through the cache system first
   72     log.debug("Determining pillar cache")
   73     if opts["pillar_cache"]:
   74         log.debug("get_pillar using pillar cache with ext: %s", ext)
   75         return PillarCache(
   76             opts,
   77             grains,
   78             minion_id,
   79             saltenv,
   80             ext=ext,
   81             functions=funcs,
   82             pillar_override=pillar_override,
   83             pillarenv=pillarenv,
   84         )
   85     return ptype(
   86         opts,
   87         grains,
   88         minion_id,
   89         saltenv,
   90         ext,
   91         functions=funcs,
   92         pillar_override=pillar_override,
   93         pillarenv=pillarenv,
   94         extra_minion_data=extra_minion_data,
   95     )
   96 
   97 
   98 # TODO: migrate everyone to this one!
   99 def get_async_pillar(
  100     opts,
  101     grains,
  102     minion_id,
  103     saltenv=None,
  104     ext=None,
  105     funcs=None,
  106     pillar_override=None,
  107     pillarenv=None,
  108     extra_minion_data=None,
  109 ):
  110     """
  111     Return the correct pillar driver based on the file_client option
  112     """
  113     file_client = opts["file_client"]
  114     if opts.get("master_type") == "disable" and file_client == "remote":
  115         file_client = "local"
  116     elif file_client == "local" and opts.get("use_master_when_local"):
  117         file_client = "remote"
  118     ptype = {"remote": AsyncRemotePillar, "local": AsyncPillar}.get(
  119         file_client, AsyncPillar
  120     )
  121     return ptype(
  122         opts,
  123         grains,
  124         minion_id,
  125         saltenv,
  126         ext,
  127         functions=funcs,
  128         pillar_override=pillar_override,
  129         pillarenv=pillarenv,
  130         extra_minion_data=extra_minion_data,
  131     )
  132 
  133 
  134 class RemotePillarMixin:
  135     """
  136     Common remote pillar functionality
  137     """
  138 
  139     def get_ext_pillar_extra_minion_data(self, opts):
  140         """
  141         Returns the extra data from the minion's opts dict (the config file).
  142 
  143         This data will be passed to external pillar functions.
  144         """
  145 
  146         def get_subconfig(opts_key):
  147             """
  148             Returns a dict containing the opts key subtree, while maintaining
  149             the opts structure
  150             """
  151             ret_dict = aux_dict = {}
  152             config_val = opts
  153             subkeys = opts_key.split(":")
  154             # Build an empty dict with the opts path
  155             for subkey in subkeys[:-1]:
  156                 aux_dict[subkey] = {}
  157                 aux_dict = aux_dict[subkey]
  158                 if not config_val.get(subkey):
  159                     # The subkey is not in the config
  160                     return {}
  161                 config_val = config_val[subkey]
  162             if subkeys[-1] not in config_val:
  163                 return {}
  164             aux_dict[subkeys[-1]] = config_val[subkeys[-1]]
  165             return ret_dict
  166 
  167         extra_data = {}
  168         if "pass_to_ext_pillars" in opts:
  169             if not isinstance(opts["pass_to_ext_pillars"], list):
  170                 log.exception("'pass_to_ext_pillars' config is malformed.")
  171                 raise SaltClientError("'pass_to_ext_pillars' config is " "malformed.")
  172             for key in opts["pass_to_ext_pillars"]:
  173                 salt.utils.dictupdate.update(
  174                     extra_data,
  175                     get_subconfig(key),
  176                     recursive_update=True,
  177                     merge_lists=True,
  178                 )
  179         log.trace("ext_pillar_extra_data = %s", extra_data)
  180         return extra_data
  181 
  182 
  183 class AsyncRemotePillar(RemotePillarMixin):
  184     """
  185     Get the pillar from the master
  186     """
  187 
  188     def __init__(
  189         self,
  190         opts,
  191         grains,
  192         minion_id,
  193         saltenv,
  194         ext=None,
  195         functions=None,
  196         pillar_override=None,
  197         pillarenv=None,
  198         extra_minion_data=None,
  199     ):
  200         self.opts = opts
  201         self.opts["saltenv"] = saltenv
  202         self.ext = ext
  203         self.grains = grains
  204         self.minion_id = minion_id
  205         self.channel = salt.transport.client.AsyncReqChannel.factory(opts)
  206         if pillarenv is not None:
  207             self.opts["pillarenv"] = pillarenv
  208         self.pillar_override = pillar_override or {}
  209         if not isinstance(self.pillar_override, dict):
  210             self.pillar_override = {}
  211             log.error("Pillar data must be a dictionary")
  212         self.extra_minion_data = extra_minion_data or {}
  213         if not isinstance(self.extra_minion_data, dict):
  214             self.extra_minion_data = {}
  215             log.error("Extra minion data must be a dictionary")
  216         salt.utils.dictupdate.update(
  217             self.extra_minion_data,
  218             self.get_ext_pillar_extra_minion_data(opts),
  219             recursive_update=True,
  220             merge_lists=True,
  221         )
  222         self._closing = False
  223 
  224     @salt.ext.tornado.gen.coroutine
  225     def compile_pillar(self):
  226         """
  227         Return a future which will contain the pillar data from the master
  228         """
  229         load = {
  230             "id": self.minion_id,
  231             "grains": self.grains,
  232             "saltenv": self.opts["saltenv"],
  233             "pillarenv": self.opts["pillarenv"],
  234             "pillar_override": self.pillar_override,
  235             "extra_minion_data": self.extra_minion_data,
  236             "ver": "2",
  237             "cmd": "_pillar",
  238         }
  239         if self.ext:
  240             load["ext"] = self.ext
  241         try:
  242             ret_pillar = yield self.channel.crypted_transfer_decode_dictentry(
  243                 load, dictkey="pillar",
  244             )
  245         except Exception:  # pylint: disable=broad-except
  246             log.exception("Exception getting pillar:")
  247             raise SaltClientError("Exception getting pillar.")
  248 
  249         if not isinstance(ret_pillar, dict):
  250             msg = (
  251                 "Got a bad pillar from master, type {}, expecting dict: " "{}"
  252             ).format(type(ret_pillar).__name__, ret_pillar)
  253             log.error(msg)
  254             # raise an exception! Pillar isn't empty, we can't sync it!
  255             raise SaltClientError(msg)
  256         raise salt.ext.tornado.gen.Return(ret_pillar)
  257 
  258     def destroy(self):
  259         if self._closing:
  260             return
  261 
  262         self._closing = True
  263         self.channel.close()
  264 
  265     # pylint: disable=W1701
  266     def __del__(self):
  267         self.destroy()
  268 
  269     # pylint: enable=W1701
  270 
  271 
  272 class RemotePillar(RemotePillarMixin):
  273     """
  274     Get the pillar from the master
  275     """
  276 
  277     def __init__(
  278         self,
  279         opts,
  280         grains,
  281         minion_id,
  282         saltenv,
  283         ext=None,
  284         functions=None,
  285         pillar_override=None,
  286         pillarenv=None,
  287         extra_minion_data=None,
  288     ):
  289         self.opts = opts
  290         self.opts["saltenv"] = saltenv
  291         self.ext = ext
  292         self.grains = grains
  293         self.minion_id = minion_id
  294         self.channel = salt.transport.client.ReqChannel.factory(opts)
  295         if pillarenv is not None:
  296             self.opts["pillarenv"] = pillarenv
  297         self.pillar_override = pillar_override or {}
  298         if not isinstance(self.pillar_override, dict):
  299             self.pillar_override = {}
  300             log.error("Pillar data must be a dictionary")
  301         self.extra_minion_data = extra_minion_data or {}
  302         if not isinstance(self.extra_minion_data, dict):
  303             self.extra_minion_data = {}
  304             log.error("Extra minion data must be a dictionary")
  305         salt.utils.dictupdate.update(
  306             self.extra_minion_data,
  307             self.get_ext_pillar_extra_minion_data(opts),
  308             recursive_update=True,
  309             merge_lists=True,
  310         )
  311         self._closing = False
  312 
  313     def compile_pillar(self):
  314         """
  315         Return the pillar data from the master
  316         """
  317         load = {
  318             "id": self.minion_id,
  319             "grains": self.grains,
  320             "saltenv": self.opts["saltenv"],
  321             "pillarenv": self.opts["pillarenv"],
  322             "pillar_override": self.pillar_override,
  323             "extra_minion_data": self.extra_minion_data,
  324             "ver": "2",
  325             "cmd": "_pillar",
  326         }
  327         if self.ext:
  328             load["ext"] = self.ext
  329         ret_pillar = self.channel.crypted_transfer_decode_dictentry(
  330             load, dictkey="pillar",
  331         )
  332 
  333         if not isinstance(ret_pillar, dict):
  334             log.error(
  335                 "Got a bad pillar from master, type %s, expecting dict: %s",
  336                 type(ret_pillar).__name__,
  337                 ret_pillar,
  338             )
  339             return {}
  340         return ret_pillar
  341 
  342     def destroy(self):
  343         if hasattr(self, "_closing") and self._closing:
  344             return
  345 
  346         self._closing = True
  347         self.channel.close()
  348 
  349     # pylint: disable=W1701
  350     def __del__(self):
  351         self.destroy()
  352 
  353     # pylint: enable=W1701
  354 
  355 
  356 class PillarCache:
  357     """
  358     Return a cached pillar if it exists, otherwise cache it.
  359 
  360     Pillar caches are structed in two diminensions: minion_id with a dict of
  361     saltenvs. Each saltenv contains a pillar dict
  362 
  363     Example data structure:
  364 
  365     ```
  366     {'minion_1':
  367         {'base': {'pilar_key_1' 'pillar_val_1'}
  368     }
  369     """
  370 
  371     # TODO ABC?
  372     def __init__(
  373         self,
  374         opts,
  375         grains,
  376         minion_id,
  377         saltenv,
  378         ext=None,
  379         functions=None,
  380         pillar_override=None,
  381         pillarenv=None,
  382         extra_minion_data=None,
  383     ):
  384         # Yes, we need all of these because we need to route to the Pillar object
  385         # if we have no cache. This is another refactor target.
  386 
  387         # Go ahead and assign these because they may be needed later
  388         self.opts = opts
  389         self.grains = grains
  390         self.minion_id = minion_id
  391         self.ext = ext
  392         self.functions = functions
  393         self.pillar_override = pillar_override
  394         self.pillarenv = pillarenv
  395 
  396         if saltenv is None:
  397             self.saltenv = "base"
  398         else:
  399             self.saltenv = saltenv
  400 
  401         # Determine caching backend
  402         self.cache = salt.utils.cache.CacheFactory.factory(
  403             self.opts["pillar_cache_backend"],
  404             self.opts["pillar_cache_ttl"],
  405             minion_cache_path=self._minion_cache_path(minion_id),
  406         )
  407 
  408     def _minion_cache_path(self, minion_id):
  409         """
  410         Return the path to the cache file for the minion.
  411 
  412         Used only for disk-based backends
  413         """
  414         return os.path.join(self.opts["cachedir"], "pillar_cache", minion_id)
  415 
  416     def fetch_pillar(self):
  417         """
  418         In the event of a cache miss, we need to incur the overhead of caching
  419         a new pillar.
  420         """
  421         log.debug("Pillar cache getting external pillar with ext: %s", self.ext)
  422         fresh_pillar = Pillar(
  423             self.opts,
  424             self.grains,
  425             self.minion_id,
  426             self.saltenv,
  427             ext=self.ext,
  428             functions=self.functions,
  429             pillar_override=self.pillar_override,
  430             pillarenv=self.pillarenv,
  431         )
  432         return fresh_pillar.compile_pillar()
  433 
  434     def compile_pillar(self, *args, **kwargs):  # Will likely just be pillar_dirs
  435         log.debug(
  436             "Scanning pillar cache for information about minion %s and pillarenv %s",
  437             self.minion_id,
  438             self.pillarenv,
  439         )
  440         if self.opts["pillar_cache_backend"] == "memory":
  441             cache_dict = self.cache
  442         else:
  443             cache_dict = self.cache._dict
  444 
  445         log.debug("Scanning cache: %s", cache_dict)
  446         # Check the cache!
  447         if self.minion_id in self.cache:  # Keyed by minion_id
  448             # TODO Compare grains, etc?
  449             if self.pillarenv in self.cache[self.minion_id]:
  450                 # We have a cache hit! Send it back.
  451                 log.debug(
  452                     "Pillar cache hit for minion %s and pillarenv %s",
  453                     self.minion_id,
  454                     self.pillarenv,
  455                 )
  456                 return self.cache[self.minion_id][self.pillarenv]
  457             else:
  458                 # We found the minion but not the env. Store it.
  459                 fresh_pillar = self.fetch_pillar()
  460 
  461                 minion_cache = self.cache[self.minion_id]
  462                 minion_cache[self.pillarenv] = fresh_pillar
  463                 self.cache[self.minion_id] = minion_cache
  464 
  465                 log.debug(
  466                     "Pillar cache miss for pillarenv %s for minion %s",
  467                     self.pillarenv,
  468                     self.minion_id,
  469                 )
  470                 return fresh_pillar
  471         else:
  472             # We haven't seen this minion yet in the cache. Store it.
  473             fresh_pillar = self.fetch_pillar()
  474             self.cache[self.minion_id] = {self.pillarenv: fresh_pillar}
  475             log.debug("Pillar cache miss for minion %s", self.minion_id)
  476             log.debug("Current pillar cache: %s", cache_dict)  # FIXME hack!
  477             return fresh_pillar
  478 
  479 
  480 class Pillar:
  481     """
  482     Read over the pillar top files and render the pillar data
  483     """
  484 
    def __init__(
        self,
        opts,
        grains,
        minion_id,
        saltenv,
        ext=None,
        functions=None,
        pillar_override=None,
        pillarenv=None,
        extra_minion_data=None,
    ):
        """
        Prepare everything needed to render pillar data for ``minion_id``.

        Builds a private copy of ``opts`` (via ``__gen_opts``), a local file
        client, the list of available SLS files per environment, and the
        execution-module, matcher, renderer and ext_pillar loaders.

        Note that the passed-in ``opts`` may be modified in place:
        ``opts["pillarenv"]`` is set to ``saltenv`` when no ``pillarenv`` is
        given and ``pillarenv_from_saltenv`` is enabled, and ``opts["grains"]``
        is set for a local file client without ``use_master_when_local``.

        Non-dict ``pillar_override`` / ``extra_minion_data`` values are logged
        and replaced with empty dicts.
        """
        # minion_id and ext must be set first: __gen_opts reads both.
        self.minion_id = minion_id
        self.ext = ext
        if pillarenv is None:
            if opts.get("pillarenv_from_saltenv", False):
                opts["pillarenv"] = saltenv
        # use the local file client
        self.opts = self.__gen_opts(opts, grains, saltenv=saltenv, pillarenv=pillarenv)
        self.saltenv = saltenv
        self.client = salt.fileclient.get_file_client(self.opts, True)
        self.avail = self.__gather_avail()

        if opts.get("file_client", "") == "local" and not opts.get(
            "use_master_when_local", False
        ):
            opts["grains"] = grains

        # if we didn't pass in functions, lets load them
        if functions is None:
            utils = salt.loader.utils(opts)
            # A local file client loads modules with the caller's opts; any
            # other client uses the adjusted copy in self.opts.
            if opts.get("file_client", "") == "local":
                self.functions = salt.loader.minion_mods(opts, utils=utils)
            else:
                self.functions = salt.loader.minion_mods(self.opts, utils=utils)
        else:
            self.functions = functions

        self.opts["minion_id"] = minion_id
        self.matchers = salt.loader.matchers(self.opts)
        self.rend = salt.loader.render(self.opts, self.functions)
        ext_pillar_opts = copy.deepcopy(self.opts)
        # Keep the incoming opts ID intact, ie, the master id
        # (__gen_opts overwrote "id" with the minion id in self.opts)
        if "id" in opts:
            ext_pillar_opts["id"] = opts["id"]
        # Default merge strategy unless the config names another one.
        self.merge_strategy = "smart"
        if opts.get("pillar_source_merging_strategy"):
            self.merge_strategy = opts["pillar_source_merging_strategy"]

        self.ext_pillars = salt.loader.pillars(ext_pillar_opts, self.functions)
        # Filled by merge_tops: saltenv -> SLS names marked ignore_missing.
        self.ignored_pillars = {}
        self.pillar_override = pillar_override or {}
        if not isinstance(self.pillar_override, dict):
            self.pillar_override = {}
            log.error("Pillar data must be a dictionary")
        self.extra_minion_data = extra_minion_data or {}
        if not isinstance(self.extra_minion_data, dict):
            self.extra_minion_data = {}
            log.error("Extra minion data must be a dictionary")
        self._closing = False
  545 
  546     def __valid_on_demand_ext_pillar(self, opts):
  547         """
  548         Check to see if the on demand external pillar is allowed
  549         """
  550         if not isinstance(self.ext, dict):
  551             log.error("On-demand pillar %s is not formatted as a dictionary", self.ext)
  552             return False
  553 
  554         on_demand = opts.get("on_demand_ext_pillar", [])
  555         try:
  556             invalid_on_demand = {x for x in self.ext if x not in on_demand}
  557         except TypeError:
  558             # Prevent traceback when on_demand_ext_pillar option is malformed
  559             log.error(
  560                 "The 'on_demand_ext_pillar' configuration option is "
  561                 "malformed, it should be a list of ext_pillar module names"
  562             )
  563             return False
  564 
  565         if invalid_on_demand:
  566             log.error(
  567                 "The following ext_pillar modules are not allowed for "
  568                 "on-demand pillar data: %s. Valid on-demand ext_pillar "
  569                 "modules are: %s. The valid modules can be adjusted by "
  570                 "setting the 'on_demand_ext_pillar' config option.",
  571                 ", ".join(sorted(invalid_on_demand)),
  572                 ", ".join(on_demand),
  573             )
  574             return False
  575         return True
  576 
  577     def __gather_avail(self):
  578         """
  579         Gather the lists of available sls data from the master
  580         """
  581         avail = {}
  582         for saltenv in self._get_envs():
  583             avail[saltenv] = self.client.list_states(saltenv)
  584         return avail
  585 
  586     def __gen_opts(self, opts_in, grains, saltenv=None, ext=None, pillarenv=None):
  587         """
  588         The options need to be altered to conform to the file client
  589         """
  590         opts = copy.deepcopy(opts_in)
  591         opts["file_client"] = "local"
  592         if not grains:
  593             opts["grains"] = {}
  594         else:
  595             opts["grains"] = grains
  596         # Allow minion/CLI saltenv/pillarenv to take precedence over master
  597         opts["saltenv"] = saltenv if saltenv is not None else opts.get("saltenv")
  598         opts["pillarenv"] = (
  599             pillarenv if pillarenv is not None else opts.get("pillarenv")
  600         )
  601         opts["id"] = self.minion_id
  602         if opts["state_top"].startswith("salt://"):
  603             opts["state_top"] = opts["state_top"]
  604         elif opts["state_top"].startswith("/"):
  605             opts["state_top"] = salt.utils.url.create(opts["state_top"][1:])
  606         else:
  607             opts["state_top"] = salt.utils.url.create(opts["state_top"])
  608         if self.ext and self.__valid_on_demand_ext_pillar(opts):
  609             if "ext_pillar" in opts:
  610                 opts["ext_pillar"].append(self.ext)
  611             else:
  612                 opts["ext_pillar"] = [self.ext]
  613         if "__env__" in opts["pillar_roots"]:
  614             env = opts.get("pillarenv") or opts.get("saltenv") or "base"
  615             if env not in opts["pillar_roots"]:
  616                 log.debug(
  617                     "pillar environment '%s' maps to __env__ pillar_roots directory",
  618                     env,
  619                 )
  620                 opts["pillar_roots"][env] = opts["pillar_roots"].pop("__env__")
  621             else:
  622                 log.debug(
  623                     "pillar_roots __env__ ignored (environment '%s' found in pillar_roots)",
  624                     env,
  625                 )
  626                 opts["pillar_roots"].pop("__env__")
  627         return opts
  628 
  629     def _get_envs(self):
  630         """
  631         Pull the file server environments out of the master options
  632         """
  633         envs = {"base"}
  634         if "pillar_roots" in self.opts:
  635             envs.update(list(self.opts["pillar_roots"]))
  636         return envs
  637 
    def get_tops(self):
        """
        Gather the top files

        Returns a ``(tops, errors)`` tuple. ``tops`` maps each environment to
        the list of rendered top-file data for it; ``errors`` is a list of
        human-readable messages for top files that failed to render.
        Rendering failures never propagate out of this method.
        """
        tops = collections.defaultdict(list)
        include = collections.defaultdict(list)
        done = collections.defaultdict(list)
        errors = []
        # Gather initial top files
        try:
            saltenvs = set()
            if self.opts["pillarenv"]:
                # If the specified pillarenv is not present in the available
                # pillar environments, do not cache the pillar top file.
                if self.opts["pillarenv"] not in self.opts["pillar_roots"]:
                    log.debug(
                        "pillarenv '%s' not found in the configured pillar "
                        "environments (%s)",
                        self.opts["pillarenv"],
                        ", ".join(self.opts["pillar_roots"]),
                    )
                else:
                    saltenvs.add(self.opts["pillarenv"])
            else:
                # No pillarenv pinned: consider every known environment,
                # narrowed to the current saltenv (or "base") when the
                # merging strategy is "none".
                saltenvs = self._get_envs()
                if self.opts.get("pillar_source_merging_strategy", None) == "none":
                    saltenvs &= {self.saltenv or "base"}

            for saltenv in saltenvs:
                top = self.client.cache_file(self.opts["state_top"], saltenv)
                if top:
                    tops[saltenv].append(
                        compile_template(
                            top,
                            self.rend,
                            self.opts["renderer"],
                            self.opts["renderer_blacklist"],
                            self.opts["renderer_whitelist"],
                            saltenv=saltenv,
                            _pillar_rend=True,
                        )
                    )
        except Exception as exc:  # pylint: disable=broad-except
            # Any failure above aborts the remaining environments; whatever
            # was rendered before the failure is kept in tops.
            errors.append(
                "Rendering Primary Top file failed, render error:\n{}".format(exc)
            )
            log.exception("Pillar rendering failed for minion %s", self.minion_id)

        # Search initial top files for includes
        for saltenv, ctops in tops.items():
            for ctop in ctops:
                if "include" not in ctop:
                    continue
                for sls in ctop["include"]:
                    include[saltenv].append(sls)
                ctop.pop("include")
        # Go through the includes and pull out the extra tops and add them
        # (every environment visited is queued in pops and removed after the
        # pass, so include is empty once the first pass completes)
        while include:
            pops = []
            for saltenv, states in include.items():
                pops.append(saltenv)
                if not states:
                    continue
                for sls in states:
                    # done guards against rendering the same include twice.
                    if sls in done[saltenv]:
                        continue
                    try:
                        tops[saltenv].append(
                            compile_template(
                                self.client.get_state(sls, saltenv).get("dest", False),
                                self.rend,
                                self.opts["renderer"],
                                self.opts["renderer_blacklist"],
                                self.opts["renderer_whitelist"],
                                saltenv=saltenv,
                                _pillar_rend=True,
                            )
                        )
                    except Exception as exc:  # pylint: disable=broad-except
                        # Recorded in errors only; unlike the primary top
                        # file, this failure is not logged here.
                        errors.append(
                            (
                                "Rendering Top file {} failed, render error" ":\n{}"
                            ).format(sls, exc)
                        )
                    done[saltenv].append(sls)
            for saltenv in pops:
                if saltenv in include:
                    include.pop(saltenv)

        return tops, errors
  728 
  729     def merge_tops(self, tops):
  730         """
  731         Cleanly merge the top files
  732         """
  733         top = collections.defaultdict(OrderedDict)
  734         orders = collections.defaultdict(OrderedDict)
  735         for ctops in tops.values():
  736             for ctop in ctops:
  737                 for saltenv, targets in ctop.items():
  738                     if saltenv == "include":
  739                         continue
  740                     for tgt in targets:
  741                         matches = []
  742                         states = OrderedDict()
  743                         orders[saltenv][tgt] = 0
  744                         ignore_missing = False
  745                         for comp in ctop[saltenv][tgt]:
  746                             if isinstance(comp, dict):
  747                                 if "match" in comp:
  748                                     matches.append(comp)
  749                                 if "order" in comp:
  750                                     order = comp["order"]
  751                                     if not isinstance(order, int):
  752                                         try:
  753                                             order = int(order)
  754                                         except ValueError:
  755                                             order = 0
  756                                     orders[saltenv][tgt] = order
  757                                 if comp.get("ignore_missing", False):
  758                                     ignore_missing = True
  759                             if isinstance(comp, str):
  760                                 states[comp] = True
  761                         if ignore_missing:
  762                             if saltenv not in self.ignored_pillars:
  763                                 self.ignored_pillars[saltenv] = []
  764                             self.ignored_pillars[saltenv].extend(states.keys())
  765                         top[saltenv][tgt] = matches
  766                         top[saltenv][tgt].extend(states)
  767         return self.sort_top_targets(top, orders)
  768 
  769     def sort_top_targets(self, top, orders):
  770         """
  771         Returns the sorted high data from the merged top files
  772         """
  773         sorted_top = collections.defaultdict(OrderedDict)
  774         # pylint: disable=cell-var-from-loop
  775         for saltenv, targets in top.items():
  776             sorted_targets = sorted(targets, key=lambda target: orders[saltenv][target])
  777             for target in sorted_targets:
  778                 sorted_top[saltenv][target] = targets[target]
  779         # pylint: enable=cell-var-from-loop
  780         return sorted_top
  781 
  782     def get_top(self):
  783         """
  784         Returns the high data derived from the top file
  785         """
  786         tops, errors = self.get_tops()
  787         try:
  788             merged_tops = self.merge_tops(tops)
  789         except TypeError as err:
  790             merged_tops = OrderedDict()
  791             errors.append("Error encountered while rendering pillar top file.")
  792         return merged_tops, errors
  793 
  794     def top_matches(self, top, reload=False):
  795         """
  796         Search through the top high data for matches and return the states
  797         that this minion needs to execute.
  798 
  799         Returns:
  800         {'saltenv': ['state1', 'state2', ...]}
  801 
  802         reload
  803             Reload the matcher loader
  804         """
  805         matches = {}
  806         if reload:
  807             self.matchers = salt.loader.matchers(self.opts)
  808         for saltenv, body in top.items():
  809             if self.opts["pillarenv"]:
  810                 if saltenv != self.opts["pillarenv"]:
  811                     continue
  812             for match, data in body.items():
  813                 if self.matchers["confirm_top.confirm_top"](
  814                     match, data, self.opts.get("nodegroups", {}),
  815                 ):
  816                     if saltenv not in matches:
  817                         matches[saltenv] = env_matches = []
  818                     else:
  819                         env_matches = matches[saltenv]
  820                     for item in data:
  821                         if isinstance(item, str) and item not in env_matches:
  822                             env_matches.append(item)
  823         return matches
  824 
    def render_pstate(self, sls, saltenv, mods, defaults=None):
        """
        Collect a single pillar sls file and render it

        sls
            Name of the pillar SLS to fetch and render
        saltenv
            Environment the SLS is looked up in
        mods
            Collection of SLS names already rendered; ``sls`` is added to it
            and includes that are already present are not rendered again
        defaults
            Extra keyword arguments handed to ``compile_template``

        Returns a ``(state, mods, errors)`` tuple. ``state`` is the rendered
        data after its ``include`` entries have been resolved and merged, or
        ``None`` when the SLS was skipped; ``errors`` is a list of messages.
        """
        if defaults is None:
            defaults = {}
        err = ""
        errors = []
        state_data = self.client.get_state(sls, saltenv)
        fn_ = state_data.get("dest", False)
        if not fn_:
            # The file client returned no local path for this SLS
            if sls in self.ignored_pillars.get(saltenv, []):
                log.debug(
                    "Skipping ignored and missing SLS '%s' in " "environment '%s'",
                    sls,
                    saltenv,
                )
                return None, mods, errors
            elif self.opts["pillar_roots"].get(saltenv):
                # The environment has pillar roots, so this is recorded as an
                # error. Note that there is no return in this branch:
                # execution continues with the rendering code below.
                msg = (
                    "Specified SLS '{}' in environment '{}' is not"
                    " available on the salt master"
                ).format(sls, saltenv)
                log.error(msg)
                errors.append(msg)
            else:
                msg = (
                    "Specified SLS '{}' in environment '{}' was not "
                    "found. ".format(sls, saltenv)
                )
                if self.opts.get("__git_pillar", False) is True:
                    msg += (
                        "This is likely caused by a git_pillar top file "
                        "containing an environment other than the one for the "
                        "branch in which it resides. Each git_pillar "
                        "branch/tag must have its own top file."
                    )
                else:
                    msg += (
                        "This could be because SLS '{0}' is in an "
                        "environment other than '{1}', but '{1}' is "
                        "included in that environment's Pillar top file. It "
                        "could also be due to environment '{1}' not being "
                        "defined in 'pillar_roots'.".format(sls, saltenv)
                    )
                log.debug(msg)
                # Only logged at debug level; the message is not added to
                # the returned errors.
                return None, mods, errors
        state = None
        try:
            state = compile_template(
                fn_,
                self.rend,
                self.opts["renderer"],
                self.opts["renderer_blacklist"],
                self.opts["renderer_whitelist"],
                saltenv,
                sls,
                _pillar_rend=True,
                **defaults
            )
        except Exception as exc:  # pylint: disable=broad-except
            msg = "Rendering SLS '{}' failed, render error:\n{}".format(sls, exc)
            log.critical(msg, exc_info=True)
            # Unless pillar_safe_render_error is disabled, the exception text
            # is kept out of the returned errors and only written to the log.
            if self.opts.get("pillar_safe_render_error", True):
                errors.append(
                    "Rendering SLS '{}' failed. Please see master log for "
                    "details.".format(sls)
                )
            else:
                errors.append(msg)
        # Recorded even when rendering failed, so the SLS is not retried
        # through an include.
        mods.add(sls)
        nstate = None
        if state:
            if not isinstance(state, dict):
                msg = "SLS '{}' does not render to a dictionary".format(sls)
                log.error(msg)
                errors.append(msg)
            else:
                if "include" in state:
                    if not isinstance(state["include"], list):
                        msg = (
                            "Include Declaration in SLS '{}' is not "
                            "formed as a list".format(sls)
                        )
                        log.error(msg)
                        errors.append(msg)
                    else:
                        # render included state(s)
                        include_states = []
                        for sub_sls in state.pop("include"):
                            if isinstance(sub_sls, dict):
                                # Dict form: {name: {"defaults": ..., "key": ...}}
                                # NOTE(review): ``defaults`` is rebound here and
                                # not reset for later plain-string includes -
                                # confirm that this carry-over is intended.
                                sub_sls, v = next(iter(sub_sls.items()))
                                defaults = v.get("defaults", {})
                                key = v.get("key", None)
                            else:
                                key = None
                            try:
                                # First match the name with leading dots removed
                                # and "/" replaced by "." ...
                                matched_pstates = fnmatch.filter(
                                    self.avail[saltenv],
                                    sub_sls.lstrip(".").replace("/", "."),
                                )
                                if sub_sls.startswith("."):
                                    # Relative include: prefix it with this
                                    # SLS's own name when the source is an
                                    # init.sls, otherwise with the name minus
                                    # its last component.
                                    if state_data.get("source", "").endswith(
                                        "/init.sls"
                                    ):
                                        include_parts = sls.split(".")
                                    else:
                                        include_parts = sls.split(".")[:-1]
                                    sub_sls = ".".join(include_parts + [sub_sls[1:]])
                                # ... then add the matches for the (possibly
                                # rewritten) name.
                                matches = fnmatch.filter(self.avail[saltenv], sub_sls,)
                                matched_pstates.extend(matches)
                            except KeyError:
                                # self.avail has no entry for this saltenv
                                errors.extend(
                                    [
                                        "No matching pillar environment for environment "
                                        "'{}' found".format(saltenv)
                                    ]
                                )
                                matched_pstates = [sub_sls]
                            # If matched_pstates is empty, set to sub_sls
                            if len(matched_pstates) < 1:
                                matched_pstates = [sub_sls]
                            for m_sub_sls in matched_pstates:
                                if m_sub_sls not in mods:
                                    nstate, mods, err = self.render_pstate(
                                        m_sub_sls, saltenv, mods, defaults
                                    )
                                    if nstate:
                                        if key:
                                            # If key is x:y, convert it to {x: {y: nstate}}
                                            for key_fragment in reversed(
                                                key.split(":")
                                            ):
                                                nstate = {key_fragment: nstate}
                                        if not self.opts.get(
                                            "pillar_includes_override_sls", False
                                        ):
                                            # Collected now, merged after the loop
                                            include_states.append(nstate)
                                        else:
                                            # Merged on top of the current state
                                            # right away
                                            state = merge(
                                                state,
                                                nstate,
                                                self.merge_strategy,
                                                self.opts.get("renderer", "yaml"),
                                                self.opts.get(
                                                    "pillar_merge_lists", False
                                                ),
                                            )
                                    if err:
                                        errors += err
                        if not self.opts.get("pillar_includes_override_sls", False):
                            # merge included state(s) with the current state
                            # merged last to ensure that its values are
                            # authoritative.
                            include_states.append(state)
                            state = None
                            for s in include_states:
                                if state is None:
                                    state = s
                                else:
                                    state = merge(
                                        state,
                                        s,
                                        self.merge_strategy,
                                        self.opts.get("renderer", "yaml"),
                                        self.opts.get("pillar_merge_lists", False),
                                    )
        return state, mods, errors
  994 
  995     def render_pillar(self, matches, errors=None):
  996         """
  997         Extract the sls pillar files from the matches and render them into the
  998         pillar
  999         """
 1000         pillar = copy.copy(self.pillar_override)
 1001         if errors is None:
 1002             errors = []
 1003         for saltenv, pstates in matches.items():
 1004             pstatefiles = []
 1005             mods = set()
 1006             for sls_match in pstates:
 1007                 matched_pstates = []
 1008                 try:
 1009                     matched_pstates = fnmatch.filter(self.avail[saltenv], sls_match)
 1010                 except KeyError:
 1011                     errors.extend(
 1012                         [
 1013                             "No matching pillar environment for environment "
 1014                             "'{}' found".format(saltenv)
 1015                         ]
 1016                     )
 1017                 if matched_pstates:
 1018                     pstatefiles.extend(matched_pstates)
 1019                 else:
 1020                     pstatefiles.append(sls_match)
 1021 
 1022             for sls in pstatefiles:
 1023                 pstate, mods, err = self.render_pstate(sls, saltenv, mods)
 1024 
 1025                 if err:
 1026                     errors += err
 1027 
 1028                 if pstate is not None:
 1029                     if not isinstance(pstate, dict):
 1030                         log.error(
 1031                             "The rendered pillar sls file, '%s' state did "
 1032                             "not return the expected data format. This is "
 1033                             "a sign of a malformed pillar sls file. Returned "
 1034                             "errors: %s",
 1035                             sls,
 1036                             ", ".join(["'{}'".format(e) for e in errors]),
 1037                         )
 1038                         continue
 1039                     pillar = merge(
 1040                         pillar,
 1041                         pstate,
 1042                         self.merge_strategy,
 1043                         self.opts.get("renderer", "yaml"),
 1044                         self.opts.get("pillar_merge_lists", False),
 1045                     )
 1046 
 1047         return pillar, errors
 1048 
 1049     def _external_pillar_data(self, pillar, val, key):
 1050         """
 1051         Builds actual pillar data structure and updates the ``pillar`` variable
 1052         """
 1053         ext = None
 1054         args = salt.utils.args.get_function_argspec(self.ext_pillars[key]).args
 1055 
 1056         if isinstance(val, dict):
 1057             if ("extra_minion_data" in args) and self.extra_minion_data:
 1058                 ext = self.ext_pillars[key](
 1059                     self.minion_id,
 1060                     pillar,
 1061                     extra_minion_data=self.extra_minion_data,
 1062                     **val
 1063                 )
 1064             else:
 1065                 ext = self.ext_pillars[key](self.minion_id, pillar, **val)
 1066         elif isinstance(val, list):
 1067             if ("extra_minion_data" in args) and self.extra_minion_data:
 1068                 ext = self.ext_pillars[key](
 1069                     self.minion_id,
 1070                     pillar,
 1071                     *val,
 1072                     extra_minion_data=self.extra_minion_data
 1073                 )
 1074             else:
 1075                 ext = self.ext_pillars[key](self.minion_id, pillar, *val)
 1076         else:
 1077             if ("extra_minion_data" in args) and self.extra_minion_data:
 1078                 ext = self.ext_pillars[key](
 1079                     self.minion_id,
 1080                     pillar,
 1081                     val,
 1082                     extra_minion_data=self.extra_minion_data,
 1083                 )
 1084             else:
 1085                 ext = self.ext_pillars[key](self.minion_id, pillar, val)
 1086         return ext
 1087 
    def ext_pillar(self, pillar, errors=None):
        """
        Render the external pillar data

        pillar
            Pillar data compiled so far; external data is merged on top of it
        errors
            Optional list that error messages are appended to

        Returns a ``(pillar, errors)`` tuple. When an entry of the
        ``ext_pillar`` option is not a dict, an empty dict is returned in
        place of the pillar.
        """
        if errors is None:
            errors = []
        try:
            # Make sure that on-demand git_pillar is fetched before we try to
            # compile the pillar data. git_pillar will fetch a remote when
            # the git ext_pillar() func is run, but only for masterless.
            if self.ext and "git" in self.ext and self.opts.get("__role") != "minion":
                # Avoid circular import
                import salt.utils.gitfs
                import salt.pillar.git_pillar

                git_pillar = salt.utils.gitfs.GitPillar(
                    self.opts,
                    self.ext["git"],
                    per_remote_overrides=salt.pillar.git_pillar.PER_REMOTE_OVERRIDES,
                    per_remote_only=salt.pillar.git_pillar.PER_REMOTE_ONLY,
                    global_only=salt.pillar.git_pillar.GLOBAL_ONLY,
                )
                git_pillar.fetch_remotes()
        except TypeError:
            # Handle malformed ext_pillar
            pass
        if "ext_pillar" not in self.opts:
            # Nothing configured: hand the pillar back untouched
            return pillar, errors
        if not isinstance(self.opts["ext_pillar"], list):
            errors.append('The "ext_pillar" option is malformed')
            log.critical(errors[-1])
            return pillar, errors
        ext = None
        # Bring in CLI pillar data
        if self.pillar_override:
            pillar = merge(
                pillar,
                self.pillar_override,
                self.merge_strategy,
                self.opts.get("renderer", "yaml"),
                self.opts.get("pillar_merge_lists", False),
            )

        for run in self.opts["ext_pillar"]:
            if not isinstance(run, dict):
                errors.append('The "ext_pillar" option is malformed')
                log.critical(errors[-1])
                return {}, errors
            # Only the first key of the entry is checked against the
            # exclude_ext_pillar option
            if next(iter(run.keys())) in self.opts.get("exclude_ext_pillar", []):
                continue
            for key, val in run.items():
                if key not in self.ext_pillars:
                    log.critical(
                        "Specified ext_pillar interface %s is unavailable", key
                    )
                    continue
                try:
                    ext = self._external_pillar_data(pillar, val, key)
                except Exception as exc:  # pylint: disable=broad-except
                    # A failing interface is reported and logged with its
                    # traceback; the remaining interfaces are still processed.
                    errors.append(
                        "Failed to load ext_pillar {}: {}".format(key, exc.__str__(),)
                    )
                    log.error(
                        "Exception caught loading ext_pillar '%s':\n%s",
                        key,
                        "".join(traceback.format_tb(sys.exc_info()[2])),
                    )
            # Merged once per entry, after all of its keys were processed:
            # ``ext`` holds the result of the last interface that returned.
            if ext:
                pillar = merge(
                    pillar,
                    ext,
                    self.merge_strategy,
                    self.opts.get("renderer", "yaml"),
                    self.opts.get("pillar_merge_lists", False),
                )
                # Reset so this data is not merged again for a later entry
                ext = None
        return pillar, errors
 1165 
 1166     def compile_pillar(self, ext=True):
 1167         """
 1168         Render the pillar data and return
 1169         """
 1170         top, top_errors = self.get_top()
 1171         if ext:
 1172             if self.opts.get("ext_pillar_first", False):
 1173                 self.opts["pillar"], errors = self.ext_pillar(self.pillar_override)
 1174                 self.rend = salt.loader.render(self.opts, self.functions)
 1175                 matches = self.top_matches(top, reload=True)
 1176                 pillar, errors = self.render_pillar(matches, errors=errors)
 1177                 pillar = merge(
 1178                     self.opts["pillar"],
 1179                     pillar,
 1180                     self.merge_strategy,
 1181                     self.opts.get("renderer", "yaml"),
 1182                     self.opts.get("pillar_merge_lists", False),
 1183                 )
 1184             else:
 1185                 matches = self.top_matches(top)
 1186                 pillar, errors = self.render_pillar(matches)
 1187                 pillar, errors = self.ext_pillar(pillar, errors=errors)
 1188         else:
 1189             matches = self.top_matches(top)
 1190             pillar, errors = self.render_pillar(matches)
 1191         errors.extend(top_errors)
 1192         if self.opts.get("pillar_opts", False):
 1193             mopts = dict(self.opts)
 1194             if "grains" in mopts:
 1195                 mopts.pop("grains")
 1196             mopts["saltversion"] = __version__
 1197             pillar["master"] = mopts
 1198         if "pillar" in self.opts and self.opts.get("ssh_merge_pillar", False):
 1199             pillar = merge(
 1200                 self.opts["pillar"],
 1201                 pillar,
 1202                 self.merge_strategy,
 1203                 self.opts.get("renderer", "yaml"),
 1204                 self.opts.get("pillar_merge_lists", False),
 1205             )
 1206         if errors:
 1207             for error in errors:
 1208                 log.critical("Pillar render error: %s", error)
 1209             pillar["_errors"] = errors
 1210 
 1211         if self.pillar_override:
 1212             pillar = merge(
 1213                 pillar,
 1214                 self.pillar_override,
 1215                 self.merge_strategy,
 1216                 self.opts.get("renderer", "yaml"),
 1217                 self.opts.get("pillar_merge_lists", False),
 1218             )
 1219 
 1220         decrypt_errors = self.decrypt_pillar(pillar)
 1221         if decrypt_errors:
 1222             pillar.setdefault("_errors", []).extend(decrypt_errors)
 1223         return pillar
 1224 
 1225     def decrypt_pillar(self, pillar):
 1226         """
 1227         Decrypt the specified pillar dictionary items, if configured to do so
 1228         """
 1229         errors = []
 1230         if self.opts.get("decrypt_pillar"):
 1231             decrypt_pillar = self.opts["decrypt_pillar"]
 1232             if not isinstance(decrypt_pillar, dict):
 1233                 decrypt_pillar = salt.utils.data.repack_dictlist(
 1234                     self.opts["decrypt_pillar"]
 1235                 )
 1236             if not decrypt_pillar:
 1237                 errors.append("decrypt_pillar config option is malformed")
 1238             for key, rend in decrypt_pillar.items():
 1239                 ptr = salt.utils.data.traverse_dict(
 1240                     pillar,
 1241                     key,
 1242                     default=None,
 1243                     delimiter=self.opts["decrypt_pillar_delimiter"],
 1244                 )
 1245                 if ptr is None:
 1246                     log.debug("Pillar key %s not present", key)
 1247                     continue
 1248                 try:
 1249                     hash(ptr)
 1250                     immutable = True
 1251                 except TypeError:
 1252                     immutable = False
 1253                 try:
 1254                     ret = salt.utils.crypt.decrypt(
 1255                         ptr,
 1256                         rend or self.opts["decrypt_pillar_default"],
 1257                         renderers=self.rend,
 1258                         opts=self.opts,
 1259                         valid_rend=self.opts["decrypt_pillar_renderers"],
 1260                     )
 1261                     if immutable:
 1262                         # Since the key pointed to an immutable type, we need
 1263                         # to replace it in the pillar dict. First we will find
 1264                         # the parent, and then we will replace the child key
 1265                         # with the return data from the renderer.
 1266                         parent, _, child = key.rpartition(
 1267                             self.opts["decrypt_pillar_delimiter"]
 1268                         )
 1269                         if not parent:
 1270                             # key is a top-level key, so the pointer to the
 1271                             # parent is the pillar dict itself.
 1272                             ptr = pillar
 1273                         else:
 1274                             ptr = salt.utils.data.traverse_dict(
 1275                                 pillar,
 1276                                 parent,
 1277                                 default=None,
 1278                                 delimiter=self.opts["decrypt_pillar_delimiter"],
 1279                             )
 1280                         if ptr is not None:
 1281                             ptr[child] = ret
 1282                 except Exception as exc:  # pylint: disable=broad-except
 1283                     msg = "Failed to decrypt pillar key '{}': {}".format(key, exc)
 1284                     errors.append(msg)
 1285                     log.error(msg, exc_info=True)
 1286         return errors
 1287 
 1288     def destroy(self):
 1289         """
 1290         This method exist in order to be API compatible with RemotePillar
 1291         """
 1292         if self._closing:
 1293             return
 1294         self._closing = True
 1295 
    # pylint: disable=W1701
    def __del__(self):
        """
        Call ``destroy`` when the instance is finalized
        """
        self.destroy()

    # pylint: enable=W1701
 1301 
 1302 
 1303 # TODO: actually migrate from Pillar to AsyncPillar to allow for futures in
 1304 # ext_pillar etc.
class AsyncPillar(Pillar):
    """
    Pillar variant whose ``compile_pillar`` is wrapped as a tornado coroutine
    """

    @salt.ext.tornado.gen.coroutine
    def compile_pillar(self, ext=True):
        """
        Compile the pillar through the parent class implementation and hand
        the result back by raising ``salt.ext.tornado.gen.Return``
        """
        ret = super().compile_pillar(ext=ext)
        raise salt.ext.tornado.gen.Return(ret)