"Fossies" - the Fresh Open Source Software Archive

Member "polysh-polysh-0.13/polysh/control_commands.py" (11 May 2020, 7524 Bytes) of package /linux/privat/polysh-polysh-0.13.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "control_commands.py", see the Fossies "Dox" file reference documentation.

    1 """Polysh - Control Commands
    2 
    3 The control commands are documented on the README.
    4 
    5 Copyright (c) 2006 Guillaume Chazarain <guichaz@gmail.com>
    6 Copyright (c) 2018 InnoGames GmbH
    7 """
    8 # This program is free software: you can redistribute it and/or modify
    9 # it under the terms of the GNU General Public License as published by
   10 # the Free Software Foundation, either version 2 of the License, or
   11 # (at your option) any later version.
   12 #
   13 # This program is distributed in the hope that it will be useful,
   14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
   15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   16 # GNU General Public License for more details.
   17 #
   18 # You should have received a copy of the GNU General Public License
   19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
   20 
   21 import asyncore
   22 import os
   23 import shlex
   24 from typing import List
   25 
   26 from polysh.control_commands_helpers import (
   27     complete_shells,
   28     expand_local_path,
   29     selected_shells,
   30     toggle_shells,
   31 )
   32 from polysh.completion import complete_local_path, add_to_history
   33 from polysh.console import console_output
   34 from polysh import dispatchers
   35 from polysh import remote_dispatcher
   36 from polysh import stdin
   37 
   38 
   39 def complete_list(line: str, text: str) -> List[str]:
   40     return complete_shells(line, text)
   41 
   42 
   43 def do_list(command: str) -> None:
   44     instances = [i.get_info() for i in selected_shells(command)]
   45     flat_instances = dispatchers.format_info(instances)
   46     console_output(b''.join(flat_instances))
   47 
   48 
   49 def do_quit(command: str) -> None:
   50     raise asyncore.ExitNow(0)
   51 
   52 
   53 def complete_chdir(line: str, text: str) -> List[str]:
   54     return list(filter(os.path.isdir, complete_local_path(text)))
   55 
   56 
   57 def do_chdir(command: str) -> None:
   58     try:
   59         os.chdir(expand_local_path(command.strip()))
   60     except OSError as e:
   61         console_output('{}\n'.format(str(e)).encode())
   62 
   63 
   64 def complete_send_ctrl(line: str, text: str) -> List[str]:
   65     if len(line[:-1].split()) >= 2:
   66         # Control letter already given in command line
   67         return complete_shells(line, text, lambda i: i.enabled)
   68     if text in ('c', 'd', 'z'):
   69         return [text + ' ']
   70     return ['c ', 'd ', 'z ']
   71 
   72 
   73 def do_send_ctrl(command: str) -> None:
   74     split = command.split()
   75     if not split:
   76         console_output(b'Expected at least a letter\n')
   77         return
   78     letter = split[0]
   79     if len(letter) != 1:
   80         console_output('Expected a single letter, got: {}\n'.format(
   81             letter).encode())
   82         return
   83     control_letter = chr(ord(letter.lower()) - ord('a') + 1)
   84     for i in selected_shells(' '.join(split[1:])):
   85         if i.enabled:
   86             i.dispatch_write(control_letter.encode())
   87 
   88 
   89 def complete_reset_prompt(line: str, text: str) -> List[str]:
   90     return complete_shells(line, text, lambda i: i.enabled)
   91 
   92 
   93 def do_reset_prompt(command: str) -> None:
   94     for i in selected_shells(command):
   95         i.dispatch_command(i.init_string)
   96 
   97 
   98 def complete_enable(line: str, text: str) -> List[str]:
   99     return complete_shells(line, text, lambda i:
  100                            i.state != remote_dispatcher.STATE_DEAD)
  101 
  102 
  103 def do_enable(command: str) -> None:
  104     toggle_shells(command, True)
  105 
  106 
  107 def complete_disable(line: str, text: str) -> List[str]:
  108     return complete_shells(line, text, lambda i:
  109                            i.state != remote_dispatcher.STATE_DEAD)
  110 
  111 
  112 
  113 def do_disable(command: str) -> None:
  114     toggle_shells(command, False)
  115 
  116 
  117 def complete_reconnect(line: str, text: str) -> List[str]:
  118     return complete_shells(line, text, lambda i:
  119                            i.state == remote_dispatcher.STATE_DEAD)
  120 
  121 
  122 def do_reconnect(command: str) -> None:
  123     selec = selected_shells(command)
  124     to_reconnect = [i for i in selec if i.state ==
  125                     remote_dispatcher.STATE_DEAD]
  126     for i in to_reconnect:
  127         i.disconnect()
  128         i.close()
  129 
  130     hosts = [i.hostname for i in to_reconnect]
  131     dispatchers.create_remote_dispatchers(hosts)
  132 
  133 
  134 def do_add(command: str) -> None:
  135     dispatchers.create_remote_dispatchers(command.split())
  136 
  137 
  138 def complete_purge(line: str, text: str) -> List[str]:
  139     return complete_shells(line, text, lambda i: not i.enabled)
  140 
  141 
  142 def do_purge(command: str) -> None:
  143     to_delete = []
  144     for i in selected_shells(command):
  145         if not i.enabled:
  146             to_delete.append(i)
  147     for i in to_delete:
  148         i.disconnect()
  149         i.close()
  150 
  151 
  152 def do_rename(command: str) -> None:
  153     for i in dispatchers.all_instances():
  154         if i.enabled:
  155             i.rename(command.encode())
  156 
  157 
  158 def do_hide_password(command: str) -> None:
  159     warned = False
  160     for i in dispatchers.all_instances():
  161         if i.enabled and i.debug:
  162             i.debug = False
  163             if not warned:
  164                 console_output(b'Debugging disabled to avoid displaying '
  165                                b'passwords\n')
  166                 warned = True
  167     stdin.set_echo(False)
  168 
  169     if remote_dispatcher.options.log_file:
  170         console_output(b'Logging disabled to avoid writing passwords\n')
  171         remote_dispatcher.options.log_file = None
  172 
  173 
  174 def complete_set_debug(line: str, text: str) -> List[str]:
  175     if len(line[:-1].split()) >= 2:
  176         # Debug value already given in command line
  177         return complete_shells(line, text)
  178     if text.lower() in ('y', 'n'):
  179         return [text + ' ']
  180     return ['y ', 'n ']
  181 
  182 
  183 def do_set_debug(command: str) -> None:
  184     split = command.split()
  185     if not split:
  186         console_output(b'Expected at least a letter\n')
  187         return
  188     letter = split[0].lower()
  189     if letter not in ('y', 'n'):
  190         console_output("Expected 'y' or 'n', got: {}\n".format(
  191             split[0]).encode())
  192         return
  193     debug = letter == 'y'
  194     for i in selected_shells(' '.join(split[1:])):
  195         i.debug = debug
  196 
  197 
  198 def do_export_vars(command: str) -> None:
  199     rank = 0
  200     for shell in dispatchers.all_instances():
  201         if shell.enabled:
  202             environment_variables = {
  203                 'POLYSH_RANK': str(rank),
  204                 'POLYSH_NAME': shell.hostname,
  205                 'POLYSH_DISPLAY_NAME': shell.display_name,
  206             }
  207             for name, value in environment_variables.items():
  208                 shell.dispatch_command('export {}={}\n'.format(
  209                     name, shlex.quote(value)).encode())
  210             rank += 1
  211 
  212     for shell in dispatchers.all_instances():
  213         if shell.enabled:
  214             shell.dispatch_command('export POLYSH_NR_SHELLS={:d}\n'.format(
  215                 rank).encode())
  216 
  217 
  218 add_to_history('$POLYSH_RANK $POLYSH_NAME $POLYSH_DISPLAY_NAME')
  219 add_to_history('$POLYSH_NR_SHELLS')
  220 
  221 
  222 def complete_set_log(line: str, text: str) -> List[str]:
  223     return complete_local_path(text)
  224 
  225 
  226 def do_set_log(command: str) -> None:
  227     command = command.strip()
  228     if command:
  229         try:
  230             remote_dispatcher.options.log_file = open(command, 'a')
  231         except IOError as e:
  232             console_output('{}\n'.format(str(e)).encode())
  233             command = None
  234     if not command:
  235         remote_dispatcher.options.log_file = None
  236         console_output(b'Logging disabled\n')
  237 
  238 
  239 def complete_show_read_buffer(line: str, text: str) -> List[str]:
  240     return complete_shells(line, text, lambda i: i.read_buffer or
  241                            i.read_in_state_not_started)
  242 
  243 
  244 def do_show_read_buffer(command: str) -> None:
  245     for i in selected_shells(command):
  246         if i.read_in_state_not_started:
  247             i.print_lines(i.read_in_state_not_started)
  248             i.read_in_state_not_started = b''