"Fossies" - the Fresh Open Source Software Archive

Member "salt-3002.2/salt/client/ssh/__init__.py" (18 Nov 2020, 63525 Bytes) of package /linux/misc/salt-3002.2.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "__init__.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 3002.1_vs_3002.2.

    1 """
    2 Create ssh executor system
    3 """
    4 
    5 import base64
    6 import binascii
    7 import copy
    8 import datetime
    9 import getpass
   10 import hashlib
   11 import logging
   12 import multiprocessing
   13 import os
   14 import re
   15 import subprocess
   16 import sys
   17 import tarfile
   18 import tempfile
   19 import time
   20 import uuid
   21 
   22 import salt.client.ssh.shell
   23 import salt.client.ssh.wrapper
   24 import salt.config
   25 import salt.defaults.exitcodes
   26 import salt.exceptions
   27 import salt.loader
   28 import salt.log
   29 import salt.minion
   30 import salt.output
   31 import salt.roster
   32 import salt.serializers.yaml
   33 import salt.state
   34 import salt.utils.args
   35 import salt.utils.atomicfile
   36 import salt.utils.event
   37 import salt.utils.files
   38 import salt.utils.hashutils
   39 import salt.utils.json
   40 import salt.utils.network
   41 import salt.utils.path
   42 import salt.utils.stringutils
   43 import salt.utils.thin
   44 import salt.utils.url
   45 import salt.utils.verify
   46 from salt.ext import six
   47 from salt.ext.six.moves import input  # pylint: disable=import-error,redefined-builtin
   48 from salt.template import compile_template
   49 from salt.utils.platform import is_windows
   50 from salt.utils.process import Process
   51 from salt.utils.zeromq import zmq
   52 
   53 try:
   54     import saltwinshell
   55 
   56     HAS_WINSHELL = True
   57 except ImportError:
   58     HAS_WINSHELL = False
   59 
   60 # The directory where salt thin is deployed
   61 DEFAULT_THIN_DIR = "/var/tmp/.%%USER%%_%%FQDNUUID%%_salt"
   62 
   63 # RSTR is just a delimiter to distinguish the beginning of salt STDOUT
   64 # and STDERR.  There is no special meaning.  Messages prior to RSTR in
   65 # stderr and stdout are either from SSH or from the shim.
   66 #
   67 # RSTR on both stdout and stderr:
   68 #    no errors in SHIM - output after RSTR is from salt
   69 # No RSTR in stderr, RSTR in stdout:
   70 #    no errors in SSH_SH_SHIM, but SHIM commands for salt master are after
   71 #    RSTR in stdout
   72 # No RSTR in stderr, no RSTR in stdout:
   73 #    Failure in SHIM
   74 # RSTR in stderr, No RSTR in stdout:
   75 #    Undefined behavior
   76 RSTR = "_edbc7885e4f9aac9b83b35999b68d015148caf467b78fa39c05f669c0ff89878"
   77 
   78 # The regex to find RSTR in output - Must be on an output line by itself
   79 # NOTE - must use non-grouping match groups or output splitting will fail.
   80 RSTR_RE = r"(?:^|\r?\n)" + RSTR + r"(?:\r?\n|$)"
   81 
   82 # METHODOLOGY:
   83 #
   84 #   1) Make the _thinnest_ /bin/sh shim (SSH_SH_SHIM) to find the python
   85 #      interpreter and get it invoked
   86 #   2) Once a qualified python is found start it with the SSH_PY_SHIM
   87 #   3) The shim is converted to a single semicolon separated line, so
   88 #      some constructs are needed to keep it clean.
   89 
   90 # NOTE:
   91 #   * SSH_SH_SHIM is generic and can be used to load+exec *any* python
   92 #     script on the target.
   93 #   * SSH_PY_SHIM is in a separate file rather than stuffed in a string
   94 #     in salt/client/ssh/__init__.py - this makes testing *easy* because
   95 #     it can be invoked directly.
   96 #   * SSH_PY_SHIM is base64 encoded and formatted into the SSH_SH_SHIM
   97 #     string.  This makes the python script "armored" so that it can
   98 #     all be passed in the SSH command and will not need special quoting
   99 #     (which likely would be impossibe to do anyway)
  100 #   * The formatted SSH_SH_SHIM with the SSH_PY_SHIM payload is a bit
  101 #     big (~7.5k).  If this proves problematic for an SSH command we
  102 #     might try simply invoking "/bin/sh -s" and passing the formatted
  103 #     SSH_SH_SHIM on SSH stdin.
  104 
  105 # NOTE: there are two passes of formatting:
  106 # 1) Substitute in static values
  107 #   - EX_THIN_PYTHON_INVALID  - exit code if a suitable python is not found
  108 # 2) Substitute in instance-specific commands
  109 #   - DEBUG       - enable shim debugging (any non-zero string enables)
  110 #   - SUDO        - load python and execute as root (any non-zero string enables)
  111 #   - SSH_PY_CODE - base64-encoded python code to execute
  112 #   - SSH_PY_ARGS - arguments to pass to python code
  113 
  114 # This shim generically loads python code . . . and *no* more.
  115 # - Uses /bin/sh for maximum compatibility - then jumps to
  116 #   python for ultra-maximum compatibility.
  117 #
  118 # 1. Identify a suitable python
  119 # 2. Jump to python
  120 
  121 # Note the list-comprehension syntax to define SSH_SH_SHIM is needed
  122 # to be able to define the string with indentation for readability but
  123 # still strip the white space for compactness and to avoid issues with
  124 # some multi-line embedded python code having indentation errors
  125 SSH_SH_SHIM = "\n".join(
  126     [
  127         s.strip()
  128         for s in r'''/bin/sh << 'EOF'
  129 set -e
  130 set -u
  131 DEBUG="{{DEBUG}}"
  132 if [ -n "$DEBUG" ]
  133     then set -x
  134 fi
  135 SET_PATH="{{SET_PATH}}"
  136 if [ -n "$SET_PATH" ]
  137     then export PATH={{SET_PATH}}
  138 fi
  139 SUDO=""
  140 if [ -n "{{SUDO}}" ]
  141     then SUDO="sudo "
  142 fi
  143 SUDO_USER="{{SUDO_USER}}"
  144 if [ "$SUDO" ] && [ "$SUDO_USER" ]
  145 then SUDO="sudo -u {{SUDO_USER}}"
  146 elif [ "$SUDO" ] && [ -n "$SUDO_USER" ]
  147 then SUDO="sudo "
  148 fi
  149 EX_PYTHON_INVALID={EX_THIN_PYTHON_INVALID}
  150 PYTHON_CMDS="python3 python27 python2.7 python26 python2.6 python2 python"
  151 for py_cmd in $PYTHON_CMDS
  152 do
  153     if command -v "$py_cmd" >/dev/null 2>&1 && "$py_cmd" -c "import sys; sys.exit(not (sys.version_info >= (2, 6)));"
  154     then
  155         py_cmd_path=`"$py_cmd" -c 'from __future__ import print_function;import sys; print(sys.executable);'`
  156         cmdpath=`command -v $py_cmd 2>/dev/null || which $py_cmd 2>/dev/null`
  157         if file $cmdpath | grep "shell script" > /dev/null
  158         then
  159             ex_vars="'PATH', 'LD_LIBRARY_PATH', 'MANPATH', \
  160                    'XDG_DATA_DIRS', 'PKG_CONFIG_PATH'"
  161             export `$py_cmd -c \
  162                   "from __future__ import print_function;
  163                   import sys;
  164                   import os;
  165                   map(sys.stdout.write, ['{{{{0}}}}={{{{1}}}} ' \
  166                   .format(x, os.environ[x]) for x in [$ex_vars]])"`
  167             exec $SUDO PATH=$PATH LD_LIBRARY_PATH=$LD_LIBRARY_PATH \
  168                      MANPATH=$MANPATH XDG_DATA_DIRS=$XDG_DATA_DIRS \
  169                      PKG_CONFIG_PATH=$PKG_CONFIG_PATH \
  170                      "$py_cmd_path" -c \
  171                    'import base64;
  172                    exec(base64.b64decode("""{{SSH_PY_CODE}}""").decode("utf-8"))'
  173         else
  174             exec $SUDO "$py_cmd_path" -c \
  175                    'import base64;
  176                    exec(base64.b64decode("""{{SSH_PY_CODE}}""").decode("utf-8"))'
  177         fi
  178         exit 0
  179     else
  180         continue
  181     fi
  182 done
  183 echo "ERROR: Unable to locate appropriate python command" >&2
  184 exit $EX_PYTHON_INVALID
  185 EOF'''.format(
  186             EX_THIN_PYTHON_INVALID=salt.defaults.exitcodes.EX_THIN_PYTHON_INVALID,
  187         ).split(
  188             "\n"
  189         )
  190     ]
  191 )
  192 
  193 if not is_windows():
  194     shim_file = os.path.join(os.path.dirname(__file__), "ssh_py_shim.py")
  195     if not os.path.exists(shim_file):
  196         # On esky builds we only have the .pyc file
  197         shim_file += "c"
  198     with salt.utils.files.fopen(shim_file) as ssh_py_shim:
  199         SSH_PY_SHIM = ssh_py_shim.read()
  200 
  201 log = logging.getLogger(__name__)
  202 
  203 
  204 class SSH:
  205     """
  206     Create an SSH execution system
  207     """
  208 
  209     ROSTER_UPDATE_FLAG = "#__needs_update"
  210 
  211     def __init__(self, opts):
  212         self.__parsed_rosters = {SSH.ROSTER_UPDATE_FLAG: True}
  213         pull_sock = os.path.join(opts["sock_dir"], "master_event_pull.ipc")
  214         if os.path.exists(pull_sock) and zmq:
  215             self.event = salt.utils.event.get_event(
  216                 "master", opts["sock_dir"], opts["transport"], opts=opts, listen=False
  217             )
  218         else:
  219             self.event = None
  220         self.opts = opts
  221         if self.opts["regen_thin"]:
  222             self.opts["ssh_wipe"] = True
  223         if not salt.utils.path.which("ssh"):
  224             raise salt.exceptions.SaltSystemExit(
  225                 code=-1,
  226                 msg="No ssh binary found in path -- ssh must be installed for salt-ssh to run. Exiting.",
  227             )
  228         self.opts["_ssh_version"] = ssh_version()
  229         self.tgt_type = (
  230             self.opts["selected_target_option"]
  231             if self.opts["selected_target_option"]
  232             else "glob"
  233         )
  234         self._expand_target()
  235         self.roster = salt.roster.Roster(self.opts, self.opts.get("roster", "flat"))
  236         self.targets = self.roster.targets(self.opts["tgt"], self.tgt_type)
  237         if not self.targets:
  238             self._update_targets()
  239         # If we're in a wfunc, we need to get the ssh key location from the
  240         # top level opts, stored in __master_opts__
  241         if "__master_opts__" in self.opts:
  242             if self.opts["__master_opts__"].get("ssh_use_home_key") and os.path.isfile(
  243                 os.path.expanduser("~/.ssh/id_rsa")
  244             ):
  245                 priv = os.path.expanduser("~/.ssh/id_rsa")
  246             else:
  247                 priv = self.opts["__master_opts__"].get(
  248                     "ssh_priv",
  249                     os.path.join(
  250                         self.opts["__master_opts__"]["pki_dir"], "ssh", "salt-ssh.rsa"
  251                     ),
  252                 )
  253         else:
  254             priv = self.opts.get(
  255                 "ssh_priv", os.path.join(self.opts["pki_dir"], "ssh", "salt-ssh.rsa")
  256             )
  257         if priv != "agent-forwarding":
  258             if not os.path.isfile(priv):
  259                 try:
  260                     salt.client.ssh.shell.gen_key(priv)
  261                 except OSError:
  262                     raise salt.exceptions.SaltClientError(
  263                         "salt-ssh could not be run because it could not generate keys.\n\n"
  264                         "You can probably resolve this by executing this script with "
  265                         "increased permissions via sudo or by running as root.\n"
  266                         "You could also use the '-c' option to supply a configuration "
  267                         "directory that you have permissions to read and write to."
  268                     )
  269         self.defaults = {
  270             "user": self.opts.get(
  271                 "ssh_user", salt.config.DEFAULT_MASTER_OPTS["ssh_user"]
  272             ),
  273             "port": self.opts.get(
  274                 "ssh_port", salt.config.DEFAULT_MASTER_OPTS["ssh_port"]
  275             ),
  276             "passwd": self.opts.get(
  277                 "ssh_passwd", salt.config.DEFAULT_MASTER_OPTS["ssh_passwd"]
  278             ),
  279             "priv": priv,
  280             "priv_passwd": self.opts.get(
  281                 "ssh_priv_passwd", salt.config.DEFAULT_MASTER_OPTS["ssh_priv_passwd"]
  282             ),
  283             "timeout": self.opts.get(
  284                 "ssh_timeout", salt.config.DEFAULT_MASTER_OPTS["ssh_timeout"]
  285             )
  286             + self.opts.get("timeout", salt.config.DEFAULT_MASTER_OPTS["timeout"]),
  287             "sudo": self.opts.get(
  288                 "ssh_sudo", salt.config.DEFAULT_MASTER_OPTS["ssh_sudo"]
  289             ),
  290             "sudo_user": self.opts.get(
  291                 "ssh_sudo_user", salt.config.DEFAULT_MASTER_OPTS["ssh_sudo_user"]
  292             ),
  293             "identities_only": self.opts.get(
  294                 "ssh_identities_only",
  295                 salt.config.DEFAULT_MASTER_OPTS["ssh_identities_only"],
  296             ),
  297             "remote_port_forwards": self.opts.get("ssh_remote_port_forwards"),
  298             "ssh_options": self.opts.get("ssh_options"),
  299         }
  300         if self.opts.get("rand_thin_dir"):
  301             self.defaults["thin_dir"] = os.path.join(
  302                 "/var/tmp", ".{}".format(uuid.uuid4().hex[:6])
  303             )
  304             self.opts["ssh_wipe"] = "True"
  305         self.serial = salt.payload.Serial(opts)
  306         self.returners = salt.loader.returners(self.opts, {})
  307         self.fsclient = salt.fileclient.FSClient(self.opts)
  308         self.thin = salt.utils.thin.gen_thin(
  309             self.opts["cachedir"],
  310             extra_mods=self.opts.get("thin_extra_mods"),
  311             overwrite=self.opts["regen_thin"],
  312             python2_bin=self.opts["python2_bin"],
  313             python3_bin=self.opts["python3_bin"],
  314             extended_cfg=self.opts.get("ssh_ext_alternatives"),
  315         )
  316         self.mods = mod_data(self.fsclient)
  317 
  318     @property
  319     def parse_tgt(self):
  320         """
  321         Method to determine the hostname and user
  322         when bypassing the roster and using
  323         ssh syntax (ex. root@localhost)
  324         """
  325         if not self.opts.get("ssh_cli_tgt"):
  326             self.opts["ssh_cli_tgt"] = self.opts.get("tgt", "")
  327         hostname = self.opts.get("ssh_cli_tgt", "")
  328         if "@" in hostname:
  329             user, hostname = hostname.split("@", 1)
  330         else:
  331             user = self.opts.get("ssh_user")
  332         return {"hostname": hostname, "user": user}
  333 
  334     def _get_roster(self):
  335         """
  336         Read roster filename as a key to the data.
  337         :return:
  338         """
  339         roster_file = salt.roster.get_roster_file(self.opts)
  340         if roster_file not in self.__parsed_rosters:
  341             roster_data = compile_template(
  342                 roster_file,
  343                 salt.loader.render(self.opts, {}),
  344                 self.opts["renderer"],
  345                 self.opts["renderer_blacklist"],
  346                 self.opts["renderer_whitelist"],
  347             )
  348             self.__parsed_rosters[roster_file] = roster_data
  349         return roster_file
  350 
  351     def _expand_target(self):
  352         """
  353         Figures out if the target is a reachable host without wildcards, expands if any.
  354         :return:
  355         """
  356         # TODO: Support -L
  357         hostname = self.parse_tgt["hostname"]
  358         if isinstance(hostname, list):
  359             return
  360 
  361         needs_expansion = "*" not in hostname and salt.utils.network.is_reachable_host(
  362             hostname
  363         )
  364         if needs_expansion:
  365             if hostname is None:
  366                 # Reverse lookup failed
  367                 return
  368             self._get_roster()
  369             for roster_filename in self.__parsed_rosters:
  370                 roster_data = self.__parsed_rosters[roster_filename]
  371                 if not isinstance(roster_data, bool):
  372                     for host_id in roster_data:
  373                         if hostname in [host_id, roster_data[host_id].get("host")]:
  374                             if hostname != self.opts["tgt"]:
  375                                 self.opts["tgt"] = hostname
  376                             self.__parsed_rosters[self.ROSTER_UPDATE_FLAG] = False
  377                             return
  378 
  379     def _update_roster(self):
  380         """
  381         Update default flat roster with the passed in information.
  382         :return:
  383         """
  384         roster_file = self._get_roster()
  385         if os.access(roster_file, os.W_OK):
  386             if self.__parsed_rosters[self.ROSTER_UPDATE_FLAG]:
  387                 with salt.utils.files.fopen(roster_file, "a") as roster_fp:
  388                     roster_fp.write(
  389                         '# Automatically added by "{s_user}" at {s_time}\n{hostname}:\n    host: '
  390                         "{hostname}\n    user: {user}"
  391                         "\n    passwd: {passwd}\n".format(
  392                             s_user=getpass.getuser(),
  393                             s_time=datetime.datetime.utcnow().isoformat(),
  394                             hostname=self.opts.get("tgt", ""),
  395                             user=self.opts.get("ssh_user", ""),
  396                             passwd=self.opts.get("ssh_passwd", ""),
  397                         )
  398                     )
  399                 log.info(
  400                     "The host {} has been added to the roster {}".format(
  401                         self.opts.get("tgt", ""), roster_file
  402                     )
  403                 )
  404         else:
  405             log.error("Unable to update roster {}: access denied".format(roster_file))
  406 
  407     def _update_targets(self):
  408         """
  409         Uptade targets in case hostname was directly passed without the roster.
  410         :return:
  411         """
  412         hostname = self.parse_tgt["hostname"]
  413         user = self.parse_tgt["user"]
  414         if hostname == "*":
  415             hostname = ""
  416 
  417         if salt.utils.network.is_reachable_host(hostname):
  418             self.opts["tgt"] = hostname
  419             self.targets[hostname] = {
  420                 "passwd": self.opts.get("ssh_passwd", ""),
  421                 "host": hostname,
  422                 "user": user,
  423             }
  424             if self.opts.get("ssh_update_roster"):
  425                 self._update_roster()
  426 
  427     def get_pubkey(self):
  428         """
  429         Return the key string for the SSH public key
  430         """
  431         if (
  432             "__master_opts__" in self.opts
  433             and self.opts["__master_opts__"].get("ssh_use_home_key")
  434             and os.path.isfile(os.path.expanduser("~/.ssh/id_rsa"))
  435         ):
  436             priv = os.path.expanduser("~/.ssh/id_rsa")
  437         else:
  438             priv = self.opts.get(
  439                 "ssh_priv", os.path.join(self.opts["pki_dir"], "ssh", "salt-ssh.rsa")
  440             )
  441         pub = "{}.pub".format(priv)
  442         with salt.utils.files.fopen(pub, "r") as fp_:
  443             return "{} rsa root@master".format(fp_.read().split()[1])
  444 
  445     def key_deploy(self, host, ret):
  446         """
  447         Deploy the SSH key if the minions don't auth
  448         """
  449         if not isinstance(ret[host], dict) or self.opts.get("ssh_key_deploy"):
  450             target = self.targets[host]
  451             if target.get("passwd", False) or self.opts["ssh_passwd"]:
  452                 self._key_deploy_run(host, target, False)
  453             return ret
  454         if ret[host].get("stderr", "").count("Permission denied"):
  455             target = self.targets[host]
  456             # permission denied, attempt to auto deploy ssh key
  457             print(
  458                 (
  459                     "Permission denied for host {}, do you want to deploy "
  460                     "the salt-ssh key? (password required):"
  461                 ).format(host)
  462             )
  463             deploy = input("[Y/n] ")
  464             if deploy.startswith(("n", "N")):
  465                 return ret
  466             target["passwd"] = getpass.getpass(
  467                 "Password for {}@{}: ".format(target["user"], host)
  468             )
  469             return self._key_deploy_run(host, target, True)
  470         return ret
  471 
  472     def _key_deploy_run(self, host, target, re_run=True):
  473         """
  474         The ssh-copy-id routine
  475         """
  476         argv = [
  477             "ssh.set_auth_key",
  478             target.get("user", "root"),
  479             self.get_pubkey(),
  480         ]
  481 
  482         single = Single(
  483             self.opts,
  484             argv,
  485             host,
  486             mods=self.mods,
  487             fsclient=self.fsclient,
  488             thin=self.thin,
  489             **target
  490         )
  491         if salt.utils.path.which("ssh-copy-id"):
  492             # we have ssh-copy-id, use it!
  493             stdout, stderr, retcode = single.shell.copy_id()
  494         else:
  495             stdout, stderr, retcode = single.run()
  496         if re_run:
  497             target.pop("passwd")
  498             single = Single(
  499                 self.opts,
  500                 self.opts["argv"],
  501                 host,
  502                 mods=self.mods,
  503                 fsclient=self.fsclient,
  504                 thin=self.thin,
  505                 **target
  506             )
  507             stdout, stderr, retcode = single.cmd_block()
  508             try:
  509                 data = salt.utils.json.find_json(stdout)
  510                 return {host: data.get("local", data)}
  511             except Exception:  # pylint: disable=broad-except
  512                 if stderr:
  513                     return {host: stderr}
  514                 return {host: "Bad Return"}
  515         if salt.defaults.exitcodes.EX_OK != retcode:
  516             return {host: stderr}
  517         return {host: stdout}
  518 
  519     def handle_routine(self, que, opts, host, target, mine=False):
  520         """
  521         Run the routine in a "Thread", put a dict on the queue
  522         """
  523         opts = copy.deepcopy(opts)
  524         single = Single(
  525             opts,
  526             opts["argv"],
  527             host,
  528             mods=self.mods,
  529             fsclient=self.fsclient,
  530             thin=self.thin,
  531             mine=mine,
  532             **target
  533         )
  534         ret = {"id": single.id}
  535         stdout, stderr, retcode = single.run()
  536         # This job is done, yield
  537         try:
  538             data = salt.utils.json.find_json(stdout)
  539             if len(data) < 2 and "local" in data:
  540                 ret["ret"] = data["local"]
  541             else:
  542                 ret["ret"] = {
  543                     "stdout": stdout,
  544                     "stderr": stderr,
  545                     "retcode": retcode,
  546                 }
  547         except Exception:  # pylint: disable=broad-except
  548             ret["ret"] = {
  549                 "stdout": stdout,
  550                 "stderr": stderr,
  551                 "retcode": retcode,
  552             }
  553         que.put(ret)
  554 
  555     def handle_ssh(self, mine=False):
  556         """
  557         Spin up the needed threads or processes and execute the subsequent
  558         routines
  559         """
  560         que = multiprocessing.Queue()
  561         running = {}
  562         target_iter = self.targets.__iter__()
  563         returned = set()
  564         rets = set()
  565         init = False
  566         while True:
  567             if not self.targets:
  568                 log.error("No matching targets found in roster.")
  569                 break
  570             if len(running) < self.opts.get("ssh_max_procs", 25) and not init:
  571                 try:
  572                     host = next(target_iter)
  573                 except StopIteration:
  574                     init = True
  575                     continue
  576                 for default in self.defaults:
  577                     if default not in self.targets[host]:
  578                         self.targets[host][default] = self.defaults[default]
  579                 if "host" not in self.targets[host]:
  580                     self.targets[host]["host"] = host
  581                 if self.targets[host].get("winrm") and not HAS_WINSHELL:
  582                     returned.add(host)
  583                     rets.add(host)
  584                     log_msg = "Please contact sales@saltstack.com for access to the enterprise saltwinshell module."
  585                     log.debug(log_msg)
  586                     no_ret = {
  587                         "fun_args": [],
  588                         "jid": None,
  589                         "return": log_msg,
  590                         "retcode": 1,
  591                         "fun": "",
  592                         "id": host,
  593                     }
  594                     yield {host: no_ret}
  595                     continue
  596                 args = (
  597                     que,
  598                     self.opts,
  599                     host,
  600                     self.targets[host],
  601                     mine,
  602                 )
  603                 routine = Process(target=self.handle_routine, args=args)
  604                 routine.start()
  605                 running[host] = {"thread": routine}
  606                 continue
  607             ret = {}
  608             try:
  609                 ret = que.get(False)
  610                 if "id" in ret:
  611                     returned.add(ret["id"])
  612                     yield {ret["id"]: ret["ret"]}
  613             except Exception:  # pylint: disable=broad-except
  614                 # This bare exception is here to catch spurious exceptions
  615                 # thrown by que.get during healthy operation. Please do not
  616                 # worry about this bare exception, it is entirely here to
  617                 # control program flow.
  618                 pass
  619             for host in running:
  620                 if not running[host]["thread"].is_alive():
  621                     if host not in returned:
  622                         # Try to get any returns that came through since we
  623                         # last checked
  624                         try:
  625                             while True:
  626                                 ret = que.get(False)
  627                                 if "id" in ret:
  628                                     returned.add(ret["id"])
  629                                     yield {ret["id"]: ret["ret"]}
  630                         except Exception:  # pylint: disable=broad-except
  631                             pass
  632 
  633                         if host not in returned:
  634                             error = (
  635                                 "Target '{}' did not return any data, "
  636                                 "probably due to an error."
  637                             ).format(host)
  638                             ret = {"id": host, "ret": error}
  639                             log.error(error)
  640                             yield {ret["id"]: ret["ret"]}
  641                     running[host]["thread"].join()
  642                     rets.add(host)
  643             for host in rets:
  644                 if host in running:
  645                     running.pop(host)
  646             if len(rets) >= len(self.targets):
  647                 break
  648             # Sleep when limit or all threads started
  649             if len(running) >= self.opts.get("ssh_max_procs", 25) or len(
  650                 self.targets
  651             ) >= len(running):
  652                 time.sleep(0.1)
  653 
  654     def run_iter(self, mine=False, jid=None):
  655         """
  656         Execute and yield returns as they come in, do not print to the display
  657 
  658         mine
  659             The Single objects will use mine_functions defined in the roster,
  660             pillar, or master config (they will be checked in that order) and
  661             will modify the argv with the arguments from mine_functions
  662         """
  663         fstr = "{}.prep_jid".format(self.opts["master_job_cache"])
  664         jid = self.returners[fstr](passed_jid=jid or self.opts.get("jid", None))
  665 
  666         # Save the invocation information
  667         argv = self.opts["argv"]
  668 
  669         if self.opts.get("raw_shell", False):
  670             fun = "ssh._raw"
  671             args = argv
  672         else:
  673             fun = argv[0] if argv else ""
  674             args = argv[1:]
  675 
  676         job_load = {
  677             "jid": jid,
  678             "tgt_type": self.tgt_type,
  679             "tgt": self.opts["tgt"],
  680             "user": self.opts["user"],
  681             "fun": fun,
  682             "arg": args,
  683         }
  684 
  685         # save load to the master job cache
  686         if self.opts["master_job_cache"] == "local_cache":
  687             self.returners["{}.save_load".format(self.opts["master_job_cache"])](
  688                 jid, job_load, minions=self.targets.keys()
  689             )
  690         else:
  691             self.returners["{}.save_load".format(self.opts["master_job_cache"])](
  692                 jid, job_load
  693             )
  694 
  695         for ret in self.handle_ssh(mine=mine):
  696             host = next(iter(ret.keys()))
  697             self.cache_job(jid, host, ret[host], fun)
  698             if self.event:
  699                 id_, data = next(iter(ret.items()))
  700                 if isinstance(data, str):
  701                     data = {"return": data}
  702                 if "id" not in data:
  703                     data["id"] = id_
  704                 if "fun" not in data:
  705                     data["fun"] = fun
  706                 data[
  707                     "jid"
  708                 ] = jid  # make the jid in the payload the same as the jid in the tag
  709                 self.event.fire_event(
  710                     data, salt.utils.event.tagify([jid, "ret", host], "job")
  711                 )
  712             yield ret
  713 
  714     def cache_job(self, jid, id_, ret, fun):
  715         """
  716         Cache the job information
  717         """
  718         self.returners["{}.returner".format(self.opts["master_job_cache"])](
  719             {"jid": jid, "id": id_, "return": ret, "fun": fun}
  720         )
  721 
  722     def run(self, jid=None):
  723         """
  724         Execute the overall routine, print results via outputters
  725         """
  726         if self.opts.get("list_hosts"):
  727             self._get_roster()
  728             ret = {}
  729             for roster_file in self.__parsed_rosters:
  730                 if roster_file.startswith("#"):
  731                     continue
  732                 ret[roster_file] = {}
  733                 for host_id in self.__parsed_rosters[roster_file]:
  734                     hostname = self.__parsed_rosters[roster_file][host_id]["host"]
  735                     ret[roster_file][host_id] = hostname
  736             salt.output.display_output(ret, "nested", self.opts)
  737             sys.exit()
  738 
  739         fstr = "{}.prep_jid".format(self.opts["master_job_cache"])
  740         jid = self.returners[fstr](passed_jid=jid or self.opts.get("jid", None))
  741 
  742         # Save the invocation information
  743         argv = self.opts["argv"]
  744 
  745         if self.opts.get("raw_shell", False):
  746             fun = "ssh._raw"
  747             args = argv
  748         else:
  749             fun = argv[0] if argv else ""
  750             args = argv[1:]
  751 
  752         job_load = {
  753             "jid": jid,
  754             "tgt_type": self.tgt_type,
  755             "tgt": self.opts["tgt"],
  756             "user": self.opts["user"],
  757             "fun": fun,
  758             "arg": args,
  759         }
  760 
  761         # save load to the master job cache
  762         try:
  763             if isinstance(jid, bytes):
  764                 jid = jid.decode("utf-8")
  765             if self.opts["master_job_cache"] == "local_cache":
  766                 self.returners["{}.save_load".format(self.opts["master_job_cache"])](
  767                     jid, job_load, minions=self.targets.keys()
  768                 )
  769             else:
  770                 self.returners["{}.save_load".format(self.opts["master_job_cache"])](
  771                     jid, job_load
  772                 )
  773         except Exception as exc:  # pylint: disable=broad-except
  774             log.exception(exc)
  775             log.error(
  776                 "Could not save load with returner %s: %s",
  777                 self.opts["master_job_cache"],
  778                 exc,
  779             )
  780 
  781         if self.opts.get("verbose"):
  782             msg = "Executing job with jid {}".format(jid)
  783             print(msg)
  784             print("-" * len(msg) + "\n")
  785             print("")
  786         sret = {}
  787         outputter = self.opts.get("output", "nested")
  788         final_exit = 0
  789         for ret in self.handle_ssh():
  790             host = next(iter(ret.keys()))
  791             if isinstance(ret[host], dict):
  792                 host_ret = ret[host].get("retcode", 0)
  793                 if host_ret != 0:
  794                     final_exit = 1
  795             else:
  796                 # Error on host
  797                 final_exit = 1
  798 
  799             self.cache_job(jid, host, ret[host], fun)
  800             ret = self.key_deploy(host, ret)
  801 
  802             if isinstance(ret[host], dict) and (
  803                 ret[host].get("stderr") or ""
  804             ).startswith("ssh:"):
  805                 ret[host] = ret[host]["stderr"]
  806 
  807             if not isinstance(ret[host], dict):
  808                 p_data = {host: ret[host]}
  809             elif "return" not in ret[host]:
  810                 p_data = ret
  811             else:
  812                 outputter = ret[host].get("out", self.opts.get("output", "nested"))
  813                 p_data = {host: ret[host].get("return", {})}
  814             if self.opts.get("static"):
  815                 sret.update(p_data)
  816             else:
  817                 salt.output.display_output(p_data, outputter, self.opts)
  818             if self.event:
  819                 id_, data = next(iter(ret.items()))
  820                 if isinstance(data, str):
  821                     data = {"return": data}
  822                 if "id" not in data:
  823                     data["id"] = id_
  824                 if "fun" not in data:
  825                     data["fun"] = fun
  826                 data[
  827                     "jid"
  828                 ] = jid  # make the jid in the payload the same as the jid in the tag
  829                 self.event.fire_event(
  830                     data, salt.utils.event.tagify([jid, "ret", host], "job")
  831                 )
  832         if self.opts.get("static"):
  833             salt.output.display_output(sret, outputter, self.opts)
  834         if final_exit:
  835             sys.exit(salt.defaults.exitcodes.EX_AGGREGATE)
  836 
  837 
  838 class Single:
  839     """
  840     Hold onto a single ssh execution
  841     """
  842 
  843     # 1. Get command ready
  844     # 2. Check if target has salt
  845     # 3. deploy salt-thin
  846     # 4. execute requested command via salt-thin
  847     def __init__(
  848         self,
  849         opts,
  850         argv,
  851         id_,
  852         host,
  853         user=None,
  854         port=None,
  855         passwd=None,
  856         priv=None,
  857         priv_passwd=None,
  858         timeout=30,
  859         sudo=False,
  860         tty=False,
  861         mods=None,
  862         fsclient=None,
  863         thin=None,
  864         mine=False,
  865         minion_opts=None,
  866         identities_only=False,
  867         sudo_user=None,
  868         remote_port_forwards=None,
  869         winrm=False,
  870         ssh_options=None,
  871         **kwargs
  872     ):
  873         # Get mine setting and mine_functions if defined in kwargs (from roster)
  874         self.mine = mine
  875         self.mine_functions = kwargs.get("mine_functions")
  876         self.cmd_umask = kwargs.get("cmd_umask", None)
  877 
  878         self.winrm = winrm
  879 
  880         self.opts = opts
  881         self.tty = tty
  882         if kwargs.get("disable_wipe"):
  883             self.wipe = False
  884         else:
  885             self.wipe = bool(self.opts.get("ssh_wipe"))
  886         if kwargs.get("thin_dir"):
  887             self.thin_dir = kwargs["thin_dir"]
  888         elif self.winrm:
  889             saltwinshell.set_winvars(self)
  890             self.python_env = kwargs.get("ssh_python_env")
  891         else:
  892             if user:
  893                 thin_dir = DEFAULT_THIN_DIR.replace("%%USER%%", user)
  894             else:
  895                 thin_dir = DEFAULT_THIN_DIR.replace("%%USER%%", "root")
  896             self.thin_dir = thin_dir.replace(
  897                 "%%FQDNUUID%%",
  898                 uuid.uuid3(uuid.NAMESPACE_DNS, salt.utils.network.get_fqhostname()).hex[
  899                     :6
  900                 ],
  901             )
  902         self.opts["thin_dir"] = self.thin_dir
  903         self.fsclient = fsclient
  904         self.context = {"master_opts": self.opts, "fileclient": self.fsclient}
  905 
  906         self.ssh_pre_flight = kwargs.get("ssh_pre_flight", None)
  907 
  908         if self.ssh_pre_flight:
  909             self.ssh_pre_file = os.path.basename(self.ssh_pre_flight)
  910 
  911         if isinstance(argv, str):
  912             self.argv = [argv]
  913         else:
  914             self.argv = argv
  915 
  916         self.fun, self.args, self.kwargs = self.__arg_comps()
  917         self.id = id_
  918         self.set_path = kwargs.get("set_path", "")
  919 
  920         self.mods = mods if isinstance(mods, dict) else {}
  921         args = {
  922             "host": host,
  923             "user": user,
  924             "port": port,
  925             "passwd": passwd,
  926             "priv": priv,
  927             "priv_passwd": priv_passwd,
  928             "timeout": timeout,
  929             "sudo": sudo,
  930             "tty": tty,
  931             "mods": self.mods,
  932             "identities_only": identities_only,
  933             "sudo_user": sudo_user,
  934             "remote_port_forwards": remote_port_forwards,
  935             "winrm": winrm,
  936             "ssh_options": ssh_options,
  937         }
  938         # Pre apply changeable defaults
  939         self.minion_opts = {
  940             "grains_cache": True,
  941             "log_file": "salt-call.log",
  942         }
  943         self.minion_opts.update(opts.get("ssh_minion_opts", {}))
  944         if minion_opts is not None:
  945             self.minion_opts.update(minion_opts)
  946         # Post apply system needed defaults
  947         self.minion_opts.update(
  948             {
  949                 "root_dir": os.path.join(self.thin_dir, "running_data"),
  950                 "id": self.id,
  951                 "sock_dir": "/",
  952                 "fileserver_list_cache_time": 3,
  953             }
  954         )
  955         self.minion_config = salt.serializers.yaml.serialize(self.minion_opts)
  956         self.target = kwargs
  957         self.target.update(args)
  958         self.serial = salt.payload.Serial(opts)
  959         self.wfuncs = salt.loader.ssh_wrapper(opts, None, self.context)
  960         self.shell = salt.client.ssh.shell.gen_shell(opts, **args)
  961         if self.winrm:
  962             # Determine if Windows client is x86 or AMD64
  963             arch, _, _ = self.shell.exec_cmd("powershell $ENV:PROCESSOR_ARCHITECTURE")
  964             self.arch = arch.strip()
  965         self.thin = thin if thin else salt.utils.thin.thin_path(opts["cachedir"])
  966 
  967     def __arg_comps(self):
  968         """
  969         Return the function name and the arg list
  970         """
  971         fun = self.argv[0] if self.argv else ""
  972         parsed = salt.utils.args.parse_input(
  973             self.argv[1:], condition=False, no_parse=self.opts.get("no_parse", [])
  974         )
  975         args = parsed[0]
  976         kws = parsed[1]
  977         return fun, args, kws
  978 
  979     def _escape_arg(self, arg):
  980         """
  981         Properly escape argument to protect special characters from shell
  982         interpretation.  This avoids having to do tricky argument quoting.
  983 
  984         Effectively just escape all characters in the argument that are not
  985         alphanumeric!
  986         """
  987         if self.winrm:
  988             return arg
  989         return "".join(["\\" + char if re.match(r"\W", char) else char for char in arg])
  990 
  991     def run_ssh_pre_flight(self):
  992         """
  993         Run our pre_flight script before running any ssh commands
  994         """
  995         script = os.path.join(tempfile.gettempdir(), self.ssh_pre_file)
  996 
  997         self.shell.send(self.ssh_pre_flight, script)
  998 
  999         return self.execute_script(script)
 1000 
 1001     def check_thin_dir(self):
 1002         """
 1003         check if the thindir exists on the remote machine
 1004         """
 1005         stdout, stderr, retcode = self.shell.exec_cmd(
 1006             "test -d {}".format(self.thin_dir)
 1007         )
 1008         if retcode != 0:
 1009             return False
 1010         return True
 1011 
 1012     def deploy(self):
 1013         """
 1014         Deploy salt-thin
 1015         """
 1016         self.shell.send(
 1017             self.thin, os.path.join(self.thin_dir, "salt-thin.tgz"),
 1018         )
 1019         self.deploy_ext()
 1020         return True
 1021 
 1022     def deploy_ext(self):
 1023         """
 1024         Deploy the ext_mods tarball
 1025         """
 1026         if self.mods.get("file"):
 1027             self.shell.send(
 1028                 self.mods["file"], os.path.join(self.thin_dir, "salt-ext_mods.tgz"),
 1029             )
 1030         return True
 1031 
 1032     def run(self, deploy_attempted=False):
 1033         """
 1034         Execute the routine, the routine can be either:
 1035         1. Execute a raw shell command
 1036         2. Execute a wrapper func
 1037         3. Execute a remote Salt command
 1038 
 1039         If a (re)deploy is needed, then retry the operation after a deploy
 1040         attempt
 1041 
 1042         Returns tuple of (stdout, stderr, retcode)
 1043         """
 1044         stdout = stderr = retcode = None
 1045 
 1046         if self.ssh_pre_flight:
 1047             if not self.opts.get("ssh_run_pre_flight", False) and self.check_thin_dir():
 1048                 log.info(
 1049                     "{} thin dir already exists. Not running ssh_pre_flight script".format(
 1050                         self.thin_dir
 1051                     )
 1052                 )
 1053             elif not os.path.exists(self.ssh_pre_flight):
 1054                 log.error(
 1055                     "The ssh_pre_flight script {} does not exist".format(
 1056                         self.ssh_pre_flight
 1057                     )
 1058                 )
 1059             else:
 1060                 stdout, stderr, retcode = self.run_ssh_pre_flight()
 1061                 if retcode != 0:
 1062                     log.error(
 1063                         "Error running ssh_pre_flight script {}".format(
 1064                             self.ssh_pre_file
 1065                         )
 1066                     )
 1067                     return stdout, stderr, retcode
 1068                 log.info(
 1069                     "Successfully ran the ssh_pre_flight script: {}".format(
 1070                         self.ssh_pre_file
 1071                     )
 1072                 )
 1073 
 1074         if self.opts.get("raw_shell", False):
 1075             cmd_str = " ".join([self._escape_arg(arg) for arg in self.argv])
 1076             stdout, stderr, retcode = self.shell.exec_cmd(cmd_str)
 1077 
 1078         elif self.fun in self.wfuncs or self.mine:
 1079             stdout, retcode = self.run_wfunc()
 1080 
 1081         else:
 1082             stdout, stderr, retcode = self.cmd_block()
 1083 
 1084         return stdout, stderr, retcode
 1085 
 1086     def run_wfunc(self):
 1087         """
 1088         Execute a wrapper function
 1089 
 1090         Returns tuple of (json_data, '')
 1091         """
 1092         # Ensure that opts/grains are up to date
 1093         # Execute routine
 1094         data_cache = False
 1095         data = None
 1096         cdir = os.path.join(self.opts["cachedir"], "minions", self.id)
 1097         if not os.path.isdir(cdir):
 1098             os.makedirs(cdir)
 1099         datap = os.path.join(cdir, "ssh_data.p")
 1100         refresh = False
 1101         if not os.path.isfile(datap):
 1102             refresh = True
 1103         else:
 1104             passed_time = (time.time() - os.stat(datap).st_mtime) / 60
 1105             if passed_time > self.opts.get("cache_life", 60):
 1106                 refresh = True
 1107 
 1108         if self.opts.get("refresh_cache"):
 1109             refresh = True
 1110         conf_grains = {}
 1111         # Save conf file grains before they get clobbered
 1112         if "ssh_grains" in self.opts:
 1113             conf_grains = self.opts["ssh_grains"]
 1114         if not data_cache:
 1115             refresh = True
 1116         if refresh:
 1117             # Make the datap
 1118             # TODO: Auto expire the datap
 1119             pre_wrapper = salt.client.ssh.wrapper.FunctionWrapper(
 1120                 self.opts,
 1121                 self.id,
 1122                 fsclient=self.fsclient,
 1123                 minion_opts=self.minion_opts,
 1124                 **self.target
 1125             )
 1126 
 1127             opts_pkg = pre_wrapper["test.opts_pkg"]()  # pylint: disable=E1102
 1128             if "_error" in opts_pkg:
 1129                 # Refresh failed
 1130                 retcode = opts_pkg["retcode"]
 1131                 ret = salt.utils.json.dumps({"local": opts_pkg})
 1132                 return ret, retcode
 1133 
 1134             opts_pkg["file_roots"] = self.opts["file_roots"]
 1135             opts_pkg["pillar_roots"] = self.opts["pillar_roots"]
 1136             opts_pkg["ext_pillar"] = self.opts["ext_pillar"]
 1137             opts_pkg["extension_modules"] = self.opts["extension_modules"]
 1138             opts_pkg["module_dirs"] = self.opts["module_dirs"]
 1139             opts_pkg["_ssh_version"] = self.opts["_ssh_version"]
 1140             opts_pkg["thin_dir"] = self.opts["thin_dir"]
 1141             opts_pkg["master_tops"] = self.opts["master_tops"]
 1142             opts_pkg["__master_opts__"] = self.context["master_opts"]
 1143             if "known_hosts_file" in self.opts:
 1144                 opts_pkg["known_hosts_file"] = self.opts["known_hosts_file"]
 1145             if "_caller_cachedir" in self.opts:
 1146                 opts_pkg["_caller_cachedir"] = self.opts["_caller_cachedir"]
 1147             else:
 1148                 opts_pkg["_caller_cachedir"] = self.opts["cachedir"]
 1149             # Use the ID defined in the roster file
 1150             opts_pkg["id"] = self.id
 1151 
 1152             retcode = 0
 1153 
 1154             # Restore master grains
 1155             for grain in conf_grains:
 1156                 opts_pkg["grains"][grain] = conf_grains[grain]
 1157             # Enable roster grains support
 1158             if "grains" in self.target:
 1159                 for grain in self.target["grains"]:
 1160                     opts_pkg["grains"][grain] = self.target["grains"][grain]
 1161 
 1162             popts = {}
 1163             popts.update(opts_pkg["__master_opts__"])
 1164             popts.update(opts_pkg)
 1165             pillar = salt.pillar.Pillar(
 1166                 popts,
 1167                 opts_pkg["grains"],
 1168                 opts_pkg["id"],
 1169                 opts_pkg.get("saltenv", "base"),
 1170             )
 1171             pillar_data = pillar.compile_pillar()
 1172 
 1173             # TODO: cache minion opts in datap in master.py
 1174             data = {
 1175                 "opts": opts_pkg,
 1176                 "grains": opts_pkg["grains"],
 1177                 "pillar": pillar_data,
 1178             }
 1179             if data_cache:
 1180                 with salt.utils.files.fopen(datap, "w+b") as fp_:
 1181                     fp_.write(self.serial.dumps(data))
 1182         if not data and data_cache:
 1183             with salt.utils.files.fopen(datap, "rb") as fp_:
 1184                 data = self.serial.load(fp_)
 1185         opts = data.get("opts", {})
 1186         opts["grains"] = data.get("grains")
 1187 
 1188         # Restore master grains
 1189         for grain in conf_grains:
 1190             opts["grains"][grain] = conf_grains[grain]
 1191         # Enable roster grains support
 1192         if "grains" in self.target:
 1193             for grain in self.target["grains"]:
 1194                 opts["grains"][grain] = self.target["grains"][grain]
 1195 
 1196         opts["pillar"] = data.get("pillar")
 1197         wrapper = salt.client.ssh.wrapper.FunctionWrapper(
 1198             opts,
 1199             self.id,
 1200             fsclient=self.fsclient,
 1201             minion_opts=self.minion_opts,
 1202             **self.target
 1203         )
 1204         wrapper.fsclient.opts["cachedir"] = opts["cachedir"]
 1205         self.wfuncs = salt.loader.ssh_wrapper(opts, wrapper, self.context)
 1206         wrapper.wfuncs = self.wfuncs
 1207 
 1208         # We're running in the mine, need to fetch the arguments from the
 1209         # roster, pillar, master config (in that order)
 1210         if self.mine:
 1211             mine_args = None
 1212             mine_fun_data = None
 1213             mine_fun = self.fun
 1214 
 1215             if self.mine_functions and self.fun in self.mine_functions:
 1216                 mine_fun_data = self.mine_functions[self.fun]
 1217             elif opts["pillar"] and self.fun in opts["pillar"].get(
 1218                 "mine_functions", {}
 1219             ):
 1220                 mine_fun_data = opts["pillar"]["mine_functions"][self.fun]
 1221             elif self.fun in self.context["master_opts"].get("mine_functions", {}):
 1222                 mine_fun_data = self.context["master_opts"]["mine_functions"][self.fun]
 1223 
 1224             if isinstance(mine_fun_data, dict):
 1225                 mine_fun = mine_fun_data.pop("mine_function", mine_fun)
 1226                 mine_args = mine_fun_data
 1227             elif isinstance(mine_fun_data, list):
 1228                 for item in mine_fun_data[:]:
 1229                     if isinstance(item, dict) and "mine_function" in item:
 1230                         mine_fun = item["mine_function"]
 1231                         mine_fun_data.pop(mine_fun_data.index(item))
 1232                 mine_args = mine_fun_data
 1233             else:
 1234                 mine_args = mine_fun_data
 1235 
 1236             # If we found mine_args, replace our command's args
 1237             if isinstance(mine_args, dict):
 1238                 self.args = []
 1239                 self.kwargs = mine_args
 1240             elif isinstance(mine_args, list):
 1241                 self.args = mine_args
 1242                 self.kwargs = {}
 1243 
 1244         try:
 1245             if self.mine:
 1246                 result = wrapper[mine_fun](*self.args, **self.kwargs)
 1247             else:
 1248                 result = self.wfuncs[self.fun](*self.args, **self.kwargs)
 1249         except TypeError as exc:
 1250             result = "TypeError encountered executing {}: {}".format(self.fun, exc)
 1251             log.error(result, exc_info_on_loglevel=logging.DEBUG)
 1252             retcode = 1
 1253         except Exception as exc:  # pylint: disable=broad-except
 1254             result = "An Exception occurred while executing {}: {}".format(
 1255                 self.fun, exc
 1256             )
 1257             log.error(result, exc_info_on_loglevel=logging.DEBUG)
 1258             retcode = 1
 1259         # Mimic the json data-structure that "salt-call --local" will
 1260         # emit (as seen in ssh_py_shim.py)
 1261         if isinstance(result, dict) and "local" in result:
 1262             ret = salt.utils.json.dumps({"local": result["local"]})
 1263         else:
 1264             ret = salt.utils.json.dumps({"local": {"return": result}})
 1265         return ret, retcode
 1266 
 1267     def _cmd_str(self):
 1268         """
 1269         Prepare the command string
 1270         """
 1271         sudo = "sudo" if self.target["sudo"] else ""
 1272         sudo_user = self.target["sudo_user"]
 1273         if "_caller_cachedir" in self.opts:
 1274             cachedir = self.opts["_caller_cachedir"]
 1275         else:
 1276             cachedir = self.opts["cachedir"]
 1277         thin_code_digest, thin_sum = salt.utils.thin.thin_sum(cachedir, "sha1")
 1278         debug = ""
 1279         if not self.opts.get("log_level"):
 1280             self.opts["log_level"] = "info"
 1281         if (
 1282             salt.log.LOG_LEVELS["debug"]
 1283             >= salt.log.LOG_LEVELS[self.opts.get("log_level", "info")]
 1284         ):
 1285             debug = "1"
 1286         arg_str = '''
 1287 OPTIONS.config = \
 1288 """
 1289 {config}
 1290 """
 1291 OPTIONS.delimiter = '{delimeter}'
 1292 OPTIONS.saltdir = '{saltdir}'
 1293 OPTIONS.checksum = '{checksum}'
 1294 OPTIONS.hashfunc = '{hashfunc}'
 1295 OPTIONS.version = '{version}'
 1296 OPTIONS.ext_mods = '{ext_mods}'
 1297 OPTIONS.wipe = {wipe}
 1298 OPTIONS.tty = {tty}
 1299 OPTIONS.cmd_umask = {cmd_umask}
 1300 OPTIONS.code_checksum = {code_checksum}
 1301 ARGS = {arguments}\n'''.format(
 1302             config=self.minion_config,
 1303             delimeter=RSTR,
 1304             saltdir=self.thin_dir,
 1305             checksum=thin_sum,
 1306             hashfunc="sha1",
 1307             version=salt.version.__version__,
 1308             ext_mods=self.mods.get("version", ""),
 1309             wipe=self.wipe,
 1310             tty=self.tty,
 1311             cmd_umask=self.cmd_umask,
 1312             code_checksum=thin_code_digest,
 1313             arguments=self.argv,
 1314         )
 1315         py_code = SSH_PY_SHIM.replace("#%%OPTS", arg_str)
 1316         py_code_enc = base64.encodebytes(py_code.encode("utf-8")).decode("utf-8")
 1317         if not self.winrm:
 1318             cmd = SSH_SH_SHIM.format(
 1319                 DEBUG=debug,
 1320                 SUDO=sudo,
 1321                 SUDO_USER=sudo_user,
 1322                 SSH_PY_CODE=py_code_enc,
 1323                 HOST_PY_MAJOR=sys.version_info[0],
 1324                 SET_PATH=self.set_path,
 1325             )
 1326         else:
 1327             cmd = saltwinshell.gen_shim(py_code_enc)
 1328 
 1329         return cmd
 1330 
 1331     def execute_script(self, script, extension="py", pre_dir=""):
 1332         """
 1333         execute a script on the minion then delete
 1334         """
 1335         if extension == "ps1":
 1336             ret = self.shell.exec_cmd('"powershell {}"'.format(script))
 1337         else:
 1338             if not self.winrm:
 1339                 ret = self.shell.exec_cmd("/bin/sh '{}{}'".format(pre_dir, script))
 1340             else:
 1341                 ret = saltwinshell.call_python(self, script)
 1342 
 1343         # Remove file from target system
 1344         if not self.winrm:
 1345             self.shell.exec_cmd("rm '{}{}'".format(pre_dir, script))
 1346         else:
 1347             self.shell.exec_cmd("del {}".format(script))
 1348 
 1349         return ret
 1350 
 1351     def shim_cmd(self, cmd_str, extension="py"):
 1352         """
 1353         Run a shim command.
 1354 
 1355         If tty is enabled, we must scp the shim to the target system and
 1356         execute it there
 1357         """
 1358         if not self.tty and not self.winrm:
 1359             return self.shell.exec_cmd(cmd_str)
 1360 
 1361         # Write the shim to a temporary file in the default temp directory
 1362         with tempfile.NamedTemporaryFile(
 1363             mode="w+b", prefix="shim_", delete=False
 1364         ) as shim_tmp_file:
 1365             shim_tmp_file.write(salt.utils.stringutils.to_bytes(cmd_str))
 1366 
 1367         # Copy shim to target system, under $HOME/.<randomized name>
 1368         target_shim_file = ".{}.{}".format(
 1369             binascii.hexlify(os.urandom(6)).decode("ascii"), extension
 1370         )
 1371         if self.winrm:
 1372             target_shim_file = saltwinshell.get_target_shim_file(self, target_shim_file)
 1373         self.shell.send(shim_tmp_file.name, target_shim_file, makedirs=True)
 1374 
 1375         # Remove our shim file
 1376         try:
 1377             os.remove(shim_tmp_file.name)
 1378         except OSError:
 1379             pass
 1380 
 1381         ret = self.execute_script(script=target_shim_file, extension=extension)
 1382 
 1383         return ret
 1384 
 1385     def cmd_block(self, is_retry=False):
 1386         """
 1387         Prepare the pre-check command to send to the subsystem
 1388 
 1389         1. execute SHIM + command
 1390         2. check if SHIM returns a master request or if it completed
 1391         3. handle any master request
 1392         4. re-execute SHIM + command
 1393         5. split SHIM results from command results
 1394         6. return command results
 1395         """
 1396         self.argv = _convert_args(self.argv)
 1397         log.debug(
 1398             "Performing shimmed, blocking command as follows:\n%s",
 1399             " ".join([str(arg) for arg in self.argv]),
 1400         )
 1401         cmd_str = self._cmd_str()
 1402         stdout, stderr, retcode = self.shim_cmd(cmd_str)
 1403 
 1404         log.trace("STDOUT %s\n%s", self.target["host"], stdout)
 1405         log.trace("STDERR %s\n%s", self.target["host"], stderr)
 1406         log.debug("RETCODE %s: %s", self.target["host"], retcode)
 1407 
 1408         error = self.categorize_shim_errors(stdout, stderr, retcode)
 1409         if error:
 1410             if error == "Python environment not found on Windows system":
 1411                 saltwinshell.deploy_python(self)
 1412                 stdout, stderr, retcode = self.shim_cmd(cmd_str)
 1413                 while re.search(RSTR_RE, stdout):
 1414                     stdout = re.split(RSTR_RE, stdout, 1)[1].strip()
 1415                 while re.search(RSTR_RE, stderr):
 1416                     stderr = re.split(RSTR_RE, stderr, 1)[1].strip()
 1417             elif error == "Undefined SHIM state":
 1418                 self.deploy()
 1419                 stdout, stderr, retcode = self.shim_cmd(cmd_str)
 1420                 if not re.search(RSTR_RE, stdout) or not re.search(RSTR_RE, stderr):
 1421                     # If RSTR is not seen in both stdout and stderr then there
 1422                     # was a thin deployment problem.
 1423                     return (
 1424                         "ERROR: Failure deploying thin, undefined state: {}".format(
 1425                             stdout
 1426                         ),
 1427                         stderr,
 1428                         retcode,
 1429                     )
 1430                 while re.search(RSTR_RE, stdout):
 1431                     stdout = re.split(RSTR_RE, stdout, 1)[1].strip()
 1432                 while re.search(RSTR_RE, stderr):
 1433                     stderr = re.split(RSTR_RE, stderr, 1)[1].strip()
 1434             else:
 1435                 return "ERROR: {}".format(error), stderr, retcode
 1436 
 1437         # FIXME: this discards output from ssh_shim if the shim succeeds.  It should
 1438         # always save the shim output regardless of shim success or failure.
 1439         while re.search(RSTR_RE, stdout):
 1440             stdout = re.split(RSTR_RE, stdout, 1)[1].strip()
 1441 
 1442         if re.search(RSTR_RE, stderr):
 1443             # Found RSTR in stderr which means SHIM completed and only
 1444             # and remaining output is only from salt.
 1445             while re.search(RSTR_RE, stderr):
 1446                 stderr = re.split(RSTR_RE, stderr, 1)[1].strip()
 1447 
 1448         else:
 1449             # RSTR was found in stdout but not stderr - which means there
 1450             # is a SHIM command for the master.
 1451             shim_command = re.split(r"\r?\n", stdout, 1)[0].strip()
 1452             log.debug("SHIM retcode(%s) and command: %s", retcode, shim_command)
 1453             if (
 1454                 "deploy" == shim_command
 1455                 and retcode == salt.defaults.exitcodes.EX_THIN_DEPLOY
 1456             ):
 1457                 self.deploy()
 1458                 stdout, stderr, retcode = self.shim_cmd(cmd_str)
 1459                 if not re.search(RSTR_RE, stdout) or not re.search(RSTR_RE, stderr):
 1460                     if not self.tty:
 1461                         # If RSTR is not seen in both stdout and stderr then there
 1462                         # was a thin deployment problem.
 1463                         log.error(
 1464                             "ERROR: Failure deploying thin, retrying:\n"
 1465                             "STDOUT:\n%s\nSTDERR:\n%s\nRETCODE: %s",
 1466                             stdout,
 1467                             stderr,
 1468                             retcode,
 1469                         )
 1470                         return self.cmd_block()
 1471                     elif not re.search(RSTR_RE, stdout):
 1472                         # If RSTR is not seen in stdout with tty, then there
 1473                         # was a thin deployment problem.
 1474                         log.error(
 1475                             "ERROR: Failure deploying thin, retrying:\n"
 1476                             "STDOUT:\n%s\nSTDERR:\n%s\nRETCODE: %s",
 1477                             stdout,
 1478                             stderr,
 1479                             retcode,
 1480                         )
 1481                 while re.search(RSTR_RE, stdout):
 1482                     stdout = re.split(RSTR_RE, stdout, 1)[1].strip()
 1483                 if self.tty:
 1484                     stderr = ""
 1485                 else:
 1486                     while re.search(RSTR_RE, stderr):
 1487                         stderr = re.split(RSTR_RE, stderr, 1)[1].strip()
 1488             elif "ext_mods" == shim_command:
 1489                 self.deploy_ext()
 1490                 stdout, stderr, retcode = self.shim_cmd(cmd_str)
 1491                 if not re.search(RSTR_RE, stdout) or not re.search(RSTR_RE, stderr):
 1492                     # If RSTR is not seen in both stdout and stderr then there
 1493                     # was a thin deployment problem.
 1494                     return (
 1495                         "ERROR: Failure deploying ext_mods: {}".format(stdout),
 1496                         stderr,
 1497                         retcode,
 1498                     )
 1499                 while re.search(RSTR_RE, stdout):
 1500                     stdout = re.split(RSTR_RE, stdout, 1)[1].strip()
 1501                 while re.search(RSTR_RE, stderr):
 1502                     stderr = re.split(RSTR_RE, stderr, 1)[1].strip()
 1503 
 1504         return stdout, stderr, retcode
 1505 
 1506     def categorize_shim_errors(self, stdout_bytes, stderr_bytes, retcode):
 1507         stdout = salt.utils.stringutils.to_unicode(stdout_bytes)
 1508         stderr = salt.utils.stringutils.to_unicode(stderr_bytes)
 1509         if re.search(RSTR_RE, stdout) and stdout != RSTR + "\n":
 1510             # RSTR was found in stdout which means that the shim
 1511             # functioned without *errors* . . . but there may be shim
 1512             # commands, unless the only thing we found is RSTR
 1513             return None
 1514 
 1515         if re.search(RSTR_RE, stderr):
 1516             # Undefined state
 1517             return "Undefined SHIM state"
 1518 
 1519         if stderr.startswith("Permission denied"):
 1520             # SHIM was not even reached
 1521             return None
 1522 
 1523         perm_error_fmt = (
 1524             "Permissions problem, target user may need " "to be root or use sudo:\n {0}"
 1525         )
 1526 
 1527         errors = [
 1528             (
 1529                 (),
 1530                 "sudo: no tty present and no askpass program specified",
 1531                 "sudo expected a password, NOPASSWD required",
 1532             ),
 1533             (
 1534                 (salt.defaults.exitcodes.EX_THIN_PYTHON_INVALID,),
 1535                 "Python interpreter is too old",
 1536                 "Python version error. Recommendation(s) follow:\n"
 1537                 "- Install Python 3 on the target machine(s)\n"
 1538                 "- You can use ssh_pre_flight or raw shell (-r) to install Python 3",
 1539             ),
 1540             (
 1541                 (salt.defaults.exitcodes.EX_THIN_CHECKSUM,),
 1542                 "checksum mismatched",
 1543                 "The salt thin transfer was corrupted",
 1544             ),
 1545             (
 1546                 (salt.defaults.exitcodes.EX_SCP_NOT_FOUND,),
 1547                 "scp not found",
 1548                 "No scp binary. openssh-clients package required",
 1549             ),
 1550             (
 1551                 (salt.defaults.exitcodes.EX_CANTCREAT,),
 1552                 "salt path .* exists but is not a directory",
 1553                 "A necessary path for salt thin unexpectedly exists:\n " + stderr,
 1554             ),
 1555             (
 1556                 (),
 1557                 "sudo: sorry, you must have a tty to run sudo",
 1558                 "sudo is configured with requiretty",
 1559             ),
 1560             ((), "Failed to open log file", perm_error_fmt.format(stderr)),
 1561             ((), "Permission denied:.*/salt", perm_error_fmt.format(stderr)),
 1562             (
 1563                 (),
 1564                 "Failed to create directory path.*/salt",
 1565                 perm_error_fmt.format(stderr),
 1566             ),
 1567             (
 1568                 (salt.defaults.exitcodes.EX_SOFTWARE,),
 1569                 "exists but is not",
 1570                 "An internal error occurred with the shim, please investigate:\n "
 1571                 + stderr,
 1572             ),
 1573             (
 1574                 (),
 1575                 "The system cannot find the path specified",
 1576                 "Python environment not found on Windows system",
 1577             ),
 1578             (
 1579                 (),
 1580                 "is not recognized",
 1581                 "Python environment not found on Windows system",
 1582             ),
 1583         ]
 1584 
 1585         for error in errors:
 1586             if retcode in error[0] or re.search(error[1], stderr):
 1587                 return error[2]
 1588         return None
 1589 
 1590     def check_refresh(self, data, ret):
 1591         """
 1592         Stub out check_refresh
 1593         """
 1594         return
 1595 
 1596     def module_refresh(self):
 1597         """
 1598         Module refresh is not needed, stub it out
 1599         """
 1600         return
 1601 
 1602 
 1603 def lowstate_file_refs(chunks):
 1604     """
 1605     Create a list of file ref objects to reconcile
 1606     """
 1607     refs = {}
 1608     for chunk in chunks:
 1609         saltenv = "base"
 1610         crefs = []
 1611         for state in chunk:
 1612             if state == "__env__":
 1613                 saltenv = chunk[state]
 1614             elif state == "saltenv":
 1615                 saltenv = chunk[state]
 1616             elif state.startswith("__"):
 1617                 continue
 1618             crefs.extend(salt_refs(chunk[state]))
 1619         if crefs:
 1620             if saltenv not in refs:
 1621                 refs[saltenv] = []
 1622             refs[saltenv].append(crefs)
 1623     return refs
 1624 
 1625 
 1626 def salt_refs(data):
 1627     """
 1628     Pull salt file references out of the states
 1629     """
 1630     proto = "salt://"
 1631     ret = []
 1632     if isinstance(data, str):
 1633         if data.startswith(proto):
 1634             return [data]
 1635     if isinstance(data, list):
 1636         for comp in data:
 1637             if isinstance(comp, str):
 1638                 if comp.startswith(proto):
 1639                     ret.append(comp)
 1640     return ret
 1641 
 1642 
 1643 def mod_data(fsclient):
 1644     """
 1645     Generate the module arguments for the shim data
 1646     """
 1647     # TODO, change out for a fileserver backend
 1648     sync_refs = [
 1649         "modules",
 1650         "states",
 1651         "grains",
 1652         "renderers",
 1653         "returners",
 1654     ]
 1655     ret = {}
 1656     envs = fsclient.envs()
 1657     ver_base = ""
 1658     for env in envs:
 1659         files = fsclient.file_list(env)
 1660         for ref in sync_refs:
 1661             mods_data = {}
 1662             pref = "_{}".format(ref)
 1663             for fn_ in sorted(files):
 1664                 if fn_.startswith(pref):
 1665                     if fn_.endswith((".py", ".so", ".pyx")):
 1666                         full = salt.utils.url.create(fn_)
 1667                         mod_path = fsclient.cache_file(full, env)
 1668                         if not os.path.isfile(mod_path):
 1669                             continue
 1670                         mods_data[os.path.basename(fn_)] = mod_path
 1671                         chunk = salt.utils.hashutils.get_hash(mod_path)
 1672                         ver_base += chunk
 1673             if mods_data:
 1674                 if ref in ret:
 1675                     ret[ref].update(mods_data)
 1676                 else:
 1677                     ret[ref] = mods_data
 1678     if not ret:
 1679         return {}
 1680 
 1681     ver_base = salt.utils.stringutils.to_bytes(ver_base)
 1682 
 1683     ver = hashlib.sha1(ver_base).hexdigest()
 1684     ext_tar_path = os.path.join(
 1685         fsclient.opts["cachedir"], "ext_mods.{}.tgz".format(ver)
 1686     )
 1687     mods = {"version": ver, "file": ext_tar_path}
 1688     if os.path.isfile(ext_tar_path):
 1689         return mods
 1690     tfp = tarfile.open(ext_tar_path, "w:gz")
 1691     verfile = os.path.join(fsclient.opts["cachedir"], "ext_mods.ver")
 1692     with salt.utils.files.fopen(verfile, "w+") as fp_:
 1693         fp_.write(ver)
 1694     tfp.add(verfile, "ext_version")
 1695     for ref in ret:
 1696         for fn_ in ret[ref]:
 1697             tfp.add(ret[ref][fn_], os.path.join(ref, fn_))
 1698     tfp.close()
 1699     return mods
 1700 
 1701 
 1702 def ssh_version():
 1703     """
 1704     Returns the version of the installed ssh command
 1705     """
 1706     # This function needs more granular checks and to be validated against
 1707     # older versions of ssh
 1708     ret = subprocess.Popen(
 1709         ["ssh", "-V"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
 1710     ).communicate()
 1711     try:
 1712         version_parts = ret[1].split(b",")[0].split(b"_")[1]
 1713         parts = []
 1714         for part in version_parts:
 1715             try:
 1716                 parts.append(int(part))
 1717             except ValueError:
 1718                 return tuple(parts)
 1719         return tuple(parts)
 1720     except IndexError:
 1721         return (2, 0)
 1722 
 1723 
 1724 def _convert_args(args):
 1725     """
 1726     Take a list of args, and convert any dicts inside the list to keyword
 1727     args in the form of `key=value`, ready to be passed to salt-ssh
 1728     """
 1729     converted = []
 1730     for arg in args:
 1731         if isinstance(arg, dict):
 1732             for key in list(arg.keys()):
 1733                 if key == "__kwarg__":
 1734                     continue
 1735                 converted.append("{}={}".format(key, arg[key]))
 1736         else:
 1737             converted.append(arg)
 1738     return converted