"Fossies" - the Fresh Open Source Software Archive 
Member "polysh-polysh-0.13/polysh/control_commands.py" (11 May 2020, 7524 Bytes) of package /linux/privat/polysh-polysh-0.13.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively, you can view or download the uninterpreted source code file here.
For more information about "control_commands.py" see the
Fossies "Dox" file reference documentation.
1 """Polysh - Control Commands
2
3 The control commands are documented in the README.
4
5 Copyright (c) 2006 Guillaume Chazarain <guichaz@gmail.com>
6 Copyright (c) 2018 InnoGames GmbH
7 """
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 2 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20
21 import asyncore
22 import os
23 import shlex
24 from typing import List
25
26 from polysh.control_commands_helpers import (
27 complete_shells,
28 expand_local_path,
29 selected_shells,
30 toggle_shells,
31 )
32 from polysh.completion import complete_local_path, add_to_history
33 from polysh.console import console_output
34 from polysh import dispatchers
35 from polysh import remote_dispatcher
36 from polysh import stdin
37
38
def complete_list(line: str, text: str) -> List[str]:
    """Completion handler for the ``list`` control command.

    Delegates unchanged to ``complete_shells`` with the full input ``line``
    and the ``text`` fragment being completed.
    """
    candidates = complete_shells(line, text)
    return candidates
41
42
def do_list(command: str) -> None:
    """Handle the ``list`` control command.

    Gathers ``get_info()`` from every shell returned by
    ``selected_shells(command)``, formats the collected rows with
    ``dispatchers.format_info`` and writes the joined bytes to the console.
    """
    rows = []
    for shell in selected_shells(command):
        rows.append(shell.get_info())
    formatted = dispatchers.format_info(rows)
    console_output(b''.join(formatted))
47
48
def do_quit(command: str) -> None:
    """Handle the ``quit`` control command.

    ``command`` is ignored. Always raises ``asyncore.ExitNow`` carrying the
    value 0.
    """
    exit_request = asyncore.ExitNow(0)
    raise exit_request
51
52
def complete_chdir(line: str, text: str) -> List[str]:
    """Completion handler for the ``chdir`` control command.

    Returns the candidates produced by ``complete_local_path(text)`` that
    ``os.path.isdir`` accepts. ``line`` is not used.
    """
    return [path for path in complete_local_path(text)
            if os.path.isdir(path)]
55
56
def do_chdir(command: str) -> None:
    """Handle the ``chdir`` control command.

    Strips ``command``, passes it through ``expand_local_path`` and changes
    the local working directory to the result. An ``OSError`` is reported on
    the console rather than propagated.
    """
    try:
        destination = expand_local_path(command.strip())
        os.chdir(destination)
    except OSError as error:
        message = '{}\n'.format(str(error))
        console_output(message.encode())
62
63
def complete_send_ctrl(line: str, text: str) -> List[str]:
    """Completion handler for the ``send_ctrl`` control command.

    While fewer than two words precede the last character of ``line``, the
    control letter is still being typed: ``text`` is echoed back with a
    trailing space if it is already one of the suggested letters, otherwise
    all suggested letters are offered. Once the letter has been given,
    completion is delegated to ``complete_shells``, restricted to shells
    whose ``enabled`` attribute is truthy.
    """
    suggested_letters = ('c', 'd', 'z')
    words_so_far = line[:-1].split()
    if len(words_so_far) < 2:
        if text in suggested_letters:
            return [text + ' ']
        return [letter + ' ' for letter in suggested_letters]

    # Control letter already given in command line
    def is_enabled(shell):
        return shell.enabled

    return complete_shells(line, text, is_enabled)
71
72
def do_send_ctrl(command: str) -> None:
    """Handle the ``send_ctrl`` control command.

    ``command`` holds the control letter followed by optional further words;
    those words are re-joined with single spaces and handed to
    ``selected_shells`` to pick the target shells. The letter is lowercased
    and mapped so that ``a`` becomes code 1, ``b`` code 2, and so on
    (``c`` -> 0x03). The resulting character is written with
    ``dispatch_write`` to every selected shell whose ``enabled`` attribute is
    truthy.

    Invalid input (no letter, more than one character, or a character that
    cannot be mapped to a control code) is reported on the console and
    nothing is sent.
    """
    split = command.split()
    if not split:
        console_output(b'Expected at least a letter\n')
        return
    letter = split[0]
    if len(letter) != 1:
        console_output('Expected a single letter, got: {}\n'.format(
            letter).encode())
        return
    control_code = ord(letter.lower()) - ord('a') + 1
    if control_code < 0:
        # Characters sorting before '`' (digits, most punctuation) yield a
        # negative code, for which chr() raises ValueError. Report the bad
        # input instead of letting the exception escape the command handler.
        console_output('Expected a letter, got: {}\n'.format(
            letter).encode())
        return
    control_letter = chr(control_code)
    for i in selected_shells(' '.join(split[1:])):
        if i.enabled:
            i.dispatch_write(control_letter.encode())
87
88
def complete_reset_prompt(line: str, text: str) -> List[str]:
    """Completion handler for the ``reset_prompt`` control command.

    Delegates to ``complete_shells`` with a predicate that keeps shells whose
    ``enabled`` attribute is truthy.
    """
    def is_enabled(shell):
        return shell.enabled

    return complete_shells(line, text, is_enabled)
91
92
def do_reset_prompt(command: str) -> None:
    """Handle the ``reset_prompt`` control command.

    For every shell returned by ``selected_shells(command)``, dispatches that
    shell's own ``init_string`` through ``dispatch_command``.
    """
    targets = selected_shells(command)
    for shell in targets:
        shell.dispatch_command(shell.init_string)
96
97
def complete_enable(line: str, text: str) -> List[str]:
    """Completion handler for the ``enable`` control command.

    Delegates to ``complete_shells`` with a predicate that keeps shells whose
    state is not ``remote_dispatcher.STATE_DEAD``.
    """
    def is_not_dead(shell):
        return shell.state != remote_dispatcher.STATE_DEAD

    return complete_shells(line, text, is_not_dead)
101
102
def do_enable(command: str) -> None:
    """Handle the ``enable`` control command.

    Forwards ``command`` to ``toggle_shells`` with the flag set to ``True``.
    """
    enable = True
    toggle_shells(command, enable)
105
106
def complete_disable(line: str, text: str) -> List[str]:
    """Completion handler for the ``disable`` control command.

    Delegates to ``complete_shells`` with a predicate that keeps shells whose
    state is not ``remote_dispatcher.STATE_DEAD``.
    """
    def is_not_dead(shell):
        return shell.state != remote_dispatcher.STATE_DEAD

    return complete_shells(line, text, is_not_dead)
110
111
112
def do_disable(command: str) -> None:
    """Handle the ``disable`` control command.

    Forwards ``command`` to ``toggle_shells`` with the flag set to ``False``.
    """
    enable = False
    toggle_shells(command, enable)
115
116
def complete_reconnect(line: str, text: str) -> List[str]:
    """Completion handler for the ``reconnect`` control command.

    Delegates to ``complete_shells`` with a predicate that keeps only shells
    whose state equals ``remote_dispatcher.STATE_DEAD``.
    """
    def is_dead(shell):
        return shell.state == remote_dispatcher.STATE_DEAD

    return complete_shells(line, text, is_dead)
120
121
def do_reconnect(command: str) -> None:
    """Handle the ``reconnect`` control command.

    Among the shells returned by ``selected_shells(command)``, those in
    ``remote_dispatcher.STATE_DEAD`` are disconnected and closed; their
    hostnames are then passed to ``dispatchers.create_remote_dispatchers``.
    """
    dead_shells = []
    for shell in selected_shells(command):
        if shell.state == remote_dispatcher.STATE_DEAD:
            dead_shells.append(shell)

    # Tear down every dead shell before any hostname is collected.
    for shell in dead_shells:
        shell.disconnect()
        shell.close()

    hostnames = []
    for shell in dead_shells:
        hostnames.append(shell.hostname)
    dispatchers.create_remote_dispatchers(hostnames)
132
133
def do_add(command: str) -> None:
    """Handle the ``add`` control command.

    Splits ``command`` on whitespace and passes the resulting words to
    ``dispatchers.create_remote_dispatchers``.
    """
    hosts = command.split()
    dispatchers.create_remote_dispatchers(hosts)
136
137
def complete_purge(line: str, text: str) -> List[str]:
    """Completion handler for the ``purge`` control command.

    Delegates to ``complete_shells`` with a predicate that keeps shells whose
    ``enabled`` attribute is falsy.
    """
    def is_disabled(shell):
        return not shell.enabled

    return complete_shells(line, text, is_disabled)
140
141
def do_purge(command: str) -> None:
    """Handle the ``purge`` control command.

    Disconnects and closes every shell returned by
    ``selected_shells(command)`` whose ``enabled`` attribute is falsy.
    """
    # Collect first, then tear down, so the selection is fully evaluated
    # before any shell is closed.
    disabled_shells = [shell for shell in selected_shells(command)
                       if not shell.enabled]
    for shell in disabled_shells:
        shell.disconnect()
        shell.close()
150
151
def do_rename(command: str) -> None:
    """Handle the ``rename`` control command.

    Calls ``rename`` with the encoded ``command`` on every instance from
    ``dispatchers.all_instances()`` whose ``enabled`` attribute is truthy.
    """
    for shell in dispatchers.all_instances():
        if not shell.enabled:
            continue
        shell.rename(command.encode())
156
157
def do_hide_password(command: str) -> None:
    """Handle the ``hide_password`` control command.

    ``command`` is ignored. Three things happen in order:

    1. ``debug`` is cleared on every enabled instance that had it set, with a
       console notice printed once, at the first such instance.
    2. Echo is switched off via ``stdin.set_echo(False)``.
    3. If ``remote_dispatcher.options.log_file`` is truthy, a notice is
       printed and the option is reset to ``None``.
    """
    notified = False
    for shell in dispatchers.all_instances():
        if not (shell.enabled and shell.debug):
            continue
        shell.debug = False
        if notified:
            continue
        console_output(b'Debugging disabled to avoid displaying '
                       b'passwords\n')
        notified = True
    stdin.set_echo(False)

    options = remote_dispatcher.options
    if options.log_file:
        console_output(b'Logging disabled to avoid writing passwords\n')
        options.log_file = None
172
173
def complete_set_debug(line: str, text: str) -> List[str]:
    """Completion handler for the ``set_debug`` control command.

    While fewer than two words precede the last character of ``line``, the
    y/n value is still being typed: ``text`` is echoed back (original case
    kept) with a trailing space if it is already ``y`` or ``n`` in any case,
    otherwise both values are offered. Once the value has been given,
    completion is delegated to ``complete_shells``.
    """
    words_so_far = line[:-1].split()
    if len(words_so_far) < 2:
        answers = ('y', 'n')
        if text.lower() in answers:
            return [text + ' ']
        return [answer + ' ' for answer in answers]

    # Debug value already given in command line
    return complete_shells(line, text)
181
182
def do_set_debug(command: str) -> None:
    """Handle the ``set_debug`` control command.

    The first word of ``command`` must be ``y`` or ``n`` (case-insensitive).
    The remaining words are re-joined with single spaces and handed to
    ``selected_shells``; each returned shell gets its ``debug`` attribute set
    to ``True`` for ``y`` and ``False`` for ``n``. Invalid input is reported
    on the console and nothing is changed.
    """
    words = command.split()
    if not words:
        console_output(b'Expected at least a letter\n')
        return

    answer = words[0].lower()
    if answer != 'y' and answer != 'n':
        # Echo the word as the user typed it, not the lowercased form.
        message = "Expected 'y' or 'n', got: {}\n".format(words[0])
        console_output(message.encode())
        return

    enable_debug = answer == 'y'
    shell_patterns = ' '.join(words[1:])
    for shell in selected_shells(shell_patterns):
        shell.debug = enable_debug
196
197
def do_export_vars(command: str) -> None:
    """Handle the ``export_vars`` control command.

    ``command`` is ignored. Each enabled instance is sent ``export`` commands
    for ``POLYSH_RANK`` (its zero-based position among the enabled
    instances), ``POLYSH_NAME`` (its ``hostname``) and
    ``POLYSH_DISPLAY_NAME`` (its ``display_name``), with values quoted by
    ``shlex.quote``. A second pass sends every enabled instance
    ``POLYSH_NR_SHELLS``, the number of instances counted in the first pass.
    """
    enabled_count = 0
    for shell in dispatchers.all_instances():
        if not shell.enabled:
            continue
        exports = (
            ('POLYSH_RANK', str(enabled_count)),
            ('POLYSH_NAME', shell.hostname),
            ('POLYSH_DISPLAY_NAME', shell.display_name),
        )
        for name, value in exports:
            export_line = 'export {}={}\n'.format(name, shlex.quote(value))
            shell.dispatch_command(export_line.encode())
        enabled_count += 1

    # The total is only known once the first pass has finished.
    total_line = 'export POLYSH_NR_SHELLS={:d}\n'.format(enabled_count)
    for shell in dispatchers.all_instances():
        if shell.enabled:
            shell.dispatch_command(total_line.encode())
216
217
# Module-level calls, executed when the module is loaded: register the names
# of the variables exported by do_export_vars with add_to_history.
# NOTE(review): presumably this makes them available to interactive
# completion/history -- confirm against polysh.completion.add_to_history.
218 add_to_history('$POLYSH_RANK $POLYSH_NAME $POLYSH_DISPLAY_NAME')
219 add_to_history('$POLYSH_NR_SHELLS')
220
221
def complete_set_log(line: str, text: str) -> List[str]:
    """Completion handler for the ``set_log`` control command.

    Returns whatever ``complete_local_path(text)`` produces. ``line`` is not
    used.
    """
    candidates = complete_local_path(text)
    return candidates
224
225
def do_set_log(command: str) -> None:
    """Handle the ``set_log`` control command.

    With a non-empty (stripped) ``command``, opens that path in append mode
    and stores the file object in ``remote_dispatcher.options.log_file``.
    With an empty ``command``, or when opening fails with ``IOError`` (the
    error is printed first), the option is set to ``None`` and
    ``Logging disabled`` is printed.
    """
    path = command.strip()
    if path:
        try:
            log_file = open(path, 'a')
        except IOError as error:
            console_output('{}\n'.format(str(error)).encode())
        else:
            remote_dispatcher.options.log_file = log_file
            return

    # Reached for an empty path or a failed open.
    remote_dispatcher.options.log_file = None
    console_output(b'Logging disabled\n')
237
238
def complete_show_read_buffer(line: str, text: str) -> List[str]:
    """Completion handler for the ``show_read_buffer`` control command.

    Delegates to ``complete_shells`` with a predicate that keeps shells
    having a truthy ``read_buffer`` or a truthy
    ``read_in_state_not_started``.
    """
    def has_pending_output(shell):
        return shell.read_buffer or shell.read_in_state_not_started

    return complete_shells(line, text, has_pending_output)
242
243
def do_show_read_buffer(command: str) -> None:
    """Handle the ``show_read_buffer`` control command.

    For each shell returned by ``selected_shells(command)`` that has a
    truthy ``read_in_state_not_started``, prints that data with the shell's
    ``print_lines`` and then resets the attribute to ``b''``.
    """
    for shell in selected_shells(command):
        pending = shell.read_in_state_not_started
        if not pending:
            continue
        shell.print_lines(pending)
        shell.read_in_state_not_started = b''