"Fossies" - the Fresh Open Source Software Archive

Member "ospd-2.0.1/ospd/network.py" (12 May 2020, 13519 Bytes) of package /linux/misc/openvas/ospd-2.0.1.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "network.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 2.0.0_vs_2.0.1.

    1 # Copyright (C) 2019 Greenbone Networks GmbH
    2 #
    3 # SPDX-License-Identifier: GPL-2.0-or-later
    4 #
    5 # This program is free software; you can redistribute it and/or
    6 # modify it under the terms of the GNU General Public License
    7 # as published by the Free Software Foundation; either version 2
    8 # of the License, or (at your option) any later version.
    9 #
   10 # This program is distributed in the hope that it will be useful,
   11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
   12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   13 # GNU General Public License for more details.
   14 #
   15 # You should have received a copy of the GNU General Public License
   16 # along with this program; if not, write to the Free Software
   17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
   18 
   19 """ Helper module for network related functions
   20 """
   21 
   22 import binascii
   23 import collections
   24 import itertools
   25 import logging
   26 import re
   27 import socket
   28 import struct
   29 
   30 from typing import List, Optional, Tuple
   31 
# Module-level logger, named after this module; used by the helpers below
# to report invalid targets and malformed port strings.
__LOGGER = logging.getLogger(__name__)
   33 
   34 
   35 def target_to_ipv4(target: str) -> Optional[List]:
   36     """ Attempt to return a single IPv4 host list from a target string. """
   37 
   38     try:
   39         socket.inet_pton(socket.AF_INET, target)
   40         return [target]
   41     except socket.error:
   42         return None
   43 
   44 
   45 def target_to_ipv6(target: str) -> Optional[List]:
   46     """ Attempt to return a single IPv6 host list from a target string. """
   47 
   48     try:
   49         socket.inet_pton(socket.AF_INET6, target)
   50         return [target]
   51     except socket.error:
   52         return None
   53 
   54 
   55 def ipv4_range_to_list(start_packed, end_packed) -> Optional[List]:
   56     """ Return a list of IPv4 entries from start_packed to end_packed. """
   57 
   58     new_list = list()
   59     start = struct.unpack('!L', start_packed)[0]
   60     end = struct.unpack('!L', end_packed)[0]
   61 
   62     for value in range(start, end + 1):
   63         new_ip = socket.inet_ntoa(struct.pack('!L', value))
   64         new_list.append(new_ip)
   65 
   66     return new_list
   67 
   68 
   69 def target_to_ipv4_short(target: str) -> Optional[List]:
   70     """ Attempt to return a IPv4 short range list from a target string. """
   71 
   72     splitted = target.split('-')
   73     if len(splitted) != 2:
   74         return None
   75 
   76     try:
   77         start_packed = socket.inet_pton(socket.AF_INET, splitted[0])
   78         end_value = int(splitted[1])
   79     except (socket.error, ValueError):
   80         return None
   81 
   82     start_value = int(binascii.hexlify(bytes(start_packed[3])), 16)
   83     if end_value < 0 or end_value > 255 or end_value < start_value:
   84         return None
   85 
   86     end_packed = start_packed[0:3] + struct.pack('B', end_value)
   87 
   88     return ipv4_range_to_list(start_packed, end_packed)
   89 
   90 
   91 def target_to_ipv4_cidr(target: str) -> Optional[List]:
   92     """ Attempt to return a IPv4 CIDR list from a target string. """
   93 
   94     splitted = target.split('/')
   95     if len(splitted) != 2:
   96         return None
   97 
   98     try:
   99         start_packed = socket.inet_pton(socket.AF_INET, splitted[0])
  100         block = int(splitted[1])
  101     except (socket.error, ValueError):
  102         return None
  103 
  104     if block <= 0 or block > 30:
  105         return None
  106 
  107     start_value = int(binascii.hexlify(start_packed), 16) >> (32 - block)
  108     start_value = (start_value << (32 - block)) + 1
  109 
  110     end_value = (start_value | (0xFFFFFFFF >> block)) - 1
  111 
  112     start_packed = struct.pack('!I', start_value)
  113     end_packed = struct.pack('!I', end_value)
  114 
  115     return ipv4_range_to_list(start_packed, end_packed)
  116 
  117 
  118 def target_to_ipv6_cidr(target: str) -> Optional[List]:
  119     """ Attempt to return a IPv6 CIDR list from a target string. """
  120 
  121     splitted = target.split('/')
  122     if len(splitted) != 2:
  123         return None
  124 
  125     try:
  126         start_packed = socket.inet_pton(socket.AF_INET6, splitted[0])
  127         block = int(splitted[1])
  128     except (socket.error, ValueError):
  129         return None
  130 
  131     if block <= 0 or block > 126:
  132         return None
  133 
  134     start_value = int(binascii.hexlify(start_packed), 16) >> (128 - block)
  135     start_value = (start_value << (128 - block)) + 1
  136 
  137     end_value = (start_value | (int('ff' * 16, 16) >> block)) - 1
  138 
  139     high = start_value >> 64
  140     low = start_value & ((1 << 64) - 1)
  141 
  142     start_packed = struct.pack('!QQ', high, low)
  143 
  144     high = end_value >> 64
  145     low = end_value & ((1 << 64) - 1)
  146 
  147     end_packed = struct.pack('!QQ', high, low)
  148 
  149     return ipv6_range_to_list(start_packed, end_packed)
  150 
  151 
  152 def target_to_ipv4_long(target: str) -> Optional[List]:
  153     """ Attempt to return a IPv4 long-range list from a target string. """
  154 
  155     splitted = target.split('-')
  156     if len(splitted) != 2:
  157         return None
  158 
  159     try:
  160         start_packed = socket.inet_pton(socket.AF_INET, splitted[0])
  161         end_packed = socket.inet_pton(socket.AF_INET, splitted[1])
  162     except socket.error:
  163         return None
  164 
  165     if end_packed < start_packed:
  166         return None
  167 
  168     return ipv4_range_to_list(start_packed, end_packed)
  169 
  170 
  171 def ipv6_range_to_list(start_packed, end_packed) -> List:
  172     """ Return a list of IPv6 entries from start_packed to end_packed. """
  173 
  174     new_list = list()
  175 
  176     start = int(binascii.hexlify(start_packed), 16)
  177     end = int(binascii.hexlify(end_packed), 16)
  178 
  179     for value in range(start, end + 1):
  180         high = value >> 64
  181         low = value & ((1 << 64) - 1)
  182         new_ip = socket.inet_ntop(
  183             socket.AF_INET6, struct.pack('!2Q', high, low)
  184         )
  185         new_list.append(new_ip)
  186 
  187     return new_list
  188 
  189 
  190 def target_to_ipv6_short(target: str) -> Optional[List]:
  191     """ Attempt to return a IPv6 short-range list from a target string. """
  192 
  193     splitted = target.split('-')
  194     if len(splitted) != 2:
  195         return None
  196 
  197     try:
  198         start_packed = socket.inet_pton(socket.AF_INET6, splitted[0])
  199         end_value = int(splitted[1], 16)
  200     except (socket.error, ValueError):
  201         return None
  202 
  203     start_value = int(binascii.hexlify(start_packed[14:]), 16)
  204     if end_value < 0 or end_value > 0xFFFF or end_value < start_value:
  205         return None
  206 
  207     end_packed = start_packed[:14] + struct.pack('!H', end_value)
  208 
  209     return ipv6_range_to_list(start_packed, end_packed)
  210 
  211 
  212 def target_to_ipv6_long(target: str) -> Optional[List]:
  213     """ Attempt to return a IPv6 long-range list from a target string. """
  214 
  215     splitted = target.split('-')
  216     if len(splitted) != 2:
  217         return None
  218 
  219     try:
  220         start_packed = socket.inet_pton(socket.AF_INET6, splitted[0])
  221         end_packed = socket.inet_pton(socket.AF_INET6, splitted[1])
  222     except socket.error:
  223         return None
  224 
  225     if end_packed < start_packed:
  226         return None
  227 
  228     return ipv6_range_to_list(start_packed, end_packed)
  229 
  230 
  231 def target_to_hostname(target: str) -> Optional[List]:
  232     """ Attempt to return a single hostname list from a target string. """
  233 
  234     if len(target) == 0 or len(target) > 255:
  235         return None
  236 
  237     if not re.match(r'^[\w.-]+$', target):
  238         return None
  239 
  240     return [target]
  241 
  242 
  243 def target_to_list(target: str) -> Optional[List]:
  244     """ Attempt to return a list of single hosts from a target string. """
  245 
  246     # Is it an IPv4 address ?
  247     new_list = target_to_ipv4(target)
  248     # Is it an IPv6 address ?
  249     if not new_list:
  250         new_list = target_to_ipv6(target)
  251     # Is it an IPv4 CIDR ?
  252     if not new_list:
  253         new_list = target_to_ipv4_cidr(target)
  254     # Is it an IPv6 CIDR ?
  255     if not new_list:
  256         new_list = target_to_ipv6_cidr(target)
  257     # Is it an IPv4 short-range ?
  258     if not new_list:
  259         new_list = target_to_ipv4_short(target)
  260     # Is it an IPv4 long-range ?
  261     if not new_list:
  262         new_list = target_to_ipv4_long(target)
  263     # Is it an IPv6 short-range ?
  264     if not new_list:
  265         new_list = target_to_ipv6_short(target)
  266     # Is it an IPv6 long-range ?
  267     if not new_list:
  268         new_list = target_to_ipv6_long(target)
  269     # Is it a hostname ?
  270     if not new_list:
  271         new_list = target_to_hostname(target)
  272 
  273     return new_list
  274 
  275 
  276 def target_str_to_list(target_str: str) -> Optional[List]:
  277     """ Parses a targets string into a list of individual targets. """
  278     new_list = list()
  279 
  280     if not target_str:
  281         return None
  282 
  283     for target in target_str.split(','):
  284 
  285         target = target.strip()
  286         target_list = target_to_list(target)
  287 
  288         if target_list:
  289             new_list.extend(target_list)
  290         else:
  291             __LOGGER.info("%s: Invalid target value", target)
  292             return None
  293 
  294     return list(collections.OrderedDict.fromkeys(new_list))
  295 
  296 
  297 def resolve_hostname(hostname: str) -> Optional[str]:
  298     """ Returns IP of a hostname. """
  299 
  300     assert hostname
  301     try:
  302         return socket.gethostbyname(hostname)
  303     except socket.gaierror:
  304         return None
  305 
  306 
  307 def is_valid_address(address: str) -> bool:
  308     if not address:
  309         return False
  310 
  311     try:
  312         socket.inet_pton(socket.AF_INET, address)
  313     except OSError:
  314         # invalid IPv4 address
  315         try:
  316             socket.inet_pton(socket.AF_INET6, address)
  317         except OSError:
  318             # invalid IPv6 address
  319             return False
  320 
  321     return True
  322 
  323 
  324 def get_hostname_by_address(address: str) -> str:
  325     """ Returns hostname of an address. """
  326 
  327     if not is_valid_address(address):
  328         return ''
  329 
  330     try:
  331         hostname = socket.getfqdn(address)
  332     except (socket.gaierror, socket.herror):
  333         return ''
  334 
  335     if hostname == address:
  336         return ''
  337 
  338     return hostname
  339 
  340 
  341 def port_range_expand(portrange: str) -> Optional[List]:
  342     """
  343     Receive a port range and expands it in individual ports.
  344 
  345     @input Port range.
  346     e.g. "4-8"
  347 
  348     @return List of integers.
  349     e.g. [4, 5, 6, 7, 8]
  350     """
  351     if not portrange or '-' not in portrange:
  352         __LOGGER.info("Invalid port range format")
  353         return None
  354 
  355     port_list = list()
  356 
  357     for single_port in range(
  358         int(portrange[: portrange.index('-')]),
  359         int(portrange[portrange.index('-') + 1 :]) + 1,
  360     ):
  361         port_list.append(single_port)
  362 
  363     return port_list
  364 
  365 
  366 def port_str_arrange(ports: str) -> str:
  367     """ Gives a str in the format (always tcp listed first).
  368     T:<tcp ports/portrange comma separated>U:<udp ports comma separated>
  369     """
  370     b_tcp = ports.find("T")
  371     b_udp = ports.find("U")
  372 
  373     if (b_udp != -1 and b_tcp != -1) and b_udp < b_tcp:
  374         return ports[b_tcp:] + ports[b_udp:b_tcp]
  375 
  376     return ports
  377 
  378 
  379 def ports_str_check_failed(port_str: str) -> bool:
  380     """
  381     Check if the port string is well formed.
  382     Return True if fail, False other case.
  383     """
  384 
  385     pattern = r'[^TU:0-9, \-]'
  386     if (
  387         re.search(pattern, port_str)
  388         or port_str.count('T') > 1
  389         or port_str.count('U') > 1
  390         or port_str.count(':') < (port_str.count('T') + port_str.count('U'))
  391     ):
  392         return True
  393 
  394     return False
  395 
  396 
  397 def ports_as_list(port_str: str) -> Tuple[Optional[List], Optional[List]]:
  398     """
  399     Parses a ports string into two list of individual tcp and udp ports.
  400 
  401     @input string containing a port list
  402     e.g. T:1,2,3,5-8 U:22,80,600-1024
  403 
  404     @return two list of sorted integers, for tcp and udp ports respectively.
  405     """
  406     if not port_str:
  407         __LOGGER.info("Invalid port value")
  408         return [None, None]
  409 
  410     if ports_str_check_failed(port_str):
  411         __LOGGER.info("{0}: Port list malformed.")
  412         return [None, None]
  413 
  414     tcp_list = list()
  415     udp_list = list()
  416 
  417     ports = port_str.replace(' ', '')
  418 
  419     b_tcp = ports.find("T")
  420     b_udp = ports.find("U")
  421 
  422     if ports[b_tcp - 1] == ',':
  423         ports = ports[: b_tcp - 1] + ports[b_tcp:]
  424 
  425     if ports[b_udp - 1] == ',':
  426         ports = ports[: b_udp - 1] + ports[b_udp:]
  427 
  428     ports = port_str_arrange(ports)
  429 
  430     tports = ''
  431     uports = ''
  432     # TCP ports listed first, then UDP ports
  433     if b_udp != -1 and b_tcp != -1:
  434         tports = ports[ports.index('T:') + 2 : ports.index('U:')]
  435         uports = ports[ports.index('U:') + 2 :]
  436     # Only UDP ports
  437     elif b_tcp == -1 and b_udp != -1:
  438         uports = ports[ports.index('U:') + 2 :]
  439     # Only TCP ports
  440     elif b_udp == -1 and b_tcp != -1:
  441         tports = ports[ports.index('T:') + 2 :]
  442     else:
  443         tports = ports
  444 
  445     if tports:
  446         for port in tports.split(','):
  447             if '-' in port:
  448                 tcp_list.extend(port_range_expand(port))
  449             else:
  450                 tcp_list.append(int(port))
  451         tcp_list.sort()
  452 
  453     if uports:
  454         for port in uports.split(','):
  455             if '-' in port:
  456                 udp_list.extend(port_range_expand(port))
  457             else:
  458                 udp_list.append(int(port))
  459         udp_list.sort()
  460 
  461     return (tcp_list, udp_list)
  462 
  463 
  464 def get_tcp_port_list(port_str: str) -> Optional[List]:
  465     """ Return a list with tcp ports from a given port list in string format """
  466     return ports_as_list(port_str)[0]
  467 
  468 
  469 def get_udp_port_list(port_str: str) -> Optional[List]:
  470     """ Return a list with udp ports from a given port list in string format """
  471     return ports_as_list(port_str)[1]
  472 
  473 
  474 def port_list_compress(port_list: str) -> str:
  475     """ Compress a port list and return a string. """
  476 
  477     if not port_list or len(port_list) == 0:
  478         __LOGGER.info("Invalid or empty port list.")
  479         return ''
  480 
  481     port_list = sorted(set(port_list))
  482     compressed_list = []
  483 
  484     for _key, group in itertools.groupby(
  485         enumerate(port_list), lambda t: t[1] - t[0]
  486     ):
  487         group = list(group)
  488 
  489         if group[0][1] == group[-1][1]:
  490             compressed_list.append(str(group[0][1]))
  491         else:
  492             compressed_list.append(str(group[0][1]) + '-' + str(group[-1][1]))
  493 
  494     return ','.join(compressed_list)