"Fossies" - the Fresh Open Source Software Archive 
Member "neutron-14.0.3/neutron/services/trunk/rpc/server.py" (22 Oct 2019, 8554 Bytes) of package /linux/misc/openstack/neutron-14.0.3.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively, you can
view or
download the uninterpreted source code file here.
For more information about "server.py" see the
Fossies "Dox" file reference documentation and the latest
Fossies "Diffs" side-by-side code changes report:
14.0.2_vs_14.0.3.
1 # Copyright 2016 Hewlett Packard Enterprise Development LP
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14
15 import collections
16
17 from neutron_lib.api.definitions import portbindings
18 from neutron_lib.db import api as db_api
19 from neutron_lib.plugins import directory
20 from neutron_lib import rpc as n_rpc
21 from oslo_log import helpers as log_helpers
22 from oslo_log import log as logging
23 import oslo_messaging
24 from sqlalchemy.orm import exc
25
26 from neutron.api.rpc.callbacks import events
27 from neutron.api.rpc.callbacks.producer import registry
28 from neutron.api.rpc.callbacks import resources
29 from neutron.api.rpc.handlers import resources_rpc
30 from neutron.objects import trunk as trunk_objects
31 from neutron.services.trunk import constants as trunk_consts
32 from neutron.services.trunk import exceptions as trunk_exc
33 from neutron.services.trunk.rpc import constants
34
35 LOG = logging.getLogger(__name__)
36
37 # This module contains stub (client-side) and skeleton (server-side)
38 # proxy code that executes in the Neutron server process space. This
39 # is needed if any of the trunk service plugin drivers has a remote
40 # component (e.g. an agent) that needs to communicate with the Neutron
41 # server.
42
43 # The Server side exposes the following remote methods:
44 #
45 # - lookup method to retrieve trunk details: used by the agent to learn
46 # about the trunk.
47 # - update methods for trunk and its subports: used by the agent to
48 # inform the server about local trunk status changes.
49 #
50 # For agent-side stub and skeleton proxy code, please look at agent.py
51
52
def trunk_by_port_provider(resource, port_id, context, **kwargs):
    """Look up the trunk whose parent port is ``port_id``.

    ``resource`` and any extra keyword arguments are accepted to satisfy
    the provider callback signature but are not used by the lookup.

    :param resource: The resource type being requested (unused)
    :param port_id: The UUID of the trunk's parent port
    :param context: The context to use for the lookup
    :returns: Whatever ``Trunk.get_object`` yields for that port id
    """
    trunk = trunk_objects.Trunk.get_object(context, port_id=port_id)
    return trunk
56
57
class TrunkSkeleton(object):
    """Skeleton proxy code for agent->server communication.

    Instantiating this class registers the trunk lookup provider and starts
    an RPC consumer on the trunk base topic with this object as endpoint.
    """

    # API version history:
    #   1.0 Initial version
    target = oslo_messaging.Target(version='1.0',
                                   namespace=constants.TRUNK_BASE_NAMESPACE)

    # Cached core plugin; filled in lazily by the ``core_plugin`` property.
    _core_plugin = None

    def __init__(self):
        # Used to provide trunk lookups for the agent.
        registry.provide(trunk_by_port_provider, resources.TRUNK)
        self._connection = n_rpc.Connection()
        self._connection.create_consumer(
            constants.TRUNK_BASE_TOPIC, [self], fanout=False)
        self._connection.consume_in_threads()

    @property
    def core_plugin(self):
        """Return the core plugin, caching it once a lookup yields one."""
        if not self._core_plugin:
            self._core_plugin = directory.get_plugin()
        return self._core_plugin

    @log_helpers.log_method_call
    def update_subport_bindings(self, context, subports):
        """Update subport bindings to match trunk host binding.

        :param context: The request context; an elevated copy is used for
            all lookups and updates
        :param subports: Iterable of mappings, each providing the keys
            ``'trunk_id'`` and ``'port_id'``
        :returns: A ``defaultdict(list)`` mapping each processed trunk id to
            the list of ports returned by the binding update. Trunks that
            cannot be found are skipped and do not appear in the result.
        """
        el = context.elevated()
        ports_by_trunk_id = collections.defaultdict(list)
        updated_ports = collections.defaultdict(list)

        # Group the requested ports so each trunk is processed only once.
        for s in subports:
            ports_by_trunk_id[s['trunk_id']].append(s['port_id'])
        for trunk_id, subport_ids in ports_by_trunk_id.items():
            trunk = trunk_objects.Trunk.get_object(el, id=trunk_id)
            if not trunk:
                LOG.debug("Trunk not found. id: %s", trunk_id)
                continue

            trunk_updated_ports = self._process_trunk_subport_bindings(
                el,
                trunk,
                subport_ids)
            updated_ports[trunk.id].extend(trunk_updated_ports)

        return updated_ports

    def update_trunk_status(self, context, trunk_id, status):
        """Update the trunk status to reflect outcome of data plane wiring.

        Does nothing when no trunk with the given id is found.

        :param context: The context to use for the operation
        :param trunk_id: The UUID of the trunk to update
        :param status: The new status value to store on the trunk
        """
        with db_api.autonested_transaction(context.session):
            trunk = trunk_objects.Trunk.get_object(context, id=trunk_id)
            if trunk:
                trunk.update(status=status)

    def _safe_update_trunk(self, trunk, **update_dict):
        """Update the trunk, retrying when StaleDataError is raised.

        :param trunk: The trunk object to update
        :param update_dict: Keyword arguments forwarded to ``trunk.update``
        :raises sqlalchemy.orm.exc.StaleDataError: when all of the
            ``db_api.MAX_RETRIES`` attempts have failed
        """
        for try_cnt in range(db_api.MAX_RETRIES):
            try:
                trunk.update(**update_dict)
                break
            except exc.StaleDataError as e:
                if try_cnt < db_api.MAX_RETRIES - 1:
                    LOG.debug("Got StaleDataError exception: %s", e)
                    continue
                # re-raise when all tries failed
                raise

    def _process_trunk_subport_bindings(self, context, trunk, port_ids):
        """Process port bindings for subports on the given trunk.

        :param context: The context to use for the operation
        :param trunk: The trunk whose subports are being bound
        :param port_ids: The UUIDs of the subports to bind
        :returns: The list of ports that were updated; an empty list if a
            SubPortBindingError put the trunk into ERROR status
        :raises sqlalchemy.orm.exc.StaleDataError: when a trunk status
            update keeps failing after all retries
        """
        updated_ports = []
        trunk_port_id = trunk.port_id
        trunk_port = self.core_plugin.get_port(context, trunk_port_id)
        trunk_host = trunk_port.get(portbindings.HOST_ID)

        # NOTE(status_police) Set the trunk in BUILD state before
        # processing subport bindings. The trunk will stay in BUILD
        # state until an attempt has been made to bind all subports
        # passed here and the agent acknowledges the operation was
        # successful.
        self._safe_update_trunk(trunk, status=trunk_consts.BUILD_STATUS)

        for port_id in port_ids:
            try:
                updated_port = self._handle_port_binding(context, port_id,
                                                         trunk, trunk_host)
                # NOTE(fitoduarte): consider trimming down the content
                # of the port data structure.
                updated_ports.append(updated_port)
            except trunk_exc.SubPortBindingError as e:
                LOG.error("Failed to bind subport: %s", e)

                # NOTE(status_police) The subport binding has failed in a
                # manner in which we cannot proceed and the user must take
                # action to bring the trunk back to a sane state.
                # The status write is retried on StaleDataError, like the
                # BUILD transition above, so the trunk is not left in BUILD.
                self._safe_update_trunk(
                    trunk, status=trunk_consts.ERROR_STATUS)
                return []
            except Exception as e:
                # Any other failure is logged and the port is skipped; the
                # length check below then marks the trunk as DEGRADED.
                msg = ("Failed to bind subport port %(port)s on trunk "
                       "%(trunk)s: %(exc)s")
                LOG.error(msg, {'port': port_id, 'trunk': trunk.id, 'exc': e})

        if len(port_ids) != len(updated_ports):
            self._safe_update_trunk(
                trunk, status=trunk_consts.DEGRADED_STATUS)

        return updated_ports

    def _handle_port_binding(self, context, port_id, trunk, trunk_host):
        """Bind the given port to the given host.

        :param context: The context to use for the operation
        :param port_id: The UUID of the port to be bound
        :param trunk: The trunk that the given port belongs to
        :param trunk_host: The host to bind the given port to
        :returns: The port as returned by the core plugin's update_port
        :raises SubPortBindingError: when the updated port reports a
            binding-failed VIF type
        """
        port = self.core_plugin.update_port(
            context, port_id,
            {'port': {portbindings.HOST_ID: trunk_host,
                      'device_owner': trunk_consts.TRUNK_SUBPORT_OWNER}})
        vif_type = port.get(portbindings.VIF_TYPE)
        if vif_type == portbindings.VIF_TYPE_BINDING_FAILED:
            raise trunk_exc.SubPortBindingError(port_id=port_id,
                                                trunk_id=trunk.id)
        return port
178
179
class TrunkStub(object):
    """Server-side stub that pushes trunk and subport events to the agent.

    Every notification goes through a ``ResourcesPushRpcApi`` instance
    that is created when the stub is constructed.
    """

    def __init__(self):
        self._resource_rpc = resources_rpc.ResourcesPushRpcApi()

    @log_helpers.log_method_call
    def trunk_created(self, context, trunk):
        """Push a CREATED event carrying the given trunk."""
        created = [trunk]
        self._resource_rpc.push(context, created, events.CREATED)

    @log_helpers.log_method_call
    def trunk_deleted(self, context, trunk):
        """Push a DELETED event carrying the given trunk."""
        removed = [trunk]
        self._resource_rpc.push(context, removed, events.DELETED)

    @log_helpers.log_method_call
    def subports_added(self, context, subports):
        """Push a CREATED event carrying the given subports."""
        event_type = events.CREATED
        self._resource_rpc.push(context, subports, event_type)

    @log_helpers.log_method_call
    def subports_deleted(self, context, subports):
        """Push a DELETED event carrying the given subports."""
        event_type = events.DELETED
        self._resource_rpc.push(context, subports, event_type)