"Fossies" - the Fresh Open Source Software Archive

Member "polysh-polysh-0.13/polysh/remote_dispatcher.py" (11 May 2020, 15075 Bytes) of package /linux/privat/polysh-polysh-0.13.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "remote_dispatcher.py" see the Fossies "Dox" file reference documentation.

    1 """Polysh - Remote Shell Dispatcher
    2 
    3 Copyright (c) 2006 Guillaume Chazarain <guichaz@gmail.com>
    4 Copyright (c) 2018 InnoGames GmbH
    5 """
    6 # This program is free software: you can redistribute it and/or modify
    7 # it under the terms of the GNU General Public License as published by
    8 # the Free Software Foundation, either version 2 of the License, or
    9 # (at your option) any later version.
   10 #
   11 # This program is distributed in the hope that it will be useful,
   12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
   13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   14 # GNU General Public License for more details.
   15 #
   16 # You should have received a copy of the GNU General Public License
   17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
   18 
   19 import asyncore
   20 import os
   21 import pty
   22 import signal
   23 import sys
   24 import termios
   25 import select
   26 import platform
   27 from typing import Optional, List
   28 from argparse import Namespace
   29 
   30 from polysh.buffered_dispatcher import BufferedDispatcher
   31 from polysh import callbacks
   32 from polysh.console import console_output
   33 from polysh import display_names
   34 
   35 options = None  # type: Optional[Namespace]
   36 
   37 # Either the remote shell is expecting a command or one is already running
   38 STATE_NAMES = ['not_started', 'idle', 'running', 'terminated', 'dead']
   39 
   40 STATE_NOT_STARTED,         \
   41     STATE_IDLE,                \
   42     STATE_RUNNING,             \
   43     STATE_TERMINATED,          \
   44     STATE_DEAD = list(range(len(STATE_NAMES)))
   45 
   46 # Terminal color codes
   47 COLORS = [1] + list(range(30, 37))
   48 
   49 # Count the total number of RemoteDispatcher.handle_read() invocations
   50 nr_handle_read = 0
   51 
   52 
   53 def main_loop_iteration(timeout: Optional[float] = None) -> int:
   54     """Return the number of RemoteDispatcher.handle_read() calls made by this
   55     iteration"""
   56     prev_nr_read = nr_handle_read
   57     asyncore.loop(count=1, timeout=timeout, use_poll=True)
   58     return nr_handle_read - prev_nr_read
   59 
   60 
   61 def log(msg: bytes) -> None:
   62     if options.log_file:
   63         fd = options.log_file.fileno()
   64         while msg:
   65             try:
   66                 written = os.write(fd, msg)
   67             except OSError as e:
   68                 print('Exception while writing log:', options.log_file.name)
   69                 print(e)
   70                 raise asyncore.ExitNow(1)
   71             msg = msg[written:]
   72 
   73 
   74 class RemoteDispatcher(BufferedDispatcher):
   75     """A RemoteDispatcher is a ssh process we communicate with"""
   76 
   77     def __init__(self, hostname: str, port: str) -> None:
   78         if port != "22":
   79             port = "-p " + port
   80         else:
   81             port = ''
   82 
   83         self.pid, fd = pty.fork()
   84         if self.pid == 0:
   85             # Child
   86             self.launch_ssh(hostname, port)
   87             sys.exit(1)
   88 
   89         # Parent
   90         super().__init__(fd)
   91         self.temporary = False
   92         self.hostname = hostname
   93         self.port = port
   94         self.debug = options.debug
   95         self.enabled = True  # shells can be enabled and disabled
   96         self.state = STATE_NOT_STARTED
   97         self.term_size = (-1, -1)
   98         self.display_name = None  # type: Optional[str]
   99         self.change_name(self.hostname.encode())
  100         self.init_string = self.configure_tty() + self.set_prompt()
  101         self.init_string_sent = False
  102         self.read_in_state_not_started = b''
  103         self.command = options.command
  104         self.last_printed_line = b''
  105         self.color_code = None
  106         if sys.stdout.isatty() and not options.disable_color:
  107             COLORS.insert(0, COLORS.pop())  # Rotate the colors
  108             self.color_code = COLORS[0]
  109 
  110     def launch_ssh(self, name: str, port: str) -> None:
  111         """Launch the ssh command in the child process"""
  112         if options.user:
  113             name = '%s@%s' % (options.user, name)
  114         evaluated = options.ssh % {'host': name, 'port': port}
  115         if evaluated == options.ssh:
  116             evaluated = '%s %s' % (evaluated, name)
  117         os.execlp('/bin/sh', 'sh', '-c', evaluated)
  118 
  119     def set_enabled(self, enabled: bool) -> None:
  120         if enabled != self.enabled and options.interactive:
  121             # In non-interactive mode, remote processes leave as soon
  122             # as they are terminated, but we don't want to break the
  123             # indentation if all the remaining processes have short names.
  124             display_names.set_enabled(self.display_name, enabled)
  125         self.enabled = enabled
  126 
  127     def change_state(self, state: int) -> None:
  128         """Change the state of the remote process, logging the change"""
  129         if state is not self.state:
  130             if self.debug:
  131                 self.print_debug(b'state => ' + STATE_NAMES[state].encode())
  132             if self.state is STATE_NOT_STARTED:
  133                 self.read_in_state_not_started = b''
  134             self.state = state
  135 
  136     def disconnect(self) -> None:
  137         """We are no more interested in this remote process"""
  138         try:
  139             os.kill(-self.pid, signal.SIGKILL)
  140         except OSError:
  141             # The process was already dead, no problem
  142             pass
  143         self.read_buffer = b''
  144         self.write_buffer = b''
  145         self.set_enabled(False)
  146         if self.read_in_state_not_started:
  147             self.print_lines(self.read_in_state_not_started)
  148             self.read_in_state_not_started = b''
  149         if options.abort_error and self.state is STATE_NOT_STARTED:
  150             raise asyncore.ExitNow(1)
  151         self.change_state(STATE_DEAD)
  152 
  153     def configure_tty(self) -> bytes:
  154         """We don't want \n to be replaced with \r\n, and we disable the echo"""
  155         attr = termios.tcgetattr(self.fd)
  156         # The following raises a mypy warning, as python type hints don't allow
  157         # per list item granularity.  The last item in attr is List[bytes], but
  158         # we don't access that here.
  159         attr[1] &= ~termios.ONLCR  # type: ignore # oflag
  160         attr[3] &= ~termios.ECHO  # type: ignore # lflag
  161         termios.tcsetattr(self.fd, termios.TCSANOW, attr)
  162         # unsetopt zle prevents Zsh from resetting the tty
  163         return b'unsetopt zle 2> /dev/null;stty -echo -onlcr -ctlecho;'
  164 
  165     def seen_prompt_cb(self, unused: str) -> None:
  166         if options.interactive:
  167             self.change_state(STATE_IDLE)
  168         elif self.command:
  169             p1, p2 = callbacks.add(b'real prompt ends', lambda d: None, True)
  170             self.dispatch_command(b'PS1="' + p1 + b'""' + p2 + b'\n"\n')
  171             self.dispatch_command(self.command.encode() + b'\n')
  172             self.dispatch_command(b'exit 2>/dev/null\n')
  173             self.command = None
  174 
  175     def set_prompt(self) -> bytes:
  176         """The prompt is important because we detect the readyness of a process
  177         by waiting for its prompt."""
  178         # No right prompt
  179         command_line = b'PS2=;RPS1=;RPROMPT=;'
  180         command_line += b'PROMPT_COMMAND=;'
  181         command_line += b'TERM=ansi;'
  182         command_line += b'unset precmd_functions;'
  183         command_line += b'unset HISTFILE;'
  184         prompt1, prompt2 = callbacks.add(b'prompt', self.seen_prompt_cb, True)
  185         command_line += b'PS1="' + prompt1 + b'""' + prompt2 + b'\n"\n'
  186         return command_line
  187 
  188     def readable(self) -> bool:
  189         """We are always interested in reading from active remote processes if
  190         the buffer is OK"""
  191         return (self.state != STATE_DEAD and
  192                 super().readable())
  193 
  194     def handle_expt(self) -> None:
  195         # Dirty hack to ignore POLLPRI flag that is raised on Mac OS, but not
  196         # on linux. asyncore calls this method in case POLLPRI flag is set, but
  197         # self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) == 0
  198         if platform.system() == 'Darwin' and select.POLLPRI:
  199             return
  200 
  201         self.handle_close()
  202 
  203     def handle_close(self) -> None:
  204         if self.state is STATE_DEAD:
  205             # This connection has already been killed. Asyncore has probably
  206             # called handle_close() or handle_expt() on this connection twice.
  207             return
  208 
  209         pid, status = os.waitpid(self.pid, 0)
  210         exit_code = os.WEXITSTATUS(status)
  211         options.exit_code = max(options.exit_code, exit_code)
  212         if exit_code and options.interactive:
  213             console_output('Error talking to {}\n'.format(
  214                 self.display_name).encode())
  215         self.disconnect()
  216         if self.temporary:
  217             self.close()
  218 
  219     def print_lines(self, lines: bytes) -> None:
  220         from polysh.display_names import max_display_name_length
  221         lines = lines.strip(b'\n')
  222         while True:
  223             no_empty_lines = lines.replace(b'\n\n', b'\n')
  224             if len(no_empty_lines) == len(lines):
  225                 break
  226             lines = no_empty_lines
  227         if not lines:
  228             return
  229         indent = max_display_name_length - len(self.display_name)
  230         log_prefix = self.display_name.encode() + indent * b' ' + b' : '
  231         if self.color_code is None:
  232             console_prefix = log_prefix
  233         else:
  234             console_prefix = (b'\033[1;' + str(self.color_code).encode() +
  235                               b'm' + log_prefix + b'\033[1;m')
  236         console_data = (console_prefix +
  237                         lines.replace(b'\n', b'\n' + console_prefix) + b'\n')
  238         log_data = (log_prefix +
  239                     lines.replace(b'\n', b'\n' + log_prefix) + b'\n')
  240         console_output(console_data, logging_msg=log_data)
  241         self.last_printed_line = lines[lines.rfind(b'\n') + 1:]
  242 
  243     def handle_read_fast_case(self, data: bytes) -> bool:
  244         """If we are in a fast case we'll avoid the long processing of each
  245         line"""
  246         if self.state is not STATE_RUNNING or callbacks.any_in(data):
  247             # Slow case :-(
  248             return False
  249 
  250         last_nl = data.rfind(b'\n')
  251         if last_nl == -1:
  252             # No '\n' in data => slow case
  253             return False
  254         self.read_buffer = data[last_nl + 1:]
  255         self.print_lines(data[:last_nl])
  256         return True
  257 
  258     def handle_read(self) -> None:
  259         """We got some output from a remote shell, this is one of the state
  260         machine"""
  261         if self.state == STATE_DEAD:
  262             return
  263         global nr_handle_read
  264         nr_handle_read += 1
  265         new_data = self._handle_read_chunk()
  266         if self.debug:
  267             self.print_debug(b'==> ' + new_data)
  268         if self.handle_read_fast_case(self.read_buffer):
  269             return
  270         lf_pos = new_data.find(b'\n')
  271         if lf_pos >= 0:
  272             # Optimization: we knew there were no '\n' in the previous read
  273             # buffer, so we searched only in the new_data and we offset the
  274             # found index by the length of the previous buffer
  275             lf_pos += len(self.read_buffer) - len(new_data)
  276         elif self.state is STATE_NOT_STARTED and \
  277                 options.password is not None and \
  278                 b'password:' in self.read_buffer.lower():
  279             self.dispatch_write('{}\n'.format(options.password).encode())
  280             self.read_buffer = b''
  281             return
  282         while lf_pos >= 0:
  283             # For each line in the buffer
  284             line = self.read_buffer[:lf_pos + 1]
  285             if callbacks.process(line):
  286                 pass
  287             elif self.state in (STATE_IDLE, STATE_RUNNING):
  288                 self.print_lines(line)
  289             elif self.state is STATE_NOT_STARTED:
  290                 self.read_in_state_not_started += line
  291                 if b'The authenticity of host' in line:
  292                     msg = line.strip(b'\n') + b' Closing connection.'
  293                     self.disconnect()
  294                 elif b'REMOTE HOST IDENTIFICATION HAS CHANGED' in line:
  295                     msg = b'Remote host identification has changed.'
  296                 else:
  297                     msg = None
  298 
  299                 if msg:
  300                     self.print_lines(msg + b' Consider manually connecting or '
  301                                      b'using ssh-keyscan.')
  302 
  303             # Go to the next line in the buffer
  304             self.read_buffer = self.read_buffer[lf_pos + 1:]
  305             if self.handle_read_fast_case(self.read_buffer):
  306                 return
  307             lf_pos = self.read_buffer.find(b'\n')
  308         if self.state is STATE_NOT_STARTED and not self.init_string_sent:
  309             self.dispatch_write(self.init_string)
  310             self.init_string_sent = True
  311 
  312     def print_unfinished_line(self) -> None:
  313         """The unfinished line stayed long enough in the buffer to be printed"""
  314         if self.state is STATE_RUNNING:
  315             if not callbacks.process(self.read_buffer):
  316                 self.print_lines(self.read_buffer)
  317             self.read_buffer = b''
  318 
  319     def writable(self) -> bool:
  320         """Do we want to write something?"""
  321         return (self.state != STATE_DEAD and
  322                 super().writable())
  323 
  324     def handle_write(self) -> None:
  325         """Let's write as much as we can"""
  326         num_sent = self.send(self.write_buffer)
  327         if self.debug:
  328             if self.state is not STATE_NOT_STARTED or options.password is None:
  329                 self.print_debug(b'<== ' + self.write_buffer[:num_sent])
  330         self.write_buffer = self.write_buffer[num_sent:]
  331 
  332     def print_debug(self, msg: bytes) -> None:
  333         """Log some debugging information to the console"""
  334         state = STATE_NAMES[self.state].encode()
  335         console_output(b'[dbg] ' + self.display_name.encode() + b'[' + state +
  336                        b']: ' + msg + b'\n')
  337 
  338     def get_info(self) -> List[bytes]:
  339         """Return a list with all information available about this process"""
  340         return [self.display_name.encode(),
  341                 self.enabled and b'enabled' or b'disabled',
  342                 STATE_NAMES[self.state].encode() + b':',
  343                 self.last_printed_line.strip()]
  344 
  345     def dispatch_write(self, buf: bytes) -> bool:
  346         """There is new stuff to write when possible"""
  347         if self.state != STATE_DEAD and self.enabled:
  348             super().dispatch_write(buf)
  349             return True
  350         return False
  351 
  352     def dispatch_command(self, command: bytes) -> None:
  353         if self.dispatch_write(command):
  354             self.change_state(STATE_RUNNING)
  355 
  356     def change_name(self, new_name: Optional[bytes]) -> None:
  357         """Change the name of the shell, possibly updating the maximum name
  358         length"""
  359         if not new_name:
  360             name = self.hostname
  361         else:
  362             name = new_name.decode()
  363         self.display_name = display_names.change(
  364             self.display_name, name)
  365 
  366     def rename(self, name: bytes) -> None:
  367         """Send to the remote shell, its new name to be shell expanded"""
  368         if name:
  369             # defug callback add?
  370             rename1, rename2 = callbacks.add(
  371                 b'rename', self.change_name, False)
  372             self.dispatch_command(b'/bin/echo "' + rename1 + b'""' + rename2 +
  373                                   b'"' + name + b'\n')
  374         else:
  375             self.change_name(self.hostname.encode())
  376 
  377     def close(self) -> None:
  378         display_names.change(self.display_name, None)
  379         super().close()