"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-9.0.0/zaqar/storage/mongodb/messages.py" (16 Oct 2019, 42517 Bytes) of package /linux/misc/openstack/zaqar-9.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "messages.py" see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 7.0.0_vs_8.0.0.

    1 # Copyright (c) 2013 Red Hat, Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 
   16 """Implements MongoDB the storage controller for messages.
   17 
   18 Field Mappings:
   19     In order to reduce the disk / memory space used,
   20     field names will be, most of the time, the first
   21     letter of their long name.
   22 """
   23 
   24 import datetime
   25 import time
   26 
   27 from bson import errors as bsonerror
   28 from bson import objectid
   29 from oslo_log import log as logging
   30 from oslo_utils import timeutils
   31 import pymongo.errors
   32 import pymongo.read_preferences
   33 
   34 from zaqar.i18n import _
   35 from zaqar import storage
   36 from zaqar.storage import errors
   37 from zaqar.storage.mongodb import utils
   38 from zaqar.storage import utils as s_utils
   39 
   40 
   41 LOG = logging.getLogger(__name__)
   42 
   43 # NOTE(kgriffs): This value, in seconds, should be at least less than the
   44 # minimum allowed TTL for messages (60 seconds). Make it 45 to allow for
   45 # some fudge room.
   46 MAX_RETRY_POST_DURATION = 45
   47 
   48 # NOTE(kgriffs): It is extremely unlikely that all workers would somehow hang
   49 # for more than 5 seconds, without a single one being able to succeed in
   50 # posting some messages and incrementing the counter, thus allowing the other
   51 # producers to succeed in turn.
   52 COUNTER_STALL_WINDOW = 5
   53 
   54 # For hinting
   55 ID_INDEX_FIELDS = [('_id', 1)]
   56 
   57 # For removing expired messages
   58 TTL_INDEX_FIELDS = [
   59     ('e', 1),
   60 ]
   61 
   62 # NOTE(cpp-cabrera): to unify use of project/queue across mongodb
   63 # storage impls.
   64 PROJ_QUEUE = utils.PROJ_QUEUE_KEY
   65 
   66 # NOTE(kgriffs): This index is for listing messages, usually
   67 # filtering out claimed ones.
   68 ACTIVE_INDEX_FIELDS = [
   69     (PROJ_QUEUE, 1),  # Project will be unique, so put first
   70     ('k', 1),  # Used for sorting and paging, must come before range queries
   71     ('c.e', 1),  # Used for filtering out claimed messages
   72 
   73     # NOTE(kgriffs): We do not include 'u' and 'tx' here on
   74     # purpose. It was found experimentally that adding 'u' did
   75     # not improve performance, and so it was left out in order
   76     # to reduce index size and make updating the index
   77     # faster. When 'tx' was added, it was assumed that it would
   78     # follow a similar performance pattern to 'u', since by
   79     # the time you traverse the index down past the fields
   80     # listed above, there is very little left to scan, esp.
   81     # considering all queries are limited (limit=) to a fairly
   82     # small number.
   83     #
   84     # TODO(kgriffs): The extrapolation wrt 'tx' needs to be
   85     # proven empirically.
   86 ]
   87 
   88 # For counting
   89 COUNTING_INDEX_FIELDS = [
   90     (PROJ_QUEUE, 1),  # Project will be unique, so put first
   91     ('c.e', 1),  # Used for filtering out claimed messages
   92 ]
   93 
   94 # Index used for claims
   95 CLAIMED_INDEX_FIELDS = [
   96     (PROJ_QUEUE, 1),
   97     ('c.id', 1),
   98     ('k', 1),
   99     ('c.e', 1),
  100 ]
  101 
  102 # This index is meant to be used as a shard-key and to ensure
  103 # uniqueness for markers.
  104 #
  105 # As for other compound indexes, order matters. The marker `k`
  106 # gives enough cardinality to ensure chunks are evenly distributed,
  107 # whereas the `p_q` field helps keeping chunks from the same project
  108 # and queue together.
  109 #
  110 # In a sharded environment, uniqueness of this index is still guaranteed
  111 # because it's used as a shard key.
  112 MARKER_INDEX_FIELDS = [
  113     ('k', 1),
  114     (PROJ_QUEUE, 1),
  115 ]
  116 
  117 TRANSACTION_INDEX_FIELDS = [
  118     ('tx', 1),
  119 ]
  120 
  121 
  122 class MessageController(storage.Message):
  123     """Implements message resource operations using MongoDB.
  124 
  125     Messages are scoped by project + queue.
  126 
  127     ::
  128 
  129         Messages:
  130             Name                Field
  131             -------------------------
  132             scope            ->   p_q
  133             ttl              ->     t
  134             expires          ->     e
  135             marker           ->     k
  136             body             ->     b
  137             claim            ->     c
  138             client uuid      ->     u
  139             transaction      ->    tx
  140             delay            ->     d
  141             checksum         ->    cs
  142     """
  143 
  144     def __init__(self, *args, **kwargs):
  145         super(MessageController, self).__init__(*args, **kwargs)
  146 
  147         # Cache for convenience and performance
  148         self._num_partitions = self.driver.mongodb_conf.partitions
  149         self._queue_ctrl = self.driver.queue_controller
  150         self._retry_range = range(self.driver.mongodb_conf.max_attempts)
  151 
  152         # Create a list of 'messages' collections, one for each database
  153         # partition, ordered by partition number.
  154         #
  155         # NOTE(kgriffs): Order matters, since it is used to lookup the
  156         # collection by partition number. For example, self._collections[2]
  157         # would provide access to zaqar_p2.messages (partition numbers are
  158         # zero-based).
  159         self._collections = [db.messages
  160                              for db in self.driver.message_databases]
  161 
  162         # Ensure indexes are initialized before any queries are performed
  163         for collection in self._collections:
  164             self._ensure_indexes(collection)
  165 
  166     # ----------------------------------------------------------------------
  167     # Helpers
  168     # ----------------------------------------------------------------------
  169 
  170     def _ensure_indexes(self, collection):
  171         """Ensures that all indexes are created."""
  172 
  173         collection.ensure_index(TTL_INDEX_FIELDS,
  174                                 name='ttl',
  175                                 expireAfterSeconds=0,
  176                                 background=True)
  177 
  178         collection.ensure_index(ACTIVE_INDEX_FIELDS,
  179                                 name='active',
  180                                 background=True)
  181 
  182         collection.ensure_index(CLAIMED_INDEX_FIELDS,
  183                                 name='claimed',
  184                                 background=True)
  185 
  186         collection.ensure_index(COUNTING_INDEX_FIELDS,
  187                                 name='counting',
  188                                 background=True)
  189 
  190         collection.ensure_index(MARKER_INDEX_FIELDS,
  191                                 name='queue_marker',
  192                                 background=True)
  193 
  194         collection.ensure_index(TRANSACTION_INDEX_FIELDS,
  195                                 name='transaction',
  196                                 background=True)
  197 
  198     def _collection(self, queue_name, project=None):
  199         """Get a partitioned collection instance."""
  200         return self._collections[utils.get_partition(self._num_partitions,
  201                                                      queue_name, project)]
  202 
  203     def _backoff_sleep(self, attempt):
  204         """Sleep between retries using a jitter algorithm.
  205 
  206         Mitigates thrashing between multiple parallel requests, and
  207         creates backpressure on clients to slow down the rate
  208         at which they submit requests.
  209 
  210         :param attempt: current attempt number, zero-based
  211         """
  212         conf = self.driver.mongodb_conf
  213         seconds = utils.calculate_backoff(attempt, conf.max_attempts,
  214                                           conf.max_retry_sleep,
  215                                           conf.max_retry_jitter)
  216 
  217         time.sleep(seconds)
  218 
  219     def _purge_queue(self, queue_name, project=None):
  220         """Removes all messages from the queue.
  221 
  222         Warning: Only use this when deleting the queue; otherwise
  223         you can cause a side-effect of reseting the marker counter
  224         which can cause clients to miss tons of messages.
  225 
  226         If the queue does not exist, this method fails silently.
  227 
  228         :param queue_name: name of the queue to purge
  229         :param project: ID of the project to which the queue belongs
  230         """
  231         scope = utils.scope_queue_name(queue_name, project)
  232         collection = self._collection(queue_name, project)
  233         collection.delete_many({PROJ_QUEUE: scope})
  234 
  235     def _list(self, queue_name, project=None, marker=None,
  236               echo=False, client_uuid=None, projection=None,
  237               include_claimed=False, include_delayed=False,
  238               sort=1, limit=None):
  239         """Message document listing helper.
  240 
  241         :param queue_name: Name of the queue to list
  242         :param project: (Default None) Project `queue_name` belongs to. If
  243             not specified, queries the "global" namespace/project.
  244         :param marker: (Default None) Message marker from which to start
  245             iterating. If not specified, starts with the first message
  246             available in the queue.
  247         :param echo: (Default False) Whether to return messages that match
  248             client_uuid
  249         :param client_uuid: (Default None) UUID for the client that
  250             originated this request
  251         :param projection: (Default None) a list of field names that should be
  252             returned in the result set or a dict specifying the fields to
  253             include or exclude
  254         :param include_claimed: (Default False) Whether to include
  255             claimed messages, not just active ones
  256         :param include_delayed: (Default False) Whether to include
  257             delayed messages, not just active ones
  258         :param sort: (Default 1) Sort order for the listing. Pass 1 for
  259             ascending (oldest message first), or -1 for descending (newest
  260             message first).
  261         :param limit: (Default None) The maximum number of messages
  262             to list. The results may include fewer messages than the
  263             requested `limit` if not enough are available. If limit is
  264             not specified
  265 
  266         :returns: Generator yielding up to `limit` messages.
  267         """
  268 
  269         if sort not in (1, -1):
  270             raise ValueError(u'sort must be either 1 (ascending) '
  271                              u'or -1 (descending)')
  272 
  273         now = timeutils.utcnow_ts()
  274 
  275         query = {
  276             # Messages must belong to this queue and project.
  277             PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  278 
  279             # NOTE(kgriffs): Messages must be finalized (i.e., must not
  280             # be part of an unfinalized transaction).
  281             #
  282             # See also the note wrt 'tx' within the definition
  283             # of ACTIVE_INDEX_FIELDS.
  284             'tx': None,
  285         }
  286 
  287         if not echo:
  288             query['u'] = {'$ne': client_uuid}
  289 
  290         if marker is not None:
  291             query['k'] = {'$gt': marker}
  292 
  293         collection = self._collection(queue_name, project)
  294 
  295         if not include_claimed:
  296             # Only include messages that are not part of
  297             # any claim, or are part of an expired claim.
  298             query['c.e'] = {'$lte': now}
  299 
  300         if not include_delayed:
  301             # NOTE(cdyangzhenyu): Only include messages that are not
  302             # part of any delay, or are part of an expired delay. if
  303             # the message has no attribute 'd', it will also be obtained.
  304             # This is for compatibility with old data.
  305             query['$or'] = [{'d': {'$lte': now}},
  306                             {'d': {'$exists': False}}]
  307 
  308         # Construct the request
  309         cursor = collection.find(query,
  310                                  projection=projection,
  311                                  sort=[('k', sort)])
  312 
  313         if limit is not None:
  314             cursor.limit(limit)
  315 
  316         # NOTE(flaper87): Suggest the index to use for this query to
  317         # ensure the most performant one is chosen.
  318         return cursor.hint(ACTIVE_INDEX_FIELDS)
  319 
  320     # ----------------------------------------------------------------------
  321     # "Friends" interface
  322     # ----------------------------------------------------------------------
  323 
  324     def _count(self, queue_name, project=None, include_claimed=False):
  325         """Return total number of messages in a queue.
  326 
  327         This method is designed to very quickly count the number
  328         of messages in a given queue. Expired messages are not
  329         counted, of course. If the queue does not exist, the
  330         count will always be 0.
  331 
  332         Note: Some expired messages may be included in the count if
  333             they haven't been GC'd yet. This is done for performance.
  334         """
  335         query = {
  336             # Messages must belong to this queue and project.
  337             PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  338 
  339             # NOTE(kgriffs): Messages must be finalized (i.e., must not
  340             # be part of an unfinalized transaction).
  341             #
  342             # See also the note wrt 'tx' within the definition
  343             # of ACTIVE_INDEX_FIELDS.
  344             'tx': None,
  345         }
  346 
  347         if not include_claimed:
  348             # Exclude messages that are claimed
  349             query['c.e'] = {'$lte': timeutils.utcnow_ts()}
  350 
  351         collection = self._collection(queue_name, project)
  352         return collection.count(filter=query, hint=COUNTING_INDEX_FIELDS)
  353 
  354     def _active(self, queue_name, marker=None, echo=False,
  355                 client_uuid=None, projection=None, project=None,
  356                 limit=None, include_delayed=False):
  357 
  358         return self._list(queue_name, project=project, marker=marker,
  359                           echo=echo, client_uuid=client_uuid,
  360                           projection=projection, include_claimed=False,
  361                           include_delayed=include_delayed, limit=limit)
  362 
  363     def _claimed(self, queue_name, claim_id,
  364                  expires=None, limit=None, project=None):
  365 
  366         if claim_id is None:
  367             claim_id = {'$ne': None}
  368 
  369         query = {
  370             PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  371             'c.id': claim_id,
  372             'c.e': {'$gt': expires or timeutils.utcnow_ts()},
  373         }
  374 
  375         kwargs = {}
  376         collection = self._collection(queue_name, project)
  377 
  378         # NOTE(kgriffs): Claimed messages bust be queried from
  379         # the primary to avoid a race condition caused by the
  380         # multi-phased "create claim" algorithm.
  381         # NOTE(flaper87): In pymongo 3.0 PRIMARY is the default and
  382         # `read_preference` is read only. We'd need to set it when the
  383         # client is created.
  384         msgs = collection.find(query, sort=[('k', 1)], **kwargs).hint(
  385             CLAIMED_INDEX_FIELDS)
  386 
  387         if limit is not None:
  388             msgs = msgs.limit(limit)
  389 
  390         now = timeutils.utcnow_ts()
  391 
  392         def denormalizer(msg):
  393             doc = _basic_message(msg, now)
  394             doc['claim'] = msg['c']
  395 
  396             return doc
  397 
  398         return utils.HookedCursor(msgs, denormalizer)
  399 
  400     def _unclaim(self, queue_name, claim_id, project=None):
  401         cid = utils.to_oid(claim_id)
  402 
  403         # NOTE(cpp-cabrera): early abort - avoid a DB query if we're handling
  404         # an invalid ID
  405         if cid is None:
  406             return
  407 
  408         # NOTE(cpp-cabrera):  unclaim by setting the claim ID to None
  409         # and the claim expiration time to now
  410         now = timeutils.utcnow_ts()
  411         scope = utils.scope_queue_name(queue_name, project)
  412         collection = self._collection(queue_name, project)
  413 
  414         collection.update_many({PROJ_QUEUE: scope, 'c.id': cid},
  415                                {'$set': {'c': {'id': None, 'e': now}}},
  416                                upsert=False)
  417 
  418     def _inc_counter(self, queue_name, project=None, amount=1, window=None):
  419         """Increments the message counter and returns the new value.
  420 
  421         :param queue_name: Name of the queue to which the counter is scoped
  422         :param project: Queue's project name
  423         :param amount: (Default 1) Amount by which to increment the counter
  424         :param window: (Default None) A time window, in seconds, that
  425             must have elapsed since the counter was last updated, in
  426             order to increment the counter.
  427 
  428         :returns: Updated message counter value, or None if window
  429             was specified, and the counter has already been updated
  430             within the specified time period.
  431 
  432         :raises QueueDoesNotExist: if not found
  433         """
  434 
  435         # NOTE(flaper87): If this `if` is True, it means we're
  436         # using a mongodb in the control plane. To avoid breaking
  437         # environments doing so already, we'll keep using the counter
  438         # in the mongodb queue_controller rather than the one in the
  439         # message_controller. This should go away, eventually
  440         if hasattr(self._queue_ctrl, '_inc_counter'):
  441             return self._queue_ctrl._inc_counter(queue_name, project,
  442                                                  amount, window)
  443 
  444         now = timeutils.utcnow_ts()
  445 
  446         update = {'$inc': {'c.v': amount}, '$set': {'c.t': now}}
  447         query = _get_scoped_query(queue_name, project)
  448         if window is not None:
  449             threshold = now - window
  450             query['c.t'] = {'$lt': threshold}
  451 
  452         while True:
  453             try:
  454                 collection = self._collection(queue_name, project).stats
  455                 doc = collection.find_one_and_update(
  456                     query, update,
  457                     return_document=pymongo.ReturnDocument.AFTER,
  458                     projection={'c.v': 1, '_id': 0})
  459 
  460                 break
  461             except pymongo.errors.AutoReconnect as ex:
  462                 LOG.exception(ex)
  463 
  464         if doc is None:
  465             if window is None:
  466                 # NOTE(kgriffs): Since we did not filter by a time window,
  467                 # the queue should have been found and updated. Perhaps
  468                 # the queue has been deleted?
  469                 message = (u'Failed to increment the message '
  470                            u'counter for queue %(name)s and '
  471                            u'project %(project)s')
  472                 message %= dict(name=queue_name, project=project)
  473 
  474                 LOG.warning(message)
  475 
  476                 raise errors.QueueDoesNotExist(queue_name, project)
  477 
  478             # NOTE(kgriffs): Assume the queue existed, but the counter
  479             # was recently updated, causing the range query on 'c.t' to
  480             # exclude the record.
  481             return None
  482 
  483         return doc['c']['v']
  484 
  485     def _get_counter(self, queue_name, project=None):
  486         """Retrieves the current message counter value for a given queue.
  487 
  488         This helper is used to generate monotonic pagination
  489         markers that are saved as part of the message
  490         document.
  491 
  492         Note 1: Markers are scoped per-queue and so are *not*
  493             globally unique or globally ordered.
  494 
  495         Note 2: If two or more requests to this method are made
  496             in parallel, this method will return the same counter
  497             value. This is done intentionally so that the caller
  498             can detect a parallel message post, allowing it to
  499             mitigate race conditions between producer and
  500             observer clients.
  501 
  502         :param queue_name: Name of the queue to which the counter is scoped
  503         :param project: Queue's project
  504         :returns: current message counter as an integer
  505         """
  506 
  507         # NOTE(flaper87): If this `if` is True, it means we're
  508         # using a mongodb in the control plane. To avoid breaking
  509         # environments doing so already, we'll keep using the counter
  510         # in the mongodb queue_controller rather than the one in the
  511         # message_controller. This should go away, eventually
  512         if hasattr(self._queue_ctrl, '_get_counter'):
  513             return self._queue_ctrl._get_counter(queue_name, project)
  514 
  515         update = {'$inc': {'c.v': 0, 'c.t': 0}}
  516         query = _get_scoped_query(queue_name, project)
  517 
  518         try:
  519             collection = self._collection(queue_name, project).stats
  520             doc = collection.find_one_and_update(
  521                 query, update, upsert=True,
  522                 return_document=pymongo.ReturnDocument.AFTER,
  523                 projection={'c.v': 1, '_id': 0})
  524 
  525             return doc['c']['v']
  526         except pymongo.errors.AutoReconnect as ex:
  527             LOG.exception(ex)
  528 
  529     # ----------------------------------------------------------------------
  530     # Public interface
  531     # ----------------------------------------------------------------------
  532 
  533     def list(self, queue_name, project=None, marker=None,
  534              limit=storage.DEFAULT_MESSAGES_PER_PAGE,
  535              echo=False, client_uuid=None, include_claimed=False,
  536              include_delayed=False):
  537 
  538         if marker is not None:
  539             try:
  540                 marker = int(marker)
  541             except ValueError:
  542                 yield iter([])
  543 
  544         messages = self._list(queue_name, project=project, marker=marker,
  545                               client_uuid=client_uuid, echo=echo,
  546                               include_claimed=include_claimed,
  547                               include_delayed=include_delayed, limit=limit)
  548 
  549         marker_id = {}
  550 
  551         now = timeutils.utcnow_ts()
  552 
  553         # NOTE (kgriffs) @utils.raises_conn_error not needed on this
  554         # function, since utils.HookedCursor already has it.
  555         def denormalizer(msg):
  556             marker_id['next'] = msg['k']
  557 
  558             return _basic_message(msg, now)
  559 
  560         yield utils.HookedCursor(messages, denormalizer)
  561         yield str(marker_id['next'])
  562 
  563     @utils.raises_conn_error
  564     @utils.retries_on_autoreconnect
  565     def first(self, queue_name, project=None, sort=1):
  566         cursor = self._list(queue_name, project=project,
  567                             include_claimed=True, sort=sort,
  568                             limit=1)
  569         try:
  570             message = next(cursor)
  571         except StopIteration:
  572             raise errors.QueueIsEmpty(queue_name, project)
  573 
  574         now = timeutils.utcnow_ts()
  575         return _basic_message(message, now)
  576 
  577     @utils.raises_conn_error
  578     @utils.retries_on_autoreconnect
  579     def get(self, queue_name, message_id, project=None):
  580         mid = utils.to_oid(message_id)
  581         if mid is None:
  582             raise errors.MessageDoesNotExist(message_id, queue_name,
  583                                              project)
  584 
  585         now = timeutils.utcnow_ts()
  586 
  587         query = {
  588             '_id': mid,
  589             PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  590         }
  591 
  592         collection = self._collection(queue_name, project)
  593         message = list(collection.find(query).limit(1).hint(ID_INDEX_FIELDS))
  594 
  595         if not message:
  596             raise errors.MessageDoesNotExist(message_id, queue_name,
  597                                              project)
  598 
  599         return _basic_message(message[0], now)
  600 
  601     @utils.raises_conn_error
  602     @utils.retries_on_autoreconnect
  603     def bulk_get(self, queue_name, message_ids, project=None):
  604         message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
  605         if not message_ids:
  606             return iter([])
  607 
  608         now = timeutils.utcnow_ts()
  609 
  610         # Base query, always check expire time
  611         query = {
  612             '_id': {'$in': message_ids},
  613             PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  614         }
  615 
  616         collection = self._collection(queue_name, project)
  617 
  618         # NOTE(flaper87): Should this query
  619         # be sorted?
  620         messages = collection.find(query).hint(ID_INDEX_FIELDS)
  621 
  622         def denormalizer(msg):
  623             return _basic_message(msg, now)
  624 
  625         return utils.HookedCursor(messages, denormalizer)
  626 
  627     @utils.raises_conn_error
  628     @utils.retries_on_autoreconnect
  629     def post(self, queue_name, messages, client_uuid, project=None):
  630         # NOTE(flaper87): This method should be safe to retry on
  631         # autoreconnect, since we've a 2-step insert for messages.
  632         # The worst-case scenario is that we'll increase the counter
  633         # several times and we'd end up with some non-active messages.
  634 
  635         if not self._queue_ctrl.exists(queue_name, project):
  636             raise errors.QueueDoesNotExist(queue_name, project)
  637 
  638         # NOTE(flaper87): Make sure the counter exists. This method
  639         # is an upsert.
  640         self._get_counter(queue_name, project)
  641         now = timeutils.utcnow_ts()
  642         now_dt = datetime.datetime.utcfromtimestamp(now)
  643         collection = self._collection(queue_name, project)
  644 
  645         messages = list(messages)
  646         msgs_n = len(messages)
  647         next_marker = self._inc_counter(queue_name,
  648                                         project,
  649                                         amount=msgs_n) - msgs_n
  650 
  651         prepared_messages = []
  652         for index, message in enumerate(messages):
  653             msg = {
  654                 PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  655                 't': message['ttl'],
  656                 'e': now_dt + datetime.timedelta(seconds=message['ttl']),
  657                 'u': client_uuid,
  658                 'c': {'id': None, 'e': now, 'c': 0},
  659                 'd': now + message.get('delay', 0),
  660                 'b': message['body'] if 'body' in message else {},
  661                 'k': next_marker + index,
  662                 'tx': None
  663                 }
  664             if self.driver.conf.enable_checksum:
  665                 msg['cs'] = s_utils.get_checksum(message.get('body', None))
  666 
  667             prepared_messages.append(msg)
  668 
  669         res = collection.insert_many(prepared_messages,
  670                                      bypass_document_validation=True)
  671 
  672         return [str(id_) for id_ in res.inserted_ids]
  673 
  674     @utils.raises_conn_error
  675     @utils.retries_on_autoreconnect
  676     def delete(self, queue_name, message_id, project=None, claim=None):
  677         # NOTE(cpp-cabrera): return early - this is an invalid message
  678         # id so we won't be able to find it any way
  679         mid = utils.to_oid(message_id)
  680         if mid is None:
  681             return
  682 
  683         collection = self._collection(queue_name, project)
  684 
  685         query = {
  686             '_id': mid,
  687             PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  688         }
  689 
  690         cid = utils.to_oid(claim)
  691         if cid is None:
  692             raise errors.ClaimDoesNotExist(claim, queue_name, project)
  693 
  694         now = timeutils.utcnow_ts()
  695         cursor = collection.find(query).hint(ID_INDEX_FIELDS)
  696 
  697         try:
  698             message = next(cursor)
  699         except StopIteration:
  700             return
  701 
  702         if claim is None:
  703             if _is_claimed(message, now):
  704                 raise errors.MessageIsClaimed(message_id)
  705 
  706         else:
  707             if message['c']['id'] != cid:
  708                 kwargs = {}
  709                 # NOTE(flaper87): In pymongo 3.0 PRIMARY is the default and
  710                 # `read_preference` is read only. We'd need to set it when the
  711                 # client is created.
  712                 # NOTE(kgriffs): Read from primary in case the message
  713                 # was just barely claimed, and claim hasn't made it to
  714                 # the secondary.
  715                 message = collection.find_one(query, **kwargs)
  716 
  717                 if message['c']['id'] != cid:
  718                     if _is_claimed(message, now):
  719                         raise errors.MessageNotClaimedBy(message_id, claim)
  720 
  721                     raise errors.MessageNotClaimed(message_id)
  722 
  723         collection.delete_one(query)
  724 
  725     @utils.raises_conn_error
  726     @utils.retries_on_autoreconnect
  727     def bulk_delete(self, queue_name, message_ids, project=None,
  728                     claim_ids=None):
  729         message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
  730         if claim_ids:
  731             claim_ids = [cid for cid in map(utils.to_oid, claim_ids) if cid]
  732         query = {
  733             '_id': {'$in': message_ids},
  734             PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  735         }
  736 
  737         collection = self._collection(queue_name, project)
  738         if claim_ids:
  739             message_claim_ids = []
  740             messages = collection.find(query).hint(ID_INDEX_FIELDS)
  741             for message in messages:
  742                 message_claim_ids.append(message['c']['id'])
  743             for cid in claim_ids:
  744                 if cid not in message_claim_ids:
  745                     raise errors.ClaimDoesNotExist(cid, queue_name, project)
  746 
  747         collection.delete_many(query)
  748 
  749     @utils.raises_conn_error
  750     @utils.retries_on_autoreconnect
  751     def pop(self, queue_name, limit, project=None):
  752         query = {
  753             PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  754         }
  755 
  756         # Only include messages that are not part of
  757         # any claim, or are part of an expired claim.
  758         now = timeutils.utcnow_ts()
  759         query['c.e'] = {'$lte': now}
  760 
  761         collection = self._collection(queue_name, project)
  762         projection = {'_id': 1, 't': 1, 'b': 1, 'c.id': 1}
  763 
  764         messages = (collection.find_one_and_delete(query,
  765                                                    projection=projection)
  766                     for _ in range(limit))
  767 
  768         final_messages = [_basic_message(message, now)
  769                           for message in messages
  770                           if message]
  771 
  772         return final_messages
  773 
  774 
  775 class FIFOMessageController(MessageController):
  776 
  777     def _ensure_indexes(self, collection):
  778         """Ensures that all indexes are created."""
  779 
  780         collection.ensure_index(TTL_INDEX_FIELDS,
  781                                 name='ttl',
  782                                 expireAfterSeconds=0,
  783                                 background=True)
  784 
  785         collection.ensure_index(ACTIVE_INDEX_FIELDS,
  786                                 name='active',
  787                                 background=True)
  788 
  789         collection.ensure_index(CLAIMED_INDEX_FIELDS,
  790                                 name='claimed',
  791                                 background=True)
  792 
  793         collection.ensure_index(COUNTING_INDEX_FIELDS,
  794                                 name='counting',
  795                                 background=True)
  796 
  797         # NOTE(kgriffs): This index must be unique so that
  798         # inserting a message with the same marker to the
  799         # same queue will fail; this is used to detect a
  800         # race condition which can cause an observer client
  801         # to miss a message when there is more than one
  802         # producer posting messages to the same queue, in
  803         # parallel.
  804         collection.ensure_index(MARKER_INDEX_FIELDS,
  805                                 name='queue_marker',
  806                                 unique=True,
  807                                 background=True)
  808 
  809         collection.ensure_index(TRANSACTION_INDEX_FIELDS,
  810                                 name='transaction',
  811                                 background=True)
  812 
  813     @utils.raises_conn_error
  814     @utils.retries_on_autoreconnect
  815     def post(self, queue_name, messages, client_uuid, project=None):
  816         # NOTE(flaper87): This method should be safe to retry on
  817         # autoreconnect, since we've a 2-step insert for messages.
  818         # The worst-case scenario is that we'll increase the counter
  819         # several times and we'd end up with some non-active messages.
  820 
  821         if not self._queue_ctrl.exists(queue_name, project):
  822             raise errors.QueueDoesNotExist(queue_name, project)
  823 
  824         # NOTE(flaper87): Make sure the counter exists. This method
  825         # is an upsert.
  826         self._get_counter(queue_name, project)
  827         now = timeutils.utcnow_ts()
  828         now_dt = datetime.datetime.utcfromtimestamp(now)
  829         collection = self._collection(queue_name, project)
  830 
  831         # Set the next basis marker for the first attempt.
  832         #
  833         # Note that we don't increment the counter right away because
  834         # if 2 concurrent posts happen and the one with the higher counter
  835         # ends before the one with the lower counter, there's a window
  836         # where a client paging through the queue may get the messages
  837         # with the higher counter and skip the previous ones. This would
  838         # make our FIFO guarantee unsound.
  839         next_marker = self._get_counter(queue_name, project)
  840 
  841         # Unique transaction ID to facilitate atomic batch inserts
  842         transaction = objectid.ObjectId()
  843 
  844         prepared_messages = []
  845         for index, message in enumerate(messages):
  846             msg = {
  847                 PROJ_QUEUE: utils.scope_queue_name(queue_name, project),
  848                 't': message['ttl'],
  849                 'e': now_dt + datetime.timedelta(seconds=message['ttl']),
  850                 'u': client_uuid,
  851                 'c': {'id': None, 'e': now, 'c': 0},
  852                 'd': now + message.get('delay', 0),
  853                 'b': message['body'] if 'body' in message else {},
  854                 'k': next_marker + index,
  855                 'tx': None
  856                 }
  857             if self.driver.conf.enable_checksum:
  858                 msg['cs'] = s_utils.get_checksum(message.get('body', None))
  859 
  860             prepared_messages.append(msg)
  861 
  862         # NOTE(kgriffs): Don't take the time to do a 2-phase insert
  863         # if there is no way for it to partially succeed.
  864         if len(prepared_messages) == 1:
  865             transaction = None
  866             prepared_messages[0]['tx'] = None
  867 
  868         # Use a retry range for sanity, although we expect
  869         # to rarely, if ever, reach the maximum number of
  870         # retries.
  871         #
  872         # NOTE(kgriffs): With the default configuration (100 ms
  873         # max sleep, 1000 max attempts), the max stall time
  874         # before the operation is abandoned is 49.95 seconds.
  875         for attempt in self._retry_range:
  876             try:
  877                 res = collection.insert_many(prepared_messages,
  878                                              bypass_document_validation=True)
  879 
  880                 # Log a message if we retried, for debugging perf issues
  881                 if attempt != 0:
  882                     msgtmpl = _(u'%(attempts)d attempt(s) required to post '
  883                                 u'%(num_messages)d messages to queue '
  884                                 u'"%(queue)s" under project %(project)s')
  885 
  886                     LOG.debug(msgtmpl,
  887                               dict(queue=queue_name,
  888                                    attempts=attempt + 1,
  889                                    num_messages=len(res.inserted_ids),
  890                                    project=project))
  891 
  892                 # Update the counter in preparation for the next batch
  893                 #
  894                 # NOTE(kgriffs): Due to the unique index on the messages
  895                 # collection, competing inserts will fail as a whole,
  896                 # and keep retrying until the counter is incremented
  897                 # such that the competing marker's will start at a
  898                 # unique number, 1 past the max of the messages just
  899                 # inserted above.
  900                 self._inc_counter(queue_name, project,
  901                                   amount=len(res.inserted_ids))
  902 
  903                 # NOTE(kgriffs): Finalize the insert once we can say that
  904                 # all the messages made it. This makes bulk inserts
  905                 # atomic, assuming queries filter out any non-finalized
  906                 # messages.
  907                 if transaction is not None:
  908                     collection.update_many({'tx': transaction},
  909                                            {'$set': {'tx': None}},
  910                                            upsert=False)
  911 
  912                 return [str(id_) for id_ in res.inserted_ids]
  913 
  914             except (pymongo.errors.DuplicateKeyError,
  915                     pymongo.errors.BulkWriteError) as ex:
  916                 # TODO(kgriffs): Record stats of how often retries happen,
  917                 # and how many attempts, on average, are required to insert
  918                 # messages.
  919 
  920                 # NOTE(kgriffs): This can be used in conjunction with the
  921                 # log line, above, that is emitted after all messages have
  922                 # been posted, to gauge how long it is taking for messages
  923                 # to be posted to a given queue, or overall.
  924                 #
  925                 # TODO(kgriffs): Add transaction ID to help match up loglines
  926                 if attempt == 0:
  927                     msgtmpl = _(u'First attempt failed while '
  928                                 u'adding messages to queue '
  929                                 u'"%(queue)s" under project %(project)s')
  930 
  931                     LOG.debug(msgtmpl, dict(queue=queue_name, project=project))
  932 
  933                 # NOTE(kgriffs): Never retry past the point that competing
  934                 # messages expire and are GC'd, since once they are gone,
  935                 # the unique index no longer protects us from getting out
  936                 # of order, which could cause an observer to miss this
  937                 # message. The code below provides a sanity-check to ensure
  938                 # this situation can not happen.
  939                 elapsed = timeutils.utcnow_ts() - now
  940                 if elapsed > MAX_RETRY_POST_DURATION:
  941                     msgtmpl = (u'Exceeded maximum retry duration for queue '
  942                                u'"%(queue)s" under project %(project)s')
  943 
  944                     LOG.warning(msgtmpl,
  945                                 dict(queue=queue_name, project=project))
  946                     break
  947 
  948                 # Chill out for a moment to mitigate thrashing/thundering
  949                 self._backoff_sleep(attempt)
  950 
  951                 # NOTE(kgriffs): Perhaps we failed because a worker crashed
  952                 # after inserting messages, but before incrementing the
  953                 # counter; that would cause all future requests to stall,
  954                 # since they would keep getting the same base marker that is
  955                 # conflicting with existing messages, until the messages that
  956                 # "won" expire, at which time we would end up reusing markers,
  957                 # and that could make some messages invisible to an observer
  958                 # that is querying with a marker that is large than the ones
  959                 # being reused.
  960                 #
  961                 # To mitigate this, we apply a heuristic to determine whether
  962                 # a counter has stalled. We attempt to increment the counter,
  963                 # but only if it hasn't been updated for a few seconds, which
  964                 # should mean that nobody is left to update it!
  965                 #
  966                 # Note that we increment one at a time until the logjam is
  967                 # broken, since we don't know how many messages were posted
  968                 # by the worker before it crashed.
  969                 next_marker = self._inc_counter(
  970                     queue_name, project, window=COUNTER_STALL_WINDOW)
  971 
  972                 # Retry the entire batch with a new sequence of markers.
  973                 #
  974                 # NOTE(kgriffs): Due to the unique index, and how
  975                 # MongoDB works with batch requests, we will never
  976                 # end up with a partially-successful update. The first
  977                 # document in the batch will fail to insert, and the
  978                 # remainder of the documents will not be attempted.
  979                 if next_marker is None:
  980                     # NOTE(kgriffs): Usually we will end up here, since
  981                     # it should be rare that a counter becomes stalled.
  982                     next_marker = self._get_counter(
  983                         queue_name, project)
  984                 else:
  985                     msgtmpl = (u'Detected a stalled message counter '
  986                                u'for queue "%(queue)s" under '
  987                                u'project %(project)s.'
  988                                u'The counter was incremented to %(value)d.')
  989 
  990                     LOG.warning(msgtmpl,
  991                                 dict(queue=queue_name,
  992                                      project=project,
  993                                      value=next_marker))
  994 
  995                 for index, message in enumerate(prepared_messages):
  996                     message['k'] = next_marker + index
  997             except bsonerror.InvalidDocument as ex:
  998                 LOG.exception(ex)
  999                 raise
 1000             except Exception as ex:
 1001                 LOG.exception(ex)
 1002                 raise
 1003 
 1004         msgtmpl = (u'Hit maximum number of attempts (%(max)s) for queue '
 1005                    u'"%(queue)s" under project %(project)s')
 1006 
 1007         LOG.warning(msgtmpl,
 1008                     dict(max=self.driver.mongodb_conf.max_attempts,
 1009                          queue=queue_name,
 1010                          project=project))
 1011 
 1012         raise errors.MessageConflict(queue_name, project)
 1013 
 1014 
 1015 def _is_claimed(msg, now):
 1016     return (msg['c']['id'] is not None and
 1017             msg['c']['e'] > now)
 1018 
 1019 
 1020 def _basic_message(msg, now):
 1021     oid = msg['_id']
 1022     age = now - utils.oid_ts(oid)
 1023     res = {
 1024         'id': str(oid),
 1025         'age': int(age),
 1026         'ttl': msg['t'],
 1027         'claim_count': msg['c'].get('c', 0),
 1028         'body': msg['b'],
 1029         'claim_id': str(msg['c']['id']) if msg['c']['id'] else None
 1030         }
 1031     if msg.get('cs'):
 1032         res['checksum'] = msg.get('cs')
 1033 
 1034     return res
 1035 
 1036 
 1037 class MessageQueueHandler(object):
 1038 
 1039     def __init__(self, driver, control_driver):
 1040         self.driver = driver
 1041         self._cache = self.driver.cache
 1042         self.queue_controller = self.driver.queue_controller
 1043         self.message_controller = self.driver.message_controller
 1044 
 1045     def delete(self, queue_name, project=None):
 1046         self.message_controller._purge_queue(queue_name, project)
 1047 
 1048     @utils.raises_conn_error
 1049     @utils.retries_on_autoreconnect
 1050     def stats(self, name, project=None):
 1051         if not self.queue_controller.exists(name, project=project):
 1052             raise errors.QueueDoesNotExist(name, project)
 1053 
 1054         controller = self.message_controller
 1055 
 1056         active = controller._count(name, project=project,
 1057                                    include_claimed=False)
 1058 
 1059         total = controller._count(name, project=project,
 1060                                   include_claimed=True)
 1061 
 1062         message_stats = {
 1063             'claimed': total - active,
 1064             'free': active,
 1065             'total': total,
 1066         }
 1067 
 1068         try:
 1069             oldest = controller.first(name, project=project, sort=1)
 1070             newest = controller.first(name, project=project, sort=-1)
 1071         except errors.QueueIsEmpty:
 1072             pass
 1073         else:
 1074             now = timeutils.utcnow_ts()
 1075             message_stats['oldest'] = utils.stat_message(oldest, now)
 1076             message_stats['newest'] = utils.stat_message(newest, now)
 1077 
 1078         return {'messages': message_stats}
 1079 
 1080 
 1081 def _get_scoped_query(name, project):
 1082     return {'p_q': utils.scope_queue_name(name, project)}