"Fossies" - the Fresh Open Source Software Archive

Member "neutron-14.0.3/neutron/agent/rpc.py" (22 Oct 2019, 16592 Bytes) of package /linux/misc/openstack/neutron-14.0.3.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "rpc.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 14.0.2_vs_14.0.3.

    1 # Copyright (c) 2012 OpenStack Foundation.
    2 # All Rights Reserved.
    3 #
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 
   16 from datetime import datetime
   17 import itertools
   18 
   19 import netaddr
   20 from neutron_lib.agent import topics
   21 from neutron_lib.api.definitions import portbindings_extended as pb_ext
   22 from neutron_lib.callbacks import events as callback_events
   23 from neutron_lib.callbacks import registry
   24 from neutron_lib.callbacks import resources as callback_resources
   25 from neutron_lib import constants
   26 from neutron_lib.plugins import utils
   27 from neutron_lib import rpc as lib_rpc
   28 from oslo_log import log as logging
   29 import oslo_messaging
   30 from oslo_utils import uuidutils
   31 
   32 from neutron.agent import resource_cache
   33 from neutron.api.rpc.callbacks import resources
   34 from neutron.common import constants as n_const
   35 from neutron import objects
   36 
   37 LOG = logging.getLogger(__name__)
   38 BINDING_DEACTIVATE = 'binding_deactivate'
   39 
   40 
   41 def create_consumers(endpoints, prefix, topic_details, start_listening=True):
   42     """Create agent RPC consumers.
   43 
   44     :param endpoints: The list of endpoints to process the incoming messages.
   45     :param prefix: Common prefix for the plugin/agent message queues.
   46     :param topic_details: A list of topics. Each topic has a name, an
   47                           operation, and an optional host param keying the
   48                           subscription to topic.host for plugin calls.
   49     :param start_listening: if True, it starts the processing loop
   50 
   51     :returns: A common Connection.
   52     """
   53 
   54     connection = lib_rpc.Connection()
   55     for details in topic_details:
   56         topic, operation, node_name = itertools.islice(
   57             itertools.chain(details, [None]), 3)
   58 
   59         topic_name = topics.get_topic_name(prefix, topic, operation)
   60         connection.create_consumer(topic_name, endpoints, fanout=True)
   61         if node_name:
   62             node_topic_name = '%s.%s' % (topic_name, node_name)
   63             connection.create_consumer(node_topic_name,
   64                                        endpoints,
   65                                        fanout=False)
   66     if start_listening:
   67         connection.consume_in_threads()
   68     return connection
   69 
   70 
   71 class PluginReportStateAPI(object):
   72     """RPC client used to report state back to plugin.
   73 
   74     This class implements the client side of an rpc interface.  The server side
   75     can be found in neutron.db.agents_db.AgentExtRpcCallback.  For more
   76     information on changing rpc interfaces, see
   77     doc/source/contributor/internals/rpc_api.rst.
   78     """
   79     def __init__(self, topic):
   80         target = oslo_messaging.Target(topic=topic, version='1.2',
   81                                        namespace=n_const.RPC_NAMESPACE_STATE)
   82         self.client = lib_rpc.get_client(target)
   83 
   84     def has_alive_neutron_server(self, context, **kwargs):
   85         cctxt = self.client.prepare()
   86         return cctxt.call(context, 'has_alive_neutron_server', **kwargs)
   87 
   88     def report_state(self, context, agent_state, use_call=False):
   89         cctxt = self.client.prepare(
   90             timeout=lib_rpc.TRANSPORT.conf.rpc_response_timeout)
   91         # add unique identifier to a report
   92         # that can be logged on server side.
   93         # This create visible correspondence between events on
   94         # the agent and on the server
   95         agent_state['uuid'] = uuidutils.generate_uuid()
   96         kwargs = {
   97             'agent_state': {'agent_state': agent_state},
   98             'time': datetime.utcnow().strftime(constants.ISO8601_TIME_FORMAT),
   99         }
  100         method = cctxt.call if use_call else cctxt.cast
  101         return method(context, 'report_state', **kwargs)
  102 
  103 
  104 class PluginApi(object):
  105     '''Agent side of the rpc API.
  106 
  107     API version history:
  108         1.0 - Initial version.
  109         1.3 - get_device_details rpc signature upgrade to obtain 'host' and
  110               return value to include fixed_ips and device_owner for
  111               the device port
  112         1.4 - tunnel_sync rpc signature upgrade to obtain 'host'
  113         1.5 - Support update_device_list and
  114               get_devices_details_list_and_failed_devices
  115     '''
  116 
  117     def __init__(self, topic):
  118         target = oslo_messaging.Target(topic=topic, version='1.0')
  119         self.client = lib_rpc.get_client(target)
  120 
  121     def get_device_details(self, context, device, agent_id, host=None):
  122         cctxt = self.client.prepare()
  123         return cctxt.call(context, 'get_device_details', device=device,
  124                           agent_id=agent_id, host=host)
  125 
  126     def get_devices_details_list(self, context, devices, agent_id, host=None):
  127         cctxt = self.client.prepare(version='1.3')
  128         return cctxt.call(context, 'get_devices_details_list',
  129                           devices=devices, agent_id=agent_id, host=host)
  130 
  131     def get_devices_details_list_and_failed_devices(self, context, devices,
  132                                                     agent_id, host=None,
  133                                                     **kwargs):
  134         """Get devices details and the list of devices that failed.
  135 
  136         This method returns the devices details. If an error is thrown when
  137         retrieving the devices details, the device is put in a list of
  138         failed devices.
  139         """
  140         cctxt = self.client.prepare(version='1.5')
  141         return cctxt.call(
  142             context,
  143             'get_devices_details_list_and_failed_devices',
  144             devices=devices, agent_id=agent_id, host=host)
  145 
  146     def update_device_down(self, context, device, agent_id, host=None):
  147         cctxt = self.client.prepare()
  148         return cctxt.call(context, 'update_device_down', device=device,
  149                           agent_id=agent_id, host=host)
  150 
  151     def update_device_up(self, context, device, agent_id, host=None):
  152         cctxt = self.client.prepare()
  153         return cctxt.call(context, 'update_device_up', device=device,
  154                           agent_id=agent_id, host=host)
  155 
  156     def update_device_list(self, context, devices_up, devices_down,
  157                            agent_id, host, agent_restarted=False):
  158         cctxt = self.client.prepare(version='1.5')
  159 
  160         ret_devices_up = []
  161         failed_devices_up = []
  162         ret_devices_down = []
  163         failed_devices_down = []
  164 
  165         step = n_const.RPC_RES_PROCESSING_STEP
  166         devices_up = list(devices_up)
  167         devices_down = list(devices_down)
  168         for i in range(0, max(len(devices_up), len(devices_down)), step):
  169             # Divide-and-conquer RPC timeout
  170             ret = cctxt.call(context, 'update_device_list',
  171                              devices_up=devices_up[i:i + step],
  172                              devices_down=devices_down[i:i + step],
  173                              agent_id=agent_id, host=host,
  174                              agent_restarted=agent_restarted)
  175             ret_devices_up.extend(ret.get("devices_up", []))
  176             failed_devices_up.extend(ret.get("failed_devices_up", []))
  177             ret_devices_down.extend(ret.get("devices_down", []))
  178             failed_devices_down.extend(ret.get("failed_devices_down", []))
  179 
  180         return {'devices_up': ret_devices_up,
  181                 'failed_devices_up': failed_devices_up,
  182                 'devices_down': ret_devices_down,
  183                 'failed_devices_down': failed_devices_down}
  184 
  185     def tunnel_sync(self, context, tunnel_ip, tunnel_type=None, host=None):
  186         cctxt = self.client.prepare(version='1.4')
  187         return cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
  188                           tunnel_type=tunnel_type, host=host)
  189 
  190 
  191 def create_cache_for_l2_agent():
  192     """Create a push-notifications cache for L2 agent related resources."""
  193 
  194     objects.register_objects()
  195     resource_types = [
  196         resources.PORT,
  197         resources.SECURITYGROUP,
  198         resources.SECURITYGROUPRULE,
  199         resources.NETWORK,
  200         resources.SUBNET
  201     ]
  202     rcache = resource_cache.RemoteResourceCache(resource_types)
  203     rcache.start_watcher()
  204     return rcache
  205 
  206 
  207 class CacheBackedPluginApi(PluginApi):
  208 
  209     def __init__(self, *args, **kwargs):
  210         super(CacheBackedPluginApi, self).__init__(*args, **kwargs)
  211         self.remote_resource_cache = create_cache_for_l2_agent()
  212 
  213     def register_legacy_notification_callbacks(self, legacy_interface):
  214         """Emulates the server-side notifications from ml2 AgentNotifierApi.
  215 
  216         legacy_interface is an object with 'delete'/'update' methods for
  217         core resources.
  218         """
  219         self._legacy_interface = legacy_interface
  220         for e in (callback_events.AFTER_UPDATE, callback_events.AFTER_DELETE):
  221             for r in (resources.PORT, resources.NETWORK):
  222                 registry.subscribe(self._legacy_notifier, r, e)
  223 
  224     def _legacy_notifier(self, rtype, event, trigger, context, resource_id,
  225                          **kwargs):
  226         """Checks if legacy interface is expecting calls for resource.
  227 
  228         looks for port_update, network_delete, etc and calls them with
  229         the payloads the handlers are expecting (an ID).
  230         """
  231         rtype = rtype.lower()  # all legacy handlers don't camelcase
  232         agent_restarted = kwargs.pop("agent_restarted", None)
  233         method, host_with_activation, host_with_deactivation = (
  234             self._get_method_host(rtype, event, **kwargs))
  235         if not hasattr(self._legacy_interface, method):
  236             # TODO(kevinbenton): once these notifications are stable, emit
  237             # a deprecation warning for legacy handlers
  238             return
  239         # If there is a binding deactivation, we must also notify the
  240         # corresponding activation
  241         if method == BINDING_DEACTIVATE:
  242             self._legacy_interface.binding_deactivate(
  243                 context, port_id=resource_id, host=host_with_deactivation)
  244             self._legacy_interface.binding_activate(
  245                 context, port_id=resource_id, host=host_with_activation)
  246         else:
  247             payload = {rtype: {'id': resource_id},
  248                        '%s_id' % rtype: resource_id}
  249             if method == "port_update" and agent_restarted is not None:
  250                 # Mark ovs-agent restart for local port_update
  251                 payload["agent_restarted"] = agent_restarted
  252             getattr(self._legacy_interface, method)(context, **payload)
  253 
  254     def _get_method_host(self, rtype, event, **kwargs):
  255         """Constructs the name of method to be called in the legacy interface.
  256 
  257         If the event received is a port update that contains a binding
  258         activation where a previous binding is deactivated, the method name
  259         is 'binding_deactivate' and the host where the binding has to be
  260         deactivated is returned. Otherwise, the method name is constructed from
  261         rtype and the event received and the host is None.
  262         """
  263         is_delete = event == callback_events.AFTER_DELETE
  264         suffix = 'delete' if is_delete else 'update'
  265         method = "%s_%s" % (rtype, suffix)
  266         host_with_activation = None
  267         host_with_deactivation = None
  268         if is_delete or rtype != callback_resources.PORT:
  269             return method, host_with_activation, host_with_deactivation
  270 
  271         # A port update was received. Find out if it is a binding activation
  272         # where a previous binding was deactivated
  273         BINDINGS = pb_ext.COLLECTION_NAME
  274         if BINDINGS in kwargs.get('changed_fields', set()):
  275             existing_active_binding = (
  276                 utils.get_port_binding_by_status_and_host(
  277                     getattr(kwargs['existing'], 'bindings', []),
  278                     constants.ACTIVE))
  279             updated_active_binding = (
  280                 utils.get_port_binding_by_status_and_host(
  281                     getattr(kwargs['updated'], 'bindings', []),
  282                     constants.ACTIVE))
  283             if (existing_active_binding and updated_active_binding and
  284                     existing_active_binding.host !=
  285                     updated_active_binding.host):
  286                 if (utils.get_port_binding_by_status_and_host(
  287                         getattr(kwargs['updated'], 'bindings', []),
  288                         constants.INACTIVE,
  289                         host=existing_active_binding.host)):
  290                     method = BINDING_DEACTIVATE
  291                     host_with_activation = updated_active_binding.host
  292                     host_with_deactivation = existing_active_binding.host
  293         return method, host_with_activation, host_with_deactivation
  294 
  295     def get_devices_details_list_and_failed_devices(self, context, devices,
  296                                                     agent_id, host=None,
  297                                                     agent_restarted=False):
  298         result = {'devices': [], 'failed_devices': []}
  299         for device in devices:
  300             try:
  301                 result['devices'].append(
  302                     self.get_device_details(context, device, agent_id, host,
  303                                             agent_restarted))
  304             except Exception:
  305                 LOG.exception("Failed to get details for device %s", device)
  306                 result['failed_devices'].append(device)
  307         return result
  308 
  309     def get_device_details(self, context, device, agent_id, host=None,
  310                            agent_restarted=False):
  311         port_obj = self.remote_resource_cache.get_resource_by_id(
  312             resources.PORT, device, agent_restarted)
  313         if not port_obj:
  314             LOG.debug("Device %s does not exist in cache.", device)
  315             return {'device': device}
  316         if not port_obj.binding_levels:
  317             LOG.warning("Device %s is not bound.", port_obj)
  318             return {'device': device}
  319         segment = port_obj.binding_levels[-1].segment
  320         if not segment:
  321             LOG.debug("Device %s is not bound to any segment.", port_obj)
  322             return {'device': device}
  323         binding = utils.get_port_binding_by_status_and_host(
  324             port_obj.bindings, constants.ACTIVE, raise_if_not_found=True,
  325             port_id=port_obj.id)
  326         if (port_obj.device_owner.startswith(
  327                 constants.DEVICE_OWNER_COMPUTE_PREFIX) and
  328                 binding[pb_ext.HOST] != host):
  329             LOG.debug("Device %s has no active binding in this host",
  330                       port_obj)
  331             return {'device': device,
  332                     n_const.NO_ACTIVE_BINDING: True}
  333         net = self.remote_resource_cache.get_resource_by_id(
  334             resources.NETWORK, port_obj.network_id)
  335         net_qos_policy_id = net.qos_policy_id
  336         # match format of old RPC interface
  337         mac_addr = str(netaddr.EUI(str(port_obj.mac_address),
  338                                    dialect=netaddr.mac_unix_expanded))
  339         entry = {
  340             'device': device,
  341             'network_id': port_obj.network_id,
  342             'port_id': port_obj.id,
  343             'mac_address': mac_addr,
  344             'admin_state_up': port_obj.admin_state_up,
  345             'network_type': segment.network_type,
  346             'segmentation_id': segment.segmentation_id,
  347             'physical_network': segment.physical_network,
  348             'fixed_ips': [{'subnet_id': o.subnet_id,
  349                            'ip_address': str(o.ip_address)}
  350                           for o in port_obj.fixed_ips],
  351             'device_owner': port_obj.device_owner,
  352             'allowed_address_pairs': [{'mac_address': o.mac_address,
  353                                        'ip_address': o.ip_address}
  354                                       for o in port_obj.allowed_address_pairs],
  355             'port_security_enabled': getattr(port_obj.security,
  356                                              'port_security_enabled', True),
  357             'qos_policy_id': port_obj.qos_policy_id,
  358             'network_qos_policy_id': net_qos_policy_id,
  359             'profile': binding.profile,
  360             'security_groups': list(port_obj.security_group_ids)
  361         }
  362         LOG.debug("Returning: %s", entry)
  363         return entry
  364 
  365     def get_devices_details_list(self, context, devices, agent_id, host=None):
  366         return [self.get_device_details(context, device, agent_id, host)
  367                 for device in devices]