"Fossies" - the Fresh Open Source Software Archive

Member "neutron-14.0.3/neutron/tests/functional/agent/linux/test_ip_lib.py" (22 Oct 2019, 26236 Bytes) of package /linux/misc/openstack/neutron-14.0.3.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_ip_lib.py": 14.0.2_vs_14.0.3.

    1 # Copyright (c) 2014 Red Hat, Inc.
    2 # All Rights Reserved.
    3 #
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 
   16 import collections
   17 
   18 import netaddr
   19 from neutron_lib import constants
   20 from neutron_lib.utils import net
   21 from oslo_config import cfg
   22 from oslo_log import log as logging
   23 from oslo_utils import importutils
   24 from oslo_utils import uuidutils
   25 import testtools
   26 
   27 from neutron.agent.linux import ip_lib
   28 from neutron.common import utils
   29 from neutron.conf.agent import common as config
   30 from neutron.privileged.agent.linux import ip_lib as priv_ip_lib
   31 from neutron.tests.common import net_helpers
   32 from neutron.tests.functional import base as functional_base
   33 
   34 LOG = logging.getLogger(__name__)
   35 Device = collections.namedtuple('Device',
   36                                 'name ip_cidrs mac_address namespace')
   37 
   38 WRONG_IP = '0.0.0.0'
   39 TEST_IP = '240.0.0.1'
   40 TEST_IP_NEIGH = '240.0.0.2'
   41 TEST_IP_SECONDARY = '240.0.0.3'
   42 
   43 
   44 class IpLibTestFramework(functional_base.BaseSudoTestCase):
   45     def setUp(self):
   46         super(IpLibTestFramework, self).setUp()
   47         self._configure()
   48 
   49     def _configure(self):
   50         config.register_interface_driver_opts_helper(cfg.CONF)
   51         cfg.CONF.set_override(
   52             'interface_driver',
   53             'neutron.agent.linux.interface.OVSInterfaceDriver')
   54         config.register_interface_opts()
   55         self.driver = importutils.import_object(cfg.CONF.interface_driver,
   56                                                 cfg.CONF)
   57 
   58     def generate_device_details(self, name=None, ip_cidrs=None,
   59                                 mac_address=None, namespace=None):
   60         if ip_cidrs is None:
   61             ip_cidrs = ["%s/24" % TEST_IP]
   62         return Device(name or utils.get_rand_name(),
   63                       ip_cidrs,
   64                       mac_address or
   65                       net.get_random_mac('fa:16:3e:00:00:00'.split(':')),
   66                       namespace or utils.get_rand_name())
   67 
   68     def _safe_delete_device(self, device):
   69         try:
   70             device.link.delete()
   71         except RuntimeError:
   72             LOG.debug('Could not delete %s, was it already deleted?', device)
   73 
   74     def manage_device(self, attr):
   75         """Create a tuntap with the specified attributes.
   76 
   77         The device is cleaned up at the end of the test.
   78 
   79         :param attr: A Device namedtuple
   80         :return: A tuntap ip_lib.IPDevice
   81         """
   82         ip = ip_lib.IPWrapper(namespace=attr.namespace)
   83         if attr.namespace:
   84             ip.netns.add(attr.namespace)
   85             self.addCleanup(ip.netns.delete, attr.namespace)
   86         tap_device = ip.add_tuntap(attr.name)
   87         self.addCleanup(self._safe_delete_device, tap_device)
   88         tap_device.link.set_address(attr.mac_address)
   89         self.driver.init_l3(attr.name, attr.ip_cidrs,
   90                             namespace=attr.namespace)
   91         tap_device.link.set_up()
   92         return tap_device
   93 
   94 
   95 class IpLibTestCase(IpLibTestFramework):
   96 
   97     def test_rules_lifecycle(self):
   98         PRIORITY = 32768
   99         TABLE = 16
  100         attr = self.generate_device_details()
  101         device = self.manage_device(attr)
  102 
  103         test_cases = {
  104             constants.IP_VERSION_4: [
  105                 {
  106                     'ip': '1.1.1.1',
  107                     'to': '8.8.8.0/24'
  108                 },
  109                 {
  110                     'ip': '1.1.1.1',
  111                     'iif': device.name,
  112                     'to': '7.7.7.0/24'
  113                 }
  114             ],
  115             constants.IP_VERSION_6: [
  116                 {
  117                     'ip': 'abcd::1',
  118                     'to': '1234::/64'
  119                 },
  120                 {
  121                     'ip': 'abcd::1',
  122                     'iif': device.name,
  123                     'to': '4567::/64'
  124                 }
  125             ]
  126         }
  127         expected_rules = {
  128             constants.IP_VERSION_4: [
  129                 {
  130                     'from': '1.1.1.1',
  131                     'to': '8.8.8.0/24',
  132                     'priority': str(PRIORITY),
  133                     'table': str(TABLE),
  134                     'type': 'unicast'
  135                 }, {
  136                     'from': '0.0.0.0/0',
  137                     'to': '7.7.7.0/24',
  138                     'iif': device.name,
  139                     'priority': str(PRIORITY),
  140                     'table': str(TABLE),
  141                     'type': 'unicast'
  142                 }
  143             ],
  144             constants.IP_VERSION_6: [
  145                 {
  146                     'from': 'abcd::1',
  147                     'to': '1234::/64',
  148                     'priority': str(PRIORITY),
  149                     'table': str(TABLE),
  150                     'type': 'unicast'
  151                 },
  152                 {
  153                     'from': '::/0',
  154                     'to': '4567::/64',
  155                     'iif': device.name,
  156                     'priority': str(PRIORITY),
  157                     'table': str(TABLE),
  158                     'type': 'unicast',
  159                 }
  160             ]
  161         }
  162 
  163         for ip_version, test_case in test_cases.items():
  164             for rule in test_case:
  165                 ip_lib.add_ip_rule(namespace=device.namespace, table=TABLE,
  166                                    priority=PRIORITY, **rule)
  167 
  168             rules = ip_lib.list_ip_rules(device.namespace, ip_version)
  169             for expected_rule in expected_rules[ip_version]:
  170                 self.assertIn(expected_rule, rules)
  171 
  172             for rule in test_case:
  173                 ip_lib.delete_ip_rule(device.namespace, table=TABLE,
  174                                       priority=PRIORITY, **rule)
  175 
  176             rules = priv_ip_lib.list_ip_rules(device.namespace, ip_version)
  177             for expected_rule in expected_rules[ip_version]:
  178                 self.assertNotIn(expected_rule, rules)
  179 
  180     def test_device_exists(self):
  181         attr = self.generate_device_details()
  182 
  183         self.assertFalse(
  184             ip_lib.device_exists(attr.name, namespace=attr.namespace))
  185 
  186         device = self.manage_device(attr)
  187 
  188         self.assertTrue(
  189             ip_lib.device_exists(device.name, namespace=attr.namespace))
  190 
  191         self.assertFalse(
  192             ip_lib.device_exists(attr.name, namespace='wrong_namespace'))
  193 
  194         device.link.delete()
  195 
  196         self.assertFalse(
  197             ip_lib.device_exists(attr.name, namespace=attr.namespace))
  198 
  199     def test_ipdevice_exists(self):
  200         attr = self.generate_device_details()
  201         device = self.manage_device(attr)
  202         self.assertTrue(device.exists())
  203         device.link.delete()
  204         self.assertFalse(device.exists())
  205 
  206     def test_vlan_exists(self):
  207         attr = self.generate_device_details()
  208         ip = ip_lib.IPWrapper(namespace=attr.namespace)
  209         ip.netns.add(attr.namespace)
  210         self.addCleanup(ip.netns.delete, attr.namespace)
  211         priv_ip_lib.create_interface(attr.name, attr.namespace, 'dummy')
  212         self.assertFalse(ip_lib.vlan_in_use(1999, namespace=attr.namespace))
  213         device = ip.add_vlan('vlan1999', attr.name, 1999)
  214         self.assertTrue(ip_lib.vlan_in_use(1999, namespace=attr.namespace))
  215         device.link.delete()
  216         self.assertFalse(ip_lib.vlan_in_use(1999, namespace=attr.namespace))
  217 
  218     def test_vxlan_exists(self):
  219         attr = self.generate_device_details()
  220         ip = ip_lib.IPWrapper(namespace=attr.namespace)
  221         ip.netns.add(attr.namespace)
  222         self.addCleanup(ip.netns.delete, attr.namespace)
  223         self.assertFalse(ip_lib.vxlan_in_use(9999, namespace=attr.namespace))
  224         device = ip.add_vxlan(attr.name, 9999)
  225         self.addCleanup(self._safe_delete_device, device)
  226         self.assertTrue(ip_lib.vxlan_in_use(9999, namespace=attr.namespace))
  227         device.link.delete()
  228         self.assertFalse(ip_lib.vxlan_in_use(9999, namespace=attr.namespace))
  229 
  230     def test_ipwrapper_get_device_by_ip_None(self):
  231         ip_wrapper = ip_lib.IPWrapper(namespace=None)
  232         self.assertIsNone(ip_wrapper.get_device_by_ip(ip=None))
  233 
  234     def test_ipwrapper_get_device_by_ip(self):
  235         # We need to pass both IP and cidr values to get_device_by_ip()
  236         # to make sure it filters correctly.
  237         test_ip = "%s/24" % TEST_IP
  238         test_ip_secondary = "%s/24" % TEST_IP_SECONDARY
  239         attr = self.generate_device_details(
  240             ip_cidrs=[test_ip, test_ip_secondary]
  241         )
  242         self.manage_device(attr)
  243         ip_wrapper = ip_lib.IPWrapper(namespace=attr.namespace)
  244         self.assertEqual(attr.name, ip_wrapper.get_device_by_ip(TEST_IP).name)
  245         self.assertEqual(attr.name,
  246                          ip_wrapper.get_device_by_ip(TEST_IP_SECONDARY).name)
  247         self.assertIsNone(ip_wrapper.get_device_by_ip(TEST_IP_NEIGH))
  248         # this is in the same subnet, so will match if we pass as cidr
  249         test_ip_neigh = "%s/24" % TEST_IP_NEIGH
  250         self.assertEqual(attr.name,
  251                          ip_wrapper.get_device_by_ip(test_ip_neigh).name)
  252         self.assertIsNone(ip_wrapper.get_device_by_ip(WRONG_IP))
  253 
  254     def test_device_exists_with_ips_and_mac(self):
  255         attr = self.generate_device_details()
  256         device = self.manage_device(attr)
  257         self.assertTrue(
  258             ip_lib.device_exists_with_ips_and_mac(*attr))
  259 
  260         wrong_ip_cidr = '10.0.0.1/8'
  261         wrong_mac_address = 'aa:aa:aa:aa:aa:aa'
  262 
  263         attr = self.generate_device_details(name='wrong_name')
  264         self.assertFalse(
  265             ip_lib.device_exists_with_ips_and_mac(*attr))
  266 
  267         attr = self.generate_device_details(ip_cidrs=[wrong_ip_cidr])
  268         self.assertFalse(ip_lib.device_exists_with_ips_and_mac(*attr))
  269 
  270         attr = self.generate_device_details(mac_address=wrong_mac_address)
  271         self.assertFalse(ip_lib.device_exists_with_ips_and_mac(*attr))
  272 
  273         attr = self.generate_device_details(namespace='wrong_namespace')
  274         self.assertFalse(ip_lib.device_exists_with_ips_and_mac(*attr))
  275 
  276         device.link.delete()
  277 
  278     def test_get_device_mac(self):
  279         attr = self.generate_device_details()
  280         device = self.manage_device(attr)
  281 
  282         mac_address = ip_lib.get_device_mac(attr.name,
  283                                             namespace=attr.namespace)
  284 
  285         self.assertEqual(attr.mac_address, mac_address)
  286 
  287         device.link.delete()
  288 
  289     def test_get_device_mac_too_long_name(self):
  290         name = utils.get_rand_name(
  291             max_length=constants.DEVICE_NAME_MAX_LEN + 5)
  292         attr = self.generate_device_details(name=name)
  293         device = self.manage_device(attr)
  294 
  295         mac_address = ip_lib.get_device_mac(attr.name,
  296                                             namespace=attr.namespace)
  297 
  298         self.assertEqual(attr.mac_address, mac_address)
  299 
  300         device.link.delete()
  301 
  302     def test_gateway_lifecycle(self):
  303         attr = self.generate_device_details(
  304             ip_cidrs=["%s/24" % TEST_IP, "fd00::1/64"]
  305         )
  306         metric = 1000
  307         device = self.manage_device(attr)
  308         gateways = {
  309             constants.IP_VERSION_4: attr.ip_cidrs[0].split('/')[0],
  310             constants.IP_VERSION_6: "fd00::ff"
  311         }
  312         expected_gateways = {
  313             constants.IP_VERSION_4: {
  314                 'metric': metric,
  315                 'gateway': gateways[constants.IP_VERSION_4]},
  316             constants.IP_VERSION_6: {
  317                 'metric': metric,
  318                 'gateway': gateways[constants.IP_VERSION_6]}}
  319 
  320         for ip_version, gateway_ip in gateways.items():
  321             device.route.add_gateway(gateway_ip, metric)
  322 
  323             self.assertEqual(
  324                 expected_gateways[ip_version],
  325                 device.route.get_gateway(ip_version=ip_version))
  326 
  327             device.route.delete_gateway(gateway_ip)
  328             self.assertIsNone(
  329                 device.route.get_gateway(ip_version=ip_version))
  330 
  331     def test_gateway_flush(self):
  332         attr = self.generate_device_details(
  333             ip_cidrs=["%s/24" % TEST_IP, "fd00::1/64"]
  334         )
  335         device = self.manage_device(attr)
  336 
  337         gateways = {
  338             constants.IP_VERSION_4: attr.ip_cidrs[0].split('/')[0],
  339             constants.IP_VERSION_6: "fd00::ff"
  340         }
  341         for ip_version, gateway_ip in gateways.items():
  342             # Ensure that there is no gateway configured
  343             self.assertIsNone(
  344                 device.route.get_gateway(ip_version=ip_version))
  345 
  346             # Now lets add gateway
  347             device.route.add_gateway(gateway_ip, table="main")
  348             self.assertIsNotNone(
  349                 device.route.get_gateway(ip_version=ip_version))
  350 
  351             # Flush gateway and check that there is no any gateway configured
  352             device.route.flush(ip_version, table="main")
  353             self.assertIsNone(
  354                 device.route.get_gateway(ip_version=ip_version))
  355 
  356     def test_get_routing_table(self):
  357         attr = self.generate_device_details(
  358             ip_cidrs=["%s/24" % TEST_IP, "fd00::1/64"]
  359         )
  360         device = self.manage_device(attr)
  361         device_ip = attr.ip_cidrs[0].split('/')[0]
  362         destination = '8.8.8.0/24'
  363         device.route.add_route(destination, device_ip)
  364 
  365         destination6 = 'fd01::/64'
  366         device.route.add_route(destination6, "fd00::2")
  367 
  368         expected_routes = [{'nexthop': device_ip,
  369                             'device': attr.name,
  370                             'destination': destination,
  371                             'scope': 'universe'},
  372                            {'nexthop': None,
  373                             'device': attr.name,
  374                             'destination': str(
  375                                 netaddr.IPNetwork(attr.ip_cidrs[0]).cidr),
  376                             'scope': 'link'}]
  377 
  378         routes = ip_lib.get_routing_table(4, namespace=attr.namespace)
  379         self.assertItemsEqual(expected_routes, routes)
  380         self.assertIsInstance(routes, list)
  381 
  382         expected_routes6 = [{'nexthop': "fd00::2",
  383                              'device': attr.name,
  384                              'destination': destination6,
  385                              'scope': 'universe'},
  386                             {'nexthop': None,
  387                              'device': attr.name,
  388                              'destination': str(
  389                                  netaddr.IPNetwork(attr.ip_cidrs[1]).cidr),
  390                              'scope': 'universe'}]
  391         routes6 = ip_lib.get_routing_table(6, namespace=attr.namespace)
  392         self.assertItemsEqual(expected_routes6, routes6)
  393         self.assertIsInstance(routes6, list)
  394 
  395     def test_get_routing_table_no_namespace(self):
  396         with testtools.ExpectedException(ip_lib.NetworkNamespaceNotFound):
  397             ip_lib.get_routing_table(4, namespace="nonexistent-netns")
  398 
  399     def test_get_neigh_entries(self):
  400         attr = self.generate_device_details(
  401             ip_cidrs=["%s/24" % TEST_IP, "fd00::1/64"]
  402         )
  403         mac_address = net.get_random_mac('fa:16:3e:00:00:00'.split(':'))
  404         device = self.manage_device(attr)
  405         device.neigh.add(TEST_IP_NEIGH, mac_address)
  406 
  407         expected_neighs = [{'dst': TEST_IP_NEIGH,
  408                             'lladdr': mac_address,
  409                             'device': attr.name}]
  410 
  411         neighs = device.neigh.dump(4)
  412         self.assertItemsEqual(expected_neighs, neighs)
  413         self.assertIsInstance(neighs, list)
  414 
  415         device.neigh.delete(TEST_IP_NEIGH, mac_address)
  416         neighs = device.neigh.dump(4, dst=TEST_IP_NEIGH, lladdr=mac_address)
  417         self.assertEqual([], neighs)
  418 
  419     def test_get_neigh_entries_no_namespace(self):
  420         with testtools.ExpectedException(ip_lib.NetworkNamespaceNotFound):
  421             ip_lib.dump_neigh_entries(4, namespace="nonexistent-netns")
  422 
  423     def test_get_neigh_entries_no_interface(self):
  424         attr = self.generate_device_details(
  425             ip_cidrs=["%s/24" % TEST_IP, "fd00::1/64"]
  426         )
  427         self.manage_device(attr)
  428         with testtools.ExpectedException(ip_lib.NetworkInterfaceNotFound):
  429             ip_lib.dump_neigh_entries(4, device="nosuchdevice",
  430                                       namespace=attr.namespace)
  431 
  432     def test_delete_neigh_entries(self):
  433         attr = self.generate_device_details(
  434             ip_cidrs=["%s/24" % TEST_IP, "fd00::1/64"]
  435         )
  436         mac_address = net.get_random_mac('fa:16:3e:00:00:00'.split(':'))
  437         device = self.manage_device(attr)
  438 
  439         # trying to delete a non-existent entry shouldn't raise an error
  440         device.neigh.delete(TEST_IP_NEIGH, mac_address)
  441 
  442     def _check_for_device_name(self, ip, name, should_exist):
  443         exist = any(d for d in ip.get_devices() if d.name == name)
  444         self.assertEqual(should_exist, exist)
  445 
  446     def test_veth_exists(self):
  447         namespace1 = self.useFixture(net_helpers.NamespaceFixture())
  448         namespace2 = self.useFixture(net_helpers.NamespaceFixture())
  449         dev_name1 = utils.get_rand_name()
  450         dev_name2 = utils.get_rand_name()
  451 
  452         device1, device2 = namespace1.ip_wrapper.add_veth(
  453             dev_name1, dev_name2, namespace2.name)
  454         self.addCleanup(self._safe_delete_device, device1)
  455         self.addCleanup(self._safe_delete_device, device2)
  456 
  457         self._check_for_device_name(namespace1.ip_wrapper, dev_name1, True)
  458         self._check_for_device_name(namespace2.ip_wrapper, dev_name2, True)
  459         self._check_for_device_name(namespace1.ip_wrapper, dev_name2, False)
  460         self._check_for_device_name(namespace2.ip_wrapper, dev_name1, False)
  461 
  462         # As it is veth pair, remove of device1 should be enough to remove
  463         # both devices
  464         device1.link.delete()
  465         self._check_for_device_name(namespace1.ip_wrapper, dev_name1, False)
  466         self._check_for_device_name(namespace2.ip_wrapper, dev_name2, False)
  467 
  468     def test_macvtap_exists(self):
  469         namespace = self.useFixture(net_helpers.NamespaceFixture())
  470         src_dev_name = utils.get_rand_name()
  471         src_dev = namespace.ip_wrapper.add_dummy(src_dev_name)
  472         self.addCleanup(self._safe_delete_device, src_dev)
  473 
  474         dev_name = utils.get_rand_name()
  475         device = namespace.ip_wrapper.add_macvtap(dev_name, src_dev_name)
  476         self.addCleanup(self._safe_delete_device, device)
  477 
  478         self._check_for_device_name(namespace.ip_wrapper, dev_name, True)
  479         device.link.delete()
  480         self._check_for_device_name(namespace.ip_wrapper, dev_name, False)
  481 
  482     def test_dummy_exists(self):
  483         namespace = self.useFixture(net_helpers.NamespaceFixture())
  484         dev_name = utils.get_rand_name()
  485         device = namespace.ip_wrapper.add_dummy(dev_name)
  486         self.addCleanup(self._safe_delete_device, device)
  487         self._check_for_device_name(namespace.ip_wrapper, dev_name, True)
  488         device.link.delete()
  489         self._check_for_device_name(namespace.ip_wrapper, dev_name, False)
  490 
  491     def test_set_link_mtu(self):
  492         attr = self.generate_device_details()
  493         device = self.manage_device(attr)
  494         device.link.set_mtu(1450)
  495 
  496         self.assertEqual(1450, device.link.mtu)
  497 
  498         # Check if proper exception will be raised when wrong MTU value is
  499         # provided
  500         self.assertRaises(ip_lib.InvalidArgument, device.link.set_mtu, 1)
  501 
  502     def test_set_link_allmulticast_on(self):
  503         attr = self.generate_device_details()
  504         device = self.manage_device(attr)
  505 
  506         self.assertFalse(device.link.allmulticast)
  507         device.link.set_allmulticast_on()
  508         self.assertTrue(device.link.allmulticast)
  509 
  510     def test_set_link_netns(self):
  511         attr = self.generate_device_details()
  512         device = self.manage_device(attr)
  513         original_namespace = device.namespace
  514         original_ip_wrapper = ip_lib.IPWrapper(namespace=original_namespace)
  515         new_namespace = self.useFixture(net_helpers.NamespaceFixture())
  516 
  517         device.link.set_netns(new_namespace.name)
  518 
  519         self.assertEqual(new_namespace.name, device.namespace)
  520         self._check_for_device_name(
  521             new_namespace.ip_wrapper, device.name, True)
  522         self._check_for_device_name(
  523             original_ip_wrapper, device.name, False)
  524 
  525     def test_set_link_name(self):
  526         attr = self.generate_device_details()
  527         device = self.manage_device(attr)
  528         ip_wrapper = ip_lib.IPWrapper(namespace=device.namespace)
  529         original_name = device.name
  530         new_name = utils.get_rand_name()
  531 
  532         # device has to be DOWN to rename it
  533         device.link.set_down()
  534         device.link.set_name(new_name)
  535 
  536         self.assertEqual(new_name, device.name)
  537         self._check_for_device_name(ip_wrapper, new_name, True)
  538         self._check_for_device_name(ip_wrapper, original_name, False)
  539 
  540     def test_set_link_alias(self):
  541         attr = self.generate_device_details()
  542         device = self.manage_device(attr)
  543         alias = utils.get_rand_name()
  544 
  545         device.link.set_alias(alias)
  546 
  547         self.assertEqual(alias, device.link.alias)
  548 
  549     def _add_and_check_ips(self, device, ip_addresses):
  550         for cidr, scope, expected_broadcast in ip_addresses:
  551             # For IPv4 address add_broadcast flag will be set to True only
  552             # if expected_broadcast is given.
  553             # For IPv6 add_broadcast flag can be set to True always but
  554             # broadcast address will not be set, so expected_broadcast for
  555             # IPv6 should be always given as None.
  556             add_broadcast = True
  557             if cidr.version == constants.IP_VERSION_4:
  558                 add_broadcast = bool(expected_broadcast)
  559             device.addr.add(str(cidr), scope, add_broadcast)
  560 
  561         device_ips_info = [
  562             (netaddr.IPNetwork(ip_info['cidr']),
  563              ip_info['scope'],
  564              ip_info['broadcast']) for
  565             ip_info in device.addr.list()]
  566         self.assertItemsEqual(ip_addresses, device_ips_info)
  567 
  568     def _flush_ips(self, device, ip_version):
  569         device.addr.flush(ip_version)
  570         for ip_address in device.addr.list():
  571             cidr = netaddr.IPNetwork(ip_address['cidr'])
  572             self.assertNotEqual(ip_version, cidr.version)
  573 
  574     def test_add_ip_address(self):
  575         ip_addresses = [
  576             (netaddr.IPNetwork("10.10.10.10/30"), "global", '10.10.10.11'),
  577             (netaddr.IPNetwork("11.11.11.11/28"), "link", None),
  578             (netaddr.IPNetwork("2801::1/120"), "global", None),
  579             (netaddr.IPNetwork("fe80::/64"), "link", None)]
  580         attr = self.generate_device_details(ip_cidrs=[])
  581         device = self.manage_device(attr)
  582         self._add_and_check_ips(device, ip_addresses)
  583 
  584         # Now let's check if adding already existing IP address will raise
  585         # RuntimeError
  586         ip_address = ip_addresses[0]
  587         self.assertRaises(RuntimeError,
  588                           device.addr.add, str(ip_address[0]), ip_address[1])
  589 
  590     def test_delete_ip_address(self):
  591         attr = self.generate_device_details()
  592         cidr = attr.ip_cidrs[0]
  593         device = self.manage_device(attr)
  594 
  595         device_cidrs = [ip_info['cidr'] for ip_info in device.addr.list()]
  596         self.assertIn(cidr, device_cidrs)
  597 
  598         device.addr.delete(cidr)
  599         device_cidrs = [ip_info['cidr'] for ip_info in device.addr.list()]
  600         self.assertNotIn(cidr, device_cidrs)
  601 
  602         # Try to delete not existing IP address, it should be just fine and
  603         # finish without any error raised
  604         device.addr.delete(cidr)
  605 
  606     def test_flush_ip_addresses(self):
  607         ip_addresses = [
  608             (netaddr.IPNetwork("10.10.10.10/30"), "global", '10.10.10.11'),
  609             (netaddr.IPNetwork("11.11.11.11/28"), "link", None),
  610             (netaddr.IPNetwork("2801::1/120"), "global", None),
  611             (netaddr.IPNetwork("fe80::/64"), "link", None)]
  612         attr = self.generate_device_details(ip_cidrs=[])
  613         device = self.manage_device(attr)
  614 
  615         self._add_and_check_ips(device, ip_addresses)
  616         self._flush_ips(device, constants.IP_VERSION_4)
  617         self._flush_ips(device, constants.IP_VERSION_6)
  618 
  619 
  620 class TestSetIpNonlocalBind(functional_base.BaseSudoTestCase):
  621     def test_assigned_value(self):
  622         namespace = self.useFixture(net_helpers.NamespaceFixture())
  623         for expected in (0, 1):
  624             failed = ip_lib.set_ip_nonlocal_bind(expected, namespace.name)
  625             try:
  626                 observed = ip_lib.get_ip_nonlocal_bind(namespace.name)
  627             except RuntimeError as rte:
  628                 stat_message = (
  629                     'cannot stat /proc/sys/net/ipv4/ip_nonlocal_bind')
  630                 if stat_message in str(rte):
  631                     raise self.skipException(
  632                         "This kernel doesn't support %s in network "
  633                         "namespaces." % ip_lib.IP_NONLOCAL_BIND)
  634                 raise
  635 
  636             self.assertFalse(failed)
  637             self.assertEqual(expected, observed)
  638 
  639 
  640 class NamespaceTestCase(functional_base.BaseSudoTestCase):
  641 
  642     def setUp(self):
  643         super(NamespaceTestCase, self).setUp()
  644         self.namespace = 'test_ns_' + uuidutils.generate_uuid()
  645         ip_lib.create_network_namespace(self.namespace)
  646         self.addCleanup(self._delete_namespace)
  647 
  648     def _delete_namespace(self):
  649         ip_lib.delete_network_namespace(self.namespace)
  650 
  651     def test_network_namespace_exists_ns_exists(self):
  652         self.assertTrue(ip_lib.network_namespace_exists(self.namespace))
  653 
  654     def test_network_namespace_exists_ns_doesnt_exists(self):
  655         self.assertFalse(ip_lib.network_namespace_exists('another_ns'))
  656 
  657     def test_network_namespace_exists_ns_exists_try_is_ready(self):
  658         self.assertTrue(ip_lib.network_namespace_exists(self.namespace,
  659                                                         try_is_ready=True))
  660 
  661     def test_network_namespace_exists_ns_doesnt_exists_try_is_ready(self):
  662         self.assertFalse(ip_lib.network_namespace_exists('another_ns',
  663                                                          try_is_ready=True))