"Fossies" - the Fresh Open Source Software Archive 
Member "polysh-polysh-0.13/polysh/remote_dispatcher.py" (11 May 2020, 15075 Bytes) of package /linux/privat/polysh-polysh-0.13.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively you can here
view or
download the uninterpreted source code file.
For more information about "remote_dispatcher.py" see the
Fossies "Dox" file reference documentation.
1 """Polysh - Remote Shell Dispatcher
2
3 Copyright (c) 2006 Guillaume Chazarain <guichaz@gmail.com>
4 Copyright (c) 2018 InnoGames GmbH
5 """
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import asyncore
20 import os
21 import pty
22 import signal
23 import sys
24 import termios
25 import select
26 import platform
27 from typing import Optional, List
28 from argparse import Namespace
29
30 from polysh.buffered_dispatcher import BufferedDispatcher
31 from polysh import callbacks
32 from polysh.console import console_output
33 from polysh import display_names
34
35 options = None # type: Optional[Namespace]
36
37 # Either the remote shell is expecting a command or one is already running
38 STATE_NAMES = ['not_started', 'idle', 'running', 'terminated', 'dead']
39
40 STATE_NOT_STARTED, \
41 STATE_IDLE, \
42 STATE_RUNNING, \
43 STATE_TERMINATED, \
44 STATE_DEAD = list(range(len(STATE_NAMES)))
45
46 # Terminal color codes
47 COLORS = [1] + list(range(30, 37))
48
49 # Count the total number of RemoteDispatcher.handle_read() invocations
50 nr_handle_read = 0
51
52
53 def main_loop_iteration(timeout: Optional[float] = None) -> int:
54 """Return the number of RemoteDispatcher.handle_read() calls made by this
55 iteration"""
56 prev_nr_read = nr_handle_read
57 asyncore.loop(count=1, timeout=timeout, use_poll=True)
58 return nr_handle_read - prev_nr_read
59
60
61 def log(msg: bytes) -> None:
62 if options.log_file:
63 fd = options.log_file.fileno()
64 while msg:
65 try:
66 written = os.write(fd, msg)
67 except OSError as e:
68 print('Exception while writing log:', options.log_file.name)
69 print(e)
70 raise asyncore.ExitNow(1)
71 msg = msg[written:]
72
73
74 class RemoteDispatcher(BufferedDispatcher):
75 """A RemoteDispatcher is a ssh process we communicate with"""
76
77 def __init__(self, hostname: str, port: str) -> None:
78 if port != "22":
79 port = "-p " + port
80 else:
81 port = ''
82
83 self.pid, fd = pty.fork()
84 if self.pid == 0:
85 # Child
86 self.launch_ssh(hostname, port)
87 sys.exit(1)
88
89 # Parent
90 super().__init__(fd)
91 self.temporary = False
92 self.hostname = hostname
93 self.port = port
94 self.debug = options.debug
95 self.enabled = True # shells can be enabled and disabled
96 self.state = STATE_NOT_STARTED
97 self.term_size = (-1, -1)
98 self.display_name = None # type: Optional[str]
99 self.change_name(self.hostname.encode())
100 self.init_string = self.configure_tty() + self.set_prompt()
101 self.init_string_sent = False
102 self.read_in_state_not_started = b''
103 self.command = options.command
104 self.last_printed_line = b''
105 self.color_code = None
106 if sys.stdout.isatty() and not options.disable_color:
107 COLORS.insert(0, COLORS.pop()) # Rotate the colors
108 self.color_code = COLORS[0]
109
110 def launch_ssh(self, name: str, port: str) -> None:
111 """Launch the ssh command in the child process"""
112 if options.user:
113 name = '%s@%s' % (options.user, name)
114 evaluated = options.ssh % {'host': name, 'port': port}
115 if evaluated == options.ssh:
116 evaluated = '%s %s' % (evaluated, name)
117 os.execlp('/bin/sh', 'sh', '-c', evaluated)
118
119 def set_enabled(self, enabled: bool) -> None:
120 if enabled != self.enabled and options.interactive:
121 # In non-interactive mode, remote processes leave as soon
122 # as they are terminated, but we don't want to break the
123 # indentation if all the remaining processes have short names.
124 display_names.set_enabled(self.display_name, enabled)
125 self.enabled = enabled
126
127 def change_state(self, state: int) -> None:
128 """Change the state of the remote process, logging the change"""
129 if state is not self.state:
130 if self.debug:
131 self.print_debug(b'state => ' + STATE_NAMES[state].encode())
132 if self.state is STATE_NOT_STARTED:
133 self.read_in_state_not_started = b''
134 self.state = state
135
136 def disconnect(self) -> None:
137 """We are no more interested in this remote process"""
138 try:
139 os.kill(-self.pid, signal.SIGKILL)
140 except OSError:
141 # The process was already dead, no problem
142 pass
143 self.read_buffer = b''
144 self.write_buffer = b''
145 self.set_enabled(False)
146 if self.read_in_state_not_started:
147 self.print_lines(self.read_in_state_not_started)
148 self.read_in_state_not_started = b''
149 if options.abort_error and self.state is STATE_NOT_STARTED:
150 raise asyncore.ExitNow(1)
151 self.change_state(STATE_DEAD)
152
153 def configure_tty(self) -> bytes:
154 """We don't want \n to be replaced with \r\n, and we disable the echo"""
155 attr = termios.tcgetattr(self.fd)
156 # The following raises a mypy warning, as python type hints don't allow
157 # per list item granularity. The last item in attr is List[bytes], but
158 # we don't access that here.
159 attr[1] &= ~termios.ONLCR # type: ignore # oflag
160 attr[3] &= ~termios.ECHO # type: ignore # lflag
161 termios.tcsetattr(self.fd, termios.TCSANOW, attr)
162 # unsetopt zle prevents Zsh from resetting the tty
163 return b'unsetopt zle 2> /dev/null;stty -echo -onlcr -ctlecho;'
164
165 def seen_prompt_cb(self, unused: str) -> None:
166 if options.interactive:
167 self.change_state(STATE_IDLE)
168 elif self.command:
169 p1, p2 = callbacks.add(b'real prompt ends', lambda d: None, True)
170 self.dispatch_command(b'PS1="' + p1 + b'""' + p2 + b'\n"\n')
171 self.dispatch_command(self.command.encode() + b'\n')
172 self.dispatch_command(b'exit 2>/dev/null\n')
173 self.command = None
174
175 def set_prompt(self) -> bytes:
176 """The prompt is important because we detect the readyness of a process
177 by waiting for its prompt."""
178 # No right prompt
179 command_line = b'PS2=;RPS1=;RPROMPT=;'
180 command_line += b'PROMPT_COMMAND=;'
181 command_line += b'TERM=ansi;'
182 command_line += b'unset precmd_functions;'
183 command_line += b'unset HISTFILE;'
184 prompt1, prompt2 = callbacks.add(b'prompt', self.seen_prompt_cb, True)
185 command_line += b'PS1="' + prompt1 + b'""' + prompt2 + b'\n"\n'
186 return command_line
187
188 def readable(self) -> bool:
189 """We are always interested in reading from active remote processes if
190 the buffer is OK"""
191 return (self.state != STATE_DEAD and
192 super().readable())
193
194 def handle_expt(self) -> None:
195 # Dirty hack to ignore POLLPRI flag that is raised on Mac OS, but not
196 # on linux. asyncore calls this method in case POLLPRI flag is set, but
197 # self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) == 0
198 if platform.system() == 'Darwin' and select.POLLPRI:
199 return
200
201 self.handle_close()
202
203 def handle_close(self) -> None:
204 if self.state is STATE_DEAD:
205 # This connection has already been killed. Asyncore has probably
206 # called handle_close() or handle_expt() on this connection twice.
207 return
208
209 pid, status = os.waitpid(self.pid, 0)
210 exit_code = os.WEXITSTATUS(status)
211 options.exit_code = max(options.exit_code, exit_code)
212 if exit_code and options.interactive:
213 console_output('Error talking to {}\n'.format(
214 self.display_name).encode())
215 self.disconnect()
216 if self.temporary:
217 self.close()
218
219 def print_lines(self, lines: bytes) -> None:
220 from polysh.display_names import max_display_name_length
221 lines = lines.strip(b'\n')
222 while True:
223 no_empty_lines = lines.replace(b'\n\n', b'\n')
224 if len(no_empty_lines) == len(lines):
225 break
226 lines = no_empty_lines
227 if not lines:
228 return
229 indent = max_display_name_length - len(self.display_name)
230 log_prefix = self.display_name.encode() + indent * b' ' + b' : '
231 if self.color_code is None:
232 console_prefix = log_prefix
233 else:
234 console_prefix = (b'\033[1;' + str(self.color_code).encode() +
235 b'm' + log_prefix + b'\033[1;m')
236 console_data = (console_prefix +
237 lines.replace(b'\n', b'\n' + console_prefix) + b'\n')
238 log_data = (log_prefix +
239 lines.replace(b'\n', b'\n' + log_prefix) + b'\n')
240 console_output(console_data, logging_msg=log_data)
241 self.last_printed_line = lines[lines.rfind(b'\n') + 1:]
242
243 def handle_read_fast_case(self, data: bytes) -> bool:
244 """If we are in a fast case we'll avoid the long processing of each
245 line"""
246 if self.state is not STATE_RUNNING or callbacks.any_in(data):
247 # Slow case :-(
248 return False
249
250 last_nl = data.rfind(b'\n')
251 if last_nl == -1:
252 # No '\n' in data => slow case
253 return False
254 self.read_buffer = data[last_nl + 1:]
255 self.print_lines(data[:last_nl])
256 return True
257
258 def handle_read(self) -> None:
259 """We got some output from a remote shell, this is one of the state
260 machine"""
261 if self.state == STATE_DEAD:
262 return
263 global nr_handle_read
264 nr_handle_read += 1
265 new_data = self._handle_read_chunk()
266 if self.debug:
267 self.print_debug(b'==> ' + new_data)
268 if self.handle_read_fast_case(self.read_buffer):
269 return
270 lf_pos = new_data.find(b'\n')
271 if lf_pos >= 0:
272 # Optimization: we knew there were no '\n' in the previous read
273 # buffer, so we searched only in the new_data and we offset the
274 # found index by the length of the previous buffer
275 lf_pos += len(self.read_buffer) - len(new_data)
276 elif self.state is STATE_NOT_STARTED and \
277 options.password is not None and \
278 b'password:' in self.read_buffer.lower():
279 self.dispatch_write('{}\n'.format(options.password).encode())
280 self.read_buffer = b''
281 return
282 while lf_pos >= 0:
283 # For each line in the buffer
284 line = self.read_buffer[:lf_pos + 1]
285 if callbacks.process(line):
286 pass
287 elif self.state in (STATE_IDLE, STATE_RUNNING):
288 self.print_lines(line)
289 elif self.state is STATE_NOT_STARTED:
290 self.read_in_state_not_started += line
291 if b'The authenticity of host' in line:
292 msg = line.strip(b'\n') + b' Closing connection.'
293 self.disconnect()
294 elif b'REMOTE HOST IDENTIFICATION HAS CHANGED' in line:
295 msg = b'Remote host identification has changed.'
296 else:
297 msg = None
298
299 if msg:
300 self.print_lines(msg + b' Consider manually connecting or '
301 b'using ssh-keyscan.')
302
303 # Go to the next line in the buffer
304 self.read_buffer = self.read_buffer[lf_pos + 1:]
305 if self.handle_read_fast_case(self.read_buffer):
306 return
307 lf_pos = self.read_buffer.find(b'\n')
308 if self.state is STATE_NOT_STARTED and not self.init_string_sent:
309 self.dispatch_write(self.init_string)
310 self.init_string_sent = True
311
312 def print_unfinished_line(self) -> None:
313 """The unfinished line stayed long enough in the buffer to be printed"""
314 if self.state is STATE_RUNNING:
315 if not callbacks.process(self.read_buffer):
316 self.print_lines(self.read_buffer)
317 self.read_buffer = b''
318
319 def writable(self) -> bool:
320 """Do we want to write something?"""
321 return (self.state != STATE_DEAD and
322 super().writable())
323
324 def handle_write(self) -> None:
325 """Let's write as much as we can"""
326 num_sent = self.send(self.write_buffer)
327 if self.debug:
328 if self.state is not STATE_NOT_STARTED or options.password is None:
329 self.print_debug(b'<== ' + self.write_buffer[:num_sent])
330 self.write_buffer = self.write_buffer[num_sent:]
331
332 def print_debug(self, msg: bytes) -> None:
333 """Log some debugging information to the console"""
334 state = STATE_NAMES[self.state].encode()
335 console_output(b'[dbg] ' + self.display_name.encode() + b'[' + state +
336 b']: ' + msg + b'\n')
337
338 def get_info(self) -> List[bytes]:
339 """Return a list with all information available about this process"""
340 return [self.display_name.encode(),
341 self.enabled and b'enabled' or b'disabled',
342 STATE_NAMES[self.state].encode() + b':',
343 self.last_printed_line.strip()]
344
345 def dispatch_write(self, buf: bytes) -> bool:
346 """There is new stuff to write when possible"""
347 if self.state != STATE_DEAD and self.enabled:
348 super().dispatch_write(buf)
349 return True
350 return False
351
352 def dispatch_command(self, command: bytes) -> None:
353 if self.dispatch_write(command):
354 self.change_state(STATE_RUNNING)
355
356 def change_name(self, new_name: Optional[bytes]) -> None:
357 """Change the name of the shell, possibly updating the maximum name
358 length"""
359 if not new_name:
360 name = self.hostname
361 else:
362 name = new_name.decode()
363 self.display_name = display_names.change(
364 self.display_name, name)
365
366 def rename(self, name: bytes) -> None:
367 """Send to the remote shell, its new name to be shell expanded"""
368 if name:
369 # defug callback add?
370 rename1, rename2 = callbacks.add(
371 b'rename', self.change_name, False)
372 self.dispatch_command(b'/bin/echo "' + rename1 + b'""' + rename2 +
373 b'"' + name + b'\n')
374 else:
375 self.change_name(self.hostname.encode())
376
377 def close(self) -> None:
378 display_names.change(self.display_name, None)
379 super().close()