"Fossies" - the Fresh Open Source Software Archive

Member "salt-3002.2/salt/minion.py" (18 Nov 2020, 152184 Bytes) of package /linux/misc/salt-3002.2.tar.gz:


    1 """
    2 Routines to set up a minion
    3 """
    4 
    5 import contextlib
    6 import copy
    7 import functools
    8 import logging
    9 import multiprocessing
   10 import os
   11 import random
   12 import signal
   13 import sys
   14 import threading
   15 import time
   16 import traceback
   17 import types
   18 from binascii import crc32
   19 from random import randint, shuffle
   20 from stat import S_IMODE
   21 
   22 import salt
   23 import salt.beacons
   24 import salt.cli.daemons
   25 import salt.client
   26 import salt.crypt
   27 import salt.defaults.events
   28 import salt.defaults.exitcodes
   29 import salt.engines
   30 
   31 # pylint: enable=no-name-in-module,redefined-builtin
   32 import salt.ext.tornado
   33 import salt.ext.tornado.gen  # pylint: disable=F0401
   34 import salt.ext.tornado.ioloop  # pylint: disable=F0401
   35 import salt.loader
   36 import salt.log.setup
   37 import salt.payload
   38 import salt.pillar
   39 import salt.serializers.msgpack
   40 import salt.syspaths
   41 import salt.transport.client
   42 import salt.utils.args
   43 import salt.utils.context
   44 import salt.utils.crypt
   45 import salt.utils.data
   46 import salt.utils.dictupdate
   47 import salt.utils.error
   48 import salt.utils.event
   49 import salt.utils.files
   50 import salt.utils.jid
   51 import salt.utils.minion
   52 import salt.utils.minions
   53 import salt.utils.network
   54 import salt.utils.platform
   55 import salt.utils.process
   56 import salt.utils.schedule
   57 import salt.utils.ssdp
   58 import salt.utils.user
   59 import salt.utils.zeromq
   60 from salt._compat import ipaddress
   61 from salt.config import DEFAULT_MINION_OPTS
   62 from salt.defaults import DEFAULT_TARGET_DELIM
   63 from salt.exceptions import (
   64     CommandExecutionError,
   65     CommandNotFoundError,
   66     SaltClientError,
   67     SaltDaemonNotRunning,
   68     SaltException,
   69     SaltInvocationError,
   70     SaltMasterUnresolvableError,
   71     SaltReqTimeoutError,
   72     SaltSystemExit,
   73 )
   74 
   75 # pylint: disable=import-error,no-name-in-module,redefined-builtin
   76 from salt.ext import six
   77 from salt.ext.six.moves import range
   78 from salt.template import SLS_ENCODING
   79 from salt.utils.ctx import RequestContext
   80 from salt.utils.debug import enable_sigusr1_handler
   81 from salt.utils.event import tagify
   82 from salt.utils.network import parse_host_port
   83 from salt.utils.odict import OrderedDict
   84 from salt.utils.process import ProcessManager, SignalHandlingProcess, default_signals
   85 from salt.utils.zeromq import ZMQ_VERSION_INFO, ZMQDefaultLoop, install_zmq, zmq
   86 
   87 HAS_PSUTIL = False
   88 try:
   89     import salt.utils.psutil_compat as psutil
   90 
   91     HAS_PSUTIL = True
   92 except ImportError:
   93     pass
   94 
   95 HAS_RESOURCE = False
   96 try:
   97     import resource
   98 
   99     HAS_RESOURCE = True
  100 except ImportError:
  101     pass
  102 
  103 try:
  104     import salt.utils.win_functions
  105 
  106     HAS_WIN_FUNCTIONS = True
  107 except ImportError:
  108     HAS_WIN_FUNCTIONS = False
  109 # pylint: enable=import-error
  110 
  111 log = logging.getLogger(__name__)
  112 
  113 # To set up a minion:
  114 # 1. Read in the configuration
  115 # 2. Generate the function mapping dict
  116 # 3. Authenticate with the master
  117 # 4. Store the AES key
  118 # 5. Connect to the publisher
  119 # 6. Handle publications
  120 
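       # A minimal, illustrative sketch of those steps (hedged: the config path is a
       # placeholder and the salt-minion daemon normally drives this through
       # salt.cli.daemons rather than calling these APIs directly):
       def _example_minion_lifecycle():  # pragma: no cover - illustrative sketch, never called
           import salt.config

           minion_opts = salt.config.minion_config("/etc/salt/minion")  # 1. read the configuration
           minion = Minion(minion_opts)  # the remaining steps run once the minion connects:
           minion.tune_in()  # 2.-6. load functions, authenticate, connect and handle publications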
  121 
  122 def resolve_dns(opts, fallback=True):
  123     """
  124     Resolves the master_ip and master_uri options
  125     """
  126     ret = {}
  127     check_dns = True
  128     if opts.get("file_client", "remote") == "local" and not opts.get(
  129         "use_master_when_local", False
  130     ):
  131         check_dns = False
  132     # Since salt.log is imported below, salt.utils.network needs to be imported here as well
  133     import salt.utils.network
  134 
  135     if check_dns is True:
  136         try:
  137             if opts["master"] == "":
  138                 raise SaltSystemExit
  139             ret["master_ip"] = salt.utils.network.dns_check(
  140                 opts["master"], int(opts["master_port"]), True, opts["ipv6"]
  141             )
  142         except SaltClientError:
  143             retry_dns_count = opts.get("retry_dns_count", None)
  144             if opts["retry_dns"]:
  145                 while True:
  146                     if retry_dns_count is not None:
  147                         if retry_dns_count == 0:
  148                             raise SaltMasterUnresolvableError
  149                         retry_dns_count -= 1
  150                     import salt.log
  151 
  152                     msg = (
  153                         "Master hostname: '{}' not found or not responsive. "
  154                         "Retrying in {} seconds"
  155                     ).format(opts["master"], opts["retry_dns"])
  156                     if salt.log.setup.is_console_configured():
  157                         log.error(msg)
  158                     else:
  159                         print("WARNING: {}".format(msg))
  160                     time.sleep(opts["retry_dns"])
  161                     try:
  162                         ret["master_ip"] = salt.utils.network.dns_check(
  163                             opts["master"], int(opts["master_port"]), True, opts["ipv6"]
  164                         )
  165                         break
  166                     except SaltClientError:
  167                         pass
  168             else:
  169                 if fallback:
  170                     ret["master_ip"] = "127.0.0.1"
  171                 else:
  172                     raise
  173         except SaltSystemExit:
  174             unknown_str = "unknown address"
  175             master = opts.get("master", unknown_str)
  176             if master == "":
  177                 master = unknown_str
  178             if opts.get("__role") == "syndic":
  179                 err = (
   180                     "Master address: '{}' could not be resolved. Invalid or unresolvable address. "
  181                     "Set 'syndic_master' value in minion config.".format(master)
  182                 )
  183             else:
  184                 err = (
   185                     "Master address: '{}' could not be resolved. Invalid or unresolvable address. "
  186                     "Set 'master' value in minion config.".format(master)
  187                 )
  188             log.error(err)
  189             raise SaltSystemExit(code=42, msg=err)
  190     else:
  191         ret["master_ip"] = "127.0.0.1"
  192 
  193     if "master_ip" in ret and "master_ip" in opts:
  194         if ret["master_ip"] != opts["master_ip"]:
  195             log.warning(
  196                 "Master ip address changed from %s to %s",
  197                 opts["master_ip"],
  198                 ret["master_ip"],
  199             )
  200     if opts["source_interface_name"]:
  201         log.trace("Custom source interface required: %s", opts["source_interface_name"])
  202         interfaces = salt.utils.network.interfaces()
  203         log.trace("The following interfaces are available on this Minion:")
  204         log.trace(interfaces)
  205         if opts["source_interface_name"] in interfaces:
  206             if interfaces[opts["source_interface_name"]]["up"]:
  207                 addrs = (
  208                     interfaces[opts["source_interface_name"]]["inet"]
  209                     if not opts["ipv6"]
  210                     else interfaces[opts["source_interface_name"]]["inet6"]
  211                 )
  212                 ret["source_ip"] = addrs[0]["address"]
  213                 log.debug("Using %s as source IP address", ret["source_ip"])
  214             else:
  215                 log.warning(
  216                     "The interface %s is down so it cannot be used as source to connect to the Master",
  217                     opts["source_interface_name"],
  218                 )
  219         else:
  220             log.warning(
  221                 "%s is not a valid interface. Ignoring.", opts["source_interface_name"]
  222             )
  223     elif opts["source_address"]:
  224         ret["source_ip"] = salt.utils.network.dns_check(
  225             opts["source_address"], int(opts["source_ret_port"]), True, opts["ipv6"]
  226         )
  227         log.debug("Using %s as source IP address", ret["source_ip"])
  228     if opts["source_ret_port"]:
  229         ret["source_ret_port"] = int(opts["source_ret_port"])
  230         log.debug("Using %d as source port for the ret server", ret["source_ret_port"])
  231     if opts["source_publish_port"]:
  232         ret["source_publish_port"] = int(opts["source_publish_port"])
  233         log.debug(
  234             "Using %d as source port for the master pub", ret["source_publish_port"]
  235         )
  236     ret["master_uri"] = "tcp://{ip}:{port}".format(
  237         ip=ret["master_ip"], port=opts["master_port"]
  238     )
  239     log.debug("Master URI: %s", ret["master_uri"])
  240 
  241     return ret
  242 
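       # Hedged usage sketch for resolve_dns(): the hostname, port and source_* values
       # below are placeholders, and a real minion opts dict carries many more keys
       # than shown here.
       def _example_resolve_dns():  # pragma: no cover - illustrative sketch, never called
           example_opts = {
               "master": "salt.example.com",
               "master_port": 4506,
               "ipv6": False,
               "retry_dns": 30,
               "source_interface_name": "",
               "source_address": "",
               "source_ret_port": 0,
               "source_publish_port": 0,
           }
           ret = resolve_dns(example_opts)
           # Expected shape (the IP depends on DNS):
           #     {'master_ip': '203.0.113.10', 'master_uri': 'tcp://203.0.113.10:4506'}
           return ret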
  243 
  244 def prep_ip_port(opts):
  245     """
   246     Parse host:port values from opts['master'] and return a dict with:
  247         master: ip address or hostname as a string
  248         master_port: (optional) master returner port as integer
  249 
  250     e.g.:
  251       - master: 'localhost:1234' -> {'master': 'localhost', 'master_port': 1234}
   252       - master: '127.0.0.1:1234' -> {'master': '127.0.0.1', 'master_port': 1234}
  253       - master: '[::1]:1234' -> {'master': '::1', 'master_port': 1234}
  254       - master: 'fe80::a00:27ff:fedc:ba98' -> {'master': 'fe80::a00:27ff:fedc:ba98'}
  255     """
  256     ret = {}
  257     # Use given master IP if "ip_only" is set or if master_ip is an ipv6 address without
  258     # a port specified. The is_ipv6 check returns False if brackets are used in the IP
  259     # definition such as master: '[::1]:1234'.
  260     if opts["master_uri_format"] == "ip_only":
  261         ret["master"] = ipaddress.ip_address(opts["master"])
  262     else:
  263         try:
  264             host, port = parse_host_port(opts["master"])
  265         except ValueError as exc:
  266             raise SaltClientError(exc)
  267         ret = {"master": host}
  268         if port:
  269             ret.update({"master_port": port})
  270 
  271     return ret
  272 
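       # Hedged usage sketch for prep_ip_port(), mirroring the docstring examples;
       # 'master_uri_format' is spelled out only so the dicts are self-contained.
       def _example_prep_ip_port():  # pragma: no cover - illustrative sketch, never called
           assert prep_ip_port(
               {"master": "localhost:1234", "master_uri_format": "default"}
           ) == {"master": "localhost", "master_port": 1234}
           assert prep_ip_port(
               {"master": "[::1]:1234", "master_uri_format": "default"}
           ) == {"master": "::1", "master_port": 1234}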
  273 
  274 def get_proc_dir(cachedir, **kwargs):
  275     """
  276     Given the cache directory, return the directory that process data is
  277     stored in, creating it if it doesn't exist.
  278     The following optional Keyword Arguments are handled:
  279 
   280     mode: anything os.makedirs would accept as mode.
   281 
   282     uid: the uid to set; if not set, or if it is None or -1, no changes
   283          are made. The same applies if the directory is already owned by
   284          this uid. Must be an int. Works only on Unix/Unix-like systems.
   285 
   286     gid: the gid to set; if not set, or if it is None or -1, no changes
   287          are made. The same applies if the directory is already owned by
   288          this gid. Must be an int. Works only on Unix/Unix-like systems.
  289     """
  290     fn_ = os.path.join(cachedir, "proc")
  291     mode = kwargs.pop("mode", None)
  292 
  293     if mode is None:
  294         mode = {}
  295     else:
  296         mode = {"mode": mode}
  297 
  298     if not os.path.isdir(fn_):
  299         # proc_dir is not present, create it with mode settings
  300         os.makedirs(fn_, **mode)
  301 
  302     d_stat = os.stat(fn_)
  303 
  304     # if mode is not an empty dict then we have an explicit
  305     # dir mode. So lets check if mode needs to be changed.
  306     if mode:
  307         mode_part = S_IMODE(d_stat.st_mode)
  308         if mode_part != mode["mode"]:
  309             os.chmod(fn_, (d_stat.st_mode ^ mode_part) | mode["mode"])
  310 
  311     if hasattr(os, "chown"):
  312         # only on unix/unix like systems
  313         uid = kwargs.pop("uid", -1)
  314         gid = kwargs.pop("gid", -1)
  315 
  316         # if uid and gid are both -1 then go ahead with
  317         # no changes at all
  318         if (d_stat.st_uid != uid or d_stat.st_gid != gid) and [
  319             i for i in (uid, gid) if i != -1
  320         ]:
  321             os.chown(fn_, uid, gid)
  322 
  323     return fn_
  324 
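       # Hedged usage sketch for get_proc_dir(): the cache directory below is the
       # conventional Linux default, but any writable path works, and uid/gid are
       # only honoured where os.chown exists.
       def _example_get_proc_dir():  # pragma: no cover - illustrative sketch, never called
           proc_dir = get_proc_dir("/var/cache/salt/minion", mode=0o700)
           # -> '/var/cache/salt/minion/proc', created with mode 0700 if missing
           return proc_dir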
  325 
  326 def load_args_and_kwargs(func, args, data=None, ignore_invalid=False):
  327     """
  328     Detect the args and kwargs that need to be passed to a function call, and
  329     check them against what was passed.
  330     """
  331     argspec = salt.utils.args.get_function_argspec(func)
  332     _args = []
  333     _kwargs = {}
  334     invalid_kwargs = []
  335 
  336     for arg in args:
  337         if isinstance(arg, dict) and arg.pop("__kwarg__", False) is True:
   338             # if the arg is a dict with __kwarg__ == True, then it's a kwarg
  339             for key, val in arg.items():
  340                 if argspec.keywords or key in argspec.args:
   341                     # Function supports **kwargs, or the key matches one of the
   342                     # function's named arguments.
  343                     _kwargs[key] = val
  344                 else:
  345                     # **kwargs not in argspec and parsed argument name not in
  346                     # list of positional arguments. This keyword argument is
  347                     # invalid.
  348                     invalid_kwargs.append("{}={}".format(key, val))
  349             continue
  350 
  351         else:
  352             string_kwarg = salt.utils.args.parse_input([arg], condition=False)[
  353                 1
  354             ]  # pylint: disable=W0632
  355             if string_kwarg:
  356                 if argspec.keywords or next(iter(string_kwarg.keys())) in argspec.args:
   357                     # Function supports **kwargs, or the parsed keyword matches
   358                     # one of the function's named arguments.
  359                     _kwargs.update(string_kwarg)
  360                 else:
  361                     # **kwargs not in argspec and parsed argument name not in
  362                     # list of positional arguments. This keyword argument is
  363                     # invalid.
  364                     for key, val in string_kwarg.items():
  365                         invalid_kwargs.append("{}={}".format(key, val))
  366             else:
  367                 _args.append(arg)
  368 
  369     if invalid_kwargs and not ignore_invalid:
  370         salt.utils.args.invalid_kwargs(invalid_kwargs)
  371 
  372     if argspec.keywords and isinstance(data, dict):
  373         # this function accepts **kwargs, pack in the publish data
  374         for key, val in data.items():
  375             _kwargs["__pub_{}".format(key)] = val
  376 
  377     return _args, _kwargs
  378 
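       # Hedged usage sketch for load_args_and_kwargs(): a throwaway function stands
       # in for a loaded execution module function so the example is self-contained.
       def _example_load_args_and_kwargs():  # pragma: no cover - illustrative sketch, never called
           def _demo(*args, **kwargs):
               return args, kwargs

           call_args = [
               "positional",
               "key1=from_string",  # "key=value" strings are parsed into kwargs
               {"__kwarg__": True, "key2": "from_dict"},  # explicit kwarg marker
           ]
           _args, _kwargs = load_args_and_kwargs(_demo, call_args)
           # _args   -> ['positional']
           # _kwargs -> {'key1': 'from_string', 'key2': 'from_dict'}
           return _args, _kwargs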
  379 
  380 def eval_master_func(opts):
  381     """
  382     Evaluate master function if master type is 'func'
   383     and save its result in opts['master']
  384     """
  385     if "__master_func_evaluated" not in opts:
  386         # split module and function and try loading the module
  387         mod_fun = opts["master"]
  388         mod, fun = mod_fun.split(".")
  389         try:
  390             master_mod = salt.loader.raw_mod(opts, mod, fun)
  391             if not master_mod:
  392                 raise KeyError
  393             # we take whatever the module returns as master address
  394             opts["master"] = master_mod[mod_fun]()
  395             # Check for valid types
  396             if not isinstance(opts["master"], ((str,), list)):
  397                 raise TypeError
  398             opts["__master_func_evaluated"] = True
  399         except KeyError:
  400             log.error("Failed to load module %s", mod_fun)
  401             sys.exit(salt.defaults.exitcodes.EX_GENERIC)
  402         except TypeError:
   403             log.error("%s returned from %s is not a string or list", opts["master"], mod_fun)
  404             sys.exit(salt.defaults.exitcodes.EX_GENERIC)
  405         log.info("Evaluated master from module: %s", mod_fun)
  406 
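       # Hedged sketch of the minion config that exercises eval_master_func() above;
       # 'my_master_picker' is a hypothetical custom module (loaded via raw_mod) whose
       # pick() function must return a master address as a string or list of strings:
       #
       #     master_type: func
       #     master: my_master_picker.pick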
  407 
  408 def master_event(type, master=None):
  409     """
  410     Centralized master event function which will return event type based on event_map
  411     """
  412     event_map = {
  413         "connected": "__master_connected",
  414         "disconnected": "__master_disconnected",
  415         "failback": "__master_failback",
  416         "alive": "__master_alive",
  417     }
  418 
  419     if type == "alive" and master is not None:
  420         return "{}_{}".format(event_map.get(type), master)
  421 
  422     return event_map.get(type, None)
  423 
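       # Hedged usage sketch for master_event(); the master address is a placeholder.
       def _example_master_event():  # pragma: no cover - illustrative sketch, never called
           assert master_event("connected") == "__master_connected"
           assert (
               master_event("alive", master="salt.example.com")
               == "__master_alive_salt.example.com"
           )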
  424 
  425 def service_name():
  426     """
  427     Return the proper service name based on platform
  428     """
  429     return "salt_minion" if "bsd" in sys.platform else "salt-minion"
  430 
  431 
  432 class MinionBase:
  433     def __init__(self, opts):
  434         self.opts = opts
  435         self.beacons_leader = opts.get("beacons_leader", True)
  436 
  437     def gen_modules(self, initial_load=False, context=None):
  438         """
  439         Tell the minion to reload the execution modules
  440 
  441         CLI Example:
  442 
  443         .. code-block:: bash
  444 
  445             salt '*' sys.reload_modules
  446         """
  447         if initial_load:
  448             self.opts["pillar"] = salt.pillar.get_pillar(
  449                 self.opts,
  450                 self.opts["grains"],
  451                 self.opts["id"],
  452                 self.opts["saltenv"],
  453                 pillarenv=self.opts.get("pillarenv"),
  454             ).compile_pillar()
  455 
  456         self.utils = salt.loader.utils(self.opts, context=context)
  457         self.functions = salt.loader.minion_mods(
  458             self.opts, utils=self.utils, context=context
  459         )
  460         self.serializers = salt.loader.serializers(self.opts)
  461         self.returners = salt.loader.returners(
  462             self.opts, functions=self.functions, context=context
  463         )
  464         self.proxy = salt.loader.proxy(
  465             self.opts, functions=self.functions, returners=self.returners
  466         )
  467         # TODO: remove
  468         self.function_errors = {}  # Keep the funcs clean
  469         self.states = salt.loader.states(
  470             self.opts,
  471             functions=self.functions,
  472             utils=self.utils,
  473             serializers=self.serializers,
  474             context=context,
  475         )
  476         self.rend = salt.loader.render(
  477             self.opts, functions=self.functions, context=context
  478         )
  479         #        self.matcher = Matcher(self.opts, self.functions)
  480         self.matchers = salt.loader.matchers(self.opts)
  481         self.functions["sys.reload_modules"] = self.gen_modules
  482         self.executors = salt.loader.executors(
  483             self.opts, functions=self.functions, proxy=self.proxy, context=context
  484         )
  485 
  486     @staticmethod
  487     def process_schedule(minion, loop_interval):
  488         try:
  489             if hasattr(minion, "schedule"):
  490                 minion.schedule.eval()
  491             else:
  492                 log.error(
  493                     "Minion scheduler not initialized. Scheduled jobs will not be run."
  494                 )
  495                 return
  496             # Check if scheduler requires lower loop interval than
  497             # the loop_interval setting
  498             if minion.schedule.loop_interval < loop_interval:
  499                 loop_interval = minion.schedule.loop_interval
  500                 log.debug("Overriding loop_interval because of scheduled jobs.")
  501         except Exception as exc:  # pylint: disable=broad-except
  502             log.error("Exception %s occurred in scheduled job", exc)
  503         return loop_interval
  504 
  505     def process_beacons(self, functions):
  506         """
  507         Evaluate all of the configured beacons, grab the config again in case
  508         the pillar or grains changed
  509         """
  510         if "config.merge" in functions:
  511             b_conf = functions["config.merge"](
  512                 "beacons", self.opts["beacons"], omit_opts=True
  513             )
  514             if b_conf:
  515                 return self.beacons.process(
  516                     b_conf, self.opts["grains"]
  517                 )  # pylint: disable=no-member
  518         return []
  519 
  520     @salt.ext.tornado.gen.coroutine
  521     def eval_master(self, opts, timeout=60, safe=True, failed=False, failback=False):
  522         """
  523         Evaluates and returns a tuple of the current master address and the pub_channel.
  524 
  525         In standard mode, just creates a pub_channel with the given master address.
  526 
   527         With master_type=func, the current master address is evaluated from the
   528         given module and a pub_channel is then created.
   529 
   530         With master_type=failover, the list of masters is looped through; the
   531         first one that allows the minion to create a pub_channel is then
   532         returned. If this function is called outside the minion's initialization
   533         phase (for example from the minion's main event-loop when a master connection
  534         loss was detected), 'failed' should be set to True. The current
  535         (possibly failed) master will then be removed from the list of masters.
  536         """
  537         # return early if we are not connecting to a master
  538         if opts["master_type"] == "disable":
  539             log.warning("Master is set to disable, skipping connection")
  540             self.connected = False
  541             raise salt.ext.tornado.gen.Return((None, None))
  542 
   543         # Run master discovery over SSDP. This may modify the whole configuration,
   544         # depending on the networking and the set of masters.
  545         self._discover_masters()
  546 
  547         # check if master_type was altered from its default
  548         if opts["master_type"] != "str" and opts["__role"] != "syndic":
  549             # check for a valid keyword
  550             if opts["master_type"] == "func":
  551                 eval_master_func(opts)
  552 
  553             # if failover or distributed is set, master has to be of type list
  554             elif opts["master_type"] in ("failover", "distributed"):
  555                 if isinstance(opts["master"], list):
  556                     log.info(
  557                         "Got list of available master addresses: %s", opts["master"]
  558                     )
  559 
  560                     if opts["master_type"] == "distributed":
  561                         master_len = len(opts["master"])
  562                         if master_len > 1:
  563                             secondary_masters = opts["master"][1:]
  564                             master_idx = crc32(opts["id"]) % master_len
  565                             try:
  566                                 preferred_masters = opts["master"]
  567                                 preferred_masters[0] = opts["master"][master_idx]
  568                                 preferred_masters[1:] = [
  569                                     m
  570                                     for m in opts["master"]
  571                                     if m != preferred_masters[0]
  572                                 ]
  573                                 opts["master"] = preferred_masters
  574                                 log.info(
  575                                     "Distributed to the master at '%s'.",
  576                                     opts["master"][0],
  577                                 )
  578                             except (KeyError, AttributeError, TypeError):
  579                                 log.warning(
  580                                     "Failed to distribute to a specific master."
  581                                 )
  582                         else:
  583                             log.warning(
  584                                 "master_type = distributed needs more than 1 master."
  585                             )
  586 
  587                     if opts["master_shuffle"]:
  588                         log.warning(
  589                             "Use of 'master_shuffle' detected. 'master_shuffle' is deprecated in favor "
  590                             "of 'random_master'. Please update your minion config file."
  591                         )
  592                         opts["random_master"] = opts["master_shuffle"]
  593 
  594                     opts["auth_tries"] = 0
  595                     if (
  596                         opts["master_failback"]
  597                         and opts["master_failback_interval"] == 0
  598                     ):
  599                         opts["master_failback_interval"] = opts["master_alive_interval"]
  600                 # if opts['master'] is a str and we have never created opts['master_list']
  601                 elif isinstance(opts["master"], str) and ("master_list" not in opts):
  602                     # We have a string, but a list was what was intended. Convert.
  603                     # See issue 23611 for details
  604                     opts["master"] = [opts["master"]]
  605                 elif opts["__role"] == "syndic":
  606                     log.info("Syndic setting master_syndic to '%s'", opts["master"])
  607 
   608                 # if failed=True, the minion was previously connected and
   609                 # we're probably called from the minion's main event-loop
   610                 # because a master connection loss was detected. Remove
   611                 # the possibly failed master from the list of masters.
  612                 elif failed:
  613                     if failback:
  614                         # failback list of masters to original config
  615                         opts["master"] = opts["master_list"]
  616                     else:
  617                         log.info(
  618                             "Moving possibly failed master %s to the end of "
  619                             "the list of masters",
  620                             opts["master"],
  621                         )
  622                         if opts["master"] in opts["local_masters"]:
  623                             # create new list of master with the possibly failed
  624                             # one moved to the end
  625                             failed_master = opts["master"]
  626                             opts["master"] = [
  627                                 x for x in opts["local_masters"] if opts["master"] != x
  628                             ]
  629                             opts["master"].append(failed_master)
  630                         else:
  631                             opts["master"] = opts["master_list"]
  632                 else:
  633                     msg = (
  634                         "master_type set to 'failover' but 'master' "
  635                         "is not of type list but of type "
  636                         "{}".format(type(opts["master"]))
  637                     )
  638                     log.error(msg)
  639                     sys.exit(salt.defaults.exitcodes.EX_GENERIC)
   640                 # If failover is set, the minion has to fail over on DNS errors instead of retrying DNS resolution.
  641                 # See issue 21082 for details
  642                 if opts["retry_dns"] and opts["master_type"] == "failover":
  643                     msg = (
  644                         "'master_type' set to 'failover' but 'retry_dns' is not 0. "
  645                         "Setting 'retry_dns' to 0 to failover to the next master on DNS errors."
  646                     )
  647                     log.critical(msg)
  648                     opts["retry_dns"] = 0
  649             else:
  650                 msg = "Invalid keyword '{}' for variable " "'master_type'".format(
  651                     opts["master_type"]
  652                 )
  653                 log.error(msg)
  654                 sys.exit(salt.defaults.exitcodes.EX_GENERIC)
  655 
   656         # FIXME: if SMinion doesn't define io_loop, it can't switch master; see #29088
  657         # Specify kwargs for the channel factory so that SMinion doesn't need to define an io_loop
  658         # (The channel factories will set a default if the kwarg isn't passed)
  659         factory_kwargs = {"timeout": timeout, "safe": safe}
  660         if getattr(self, "io_loop", None):
  661             factory_kwargs["io_loop"] = self.io_loop  # pylint: disable=no-member
  662 
  663         tries = opts.get("master_tries", 1)
  664         attempts = 0
  665 
  666         # if we have a list of masters, loop through them and be
  667         # happy with the first one that allows us to connect
  668         if isinstance(opts["master"], list):
  669             conn = False
  670             last_exc = None
  671             opts["master_uri_list"] = []
  672             opts["local_masters"] = copy.copy(opts["master"])
  673 
  674             # shuffle the masters and then loop through them
  675             if opts["random_master"]:
  676                 # master_failback is only used when master_type is set to failover
  677                 if opts["master_type"] == "failover" and opts["master_failback"]:
  678                     secondary_masters = opts["local_masters"][1:]
  679                     shuffle(secondary_masters)
  680                     opts["local_masters"][1:] = secondary_masters
  681                 else:
  682                     shuffle(opts["local_masters"])
  683 
  684             # This sits outside of the connection loop below because it needs to set
  685             # up a list of master URIs regardless of which masters are available
  686             # to connect _to_. This is primarily used for masterless mode, when
  687             # we need a list of master URIs to fire calls back to.
  688             for master in opts["local_masters"]:
  689                 opts["master"] = master
  690                 opts.update(prep_ip_port(opts))
  691                 if opts["master_type"] == "failover":
  692                     try:
  693                         opts["master_uri_list"].append(
  694                             resolve_dns(opts, False)["master_uri"]
  695                         )
  696                     except SaltClientError:
  697                         continue
  698                 else:
  699                     opts["master_uri_list"].append(resolve_dns(opts)["master_uri"])
  700 
  701             if not opts["master_uri_list"]:
  702                 msg = "No master could be resolved"
  703                 log.error(msg)
  704                 raise SaltClientError(msg)
  705 
  706             pub_channel = None
  707             while True:
  708                 if attempts != 0:
  709                     # Give up a little time between connection attempts
  710                     # to allow the IOLoop to run any other scheduled tasks.
  711                     yield salt.ext.tornado.gen.sleep(opts["acceptance_wait_time"])
  712                 attempts += 1
  713                 if tries > 0:
  714                     log.debug("Connecting to master. Attempt %s of %s", attempts, tries)
  715                 else:
  716                     log.debug(
  717                         "Connecting to master. Attempt %s (infinite attempts)", attempts
  718                     )
  719                 for master in opts["local_masters"]:
  720                     opts["master"] = master
  721                     opts.update(prep_ip_port(opts))
  722                     if opts["master_type"] == "failover":
  723                         try:
  724                             opts.update(resolve_dns(opts, False))
  725                         except SaltClientError:
  726                             continue
  727                     else:
  728                         opts.update(resolve_dns(opts))
  729 
  730                     # on first run, update self.opts with the whole master list
  731                     # to enable a minion to re-use old masters if they get fixed
  732                     if "master_list" not in opts:
  733                         opts["master_list"] = copy.copy(opts["local_masters"])
  734 
  735                     self.opts = opts
  736 
  737                     pub_channel = salt.transport.client.AsyncPubChannel.factory(
  738                         opts, **factory_kwargs
  739                     )
  740                     try:
  741                         yield pub_channel.connect()
  742                         conn = True
  743                         break
   744                     except SaltClientError as exc:
   745                         last_exc = exc
   746                         if exc.strerror.startswith("Could not access"):
   747                             msg = (
   748                                 "Failed to initiate connection with Master "
   749                                 "%s: check ownership/permissions. Error "
   750                                 "message: %s"
   751                                 % (opts["master"], exc)
   752                             )
   753                         else:
   754                             msg = (
   755                                 "Master %s could not be reached, trying "
   756                                 "next master (if any)"
   757                                 % opts["master"]
   758                             )
   759                         # Log why this master was skipped before moving on.
   760                         log.info(msg)
  761                         pub_channel.close()
  762                         pub_channel = None
  763                         continue
  764 
  765                 if not conn:
  766                     if attempts == tries:
  767                         # Exhausted all attempts. Return exception.
  768                         self.connected = False
  769                         self.opts["master"] = copy.copy(self.opts["local_masters"])
  770                         log.error(
  771                             "No master could be reached or all masters "
  772                             "denied the minion's connection attempt."
  773                         )
  774                         if pub_channel:
  775                             pub_channel.close()
  776                         # If the code reaches this point, 'last_exc'
  777                         # should already be set.
  778                         raise last_exc  # pylint: disable=E0702
  779                 else:
  780                     self.tok = pub_channel.auth.gen_token(b"salt")
  781                     self.connected = True
  782                     raise salt.ext.tornado.gen.Return((opts["master"], pub_channel))
  783 
  784         # single master sign in
  785         else:
  786             if opts["random_master"]:
  787                 log.warning(
  788                     "random_master is True but there is only one master specified. Ignoring."
  789                 )
  790             pub_channel = None
  791             while True:
  792                 if attempts != 0:
  793                     # Give up a little time between connection attempts
  794                     # to allow the IOLoop to run any other scheduled tasks.
  795                     yield salt.ext.tornado.gen.sleep(opts["acceptance_wait_time"])
  796                 attempts += 1
  797                 if tries > 0:
  798                     log.debug("Connecting to master. Attempt %s of %s", attempts, tries)
  799                 else:
  800                     log.debug(
  801                         "Connecting to master. Attempt %s (infinite attempts)", attempts
  802                     )
  803                 opts.update(prep_ip_port(opts))
  804                 opts.update(resolve_dns(opts))
  805                 try:
  806                     if self.opts["transport"] == "detect":
  807                         self.opts["detect_mode"] = True
  808                         for trans in ("zeromq", "tcp"):
  809                             if trans == "zeromq" and not zmq:
  810                                 continue
  811                             self.opts["transport"] = trans
  812                             pub_channel = salt.transport.client.AsyncPubChannel.factory(
  813                                 self.opts, **factory_kwargs
  814                             )
  815                             yield pub_channel.connect()
  816                             if not pub_channel.auth.authenticated:
  817                                 continue
  818                             del self.opts["detect_mode"]
  819                             break
  820                     else:
  821                         pub_channel = salt.transport.client.AsyncPubChannel.factory(
  822                             self.opts, **factory_kwargs
  823                         )
  824                         yield pub_channel.connect()
  825                     self.tok = pub_channel.auth.gen_token(b"salt")
  826                     self.connected = True
  827                     raise salt.ext.tornado.gen.Return((opts["master"], pub_channel))
  828                 except SaltClientError:
  829                     if attempts == tries:
  830                         # Exhausted all attempts. Return exception.
  831                         self.connected = False
  832                         if pub_channel:
  833                             pub_channel.close()
  834                         raise
  835 
  836     def _discover_masters(self):
  837         """
  838         Discover master(s) and decide where to connect, if SSDP is around.
  839         This modifies the configuration on the fly.
  840         :return:
  841         """
  842         if (
  843             self.opts["master"] == DEFAULT_MINION_OPTS["master"]
  844             and self.opts["discovery"] is not False
  845         ):
  846             master_discovery_client = salt.utils.ssdp.SSDPDiscoveryClient()
  847             masters = {}
  848             for att in range(self.opts["discovery"].get("attempts", 3)):
  849                 try:
  850                     att += 1
  851                     log.info("Attempting %s time(s) to discover masters", att)
  852                     masters.update(master_discovery_client.discover())
  853                     if not masters:
  854                         time.sleep(self.opts["discovery"].get("pause", 5))
  855                     else:
  856                         break
  857                 except Exception as err:  # pylint: disable=broad-except
  858                     log.error("SSDP discovery failure: %s", err)
  859                     break
  860 
  861             if masters:
  862                 policy = self.opts.get("discovery", {}).get("match", "any")
  863                 if policy not in ["any", "all"]:
  864                     log.error(
  865                         'SSDP configuration matcher failure: unknown value "%s". '
  866                         'Should be "any" or "all"',
  867                         policy,
  868                     )
  869                 else:
  870                     mapping = self.opts["discovery"].get("mapping", {})
  871                     for addr, mappings in masters.items():
  872                         for proto_data in mappings:
  873                             cnt = len(
  874                                 [
  875                                     key
  876                                     for key, value in mapping.items()
  877                                     if proto_data.get("mapping", {}).get(key) == value
  878                                 ]
  879                             )
  880                             if policy == "any" and bool(cnt) or cnt == len(mapping):
  881                                 self.opts["master"] = proto_data["master"]
  882                                 return
  883 
  884     def _return_retry_timer(self):
  885         """
  886         Based on the minion configuration, either return a randomized timer or
  887         just return the value of the return_retry_timer.
  888         """
  889         msg = "Minion return retry timer set to %s seconds"
  890         if self.opts.get("return_retry_timer_max"):
  891             try:
  892                 random_retry = randint(
  893                     self.opts["return_retry_timer"], self.opts["return_retry_timer_max"]
  894                 )
  895                 retry_msg = msg % random_retry
  896                 log.debug("%s (randomized)", msg % random_retry)
  897                 return random_retry
  898             except ValueError:
  899                 # Catch wiseguys using negative integers here
  900                 log.error(
  901                     "Invalid value (return_retry_timer: %s or "
  902                     "return_retry_timer_max: %s). Both must be positive "
  903                     "integers.",
  904                     self.opts["return_retry_timer"],
  905                     self.opts["return_retry_timer_max"],
  906                 )
  907                 log.debug(msg, DEFAULT_MINION_OPTS["return_retry_timer"])
  908                 return DEFAULT_MINION_OPTS["return_retry_timer"]
  909         else:
  910             log.debug(msg, self.opts.get("return_retry_timer"))
  911             return self.opts.get("return_retry_timer")
  912 
  913 
  914 class SMinion(MinionBase):
  915     """
  916     Create an object that has loaded all of the minion module functions,
  917     grains, modules, returners etc.  The SMinion allows developers to
  918     generate all of the salt minion functions and present them with these
  919     functions for general use.
  920     """
  921 
  922     def __init__(self, opts, context=None):
  923         # Late setup of the opts grains, so we can log from the grains module
  924         import salt.loader
  925 
  926         opts["grains"] = salt.loader.grains(opts)
  927         super().__init__(opts)
  928 
   929         # If a master is in use (remote file_client or use_master_when_local), resolve and connect to it
  930         if self.opts.get("file_client", "remote") == "remote" or self.opts.get(
  931             "use_master_when_local", False
  932         ):
  933             install_zmq()
  934             io_loop = ZMQDefaultLoop.current()
  935             io_loop.run_sync(lambda: self.eval_master(self.opts, failed=True))
  936         self.gen_modules(initial_load=True, context=context or {})
  937 
  938         # If configured, cache pillar data on the minion
  939         if self.opts["file_client"] == "remote" and self.opts.get(
  940             "minion_pillar_cache", False
  941         ):
  942             import salt.utils.yaml
  943 
  944             pdir = os.path.join(self.opts["cachedir"], "pillar")
  945             if not os.path.isdir(pdir):
  946                 os.makedirs(pdir, 0o700)
  947             ptop = os.path.join(pdir, "top.sls")
  948             if self.opts["saltenv"] is not None:
  949                 penv = self.opts["saltenv"]
  950             else:
  951                 penv = "base"
  952             cache_top = {penv: {self.opts["id"]: ["cache"]}}
  953             with salt.utils.files.fopen(ptop, "wb") as fp_:
  954                 salt.utils.yaml.safe_dump(cache_top, fp_, encoding=SLS_ENCODING)
  955                 os.chmod(ptop, 0o600)
  956             cache_sls = os.path.join(pdir, "cache.sls")
  957             with salt.utils.files.fopen(cache_sls, "wb") as fp_:
  958                 salt.utils.yaml.safe_dump(
  959                     self.opts["pillar"], fp_, encoding=SLS_ENCODING
  960                 )
  961                 os.chmod(cache_sls, 0o600)
  962 
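       # Hedged sketch of how SMinion is typically used from other Python code: build
       # opts from the standard minion config, instantiate SMinion, then call loaded
       # execution module functions directly. The config path and function names are
       # placeholders; with file_client: remote this will try to reach the master.
       def _example_sminion_usage():  # pragma: no cover - illustrative sketch, never called
           import salt.config

           minion_opts = salt.config.minion_config("/etc/salt/minion")
           sminion = SMinion(minion_opts)
           sminion.functions["test.ping"]()  # -> True
           return sminion.functions["grains.get"]("os")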
  963 
  964 class MasterMinion:
  965     """
  966     Create a fully loaded minion function object for generic use on the
  967     master. What makes this class different is that the pillar is
   968     omitted; otherwise everything else is loaded cleanly.
  969     """
  970 
  971     def __init__(
  972         self,
  973         opts,
  974         returners=True,
  975         states=True,
  976         rend=True,
  977         matcher=True,
  978         whitelist=None,
  979         ignore_config_errors=True,
  980     ):
  981         self.opts = salt.config.minion_config(
  982             opts["conf_file"], ignore_config_errors=ignore_config_errors, role="master"
  983         )
  984         self.opts.update(opts)
  985         self.whitelist = whitelist
  986         self.opts["grains"] = salt.loader.grains(opts)
  987         self.opts["pillar"] = {}
  988         self.mk_returners = returners
  989         self.mk_states = states
  990         self.mk_rend = rend
  991         self.mk_matcher = matcher
  992         self.gen_modules(initial_load=True)
  993 
  994     def gen_modules(self, initial_load=False):
  995         """
  996         Tell the minion to reload the execution modules
  997 
  998         CLI Example:
  999 
 1000         .. code-block:: bash
 1001 
 1002             salt '*' sys.reload_modules
 1003         """
 1004         self.utils = salt.loader.utils(self.opts)
 1005         self.functions = salt.loader.minion_mods(
 1006             self.opts,
 1007             utils=self.utils,
 1008             whitelist=self.whitelist,
 1009             initial_load=initial_load,
 1010         )
 1011         self.serializers = salt.loader.serializers(self.opts)
 1012         if self.mk_returners:
 1013             self.returners = salt.loader.returners(self.opts, self.functions)
 1014         if self.mk_states:
 1015             self.states = salt.loader.states(
 1016                 self.opts, self.functions, self.utils, self.serializers
 1017             )
 1018         if self.mk_rend:
 1019             self.rend = salt.loader.render(self.opts, self.functions)
 1020         if self.mk_matcher:
 1021             self.matchers = salt.loader.matchers(self.opts)
 1022         self.functions["sys.reload_modules"] = self.gen_modules
 1023 
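       # Hedged sketch of using MasterMinion from master-side code: pillar is skipped,
       # but the execution modules are available. The config path and the function
       # called are placeholders.
       def _example_master_minion_usage():  # pragma: no cover - illustrative sketch, never called
           import salt.config

           master_opts = salt.config.master_config("/etc/salt/master")
           mminion = MasterMinion(master_opts)
           return mminion.functions["test.ping"]()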
 1024 
 1025 class MinionManager(MinionBase):
 1026     """
  1027     Create a multi-minion interface; this creates as many minions as are
 1028     defined in the master option and binds each minion object to a respective
 1029     master.
 1030     """
 1031 
 1032     def __init__(self, opts):
 1033         super().__init__(opts)
 1034         self.auth_wait = self.opts["acceptance_wait_time"]
 1035         self.max_auth_wait = self.opts["acceptance_wait_time_max"]
 1036         self.minions = []
 1037         self.jid_queue = []
 1038 
 1039         install_zmq()
 1040         self.io_loop = ZMQDefaultLoop.current()
 1041         self.process_manager = ProcessManager(name="MultiMinionProcessManager")
 1042         self.io_loop.spawn_callback(
 1043             self.process_manager.run, **{"asynchronous": True}
 1044         )  # Tornado backward compat
 1045 
 1046     # pylint: disable=W1701
 1047     def __del__(self):
 1048         self.destroy()
 1049 
 1050     # pylint: enable=W1701
 1051 
 1052     def _bind(self):
 1053         # start up the event publisher, so we can see events during startup
 1054         self.event_publisher = salt.utils.event.AsyncEventPublisher(
 1055             self.opts, io_loop=self.io_loop,
 1056         )
 1057         self.event = salt.utils.event.get_event(
 1058             "minion", opts=self.opts, io_loop=self.io_loop
 1059         )
 1060         self.event.subscribe("")
 1061         self.event.set_event_handler(self.handle_event)
 1062 
 1063     @salt.ext.tornado.gen.coroutine
 1064     def handle_event(self, package):
 1065         for minion in self.minions:
 1066             minion.handle_event(package)
 1067 
 1068     def _create_minion_object(
 1069         self, opts, timeout, safe, io_loop=None, loaded_base_name=None, jid_queue=None
 1070     ):
 1071         """
 1072         Helper function to return the correct type of object
 1073         """
 1074         return Minion(
 1075             opts,
 1076             timeout,
 1077             safe,
 1078             io_loop=io_loop,
 1079             loaded_base_name=loaded_base_name,
 1080             jid_queue=jid_queue,
 1081         )
 1082 
 1083     def _check_minions(self):
 1084         """
  1085         Check the size of self.minions and log an error if it's empty
 1086         """
 1087         if not self.minions:
 1088             err = "Minion unable to successfully connect to " "a Salt Master."
 1089             log.error(err)
 1090 
 1091     def _spawn_minions(self, timeout=60):
 1092         """
 1093         Spawn all the coroutines which will sign in to masters
 1094         """
 1095         masters = self.opts["master"]
 1096         if (self.opts["master_type"] in ("failover", "distributed")) or not isinstance(
 1097             self.opts["master"], list
 1098         ):
 1099             masters = [masters]
 1100 
 1101         beacons_leader = True
 1102         for master in masters:
 1103             s_opts = copy.deepcopy(self.opts)
 1104             s_opts["master"] = master
 1105             s_opts["multimaster"] = True
 1106             s_opts["beacons_leader"] = beacons_leader
 1107             if beacons_leader:
 1108                 beacons_leader = False
 1109             minion = self._create_minion_object(
 1110                 s_opts,
 1111                 s_opts["auth_timeout"],
 1112                 False,
 1113                 io_loop=self.io_loop,
 1114                 loaded_base_name="salt.loader.{}".format(s_opts["master"]),
 1115                 jid_queue=self.jid_queue,
 1116             )
 1117             self.io_loop.spawn_callback(self._connect_minion, minion)
 1118         self.io_loop.call_later(timeout, self._check_minions)
 1119 
 1120     @salt.ext.tornado.gen.coroutine
 1121     def _connect_minion(self, minion):
 1122         """
  1123         Asynchronously connect a minion to a master, retrying on failure
 1124         """
 1125         last = 0  # never have we signed in
 1126         auth_wait = minion.opts["acceptance_wait_time"]
 1127         failed = False
 1128         while True:
 1129             try:
 1130                 if minion.opts.get("beacons_before_connect", False):
 1131                     minion.setup_beacons(before_connect=True)
 1132                 if minion.opts.get("scheduler_before_connect", False):
 1133                     minion.setup_scheduler(before_connect=True)
 1134                 yield minion.connect_master(failed=failed)
 1135                 minion.tune_in(start=False)
 1136                 self.minions.append(minion)
 1137                 break
 1138             except SaltClientError as exc:
 1139                 failed = True
 1140                 log.error(
 1141                     "Error while bringing up minion for multi-master. Is "
 1142                     "master at %s responding?",
 1143                     minion.opts["master"],
 1144                 )
 1145                 last = time.time()
 1146                 if auth_wait < self.max_auth_wait:
 1147                     auth_wait += self.auth_wait
 1148                 yield salt.ext.tornado.gen.sleep(auth_wait)  # TODO: log?
 1149             except SaltMasterUnresolvableError:
 1150                 err = (
  1151                     "Master address: '{}' could not be resolved. Invalid or unresolvable address. "
 1152                     "Set 'master' value in minion config.".format(minion.opts["master"])
 1153                 )
 1154                 log.error(err)
 1155                 break
 1156             except Exception as e:  # pylint: disable=broad-except
 1157                 failed = True
 1158                 log.critical(
 1159                     "Unexpected error while connecting to %s",
 1160                     minion.opts["master"],
 1161                     exc_info=True,
 1162                 )
 1163 
 1164     # Multi Master Tune In
 1165     def tune_in(self):
 1166         """
 1167         Bind to the masters
 1168 
 1169         This loop will attempt to create connections to masters it hasn't connected
 1170         to yet, but once the initial connection is made it is up to ZMQ to do the
 1171         reconnect (don't know of an API to get the state here in salt)
 1172         """
 1173         self._bind()
 1174 
 1175         # Fire off all the minion coroutines
 1176         self._spawn_minions()
 1177 
 1178         # serve forever!
 1179         self.io_loop.start()
 1180 
 1181     @property
 1182     def restart(self):
 1183         for minion in self.minions:
 1184             if minion.restart:
 1185                 return True
 1186         return False
 1187 
 1188     def stop(self, signum):
 1189         for minion in self.minions:
 1190             minion.process_manager.stop_restarting()
 1191             minion.process_manager.send_signal_to_processes(signum)
 1192             # kill any remaining processes
 1193             minion.process_manager.kill_children()
 1194             minion.destroy()
 1195 
 1196     def destroy(self):
 1197         for minion in self.minions:
 1198             minion.destroy()
 1199 
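       # Hedged sketch of the multi-master setup MinionManager handles: given a list
       # of masters it spawns one Minion per master on a single IO loop. The config
       # path and master names are placeholders; salt.cli.daemons normally drives
       # this rather than user code.
       def _example_minion_manager_usage():  # pragma: no cover - illustrative sketch, never called
           import salt.config

           minion_opts = salt.config.minion_config("/etc/salt/minion")
           # equivalent minion config:
           #     master:
           #       - master1.example.com
           #       - master2.example.com
           minion_opts["master"] = ["master1.example.com", "master2.example.com"]
           manager = MinionManager(minion_opts)
           manager.tune_in()  # binds the event publisher, spawns the minions, runs forever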
 1200 
 1201 class Minion(MinionBase):
 1202     """
 1203     This class instantiates a minion, runs connections for a minion,
 1204     and loads all of the functions into the minion
 1205     """
 1206 
 1207     def __init__(
 1208         self,
 1209         opts,
 1210         timeout=60,
 1211         safe=True,
 1212         loaded_base_name=None,
 1213         io_loop=None,
 1214         jid_queue=None,
 1215     ):  # pylint: disable=W0231
 1216         """
 1217         Pass in the options dict
 1218         """
 1219         # this means that the parent class doesn't know *which* master we connect to
 1220         super().__init__(opts)
 1221         self.timeout = timeout
 1222         self.safe = safe
 1223 
 1224         self._running = None
 1225         self.win_proc = []
 1226         self.subprocess_list = salt.utils.process.SubprocessList()
 1227         self.loaded_base_name = loaded_base_name
 1228         self.connected = False
 1229         self.restart = False
 1230         # Flag meaning minion has finished initialization including first connect to the master.
 1231         # True means the Minion is fully functional and ready to handle events.
 1232         self.ready = False
 1233         self.jid_queue = [] if jid_queue is None else jid_queue
 1234         self.periodic_callbacks = {}
 1235 
 1236         if io_loop is None:
 1237             install_zmq()
 1238             self.io_loop = ZMQDefaultLoop.current()
 1239         else:
 1240             self.io_loop = io_loop
 1241 
 1242         # Warn if ZMQ < 3.2
 1243         if zmq:
 1244             if ZMQ_VERSION_INFO < (3, 2):
 1245                 log.warning(
 1246                     "You have a version of ZMQ less than ZMQ 3.2! There are "
 1247                     "known connection keep-alive issues with ZMQ < 3.2 which "
 1248                     "may result in loss of contact with minions. Please "
 1249                     "upgrade your ZMQ!"
 1250                 )
 1251         # Late setup of the opts grains, so we can log from the grains
 1252         # module.  If this is a proxy, however, we need to init the proxymodule
 1253         # before we can get the grains.  We do this for proxies in the
 1254         # _post_master_init() method.
 1255         if not salt.utils.platform.is_proxy():
 1256             if not self.opts.get("grains", {}):
 1257                 self.opts["grains"] = salt.loader.grains(opts)
 1258         else:
 1259             if self.opts.get("beacons_before_connect", False):
 1260                 log.warning(
 1261                     "'beacons_before_connect' is not supported "
 1262                     "for proxy minions. Setting to False"
 1263                 )
 1264                 self.opts["beacons_before_connect"] = False
 1265             if self.opts.get("scheduler_before_connect", False):
 1266                 log.warning(
 1267                     "'scheduler_before_connect' is not supported "
 1268                     "for proxy minions. Setting to False"
 1269                 )
 1270                 self.opts["scheduler_before_connect"] = False
 1271 
 1272         log.info("Creating minion process manager")
 1273 
 1274         if self.opts["random_startup_delay"]:
 1275             sleep_time = random.randint(0, self.opts["random_startup_delay"])
 1276             log.info(
 1277                 "Minion sleeping for %s seconds due to configured "
 1278                 "startup_delay between 0 and %s seconds",
 1279                 sleep_time,
 1280                 self.opts["random_startup_delay"],
 1281             )
 1282             time.sleep(sleep_time)
 1283 
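        # Config sketch for the delay applied above (documented minion option,
        # value in seconds; the number below is illustrative):
        #
        #     random_startup_delay: 10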
 1284         self.process_manager = ProcessManager(name="MinionProcessManager")
 1285         self.io_loop.spawn_callback(self.process_manager.run, **{"asynchronous": True})
 1286         # We don't have the proxy setup yet, so we can't start engines
 1287         # Engines need to be able to access __proxy__
 1288         if not salt.utils.platform.is_proxy():
 1289             self.io_loop.spawn_callback(
 1290                 salt.engines.start_engines, self.opts, self.process_manager
 1291             )
 1292 
 1293         # Install the SIGINT/SIGTERM handlers if not done so far
 1294         if signal.getsignal(signal.SIGINT) is signal.SIG_DFL:
 1295             # No custom signal handling was added, install our own
 1296             signal.signal(signal.SIGINT, self._handle_signals)
 1297 
 1298         if signal.getsignal(signal.SIGTERM) is signal.SIG_DFL:
 1299             # No custom signal handling was added, install our own
 1300             signal.signal(signal.SIGTERM, self._handle_signals)
 1301 
 1302     def _handle_signals(self, signum, sigframe):  # pylint: disable=unused-argument
 1303         self._running = False
 1304         # escalate the signals to the process manager
 1305         self.process_manager.stop_restarting()
 1306         self.process_manager.send_signal_to_processes(signum)
 1307         # kill any remaining processes
 1308         self.process_manager.kill_children()
 1309         time.sleep(1)
 1310         sys.exit(0)
 1311 
 1312     def sync_connect_master(self, timeout=None, failed=False):
 1313         """
 1314         Block until we are connected to a master
 1315         """
 1316         self._sync_connect_master_success = False
 1317         log.debug("sync_connect_master")
 1318 
 1319         def on_connect_master_future_done(future):
 1320             self._sync_connect_master_success = True
 1321             self.io_loop.stop()
 1322 
 1323         self._connect_master_future = self.connect_master(failed=failed)
 1324         # finish connecting to master
 1325         self._connect_master_future.add_done_callback(on_connect_master_future_done)
 1326         if timeout:
 1327             self.io_loop.call_later(timeout, self.io_loop.stop)
 1328         try:
 1329             self.io_loop.start()
 1330         except KeyboardInterrupt:
 1331             self.destroy()
 1332         # The following three lines look odd, but they preserve the original
 1333         # traceback. Please read PR #23978 before changing them, to avoid
 1334         # regressions.
 1335         if self._connect_master_future.done():
 1336             future_exception = self._connect_master_future.exception()
 1337             if future_exception:
 1338                 # This needs to be re-raised to preserve restart_on_error behavior.
 1339                 six.reraise(*future_exception)
 1340         if timeout and self._sync_connect_master_success is False:
 1341             raise SaltDaemonNotRunning("Failed to connect to the salt-master")
 1342 
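    # A rough sketch of how the blocking connect above is typically used when a
    # minion is brought up outside the daemon (the timeout value is illustrative):
    #
    #     minion = Minion(opts)
    #     minion.sync_connect_master(timeout=60)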
 1343     @salt.ext.tornado.gen.coroutine
 1344     def connect_master(self, failed=False):
 1345         """
 1346         Return a future which will complete when you are connected to a master
 1347         """
 1348         master, self.pub_channel = yield self.eval_master(
 1349             self.opts, self.timeout, self.safe, failed
 1350         )
 1351         yield self._post_master_init(master)
 1352 
 1353     # TODO: better name...
 1354     @salt.ext.tornado.gen.coroutine
 1355     def _post_master_init(self, master):
 1356         """
 1357         Function to finish init after connecting to a master
 1358 
 1359         This is primarily loading modules, pillars, etc. (since they need
 1360         to know which master they connected to)
 1361 
 1362         If this function is changed, please check ProxyMinion._post_master_init
 1363         to see if those changes need to be propagated.
 1364 
 1365         Minions and ProxyMinions need significantly different post master setups,
 1366         which is why the differences are not factored out into separate helper
 1367         functions.
 1368         """
 1369         if self.connected:
 1370             self.opts["master"] = master
 1371 
 1372             # Initialize pillar before loader to make pillar accessible in modules
 1373             async_pillar = salt.pillar.get_async_pillar(
 1374                 self.opts,
 1375                 self.opts["grains"],
 1376                 self.opts["id"],
 1377                 self.opts["saltenv"],
 1378                 pillarenv=self.opts.get("pillarenv"),
 1379             )
 1380             self.opts["pillar"] = yield async_pillar.compile_pillar()
 1381             async_pillar.destroy()
 1382 
 1383         if not self.ready:
 1384             self._setup_core()
 1385         elif self.connected and self.opts["pillar"]:
 1386             # The pillar has changed due to the connection to the master.
 1387             # Reload the functions so that they can use the new pillar data.
 1388             (
 1389                 self.functions,
 1390                 self.returners,
 1391                 self.function_errors,
 1392                 self.executors,
 1393             ) = self._load_modules()
 1394             if hasattr(self, "schedule"):
 1395                 self.schedule.functions = self.functions
 1396                 self.schedule.returners = self.returners
 1397 
 1398         if not hasattr(self, "schedule"):
 1399             self.schedule = salt.utils.schedule.Schedule(
 1400                 self.opts,
 1401                 self.functions,
 1402                 self.returners,
 1403                 cleanup=[master_event(type="alive")],
 1404             )
 1405 
 1406         # add default scheduling jobs to the minion's scheduler
 1407         if self.opts["mine_enabled"] and "mine.update" in self.functions:
 1408             self.schedule.add_job(
 1409                 {
 1410                     "__mine_interval": {
 1411                         "function": "mine.update",
 1412                         "minutes": self.opts["mine_interval"],
 1413                         "jid_include": True,
 1414                         "maxrunning": 2,
 1415                         "run_on_start": True,
 1416                         "return_job": self.opts.get("mine_return_job", False),
 1417                     }
 1418                 },
 1419                 persist=True,
 1420             )
 1421             log.info("Added mine.update to scheduler")
 1422         else:
 1423             self.schedule.delete_job("__mine_interval", persist=True)
 1424 
 1425         # add master_alive job if enabled
 1426         if (
 1427             self.opts["transport"] != "tcp"
 1428             and self.opts["master_alive_interval"] > 0
 1429             and self.connected
 1430         ):
 1431             self.schedule.add_job(
 1432                 {
 1433                     master_event(type="alive", master=self.opts["master"]): {
 1434                         "function": "status.master",
 1435                         "seconds": self.opts["master_alive_interval"],
 1436                         "jid_include": True,
 1437                         "maxrunning": 1,
 1438                         "return_job": False,
 1439                         "kwargs": {"master": self.opts["master"], "connected": True},
 1440                     }
 1441                 },
 1442                 persist=True,
 1443             )
 1444             if (
 1445                 self.opts["master_failback"]
 1446                 and "master_list" in self.opts
 1447                 and self.opts["master"] != self.opts["master_list"][0]
 1448             ):
 1449                 self.schedule.add_job(
 1450                     {
 1451                         master_event(type="failback"): {
 1452                             "function": "status.ping_master",
 1453                             "seconds": self.opts["master_failback_interval"],
 1454                             "jid_include": True,
 1455                             "maxrunning": 1,
 1456                             "return_job": False,
 1457                             "kwargs": {"master": self.opts["master_list"][0]},
 1458                         }
 1459                     },
 1460                     persist=True,
 1461                 )
 1462             else:
 1463                 self.schedule.delete_job(master_event(type="failback"), persist=True)
 1464         else:
 1465             self.schedule.delete_job(
 1466                 master_event(type="alive", master=self.opts["master"]), persist=True
 1467             )
 1468             self.schedule.delete_job(master_event(type="failback"), persist=True)
 1469 
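    # Config sketch for the scheduler jobs added above. The option names are the
    # documented minion settings; the values are illustrative:
    #
    #     mine_enabled: True
    #     mine_interval: 60             # minutes between mine.update runs
    #     master_alive_interval: 30     # seconds between status.master checks
    #     master_failback: True
    #     master_failback_interval: 90  # seconds between failback pings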
 1470     def _prep_mod_opts(self):
 1471         """
 1472         Returns a copy of the opts with key bits stripped out
 1473         """
 1474         mod_opts = {}
 1475         for key, val in self.opts.items():
 1476             if key == "logger":
 1477                 continue
 1478             mod_opts[key] = val
 1479         return mod_opts
 1480 
 1481     def _load_modules(
 1482         self, force_refresh=False, notify=False, grains=None, opts=None, context=None
 1483     ):
 1484         """
 1485         Return the functions and the returners loaded up from the loader
 1486         module
 1487         """
 1488         opt_in = True
 1489         if not opts:
 1490             opts = self.opts
 1491             opt_in = False
 1492         # if this is a *nix system AND modules_max_memory is set, let's enforce
 1493         # a memory limit on module imports
 1494         # this feature ONLY works on *nix-like OSes (the resource module doesn't work on Windows)
 1495         modules_max_memory = False
 1496         if opts.get("modules_max_memory", -1) > 0 and HAS_PSUTIL and HAS_RESOURCE:
 1497             log.debug(
 1498                 "modules_max_memory set, enforcing a maximum of %s",
 1499                 opts["modules_max_memory"],
 1500             )
 1501             modules_max_memory = True
 1502             old_mem_limit = resource.getrlimit(resource.RLIMIT_AS)
 1503             rss, vms = psutil.Process(os.getpid()).memory_info()[:2]
 1504             mem_limit = rss + vms + opts["modules_max_memory"]
 1505             resource.setrlimit(resource.RLIMIT_AS, (mem_limit, mem_limit))
 1506         elif opts.get("modules_max_memory", -1) > 0:
 1507             if not HAS_PSUTIL:
 1508                 log.error(
 1509                     "Unable to enforce modules_max_memory because psutil is missing"
 1510                 )
 1511             if not HAS_RESOURCE:
 1512                 log.error(
 1513                     "Unable to enforce modules_max_memory because resource is missing"
 1514                 )
 1515 
 1516         # This might be a proxy minion
 1517         if hasattr(self, "proxy"):
 1518             proxy = self.proxy
 1519         else:
 1520             proxy = None
 1521 
 1522         if context is None:
 1523             context = {}
 1524 
 1525         if grains is None:
 1526             opts["grains"] = salt.loader.grains(
 1527                 opts, force_refresh, proxy=proxy, context=context
 1528             )
 1529         self.utils = salt.loader.utils(opts, proxy=proxy, context=context)
 1530 
 1531         if opts.get("multimaster", False):
 1532             s_opts = copy.deepcopy(opts)
 1533             functions = salt.loader.minion_mods(
 1534                 s_opts,
 1535                 utils=self.utils,
 1536                 proxy=proxy,
 1537                 loaded_base_name=self.loaded_base_name,
 1538                 notify=notify,
 1539                 context=context,
 1540             )
 1541         else:
 1542             functions = salt.loader.minion_mods(
 1543                 opts, utils=self.utils, notify=notify, proxy=proxy, context=context,
 1544             )
 1545         returners = salt.loader.returners(opts, functions, proxy=proxy, context=context)
 1546         errors = {}
 1547         if "_errors" in functions:
 1548             errors = functions["_errors"]
 1549             functions.pop("_errors")
 1550 
 1551         # we're done, reset the limits!
 1552         if modules_max_memory is True:
 1553             resource.setrlimit(resource.RLIMIT_AS, old_mem_limit)
 1554 
 1555         executors = salt.loader.executors(opts, functions, proxy=proxy, context=context)
 1556 
 1557         if opt_in:
 1558             self.opts = opts
 1559 
 1560         return functions, returners, errors, executors
 1561 
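    # Config sketch for the memory cap enforced above (documented minion option,
    # value in bytes; it only takes effect when psutil and resource are available):
    #
    #     modules_max_memory: 500000000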
 1562     def _send_req_sync(self, load, timeout):
 1563 
 1564         if self.opts["minion_sign_messages"]:
 1565             log.trace("Signing event to be published onto the bus.")
 1566             minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem")
 1567             sig = salt.crypt.sign_message(
 1568                 minion_privkey_path, salt.serializers.msgpack.serialize(load)
 1569             )
 1570             load["sig"] = sig
 1571 
 1572         with salt.transport.client.ReqChannel.factory(self.opts) as channel:
 1573             return channel.send(load, timeout=timeout)
 1574 
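    # Config sketch for the optional message signing used by the request helpers
    # above and below (documented minion option; the master-side counterpart is
    # require_minion_sign_messages):
    #
    #     minion_sign_messages: True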
 1575     @salt.ext.tornado.gen.coroutine
 1576     def _send_req_async(self, load, timeout):
 1577 
 1578         if self.opts["minion_sign_messages"]:
 1579             log.trace("Signing event to be published onto the bus.")
 1580             minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem")
 1581             sig = salt.crypt.sign_message(
 1582                 minion_privkey_path, salt.serializers.msgpack.serialize(load)
 1583             )
 1584             load["sig"] = sig
 1585 
 1586         with salt.transport.client.AsyncReqChannel.factory(self.opts) as channel:
 1587             ret = yield channel.send(load, timeout=timeout)
 1588             raise salt.ext.tornado.gen.Return(ret)
 1589 
 1590     def _fire_master(
 1591         self,
 1592         data=None,
 1593         tag=None,
 1594         events=None,
 1595         pretag=None,
 1596         timeout=60,
 1597         sync=True,
 1598         timeout_handler=None,
 1599         include_startup_grains=False,
 1600     ):
 1601         """
 1602         Fire an event on the master, or drop the message if it cannot be sent.
 1603         """
 1604         load = {
 1605             "id": self.opts["id"],
 1606             "cmd": "_minion_event",
 1607             "pretag": pretag,
 1608             "tok": self.tok,
 1609         }
 1610         if events:
 1611             load["events"] = events
 1612         elif data and tag:
 1613             load["data"] = data
 1614             load["tag"] = tag
 1615         elif not data and tag:
 1616             load["data"] = {}
 1617             load["tag"] = tag
 1618         else:
 1619             return
 1620 
 1621         if include_startup_grains:
 1622             grains_to_add = {
 1623                 k: v
 1624                 for k, v in self.opts.get("grains", {}).items()
 1625                 if k in self.opts["start_event_grains"]
 1626             }
 1627             load["grains"] = grains_to_add
 1628 
 1629         if sync:
 1630             try:
 1631                 self._send_req_sync(load, timeout)
 1632             except salt.exceptions.SaltReqTimeoutError:
 1633                 log.info(
 1634                     "fire_master failed: master could not be contacted. Request timed out."
 1635                 )
 1636                 return False
 1637             except Exception:  # pylint: disable=broad-except
 1638                 log.info("fire_master failed: %s", traceback.format_exc())
 1639                 return False
 1640         else:
 1641             if timeout_handler is None:
 1642 
 1643                 def handle_timeout(*_):
 1644                     log.info(
 1645                         "fire_master failed: master could not be contacted. Request timed out."
 1646                     )
 1647                     return True
 1648 
 1649                 timeout_handler = handle_timeout
 1650 
 1651             with salt.ext.tornado.stack_context.ExceptionStackContext(timeout_handler):
 1652                 # pylint: disable=unexpected-keyword-arg
 1653                 self._send_req_async(load, timeout, callback=lambda f: None)
 1654                 # pylint: enable=unexpected-keyword-arg
 1655         return True
 1656 
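    # A minimal sketch of firing a custom event with the helper above, assuming a
    # connected minion instance; the tag and payload are made up for illustration:
    #
    #     minion._fire_master(
    #         data={"result": "ok"},
    #         tag="custom/deploy/finished",
    #     )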
 1657     @salt.ext.tornado.gen.coroutine
 1658     def _handle_decoded_payload(self, data):
 1659         """
 1660         Override this method if you wish to handle the decoded data
 1661         differently.
 1662         """
 1663         # Ensure payload is unicode. Disregard failure to decode binary blobs.
 1664         if six.PY2:
 1665             data = salt.utils.data.decode(data, keep=True)
 1666         if "user" in data:
 1667             log.info(
 1668                 "User %s Executing command %s with jid %s",
 1669                 data["user"],
 1670                 data["fun"],
 1671                 data["jid"],
 1672             )
 1673         else:
 1674             log.info("Executing command %s with jid %s", data["fun"], data["jid"])
 1675         log.debug("Command details %s", data)
 1676 
 1677         # Don't duplicate jobs
 1678         log.trace("Started JIDs: %s", self.jid_queue)
 1679         if self.jid_queue is not None:
 1680             if data["jid"] in self.jid_queue:
 1681                 return
 1682             else:
 1683                 self.jid_queue.append(data["jid"])
 1684                 if len(self.jid_queue) > self.opts["minion_jid_queue_hwm"]:
 1685                     self.jid_queue.pop(0)
 1686 
 1687         if isinstance(data["fun"], str):
 1688             if data["fun"] == "sys.reload_modules":
 1689                 (
 1690                     self.functions,
 1691                     self.returners,
 1692                     self.function_errors,
 1693                     self.executors,
 1694                 ) = self._load_modules()
 1695                 self.schedule.functions = self.functions
 1696                 self.schedule.returners = self.returners
 1697 
 1698         process_count_max = self.opts.get("process_count_max")
 1699         if process_count_max > 0:
 1700             process_count = len(salt.utils.minion.running(self.opts))
 1701             while process_count >= process_count_max:
 1702                 log.warning(
 1703                     "Maximum number of processes reached while executing jid %s, waiting...",
 1704                     data["jid"],
 1705                 )
 1706                 yield salt.ext.tornado.gen.sleep(10)
 1707                 process_count = len(salt.utils.minion.running(self.opts))
 1708 
 1709         # We stash an instance reference to allow for the socket
 1710         # communication on Windows. Functions can't be pickled, so
 1711         # Python needs to be able to reconstruct the reference on the other
 1712         # side.
 1713         instance = self
 1714         multiprocessing_enabled = self.opts.get("multiprocessing", True)
 1715         if multiprocessing_enabled:
 1716             if sys.platform.startswith("win"):
 1717                 # let python reconstruct the minion on the other side if we're
 1718                 # running on windows
 1719                 instance = None
 1720             with default_signals(signal.SIGINT, signal.SIGTERM):
 1721                 process = SignalHandlingProcess(
 1722                     target=self._target,
 1723                     name="ProcessPayload",
 1724                     args=(instance, self.opts, data, self.connected),
 1725                 )
 1726                 process._after_fork_methods.append(
 1727                     (salt.utils.crypt.reinit_crypto, [], {})
 1728                 )
 1729         else:
 1730             process = threading.Thread(
 1731                 target=self._target,
 1732                 args=(instance, self.opts, data, self.connected),
 1733                 name=data["jid"],
 1734             )
 1735 
 1736         if multiprocessing_enabled:
 1737             with default_signals(signal.SIGINT, signal.SIGTERM):
 1738                 # Reset current signals before starting the process in
 1739                 # order not to inherit the current signal handlers
 1740                 process.start()
 1741         else:
 1742             process.start()
 1743         process.name = "{}-Job-{}".format(process.name, data["jid"])
 1744         self.subprocess_list.add(process)
 1745 
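    # Sketch of the decoded job payload the coroutine above expects. The keys are
    # inferred from how the payload is used here and in _thread_return below; the
    # values are illustrative only:
    #
    #     data = {
    #         "jid": "20201118123456789012",
    #         "fun": "test.ping",
    #         "arg": [],
    #         "ret": "",
    #     }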
 1746     def ctx(self):
 1747         """
 1748         Return a single context manager for the minion's data
 1749         """
 1750         exitstack = contextlib.ExitStack()
 1751         exitstack.enter_context(self.functions.context_dict.clone())
 1752         exitstack.enter_context(self.returners.context_dict.clone())
 1753         exitstack.enter_context(self.executors.context_dict.clone())
 1754         return exitstack
 1755 
 1756     @classmethod
 1757     def _target(cls, minion_instance, opts, data, connected):
 1758         if not minion_instance:
 1759             minion_instance = cls(opts)
 1760             minion_instance.connected = connected
 1761             if not hasattr(minion_instance, "functions"):
 1762                 (
 1763                     functions,
 1764                     returners,
 1765                     function_errors,
 1766                     executors,
 1767                 ) = minion_instance._load_modules(grains=opts["grains"])
 1768                 minion_instance.functions = functions
 1769                 minion_instance.returners = returners
 1770                 minion_instance.function_errors = function_errors
 1771                 minion_instance.executors = executors
 1772             if not hasattr(minion_instance, "serial"):
 1773                 minion_instance.serial = salt.payload.Serial(opts)
 1774             if not hasattr(minion_instance, "proc_dir"):
 1775                 uid = salt.utils.user.get_uid(user=opts.get("user", None))
 1776                 minion_instance.proc_dir = get_proc_dir(opts["cachedir"], uid=uid)
 1777 
 1778         def run_func(minion_instance, opts, data):
 1779             if isinstance(data["fun"], tuple) or isinstance(data["fun"], list):
 1780                 return Minion._thread_multi_return(minion_instance, opts, data)
 1781             else:
 1782                 return Minion._thread_return(minion_instance, opts, data)
 1783 
 1784         with salt.ext.tornado.stack_context.StackContext(
 1785             functools.partial(RequestContext, {"data": data, "opts": opts})
 1786         ):
 1787             with salt.ext.tornado.stack_context.StackContext(minion_instance.ctx):
 1788                 run_func(minion_instance, opts, data)
 1789 
 1790     def _execute_job_function(
 1791         self, function_name, function_args, executors, opts, data
 1792     ):
 1793         """
 1794         Executes a function within a job given its name, the args and the executors.
 1795         It also checks whether the function is allowed to run when 'blackout mode' is enabled.
 1796         """
 1797         minion_blackout_violation = False
 1798         if self.connected and self.opts["pillar"].get("minion_blackout", False):
 1799             whitelist = self.opts["pillar"].get("minion_blackout_whitelist", [])
 1800             # this minion is blacked out. Only allow saltutil.refresh_pillar and the whitelist
 1801             if (
 1802                 function_name != "saltutil.refresh_pillar"
 1803                 and function_name not in whitelist
 1804             ):
 1805                 minion_blackout_violation = True
 1806         # use minion_blackout_whitelist from grains if it exists
 1807         if self.opts["grains"].get("minion_blackout", False):
 1808             whitelist = self.opts["grains"].get("minion_blackout_whitelist", [])
 1809             if (
 1810                 function_name != "saltutil.refresh_pillar"
 1811                 and function_name not in whitelist
 1812             ):
 1813                 minion_blackout_violation = True
 1814         if minion_blackout_violation:
 1815             raise SaltInvocationError(
 1816                 "Minion in blackout mode. Set 'minion_blackout' "
 1817                 "to False in pillar or grains to resume operations. Only "
 1818                 "saltutil.refresh_pillar allowed in blackout mode."
 1819             )
 1820 
 1821         if function_name in self.functions:
 1822             func = self.functions[function_name]
 1823             args, kwargs = load_args_and_kwargs(func, function_args, data)
 1824         else:
 1825             # function_name is not in self.functions; this branch is only reached when an executor allows missing functions
 1826             func = function_name
 1827             args, kwargs = function_args, data
 1828         self.functions.pack["__context__"]["retcode"] = 0
 1829 
 1830         if isinstance(executors, str):
 1831             executors = [executors]
 1832         elif not isinstance(executors, list) or not executors:
 1833             raise SaltInvocationError(
 1834                 "Wrong executors specification: {}. String or non-empty list expected".format(
 1835                     executors
 1836                 )
 1837             )
 1838         if opts.get("sudo_user", "") and executors[-1] != "sudo":
 1839             executors[-1] = "sudo"  # replace the last one with sudo
 1840         log.trace("Executors list %s", executors)  # pylint: disable=no-member
 1841 
 1842         for name in executors:
 1843             fname = "{}.execute".format(name)
 1844             if fname not in self.executors:
 1845                 raise SaltInvocationError("Executor '{}' is not available".format(name))
 1846             return_data = self.executors[fname](opts, data, func, args, kwargs)
 1847             if return_data is not None:
 1848                 return return_data
 1849 
 1850         return None
 1851 
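    # Pillar sketch for the blackout behaviour checked above (documented pillar
    # keys; the whitelist entries are illustrative):
    #
    #     minion_blackout: True
    #     minion_blackout_whitelist:
    #       - test.ping
    #       - saltutil.refresh_pillar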
 1852     @classmethod
 1853     def _thread_return(cls, minion_instance, opts, data):
 1854         """
 1855         This method should be used as a threading target; it starts the actual
 1856         minion-side execution.
 1857         """
 1858         minion_instance.gen_modules()
 1859         fn_ = os.path.join(minion_instance.proc_dir, data["jid"])
 1860 
 1861         salt.utils.process.appendproctitle(
 1862             "{}._thread_return {}".format(cls.__name__, data["jid"])
 1863         )
 1864 
 1865         sdata = {"pid": os.getpid()}
 1866         sdata.update(data)
 1867         log.info("Starting a new job %s with PID %s", data["jid"], sdata["pid"])
 1868         with salt.utils.files.fopen(fn_, "w+b") as fp_:
 1869             fp_.write(minion_instance.serial.dumps(sdata))
 1870         ret = {"success": False}
 1871         function_name = data["fun"]
 1872         function_args = data["arg"]
 1873         executors = (
 1874             data.get("module_executors")
 1875             or getattr(minion_instance, "module_executors", [])
 1876             or opts.get("module_executors", ["direct_call"])
 1877         )
 1878         allow_missing_funcs = any(
 1879             [
 1880                 minion_instance.executors["{}.allow_missing_func".format(executor)](
 1881                     function_name
 1882                 )
 1883                 for executor in executors
 1884                 if "{}.allow_missing_func".format(executor) in minion_instance.executors
 1885             ]
 1886         )
 1887         if function_name in minion_instance.functions or allow_missing_funcs is True:
 1888             try:
 1889                 return_data = minion_instance._execute_job_function(
 1890                     function_name, function_args, executors, opts, data
 1891                 )
 1892 
 1893                 if isinstance(return_data, types.GeneratorType):
 1894                     ind = 0
 1895                     iret = {}
 1896                     for single in return_data:
 1897                         if isinstance(single, dict) and isinstance(iret, dict):
 1898                             iret.update(single)
 1899                         else:
 1900                             if not iret:
 1901                                 iret = []
 1902                             iret.append(single)
 1903                         tag = tagify([data["jid"], "prog", opts["id"], str(ind)], "job")
 1904                         event_data = {"return": single}
 1905                         minion_instance._fire_master(event_data, tag)
 1906                         ind += 1
 1907                     ret["return"] = iret
 1908                 else:
 1909                     ret["return"] = return_data
 1910 
 1911                 retcode = minion_instance.functions.pack["__context__"].get(
 1912                     "retcode", salt.defaults.exitcodes.EX_OK
 1913                 )
 1914                 if retcode == salt.defaults.exitcodes.EX_OK:
 1915                     # No nonzero retcode in __context__ dunder. Check if return
 1916                     # is a dictionary with a "result" or "success" key.
 1917                     try:
 1918                         func_result = all(
 1919                             return_data.get(x, True) for x in ("result", "success")
 1920                         )
 1921                     except Exception:  # pylint: disable=broad-except
 1922                         # return data is not a dict
 1923                         func_result = True
 1924                     if not func_result:
 1925                         retcode = salt.defaults.exitcodes.EX_GENERIC
 1926 
 1927                 ret["retcode"] = retcode
 1928                 ret["success"] = retcode == salt.defaults.exitcodes.EX_OK
 1929             except CommandNotFoundError as exc:
 1930                 msg = "Command required for '{}' not found".format(function_name)
 1931                 log.debug(msg, exc_info=True)
 1932                 ret["return"] = "{}: {}".format(msg, exc)
 1933                 ret["out"] = "nested"
 1934                 ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
 1935             except CommandExecutionError as exc:
 1936                 log.error(
 1937                     "A command in '%s' had a problem: %s",
 1938                     function_name,
 1939                     exc,
 1940                     exc_info_on_loglevel=logging.DEBUG,
 1941                 )
 1942                 ret["return"] = "ERROR: {}".format(exc)
 1943                 ret["out"] = "nested"
 1944                 ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
 1945             except SaltInvocationError as exc:
 1946                 log.error(
 1947                     "Problem executing '%s': %s",
 1948                     function_name,
 1949                     exc,
 1950                     exc_info_on_loglevel=logging.DEBUG,
 1951                 )
 1952                 ret["return"] = "ERROR executing '{}': {}".format(function_name, exc)
 1953                 ret["out"] = "nested"
 1954                 ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
 1955             except TypeError as exc:
 1956                 msg = "Passed invalid arguments to {}: {}\n{}".format(
 1957                     function_name,
 1958                     exc,
 1959                     minion_instance.functions[function_name].__doc__ or "",
 1960                 )
 1961                 log.warning(msg, exc_info_on_loglevel=logging.DEBUG)
 1962                 ret["return"] = msg
 1963                 ret["out"] = "nested"
 1964                 ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
 1965             except Exception:  # pylint: disable=broad-except
 1966                 msg = "The minion function caused an exception"
 1967                 log.warning(msg, exc_info_on_loglevel=logging.DEBUG)
 1968                 salt.utils.error.fire_exception(
 1969                     salt.exceptions.MinionError(msg), opts, job=data
 1970                 )
 1971                 ret["return"] = "{}: {}".format(msg, traceback.format_exc())
 1972                 ret["out"] = "nested"
 1973                 ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
 1974         else:
 1975             docs = minion_instance.functions["sys.doc"]("{}*".format(function_name))
 1976             if docs:
 1977                 docs[function_name] = minion_instance.functions.missing_fun_string(
 1978                     function_name
 1979                 )
 1980                 ret["return"] = docs
 1981             else:
 1982                 ret["return"] = minion_instance.functions.missing_fun_string(
 1983                     function_name
 1984                 )
 1985                 mod_name = function_name.split(".")[0]
 1986                 if mod_name in minion_instance.function_errors:
 1987                     ret["return"] += " Possible reasons: '{}'".format(
 1988                         minion_instance.function_errors[mod_name]
 1989                     )
 1990             ret["success"] = False
 1991             ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
 1992             ret["out"] = "nested"
 1993 
 1994         ret["jid"] = data["jid"]
 1995         ret["fun"] = data["fun"]
 1996         ret["fun_args"] = data["arg"]
 1997         if "master_id" in data:
 1998             ret["master_id"] = data["master_id"]
 1999         if "metadata" in data:
 2000             if isinstance(data["metadata"], dict):
 2001                 ret["metadata"] = data["metadata"]
 2002             else:
 2003                 log.warning("The metadata parameter must be a dictionary. Ignoring.")
 2004         if minion_instance.connected:
 2005             minion_instance._return_pub(
 2006                 ret, timeout=minion_instance._return_retry_timer()
 2007             )
 2008 
 2009         # Add default returners from the minion config
 2010         # Should have been converted to a comma-delimited string already
 2011         if isinstance(opts.get("return"), str):
 2012             if data["ret"]:
 2013                 data["ret"] = ",".join((data["ret"], opts["return"]))
 2014             else:
 2015                 data["ret"] = opts["return"]
 2016 
 2017         log.debug("minion return: %s", ret)
 2018         # TODO: make a list? Seems odd to split it this late :/
 2019         if data["ret"] and isinstance(data["ret"], str):
 2020             if "ret_config" in data:
 2021                 ret["ret_config"] = data["ret_config"]
 2022             if "ret_kwargs" in data:
 2023                 ret["ret_kwargs"] = data["ret_kwargs"]
 2024             ret["id"] = opts["id"]
 2025             for returner in set(data["ret"].split(",")):
 2026                 try:
 2027                     returner_str = "{}.returner".format(returner)
 2028                     if returner_str in minion_instance.returners:
 2029                         minion_instance.returners[returner_str](ret)
 2030                     else:
 2031                         returner_err = minion_instance.returners.missing_fun_string(
 2032                             returner_str
 2033                         )
 2034                         log.error(
 2035                             "Returner %s could not be loaded: %s",
 2036                             returner_str,
 2037                             returner_err,
 2038                         )
 2039                 except Exception as exc:  # pylint: disable=broad-except
 2040                     log.exception("The return failed for job %s: %s", data["jid"], exc)
 2041 
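    # Config sketch for the default-returner handling above (documented minion
    # option ``return``; the returner name is illustrative):
    #
    #     return: mysql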
 2042     @classmethod
 2043     def _thread_multi_return(cls, minion_instance, opts, data):
 2044         """
 2045         This method should be used as a threading target; it starts the actual
 2046         minion-side execution.
 2047         """
 2048         minion_instance.gen_modules()
 2049         fn_ = os.path.join(minion_instance.proc_dir, data["jid"])
 2050 
 2051         salt.utils.process.appendproctitle(
 2052             "{}._thread_multi_return {}".format(cls.__name__, data["jid"])
 2053         )
 2054 
 2055         sdata = {"pid": os.getpid()}
 2056         sdata.update(data)
 2057         log.info("Starting a new job with PID %s", sdata["pid"])
 2058         with salt.utils.files.fopen(fn_, "w+b") as fp_:
 2059             fp_.write(minion_instance.serial.dumps(sdata))
 2060 
 2061         multifunc_ordered = opts.get("multifunc_ordered", False)
 2062         num_funcs = len(data["fun"])
 2063         if multifunc_ordered:
 2064             ret = {
 2065                 "return": [None] * num_funcs,
 2066                 "retcode": [None] * num_funcs,
 2067                 "success": [False] * num_funcs,
 2068             }
 2069         else:
 2070             ret = {"return": {}, "retcode": {}, "success": {}}
 2071         executors = (
 2072             data.get("module_executors")
 2073             or getattr(minion_instance, "module_executors", [])
 2074             or opts.get("module_executors", ["direct_call"])
 2075         )
 2076 
 2077         for ind in range(0, num_funcs):
 2078             function_name = data["fun"][ind]
 2079             function_args = data["arg"][ind]
 2080             if not multifunc_ordered:
 2081                 ret["success"][function_name] = False
 2082             try:
 2083                 return_data = minion_instance._execute_job_function(
 2084                     function_name, function_args, executors, opts, data
 2085                 )
 2086 
 2087                 key = ind if multifunc_ordered else data["fun"][ind]
 2088                 ret["return"][key] = return_data
 2089                 retcode = minion_instance.functions.pack["__context__"].get(
 2090                     "retcode", 0
 2091                 )
 2092                 if retcode == 0:
 2093                     # No nonzero retcode in __context__ dunder. Check if return
 2094                     # is a dictionary with a "result" or "success" key.
 2095                     try:
 2096                         func_result = all(
 2097                             ret["return"][key].get(x, True)
 2098                             for x in ("result", "success")
 2099                         )
 2100                     except Exception:  # pylint: disable=broad-except
 2101                         # return data is not a dict
 2102                         func_result = True
 2103                     if not func_result:
 2104                         retcode = 1
 2105 
 2106                 ret["retcode"][key] = retcode
 2107                 ret["success"][key] = retcode == 0
 2108             except Exception as exc:  # pylint: disable=broad-except
 2109                 trb = traceback.format_exc()
 2110                 log.warning("The minion function caused an exception: %s", exc)
 2111                 if multifunc_ordered:
 2112                     ret["return"][ind] = trb
 2113                 else:
 2114                     ret["return"][data["fun"][ind]] = trb
 2115             ret["jid"] = data["jid"]
 2116             ret["fun"] = data["fun"]
 2117             ret["fun_args"] = data["arg"]
 2118         if "metadata" in data:
 2119             ret["metadata"] = data["metadata"]
 2120         if minion_instance.connected:
 2121             minion_instance._return_pub(
 2122                 ret, timeout=minion_instance._return_retry_timer()
 2123             )
 2124         if data["ret"]:
 2125             if "ret_config" in data:
 2126                 ret["ret_config"] = data["ret_config"]
 2127             if "ret_kwargs" in data:
 2128                 ret["ret_kwargs"] = data["ret_kwargs"]
 2129             for returner in set(data["ret"].split(",")):
 2130                 ret["id"] = opts["id"]
 2131                 try:
 2132                     minion_instance.returners["{}.returner".format(returner)](ret)
 2133                 except Exception as exc:  # pylint: disable=broad-except
 2134                     log.error("The return failed for job %s: %s", data["jid"], exc)
 2135 
 2136     def _return_pub(self, ret, ret_cmd="_return", timeout=60, sync=True):
 2137         """
 2138         Return the data from the executed command to the master server
 2139         """
 2140         jid = ret.get("jid", ret.get("__jid__"))
 2141         fun = ret.get("fun", ret.get("__fun__"))
 2142         if self.opts["multiprocessing"]:
 2143             fn_ = os.path.join(self.proc_dir, jid)
 2144             if os.path.isfile(fn_):
 2145                 try:
 2146                     os.remove(fn_)
 2147                 except OSError:
 2148                     # The file is gone already
 2149                     pass
 2150         log.info("Returning information for job: %s", jid)
 2151         log.trace("Return data: %s", ret)
 2152         if ret_cmd == "_syndic_return":
 2153             load = {
 2154                 "cmd": ret_cmd,
 2155                 "id": self.opts["uid"],
 2156                 "jid": jid,
 2157                 "fun": fun,
 2158                 "arg": ret.get("arg"),
 2159                 "tgt": ret.get("tgt"),
 2160                 "tgt_type": ret.get("tgt_type"),
 2161                 "load": ret.get("__load__"),
 2162             }
 2163             if "__master_id__" in ret:
 2164                 load["master_id"] = ret["__master_id__"]
 2165             load["return"] = {}
 2166             for key, value in ret.items():
 2167                 if key.startswith("__"):
 2168                     continue
 2169                 load["return"][key] = value
 2170         else:
 2171             load = {"cmd": ret_cmd, "id": self.opts["id"]}
 2172             for key, value in ret.items():
 2173                 load[key] = value
 2174 
 2175         if "out" in ret:
 2176             if isinstance(ret["out"], str):
 2177                 load["out"] = ret["out"]
 2178             else:
 2179                 log.error("Invalid outputter %s. This is likely a bug.", ret["out"])
 2180         else:
 2181             try:
 2182                 oput = self.functions[fun].__outputter__
 2183             except (KeyError, AttributeError, TypeError):
 2184                 pass
 2185             else:
 2186                 if isinstance(oput, str):
 2187                     load["out"] = oput
 2188         if self.opts["cache_jobs"]:
 2189             # Local job cache has been enabled
 2190             if ret["jid"] == "req":
 2191                 ret["jid"] = salt.utils.jid.gen_jid(self.opts)
 2192             salt.utils.minion.cache_jobs(self.opts, ret["jid"], ret)
 2193 
 2194         if not self.opts["pub_ret"]:
 2195             return ""
 2196 
 2197         def timeout_handler(*_):
 2198             log.warning(
 2199                 "The minion failed to return the job information for job %s. "
 2200                 "This is often due to the master being shut down or "
 2201                 "overloaded. If the master is running, consider increasing "
 2202                 "the worker_threads value.",
 2203                 jid,
 2204             )
 2205             return True
 2206 
 2207         if sync:
 2208             try:
 2209                 ret_val = self._send_req_sync(load, timeout=timeout)
 2210             except SaltReqTimeoutError:
 2211                 timeout_handler()
 2212                 return ""
 2213         else:
 2214             with salt.ext.tornado.stack_context.ExceptionStackContext(timeout_handler):
 2215                 # pylint: disable=unexpected-keyword-arg
 2216                 ret_val = self._send_req_async(
 2217                     load, timeout=timeout, callback=lambda f: None
 2218                 )
 2219                 # pylint: enable=unexpected-keyword-arg
 2220 
 2221         log.trace("ret_val = %s", ret_val)  # pylint: disable=no-member
 2222         return ret_val
 2223 
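    # Sketch of the minimal ``ret`` dictionary handed to the method above by a
    # finished job (keys taken from the lookups in the code; values illustrative):
    #
    #     ret = {
    #         "jid": "20201118123456789012",
    #         "fun": "test.ping",
    #         "return": True,
    #         "retcode": 0,
    #         "success": True,
    #     }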
 2224     def _return_pub_multi(self, rets, ret_cmd="_return", timeout=60, sync=True):
 2225         """
 2226         Return the data from the executed command to the master server
 2227         """
 2228         if not isinstance(rets, list):
 2229             rets = [rets]
 2230         jids = {}
 2231         for ret in rets:
 2232             jid = ret.get("jid", ret.get("__jid__"))
 2233             fun = ret.get("fun", ret.get("__fun__"))
 2234             if self.opts["multiprocessing"]:
 2235                 fn_ = os.path.join(self.proc_dir, jid)
 2236                 if os.path.isfile(fn_):
 2237                     try:
 2238                         os.remove(fn_)
 2239                     except OSError:
 2240                         # The file is gone already
 2241                         pass
 2242             log.info("Returning information for job: %s", jid)
 2243             load = jids.setdefault(jid, {})
 2244             if ret_cmd == "_syndic_return":
 2245                 if not load:
 2246                     load.update(
 2247                         {
 2248                             "id": self.opts["id"],
 2249                             "jid": jid,
 2250                             "fun": fun,
 2251                             "arg": ret.get("arg"),
 2252                             "tgt": ret.get("tgt"),
 2253                             "tgt_type": ret.get("tgt_type"),
 2254                             "load": ret.get("__load__"),
 2255                             "return": {},
 2256                         }
 2257                     )
 2258                 if "__master_id__" in ret:
 2259                     load["master_id"] = ret["__master_id__"]
 2260                 for key, value in ret.items():
 2261                     if key.startswith("__"):
 2262                         continue
 2263                     load["return"][key] = value
 2264             else:
 2265                 load.update({"id": self.opts["id"]})
 2266                 for key, value in ret.items():
 2267                     load[key] = value
 2268 
 2269             if "out" in ret:
 2270                 if isinstance(ret["out"], str):
 2271                     load["out"] = ret["out"]
 2272                 else:
 2273                     log.error("Invalid outputter %s. This is likely a bug.", ret["out"])
 2274             else:
 2275                 try:
 2276                     oput = self.functions[fun].__outputter__
 2277                 except (KeyError, AttributeError, TypeError):
 2278                     pass
 2279                 else:
 2280                     if isinstance(oput, str):
 2281                         load["out"] = oput
 2282             if self.opts["cache_jobs"]:
 2283                 # Local job cache has been enabled
 2284                 salt.utils.minion.cache_jobs(self.opts, load["jid"], ret)
 2285 
 2286         load = {"cmd": ret_cmd, "load": list(jids.values())}
 2287 
 2288         def timeout_handler(*_):
 2289             log.warning(
 2290                 "The minion failed to return the job information for job %s. "
 2291                 "This is often due to the master being shut down or "
 2292                 "overloaded. If the master is running, consider increasing "
 2293                 "the worker_threads value.",
 2294                 jid,
 2295             )
 2296             return True
 2297 
 2298         if sync:
 2299             try:
 2300                 ret_val = self._send_req_sync(load, timeout=timeout)
 2301             except SaltReqTimeoutError:
 2302                 timeout_handler()
 2303                 return ""
 2304         else:
 2305             with salt.ext.tornado.stack_context.ExceptionStackContext(timeout_handler):
 2306                 # pylint: disable=unexpected-keyword-arg
 2307                 ret_val = self._send_req_async(
 2308                     load, timeout=timeout, callback=lambda f: None
 2309                 )
 2310                 # pylint: enable=unexpected-keyword-arg
 2311 
 2312         log.trace("ret_val = %s", ret_val)  # pylint: disable=no-member
 2313         return ret_val
 2314 
 2315     def _state_run(self):
 2316         """
 2317         Execute a state run based on information set in the minion config file
 2318         """
 2319         if self.opts["startup_states"]:
 2320             if (
 2321                 self.opts.get("master_type", "str") == "disable"
 2322                 and self.opts.get("file_client", "remote") == "remote"
 2323             ):
 2324                 log.warning(
 2325                     "Cannot run startup_states when 'master_type' is set "
 2326                     "to 'disable' and 'file_client' is set to "
 2327                     "'remote'. Skipping."
 2328                 )
 2329             else:
 2330                 data = {"jid": "req", "ret": self.opts.get("ext_job_cache", "")}
 2331                 if self.opts["startup_states"] == "sls":
 2332                     data["fun"] = "state.sls"
 2333                     data["arg"] = [self.opts["sls_list"]]
 2334                 elif self.opts["startup_states"] == "top":
 2335                     data["fun"] = "state.top"
 2336                     data["arg"] = [self.opts["top_file"]]
 2337                 else:
 2338                     data["fun"] = "state.highstate"
 2339                     data["arg"] = []
 2340                 self._handle_decoded_payload(data)
 2341 
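    # Config sketch for the startup run above (documented minion options; the SLS
    # names are illustrative):
    #
    #     startup_states: sls
    #     sls_list:
    #       - edit.vim
    #       - hyper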
 2342     def _refresh_grains_watcher(self, refresh_interval_in_minutes):
 2343         """
 2344         Schedule a job that periodically fires a 'grains_refresh' event to inform the master about changes in this minion's grains
 2345         :param refresh_interval_in_minutes: how often, in minutes, to fire the event
 2346         :return: None
 2347         """
 2348         if "__update_grains" not in self.opts.get("schedule", {}):
 2349             if "schedule" not in self.opts:
 2350                 self.opts["schedule"] = {}
 2351             self.opts["schedule"].update(
 2352                 {
 2353                     "__update_grains": {
 2354                         "function": "event.fire",
 2355                         "args": [{}, "grains_refresh"],
 2356                         "minutes": refresh_interval_in_minutes,
 2357                     }
 2358                 }
 2359             )
 2360 
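    # The refresh interval above is normally fed from the ``grains_refresh_every``
    # minion option (minutes); a config sketch with an illustrative value:
    #
    #     grains_refresh_every: 5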
 2361     def _fire_master_minion_start(self):
 2362         include_grains = False
 2363         if self.opts["start_event_grains"]:
 2364             include_grains = True
 2365         # Send an event to the master that the minion is live
 2366         if self.opts["enable_legacy_startup_events"]:
 2367             # Old style event. Defaults to False in 3001 release.
 2368             self._fire_master(
 2369                 "Minion {} started at {}".format(self.opts["id"], time.asctime()),
 2370                 "minion_start",
 2371                 include_startup_grains=include_grains,
 2372             )
 2373         # send namespaced event
 2374         self._fire_master(
 2375             "Minion {} started at {}".format(self.opts["id"], time.asctime()),
 2376             tagify([self.opts["id"], "start"], "minion"),
 2377             include_startup_grains=include_grains,
 2378         )
 2379 
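    # Config sketch for the startup events fired above (documented minion options;
    # the grain names listed are illustrative):
    #
    #     enable_legacy_startup_events: False
    #     start_event_grains:
    #       - os
    #       - saltversion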
 2380     def module_refresh(self, force_refresh=False, notify=False):
 2381         """
 2382         Refresh the functions and returners.
 2383         """
 2384         log.debug("Refreshing modules. Notify=%s", notify)
 2385         self.functions, self.returners, _, self.executors = self._load_modules(
 2386             force_refresh, notify=notify
 2387         )
 2388 
 2389         self.schedule.functions = self.functions
 2390         self.schedule.returners = self.returners
 2391 
 2392     def beacons_refresh(self):
 2393         """
 2394         Refresh the beacons.
 2395         """
 2396         if not self.beacons_leader:
 2397             return
 2398         log.debug("Refreshing beacons.")
 2399         self.beacons = salt.beacons.Beacon(self.opts, self.functions)
 2400 
 2401     def matchers_refresh(self):
 2402         """
 2403         Refresh the matchers
 2404         """
 2405         log.debug("Refreshing matchers.")
 2406         self.matchers = salt.loader.matchers(self.opts)
 2407 
 2408     # TODO: only allow one future in flight at a time?
 2409     @salt.ext.tornado.gen.coroutine
 2410     def pillar_refresh(self, force_refresh=False):
 2411         """
 2412         Refresh the pillar
 2413         """
 2414         self.module_refresh(force_refresh)
 2415 
 2416         if self.connected:
 2417             log.debug("Refreshing pillar.")
 2418             async_pillar = salt.pillar.get_async_pillar(
 2419                 self.opts,
 2420                 self.opts["grains"],
 2421                 self.opts["id"],
 2422                 self.opts["saltenv"],
 2423                 pillarenv=self.opts.get("pillarenv"),
 2424             )
 2425             try:
 2426                 self.opts["pillar"] = yield async_pillar.compile_pillar()
 2427             except SaltClientError:
 2428                 # Do not exit if a pillar refresh fails.
 2429                 log.error(
 2430                     "Pillar data could not be refreshed. "
 2431                     "One or more masters may be down!"
 2432                 )
 2433             finally:
 2434                 async_pillar.destroy()
 2435         self.matchers_refresh()
 2436         self.beacons_refresh()
 2437         with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
 2438             evt.fire_event(
 2439                 {"complete": True},
 2440                 tag=salt.defaults.events.MINION_PILLAR_REFRESH_COMPLETE,
 2441             )
 2442 
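    # The refresh above is typically triggered from the master; a CLI sketch using
    # the standard execution module (the target is illustrative):
    #
    #     salt '*' saltutil.refresh_pillar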
 2443     def manage_schedule(self, tag, data):
 2444         """
 2445         Manage the minion's scheduler based on the event data.
 2446         """
 2447         func = data.get("func", None)
 2448         name = data.get("name", None)
 2449         schedule = data.get("schedule", None)
 2450         where = data.get("where", None)
 2451         persist = data.get("persist", None)
 2452 
 2453         funcs = {
 2454             "delete": ("delete_job", (name, persist)),
 2455             "add": ("add_job", (schedule, persist)),
 2456             "modify": ("modify_job", (name, schedule, persist)),
 2457             "enable": ("enable_schedule", (persist,)),
 2458             "disable": ("disable_schedule", (persist,)),
 2459             "enable_job": ("enable_job", (name, persist)),
 2460             "run_job": ("run_job", (name,)),
 2461             "disable_job": ("disable_job", (name, persist)),
 2462             "postpone_job": ("postpone_job", (name, data)),
 2463             "skip_job": ("skip_job", (name, data)),
 2464             "reload": ("reload", (schedule,)),
 2465             "list": ("list", (where,)),
 2466             "save_schedule": ("save_schedule", ()),
 2467             "get_next_fire_time": ("get_next_fire_time", (name,)),
 2468         }
 2469 
 2470         # Call the appropriate schedule function
 2471         try:
 2472             alias, params = funcs.get(func)
 2473             getattr(self.schedule, alias)(*params)
 2474         except TypeError:
  2475             log.error('Function "%s" is unavailable in salt.utils.schedule', func)
 2476 
 2477     def manage_beacons(self, tag, data):
 2478         """
 2479         Manage Beacons
 2480         """
 2481         if not self.beacons_leader:
 2482             return
 2483 
 2484         func = data.get("func", None)
 2485         name = data.get("name", None)
 2486         beacon_data = data.get("beacon_data", None)
 2487         include_pillar = data.get("include_pillar", None)
 2488         include_opts = data.get("include_opts", None)
 2489 
 2490         funcs = {
 2491             "add": ("add_beacon", (name, beacon_data)),
 2492             "modify": ("modify_beacon", (name, beacon_data)),
 2493             "delete": ("delete_beacon", (name,)),
 2494             "enable": ("enable_beacons", ()),
 2495             "disable": ("disable_beacons", ()),
 2496             "enable_beacon": ("enable_beacon", (name,)),
 2497             "disable_beacon": ("disable_beacon", (name,)),
 2498             "list": ("list_beacons", (include_opts, include_pillar)),
 2499             "list_available": ("list_available_beacons", ()),
 2500             "validate_beacon": ("validate_beacon", (name, beacon_data)),
 2501             "reset": ("reset", ()),
 2502         }
 2503 
 2504         # Call the appropriate beacon function
 2505         try:
 2506             alias, params = funcs.get(func)
 2507             getattr(self.beacons, alias)(*params)
 2508         except TypeError:
  2509             log.error('Function "%s" is unavailable in salt.beacons', func)
 2510 
 2511     def environ_setenv(self, tag, data):
 2512         """
 2513         Set the salt-minion main process environment according to
  2514         the data contained in the minion event
 2515         """
 2516         environ = data.get("environ", None)
 2517         if environ is None:
 2518             return False
 2519         false_unsets = data.get("false_unsets", False)
 2520         clear_all = data.get("clear_all", False)
 2521         import salt.modules.environ as mod_environ
 2522 
 2523         return mod_environ.setenv(environ, false_unsets, clear_all)
 2524 
 2525     def _pre_tune(self):
 2526         """
 2527         Set the minion running flag and issue the appropriate warnings if
 2528         the minion cannot be started or is already running
 2529         """
 2530         if self._running is None:
 2531             self._running = True
 2532         elif self._running is False:
 2533             log.error(
 2534                 "This %s was scheduled to stop. Not running %s.tune_in()",
 2535                 self.__class__.__name__,
 2536                 self.__class__.__name__,
 2537             )
 2538             return
 2539         elif self._running is True:
 2540             log.error(
 2541                 "This %s is already running. Not running %s.tune_in()",
 2542                 self.__class__.__name__,
 2543                 self.__class__.__name__,
 2544             )
 2545             return
 2546 
 2547         try:
 2548             log.info(
 2549                 "%s is starting as user '%s'",
 2550                 self.__class__.__name__,
 2551                 salt.utils.user.get_user(),
 2552             )
 2553         except Exception as err:  # pylint: disable=broad-except
  2554             # Only Windows is allowed to fail here. See #3189. Log at debug
  2555             # level in that case; otherwise, log an error.
 2556             log.log(
 2557                 salt.utils.platform.is_windows() and logging.DEBUG or logging.ERROR,
 2558                 "Failed to get the user who is starting %s",
 2559                 self.__class__.__name__,
 2560                 exc_info=err,
 2561             )
 2562 
 2563     def _mine_send(self, tag, data):
 2564         """
 2565         Send mine data to the master
 2566         """
 2567         with salt.transport.client.ReqChannel.factory(self.opts) as channel:
 2568             data["tok"] = self.tok
 2569             try:
 2570                 ret = channel.send(data)
 2571                 return ret
 2572             except SaltReqTimeoutError:
 2573                 log.warning("Unable to send mine data to master.")
 2574                 return None
 2575 
 2576     @salt.ext.tornado.gen.coroutine
 2577     def handle_event(self, package):
 2578         """
 2579         Handle an event from the epull_sock (all local minion events)
 2580         """
 2581         if not self.ready:
 2582             raise salt.ext.tornado.gen.Return()
 2583         tag, data = salt.utils.event.SaltEvent.unpack(package)
 2584 
 2585         if "proxy_target" in data and self.opts.get("metaproxy") == "deltaproxy":
 2586             proxy_target = data["proxy_target"]
 2587             _minion = self.deltaproxy_objs[proxy_target]
 2588         else:
 2589             _minion = self
 2590 
 2591         log.debug("Minion of '%s' is handling event tag '%s'", self.opts["master"], tag)
 2592         if tag.startswith("module_refresh"):
 2593             _minion.module_refresh(
 2594                 force_refresh=data.get("force_refresh", False),
 2595                 notify=data.get("notify", False),
 2596             )
 2597         elif tag.startswith("pillar_refresh"):
 2598             yield _minion.pillar_refresh(force_refresh=data.get("force_refresh", False))
 2599         elif tag.startswith("beacons_refresh"):
 2600             _minion.beacons_refresh()
 2601         elif tag.startswith("matchers_refresh"):
 2602             _minion.matchers_refresh()
 2603         elif tag.startswith("manage_schedule"):
 2604             _minion.manage_schedule(tag, data)
 2605         elif tag.startswith("manage_beacons"):
 2606             _minion.manage_beacons(tag, data)
 2607         elif tag.startswith("grains_refresh"):
 2608             if (
 2609                 data.get("force_refresh", False)
 2610                 or _minion.grains_cache != _minion.opts["grains"]
 2611             ):
  2612                 yield _minion.pillar_refresh(force_refresh=True)
 2613                 _minion.grains_cache = _minion.opts["grains"]
 2614         elif tag.startswith("environ_setenv"):
 2615             self.environ_setenv(tag, data)
 2616         elif tag.startswith("_minion_mine"):
 2617             self._mine_send(tag, data)
 2618         elif tag.startswith("fire_master"):
 2619             if self.connected:
 2620                 log.debug("Forwarding master event tag=%s", data["tag"])
 2621                 self._fire_master(
 2622                     data["data"],
 2623                     data["tag"],
 2624                     data["events"],
 2625                     data["pretag"],
 2626                     sync=False,
 2627                 )
 2628         elif tag.startswith(master_event(type="disconnected")) or tag.startswith(
 2629             master_event(type="failback")
 2630         ):
  2631             # if the master disconnect event is for a different master, ignore it
 2632             if (
 2633                 tag.startswith(master_event(type="disconnected"))
 2634                 and data["master"] != self.opts["master"]
 2635             ):
  2636                 # not our master, ignore
 2637                 raise salt.ext.tornado.gen.Return()
 2638             if tag.startswith(master_event(type="failback")):
 2639                 # if the master failback event is not for the top master, raise an exception
 2640                 if data["master"] != self.opts["master_list"][0]:
 2641                     raise SaltException(
 2642                         "Bad master '{}' when mine failback is '{}'".format(
 2643                             data["master"], self.opts["master"]
 2644                         )
 2645                     )
 2646                 # if the master failback event is for the current master, raise an exception
 2647                 elif data["master"] == self.opts["master"][0]:
 2648                     raise SaltException(
 2649                         "Already connected to '{}'".format(data["master"])
 2650                     )
 2651 
 2652             if self.connected:
 2653                 # we are not connected anymore
 2654                 self.connected = False
 2655                 log.info("Connection to master %s lost", self.opts["master"])
 2656 
 2657                 if self.opts["master_type"] != "failover":
 2658                     # modify the scheduled job to fire on reconnect
 2659                     if self.opts["transport"] != "tcp":
 2660                         schedule = {
 2661                             "function": "status.master",
 2662                             "seconds": self.opts["master_alive_interval"],
 2663                             "jid_include": True,
 2664                             "maxrunning": 1,
 2665                             "return_job": False,
 2666                             "kwargs": {
 2667                                 "master": self.opts["master"],
 2668                                 "connected": False,
 2669                             },
 2670                         }
 2671                         self.schedule.modify_job(
 2672                             name=master_event(type="alive", master=self.opts["master"]),
 2673                             schedule=schedule,
 2674                         )
 2675                 else:
  2676                     # delete the scheduled job so it does not interfere with the failover process
 2677                     if self.opts["transport"] != "tcp":
 2678                         self.schedule.delete_job(name=master_event(type="alive"))
 2679 
 2680                     log.info("Trying to tune in to next master from master-list")
 2681 
 2682                     if hasattr(self, "pub_channel"):
 2683                         self.pub_channel.on_recv(None)
 2684                         if hasattr(self.pub_channel, "auth"):
 2685                             self.pub_channel.auth.invalidate()
 2686                         if hasattr(self.pub_channel, "close"):
 2687                             self.pub_channel.close()
 2688                         del self.pub_channel
 2689 
 2690                     # if eval_master finds a new master for us, self.connected
 2691                     # will be True again on successful master authentication
 2692                     try:
 2693                         master, self.pub_channel = yield self.eval_master(
 2694                             opts=self.opts,
 2695                             failed=True,
 2696                             failback=tag.startswith(master_event(type="failback")),
 2697                         )
 2698                     except SaltClientError:
 2699                         pass
 2700 
 2701                     if self.connected:
 2702                         self.opts["master"] = master
 2703 
 2704                         # re-init the subsystems to work with the new master
 2705                         log.info(
 2706                             "Re-initialising subsystems for new master %s",
 2707                             self.opts["master"],
 2708                         )
 2709                         # put the current schedule into the new loaders
 2710                         self.opts["schedule"] = self.schedule.option("schedule")
 2711                         (
 2712                             self.functions,
 2713                             self.returners,
 2714                             self.function_errors,
 2715                             self.executors,
 2716                         ) = self._load_modules()
  2717                         # make the schedule use the new 'functions' loader
 2718                         self.schedule.functions = self.functions
 2719                         self.pub_channel.on_recv(self._handle_payload)
 2720                         self._fire_master_minion_start()
 2721                         log.info("Minion is ready to receive requests!")
 2722 
 2723                         # update scheduled job to run with the new master addr
 2724                         if self.opts["transport"] != "tcp":
 2725                             schedule = {
 2726                                 "function": "status.master",
 2727                                 "seconds": self.opts["master_alive_interval"],
 2728                                 "jid_include": True,
 2729                                 "maxrunning": 1,
 2730                                 "return_job": False,
 2731                                 "kwargs": {
 2732                                     "master": self.opts["master"],
 2733                                     "connected": True,
 2734                                 },
 2735                             }
 2736                             self.schedule.modify_job(
 2737                                 name=master_event(
 2738                                     type="alive", master=self.opts["master"]
 2739                                 ),
 2740                                 schedule=schedule,
 2741                             )
 2742 
 2743                             if (
 2744                                 self.opts["master_failback"]
 2745                                 and "master_list" in self.opts
 2746                             ):
 2747                                 if self.opts["master"] != self.opts["master_list"][0]:
 2748                                     schedule = {
 2749                                         "function": "status.ping_master",
 2750                                         "seconds": self.opts[
 2751                                             "master_failback_interval"
 2752                                         ],
 2753                                         "jid_include": True,
 2754                                         "maxrunning": 1,
 2755                                         "return_job": False,
 2756                                         "kwargs": {
 2757                                             "master": self.opts["master_list"][0]
 2758                                         },
 2759                                     }
 2760                                     self.schedule.modify_job(
 2761                                         name=master_event(type="failback"),
 2762                                         schedule=schedule,
 2763                                     )
 2764                                 else:
 2765                                     self.schedule.delete_job(
 2766                                         name=master_event(type="failback"), persist=True
 2767                                     )
 2768                     else:
 2769                         self.restart = True
 2770                         self.io_loop.stop()
 2771 
 2772         elif tag.startswith(master_event(type="connected")):
  2773             # Handle this event only once, otherwise it will pollute the log.
  2774             # Also, if the master type is failover, all the reconnection work is
  2775             # done by the `disconnected` event handler and this event should never
  2776             # happen; check it anyway to be sure.
 2777             if not self.connected and self.opts["master_type"] != "failover":
 2778                 log.info("Connection to master %s re-established", self.opts["master"])
 2779                 self.connected = True
 2780                 # modify the __master_alive job to only fire,
 2781                 # if the connection is lost again
 2782                 if self.opts["transport"] != "tcp":
 2783                     schedule = {
 2784                         "function": "status.master",
 2785                         "seconds": self.opts["master_alive_interval"],
 2786                         "jid_include": True,
 2787                         "maxrunning": 1,
 2788                         "return_job": False,
 2789                         "kwargs": {"master": self.opts["master"], "connected": True},
 2790                     }
 2791 
 2792                     self.schedule.modify_job(
 2793                         name=master_event(type="alive", master=self.opts["master"]),
 2794                         schedule=schedule,
 2795                     )
 2796         elif tag.startswith("__schedule_return"):
 2797             # reporting current connection with master
 2798             if data["schedule"].startswith(master_event(type="alive", master="")):
 2799                 if data["return"]:
 2800                     log.debug(
 2801                         "Connected to master %s",
 2802                         data["schedule"].split(master_event(type="alive", master=""))[
 2803                             1
 2804                         ],
 2805                     )
 2806             self._return_pub(data, ret_cmd="_return", sync=False)
 2807         elif tag.startswith("_salt_error"):
 2808             if self.connected:
 2809                 log.debug("Forwarding salt error event tag=%s", tag)
 2810                 self._fire_master(data, tag, sync=False)
 2811         elif tag.startswith("salt/auth/creds"):
 2812             key = tuple(data["key"])
 2813             log.debug(
 2814                 "Updating auth data for %s: %s -> %s",
 2815                 key,
 2816                 salt.crypt.AsyncAuth.creds_map.get(key),
 2817                 data["creds"],
 2818             )
 2819             salt.crypt.AsyncAuth.creds_map[tuple(data["key"])] = data["creds"]
 2820         elif tag.startswith("__beacons_return"):
 2821             if self.connected:
 2822                 log.debug("Firing beacons to master")
 2823                 self._fire_master(events=data["beacons"])
 2824 
 2825     def cleanup_subprocesses(self):
 2826         """
 2827         Clean up subprocesses and spawned threads.
 2828         """
 2829         # Add an extra fallback in case a forked process leaks through
 2830         multiprocessing.active_children()
 2831         self.subprocess_list.cleanup()
 2832         if self.schedule:
 2833             self.schedule.cleanup_subprocesses()
 2834 
 2835     def _setup_core(self):
 2836         """
 2837         Set up the core minion attributes.
 2838         This is safe to call multiple times.
 2839         """
 2840         if not self.ready:
 2841             # First call. Initialize.
 2842             (
 2843                 self.functions,
 2844                 self.returners,
 2845                 self.function_errors,
 2846                 self.executors,
 2847             ) = self._load_modules()
 2848             self.serial = salt.payload.Serial(self.opts)
 2849             self.mod_opts = self._prep_mod_opts()
 2850             #            self.matcher = Matcher(self.opts, self.functions)
 2851             self.matchers = salt.loader.matchers(self.opts)
 2852             if self.beacons_leader:
 2853                 self.beacons = salt.beacons.Beacon(self.opts, self.functions)
 2854             uid = salt.utils.user.get_uid(user=self.opts.get("user", None))
 2855             self.proc_dir = get_proc_dir(self.opts["cachedir"], uid=uid)
 2856             self.grains_cache = self.opts["grains"]
 2857             self.ready = True
 2858 
 2859     def setup_beacons(self, before_connect=False):
 2860         """
 2861         Set up the beacons.
 2862         This is safe to call multiple times.
 2863         """
  2864         # In a multi-master configuration only one minion connection should execute beacons
 2865         if not self.beacons_leader:
 2866             return
 2867 
 2868         self._setup_core()
 2869         loop_interval = self.opts["loop_interval"]
 2870         if "beacons" not in self.periodic_callbacks:
 2871             self.beacons = salt.beacons.Beacon(self.opts, self.functions)
 2872 
 2873             def handle_beacons():
 2874                 # Process Beacons
 2875                 beacons = None
 2876                 try:
 2877                     beacons = self.process_beacons(self.functions)
 2878                 except Exception:  # pylint: disable=broad-except
 2879                     log.critical("The beacon errored: ", exc_info=True)
 2880                 if beacons:
 2881                     event = salt.utils.event.get_event(
 2882                         "minion", opts=self.opts, listen=False
 2883                     )
 2884                     event.fire_event({"beacons": beacons}, "__beacons_return")
 2885                     event.destroy()
 2886 
 2887             if before_connect:
 2888                 # Make sure there is a chance for one iteration to occur before connect
 2889                 handle_beacons()
 2890 
 2891             self.add_periodic_callback("beacons", handle_beacons)
 2892 
 2893     def setup_scheduler(self, before_connect=False):
 2894         """
 2895         Set up the scheduler.
 2896         This is safe to call multiple times.
 2897         """
 2898         self._setup_core()
 2899 
 2900         loop_interval = self.opts["loop_interval"]
 2901 
 2902         if "schedule" not in self.periodic_callbacks:
 2903             if "schedule" not in self.opts:
 2904                 self.opts["schedule"] = {}
 2905             if not hasattr(self, "schedule"):
 2906                 self.schedule = salt.utils.schedule.Schedule(
 2907                     self.opts,
 2908                     self.functions,
 2909                     self.returners,
 2910                     utils=self.utils,
 2911                     cleanup=[master_event(type="alive")],
 2912                 )
 2913 
 2914             try:
 2915                 if self.opts["grains_refresh_every"]:  # In minutes, not seconds!
 2916                     log.debug(
 2917                         "Enabling the grains refresher. Will run every %d minute(s).",
 2918                         self.opts["grains_refresh_every"],
 2919                     )
 2920                     self._refresh_grains_watcher(abs(self.opts["grains_refresh_every"]))
 2921             except Exception as exc:  # pylint: disable=broad-except
 2922                 log.error(
 2923                     "Exception occurred in attempt to initialize grain refresh "
 2924                     "routine during minion tune-in: %s",
 2925                     exc,
 2926                 )
 2927 
 2928             # TODO: actually listen to the return and change period
 2929             def handle_schedule():
 2930                 self.process_schedule(self, loop_interval)
 2931 
 2932             if before_connect:
 2933                 # Make sure there is a chance for one iteration to occur before connect
 2934                 handle_schedule()
 2935 
 2936             self.add_periodic_callback("schedule", handle_schedule)
 2937 
 2938     def add_periodic_callback(self, name, method, interval=1):
 2939         """
 2940         Add a periodic callback to the event loop and call its start method.
 2941         If a callback by the given name exists this method returns False
 2942         """
 2943         if name in self.periodic_callbacks:
 2944             return False
 2945         self.periodic_callbacks[name] = salt.ext.tornado.ioloop.PeriodicCallback(
 2946             method, interval * 1000,
 2947         )
 2948         self.periodic_callbacks[name].start()
 2949         return True
 2950 
 2951     def remove_periodic_callback(self, name):
 2952         """
 2953         Remove a periodic callback.
 2954         If a callback by the given name does not exist this method returns False
 2955         """
 2956         callback = self.periodic_callbacks.pop(name, None)
 2957         if callback is None:
 2958             return False
 2959         callback.stop()
 2960         return True
 2961 
 2962     # Main Minion Tune In
 2963     def tune_in(self, start=True):
 2964         """
 2965         Lock onto the publisher. This is the main event loop for the minion
 2966         :rtype : None
 2967         """
 2968         self._pre_tune()
 2969 
 2970         log.debug("Minion '%s' trying to tune in", self.opts["id"])
 2971 
 2972         if start:
 2973             if self.opts.get("beacons_before_connect", False):
 2974                 self.setup_beacons(before_connect=True)
 2975             if self.opts.get("scheduler_before_connect", False):
 2976                 self.setup_scheduler(before_connect=True)
 2977             self.sync_connect_master()
 2978         if self.connected:
 2979             self._fire_master_minion_start()
 2980             log.info("Minion is ready to receive requests!")
 2981 
 2982         # Make sure to gracefully handle SIGUSR1
 2983         enable_sigusr1_handler()
 2984 
 2985         # Make sure to gracefully handle CTRL_LOGOFF_EVENT
 2986         if HAS_WIN_FUNCTIONS:
 2987             salt.utils.win_functions.enable_ctrl_logoff_handler()
 2988 
 2989         # On first startup execute a state run if configured to do so
 2990         self._state_run()
 2991 
 2992         self.setup_beacons()
 2993         self.setup_scheduler()
 2994         self.add_periodic_callback("cleanup", self.cleanup_subprocesses)
 2995 
 2996         # schedule the stuff that runs every interval
 2997         ping_interval = self.opts.get("ping_interval", 0) * 60
 2998         if ping_interval > 0 and self.connected:
 2999 
 3000             def ping_master():
 3001                 try:
 3002 
 3003                     def ping_timeout_handler(*_):
 3004                         if self.opts.get("auth_safemode", False):
 3005                             log.error(
 3006                                 "** Master Ping failed. Attempting to restart minion**"
 3007                             )
 3008                             delay = self.opts.get("random_reauth_delay", 5)
 3009                             log.info("delaying random_reauth_delay %ss", delay)
 3010                             try:
 3011                                 self.functions["service.restart"](service_name())
 3012                             except KeyError:
 3013                                 # Probably no init system (running in docker?)
 3014                                 log.warning(
 3015                                     "ping_interval reached without response "
 3016                                     "from the master, but service.restart "
 3017                                     "could not be run to restart the minion "
 3018                                     "daemon. ping_interval requires that the "
 3019                                     "minion is running under an init system."
 3020                                 )
 3021 
 3022                     self._fire_master(
 3023                         "ping",
 3024                         "minion_ping",
 3025                         sync=False,
 3026                         timeout_handler=ping_timeout_handler,
 3027                     )
 3028                 except Exception:  # pylint: disable=broad-except
 3029                     log.warning(
 3030                         "Attempt to ping master failed.", exc_on_loglevel=logging.DEBUG
 3031                     )
 3032 
 3033             self.remove_periodic_callback("ping")
 3034             self.add_periodic_callback("ping", ping_master, ping_interval)
 3035 
 3036         # add handler to subscriber
 3037         if hasattr(self, "pub_channel") and self.pub_channel is not None:
 3038             self.pub_channel.on_recv(self._handle_payload)
 3039         elif self.opts.get("master_type") != "disable":
 3040             log.error("No connection to master found. Scheduled jobs will not run.")
 3041 
 3042         if start:
 3043             try:
 3044                 self.io_loop.start()
 3045                 if self.restart:
 3046                     self.destroy()
 3047             except (
 3048                 KeyboardInterrupt,
 3049                 RuntimeError,
 3050             ):  # A RuntimeError can be re-raised by Tornado on shutdown
 3051                 self.destroy()
 3052 
 3053     def _handle_payload(self, payload):
 3054         if payload is not None and payload["enc"] == "aes":
 3055             if self._target_load(payload["load"]):
 3056                 self._handle_decoded_payload(payload["load"])
 3057             elif self.opts["zmq_filtering"]:
  3058                 # When filtering is enabled, we'd like to know when the minion sees something it shouldn't
 3059                 log.trace(
 3060                     "Broadcast message received not for this minion, Load: %s",
 3061                     payload["load"],
 3062                 )
 3063         # If it's not AES, and thus has not been verified, we do nothing.
 3064         # In the future, we could add support for some clearfuncs, but
 3065         # the minion currently has no need.
 3066 
 3067     def _target_load(self, load):
 3068         # Verify that the publication is valid
 3069         if (
 3070             "tgt" not in load
 3071             or "jid" not in load
 3072             or "fun" not in load
 3073             or "arg" not in load
 3074         ):
 3075             return False
 3076         # Verify that the publication applies to this minion
 3077 
 3078         # It's important to note that the master does some pre-processing
 3079         # to determine which minions to send a request to. So for example,
 3080         # a "salt -G 'grain_key:grain_val' test.ping" will invoke some
 3081         # pre-processing on the master and this minion should not see the
 3082         # publication if the master does not determine that it should.
 3083 
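               # Hedged example of a publication load that would pass the checks below
               # on a minion matched by the glob target (values are illustrative):
               #
               #     {"tgt": "*", "tgt_type": "glob", "jid": "20201118121212000000",
               #      "fun": "test.ping", "arg": []}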
 3084         if "tgt_type" in load:
 3085             match_func = self.matchers.get(
 3086                 "{}_match.match".format(load["tgt_type"]), None
 3087             )
 3088             if match_func is None:
 3089                 return False
 3090             if load["tgt_type"] in ("grain", "grain_pcre", "pillar"):
 3091                 delimiter = load.get("delimiter", DEFAULT_TARGET_DELIM)
 3092                 if not match_func(load["tgt"], delimiter=delimiter):
 3093                     return False
 3094             elif not match_func(load["tgt"]):
 3095                 return False
 3096         else:
 3097             if not self.matchers["glob_match.match"](load["tgt"]):
 3098                 return False
 3099 
 3100         return True
 3101 
 3102     def destroy(self):
 3103         """
 3104         Tear down the minion
 3105         """
 3106         if self._running is False:
 3107             return
 3108 
 3109         self._running = False
 3110         if hasattr(self, "schedule"):
 3111             del self.schedule
 3112         if hasattr(self, "pub_channel") and self.pub_channel is not None:
 3113             self.pub_channel.on_recv(None)
 3114             if hasattr(self.pub_channel, "close"):
 3115                 self.pub_channel.close()
 3116             del self.pub_channel
 3117         if hasattr(self, "periodic_callbacks"):
 3118             for cb in self.periodic_callbacks.values():
 3119                 cb.stop()
 3120 
 3121     # pylint: disable=W1701
 3122     def __del__(self):
 3123         self.destroy()
 3124 
 3125     # pylint: enable=W1701
 3126 
 3127 
 3128 class Syndic(Minion):
 3129     """
  3130     Make a Syndic minion; this minion will use the minion keys on the
  3131     master to authenticate with a higher-level master.
 3132     """
 3133 
 3134     def __init__(self, opts, **kwargs):
 3135         self._syndic_interface = opts.get("interface")
 3136         self._syndic = True
  3137         # force auth_safemode True because the Syndic does not support autorestart
 3138         opts["auth_safemode"] = True
 3139         opts["loop_interval"] = 1
 3140         super().__init__(opts, **kwargs)
 3141         self.mminion = salt.minion.MasterMinion(opts)
 3142         self.jid_forward_cache = set()
 3143         self.jids = {}
 3144         self.raw_events = []
 3145         self.pub_future = None
 3146 
 3147     def _handle_decoded_payload(self, data):
 3148         """
 3149         Override this method if you wish to handle the decoded data
 3150         differently.
 3151         """
 3152         # TODO: even do this??
 3153         data["to"] = int(data.get("to", self.opts["timeout"])) - 1
 3154         # Only forward the command if it didn't originate from ourselves
 3155         if data.get("master_id", 0) != self.opts.get("master_id", 1):
 3156             self.syndic_cmd(data)
 3157 
 3158     def syndic_cmd(self, data):
 3159         """
 3160         Take the now clear load and forward it on to the client cmd
 3161         """
 3162         # Set up default tgt_type
 3163         if "tgt_type" not in data:
 3164             data["tgt_type"] = "glob"
 3165         kwargs = {}
 3166 
 3167         # optionally add a few fields to the publish data
 3168         for field in (
 3169             "master_id",  # which master the job came from
 3170             "user",  # which user ran the job
 3171         ):
 3172             if field in data:
 3173                 kwargs[field] = data[field]
 3174 
 3175         def timeout_handler(*args):
 3176             log.warning("Unable to forward pub data: %s", args[1])
 3177             return True
 3178 
 3179         with salt.ext.tornado.stack_context.ExceptionStackContext(timeout_handler):
 3180             self.local.pub_async(
 3181                 data["tgt"],
 3182                 data["fun"],
 3183                 data["arg"],
 3184                 data["tgt_type"],
 3185                 data["ret"],
 3186                 data["jid"],
 3187                 data["to"],
 3188                 io_loop=self.io_loop,
 3189                 callback=lambda _: None,
 3190                 **kwargs
 3191             )
 3192 
 3193     def fire_master_syndic_start(self):
 3194         # Send an event to the master that the minion is live
 3195         if self.opts["enable_legacy_startup_events"]:
 3196             # Old style event. Defaults to false in 3001 release.
 3197             self._fire_master(
 3198                 "Syndic {} started at {}".format(self.opts["id"], time.asctime()),
 3199                 "syndic_start",
 3200                 sync=False,
 3201             )
 3202         self._fire_master(
 3203             "Syndic {} started at {}".format(self.opts["id"], time.asctime()),
 3204             tagify([self.opts["id"], "start"], "syndic"),
 3205             sync=False,
 3206         )
 3207 
 3208     # TODO: clean up docs
 3209     def tune_in_no_block(self):
 3210         """
  3211         Executes the tune_in sequence but omits extra logging and the
  3212         management of the event bus, assuming that these are handled outside
  3213         the tune_in sequence.
 3214         """
 3215         # Instantiate the local client
 3216         self.local = salt.client.get_local_client(
 3217             self.opts["_minion_conf_file"], io_loop=self.io_loop
 3218         )
 3219 
 3220         # add handler to subscriber
 3221         self.pub_channel.on_recv(self._process_cmd_socket)
 3222 
 3223     def _process_cmd_socket(self, payload):
 3224         if payload is not None and payload["enc"] == "aes":
 3225             log.trace("Handling payload")
 3226             self._handle_decoded_payload(payload["load"])
 3227         # If it's not AES, and thus has not been verified, we do nothing.
 3228         # In the future, we could add support for some clearfuncs, but
 3229         # the syndic currently has no need.
 3230 
 3231     @salt.ext.tornado.gen.coroutine
 3232     def reconnect(self):
 3233         if hasattr(self, "pub_channel"):
 3234             self.pub_channel.on_recv(None)
 3235             if hasattr(self.pub_channel, "close"):
 3236                 self.pub_channel.close()
 3237             del self.pub_channel
 3238 
 3239         # if eval_master finds a new master for us, self.connected
 3240         # will be True again on successful master authentication
 3241         master, self.pub_channel = yield self.eval_master(opts=self.opts)
 3242 
 3243         if self.connected:
 3244             self.opts["master"] = master
 3245             self.pub_channel.on_recv(self._process_cmd_socket)
 3246             log.info("Minion is ready to receive requests!")
 3247 
 3248         raise salt.ext.tornado.gen.Return(self)
 3249 
 3250     def destroy(self):
 3251         """
 3252         Tear down the syndic minion
 3253         """
  3254         # We borrowed the local client's poller, so give it back before
  3255         # it's destroyed. Reset the local poller reference.
 3256         super().destroy()
 3257         if hasattr(self, "local"):
 3258             del self.local
 3259 
 3260         if hasattr(self, "forward_events"):
 3261             self.forward_events.stop()
 3262 
 3263 
 3264 # TODO: need a way of knowing if the syndic connection is busted
 3265 class SyndicManager(MinionBase):
 3266     """
  3267     Make a MultiMaster syndic minion; this minion will handle relaying jobs and returns from
  3268     all minions connected to it to the list of masters it is connected to.
 3269 
  3270     Modes (controlled by `syndic_mode`):
 3271         sync: This mode will synchronize all events and publishes from higher level masters
 3272         cluster: This mode will only sync job publishes and returns
 3273 
 3274     Note: jobs will be returned best-effort to the requesting master. This also means
 3275     (since we are using zmq) that if a job was fired and the master disconnects
 3276     between the publish and return, that the return will end up in a zmq buffer
 3277     in this Syndic headed to that original master.
 3278 
  3279     In addition, since these classes all seem to use a mix of blocking and non-blocking
  3280     calls (with varying timeouts along the way), this daemon does not handle failure well:
  3281     it will (under most circumstances) stall for ~15s trying to forward events
  3282     to the down master.
 3283     """
 3284 
 3285     # time to connect to upstream master
 3286     SYNDIC_CONNECT_TIMEOUT = 5
 3287     SYNDIC_EVENT_TIMEOUT = 5
 3288 
 3289     def __init__(self, opts, io_loop=None):
 3290         opts["loop_interval"] = 1
 3291         super().__init__(opts)
 3292         self.mminion = salt.minion.MasterMinion(opts)
 3293         # sync (old behavior), cluster (only returns and publishes)
 3294         self.syndic_mode = self.opts.get("syndic_mode", "sync")
 3295         self.syndic_failover = self.opts.get("syndic_failover", "random")
 3296 
 3297         self.auth_wait = self.opts["acceptance_wait_time"]
 3298         self.max_auth_wait = self.opts["acceptance_wait_time_max"]
 3299 
 3300         self._has_master = threading.Event()
 3301         self.jid_forward_cache = set()
 3302 
 3303         if io_loop is None:
 3304             install_zmq()
 3305             self.io_loop = ZMQDefaultLoop.current()
 3306         else:
 3307             self.io_loop = io_loop
 3308 
 3309         # List of events
 3310         self.raw_events = []
 3311         # Dict of rets: {master_id: {event_tag: job_ret, ...}, ...}
 3312         self.job_rets = {}
  3313         # List of delayed job_rets which we were unable to send for some reason and will be resent
  3314         # to any available master
 3315         self.delayed = []
 3316         # Active pub futures: {master_id: (future, [job_ret, ...]), ...}
 3317         self.pub_futures = {}
 3318 
 3319     def _spawn_syndics(self):
 3320         """
 3321         Spawn all the coroutines which will sign in the syndics
 3322         """
 3323         self._syndics = OrderedDict()  # mapping of opts['master'] -> syndic
 3324         masters = self.opts["master"]
 3325         if not isinstance(masters, list):
 3326             masters = [masters]
 3327         for master in masters:
 3328             s_opts = copy.copy(self.opts)
 3329             s_opts["master"] = master
 3330             self._syndics[master] = self._connect_syndic(s_opts)
 3331 
 3332     @salt.ext.tornado.gen.coroutine
 3333     def _connect_syndic(self, opts):
 3334         """
 3335         Create a syndic, and asynchronously connect it to a master
 3336         """
 3337         last = 0  # never have we signed in
 3338         auth_wait = opts["acceptance_wait_time"]
 3339         failed = False
 3340         while True:
 3341             log.debug("Syndic attempting to connect to %s", opts["master"])
 3342             try:
 3343                 syndic = Syndic(
 3344                     opts,
 3345                     timeout=self.SYNDIC_CONNECT_TIMEOUT,
 3346                     safe=False,
 3347                     io_loop=self.io_loop,
 3348                 )
 3349                 yield syndic.connect_master(failed=failed)
 3350                 # set up the syndic to handle publishes (specifically not event forwarding)
 3351                 syndic.tune_in_no_block()
 3352 
 3353                 # Send an event to the master that the minion is live
 3354                 syndic.fire_master_syndic_start()
 3355 
 3356                 log.info("Syndic successfully connected to %s", opts["master"])
 3357                 break
 3358             except SaltClientError as exc:
 3359                 failed = True
 3360                 log.error(
 3361                     "Error while bringing up syndic for multi-syndic. Is the "
 3362                     "master at %s responding?",
 3363                     opts["master"],
 3364                 )
 3365                 last = time.time()
 3366                 if auth_wait < self.max_auth_wait:
 3367                     auth_wait += self.auth_wait
 3368                 yield salt.ext.tornado.gen.sleep(auth_wait)  # TODO: log?
 3369             except (KeyboardInterrupt, SystemExit):  # pylint: disable=try-except-raise
 3370                 raise
 3371             except Exception:  # pylint: disable=broad-except
 3372                 failed = True
 3373                 log.critical(
 3374                     "Unexpected error while connecting to %s",
 3375                     opts["master"],
 3376                     exc_info=True,
 3377                 )
 3378 
 3379         raise salt.ext.tornado.gen.Return(syndic)
 3380 
 3381     def _mark_master_dead(self, master):
 3382         """
 3383         Mark a master as dead. This will start the sign-in routine
 3384         """
  3385         # if it's connected, mark it dead
 3386         if self._syndics[master].done():
 3387             syndic = self._syndics[master].result()  # pylint: disable=no-member
 3388             self._syndics[master] = syndic.reconnect()
 3389         else:
 3390             # TODO: debug?
 3391             log.info(
 3392                 "Attempting to mark %s as dead, although it is already " "marked dead",
 3393                 master,
 3394             )
 3395 
 3396     def _call_syndic(self, func, args=(), kwargs=None, master_id=None):
 3397         """
 3398         Wrapper to call a given func on a syndic, best effort to get the one you asked for
 3399         """
 3400         if kwargs is None:
 3401             kwargs = {}
 3402         successful = False
 3403         # Call for each master
 3404         for master, syndic_future in self.iter_master_options(master_id):
 3405             if not syndic_future.done() or syndic_future.exception():
 3406                 log.error(
 3407                     "Unable to call %s on %s, that syndic is not connected",
 3408                     func,
 3409                     master,
 3410                 )
 3411                 continue
 3412 
 3413             try:
 3414                 getattr(syndic_future.result(), func)(*args, **kwargs)
 3415                 successful = True
 3416             except SaltClientError:
 3417                 log.error("Unable to call %s on %s, trying another...", func, master)
 3418                 self._mark_master_dead(master)
 3419         if not successful:
 3420             log.critical("Unable to call %s on any masters!", func)
 3421 
 3422     def _return_pub_syndic(self, values, master_id=None):
 3423         """
  3424         Wrapper to call '_return_pub_multi' on a syndic, best effort to get the one you asked for
 3425         """
 3426         func = "_return_pub_multi"
 3427         for master, syndic_future in self.iter_master_options(master_id):
 3428             if not syndic_future.done() or syndic_future.exception():
 3429                 log.error(
 3430                     "Unable to call %s on %s, that syndic is not connected",
 3431                     func,
 3432                     master,
 3433                 )
 3434                 continue
 3435 
 3436             future, data = self.pub_futures.get(master, (None, None))
 3437             if future is not None:
 3438                 if not future.done():
 3439                     if master == master_id:
 3440                         # Targeted master previous send not done yet, call again later
 3441                         return False
 3442                     else:
 3443                         # Fallback master is busy, try the next one
 3444                         continue
 3445                 elif future.exception():
 3446                     # Previous execution on this master returned an error
 3447                     log.error(
 3448                         "Unable to call %s on %s, trying another...", func, master
 3449                     )
 3450                     self._mark_master_dead(master)
 3451                     del self.pub_futures[master]
 3452                     # Add not sent data to the delayed list and try the next master
 3453                     self.delayed.extend(data)
 3454                     continue
 3455             future = getattr(syndic_future.result(), func)(
 3456                 values, "_syndic_return", timeout=self._return_retry_timer(), sync=False
 3457             )
 3458             self.pub_futures[master] = (future, values)
 3459             return True
 3460         # Loop done and didn't exit: wasn't sent, try again later
 3461         return False
 3462 
 3463     def iter_master_options(self, master_id=None):
 3464         """
 3465         Iterate (in order) over your options for master
 3466         """
 3467         masters = list(self._syndics.keys())
 3468         if self.opts["syndic_failover"] == "random":
 3469             shuffle(masters)
 3470         if master_id not in self._syndics:
 3471             master_id = masters.pop(0)
 3472         else:
 3473             masters.remove(master_id)
 3474 
 3475         while True:
 3476             yield master_id, self._syndics[master_id]
 3477             if not masters:
 3478                 break
 3479             master_id = masters.pop(0)
 3480 
 3481     def _reset_event_aggregation(self):
 3482         self.job_rets = {}
 3483         self.raw_events = []
 3484 
 3485     def reconnect_event_bus(self, something):
 3486         future = self.local.event.set_event_handler(self._process_event)
 3487         self.io_loop.add_future(future, self.reconnect_event_bus)
 3488 
 3489     # Syndic Tune In
 3490     def tune_in(self):
 3491         """
 3492         Lock onto the publisher. This is the main event loop for the syndic
 3493         """
 3494         self._spawn_syndics()
 3495         # Instantiate the local client
 3496         self.local = salt.client.get_local_client(
 3497             self.opts["_minion_conf_file"], io_loop=self.io_loop
 3498         )
 3499         self.local.event.subscribe("")
 3500 
 3501         log.debug("SyndicManager '%s' trying to tune in", self.opts["id"])
 3502 
 3503         # register the event sub to the poller
 3504         self.job_rets = {}
 3505         self.raw_events = []
 3506         self._reset_event_aggregation()
 3507         future = self.local.event.set_event_handler(self._process_event)
 3508         self.io_loop.add_future(future, self.reconnect_event_bus)
 3509 
 3510         # forward events every syndic_event_forward_timeout
 3511         self.forward_events = salt.ext.tornado.ioloop.PeriodicCallback(
 3512             self._forward_events, self.opts["syndic_event_forward_timeout"] * 1000,
 3513         )
 3514         self.forward_events.start()
 3515 
 3516         # Make sure to gracefully handle SIGUSR1
 3517         enable_sigusr1_handler()
 3518 
 3519         self.io_loop.start()
 3520 
 3521     def _process_event(self, raw):
 3522         # TODO: cleanup: Move down into event class
 3523         mtag, data = self.local.event.unpack(raw, self.local.event.serial)
 3524         log.trace("Got event %s", mtag)  # pylint: disable=no-member
 3525 
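               # Hedged example: a job return event tag looks like
               # "salt/job/20201118121212000000/ret/minion1", so tag_parts[1] == "job",
               # tag_parts[2] is the jid and tag_parts[3] == "ret" (jid/minion id illustrative).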
 3526         tag_parts = mtag.split("/")
 3527         if (
 3528             len(tag_parts) >= 4
 3529             and tag_parts[1] == "job"
 3530             and salt.utils.jid.is_jid(tag_parts[2])
 3531             and tag_parts[3] == "ret"
 3532             and "return" in data
 3533         ):
 3534             if "jid" not in data:
 3535                 # Not a job return
 3536                 return
 3537             if self.syndic_mode == "cluster" and data.get(
 3538                 "master_id", 0
 3539             ) == self.opts.get("master_id", 1):
 3540                 log.debug("Return received with matching master_id, not forwarding")
 3541                 return
 3542 
 3543             master = data.get("master_id")
 3544             jdict = self.job_rets.setdefault(master, {}).setdefault(mtag, {})
 3545             if not jdict:
 3546                 jdict["__fun__"] = data.get("fun")
 3547                 jdict["__jid__"] = data["jid"]
 3548                 jdict["__load__"] = {}
 3549                 fstr = "{}.get_load".format(self.opts["master_job_cache"])
 3550                 # Only need to forward each load once. Don't hit the disk
 3551                 # for every minion return!
 3552                 if data["jid"] not in self.jid_forward_cache:
 3553                     jdict["__load__"].update(self.mminion.returners[fstr](data["jid"]))
 3554                     self.jid_forward_cache.add(data["jid"])
 3555                     if (
 3556                         len(self.jid_forward_cache)
 3557                         > self.opts["syndic_jid_forward_cache_hwm"]
 3558                     ):
 3559                         # Pop the oldest jid from the cache
 3560                         tmp = sorted(list(self.jid_forward_cache))
 3561                         tmp.pop(0)
 3562                         self.jid_forward_cache = set(tmp)
 3563             if master is not None:
 3564                 # __'s to make sure it doesn't print out on the master cli
 3565                 jdict["__master_id__"] = master
 3566             ret = {}
 3567             for key in "return", "retcode", "success":
 3568                 if key in data:
 3569                     ret[key] = data[key]
 3570             jdict[data["id"]] = ret
 3571         else:
 3572             # TODO: config to forward these? If so we'll have to keep track of who
 3573             # has seen them
  3574             # if we are the top level master -- don't forward all the minion events
 3575             if self.syndic_mode == "sync":
 3576                 # Add generic event aggregation here
 3577                 if "retcode" not in data:
 3578                     self.raw_events.append({"data": data, "tag": mtag})
 3579 
 3580     def _forward_events(self):
 3581         log.trace("Forwarding events")  # pylint: disable=no-member
 3582         if self.raw_events:
 3583             events = self.raw_events
 3584             self.raw_events = []
 3585             self._call_syndic(
 3586                 "_fire_master",
 3587                 kwargs={
 3588                     "events": events,
 3589                     "pretag": tagify(self.opts["id"], base="syndic"),
 3590                     "timeout": self._return_retry_timer(),
 3591                     "sync": False,
 3592                 },
 3593             )
 3594         if self.delayed:
 3595             res = self._return_pub_syndic(self.delayed)
 3596             if res:
 3597                 self.delayed = []
 3598         for master in list(self.job_rets.keys()):
 3599             values = list(self.job_rets[master].values())
 3600             res = self._return_pub_syndic(values, master_id=master)
 3601             if res:
 3602                 del self.job_rets[master]
 3603 
 3604 
 3605 class ProxyMinionManager(MinionManager):
 3606     """
 3607     Create the multi-minion interface but for proxy minions
 3608     """
 3609 
 3610     def _create_minion_object(
 3611         self, opts, timeout, safe, io_loop=None, loaded_base_name=None, jid_queue=None
 3612     ):
 3613         """
 3614         Helper function to return the correct type of object
 3615         """
 3616         return ProxyMinion(
 3617             opts,
 3618             timeout,
 3619             safe,
 3620             io_loop=io_loop,
 3621             loaded_base_name=loaded_base_name,
 3622             jid_queue=jid_queue,
 3623         )
 3624 
 3625 
 3626 def _metaproxy_call(opts, fn_name):
 3627     loaded_base_name = "{}.{}".format(opts["id"], salt.loader.LOADED_BASE_NAME)
 3628     metaproxy = salt.loader.metaproxy(opts, loaded_base_name=loaded_base_name)
 3629     try:
 3630         metaproxy_name = opts["metaproxy"]
 3631     except KeyError:
 3632         metaproxy_name = "proxy"
 3633         errmsg = (
 3634             "No metaproxy key found in opts for id "
 3635             + opts["id"]
 3636             + ". "
 3637             + "Defaulting to standard proxy minion"
 3638         )
 3639         log.error(errmsg)
 3640 
 3641     metaproxy_fn = metaproxy_name + "." + fn_name
 3642     return metaproxy[metaproxy_fn]
 3643 
 3644 
 3645 class ProxyMinion(Minion):
 3646     """
 3647     This class instantiates a 'proxy' minion--a minion that does not manipulate
 3648     the host it runs on, but instead manipulates a device that cannot run a minion.
 3649     """
 3650 
 3651     # TODO: better name...
 3652     @salt.ext.tornado.gen.coroutine
 3653     def _post_master_init(self, master):
 3654         """
 3655         Function to finish init after connecting to a master
 3656 
 3657         This is primarily loading modules, pillars, etc. (since they need
 3658         to know which master they connected to)
 3659 
 3660         If this function is changed, please check Minion._post_master_init
 3661         to see if those changes need to be propagated.
 3662 
 3663         ProxyMinions need a significantly different post master setup,
 3664         which is why the differences are not factored out into separate helper
 3665         functions.
 3666         """
 3667         mp_call = _metaproxy_call(self.opts, "post_master_init")
 3668         return mp_call(self, master)
 3669 
 3670     def tune_in(self, start=True):
 3671         """
 3672         Lock onto the publisher. This is the main event loop for the minion
 3673         :rtype : None
 3674         """
 3675         mp_call = _metaproxy_call(self.opts, "tune_in")
 3676         return mp_call(self, start)
 3677 
 3678     def _target_load(self, load):
 3679         """
 3680         Verify that the publication is valid and applies to this minion
 3681         """
 3682         mp_call = _metaproxy_call(self.opts, "target_load")
 3683         return mp_call(self, load)
 3684 
 3685     def _handle_payload(self, payload):
 3686         mp_call = _metaproxy_call(self.opts, "handle_payload")
 3687         return mp_call(self, payload)
 3688 
 3689     @salt.ext.tornado.gen.coroutine
 3690     def _handle_decoded_payload(self, data):
 3691         mp_call = _metaproxy_call(self.opts, "handle_decoded_payload")
 3692         return mp_call(self, data)
 3693 
 3694     @classmethod
 3695     def _target(cls, minion_instance, opts, data, connected):
 3696 
 3697         mp_call = _metaproxy_call(opts, "target")
 3698         return mp_call(cls, minion_instance, opts, data, connected)
 3699 
 3700     @classmethod
 3701     def _thread_return(cls, minion_instance, opts, data):
 3702         mp_call = _metaproxy_call(opts, "thread_return")
 3703         return mp_call(cls, minion_instance, opts, data)
 3704 
 3705     @classmethod
 3706     def _thread_multi_return(cls, minion_instance, opts, data):
 3707         mp_call = _metaproxy_call(opts, "thread_multi_return")
 3708         return mp_call(cls, minion_instance, opts, data)
 3709 
 3710 
 3711 class SProxyMinion(SMinion):
 3712     """
 3713     Create an object that has loaded all of the minion module functions,
 3714     grains, modules, returners, etc.  The SProxyMinion allows developers to
 3715     generate all of the salt minion functions and make them available
 3716     for general use.
 3717     """
 3718 
 3719     def gen_modules(self, initial_load=False, context=None):
 3720         """
 3721         Tell the minion to reload the execution modules
 3722 
 3723         CLI Example:
 3724 
 3725         .. code-block:: bash
 3726 
 3727             salt '*' sys.reload_modules
 3728         """
 3729         self.opts["grains"] = salt.loader.grains(self.opts)
 3730         self.opts["pillar"] = salt.pillar.get_pillar(
 3731             self.opts,
 3732             self.opts["grains"],
 3733             self.opts["id"],
 3734             saltenv=self.opts["saltenv"],
 3735             pillarenv=self.opts.get("pillarenv"),
 3736         ).compile_pillar()
 3737 
 3738         if "proxy" not in self.opts["pillar"] and "proxy" not in self.opts:
 3739             errmsg = (
 3740                 'No "proxy" configuration key found in pillar or opts '
 3741                 "dictionaries for id {id}. Check your pillar/options "
 3742                 "configuration and contents. Salt-proxy aborted."
 3743             ).format(id=self.opts["id"])
 3744             log.error(errmsg)
 3745             self._running = False
 3746             raise SaltSystemExit(code=salt.defaults.exitcodes.EX_GENERIC, msg=errmsg)
 3747 
 3748         if "proxy" not in self.opts:
 3749             self.opts["proxy"] = self.opts["pillar"]["proxy"]
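
              # Minimal illustrative "proxy" configuration (example values
              # assumed); "proxytype" selects the proxymodule loaded below and
              # any extra keys are left for that proxymodule to consume:
              #
              #     proxy:
              #       proxytype: rest_sample
              #       url: http://127.0.0.1:8000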
 3750 
 3751         # Then load the proxy module
 3752         self.proxy = salt.loader.proxy(self.opts)
 3753 
 3754         self.utils = salt.loader.utils(self.opts, proxy=self.proxy, context=context)
 3755 
 3756         self.functions = salt.loader.minion_mods(
 3757             self.opts, utils=self.utils, notify=False, proxy=self.proxy, context=context
 3758         )
 3759         self.returners = salt.loader.returners(
 3760             self.opts, functions=self.functions, proxy=self.proxy, context=context
 3761         )
 3762         self.matchers = salt.loader.matchers(self.opts)
 3763         self.functions["sys.reload_modules"] = self.gen_modules
 3764         self.executors = salt.loader.executors(
 3765             self.opts, functions=self.functions, proxy=self.proxy, context=context,
 3766         )
 3767 
 3768         fq_proxyname = self.opts["proxy"]["proxytype"]
 3769 
 3770         # We can then sync any proxymodules down from the master.
 3771         # We do a sync_all here in case proxy code was installed by
 3772         # SPM or was manually placed in /srv/salt/_modules etc.
 3773         self.functions["saltutil.sync_all"](saltenv=self.opts["saltenv"])
 3774 
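              # Cross-wire the loader dunders: execution modules get __proxy__
              # so they can reach the managed device, while the proxymodule
              # gets __salt__, __ret__ and __pillar__ so it can call execution
              # functions and returners and read pillar data.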
 3775         self.functions.pack["__proxy__"] = self.proxy
 3776         self.proxy.pack["__salt__"] = self.functions
 3777         self.proxy.pack["__ret__"] = self.returners
 3778         self.proxy.pack["__pillar__"] = self.opts["pillar"]
 3779 
 3780         # Reload utils as well (chicken and egg: __utils__ needs __proxy__ and __proxy__ needs __utils__)
 3781         self.utils = salt.loader.utils(self.opts, proxy=self.proxy, context=context)
 3782         self.proxy.pack["__utils__"] = self.utils
 3783 
 3784         # Reload all modules so all dunder variables are injected
 3785         self.proxy.reload_modules()
 3786 
 3787         if (
 3788             "{}.init".format(fq_proxyname) not in self.proxy
 3789             or "{}.shutdown".format(fq_proxyname) not in self.proxy
 3790         ):
 3791             errmsg = (
 3792                 "Proxymodule {} is missing an init() or a shutdown() function, "
 3793                 "or both. Check your proxymodule. Salt-proxy aborted.".format(
 3794                     fq_proxyname
 3795                 )
 3796             )
 3797             log.error(errmsg)
 3798             self._running = False
 3799             raise SaltSystemExit(code=salt.defaults.exitcodes.EX_GENERIC, msg=errmsg)
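
              # Illustrative proxymodule skeleton (hypothetical "dumbdevice"
              # module shipped in _proxy/); the check above only requires that
              # init() and shutdown() exist:
              #
              #     __proxyenabled__ = ["dumbdevice"]
              #
              #     def init(opts):
              #         # Open the connection to the device using opts["proxy"]
              #         return True
              #
              #     def shutdown(opts):
              #         # Close the connection to the device
              #         return True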
 3800 
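              # Use the proxymodule's module_executors() list if it defines
              # one; otherwise fall back to an empty list.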
 3801         self.module_executors = self.proxy.get(
 3802             "{}.module_executors".format(fq_proxyname), lambda: []
 3803         )()
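              # Call the proxymodule's init() so it can open its connection to
              # the managed device before proxy grains are collected below.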
 3804         proxy_init_fn = self.proxy[fq_proxyname + ".init"]
 3805         proxy_init_fn(self.opts)
 3806 
 3807         self.opts["grains"] = salt.loader.grains(self.opts, proxy=self.proxy)
 3808 
 3809         # Sync the grains here so the proxy can communicate them to the master.
 3810         self.functions["saltutil.sync_grains"](saltenv="base")
 3811         self.grains_cache = self.opts["grains"]
 3812         self.ready = True