"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "zaqar/storage/mongodb/topic_messages.py" between
zaqar-9.0.0.tar.gz and zaqar-10.0.0.tar.gz

About: OpenStack Zaqar is a multi-tenant cloud messaging and notification service for web and mobile developers.
The "Ussuri" series (latest release).

topic_messages.py  (zaqar-9.0.0):topic_messages.py  (zaqar-10.0.0)
skipping to change at line 27 skipping to change at line 27
Field Mappings: Field Mappings:
In order to reduce the disk / memory space used, In order to reduce the disk / memory space used,
field names will be, most of the time, the first field names will be, most of the time, the first
letter of their long name. letter of their long name.
""" """
import datetime import datetime
import time import time
from bson import errors as bsonerror
from bson import objectid from bson import objectid
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import timeutils from oslo_utils import timeutils
import pymongo.errors import pymongo.errors
import pymongo.read_preferences import pymongo.read_preferences
from zaqar.i18n import _ from zaqar.i18n import _
from zaqar import storage from zaqar import storage
from zaqar.storage import errors from zaqar.storage import errors
from zaqar.storage.mongodb import utils from zaqar.storage.mongodb import utils
skipping to change at line 366 skipping to change at line 365
while True: while True:
try: try:
collection = self._collection(topic_name, project).stats collection = self._collection(topic_name, project).stats
doc = collection.find_one_and_update( doc = collection.find_one_and_update(
query, update, query, update,
return_document=pymongo.ReturnDocument.AFTER, return_document=pymongo.ReturnDocument.AFTER,
projection={'c.v': 1, '_id': 0}) projection={'c.v': 1, '_id': 0})
break break
except pymongo.errors.AutoReconnect as ex: except pymongo.errors.AutoReconnect:
LOG.exception(ex) LOG.exception('Auto reconnect error.')
if doc is None: if doc is None:
if window is None: if window is None:
# NOTE(kgriffs): Since we did not filter by a time window, # NOTE(kgriffs): Since we did not filter by a time window,
# the topic should have been found and updated. Perhaps # the topic should have been found and updated. Perhaps
# the topic has been deleted? # the topic has been deleted?
message = (u'Failed to increment the message ' message = (u'Failed to increment the message '
u'counter for topic %(name)s and ' u'counter for topic %(name)s and '
u'project %(project)s') u'project %(project)s')
message %= dict(name=topic_name, project=project) message %= dict(name=topic_name, project=project)
skipping to change at line 431 skipping to change at line 430
query = _get_scoped_query(topic_name, project) query = _get_scoped_query(topic_name, project)
try: try:
collection = self._collection(topic_name, project).stats collection = self._collection(topic_name, project).stats
doc = collection.find_one_and_update( doc = collection.find_one_and_update(
query, update, upsert=True, query, update, upsert=True,
return_document=pymongo.ReturnDocument.AFTER, return_document=pymongo.ReturnDocument.AFTER,
projection={'c.v': 1, '_id': 0}) projection={'c.v': 1, '_id': 0})
return doc['c']['v'] return doc['c']['v']
except pymongo.errors.AutoReconnect as ex: except pymongo.errors.AutoReconnect:
LOG.exception(ex) LOG.exception('Auto reconnect error.')
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
# Public interface # Public interface
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def list(self, topic_name, project=None, marker=None, def list(self, topic_name, project=None, marker=None,
limit=storage.DEFAULT_MESSAGES_PER_PAGE, limit=storage.DEFAULT_MESSAGES_PER_PAGE,
echo=False, client_uuid=None, include_claimed=False, echo=False, client_uuid=None, include_claimed=False,
include_delayed=False): include_delayed=False):
skipping to change at line 813 skipping to change at line 812
# atomic, assuming queries filter out any non-finalized # atomic, assuming queries filter out any non-finalized
# messages. # messages.
if transaction is not None: if transaction is not None:
collection.update_many({'tx': transaction}, collection.update_many({'tx': transaction},
{'$set': {'tx': None}}, {'$set': {'tx': None}},
upsert=False) upsert=False)
return [str(id_) for id_ in res.inserted_ids] return [str(id_) for id_ in res.inserted_ids]
except (pymongo.errors.DuplicateKeyError, except (pymongo.errors.DuplicateKeyError,
pymongo.errors.BulkWriteError) as ex: pymongo.errors.BulkWriteError):
# TODO(kgriffs): Record stats of how often retries happen, # TODO(kgriffs): Record stats of how often retries happen,
# and how many attempts, on average, are required to insert # and how many attempts, on average, are required to insert
# messages. # messages.
# NOTE(kgriffs): This can be used in conjunction with the # NOTE(kgriffs): This can be used in conjunction with the
# log line, above, that is emitted after all messages have # log line, above, that is emitted after all messages have
# been posted, to gauge how long it is taking for messages # been posted, to gauge how long it is taking for messages
# to be posted to a given topic, or overall. # to be posted to a given topic, or overall.
# #
# TODO(kgriffs): Add transaction ID to help match up loglines # TODO(kgriffs): Add transaction ID to help match up loglines
skipping to change at line 895 skipping to change at line 894
u'project %(project)s.' u'project %(project)s.'
u'The counter was incremented to %(value)d.') u'The counter was incremented to %(value)d.')
LOG.warning(msgtmpl, LOG.warning(msgtmpl,
dict(topic=topic_name, dict(topic=topic_name,
project=project, project=project,
value=next_marker)) value=next_marker))
for index, message in enumerate(prepared_messages): for index, message in enumerate(prepared_messages):
message['k'] = next_marker + index message['k'] = next_marker + index
except bsonerror.InvalidDocument as ex: except Exception:
LOG.exception(ex) LOG.exception('Error parsing document.')
raise
except Exception as ex:
LOG.exception(ex)
raise raise
msgtmpl = (u'Hit maximum number of attempts (%(max)s) for topic ' msgtmpl = (u'Hit maximum number of attempts (%(max)s) for topic '
u'"%(topic)s" under project %(project)s') u'"%(topic)s" under project %(project)s')
LOG.warning(msgtmpl, LOG.warning(msgtmpl,
dict(max=self.driver.mongodb_conf.max_attempts, dict(max=self.driver.mongodb_conf.max_attempts,
topic=topic_name, topic=topic_name,
project=project)) project=project))
 End of changes. 5 change blocks. 
11 lines changed or deleted 7 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)