"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-10.0.0/zaqar/storage/mongodb/topics.py" (13 May 2020, 10063 Bytes) of package /linux/misc/openstack/zaqar-10.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "topics.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 9.0.0_vs_10.0.0.

    1 # Copyright (c) 2019 Red Hat, Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 
   16 """Implements the MongoDB storage controller for topics.
   17 
   18 Field Mappings:
   19     In order to reduce the disk / memory space used,
   20     field names will be, most of the time, the first
   21     letter of their long name.
   22 """
   23 
   24 from oslo_log import log as logging
   25 from oslo_utils import timeutils
   26 from pymongo.collection import ReturnDocument
   27 import pymongo.errors
   28 
   29 from zaqar.common import decorators
   30 from zaqar.i18n import _
   31 from zaqar import storage
   32 from zaqar.storage import errors
   33 from zaqar.storage.mongodb import utils
   34 
   35 LOG = logging.getLogger(__name__)
   36 
   37 # NOTE(wanghao): Keep this as same as queues'
   38 _TOPIC_CACHE_PREFIX = 'topiccontroller:'
   39 _TOPIC_CACHE_TTL = 5
   40 
   41 
   42 def _topic_exists_key(topic, project=None):
   43     # NOTE(kgriffs): Use string concatenation for performance,
   44     # also put project first since it is guaranteed to be
   45     # unique, which should reduce lookup time.
   46     return _TOPIC_CACHE_PREFIX + 'exists:' + str(project) + '/' + topic
   47 
   48 
   49 class TopicController(storage.Topic):
   50     """Implements Topic resource operations using MongoDB.
   51 
   52     Topics are scoped by project, which is prefixed to the
   53     topic name.
   54 
   55     ::
   56 
   57         Topic:
   58 
   59             Name            Field
   60             ---------------------
   61             name         ->   p_t
   62             msg counter  ->     c
   63             metadata     ->     m
   64 
   65         Message Counter:
   66 
   67             Name          Field
   68             -------------------
   69             value        ->   v
   70             modified ts  ->   t
   71     """
   72 
   73     def __init__(self, *args, **kwargs):
   74         super(TopicController, self).__init__(*args, **kwargs)
   75 
   76         self._cache = self.driver.cache
   77         self._collection = self.driver.topics_database.topics
   78 
   79         # NOTE(flaper87): This creates a unique index for
   80         # project and name. Using project as the prefix
   81         # allows for querying by project and project+name.
   82         # This is also useful for retrieving the queues list for
   83         # a specific project, for example. Order matters!
   84         self._collection.ensure_index([('p_t', 1)], unique=True)
   85 
   86     # ----------------------------------------------------------------------
   87     # Helpers
   88     # ----------------------------------------------------------------------
   89 
   90     def _get_counter(self, name, project=None):
   91         """Retrieves the current message counter value for a given topic.
   92 
   93         This helper is used to generate monotonic pagination
   94         markers that are saved as part of the message
   95         document.
   96 
   97         Note 1: Markers are scoped per-topic and so are *not*
   98             globally unique or globally ordered.
   99 
  100         Note 2: If two or more requests to this method are made
  101             in parallel, this method will return the same counter
  102             value. This is done intentionally so that the caller
  103             can detect a parallel message post, allowing it to
  104             mitigate race conditions between producer and
  105             observer clients.
  106 
  107         :param name: Name of the queue to which the counter is scoped
  108         :param project: Topic's project
  109         :returns: current message counter as an integer
  110         """
  111 
  112         doc = self._collection.find_one(_get_scoped_query(name, project),
  113                                         projection={'c.v': 1, '_id': 0})
  114 
  115         if doc is None:
  116             raise errors.TopicDoesNotExist(name, project)
  117 
  118         return doc['c']['v']
  119 
  120     def _inc_counter(self, name, project=None, amount=1, window=None):
  121         """Increments the message counter and returns the new value.
  122 
  123         :param name: Name of the topic to which the counter is scoped
  124         :param project: Topic's project name
  125         :param amount: (Default 1) Amount by which to increment the counter
  126         :param window: (Default None) A time window, in seconds, that
  127             must have elapsed since the counter was last updated, in
  128             order to increment the counter.
  129 
  130         :returns: Updated message counter value, or None if window
  131             was specified, and the counter has already been updated
  132             within the specified time period.
  133 
  134         :raises TopicDoesNotExist: if not found
  135         """
  136         now = timeutils.utcnow_ts()
  137 
  138         update = {'$inc': {'c.v': amount}, '$set': {'c.t': now}}
  139         query = _get_scoped_query(name, project)
  140         if window is not None:
  141             threshold = now - window
  142             query['c.t'] = {'$lt': threshold}
  143 
  144         while True:
  145             try:
  146                 doc = self._collection.find_one_and_update(
  147                     query, update, return_document=ReturnDocument.AFTER,
  148                     projection={'c.v': 1, '_id': 0})
  149 
  150                 break
  151             except pymongo.errors.AutoReconnect:
  152                 LOG.exception('Auto reconnect failure')
  153 
  154         if doc is None:
  155             if window is None:
  156                 # NOTE(kgriffs): Since we did not filter by a time window,
  157                 # the topic should have been found and updated. Perhaps
  158                 # the topic has been deleted?
  159                 message = _(u'Failed to increment the message '
  160                             u'counter for topic %(name)s and '
  161                             u'project %(project)s')
  162                 message %= dict(name=name, project=project)
  163 
  164                 LOG.warning(message)
  165 
  166                 raise errors.TopicDoesNotExist(name, project)
  167 
  168             # NOTE(kgriffs): Assume the topic existed, but the counter
  169             # was recently updated, causing the range topic on 'c.t' to
  170             # exclude the record.
  171             return None
  172 
  173         return doc['c']['v']
  174 
  175     # ----------------------------------------------------------------------
  176     # Interface
  177     # ----------------------------------------------------------------------
  178 
  179     def _get(self, name, project=None):
  180         try:
  181             return self.get_metadata(name, project)
  182         except errors.TopicDoesNotExist:
  183             return {}
  184 
  185     def _list(self, project=None, kfilter={}, marker=None,
  186               limit=storage.DEFAULT_TOPICS_PER_PAGE, detailed=False,
  187               name=None):
  188 
  189         query = utils.scoped_query(marker, project, name, kfilter,
  190                                    key_value='p_t')
  191 
  192         projection = {'p_t': 1, '_id': 0}
  193         if detailed:
  194             projection['m'] = 1
  195 
  196         cursor = self._collection.find(query, projection=projection)
  197         cursor = cursor.limit(limit).sort('p_t')
  198         marker_name = {}
  199 
  200         def normalizer(record):
  201             topic = {'name': utils.descope_queue_name(record['p_t'])}
  202             marker_name['next'] = topic['name']
  203             if detailed:
  204                 topic['metadata'] = record['m']
  205             return topic
  206 
  207         yield utils.HookedCursor(cursor, normalizer)
  208         yield marker_name and marker_name['next']
  209 
  210     @utils.raises_conn_error
  211     @utils.retries_on_autoreconnect
  212     def get_metadata(self, name, project=None):
  213         queue = self._collection.find_one(_get_scoped_query(name, project),
  214                                           projection={'m': 1, '_id': 0})
  215         if queue is None:
  216             raise errors.TopicDoesNotExist(name, project)
  217 
  218         return queue.get('m', {})
  219 
  220     @utils.raises_conn_error
  221     # @utils.retries_on_autoreconnect
  222     def _create(self, name, metadata=None, project=None):
  223         # NOTE(flaper87): If the connection fails after it was called
  224         # and we retry to insert the topic, we could end up returning
  225         # `False` because of the `DuplicatedKeyError` although the
  226         # topic was indeed created by this API call.
  227         #
  228         # TODO(kgriffs): Commented out `retries_on_autoreconnect` for
  229         # now due to the above issue, since creating a topic is less
  230         # important to make super HA.
  231 
  232         try:
  233             # NOTE(kgriffs): Start counting at 1, and assume the first
  234             # message ever posted will succeed and set t to a UNIX
  235             # "modified at" timestamp.
  236             counter = {'v': 1, 't': 0}
  237 
  238             scoped_name = utils.scope_queue_name(name, project)
  239             self._collection.insert_one(
  240                 {'p_t': scoped_name, 'm': metadata or {},
  241                  'c': counter})
  242 
  243         except pymongo.errors.DuplicateKeyError:
  244             return False
  245         else:
  246             return True
  247 
  248     # NOTE(kgriffs): Only cache when it exists; if it doesn't exist, and
  249     # someone creates it, we want it to be immediately visible.
  250     @utils.raises_conn_error
  251     @utils.retries_on_autoreconnect
  252     @decorators.caches(_topic_exists_key, _TOPIC_CACHE_TTL, lambda v: v)
  253     def _exists(self, name, project=None):
  254         query = _get_scoped_query(name, project)
  255         return self._collection.find_one(query) is not None
  256 
  257     @utils.raises_conn_error
  258     @utils.retries_on_autoreconnect
  259     def set_metadata(self, name, metadata, project=None):
  260         rst = self._collection.update_one(_get_scoped_query(name, project),
  261                                           {'$set': {'m': metadata}})
  262 
  263         if rst.matched_count == 0:
  264             raise errors.TopicDoesNotExist(name, project)
  265 
  266     @utils.raises_conn_error
  267     @utils.retries_on_autoreconnect
  268     @_exists.purges
  269     def _delete(self, name, project=None):
  270         self._collection.delete_one(_get_scoped_query(name, project))
  271 
  272     @utils.raises_conn_error
  273     @utils.retries_on_autoreconnect
  274     def _stats(self, name, project=None):
  275         pass
  276 
  277 
  278 def _get_scoped_query(name, project):
  279     return {'p_t': utils.scope_queue_name(name, project)}