"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-10.0.0/zaqar/storage/mongodb/messages.py" (13 May 2020, 42410 Bytes) of package /linux/misc/openstack/zaqar-10.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "messages.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 9.0.0_vs_10.0.0.

    1 # Copyright (c) 2013 Red Hat, Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 
   16 """Implements MongoDB the storage controller for messages.
   17 
   18 Field Mappings:
   19     In order to reduce the disk / memory space used,
   20     field names will be, most of the time, the first
   21     letter of their long name.
   22 """
   23 
   24 import datetime
   25 import time
   26 
   27 from bson import objectid
   28 from oslo_log import log as logging
   29 from oslo_utils import timeutils
   30 import pymongo.errors
   31 import pymongo.read_preferences
   32 
   33 from zaqar.i18n import _
   34 from zaqar import storage
   35 from zaqar.storage import errors
   36 from zaqar.storage.mongodb import utils
   37 from zaqar.storage import utils as s_utils
   38 
   39 
LOG = logging.getLogger(__name__)

# NOTE(kgriffs): This value, in seconds, should be at least less than the
# minimum allowed TTL for messages (60 seconds). Make it 45 to allow for
# some fudge room.
MAX_RETRY_POST_DURATION = 45

# NOTE(kgriffs): It is extremely unlikely that all workers would somehow hang
# for more than 5 seconds, without a single one being able to succeed in
# posting some messages and incrementing the counter, thus allowing the other
# producers to succeed in turn.
COUNTER_STALL_WINDOW = 5

# For hinting queries that look up messages by their ObjectId.
ID_INDEX_FIELDS = [('_id', 1)]

# For removing expired messages ('e' holds the expiration timestamp;
# see the TTL index created in MessageController._ensure_indexes).
TTL_INDEX_FIELDS = [
    ('e', 1),
]

# NOTE(cpp-cabrera): to unify use of project/queue across mongodb
# storage impls.
PROJ_QUEUE = utils.PROJ_QUEUE_KEY

# NOTE(kgriffs): This index is for listing messages, usually
# filtering out claimed ones.
ACTIVE_INDEX_FIELDS = [
    (PROJ_QUEUE, 1),  # Project will be unique, so put first
    ('k', 1),  # Used for sorting and paging, must come before range queries
    ('c.e', 1),  # Used for filtering out claimed messages

    # NOTE(kgriffs): We do not include 'u' and 'tx' here on
    # purpose. It was found experimentally that adding 'u' did
    # not improve performance, and so it was left out in order
    # to reduce index size and make updating the index
    # faster. When 'tx' was added, it was assumed that it would
    # follow a similar performance pattern to 'u', since by
    # the time you traverse the index down past the fields
    # listed above, there is very little left to scan, esp.
    # considering all queries are limited (limit=) to a fairly
    # small number.
    #
    # TODO(kgriffs): The extrapolation wrt 'tx' needs to be
    # proven empirically.
]

# For counting messages per queue (see MessageController._count).
COUNTING_INDEX_FIELDS = [
    (PROJ_QUEUE, 1),  # Project will be unique, so put first
    ('c.e', 1),  # Used for filtering out claimed messages
]

# Index used for claims ('c.id' is the claim id, 'c.e' its expiration).
CLAIMED_INDEX_FIELDS = [
    (PROJ_QUEUE, 1),
    ('c.id', 1),
    ('k', 1),
    ('c.e', 1),
]

# This index is meant to be used as a shard-key and to ensure
# uniqueness for markers.
#
# As for other compound indexes, order matters. The marker `k`
# gives enough cardinality to ensure chunks are evenly distributed,
# whereas the `p_q` field helps keeping chunks from the same project
# and queue together.
#
# In a sharded environment, uniqueness of this index is still guaranteed
# because it's used as a shard key.
MARKER_INDEX_FIELDS = [
    ('k', 1),
    (PROJ_QUEUE, 1),
]

# For filtering messages by transaction state ('tx').
TRANSACTION_INDEX_FIELDS = [
    ('tx', 1),
]
  119 
  120 
  121 class MessageController(storage.Message):
  122     """Implements message resource operations using MongoDB.
  123 
  124     Messages are scoped by project + queue.
  125 
  126     ::
  127 
  128         Messages:
  129             Name                Field
  130             -------------------------
  131             scope            ->   p_q
  132             ttl              ->     t
  133             expires          ->     e
  134             marker           ->     k
  135             body             ->     b
  136             claim            ->     c
  137             client uuid      ->     u
  138             transaction      ->    tx
  139             delay            ->     d
  140             checksum         ->    cs
  141     """
  142 
  143     def __init__(self, *args, **kwargs):
  144         super(MessageController, self).__init__(*args, **kwargs)
  145 
  146         # Cache for convenience and performance
  147         self._num_partitions = self.driver.mongodb_conf.partitions
  148         self._queue_ctrl = self.driver.queue_controller
  149         self._retry_range = range(self.driver.mongodb_conf.max_attempts)
  150 
  151         # Create a list of 'messages' collections, one for each database
  152         # partition, ordered by partition number.
  153         #
  154         # NOTE(kgriffs): Order matters, since it is used to lookup the
  155         # collection by partition number. For example, self._collections[2]
  156         # would provide access to zaqar_p2.messages (partition numbers are
  157         # zero-based).
  158         self._collections = [db.messages
  159                              for db in self.driver.message_databases]
  160 
  161         # Ensure indexes are initialized before any queries are performed
  162         for collection in self._collections:
  163             self._ensure_indexes(collection)
  164 
  165     # ----------------------------------------------------------------------
  166     # Helpers
  167     # ----------------------------------------------------------------------
  168 
  169     def _ensure_indexes(self, collection):
  170         """Ensures that all indexes are created."""
  171 
  172         collection.ensure_index(TTL_INDEX_FIELDS,
  173                                 name='ttl',
  174                                 expireAfterSeconds=0,
  175                                 background=True)
  176 
  177         collection.ensure_index(ACTIVE_INDEX_FIELDS,
  178                                 name='active',
  179                                 background=True)
  180 
  181         collection.ensure_index(CLAIMED_INDEX_FIELDS,
  182                                 name='claimed',
  183                                 background=True)
  184 
  185         collection.ensure_index(COUNTING_INDEX_FIELDS,
  186                                 name='counting',
  187                                 background=True)
  188 
  189         collection.ensure_index(MARKER_INDEX_FIELDS,
  190                                 name='queue_marker',
  191                                 background=True)
  192 
  193         collection.ensure_index(TRANSACTION_INDEX_FIELDS,
  194                                 name='transaction',
  195                                 background=True)
  196 
  197     def _collection(self, queue_name, project=None):
  198         """Get a partitioned collection instance."""
  199         return self._collections[utils.get_partition(self._num_partitions,
  200                                                      queue_name, project)]
  201 
  202     def _backoff_sleep(self, attempt):
  203         """Sleep between retries using a jitter algorithm.
  204 
  205         Mitigates thrashing between multiple parallel requests, and
  206         creates backpressure on clients to slow down the rate
  207         at which they submit requests.
  208 
  209         :param attempt: current attempt number, zero-based
  210         """
  211         conf = self.driver.mongodb_conf
  212         seconds = utils.calculate_backoff(attempt, conf.max_attempts,
  213                                           conf.max_retry_sleep,
  214                                           conf.max_retry_jitter)
  215 
  216         time.sleep(seconds)
  217 
  218     def _purge_queue(self, queue_name, project=None):
  219         """Removes all messages from the queue.
  220 
  221         Warning: Only use this when deleting the queue; otherwise
  222         you can cause a side-effect of reseting the marker counter
  223         which can cause clients to miss tons of messages.
  224 
  225         If the queue does not exist, this method fails silently.
  226 
  227         :param queue_name: name of the queue to purge
  228         :param project: ID of the project to which the queue belongs
  229         """
  230         scope = utils.scope_queue_name(queue_name, project)
  231         collection = self._collection(queue_name, project)
  232         collection.delete_many({PROJ_QUEUE: scope})
  233 
  234     def _list(self, queue_name, project=None, marker=None,
  235               echo=False, client_uuid=None, projection=None,
  236               include_claimed=False, include_delayed=False,
  237               sort=1, limit=None):
  238         """Message document listing helper.
  239 
  240         :param queue_name: Name of the queue to list
  241         :param project: (Default None) Project `queue_name` belongs to. If
  242             not specified, queries the "global" namespace/project.
  243         :param marker: (Default None) Message marker from which to start
  244             iterating. If not specified, starts with the first message
  245             available in the queue.
  246         :param echo: (Default False) Whether to return messages that match
  247             client_uuid
  248         :param client_uuid: (Default None) UUID for the client that
  249             originated this request
  250         :param projection: (Default None) a list of field names that should be
  251             returned in the result set or a dict specifying the fields to
  252             include or exclude
  253         :param include_claimed: (Default False) Whether to include
  254             claimed messages, not just active ones
  255         :param include_delayed: (Default False) Whether to include
  256             delayed messages, not just active ones
  257         :param sort: (Default 1) Sort order for the listing. Pass 1 for
  258             ascending (oldest message first), or -1 for descending (newest
  259             message first).
  260         :param limit: (Default None) The maximum number of messages
  261             to list. The results may include fewer messages than the
  262             requested `limit` if not enough are available. If limit is
  263             not specified
  264 
  265         :returns: Generator yielding up to `limit` messages.
  266         """
  267 
  268         if sort not in (1, -1):
  269             raise ValueError(u'sort must be either 1 (ascending) '
  270                              u'or -1 (descending)')
  271 
  272         now = timeutils.utcnow_ts()
  273 
  274         query = {
  275             # Messages must belong to this queue and project.
  276             PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  277 
  278             # NOTE(kgriffs): Messages must be finalized (i.e., must not
  279             # be part of an unfinalized transaction).
  280             #
  281             # See also the note wrt 'tx' within the definition
  282             # of ACTIVE_INDEX_FIELDS.
  283             'tx': None,
  284         }
  285 
  286         if not echo:
  287             query['u'] = {'$ne': client_uuid}
  288 
  289         if marker is not None:
  290             query['k'] = {'$gt': marker}
  291 
  292         collection = self._collection(queue_name, project)
  293 
  294         if not include_claimed:
  295             # Only include messages that are not part of
  296             # any claim, or are part of an expired claim.
  297             query['c.e'] = {'$lte': now}
  298 
  299         if not include_delayed:
  300             # NOTE(cdyangzhenyu): Only include messages that are not
  301             # part of any delay, or are part of an expired delay. if
  302             # the message has no attribute 'd', it will also be obtained.
  303             # This is for compatibility with old data.
  304             query['$or'] = [{'d': {'$lte': now}},
  305                             {'d': {'$exists': False}}]
  306 
  307         # Construct the request
  308         cursor = collection.find(query,
  309                                  projection=projection,
  310                                  sort=[('k', sort)])
  311 
  312         if limit is not None:
  313             cursor.limit(limit)
  314 
  315         # NOTE(flaper87): Suggest the index to use for this query to
  316         # ensure the most performant one is chosen.
  317         return cursor.hint(ACTIVE_INDEX_FIELDS)
  318 
  319     # ----------------------------------------------------------------------
  320     # "Friends" interface
  321     # ----------------------------------------------------------------------
  322 
    def _count(self, queue_name, project=None, include_claimed=False):
        """Return total number of messages in a queue.

        This method is designed to very quickly count the number
        of messages in a given queue. Expired messages are not
        counted, of course. If the queue does not exist, the
        count will always be 0.

        Note: Some expired messages may be included in the count if
            they haven't been GC'd yet. This is done for performance.

        :param queue_name: name of the queue to count
        :param project: (Default None) project to which the queue belongs
        :param include_claimed: (Default False) when True, claimed
            messages are included in the total
        :returns: number of matching messages
        """
        query = {
            # Messages must belong to this queue and project.
            PROJ_QUEUE: utils.scope_queue_name(queue_name, project),

            # NOTE(kgriffs): Messages must be finalized (i.e., must not
            # be part of an unfinalized transaction).
            #
            # See also the note wrt 'tx' within the definition
            # of ACTIVE_INDEX_FIELDS.
            'tx': None,
        }

        if not include_claimed:
            # Exclude messages that are claimed
            query['c.e'] = {'$lte': timeutils.utcnow_ts()}

        collection = self._collection(queue_name, project)
        # NOTE(review): Collection.count() is deprecated in newer pymongo
        # releases (removed in 4.0) in favor of count_documents(); left
        # as-is here -- confirm against the pinned pymongo version before
        # migrating.
        return collection.count(filter=query, hint=COUNTING_INDEX_FIELDS)
  352 
  353     def _active(self, queue_name, marker=None, echo=False,
  354                 client_uuid=None, projection=None, project=None,
  355                 limit=None, include_delayed=False):
  356 
  357         return self._list(queue_name, project=project, marker=marker,
  358                           echo=echo, client_uuid=client_uuid,
  359                           projection=projection, include_claimed=False,
  360                           include_delayed=include_delayed, limit=limit)
  361 
  362     def _claimed(self, queue_name, claim_id,
  363                  expires=None, limit=None, project=None):
  364 
  365         if claim_id is None:
  366             claim_id = {'$ne': None}
  367 
  368         query = {
  369             PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  370             'c.id': claim_id,
  371             'c.e': {'$gt': expires or timeutils.utcnow_ts()},
  372         }
  373 
  374         kwargs = {}
  375         collection = self._collection(queue_name, project)
  376 
  377         # NOTE(kgriffs): Claimed messages bust be queried from
  378         # the primary to avoid a race condition caused by the
  379         # multi-phased "create claim" algorithm.
  380         # NOTE(flaper87): In pymongo 3.0 PRIMARY is the default and
  381         # `read_preference` is read only. We'd need to set it when the
  382         # client is created.
  383         msgs = collection.find(query, sort=[('k', 1)], **kwargs).hint(
  384             CLAIMED_INDEX_FIELDS)
  385 
  386         if limit is not None:
  387             msgs = msgs.limit(limit)
  388 
  389         now = timeutils.utcnow_ts()
  390 
  391         def denormalizer(msg):
  392             doc = _basic_message(msg, now)
  393             doc['claim'] = msg['c']
  394 
  395             return doc
  396 
  397         return utils.HookedCursor(msgs, denormalizer)
  398 
  399     def _unclaim(self, queue_name, claim_id, project=None):
  400         cid = utils.to_oid(claim_id)
  401 
  402         # NOTE(cpp-cabrera): early abort - avoid a DB query if we're handling
  403         # an invalid ID
  404         if cid is None:
  405             return
  406 
  407         # NOTE(cpp-cabrera):  unclaim by setting the claim ID to None
  408         # and the claim expiration time to now
  409         now = timeutils.utcnow_ts()
  410         scope = utils.scope_queue_name(queue_name, project)
  411         collection = self._collection(queue_name, project)
  412 
  413         collection.update_many({PROJ_QUEUE: scope, 'c.id': cid},
  414                                {'$set': {'c': {'id': None, 'e': now}}},
  415                                upsert=False)
  416 
    def _inc_counter(self, queue_name, project=None, amount=1, window=None):
        """Increments the message counter and returns the new value.

        :param queue_name: Name of the queue to which the counter is scoped
        :param project: Queue's project name
        :param amount: (Default 1) Amount by which to increment the counter
        :param window: (Default None) A time window, in seconds, that
            must have elapsed since the counter was last updated, in
            order to increment the counter.

        :returns: Updated message counter value, or None if window
            was specified, and the counter has already been updated
            within the specified time period.

        :raises QueueDoesNotExist: if not found
        """

        # NOTE(flaper87): If this `if` is True, it means we're
        # using a mongodb in the control plane. To avoid breaking
        # environments doing so already, we'll keep using the counter
        # in the mongodb queue_controller rather than the one in the
        # message_controller. This should go away, eventually
        if hasattr(self._queue_ctrl, '_inc_counter'):
            return self._queue_ctrl._inc_counter(queue_name, project,
                                                 amount, window)

        now = timeutils.utcnow_ts()

        # Atomically bump the counter value ('c.v') and record the time
        # of the update ('c.t') in a single document update.
        update = {'$inc': {'c.v': amount}, '$set': {'c.t': now}}
        query = _get_scoped_query(queue_name, project)
        if window is not None:
            # Only update if at least `window` seconds have passed since
            # the last counter update (see COUNTER_STALL_WINDOW).
            threshold = now - window
            query['c.t'] = {'$lt': threshold}

        # NOTE(review): retries forever on AutoReconnect with no backoff
        # between attempts -- presumably intentional, but confirm.
        while True:
            try:
                collection = self._collection(queue_name, project).stats
                doc = collection.find_one_and_update(
                    query, update,
                    return_document=pymongo.ReturnDocument.AFTER,
                    projection={'c.v': 1, '_id': 0})

                break
            except pymongo.errors.AutoReconnect:
                LOG.exception('Auto reconnect error')

        if doc is None:
            if window is None:
                # NOTE(kgriffs): Since we did not filter by a time window,
                # the queue should have been found and updated. Perhaps
                # the queue has been deleted?
                message = (u'Failed to increment the message '
                           u'counter for queue %(name)s and '
                           u'project %(project)s')
                message %= dict(name=queue_name, project=project)

                LOG.warning(message)

                raise errors.QueueDoesNotExist(queue_name, project)

            # NOTE(kgriffs): Assume the queue existed, but the counter
            # was recently updated, causing the range query on 'c.t' to
            # exclude the record.
            return None

        return doc['c']['v']
  483 
    def _get_counter(self, queue_name, project=None):
        """Retrieves the current message counter value for a given queue.

        This helper is used to generate monotonic pagination
        markers that are saved as part of the message
        document.

        Note 1: Markers are scoped per-queue and so are *not*
            globally unique or globally ordered.

        Note 2: If two or more requests to this method are made
            in parallel, this method will return the same counter
            value. This is done intentionally so that the caller
            can detect a parallel message post, allowing it to
            mitigate race conditions between producer and
            observer clients.

        :param queue_name: Name of the queue to which the counter is scoped
        :param project: Queue's project
        :returns: current message counter as an integer
        """

        # NOTE(flaper87): If this `if` is True, it means we're
        # using a mongodb in the control plane. To avoid breaking
        # environments doing so already, we'll keep using the counter
        # in the mongodb queue_controller rather than the one in the
        # message_controller. This should go away, eventually
        if hasattr(self._queue_ctrl, '_get_counter'):
            return self._queue_ctrl._get_counter(queue_name, project)

        # Incrementing by zero combined with upsert=True makes this an
        # atomic read-or-create of the counter document.
        update = {'$inc': {'c.v': 0, 'c.t': 0}}
        query = _get_scoped_query(queue_name, project)

        try:
            collection = self._collection(queue_name, project).stats
            doc = collection.find_one_and_update(
                query, update, upsert=True,
                return_document=pymongo.ReturnDocument.AFTER,
                projection={'c.v': 1, '_id': 0})

            return doc['c']['v']
        except pymongo.errors.AutoReconnect:
            LOG.exception('Auto reconnect error')
            # NOTE(review): implicitly returns None on AutoReconnect --
            # confirm no caller relies on receiving an int here.
  527 
  528     # ----------------------------------------------------------------------
  529     # Public interface
  530     # ----------------------------------------------------------------------
  531 
  532     def list(self, queue_name, project=None, marker=None,
  533              limit=storage.DEFAULT_MESSAGES_PER_PAGE,
  534              echo=False, client_uuid=None, include_claimed=False,
  535              include_delayed=False):
  536 
  537         if marker is not None:
  538             try:
  539                 marker = int(marker)
  540             except ValueError:
  541                 yield iter([])
  542 
  543         messages = self._list(queue_name, project=project, marker=marker,
  544                               client_uuid=client_uuid, echo=echo,
  545                               include_claimed=include_claimed,
  546                               include_delayed=include_delayed, limit=limit)
  547 
  548         marker_id = {}
  549 
  550         now = timeutils.utcnow_ts()
  551 
  552         # NOTE (kgriffs) @utils.raises_conn_error not needed on this
  553         # function, since utils.HookedCursor already has it.
  554         def denormalizer(msg):
  555             marker_id['next'] = msg['k']
  556 
  557             return _basic_message(msg, now)
  558 
  559         yield utils.HookedCursor(messages, denormalizer)
  560         yield str(marker_id['next'])
  561 
  562     @utils.raises_conn_error
  563     @utils.retries_on_autoreconnect
  564     def first(self, queue_name, project=None, sort=1):
  565         cursor = self._list(queue_name, project=project,
  566                             include_claimed=True, sort=sort,
  567                             limit=1)
  568         try:
  569             message = next(cursor)
  570         except StopIteration:
  571             raise errors.QueueIsEmpty(queue_name, project)
  572 
  573         now = timeutils.utcnow_ts()
  574         return _basic_message(message, now)
  575 
  576     @utils.raises_conn_error
  577     @utils.retries_on_autoreconnect
  578     def get(self, queue_name, message_id, project=None):
  579         mid = utils.to_oid(message_id)
  580         if mid is None:
  581             raise errors.MessageDoesNotExist(message_id, queue_name,
  582                                              project)
  583 
  584         now = timeutils.utcnow_ts()
  585 
  586         query = {
  587             '_id': mid,
  588             PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  589         }
  590 
  591         collection = self._collection(queue_name, project)
  592         message = list(collection.find(query).limit(1).hint(ID_INDEX_FIELDS))
  593 
  594         if not message:
  595             raise errors.MessageDoesNotExist(message_id, queue_name,
  596                                              project)
  597 
  598         return _basic_message(message[0], now)
  599 
  600     @utils.raises_conn_error
  601     @utils.retries_on_autoreconnect
  602     def bulk_get(self, queue_name, message_ids, project=None):
  603         message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
  604         if not message_ids:
  605             return iter([])
  606 
  607         now = timeutils.utcnow_ts()
  608 
  609         # Base query, always check expire time
  610         query = {
  611             '_id': {'$in': message_ids},
  612             PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  613         }
  614 
  615         collection = self._collection(queue_name, project)
  616 
  617         # NOTE(flaper87): Should this query
  618         # be sorted?
  619         messages = collection.find(query).hint(ID_INDEX_FIELDS)
  620 
  621         def denormalizer(msg):
  622             return _basic_message(msg, now)
  623 
  624         return utils.HookedCursor(messages, denormalizer)
  625 
  626     @utils.raises_conn_error
  627     @utils.retries_on_autoreconnect
  628     def post(self, queue_name, messages, client_uuid, project=None):
  629         # NOTE(flaper87): This method should be safe to retry on
  630         # autoreconnect, since we've a 2-step insert for messages.
  631         # The worst-case scenario is that we'll increase the counter
  632         # several times and we'd end up with some non-active messages.
  633 
  634         if not self._queue_ctrl.exists(queue_name, project):
  635             raise errors.QueueDoesNotExist(queue_name, project)
  636 
  637         # NOTE(flaper87): Make sure the counter exists. This method
  638         # is an upsert.
  639         self._get_counter(queue_name, project)
  640         now = timeutils.utcnow_ts()
  641         now_dt = datetime.datetime.utcfromtimestamp(now)
  642         collection = self._collection(queue_name, project)
  643 
  644         messages = list(messages)
  645         msgs_n = len(messages)
  646         next_marker = self._inc_counter(queue_name,
  647                                         project,
  648                                         amount=msgs_n) - msgs_n
  649 
  650         prepared_messages = []
  651         for index, message in enumerate(messages):
  652             msg = {
  653                 PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  654                 't': message['ttl'],
  655                 'e': now_dt + datetime.timedelta(seconds=message['ttl']),
  656                 'u': client_uuid,
  657                 'c': {'id': None, 'e': now, 'c': 0},
  658                 'd': now + message.get('delay', 0),
  659                 'b': message['body'] if 'body' in message else {},
  660                 'k': next_marker + index,
  661                 'tx': None
  662                 }
  663             if self.driver.conf.enable_checksum:
  664                 msg['cs'] = s_utils.get_checksum(message.get('body', None))
  665 
  666             prepared_messages.append(msg)
  667 
  668         res = collection.insert_many(prepared_messages,
  669                                      bypass_document_validation=True)
  670 
  671         return [str(id_) for id_ in res.inserted_ids]
  672 
    @utils.raises_conn_error
    @utils.retries_on_autoreconnect
    def delete(self, queue_name, message_id, project=None, claim=None):
        """Delete a message, optionally enforcing claim ownership.

        :param queue_name: name of the queue holding the message
        :param message_id: ID of the message to delete
        :param project: (Default None) project to which the queue belongs
        :param claim: (Default None) claim the caller believes it holds
            on the message; when given, the message is only deleted if
            that claim currently owns it
        :raises ClaimDoesNotExist: if `claim` cannot be converted to a
            valid ObjectId
        :raises MessageIsClaimed: if no claim was given but the message
            is currently claimed
        :raises MessageNotClaimedBy: if the message is claimed by a
            different claim than the one given
        :raises MessageNotClaimed: if a claim was given but the message
            is not claimed at all
        """
        # NOTE(cpp-cabrera): return early - this is an invalid message
        # id so we won't be able to find it any way
        mid = utils.to_oid(message_id)
        if mid is None:
            return

        collection = self._collection(queue_name, project)

        query = {
            '_id': mid,
            PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
        }

        # NOTE(review): when `claim` is None, this apparently relies on
        # utils.to_oid(None) returning a fresh ObjectId rather than None;
        # otherwise the raise below would fire for unclaimed deletes and
        # make the `claim is None` branch further down unreachable --
        # confirm against utils.to_oid's handling of None.
        cid = utils.to_oid(claim)
        if cid is None:
            raise errors.ClaimDoesNotExist(claim, queue_name, project)

        now = timeutils.utcnow_ts()
        cursor = collection.find(query).hint(ID_INDEX_FIELDS)

        try:
            message = next(cursor)
        except StopIteration:
            # Message already gone (or never existed); fail silently,
            # matching the invalid-ID early return above.
            return

        if claim is None:
            if _is_claimed(message, now):
                raise errors.MessageIsClaimed(message_id)

        else:
            if message['c']['id'] != cid:
                kwargs = {}
                # NOTE(flaper87): In pymongo 3.0 PRIMARY is the default and
                # `read_preference` is read only. We'd need to set it when the
                # client is created.
                # NOTE(kgriffs): Read from primary in case the message
                # was just barely claimed, and claim hasn't made it to
                # the secondary.
                message = collection.find_one(query, **kwargs)

                if message['c']['id'] != cid:
                    if _is_claimed(message, now):
                        raise errors.MessageNotClaimedBy(message_id, claim)

                    raise errors.MessageNotClaimed(message_id)

        collection.delete_one(query)
  723 
  724     @utils.raises_conn_error
  725     @utils.retries_on_autoreconnect
  726     def bulk_delete(self, queue_name, message_ids, project=None,
  727                     claim_ids=None):
  728         message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
  729         if claim_ids:
  730             claim_ids = [cid for cid in map(utils.to_oid, claim_ids) if cid]
  731         query = {
  732             '_id': {'$in': message_ids},
  733             PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  734         }
  735 
  736         collection = self._collection(queue_name, project)
  737         if claim_ids:
  738             message_claim_ids = []
  739             messages = collection.find(query).hint(ID_INDEX_FIELDS)
  740             for message in messages:
  741                 message_claim_ids.append(message['c']['id'])
  742             for cid in claim_ids:
  743                 if cid not in message_claim_ids:
  744                     raise errors.ClaimDoesNotExist(cid, queue_name, project)
  745 
  746         collection.delete_many(query)
  747 
  748     @utils.raises_conn_error
  749     @utils.retries_on_autoreconnect
  750     def pop(self, queue_name, limit, project=None):
  751         query = {
  752             PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  753         }
  754 
  755         # Only include messages that are not part of
  756         # any claim, or are part of an expired claim.
  757         now = timeutils.utcnow_ts()
  758         query['c.e'] = {'$lte': now}
  759 
  760         collection = self._collection(queue_name, project)
  761         projection = {'_id': 1, 't': 1, 'b': 1, 'c.id': 1}
  762 
  763         messages = (collection.find_one_and_delete(query,
  764                                                    projection=projection)
  765                     for _ in range(limit))
  766 
  767         final_messages = [_basic_message(message, now)
  768                           for message in messages
  769                           if message]
  770 
  771         return final_messages
  772 
  773 
class FIFOMessageController(MessageController):
    """Message controller variant that preserves strict FIFO ordering.

    Adds a unique (queue, marker) index and a 2-phase, transactional
    batch insert so that concurrently posted messages can never be
    observed out of marker order.
    """

    def _ensure_indexes(self, collection):
        """Ensures that all indexes are created."""

        # TTL-based expiration: MongoDB reaps documents once the 'e'
        # timestamp passes (expireAfterSeconds=0).
        collection.ensure_index(TTL_INDEX_FIELDS,
                                name='ttl',
                                expireAfterSeconds=0,
                                background=True)

        collection.ensure_index(ACTIVE_INDEX_FIELDS,
                                name='active',
                                background=True)

        collection.ensure_index(CLAIMED_INDEX_FIELDS,
                                name='claimed',
                                background=True)

        collection.ensure_index(COUNTING_INDEX_FIELDS,
                                name='counting',
                                background=True)

        # NOTE(kgriffs): This index must be unique so that
        # inserting a message with the same marker to the
        # same queue will fail; this is used to detect a
        # race condition which can cause an observer client
        # to miss a message when there is more than one
        # producer posting messages to the same queue, in
        # parallel.
        collection.ensure_index(MARKER_INDEX_FIELDS,
                                name='queue_marker',
                                unique=True,
                                background=True)

        collection.ensure_index(TRANSACTION_INDEX_FIELDS,
                                name='transaction',
                                background=True)

    @utils.raises_conn_error
    @utils.retries_on_autoreconnect
    def post(self, queue_name, messages, client_uuid, project=None):
        """Post one or more messages, assigning sequential FIFO markers.

        :param queue_name: name of the queue to post to
        :param messages: iterable of dicts with 'ttl', optional 'body'
            and optional 'delay' keys
        :param client_uuid: UUID of the posting client
        :param project: project scope for the queue (optional)
        :returns: list of hex string IDs for the inserted messages
        :raises QueueDoesNotExist: if the queue has not been created
        :raises MessageConflict: if the marker race could not be won
            within the configured number of attempts
        """
        # NOTE(flaper87): This method should be safe to retry on
        # autoreconnect, since we've a 2-step insert for messages.
        # The worst-case scenario is that we'll increase the counter
        # several times and we'd end up with some non-active messages.

        if not self._queue_ctrl.exists(queue_name, project):
            raise errors.QueueDoesNotExist(queue_name, project)

        # NOTE(flaper87): Make sure the counter exists. This method
        # is an upsert.
        self._get_counter(queue_name, project)
        now = timeutils.utcnow_ts()
        now_dt = datetime.datetime.utcfromtimestamp(now)
        collection = self._collection(queue_name, project)

        # Set the next basis marker for the first attempt.
        #
        # Note that we don't increment the counter right away because
        # if 2 concurrent posts happen and the one with the higher counter
        # ends before the one with the lower counter, there's a window
        # where a client paging through the queue may get the messages
        # with the higher counter and skip the previous ones. This would
        # make our FIFO guarantee unsound.
        next_marker = self._get_counter(queue_name, project)

        # Unique transaction ID to facilitate atomic batch inserts
        transaction = objectid.ObjectId()

        prepared_messages = []
        for index, message in enumerate(messages):
            # Single-letter field names keep document size down (see
            # the module docstring): t=ttl, e=expiration datetime,
            # u=client uuid, c=claim info, d=delay-until timestamp,
            # b=body, k=FIFO marker, tx=insert transaction ID.
            msg = {
                PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
                't': message['ttl'],
                'e': now_dt + datetime.timedelta(seconds=message['ttl']),
                'u': client_uuid,
                'c': {'id': None, 'e': now, 'c': 0},
                'd': now + message.get('delay', 0),
                'b': message['body'] if 'body' in message else {},
                'k': next_marker + index,
                'tx': None
                }
            if self.driver.conf.enable_checksum:
                msg['cs'] = s_utils.get_checksum(message.get('body', None))

            prepared_messages.append(msg)

        # NOTE(kgriffs): Don't take the time to do a 2-phase insert
        # if there is no way for it to partially succeed.
        if len(prepared_messages) == 1:
            transaction = None
            prepared_messages[0]['tx'] = None

        # Use a retry range for sanity, although we expect
        # to rarely, if ever, reach the maximum number of
        # retries.
        #
        # NOTE(kgriffs): With the default configuration (100 ms
        # max sleep, 1000 max attempts), the max stall time
        # before the operation is abandoned is 49.95 seconds.
        for attempt in self._retry_range:
            try:
                res = collection.insert_many(prepared_messages,
                                             bypass_document_validation=True)

                # Log a message if we retried, for debugging perf issues
                if attempt != 0:
                    msgtmpl = _(u'%(attempts)d attempt(s) required to post '
                                u'%(num_messages)d messages to queue '
                                u'"%(queue)s" under project %(project)s')

                    LOG.debug(msgtmpl,
                              dict(queue=queue_name,
                                   attempts=attempt + 1,
                                   num_messages=len(res.inserted_ids),
                                   project=project))

                # Update the counter in preparation for the next batch
                #
                # NOTE(kgriffs): Due to the unique index on the messages
                # collection, competing inserts will fail as a whole,
                # and keep retrying until the counter is incremented
                # such that the competing marker's will start at a
                # unique number, 1 past the max of the messages just
                # inserted above.
                self._inc_counter(queue_name, project,
                                  amount=len(res.inserted_ids))

                # NOTE(kgriffs): Finalize the insert once we can say that
                # all the messages made it. This makes bulk inserts
                # atomic, assuming queries filter out any non-finalized
                # messages.
                if transaction is not None:
                    collection.update_many({'tx': transaction},
                                           {'$set': {'tx': None}},
                                           upsert=False)

                return [str(id_) for id_ in res.inserted_ids]

            except (pymongo.errors.DuplicateKeyError,
                    pymongo.errors.BulkWriteError):
                # NOTE(review): a duplicate key here presumably means
                # another producer won the race for our marker range —
                # that is the expected retry trigger for this loop.

                # TODO(kgriffs): Record stats of how often retries happen,
                # and how many attempts, on average, are required to insert
                # messages.

                # NOTE(kgriffs): This can be used in conjunction with the
                # log line, above, that is emitted after all messages have
                # been posted, to gauge how long it is taking for messages
                # to be posted to a given queue, or overall.
                #
                # TODO(kgriffs): Add transaction ID to help match up loglines
                if attempt == 0:
                    msgtmpl = _(u'First attempt failed while '
                                u'adding messages to queue '
                                u'"%(queue)s" under project %(project)s')

                    LOG.debug(msgtmpl, dict(queue=queue_name, project=project))

                # NOTE(kgriffs): Never retry past the point that competing
                # messages expire and are GC'd, since once they are gone,
                # the unique index no longer protects us from getting out
                # of order, which could cause an observer to miss this
                # message. The code below provides a sanity-check to ensure
                # this situation can not happen.
                elapsed = timeutils.utcnow_ts() - now
                if elapsed > MAX_RETRY_POST_DURATION:
                    msgtmpl = (u'Exceeded maximum retry duration for queue '
                               u'"%(queue)s" under project %(project)s')

                    LOG.warning(msgtmpl,
                                dict(queue=queue_name, project=project))
                    break

                # Chill out for a moment to mitigate thrashing/thundering
                self._backoff_sleep(attempt)

                # NOTE(kgriffs): Perhaps we failed because a worker crashed
                # after inserting messages, but before incrementing the
                # counter; that would cause all future requests to stall,
                # since they would keep getting the same base marker that is
                # conflicting with existing messages, until the messages that
                # "won" expire, at which time we would end up reusing markers,
                # and that could make some messages invisible to an observer
                # that is querying with a marker that is large than the ones
                # being reused.
                #
                # To mitigate this, we apply a heuristic to determine whether
                # a counter has stalled. We attempt to increment the counter,
                # but only if it hasn't been updated for a few seconds, which
                # should mean that nobody is left to update it!
                #
                # Note that we increment one at a time until the logjam is
                # broken, since we don't know how many messages were posted
                # by the worker before it crashed.
                next_marker = self._inc_counter(
                    queue_name, project, window=COUNTER_STALL_WINDOW)

                # Retry the entire batch with a new sequence of markers.
                #
                # NOTE(kgriffs): Due to the unique index, and how
                # MongoDB works with batch requests, we will never
                # end up with a partially-successful update. The first
                # document in the batch will fail to insert, and the
                # remainder of the documents will not be attempted.
                if next_marker is None:
                    # NOTE(kgriffs): Usually we will end up here, since
                    # it should be rare that a counter becomes stalled.
                    next_marker = self._get_counter(
                        queue_name, project)
                else:
                    msgtmpl = (u'Detected a stalled message counter '
                               u'for queue "%(queue)s" under '
                               u'project %(project)s.'
                               u'The counter was incremented to %(value)d.')

                    LOG.warning(msgtmpl,
                                dict(queue=queue_name,
                                     project=project,
                                     value=next_marker))

                # Re-stamp the prepared batch with the fresh marker base
                # before the next retry iteration.
                for index, message in enumerate(prepared_messages):
                    message['k'] = next_marker + index
            except Exception:
                LOG.exception('Error parsing document')
                raise

        # Retry loop exhausted (or broke out after the max duration)
        # without a successful insert.
        msgtmpl = (u'Hit maximum number of attempts (%(max)s) for queue '
                   u'"%(queue)s" under project %(project)s')

        LOG.warning(msgtmpl,
                    dict(max=self.driver.mongodb_conf.max_attempts,
                         queue=queue_name,
                         project=project))

        raise errors.MessageConflict(queue_name, project)
 1009 
 1010 
 1011 def _is_claimed(msg, now):
 1012     return (msg['c']['id'] is not None and
 1013             msg['c']['e'] > now)
 1014 
 1015 
 1016 def _basic_message(msg, now):
 1017     oid = msg['_id']
 1018     age = now - utils.oid_ts(oid)
 1019     res = {
 1020         'id': str(oid),
 1021         'age': int(age),
 1022         'ttl': msg['t'],
 1023         'claim_count': msg['c'].get('c', 0),
 1024         'body': msg['b'],
 1025         'claim_id': str(msg['c']['id']) if msg['c']['id'] else None
 1026         }
 1027     if msg.get('cs'):
 1028         res['checksum'] = msg.get('cs')
 1029 
 1030     return res
 1031 
 1032 
 1033 class MessageQueueHandler(object):
 1034 
 1035     def __init__(self, driver, control_driver):
 1036         self.driver = driver
 1037         self._cache = self.driver.cache
 1038         self.queue_controller = self.driver.queue_controller
 1039         self.message_controller = self.driver.message_controller
 1040 
 1041     def delete(self, queue_name, project=None):
 1042         self.message_controller._purge_queue(queue_name, project)
 1043 
 1044     @utils.raises_conn_error
 1045     @utils.retries_on_autoreconnect
 1046     def stats(self, name, project=None):
 1047         if not self.queue_controller.exists(name, project=project):
 1048             raise errors.QueueDoesNotExist(name, project)
 1049 
 1050         controller = self.message_controller
 1051 
 1052         active = controller._count(name, project=project,
 1053                                    include_claimed=False)
 1054 
 1055         total = controller._count(name, project=project,
 1056                                   include_claimed=True)
 1057 
 1058         message_stats = {
 1059             'claimed': total - active,
 1060             'free': active,
 1061             'total': total,
 1062         }
 1063 
 1064         try:
 1065             oldest = controller.first(name, project=project, sort=1)
 1066             newest = controller.first(name, project=project, sort=-1)
 1067         except errors.QueueIsEmpty:
 1068             pass
 1069         else:
 1070             now = timeutils.utcnow_ts()
 1071             message_stats['oldest'] = utils.stat_message(oldest, now)
 1072             message_stats['newest'] = utils.stat_message(newest, now)
 1073 
 1074         return {'messages': message_stats}
 1075 
 1076 
 1077 def _get_scoped_query(name, project):
 1078     return {'p_q': utils.scope_queue_name(name, project)}