"Fossies" - the Fresh Open Source Software Archive

Member "salt-3002.2/salt/metaproxy/proxy.py" (18 Nov 2020, 34468 Bytes) of package /linux/misc/salt-3002.2.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "proxy.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 3002.1_vs_3002.2.

#
# Proxy minion metaproxy modules
#

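# NOTE: these module-level functions take ``self``/``cls``-style first
# arguments because they are intended to be looked up through Salt's metaproxy
# loader and called on behalf of the ProxyMinion class; the exact binding
# mechanism lives in salt.minion and is assumed here rather than shown.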
import logging
import os
import signal
import sys
import threading
import traceback
import types

# pylint: disable=3rd-party-module-not-gated
import salt
import salt.beacons
import salt.cli.daemons
import salt.client
import salt.crypt
import salt.defaults.exitcodes
import salt.engines
import salt.ext.tornado.gen  # pylint: disable=F0401
import salt.ext.tornado.ioloop  # pylint: disable=F0401
import salt.ext.tornado.stack_context  # pylint: disable=F0401
import salt.loader
import salt.log.setup
import salt.minion
import salt.payload
import salt.pillar
import salt.serializers.msgpack
import salt.syspaths
import salt.utils.args
import salt.utils.context
import salt.utils.data
import salt.utils.dictupdate
import salt.utils.error
import salt.utils.event
import salt.utils.files
import salt.utils.jid
import salt.utils.minion
import salt.utils.minions
import salt.utils.network
import salt.utils.platform
import salt.utils.process
import salt.utils.schedule
import salt.utils.ssdp
import salt.utils.user
import salt.utils.zeromq
from salt.defaults import DEFAULT_TARGET_DELIM
from salt.exceptions import (
    CommandExecutionError,
    CommandNotFoundError,
    SaltInvocationError,
    SaltSystemExit,
)
from salt.ext import six
from salt.ext.six.moves import range
from salt.minion import ProxyMinion
from salt.utils.event import tagify
from salt.utils.process import SignalHandlingProcess, default_signals

log = logging.getLogger(__name__)


def post_master_init(self, master):
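    """
    Finish ProxyMinion setup once the master connection is established: compile
    pillar, load the configured proxymodule and the execution/returner modules,
    wire up the __proxy__/__salt__/__utils__ dunders, start engines, and
    register the mine, master_alive and proxy keepalive schedule jobs.
    """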
    log.debug("subclassed LazyLoaded _post_master_init")
    if self.connected:
        self.opts["master"] = master

        self.opts["pillar"] = yield salt.pillar.get_async_pillar(
            self.opts,
            self.opts["grains"],
            self.opts["id"],
            saltenv=self.opts["saltenv"],
            pillarenv=self.opts.get("pillarenv"),
        ).compile_pillar()

    if "proxy" not in self.opts["pillar"] and "proxy" not in self.opts:
        errmsg = (
            "No proxy key found in pillar or opts for id "
            + self.opts["id"]
            + ". "
            + "Check your pillar/opts configuration and contents.  Salt-proxy aborted."
        )
        log.error(errmsg)
        self._running = False
        raise SaltSystemExit(code=-1, msg=errmsg)

    if "proxy" not in self.opts:
        self.opts["proxy"] = self.opts["pillar"]["proxy"]

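    # When ``proxy_merge_pillar_in_opts`` is set, the whole pillar is merged
    # into opts below; strategy and list handling are configurable. A minimal
    # pillar sketch (values are illustrative only):
    #
    #   proxy_merge_pillar_in_opts: true
    #   proxy_merge_pillar_in_opts_strategy: smart
    #   proxy_deep_merge_pillar_in_opts: false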
    if self.opts.get("proxy_merge_pillar_in_opts"):
        # Override proxy opts with pillar data when the user requests it.
        self.opts = salt.utils.dictupdate.merge(
            self.opts,
            self.opts["pillar"],
            strategy=self.opts.get("proxy_merge_pillar_in_opts_strategy"),
            merge_lists=self.opts.get("proxy_deep_merge_pillar_in_opts", False),
        )
    elif self.opts.get("proxy_mines_pillar"):
        # Even when not required, some details such as mine configuration
        # should be merged anyway whenever possible.
        if "mine_interval" in self.opts["pillar"]:
            self.opts["mine_interval"] = self.opts["pillar"]["mine_interval"]
        if "mine_functions" in self.opts["pillar"]:
            general_proxy_mines = self.opts.get("mine_functions", [])
            specific_proxy_mines = self.opts["pillar"]["mine_functions"]
            try:
                self.opts["mine_functions"] = general_proxy_mines + specific_proxy_mines
            except TypeError:
                log.error(
                    "Unable to merge mine functions from the pillar into the opts for proxy {}".format(
                        self.opts["id"]
                    )
                )

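    # The proxy configuration must name the proxymodule to load via
    # ``proxytype``; any remaining keys are module specific. An illustrative
    # pillar sketch (keys other than ``proxytype`` are hypothetical):
    #
    #   proxy:
    #     proxytype: dummy
    #     host: device01.example.com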
    fq_proxyname = self.opts["proxy"]["proxytype"]

    # Need to load the modules so they get all the dunder variables
    (
        self.functions,
        self.returners,
        self.function_errors,
        self.executors,
    ) = self._load_modules()

    # we can then sync any proxymodules down from the master
    # we do a sync_all here in case proxy code was installed by
    # SPM or was manually placed in /srv/salt/_modules etc.
    self.functions["saltutil.sync_all"](saltenv=self.opts["saltenv"])

    # Pull in the utils
    self.utils = salt.loader.utils(self.opts)

    # Then load the proxy module
    self.proxy = salt.loader.proxy(self.opts, utils=self.utils)

    # And re-load the modules so the __proxy__ variable gets injected
    (
        self.functions,
        self.returners,
        self.function_errors,
        self.executors,
    ) = self._load_modules()
    self.functions.pack["__proxy__"] = self.proxy
    self.proxy.pack["__salt__"] = self.functions
    self.proxy.pack["__ret__"] = self.returners
    self.proxy.pack["__pillar__"] = self.opts["pillar"]

    # Reload utils as well (chicken and egg: __utils__ needs __proxy__ and __proxy__ needs __utils__)
    self.utils = salt.loader.utils(self.opts, proxy=self.proxy)
    self.proxy.pack["__utils__"] = self.utils

    # Reload all modules so all dunder variables are injected
    self.proxy.reload_modules()

    # Start engines here instead of in the Minion superclass __init__
    # This is because we need to inject the __proxy__ variable but
    # it is not set up until now.
    self.io_loop.spawn_callback(
        salt.engines.start_engines, self.opts, self.process_manager, proxy=self.proxy
    )

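    # The proxymodule must expose at least ``init()`` and ``shutdown()``; the
    # check below aborts otherwise. A minimal skeleton, assuming the usual
    # proxymodule layout (names besides init/shutdown are illustrative):
    #
    #   __proxyenabled__ = ["myproxy"]
    #
    #   def init(opts):
    #       ...  # open the connection to the managed device
    #
    #   def shutdown(opts):
    #       ...  # tear the connection down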
    if (
        "{}.init".format(fq_proxyname) not in self.proxy
        or "{}.shutdown".format(fq_proxyname) not in self.proxy
    ):
        errmsg = (
            "Proxymodule {} is missing an init() or a shutdown() or both. ".format(
                fq_proxyname
            )
            + "Check your proxymodule.  Salt-proxy aborted."
        )
        log.error(errmsg)
        self._running = False
        raise SaltSystemExit(code=-1, msg=errmsg)

    self.module_executors = self.proxy.get(
        "{}.module_executors".format(fq_proxyname), lambda: []
    )()
    proxy_init_fn = self.proxy[fq_proxyname + ".init"]
    proxy_init_fn(self.opts)

    self.opts["grains"] = salt.loader.grains(self.opts, proxy=self.proxy)

    self.serial = salt.payload.Serial(self.opts)
    self.mod_opts = self._prep_mod_opts()
    self.matchers = salt.loader.matchers(self.opts)
    self.beacons = salt.beacons.Beacon(self.opts, self.functions)
    uid = salt.utils.user.get_uid(user=self.opts.get("user", None))
    self.proc_dir = salt.minion.get_proc_dir(self.opts["cachedir"], uid=uid)

    if self.connected and self.opts["pillar"]:
        # The pillar has changed due to the connection to the master.
        # Reload the functions so that they can use the new pillar data.
        (
            self.functions,
            self.returners,
            self.function_errors,
            self.executors,
        ) = self._load_modules()
        if hasattr(self, "schedule"):
            self.schedule.functions = self.functions
            self.schedule.returners = self.returners

    if not hasattr(self, "schedule"):
        self.schedule = salt.utils.schedule.Schedule(
            self.opts,
            self.functions,
            self.returners,
            cleanup=[salt.minion.master_event(type="alive")],
            proxy=self.proxy,
        )

    # add default scheduling jobs to the minion's scheduler
    if self.opts["mine_enabled"] and "mine.update" in self.functions:
        self.schedule.add_job(
            {
                "__mine_interval": {
                    "function": "mine.update",
                    "minutes": self.opts["mine_interval"],
                    "jid_include": True,
                    "maxrunning": 2,
                    "run_on_start": True,
                    "return_job": self.opts.get("mine_return_job", False),
                }
            },
            persist=True,
        )
        log.info("Added mine.update to scheduler")
    else:
        self.schedule.delete_job("__mine_interval", persist=True)

    # add master_alive job if enabled
    if self.opts["transport"] != "tcp" and self.opts["master_alive_interval"] > 0:
        self.schedule.add_job(
            {
                salt.minion.master_event(type="alive", master=self.opts["master"]): {
                    "function": "status.master",
                    "seconds": self.opts["master_alive_interval"],
                    "jid_include": True,
                    "maxrunning": 1,
                    "return_job": False,
                    "kwargs": {"master": self.opts["master"], "connected": True},
                }
            },
            persist=True,
        )
        if (
            self.opts["master_failback"]
            and "master_list" in self.opts
            and self.opts["master"] != self.opts["master_list"][0]
        ):
            self.schedule.add_job(
                {
                    salt.minion.master_event(type="failback"): {
                        "function": "status.ping_master",
                        "seconds": self.opts["master_failback_interval"],
                        "jid_include": True,
                        "maxrunning": 1,
                        "return_job": False,
                        "kwargs": {"master": self.opts["master_list"][0]},
                    }
                },
                persist=True,
            )
        else:
            self.schedule.delete_job(
                salt.minion.master_event(type="failback"), persist=True
            )
    else:
        self.schedule.delete_job(
            salt.minion.master_event(type="alive", master=self.opts["master"]),
            persist=True,
        )
        self.schedule.delete_job(
            salt.minion.master_event(type="failback"), persist=True
        )

    # proxy keepalive
    proxy_alive_fn = fq_proxyname + ".alive"
    if (
        proxy_alive_fn in self.proxy
        and "status.proxy_reconnect" in self.functions
        and self.opts.get("proxy_keep_alive", True)
    ):
        # if `proxy_keep_alive` is not specified or is set to False, do not retry reconnecting
        self.schedule.add_job(
            {
                "__proxy_keepalive": {
                    "function": "status.proxy_reconnect",
                    "minutes": self.opts.get(
                        "proxy_keep_alive_interval", 1
                    ),  # by default, check once per minute
                    "jid_include": True,
                    "maxrunning": 1,
                    "return_job": False,
                    "kwargs": {"proxy_name": fq_proxyname},
                }
            },
            persist=True,
        )
        self.schedule.enable_schedule()
    else:
        self.schedule.delete_job("__proxy_keepalive", persist=True)

    #  Sync the grains here so the proxy can communicate them to the master
    self.functions["saltutil.sync_grains"](saltenv="base")
    self.grains_cache = self.opts["grains"]
    self.ready = True


def target(cls, minion_instance, opts, data, connected):
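    """
    Entry point for a published job running in its own process or thread:
    lazily build the ProxyMinion instance (modules, proxymodule, dunders) if
    needed, then dispatch to _thread_return or _thread_multi_return.
    """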

    if not minion_instance:
        minion_instance = cls(opts)
        minion_instance.connected = connected
        if not hasattr(minion_instance, "functions"):
            # Need to load the modules so they get all the dunder variables
            (
                functions,
                returners,
                function_errors,
                executors,
            ) = minion_instance._load_modules(grains=opts["grains"])
            minion_instance.functions = functions
            minion_instance.returners = returners
            minion_instance.function_errors = function_errors
            minion_instance.executors = executors

            # Pull in the utils
            minion_instance.utils = salt.loader.utils(minion_instance.opts)

            # Then load the proxy module
            minion_instance.proxy = salt.loader.proxy(
                minion_instance.opts, utils=minion_instance.utils
            )

            # And re-load the modules so the __proxy__ variable gets injected
            (
                functions,
                returners,
                function_errors,
                executors,
            ) = minion_instance._load_modules(grains=opts["grains"])
            minion_instance.functions = functions
            minion_instance.returners = returners
            minion_instance.function_errors = function_errors
            minion_instance.executors = executors

            minion_instance.functions.pack["__proxy__"] = minion_instance.proxy
            minion_instance.proxy.pack["__salt__"] = minion_instance.functions
            minion_instance.proxy.pack["__ret__"] = minion_instance.returners
            minion_instance.proxy.pack["__pillar__"] = minion_instance.opts["pillar"]

            # Reload utils as well (chicken and egg: __utils__ needs __proxy__ and __proxy__ needs __utils__)
            minion_instance.utils = salt.loader.utils(
                minion_instance.opts, proxy=minion_instance.proxy
            )
            minion_instance.proxy.pack["__utils__"] = minion_instance.utils

            # Reload all modules so all dunder variables are injected
            minion_instance.proxy.reload_modules()

            fq_proxyname = opts["proxy"]["proxytype"]

            minion_instance.module_executors = minion_instance.proxy.get(
                "{}.module_executors".format(fq_proxyname), lambda: []
            )()

            proxy_init_fn = minion_instance.proxy[fq_proxyname + ".init"]
            proxy_init_fn(opts)
        if not hasattr(minion_instance, "serial"):
            minion_instance.serial = salt.payload.Serial(opts)
        if not hasattr(minion_instance, "proc_dir"):
            uid = salt.utils.user.get_uid(user=opts.get("user", None))
            minion_instance.proc_dir = salt.minion.get_proc_dir(
                opts["cachedir"], uid=uid
            )

    with salt.ext.tornado.stack_context.StackContext(minion_instance.ctx):
        if isinstance(data["fun"], (tuple, list)):
            ProxyMinion._thread_multi_return(minion_instance, opts, data)
        else:
            ProxyMinion._thread_return(minion_instance, opts, data)


def thread_return(cls, minion_instance, opts, data):
    """
    This method should be used as a threading target; it starts the actual
    minion-side execution.
    """
    fn_ = os.path.join(minion_instance.proc_dir, data["jid"])

    salt.utils.process.appendproctitle(
        "{}._thread_return {}".format(cls.__name__, data["jid"])
    )

    sdata = {"pid": os.getpid()}
    sdata.update(data)
    log.info("Starting a new job with PID %s", sdata["pid"])
    with salt.utils.files.fopen(fn_, "w+b") as fp_:
        fp_.write(minion_instance.serial.dumps(sdata))
    ret = {"success": False}
    function_name = data["fun"]
    executors = (
        data.get("module_executors")
        or getattr(minion_instance, "module_executors", [])
        or opts.get("module_executors", ["direct_call"])
    )
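    # Executor resolution order: a per-job override in ``data``, then any
    # module_executors advertised by the proxymodule, then the minion config,
    # finally the plain ``direct_call`` executor.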
    allow_missing_funcs = any(
        [
            minion_instance.executors["{}.allow_missing_func".format(executor)](
                function_name
            )
            for executor in executors
            if "{}.allow_missing_func".format(executor) in minion_instance.executors
        ]
    )
    if function_name in minion_instance.functions or allow_missing_funcs is True:
        try:
            minion_blackout_violation = False
            if minion_instance.connected and minion_instance.opts["pillar"].get(
                "minion_blackout", False
            ):
                whitelist = minion_instance.opts["pillar"].get(
                    "minion_blackout_whitelist", []
                )
                # this minion is blacked out. Only allow saltutil.refresh_pillar and the whitelist
                if (
                    function_name != "saltutil.refresh_pillar"
                    and function_name not in whitelist
                ):
                    minion_blackout_violation = True
            # use minion_blackout_whitelist from grains if it exists
            if minion_instance.opts["grains"].get("minion_blackout", False):
                whitelist = minion_instance.opts["grains"].get(
                    "minion_blackout_whitelist", []
                )
                if (
                    function_name != "saltutil.refresh_pillar"
                    and function_name not in whitelist
                ):
                    minion_blackout_violation = True
            if minion_blackout_violation:
                raise SaltInvocationError(
                    "Minion in blackout mode. Set 'minion_blackout' "
                    "to False in pillar or grains to resume operations. Only "
                    "saltutil.refresh_pillar allowed in blackout mode."
                )

            if function_name in minion_instance.functions:
                func = minion_instance.functions[function_name]
                args, kwargs = salt.minion.load_args_and_kwargs(func, data["arg"], data)
            else:
                # only run if function_name is not in minion_instance.functions and allow_missing_funcs is True
                func = function_name
                args, kwargs = data["arg"], data
            minion_instance.functions.pack["__context__"]["retcode"] = 0
            if isinstance(executors, str):
                executors = [executors]
            elif not isinstance(executors, list) or not executors:
                raise SaltInvocationError(
                    "Wrong executors specification: {}. String or non-empty list expected".format(
                        executors
                    )
                )
            if opts.get("sudo_user", "") and executors[-1] != "sudo":
                executors[-1] = "sudo"  # replace the last one with sudo
            log.trace("Executors list %s", executors)  # pylint: disable=no-member

            for name in executors:
                fname = "{}.execute".format(name)
                if fname not in minion_instance.executors:
                    raise SaltInvocationError(
                        "Executor '{}' is not available".format(name)
                    )
                return_data = minion_instance.executors[fname](
                    opts, data, func, args, kwargs
                )
                if return_data is not None:
                    break

            if isinstance(return_data, types.GeneratorType):
                ind = 0
                iret = {}
                for single in return_data:
                    if isinstance(single, dict) and isinstance(iret, dict):
                        iret.update(single)
                    else:
                        if not iret:
                            iret = []
                        iret.append(single)
                    tag = tagify([data["jid"], "prog", opts["id"], str(ind)], "job")
                    event_data = {"return": single}
                    minion_instance._fire_master(event_data, tag)
                    ind += 1
                ret["return"] = iret
            else:
                ret["return"] = return_data

            retcode = minion_instance.functions.pack["__context__"].get(
                "retcode", salt.defaults.exitcodes.EX_OK
            )
            if retcode == salt.defaults.exitcodes.EX_OK:
                # No nonzero retcode in __context__ dunder. Check if return
                # is a dictionary with a "result" or "success" key.
                try:
                    func_result = all(
                        return_data.get(x, True) for x in ("result", "success")
                    )
                except Exception:  # pylint: disable=broad-except
                    # return data is not a dict
                    func_result = True
                if not func_result:
                    retcode = salt.defaults.exitcodes.EX_GENERIC

            ret["retcode"] = retcode
            ret["success"] = retcode == salt.defaults.exitcodes.EX_OK
        except CommandNotFoundError as exc:
            msg = "Command required for '{}' not found".format(function_name)
            log.debug(msg, exc_info=True)
            ret["return"] = "{}: {}".format(msg, exc)
            ret["out"] = "nested"
            ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
        except CommandExecutionError as exc:
            log.error(
                "A command in '%s' had a problem: %s",
                function_name,
                exc,
                exc_info_on_loglevel=logging.DEBUG,
            )
            ret["return"] = "ERROR: {}".format(exc)
            ret["out"] = "nested"
            ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
        except SaltInvocationError as exc:
            log.error(
                "Problem executing '%s': %s",
                function_name,
                exc,
                exc_info_on_loglevel=logging.DEBUG,
            )
            ret["return"] = "ERROR executing '{}': {}".format(function_name, exc)
            ret["out"] = "nested"
            ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
        except TypeError as exc:
            msg = "Passed invalid arguments to {}: {}\n{}".format(
                function_name, exc, func.__doc__ or ""
            )
            log.warning(msg, exc_info_on_loglevel=logging.DEBUG)
            ret["return"] = msg
            ret["out"] = "nested"
            ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
        except Exception:  # pylint: disable=broad-except
            msg = "The minion function caused an exception"
            log.warning(msg, exc_info=True)
            salt.utils.error.fire_exception(
                salt.exceptions.MinionError(msg), opts, job=data
            )
            ret["return"] = "{}: {}".format(msg, traceback.format_exc())
            ret["out"] = "nested"
            ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
    else:
        docs = minion_instance.functions["sys.doc"]("{}*".format(function_name))
        if docs:
            docs[function_name] = minion_instance.functions.missing_fun_string(
                function_name
            )
            ret["return"] = docs
        else:
            ret["return"] = minion_instance.functions.missing_fun_string(function_name)
            mod_name = function_name.split(".")[0]
            if mod_name in minion_instance.function_errors:
                ret["return"] += " Possible reasons: '{}'".format(
                    minion_instance.function_errors[mod_name]
                )
        ret["success"] = False
        ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
        ret["out"] = "nested"

    ret["jid"] = data["jid"]
    ret["fun"] = data["fun"]
    ret["fun_args"] = data["arg"]
    if "master_id" in data:
        ret["master_id"] = data["master_id"]
    if "metadata" in data:
        if isinstance(data["metadata"], dict):
            ret["metadata"] = data["metadata"]
        else:
            log.warning("The metadata parameter must be a dictionary. Ignoring.")
    if minion_instance.connected:
        minion_instance._return_pub(ret, timeout=minion_instance._return_retry_timer())

    # Add default returners from minion config
    # Should have been converted to a comma-delimited string already
    if isinstance(opts.get("return"), str):
        if data["ret"]:
            data["ret"] = ",".join((data["ret"], opts["return"]))
        else:
            data["ret"] = opts["return"]

    log.debug("minion return: %s", ret)
    # TODO: make a list? Seems odd to split it this late :/
    if data["ret"] and isinstance(data["ret"], str):
        if "ret_config" in data:
            ret["ret_config"] = data["ret_config"]
        if "ret_kwargs" in data:
            ret["ret_kwargs"] = data["ret_kwargs"]
        ret["id"] = opts["id"]
        for returner in set(data["ret"].split(",")):
            try:
                returner_str = "{}.returner".format(returner)
                if returner_str in minion_instance.returners:
                    minion_instance.returners[returner_str](ret)
                else:
                    returner_err = minion_instance.returners.missing_fun_string(
                        returner_str
                    )
                    log.error(
                        "Returner %s could not be loaded: %s",
                        returner_str,
                        returner_err,
                    )
            except Exception as exc:  # pylint: disable=broad-except
                log.exception("The return failed for job %s: %s", data["jid"], exc)


def thread_multi_return(cls, minion_instance, opts, data):
    """
    This method should be used as a threading target; it starts the actual
    minion-side execution.
    """
    fn_ = os.path.join(minion_instance.proc_dir, data["jid"])

    salt.utils.process.appendproctitle(
        "{}._thread_multi_return {}".format(cls.__name__, data["jid"])
    )

    sdata = {"pid": os.getpid()}
    sdata.update(data)
    log.info("Starting a new job with PID %s", sdata["pid"])
    with salt.utils.files.fopen(fn_, "w+b") as fp_:
        fp_.write(minion_instance.serial.dumps(sdata))

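    # With ``multifunc_ordered`` set, results line up positionally with the
    # published function list; otherwise they are keyed by function name.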
    multifunc_ordered = opts.get("multifunc_ordered", False)
    num_funcs = len(data["fun"])
    if multifunc_ordered:
        ret = {
            "return": [None] * num_funcs,
            "retcode": [None] * num_funcs,
            "success": [False] * num_funcs,
        }
    else:
        ret = {"return": {}, "retcode": {}, "success": {}}

    for ind in range(0, num_funcs):
        if not multifunc_ordered:
            ret["success"][data["fun"][ind]] = False
        try:
            minion_blackout_violation = False
            if minion_instance.connected and minion_instance.opts["pillar"].get(
                "minion_blackout", False
            ):
                whitelist = minion_instance.opts["pillar"].get(
                    "minion_blackout_whitelist", []
                )
                # this minion is blacked out. Only allow saltutil.refresh_pillar and the whitelist
                if (
                    data["fun"][ind] != "saltutil.refresh_pillar"
                    and data["fun"][ind] not in whitelist
                ):
                    minion_blackout_violation = True
            elif minion_instance.opts["grains"].get("minion_blackout", False):
                whitelist = minion_instance.opts["grains"].get(
                    "minion_blackout_whitelist", []
                )
                if (
                    data["fun"][ind] != "saltutil.refresh_pillar"
                    and data["fun"][ind] not in whitelist
                ):
                    minion_blackout_violation = True
            if minion_blackout_violation:
                raise SaltInvocationError(
                    "Minion in blackout mode. Set 'minion_blackout' "
                    "to False in pillar or grains to resume operations. Only "
                    "saltutil.refresh_pillar allowed in blackout mode."
                )

            func = minion_instance.functions[data["fun"][ind]]

            args, kwargs = salt.minion.load_args_and_kwargs(
                func, data["arg"][ind], data
            )
            minion_instance.functions.pack["__context__"]["retcode"] = 0
            key = ind if multifunc_ordered else data["fun"][ind]
            ret["return"][key] = func(*args, **kwargs)
            retcode = minion_instance.functions.pack["__context__"].get("retcode", 0)
            if retcode == 0:
                # No nonzero retcode in __context__ dunder. Check if return
                # is a dictionary with a "result" or "success" key.
                try:
                    func_result = all(
                        ret["return"][key].get(x, True) for x in ("result", "success")
                    )
                except Exception:  # pylint: disable=broad-except
                    # return data is not a dict
                    func_result = True
                if not func_result:
                    retcode = 1

            ret["retcode"][key] = retcode
            ret["success"][key] = retcode == 0
        except Exception as exc:  # pylint: disable=broad-except
            trb = traceback.format_exc()
            log.warning("The minion function caused an exception: %s", exc)
            if multifunc_ordered:
                ret["return"][ind] = trb
            else:
                ret["return"][data["fun"][ind]] = trb
        ret["jid"] = data["jid"]
        ret["fun"] = data["fun"]
        ret["fun_args"] = data["arg"]
    if "metadata" in data:
        ret["metadata"] = data["metadata"]
    if minion_instance.connected:
        minion_instance._return_pub(ret, timeout=minion_instance._return_retry_timer())
    if data["ret"]:
        if "ret_config" in data:
            ret["ret_config"] = data["ret_config"]
        if "ret_kwargs" in data:
            ret["ret_kwargs"] = data["ret_kwargs"]
        for returner in set(data["ret"].split(",")):
            ret["id"] = opts["id"]
            try:
                minion_instance.returners["{}.returner".format(returner)](ret)
            except Exception as exc:  # pylint: disable=broad-except
                log.error("The return failed for job %s: %s", data["jid"], exc)


def handle_payload(self, payload):
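    """
    Accept a payload from the master's publisher; if it is AES encrypted and
    its load targets this minion, hand it to _handle_decoded_payload.
    """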
    if payload is not None and payload["enc"] == "aes":
        if self._target_load(payload["load"]):

            self._handle_decoded_payload(payload["load"])
        elif self.opts["zmq_filtering"]:
            # In the filtering enabled case, we'd like to know when the minion sees something it shouldn't
            log.trace(
                "Broadcast message received not for this minion, Load: %s",
                payload["load"],
            )
    # If it's not AES, and thus has not been verified, we do nothing.
    # In the future, we could add support for some clearfuncs, but
    # the minion currently has no need.


def handle_decoded_payload(self, data):
    """
    Override this method if you wish to handle the decoded data
    differently.
    """
    # Ensure payload is unicode. Disregard failure to decode binary blobs.
    if six.PY2:
        data = salt.utils.data.decode(data, keep=True)
    if "user" in data:
        log.info(
            "User %s Executing command %s with jid %s",
            data["user"],
            data["fun"],
            data["jid"],
        )
    else:
        log.info("Executing command %s with jid %s", data["fun"], data["jid"])
    log.debug("Command details %s", data)

    # Don't duplicate jobs
    log.trace("Started JIDs: %s", self.jid_queue)
    if self.jid_queue is not None:
        if data["jid"] in self.jid_queue:
            return
        else:
            self.jid_queue.append(data["jid"])
            if len(self.jid_queue) > self.opts["minion_jid_queue_hwm"]:
                self.jid_queue.pop(0)

    if isinstance(data["fun"], str):
        if data["fun"] == "sys.reload_modules":
            (
                self.functions,
                self.returners,
                self.function_errors,
                self.executors,
            ) = self._load_modules()
            self.schedule.functions = self.functions
            self.schedule.returners = self.returners

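    # ``process_count_max`` (when greater than 0) caps how many jobs may run
    # concurrently; excess publications wait here, re-checking every 10 seconds.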
    process_count_max = self.opts.get("process_count_max")
    if process_count_max > 0:
        process_count = len(salt.utils.minion.running(self.opts))
        while process_count >= process_count_max:
            log.warning(
                "Maximum number of processes reached while executing jid {}, waiting...".format(
                    data["jid"]
                )
            )
            yield salt.ext.tornado.gen.sleep(10)
            process_count = len(salt.utils.minion.running(self.opts))

    # We stash an instance reference to allow for socket communication on
    # Windows. You can't pickle functions, so Python needs to be able to
    # reconstruct the reference on the other side.
    instance = self
    multiprocessing_enabled = self.opts.get("multiprocessing", True)
    if multiprocessing_enabled:
        if sys.platform.startswith("win"):
            # let Python reconstruct the minion on the other side if we're
            # running on Windows
            instance = None
        with default_signals(signal.SIGINT, signal.SIGTERM):
            process = SignalHandlingProcess(
                target=self._target,
                name="ProcessPayload",
                args=(instance, self.opts, data, self.connected),
            )
    else:
        process = threading.Thread(
            target=self._target,
            args=(instance, self.opts, data, self.connected),
            name=data["jid"],
        )

    if multiprocessing_enabled:
        with default_signals(signal.SIGINT, signal.SIGTERM):
            # Reset current signals before starting the process in
            # order not to inherit the current signal handlers
            process.start()
    else:
        process.start()
    process.name = "{}-Job-{}".format(process.name, data["jid"])
    self.subprocess_list.add(process)


def target_load(self, load):
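    """
    Return True if ``load`` is a well-formed publication whose target matches
    this minion, otherwise False.
    """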
    # Verify that the publication is valid
    if "tgt" not in load or "jid" not in load or "fun" not in load or "arg" not in load:
        return False
    # Verify that the publication applies to this minion

    # It's important to note that the master does some pre-processing
    # to determine which minions to send a request to. So for example,
    # a "salt -G 'grain_key:grain_val' test.ping" will invoke some
    # pre-processing on the master and this minion should not see the
    # publication if the master does not determine that it should.
    if "tgt_type" in load:
        match_func = self.matchers.get("{}_match.match".format(load["tgt_type"]), None)
        if match_func is None:
            return False
        if load["tgt_type"] in ("grain", "grain_pcre", "pillar"):
            delimiter = load.get("delimiter", DEFAULT_TARGET_DELIM)
            if not match_func(load["tgt"], delimiter=delimiter):
                return False
        elif not match_func(load["tgt"]):
            return False
    else:
        if not self.matchers["glob_match.match"](load["tgt"]):
            return False

    return True


# Main Minion Tune In
def tune_in(self, start=True):
    """
    Lock onto the publisher. This is the main event loop for the minion.
    :rtype: None
    """
    super(ProxyMinion, self).tune_in(start=start)