"Fossies" - the Fresh Open Source Software Archive 
Member "neutron-14.0.3/neutron/tests/functional/agent/linux/test_ip_lib.py" (22 Oct 2019, 26236 Bytes) of package /linux/misc/openstack/neutron-14.0.3.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively you can here
view or
download the uninterpreted source code file.
See also the latest
Fossies "Diffs" side-by-side code changes report for "test_ip_lib.py":
14.0.2_vs_14.0.3.
1 # Copyright (c) 2014 Red Hat, Inc.
2 # All Rights Reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
14 # under the License.
15
16 import collections
17
18 import netaddr
19 from neutron_lib import constants
20 from neutron_lib.utils import net
21 from oslo_config import cfg
22 from oslo_log import log as logging
23 from oslo_utils import importutils
24 from oslo_utils import uuidutils
25 import testtools
26
27 from neutron.agent.linux import ip_lib
28 from neutron.common import utils
29 from neutron.conf.agent import common as config
30 from neutron.privileged.agent.linux import ip_lib as priv_ip_lib
31 from neutron.tests.common import net_helpers
32 from neutron.tests.functional import base as functional_base
33
34 LOG = logging.getLogger(__name__)
35 Device = collections.namedtuple('Device',
36 'name ip_cidrs mac_address namespace')
37
38 WRONG_IP = '0.0.0.0'
39 TEST_IP = '240.0.0.1'
40 TEST_IP_NEIGH = '240.0.0.2'
41 TEST_IP_SECONDARY = '240.0.0.3'
42
43
44 class IpLibTestFramework(functional_base.BaseSudoTestCase):
45 def setUp(self):
46 super(IpLibTestFramework, self).setUp()
47 self._configure()
48
49 def _configure(self):
50 config.register_interface_driver_opts_helper(cfg.CONF)
51 cfg.CONF.set_override(
52 'interface_driver',
53 'neutron.agent.linux.interface.OVSInterfaceDriver')
54 config.register_interface_opts()
55 self.driver = importutils.import_object(cfg.CONF.interface_driver,
56 cfg.CONF)
57
58 def generate_device_details(self, name=None, ip_cidrs=None,
59 mac_address=None, namespace=None):
60 if ip_cidrs is None:
61 ip_cidrs = ["%s/24" % TEST_IP]
62 return Device(name or utils.get_rand_name(),
63 ip_cidrs,
64 mac_address or
65 net.get_random_mac('fa:16:3e:00:00:00'.split(':')),
66 namespace or utils.get_rand_name())
67
68 def _safe_delete_device(self, device):
69 try:
70 device.link.delete()
71 except RuntimeError:
72 LOG.debug('Could not delete %s, was it already deleted?', device)
73
74 def manage_device(self, attr):
75 """Create a tuntap with the specified attributes.
76
77 The device is cleaned up at the end of the test.
78
79 :param attr: A Device namedtuple
80 :return: A tuntap ip_lib.IPDevice
81 """
82 ip = ip_lib.IPWrapper(namespace=attr.namespace)
83 if attr.namespace:
84 ip.netns.add(attr.namespace)
85 self.addCleanup(ip.netns.delete, attr.namespace)
86 tap_device = ip.add_tuntap(attr.name)
87 self.addCleanup(self._safe_delete_device, tap_device)
88 tap_device.link.set_address(attr.mac_address)
89 self.driver.init_l3(attr.name, attr.ip_cidrs,
90 namespace=attr.namespace)
91 tap_device.link.set_up()
92 return tap_device
93
94
95 class IpLibTestCase(IpLibTestFramework):
96
97 def test_rules_lifecycle(self):
98 PRIORITY = 32768
99 TABLE = 16
100 attr = self.generate_device_details()
101 device = self.manage_device(attr)
102
103 test_cases = {
104 constants.IP_VERSION_4: [
105 {
106 'ip': '1.1.1.1',
107 'to': '8.8.8.0/24'
108 },
109 {
110 'ip': '1.1.1.1',
111 'iif': device.name,
112 'to': '7.7.7.0/24'
113 }
114 ],
115 constants.IP_VERSION_6: [
116 {
117 'ip': 'abcd::1',
118 'to': '1234::/64'
119 },
120 {
121 'ip': 'abcd::1',
122 'iif': device.name,
123 'to': '4567::/64'
124 }
125 ]
126 }
127 expected_rules = {
128 constants.IP_VERSION_4: [
129 {
130 'from': '1.1.1.1',
131 'to': '8.8.8.0/24',
132 'priority': str(PRIORITY),
133 'table': str(TABLE),
134 'type': 'unicast'
135 }, {
136 'from': '0.0.0.0/0',
137 'to': '7.7.7.0/24',
138 'iif': device.name,
139 'priority': str(PRIORITY),
140 'table': str(TABLE),
141 'type': 'unicast'
142 }
143 ],
144 constants.IP_VERSION_6: [
145 {
146 'from': 'abcd::1',
147 'to': '1234::/64',
148 'priority': str(PRIORITY),
149 'table': str(TABLE),
150 'type': 'unicast'
151 },
152 {
153 'from': '::/0',
154 'to': '4567::/64',
155 'iif': device.name,
156 'priority': str(PRIORITY),
157 'table': str(TABLE),
158 'type': 'unicast',
159 }
160 ]
161 }
162
163 for ip_version, test_case in test_cases.items():
164 for rule in test_case:
165 ip_lib.add_ip_rule(namespace=device.namespace, table=TABLE,
166 priority=PRIORITY, **rule)
167
168 rules = ip_lib.list_ip_rules(device.namespace, ip_version)
169 for expected_rule in expected_rules[ip_version]:
170 self.assertIn(expected_rule, rules)
171
172 for rule in test_case:
173 ip_lib.delete_ip_rule(device.namespace, table=TABLE,
174 priority=PRIORITY, **rule)
175
176 rules = priv_ip_lib.list_ip_rules(device.namespace, ip_version)
177 for expected_rule in expected_rules[ip_version]:
178 self.assertNotIn(expected_rule, rules)
179
180 def test_device_exists(self):
181 attr = self.generate_device_details()
182
183 self.assertFalse(
184 ip_lib.device_exists(attr.name, namespace=attr.namespace))
185
186 device = self.manage_device(attr)
187
188 self.assertTrue(
189 ip_lib.device_exists(device.name, namespace=attr.namespace))
190
191 self.assertFalse(
192 ip_lib.device_exists(attr.name, namespace='wrong_namespace'))
193
194 device.link.delete()
195
196 self.assertFalse(
197 ip_lib.device_exists(attr.name, namespace=attr.namespace))
198
199 def test_ipdevice_exists(self):
200 attr = self.generate_device_details()
201 device = self.manage_device(attr)
202 self.assertTrue(device.exists())
203 device.link.delete()
204 self.assertFalse(device.exists())
205
206 def test_vlan_exists(self):
207 attr = self.generate_device_details()
208 ip = ip_lib.IPWrapper(namespace=attr.namespace)
209 ip.netns.add(attr.namespace)
210 self.addCleanup(ip.netns.delete, attr.namespace)
211 priv_ip_lib.create_interface(attr.name, attr.namespace, 'dummy')
212 self.assertFalse(ip_lib.vlan_in_use(1999, namespace=attr.namespace))
213 device = ip.add_vlan('vlan1999', attr.name, 1999)
214 self.assertTrue(ip_lib.vlan_in_use(1999, namespace=attr.namespace))
215 device.link.delete()
216 self.assertFalse(ip_lib.vlan_in_use(1999, namespace=attr.namespace))
217
218 def test_vxlan_exists(self):
219 attr = self.generate_device_details()
220 ip = ip_lib.IPWrapper(namespace=attr.namespace)
221 ip.netns.add(attr.namespace)
222 self.addCleanup(ip.netns.delete, attr.namespace)
223 self.assertFalse(ip_lib.vxlan_in_use(9999, namespace=attr.namespace))
224 device = ip.add_vxlan(attr.name, 9999)
225 self.addCleanup(self._safe_delete_device, device)
226 self.assertTrue(ip_lib.vxlan_in_use(9999, namespace=attr.namespace))
227 device.link.delete()
228 self.assertFalse(ip_lib.vxlan_in_use(9999, namespace=attr.namespace))
229
230 def test_ipwrapper_get_device_by_ip_None(self):
231 ip_wrapper = ip_lib.IPWrapper(namespace=None)
232 self.assertIsNone(ip_wrapper.get_device_by_ip(ip=None))
233
234 def test_ipwrapper_get_device_by_ip(self):
235 # We need to pass both IP and cidr values to get_device_by_ip()
236 # to make sure it filters correctly.
237 test_ip = "%s/24" % TEST_IP
238 test_ip_secondary = "%s/24" % TEST_IP_SECONDARY
239 attr = self.generate_device_details(
240 ip_cidrs=[test_ip, test_ip_secondary]
241 )
242 self.manage_device(attr)
243 ip_wrapper = ip_lib.IPWrapper(namespace=attr.namespace)
244 self.assertEqual(attr.name, ip_wrapper.get_device_by_ip(TEST_IP).name)
245 self.assertEqual(attr.name,
246 ip_wrapper.get_device_by_ip(TEST_IP_SECONDARY).name)
247 self.assertIsNone(ip_wrapper.get_device_by_ip(TEST_IP_NEIGH))
248 # this is in the same subnet, so will match if we pass as cidr
249 test_ip_neigh = "%s/24" % TEST_IP_NEIGH
250 self.assertEqual(attr.name,
251 ip_wrapper.get_device_by_ip(test_ip_neigh).name)
252 self.assertIsNone(ip_wrapper.get_device_by_ip(WRONG_IP))
253
254 def test_device_exists_with_ips_and_mac(self):
255 attr = self.generate_device_details()
256 device = self.manage_device(attr)
257 self.assertTrue(
258 ip_lib.device_exists_with_ips_and_mac(*attr))
259
260 wrong_ip_cidr = '10.0.0.1/8'
261 wrong_mac_address = 'aa:aa:aa:aa:aa:aa'
262
263 attr = self.generate_device_details(name='wrong_name')
264 self.assertFalse(
265 ip_lib.device_exists_with_ips_and_mac(*attr))
266
267 attr = self.generate_device_details(ip_cidrs=[wrong_ip_cidr])
268 self.assertFalse(ip_lib.device_exists_with_ips_and_mac(*attr))
269
270 attr = self.generate_device_details(mac_address=wrong_mac_address)
271 self.assertFalse(ip_lib.device_exists_with_ips_and_mac(*attr))
272
273 attr = self.generate_device_details(namespace='wrong_namespace')
274 self.assertFalse(ip_lib.device_exists_with_ips_and_mac(*attr))
275
276 device.link.delete()
277
278 def test_get_device_mac(self):
279 attr = self.generate_device_details()
280 device = self.manage_device(attr)
281
282 mac_address = ip_lib.get_device_mac(attr.name,
283 namespace=attr.namespace)
284
285 self.assertEqual(attr.mac_address, mac_address)
286
287 device.link.delete()
288
289 def test_get_device_mac_too_long_name(self):
290 name = utils.get_rand_name(
291 max_length=constants.DEVICE_NAME_MAX_LEN + 5)
292 attr = self.generate_device_details(name=name)
293 device = self.manage_device(attr)
294
295 mac_address = ip_lib.get_device_mac(attr.name,
296 namespace=attr.namespace)
297
298 self.assertEqual(attr.mac_address, mac_address)
299
300 device.link.delete()
301
302 def test_gateway_lifecycle(self):
303 attr = self.generate_device_details(
304 ip_cidrs=["%s/24" % TEST_IP, "fd00::1/64"]
305 )
306 metric = 1000
307 device = self.manage_device(attr)
308 gateways = {
309 constants.IP_VERSION_4: attr.ip_cidrs[0].split('/')[0],
310 constants.IP_VERSION_6: "fd00::ff"
311 }
312 expected_gateways = {
313 constants.IP_VERSION_4: {
314 'metric': metric,
315 'gateway': gateways[constants.IP_VERSION_4]},
316 constants.IP_VERSION_6: {
317 'metric': metric,
318 'gateway': gateways[constants.IP_VERSION_6]}}
319
320 for ip_version, gateway_ip in gateways.items():
321 device.route.add_gateway(gateway_ip, metric)
322
323 self.assertEqual(
324 expected_gateways[ip_version],
325 device.route.get_gateway(ip_version=ip_version))
326
327 device.route.delete_gateway(gateway_ip)
328 self.assertIsNone(
329 device.route.get_gateway(ip_version=ip_version))
330
331 def test_gateway_flush(self):
332 attr = self.generate_device_details(
333 ip_cidrs=["%s/24" % TEST_IP, "fd00::1/64"]
334 )
335 device = self.manage_device(attr)
336
337 gateways = {
338 constants.IP_VERSION_4: attr.ip_cidrs[0].split('/')[0],
339 constants.IP_VERSION_6: "fd00::ff"
340 }
341 for ip_version, gateway_ip in gateways.items():
342 # Ensure that there is no gateway configured
343 self.assertIsNone(
344 device.route.get_gateway(ip_version=ip_version))
345
346 # Now lets add gateway
347 device.route.add_gateway(gateway_ip, table="main")
348 self.assertIsNotNone(
349 device.route.get_gateway(ip_version=ip_version))
350
351 # Flush gateway and check that there is no any gateway configured
352 device.route.flush(ip_version, table="main")
353 self.assertIsNone(
354 device.route.get_gateway(ip_version=ip_version))
355
356 def test_get_routing_table(self):
357 attr = self.generate_device_details(
358 ip_cidrs=["%s/24" % TEST_IP, "fd00::1/64"]
359 )
360 device = self.manage_device(attr)
361 device_ip = attr.ip_cidrs[0].split('/')[0]
362 destination = '8.8.8.0/24'
363 device.route.add_route(destination, device_ip)
364
365 destination6 = 'fd01::/64'
366 device.route.add_route(destination6, "fd00::2")
367
368 expected_routes = [{'nexthop': device_ip,
369 'device': attr.name,
370 'destination': destination,
371 'scope': 'universe'},
372 {'nexthop': None,
373 'device': attr.name,
374 'destination': str(
375 netaddr.IPNetwork(attr.ip_cidrs[0]).cidr),
376 'scope': 'link'}]
377
378 routes = ip_lib.get_routing_table(4, namespace=attr.namespace)
379 self.assertItemsEqual(expected_routes, routes)
380 self.assertIsInstance(routes, list)
381
382 expected_routes6 = [{'nexthop': "fd00::2",
383 'device': attr.name,
384 'destination': destination6,
385 'scope': 'universe'},
386 {'nexthop': None,
387 'device': attr.name,
388 'destination': str(
389 netaddr.IPNetwork(attr.ip_cidrs[1]).cidr),
390 'scope': 'universe'}]
391 routes6 = ip_lib.get_routing_table(6, namespace=attr.namespace)
392 self.assertItemsEqual(expected_routes6, routes6)
393 self.assertIsInstance(routes6, list)
394
395 def test_get_routing_table_no_namespace(self):
396 with testtools.ExpectedException(ip_lib.NetworkNamespaceNotFound):
397 ip_lib.get_routing_table(4, namespace="nonexistent-netns")
398
399 def test_get_neigh_entries(self):
400 attr = self.generate_device_details(
401 ip_cidrs=["%s/24" % TEST_IP, "fd00::1/64"]
402 )
403 mac_address = net.get_random_mac('fa:16:3e:00:00:00'.split(':'))
404 device = self.manage_device(attr)
405 device.neigh.add(TEST_IP_NEIGH, mac_address)
406
407 expected_neighs = [{'dst': TEST_IP_NEIGH,
408 'lladdr': mac_address,
409 'device': attr.name}]
410
411 neighs = device.neigh.dump(4)
412 self.assertItemsEqual(expected_neighs, neighs)
413 self.assertIsInstance(neighs, list)
414
415 device.neigh.delete(TEST_IP_NEIGH, mac_address)
416 neighs = device.neigh.dump(4, dst=TEST_IP_NEIGH, lladdr=mac_address)
417 self.assertEqual([], neighs)
418
419 def test_get_neigh_entries_no_namespace(self):
420 with testtools.ExpectedException(ip_lib.NetworkNamespaceNotFound):
421 ip_lib.dump_neigh_entries(4, namespace="nonexistent-netns")
422
423 def test_get_neigh_entries_no_interface(self):
424 attr = self.generate_device_details(
425 ip_cidrs=["%s/24" % TEST_IP, "fd00::1/64"]
426 )
427 self.manage_device(attr)
428 with testtools.ExpectedException(ip_lib.NetworkInterfaceNotFound):
429 ip_lib.dump_neigh_entries(4, device="nosuchdevice",
430 namespace=attr.namespace)
431
432 def test_delete_neigh_entries(self):
433 attr = self.generate_device_details(
434 ip_cidrs=["%s/24" % TEST_IP, "fd00::1/64"]
435 )
436 mac_address = net.get_random_mac('fa:16:3e:00:00:00'.split(':'))
437 device = self.manage_device(attr)
438
439 # trying to delete a non-existent entry shouldn't raise an error
440 device.neigh.delete(TEST_IP_NEIGH, mac_address)
441
442 def _check_for_device_name(self, ip, name, should_exist):
443 exist = any(d for d in ip.get_devices() if d.name == name)
444 self.assertEqual(should_exist, exist)
445
446 def test_veth_exists(self):
447 namespace1 = self.useFixture(net_helpers.NamespaceFixture())
448 namespace2 = self.useFixture(net_helpers.NamespaceFixture())
449 dev_name1 = utils.get_rand_name()
450 dev_name2 = utils.get_rand_name()
451
452 device1, device2 = namespace1.ip_wrapper.add_veth(
453 dev_name1, dev_name2, namespace2.name)
454 self.addCleanup(self._safe_delete_device, device1)
455 self.addCleanup(self._safe_delete_device, device2)
456
457 self._check_for_device_name(namespace1.ip_wrapper, dev_name1, True)
458 self._check_for_device_name(namespace2.ip_wrapper, dev_name2, True)
459 self._check_for_device_name(namespace1.ip_wrapper, dev_name2, False)
460 self._check_for_device_name(namespace2.ip_wrapper, dev_name1, False)
461
462 # As it is veth pair, remove of device1 should be enough to remove
463 # both devices
464 device1.link.delete()
465 self._check_for_device_name(namespace1.ip_wrapper, dev_name1, False)
466 self._check_for_device_name(namespace2.ip_wrapper, dev_name2, False)
467
468 def test_macvtap_exists(self):
469 namespace = self.useFixture(net_helpers.NamespaceFixture())
470 src_dev_name = utils.get_rand_name()
471 src_dev = namespace.ip_wrapper.add_dummy(src_dev_name)
472 self.addCleanup(self._safe_delete_device, src_dev)
473
474 dev_name = utils.get_rand_name()
475 device = namespace.ip_wrapper.add_macvtap(dev_name, src_dev_name)
476 self.addCleanup(self._safe_delete_device, device)
477
478 self._check_for_device_name(namespace.ip_wrapper, dev_name, True)
479 device.link.delete()
480 self._check_for_device_name(namespace.ip_wrapper, dev_name, False)
481
482 def test_dummy_exists(self):
483 namespace = self.useFixture(net_helpers.NamespaceFixture())
484 dev_name = utils.get_rand_name()
485 device = namespace.ip_wrapper.add_dummy(dev_name)
486 self.addCleanup(self._safe_delete_device, device)
487 self._check_for_device_name(namespace.ip_wrapper, dev_name, True)
488 device.link.delete()
489 self._check_for_device_name(namespace.ip_wrapper, dev_name, False)
490
491 def test_set_link_mtu(self):
492 attr = self.generate_device_details()
493 device = self.manage_device(attr)
494 device.link.set_mtu(1450)
495
496 self.assertEqual(1450, device.link.mtu)
497
498 # Check if proper exception will be raised when wrong MTU value is
499 # provided
500 self.assertRaises(ip_lib.InvalidArgument, device.link.set_mtu, 1)
501
502 def test_set_link_allmulticast_on(self):
503 attr = self.generate_device_details()
504 device = self.manage_device(attr)
505
506 self.assertFalse(device.link.allmulticast)
507 device.link.set_allmulticast_on()
508 self.assertTrue(device.link.allmulticast)
509
510 def test_set_link_netns(self):
511 attr = self.generate_device_details()
512 device = self.manage_device(attr)
513 original_namespace = device.namespace
514 original_ip_wrapper = ip_lib.IPWrapper(namespace=original_namespace)
515 new_namespace = self.useFixture(net_helpers.NamespaceFixture())
516
517 device.link.set_netns(new_namespace.name)
518
519 self.assertEqual(new_namespace.name, device.namespace)
520 self._check_for_device_name(
521 new_namespace.ip_wrapper, device.name, True)
522 self._check_for_device_name(
523 original_ip_wrapper, device.name, False)
524
525 def test_set_link_name(self):
526 attr = self.generate_device_details()
527 device = self.manage_device(attr)
528 ip_wrapper = ip_lib.IPWrapper(namespace=device.namespace)
529 original_name = device.name
530 new_name = utils.get_rand_name()
531
532 # device has to be DOWN to rename it
533 device.link.set_down()
534 device.link.set_name(new_name)
535
536 self.assertEqual(new_name, device.name)
537 self._check_for_device_name(ip_wrapper, new_name, True)
538 self._check_for_device_name(ip_wrapper, original_name, False)
539
540 def test_set_link_alias(self):
541 attr = self.generate_device_details()
542 device = self.manage_device(attr)
543 alias = utils.get_rand_name()
544
545 device.link.set_alias(alias)
546
547 self.assertEqual(alias, device.link.alias)
548
549 def _add_and_check_ips(self, device, ip_addresses):
550 for cidr, scope, expected_broadcast in ip_addresses:
551 # For IPv4 address add_broadcast flag will be set to True only
552 # if expected_broadcast is given.
553 # For IPv6 add_broadcast flag can be set to True always but
554 # broadcast address will not be set, so expected_broadcast for
555 # IPv6 should be always given as None.
556 add_broadcast = True
557 if cidr.version == constants.IP_VERSION_4:
558 add_broadcast = bool(expected_broadcast)
559 device.addr.add(str(cidr), scope, add_broadcast)
560
561 device_ips_info = [
562 (netaddr.IPNetwork(ip_info['cidr']),
563 ip_info['scope'],
564 ip_info['broadcast']) for
565 ip_info in device.addr.list()]
566 self.assertItemsEqual(ip_addresses, device_ips_info)
567
568 def _flush_ips(self, device, ip_version):
569 device.addr.flush(ip_version)
570 for ip_address in device.addr.list():
571 cidr = netaddr.IPNetwork(ip_address['cidr'])
572 self.assertNotEqual(ip_version, cidr.version)
573
574 def test_add_ip_address(self):
575 ip_addresses = [
576 (netaddr.IPNetwork("10.10.10.10/30"), "global", '10.10.10.11'),
577 (netaddr.IPNetwork("11.11.11.11/28"), "link", None),
578 (netaddr.IPNetwork("2801::1/120"), "global", None),
579 (netaddr.IPNetwork("fe80::/64"), "link", None)]
580 attr = self.generate_device_details(ip_cidrs=[])
581 device = self.manage_device(attr)
582 self._add_and_check_ips(device, ip_addresses)
583
584 # Now let's check if adding already existing IP address will raise
585 # RuntimeError
586 ip_address = ip_addresses[0]
587 self.assertRaises(RuntimeError,
588 device.addr.add, str(ip_address[0]), ip_address[1])
589
590 def test_delete_ip_address(self):
591 attr = self.generate_device_details()
592 cidr = attr.ip_cidrs[0]
593 device = self.manage_device(attr)
594
595 device_cidrs = [ip_info['cidr'] for ip_info in device.addr.list()]
596 self.assertIn(cidr, device_cidrs)
597
598 device.addr.delete(cidr)
599 device_cidrs = [ip_info['cidr'] for ip_info in device.addr.list()]
600 self.assertNotIn(cidr, device_cidrs)
601
602 # Try to delete not existing IP address, it should be just fine and
603 # finish without any error raised
604 device.addr.delete(cidr)
605
606 def test_flush_ip_addresses(self):
607 ip_addresses = [
608 (netaddr.IPNetwork("10.10.10.10/30"), "global", '10.10.10.11'),
609 (netaddr.IPNetwork("11.11.11.11/28"), "link", None),
610 (netaddr.IPNetwork("2801::1/120"), "global", None),
611 (netaddr.IPNetwork("fe80::/64"), "link", None)]
612 attr = self.generate_device_details(ip_cidrs=[])
613 device = self.manage_device(attr)
614
615 self._add_and_check_ips(device, ip_addresses)
616 self._flush_ips(device, constants.IP_VERSION_4)
617 self._flush_ips(device, constants.IP_VERSION_6)
618
619
620 class TestSetIpNonlocalBind(functional_base.BaseSudoTestCase):
621 def test_assigned_value(self):
622 namespace = self.useFixture(net_helpers.NamespaceFixture())
623 for expected in (0, 1):
624 failed = ip_lib.set_ip_nonlocal_bind(expected, namespace.name)
625 try:
626 observed = ip_lib.get_ip_nonlocal_bind(namespace.name)
627 except RuntimeError as rte:
628 stat_message = (
629 'cannot stat /proc/sys/net/ipv4/ip_nonlocal_bind')
630 if stat_message in str(rte):
631 raise self.skipException(
632 "This kernel doesn't support %s in network "
633 "namespaces." % ip_lib.IP_NONLOCAL_BIND)
634 raise
635
636 self.assertFalse(failed)
637 self.assertEqual(expected, observed)
638
639
640 class NamespaceTestCase(functional_base.BaseSudoTestCase):
641
642 def setUp(self):
643 super(NamespaceTestCase, self).setUp()
644 self.namespace = 'test_ns_' + uuidutils.generate_uuid()
645 ip_lib.create_network_namespace(self.namespace)
646 self.addCleanup(self._delete_namespace)
647
648 def _delete_namespace(self):
649 ip_lib.delete_network_namespace(self.namespace)
650
651 def test_network_namespace_exists_ns_exists(self):
652 self.assertTrue(ip_lib.network_namespace_exists(self.namespace))
653
654 def test_network_namespace_exists_ns_doesnt_exists(self):
655 self.assertFalse(ip_lib.network_namespace_exists('another_ns'))
656
657 def test_network_namespace_exists_ns_exists_try_is_ready(self):
658 self.assertTrue(ip_lib.network_namespace_exists(self.namespace,
659 try_is_ready=True))
660
661 def test_network_namespace_exists_ns_doesnt_exists_try_is_ready(self):
662 self.assertFalse(ip_lib.network_namespace_exists('another_ns',
663 try_is_ready=True))