"Fossies" - the Fresh Open Source Software Archive

Member "salt-3002.2/salt/client/__init__.py" (18 Nov 2020, 77650 Bytes) of package /linux/misc/salt-3002.2.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "__init__.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 3002.1_vs_3002.2.

    1 """
    2 The client module is used to create a client connection to the publisher
    3 The data structure needs to be:
    4     {'enc': 'clear',
    5      'load': {'fun': '<mod.callable>',
    6               'arg': ('arg1', 'arg2', ...),
    7               'tgt': '<glob or id>',
    8               'key': '<read in the key file>'}
    9 """
   10 
   11 
   12 # pylint: disable=import-error
   13 
   14 # Try to import range from https://github.com/ytoolshed/range
   15 #
   16 
   17 import logging
   18 
   19 # The components here are simple, and they need to be and stay simple, we
   20 # want a client to have 3 external concerns, and maybe a fourth configurable
   21 # option.
   22 # The concerns are:
   23 # 1. Who executes the command?
   24 # 2. What is the function being run?
   25 # 3. What arguments need to be passed to the function?
   26 # 4. How long do we wait for all of the replies?
   27 import os
   28 import random
   29 import sys
   30 import time
   31 from datetime import datetime
   32 
   33 import salt.cache
   34 import salt.config
   35 import salt.defaults.exitcodes
   36 
   37 # Import tornado
   38 import salt.ext.tornado.gen  # pylint: disable=F0401
   39 import salt.loader
   40 import salt.payload
   41 import salt.syspaths as syspaths
   42 import salt.transport.client
   43 import salt.utils.args
   44 import salt.utils.event
   45 import salt.utils.files
   46 import salt.utils.jid
   47 import salt.utils.minions
   48 import salt.utils.platform
   49 import salt.utils.stringutils
   50 import salt.utils.user
   51 import salt.utils.verify
   52 import salt.utils.zeromq
   53 from salt.exceptions import (
   54     AuthenticationError,
   55     AuthorizationError,
   56     EauthAuthenticationError,
   57     PublishError,
   58     SaltClientError,
   59     SaltInvocationError,
   60     SaltReqTimeoutError,
   61 )
   62 from salt.ext import six
   63 
   64 HAS_RANGE = False
   65 try:
   66     import seco.range
   67 
   68     HAS_RANGE = True
   69 except ImportError:
   70     pass
   71 # pylint: enable=import-error
   72 
   73 
   74 log = logging.getLogger(__name__)
   75 
   76 
   77 def get_local_client(
   78     c_path=os.path.join(syspaths.CONFIG_DIR, "master"),
   79     mopts=None,
   80     skip_perm_errors=False,
   81     io_loop=None,
   82     auto_reconnect=False,
   83 ):
   84     """
   85     .. versionadded:: 2014.7.0
   86 
   87     Read in the config and return the correct LocalClient object based on
   88     the configured transport
   89 
   90     :param IOLoop io_loop: io_loop used for events.
   91                            Pass in an io_loop if you want asynchronous
   92                            operation for obtaining events. Eg use of
   93                            set_event_handler() API. Otherwise, operation
   94                            will be synchronous.
   95     """
   96     if mopts:
   97         opts = mopts
   98     else:
   99         # Late import to prevent circular import
  100         import salt.config
  101 
  102         opts = salt.config.client_config(c_path)
  103 
  104     # TODO: AIO core is separate from transport
  105     return LocalClient(
  106         mopts=opts,
  107         skip_perm_errors=skip_perm_errors,
  108         io_loop=io_loop,
  109         auto_reconnect=auto_reconnect,
  110     )
  111 
  112 
  113 class LocalClient:
  114     """
  115     The interface used by the :command:`salt` CLI tool on the Salt Master
  116 
  117     ``LocalClient`` is used to send a command to Salt minions to execute
  118     :ref:`execution modules <all-salt.modules>` and return the results to the
  119     Salt Master.
  120 
  121     Importing and using ``LocalClient`` must be done on the same machine as the
  122     Salt Master and it must be done using the same user that the Salt Master is
  123     running as. (Unless :conf_master:`external_auth` is configured and
  124     authentication credentials are included in the execution).
  125 
  126     .. note::
  127         The LocalClient uses a Tornado IOLoop, this can create issues when
  128         using the LocalClient inside an existing IOLoop. If creating the
  129         LocalClient in partnership with another IOLoop either create the
  130         IOLoop before creating the LocalClient, or when creating the IOLoop
  131         use ioloop.current() which will return the ioloop created by
  132         LocalClient.
  133 
  134     .. code-block:: python
  135 
  136         import salt.client
  137 
  138         local = salt.client.LocalClient()
  139         local.cmd('*', 'test.fib', [10])
  140     """
  141 
  142     def __init__(
  143         self,
  144         c_path=os.path.join(syspaths.CONFIG_DIR, "master"),
  145         mopts=None,
  146         skip_perm_errors=False,
  147         io_loop=None,
  148         keep_loop=False,
  149         auto_reconnect=False,
  150     ):
  151         """
  152         :param IOLoop io_loop: io_loop used for events.
  153                                Pass in an io_loop if you want asynchronous
  154                                operation for obtaining events. Eg use of
  155                                set_event_handler() API. Otherwise,
  156                                operation will be synchronous.
  157         """
  158         if mopts:
  159             self.opts = mopts
  160         else:
  161             if os.path.isdir(c_path):
  162                 log.warning(
  163                     "%s expects a file path not a directory path(%s) to "
  164                     "its 'c_path' keyword argument",
  165                     self.__class__.__name__,
  166                     c_path,
  167                 )
  168             self.opts = salt.config.client_config(c_path)
  169         self.serial = salt.payload.Serial(self.opts)
  170         self.salt_user = salt.utils.user.get_specific_user()
  171         self.skip_perm_errors = skip_perm_errors
  172         self.key = self.__read_master_key()
  173         self.auto_reconnect = auto_reconnect
  174         self.event = salt.utils.event.get_event(
  175             "master",
  176             self.opts["sock_dir"],
  177             self.opts["transport"],
  178             opts=self.opts,
  179             listen=False,
  180             io_loop=io_loop,
  181             keep_loop=keep_loop,
  182         )
  183         self.utils = salt.loader.utils(self.opts)
  184         self.functions = salt.loader.minion_mods(self.opts, utils=self.utils)
  185         self.returners = salt.loader.returners(self.opts, self.functions)
  186 
  187     def __read_master_key(self):
  188         """
  189         Read in the rotating master authentication key
  190         """
  191         key_user = self.salt_user
  192         if key_user == "root":
  193             if self.opts.get("user", "root") != "root":
  194                 key_user = self.opts.get("user", "root")
  195         if key_user.startswith("sudo_"):
  196             key_user = self.opts.get("user", "root")
  197         if salt.utils.platform.is_windows():
  198             # The username may contain '\' if it is in Windows
  199             # 'DOMAIN\username' format. Fix this for the keyfile path.
  200             key_user = key_user.replace("\\", "_")
  201         keyfile = os.path.join(self.opts["cachedir"], ".{}_key".format(key_user))
  202         try:
  203             # Make sure all key parent directories are accessible
  204             salt.utils.verify.check_path_traversal(
  205                 self.opts["cachedir"], key_user, self.skip_perm_errors
  206             )
  207             with salt.utils.files.fopen(keyfile, "r") as key:
  208                 return salt.utils.stringutils.to_unicode(key.read())
  209         except (OSError, SaltClientError):
  210             # Fall back to eauth
  211             return ""
  212 
  213     def _convert_range_to_list(self, tgt):
  214         """
  215         convert a seco.range range into a list target
  216         """
  217         range_ = seco.range.Range(self.opts["range_server"])
  218         try:
  219             return range_.expand(tgt)
  220         except seco.range.RangeException as err:
  221             print("Range server exception: {}".format(err))
  222             return []
  223 
  224     def _get_timeout(self, timeout):
  225         """
  226         Return the timeout to use
  227         """
  228         if timeout is None:
  229             return self.opts["timeout"]
  230         if isinstance(timeout, int):
  231             return timeout
  232         if isinstance(timeout, str):
  233             try:
  234                 return int(timeout)
  235             except ValueError:
  236                 return self.opts["timeout"]
  237         # Looks like the timeout is invalid, use config
  238         return self.opts["timeout"]
  239 
  240     def gather_job_info(self, jid, tgt, tgt_type, listen=True, **kwargs):
  241         """
  242         Return the information about a given job
  243         """
  244         log.debug("Checking whether jid %s is still running", jid)
  245         timeout = int(kwargs.get("gather_job_timeout", self.opts["gather_job_timeout"]))
  246 
  247         pub_data = self.run_job(
  248             tgt,
  249             "saltutil.find_job",
  250             arg=[jid],
  251             tgt_type=tgt_type,
  252             timeout=timeout,
  253             listen=listen,
  254             **kwargs
  255         )
  256 
  257         if "jid" in pub_data:
  258             self.event.subscribe(pub_data["jid"])
  259 
  260         return pub_data
  261 
  262     def _check_pub_data(self, pub_data, listen=True):
  263         """
  264         Common checks on the pub_data data structure returned from running pub
  265         """
  266         if pub_data == "":
  267             # Failed to authenticate, this could be a bunch of things
  268             raise EauthAuthenticationError(
  269                 "Failed to authenticate! This is most likely because this "
  270                 "user is not permitted to execute commands, but there is a "
  271                 "small possibility that a disk error occurred (check "
  272                 "disk/inode usage)."
  273             )
  274 
  275         # Failed to connect to the master and send the pub
  276         if "error" in pub_data:
  277             print(pub_data["error"])
  278             log.debug("_check_pub_data() error: %s", pub_data["error"])
  279             return {}
  280         elif "jid" not in pub_data:
  281             return {}
  282         if pub_data["jid"] == "0":
  283             print("Failed to connect to the Master, " "is the Salt Master running?")
  284             return {}
  285 
  286         # If we order masters (via a syndic), don't short circuit if no minions
  287         # are found
  288         if not self.opts.get("order_masters"):
  289             # Check for no minions
  290             if not pub_data["minions"]:
  291                 print(
  292                     "No minions matched the target. "
  293                     "No command was sent, no jid was assigned."
  294                 )
  295                 return {}
  296 
  297         # don't install event subscription listeners when the request is asynchronous
  298         # and doesn't care. this is important as it will create event leaks otherwise
  299         if not listen:
  300             return pub_data
  301 
  302         if self.opts.get("order_masters"):
  303             self.event.subscribe("syndic/.*/{}".format(pub_data["jid"]), "regex")
  304 
  305         self.event.subscribe("salt/job/{}".format(pub_data["jid"]))
  306 
  307         return pub_data
  308 
  309     def run_job(
  310         self,
  311         tgt,
  312         fun,
  313         arg=(),
  314         tgt_type="glob",
  315         ret="",
  316         timeout=None,
  317         jid="",
  318         kwarg=None,
  319         listen=False,
  320         **kwargs
  321     ):
  322         """
  323         Asynchronously send a command to connected minions
  324 
  325         Prep the job directory and publish a command to any targeted minions.
  326 
  327         :return: A dictionary of (validated) ``pub_data`` or an empty
  328             dictionary on failure. The ``pub_data`` contains the job ID and a
  329             list of all minions that are expected to return data.
  330 
  331         .. code-block:: python
  332 
  333             >>> local.run_job('*', 'test.sleep', [300])
  334             {'jid': '20131219215650131543', 'minions': ['jerry']}
  335         """
  336         arg = salt.utils.args.condition_input(arg, kwarg)
  337 
  338         try:
  339             pub_data = self.pub(
  340                 tgt,
  341                 fun,
  342                 arg,
  343                 tgt_type,
  344                 ret,
  345                 jid=jid,
  346                 timeout=self._get_timeout(timeout),
  347                 listen=listen,
  348                 **kwargs
  349             )
  350         except SaltClientError:
  351             # Re-raise error with specific message
  352             raise SaltClientError(
  353                 "The salt master could not be contacted. Is master running?"
  354             )
  355         except AuthenticationError as err:
  356             raise
  357         except AuthorizationError as err:
  358             raise
  359         except Exception as general_exception:  # pylint: disable=broad-except
  360             # Convert to generic client error and pass along message
  361             raise SaltClientError(general_exception)
  362 
  363         return self._check_pub_data(pub_data, listen=listen)
  364 
  365     def gather_minions(self, tgt, expr_form):
  366         _res = salt.utils.minions.CkMinions(self.opts).check_minions(
  367             tgt, tgt_type=expr_form
  368         )
  369         return _res["minions"]
  370 
  371     @salt.ext.tornado.gen.coroutine
  372     def run_job_async(
  373         self,
  374         tgt,
  375         fun,
  376         arg=(),
  377         tgt_type="glob",
  378         ret="",
  379         timeout=None,
  380         jid="",
  381         kwarg=None,
  382         listen=True,
  383         io_loop=None,
  384         **kwargs
  385     ):
  386         """
  387         Asynchronously send a command to connected minions
  388 
  389         Prep the job directory and publish a command to any targeted minions.
  390 
  391         :return: A dictionary of (validated) ``pub_data`` or an empty
  392             dictionary on failure. The ``pub_data`` contains the job ID and a
  393             list of all minions that are expected to return data.
  394 
  395         .. code-block:: python
  396 
  397             >>> local.run_job_async('*', 'test.sleep', [300])
  398             {'jid': '20131219215650131543', 'minions': ['jerry']}
  399         """
  400         arg = salt.utils.args.condition_input(arg, kwarg)
  401 
  402         try:
  403             pub_data = yield self.pub_async(
  404                 tgt,
  405                 fun,
  406                 arg,
  407                 tgt_type,
  408                 ret,
  409                 jid=jid,
  410                 timeout=self._get_timeout(timeout),
  411                 io_loop=io_loop,
  412                 listen=listen,
  413                 **kwargs
  414             )
  415         except SaltClientError:
  416             # Re-raise error with specific message
  417             raise SaltClientError(
  418                 "The salt master could not be contacted. Is master running?"
  419             )
  420         except AuthenticationError as err:
  421             raise AuthenticationError(err)
  422         except AuthorizationError as err:
  423             raise AuthorizationError(err)
  424         except Exception as general_exception:  # pylint: disable=broad-except
  425             # Convert to generic client error and pass along message
  426             raise SaltClientError(general_exception)
  427 
  428         raise salt.ext.tornado.gen.Return(self._check_pub_data(pub_data, listen=listen))
  429 
  430     def cmd_async(
  431         self, tgt, fun, arg=(), tgt_type="glob", ret="", jid="", kwarg=None, **kwargs
  432     ):
  433         """
  434         Asynchronously send a command to connected minions
  435 
  436         The function signature is the same as :py:meth:`cmd` with the
  437         following exceptions.
  438 
  439         :returns: A job ID or 0 on failure.
  440 
  441         .. code-block:: python
  442 
  443             >>> local.cmd_async('*', 'test.sleep', [300])
  444             '20131219215921857715'
  445         """
  446         pub_data = self.run_job(
  447             tgt, fun, arg, tgt_type, ret, jid=jid, kwarg=kwarg, listen=False, **kwargs
  448         )
  449         try:
  450             return pub_data["jid"]
  451         except KeyError:
  452             return 0
  453 
  454     def cmd_subset(
  455         self,
  456         tgt,
  457         fun,
  458         arg=(),
  459         tgt_type="glob",
  460         ret="",
  461         kwarg=None,
  462         subset=3,
  463         cli=False,
  464         progress=False,
  465         full_return=False,
  466         **kwargs
  467     ):
  468         """
  469         Execute a command on a random subset of the targeted systems
  470 
  471         The function signature is the same as :py:meth:`cmd` with the
  472         following exceptions.
  473 
  474         :param subset: The number of systems to execute on
  475         :param cli: When this is set to True, a generator is returned,
  476                     otherwise a dictionary of the minion returns is returned
  477 
  478         .. code-block:: python
  479 
  480             >>> SLC.cmd_subset('*', 'test.ping', subset=1)
  481             {'jerry': True}
  482         """
  483         minion_ret = self.cmd(tgt, "sys.list_functions", tgt_type=tgt_type, **kwargs)
  484         minions = list(minion_ret)
  485         random.shuffle(minions)
  486         f_tgt = []
  487         for minion in minions:
  488             if fun in minion_ret[minion]:
  489                 f_tgt.append(minion)
  490             if len(f_tgt) >= subset:
  491                 break
  492         func = self.cmd
  493         if cli:
  494             func = self.cmd_cli
  495         return func(
  496             f_tgt,
  497             fun,
  498             arg,
  499             tgt_type="list",
  500             ret=ret,
  501             kwarg=kwarg,
  502             progress=progress,
  503             full_return=full_return,
  504             **kwargs
  505         )
  506 
  507     def cmd_batch(
  508         self,
  509         tgt,
  510         fun,
  511         arg=(),
  512         tgt_type="glob",
  513         ret="",
  514         kwarg=None,
  515         batch="10%",
  516         **kwargs
  517     ):
  518         """
  519         Iteratively execute a command on subsets of minions at a time
  520 
  521         The function signature is the same as :py:meth:`cmd` with the
  522         following exceptions.
  523 
  524         :param batch: The batch identifier of systems to execute on
  525 
  526         :returns: A generator of minion returns
  527 
  528         .. code-block:: python
  529 
  530             >>> returns = local.cmd_batch('*', 'state.highstate', batch='10%')
  531             >>> for ret in returns:
  532             ...     print(ret)
  533             {'jerry': {...}}
  534             {'dave': {...}}
  535             {'stewart': {...}}
  536         """
  537         # We need to re-import salt.utils.args here
  538         # even though it has already been imported.
  539         # when cmd_batch is called via the NetAPI
  540         # the module is unavailable.
  541         import salt.utils.args
  542 
  543         # Late import - not used anywhere else in this file
  544         import salt.cli.batch
  545 
  546         arg = salt.utils.args.condition_input(arg, kwarg)
  547         opts = {
  548             "tgt": tgt,
  549             "fun": fun,
  550             "arg": arg,
  551             "tgt_type": tgt_type,
  552             "ret": ret,
  553             "batch": batch,
  554             "failhard": kwargs.get("failhard", self.opts.get("failhard", False)),
  555             "raw": kwargs.get("raw", False),
  556         }
  557 
  558         if "timeout" in kwargs:
  559             opts["timeout"] = kwargs["timeout"]
  560         if "gather_job_timeout" in kwargs:
  561             opts["gather_job_timeout"] = kwargs["gather_job_timeout"]
  562         if "batch_wait" in kwargs:
  563             opts["batch_wait"] = int(kwargs["batch_wait"])
  564 
  565         eauth = {}
  566         if "eauth" in kwargs:
  567             eauth["eauth"] = kwargs.pop("eauth")
  568         if "username" in kwargs:
  569             eauth["username"] = kwargs.pop("username")
  570         if "password" in kwargs:
  571             eauth["password"] = kwargs.pop("password")
  572         if "token" in kwargs:
  573             eauth["token"] = kwargs.pop("token")
  574 
  575         for key, val in self.opts.items():
  576             if key not in opts:
  577                 opts[key] = val
  578         batch = salt.cli.batch.Batch(opts, eauth=eauth, quiet=True)
  579         for ret in batch.run():
  580             yield ret
  581 
  582     def cmd(
  583         self,
  584         tgt,
  585         fun,
  586         arg=(),
  587         timeout=None,
  588         tgt_type="glob",
  589         ret="",
  590         jid="",
  591         full_return=False,
  592         kwarg=None,
  593         **kwargs
  594     ):
  595         """
  596         Synchronously execute a command on targeted minions
  597 
  598         The cmd method will execute and wait for the timeout period for all
  599         minions to reply, then it will return all minion data at once.
  600 
  601         .. code-block:: python
  602 
  603             >>> import salt.client
  604             >>> local = salt.client.LocalClient()
  605             >>> local.cmd('*', 'cmd.run', ['whoami'])
  606             {'jerry': 'root'}
  607 
  608         With extra keyword arguments for the command function to be run:
  609 
  610         .. code-block:: python
  611 
  612             local.cmd('*', 'test.arg', ['arg1', 'arg2'], kwarg={'foo': 'bar'})
  613 
  614         Compound commands can be used for multiple executions in a single
  615         publish. Function names and function arguments are provided in separate
  616         lists but the index values must correlate and an empty list must be
  617         used if no arguments are required.
  618 
  619         .. code-block:: python
  620 
  621             >>> local.cmd('*', [
  622                     'grains.items',
  623                     'sys.doc',
  624                     'cmd.run',
  625                 ],
  626                 [
  627                     [],
  628                     [],
  629                     ['uptime'],
  630                 ])
  631 
  632         :param tgt: Which minions to target for the execution. Default is shell
  633             glob. Modified by the ``tgt_type`` option.
  634         :type tgt: string or list
  635 
  636         :param fun: The module and function to call on the specified minions of
  637             the form ``module.function``. For example ``test.ping`` or
  638             ``grains.items``.
  639 
  640             Compound commands
  641                 Multiple functions may be called in a single publish by
  642                 passing a list of commands. This can dramatically lower
  643                 overhead and speed up the application communicating with Salt.
  644 
  645                 This requires that the ``arg`` param is a list of lists. The
  646                 ``fun`` list and the ``arg`` list must correlate by index
  647                 meaning a function that does not take arguments must still have
  648                 a corresponding empty list at the expected index.
  649         :type fun: string or list of strings
  650 
  651         :param arg: A list of arguments to pass to the remote function. If the
  652             function takes no arguments ``arg`` may be omitted except when
  653             executing a compound command.
  654         :type arg: list or list-of-lists
  655 
  656         :param timeout: Seconds to wait after the last minion returns but
  657             before all minions return.
  658 
  659         :param tgt_type: The type of ``tgt``. Allowed values:
  660 
  661             * ``glob`` - Bash glob completion - Default
  662             * ``pcre`` - Perl style regular expression
  663             * ``list`` - Python list of hosts
  664             * ``grain`` - Match based on a grain comparison
  665             * ``grain_pcre`` - Grain comparison with a regex
  666             * ``pillar`` - Pillar data comparison
  667             * ``pillar_pcre`` - Pillar data comparison with a regex
  668             * ``nodegroup`` - Match on nodegroup
  669             * ``range`` - Use a Range server for matching
  670             * ``compound`` - Pass a compound match string
  671             * ``ipcidr`` - Match based on Subnet (CIDR notation) or IPv4 address.
  672 
  673             .. versionchanged:: 2017.7.0
  674                 Renamed from ``expr_form`` to ``tgt_type``
  675 
  676         :param ret: The returner to use. The value passed can be single
  677             returner, or a comma delimited list of returners to call in order
  678             on the minions
  679 
  680         :param kwarg: A dictionary with keyword arguments for the function.
  681 
  682         :param full_return: Output the job return only (default) or the full
  683             return including exit code and other job metadata.
  684 
  685         :param kwargs: Optional keyword arguments.
  686             Authentication credentials may be passed when using
  687             :conf_master:`external_auth`.
  688 
  689             For example: ``local.cmd('*', 'test.ping', username='saltdev',
  690             password='saltdev', eauth='pam')``.
  691             Or: ``local.cmd('*', 'test.ping',
  692             token='5871821ea51754fdcea8153c1c745433')``
  693 
  694         :returns: A dictionary with the result of the execution, keyed by
  695             minion ID. A compound command will return a sub-dictionary keyed by
  696             function name.
  697         """
  698         was_listening = self.event.cpub
  699 
  700         try:
  701             pub_data = self.run_job(
  702                 tgt,
  703                 fun,
  704                 arg,
  705                 tgt_type,
  706                 ret,
  707                 timeout,
  708                 jid,
  709                 kwarg=kwarg,
  710                 listen=True,
  711                 **kwargs
  712             )
  713 
  714             if not pub_data:
  715                 return pub_data
  716 
  717             ret = {}
  718             for fn_ret in self.get_cli_event_returns(
  719                 pub_data["jid"],
  720                 pub_data["minions"],
  721                 self._get_timeout(timeout),
  722                 tgt,
  723                 tgt_type,
  724                 **kwargs
  725             ):
  726 
  727                 if fn_ret:
  728                     for mid, data in fn_ret.items():
  729                         ret[mid] = data if full_return else data.get("ret", {})
  730 
  731             for failed in list(set(pub_data["minions"]) - set(ret)):
  732                 ret[failed] = False
  733             return ret
  734         finally:
  735             if not was_listening:
  736                 self.event.close_pub()
  737 
  738     def cmd_cli(
  739         self,
  740         tgt,
  741         fun,
  742         arg=(),
  743         timeout=None,
  744         tgt_type="glob",
  745         ret="",
  746         verbose=False,
  747         kwarg=None,
  748         progress=False,
  749         **kwargs
  750     ):
  751         """
  752         Used by the :command:`salt` CLI. This method returns minion returns as
  753         they come back and attempts to block until all minions return.
  754 
  755         The function signature is the same as :py:meth:`cmd` with the
  756         following exceptions.
  757 
  758         :param verbose: Print extra information about the running command
  759         :returns: A generator
  760         """
  761         was_listening = self.event.cpub
  762 
  763         try:
  764             self.pub_data = self.run_job(
  765                 tgt,
  766                 fun,
  767                 arg,
  768                 tgt_type,
  769                 ret,
  770                 timeout,
  771                 kwarg=kwarg,
  772                 listen=True,
  773                 **kwargs
  774             )
  775 
  776             if not self.pub_data:
  777                 yield self.pub_data
  778             else:
  779                 try:
  780                     for fn_ret in self.get_cli_event_returns(
  781                         self.pub_data["jid"],
  782                         self.pub_data["minions"],
  783                         self._get_timeout(timeout),
  784                         tgt,
  785                         tgt_type,
  786                         verbose,
  787                         progress,
  788                         **kwargs
  789                     ):
  790 
  791                         if not fn_ret:
  792                             continue
  793 
  794                         yield fn_ret
  795                 except KeyboardInterrupt:
  796                     raise SystemExit(
  797                         "\n"
  798                         "This job's jid is: {0}\n"
  799                         "Exiting gracefully on Ctrl-c\n"
  800                         "The minions may not have all finished running and any "
  801                         "remaining minions will return upon completion. To look "
  802                         "up the return data for this job later, run the following "
  803                         "command:\n\n"
  804                         "salt-run jobs.lookup_jid {0}".format(self.pub_data["jid"])
  805                     )
  806         finally:
  807             if not was_listening:
  808                 self.event.close_pub()
  809 
  810     def cmd_iter(
  811         self,
  812         tgt,
  813         fun,
  814         arg=(),
  815         timeout=None,
  816         tgt_type="glob",
  817         ret="",
  818         kwarg=None,
  819         **kwargs
  820     ):
  821         """
  822         Yields the individual minion returns as they come in
  823 
  824         The function signature is the same as :py:meth:`cmd` with the
  825         following exceptions.
  826 
  827         Normally :py:meth:`cmd_iter` does not yield results for minions that
  828         are not connected. If you want it to return results for disconnected
  829         minions set `expect_minions=True` in `kwargs`.
  830 
  831         :return: A generator yielding the individual minion returns
  832 
  833         .. code-block:: python
  834 
  835             >>> ret = local.cmd_iter('*', 'test.ping')
  836             >>> for i in ret:
  837             ...     print(i)
  838             {'jerry': {'ret': True}}
  839             {'dave': {'ret': True}}
  840             {'stewart': {'ret': True}}
  841         """
  842         was_listening = self.event.cpub
  843 
  844         try:
  845             pub_data = self.run_job(
  846                 tgt,
  847                 fun,
  848                 arg,
  849                 tgt_type,
  850                 ret,
  851                 timeout,
  852                 kwarg=kwarg,
  853                 listen=True,
  854                 **kwargs
  855             )
  856 
  857             if not pub_data:
  858                 yield pub_data
  859             else:
  860                 if kwargs.get("yield_pub_data"):
  861                     yield pub_data
  862                 for fn_ret in self.get_iter_returns(
  863                     pub_data["jid"],
  864                     pub_data["minions"],
  865                     timeout=self._get_timeout(timeout),
  866                     tgt=tgt,
  867                     tgt_type=tgt_type,
  868                     **kwargs
  869                 ):
  870                     if not fn_ret:
  871                         continue
  872                     yield fn_ret
  873                 self._clean_up_subscriptions(pub_data["jid"])
  874         finally:
  875             if not was_listening:
  876                 self.event.close_pub()
  877 
  878     def cmd_iter_no_block(
  879         self,
  880         tgt,
  881         fun,
  882         arg=(),
  883         timeout=None,
  884         tgt_type="glob",
  885         ret="",
  886         kwarg=None,
  887         show_jid=False,
  888         verbose=False,
  889         **kwargs
  890     ):
  891         """
  892         Yields the individual minion returns as they come in, or None
  893             when no returns are available.
  894 
  895         The function signature is the same as :py:meth:`cmd` with the
  896         following exceptions.
  897 
  898         :returns: A generator yielding the individual minion returns, or None
  899             when no returns are available. This allows for actions to be
  900             injected in between minion returns.
  901 
  902         .. code-block:: python
  903 
  904             >>> ret = local.cmd_iter_no_block('*', 'test.ping')
  905             >>> for i in ret:
  906             ...     print(i)
  907             None
  908             {'jerry': {'ret': True}}
  909             {'dave': {'ret': True}}
  910             None
  911             {'stewart': {'ret': True}}
  912         """
  913         was_listening = self.event.cpub
  914 
  915         try:
  916             pub_data = self.run_job(
  917                 tgt,
  918                 fun,
  919                 arg,
  920                 tgt_type,
  921                 ret,
  922                 timeout,
  923                 kwarg=kwarg,
  924                 listen=True,
  925                 **kwargs
  926             )
  927 
  928             if not pub_data:
  929                 yield pub_data
  930             else:
  931                 for fn_ret in self.get_iter_returns(
  932                     pub_data["jid"],
  933                     pub_data["minions"],
  934                     timeout=timeout,
  935                     tgt=tgt,
  936                     tgt_type=tgt_type,
  937                     block=False,
  938                     **kwargs
  939                 ):
  940                     if fn_ret and any([show_jid, verbose]):
  941                         for minion in fn_ret:
  942                             fn_ret[minion]["jid"] = pub_data["jid"]
  943                     yield fn_ret
  944 
  945                 self._clean_up_subscriptions(pub_data["jid"])
  946         finally:
  947             if not was_listening:
  948                 self.event.close_pub()
  949 
  950     def cmd_full_return(
  951         self,
  952         tgt,
  953         fun,
  954         arg=(),
  955         timeout=None,
  956         tgt_type="glob",
  957         ret="",
  958         verbose=False,
  959         kwarg=None,
  960         **kwargs
  961     ):
  962         """
  963         Execute a salt command and return
  964         """
  965         was_listening = self.event.cpub
  966 
  967         try:
  968             pub_data = self.run_job(
  969                 tgt,
  970                 fun,
  971                 arg,
  972                 tgt_type,
  973                 ret,
  974                 timeout,
  975                 kwarg=kwarg,
  976                 listen=True,
  977                 **kwargs
  978             )
  979 
  980             if not pub_data:
  981                 return pub_data
  982 
  983             return self.get_cli_static_event_returns(
  984                 pub_data["jid"], pub_data["minions"], timeout, tgt, tgt_type, verbose
  985             )
  986         finally:
  987             if not was_listening:
  988                 self.event.close_pub()
  989 
  990     def get_cli_returns(
  991         self,
  992         jid,
  993         minions,
  994         timeout=None,
  995         tgt="*",
  996         tgt_type="glob",
  997         verbose=False,
  998         show_jid=False,
  999         **kwargs
 1000     ):
 1001         """
 1002         Starts a watcher looking at the return data for a specified JID
 1003 
 1004         :returns: all of the information for the JID
 1005         """
 1006         if verbose:
 1007             msg = "Executing job with jid {}".format(jid)
 1008             print(msg)
 1009             print("-" * len(msg) + "\n")
 1010         elif show_jid:
 1011             print("jid: {}".format(jid))
 1012         if timeout is None:
 1013             timeout = self.opts["timeout"]
 1014         fret = {}
 1015         # make sure the minions is a set (since we do set operations on it)
 1016         minions = set(minions)
 1017 
 1018         found = set()
 1019         # start this before the cache lookup-- in case new stuff comes in
 1020         event_iter = self.get_event_iter_returns(jid, minions, timeout=timeout)
 1021 
 1022         # get the info from the cache
 1023         ret = self.get_cache_returns(jid)
 1024         if ret != {}:
 1025             found.update(set(ret))
 1026             yield ret
 1027 
 1028         # if you have all the returns, stop
 1029         if len(found.intersection(minions)) >= len(minions):
 1030             raise StopIteration()
 1031 
 1032         # otherwise, get them from the event system
 1033         for event in event_iter:
 1034             if event != {}:
 1035                 found.update(set(event))
 1036                 yield event
 1037             if len(found.intersection(minions)) >= len(minions):
 1038                 self._clean_up_subscriptions(jid)
 1039                 raise StopIteration()
 1040 
 1041     # TODO: tests!!
 1042     def get_returns_no_block(self, tag, match_type=None):
 1043         """
 1044         Raw function to just return events of jid excluding timeout logic
 1045 
 1046         Yield either the raw event data or None
 1047 
 1048         Pass a list of additional regular expressions as `tags_regex` to search
 1049         the event bus for non-return data, such as minion lists returned from
 1050         syndics.
 1051         """
 1052 
 1053         while True:
 1054             raw = self.event.get_event(
 1055                 wait=0.01,
 1056                 tag=tag,
 1057                 match_type=match_type,
 1058                 full=True,
 1059                 no_block=True,
 1060                 auto_reconnect=self.auto_reconnect,
 1061             )
 1062             yield raw
 1063 
 1064     def get_iter_returns(
 1065         self,
 1066         jid,
 1067         minions,
 1068         timeout=None,
 1069         tgt="*",
 1070         tgt_type="glob",
 1071         expect_minions=False,
 1072         block=True,
 1073         **kwargs
 1074     ):
 1075         """
 1076         Watch the event system and return job data as it comes in
 1077 
 1078         :returns: all of the information for the JID
 1079         """
 1080         if not isinstance(minions, set):
 1081             if isinstance(minions, str):
 1082                 minions = {minions}
 1083             elif isinstance(minions, (list, tuple)):
 1084                 minions = set(list(minions))
 1085 
 1086         if timeout is None:
 1087             timeout = self.opts["timeout"]
 1088         gather_job_timeout = int(
 1089             kwargs.get("gather_job_timeout", self.opts["gather_job_timeout"])
 1090         )
 1091         start = int(time.time())
 1092 
 1093         # timeouts per minion, id_ -> timeout time
 1094         minion_timeouts = {}
 1095 
 1096         found = set()
 1097         missing = set()
 1098         # Check to see if the jid is real, if not return the empty dict
 1099         try:
 1100             if (
 1101                 self.returners["{}.get_load".format(self.opts["master_job_cache"])](jid)
 1102                 == {}
 1103             ):
 1104                 log.warning("jid does not exist")
 1105                 yield {}
 1106                 # stop the iteration, since the jid is invalid
 1107                 raise StopIteration()
 1108         except Exception as exc:  # pylint: disable=broad-except
 1109             log.warning(
 1110                 "Returner unavailable: %s", exc, exc_info_on_loglevel=logging.DEBUG
 1111             )
 1112         # Wait for the hosts to check in
 1113         last_time = False
 1114         # iterator for this job's return
 1115         if self.opts["order_masters"]:
 1116             # If we are a MoM, we need to gather expected minions from downstreams masters.
 1117             ret_iter = self.get_returns_no_block(
 1118                 "(salt/job|syndic/.*)/{}".format(jid), "regex"
 1119             )
 1120         else:
 1121             ret_iter = self.get_returns_no_block("salt/job/{}".format(jid))
 1122         # iterator for the info of this job
 1123         jinfo_iter = []
 1124         # open event jids that need to be un-subscribed from later
 1125         open_jids = set()
 1126         timeout_at = time.time() + timeout
 1127         gather_syndic_wait = time.time() + self.opts["syndic_wait"]
 1128         # are there still minions running the job out there
 1129         # start as True so that we ping at least once
 1130         minions_running = True
 1131         log.debug(
 1132             "get_iter_returns for jid %s sent to %s will timeout at %s",
 1133             jid,
 1134             minions,
 1135             datetime.fromtimestamp(timeout_at).time(),
 1136         )
 1137         while True:
 1138             # Process events until timeout is reached or all minions have returned
 1139             for raw in ret_iter:
 1140                 # if we got None, then there were no events
 1141                 if raw is None:
 1142                     break
 1143                 if "minions" in raw.get("data", {}):
 1144                     minions.update(raw["data"]["minions"])
 1145                     if "missing" in raw.get("data", {}):
 1146                         missing.update(raw["data"]["missing"])
 1147                     continue
 1148                 if "return" not in raw["data"]:
 1149                     continue
 1150                 if kwargs.get("raw", False):
 1151                     found.add(raw["data"]["id"])
 1152                     yield raw
 1153                 else:
 1154                     found.add(raw["data"]["id"])
 1155                     ret = {raw["data"]["id"]: {"ret": raw["data"]["return"]}}
 1156                     if "out" in raw["data"]:
 1157                         ret[raw["data"]["id"]]["out"] = raw["data"]["out"]
 1158                     if "retcode" in raw["data"]:
 1159                         ret[raw["data"]["id"]]["retcode"] = raw["data"]["retcode"]
 1160                     if "jid" in raw["data"]:
 1161                         ret[raw["data"]["id"]]["jid"] = raw["data"]["jid"]
 1162                     if kwargs.get("_cmd_meta", False):
 1163                         ret[raw["data"]["id"]].update(raw["data"])
 1164                     log.debug("jid %s return from %s", jid, raw["data"]["id"])
 1165                     yield ret
 1166 
 1167             # if we have all of the returns (and we aren't a syndic), no need for anything fancy
 1168             if (
 1169                 len(found.intersection(minions)) >= len(minions)
 1170                 and not self.opts["order_masters"]
 1171             ):
 1172                 # All minions have returned, break out of the loop
 1173                 log.debug("jid %s found all minions %s", jid, found)
 1174                 break
 1175             elif (
 1176                 len(found.intersection(minions)) >= len(minions)
 1177                 and self.opts["order_masters"]
 1178             ):
 1179                 if (
 1180                     len(found) >= len(minions)
 1181                     and len(minions) > 0
 1182                     and time.time() > gather_syndic_wait
 1183                 ):
 1184                     # There were some minions to find and we found them
 1185                     # However, this does not imply that *all* masters have yet responded with expected minion lists.
 1186                     # Therefore, continue to wait up to the syndic_wait period (calculated in gather_syndic_wait) to see
 1187                     # if additional lower-level masters deliver their lists of expected
 1188                     # minions.
 1189                     break
 1190             # If we get here we may not have gathered the minion list yet. Keep waiting
 1191             # for all lower-level masters to respond with their minion lists
 1192 
 1193             # let start the timeouts for all remaining minions
 1194 
 1195             for id_ in minions - found:
 1196                 # if we have a new minion in the list, make sure it has a timeout
 1197                 if id_ not in minion_timeouts:
 1198                     minion_timeouts[id_] = time.time() + timeout
 1199 
 1200             # if the jinfo has timed out and some minions are still running the job
 1201             # re-do the ping
 1202             if time.time() > timeout_at and minions_running:
 1203                 # since this is a new ping, no one has responded yet
 1204                 jinfo = self.gather_job_info(
 1205                     jid, list(minions - found), "list", **kwargs
 1206                 )
 1207                 minions_running = False
 1208                 # if we weren't assigned any jid that means the master thinks
 1209                 # we have nothing to send
 1210                 if "jid" not in jinfo:
 1211                     jinfo_iter = []
 1212                 else:
 1213                     jinfo_iter = self.get_returns_no_block(
 1214                         "salt/job/{}".format(jinfo["jid"])
 1215                     )
 1216                 timeout_at = time.time() + gather_job_timeout
 1217                 # if you are a syndic, wait a little longer
 1218                 if self.opts["order_masters"]:
 1219                     timeout_at += self.opts.get("syndic_wait", 1)
 1220 
 1221             # check for minions that are running the job still
 1222             for raw in jinfo_iter:
 1223                 # if there are no more events, lets stop waiting for the jinfo
 1224                 if raw is None:
 1225                     break
 1226                 try:
 1227                     if raw["data"]["retcode"] > 0:
 1228                         log.error(
 1229                             "saltutil returning errors on minion %s", raw["data"]["id"]
 1230                         )
 1231                         minions.remove(raw["data"]["id"])
 1232                         break
 1233                 except KeyError as exc:
 1234                     # This is a safe pass. We're just using the try/except to
 1235                     # avoid having to deep-check for keys.
 1236                     missing_key = exc.__str__().strip("'\"")
 1237                     if missing_key == "retcode":
 1238                         log.debug("retcode missing from client return")
 1239                     else:
 1240                         log.debug(
 1241                             "Passing on saltutil error. Key '%s' missing "
 1242                             "from client return. This may be an error in "
 1243                             "the client.",
 1244                             missing_key,
 1245                         )
 1246                 # Keep track of the jid events to unsubscribe from later
 1247                 open_jids.add(jinfo["jid"])
 1248 
 1249                 # TODO: move to a library??
 1250                 if "minions" in raw.get("data", {}):
 1251                     minions.update(raw["data"]["minions"])
 1252                     continue
 1253                 if "syndic" in raw.get("data", {}):
 1254                     minions.update(raw["syndic"])
 1255                     continue
 1256                 if "return" not in raw.get("data", {}):
 1257                     continue
 1258 
 1259                 # if the job isn't running there anymore... don't count
 1260                 if raw["data"]["return"] == {}:
 1261                     continue
 1262 
 1263                 # if the minion throws an exception containing the word "return"
 1264                 # the master will try to handle the string as a dict in the next
 1265                 # step. Check if we have a string, log the issue and continue.
 1266                 if isinstance(raw["data"]["return"], str):
 1267                     log.error("unexpected return from minion: %s", raw)
 1268                     continue
 1269 
 1270                 if (
 1271                     "return" in raw["data"]["return"]
 1272                     and raw["data"]["return"]["return"] == {}
 1273                 ):
 1274                     continue
 1275 
 1276                 # if we didn't originally target the minion, lets add it to the list
 1277                 if raw["data"]["id"] not in minions:
 1278                     minions.add(raw["data"]["id"])
 1279                 # update this minion's timeout, as long as the job is still running
 1280                 minion_timeouts[raw["data"]["id"]] = time.time() + timeout
 1281                 # a minion returned, so we know its running somewhere
 1282                 minions_running = True
 1283 
 1284             # if we have hit gather_job_timeout (after firing the job) AND
 1285             # if we have hit all minion timeouts, lets call it
 1286             now = time.time()
 1287             # if we have finished waiting, and no minions are running the job
 1288             # then we need to see if each minion has timedout
 1289             done = (now > timeout_at) and not minions_running
 1290             if done:
 1291                 # if all minions have timeod out
 1292                 for id_ in minions - found:
 1293                     if now < minion_timeouts[id_]:
 1294                         done = False
 1295                         break
 1296             if done:
 1297                 break
 1298 
 1299             # don't spin
 1300             if block:
 1301                 time.sleep(0.01)
 1302             else:
 1303                 yield
 1304 
 1305         # If there are any remaining open events, clean them up.
 1306         if open_jids:
 1307             for jid in open_jids:
 1308                 self.event.unsubscribe(jid)
 1309 
 1310         if expect_minions:
 1311             for minion in list(minions - found):
 1312                 yield {minion: {"failed": True}}
 1313 
 1314         # Filter out any minions marked as missing for which we received
 1315         # returns (prevents false events sent due to higher-level masters not
 1316         # knowing about lower-level minions).
 1317         missing -= found
 1318 
 1319         # Report on missing minions
 1320         if missing:
 1321             for minion in missing:
 1322                 yield {minion: {"failed": True}}
 1323 
 1324     def get_returns(self, jid, minions, timeout=None):
 1325         """
 1326         Get the returns for the command line interface via the event system
 1327         """
 1328         minions = set(minions)
 1329         if timeout is None:
 1330             timeout = self.opts["timeout"]
 1331         start = int(time.time())
 1332         timeout_at = start + timeout
 1333         log.debug(
 1334             "get_returns for jid %s sent to %s will timeout at %s",
 1335             jid,
 1336             minions,
 1337             datetime.fromtimestamp(timeout_at).time(),
 1338         )
 1339 
 1340         found = set()
 1341         ret = {}
 1342         # Check to see if the jid is real, if not return the empty dict
 1343         try:
 1344             if (
 1345                 self.returners["{}.get_load".format(self.opts["master_job_cache"])](jid)
 1346                 == {}
 1347             ):
 1348                 log.warning("jid does not exist")
 1349                 return ret
 1350         except Exception as exc:  # pylint: disable=broad-except
 1351             raise SaltClientError(
 1352                 "Master job cache returner [{}] failed to verify jid. "
 1353                 "Exception details: {}".format(self.opts["master_job_cache"], exc)
 1354             )
 1355 
 1356         # Wait for the hosts to check in
 1357         while True:
 1358             time_left = timeout_at - int(time.time())
 1359             wait = max(1, time_left)
 1360             raw = self.event.get_event(wait, jid, auto_reconnect=self.auto_reconnect)
 1361             if raw is not None and "return" in raw:
 1362                 found.add(raw["id"])
 1363                 ret[raw["id"]] = raw["return"]
 1364                 if len(found.intersection(minions)) >= len(minions):
 1365                     # All minions have returned, break out of the loop
 1366                     log.debug("jid %s found all minions", jid)
 1367                     break
 1368                 continue
 1369             # Then event system timeout was reached and nothing was returned
 1370             if len(found.intersection(minions)) >= len(minions):
 1371                 # All minions have returned, break out of the loop
 1372                 log.debug("jid %s found all minions", jid)
 1373                 break
 1374             if int(time.time()) > timeout_at:
 1375                 log.info(
 1376                     "jid %s minions %s did not return in time", jid, (minions - found)
 1377                 )
 1378                 break
 1379             time.sleep(0.01)
 1380         return ret
 1381 
 1382     def get_full_returns(self, jid, minions, timeout=None):
 1383         """
 1384         This method starts off a watcher looking at the return data for
 1385         a specified jid, it returns all of the information for the jid
 1386         """
 1387         # TODO: change this from ret to return... or the other way.
 1388         #       Its inconsistent, we should pick one
 1389 
 1390         ret = {}
 1391         # create the iterator-- since we want to get anyone in the middle
 1392         event_iter = self.get_event_iter_returns(jid, minions, timeout=timeout)
 1393 
 1394         try:
 1395             data = self.returners["{}.get_jid".format(self.opts["master_job_cache"])](
 1396                 jid
 1397             )
 1398         except Exception as exc:  # pylint: disable=broad-except
 1399             raise SaltClientError(
 1400                 "Returner {} could not fetch jid data. "
 1401                 "Exception details: {}".format(self.opts["master_job_cache"], exc)
 1402             )
 1403         for minion in data:
 1404             m_data = {}
 1405             if "return" in data[minion]:
 1406                 m_data["ret"] = data[minion].get("return")
 1407             else:
 1408                 m_data["ret"] = data[minion].get("return")
 1409             if "out" in data[minion]:
 1410                 m_data["out"] = data[minion]["out"]
 1411             if minion in ret:
 1412                 ret[minion].update(m_data)
 1413             else:
 1414                 ret[minion] = m_data
 1415 
 1416         # if we have all the minion returns, lets just return
 1417         if len(set(ret).intersection(minions)) >= len(minions):
 1418             return ret
 1419 
 1420         # otherwise lets use the listener we created above to get the rest
 1421         for event_ret in event_iter:
 1422             # if nothing in the event_ret, skip
 1423             if event_ret == {}:
 1424                 time.sleep(0.02)
 1425                 continue
 1426             for minion, m_data in event_ret.items():
 1427                 if minion in ret:
 1428                     ret[minion].update(m_data)
 1429                 else:
 1430                     ret[minion] = m_data
 1431 
 1432             # are we done yet?
 1433             if len(set(ret).intersection(minions)) >= len(minions):
 1434                 return ret
 1435 
 1436         # otherwise we hit the timeout, return what we have
 1437         return ret
 1438 
 1439     def get_cache_returns(self, jid):
 1440         """
 1441         Execute a single pass to gather the contents of the job cache
 1442         """
 1443         ret = {}
 1444 
 1445         try:
 1446             data = self.returners["{}.get_jid".format(self.opts["master_job_cache"])](
 1447                 jid
 1448             )
 1449         except Exception as exc:  # pylint: disable=broad-except
 1450             raise SaltClientError(
 1451                 "Could not examine master job cache. "
 1452                 "Error occurred in {} returner. "
 1453                 "Exception details: {}".format(self.opts["master_job_cache"], exc)
 1454             )
 1455         for minion in data:
 1456             m_data = {}
 1457             if "return" in data[minion]:
 1458                 m_data["ret"] = data[minion].get("return")
 1459             else:
 1460                 m_data["ret"] = data[minion].get("return")
 1461             if "out" in data[minion]:
 1462                 m_data["out"] = data[minion]["out"]
 1463             if minion in ret:
 1464                 ret[minion].update(m_data)
 1465             else:
 1466                 ret[minion] = m_data
 1467 
 1468         return ret
 1469 
 1470     def get_cli_static_event_returns(
 1471         self,
 1472         jid,
 1473         minions,
 1474         timeout=None,
 1475         tgt="*",
 1476         tgt_type="glob",
 1477         verbose=False,
 1478         show_timeout=False,
 1479         show_jid=False,
 1480     ):
 1481         """
 1482         Get the returns for the command line interface via the event system
 1483         """
 1484         log.trace("entered - function get_cli_static_event_returns()")
 1485         minions = set(minions)
 1486         if verbose:
 1487             msg = "Executing job with jid {}".format(jid)
 1488             print(msg)
 1489             print("-" * len(msg) + "\n")
 1490         elif show_jid:
 1491             print("jid: {}".format(jid))
 1492 
 1493         if timeout is None:
 1494             timeout = self.opts["timeout"]
 1495 
 1496         start = int(time.time())
 1497         timeout_at = start + timeout
 1498         found = set()
 1499         ret = {}
 1500         # Check to see if the jid is real, if not return the empty dict
 1501         try:
 1502             if (
 1503                 self.returners["{}.get_load".format(self.opts["master_job_cache"])](jid)
 1504                 == {}
 1505             ):
 1506                 log.warning("jid does not exist")
 1507                 return ret
 1508         except Exception as exc:  # pylint: disable=broad-except
 1509             raise SaltClientError(
 1510                 "Load could not be retrieved from "
 1511                 "returner {}. Exception details: {}".format(
 1512                     self.opts["master_job_cache"], exc
 1513                 )
 1514             )
 1515         # Wait for the hosts to check in
 1516         while True:
 1517             # Process events until timeout is reached or all minions have returned
 1518             time_left = timeout_at - int(time.time())
 1519             # Wait 0 == forever, use a minimum of 1s
 1520             wait = max(1, time_left)
 1521             jid_tag = "salt/job/{}".format(jid)
 1522             raw = self.event.get_event(
 1523                 wait, jid_tag, auto_reconnect=self.auto_reconnect
 1524             )
 1525             if raw is not None and "return" in raw:
 1526                 if "minions" in raw.get("data", {}):
 1527                     minions.update(raw["data"]["minions"])
 1528                     continue
 1529                 found.add(raw["id"])
 1530                 ret[raw["id"]] = {"ret": raw["return"]}
 1531                 ret[raw["id"]]["success"] = raw.get("success", False)
 1532                 if "out" in raw:
 1533                     ret[raw["id"]]["out"] = raw["out"]
 1534                 if len(found.intersection(minions)) >= len(minions):
 1535                     # All minions have returned, break out of the loop
 1536                     break
 1537                 continue
 1538             # Then event system timeout was reached and nothing was returned
 1539             if len(found.intersection(minions)) >= len(minions):
 1540                 # All minions have returned, break out of the loop
 1541                 break
 1542             if int(time.time()) > timeout_at:
 1543                 if verbose or show_timeout:
 1544                     if self.opts.get("minion_data_cache", False) or tgt_type in (
 1545                         "glob",
 1546                         "pcre",
 1547                         "list",
 1548                     ):
 1549                         if len(found) < len(minions):
 1550                             fail = sorted(list(minions.difference(found)))
 1551                             for minion in fail:
 1552                                 ret[minion] = {
 1553                                     "out": "no_return",
 1554                                     "ret": "Minion did not return",
 1555                                 }
 1556                 break
 1557             time.sleep(0.01)
 1558 
 1559         self._clean_up_subscriptions(jid)
 1560         return ret
 1561 
 1562     def get_cli_event_returns(
 1563         self,
 1564         jid,
 1565         minions,
 1566         timeout=None,
 1567         tgt="*",
 1568         tgt_type="glob",
 1569         verbose=False,
 1570         progress=False,
 1571         show_timeout=False,
 1572         show_jid=False,
 1573         **kwargs
 1574     ):
 1575         """
 1576         Get the returns for the command line interface via the event system
 1577         """
 1578         log.trace("func get_cli_event_returns()")
 1579 
 1580         if verbose:
 1581             msg = "Executing job with jid {}".format(jid)
 1582             print(msg)
 1583             print("-" * len(msg) + "\n")
 1584         elif show_jid:
 1585             print("jid: {}".format(jid))
 1586 
 1587         # lazy load the connected minions
 1588         connected_minions = None
 1589         return_count = 0
 1590 
 1591         for ret in self.get_iter_returns(
 1592             jid,
 1593             minions,
 1594             timeout=timeout,
 1595             tgt=tgt,
 1596             tgt_type=tgt_type,
 1597             # (gtmanfred) expect_minions is popped here incase it is passed from a client
 1598             # call. If this is not popped, then it would be passed twice to
 1599             # get_iter_returns.
 1600             expect_minions=(
 1601                 kwargs.pop("expect_minions", False) or verbose or show_timeout
 1602             ),
 1603             **kwargs
 1604         ):
 1605             log.debug("return event: %s", ret)
 1606             return_count = return_count + 1
 1607             if progress:
 1608                 for id_, min_ret in ret.items():
 1609                     if not min_ret.get("failed") is True:
 1610                         yield {
 1611                             "minion_count": len(minions),
 1612                             "return_count": return_count,
 1613                         }
 1614             # replace the return structure for missing minions
 1615             for id_, min_ret in ret.items():
 1616                 if min_ret.get("failed") is True:
 1617                     if connected_minions is None:
 1618                         connected_minions = salt.utils.minions.CkMinions(
 1619                             self.opts
 1620                         ).connected_ids()
 1621                     if (
 1622                         self.opts["minion_data_cache"]
 1623                         and salt.cache.factory(self.opts).contains(
 1624                             "minions/{}".format(id_), "data"
 1625                         )
 1626                         and connected_minions
 1627                         and id_ not in connected_minions
 1628                     ):
 1629 
 1630                         yield {
 1631                             id_: {
 1632                                 "out": "no_return",
 1633                                 "ret": "Minion did not return. [Not connected]",
 1634                                 "retcode": salt.defaults.exitcodes.EX_GENERIC,
 1635                             }
 1636                         }
 1637                     else:
 1638                         # don't report syndics as unresponsive minions
 1639                         if not os.path.exists(
 1640                             os.path.join(self.opts["syndic_dir"], id_)
 1641                         ):
 1642                             yield {
 1643                                 id_: {
 1644                                     "out": "no_return",
 1645                                     "ret": "Minion did not return. [No response]"
 1646                                     "\nThe minions may not have all finished running and any "
 1647                                     "remaining minions will return upon completion. To look "
 1648                                     "up the return data for this job later, run the following "
 1649                                     "command:\n\n"
 1650                                     "salt-run jobs.lookup_jid {}".format(jid),
 1651                                     "retcode": salt.defaults.exitcodes.EX_GENERIC,
 1652                                 }
 1653                             }
 1654                 else:
 1655                     yield {id_: min_ret}
 1656 
 1657         self._clean_up_subscriptions(jid)
 1658 
 1659     def get_event_iter_returns(self, jid, minions, timeout=None):
 1660         """
 1661         Gather the return data from the event system, break hard when timeout
 1662         is reached.
 1663         """
 1664         log.trace("entered - function get_event_iter_returns()")
 1665         if timeout is None:
 1666             timeout = self.opts["timeout"]
 1667 
 1668         timeout_at = time.time() + timeout
 1669 
 1670         found = set()
 1671         # Check to see if the jid is real, if not return the empty dict
 1672         if (
 1673             self.returners["{}.get_load".format(self.opts["master_job_cache"])](jid)
 1674             == {}
 1675         ):
 1676             log.warning("jid does not exist")
 1677             yield {}
 1678             # stop the iteration, since the jid is invalid
 1679             raise StopIteration()
 1680         # Wait for the hosts to check in
 1681         while True:
 1682             raw = self.event.get_event(timeout, auto_reconnect=self.auto_reconnect)
 1683             if raw is None or time.time() > timeout_at:
 1684                 # Timeout reached
 1685                 break
 1686             if "minions" in raw.get("data", {}):
 1687                 continue
 1688             try:
 1689                 # There might be two jobs for the same minion, so we have to check for the jid
 1690                 if jid == raw["jid"]:
 1691                     found.add(raw["id"])
 1692                     ret = {raw["id"]: {"ret": raw["return"]}}
 1693                 else:
 1694                     continue
 1695             except KeyError:
 1696                 # Ignore other erroneous messages
 1697                 continue
 1698             if "out" in raw:
 1699                 ret[raw["id"]]["out"] = raw["out"]
 1700             yield ret
 1701             time.sleep(0.02)
 1702 
 1703     def _resolve_nodegroup(self, ng):
 1704         """
 1705         Resolve a nodegroup into its configured components
 1706         """
 1707         if ng not in self.opts["nodegroups"]:
 1708             conf_file = self.opts.get("conf_file", "the master config file")
 1709             raise SaltInvocationError(
 1710                 "Node group {} unavailable in {}".format(ng, conf_file)
 1711             )
 1712         return salt.utils.minions.nodegroup_comp(ng, self.opts["nodegroups"])
 1713 
 1714     def _prep_pub(self, tgt, fun, arg, tgt_type, ret, jid, timeout, **kwargs):
 1715         """
 1716         Set up the payload_kwargs to be sent down to the master
 1717         """
 1718         if tgt_type == "nodegroup":
 1719             tgt = self._resolve_nodegroup(tgt)
 1720             tgt_type = "compound"
 1721 
 1722         if tgt_type == "compound":
 1723             #  Resolve all nodegroups, so that the minions don't have to.
 1724             new_tgt = list()
 1725             log.debug("compound resolution: original tgt: %s", tgt)
 1726 
 1727             if isinstance(tgt, str):
 1728                 tgt = tgt.split()
 1729 
 1730             for word in tgt:
 1731                 if word.startswith("N@") and len(word) > 2:
 1732                     resolved = self._resolve_nodegroup(word[2:])
 1733                     new_tgt.extend(resolved)
 1734                 else:
 1735                     new_tgt.append(word)
 1736 
 1737             log.debug("compound resolution: new_tgt: %s", new_tgt)
 1738             tgt = " ".join(new_tgt)
 1739 
 1740         # Convert a range expression to a list of nodes and change expression
 1741         # form to list
 1742         if tgt_type == "range" and HAS_RANGE:
 1743             tgt = self._convert_range_to_list(tgt)
 1744             tgt_type = "list"
 1745 
 1746         # If an external job cache is specified add it to the ret list
 1747         if self.opts.get("ext_job_cache"):
 1748             if ret:
 1749                 ret += ",{}".format(self.opts["ext_job_cache"])
 1750             else:
 1751                 ret = self.opts["ext_job_cache"]
 1752 
 1753         # format the payload - make a function that does this in the payload
 1754         #   module
 1755 
 1756         # Generate the standard keyword args to feed to format_payload
 1757         payload_kwargs = {
 1758             "cmd": "publish",
 1759             "tgt": tgt,
 1760             "fun": fun,
 1761             "arg": arg,
 1762             "key": self.key,
 1763             "tgt_type": tgt_type,
 1764             "ret": ret,
 1765             "jid": jid,
 1766         }
 1767 
 1768         # if kwargs are passed, pack them.
 1769         if kwargs:
 1770             payload_kwargs["kwargs"] = kwargs
 1771 
 1772         # If we have a salt user, add it to the payload
 1773         if self.opts["syndic_master"] and "user" in kwargs:
 1774             payload_kwargs["user"] = kwargs["user"]
 1775         elif self.salt_user:
 1776             payload_kwargs["user"] = self.salt_user
 1777 
 1778         # If we're a syndication master, pass the timeout
 1779         if self.opts["order_masters"]:
 1780             payload_kwargs["to"] = timeout
 1781 
 1782         return payload_kwargs
 1783 
 1784     def pub(
 1785         self,
 1786         tgt,
 1787         fun,
 1788         arg=(),
 1789         tgt_type="glob",
 1790         ret="",
 1791         jid="",
 1792         timeout=5,
 1793         listen=False,
 1794         **kwargs
 1795     ):
 1796         """
 1797         Take the required arguments and publish the given command.
 1798         Arguments:
 1799             tgt:
 1800                 The tgt is a regex or a glob used to match up the ids on
 1801                 the minions. Salt works by always publishing every command
 1802                 to all of the minions and then the minions determine if
 1803                 the command is for them based on the tgt value.
 1804             fun:
 1805                 The function name to be called on the remote host(s), this
 1806                 must be a string in the format "<modulename>.<function name>"
 1807             arg:
 1808                 The arg option needs to be a tuple of arguments to pass
 1809                 to the calling function, if left blank
 1810         Returns:
 1811             jid:
 1812                 A string, as returned by the publisher, which is the job
 1813                 id, this will inform the client where to get the job results
 1814             minions:
 1815                 A set, the targets that the tgt passed should match.
 1816         """
 1817         # Make sure the publisher is running by checking the unix socket
 1818         if self.opts.get("ipc_mode", "") != "tcp" and not os.path.exists(
 1819             os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
 1820         ):
 1821             log.error(
 1822                 "Unable to connect to the salt master publisher at %s",
 1823                 self.opts["sock_dir"],
 1824             )
 1825             raise SaltClientError
 1826 
 1827         payload_kwargs = self._prep_pub(
 1828             tgt, fun, arg, tgt_type, ret, jid, timeout, **kwargs
 1829         )
 1830 
 1831         master_uri = "tcp://{}:{}".format(
 1832             salt.utils.zeromq.ip_bracket(self.opts["interface"]),
 1833             str(self.opts["ret_port"]),
 1834         )
 1835 
 1836         with salt.transport.client.ReqChannel.factory(
 1837             self.opts, crypt="clear", master_uri=master_uri
 1838         ) as channel:
 1839             try:
 1840                 # Ensure that the event subscriber is connected.
 1841                 # If not, we won't get a response, so error out
 1842                 if listen and not self.event.connect_pub(timeout=timeout):
 1843                     raise SaltReqTimeoutError()
 1844                 payload = channel.send(payload_kwargs, timeout=timeout)
 1845             except SaltReqTimeoutError as err:
 1846                 log.error(err)
 1847                 raise SaltReqTimeoutError(
 1848                     "Salt request timed out. The master is not responding. You "
 1849                     "may need to run your command with `--async` in order to "
 1850                     "bypass the congested event bus. With `--async`, the CLI tool "
 1851                     "will print the job id (jid) and exit immediately without "
 1852                     "listening for responses. You can then use "
 1853                     "`salt-run jobs.lookup_jid` to look up the results of the job "
 1854                     "in the job cache later."
 1855                 )
 1856 
 1857             if not payload:
 1858                 # The master key could have changed out from under us! Regen
 1859                 # and try again if the key has changed
 1860                 key = self.__read_master_key()
 1861                 if key == self.key:
 1862                     return payload
 1863                 self.key = key
 1864                 payload_kwargs["key"] = self.key
 1865                 payload = channel.send(payload_kwargs)
 1866 
 1867             error = payload.pop("error", None)
 1868             if error is not None:
 1869                 if isinstance(error, dict):
 1870                     err_name = error.get("name", "")
 1871                     err_msg = error.get("message", "")
 1872                     if err_name == "AuthenticationError":
 1873                         raise AuthenticationError(err_msg)
 1874                     elif err_name == "AuthorizationError":
 1875                         raise AuthorizationError(err_msg)
 1876 
 1877                 raise PublishError(error)
 1878 
 1879             if not payload:
 1880                 return payload
 1881 
 1882         return {"jid": payload["load"]["jid"], "minions": payload["load"]["minions"]}
 1883 
 1884     @salt.ext.tornado.gen.coroutine
 1885     def pub_async(
 1886         self,
 1887         tgt,
 1888         fun,
 1889         arg=(),
 1890         tgt_type="glob",
 1891         ret="",
 1892         jid="",
 1893         timeout=5,
 1894         io_loop=None,
 1895         listen=True,
 1896         **kwargs
 1897     ):
 1898         """
 1899         Take the required arguments and publish the given command.
 1900         Arguments:
 1901             tgt:
 1902                 The tgt is a regex or a glob used to match up the ids on
 1903                 the minions. Salt works by always publishing every command
 1904                 to all of the minions and then the minions determine if
 1905                 the command is for them based on the tgt value.
 1906             fun:
 1907                 The function name to be called on the remote host(s), this
 1908                 must be a string in the format "<modulename>.<function name>"
 1909             arg:
 1910                 The arg option needs to be a tuple of arguments to pass
 1911                 to the calling function, if left blank
 1912         Returns:
 1913             jid:
 1914                 A string, as returned by the publisher, which is the job
 1915                 id, this will inform the client where to get the job results
 1916             minions:
 1917                 A set, the targets that the tgt passed should match.
 1918         """
 1919         # Make sure the publisher is running by checking the unix socket
 1920         if self.opts.get("ipc_mode", "") != "tcp" and not os.path.exists(
 1921             os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
 1922         ):
 1923             log.error(
 1924                 "Unable to connect to the salt master publisher at %s",
 1925                 self.opts["sock_dir"],
 1926             )
 1927             raise SaltClientError
 1928 
 1929         payload_kwargs = self._prep_pub(
 1930             tgt, fun, arg, tgt_type, ret, jid, timeout, **kwargs
 1931         )
 1932 
 1933         master_uri = (
 1934             "tcp://"
 1935             + salt.utils.zeromq.ip_bracket(self.opts["interface"])
 1936             + ":"
 1937             + str(self.opts["ret_port"])
 1938         )
 1939 
 1940         with salt.transport.client.AsyncReqChannel.factory(
 1941             self.opts, io_loop=io_loop, crypt="clear", master_uri=master_uri
 1942         ) as channel:
 1943             try:
 1944                 # Ensure that the event subscriber is connected.
 1945                 # If not, we won't get a response, so error out
 1946                 if listen and not self.event.connect_pub(timeout=timeout):
 1947                     raise SaltReqTimeoutError()
 1948                 payload = yield channel.send(payload_kwargs, timeout=timeout)
 1949             except SaltReqTimeoutError:
 1950                 raise SaltReqTimeoutError(
 1951                     "Salt request timed out. The master is not responding. You "
 1952                     "may need to run your command with `--async` in order to "
 1953                     "bypass the congested event bus. With `--async`, the CLI tool "
 1954                     "will print the job id (jid) and exit immediately without "
 1955                     "listening for responses. You can then use "
 1956                     "`salt-run jobs.lookup_jid` to look up the results of the job "
 1957                     "in the job cache later."
 1958                 )
 1959 
 1960             if not payload:
 1961                 # The master key could have changed out from under us! Regen
 1962                 # and try again if the key has changed
 1963                 key = self.__read_master_key()
 1964                 if key == self.key:
 1965                     raise salt.ext.tornado.gen.Return(payload)
 1966                 self.key = key
 1967                 payload_kwargs["key"] = self.key
 1968                 payload = yield channel.send(payload_kwargs)
 1969 
 1970             error = payload.pop("error", None)
 1971             if error is not None:
 1972                 if isinstance(error, dict):
 1973                     err_name = error.get("name", "")
 1974                     err_msg = error.get("message", "")
 1975                     if err_name == "AuthenticationError":
 1976                         raise AuthenticationError(err_msg)
 1977                     elif err_name == "AuthorizationError":
 1978                         raise AuthorizationError(err_msg)
 1979 
 1980                 raise PublishError(error)
 1981 
 1982             if not payload:
 1983                 raise salt.ext.tornado.gen.Return(payload)
 1984 
 1985         raise salt.ext.tornado.gen.Return(
 1986             {"jid": payload["load"]["jid"], "minions": payload["load"]["minions"]}
 1987         )
 1988 
 1989     # pylint: disable=W1701
 1990     def __del__(self):
 1991         # This IS really necessary!
 1992         # When running tests, if self.events is not destroyed, we leak 2
 1993         # threads per test case which uses self.client
 1994         if hasattr(self, "event"):
 1995             # The call below will take care of calling 'self.event.destroy()'
 1996             del self.event
 1997 
 1998     # pylint: enable=W1701
 1999 
 2000     def _clean_up_subscriptions(self, job_id):
 2001         if self.opts.get("order_masters"):
 2002             self.event.unsubscribe("syndic/.*/{}".format(job_id), "regex")
 2003         self.event.unsubscribe("salt/job/{}".format(job_id))
 2004 
 2005 
 2006 class FunctionWrapper(dict):
 2007     """
 2008     Create a function wrapper that looks like the functions dict on the minion
 2009     but invoked commands on the minion via a LocalClient.
 2010 
 2011     This allows SLS files to be loaded with an object that calls down to the
 2012     minion when the salt functions dict is referenced.
 2013     """
 2014 
 2015     def __init__(self, opts, minion):
 2016         super().__init__()
 2017         self.opts = opts
 2018         self.minion = minion
 2019         self.local = LocalClient(self.opts["conf_file"])
 2020         self.functions = self.__load_functions()
 2021 
 2022     def __missing__(self, key):
 2023         """
 2024         Since the function key is missing, wrap this call to a command to the
 2025         minion of said key if it is available in the self.functions set
 2026         """
 2027         if key not in self.functions:
 2028             raise KeyError
 2029         return self.run_key(key)
 2030 
 2031     def __load_functions(self):
 2032         """
 2033         Find out what functions are available on the minion
 2034         """
 2035         return set(
 2036             self.local.cmd(self.minion, "sys.list_functions").get(self.minion, [])
 2037         )
 2038 
 2039     def run_key(self, key):
 2040         """
 2041         Return a function that executes the arguments passed via the local
 2042         client
 2043         """
 2044 
 2045         def func(*args, **kwargs):
 2046             """
 2047             Run a remote call
 2048             """
 2049             args = list(args)
 2050             for _key, _val in kwargs.items():
 2051                 args.append("{}={}".format(_key, _val))
 2052             return self.local.cmd(self.minion, key, args)
 2053 
 2054         return func
 2055 
 2056 
 2057 class Caller:
 2058     """
 2059     ``Caller`` is the same interface used by the :command:`salt-call`
 2060     command-line tool on the Salt Minion.
 2061 
 2062     .. versionchanged:: 2015.8.0
 2063         Added the ``cmd`` method for consistency with the other Salt clients.
 2064         The existing ``function`` and ``sminion.functions`` interfaces still
 2065         exist but have been removed from the docs.
 2066 
 2067     Importing and using ``Caller`` must be done on the same machine as a
 2068     Salt Minion and it must be done using the same user that the Salt Minion is
 2069     running as.
 2070 
 2071     Usage:
 2072 
 2073     .. code-block:: python
 2074 
 2075         import salt.client
 2076         caller = salt.client.Caller()
 2077         caller.cmd('test.ping')
 2078 
 2079     Note, a running master or minion daemon is not required to use this class.
 2080     Running ``salt-call --local`` simply sets :conf_minion:`file_client` to
 2081     ``'local'``. The same can be achieved at the Python level by including that
 2082     setting in a minion config file.
 2083 
 2084     .. versionadded:: 2014.7.0
 2085         Pass the minion config as the ``mopts`` dictionary.
 2086 
 2087     .. code-block:: python
 2088 
 2089         import salt.client
 2090         import salt.config
 2091         __opts__ = salt.config.minion_config('/etc/salt/minion')
 2092         __opts__['file_client'] = 'local'
 2093         caller = salt.client.Caller(mopts=__opts__)
 2094     """
 2095 
 2096     def __init__(self, c_path=os.path.join(syspaths.CONFIG_DIR, "minion"), mopts=None):
 2097         # Late-import of the minion module to keep the CLI as light as possible
 2098         import salt.minion
 2099 
 2100         if mopts:
 2101             self.opts = mopts
 2102         else:
 2103             self.opts = salt.config.minion_config(c_path)
 2104         self.sminion = salt.minion.SMinion(self.opts)
 2105 
 2106     def cmd(self, fun, *args, **kwargs):
 2107         """
 2108         Call an execution module with the given arguments and keyword arguments
 2109 
 2110         .. versionchanged:: 2015.8.0
 2111             Added the ``cmd`` method for consistency with the other Salt clients.
 2112             The existing ``function`` and ``sminion.functions`` interfaces still
 2113             exist but have been removed from the docs.
 2114 
 2115         .. code-block:: python
 2116 
 2117             caller.cmd('test.arg', 'Foo', 'Bar', baz='Baz')
 2118 
 2119             caller.cmd('event.send', 'myco/myevent/something',
 2120                 data={'foo': 'Foo'}, with_env=['GIT_COMMIT'], with_grains=True)
 2121         """
 2122         return self.sminion.functions[fun](*args, **kwargs)
 2123 
 2124     def function(self, fun, *args, **kwargs):
 2125         """
 2126         Call a single salt function
 2127         """
 2128         func = self.sminion.functions[fun]
 2129         args, kwargs = salt.minion.load_args_and_kwargs(
 2130             func, salt.utils.args.parse_input(args), kwargs
 2131         )
 2132         return func(*args, **kwargs)
 2133 
 2134 
 2135 class ProxyCaller:
 2136     """
 2137     ``ProxyCaller`` is the same interface used by the :command:`salt-call`
 2138     with the args ``--proxyid <proxyid>`` command-line tool on the Salt Proxy
 2139     Minion.
 2140 
 2141     Importing and using ``ProxyCaller`` must be done on the same machine as a
 2142     Salt Minion and it must be done using the same user that the Salt Minion is
 2143     running as.
 2144 
 2145     Usage:
 2146 
 2147     .. code-block:: python
 2148 
 2149         import salt.client
 2150         caller = salt.client.Caller()
 2151         caller.cmd('test.ping')
 2152 
 2153     Note, a running master or minion daemon is not required to use this class.
 2154     Running ``salt-call --local`` simply sets :conf_minion:`file_client` to
 2155     ``'local'``. The same can be achieved at the Python level by including that
 2156     setting in a minion config file.
 2157 
 2158     .. code-block:: python
 2159 
 2160         import salt.client
 2161         import salt.config
 2162         __opts__ = salt.config.proxy_config('/etc/salt/proxy', minion_id='quirky_edison')
 2163         __opts__['file_client'] = 'local'
 2164         caller = salt.client.ProxyCaller(mopts=__opts__)
 2165 
 2166     .. note::
 2167 
 2168         To use this for calling proxies, the :py:func:`is_proxy functions
 2169         <salt.utils.platform.is_proxy>` requires that ``--proxyid`` be an
 2170         argument on the commandline for the script this is used in, or that the
 2171         string ``proxy`` is in the name of the script.
 2172     """
 2173 
 2174     def __init__(self, c_path=os.path.join(syspaths.CONFIG_DIR, "proxy"), mopts=None):
 2175         # Late-import of the minion module to keep the CLI as light as possible
 2176         import salt.minion
 2177 
 2178         self.opts = mopts or salt.config.proxy_config(c_path)
 2179         self.sminion = salt.minion.SProxyMinion(self.opts)
 2180 
 2181     def cmd(self, fun, *args, **kwargs):
 2182         """
 2183         Call an execution module with the given arguments and keyword arguments
 2184 
 2185         .. code-block:: python
 2186 
 2187             caller.cmd('test.arg', 'Foo', 'Bar', baz='Baz')
 2188 
 2189             caller.cmd('event.send', 'myco/myevent/something',
 2190                 data={'foo': 'Foo'}, with_env=['GIT_COMMIT'], with_grains=True)
 2191         """
 2192         func = self.sminion.functions[fun]
 2193         data = {"arg": args, "fun": fun}
 2194         data.update(kwargs)
 2195         executors = getattr(self.sminion, "module_executors", []) or self.opts.get(
 2196             "module_executors", ["direct_call"]
 2197         )
 2198         if isinstance(executors, str):
 2199             executors = [executors]
 2200         for name in executors:
 2201             fname = "{}.execute".format(name)
 2202             if fname not in self.sminion.executors:
 2203                 raise SaltInvocationError("Executor '{}' is not available".format(name))
 2204             return_data = self.sminion.executors[fname](
 2205                 self.opts, data, func, args, kwargs
 2206             )
 2207             if return_data is not None:
 2208                 break
 2209         return return_data