"Fossies" - the Fresh Open Source Software Archive 
Member "salt-3002.2/salt/client/__init__.py" (18 Nov 2020, 77650 Bytes) of package /linux/misc/salt-3002.2.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively, you can view or
download the uninterpreted source code file here.
For more information about "__init__.py" see the
Fossies "Dox" file reference documentation and the latest
Fossies "Diffs" side-by-side code changes report:
3002.1_vs_3002.2.
1 """
2 The client module is used to create a client connection to the publisher
3 The data structure needs to be:
4 {'enc': 'clear',
5 'load': {'fun': '<mod.callable>',
6 'arg': ('arg1', 'arg2', ...),
7 'tgt': '<glob or id>',
8 'key': '<read in the key file>'}
9 """
10
11
12 # pylint: disable=import-error
13
14 # Try to import range from https://github.com/ytoolshed/range
15 #
16
17 import logging
18
19 # The components here are simple, and they need to be and stay simple, we
20 # want a client to have 3 external concerns, and maybe a fourth configurable
21 # option.
22 # The concerns are:
23 # 1. Who executes the command?
24 # 2. What is the function being run?
25 # 3. What arguments need to be passed to the function?
26 # 4. How long do we wait for all of the replies?
27 import os
28 import random
29 import sys
30 import time
31 from datetime import datetime
32
33 import salt.cache
34 import salt.config
35 import salt.defaults.exitcodes
36
37 # Import tornado
38 import salt.ext.tornado.gen # pylint: disable=F0401
39 import salt.loader
40 import salt.payload
41 import salt.syspaths as syspaths
42 import salt.transport.client
43 import salt.utils.args
44 import salt.utils.event
45 import salt.utils.files
46 import salt.utils.jid
47 import salt.utils.minions
48 import salt.utils.platform
49 import salt.utils.stringutils
50 import salt.utils.user
51 import salt.utils.verify
52 import salt.utils.zeromq
53 from salt.exceptions import (
54 AuthenticationError,
55 AuthorizationError,
56 EauthAuthenticationError,
57 PublishError,
58 SaltClientError,
59 SaltInvocationError,
60 SaltReqTimeoutError,
61 )
62 from salt.ext import six
63
64 HAS_RANGE = False
65 try:
66 import seco.range
67
68 HAS_RANGE = True
69 except ImportError:
70 pass
71 # pylint: enable=import-error
72
73
74 log = logging.getLogger(__name__)
75
76
def get_local_client(
    c_path=os.path.join(syspaths.CONFIG_DIR, "master"),
    mopts=None,
    skip_perm_errors=False,
    io_loop=None,
    auto_reconnect=False,
):
    """
    .. versionadded:: 2014.7.0

    Build a ``LocalClient`` from either already loaded options or the
    configuration file found at ``c_path``.

    :param c_path: Path handed to ``salt.config.client_config``; only used
                   when ``mopts`` is empty.
    :param mopts: Pre-loaded options. When truthy they are used as-is and
                  ``c_path`` is never read.
    :param skip_perm_errors: Forwarded unchanged to ``LocalClient``.
    :param IOLoop io_loop: io_loop used for events.
                           Pass in an io_loop if you want asynchronous
                           operation for obtaining events. Eg use of
                           set_event_handler() API. Otherwise, operation
                           will be synchronous.
    :param auto_reconnect: Forwarded unchanged to ``LocalClient``.
    :return: The newly constructed ``LocalClient``.
    """
    if not mopts:
        # Late import to prevent circular import
        import salt.config

        mopts = salt.config.client_config(c_path)

    # TODO: AIO core is separate from transport
    client_kwargs = {
        "mopts": mopts,
        "skip_perm_errors": skip_perm_errors,
        "io_loop": io_loop,
        "auto_reconnect": auto_reconnect,
    }
    return LocalClient(**client_kwargs)
111
112
113 class LocalClient:
114 """
115 The interface used by the :command:`salt` CLI tool on the Salt Master
116
117 ``LocalClient`` is used to send a command to Salt minions to execute
118 :ref:`execution modules <all-salt.modules>` and return the results to the
119 Salt Master.
120
121 Importing and using ``LocalClient`` must be done on the same machine as the
122 Salt Master and it must be done using the same user that the Salt Master is
123 running as. (Unless :conf_master:`external_auth` is configured and
124 authentication credentials are included in the execution).
125
126 .. note::
127 The LocalClient uses a Tornado IOLoop, this can create issues when
128 using the LocalClient inside an existing IOLoop. If creating the
129 LocalClient in partnership with another IOLoop either create the
130 IOLoop before creating the LocalClient, or when creating the IOLoop
131 use ioloop.current() which will return the ioloop created by
132 LocalClient.
133
134 .. code-block:: python
135
136 import salt.client
137
138 local = salt.client.LocalClient()
139 local.cmd('*', 'test.fib', [10])
140 """
141
def __init__(
    self,
    c_path=os.path.join(syspaths.CONFIG_DIR, "master"),
    mopts=None,
    skip_perm_errors=False,
    io_loop=None,
    keep_loop=False,
    auto_reconnect=False,
):
    """
    Load the client options, read the master key and set up the event
    listener and loader dictionaries used by the other methods.

    :param c_path: Path of the configuration file to load when ``mopts``
                   is empty. A warning is logged if it is a directory.
    :param mopts: Pre-loaded options; when truthy ``c_path`` is ignored.
    :param skip_perm_errors: Stored on the instance and passed to the path
                             check performed while reading the master key.
    :param IOLoop io_loop: io_loop used for events.
                           Pass in an io_loop if you want asynchronous
                           operation for obtaining events. Eg use of
                           set_event_handler() API. Otherwise,
                           operation will be synchronous.
    :param keep_loop: Forwarded to ``salt.utils.event.get_event``.
    :param auto_reconnect: Stored on the instance; later handed to
                           ``self.event.get_event`` when polling.
    """
    if not mopts:
        if os.path.isdir(c_path):
            log.warning(
                "%s expects a file path not a directory path(%s) to "
                "its 'c_path' keyword argument",
                self.__class__.__name__,
                c_path,
            )
        mopts = salt.config.client_config(c_path)
    self.opts = mopts

    self.serial = salt.payload.Serial(self.opts)
    # The key lookup below reads ``salt_user`` and ``skip_perm_errors``,
    # so both have to be in place first.
    self.salt_user = salt.utils.user.get_specific_user()
    self.skip_perm_errors = skip_perm_errors
    self.key = self.__read_master_key()
    self.auto_reconnect = auto_reconnect

    event_kwargs = {
        "opts": self.opts,
        "listen": False,
        "io_loop": io_loop,
        "keep_loop": keep_loop,
    }
    self.event = salt.utils.event.get_event(
        "master", self.opts["sock_dir"], self.opts["transport"], **event_kwargs
    )

    self.utils = salt.loader.utils(self.opts)
    self.functions = salt.loader.minion_mods(self.opts, utils=self.utils)
    self.returners = salt.loader.returners(self.opts, self.functions)
186
def __read_master_key(self):
    """
    Read in the rotating master authentication key

    The key file lives in ``cachedir`` and is named ``.<user>_key``.

    :return: The contents of the key file as unicode text, or an empty
             string when the file cannot be read or the path check fails.
    """
    configured_user = self.opts.get("user", "root")
    key_user = self.salt_user
    # root (or a sudo_* user) uses the key of the configured master user
    if key_user == "root" and configured_user != "root":
        key_user = configured_user
    if key_user.startswith("sudo_"):
        key_user = configured_user
    if salt.utils.platform.is_windows():
        # The username may contain '\' if it is in Windows
        # 'DOMAIN\username' format. Fix this for the keyfile path.
        key_user = key_user.replace("\\", "_")

    cachedir = self.opts["cachedir"]
    keyfile = os.path.join(cachedir, ".{}_key".format(key_user))
    try:
        # Make sure all key parent directories are accessible
        salt.utils.verify.check_path_traversal(
            self.opts["cachedir"], key_user, self.skip_perm_errors
        )
        with salt.utils.files.fopen(keyfile, "r") as key_fh:
            contents = key_fh.read()
            return salt.utils.stringutils.to_unicode(contents)
    except (OSError, SaltClientError):
        # Fall back to eauth
        return ""
212
def _convert_range_to_list(self, tgt):
    """
    Expand a seco.range expression into a list target

    :param tgt: The range expression passed to ``Range.expand``.
    :return: Whatever ``expand`` returns, or an empty list when the range
             server raises ``RangeException`` (the error is printed).
    """
    server = seco.range.Range(self.opts["range_server"])
    try:
        expanded = server.expand(tgt)
    except seco.range.RangeException as exc:
        print("Range server exception: {}".format(exc))
        expanded = []
    return expanded
223
224 def _get_timeout(self, timeout):
225 """
226 Return the timeout to use
227 """
228 if timeout is None:
229 return self.opts["timeout"]
230 if isinstance(timeout, int):
231 return timeout
232 if isinstance(timeout, str):
233 try:
234 return int(timeout)
235 except ValueError:
236 return self.opts["timeout"]
237 # Looks like the timeout is invalid, use config
238 return self.opts["timeout"]
239
def gather_job_info(self, jid, tgt, tgt_type, listen=True, **kwargs):
    """
    Publish ``saltutil.find_job`` for ``jid`` to find out whether the job is
    still running on the targeted minions

    :param jid: The job id to look for.
    :param tgt: Target of the ``saltutil.find_job`` publish.
    :param tgt_type: Target type of that publish.
    :param listen: Forwarded to ``run_job``.
    :param kwargs: Forwarded to ``run_job``; ``gather_job_timeout`` overrides
                   the configured option of the same name.
    :return: The publish data returned by ``run_job``. When it carries a
             ``jid`` the event listener is subscribed to it.
    """
    log.debug("Checking whether jid %s is still running", jid)
    configured_timeout = self.opts["gather_job_timeout"]
    find_job_timeout = int(kwargs.get("gather_job_timeout", configured_timeout))

    pub_data = self.run_job(
        tgt,
        "saltutil.find_job",
        arg=[jid],
        tgt_type=tgt_type,
        timeout=find_job_timeout,
        listen=listen,
        **kwargs
    )

    if "jid" in pub_data:
        find_job_jid = pub_data["jid"]
        self.event.subscribe(find_job_jid)

    return pub_data
261
262 def _check_pub_data(self, pub_data, listen=True):
263 """
264 Common checks on the pub_data data structure returned from running pub
265 """
266 if pub_data == "":
267 # Failed to authenticate, this could be a bunch of things
268 raise EauthAuthenticationError(
269 "Failed to authenticate! This is most likely because this "
270 "user is not permitted to execute commands, but there is a "
271 "small possibility that a disk error occurred (check "
272 "disk/inode usage)."
273 )
274
275 # Failed to connect to the master and send the pub
276 if "error" in pub_data:
277 print(pub_data["error"])
278 log.debug("_check_pub_data() error: %s", pub_data["error"])
279 return {}
280 elif "jid" not in pub_data:
281 return {}
282 if pub_data["jid"] == "0":
283 print("Failed to connect to the Master, " "is the Salt Master running?")
284 return {}
285
286 # If we order masters (via a syndic), don't short circuit if no minions
287 # are found
288 if not self.opts.get("order_masters"):
289 # Check for no minions
290 if not pub_data["minions"]:
291 print(
292 "No minions matched the target. "
293 "No command was sent, no jid was assigned."
294 )
295 return {}
296
297 # don't install event subscription listeners when the request is asynchronous
298 # and doesn't care. this is important as it will create event leaks otherwise
299 if not listen:
300 return pub_data
301
302 if self.opts.get("order_masters"):
303 self.event.subscribe("syndic/.*/{}".format(pub_data["jid"]), "regex")
304
305 self.event.subscribe("salt/job/{}".format(pub_data["jid"]))
306
307 return pub_data
308
def run_job(
    self,
    tgt,
    fun,
    arg=(),
    tgt_type="glob",
    ret="",
    timeout=None,
    jid="",
    kwarg=None,
    listen=False,
    **kwargs
):
    """
    Asynchronously send a command to connected minions

    Prep the job directory and publish a command to any targeted minions.

    :return: A dictionary of (validated) ``pub_data`` or an empty
        dictionary on failure. The ``pub_data`` contains the job ID and a
        list of all minions that are expected to return data.
    :raises SaltClientError: When the publish raises ``SaltClientError`` or
        any exception other than the two authentication related ones.
    :raises AuthenticationError: Passed through from the publish.
    :raises AuthorizationError: Passed through from the publish.

    .. code-block:: python

        >>> local.run_job('*', 'test.sleep', [300])
        {'jid': '20131219215650131543', 'minions': ['jerry']}
    """
    arg = salt.utils.args.condition_input(arg, kwarg)

    try:
        pub_data = self.pub(
            tgt,
            fun,
            arg,
            tgt_type,
            ret,
            jid=jid,
            timeout=self._get_timeout(timeout),
            listen=listen,
            **kwargs
        )
    except SaltClientError:
        # Re-raise error with specific message
        raise SaltClientError(
            "The salt master could not be contacted. Is master running?"
        )
    except (AuthenticationError, AuthorizationError):
        # Authentication problems are reported to the caller untouched
        raise
    except Exception as exc:  # pylint: disable=broad-except
        # Convert to generic client error and pass along message
        raise SaltClientError(exc)
    else:
        return self._check_pub_data(pub_data, listen=listen)
364
def gather_minions(self, tgt, expr_form):
    """
    Return the ``minions`` entry of ``CkMinions.check_minions`` for a target

    :param tgt: The target expression.
    :param expr_form: The target type, passed on as ``tgt_type``.
    """
    checker = salt.utils.minions.CkMinions(self.opts)
    matched = checker.check_minions(tgt, tgt_type=expr_form)
    return matched["minions"]
370
@salt.ext.tornado.gen.coroutine
def run_job_async(
    self,
    tgt,
    fun,
    arg=(),
    tgt_type="glob",
    ret="",
    timeout=None,
    jid="",
    kwarg=None,
    listen=True,
    io_loop=None,
    **kwargs
):
    """
    Asynchronously send a command to connected minions

    Prep the job directory and publish a command to any targeted minions.

    :return: A dictionary of (validated) ``pub_data`` or an empty
        dictionary on failure. The ``pub_data`` contains the job ID and a
        list of all minions that are expected to return data.
    :raises SaltClientError: When the publish raises ``SaltClientError`` or
        any exception other than the two authentication related ones.
    :raises AuthenticationError: Passed through from the publish.
    :raises AuthorizationError: Passed through from the publish.

    .. code-block:: python

        >>> local.run_job_async('*', 'test.sleep', [300])
        {'jid': '20131219215650131543', 'minions': ['jerry']}
    """
    arg = salt.utils.args.condition_input(arg, kwarg)

    try:
        pub_data = yield self.pub_async(
            tgt,
            fun,
            arg,
            tgt_type,
            ret,
            jid=jid,
            timeout=self._get_timeout(timeout),
            io_loop=io_loop,
            listen=listen,
            **kwargs
        )
    except SaltClientError:
        # Re-raise error with specific message
        raise SaltClientError(
            "The salt master could not be contacted. Is master running?"
        )
    except (AuthenticationError, AuthorizationError):
        # Re-raise the original exception object, exactly like ``run_job``
        # does, instead of wrapping it in a new instance: that keeps the
        # concrete exception type, its attributes and its traceback.
        raise
    except Exception as general_exception:  # pylint: disable=broad-except
        # Convert to generic client error and pass along message
        raise SaltClientError(general_exception)

    raise salt.ext.tornado.gen.Return(self._check_pub_data(pub_data, listen=listen))
429
def cmd_async(
    self, tgt, fun, arg=(), tgt_type="glob", ret="", jid="", kwarg=None, **kwargs
):
    """
    Publish a command without waiting for (or listening to) the returns

    The function signature is the same as :py:meth:`cmd` with the
    following exceptions.

    :returns: A job ID or 0 on failure.

    .. code-block:: python

        >>> local.cmd_async('*', 'test.sleep', [300])
        '20131219215921857715'
    """
    pub_data = self.run_job(
        tgt, fun, arg, tgt_type, ret, jid=jid, kwarg=kwarg, listen=False, **kwargs
    )
    try:
        job_id = pub_data["jid"]
    except KeyError:
        # The publish failed, so no jid was assigned
        job_id = 0
    return job_id
453
def cmd_subset(
    self,
    tgt,
    fun,
    arg=(),
    tgt_type="glob",
    ret="",
    kwarg=None,
    subset=3,
    cli=False,
    progress=False,
    full_return=False,
    **kwargs
):
    """
    Execute a command on a random subset of the targeted systems

    The targeted minions are first asked for ``sys.list_functions``; only
    minions whose answer contains ``fun`` are candidates for the subset.
    Minions whose answer cannot be searched (for example the ``False`` that
    :py:meth:`cmd` records for a minion that did not return) are skipped.

    The function signature is the same as :py:meth:`cmd` with the
    following exceptions.

    :param subset: The number of systems to execute on
    :param cli: When this is set to True, a generator is returned,
                otherwise a dictionary of the minion returns is returned
    :param progress: Forwarded to the method that runs the command.
    :param full_return: Forwarded to the method that runs the command.

    .. code-block:: python

        >>> SLC.cmd_subset('*', 'test.ping', subset=1)
        {'jerry': True}
    """
    minion_ret = self.cmd(tgt, "sys.list_functions", tgt_type=tgt_type, **kwargs)
    minions = list(minion_ret)
    random.shuffle(minions)
    f_tgt = []
    for minion in minions:
        try:
            supported = fun in minion_ret[minion]
        except TypeError:
            # Not a container of function names (e.g. ``False`` for a minion
            # that never answered), so the minion cannot be selected.
            supported = False
        if supported:
            f_tgt.append(minion)
        if len(f_tgt) >= subset:
            break
    func = self.cmd_cli if cli else self.cmd
    return func(
        f_tgt,
        fun,
        arg,
        tgt_type="list",
        ret=ret,
        kwarg=kwarg,
        progress=progress,
        full_return=full_return,
        **kwargs
    )
506
def cmd_batch(
    self,
    tgt,
    fun,
    arg=(),
    tgt_type="glob",
    ret="",
    kwarg=None,
    batch="10%",
    **kwargs
):
    """
    Iteratively execute a command on subsets of minions at a time

    The function signature is the same as :py:meth:`cmd` with the
    following exceptions.

    :param batch: The batch identifier of systems to execute on
    :param kwargs: ``failhard``, ``raw``, ``timeout``,
        ``gather_job_timeout`` and ``batch_wait`` are copied into the batch
        options; ``eauth``, ``username``, ``password`` and ``token`` are
        removed from ``kwargs`` and handed to the batch runner as
        credentials.

    :returns: A generator of minion returns

    .. code-block:: python

        >>> returns = local.cmd_batch('*', 'state.highstate', batch='10%')
        >>> for ret in returns:
        ...     print(ret)
        {'jerry': {...}}
        {'dave': {...}}
        {'stewart': {...}}
    """
    # We need to re-import salt.utils.args here
    # even though it has already been imported.
    # when cmd_batch is called via the NetAPI
    # the module is unavailable.
    import salt.utils.args

    # Late import - not used anywhere else in this file
    import salt.cli.batch

    arg = salt.utils.args.condition_input(arg, kwarg)
    opts = {
        "tgt": tgt,
        "fun": fun,
        "arg": arg,
        "tgt_type": tgt_type,
        "ret": ret,
        "batch": batch,
        "failhard": kwargs.get("failhard", self.opts.get("failhard", False)),
        "raw": kwargs.get("raw", False),
    }

    # Optional settings that are copied verbatim when the caller gave them
    for passthrough in ("timeout", "gather_job_timeout"):
        if passthrough in kwargs:
            opts[passthrough] = kwargs[passthrough]
    if "batch_wait" in kwargs:
        opts["batch_wait"] = int(kwargs["batch_wait"])

    # Credentials are moved out of kwargs into their own dictionary
    eauth = {}
    for credential in ("eauth", "username", "password", "token"):
        if credential in kwargs:
            eauth[credential] = kwargs.pop(credential)

    # Fill in every client option that was not set explicitly above
    for key, val in self.opts.items():
        opts.setdefault(key, val)

    runner = salt.cli.batch.Batch(opts, eauth=eauth, quiet=True)
    for batch_ret in runner.run():
        yield batch_ret
581
def cmd(
    self,
    tgt,
    fun,
    arg=(),
    timeout=None,
    tgt_type="glob",
    ret="",
    jid="",
    full_return=False,
    kwarg=None,
    **kwargs
):
    """
    Synchronously execute a command on targeted minions

    The command is published, the returns are collected until the timeout
    period is over, and all minion data is handed back at once.

    .. code-block:: python

        >>> import salt.client
        >>> local = salt.client.LocalClient()
        >>> local.cmd('*', 'cmd.run', ['whoami'])
        {'jerry': 'root'}

    Keyword arguments for the remote function go into ``kwarg``:

    .. code-block:: python

        local.cmd('*', 'test.arg', ['arg1', 'arg2'], kwarg={'foo': 'bar'})

    Several functions can be executed with one publish (a compound
    command). The function names and their arguments are given as two
    separate lists that must line up by index; a function without
    arguments still needs an empty list in its slot.

    .. code-block:: python

        >>> local.cmd('*', [
                'grains.items',
                'sys.doc',
                'cmd.run',
            ],
            [
                [],
                [],
                ['uptime'],
            ])

    :param tgt: Which minions to target for the execution. Default is shell
        glob. Modified by the ``tgt_type`` option.
    :type tgt: string or list

    :param fun: The module and function to call on the specified minions of
        the form ``module.function``. For example ``test.ping`` or
        ``grains.items``.

        Compound commands
            Multiple functions may be called in a single publish by
            passing a list of commands. This can dramatically lower
            overhead and speed up the application communicating with Salt.

            This requires that the ``arg`` param is a list of lists. The
            ``fun`` list and the ``arg`` list must correlate by index
            meaning a function that does not take arguments must still have
            a corresponding empty list at the expected index.
    :type fun: string or list of strings

    :param arg: A list of arguments to pass to the remote function. If the
        function takes no arguments ``arg`` may be omitted except when
        executing a compound command.
    :type arg: list or list-of-lists

    :param timeout: Seconds to wait after the last minion returns but
        before all minions return.

    :param tgt_type: The type of ``tgt``. Allowed values:

        * ``glob`` - Bash glob completion - Default
        * ``pcre`` - Perl style regular expression
        * ``list`` - Python list of hosts
        * ``grain`` - Match based on a grain comparison
        * ``grain_pcre`` - Grain comparison with a regex
        * ``pillar`` - Pillar data comparison
        * ``pillar_pcre`` - Pillar data comparison with a regex
        * ``nodegroup`` - Match on nodegroup
        * ``range`` - Use a Range server for matching
        * ``compound`` - Pass a compound match string
        * ``ipcidr`` - Match based on Subnet (CIDR notation) or IPv4 address.

        .. versionchanged:: 2017.7.0
            Renamed from ``expr_form`` to ``tgt_type``

    :param ret: The returner to use. The value passed can be single
        returner, or a comma delimited list of returners to call in order
        on the minions

    :param jid: Job ID handed on to ``run_job``.

    :param kwarg: A dictionary with keyword arguments for the function.

    :param full_return: Output the job return only (default) or the full
        return including exit code and other job metadata.

    :param kwargs: Optional keyword arguments.
        Authentication credentials may be passed when using
        :conf_master:`external_auth`.

        For example: ``local.cmd('*', 'test.ping', username='saltdev',
        password='saltdev', eauth='pam')``.
        Or: ``local.cmd('*', 'test.ping',
        token='5871821ea51754fdcea8153c1c745433')``

    :returns: A dictionary with the result of the execution, keyed by
        minion ID. A compound command will return a sub-dictionary keyed by
        function name. Expected minions that did not return are mapped to
        ``False``. When the publish fails the (empty) publish data is
        returned instead.
    """
    was_listening = self.event.cpub

    try:
        pub_data = self.run_job(
            tgt,
            fun,
            arg,
            tgt_type,
            ret,
            timeout,
            jid,
            kwarg=kwarg,
            listen=True,
            **kwargs
        )

        if not pub_data:
            return pub_data

        expected = pub_data["minions"]
        results = {}
        event_returns = self.get_cli_event_returns(
            pub_data["jid"],
            expected,
            self._get_timeout(timeout),
            tgt,
            tgt_type,
            **kwargs
        )
        for fn_ret in event_returns:
            if not fn_ret:
                continue
            for mid, data in fn_ret.items():
                if full_return:
                    results[mid] = data
                else:
                    results[mid] = data.get("ret", {})

        # Every expected minion without a return is reported as failed
        for failed in set(expected).difference(results):
            results[failed] = False
        return results
    finally:
        # Only close the publisher connection if this call opened it
        if not was_listening:
            self.event.close_pub()
737
def cmd_cli(
    self,
    tgt,
    fun,
    arg=(),
    timeout=None,
    tgt_type="glob",
    ret="",
    verbose=False,
    kwarg=None,
    progress=False,
    **kwargs
):
    """
    Used by the :command:`salt` CLI. This method returns minion returns as
    they come back and attempts to block until all minions return.

    The publish data of the job is kept on the instance as
    ``self.pub_data``. If the publish fails, that (empty) publish data is
    the only item yielded.

    The function signature is the same as :py:meth:`cmd` with the
    following exceptions.

    :param verbose: Print extra information about the running command
    :param progress: Passed on to ``get_cli_event_returns``.
    :returns: A generator
    :raises SystemExit: When interrupted with Ctrl-c while waiting for
        returns; the message explains how to look the job up later.
    """
    was_listening = self.event.cpub

    try:
        self.pub_data = self.run_job(
            tgt,
            fun,
            arg,
            tgt_type,
            ret,
            timeout,
            kwarg=kwarg,
            listen=True,
            **kwargs
        )

        if not self.pub_data:
            yield self.pub_data
            return

        try:
            event_returns = self.get_cli_event_returns(
                self.pub_data["jid"],
                self.pub_data["minions"],
                self._get_timeout(timeout),
                tgt,
                tgt_type,
                verbose,
                progress,
                **kwargs
            )
            for fn_ret in event_returns:
                # Empty returns are not passed on to the caller
                if fn_ret:
                    yield fn_ret
        except KeyboardInterrupt:
            raise SystemExit(
                "\n"
                "This job's jid is: {0}\n"
                "Exiting gracefully on Ctrl-c\n"
                "The minions may not have all finished running and any "
                "remaining minions will return upon completion. To look "
                "up the return data for this job later, run the following "
                "command:\n\n"
                "salt-run jobs.lookup_jid {0}".format(self.pub_data["jid"])
            )
    finally:
        # Only close the publisher connection if this call opened it
        if not was_listening:
            self.event.close_pub()
809
def cmd_iter(
    self,
    tgt,
    fun,
    arg=(),
    timeout=None,
    tgt_type="glob",
    ret="",
    kwarg=None,
    **kwargs
):
    """
    Yields the individual minion returns as they come in

    The function signature is the same as :py:meth:`cmd` with the
    following exceptions.

    Normally :py:meth:`cmd_iter` does not yield results for minions that
    are not connected. If you want it to return results for disconnected
    minions set `expect_minions=True` in `kwargs`.

    If ``yield_pub_data`` is set in `kwargs` the publish data is yielded
    before the first minion return. If the publish fails, the (empty)
    publish data is the only item yielded.

    :return: A generator yielding the individual minion returns

    .. code-block:: python

        >>> ret = local.cmd_iter('*', 'test.ping')
        >>> for i in ret:
        ...     print(i)
        {'jerry': {'ret': True}}
        {'dave': {'ret': True}}
        {'stewart': {'ret': True}}
    """
    was_listening = self.event.cpub

    try:
        pub_data = self.run_job(
            tgt,
            fun,
            arg,
            tgt_type,
            ret,
            timeout,
            kwarg=kwarg,
            listen=True,
            **kwargs
        )

        if not pub_data:
            yield pub_data
            return

        if kwargs.get("yield_pub_data"):
            yield pub_data

        job_returns = self.get_iter_returns(
            pub_data["jid"],
            pub_data["minions"],
            timeout=self._get_timeout(timeout),
            tgt=tgt,
            tgt_type=tgt_type,
            **kwargs
        )
        for fn_ret in job_returns:
            # Empty returns are not passed on to the caller
            if fn_ret:
                yield fn_ret
        self._clean_up_subscriptions(pub_data["jid"])
    finally:
        # Only close the publisher connection if this call opened it
        if not was_listening:
            self.event.close_pub()
877
def cmd_iter_no_block(
    self,
    tgt,
    fun,
    arg=(),
    timeout=None,
    tgt_type="glob",
    ret="",
    kwarg=None,
    show_jid=False,
    verbose=False,
    **kwargs
):
    """
    Yields the individual minion returns as they come in, or None
    when no returns are available.

    The function signature is the same as :py:meth:`cmd` with the
    following exceptions.

    :param show_jid: When this or ``verbose`` is set, the job id is added
        to every minion's return data under ``jid``.
    :param verbose: See ``show_jid``.

    :returns: A generator yielding the individual minion returns, or None
        when no returns are available. This allows for actions to be
        injected in between minion returns. If the publish fails, the
        (empty) publish data is the only item yielded.

    .. code-block:: python

        >>> ret = local.cmd_iter_no_block('*', 'test.ping')
        >>> for i in ret:
        ...     print(i)
        None
        {'jerry': {'ret': True}}
        {'dave': {'ret': True}}
        None
        {'stewart': {'ret': True}}
    """
    was_listening = self.event.cpub

    try:
        pub_data = self.run_job(
            tgt,
            fun,
            arg,
            tgt_type,
            ret,
            timeout,
            kwarg=kwarg,
            listen=True,
            **kwargs
        )

        if not pub_data:
            yield pub_data
            return

        job_returns = self.get_iter_returns(
            pub_data["jid"],
            pub_data["minions"],
            timeout=timeout,
            tgt=tgt,
            tgt_type=tgt_type,
            block=False,
            **kwargs
        )
        for fn_ret in job_returns:
            if fn_ret and (show_jid or verbose):
                for minion in fn_ret:
                    fn_ret[minion]["jid"] = pub_data["jid"]
            # Unlike cmd_iter, empty values are yielded as well
            yield fn_ret

        self._clean_up_subscriptions(pub_data["jid"])
    finally:
        # Only close the publisher connection if this call opened it
        if not was_listening:
            self.event.close_pub()
949
def cmd_full_return(
    self,
    tgt,
    fun,
    arg=(),
    timeout=None,
    tgt_type="glob",
    ret="",
    verbose=False,
    kwarg=None,
    **kwargs
):
    """
    Execute a salt command and return the result of
    ``get_cli_static_event_returns`` for the published job

    :return: The collected returns, or the (empty) publish data when the
        publish fails.
    """
    was_listening = self.event.cpub

    try:
        pub_data = self.run_job(
            tgt,
            fun,
            arg,
            tgt_type,
            ret,
            timeout,
            kwarg=kwarg,
            listen=True,
            **kwargs
        )

        if pub_data:
            return self.get_cli_static_event_returns(
                pub_data["jid"], pub_data["minions"], timeout, tgt, tgt_type, verbose
            )
        return pub_data
    finally:
        # Only close the publisher connection if this call opened it
        if not was_listening:
            self.event.close_pub()
989
def get_cli_returns(
    self,
    jid,
    minions,
    timeout=None,
    tgt="*",
    tgt_type="glob",
    verbose=False,
    show_jid=False,
    **kwargs
):
    """
    Starts a watcher looking at the return data for a specified JID

    Returns already stored in the job cache are yielded first, followed by
    returns arriving on the event bus. The generator finishes as soon as
    every minion in ``minions`` has been seen.

    :param jid: The job id to watch.
    :param minions: The minions that are expected to return.
    :param timeout: Passed to ``get_event_iter_returns``; the configured
        ``timeout`` option is used when it is ``None``.
    :param verbose: Print a banner containing the jid.
    :param show_jid: Print the jid (only when ``verbose`` is not set).
    :returns: all of the information for the JID
    """
    if verbose:
        msg = "Executing job with jid {}".format(jid)
        print(msg)
        print("-" * len(msg) + "\n")
    elif show_jid:
        print("jid: {}".format(jid))
    if timeout is None:
        timeout = self.opts["timeout"]
    # make sure the minions is a set (since we do set operations on it)
    minions = set(minions)

    found = set()
    # start this before the cache lookup-- in case new stuff comes in
    event_iter = self.get_event_iter_returns(jid, minions, timeout=timeout)

    # get the info from the cache
    ret = self.get_cache_returns(jid)
    if ret != {}:
        found.update(set(ret))
        yield ret

    # if you have all the returns, stop
    # NOTE: a generator has to finish with ``return``; raising
    # StopIteration inside one is turned into a RuntimeError (PEP 479).
    if minions <= found:
        return

    # otherwise, get them from the event system
    for event in event_iter:
        if event != {}:
            found.update(set(event))
            yield event
        if minions <= found:
            self._clean_up_subscriptions(jid)
            return
1040
1041 # TODO: tests!!
def get_returns_no_block(self, tag, match_type=None):
    """
    Raw generator over the events matching ``tag``, without any timeout
    logic

    Each iteration polls the event bus once, without blocking, and yields
    the result: either the raw event data or None. The generator never
    finishes on its own; the consumer decides when to stop.

    :param tag: The event tag to look for.
    :param match_type: How ``tag`` is matched, e.g. ``"regex"`` to search
        the event bus for non-return data, such as minion lists returned
        from syndics.
    """
    while True:
        yield self.event.get_event(
            wait=0.01,
            tag=tag,
            match_type=match_type,
            full=True,
            no_block=True,
            auto_reconnect=self.auto_reconnect,
        )
1063
1064 def get_iter_returns(
1065 self,
1066 jid,
1067 minions,
1068 timeout=None,
1069 tgt="*",
1070 tgt_type="glob",
1071 expect_minions=False,
1072 block=True,
1073 **kwargs
1074 ):
1075 """
1076 Watch the event system and return job data as it comes in
1077
1078 :returns: all of the information for the JID
1079 """
1080 if not isinstance(minions, set):
1081 if isinstance(minions, str):
1082 minions = {minions}
1083 elif isinstance(minions, (list, tuple)):
1084 minions = set(list(minions))
1085
1086 if timeout is None:
1087 timeout = self.opts["timeout"]
1088 gather_job_timeout = int(
1089 kwargs.get("gather_job_timeout", self.opts["gather_job_timeout"])
1090 )
1091 start = int(time.time())
1092
1093 # timeouts per minion, id_ -> timeout time
1094 minion_timeouts = {}
1095
1096 found = set()
1097 missing = set()
1098 # Check to see if the jid is real, if not return the empty dict
1099 try:
1100 if (
1101 self.returners["{}.get_load".format(self.opts["master_job_cache"])](jid)
1102 == {}
1103 ):
1104 log.warning("jid does not exist")
1105 yield {}
1106 # stop the iteration, since the jid is invalid
1107 raise StopIteration()
1108 except Exception as exc: # pylint: disable=broad-except
1109 log.warning(
1110 "Returner unavailable: %s", exc, exc_info_on_loglevel=logging.DEBUG
1111 )
1112 # Wait for the hosts to check in
1113 last_time = False
1114 # iterator for this job's return
1115 if self.opts["order_masters"]:
1116 # If we are a MoM, we need to gather expected minions from downstreams masters.
1117 ret_iter = self.get_returns_no_block(
1118 "(salt/job|syndic/.*)/{}".format(jid), "regex"
1119 )
1120 else:
1121 ret_iter = self.get_returns_no_block("salt/job/{}".format(jid))
1122 # iterator for the info of this job
1123 jinfo_iter = []
1124 # open event jids that need to be un-subscribed from later
1125 open_jids = set()
1126 timeout_at = time.time() + timeout
1127 gather_syndic_wait = time.time() + self.opts["syndic_wait"]
1128 # are there still minions running the job out there
1129 # start as True so that we ping at least once
1130 minions_running = True
1131 log.debug(
1132 "get_iter_returns for jid %s sent to %s will timeout at %s",
1133 jid,
1134 minions,
1135 datetime.fromtimestamp(timeout_at).time(),
1136 )
1137 while True:
1138 # Process events until timeout is reached or all minions have returned
1139 for raw in ret_iter:
1140 # if we got None, then there were no events
1141 if raw is None:
1142 break
1143 if "minions" in raw.get("data", {}):
1144 minions.update(raw["data"]["minions"])
1145 if "missing" in raw.get("data", {}):
1146 missing.update(raw["data"]["missing"])
1147 continue
1148 if "return" not in raw["data"]:
1149 continue
1150 if kwargs.get("raw", False):
1151 found.add(raw["data"]["id"])
1152 yield raw
1153 else:
1154 found.add(raw["data"]["id"])
1155 ret = {raw["data"]["id"]: {"ret": raw["data"]["return"]}}
1156 if "out" in raw["data"]:
1157 ret[raw["data"]["id"]]["out"] = raw["data"]["out"]
1158 if "retcode" in raw["data"]:
1159 ret[raw["data"]["id"]]["retcode"] = raw["data"]["retcode"]
1160 if "jid" in raw["data"]:
1161 ret[raw["data"]["id"]]["jid"] = raw["data"]["jid"]
1162 if kwargs.get("_cmd_meta", False):
1163 ret[raw["data"]["id"]].update(raw["data"])
1164 log.debug("jid %s return from %s", jid, raw["data"]["id"])
1165 yield ret
1166
1167 # if we have all of the returns (and we aren't a syndic), no need for anything fancy
1168 if (
1169 len(found.intersection(minions)) >= len(minions)
1170 and not self.opts["order_masters"]
1171 ):
1172 # All minions have returned, break out of the loop
1173 log.debug("jid %s found all minions %s", jid, found)
1174 break
1175 elif (
1176 len(found.intersection(minions)) >= len(minions)
1177 and self.opts["order_masters"]
1178 ):
1179 if (
1180 len(found) >= len(minions)
1181 and len(minions) > 0
1182 and time.time() > gather_syndic_wait
1183 ):
1184 # There were some minions to find and we found them
1185 # However, this does not imply that *all* masters have yet responded with expected minion lists.
1186 # Therefore, continue to wait up to the syndic_wait period (calculated in gather_syndic_wait) to see
1187 # if additional lower-level masters deliver their lists of expected
1188 # minions.
1189 break
1190 # If we get here we may not have gathered the minion list yet. Keep waiting
1191 # for all lower-level masters to respond with their minion lists
1192
1193 # let start the timeouts for all remaining minions
1194
1195 for id_ in minions - found:
1196 # if we have a new minion in the list, make sure it has a timeout
1197 if id_ not in minion_timeouts:
1198 minion_timeouts[id_] = time.time() + timeout
1199
1200 # if the jinfo has timed out and some minions are still running the job
1201 # re-do the ping
1202 if time.time() > timeout_at and minions_running:
1203 # since this is a new ping, no one has responded yet
1204 jinfo = self.gather_job_info(
1205 jid, list(minions - found), "list", **kwargs
1206 )
1207 minions_running = False
1208 # if we weren't assigned any jid that means the master thinks
1209 # we have nothing to send
1210 if "jid" not in jinfo:
1211 jinfo_iter = []
1212 else:
1213 jinfo_iter = self.get_returns_no_block(
1214 "salt/job/{}".format(jinfo["jid"])
1215 )
1216 timeout_at = time.time() + gather_job_timeout
1217 # if you are a syndic, wait a little longer
1218 if self.opts["order_masters"]:
1219 timeout_at += self.opts.get("syndic_wait", 1)
1220
1221 # check for minions that are running the job still
1222 for raw in jinfo_iter:
1223 # if there are no more events, lets stop waiting for the jinfo
1224 if raw is None:
1225 break
1226 try:
1227 if raw["data"]["retcode"] > 0:
1228 log.error(
1229 "saltutil returning errors on minion %s", raw["data"]["id"]
1230 )
1231 minions.remove(raw["data"]["id"])
1232 break
1233 except KeyError as exc:
1234 # This is a safe pass. We're just using the try/except to
1235 # avoid having to deep-check for keys.
1236 missing_key = exc.__str__().strip("'\"")
1237 if missing_key == "retcode":
1238 log.debug("retcode missing from client return")
1239 else:
1240 log.debug(
1241 "Passing on saltutil error. Key '%s' missing "
1242 "from client return. This may be an error in "
1243 "the client.",
1244 missing_key,
1245 )
1246 # Keep track of the jid events to unsubscribe from later
1247 open_jids.add(jinfo["jid"])
1248
1249 # TODO: move to a library??
1250 if "minions" in raw.get("data", {}):
1251 minions.update(raw["data"]["minions"])
1252 continue
1253 if "syndic" in raw.get("data", {}):
1254 minions.update(raw["syndic"])
1255 continue
1256 if "return" not in raw.get("data", {}):
1257 continue
1258
1259 # if the job isn't running there anymore... don't count
1260 if raw["data"]["return"] == {}:
1261 continue
1262
1263 # if the minion throws an exception containing the word "return"
1264 # the master will try to handle the string as a dict in the next
1265 # step. Check if we have a string, log the issue and continue.
1266 if isinstance(raw["data"]["return"], str):
1267 log.error("unexpected return from minion: %s", raw)
1268 continue
1269
1270 if (
1271 "return" in raw["data"]["return"]
1272 and raw["data"]["return"]["return"] == {}
1273 ):
1274 continue
1275
1276 # if we didn't originally target the minion, lets add it to the list
1277 if raw["data"]["id"] not in minions:
1278 minions.add(raw["data"]["id"])
1279 # update this minion's timeout, as long as the job is still running
1280 minion_timeouts[raw["data"]["id"]] = time.time() + timeout
1281 # a minion returned, so we know its running somewhere
1282 minions_running = True
1283
1284 # if we have hit gather_job_timeout (after firing the job) AND
1285 # if we have hit all minion timeouts, lets call it
1286 now = time.time()
1287 # if we have finished waiting, and no minions are running the job
1288 # then we need to see if each minion has timedout
1289 done = (now > timeout_at) and not minions_running
1290 if done:
1291 # if all minions have timeod out
1292 for id_ in minions - found:
1293 if now < minion_timeouts[id_]:
1294 done = False
1295 break
1296 if done:
1297 break
1298
1299 # don't spin
1300 if block:
1301 time.sleep(0.01)
1302 else:
1303 yield
1304
1305 # If there are any remaining open events, clean them up.
1306 if open_jids:
1307 for jid in open_jids:
1308 self.event.unsubscribe(jid)
1309
1310 if expect_minions:
1311 for minion in list(minions - found):
1312 yield {minion: {"failed": True}}
1313
1314 # Filter out any minions marked as missing for which we received
1315 # returns (prevents false events sent due to higher-level masters not
1316 # knowing about lower-level minions).
1317 missing -= found
1318
1319 # Report on missing minions
1320 if missing:
1321 for minion in missing:
1322 yield {minion: {"failed": True}}
1323
def get_returns(self, jid, minions, timeout=None):
    """
    Collect the returns of a job from the event bus for the command line
    interface.

    Blocks until every id in ``minions`` has returned or the timeout has
    elapsed, whichever happens first.

    :param jid: job id to collect returns for; also used as the event tag
    :param minions: iterable of minion ids expected to return
    :param timeout: seconds to wait; ``self.opts["timeout"]`` when ``None``
    :return: dict mapping minion id to its return data; empty when the
        master job cache holds no load for ``jid``
    :raises SaltClientError: if the master job cache returner fails while
        the jid is being verified
    """
    expected = set(minions)
    if timeout is None:
        timeout = self.opts["timeout"]
    deadline = int(time.time()) + timeout
    log.debug(
        "get_returns for jid %s sent to %s will timeout at %s",
        jid,
        expected,
        datetime.fromtimestamp(deadline).time(),
    )

    seen = set()
    results = {}
    # An unknown jid yields an empty load; nothing to wait for in that case
    try:
        get_load = self.returners[
            "{}.get_load".format(self.opts["master_job_cache"])
        ]
        if get_load(jid) == {}:
            log.warning("jid does not exist")
            return results
    except Exception as exc:  # pylint: disable=broad-except
        raise SaltClientError(
            "Master job cache returner [{}] failed to verify jid. "
            "Exception details: {}".format(self.opts["master_job_cache"], exc)
        )

    # Wait for the hosts to check in
    while True:
        # never hand get_event a wait below one second
        wait = max(1, deadline - int(time.time()))
        event = self.event.get_event(
            wait, jid, auto_reconnect=self.auto_reconnect
        )
        has_return = event is not None and "return" in event
        if has_return:
            seen.add(event["id"])
            results[event["id"]] = event["return"]
        if len(seen & expected) >= len(expected):
            # All minions have returned, break out of the loop
            log.debug("jid %s found all minions", jid)
            break
        if has_return:
            # keep draining events without sleeping
            continue
        # The event system timeout was reached and nothing was returned
        if int(time.time()) > deadline:
            log.info(
                "jid %s minions %s did not return in time", jid, (expected - seen)
            )
            break
        time.sleep(0.01)
    return results
1381
def get_full_returns(self, jid, minions, timeout=None):
    """
    Gather all return information for a jid, first from the master job
    cache and then from ``get_event_iter_returns`` for whatever is missing.

    :param jid: job id to look up
    :param minions: collection of minion ids expected to return
    :param timeout: passed through to ``get_event_iter_returns``
    :return: dict mapping minion id to a dict with a ``ret`` key and, when
        available, an ``out`` key
    :raises SaltClientError: if the master job cache returner fails
    """
    # TODO: change this from ret to return... or the other way.
    #       Its inconsistent, we should pick one

    ret = {}
    # create the iterator-- since we want to get anyone in the middle
    event_iter = self.get_event_iter_returns(jid, minions, timeout=timeout)

    try:
        data = self.returners["{}.get_jid".format(self.opts["master_job_cache"])](
            jid
        )
    except Exception as exc:  # pylint: disable=broad-except
        # chain the original failure so its traceback is not lost
        raise SaltClientError(
            "Returner {} could not fetch jid data. "
            "Exception details: {}".format(self.opts["master_job_cache"], exc)
        ) from exc
    for minion in data:
        cached = data[minion]
        # ``ret`` is None when the cache entry carries no "return" key
        m_data = {"ret": cached.get("return")}
        if "out" in cached:
            m_data["out"] = cached["out"]
        if minion in ret:
            ret[minion].update(m_data)
        else:
            ret[minion] = m_data

    # if we have all the minion returns, lets just return
    if len(set(ret).intersection(minions)) >= len(minions):
        return ret

    # otherwise lets use the listener we created above to get the rest
    for event_ret in event_iter:
        # if nothing in the event_ret, skip
        if event_ret == {}:
            time.sleep(0.02)
            continue
        for minion, m_data in event_ret.items():
            if minion in ret:
                ret[minion].update(m_data)
            else:
                ret[minion] = m_data

        # are we done yet?
        if len(set(ret).intersection(minions)) >= len(minions):
            return ret

    # otherwise we hit the timeout, return what we have
    return ret
1438
def get_cache_returns(self, jid):
    """
    Execute a single pass to gather the contents of the job cache

    :param jid: job id to look up in the master job cache
    :return: dict mapping minion id to a dict with a ``ret`` key (``None``
        when the cache entry has no ``return``) and, when present, ``out``
    :raises SaltClientError: if the master job cache returner fails
    """
    ret = {}

    try:
        data = self.returners["{}.get_jid".format(self.opts["master_job_cache"])](
            jid
        )
    except Exception as exc:  # pylint: disable=broad-except
        # chain the original failure so its traceback is not lost
        raise SaltClientError(
            "Could not examine master job cache. "
            "Error occurred in {} returner. "
            "Exception details: {}".format(self.opts["master_job_cache"], exc)
        ) from exc
    for minion in data:
        cached = data[minion]
        m_data = {"ret": cached.get("return")}
        if "out" in cached:
            m_data["out"] = cached["out"]
        if minion in ret:
            ret[minion].update(m_data)
        else:
            ret[minion] = m_data

    return ret
1469
def get_cli_static_event_returns(
    self,
    jid,
    minions,
    timeout=None,
    tgt="*",
    tgt_type="glob",
    verbose=False,
    show_timeout=False,
    show_jid=False,
):
    """
    Collect all returns of a job from the event bus and hand them back as
    one dict for the command line interface.

    :param jid: job id to collect returns for
    :param minions: iterable of minion ids expected to return
    :param timeout: seconds to wait; ``self.opts["timeout"]`` when ``None``
    :param tgt: accepted for signature compatibility; not read here
    :param tgt_type: targeting type, consulted when reporting missing minions
    :param verbose: print a job header and report minions that timed out
    :param show_timeout: report minions that timed out
    :param show_jid: print the jid (only when ``verbose`` is false)
    :return: dict mapping minion id to ``ret``/``success``/``out`` data;
        empty when the master job cache holds no load for ``jid``
    :raises SaltClientError: if the master job cache returner fails
    """
    log.trace("entered - function get_cli_static_event_returns()")
    targets = set(minions)
    if verbose:
        header = "Executing job with jid {}".format(jid)
        print(header)
        print("-" * len(header) + "\n")
    elif show_jid:
        print("jid: {}".format(jid))

    if timeout is None:
        timeout = self.opts["timeout"]

    deadline = int(time.time()) + timeout
    seen = set()
    results = {}
    # Check to see if the jid is real, if not return the empty dict
    try:
        get_load = self.returners[
            "{}.get_load".format(self.opts["master_job_cache"])
        ]
        if get_load(jid) == {}:
            log.warning("jid does not exist")
            return results
    except Exception as exc:  # pylint: disable=broad-except
        raise SaltClientError(
            "Load could not be retrieved from "
            "returner {}. Exception details: {}".format(
                self.opts["master_job_cache"], exc
            )
        )

    job_tag = "salt/job/{}".format(jid)
    # Wait for the hosts to check in
    while True:
        # Wait 0 == forever, use a minimum of 1s
        wait = max(1, deadline - int(time.time()))
        event = self.event.get_event(
            wait, job_tag, auto_reconnect=self.auto_reconnect
        )
        if event is not None and "return" in event:
            if "minions" in event.get("data", {}):
                targets.update(event["data"]["minions"])
                continue
            minion_id = event["id"]
            seen.add(minion_id)
            entry = {
                "ret": event["return"],
                "success": event.get("success", False),
            }
            if "out" in event:
                entry["out"] = event["out"]
            results[minion_id] = entry
            if len(seen & targets) >= len(targets):
                # All minions have returned, break out of the loop
                break
            continue
        # The event system timeout was reached and nothing was returned
        if len(seen & targets) >= len(targets):
            break
        if int(time.time()) > deadline:
            report_missing = (
                (verbose or show_timeout)
                and (
                    self.opts.get("minion_data_cache", False)
                    or tgt_type in ("glob", "pcre", "list")
                )
                and len(seen) < len(targets)
            )
            if report_missing:
                for minion in sorted(targets - seen):
                    results[minion] = {
                        "out": "no_return",
                        "ret": "Minion did not return",
                    }
            break
        time.sleep(0.01)

    self._clean_up_subscriptions(jid)
    return results
1561
def get_cli_event_returns(
    self,
    jid,
    minions,
    timeout=None,
    tgt="*",
    tgt_type="glob",
    verbose=False,
    progress=False,
    show_timeout=False,
    show_jid=False,
    **kwargs
):
    """
    Yield the returns of a job for the command line interface as they
    arrive from ``get_iter_returns``.

    Entries flagged ``failed`` by ``get_iter_returns`` are replaced with a
    ``no_return`` structure, except for ids that have an entry in the
    syndic directory, which are dropped. When ``progress`` is set, a
    ``minion_count``/``return_count`` dict is yielded for every
    non-failed entry as well.

    :param jid: job id to collect returns for
    :param minions: minion ids expected to return
    :param timeout: passed through to ``get_iter_returns``
    :param tgt: passed through to ``get_iter_returns``
    :param tgt_type: passed through to ``get_iter_returns``
    :param verbose: print a job header; also enables ``expect_minions``
    :param progress: additionally yield progress counters
    :param show_timeout: enables ``expect_minions``
    :param show_jid: print the jid (only when ``verbose`` is false)
    :param kwargs: forwarded to ``get_iter_returns``
    """
    log.trace("func get_cli_event_returns()")

    if verbose:
        header = "Executing job with jid {}".format(jid)
        print(header)
        print("-" * len(header) + "\n")
    elif show_jid:
        print("jid: {}".format(jid))

    # lazy load the connected minions
    connected_minions = None
    return_count = 0

    # (gtmanfred) expect_minions is popped here incase it is passed from a client
    # call.  If this is not popped, then it would be passed twice to
    # get_iter_returns.
    expect_minions = kwargs.pop("expect_minions", False) or verbose or show_timeout

    for ret in self.get_iter_returns(
        jid,
        minions,
        timeout=timeout,
        tgt=tgt,
        tgt_type=tgt_type,
        expect_minions=expect_minions,
        **kwargs
    ):
        log.debug("return event: %s", ret)
        return_count += 1
        if progress:
            for min_ret in ret.values():
                if min_ret.get("failed") is not True:
                    yield {
                        "minion_count": len(minions),
                        "return_count": return_count,
                    }
        for id_, min_ret in ret.items():
            if min_ret.get("failed") is not True:
                yield {id_: min_ret}
                continue

            # replace the return structure for missing minions
            if connected_minions is None:
                connected_minions = salt.utils.minions.CkMinions(
                    self.opts
                ).connected_ids()
            known_but_disconnected = (
                self.opts["minion_data_cache"]
                and salt.cache.factory(self.opts).contains(
                    "minions/{}".format(id_), "data"
                )
                and connected_minions
                and id_ not in connected_minions
            )
            if known_but_disconnected:
                yield {
                    id_: {
                        "out": "no_return",
                        "ret": "Minion did not return. [Not connected]",
                        "retcode": salt.defaults.exitcodes.EX_GENERIC,
                    }
                }
            elif not os.path.exists(os.path.join(self.opts["syndic_dir"], id_)):
                # don't report syndics as unresponsive minions
                no_response = (
                    "Minion did not return. [No response]"
                    "\nThe minions may not have all finished running and any "
                    "remaining minions will return upon completion. To look "
                    "up the return data for this job later, run the following "
                    "command:\n\n"
                    "salt-run jobs.lookup_jid {}".format(jid)
                )
                yield {
                    id_: {
                        "out": "no_return",
                        "ret": no_response,
                        "retcode": salt.defaults.exitcodes.EX_GENERIC,
                    }
                }

    self._clean_up_subscriptions(jid)
1658
def get_event_iter_returns(self, jid, minions, timeout=None):
    """
    Gather the return data from the event system, break hard when timeout
    is reached.

    This is a generator. When the master job cache holds no load for
    ``jid`` it yields a single empty dict and finishes.

    :param jid: job id whose return events are yielded
    :param minions: accepted for signature compatibility; not read here
    :param timeout: seconds to wait; ``self.opts["timeout"]`` when ``None``.
        Used both as the per-event wait and as the overall deadline.
    :return: yields ``{minion_id: {"ret": ..., "out": ...}}`` dicts
        (``out`` only when the event carries it)
    """
    log.trace("entered - function get_event_iter_returns()")
    if timeout is None:
        timeout = self.opts["timeout"]

    timeout_at = time.time() + timeout

    # Check to see if the jid is real, if not return the empty dict
    if (
        self.returners["{}.get_load".format(self.opts["master_job_cache"])](jid)
        == {}
    ):
        log.warning("jid does not exist")
        yield {}
        # Stop the iteration, since the jid is invalid. A plain ``return``
        # is required: raising StopIteration inside a generator is turned
        # into RuntimeError on Python 3.7+ (PEP 479).
        return
    # Wait for the hosts to check in
    while True:
        raw = self.event.get_event(timeout, auto_reconnect=self.auto_reconnect)
        if raw is None or time.time() > timeout_at:
            # Timeout reached
            break
        if "minions" in raw.get("data", {}):
            continue
        try:
            # There might be two jobs for the same minion, so we have to check for the jid
            if jid != raw["jid"]:
                continue
            ret = {raw["id"]: {"ret": raw["return"]}}
        except KeyError:
            # Ignore other erroneous messages
            continue
        if "out" in raw:
            ret[raw["id"]]["out"] = raw["out"]
        yield ret
        time.sleep(0.02)
1702
def _resolve_nodegroup(self, ng):
    """
    Expand the nodegroup named ``ng`` into its configured components.

    :param ng: name of a nodegroup from ``self.opts["nodegroups"]``
    :return: whatever ``salt.utils.minions.nodegroup_comp`` produces for it
    :raises SaltInvocationError: if no such nodegroup is configured
    """
    nodegroups = self.opts["nodegroups"]
    if ng in nodegroups:
        return salt.utils.minions.nodegroup_comp(ng, nodegroups)
    conf_file = self.opts.get("conf_file", "the master config file")
    raise SaltInvocationError(
        "Node group {} unavailable in {}".format(ng, conf_file)
    )
1713
def _prep_pub(self, tgt, fun, arg, tgt_type, ret, jid, timeout, **kwargs):
    """
    Build the ``payload_kwargs`` dict that is sent to the master.

    Nodegroup targets, and ``N@`` words inside compound targets, are
    expanded here so that the minions don't have to. Range expressions
    are converted to a list target when range support is available.

    :param tgt: target expression
    :param fun: function name to publish
    :param arg: arguments for ``fun``
    :param tgt_type: targeting type; may be rewritten to ``compound`` or
        ``list`` by the expansions above
    :param ret: returner string; the configured ``ext_job_cache`` is
        appended to it
    :param jid: job id to place in the payload
    :param timeout: added as ``to`` when ``order_masters`` is set
    :param kwargs: packed under the ``kwargs`` key when non-empty
    :return: the payload keyword arguments as a dict
    """
    if tgt_type == "nodegroup":
        tgt, tgt_type = self._resolve_nodegroup(tgt), "compound"

    if tgt_type == "compound":
        # Resolve all nodegroups, so that the minions don't have to.
        log.debug("compound resolution: original tgt: %s", tgt)
        words = tgt.split() if isinstance(tgt, str) else tgt

        expanded = []
        for word in words:
            if word.startswith("N@") and len(word) > 2:
                expanded.extend(self._resolve_nodegroup(word[2:]))
            else:
                expanded.append(word)

        log.debug("compound resolution: new_tgt: %s", expanded)
        tgt = " ".join(expanded)

    # Convert a range expression to a list of nodes and change expression
    # form to list
    if tgt_type == "range" and HAS_RANGE:
        tgt = self._convert_range_to_list(tgt)
        tgt_type = "list"

    # If an external job cache is specified add it to the ret list
    ext_job_cache = self.opts.get("ext_job_cache")
    if ext_job_cache:
        if ret:
            ret += ",{}".format(ext_job_cache)
        else:
            ret = ext_job_cache

    # format the payload - make a function that does this in the payload
    #   module

    # Generate the standard keyword args to feed to format_payload
    payload_kwargs = dict(
        cmd="publish",
        tgt=tgt,
        fun=fun,
        arg=arg,
        key=self.key,
        tgt_type=tgt_type,
        ret=ret,
        jid=jid,
    )

    # if kwargs are passed, pack them.
    if kwargs:
        payload_kwargs["kwargs"] = kwargs

    # If we have a salt user, add it to the payload
    if self.opts["syndic_master"] and "user" in kwargs:
        payload_kwargs["user"] = kwargs["user"]
    elif self.salt_user:
        payload_kwargs["user"] = self.salt_user

    # If we're a syndication master, pass the timeout
    if self.opts["order_masters"]:
        payload_kwargs["to"] = timeout

    return payload_kwargs
1783
def pub(
    self,
    tgt,
    fun,
    arg=(),
    tgt_type="glob",
    ret="",
    jid="",
    timeout=5,
    listen=False,
    **kwargs
):
    """
    Publish the given command through a request channel to the master.

    Arguments:
        tgt:
            The target expression used to match up the ids on the
            minions. Salt works by always publishing every command
            to all of the minions and then the minions determine if
            the command is for them based on the tgt value.
        fun:
            The function name to be called on the remote host(s), this
            must be a string in the format "<modulename>.<function name>"
        arg:
            The arguments to pass to the called function.
        tgt_type:
            How ``tgt`` is interpreted (``glob`` by default).
        ret:
            Returner string placed in the payload.
        jid:
            Job id placed in the payload.
        timeout:
            Seconds allowed for the event connection and the first send.
        listen:
            When true, the event publisher connection must succeed
            before the command is sent.
    Returns:
        A dict with these keys, or the empty payload itself when the
        master answered with nothing:

        jid:
            A string, as returned by the publisher, which is the job
            id, this will inform the client where to get the job results
        minions:
            The targets that the tgt passed should match.
    """
    # Make sure the publisher is running by checking the unix socket
    if self.opts.get("ipc_mode", "") != "tcp":
        pull_socket = os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
        if not os.path.exists(pull_socket):
            log.error(
                "Unable to connect to the salt master publisher at %s",
                self.opts["sock_dir"],
            )
            raise SaltClientError

    payload_kwargs = self._prep_pub(
        tgt, fun, arg, tgt_type, ret, jid, timeout, **kwargs
    )

    interface = salt.utils.zeromq.ip_bracket(self.opts["interface"])
    master_uri = "tcp://{}:{}".format(interface, str(self.opts["ret_port"]))

    with salt.transport.client.ReqChannel.factory(
        self.opts, crypt="clear", master_uri=master_uri
    ) as channel:
        try:
            # Ensure that the event subscriber is connected.
            # If not, we won't get a response, so error out
            if listen and not self.event.connect_pub(timeout=timeout):
                raise SaltReqTimeoutError()
            payload = channel.send(payload_kwargs, timeout=timeout)
        except SaltReqTimeoutError as err:
            log.error(err)
            raise SaltReqTimeoutError(
                "Salt request timed out. The master is not responding. You "
                "may need to run your command with `--async` in order to "
                "bypass the congested event bus. With `--async`, the CLI tool "
                "will print the job id (jid) and exit immediately without "
                "listening for responses. You can then use "
                "`salt-run jobs.lookup_jid` to look up the results of the job "
                "in the job cache later."
            )

        if not payload:
            # The master key could have changed out from under us! Regen
            # and try again if the key has changed
            fresh_key = self.__read_master_key()
            if fresh_key == self.key:
                return payload
            self.key = fresh_key
            payload_kwargs["key"] = self.key
            payload = channel.send(payload_kwargs)

        error = payload.pop("error", None)
        if error is not None:
            if isinstance(error, dict):
                err_name = error.get("name", "")
                err_msg = error.get("message", "")
                if err_name == "AuthenticationError":
                    raise AuthenticationError(err_msg)
                if err_name == "AuthorizationError":
                    raise AuthorizationError(err_msg)

            # anything not mapped to a specific exception above
            raise PublishError(error)

        if not payload:
            return payload

    load = payload["load"]
    return {"jid": load["jid"], "minions": load["minions"]}
1883
@salt.ext.tornado.gen.coroutine
def pub_async(
    self,
    tgt,
    fun,
    arg=(),
    tgt_type="glob",
    ret="",
    jid="",
    timeout=5,
    io_loop=None,
    listen=True,
    **kwargs
):
    """
    Coroutine variant of ``pub``: publish the given command through an
    asynchronous request channel to the master.

    Arguments:
        tgt:
            The target expression used to match up the ids on the
            minions. Salt works by always publishing every command
            to all of the minions and then the minions determine if
            the command is for them based on the tgt value.
        fun:
            The function name to be called on the remote host(s), this
            must be a string in the format "<modulename>.<function name>"
        arg:
            The arguments to pass to the called function.
        tgt_type:
            How ``tgt`` is interpreted (``glob`` by default).
        ret:
            Returner string placed in the payload.
        jid:
            Job id placed in the payload.
        timeout:
            Seconds allowed for the event connection and the first send.
        io_loop:
            Handed to the asynchronous request channel factory.
        listen:
            When true, the event publisher connection must succeed
            before the command is sent.
    Returns:
        A dict with these keys, or the empty payload itself when the
        master answered with nothing:

        jid:
            A string, as returned by the publisher, which is the job
            id, this will inform the client where to get the job results
        minions:
            The targets that the tgt passed should match.
    """
    # Make sure the publisher is running by checking the unix socket
    if self.opts.get("ipc_mode", "") != "tcp":
        pull_socket = os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
        if not os.path.exists(pull_socket):
            log.error(
                "Unable to connect to the salt master publisher at %s",
                self.opts["sock_dir"],
            )
            raise SaltClientError

    payload_kwargs = self._prep_pub(
        tgt, fun, arg, tgt_type, ret, jid, timeout, **kwargs
    )

    interface = salt.utils.zeromq.ip_bracket(self.opts["interface"])
    master_uri = "tcp://" + interface + ":" + str(self.opts["ret_port"])

    with salt.transport.client.AsyncReqChannel.factory(
        self.opts, io_loop=io_loop, crypt="clear", master_uri=master_uri
    ) as channel:
        try:
            # Ensure that the event subscriber is connected.
            # If not, we won't get a response, so error out
            if listen and not self.event.connect_pub(timeout=timeout):
                raise SaltReqTimeoutError()
            payload = yield channel.send(payload_kwargs, timeout=timeout)
        except SaltReqTimeoutError:
            raise SaltReqTimeoutError(
                "Salt request timed out. The master is not responding. You "
                "may need to run your command with `--async` in order to "
                "bypass the congested event bus. With `--async`, the CLI tool "
                "will print the job id (jid) and exit immediately without "
                "listening for responses. You can then use "
                "`salt-run jobs.lookup_jid` to look up the results of the job "
                "in the job cache later."
            )

        if not payload:
            # The master key could have changed out from under us! Regen
            # and try again if the key has changed
            fresh_key = self.__read_master_key()
            if fresh_key == self.key:
                raise salt.ext.tornado.gen.Return(payload)
            self.key = fresh_key
            payload_kwargs["key"] = self.key
            payload = yield channel.send(payload_kwargs)

        error = payload.pop("error", None)
        if error is not None:
            if isinstance(error, dict):
                err_name = error.get("name", "")
                err_msg = error.get("message", "")
                if err_name == "AuthenticationError":
                    raise AuthenticationError(err_msg)
                if err_name == "AuthorizationError":
                    raise AuthorizationError(err_msg)

            # anything not mapped to a specific exception above
            raise PublishError(error)

        if not payload:
            raise salt.ext.tornado.gen.Return(payload)

    load = payload["load"]
    raise salt.ext.tornado.gen.Return(
        {"jid": load["jid"], "minions": load["minions"]}
    )
1988
# pylint: disable=W1701
def __del__(self):
    """
    Drop the ``event`` attribute, if this instance has one, when the
    instance is finalized.
    """
    # This IS really necessary!
    # When running tests, if self.events is not destroyed, we leak 2
    # threads per test case which uses self.client
    # The hasattr guard covers instances that never got an ``event``
    # attribute assigned.
    if hasattr(self, "event"):
        # The call below will take care of calling 'self.event.destroy()'
        del self.event

# pylint: enable=W1701
1999
def _clean_up_subscriptions(self, job_id):
    """
    Unsubscribe from the event tags belonging to ``job_id``.

    The syndic regex subscription is only removed when ``order_masters``
    is set in the options; the ``salt/job/<job_id>`` tag always is.
    """
    if self.opts.get("order_masters"):
        syndic_pattern = "syndic/.*/{}".format(job_id)
        self.event.unsubscribe(syndic_pattern, "regex")
    job_tag = "salt/job/{}".format(job_id)
    self.event.unsubscribe(job_tag)
2004
2005
class FunctionWrapper(dict):
    """
    Create a function wrapper that looks like the functions dict on the minion
    but invoked commands on the minion via a LocalClient.

    This allows SLS files to be loaded with an object that calls down to the
    minion when the salt functions dict is referenced.
    """

    def __init__(self, opts, minion):
        """
        :param opts: options dict; ``opts["conf_file"]`` is handed to
            ``LocalClient``
        :param minion: id of the minion every call is sent to
        """
        super().__init__()
        self.opts = opts
        self.minion = minion
        self.local = LocalClient(self.opts["conf_file"])
        self.functions = self.__load_functions()

    def __missing__(self, key):
        """
        Since the function key is missing, wrap this call to a command to the
        minion of said key if it is available in the self.functions set

        :raises KeyError: carrying ``key`` when the minion does not list
            that function
        """
        if key not in self.functions:
            # include the key so the error says which function was missing
            raise KeyError(key)
        return self.run_key(key)

    def __load_functions(self):
        """
        Find out what functions are available on the minion
        """
        listing = self.local.cmd(self.minion, "sys.list_functions")
        # a minion missing from the result contributes no functions
        return set(listing.get(self.minion, []))

    def run_key(self, key):
        """
        Return a function that executes the arguments passed via the local
        client
        """

        def func(*args, **kwargs):
            """
            Run a remote call
            """
            # keyword arguments are sent as "name=value" strings appended
            # after the positional arguments
            call_args = list(args)
            call_args.extend(
                "{}={}".format(_key, _val) for _key, _val in kwargs.items()
            )
            return self.local.cmd(self.minion, key, call_args)

        return func
2055
2056
class Caller:
    """
    ``Caller`` exposes the interface used by the :command:`salt-call`
    command-line tool on the Salt Minion.

    .. versionchanged:: 2015.8.0
        Added the ``cmd`` method for consistency with the other Salt clients.
        The existing ``function`` and ``sminion.functions`` interfaces still
        exist but have been removed from the docs.

    ``Caller`` has to be imported and used on the machine that runs the
    Salt Minion, as the same user the Salt Minion runs as.

    Usage:

    .. code-block:: python

        import salt.client
        caller = salt.client.Caller()
        caller.cmd('test.ping')

    Note, a running master or minion daemon is not required to use this class.
    Running ``salt-call --local`` simply sets :conf_minion:`file_client` to
    ``'local'``. The same can be achieved at the Python level by including that
    setting in a minion config file.

    .. versionadded:: 2014.7.0
        Pass the minion config as the ``mopts`` dictionary.

    .. code-block:: python

        import salt.client
        import salt.config
        __opts__ = salt.config.minion_config('/etc/salt/minion')
        __opts__['file_client'] = 'local'
        caller = salt.client.Caller(mopts=__opts__)
    """

    def __init__(self, c_path=os.path.join(syspaths.CONFIG_DIR, "minion"), mopts=None):
        # Late-import of the minion module to keep the CLI as light as possible
        import salt.minion

        # a truthy ``mopts`` wins; otherwise the config at ``c_path`` is loaded
        self.opts = mopts or salt.config.minion_config(c_path)
        self.sminion = salt.minion.SMinion(self.opts)

    def cmd(self, fun, *args, **kwargs):
        """
        Call an execution module with the given arguments and keyword arguments

        .. versionchanged:: 2015.8.0
            Added the ``cmd`` method for consistency with the other Salt clients.
            The existing ``function`` and ``sminion.functions`` interfaces still
            exist but have been removed from the docs.

        .. code-block:: python

            caller.cmd('test.arg', 'Foo', 'Bar', baz='Baz')

            caller.cmd('event.send', 'myco/myevent/something',
                data={'foo': 'Foo'}, with_env=['GIT_COMMIT'], with_grains=True)
        """
        target = self.sminion.functions[fun]
        return target(*args, **kwargs)

    def function(self, fun, *args, **kwargs):
        """
        Call a single salt function, running the positional arguments
        through ``salt.utils.args.parse_input`` and
        ``salt.minion.load_args_and_kwargs`` first
        """
        target = self.sminion.functions[fun]
        parsed = salt.utils.args.parse_input(args)
        call_args, call_kwargs = salt.minion.load_args_and_kwargs(
            target, parsed, kwargs
        )
        return target(*call_args, **call_kwargs)
2133
2134
class ProxyCaller:
    """
    ``ProxyCaller`` is the same interface used by the :command:`salt-call`
    with the args ``--proxyid <proxyid>`` command-line tool on the Salt Proxy
    Minion.

    Importing and using ``ProxyCaller`` must be done on the same machine as a
    Salt Minion and it must be done using the same user that the Salt Minion is
    running as.

    Usage:

    .. code-block:: python

        import salt.client
        caller = salt.client.ProxyCaller()
        caller.cmd('test.ping')

    Note, a running master or minion daemon is not required to use this class.
    Running ``salt-call --local`` simply sets :conf_minion:`file_client` to
    ``'local'``. The same can be achieved at the Python level by including that
    setting in a minion config file.

    .. code-block:: python

        import salt.client
        import salt.config
        __opts__ = salt.config.proxy_config('/etc/salt/proxy', minion_id='quirky_edison')
        __opts__['file_client'] = 'local'
        caller = salt.client.ProxyCaller(mopts=__opts__)

    .. note::

        To use this for calling proxies, the :py:func:`is_proxy functions
        <salt.utils.platform.is_proxy>` requires that ``--proxyid`` be an
        argument on the commandline for the script this is used in, or that the
        string ``proxy`` is in the name of the script.
    """

    def __init__(self, c_path=os.path.join(syspaths.CONFIG_DIR, "proxy"), mopts=None):
        # Late-import of the minion module to keep the CLI as light as possible
        import salt.minion

        # a truthy ``mopts`` wins; otherwise the config at ``c_path`` is loaded
        self.opts = mopts or salt.config.proxy_config(c_path)
        self.sminion = salt.minion.SProxyMinion(self.opts)

    def cmd(self, fun, *args, **kwargs):
        """
        Call an execution module with the given arguments and keyword arguments

        The call is routed through the configured module executors in
        order; the first executor that returns something other than
        ``None`` supplies the result. ``None`` is returned when no executor
        produced a result, including when the executor list is empty.

        .. code-block:: python

            caller.cmd('test.arg', 'Foo', 'Bar', baz='Baz')

            caller.cmd('event.send', 'myco/myevent/something',
                data={'foo': 'Foo'}, with_env=['GIT_COMMIT'], with_grains=True)

        :raises SaltInvocationError: if a configured executor is not loaded
        """
        func = self.sminion.functions[fun]
        data = {"arg": args, "fun": fun}
        data.update(kwargs)
        # executors set on the minion object take precedence over the opts
        executors = getattr(self.sminion, "module_executors", []) or self.opts.get(
            "module_executors", ["direct_call"]
        )
        if isinstance(executors, str):
            executors = [executors]
        # pre-bind so an empty executor list cannot leave the name undefined
        return_data = None
        for name in executors:
            fname = "{}.execute".format(name)
            if fname not in self.sminion.executors:
                raise SaltInvocationError("Executor '{}' is not available".format(name))
            return_data = self.sminion.executors[fname](
                self.opts, data, func, args, kwargs
            )
            if return_data is not None:
                break
        return return_data