"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-10.0.0/zaqar/storage/mongodb/queues.py" (13 May 2020, 10970 Bytes) of package /linux/misc/openstack/zaqar-10.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "queues.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 9.0.0_vs_10.0.0.

# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implements the MongoDB storage controller for queues.

Field Mappings:
    In order to reduce the disk / memory space used,
    field names will be, most of the time, the first
    letter of their long name.
"""

from oslo_log import log as logging
from oslo_utils import timeutils
from pymongo.collection import ReturnDocument
import pymongo.errors

from zaqar.common import decorators
from zaqar.i18n import _
from zaqar import storage
from zaqar.storage import errors
from zaqar.storage.mongodb import utils

LOG = logging.getLogger(__name__)

# NOTE(kgriffs): E.g.: 'queuecontroller:exists:5083853/my-queue'
_QUEUE_CACHE_PREFIX = 'queuecontroller:'

# NOTE(kgriffs): This causes some race conditions, but they are
# harmless. If a queue was deleted, but we are still returning
# that it exists, some messages may get inserted without the
# client getting an error. In this case, those messages would
# be orphaned and expire eventually according to their TTL.
#
# What this means for the client is that they have a bug; they
# deleted a queue and then immediately tried to post messages
# to it. If they keep trying to use the queue, they will
# eventually start getting an error, once the cache entry
# expires, which should clue them in on what happened.
#
# TODO(kgriffs): Make dynamic?
_QUEUE_CACHE_TTL = 5


def _queue_exists_key(queue, project=None):
    # NOTE(kgriffs): Use string concatenation for performance,
    # also put project first since it is guaranteed to be
    # unique, which should reduce lookup time.
    return _QUEUE_CACHE_PREFIX + 'exists:' + str(project) + '/' + queue
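
# For example, per the cache-key format in the NOTE above (the project ID
# is just the sample value from that NOTE):
#
#     >>> _queue_exists_key('my-queue', '5083853')
#     'queuecontroller:exists:5083853/my-queue'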


class QueueController(storage.Queue):
    """Implements queue resource operations using MongoDB.

    Queues are scoped by project, which is prefixed to the
    queue name.

    ::

        Queues:

            Name            Field
            ---------------------
            name         ->   p_q
            msg counter  ->     c
            metadata     ->     m

        Message Counter:

            Name          Field
            -------------------
            value        ->   v
            modified ts  ->   t
    """
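
    # An illustrative (not authoritative) sketch of a stored queue document,
    # using the abbreviated field names from the mapping above; the scoped
    # name and metadata values are made up:
    #
    #     {
    #         'p_q': '5083853/my-queue',       # project-scoped queue name
    #         'c': {'v': 7, 't': 1589328000},  # counter value / modified ts
    #         'm': {'description': 'demo queue'},  # user-supplied metadata
    #     }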

    def __init__(self, *args, **kwargs):
        super(QueueController, self).__init__(*args, **kwargs)

        self._cache = self.driver.cache
        self._collection = self.driver.queues_database.queues

        # NOTE(flaper87): This creates a unique index for
        # project and name. Using project as the prefix
        # allows for querying by project and project+name.
        # This is also useful for retrieving the queues list for
        # a specific project, for example. Order matters!
        self._collection.ensure_index([('p_q', 1)], unique=True)
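
        # For example (an illustrative sketch, not necessarily how
        # utils.scoped_query builds its filters), a listing scoped to a
        # single project can use a left-anchored prefix match on 'p_q',
        # which this index can still serve:
        #
        #     self._collection.find({'p_q': {'$regex': '^5083853/'}})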

    # ----------------------------------------------------------------------
    # Helpers
    # ----------------------------------------------------------------------

    def _get_counter(self, name, project=None):
        """Retrieves the current message counter value for a given queue.

        This helper is used to generate monotonic pagination
        markers that are saved as part of the message
        document.

        Note 1: Markers are scoped per-queue and so are *not*
            globally unique or globally ordered.

        Note 2: If two or more requests to this method are made
            in parallel, this method will return the same counter
            value. This is done intentionally so that the caller
            can detect a parallel message post, allowing it to
            mitigate race conditions between producer and
            observer clients.

        :param name: Name of the queue to which the counter is scoped
        :param project: Queue's project
        :returns: current message counter as an integer
        """

        doc = self._collection.find_one(_get_scoped_query(name, project),
                                        projection={'c.v': 1, '_id': 0})

        if doc is None:
            raise errors.QueueDoesNotExist(name, project)

        return doc['c']['v']

    def _inc_counter(self, name, project=None, amount=1, window=None):
        """Increments the message counter and returns the new value.

        :param name: Name of the queue to which the counter is scoped
        :param project: Queue's project name
        :param amount: (Default 1) Amount by which to increment the counter
        :param window: (Default None) A time window, in seconds, that
            must have elapsed since the counter was last updated, in
            order to increment the counter.

        :returns: Updated message counter value, or None if window
            was specified, and the counter has already been updated
            within the specified time period.

        :raises QueueDoesNotExist: if not found
        """
        now = timeutils.utcnow_ts()

        update = {'$inc': {'c.v': amount}, '$set': {'c.t': now}}
        query = _get_scoped_query(name, project)
        if window is not None:
            threshold = now - window
            query['c.t'] = {'$lt': threshold}

        while True:
            try:
                doc = self._collection.find_one_and_update(
                    query, update, return_document=ReturnDocument.AFTER,
                    projection={'c.v': 1, '_id': 0})

                break
            except pymongo.errors.AutoReconnect:
                LOG.exception('Auto reconnect failure')

        if doc is None:
            if window is None:
                # NOTE(kgriffs): Since we did not filter by a time window,
                # the queue should have been found and updated. Perhaps
                # the queue has been deleted?
                message = _(u'Failed to increment the message '
                            u'counter for queue %(name)s and '
                            u'project %(project)s')
                message %= dict(name=name, project=project)

                LOG.warning(message)

                raise errors.QueueDoesNotExist(name, project)

            # NOTE(kgriffs): Assume the queue existed, but the counter
            # was recently updated, causing the range query on 'c.t' to
            # exclude the record.
            return None

        return doc['c']['v']
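
    # For example (an illustrative sketch; the 10-second window is
    # arbitrary), a caller that wants at most one increment per window can
    # treat a None result as "another request already updated the counter":
    #
    #     value = self._inc_counter(name, project, window=10)
    #     if value is None:
    #         # Updated within the last 10 seconds by someone else; skip.
    #         ...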

    # ----------------------------------------------------------------------
    # Interface
    # ----------------------------------------------------------------------

    def _get(self, name, project=None):
        try:
            return self.get_metadata(name, project)
        except errors.QueueDoesNotExist:
            return {}

    def _list(self, project=None, kfilter={}, marker=None,
              limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False,
              name=None):

        query = utils.scoped_query(marker, project, name, kfilter)

        projection = {'p_q': 1, '_id': 0}
        if detailed:
            projection['m'] = 1

        cursor = self._collection.find(query, projection=projection)
        cursor = cursor.limit(limit).sort('p_q')
        marker_name = {}

        def normalizer(record):
            queue = {'name': utils.descope_queue_name(record['p_q'])}
            marker_name['next'] = queue['name']
            if detailed:
                queue['metadata'] = record['m']
            return queue

        yield utils.HookedCursor(cursor, normalizer)
        yield marker_name and marker_name['next']
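
    # NOTE: _list is a generator that yields exactly two values: first the
    # HookedCursor of normalized queue records, then the pagination marker
    # (the name of the last queue returned). The cursor must be consumed
    # before reading the marker, since the normalizer above is what records
    # it. An illustrative consumption sketch (argument values are made up):
    #
    #     pages = self._list(project='5083853', limit=10)
    #     queues = list(next(pages))  # drain the cursor first
    #     marker = next(pages)        # now holds the next-page marker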

    @utils.raises_conn_error
    @utils.retries_on_autoreconnect
    def get_metadata(self, name, project=None):
        queue = self._collection.find_one(_get_scoped_query(name, project),
                                          projection={'m': 1, '_id': 0})
        if queue is None:
            raise errors.QueueDoesNotExist(name, project)

        return queue.get('m', {})

    @utils.raises_conn_error
    # @utils.retries_on_autoreconnect
    def _create(self, name, metadata=None, project=None):
        # NOTE(flaper87): If the connection fails after the insert was
        # issued and we retry the insert, we could end up returning
        # `False` because of the `DuplicateKeyError`, although the
        # queue was indeed created by this API call.
        #
        # TODO(kgriffs): Commented out `retries_on_autoreconnect` for
        # now due to the above issue, since creating a queue is less
        # important to make super HA.

        try:
            # NOTE(kgriffs): Start counting at 1, and assume the first
            # message ever posted will succeed and set t to a UNIX
            # "modified at" timestamp.
            counter = {'v': 1, 't': 0}

            scoped_name = utils.scope_queue_name(name, project)
            self._collection.insert_one(
                {'p_q': scoped_name, 'm': metadata or {},
                 'c': counter})

        except pymongo.errors.DuplicateKeyError:
            return False
        else:
            return True
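
    # For example (illustrative values), creating the same queue twice
    # returns True the first time and False the second, because the unique
    # index on 'p_q' makes the second insert raise DuplicateKeyError:
    #
    #     self._create('my-queue', project='5083853')   # True
    #     self._create('my-queue', project='5083853')   # False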

    # NOTE(kgriffs): Only cache when it exists; if it doesn't exist, and
    # someone creates it, we want it to be immediately visible.
    @utils.raises_conn_error
    @utils.retries_on_autoreconnect
    @decorators.caches(_queue_exists_key, _QUEUE_CACHE_TTL, lambda v: v)
    def _exists(self, name, project=None):
        query = _get_scoped_query(name, project)
        return self._collection.find_one(query) is not None

    @utils.raises_conn_error
    @utils.retries_on_autoreconnect
    def set_metadata(self, name, metadata, project=None):
        rst = self._collection.update_one(_get_scoped_query(name, project),
                                          {'$set': {'m': metadata}})

        if rst.matched_count == 0:
            raise errors.QueueDoesNotExist(name, project)

    @utils.raises_conn_error
    @utils.retries_on_autoreconnect
    @_exists.purges
    def _delete(self, name, project=None):
        self._collection.delete_one(_get_scoped_query(name, project))

    @utils.raises_conn_error
    @utils.retries_on_autoreconnect
    def _stats(self, name, project=None):
        pass

    @utils.raises_conn_error
    @utils.retries_on_autoreconnect
    def _calculate_resource_count(self, project=None):
        query = utils.scoped_query(None, project, None, {})
        projection = {'p_q': 1, '_id': 0}
        return self._collection.find(query, projection=projection).count()


def _get_scoped_query(name, project):
    return {'p_q': utils.scope_queue_name(name, project)}
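
# For example (illustrative; assuming utils.scope_queue_name joins project
# and queue name as in the '5083853/my-queue' cache-key example above), the
# resulting query document would look like:
#
#     {'p_q': '5083853/my-queue'}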