"Fossies" - the Fresh Open Source Software Archive

Member "neutron-14.0.3/neutron/agent/resource_cache.py" (22 Oct 2019, 11574 Bytes) of package /linux/misc/openstack/neutron-14.0.3.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "resource_cache.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 14.0.2_vs_14.0.3.

# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib import context as n_ctx
from neutron_lib import rpc as n_rpc
from oslo_log import log as logging

from neutron._i18n import _
from neutron.api.rpc.callbacks.consumer import registry as registry_rpc
from neutron.api.rpc.callbacks import events as events_rpc
from neutron.api.rpc.handlers import resources_rpc
from neutron import objects

LOG = logging.getLogger(__name__)
objects.register_objects()

class RemoteResourceCache(object):
    """Retrieves and stashes logical resources in their OVO format.

    This is currently only compatible with OVO objects that have an ID.
    """
    def __init__(self, resource_types):
        self.resource_types = resource_types
        self._cache_by_type_and_id = {rt: {} for rt in self.resource_types}
        self._deleted_ids_by_type = {rt: set() for rt in self.resource_types}
        # track everything we've asked the server so we don't ask again
        self._satisfied_server_queries = set()
        self._puller = resources_rpc.ResourcesPullRpcApi()

    def _type_cache(self, rtype):
        if rtype not in self.resource_types:
            raise RuntimeError(_("Resource cache not tracking %s") % rtype)
        return self._cache_by_type_and_id[rtype]

    def start_watcher(self):
        self._watcher = RemoteResourceWatcher(self)

    def get_resource_by_id(self, rtype, obj_id, agent_restarted=False):
        """Returns None if it doesn't exist."""
        if obj_id in self._deleted_ids_by_type[rtype]:
            return None
        cached_item = self._type_cache(rtype).get(obj_id)
        if cached_item:
            return cached_item
        # try server in case object existed before agent start
        self._flood_cache_for_query(rtype, id=(obj_id, ),
                                    agent_restarted=agent_restarted)
        return self._type_cache(rtype).get(obj_id)

    def _flood_cache_for_query(self, rtype, agent_restarted=False,
                               **filter_kwargs):
        """Load info from server for first query.

        Queries the server if this is the first time a given query for
        rtype has been issued.
        """
        query_ids = self._get_query_ids(rtype, filter_kwargs)
        if query_ids.issubset(self._satisfied_server_queries):
            # we've already asked the server this question so we don't
            # ask directly again because any updates will have been
            # pushed to us
            return
        context = n_ctx.get_admin_context()
        resources = self._puller.bulk_pull(context, rtype,
                                           filter_kwargs=filter_kwargs)
        for resource in resources:
            if self._is_stale(rtype, resource):
                # if the server was slow enough to respond the object may have
                # been updated already and pushed to us in another thread.
                LOG.debug("Ignoring stale update for %s: %s", rtype, resource)
                continue
            self.record_resource_update(context, rtype, resource,
                                        agent_restarted=agent_restarted)
        LOG.debug("%s resources returned for queries %s", len(resources),
                  query_ids)
        self._satisfied_server_queries.update(query_ids)

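    # Illustration of the caching behaviour of _flood_cache_for_query() above,
    # using hypothetical values (the 'Port' type and the ID 'p1' are examples,
    # not part of the original module): the first get_resources('Port',
    # {'id': ('p1',)}) call issues a bulk_pull to the server and records the
    # resulting query ID as satisfied; repeating the same query later is
    # answered from the local cache only, because any subsequent changes are
    # expected to arrive as pushed updates rather than by re-polling.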
    def _get_query_ids(self, rtype, filters):
        """Turns filters for a given rtype into a set of query IDs.

        This can result in multiple queries due to the nature of the query
        processing on the server side. Since multiple values are treated as
        an OR condition, a query for {'id': ('1', '2')} is equivalent
        to a query for {'id': ('1',)} and {'id': ('2',)}. This method splits
        the former into the latter to ensure we aren't asking the server
        something we already know.
        """
        query_ids = set()
        for k, values in tuple(sorted(filters.items())):
            if len(values) > 1:
                for v in values:
                    new_filters = filters.copy()
                    new_filters[k] = (v, )
                    query_ids.update(self._get_query_ids(rtype, new_filters))
                break
        else:
            # no multiple value filters left so add an ID
            query_ids.add((rtype, ) + tuple(sorted(filters.items())))
        return query_ids

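    # A minimal worked example of the splitting done by _get_query_ids() above
    # (hypothetical resource type and values): for rtype 'Port' and filters
    # {'id': ('1', '2'), 'device_id': ('d1',)} the method returns the two
    # query IDs
    #     ('Port', ('device_id', ('d1',)), ('id', ('1',)))
    #     ('Port', ('device_id', ('d1',)), ('id', ('2',)))
    # so that a later query for {'id': ('1',), 'device_id': ('d1',)} is
    # recognized as already satisfied.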
    def get_resources(self, rtype, filters):
        """Find resources that match key:values in filters dict.

        If the attribute on the object is a list, each filter value is
        checked for membership in that list.

        The values in the dictionary for a single key are matched in an OR
        fashion.
        """
        self._flood_cache_for_query(rtype, **filters)

        def match(obj):
            for key, values in filters.items():
                for value in values:
                    attr = getattr(obj, key)
                    if isinstance(attr, (list, tuple, set)):
                        # attribute is a list so we check if value is in
                        # list
                        if value in attr:
                            break
                    elif value == attr:
                        break
                else:
                    # no match found for this key
                    return False
            return True
        return self.match_resources_with_func(rtype, match)

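    # Matching semantics of get_resources() above, with hypothetical filters:
    # {'admin_state_up': (True,), 'fixed_ips': (ip1, ip2)} matches objects
    # whose admin_state_up equals True AND whose fixed_ips list contains ip1
    # or ip2; values under one key are ORed, separate keys are ANDed.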
    def match_resources_with_func(self, rtype, matcher):
        """Returns a list of all resources satisfying func matcher."""
        # TODO(kevinbenton): this is O(N), offer better lookup functions
        return [r for r in self._type_cache(rtype).values()
                if matcher(r)]

    def _is_stale(self, rtype, resource):
        """Determines if a given resource update is safe to ignore.

        It can be safe to ignore if it has already been deleted or if
        we have a copy with a higher revision number.
        """
        if resource.id in self._deleted_ids_by_type[rtype]:
            return True
        existing = self._type_cache(rtype).get(resource.id)
        if existing and existing.revision_number > resource.revision_number:
            # NOTE(kevinbenton): we could be strict and check for >=, but this
            # makes us more tolerant of bugs on the server where we forget to
            # bump the revision_number.
            return True
        return False

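    # For example (hypothetical revision numbers): with revision 5 of a
    # resource already cached, an incoming copy at revision 4 is stale and
    # dropped, while a copy also at revision 5 is still accepted because the
    # comparison above is deliberately > rather than >=.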
    def record_resource_update(self, context, rtype, resource,
                               agent_restarted=False):
        """Takes in an OVO and generates an event on relevant changes.

        A change is deemed to be relevant if it is not stale and if any
        fields changed beyond the revision number and update time.

        Both creates and updates are handled in this function.
        """
        if self._is_stale(rtype, resource):
            LOG.debug("Ignoring stale update for %s: %s", rtype, resource)
            return
        existing = self._type_cache(rtype).get(resource.id)
        self._type_cache(rtype)[resource.id] = resource
        changed_fields = self._get_changed_fields(existing, resource)
        if not changed_fields:
            LOG.debug("Received resource %s update without any changes: %s",
                      rtype, resource.id)
            return
        if existing:
            LOG.debug("Resource %s %s updated (revision_number %s->%s). "
                      "Old fields: %s New fields: %s",
                      rtype, existing.id, existing.revision_number,
                      resource.revision_number,
                      {f: existing.get(f) for f in changed_fields},
                      {f: resource.get(f) for f in changed_fields})
        else:
            LOG.debug("Received new resource %s: %s", rtype, resource)
        # local notification for agent internals to subscribe to
        registry.notify(rtype, events.AFTER_UPDATE, self,
                        context=context, changed_fields=changed_fields,
                        existing=existing, updated=resource,
                        resource_id=resource.id,
                        agent_restarted=agent_restarted)

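    # A sketch of how agent code might consume the AFTER_UPDATE notification
    # published above, assuming the standard notify-style callback signature
    # of neutron_lib (handler name and the 'Port' type are hypothetical):
    #
    #     def handle_port_update(resource, event, trigger, context=None,
    #                            changed_fields=None, existing=None,
    #                            updated=None, resource_id=None, **kwargs):
    #         if 'fixed_ips' in changed_fields:
    #             ...  # react to the address change
    #
    #     registry.subscribe(handle_port_update, 'Port', events.AFTER_UPDATE)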
    def record_resource_delete(self, context, rtype, resource_id):
        # deletions are final, record them so we never
        # accept new data for the same ID.
        LOG.debug("Resource %s deleted: %s", rtype, resource_id)
        # TODO(kevinbenton): we need a way to expire items from the set at
        # some TTL so it doesn't grow indefinitely with churn
        if resource_id in self._deleted_ids_by_type[rtype]:
            LOG.debug("Skipped duplicate delete event for %s", resource_id)
            return
        self._deleted_ids_by_type[rtype].add(resource_id)
        existing = self._type_cache(rtype).pop(resource_id, None)
        # local notification for agent internals to subscribe to
        registry.notify(rtype, events.AFTER_DELETE, self, context=context,
                        existing=existing, resource_id=resource_id)

    def _get_changed_fields(self, old, new):
        """Returns changed fields excluding update time and revision."""
        new = new.to_dict()
        changed = set(new)
        if old:
            for k, v in old.to_dict().items():
                if v == new.get(k):
                    changed.discard(k)
        for ignore in ('revision_number', 'updated_at'):
            changed.discard(ignore)
        return changed


class RemoteResourceWatcher(object):
    """Converts RPC callback notifications to local registry notifications.

    This allows a caller to listen for RPC callbacks for a given
    dictionary of resources and fields desired.
    This watcher will listen to the RPC callbacks as sent on the wire and
    handle things like out-of-order message detection and throwing away
    updates to fields the caller doesn't care about.

    All watched resources must be primary-keyed on a field called 'id' and
    have a standard attr revision number.
    """

    def __init__(self, remote_resource_cache):
        self.rcache = remote_resource_cache
        self._init_rpc_listeners()

    def _init_rpc_listeners(self):
        endpoints = [resources_rpc.ResourcesPushRpcCallback()]
        self._connection = n_rpc.Connection()
        for rtype in self.rcache.resource_types:
            registry_rpc.register(self.resource_change_handler, rtype)
            topic = resources_rpc.resource_type_versioned_topic(rtype)
            self._connection.create_consumer(topic, endpoints, fanout=True)
        self._connection.consume_in_threads()

    def resource_change_handler(self, context, rtype, resources, event_type):
        for r in resources:
            if event_type == events_rpc.DELETED:
                self.rcache.record_resource_delete(context, rtype, r.id)
            else:
                # creates and updates are treated equally
                self.rcache.record_resource_update(context, rtype, r)
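

# Example wiring (a hypothetical sketch based on the classes above, not part
# of the original module): an agent typically builds the cache for the
# resource types it needs, starts the watcher so pushed updates keep the
# cache fresh, and then queries it locally. 'port_id' and 'device_id' are
# placeholder variables here.
#
#     from neutron.api.rpc.callbacks import resources
#
#     rcache = RemoteResourceCache(resource_types=[resources.PORT])
#     rcache.start_watcher()
#     port = rcache.get_resource_by_id(resources.PORT, port_id)
#     ports = rcache.get_resources(resources.PORT,
#                                  {'device_id': (device_id,)})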