"Fossies" - the Fresh Open Source Software Archive

Member "neutron-14.0.3/neutron/tests/functional/agent/l2/base.py" (22 Oct 2019, 18495 Bytes) of package /linux/misc/openstack/neutron-14.0.3.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "base.py": 14.0.2_vs_14.0.3.

    1 # Copyright (c) 2015 Red Hat, Inc.
    2 # Copyright (c) 2015 SUSE Linux Products GmbH
    3 # All Rights Reserved.
    4 #
    5 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    6 #    not use this file except in compliance with the License. You may obtain
    7 #    a copy of the License at
    8 #
    9 #         http://www.apache.org/licenses/LICENSE-2.0
   10 #
   11 #    Unless required by applicable law or agreed to in writing, software
   12 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   13 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   14 #    License for the specific language governing permissions and limitations
   15 #    under the License.
   16 
   17 import random
   18 
   19 import eventlet
   20 import mock
   21 from neutron_lib import constants as n_const
   22 from neutron_lib.utils import net
   23 from oslo_config import cfg
   24 from oslo_utils import uuidutils
   25 
   26 from neutron.agent.common import ovs_lib
   27 from neutron.agent.common import polling
   28 from neutron.agent.l2 import l2_agent_extensions_manager as ext_manager
   29 from neutron.agent.linux import interface
   30 from neutron.common import utils
   31 from neutron.conf.agent import common as agent_config
   32 from neutron.conf import common as common_config
   33 from neutron.conf.plugins.ml2.drivers import agent
   34 from neutron.conf.plugins.ml2.drivers import ovs_conf
   35 from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
   36 from neutron.plugins.ml2.drivers.openvswitch.agent.extension_drivers \
   37     import qos_driver as ovs_qos_driver
   38 from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \
   39     import br_int
   40 from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \
   41     import br_phys
   42 from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \
   43     import br_tun
   44 from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent \
   45     as ovs_agent
   46 from neutron.tests.common import net_helpers
   47 from neutron.tests.functional.agent.linux import base
   48 
   49 
   50 class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
   51 
   52     def setUp(self):
   53         super(OVSAgentTestFramework, self).setUp()
   54         agent_rpc = ('neutron.plugins.ml2.drivers.openvswitch.agent.'
   55                      'ovs_neutron_agent.OVSPluginApi')
   56         mock.patch(agent_rpc).start()
   57         mock.patch('neutron.agent.rpc.PluginReportStateAPI').start()
   58         self.br_int = utils.get_rand_name(n_const.DEVICE_NAME_MAX_LEN,
   59                                           prefix='br-int')
   60         self.br_tun = utils.get_rand_name(n_const.DEVICE_NAME_MAX_LEN,
   61                                           prefix='br-tun')
   62         self.br_phys = utils.get_rand_name(n_const.DEVICE_NAME_MAX_LEN,
   63                                            prefix='br-phys')
   64         patch_name_len = n_const.DEVICE_NAME_MAX_LEN - len("-patch-tun")
   65         self.patch_tun = "%s-patch-tun" % self.br_int[patch_name_len:]
   66         self.patch_int = "%s-patch-int" % self.br_tun[patch_name_len:]
   67         self.ovs = ovs_lib.BaseOVS()
   68         self.config = self._configure_agent()
   69         self.driver = interface.OVSInterfaceDriver(self.config)
   70         self.namespace = self.useFixture(net_helpers.NamespaceFixture()).name
   71 
   72     def _get_config_opts(self):
   73         config = cfg.ConfigOpts()
   74         config.register_opts(common_config.core_opts)
   75         agent.register_agent_opts(config)
   76         ovs_conf.register_ovs_agent_opts(config)
   77         agent_config.register_interface_opts(config)
   78         agent_config.register_interface_driver_opts_helper(config)
   79         agent_config.register_agent_state_opts_helper(config)
   80         ext_manager.register_opts(config)
   81         return config
   82 
   83     def _configure_agent(self):
   84         config = self._get_config_opts()
   85         config.set_override(
   86             'interface_driver',
   87             'neutron.agent.linux.interface.OVSInterfaceDriver')
   88         config.set_override('integration_bridge', self.br_int, "OVS")
   89         config.set_override('ovs_integration_bridge', self.br_int)
   90         config.set_override('tunnel_bridge', self.br_tun, "OVS")
   91         config.set_override('int_peer_patch_port', self.patch_tun, "OVS")
   92         config.set_override('tun_peer_patch_port', self.patch_int, "OVS")
   93         config.set_override('host', 'ovs-agent')
   94         return config
   95 
   96     def _bridge_classes(self):
   97         return {
   98             'br_int': br_int.OVSIntegrationBridge,
   99             'br_phys': br_phys.OVSPhysicalBridge,
  100             'br_tun': br_tun.OVSTunnelBridge
  101         }
  102 
  103     def create_agent(self, create_tunnels=True, ancillary_bridge=None,
  104                      local_ip='192.168.10.1'):
  105         if create_tunnels:
  106             tunnel_types = [n_const.TYPE_VXLAN]
  107         else:
  108             tunnel_types = None
  109         bridge_mappings = ['physnet:%s' % self.br_phys]
  110         self.config.set_override('tunnel_types', tunnel_types, "AGENT")
  111         self.config.set_override('polling_interval', 1, "AGENT")
  112         self.config.set_override('local_ip', local_ip, "OVS")
  113         self.config.set_override('bridge_mappings', bridge_mappings, "OVS")
  114         # Physical bridges should be created prior to running
  115         self._bridge_classes()['br_phys'](self.br_phys).create()
  116         ext_mgr = ext_manager.L2AgentExtensionsManager(self.config)
  117         with mock.patch.object(ovs_qos_driver.QosOVSAgentDriver,
  118                                '_minimum_bandwidth_initialize'):
  119             agent = ovs_agent.OVSNeutronAgent(self._bridge_classes(),
  120                                               ext_mgr, self.config)
  121         self.addCleanup(self.ovs.delete_bridge, self.br_int)
  122         if tunnel_types:
  123             self.addCleanup(self.ovs.delete_bridge, self.br_tun)
  124         self.addCleanup(self.ovs.delete_bridge, self.br_phys)
  125         agent.sg_agent = mock.Mock()
  126         agent.ancillary_brs = []
  127         if ancillary_bridge:
  128             agent.ancillary_brs.append(ancillary_bridge)
  129         return agent
  130 
  131     def _mock_get_events(self, agent, polling_manager, ports):
  132         get_events = polling_manager.get_events
  133         p_ids = [p['id'] for p in ports]
  134 
  135         def filter_events():
  136             events = get_events()
  137             filtered_ports = []
  138             for dev in events['added']:
  139                 iface_id = agent.int_br.portid_from_external_ids(
  140                     dev.get('external_ids', []))
  141                 if iface_id in p_ids:
  142                     # if the event is not about a port that was created by
  143                     # this test, we filter the event out. Since these tests are
  144                     # not run in isolation processing all the events might make
  145                     # some test fail ( e.g. the agent might keep resycing
  146                     # because it keeps finding not ready ports that are created
  147                     # by other tests)
  148                     filtered_ports.append(dev)
  149             return {'added': filtered_ports, 'removed': events['removed']}
  150         polling_manager.get_events = mock.Mock(side_effect=filter_events)
  151 
  152     def stop_agent(self, agent, rpc_loop_thread):
  153         agent.run_daemon_loop = False
  154         rpc_loop_thread.wait()
  155 
  156     def start_agent(self, agent, ports=None, unplug_ports=None):
  157         if unplug_ports is None:
  158             unplug_ports = []
  159         if ports is None:
  160             ports = []
  161         self.setup_agent_rpc_mocks(agent, unplug_ports)
  162         polling_manager = polling.InterfacePollingMinimizer()
  163         self._mock_get_events(agent, polling_manager, ports)
  164         self.addCleanup(polling_manager.stop)
  165         polling_manager.start()
  166         utils.wait_until_true(
  167             polling_manager._monitor.is_active)
  168         agent.check_ovs_status = mock.Mock(
  169             return_value=constants.OVS_NORMAL)
  170         self.agent_thread = eventlet.spawn(agent.rpc_loop,
  171                                            polling_manager)
  172 
  173         self.addCleanup(self.stop_agent, agent, self.agent_thread)
  174         return polling_manager
  175 
  176     def _create_test_port_dict(self):
  177         return {'id': uuidutils.generate_uuid(),
  178                 'mac_address': net.get_random_mac(
  179                     'fa:16:3e:00:00:00'.split(':')),
  180                 'fixed_ips': [{
  181                     'ip_address': '10.%d.%d.%d' % (
  182                          random.randint(3, 254),
  183                          random.randint(3, 254),
  184                          random.randint(3, 254))}],
  185                 'vif_name': utils.get_rand_name(
  186                     self.driver.DEV_NAME_LEN, self.driver.DEV_NAME_PREFIX)}
  187 
  188     def _create_test_network_dict(self):
  189         return {'id': uuidutils.generate_uuid(),
  190                 'tenant_id': uuidutils.generate_uuid()}
  191 
  192     def _plug_ports(self, network, ports, agent,
  193                     bridge=None, namespace=None):
  194         if namespace is None:
  195             namespace = self.namespace
  196         for port in ports:
  197             bridge = bridge or agent.int_br
  198             self.driver.plug(
  199                 network.get('id'), port.get('id'), port.get('vif_name'),
  200                 port.get('mac_address'),
  201                 bridge.br_name, namespace=namespace)
  202             ip_cidrs = ["%s/8" % (port.get('fixed_ips')[0][
  203                 'ip_address'])]
  204             self.driver.init_l3(port.get('vif_name'), ip_cidrs,
  205                                 namespace=namespace)
  206 
  207     def _unplug_ports(self, ports, agent):
  208         for port in ports:
  209             self.driver.unplug(
  210                 port.get('vif_name'), agent.int_br.br_name, self.namespace)
  211 
  212     def _get_device_details(self, port, network):
  213         dev = {'device': port['id'],
  214                'port_id': port['id'],
  215                'network_id': network['id'],
  216                'network_type': network.get('network_type', 'vlan'),
  217                'physical_network': network.get('physical_network', 'physnet'),
  218                'segmentation_id': network.get('segmentation_id', 1),
  219                'fixed_ips': port['fixed_ips'],
  220                'device_owner': n_const.DEVICE_OWNER_COMPUTE_PREFIX,
  221                'admin_state_up': True}
  222         return dev
  223 
  224     def assert_bridge(self, br, exists=True):
  225         self.assertEqual(exists, self.ovs.bridge_exists(br))
  226 
  227     def assert_patch_ports(self, agent):
  228 
  229         def get_peer(port):
  230             return agent.int_br.db_get_val(
  231                 'Interface', port, 'options', check_error=True)
  232 
  233         utils.wait_until_true(
  234             lambda: get_peer(self.patch_int) == {'peer': self.patch_tun})
  235         utils.wait_until_true(
  236             lambda: get_peer(self.patch_tun) == {'peer': self.patch_int})
  237 
  238     def assert_bridge_ports(self):
  239         for port in [self.patch_tun, self.patch_int]:
  240             self.assertTrue(self.ovs.port_exists(port))
  241 
  242     def assert_vlan_tags(self, ports, agent):
  243         for port in ports:
  244             res = agent.int_br.db_get_val('Port', port.get('vif_name'), 'tag')
  245             self.assertTrue(res)
  246 
  247     def _expected_plugin_rpc_call(self, call, expected_devices, is_up=True):
  248         """Helper to check expected rpc call are received
  249 
  250         :param call: The call to check
  251         :param expected_devices: The device for which call is expected
  252         :param is_up: True if expected_devices are devices that are set up,
  253                False if expected_devices are devices that are set down
  254         """
  255         if is_up:
  256             rpc_devices = [
  257                 dev for args in call.call_args_list for dev in args[0][1]]
  258         else:
  259             rpc_devices = [
  260                 dev for args in call.call_args_list for dev in args[0][2]]
  261         for dev in rpc_devices:
  262             if dev in expected_devices:
  263                 expected_devices.remove(dev)
  264         # reset mock otherwise if the mock is called again the same call param
  265         # will be processed again
  266         call.reset_mock()
  267         return not expected_devices
  268 
  269     def create_test_ports(self, amount=3, **kwargs):
  270         ports = []
  271         for x in range(amount):
  272             ports.append(self._create_test_port_dict(**kwargs))
  273         return ports
  274 
  275     def _mock_update_device(self, context, devices_up, devices_down, agent_id,
  276                             host=None, agent_restarted=False):
  277         dev_up = []
  278         dev_down = []
  279         for port in self.ports:
  280             if devices_up and port['id'] in devices_up:
  281                 dev_up.append(port['id'])
  282             if devices_down and port['id'] in devices_down:
  283                 dev_down.append({'device': port['id'], 'exists': True})
  284         return {'devices_up': dev_up,
  285                 'failed_devices_up': [],
  286                 'devices_down': dev_down,
  287                 'failed_devices_down': []}
  288 
  289     def setup_agent_rpc_mocks(self, agent, unplug_ports):
  290         def mock_device_details(context, devices, agent_id, host=None,
  291                                 agent_restarted=False):
  292             details = []
  293             for port in self.ports:
  294                 if port['id'] in devices:
  295                     dev = self._get_device_details(
  296                         port, self.network)
  297                     details.append(dev)
  298             ports_to_unplug = [x for x in unplug_ports if x['id'] in devices]
  299             if ports_to_unplug:
  300                 self._unplug_ports(ports_to_unplug, self.agent)
  301             return {'devices': details, 'failed_devices': []}
  302 
  303         (agent.plugin_rpc.get_devices_details_list_and_failed_devices.
  304             side_effect) = mock_device_details
  305         agent.plugin_rpc.update_device_list.side_effect = (
  306             self._mock_update_device)
  307 
  308     def _prepare_resync_trigger(self, agent):
  309         def mock_device_raise_exception(context, devices_up, devices_down,
  310                                         agent_id, host=None):
  311             agent.plugin_rpc.update_device_list.side_effect = (
  312                 self._mock_update_device)
  313             raise Exception('Exception to trigger resync')
  314 
  315         self.agent.plugin_rpc.update_device_list.side_effect = (
  316             mock_device_raise_exception)
  317 
  318     def _prepare_failed_dev_up_trigger(self, agent):
  319 
  320         def mock_failed_devices_up(context, devices_up, devices_down,
  321                                    agent_id, host=None,
  322                                    agent_restarted=False):
  323             failed_devices = []
  324             devices = list(devices_up)
  325             # first port fails
  326             if self.ports[0]['id'] in devices_up:
  327                 # reassign side_effect so that next RPC call will succeed
  328                 agent.plugin_rpc.update_device_list.side_effect = (
  329                     self._mock_update_device)
  330                 devices.remove(self.ports[0]['id'])
  331                 failed_devices.append(self.ports[0]['id'])
  332             return {'devices_up': devices,
  333                     'failed_devices_up': failed_devices,
  334                     'devices_down': [],
  335                     'failed_devices_down': []}
  336 
  337         self.agent.plugin_rpc.update_device_list.side_effect = (
  338             mock_failed_devices_up)
  339 
  340     def _prepare_failed_dev_down_trigger(self, agent):
  341 
  342         def mock_failed_devices_down(context, devices_up, devices_down,
  343                                      agent_id, host=None,
  344                                      agent_restarted=False):
  345             # first port fails
  346             failed_port_id = self.ports[0]['id']
  347             failed_devices_down = []
  348             dev_down = [
  349                 {'device': p['id'], 'exists': True}
  350                 for p in self.ports if p['id'] in devices_down and (
  351                     p['id'] != failed_port_id)]
  352             # check if it's the call to set devices down and if the device
  353             # that is supposed to fail is in the call then modify the
  354             # side_effect so that next RPC call will succeed.
  355             if devices_down and failed_port_id in devices_down:
  356                 agent.plugin_rpc.update_device_list.side_effect = (
  357                      self._mock_update_device)
  358                 failed_devices_down.append(failed_port_id)
  359             return {'devices_up': devices_up,
  360                     'failed_devices_up': [],
  361                     'devices_down': dev_down,
  362                     'failed_devices_down': failed_devices_down}
  363 
  364         self.agent.plugin_rpc.update_device_list.side_effect = (
  365             mock_failed_devices_down)
  366 
  367     def wait_until_ports_state(self, ports, up, timeout=60):
  368         port_ids = [p['id'] for p in ports]
  369         utils.wait_until_true(
  370             lambda: self._expected_plugin_rpc_call(
  371                 self.agent.plugin_rpc.update_device_list, port_ids, up),
  372             timeout=timeout)
  373 
  374     def setup_agent_and_ports(self, port_dicts, create_tunnels=True,
  375                               ancillary_bridge=None,
  376                               trigger_resync=False,
  377                               failed_dev_up=False,
  378                               failed_dev_down=False,
  379                               network=None):
  380         self.ports = port_dicts
  381         self.agent = self.create_agent(create_tunnels=create_tunnels,
  382                                        ancillary_bridge=ancillary_bridge)
  383         self.agent.iter_num += 1
  384         self.polling_manager = self.start_agent(self.agent, ports=self.ports)
  385         self.network = network or self._create_test_network_dict()
  386         if trigger_resync:
  387             self._prepare_resync_trigger(self.agent)
  388         elif failed_dev_up:
  389             self._prepare_failed_dev_up_trigger(self.agent)
  390         elif failed_dev_down:
  391             self._prepare_failed_dev_down_trigger(self.agent)
  392 
  393         self._plug_ports(self.network, self.ports, self.agent,
  394                          bridge=ancillary_bridge)
  395 
  396     def plug_ports_to_phys_br(self, network, ports, namespace=None):
  397         physical_network = network.get('physical_network', 'physnet')
  398         phys_segmentation_id = network.get('segmentation_id', None)
  399         network_type = network.get('network_type', 'flat')
  400 
  401         phys_br = self.agent.phys_brs[physical_network]
  402 
  403         self._plug_ports(network, ports, self.agent, bridge=phys_br,
  404                          namespace=namespace)
  405 
  406         if network_type == 'flat':
  407             # NOTE(slaweq): for OVS implementations remove the DEAD VLAN tag
  408             # on ports that belongs to flat network. DEAD VLAN tag is added
  409             # to each newly created port. This is related to lp#1767422
  410             for port in ports:
  411                 phys_br.clear_db_attribute("Port", port['vif_name'], "tag")
  412         elif phys_segmentation_id and network_type == 'vlan':
  413             for port in ports:
  414                 phys_br.set_db_attribute(
  415                     "Port", port['vif_name'], "tag", phys_segmentation_id)