"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-10.0.0/zaqar/storage/mongodb/topic_messages.py" (13 May 2020, 38537 Bytes) of package /linux/misc/openstack/zaqar-10.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "topic_messages.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 9.0.0_vs_10.0.0.

    1 # Copyright (c) 2013 Red Hat, Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 
   16 """Implements MongoDB the storage controller for messages.
   17 
   18 Field Mappings:
   19     In order to reduce the disk / memory space used,
   20     field names will be, most of the time, the first
   21     letter of their long name.
   22 """
   23 
   24 import datetime
   25 import time
   26 
   27 from bson import objectid
   28 from oslo_log import log as logging
   29 from oslo_utils import timeutils
   30 import pymongo.errors
   31 import pymongo.read_preferences
   32 
   33 from zaqar.i18n import _
   34 from zaqar import storage
   35 from zaqar.storage import errors
   36 from zaqar.storage.mongodb import utils
   37 from zaqar.storage import utils as s_utils
   38 
   39 
   40 LOG = logging.getLogger(__name__)
   41 
# NOTE(kgriffs): This value, in seconds, should be at least less than the
# minimum allowed TTL for messages (60 seconds). Make it 45 to allow for
# some fudge room.
MAX_RETRY_POST_DURATION = 45

# NOTE(kgriffs): It is extremely unlikely that all workers would somehow hang
# for more than 5 seconds, without a single one being able to succeed in
# posting some messages and incrementing the counter, thus allowing the other
# producers to succeed in turn.
COUNTER_STALL_WINDOW = 5

# For hinting queries that look up documents by '_id' only.
ID_INDEX_FIELDS = [('_id', 1)]

# TTL index on the expiration timestamp ('e'); used by MongoDB to
# remove expired messages automatically.
TTL_INDEX_FIELDS = [
    ('e', 1),
]

# Unified project/topic scope key shared across the MongoDB
# storage impls.
PROJ_TOPIC = utils.PROJ_TOPIC_KEY

# NOTE(kgriffs): This index is for listing messages, usually
# filtering out claimed ones.
ACTIVE_INDEX_FIELDS = [
    (PROJ_TOPIC, 1),  # Project will be unique, so put first
    ('k', 1),  # Used for sorting and paging, must come before range queries
]

# For counting messages scoped to a single project + topic.
COUNTING_INDEX_FIELDS = [
    (PROJ_TOPIC, 1),  # Project will be unique, so put first
]

# This index is meant to be used as a shard-key and to ensure
# uniqueness for markers.
#
# As for other compound indexes, order matters. The marker `k`
# gives enough cardinality to ensure chunks are evenly distributed,
# whereas the `p_t` (PROJ_TOPIC) field helps keeping chunks from the
# same project and topic together.
#
# In a sharded environment, uniqueness of this index is still guaranteed
# because it's used as a shard key.
MARKER_INDEX_FIELDS = [
    ('k', 1),
    (PROJ_TOPIC, 1),
]

# Index on the batch-insert transaction marker ('tx'), used to find
# messages belonging to a given transaction.
TRANSACTION_INDEX_FIELDS = [
    ('tx', 1),
]
   95 
   96 
   97 class MessageController(storage.Message):
   98     """Implements message resource operations using MongoDB.
   99 
  100     Messages are scoped by project + topic.
  101 
  102     ::
  103 
  104         Messages:
  105             Name                Field
  106             -------------------------
  107             scope            ->   p_t
  108             ttl              ->     t
  109             expires          ->     e
  110             marker           ->     k
  111             body             ->     b
  112             client uuid      ->     u
  113             transaction      ->    tx
  114             delay            ->     d
  115             checksum         ->    cs
  116     """
  117 
  118     def __init__(self, *args, **kwargs):
  119         super(MessageController, self).__init__(*args, **kwargs)
  120 
  121         # Cache for convenience and performance
  122         self._num_partitions = self.driver.mongodb_conf.partitions
  123         self._topic_ctrl = self.driver.topic_controller
  124         self._retry_range = range(self.driver.mongodb_conf.max_attempts)
  125 
  126         # Create a list of 'messages' collections, one for each database
  127         # partition, ordered by partition number.
  128         #
  129         # NOTE(kgriffs): Order matters, since it is used to lookup the
  130         # collection by partition number. For example, self._collections[2]
  131         # would provide access to zaqar_p2.messages (partition numbers are
  132         # zero-based).
  133         self._collections = [db.messages
  134                              for db in self.driver.message_databases]
  135 
  136         # Ensure indexes are initialized before any queries are performed
  137         for collection in self._collections:
  138             self._ensure_indexes(collection)
  139 
  140     # ----------------------------------------------------------------------
  141     # Helpers
  142     # ----------------------------------------------------------------------
  143 
  144     def _ensure_indexes(self, collection):
  145         """Ensures that all indexes are created."""
  146 
  147         collection.ensure_index(TTL_INDEX_FIELDS,
  148                                 name='ttl',
  149                                 expireAfterSeconds=0,
  150                                 background=True)
  151 
  152         collection.ensure_index(ACTIVE_INDEX_FIELDS,
  153                                 name='active',
  154                                 background=True)
  155 
  156         collection.ensure_index(COUNTING_INDEX_FIELDS,
  157                                 name='counting',
  158                                 background=True)
  159 
  160         collection.ensure_index(MARKER_INDEX_FIELDS,
  161                                 name='queue_marker',
  162                                 background=True)
  163 
  164         collection.ensure_index(TRANSACTION_INDEX_FIELDS,
  165                                 name='transaction',
  166                                 background=True)
  167 
  168     def _collection(self, topic_name, project=None):
  169         """Get a partitioned collection instance."""
  170         return self._collections[utils.get_partition(self._num_partitions,
  171                                                      topic_name, project)]
  172 
  173     def _backoff_sleep(self, attempt):
  174         """Sleep between retries using a jitter algorithm.
  175 
  176         Mitigates thrashing between multiple parallel requests, and
  177         creates backpressure on clients to slow down the rate
  178         at which they submit requests.
  179 
  180         :param attempt: current attempt number, zero-based
  181         """
  182         conf = self.driver.mongodb_conf
  183         seconds = utils.calculate_backoff(attempt, conf.max_attempts,
  184                                           conf.max_retry_sleep,
  185                                           conf.max_retry_jitter)
  186 
  187         time.sleep(seconds)
  188 
    def _purge_topic(self, topic_name, project=None):
        """Removes all messages from the topic.

        Warning: Only use this when deleting the topic; otherwise
        you can cause a side-effect of resetting the marker counter
        which can cause clients to miss tons of messages.

        If the topic does not exist, this method fails silently.

        :param topic_name: name of the topic to purge
        :param project: ID of the project to which the topic belongs
        """
        scope = utils.scope_queue_name(topic_name, project)
        collection = self._collection(topic_name, project)
        collection.delete_many({PROJ_TOPIC: scope})
  204 
  205     def _list(self, topic_name, project=None, marker=None,
  206               echo=False, client_uuid=None, projection=None,
  207               include_claimed=False, include_delayed=False,
  208               sort=1, limit=None):
  209         """Message document listing helper.
  210 
  211         :param topic_name: Name of the topic to list
  212         :param project: (Default None) Project `topic_name` belongs to. If
  213             not specified, queries the "global" namespace/project.
  214         :param marker: (Default None) Message marker from which to start
  215             iterating. If not specified, starts with the first message
  216             available in the topic.
  217         :param echo: (Default False) Whether to return messages that match
  218             client_uuid
  219         :param client_uuid: (Default None) UUID for the client that
  220             originated this request
  221         :param projection: (Default None) a list of field names that should be
  222             returned in the result set or a dict specifying the fields to
  223             include or exclude
  224         :param include_claimed: (Default False) Whether to include
  225             claimed messages, not just active ones
  226         :param include_delayed: (Default False) Whether to include
  227             delayed messages, not just active ones
  228         :param sort: (Default 1) Sort order for the listing. Pass 1 for
  229             ascending (oldest message first), or -1 for descending (newest
  230             message first).
  231         :param limit: (Default None) The maximum number of messages
  232             to list. The results may include fewer messages than the
  233             requested `limit` if not enough are available. If limit is
  234             not specified
  235 
  236         :returns: Generator yielding up to `limit` messages.
  237         """
  238 
  239         if sort not in (1, -1):
  240             raise ValueError(u'sort must be either 1 (ascending) '
  241                              u'or -1 (descending)')
  242 
  243         now = timeutils.utcnow_ts()
  244 
  245         query = {
  246             # Messages must belong to this topic and project.
  247             PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
  248 
  249             # NOTE(kgriffs): Messages must be finalized (i.e., must not
  250             # be part of an unfinalized transaction).
  251             #
  252             # See also the note wrt 'tx' within the definition
  253             # of ACTIVE_INDEX_FIELDS.
  254             'tx': None,
  255         }
  256 
  257         if not echo:
  258             query['u'] = {'$ne': client_uuid}
  259 
  260         if marker is not None:
  261             query['k'] = {'$gt': marker}
  262 
  263         collection = self._collection(topic_name, project)
  264 
  265         if not include_delayed:
  266             # NOTE(cdyangzhenyu): Only include messages that are not
  267             # part of any delay, or are part of an expired delay. if
  268             # the message has no attribute 'd', it will also be obtained.
  269             # This is for compatibility with old data.
  270             query['$or'] = [{'d': {'$lte': now}},
  271                             {'d': {'$exists': False}}]
  272 
  273         # Construct the request
  274         cursor = collection.find(query,
  275                                  projection=projection,
  276                                  sort=[('k', sort)])
  277 
  278         if limit is not None:
  279             cursor.limit(limit)
  280 
  281         # NOTE(flaper87): Suggest the index to use for this query to
  282         # ensure the most performant one is chosen.
  283         return cursor.hint(ACTIVE_INDEX_FIELDS)
  284 
  285     # ----------------------------------------------------------------------
  286     # "Friends" interface
  287     # ----------------------------------------------------------------------
  288 
    def _count(self, topic_name, project=None, include_claimed=False):
        """Return total number of messages in a topic.

        This method is designed to very quickly count the number
        of messages in a given topic. Expired messages are not
        counted, of course. If the topic does not exist, the
        count will always be 0.

        Note: Some expired messages may be included in the count if
            they haven't been GC'd yet. This is done for performance.

        :param topic_name: name of the topic to count messages in
        :param project: (Default None) project the topic belongs to
        :param include_claimed: not used by this implementation; kept
            for interface compatibility
        :returns: number of finalized messages in the topic
        """
        query = {
            # Messages must belong to this topic and project.
            PROJ_TOPIC: utils.scope_queue_name(topic_name, project),

            # NOTE(kgriffs): Messages must be finalized (i.e., must not
            # be part of an unfinalized transaction).
            #
            # See also the note wrt 'tx' within the definition
            # of ACTIVE_INDEX_FIELDS.
            'tx': None,
        }

        collection = self._collection(topic_name, project)
        # NOTE(review): Collection.count() is deprecated in newer pymongo
        # releases (count_documents is the replacement) -- confirm the
        # pinned pymongo version before changing this call.
        return collection.count(filter=query, hint=COUNTING_INDEX_FIELDS)
  314 
  315     def _active(self, topic_name, marker=None, echo=False,
  316                 client_uuid=None, projection=None, project=None,
  317                 limit=None, include_delayed=False):
  318 
  319         return self._list(topic_name, project=project, marker=marker,
  320                           echo=echo, client_uuid=client_uuid,
  321                           projection=projection, include_claimed=False,
  322                           include_delayed=include_delayed, limit=limit)
  323 
    def _inc_counter(self, topic_name, project=None, amount=1, window=None):
        """Increments the message counter and returns the new value.

        :param topic_name: Name of the topic to which the counter is scoped
        :param project: Topic's project name
        :param amount: (Default 1) Amount by which to increment the counter
        :param window: (Default None) A time window, in seconds, that
            must have elapsed since the counter was last updated, in
            order to increment the counter.

        :returns: Updated message counter value, or None if window
            was specified, and the counter has already been updated
            within the specified time period.

        :raises TopicDoesNotExist: if the topic's stats document
            cannot be found (and no window was given)
        """

        # NOTE(flaper87): If this `if` is True, it means we're
        # using a mongodb in the control plane. To avoid breaking
        # environments doing so already, we'll keep using the counter
        # in the mongodb topic_controller rather than the one in the
        # message_controller. This should go away, eventually
        if hasattr(self._topic_ctrl, '_inc_counter'):
            return self._topic_ctrl._inc_counter(topic_name, project,
                                                 amount, window)

        now = timeutils.utcnow_ts()

        # Atomically bump the counter value ('c.v') and record the time
        # of the update ('c.t').
        update = {'$inc': {'c.v': amount}, '$set': {'c.t': now}}
        query = _get_scoped_query(topic_name, project)
        if window is not None:
            # Only match if the counter has NOT been updated within the
            # last `window` seconds; otherwise the update is a no-op and
            # None is returned below.
            threshold = now - window
            query['c.t'] = {'$lt': threshold}

        # NOTE(review): this loop retries forever on AutoReconnect with
        # no backoff between attempts -- confirm that is intended.
        while True:
            try:
                collection = self._collection(topic_name, project).stats
                doc = collection.find_one_and_update(
                    query, update,
                    return_document=pymongo.ReturnDocument.AFTER,
                    projection={'c.v': 1, '_id': 0})

                break
            except pymongo.errors.AutoReconnect:
                LOG.exception('Auto reconnect error.')

        if doc is None:
            if window is None:
                # NOTE(kgriffs): Since we did not filter by a time window,
                # the topic should have been found and updated. Perhaps
                # the topic has been deleted?
                message = (u'Failed to increment the message '
                           u'counter for topic %(name)s and '
                           u'project %(project)s')
                message %= dict(name=topic_name, project=project)

                LOG.warning(message)

                raise errors.TopicDoesNotExist(topic_name, project)

            # NOTE(kgriffs): Assume the queue existed, but the counter
            # was recently updated, causing the range query on 'c.t' to
            # exclude the record.
            return None

        return doc['c']['v']
  390 
    def _get_counter(self, topic_name, project=None):
        """Retrieves the current message counter value for a given topic.

        This helper is used to generate monotonic pagination
        markers that are saved as part of the message
        document.

        Note 1: Markers are scoped per-topic and so are *not*
            globally unique or globally ordered.

        Note 2: If two or more requests to this method are made
            in parallel, this method will return the same counter
            value. This is done intentionally so that the caller
            can detect a parallel message post, allowing it to
            mitigate race conditions between producer and
            observer clients.

        :param topic_name: Name of the topic to which the counter is scoped
        :param project: Topic's project
        :returns: current message counter as an integer, or None if an
            AutoReconnect error occurred (the exception is swallowed
            and the method falls off the end)
        """

        # NOTE(flaper87): If this `if` is True, it means we're
        # using a mongodb in the control plane. To avoid breaking
        # environments doing so already, we'll keep using the counter
        # in the mongodb topic_controller rather than the one in the
        # message_controller. This should go away, eventually
        if hasattr(self._topic_ctrl, '_get_counter'):
            return self._topic_ctrl._get_counter(topic_name, project)

        # Increment by zero: reads the current value without changing
        # it, while upsert=True creates the counter if missing.
        update = {'$inc': {'c.v': 0, 'c.t': 0}}
        query = _get_scoped_query(topic_name, project)

        try:
            collection = self._collection(topic_name, project).stats
            doc = collection.find_one_and_update(
                query, update, upsert=True,
                return_document=pymongo.ReturnDocument.AFTER,
                projection={'c.v': 1, '_id': 0})

            return doc['c']['v']
        except pymongo.errors.AutoReconnect:
            # NOTE(review): unlike _inc_counter, this path does not
            # retry; it logs and implicitly returns None -- confirm
            # callers tolerate a None counter here.
            LOG.exception('Auto reconnect error.')
  434 
  435     # ----------------------------------------------------------------------
  436     # Public interface
  437     # ----------------------------------------------------------------------
  438 
  439     def list(self, topic_name, project=None, marker=None,
  440              limit=storage.DEFAULT_MESSAGES_PER_PAGE,
  441              echo=False, client_uuid=None, include_claimed=False,
  442              include_delayed=False):
  443 
  444         if marker is not None:
  445             try:
  446                 marker = int(marker)
  447             except ValueError:
  448                 yield iter([])
  449 
  450         messages = self._list(topic_name, project=project, marker=marker,
  451                               client_uuid=client_uuid, echo=echo,
  452                               include_claimed=include_claimed,
  453                               include_delayed=include_delayed, limit=limit)
  454 
  455         marker_id = {}
  456 
  457         now = timeutils.utcnow_ts()
  458 
  459         # NOTE (kgriffs) @utils.raises_conn_error not needed on this
  460         # function, since utils.HookedCursor already has it.
  461         def denormalizer(msg):
  462             marker_id['next'] = msg['k']
  463 
  464             return _basic_message(msg, now)
  465 
  466         yield utils.HookedCursor(messages, denormalizer)
  467         yield str(marker_id['next'])
  468 
  469     @utils.raises_conn_error
  470     @utils.retries_on_autoreconnect
  471     def first(self, topic_name, project=None, sort=1):
  472         cursor = self._list(topic_name, project=project,
  473                             include_claimed=True, sort=sort,
  474                             limit=1)
  475         try:
  476             message = next(cursor)
  477         except StopIteration:
  478             raise errors.TopicIsEmpty(topic_name, project)
  479 
  480         now = timeutils.utcnow_ts()
  481         return _basic_message(message, now)
  482 
  483     @utils.raises_conn_error
  484     @utils.retries_on_autoreconnect
  485     def get(self, topic_name, message_id, project=None):
  486         mid = utils.to_oid(message_id)
  487         if mid is None:
  488             raise errors.MessageDoesNotExist(message_id, topic_name,
  489                                              project)
  490 
  491         now = timeutils.utcnow_ts()
  492 
  493         query = {
  494             '_id': mid,
  495             PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
  496         }
  497 
  498         collection = self._collection(topic_name, project)
  499         message = list(collection.find(query).limit(1).hint(ID_INDEX_FIELDS))
  500 
  501         if not message:
  502             raise errors.MessageDoesNotExist(message_id, topic_name,
  503                                              project)
  504 
  505         return _basic_message(message[0], now)
  506 
  507     @utils.raises_conn_error
  508     @utils.retries_on_autoreconnect
  509     def bulk_get(self, topic_name, message_ids, project=None):
  510         message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
  511         if not message_ids:
  512             return iter([])
  513 
  514         now = timeutils.utcnow_ts()
  515 
  516         # Base query, always check expire time
  517         query = {
  518             '_id': {'$in': message_ids},
  519             PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
  520         }
  521 
  522         collection = self._collection(topic_name, project)
  523 
  524         # NOTE(flaper87): Should this query
  525         # be sorted?
  526         messages = collection.find(query).hint(ID_INDEX_FIELDS)
  527 
  528         def denormalizer(msg):
  529             return _basic_message(msg, now)
  530 
  531         return utils.HookedCursor(messages, denormalizer)
  532 
    @utils.raises_conn_error
    @utils.retries_on_autoreconnect
    def post(self, topic_name, messages, client_uuid, project=None):
        """Post one or more messages to the topic.

        :param topic_name: name of the topic to post to
        :param messages: iterable of message dicts, each with a 'ttl'
            key and optional 'body' and 'delay' keys
        :param client_uuid: UUID of the posting client
        :param project: (Default None) project the topic belongs to
        :returns: list of created message IDs, as strings
        :raises TopicDoesNotExist: if the topic is not found
        """
        # NOTE(flaper87): This method should be safe to retry on
        # autoreconnect, since we've a 2-step insert for messages.
        # The worst-case scenario is that we'll increase the counter
        # several times and we'd end up with some non-active messages.

        if not self._topic_ctrl.exists(topic_name, project):
            raise errors.TopicDoesNotExist(topic_name, project)

        # NOTE(flaper87): Make sure the counter exists. This method
        # is an upsert.
        self._get_counter(topic_name, project)
        now = timeutils.utcnow_ts()
        now_dt = datetime.datetime.utcfromtimestamp(now)
        collection = self._collection(topic_name, project)

        # Materialize the iterable so its length is known up front.
        messages = list(messages)
        msgs_n = len(messages)
        # Reserve a contiguous block of markers for this batch: bump the
        # counter by the batch size and take the previous value as the
        # first marker of the block.
        next_marker = self._inc_counter(topic_name,
                                        project,
                                        amount=msgs_n) - msgs_n

        prepared_messages = []
        for index, message in enumerate(messages):
            # Field names follow the mapping in the class docstring.
            msg = {
                PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
                't': message['ttl'],
                'e': now_dt + datetime.timedelta(seconds=message['ttl']),
                'u': client_uuid,
                'd': now + message.get('delay', 0),
                'b': message['body'] if 'body' in message else {},
                'k': next_marker + index,
                'tx': None
                }
            if self.driver.conf.enable_checksum:
                # Optionally store a checksum of the body ('cs').
                msg['cs'] = s_utils.get_checksum(message.get('body', None))

            prepared_messages.append(msg)

        res = collection.insert_many(prepared_messages,
                                     bypass_document_validation=True)

        return [str(id_) for id_ in res.inserted_ids]
  578 
  579     @utils.raises_conn_error
  580     @utils.retries_on_autoreconnect
  581     def delete(self, topic_name, message_id, project=None, claim=None):
  582         # NOTE(cpp-cabrera): return early - this is an invalid message
  583         # id so we won't be able to find it any way
  584         mid = utils.to_oid(message_id)
  585         if mid is None:
  586             return
  587 
  588         collection = self._collection(topic_name, project)
  589 
  590         query = {
  591             '_id': mid,
  592             PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
  593         }
  594 
  595         cid = utils.to_oid(claim)
  596         if cid is None:
  597             raise errors.ClaimDoesNotExist(claim, topic_name, project)
  598 
  599         now = timeutils.utcnow_ts()
  600         cursor = collection.find(query).hint(ID_INDEX_FIELDS)
  601 
  602         try:
  603             message = next(cursor)
  604         except StopIteration:
  605             return
  606 
  607         if claim is None:
  608             if _is_claimed(message, now):
  609                 raise errors.MessageIsClaimed(message_id)
  610 
  611         else:
  612             if message['c']['id'] != cid:
  613                 kwargs = {}
  614                 # NOTE(flaper87): In pymongo 3.0 PRIMARY is the default and
  615                 # `read_preference` is read only. We'd need to set it when the
  616                 # client is created.
  617                 # NOTE(kgriffs): Read from primary in case the message
  618                 # was just barely claimed, and claim hasn't made it to
  619                 # the secondary.
  620                 message = collection.find_one(query, **kwargs)
  621 
  622                 if message['c']['id'] != cid:
  623                     if _is_claimed(message, now):
  624                         raise errors.MessageNotClaimedBy(message_id, claim)
  625 
  626                     raise errors.MessageNotClaimed(message_id)
  627 
  628         collection.delete_one(query)
  629 
  630     @utils.raises_conn_error
  631     @utils.retries_on_autoreconnect
  632     def bulk_delete(self, topic_name, message_ids, project=None,
  633                     claim_ids=None):
  634         message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid]
  635         if claim_ids:
  636             claim_ids = [cid for cid in map(utils.to_oid, claim_ids) if cid]
  637         query = {
  638             '_id': {'$in': message_ids},
  639             PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
  640         }
  641 
  642         collection = self._collection(topic_name, project)
  643         if claim_ids:
  644             message_claim_ids = []
  645             messages = collection.find(query).hint(ID_INDEX_FIELDS)
  646             for message in messages:
  647                 message_claim_ids.append(message['c']['id'])
  648             for cid in claim_ids:
  649                 if cid not in message_claim_ids:
  650                     raise errors.ClaimDoesNotExist(cid, topic_name, project)
  651 
  652         collection.delete_many(query)
  653 
  654     @utils.raises_conn_error
  655     @utils.retries_on_autoreconnect
  656     def pop(self, topic_name, limit, project=None):
  657         query = {
  658             PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
  659         }
  660 
  661         # Only include messages that are not part of
  662         # any claim, or are part of an expired claim.
  663         now = timeutils.utcnow_ts()
  664         query['c.e'] = {'$lte': now}
  665 
  666         collection = self._collection(topic_name, project)
  667         projection = {'_id': 1, 't': 1, 'b': 1, 'c.id': 1}
  668 
  669         messages = (collection.find_one_and_delete(query,
  670                                                    projection=projection)
  671                     for _ in range(limit))
  672 
  673         final_messages = [_basic_message(message, now)
  674                           for message in messages
  675                           if message]
  676 
  677         return final_messages
  678 
  679 
  680 class FIFOMessageController(MessageController):
  681 
  682     def _ensure_indexes(self, collection):
  683         """Ensures that all indexes are created."""
  684 
  685         collection.ensure_index(TTL_INDEX_FIELDS,
  686                                 name='ttl',
  687                                 expireAfterSeconds=0,
  688                                 background=True)
  689 
  690         collection.ensure_index(ACTIVE_INDEX_FIELDS,
  691                                 name='active',
  692                                 background=True)
  693 
  694         collection.ensure_index(COUNTING_INDEX_FIELDS,
  695                                 name='counting',
  696                                 background=True)
  697 
  698         # NOTE(kgriffs): This index must be unique so that
  699         # inserting a message with the same marker to the
  700         # same queue will fail; this is used to detect a
  701         # race condition which can cause an observer client
  702         # to miss a message when there is more than one
  703         # producer posting messages to the same queue, in
  704         # parallel.
  705         collection.ensure_index(MARKER_INDEX_FIELDS,
  706                                 name='queue_marker',
  707                                 unique=True,
  708                                 background=True)
  709 
  710         collection.ensure_index(TRANSACTION_INDEX_FIELDS,
  711                                 name='transaction',
  712                                 background=True)
  713 
  714     @utils.raises_conn_error
  715     @utils.retries_on_autoreconnect
  716     def post(self, topic_name, messages, client_uuid, project=None):
  717         # NOTE(flaper87): This method should be safe to retry on
  718         # autoreconnect, since we've a 2-step insert for messages.
  719         # The worst-case scenario is that we'll increase the counter
  720         # several times and we'd end up with some non-active messages.
  721 
  722         if not self._topic_ctrl.exists(topic_name, project):
  723             raise errors.TopicDoesNotExist(topic_name, project)
  724 
  725         # NOTE(flaper87): Make sure the counter exists. This method
  726         # is an upsert.
  727         self._get_counter(topic_name, project)
  728         now = timeutils.utcnow_ts()
  729         now_dt = datetime.datetime.utcfromtimestamp(now)
  730         collection = self._collection(topic_name, project)
  731 
  732         # Set the next basis marker for the first attempt.
  733         #
  734         # Note that we don't increment the counter right away because
  735         # if 2 concurrent posts happen and the one with the higher counter
  736         # ends before the one with the lower counter, there's a window
  737         # where a client paging through the queue may get the messages
  738         # with the higher counter and skip the previous ones. This would
  739         # make our FIFO guarantee unsound.
  740         next_marker = self._get_counter(topic_name, project)
  741 
  742         # Unique transaction ID to facilitate atomic batch inserts
  743         transaction = objectid.ObjectId()
  744 
  745         prepared_messages = []
  746         for index, message in enumerate(messages):
  747             msg = {
  748                 PROJ_TOPIC: utils.scope_queue_name(topic_name, project),
  749                 't': message['ttl'],
  750                 'e': now_dt + datetime.timedelta(seconds=message['ttl']),
  751                 'u': client_uuid,
  752                 'd': now + message.get('delay', 0),
  753                 'b': message['body'] if 'body' in message else {},
  754                 'k': next_marker + index,
  755                 'tx': None
  756                 }
  757             if self.driver.conf.enable_checksum:
  758                 msg['cs'] = s_utils.get_checksum(message.get('body', None))
  759 
  760             prepared_messages.append(msg)
  761 
  762         # NOTE(kgriffs): Don't take the time to do a 2-phase insert
  763         # if there is no way for it to partially succeed.
  764         if len(prepared_messages) == 1:
  765             transaction = None
  766             prepared_messages[0]['tx'] = None
  767 
  768         # Use a retry range for sanity, although we expect
  769         # to rarely, if ever, reach the maximum number of
  770         # retries.
  771         #
  772         # NOTE(kgriffs): With the default configuration (100 ms
  773         # max sleep, 1000 max attempts), the max stall time
  774         # before the operation is abandoned is 49.95 seconds.
  775         for attempt in self._retry_range:
  776             try:
  777                 res = collection.insert_many(prepared_messages,
  778                                              bypass_document_validation=True)
  779 
  780                 # Log a message if we retried, for debugging perf issues
  781                 if attempt != 0:
  782                     msgtmpl = _(u'%(attempts)d attempt(s) required to post '
  783                                 u'%(num_messages)d messages to queue '
  784                                 u'"%(topic)s" under project %(project)s')
  785 
  786                     LOG.debug(msgtmpl,
  787                               dict(topic=topic_name,
  788                                    attempts=attempt + 1,
  789                                    num_messages=len(res.inserted_ids),
  790                                    project=project))
  791 
  792                 # Update the counter in preparation for the next batch
  793                 #
  794                 # NOTE(kgriffs): Due to the unique index on the messages
  795                 # collection, competing inserts will fail as a whole,
  796                 # and keep retrying until the counter is incremented
  797                 # such that the competing marker's will start at a
  798                 # unique number, 1 past the max of the messages just
  799                 # inserted above.
  800                 self._inc_counter(topic_name, project,
  801                                   amount=len(res.inserted_ids))
  802 
  803                 # NOTE(kgriffs): Finalize the insert once we can say that
  804                 # all the messages made it. This makes bulk inserts
  805                 # atomic, assuming queries filter out any non-finalized
  806                 # messages.
  807                 if transaction is not None:
  808                     collection.update_many({'tx': transaction},
  809                                            {'$set': {'tx': None}},
  810                                            upsert=False)
  811 
  812                 return [str(id_) for id_ in res.inserted_ids]
  813 
  814             except (pymongo.errors.DuplicateKeyError,
  815                     pymongo.errors.BulkWriteError):
  816                 # TODO(kgriffs): Record stats of how often retries happen,
  817                 # and how many attempts, on average, are required to insert
  818                 # messages.
  819 
  820                 # NOTE(kgriffs): This can be used in conjunction with the
  821                 # log line, above, that is emitted after all messages have
  822                 # been posted, to gauge how long it is taking for messages
  823                 # to be posted to a given topic, or overall.
  824                 #
  825                 # TODO(kgriffs): Add transaction ID to help match up loglines
  826                 if attempt == 0:
  827                     msgtmpl = _(u'First attempt failed while '
  828                                 u'adding messages to topic '
  829                                 u'"%(topic)s" under project %(project)s')
  830 
  831                     LOG.debug(msgtmpl, dict(topic=topic_name, project=project))
  832 
  833                 # NOTE(kgriffs): Never retry past the point that competing
  834                 # messages expire and are GC'd, since once they are gone,
  835                 # the unique index no longer protects us from getting out
  836                 # of order, which could cause an observer to miss this
  837                 # message. The code below provides a sanity-check to ensure
  838                 # this situation can not happen.
  839                 elapsed = timeutils.utcnow_ts() - now
  840                 if elapsed > MAX_RETRY_POST_DURATION:
  841                     msgtmpl = (u'Exceeded maximum retry duration for topic '
  842                                u'"%(topic)s" under project %(project)s')
  843 
  844                     LOG.warning(msgtmpl,
  845                                 dict(topic=topic_name, project=project))
  846                     break
  847 
  848                 # Chill out for a moment to mitigate thrashing/thundering
  849                 self._backoff_sleep(attempt)
  850 
  851                 # NOTE(kgriffs): Perhaps we failed because a worker crashed
  852                 # after inserting messages, but before incrementing the
  853                 # counter; that would cause all future requests to stall,
  854                 # since they would keep getting the same base marker that is
  855                 # conflicting with existing messages, until the messages that
  856                 # "won" expire, at which time we would end up reusing markers,
  857                 # and that could make some messages invisible to an observer
  858                 # that is querying with a marker that is large than the ones
  859                 # being reused.
  860                 #
  861                 # To mitigate this, we apply a heuristic to determine whether
  862                 # a counter has stalled. We attempt to increment the counter,
  863                 # but only if it hasn't been updated for a few seconds, which
  864                 # should mean that nobody is left to update it!
  865                 #
  866                 # Note that we increment one at a time until the logjam is
  867                 # broken, since we don't know how many messages were posted
  868                 # by the worker before it crashed.
  869                 next_marker = self._inc_counter(
  870                     topic_name, project, window=COUNTER_STALL_WINDOW)
  871 
  872                 # Retry the entire batch with a new sequence of markers.
  873                 #
  874                 # NOTE(kgriffs): Due to the unique index, and how
  875                 # MongoDB works with batch requests, we will never
  876                 # end up with a partially-successful update. The first
  877                 # document in the batch will fail to insert, and the
  878                 # remainder of the documents will not be attempted.
  879                 if next_marker is None:
  880                     # NOTE(kgriffs): Usually we will end up here, since
  881                     # it should be rare that a counter becomes stalled.
  882                     next_marker = self._get_counter(
  883                         topic_name, project)
  884                 else:
  885                     msgtmpl = (u'Detected a stalled message counter '
  886                                u'for topic "%(topic)s" under '
  887                                u'project %(project)s.'
  888                                u'The counter was incremented to %(value)d.')
  889 
  890                     LOG.warning(msgtmpl,
  891                                 dict(topic=topic_name,
  892                                      project=project,
  893                                      value=next_marker))
  894 
  895                 for index, message in enumerate(prepared_messages):
  896                     message['k'] = next_marker + index
  897             except Exception:
  898                 LOG.exception('Error parsing document.')
  899                 raise
  900 
  901         msgtmpl = (u'Hit maximum number of attempts (%(max)s) for topic '
  902                    u'"%(topic)s" under project %(project)s')
  903 
  904         LOG.warning(msgtmpl,
  905                     dict(max=self.driver.mongodb_conf.max_attempts,
  906                          topic=topic_name,
  907                          project=project))
  908 
  909         raise errors.MessageConflict(topic_name, project)
  910 
  911 
  912 def _is_claimed(msg, now):
  913     return (msg['c']['id'] is not None and
  914             msg['c']['e'] > now)
  915 
  916 
  917 def _basic_message(msg, now):
  918     oid = msg['_id']
  919     age = now - utils.oid_ts(oid)
  920     res = {
  921         'id': str(oid),
  922         'age': int(age),
  923         'ttl': msg['t'],
  924         'body': msg['b']
  925         }
  926     if msg.get('cs'):
  927         res['checksum'] = msg.get('cs')
  928 
  929     return res
  930 
  931 
  932 class MessageTopicHandler(object):
  933 
  934     def __init__(self, driver, control_driver):
  935         self.driver = driver
  936         self._cache = self.driver.cache
  937         self.topic_controller = self.driver.topic_controller
  938         self.message_controller = self.driver.message_controller
  939 
  940     def delete(self, topic_name, project=None):
  941         self.message_controller._purge_queue(topic_name, project)
  942 
  943     @utils.raises_conn_error
  944     @utils.retries_on_autoreconnect
  945     def stats(self, name, project=None):
  946         if not self.topic_controller.exists(name, project=project):
  947             raise errors.TopicDoesNotExist(name, project)
  948 
  949         controller = self.message_controller
  950 
  951         total = controller._count(name, project=project,
  952                                   include_claimed=True)
  953 
  954         message_stats = {
  955             'total': total,
  956         }
  957 
  958         try:
  959             oldest = controller.first(name, project=project, sort=1)
  960             newest = controller.first(name, project=project, sort=-1)
  961         except errors.QueueIsEmpty:
  962             pass
  963         else:
  964             now = timeutils.utcnow_ts()
  965             message_stats['oldest'] = utils.stat_message(oldest, now)
  966             message_stats['newest'] = utils.stat_message(newest, now)
  967 
  968         return {'messages': message_stats}
  969 
  970 
  971 def _get_scoped_query(name, project):
  972     return {'p_t': utils.scope_queue_name(name, project)}