"Fossies" - the Fresh Open Source Software Archive

Member "neutron-14.0.3/neutron/services/trunk/rpc/server.py" (22 Oct 2019, 8554 Bytes) of package /linux/misc/openstack/neutron-14.0.3.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "server.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 14.0.2_vs_14.0.3.

    1 # Copyright 2016 Hewlett Packard Enterprise Development LP
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #     http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 import collections
   16 
   17 from neutron_lib.api.definitions import portbindings
   18 from neutron_lib.db import api as db_api
   19 from neutron_lib.plugins import directory
   20 from neutron_lib import rpc as n_rpc
   21 from oslo_log import helpers as log_helpers
   22 from oslo_log import log as logging
   23 import oslo_messaging
   24 from sqlalchemy.orm import exc
   25 
   26 from neutron.api.rpc.callbacks import events
   27 from neutron.api.rpc.callbacks.producer import registry
   28 from neutron.api.rpc.callbacks import resources
   29 from neutron.api.rpc.handlers import resources_rpc
   30 from neutron.objects import trunk as trunk_objects
   31 from neutron.services.trunk import constants as trunk_consts
   32 from neutron.services.trunk import exceptions as trunk_exc
   33 from neutron.services.trunk.rpc import constants
   34 
   35 LOG = logging.getLogger(__name__)
   36 
   37 # This module contains stub (client-side) and skeleton (server-side)
   38 # proxy code that executes in the Neutron server process space. This
   39 # is needed if any of the trunk service plugin drivers has a remote
   40 # component (e.g. agent), that needs to communicate with the Neutron
   41 # Server.
   42 
   43 # The Server side exposes the following remote methods:
   44 #
   45 # - lookup method to retrieve trunk details: used by the agent to learn
   46 #   about the trunk.
   47 # - update methods for trunk and its subports: used by the agent to
   48 #   inform the server about local trunk status changes.
   49 #
   50 # For agent-side stub and skeleton proxy code, please look at agent.py
   51 
   52 
   53 def trunk_by_port_provider(resource, port_id, context, **kwargs):
   54     """Provider callback to supply trunk information by parent port."""
   55     return trunk_objects.Trunk.get_object(context, port_id=port_id)
   56 
   57 
   58 class TrunkSkeleton(object):
   59     """Skeleton proxy code for agent->server communication."""
   60 
   61     # API version history:
   62     # 1.0 Initial version
   63     target = oslo_messaging.Target(version='1.0',
   64                                    namespace=constants.TRUNK_BASE_NAMESPACE)
   65 
   66     _core_plugin = None
   67 
   68     def __init__(self):
   69         # Used to provide trunk lookups for the agent.
   70         registry.provide(trunk_by_port_provider, resources.TRUNK)
   71         self._connection = n_rpc.Connection()
   72         self._connection.create_consumer(
   73             constants.TRUNK_BASE_TOPIC, [self], fanout=False)
   74         self._connection.consume_in_threads()
   75 
   76     @property
   77     def core_plugin(self):
   78         if not self._core_plugin:
   79             self._core_plugin = directory.get_plugin()
   80         return self._core_plugin
   81 
   82     @log_helpers.log_method_call
   83     def update_subport_bindings(self, context, subports):
   84         """Update subport bindings to match trunk host binding."""
   85         el = context.elevated()
   86         ports_by_trunk_id = collections.defaultdict(list)
   87         updated_ports = collections.defaultdict(list)
   88 
   89         for s in subports:
   90             ports_by_trunk_id[s['trunk_id']].append(s['port_id'])
   91         for trunk_id, subport_ids in ports_by_trunk_id.items():
   92             trunk = trunk_objects.Trunk.get_object(el, id=trunk_id)
   93             if not trunk:
   94                 LOG.debug("Trunk not found. id: %s", trunk_id)
   95                 continue
   96 
   97             trunk_updated_ports = self._process_trunk_subport_bindings(
   98                                                                   el,
   99                                                                   trunk,
  100                                                                   subport_ids)
  101             updated_ports[trunk.id].extend(trunk_updated_ports)
  102 
  103         return updated_ports
  104 
  105     def update_trunk_status(self, context, trunk_id, status):
  106         """Update the trunk status to reflect outcome of data plane wiring."""
  107         with db_api.autonested_transaction(context.session):
  108             trunk = trunk_objects.Trunk.get_object(context, id=trunk_id)
  109             if trunk:
  110                 trunk.update(status=status)
  111 
  112     def _process_trunk_subport_bindings(self, context, trunk, port_ids):
  113         """Process port bindings for subports on the given trunk."""
  114         updated_ports = []
  115         trunk_port_id = trunk.port_id
  116         trunk_port = self.core_plugin.get_port(context, trunk_port_id)
  117         trunk_host = trunk_port.get(portbindings.HOST_ID)
  118 
  119         for try_cnt in range(db_api.MAX_RETRIES):
  120             try:
  121                 # NOTE(status_police) Set the trunk in BUILD state before
  122                 # processing subport bindings. The trunk will stay in BUILD
  123                 # state until an attempt has been made to bind all subports
  124                 # passed here and the agent acknowledges the operation was
  125                 # successful.
  126                 trunk.update(status=trunk_consts.BUILD_STATUS)
  127                 break
  128             except exc.StaleDataError as e:
  129                 if try_cnt < db_api.MAX_RETRIES - 1:
  130                     LOG.debug("Got StaleDataError exception: %s", e)
  131                     continue
  132                 else:
  133                     # re-raise when all tries failed
  134                     raise
  135 
  136         for port_id in port_ids:
  137             try:
  138                 updated_port = self._handle_port_binding(context, port_id,
  139                                                          trunk, trunk_host)
  140                 # NOTE(fitoduarte): consider trimming down the content
  141                 # of the port data structure.
  142                 updated_ports.append(updated_port)
  143             except trunk_exc.SubPortBindingError as e:
  144                 LOG.error("Failed to bind subport: %s", e)
  145 
  146                 # NOTE(status_police) The subport binding has failed in a
  147                 # manner in which we cannot proceed and the user must take
  148                 # action to bring the trunk back to a sane state.
  149                 trunk.update(status=trunk_consts.ERROR_STATUS)
  150                 return []
  151             except Exception as e:
  152                 msg = ("Failed to bind subport port %(port)s on trunk "
  153                        "%(trunk)s: %(exc)s")
  154                 LOG.error(msg, {'port': port_id, 'trunk': trunk.id, 'exc': e})
  155 
  156         if len(port_ids) != len(updated_ports):
  157             trunk.update(status=trunk_consts.DEGRADED_STATUS)
  158 
  159         return updated_ports
  160 
  161     def _handle_port_binding(self, context, port_id, trunk, trunk_host):
  162         """Bind the given port to the given host.
  163 
  164            :param context: The context to use for the operation
  165            :param port_id: The UUID of the port to be bound
  166            :param trunk: The trunk that the given port belongs to
  167            :param trunk_host: The host to bind the given port to
  168         """
  169         port = self.core_plugin.update_port(
  170             context, port_id,
  171             {'port': {portbindings.HOST_ID: trunk_host,
  172                       'device_owner': trunk_consts.TRUNK_SUBPORT_OWNER}})
  173         vif_type = port.get(portbindings.VIF_TYPE)
  174         if vif_type == portbindings.VIF_TYPE_BINDING_FAILED:
  175             raise trunk_exc.SubPortBindingError(port_id=port_id,
  176                                                 trunk_id=trunk.id)
  177         return port
  178 
  179 
  180 class TrunkStub(object):
  181     """Stub proxy code for server->agent communication."""
  182 
  183     def __init__(self):
  184         self._resource_rpc = resources_rpc.ResourcesPushRpcApi()
  185 
  186     @log_helpers.log_method_call
  187     def trunk_created(self, context, trunk):
  188         """Tell the agent about a trunk being created."""
  189         self._resource_rpc.push(context, [trunk], events.CREATED)
  190 
  191     @log_helpers.log_method_call
  192     def trunk_deleted(self, context, trunk):
  193         """Tell the agent about a trunk being deleted."""
  194         self._resource_rpc.push(context, [trunk], events.DELETED)
  195 
  196     @log_helpers.log_method_call
  197     def subports_added(self, context, subports):
  198         """Tell the agent about new subports to add."""
  199         self._resource_rpc.push(context, subports, events.CREATED)
  200 
  201     @log_helpers.log_method_call
  202     def subports_deleted(self, context, subports):
  203         """Tell the agent about existing subports to remove."""
  204         self._resource_rpc.push(context, subports, events.DELETED)