"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "salt/client/__init__.py" between
salt-3002.1.tar.gz and salt-3002.2.tar.gz

About: SaltStack is a systems management software for data center automation, cloud orchestration, server provisioning, configuration management and more. Community version.

__init__.py  (salt-3002.1):__init__.py  (salt-3002.2)
# -*- coding: utf-8 -*-
""" """
The client module is used to create a client connection to the publisher The client module is used to create a client connection to the publisher
The data structure needs to be: The data structure needs to be:
{'enc': 'clear', {'enc': 'clear',
'load': {'fun': '<mod.callable>', 'load': {'fun': '<mod.callable>',
'arg':, ('arg1', 'arg2', ...), 'arg':, ('arg1', 'arg2', ...),
'tgt': '<glob or id>', 'tgt': '<glob or id>',
'key': '<read in the key file>'} 'key': '<read in the key file>'}
""" """
# Import salt libs
# Import third party libs
# pylint: disable=import-error # pylint: disable=import-error
# Try to import range from https://github.com/ytoolshed/range # Try to import range from https://github.com/ytoolshed/range
# #
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import logging import logging
# The components here are simple, and they need to be and stay simple, we # The components here are simple, and they need to be and stay simple, we
# want a client to have 3 external concerns, and maybe a forth configurable # want a client to have 3 external concerns, and maybe a forth configurable
# option. # option.
# The concerns are: # The concerns are:
# 1. Who executes the command? # 1. Who executes the command?
# 2. What is the function being run? # 2. What is the function being run?
# 3. What arguments need to be passed to the function? # 3. What arguments need to be passed to the function?
skipping to change at line 116 skipping to change at line 109
opts = salt.config.client_config(c_path) opts = salt.config.client_config(c_path)
# TODO: AIO core is separate from transport # TODO: AIO core is separate from transport
return LocalClient( return LocalClient(
mopts=opts, mopts=opts,
skip_perm_errors=skip_perm_errors, skip_perm_errors=skip_perm_errors,
io_loop=io_loop, io_loop=io_loop,
auto_reconnect=auto_reconnect, auto_reconnect=auto_reconnect,
) )
class LocalClient(object): class LocalClient:
""" """
The interface used by the :command:`salt` CLI tool on the Salt Master The interface used by the :command:`salt` CLI tool on the Salt Master
``LocalClient`` is used to send a command to Salt minions to execute ``LocalClient`` is used to send a command to Salt minions to execute
:ref:`execution modules <all-salt.modules>` and return the results to the :ref:`execution modules <all-salt.modules>` and return the results to the
Salt Master. Salt Master.
Importing and using ``LocalClient`` must be done on the same machine as the Importing and using ``LocalClient`` must be done on the same machine as the
Salt Master and it must be done using the same user that the Salt Master is Salt Master and it must be done using the same user that the Salt Master is
running as. (Unless :conf_master:`external_auth` is configured and running as. (Unless :conf_master:`external_auth` is configured and
skipping to change at line 204 skipping to change at line 197
key_user = self.salt_user key_user = self.salt_user
if key_user == "root": if key_user == "root":
if self.opts.get("user", "root") != "root": if self.opts.get("user", "root") != "root":
key_user = self.opts.get("user", "root") key_user = self.opts.get("user", "root")
if key_user.startswith("sudo_"): if key_user.startswith("sudo_"):
key_user = self.opts.get("user", "root") key_user = self.opts.get("user", "root")
if salt.utils.platform.is_windows(): if salt.utils.platform.is_windows():
# The username may contain '\' if it is in Windows # The username may contain '\' if it is in Windows
# 'DOMAIN\username' format. Fix this for the keyfile path. # 'DOMAIN\username' format. Fix this for the keyfile path.
key_user = key_user.replace("\\", "_") key_user = key_user.replace("\\", "_")
keyfile = os.path.join(self.opts["cachedir"], ".{0}_key".format(key_user )) keyfile = os.path.join(self.opts["cachedir"], ".{}_key".format(key_user) )
try: try:
# Make sure all key parent directories are accessible # Make sure all key parent directories are accessible
salt.utils.verify.check_path_traversal( salt.utils.verify.check_path_traversal(
self.opts["cachedir"], key_user, self.skip_perm_errors self.opts["cachedir"], key_user, self.skip_perm_errors
) )
with salt.utils.files.fopen(keyfile, "r") as key: with salt.utils.files.fopen(keyfile, "r") as key:
return salt.utils.stringutils.to_unicode(key.read()) return salt.utils.stringutils.to_unicode(key.read())
except (OSError, IOError, SaltClientError): except (OSError, SaltClientError):
# Fall back to eauth # Fall back to eauth
return "" return ""
def _convert_range_to_list(self, tgt): def _convert_range_to_list(self, tgt):
""" """
convert a seco.range range into a list target convert a seco.range range into a list target
""" """
range_ = seco.range.Range(self.opts["range_server"]) range_ = seco.range.Range(self.opts["range_server"])
try: try:
return range_.expand(tgt) return range_.expand(tgt)
except seco.range.RangeException as err: except seco.range.RangeException as err:
print("Range server exception: {0}".format(err)) print("Range server exception: {}".format(err))
return [] return []
def _get_timeout(self, timeout): def _get_timeout(self, timeout):
""" """
Return the timeout to use Return the timeout to use
""" """
if timeout is None: if timeout is None:
return self.opts["timeout"] return self.opts["timeout"]
if isinstance(timeout, int): if isinstance(timeout, int):
return timeout return timeout
if isinstance(timeout, six.string_types): if isinstance(timeout, str):
try: try:
return int(timeout) return int(timeout)
except ValueError: except ValueError:
return self.opts["timeout"] return self.opts["timeout"]
# Looks like the timeout is invalid, use config # Looks like the timeout is invalid, use config
return self.opts["timeout"] return self.opts["timeout"]
def gather_job_info(self, jid, tgt, tgt_type, listen=True, **kwargs): def gather_job_info(self, jid, tgt, tgt_type, listen=True, **kwargs):
""" """
Return the information about a given job Return the information about a given job
skipping to change at line 306 skipping to change at line 299
"No command was sent, no jid was assigned." "No command was sent, no jid was assigned."
) )
return {} return {}
# don't install event subscription listeners when the request is asynchr onous # don't install event subscription listeners when the request is asynchr onous
# and doesn't care. this is important as it will create event leaks othe rwise # and doesn't care. this is important as it will create event leaks othe rwise
if not listen: if not listen:
return pub_data return pub_data
if self.opts.get("order_masters"): if self.opts.get("order_masters"):
self.event.subscribe("syndic/.*/{0}".format(pub_data["jid"]), "regex ") self.event.subscribe("syndic/.*/{}".format(pub_data["jid"]), "regex" )
self.event.subscribe("salt/job/{0}".format(pub_data["jid"])) self.event.subscribe("salt/job/{}".format(pub_data["jid"]))
return pub_data return pub_data
def run_job( def run_job(
self, self,
tgt, tgt,
fun, fun,
arg=(), arg=(),
tgt_type="glob", tgt_type="glob",
ret="", ret="",
skipping to change at line 359 skipping to change at line 352
timeout=self._get_timeout(timeout), timeout=self._get_timeout(timeout),
listen=listen, listen=listen,
**kwargs **kwargs
) )
except SaltClientError: except SaltClientError:
# Re-raise error with specific message # Re-raise error with specific message
raise SaltClientError( raise SaltClientError(
"The salt master could not be contacted. Is master running?" "The salt master could not be contacted. Is master running?"
) )
except AuthenticationError as err: except AuthenticationError as err:
six.reraise(*sys.exc_info()) raise
except AuthorizationError as err: except AuthorizationError as err:
six.reraise(*sys.exc_info()) raise
except Exception as general_exception: # pylint: disable=broad-except except Exception as general_exception: # pylint: disable=broad-except
# Convert to generic client error and pass along message # Convert to generic client error and pass along message
raise SaltClientError(general_exception) raise SaltClientError(general_exception)
return self._check_pub_data(pub_data, listen=listen) return self._check_pub_data(pub_data, listen=listen)
def gather_minions(self, tgt, expr_form): def gather_minions(self, tgt, expr_form):
_res = salt.utils.minions.CkMinions(self.opts).check_minions( _res = salt.utils.minions.CkMinions(self.opts).check_minions(
tgt, tgt_type=expr_form tgt, tgt_type=expr_form
) )
skipping to change at line 578 skipping to change at line 571
eauth = {} eauth = {}
if "eauth" in kwargs: if "eauth" in kwargs:
eauth["eauth"] = kwargs.pop("eauth") eauth["eauth"] = kwargs.pop("eauth")
if "username" in kwargs: if "username" in kwargs:
eauth["username"] = kwargs.pop("username") eauth["username"] = kwargs.pop("username")
if "password" in kwargs: if "password" in kwargs:
eauth["password"] = kwargs.pop("password") eauth["password"] = kwargs.pop("password")
if "token" in kwargs: if "token" in kwargs:
eauth["token"] = kwargs.pop("token") eauth["token"] = kwargs.pop("token")
for key, val in six.iteritems(self.opts): for key, val in self.opts.items():
if key not in opts: if key not in opts:
opts[key] = val opts[key] = val
batch = salt.cli.batch.Batch(opts, eauth=eauth, quiet=True) batch = salt.cli.batch.Batch(opts, eauth=eauth, quiet=True)
for ret in batch.run(): for ret in batch.run():
yield ret yield ret
def cmd( def cmd(
self, self,
tgt, tgt,
fun, fun,
skipping to change at line 731 skipping to change at line 724
for fn_ret in self.get_cli_event_returns( for fn_ret in self.get_cli_event_returns(
pub_data["jid"], pub_data["jid"],
pub_data["minions"], pub_data["minions"],
self._get_timeout(timeout), self._get_timeout(timeout),
tgt, tgt,
tgt_type, tgt_type,
**kwargs **kwargs
): ):
if fn_ret: if fn_ret:
for mid, data in six.iteritems(fn_ret): for mid, data in fn_ret.items():
ret[mid] = data if full_return else data.get("ret", {}) ret[mid] = data if full_return else data.get("ret", {})
for failed in list(set(pub_data["minions"]) - set(ret)): for failed in list(set(pub_data["minions"]) - set(ret)):
ret[failed] = False ret[failed] = False
return ret return ret
finally: finally:
if not was_listening: if not was_listening:
self.event.close_pub() self.event.close_pub()
def cmd_cli( def cmd_cli(
skipping to change at line 1010 skipping to change at line 1003
verbose=False, verbose=False,
show_jid=False, show_jid=False,
**kwargs **kwargs
): ):
""" """
Starts a watcher looking at the return data for a specified JID Starts a watcher looking at the return data for a specified JID
:returns: all of the information for the JID :returns: all of the information for the JID
""" """
if verbose: if verbose:
msg = "Executing job with jid {0}".format(jid) msg = "Executing job with jid {}".format(jid)
print(msg) print(msg)
print("-" * len(msg) + "\n") print("-" * len(msg) + "\n")
elif show_jid: elif show_jid:
print("jid: {0}".format(jid)) print("jid: {}".format(jid))
if timeout is None: if timeout is None:
timeout = self.opts["timeout"] timeout = self.opts["timeout"]
fret = {} fret = {}
# make sure the minions is a set (since we do set operations on it) # make sure the minions is a set (since we do set operations on it)
minions = set(minions) minions = set(minions)
found = set() found = set()
# start this before the cache lookup-- in case new stuff comes in # start this before the cache lookup-- in case new stuff comes in
event_iter = self.get_event_iter_returns(jid, minions, timeout=timeout) event_iter = self.get_event_iter_returns(jid, minions, timeout=timeout)
skipping to change at line 1084 skipping to change at line 1077
expect_minions=False, expect_minions=False,
block=True, block=True,
**kwargs **kwargs
): ):
""" """
Watch the event system and return job data as it comes in Watch the event system and return job data as it comes in
:returns: all of the information for the JID :returns: all of the information for the JID
""" """
if not isinstance(minions, set): if not isinstance(minions, set):
if isinstance(minions, six.string_types): if isinstance(minions, str):
minions = set([minions]) minions = {minions}
elif isinstance(minions, (list, tuple)): elif isinstance(minions, (list, tuple)):
minions = set(list(minions)) minions = set(list(minions))
if timeout is None: if timeout is None:
timeout = self.opts["timeout"] timeout = self.opts["timeout"]
gather_job_timeout = int( gather_job_timeout = int(
kwargs.get("gather_job_timeout", self.opts["gather_job_timeout"]) kwargs.get("gather_job_timeout", self.opts["gather_job_timeout"])
) )
start = int(time.time()) start = int(time.time())
# timeouts per minion, id_ -> timeout time # timeouts per minion, id_ -> timeout time
minion_timeouts = {} minion_timeouts = {}
found = set() found = set()
missing = set() missing = set()
# Check to see if the jid is real, if not return the empty dict # Check to see if the jid is real, if not return the empty dict
try: try:
if ( if (
self.returners["{0}.get_load".format(self.opts["master_job_cache self.returners["{}.get_load".format(self.opts["master_job_cache"
"])]( ])](jid)
jid
)
== {} == {}
): ):
log.warning("jid does not exist") log.warning("jid does not exist")
yield {} yield {}
# stop the iteration, since the jid is invalid # stop the iteration, since the jid is invalid
raise StopIteration() raise StopIteration()
except Exception as exc: # pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
log.warning( log.warning(
"Returner unavailable: %s", exc, exc_info_on_loglevel=logging.DE BUG "Returner unavailable: %s", exc, exc_info_on_loglevel=logging.DE BUG
) )
# Wait for the hosts to check in # Wait for the hosts to check in
last_time = False last_time = False
# iterator for this job's return # iterator for this job's return
if self.opts["order_masters"]: if self.opts["order_masters"]:
# If we are a MoM, we need to gather expected minions from downstrea ms masters. # If we are a MoM, we need to gather expected minions from downstrea ms masters.
ret_iter = self.get_returns_no_block( ret_iter = self.get_returns_no_block(
"(salt/job|syndic/.*)/{0}".format(jid), "regex" "(salt/job|syndic/.*)/{}".format(jid), "regex"
) )
else: else:
ret_iter = self.get_returns_no_block("salt/job/{0}".format(jid)) ret_iter = self.get_returns_no_block("salt/job/{}".format(jid))
# iterator for the info of this job # iterator for the info of this job
jinfo_iter = [] jinfo_iter = []
# open event jids that need to be un-subscribed from later # open event jids that need to be un-subscribed from later
open_jids = set() open_jids = set()
timeout_at = time.time() + timeout timeout_at = time.time() + timeout
gather_syndic_wait = time.time() + self.opts["syndic_wait"] gather_syndic_wait = time.time() + self.opts["syndic_wait"]
# are there still minions running the job out there # are there still minions running the job out there
# start as True so that we ping at least once # start as True so that we ping at least once
minions_running = True minions_running = True
log.debug( log.debug(
skipping to change at line 1219 skipping to change at line 1210
jinfo = self.gather_job_info( jinfo = self.gather_job_info(
jid, list(minions - found), "list", **kwargs jid, list(minions - found), "list", **kwargs
) )
minions_running = False minions_running = False
# if we weren't assigned any jid that means the master thinks # if we weren't assigned any jid that means the master thinks
# we have nothing to send # we have nothing to send
if "jid" not in jinfo: if "jid" not in jinfo:
jinfo_iter = [] jinfo_iter = []
else: else:
jinfo_iter = self.get_returns_no_block( jinfo_iter = self.get_returns_no_block(
"salt/job/{0}".format(jinfo["jid"]) "salt/job/{}".format(jinfo["jid"])
) )
timeout_at = time.time() + gather_job_timeout timeout_at = time.time() + gather_job_timeout
# if you are a syndic, wait a little longer # if you are a syndic, wait a little longer
if self.opts["order_masters"]: if self.opts["order_masters"]:
timeout_at += self.opts.get("syndic_wait", 1) timeout_at += self.opts.get("syndic_wait", 1)
# check for minions that are running the job still # check for minions that are running the job still
for raw in jinfo_iter: for raw in jinfo_iter:
# if there are no more events, lets stop waiting for the jinfo # if there are no more events, lets stop waiting for the jinfo
if raw is None: if raw is None:
skipping to change at line 1271 skipping to change at line 1262
if "return" not in raw.get("data", {}): if "return" not in raw.get("data", {}):
continue continue
# if the job isn't running there anymore... don't count # if the job isn't running there anymore... don't count
if raw["data"]["return"] == {}: if raw["data"]["return"] == {}:
continue continue
# if the minion throws an exception containing the word "return" # if the minion throws an exception containing the word "return"
# the master will try to handle the string as a dict in the next # the master will try to handle the string as a dict in the next
# step. Check if we have a string, log the issue and continue. # step. Check if we have a string, log the issue and continue.
if isinstance(raw["data"]["return"], six.string_types): if isinstance(raw["data"]["return"], str):
log.error("unexpected return from minion: %s", raw) log.error("unexpected return from minion: %s", raw)
continue continue
if ( if (
"return" in raw["data"]["return"] "return" in raw["data"]["return"]
and raw["data"]["return"]["return"] == {} and raw["data"]["return"]["return"] == {}
): ):
continue continue
# if we didn't originally target the minion, lets add it to the list # if we didn't originally target the minion, lets add it to the list
skipping to change at line 1316 skipping to change at line 1307
time.sleep(0.01) time.sleep(0.01)
else: else:
yield yield
# If there are any remaining open events, clean them up. # If there are any remaining open events, clean them up.
if open_jids: if open_jids:
for jid in open_jids: for jid in open_jids:
self.event.unsubscribe(jid) self.event.unsubscribe(jid)
if expect_minions: if expect_minions:
for minion in list((minions - found)): for minion in list(minions - found):
yield {minion: {"failed": True}} yield {minion: {"failed": True}}
# Filter out any minions marked as missing for which we received # Filter out any minions marked as missing for which we received
# returns (prevents false events sent due to higher-level masters not # returns (prevents false events sent due to higher-level masters not
# knowing about lower-level minions). # knowing about lower-level minions).
missing -= found missing -= found
# Report on missing minions # Report on missing minions
if missing: if missing:
for minion in missing: for minion in missing:
skipping to change at line 1350 skipping to change at line 1341
jid, jid,
minions, minions,
datetime.fromtimestamp(timeout_at).time(), datetime.fromtimestamp(timeout_at).time(),
) )
found = set() found = set()
ret = {} ret = {}
# Check to see if the jid is real, if not return the empty dict # Check to see if the jid is real, if not return the empty dict
try: try:
if ( if (
self.returners["{0}.get_load".format(self.opts["master_job_cache self.returners["{}.get_load".format(self.opts["master_job_cache"
"])]( ])](jid)
jid
)
== {} == {}
): ):
log.warning("jid does not exist") log.warning("jid does not exist")
return ret return ret
except Exception as exc: # pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
raise SaltClientError( raise SaltClientError(
"Master job cache returner [{0}] failed to verify jid. " "Master job cache returner [{}] failed to verify jid. "
"Exception details: {1}".format(self.opts["master_job_cache"], e "Exception details: {}".format(self.opts["master_job_cache"], ex
xc) c)
) )
# Wait for the hosts to check in # Wait for the hosts to check in
while True: while True:
time_left = timeout_at - int(time.time()) time_left = timeout_at - int(time.time())
wait = max(1, time_left) wait = max(1, time_left)
raw = self.event.get_event(wait, jid, auto_reconnect=self.auto_recon nect) raw = self.event.get_event(wait, jid, auto_reconnect=self.auto_recon nect)
if raw is not None and "return" in raw: if raw is not None and "return" in raw:
found.add(raw["id"]) found.add(raw["id"])
ret[raw["id"]] = raw["return"] ret[raw["id"]] = raw["return"]
skipping to change at line 1402 skipping to change at line 1391
a specified jid, it returns all of the information for the jid a specified jid, it returns all of the information for the jid
""" """
# TODO: change this from ret to return... or the other way. # TODO: change this from ret to return... or the other way.
# Its inconsistent, we should pick one # Its inconsistent, we should pick one
ret = {} ret = {}
# create the iterator-- since we want to get anyone in the middle # create the iterator-- since we want to get anyone in the middle
event_iter = self.get_event_iter_returns(jid, minions, timeout=timeout) event_iter = self.get_event_iter_returns(jid, minions, timeout=timeout)
try: try:
data = self.returners["{0}.get_jid".format(self.opts["master_job_cac he"])]( data = self.returners["{}.get_jid".format(self.opts["master_job_cach e"])](
jid jid
) )
except Exception as exc: # pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
raise SaltClientError( raise SaltClientError(
"Returner {0} could not fetch jid data. " "Returner {} could not fetch jid data. "
"Exception details: {1}".format(self.opts["master_job_cache"], e "Exception details: {}".format(self.opts["master_job_cache"], ex
xc) c)
) )
for minion in data: for minion in data:
m_data = {} m_data = {}
if "return" in data[minion]: if "return" in data[minion]:
m_data["ret"] = data[minion].get("return") m_data["ret"] = data[minion].get("return")
else: else:
m_data["ret"] = data[minion].get("return") m_data["ret"] = data[minion].get("return")
if "out" in data[minion]: if "out" in data[minion]:
m_data["out"] = data[minion]["out"] m_data["out"] = data[minion]["out"]
if minion in ret: if minion in ret:
skipping to change at line 1433 skipping to change at line 1422
# if we have all the minion returns, lets just return # if we have all the minion returns, lets just return
if len(set(ret).intersection(minions)) >= len(minions): if len(set(ret).intersection(minions)) >= len(minions):
return ret return ret
# otherwise lets use the listener we created above to get the rest # otherwise lets use the listener we created above to get the rest
for event_ret in event_iter: for event_ret in event_iter:
# if nothing in the event_ret, skip # if nothing in the event_ret, skip
if event_ret == {}: if event_ret == {}:
time.sleep(0.02) time.sleep(0.02)
continue continue
for minion, m_data in six.iteritems(event_ret): for minion, m_data in event_ret.items():
if minion in ret: if minion in ret:
ret[minion].update(m_data) ret[minion].update(m_data)
else: else:
ret[minion] = m_data ret[minion] = m_data
# are we done yet? # are we done yet?
if len(set(ret).intersection(minions)) >= len(minions): if len(set(ret).intersection(minions)) >= len(minions):
return ret return ret
# otherwise we hit the timeout, return what we have # otherwise we hit the timeout, return what we have
return ret return ret
def get_cache_returns(self, jid): def get_cache_returns(self, jid):
""" """
Execute a single pass to gather the contents of the job cache Execute a single pass to gather the contents of the job cache
""" """
ret = {} ret = {}
try: try:
data = self.returners["{0}.get_jid".format(self.opts["master_job_cac he"])]( data = self.returners["{}.get_jid".format(self.opts["master_job_cach e"])](
jid jid
) )
except Exception as exc: # pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
raise SaltClientError( raise SaltClientError(
"Could not examine master job cache. " "Could not examine master job cache. "
"Error occurred in {0} returner. " "Error occurred in {} returner. "
"Exception details: {1}".format(self.opts["master_job_cache"], e "Exception details: {}".format(self.opts["master_job_cache"], ex
xc) c)
) )
for minion in data: for minion in data:
m_data = {} m_data = {}
if "return" in data[minion]: if "return" in data[minion]:
m_data["ret"] = data[minion].get("return") m_data["ret"] = data[minion].get("return")
else: else:
m_data["ret"] = data[minion].get("return") m_data["ret"] = data[minion].get("return")
if "out" in data[minion]: if "out" in data[minion]:
m_data["out"] = data[minion]["out"] m_data["out"] = data[minion]["out"]
if minion in ret: if minion in ret:
skipping to change at line 1494 skipping to change at line 1483
verbose=False, verbose=False,
show_timeout=False, show_timeout=False,
show_jid=False, show_jid=False,
): ):
""" """
Get the returns for the command line interface via the event system Get the returns for the command line interface via the event system
""" """
log.trace("entered - function get_cli_static_event_returns()") log.trace("entered - function get_cli_static_event_returns()")
minions = set(minions) minions = set(minions)
if verbose: if verbose:
msg = "Executing job with jid {0}".format(jid) msg = "Executing job with jid {}".format(jid)
print(msg) print(msg)
print("-" * len(msg) + "\n") print("-" * len(msg) + "\n")
elif show_jid: elif show_jid:
print("jid: {0}".format(jid)) print("jid: {}".format(jid))
if timeout is None: if timeout is None:
timeout = self.opts["timeout"] timeout = self.opts["timeout"]
start = int(time.time()) start = int(time.time())
timeout_at = start + timeout timeout_at = start + timeout
found = set() found = set()
ret = {} ret = {}
# Check to see if the jid is real, if not return the empty dict # Check to see if the jid is real, if not return the empty dict
try: try:
if ( if (
self.returners["{0}.get_load".format(self.opts["master_job_cache self.returners["{}.get_load".format(self.opts["master_job_cache"
"])]( ])](jid)
jid
)
== {} == {}
): ):
log.warning("jid does not exist") log.warning("jid does not exist")
return ret return ret
except Exception as exc: # pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
raise SaltClientError( raise SaltClientError(
"Load could not be retrieved from " "Load could not be retrieved from "
"returner {0}. Exception details: {1}".format( "returner {}. Exception details: {}".format(
self.opts["master_job_cache"], exc self.opts["master_job_cache"], exc
) )
) )
# Wait for the hosts to check in # Wait for the hosts to check in
while True: while True:
# Process events until timeout is reached or all minions have return ed # Process events until timeout is reached or all minions have return ed
time_left = timeout_at - int(time.time()) time_left = timeout_at - int(time.time())
# Wait 0 == forever, use a minimum of 1s # Wait 0 == forever, use a minimum of 1s
wait = max(1, time_left) wait = max(1, time_left)
jid_tag = "salt/job/{0}".format(jid) jid_tag = "salt/job/{}".format(jid)
raw = self.event.get_event( raw = self.event.get_event(
wait, jid_tag, auto_reconnect=self.auto_reconnect wait, jid_tag, auto_reconnect=self.auto_reconnect
) )
if raw is not None and "return" in raw: if raw is not None and "return" in raw:
if "minions" in raw.get("data", {}): if "minions" in raw.get("data", {}):
minions.update(raw["data"]["minions"]) minions.update(raw["data"]["minions"])
continue continue
found.add(raw["id"]) found.add(raw["id"])
ret[raw["id"]] = {"ret": raw["return"]} ret[raw["id"]] = {"ret": raw["return"]}
ret[raw["id"]]["success"] = raw.get("success", False) ret[raw["id"]]["success"] = raw.get("success", False)
skipping to change at line 1590 skipping to change at line 1577
show_timeout=False, show_timeout=False,
show_jid=False, show_jid=False,
**kwargs **kwargs
): ):
""" """
Get the returns for the command line interface via the event system Get the returns for the command line interface via the event system
""" """
log.trace("func get_cli_event_returns()") log.trace("func get_cli_event_returns()")
if verbose: if verbose:
msg = "Executing job with jid {0}".format(jid) msg = "Executing job with jid {}".format(jid)
print(msg) print(msg)
print("-" * len(msg) + "\n") print("-" * len(msg) + "\n")
elif show_jid: elif show_jid:
print("jid: {0}".format(jid)) print("jid: {}".format(jid))
# lazy load the connected minions # lazy load the connected minions
connected_minions = None connected_minions = None
return_count = 0 return_count = 0
for ret in self.get_iter_returns( for ret in self.get_iter_returns(
jid, jid,
minions, minions,
timeout=timeout, timeout=timeout,
tgt=tgt, tgt=tgt,
skipping to change at line 1617 skipping to change at line 1604
# call. If this is not popped, then it would be passed twice to # call. If this is not popped, then it would be passed twice to
# get_iter_returns. # get_iter_returns.
expect_minions=( expect_minions=(
kwargs.pop("expect_minions", False) or verbose or show_timeout kwargs.pop("expect_minions", False) or verbose or show_timeout
), ),
**kwargs **kwargs
): ):
log.debug("return event: %s", ret) log.debug("return event: %s", ret)
return_count = return_count + 1 return_count = return_count + 1
if progress: if progress:
for id_, min_ret in six.iteritems(ret): for id_, min_ret in ret.items():
if not min_ret.get("failed") is True: if not min_ret.get("failed") is True:
yield { yield {
"minion_count": len(minions), "minion_count": len(minions),
"return_count": return_count, "return_count": return_count,
} }
# replace the return structure for missing minions # replace the return structure for missing minions
for id_, min_ret in six.iteritems(ret): for id_, min_ret in ret.items():
if min_ret.get("failed") is True: if min_ret.get("failed") is True:
if connected_minions is None: if connected_minions is None:
connected_minions = salt.utils.minions.CkMinions( connected_minions = salt.utils.minions.CkMinions(
self.opts self.opts
).connected_ids() ).connected_ids()
if ( if (
self.opts["minion_data_cache"] self.opts["minion_data_cache"]
and salt.cache.factory(self.opts).contains( and salt.cache.factory(self.opts).contains(
"minions/{0}".format(id_), "data" "minions/{}".format(id_), "data"
) )
and connected_minions and connected_minions
and id_ not in connected_minions and id_ not in connected_minions
): ):
yield { yield {
id_: { id_: {
"out": "no_return", "out": "no_return",
"ret": "Minion did not return. [Not connected]", "ret": "Minion did not return. [Not connected]",
"retcode": salt.defaults.exitcodes.EX_GENERIC, "retcode": salt.defaults.exitcodes.EX_GENERIC,
skipping to change at line 1659 skipping to change at line 1646
os.path.join(self.opts["syndic_dir"], id_) os.path.join(self.opts["syndic_dir"], id_)
): ):
yield { yield {
id_: { id_: {
"out": "no_return", "out": "no_return",
"ret": "Minion did not return. [No response] " "ret": "Minion did not return. [No response] "
"\nThe minions may not have all finished run ning and any " "\nThe minions may not have all finished run ning and any "
"remaining minions will return upon completi on. To look " "remaining minions will return upon completi on. To look "
"up the return data for this job later, run the following " "up the return data for this job later, run the following "
"command:\n\n" "command:\n\n"
"salt-run jobs.lookup_jid {0}".format(jid), "salt-run jobs.lookup_jid {}".format(jid),
"retcode": salt.defaults.exitcodes.EX_GENERI C, "retcode": salt.defaults.exitcodes.EX_GENERI C,
} }
} }
else: else:
yield {id_: min_ret} yield {id_: min_ret}
self._clean_up_subscriptions(jid) self._clean_up_subscriptions(jid)
def get_event_iter_returns(self, jid, minions, timeout=None): def get_event_iter_returns(self, jid, minions, timeout=None):
""" """
skipping to change at line 1682 skipping to change at line 1669
""" """
log.trace("entered - function get_event_iter_returns()") log.trace("entered - function get_event_iter_returns()")
if timeout is None: if timeout is None:
timeout = self.opts["timeout"] timeout = self.opts["timeout"]
timeout_at = time.time() + timeout timeout_at = time.time() + timeout
found = set() found = set()
# Check to see if the jid is real, if not return the empty dict # Check to see if the jid is real, if not return the empty dict
if ( if (
self.returners["{0}.get_load".format(self.opts["master_job_cache"])] (jid) self.returners["{}.get_load".format(self.opts["master_job_cache"])]( jid)
== {} == {}
): ):
log.warning("jid does not exist") log.warning("jid does not exist")
yield {} yield {}
# stop the iteration, since the jid is invalid # stop the iteration, since the jid is invalid
raise StopIteration() raise StopIteration()
# Wait for the hosts to check in # Wait for the hosts to check in
while True: while True:
raw = self.event.get_event(timeout, auto_reconnect=self.auto_reconne ct) raw = self.event.get_event(timeout, auto_reconnect=self.auto_reconne ct)
if raw is None or time.time() > timeout_at: if raw is None or time.time() > timeout_at:
skipping to change at line 1719 skipping to change at line 1706
yield ret yield ret
time.sleep(0.02) time.sleep(0.02)
def _resolve_nodegroup(self, ng): def _resolve_nodegroup(self, ng):
""" """
Resolve a nodegroup into its configured components Resolve a nodegroup into its configured components
""" """
if ng not in self.opts["nodegroups"]: if ng not in self.opts["nodegroups"]:
conf_file = self.opts.get("conf_file", "the master config file") conf_file = self.opts.get("conf_file", "the master config file")
raise SaltInvocationError( raise SaltInvocationError(
"Node group {0} unavailable in {1}".format(ng, conf_file) "Node group {} unavailable in {}".format(ng, conf_file)
) )
return salt.utils.minions.nodegroup_comp(ng, self.opts["nodegroups"]) return salt.utils.minions.nodegroup_comp(ng, self.opts["nodegroups"])
def _prep_pub(self, tgt, fun, arg, tgt_type, ret, jid, timeout, **kwargs): def _prep_pub(self, tgt, fun, arg, tgt_type, ret, jid, timeout, **kwargs):
""" """
Set up the payload_kwargs to be sent down to the master Set up the payload_kwargs to be sent down to the master
""" """
if tgt_type == "nodegroup": if tgt_type == "nodegroup":
tgt = self._resolve_nodegroup(tgt) tgt = self._resolve_nodegroup(tgt)
tgt_type = "compound" tgt_type = "compound"
if tgt_type == "compound": if tgt_type == "compound":
# Resolve all nodegroups, so that the minions don't have to. # Resolve all nodegroups, so that the minions don't have to.
new_tgt = list() new_tgt = list()
log.debug("compound resolution: original tgt: %s", tgt) log.debug("compound resolution: original tgt: %s", tgt)
if isinstance(tgt, six.string_types): if isinstance(tgt, str):
tgt = tgt.split() tgt = tgt.split()
for word in tgt: for word in tgt:
if word.startswith("N@") and len(word) > 2: if word.startswith("N@") and len(word) > 2:
resolved = self._resolve_nodegroup(word[2:]) resolved = self._resolve_nodegroup(word[2:])
new_tgt.extend(resolved) new_tgt.extend(resolved)
else: else:
new_tgt.append(word) new_tgt.append(word)
log.debug("compound resolution: new_tgt: %s", new_tgt) log.debug("compound resolution: new_tgt: %s", new_tgt)
skipping to change at line 1758 skipping to change at line 1745
# Convert a range expression to a list of nodes and change expression # Convert a range expression to a list of nodes and change expression
# form to list # form to list
if tgt_type == "range" and HAS_RANGE: if tgt_type == "range" and HAS_RANGE:
tgt = self._convert_range_to_list(tgt) tgt = self._convert_range_to_list(tgt)
tgt_type = "list" tgt_type = "list"
# If an external job cache is specified add it to the ret list # If an external job cache is specified add it to the ret list
if self.opts.get("ext_job_cache"): if self.opts.get("ext_job_cache"):
if ret: if ret:
ret += ",{0}".format(self.opts["ext_job_cache"]) ret += ",{}".format(self.opts["ext_job_cache"])
else: else:
ret = self.opts["ext_job_cache"] ret = self.opts["ext_job_cache"]
# format the payload - make a function that does this in the payload # format the payload - make a function that does this in the payload
# module # module
# Generate the standard keyword args to feed to format_payload # Generate the standard keyword args to feed to format_payload
payload_kwargs = { payload_kwargs = {
"cmd": "publish", "cmd": "publish",
"tgt": tgt, "tgt": tgt,
skipping to change at line 1842 skipping to change at line 1829
self.opts["sock_dir"], self.opts["sock_dir"],
) )
raise SaltClientError raise SaltClientError
payload_kwargs = self._prep_pub( payload_kwargs = self._prep_pub(
tgt, fun, arg, tgt_type, ret, jid, timeout, **kwargs tgt, fun, arg, tgt_type, ret, jid, timeout, **kwargs
) )
master_uri = "tcp://{}:{}".format( master_uri = "tcp://{}:{}".format(
salt.utils.zeromq.ip_bracket(self.opts["interface"]), salt.utils.zeromq.ip_bracket(self.opts["interface"]),
six.text_type(self.opts["ret_port"]), str(self.opts["ret_port"]),
) )
with salt.transport.client.ReqChannel.factory( with salt.transport.client.ReqChannel.factory(
self.opts, crypt="clear", master_uri=master_uri self.opts, crypt="clear", master_uri=master_uri
) as channel: ) as channel:
try: try:
# Ensure that the event subscriber is connected. # Ensure that the event subscriber is connected.
# If not, we won't get a response, so error out # If not, we won't get a response, so error out
if listen and not self.event.connect_pub(timeout=timeout): if listen and not self.event.connect_pub(timeout=timeout):
raise SaltReqTimeoutError() raise SaltReqTimeoutError()
skipping to change at line 1946 skipping to change at line 1933
raise SaltClientError raise SaltClientError
payload_kwargs = self._prep_pub( payload_kwargs = self._prep_pub(
tgt, fun, arg, tgt_type, ret, jid, timeout, **kwargs tgt, fun, arg, tgt_type, ret, jid, timeout, **kwargs
) )
master_uri = ( master_uri = (
"tcp://" "tcp://"
+ salt.utils.zeromq.ip_bracket(self.opts["interface"]) + salt.utils.zeromq.ip_bracket(self.opts["interface"])
+ ":" + ":"
+ six.text_type(self.opts["ret_port"]) + str(self.opts["ret_port"])
) )
with salt.transport.client.AsyncReqChannel.factory( with salt.transport.client.AsyncReqChannel.factory(
self.opts, io_loop=io_loop, crypt="clear", master_uri=master_uri self.opts, io_loop=io_loop, crypt="clear", master_uri=master_uri
) as channel: ) as channel:
try: try:
# Ensure that the event subscriber is connected. # Ensure that the event subscriber is connected.
# If not, we won't get a response, so error out # If not, we won't get a response, so error out
if listen and not self.event.connect_pub(timeout=timeout): if listen and not self.event.connect_pub(timeout=timeout):
raise SaltReqTimeoutError() raise SaltReqTimeoutError()
skipping to change at line 2011 skipping to change at line 1998
# When running tests, if self.events is not destroyed, we leak 2 # When running tests, if self.events is not destroyed, we leak 2
# threads per test case which uses self.client # threads per test case which uses self.client
if hasattr(self, "event"): if hasattr(self, "event"):
# The call below will take care of calling 'self.event.destroy()' # The call below will take care of calling 'self.event.destroy()'
del self.event del self.event
# pylint: enable=W1701 # pylint: enable=W1701
def _clean_up_subscriptions(self, job_id): def _clean_up_subscriptions(self, job_id):
if self.opts.get("order_masters"): if self.opts.get("order_masters"):
self.event.unsubscribe("syndic/.*/{0}".format(job_id), "regex") self.event.unsubscribe("syndic/.*/{}".format(job_id), "regex")
self.event.unsubscribe("salt/job/{0}".format(job_id)) self.event.unsubscribe("salt/job/{}".format(job_id))
class FunctionWrapper(dict): class FunctionWrapper(dict):
""" """
Create a function wrapper that looks like the functions dict on the minion Create a function wrapper that looks like the functions dict on the minion
but invoked commands on the minion via a LocalClient. but invoked commands on the minion via a LocalClient.
This allows SLS files to be loaded with an object that calls down to the This allows SLS files to be loaded with an object that calls down to the
minion when the salt functions dict is referenced. minion when the salt functions dict is referenced.
""" """
def __init__(self, opts, minion): def __init__(self, opts, minion):
super(FunctionWrapper, self).__init__() super().__init__()
self.opts = opts self.opts = opts
self.minion = minion self.minion = minion
self.local = LocalClient(self.opts["conf_file"]) self.local = LocalClient(self.opts["conf_file"])
self.functions = self.__load_functions() self.functions = self.__load_functions()
def __missing__(self, key): def __missing__(self, key):
""" """
Since the function key is missing, wrap this call to a command to the Since the function key is missing, wrap this call to a command to the
minion of said key if it is available in the self.functions set minion of said key if it is available in the self.functions set
""" """
skipping to change at line 2059 skipping to change at line 2046
Return a function that executes the arguments passed via the local Return a function that executes the arguments passed via the local
client client
""" """
def func(*args, **kwargs): def func(*args, **kwargs):
""" """
Run a remote call Run a remote call
""" """
args = list(args) args = list(args)
for _key, _val in kwargs.items(): for _key, _val in kwargs.items():
args.append("{0}={1}".format(_key, _val)) args.append("{}={}".format(_key, _val))
return self.local.cmd(self.minion, key, args) return self.local.cmd(self.minion, key, args)
return func return func
class Caller(object): class Caller:
""" """
``Caller`` is the same interface used by the :command:`salt-call` ``Caller`` is the same interface used by the :command:`salt-call`
command-line tool on the Salt Minion. command-line tool on the Salt Minion.
.. versionchanged:: 2015.8.0 .. versionchanged:: 2015.8.0
Added the ``cmd`` method for consistency with the other Salt clients. Added the ``cmd`` method for consistency with the other Salt clients.
The existing ``function`` and ``sminion.functions`` interfaces still The existing ``function`` and ``sminion.functions`` interfaces still
exist but have been removed from the docs. exist but have been removed from the docs.
Importing and using ``Caller`` must be done on the same machine as a Importing and using ``Caller`` must be done on the same machine as a
skipping to change at line 2141 skipping to change at line 2128
def function(self, fun, *args, **kwargs): def function(self, fun, *args, **kwargs):
""" """
Call a single salt function Call a single salt function
""" """
func = self.sminion.functions[fun] func = self.sminion.functions[fun]
args, kwargs = salt.minion.load_args_and_kwargs( args, kwargs = salt.minion.load_args_and_kwargs(
func, salt.utils.args.parse_input(args), kwargs func, salt.utils.args.parse_input(args), kwargs
) )
return func(*args, **kwargs) return func(*args, **kwargs)
class ProxyCaller(object): class ProxyCaller:
""" """
``ProxyCaller`` is the same interface used by the :command:`salt-call` ``ProxyCaller`` is the same interface used by the :command:`salt-call`
with the args ``--proxyid <proxyid>`` command-line tool on the Salt Proxy with the args ``--proxyid <proxyid>`` command-line tool on the Salt Proxy
Minion. Minion.
Importing and using ``ProxyCaller`` must be done on the same machine as a Importing and using ``ProxyCaller`` must be done on the same machine as a
Salt Minion and it must be done using the same user that the Salt Minion is Salt Minion and it must be done using the same user that the Salt Minion is
running as. running as.
Usage: Usage:
skipping to change at line 2204 skipping to change at line 2191
caller.cmd('event.send', 'myco/myevent/something', caller.cmd('event.send', 'myco/myevent/something',
data={'foo': 'Foo'}, with_env=['GIT_COMMIT'], with_grains=True) data={'foo': 'Foo'}, with_env=['GIT_COMMIT'], with_grains=True)
""" """
func = self.sminion.functions[fun] func = self.sminion.functions[fun]
data = {"arg": args, "fun": fun} data = {"arg": args, "fun": fun}
data.update(kwargs) data.update(kwargs)
executors = getattr(self.sminion, "module_executors", []) or self.opts.g et( executors = getattr(self.sminion, "module_executors", []) or self.opts.g et(
"module_executors", ["direct_call"] "module_executors", ["direct_call"]
) )
if isinstance(executors, six.string_types): if isinstance(executors, str):
executors = [executors] executors = [executors]
for name in executors: for name in executors:
fname = "{0}.execute".format(name) fname = "{}.execute".format(name)
if fname not in self.sminion.executors: if fname not in self.sminion.executors:
raise SaltInvocationError( raise SaltInvocationError("Executor '{}' is not available".forma
"Executor '{0}' is not available".format(name) t(name))
)
return_data = self.sminion.executors[fname]( return_data = self.sminion.executors[fname](
self.opts, data, func, args, kwargs self.opts, data, func, args, kwargs
) )
if return_data is not None: if return_data is not None:
break break
return return_data return return_data
 End of changes. 55 change blocks. 
78 lines changed or deleted 64 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)