"Fossies" - the Fresh Open Source Software Archive

Member "ospd-2.0.1/tests/test_scan_and_result.py" (12 May 2020, 41388 Bytes) of package /linux/misc/openvas/ospd-2.0.1.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_scan_and_result.py": 2.0.0_vs_2.0.1.

    1 # Copyright (C) 2015-2018 Greenbone Networks GmbH
    2 #
    3 # SPDX-License-Identifier: GPL-2.0-or-later
    4 #
    5 # This program is free software; you can redistribute it and/or
    6 # modify it under the terms of the GNU General Public License
    7 # as published by the Free Software Foundation; either version 2
    8 # of the License, or (at your option) any later version.
    9 #
   10 # This program is distributed in the hope that it will be useful,
   11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
   12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   13 # GNU General Public License for more details.
   14 #
   15 # You should have received a copy of the GNU General Public License
   16 # along with this program; if not, write to the Free Software
   17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
   18 
   19 # pylint: disable=too-many-lines
   20 
   21 """ Test module for scan runs
   22 """
   23 
   24 import time
   25 import unittest
   26 
   27 from unittest.mock import patch
   28 
   29 import xml.etree.ElementTree as ET
   30 import defusedxml.lxml as secET
   31 
   32 from defusedxml.common import EntitiesForbidden
   33 
   34 from ospd.ospd import OSPDaemon
   35 from ospd.errors import OspdCommandError
   36 
   37 
   38 class Result(object):
   39     def __init__(self, type_, **kwargs):
   40         self.result_type = type_
   41         self.host = ''
   42         self.hostname = ''
   43         self.name = ''
   44         self.value = ''
   45         self.port = ''
   46         self.test_id = ''
   47         self.severity = ''
   48         self.qod = ''
   49         for name, value in kwargs.items():
   50             setattr(self, name, value)
   51 
   52 
   53 class FakeStream:
   54     def __init__(self):
   55         self.response = b''
   56 
   57     def write(self, data):
   58         self.response = self.response + data
   59 
   60     def get_response(self):
   61         return secET.fromstring(self.response)
   62 
   63     def clean_response(self):
   64         self.response = b''
   65 
   66 
   67 class DummyWrapper(OSPDaemon):
   68     def __init__(self, results, checkresult=True):
   69         super().__init__()
   70         self.checkresult = checkresult
   71         self.results = results
   72         self.is_cache_available = True
   73 
   74     def check(self):
   75         return self.checkresult
   76 
   77     @staticmethod
   78     def get_custom_vt_as_xml_str(vt_id, custom):
   79         return '<custom><mytest>static test</mytest></custom>'
   80 
   81     @staticmethod
   82     def get_params_vt_as_xml_str(vt_id, vt_params):
   83         return (
   84             '<params><param id="abc" type="string">'
   85             '<name>ABC</name><description>Test ABC</description>'
   86             '<default>yes</default></param>'
   87             '<param id="def" type="string">'
   88             '<name>DEF</name><description>Test DEF</description>'
   89             '<default>no</default></param></params>'
   90         )
   91 
   92     @staticmethod
   93     def get_refs_vt_as_xml_str(vt_id, vt_refs):
   94         response = (
   95             '<refs><ref type="cve" id="CVE-2010-4480"/>'
   96             '<ref type="url" id="http://example.com"/></refs>'
   97         )
   98         return response
   99 
  100     @staticmethod
  101     def get_dependencies_vt_as_xml_str(vt_id, vt_dependencies):
  102         response = (
  103             '<dependencies>'
  104             '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50282" />'
  105             '<dependency vt_id="1.3.6.1.4.1.25623.1.0.50283" />'
  106             '</dependencies>'
  107         )
  108 
  109         return response
  110 
  111     @staticmethod
  112     def get_severities_vt_as_xml_str(vt_id, severities):
  113         response = (
  114             '<severities><severity cvss_base="5.0" cvss_'
  115             'type="cvss_base_v2">AV:N/AC:L/Au:N/C:N/I:N/'
  116             'A:P</severity></severities>'
  117         )
  118 
  119         return response
  120 
  121     @staticmethod
  122     def get_detection_vt_as_xml_str(
  123         vt_id, detection=None, qod_type=None, qod=None
  124     ):
  125         response = '<detection qod_type="package">some detection</detection>'
  126 
  127         return response
  128 
  129     @staticmethod
  130     def get_summary_vt_as_xml_str(vt_id, summary):
  131         response = '<summary>Some summary</summary>'
  132 
  133         return response
  134 
  135     @staticmethod
  136     def get_affected_vt_as_xml_str(vt_id, affected):
  137         response = '<affected>Some affected</affected>'
  138 
  139         return response
  140 
  141     @staticmethod
  142     def get_impact_vt_as_xml_str(vt_id, impact):
  143         response = '<impact>Some impact</impact>'
  144 
  145         return response
  146 
  147     @staticmethod
  148     def get_insight_vt_as_xml_str(vt_id, insight):
  149         response = '<insight>Some insight</insight>'
  150 
  151         return response
  152 
  153     @staticmethod
  154     def get_solution_vt_as_xml_str(vt_id, solution, solution_type=None):
  155         response = '<solution>Some solution</solution>'
  156 
  157         return response
  158 
  159     @staticmethod
  160     def get_creation_time_vt_as_xml_str(
  161         vt_id, creation_time
  162     ):  # pylint: disable=arguments-differ
  163         response = '<creation_time>%s</creation_time>' % creation_time
  164 
  165         return response
  166 
  167     @staticmethod
  168     def get_modification_time_vt_as_xml_str(
  169         vt_id, modification_time
  170     ):  # pylint: disable=arguments-differ
  171         response = (
  172             '<modification_time>%s</modification_time>' % modification_time
  173         )
  174 
  175         return response
  176 
  177     def exec_scan(self, scan_id, target):
  178         time.sleep(0.01)
  179         for res in self.results:
  180             if res.result_type == 'log':
  181                 self.add_scan_log(
  182                     scan_id,
  183                     res.host or target,
  184                     res.hostname,
  185                     res.name,
  186                     res.value,
  187                     res.port,
  188                 )
  189             if res.result_type == 'error':
  190                 self.add_scan_error(
  191                     scan_id,
  192                     res.host or target,
  193                     res.hostname,
  194                     res.name,
  195                     res.value,
  196                     res.port,
  197                 )
  198             elif res.result_type == 'host-detail':
  199                 self.add_scan_host_detail(
  200                     scan_id,
  201                     res.host or target,
  202                     res.hostname,
  203                     res.name,
  204                     res.value,
  205                 )
  206             elif res.result_type == 'alarm':
  207                 self.add_scan_alarm(
  208                     scan_id,
  209                     res.host or target,
  210                     res.hostname,
  211                     res.name,
  212                     res.value,
  213                     res.port,
  214                     res.test_id,
  215                     res.severity,
  216                     res.qod,
  217                 )
  218             else:
  219                 raise ValueError(res.result_type)
  220 
  221 
  222 class ScanTestCase(unittest.TestCase):
  223     def __init__(self, *args, **kwargs):
  224         super().__init__(*args, **kwargs)
  225         self.fs = FakeStream()
  226 
  227     def test_get_default_scanner_params(self):
  228         daemon = DummyWrapper([])
  229         self.fs.clean_response()
  230         daemon.handle_command('<get_scanner_details />', self.fs)
  231         response = self.fs.get_response()
  232 
  233         # The status of the response must be success (i.e. 200)
  234         self.assertEqual(response.get('status'), '200')
  235         # The response root element must have the correct name
  236         self.assertEqual(response.tag, 'get_scanner_details_response')
  237         # The response must contain a 'scanner_params' element
  238         self.assertIsNotNone(response.find('scanner_params'))
  239 
  240     def test_get_default_help(self):
  241         daemon = DummyWrapper([])
  242         self.fs.clean_response()
  243         daemon.handle_command('<help />', self.fs)
  244         response = self.fs.get_response()
  245 
  246         self.assertEqual(response.get('status'), '200')
  247 
  248         self.fs.clean_response()
  249         daemon.handle_command('<help format="xml" />', self.fs)
  250         response = self.fs.get_response()
  251 
  252         self.assertEqual(response.get('status'), '200')
  253         self.assertEqual(response.tag, 'help_response')
  254 
  255     @patch('ospd.ospd.subprocess')
  256     def test_get_performance(self, mock_subproc):
  257         daemon = DummyWrapper([])
  258         mock_subproc.check_output.return_value = b'foo'
  259 
  260         self.fs.clean_response()
  261         daemon.handle_command(
  262             '<get_performance start="0" end="0" titles="mem"/>', self.fs
  263         )
  264         response = self.fs.get_response()
  265 
  266         self.assertEqual(response.get('status'), '200')
  267         self.assertEqual(response.tag, 'get_performance_response')
  268 
  269     def test_get_performance_fail_int(self):
  270         daemon = DummyWrapper([])
  271         cmd = secET.fromstring(
  272             '<get_performance start="a" end="0" titles="mem"/>'
  273         )
  274 
  275         self.assertRaises(OspdCommandError, daemon.handle_get_performance, cmd)
  276 
  277     def test_get_performance_fail_regex(self):
  278         daemon = DummyWrapper([])
  279         cmd = secET.fromstring(
  280             '<get_performance start="0" end="0" titles="mem|bar"/>'
  281         )
  282 
  283         self.assertRaises(OspdCommandError, daemon.handle_get_performance, cmd)
  284 
  285     def test_get_performance_fail_cmd(self):
  286         daemon = DummyWrapper([])
  287         cmd = secET.fromstring(
  288             '<get_performance start="0" end="0" titles="mem1"/>'
  289         )
  290         self.assertRaises(OspdCommandError, daemon.handle_get_performance, cmd)
  291 
  292     def test_get_default_scanner_version(self):
  293         daemon = DummyWrapper([])
  294         self.fs.clean_response()
  295         daemon.handle_command('<get_version />', self.fs)
  296         response = self.fs.get_response()
  297 
  298         self.assertEqual(response.get('status'), '200')
  299         self.assertIsNotNone(response.find('protocol'))
  300 
  301     def test_get_vts_no_vt(self):
  302         daemon = DummyWrapper([])
  303         self.fs.clean_response()
  304         daemon.handle_command('<get_vts />', self.fs)
  305         response = self.fs.get_response()
  306 
  307         self.assertEqual(response.get('status'), '200')
  308         self.assertIsNotNone(response.find('vts'))
  309 
  310         vts = response.find('vts')
  311         self.assertIsNone(vts.find('vt'))
  312 
  313     def test_get_vts_single_vt(self):
  314         daemon = DummyWrapper([])
  315         daemon.add_vt('1.2.3.4', 'A vulnerability test')
  316         self.fs.clean_response()
  317         daemon.handle_command('<get_vts />', self.fs)
  318         response = self.fs.get_response()
  319 
  320         self.assertEqual(response.get('status'), '200')
  321 
  322         vts = response.find('vts')
  323         self.assertIsNotNone(vts.find('vt'))
  324 
  325         vt = vts.find('vt')
  326         self.assertEqual(vt.get('id'), '1.2.3.4')
  327 
  328     def test_get_vts_non_existent_vt(self):
  329         daemon = DummyWrapper([])
  330         daemon.add_vt('1.2.3.4', 'A vulnerability test')
  331         self.fs.clean_response()
  332         daemon.handle_command('<get_vts vt_id="123"/>', self.fs)
  333         response = self.fs.get_response()
  334 
  335         self.assertEqual(response.get('status'), '404')
  336         self.assertTrue(daemon.is_cache_available)
  337 
  338     def test_get_vts_bad_filter(self):
  339         daemon = DummyWrapper([])
  340         self.fs.clean_response()
  341         cmd = '<get_vts filter="modification_time"/>'
  342 
  343         self.assertRaises(OspdCommandError, daemon.handle_command, cmd, self.fs)
  344         self.assertTrue(daemon.is_cache_available)
  345 
  346     def test_get_vts_filter_positive(self):
  347         daemon = DummyWrapper([])
  348         daemon.add_vt(
  349             '1.2.3.4',
  350             'A vulnerability test',
  351             vt_params="a",
  352             vt_modification_time='19000202',
  353         )
  354 
  355         self.fs.clean_response()
  356         daemon.handle_command(
  357             '<get_vts filter="modification_time&gt;19000201"></get_vts>',
  358             self.fs,
  359         )
  360         response = self.fs.get_response()
  361 
  362         self.assertEqual(response.get('status'), '200')
  363         vts = response.find('vts')
  364 
  365         vt = vts.find('vt')
  366         self.assertIsNotNone(vt)
  367         self.assertEqual(vt.get('id'), '1.2.3.4')
  368 
  369         modification_time = response.findall('vts/vt/modification_time')
  370         self.assertEqual(
  371             '<modification_time>19000202</modification_time>',
  372             ET.tostring(modification_time[0]).decode('utf-8'),
  373         )
  374 
  375     def test_get_vts_filter_negative(self):
  376         daemon = DummyWrapper([])
  377         daemon.add_vt(
  378             '1.2.3.4',
  379             'A vulnerability test',
  380             vt_params="a",
  381             vt_modification_time='19000202',
  382         )
  383 
  384         self.fs.clean_response()
  385         daemon.handle_command(
  386             '<get_vts filter="modification_time&lt;19000203"></get_vts>',
  387             self.fs,
  388         )
  389         response = self.fs.get_response()
  390 
  391         self.assertEqual(response.get('status'), '200')
  392         vts = response.find('vts')
  393 
  394         vt = vts.find('vt')
  395         self.assertIsNotNone(vt)
  396         self.assertEqual(vt.get('id'), '1.2.3.4')
  397 
  398         modification_time = response.findall('vts/vt/modification_time')
  399         self.assertEqual(
  400             '<modification_time>19000202</modification_time>',
  401             ET.tostring(modification_time[0]).decode('utf-8'),
  402         )
  403 
  404     def test_get_vtss_multiple_vts(self):
  405         daemon = DummyWrapper([])
  406         daemon.add_vt('1.2.3.4', 'A vulnerability test')
  407         daemon.add_vt('1.2.3.5', 'Another vulnerability test')
  408         daemon.add_vt('123456789', 'Yet another vulnerability test')
  409 
  410         self.fs.clean_response()
  411         daemon.handle_command('<get_vts />', self.fs)
  412         response = self.fs.get_response()
  413         self.assertEqual(response.get('status'), '200')
  414 
  415         vts = response.find('vts')
  416         self.assertIsNotNone(vts.find('vt'))
  417 
  418     def test_get_vts_multiple_vts_with_custom(self):
  419         daemon = DummyWrapper([])
  420         daemon.add_vt('1.2.3.4', 'A vulnerability test', custom='b')
  421         daemon.add_vt(
  422             '4.3.2.1', 'Another vulnerability test with custom info', custom='b'
  423         )
  424         daemon.add_vt('123456789', 'Yet another vulnerability test', custom='b')
  425 
  426         self.fs.clean_response()
  427         daemon.handle_command('<get_vts />', self.fs)
  428         response = self.fs.get_response()
  429         custom = response.findall('vts/vt/custom')
  430 
  431         self.assertEqual(3, len(custom))
  432 
  433     def test_get_vts_vts_with_params(self):
  434         daemon = DummyWrapper([])
  435         daemon.add_vt(
  436             '1.2.3.4', 'A vulnerability test', vt_params="a", custom="b"
  437         )
  438 
  439         self.fs.clean_response()
  440         daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', self.fs)
  441         response = self.fs.get_response()
  442 
  443         # The status of the response must be success (i.e. 200)
  444         self.assertEqual(response.get('status'), '200')
  445 
  446         # The response root element must have the correct name
  447         self.assertEqual(response.tag, 'get_vts_response')
  448         # The response must contain a 'scanner_params' element
  449         self.assertIsNotNone(response.find('vts'))
  450 
  451         vt_params = response[0][0].findall('params')
  452         self.assertEqual(1, len(vt_params))
  453 
  454         custom = response[0][0].findall('custom')
  455         self.assertEqual(1, len(custom))
  456 
  457         params = response.findall('vts/vt/params/param')
  458         self.assertEqual(2, len(params))
  459 
  460     def test_get_vts_vts_with_refs(self):
  461         daemon = DummyWrapper([])
  462         daemon.add_vt(
  463             '1.2.3.4',
  464             'A vulnerability test',
  465             vt_params="a",
  466             custom="b",
  467             vt_refs="c",
  468         )
  469 
  470         self.fs.clean_response()
  471         daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', self.fs)
  472         response = self.fs.get_response()
  473 
  474         # The status of the response must be success (i.e. 200)
  475         self.assertEqual(response.get('status'), '200')
  476 
  477         # The response root element must have the correct name
  478         self.assertEqual(response.tag, 'get_vts_response')
  479 
  480         # The response must contain a 'vts' element
  481         self.assertIsNotNone(response.find('vts'))
  482 
  483         vt_params = response[0][0].findall('params')
  484         self.assertEqual(1, len(vt_params))
  485 
  486         custom = response[0][0].findall('custom')
  487         self.assertEqual(1, len(custom))
  488 
  489         refs = response.findall('vts/vt/refs/ref')
  490         self.assertEqual(2, len(refs))
  491 
  492     def test_get_vts_vts_with_dependencies(self):
  493         daemon = DummyWrapper([])
  494         daemon.add_vt(
  495             '1.2.3.4',
  496             'A vulnerability test',
  497             vt_params="a",
  498             custom="b",
  499             vt_dependencies="c",
  500         )
  501 
  502         self.fs.clean_response()
  503         daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', self.fs)
  504         response = self.fs.get_response()
  505 
  506         deps = response.findall('vts/vt/dependencies/dependency')
  507         self.assertEqual(2, len(deps))
  508 
  509     def test_get_vts_vts_with_severities(self):
  510         daemon = DummyWrapper([])
  511         daemon.add_vt(
  512             '1.2.3.4',
  513             'A vulnerability test',
  514             vt_params="a",
  515             custom="b",
  516             severities="c",
  517         )
  518 
  519         self.fs.clean_response()
  520         daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', self.fs)
  521         response = self.fs.get_response()
  522 
  523         severity = response.findall('vts/vt/severities/severity')
  524         self.assertEqual(1, len(severity))
  525 
  526     def test_get_vts_vts_with_detection_qodt(self):
  527         daemon = DummyWrapper([])
  528         daemon.add_vt(
  529             '1.2.3.4',
  530             'A vulnerability test',
  531             vt_params="a",
  532             custom="b",
  533             detection="c",
  534             qod_t="d",
  535         )
  536 
  537         self.fs.clean_response()
  538         daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', self.fs)
  539         response = self.fs.get_response()
  540 
  541         detection = response.findall('vts/vt/detection')
  542         self.assertEqual(1, len(detection))
  543 
  544     def test_get_vts_vts_with_detection_qodv(self):
  545         daemon = DummyWrapper([])
  546         daemon.add_vt(
  547             '1.2.3.4',
  548             'A vulnerability test',
  549             vt_params="a",
  550             custom="b",
  551             detection="c",
  552             qod_v="d",
  553         )
  554 
  555         self.fs.clean_response()
  556         daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', self.fs)
  557         response = self.fs.get_response()
  558 
  559         detection = response.findall('vts/vt/detection')
  560         self.assertEqual(1, len(detection))
  561 
  562     def test_get_vts_vts_with_summary(self):
  563         daemon = DummyWrapper([])
  564         daemon.add_vt(
  565             '1.2.3.4',
  566             'A vulnerability test',
  567             vt_params="a",
  568             custom="b",
  569             summary="c",
  570         )
  571 
  572         self.fs.clean_response()
  573 
  574         daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', self.fs)
  575         response = self.fs.get_response()
  576 
  577         summary = response.findall('vts/vt/summary')
  578         self.assertEqual(1, len(summary))
  579 
  580     def test_get_vts_vts_with_impact(self):
  581         daemon = DummyWrapper([])
  582         daemon.add_vt(
  583             '1.2.3.4',
  584             'A vulnerability test',
  585             vt_params="a",
  586             custom="b",
  587             impact="c",
  588         )
  589 
  590         self.fs.clean_response()
  591         daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', self.fs)
  592         response = self.fs.get_response()
  593 
  594         impact = response.findall('vts/vt/impact')
  595         self.assertEqual(1, len(impact))
  596 
  597     def test_get_vts_vts_with_affected(self):
  598         daemon = DummyWrapper([])
  599         daemon.add_vt(
  600             '1.2.3.4',
  601             'A vulnerability test',
  602             vt_params="a",
  603             custom="b",
  604             affected="c",
  605         )
  606 
  607         self.fs.clean_response()
  608         daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', self.fs)
  609         response = self.fs.get_response()
  610 
  611         affect = response.findall('vts/vt/affected')
  612         self.assertEqual(1, len(affect))
  613 
  614     def test_get_vts_vts_with_insight(self):
  615         daemon = DummyWrapper([])
  616         daemon.add_vt(
  617             '1.2.3.4',
  618             'A vulnerability test',
  619             vt_params="a",
  620             custom="b",
  621             insight="c",
  622         )
  623 
  624         self.fs.clean_response()
  625         daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', self.fs)
  626         response = self.fs.get_response()
  627 
  628         insight = response.findall('vts/vt/insight')
  629         self.assertEqual(1, len(insight))
  630 
  631     def test_get_vts_vts_with_solution(self):
  632         daemon = DummyWrapper([])
  633         daemon.add_vt(
  634             '1.2.3.4',
  635             'A vulnerability test',
  636             vt_params="a",
  637             custom="b",
  638             solution="c",
  639             solution_t="d",
  640         )
  641 
  642         self.fs.clean_response()
  643         daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', self.fs)
  644         response = self.fs.get_response()
  645 
  646         solution = response.findall('vts/vt/solution')
  647         self.assertEqual(1, len(solution))
  648 
  649     def test_get_vts_vts_with_ctime(self):
  650         daemon = DummyWrapper([])
  651         daemon.add_vt(
  652             '1.2.3.4',
  653             'A vulnerability test',
  654             vt_params="a",
  655             vt_creation_time='01-01-1900',
  656         )
  657 
  658         self.fs.clean_response()
  659         daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', self.fs)
  660         response = self.fs.get_response()
  661 
  662         creation_time = response.findall('vts/vt/creation_time')
  663         self.assertEqual(
  664             '<creation_time>01-01-1900</creation_time>',
  665             ET.tostring(creation_time[0]).decode('utf-8'),
  666         )
  667 
  668     def test_get_vts_vts_with_mtime(self):
  669         daemon = DummyWrapper([])
  670         daemon.add_vt(
  671             '1.2.3.4',
  672             'A vulnerability test',
  673             vt_params="a",
  674             vt_modification_time='02-01-1900',
  675         )
  676 
  677         self.fs.clean_response()
  678         daemon.handle_command('<get_vts vt_id="1.2.3.4"></get_vts>', self.fs)
  679         response = self.fs.get_response()
  680 
  681         modification_time = response.findall('vts/vt/modification_time')
  682         self.assertEqual(
  683             '<modification_time>02-01-1900</modification_time>',
  684             ET.tostring(modification_time[0]).decode('utf-8'),
  685         )
  686 
  687     def test_clean_forgotten_scans(self):
  688         daemon = DummyWrapper([])
  689 
  690         self.fs.clean_response()
  691         daemon.handle_command(
  692             '<start_scan target="localhost" ports="80, '
  693             '443"><scanner_params /></start_scan>',
  694             self.fs,
  695         )
  696         response = self.fs.get_response()
  697         scan_id = response.findtext('id')
  698 
  699         finished = False
  700         while not finished:
  701             self.fs.clean_response()
  702             daemon.handle_command(
  703                 '<get_scans scan_id="%s" details="1"/>' % scan_id, self.fs
  704             )
  705             response = self.fs.get_response()
  706             scans = response.findall('scan')
  707             self.assertEqual(1, len(scans))
  708 
  709             scan = scans[0]
  710             status = scan.get('status')
  711 
  712             if scan.get('end_time') != '0':
  713                 finished = True
  714             else:
  715                 time.sleep(0.01)
  716 
  717         self.fs.clean_response()
  718         daemon.handle_command(
  719             '<get_scans scan_id="%s" details="1"/>' % scan_id, self.fs
  720         )
  721         response = self.fs.get_response()
  722 
  723         self.assertEqual(len(list(daemon.scan_collection.ids_iterator())), 1)
  724 
  725         # Set an old end_time
  726         daemon.scan_collection.scans_table[scan_id]['end_time'] = 123456
  727         # Run the check
  728         daemon.clean_forgotten_scans()
  729         # Not removed
  730         self.assertEqual(len(list(daemon.scan_collection.ids_iterator())), 1)
  731 
  732         # Set the max time and run again
  733         daemon.scaninfo_store_time = 1
  734         daemon.clean_forgotten_scans()
  735         # Now is removed
  736         self.assertEqual(len(list(daemon.scan_collection.ids_iterator())), 0)
  737 
  738     def test_scan_with_error(self):
  739         daemon = DummyWrapper([Result('error', value='something went wrong')])
  740 
  741         self.fs.clean_response()
  742         daemon.handle_command(
  743             '<start_scan target="localhost" ports="80, '
  744             '443"><scanner_params /></start_scan>',
  745             self.fs,
  746         )
  747         response = self.fs.get_response()
  748 
  749         scan_id = response.findtext('id')
  750 
  751         finished = False
  752         while not finished:
  753             self.fs.clean_response()
  754             daemon.handle_command(
  755                 '<get_scans scan_id="%s" details="1"/>' % scan_id, self.fs
  756             )
  757             response = self.fs.get_response()
  758             scans = response.findall('scan')
  759             self.assertEqual(1, len(scans))
  760 
  761             scan = scans[0]
  762             status = scan.get('status')
  763 
  764             if status == "init" or status == "running":
  765                 self.assertEqual('0', scan.get('end_time'))
  766                 time.sleep(0.010)
  767             else:
  768                 finished = True
  769 
  770         self.fs.clean_response()
  771         daemon.handle_command(
  772             '<get_scans scan_id="%s" details="1"/>' % scan_id, self.fs
  773         )
  774         response = self.fs.get_response()
  775 
  776         self.assertEqual(
  777             response.findtext('scan/results/result'), 'something went wrong'
  778         )
  779 
  780         self.fs.clean_response()
  781         daemon.handle_command('<delete_scan scan_id="%s" />' % scan_id, self.fs)
  782         response = self.fs.get_response()
  783 
  784         self.assertEqual(response.get('status'), '200')
  785 
  786     def test_get_scan_pop(self):
  787         daemon = DummyWrapper([Result('host-detail', value='Some Host Detail')])
  788 
  789         self.fs.clean_response()
  790         daemon.handle_command(
  791             '<start_scan target="localhost" ports="80, 443">'
  792             '<scanner_params /></start_scan>',
  793             self.fs,
  794         )
  795         response = self.fs.get_response()
  796 
  797         scan_id = response.findtext('id')
  798         time.sleep(1)
  799 
  800         self.fs.clean_response()
  801         daemon.handle_command('<get_scans scan_id="%s"/>' % scan_id, self.fs)
  802         response = self.fs.get_response()
  803         self.assertEqual(
  804             response.findtext('scan/results/result'), 'Some Host Detail'
  805         )
  806 
  807         self.fs.clean_response()
  808         daemon.handle_command(
  809             '<get_scans scan_id="%s" pop_results="1"/>' % scan_id, self.fs
  810         )
  811         response = self.fs.get_response()
  812 
  813         self.assertEqual(
  814             response.findtext('scan/results/result'), 'Some Host Detail'
  815         )
  816 
  817         self.fs.clean_response()
  818         daemon.handle_command(
  819             '<get_scans details="0" pop_results="1"/>', self.fs
  820         )
  821         response = self.fs.get_response()
  822         self.assertEqual(response.findtext('scan/results/result'), None)
  823 
  def test_stop_scan(self):
      # <stop_scan> is expected to raise OspdCommandError both when it names
      # the scan started here (after a three second wait) and when it has
      # no scan_id attribute at all.
      daemon = DummyWrapper([])

      self.fs.clean_response()
      daemon.handle_command(
          '<start_scan target="localhost" ports="80, 443">'
          '<scanner_params /></start_scan>',
          self.fs,
      )
      scan_id = self.fs.get_response().findtext('id')

      # Depending on the system, this test can run into a race condition
      # because the scanner is already stopped when the <stop_scan>
      # command is run.
      time.sleep(3)

      with_id = secET.fromstring('<stop_scan scan_id="%s" />' % scan_id)
      with self.assertRaises(OspdCommandError):
          daemon.handle_stop_scan_command(with_id)

      without_id = secET.fromstring('<stop_scan />')
      with self.assertRaises(OspdCommandError):
          daemon.handle_stop_scan_command(without_id)
  850 
  def test_scan_with_vts(self):
      # Three <start_scan> variants are exercised: an empty <vt_selection />
      # (must raise OspdCommandError), a selection holding one <vt_single>
      # without parameters, and a request with no VT selection at all.
      daemon = DummyWrapper([])

      def start(command):
          # Run the command via handle_command and return the id found in
          # the response.
          self.fs.clean_response()
          daemon.handle_command(command, self.fs)
          return self.fs.get_response().findtext('id')

      empty_selection = secET.fromstring(
          '<start_scan target="localhost" ports="80, 443">'
          '<scanner_params /><vt_selection /></start_scan>'
      )
      self.assertRaises(
          OspdCommandError, daemon.handle_start_scan_command, empty_selection
      )

      # With one VT, without params
      scan_id = start(
          '<start_scan target="localhost" ports="80, 443">'
          '<scanner_params /><vt_selection><vt_single id="1.2.3.4" />'
          '</vt_selection></start_scan>'
      )
      time.sleep(0.01)

      expected_vts = {'1.2.3.4': {}, 'vt_groups': []}
      self.assertEqual(daemon.get_scan_vts(scan_id), expected_vts)
      self.assertNotEqual(daemon.get_scan_vts(scan_id), {'1.2.3.6': {}})

      # Without VTs
      scan_id = start(
          '<start_scan target="localhost" ports="80, 443">'
          '<scanner_params /></start_scan>'
      )
      time.sleep(0.01)
      self.assertEqual(daemon.get_scan_vts(scan_id), {})
  896 
  def test_scan_with_vts_and_param(self):
      # Alternates rejected and accepted <vt_selection> contents: a
      # <vt_value> without id, a <vt_value> with id, a <vt_group> without
      # filter and a <vt_group> with filter. Accepted requests are checked
      # through get_scan_vts().
      daemon = DummyWrapper([])

      def start(command):
          # Run the command via handle_command and return the id found in
          # the response.
          self.fs.clean_response()
          daemon.handle_command(command, self.fs)
          return self.fs.get_response().findtext('id')

      # Raise because the vt_value has no id attribute
      value_without_id = secET.fromstring(
          '<start_scan target="localhost" ports="80, 443">'
          '<scanner_params /><vt_selection>'
          '<vt_single id="1234"><vt_value>200</vt_value></vt_single>'
          '</vt_selection></start_scan>'
      )
      self.assertRaises(
          OspdCommandError, daemon.handle_start_scan_command, value_without_id
      )

      # No error
      scan_id = start(
          '<start_scan target="localhost" ports="80, 443">'
          '<scanner_params /><vt_selection>'
          '<vt_single id="1234"><vt_value id="ABC">200</vt_value></vt_single>'
          '</vt_selection></start_scan>'
      )
      time.sleep(0.01)
      expected_single = {'1234': {'ABC': '200'}, 'vt_groups': []}
      self.assertEqual(daemon.get_scan_vts(scan_id), expected_single)

      # Raise because the vt_group has no filter attribute
      group_without_filter = secET.fromstring(
          '<start_scan target="localhost" ports="80, 443">'
          '<scanner_params /><vt_selection><vt_group/>'
          '</vt_selection></start_scan>'
      )
      with self.assertRaises(OspdCommandError):
          daemon.handle_start_scan_command(group_without_filter)

      # No error
      scan_id = start(
          '<start_scan target="localhost" ports="80, 443">'
          '<scanner_params /><vt_selection><vt_group filter="a"/>'
          '</vt_selection></start_scan>'
      )
      time.sleep(0.01)
      self.assertEqual(daemon.get_scan_vts(scan_id), {'vt_groups': ['a']})
  958 
  def test_billon_laughs(self):
      # pylint: disable=line-too-long
      # The payload is a DOCTYPE declaring entities lol .. lol9, where each
      # numbered entity is made of ten references to the previous one.
      # handle_command is expected to raise EntitiesForbidden for it.
      daemon = DummyWrapper([])
      payload = (
          '<?xml version="1.0"?>'
          '<!DOCTYPE lolz ['
          ' <!ENTITY lol "lol">'
          ' <!ELEMENT lolz (#PCDATA)>'
          ' <!ENTITY lol1 "&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;">'
          ' <!ENTITY lol2 "&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;&lol1;">'
          ' <!ENTITY lol3 "&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;">'
          ' <!ENTITY lol4 "&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;&lol3;">'
          ' <!ENTITY lol5 "&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;&lol4;">'
          ' <!ENTITY lol6 "&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;&lol5;">'
          ' <!ENTITY lol7 "&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;&lol6;">'
          ' <!ENTITY lol8 "&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;&lol7;">'
          ' <!ENTITY lol9 "&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;&lol8;">'
          ']>'
      )
      with self.assertRaises(EntitiesForbidden):
          daemon.handle_command(payload, self.fs)
  981 
  def test_scan_multi_target(self):
      # A <start_scan> listing two <target> elements must be answered with
      # status 200.
      daemon = DummyWrapper([])
      two_targets = (
          '<start_scan><scanner_params /><vts><vt id="1.2.3.4" /></vts>'
          '<targets>'
          '<target><hosts>localhosts</hosts><ports>80,443</ports></target>'
          '<target><hosts>192.168.0.0/24</hosts><ports>22</ports></target>'
          '</targets></start_scan>'
      )

      self.fs.clean_response()
      daemon.handle_command(two_targets, self.fs)
      reply = self.fs.get_response()

      self.assertEqual(reply.get('status'), '200')
 1000 
 def test_multi_target_with_credentials(self):
     # The second target carries an ssh and an smb credential. After the
     # scan is accepted (status 200), get_scan_credentials() for that
     # target must return both, keyed by service name.
     daemon = DummyWrapper([])
     start_command = (
         '<start_scan><scanner_params /><vts><vt id="1.2.3.4" /></vts>'
         '<targets>'
         '<target><hosts>localhosts</hosts><ports>80,443</ports></target>'
         '<target><hosts>192.168.0.0/24</hosts><ports>22</ports>'
         '<credentials>'
         '<credential type="up" service="ssh" port="22">'
         '<username>scanuser</username><password>mypass</password>'
         '</credential>'
         '<credential type="up" service="smb">'
         '<username>smbuser</username><password>mypass</password>'
         '</credential>'
         '</credentials></target></targets></start_scan>'
     )

     self.fs.clean_response()
     daemon.handle_command(start_command, self.fs)
     reply = self.fs.get_response()
     self.assertEqual(reply.get('status'), '200')

     ssh_credential = {
         'type': 'up',
         'password': 'mypass',
         'port': '22',
         'username': 'scanuser',
     }
     smb_credential = {
         'type': 'up',
         'password': 'mypass',
         'username': 'smbuser',
     }
     expected = {'ssh': ssh_credential, 'smb': smb_credential}

     scan_id = reply.findtext('id')
     stored = daemon.get_scan_credentials(scan_id, "192.168.0.0/24")
     self.assertEqual(stored, expected)
 1039 
 def test_scan_get_target(self):
     # After starting a scan with two targets, the <scan> element returned
     # by <get_scans> must have a 'target' attribute joining both host
     # strings with a comma.
     daemon = DummyWrapper([])

     def send(command):
         # Reset the stream, run the command, return what self.fs holds.
         self.fs.clean_response()
         daemon.handle_command(command, self.fs)
         return self.fs.get_response()

     started = send(
         '<start_scan><scanner_params /><vts><vt id="1.2.3.4" /></vts>'
         '<targets>'
         '<target><hosts>localhosts</hosts><ports>80,443</ports></target>'
         '<target><hosts>192.168.0.0/24</hosts><ports>22</ports></target>'
         '</targets></start_scan>'
     )
     scan_id = started.findtext('id')

     listing = send('<get_scans scan_id="%s"/>' % scan_id)
     scan_element = listing.find('scan')

     self.assertEqual(
         scan_element.get('target'), 'localhosts,192.168.0.0/24'
     )
 1065 
 def test_scan_get_finished_hosts(self):
     # The first target declares <finished_hosts>192.168.10.23-24; one
     # second after the scan is started, get_scan_finished_hosts() must
     # list those two addresses.
     daemon = DummyWrapper([])
     start_command = (
         '<start_scan><scanner_params /><vts><vt id="1.2.3.4" /></vts>'
         '<targets>'
         '<target><hosts>192.168.10.20-25</hosts><ports>80,443</ports>'
         '<finished_hosts>192.168.10.23-24</finished_hosts></target>'
         '<target><hosts>192.168.0.0/24</hosts><ports>22</ports></target>'
         '</targets></start_scan>'
     )

     self.fs.clean_response()
     daemon.handle_command(start_command, self.fs)
     scan_id = self.fs.get_response().findtext('id')

     time.sleep(1)

     expected = ['192.168.10.23', '192.168.10.24']
     self.assertEqual(daemon.get_scan_finished_hosts(scan_id), expected)
 1091 
 def test_scan_multi_target_parallel_with_error(self):
     # parallel="100a" is not a number; handle_start_scan_command is
     # expected to raise OspdCommandError for it.
     daemon = DummyWrapper([])
     bad_parallel = secET.fromstring(
         '<start_scan parallel="100a"><scanner_params />'
         '<targets><target><hosts>localhosts</hosts><ports>22</ports>'
         '</target></targets></start_scan>'
     )

     time.sleep(1)

     with self.assertRaises(OspdCommandError):
         daemon.handle_start_scan_command(bad_parallel)
 1107 
 def test_scan_multi_target_parallel_100(self):
     # A <start_scan> with parallel="100" must be answered with status 200.
     daemon = DummyWrapper([])
     start_command = (
         '<start_scan parallel="100"><scanner_params />'
         '<targets><target><hosts>localhosts</hosts><ports>22</ports>'
         '</target></targets></start_scan>'
     )

     self.fs.clean_response()
     daemon.handle_command(start_command, self.fs)
     reply = self.fs.get_response()

     time.sleep(1)
     self.assertEqual(reply.get('status'), '200')
 1124 
 def test_progress(self):
     # Two hosts are set to 75 and 25 percent; calculate_progress() for
     # the scan must then report 50.
     daemon = DummyWrapper([])
     start_command = (
         '<start_scan parallel="2"><scanner_params /><targets>'
         '<target><hosts>localhost1</hosts><ports>22</ports></target>'
         '<target><hosts>localhost2</hosts><ports>22</ports></target>'
         '</targets></start_scan>'
     )

     self.fs.clean_response()
     daemon.handle_command(start_command, self.fs)
     scan_id = self.fs.get_response().findtext('id')

     # Each name is passed for both positional host arguments, as before.
     for host, percent in (('localhost1', 75), ('localhost2', 25)):
         daemon.set_scan_host_progress(scan_id, host, host, percent)

     self.assertEqual(daemon.calculate_progress(scan_id), 50)
 1150 
 def test_set_get_vts_version(self):
     # A version stored with set_vts_version() must be returned unchanged
     # by get_vts_version().
     daemon = DummyWrapper([])

     daemon.set_vts_version('1234')

     self.assertEqual('1234', daemon.get_vts_version())
 1157 
 def test_set_get_vts_version_error(self):
     # Calling set_vts_version() with no argument must raise TypeError.
     daemon = DummyWrapper([])
     with self.assertRaises(TypeError):
         daemon.set_vts_version()
 1161 
 def test_resume_task(self):
     # Start a scan configured with two host details, then send
     # <start_scan> again with the same scan_id. The test checks the
     # unfinished/finished host lists around that second start and that
     # the two earlier results are no longer reported afterwards.
     details_query = '<get_scans scan_id="%s" details="1"/>'
     result_path = 'scan/results/result'
     daemon = DummyWrapper(
         [
             Result('host-detail', host='localhost', value=text)
             for text in ('Some Host Detail', 'Some Host Detail2')
         ]
     )

     def send(command):
         # Reset the stream, run the command, return what self.fs holds.
         self.fs.clean_response()
         daemon.handle_command(command, self.fs)
         return self.fs.get_response()

     started = send(
         '<start_scan parallel="2"><scanner_params /><targets>'
         '<target><hosts>localhost</hosts><ports>22</ports></target>'
         '</targets></start_scan>'
     )
     scan_id = started.findtext('id')

     time.sleep(3)
     stop_command = secET.fromstring(
         '<stop_scan scan_id="%s" />' % scan_id
     )
     # Stopping is expected to fail here, as in test_stop_scan.
     self.assertRaises(
         OspdCommandError, daemon.handle_stop_scan_command, stop_command
     )

     before_resume = send(details_query % scan_id).findall(result_path)
     self.assertEqual(len(before_resume), 2)

     # Resume the task
     resumed = send(
         '<start_scan scan_id="%s" target="localhost" ports="80, 443">'
         '<scanner_params /></start_scan>' % scan_id
     )

     # Check unfinished host
     self.assertEqual(resumed.findtext('id'), scan_id)
     unfinished = daemon.get_scan_unfinished_hosts(scan_id)
     self.assertEqual(unfinished, ['localhost'])

     # Finish the host and check unfinished again.
     daemon.set_scan_host_finished(scan_id, "localhost", "localhost")
     self.assertEqual(daemon.get_scan_unfinished_hosts(scan_id), [])

     # Check finished hosts
     finished = daemon.scan_collection.get_hosts_finished(scan_id)
     self.assertEqual(finished, ['localhost'])

     # Check if the result was removed.
     after_resume = send(details_query % scan_id).findall(result_path)
     self.assertEqual(len(after_resume), 0)
 1236 
 def test_result_order(self):
     # Log entries are added for hosts 'a', 'c' and 'b' (in that order);
     # the entries returned by <get_scans details="1"/> must carry their
     # 'name' attributes in the same order.
     # NOTE(review): the comparison loop does nothing when the reply has
     # no results, so an empty reply would not fail this test.
     hosts = ['a', 'c', 'b']
     daemon = DummyWrapper([])

     def send(command):
         # Reset the stream, run the command, return what self.fs holds.
         self.fs.clean_response()
         daemon.handle_command(command, self.fs)
         return self.fs.get_response()

     started = send(
         '<start_scan parallel="1"><scanner_params /><targets>'
         '<target><hosts>a</hosts><ports>22</ports></target>'
         '</targets></start_scan>'
     )
     scan_id = started.findtext('id')

     for host in hosts:
         daemon.add_scan_log(scan_id, host=host, name=host)

     listing = send('<get_scans details="1"/>')
     entries = listing.findall("scan/results/")

     for position, entry in enumerate(entries):
         self.assertEqual(hosts[position], entry.attrib['name'])
 1266 
 def test_is_cache_available_false(self):
     # With is_cache_available set to False on the daemon, <get_vts/> must
     # be answered with status 409.
     daemon = DummyWrapper([])
     daemon.is_cache_available = False

     self.fs.clean_response()
     daemon.handle_command("<get_vts/>", self.fs)
     reply = self.fs.get_response()

     self.assertEqual(reply.get('status'), '409')