"Fossies" - the Fresh Open Source Software Archive 
Member "salt-3002.2/tests/unit/client/test_ssh.py" (18 Nov 2020, 22561 Bytes) of package /linux/misc/salt-3002.2.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively you can here
view or
download the uninterpreted source code file.
See also the latest
Fossies "Diffs" side-by-side code changes report for "test_ssh.py":
3002.1_vs_3002.2.
1 """
2 :codeauthor: :email:`Daniel Wallace <dwallace@saltstack.com`
3 """
4
5
6 import os
7 import re
8 import shutil
9 import tempfile
10
11 import salt.config
12 import salt.roster
13 import salt.utils.files
14 import salt.utils.path
15 import salt.utils.thin
16 import salt.utils.yaml
17 from salt.client import ssh
18 from tests.support.case import ShellCase
19 from tests.support.helpers import slowTest
20 from tests.support.mock import MagicMock, call, patch
21 from tests.support.runtests import RUNTIME_VARS
22 from tests.support.unit import TestCase, skipIf
23
24
25 @skipIf(not salt.utils.path.which("ssh"), "No ssh binary found in path")
26 class SSHPasswordTests(ShellCase):
27 @slowTest
28 def test_password_failure(self):
29 """
30 Check password failures when trying to deploy keys
31 """
32 opts = salt.config.client_config(self.get_config_file_path("master"))
33 opts["list_hosts"] = False
34 opts["argv"] = ["test.ping"]
35 opts["selected_target_option"] = "glob"
36 opts["tgt"] = "localhost"
37 opts["arg"] = []
38 roster = os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "roster")
39 handle_ssh_ret = [
40 {
41 "localhost": {
42 "retcode": 255,
43 "stderr": "Permission denied (publickey).\r\n",
44 "stdout": "",
45 }
46 },
47 ]
48 expected = {"localhost": "Permission denied (publickey)"}
49 display_output = MagicMock()
50 with patch(
51 "salt.roster.get_roster_file", MagicMock(return_value=roster)
52 ), patch(
53 "salt.client.ssh.SSH.handle_ssh", MagicMock(return_value=handle_ssh_ret)
54 ), patch(
55 "salt.client.ssh.SSH.key_deploy", MagicMock(return_value=expected)
56 ), patch(
57 "salt.output.display_output", display_output
58 ):
59 client = ssh.SSH(opts)
60 ret = next(client.run_iter())
61 with self.assertRaises(SystemExit):
62 client.run()
63 display_output.assert_called_once_with(expected, "nested", opts)
64 self.assertIs(ret, handle_ssh_ret[0])
65
66
67 @skipIf(not salt.utils.path.which("ssh"), "No ssh binary found in path")
68 class SSHReturnEventTests(ShellCase):
69 def test_not_missing_fun_calling_wfuncs(self):
70 opts = salt.config.client_config(self.get_config_file_path("master"))
71 opts["list_hosts"] = False
72 opts["argv"] = ["state.show_highstate"]
73 opts["selected_target_option"] = "glob"
74 opts["tgt"] = "localhost"
75 opts["arg"] = []
76 roster = os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "roster")
77 handle_ssh_ret = [
78 {"localhost": {}},
79 ]
80
81 expected = {"localhost": {}}
82 display_output = MagicMock()
83 with patch(
84 "salt.roster.get_roster_file", MagicMock(return_value=roster)
85 ), patch(
86 "salt.client.ssh.SSH.handle_ssh", MagicMock(return_value=handle_ssh_ret)
87 ), patch(
88 "salt.client.ssh.SSH.key_deploy", MagicMock(return_value=expected)
89 ), patch(
90 "salt.output.display_output", display_output
91 ):
92 client = ssh.SSH(opts)
93 client.event = MagicMock()
94 ret = next(client.run_iter())
95 assert "localhost" in ret
96 assert "fun" in ret["localhost"]
97 client.run()
98 display_output.assert_called_once_with(expected, "nested", opts)
99 self.assertIs(ret, handle_ssh_ret[0])
100 assert len(client.event.fire_event.call_args_list) == 2
101 assert "fun" in client.event.fire_event.call_args_list[0][0][0]
102 assert "fun" in client.event.fire_event.call_args_list[1][0][0]
103
104
105 class SSHRosterDefaults(TestCase):
106 def setUp(self):
107 self.roster = """
108 localhost:
109 host: 127.0.0.1
110 port: 2827
111 self:
112 host: 0.0.0.0
113 port: 42
114 """
115
116 def test_roster_defaults_flat(self):
117 """
118 Test Roster Defaults on the flat roster
119 """
120 tempdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
121 expected = {
122 "self": {"host": "0.0.0.0", "user": "daniel", "port": 42},
123 "localhost": {"host": "127.0.0.1", "user": "daniel", "port": 2827},
124 }
125 try:
126 root_dir = os.path.join(tempdir, "foo", "bar")
127 os.makedirs(root_dir)
128 fpath = os.path.join(root_dir, "config")
129 with salt.utils.files.fopen(fpath, "w") as fp_:
130 fp_.write(
131 """
132 roster_defaults:
133 user: daniel
134 """
135 )
136 opts = salt.config.master_config(fpath)
137 with patch(
138 "salt.roster.get_roster_file", MagicMock(return_value=self.roster)
139 ):
140 with patch(
141 "salt.template.compile_template",
142 MagicMock(return_value=salt.utils.yaml.safe_load(self.roster)),
143 ):
144 roster = salt.roster.Roster(opts=opts)
145 self.assertEqual(roster.targets("*", "glob"), expected)
146 finally:
147 if os.path.isdir(tempdir):
148 shutil.rmtree(tempdir)
149
150
151 class SSHSingleTests(TestCase):
152 def setUp(self):
153 self.tmp_cachedir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
154 self.argv = [
155 "ssh.set_auth_key",
156 "root",
157 "hobn+amNAXSBTiOXEqlBjGB...rsa root@master",
158 ]
159 self.opts = {
160 "argv": self.argv,
161 "__role": "master",
162 "cachedir": self.tmp_cachedir,
163 "extension_modules": os.path.join(self.tmp_cachedir, "extmods"),
164 }
165 self.target = {
166 "passwd": "abc123",
167 "ssh_options": None,
168 "sudo": False,
169 "identities_only": False,
170 "host": "login1",
171 "user": "root",
172 "timeout": 65,
173 "remote_port_forwards": None,
174 "sudo_user": "",
175 "port": "22",
176 "priv": "/etc/salt/pki/master/ssh/salt-ssh.rsa",
177 }
178
179 def test_single_opts(self):
180 """ Sanity check for ssh.Single options
181 """
182
183 single = ssh.Single(
184 self.opts,
185 self.opts["argv"],
186 "localhost",
187 mods={},
188 fsclient=None,
189 thin=salt.utils.thin.thin_path(self.opts["cachedir"]),
190 mine=False,
191 **self.target
192 )
193
194 self.assertEqual(single.shell._ssh_opts(), "")
195 self.assertEqual(
196 single.shell._cmd_str("date +%s"),
197 "ssh login1 "
198 "-o KbdInteractiveAuthentication=no -o "
199 "PasswordAuthentication=yes -o ConnectTimeout=65 -o Port=22 "
200 "-o IdentityFile=/etc/salt/pki/master/ssh/salt-ssh.rsa "
201 "-o User=root date +%s",
202 )
203
204 def test_run_with_pre_flight(self):
205 """
206 test Single.run() when ssh_pre_flight is set
207 and script successfully runs
208 """
209 target = self.target.copy()
210 target["ssh_pre_flight"] = os.path.join(RUNTIME_VARS.TMP, "script.sh")
211 single = ssh.Single(
212 self.opts,
213 self.opts["argv"],
214 "localhost",
215 mods={},
216 fsclient=None,
217 thin=salt.utils.thin.thin_path(self.opts["cachedir"]),
218 mine=False,
219 **target
220 )
221
222 cmd_ret = ("Success", "", 0)
223 mock_flight = MagicMock(return_value=cmd_ret)
224 mock_cmd = MagicMock(return_value=cmd_ret)
225 patch_flight = patch("salt.client.ssh.Single.run_ssh_pre_flight", mock_flight)
226 patch_cmd = patch("salt.client.ssh.Single.cmd_block", mock_cmd)
227 patch_exec_cmd = patch(
228 "salt.client.ssh.shell.Shell.exec_cmd", return_value=("", "", 1)
229 )
230 patch_os = patch("os.path.exists", side_effect=[True])
231
232 with patch_os, patch_flight, patch_cmd, patch_exec_cmd:
233 ret = single.run()
234 mock_cmd.assert_called()
235 mock_flight.assert_called()
236 assert ret == cmd_ret
237
238 def test_run_with_pre_flight_stderr(self):
239 """
240 test Single.run() when ssh_pre_flight is set
241 and script errors when run
242 """
243 target = self.target.copy()
244 target["ssh_pre_flight"] = os.path.join(RUNTIME_VARS.TMP, "script.sh")
245 single = ssh.Single(
246 self.opts,
247 self.opts["argv"],
248 "localhost",
249 mods={},
250 fsclient=None,
251 thin=salt.utils.thin.thin_path(self.opts["cachedir"]),
252 mine=False,
253 **target
254 )
255
256 cmd_ret = ("", "Error running script", 1)
257 mock_flight = MagicMock(return_value=cmd_ret)
258 mock_cmd = MagicMock(return_value=cmd_ret)
259 patch_flight = patch("salt.client.ssh.Single.run_ssh_pre_flight", mock_flight)
260 patch_cmd = patch("salt.client.ssh.Single.cmd_block", mock_cmd)
261 patch_exec_cmd = patch(
262 "salt.client.ssh.shell.Shell.exec_cmd", return_value=("", "", 1)
263 )
264 patch_os = patch("os.path.exists", side_effect=[True])
265
266 with patch_os, patch_flight, patch_cmd, patch_exec_cmd:
267 ret = single.run()
268 mock_cmd.assert_not_called()
269 mock_flight.assert_called()
270 assert ret == cmd_ret
271
272 def test_run_with_pre_flight_script_doesnot_exist(self):
273 """
274 test Single.run() when ssh_pre_flight is set
275 and the script does not exist
276 """
277 target = self.target.copy()
278 target["ssh_pre_flight"] = os.path.join(RUNTIME_VARS.TMP, "script.sh")
279 single = ssh.Single(
280 self.opts,
281 self.opts["argv"],
282 "localhost",
283 mods={},
284 fsclient=None,
285 thin=salt.utils.thin.thin_path(self.opts["cachedir"]),
286 mine=False,
287 **target
288 )
289
290 cmd_ret = ("Success", "", 0)
291 mock_flight = MagicMock(return_value=cmd_ret)
292 mock_cmd = MagicMock(return_value=cmd_ret)
293 patch_flight = patch("salt.client.ssh.Single.run_ssh_pre_flight", mock_flight)
294 patch_cmd = patch("salt.client.ssh.Single.cmd_block", mock_cmd)
295 patch_exec_cmd = patch(
296 "salt.client.ssh.shell.Shell.exec_cmd", return_value=("", "", 1)
297 )
298 patch_os = patch("os.path.exists", side_effect=[False])
299
300 with patch_os, patch_flight, patch_cmd, patch_exec_cmd:
301 ret = single.run()
302 mock_cmd.assert_called()
303 mock_flight.assert_not_called()
304 assert ret == cmd_ret
305
306 def test_run_with_pre_flight_thin_dir_exists(self):
307 """
308 test Single.run() when ssh_pre_flight is set
309 and thin_dir already exists
310 """
311 target = self.target.copy()
312 target["ssh_pre_flight"] = os.path.join(RUNTIME_VARS.TMP, "script.sh")
313 single = ssh.Single(
314 self.opts,
315 self.opts["argv"],
316 "localhost",
317 mods={},
318 fsclient=None,
319 thin=salt.utils.thin.thin_path(self.opts["cachedir"]),
320 mine=False,
321 **target
322 )
323
324 cmd_ret = ("", "", 0)
325 mock_flight = MagicMock(return_value=cmd_ret)
326 mock_cmd = MagicMock(return_value=cmd_ret)
327 patch_flight = patch("salt.client.ssh.Single.run_ssh_pre_flight", mock_flight)
328 patch_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_cmd)
329 patch_cmd_block = patch("salt.client.ssh.Single.cmd_block", mock_cmd)
330 patch_os = patch("os.path.exists", return_value=True)
331
332 with patch_os, patch_flight, patch_cmd, patch_cmd_block:
333 ret = single.run()
334 mock_cmd.assert_called()
335 mock_flight.assert_not_called()
336 assert ret == cmd_ret
337
338 def test_execute_script(self):
339 """
340 test Single.execute_script()
341 """
342 single = ssh.Single(
343 self.opts,
344 self.opts["argv"],
345 "localhost",
346 mods={},
347 fsclient=None,
348 thin=salt.utils.thin.thin_path(self.opts["cachedir"]),
349 mine=False,
350 winrm=False,
351 **self.target
352 )
353
354 exp_ret = ("Success", "", 0)
355 mock_cmd = MagicMock(return_value=exp_ret)
356 patch_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_cmd)
357 script = os.path.join(RUNTIME_VARS.TMP, "script.sh")
358
359 with patch_cmd:
360 ret = single.execute_script(script=script)
361 assert ret == exp_ret
362 assert mock_cmd.call_count == 2
363 assert [
364 call("/bin/sh '{}'".format(script)),
365 call("rm '{}'".format(script)),
366 ] == mock_cmd.call_args_list
367
368 def test_shim_cmd(self):
369 """
370 test Single.shim_cmd()
371 """
372 single = ssh.Single(
373 self.opts,
374 self.opts["argv"],
375 "localhost",
376 mods={},
377 fsclient=None,
378 thin=salt.utils.thin.thin_path(self.opts["cachedir"]),
379 mine=False,
380 winrm=False,
381 tty=True,
382 **self.target
383 )
384
385 exp_ret = ("Success", "", 0)
386 mock_cmd = MagicMock(return_value=exp_ret)
387 patch_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_cmd)
388 patch_send = patch("salt.client.ssh.shell.Shell.send", return_value=("", "", 0))
389 patch_rand = patch("os.urandom", return_value=b"5\xd9l\xca\xc2\xff")
390
391 with patch_cmd, patch_rand, patch_send:
392 ret = single.shim_cmd(cmd_str="echo test")
393 assert ret == exp_ret
394 assert [
395 call("/bin/sh '.35d96ccac2ff.py'"),
396 call("rm '.35d96ccac2ff.py'"),
397 ] == mock_cmd.call_args_list
398
399 def test_run_ssh_pre_flight(self):
400 """
401 test Single.run_ssh_pre_flight
402 """
403 target = self.target.copy()
404 target["ssh_pre_flight"] = os.path.join(RUNTIME_VARS.TMP, "script.sh")
405 single = ssh.Single(
406 self.opts,
407 self.opts["argv"],
408 "localhost",
409 mods={},
410 fsclient=None,
411 thin=salt.utils.thin.thin_path(self.opts["cachedir"]),
412 mine=False,
413 winrm=False,
414 tty=True,
415 **target
416 )
417
418 exp_ret = ("Success", "", 0)
419 mock_cmd = MagicMock(return_value=exp_ret)
420 patch_cmd = patch("salt.client.ssh.shell.Shell.exec_cmd", mock_cmd)
421 patch_send = patch("salt.client.ssh.shell.Shell.send", return_value=exp_ret)
422 exp_tmp = os.path.join(
423 tempfile.gettempdir(), os.path.basename(target["ssh_pre_flight"])
424 )
425
426 with patch_cmd, patch_send:
427 ret = single.run_ssh_pre_flight()
428 assert ret == exp_ret
429 assert [
430 call("/bin/sh '{}'".format(exp_tmp)),
431 call("rm '{}'".format(exp_tmp)),
432 ] == mock_cmd.call_args_list
433
434 @skipIf(salt.utils.platform.is_windows(), "SSH_PY_SHIM not set on windows")
435 def test_cmd_run_set_path(self):
436 """
437 test when set_path is set
438 """
439 target = self.target
440 target["set_path"] = "$PATH:/tmp/path/"
441 single = ssh.Single(
442 self.opts,
443 self.opts["argv"],
444 "localhost",
445 mods={},
446 fsclient=None,
447 thin=salt.utils.thin.thin_path(self.opts["cachedir"]),
448 mine=False,
449 **self.target
450 )
451
452 ret = single._cmd_str()
453 assert re.search("\\" + target["set_path"], ret)
454
455 @skipIf(salt.utils.platform.is_windows(), "SSH_PY_SHIM not set on windows")
456 def test_cmd_run_not_set_path(self):
457 """
458 test when set_path is not set
459 """
460 target = self.target
461 single = ssh.Single(
462 self.opts,
463 self.opts["argv"],
464 "localhost",
465 mods={},
466 fsclient=None,
467 thin=salt.utils.thin.thin_path(self.opts["cachedir"]),
468 mine=False,
469 **self.target
470 )
471
472 ret = single._cmd_str()
473 assert re.search('SET_PATH=""', ret)
474
475
476 @skipIf(not salt.utils.path.which("ssh"), "No ssh binary found in path")
477 class SSHTests(ShellCase):
478 def setUp(self):
479 self.roster = """
480 localhost:
481 host: 127.0.0.1
482 port: 2827
483 """
484 self.opts = salt.config.client_config(self.get_config_file_path("master"))
485 self.opts["selected_target_option"] = "glob"
486
487 def test_expand_target_ip_address(self):
488 """
489 test expand_target when target is root@<ip address>
490 """
491 host = "127.0.0.1"
492 user = "test-user@"
493 opts = self.opts
494 opts["tgt"] = user + host
495
496 with patch(
497 "salt.utils.network.is_reachable_host", MagicMock(return_value=False)
498 ):
499 client = ssh.SSH(opts)
500 assert opts["tgt"] == user + host
501 with patch(
502 "salt.roster.get_roster_file", MagicMock(return_value="/etc/salt/roster")
503 ), patch(
504 "salt.client.ssh.compile_template",
505 MagicMock(return_value=salt.utils.yaml.safe_load(self.roster)),
506 ):
507 client._expand_target()
508 assert opts["tgt"] == host
509
510 def test_expand_target_dns(self):
511 """
512 test expand_target when target is root@<dns>
513 """
514 host = "localhost"
515 user = "test-user@"
516 opts = self.opts
517 opts["tgt"] = user + host
518
519 with patch(
520 "salt.utils.network.is_reachable_host", MagicMock(return_value=False)
521 ):
522 client = ssh.SSH(opts)
523 assert opts["tgt"] == user + host
524 with patch(
525 "salt.roster.get_roster_file", MagicMock(return_value="/etc/salt/roster")
526 ), patch(
527 "salt.client.ssh.compile_template",
528 MagicMock(return_value=salt.utils.yaml.safe_load(self.roster)),
529 ):
530 client._expand_target()
531 assert opts["tgt"] == host
532
533 def test_expand_target_no_user(self):
534 """
535 test expand_target when no user defined
536 """
537 host = "127.0.0.1"
538 opts = self.opts
539 opts["tgt"] = host
540
541 with patch(
542 "salt.utils.network.is_reachable_host", MagicMock(return_value=False)
543 ):
544 client = ssh.SSH(opts)
545 assert opts["tgt"] == host
546
547 with patch(
548 "salt.roster.get_roster_file", MagicMock(return_value="/etc/salt/roster")
549 ), patch(
550 "salt.client.ssh.compile_template",
551 MagicMock(return_value=salt.utils.yaml.safe_load(self.roster)),
552 ):
553 client._expand_target()
554 assert opts["tgt"] == host
555
556 def test_update_targets_ip_address(self):
557 """
558 test update_targets when host is ip address
559 """
560 host = "127.0.0.1"
561 user = "test-user@"
562 opts = self.opts
563 opts["tgt"] = user + host
564
565 with patch(
566 "salt.utils.network.is_reachable_host", MagicMock(return_value=False)
567 ):
568 client = ssh.SSH(opts)
569 assert opts["tgt"] == user + host
570 client._update_targets()
571 assert opts["tgt"] == host
572 assert client.targets[host]["user"] == user.split("@")[0]
573
574 def test_update_targets_dns(self):
575 """
576 test update_targets when host is dns
577 """
578 host = "localhost"
579 user = "test-user@"
580 opts = self.opts
581 opts["tgt"] = user + host
582
583 with patch(
584 "salt.utils.network.is_reachable_host", MagicMock(return_value=False)
585 ):
586 client = ssh.SSH(opts)
587 assert opts["tgt"] == user + host
588 client._update_targets()
589 assert opts["tgt"] == host
590 assert client.targets[host]["user"] == user.split("@")[0]
591
592 def test_update_targets_no_user(self):
593 """
594 test update_targets when no user defined
595 """
596 host = "127.0.0.1"
597 opts = self.opts
598 opts["tgt"] = host
599
600 with patch(
601 "salt.utils.network.is_reachable_host", MagicMock(return_value=False)
602 ):
603 client = ssh.SSH(opts)
604 assert opts["tgt"] == host
605 client._update_targets()
606 assert opts["tgt"] == host
607
608 def test_update_expand_target_dns(self):
609 """
610 test update_targets and expand_target when host is dns
611 """
612 host = "localhost"
613 user = "test-user@"
614 opts = self.opts
615 opts["tgt"] = user + host
616
617 with patch(
618 "salt.utils.network.is_reachable_host", MagicMock(return_value=False)
619 ):
620 client = ssh.SSH(opts)
621 assert opts["tgt"] == user + host
622 with patch(
623 "salt.roster.get_roster_file", MagicMock(return_value="/etc/salt/roster")
624 ), patch(
625 "salt.client.ssh.compile_template",
626 MagicMock(return_value=salt.utils.yaml.safe_load(self.roster)),
627 ):
628 client._expand_target()
629 client._update_targets()
630 assert opts["tgt"] == host
631 assert client.targets[host]["user"] == user.split("@")[0]
632
633 def test_parse_tgt(self):
634 """
635 test parse_tgt when user and host set on
636 the ssh cli tgt
637 """
638 host = "localhost"
639 user = "test-user@"
640 opts = self.opts
641 opts["tgt"] = user + host
642
643 with patch(
644 "salt.utils.network.is_reachable_host", MagicMock(return_value=False)
645 ):
646 assert not self.opts.get("ssh_cli_tgt")
647 client = ssh.SSH(opts)
648 assert client.parse_tgt["hostname"] == host
649 assert client.parse_tgt["user"] == user.split("@")[0]
650 assert self.opts.get("ssh_cli_tgt") == user + host
651
652 def test_parse_tgt_no_user(self):
653 """
654 test parse_tgt when only the host set on
655 the ssh cli tgt
656 """
657 host = "localhost"
658 opts = self.opts
659 opts["ssh_user"] = "ssh-usr"
660 opts["tgt"] = host
661
662 with patch(
663 "salt.utils.network.is_reachable_host", MagicMock(return_value=False)
664 ):
665 assert not self.opts.get("ssh_cli_tgt")
666 client = ssh.SSH(opts)
667 assert client.parse_tgt["hostname"] == host
668 assert client.parse_tgt["user"] == opts["ssh_user"]
669 assert self.opts.get("ssh_cli_tgt") == host