"Fossies" - the Fresh Open Source Software Archive

Member "ospd-2.0.1/tests/test_ssh_daemon.py" (12 May 2020, 4476 Bytes) of package /linux/misc/openvas/ospd-2.0.1.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_ssh_daemon.py": 2.0.0_vs_2.0.1.

    1 # Copyright (C) 2015-2018 Greenbone Networks GmbH
    2 #
    3 # SPDX-License-Identifier: GPL-2.0-or-later
    4 #
    5 # This program is free software; you can redistribute it and/or
    6 # modify it under the terms of the GNU General Public License
    7 # as published by the Free Software Foundation; either version 2
    8 # of the License, or (at your option) any later version.
    9 #
   10 # This program is distributed in the hope that it will be useful,
   11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
   12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   13 # GNU General Public License for more details.
   14 #
   15 # You should have received a copy of the GNU General Public License
   16 # along with this program; if not, write to the Free Software
   17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
   18 
   19 """ Test module for ospd ssh support.
   20 """
   21 
   22 import unittest
   23 
   24 from ospd import ospd_ssh
   25 from ospd.ospd_ssh import OSPDaemonSimpleSSH
   26 
   27 
   28 class FakeFile(object):
   29     def __init__(self, content):
   30         self.content = content
   31 
   32     def readlines(self):
   33         return self.content.split('\n')
   34 
   35 
# Log of the command strings passed to FakeSSHClient.exec_command.
# Starts as None and is rebound to a fresh empty list every time a
# FakeSSHClient is constructed, so each new client begins with a clean log.
commands = None  # pylint: disable=invalid-name
   37 
   38 
   39 class FakeSSHClient(object):
   40     def __init__(self):
   41         global commands
   42         commands = []
   43 
   44     def set_missing_host_key_policy(self, policy):
   45         pass
   46 
   47     def connect(self, **kwargs):
   48         pass
   49 
   50     def exec_command(self, cmd):
   51         commands.append(cmd)
   52         return None, FakeFile(''), None
   53 
   54     def close(self):
   55         pass
   56 
   57 
   58 class FakeExceptions(object):
   59     AuthenticationException = None  # pylint: disable=invalid-name
   60 
   61 
   62 class fakeparamiko(object):  # pylint: disable=invalid-name
   63     @staticmethod
   64     def SSHClient(*args):  # pylint: disable=invalid-name
   65         return FakeSSHClient(*args)
   66 
   67     @staticmethod
   68     def AutoAddPolicy():  # pylint: disable=invalid-name
   69         pass
   70 
   71     ssh_exception = FakeExceptions
   72 
   73 
   74 class SSHDaemonTestCase(unittest.TestCase):
   75     def test_no_paramiko(self):
   76         ospd_ssh.paramiko = None
   77 
   78         with self.assertRaises(ImportError):
   79             OSPDaemonSimpleSSH('cert', 'key', 'ca', '10')
   80 
   81     def test_run_command(self):
   82         ospd_ssh.paramiko = fakeparamiko
   83         daemon = OSPDaemonSimpleSSH('cert', 'key', 'ca', '10')
   84         scanid = daemon.create_scan(
   85             None,
   86             [['host.example.com', '80, 443', '', '', '']],
   87             dict(port=5, ssh_timeout=15, username_password='dummy:pw'),
   88             '',
   89         )
   90 
   91         res = daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd')
   92 
   93         self.assertIsInstance(res, list)
   94         self.assertEqual(commands, ['nice -n 10 cat /etc/passwd'])
   95 
   96     def test_run_command_legacy_credential(self):
   97         ospd_ssh.paramiko = fakeparamiko
   98 
   99         daemon = OSPDaemonSimpleSSH('cert', 'key', 'ca', '10')
  100         scanid = daemon.create_scan(
  101             None,
  102             [['host.example.com', '80, 443', '', '', '']],
  103             dict(port=5, ssh_timeout=15, username='dummy', password='pw'),
  104             '',
  105         )
  106 
  107         res = daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd')
  108 
  109         self.assertIsInstance(res, list)
  110         self.assertEqual(commands, ['nice -n 10 cat /etc/passwd'])
  111 
  112     def test_run_command_new_credential(self):
  113         ospd_ssh.paramiko = fakeparamiko
  114 
  115         daemon = OSPDaemonSimpleSSH('cert', 'key', 'ca', '10')
  116 
  117         cred_dict = {
  118             'ssh': {
  119                 'type': 'up',
  120                 'password': 'mypass',
  121                 'port': '22',
  122                 'username': 'scanuser',
  123             },
  124             'smb': {'type': 'up', 'password': 'mypass', 'username': 'smbuser'},
  125         }
  126 
  127         scanid = daemon.create_scan(
  128             None,
  129             [['host.example.com', '80, 443', cred_dict, '', '']],
  130             dict(port=5, ssh_timeout=15),
  131             '',
  132         )
  133         res = daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd')
  134 
  135         self.assertIsInstance(res, list)
  136         self.assertEqual(commands, ['nice -n 10 cat /etc/passwd'])
  137 
  138     def test_run_command_no_credential(self):
  139         ospd_ssh.paramiko = fakeparamiko
  140 
  141         daemon = OSPDaemonSimpleSSH('cert', 'key', 'ca', '10')
  142         scanid = daemon.create_scan(
  143             None,
  144             [['host.example.com', '80, 443', '', '', '']],
  145             dict(port=5, ssh_timeout=15),
  146             '',
  147         )
  148 
  149         with self.assertRaises(ValueError):
  150             daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd')