"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "zaqar/storage/mongodb/messages.py" between
zaqar-9.0.0.tar.gz and zaqar-10.0.0.tar.gz

About: OpenStack Zaqar is a multi-tenant cloud messaging and notification service for web and mobile developers.
The "Ussuri" series (latest release).

messages.py  (zaqar-9.0.0):messages.py  (zaqar-10.0.0)
skipping to change at line 27 skipping to change at line 27
Field Mappings: Field Mappings:
In order to reduce the disk / memory space used, In order to reduce the disk / memory space used,
field names will be, most of the time, the first field names will be, most of the time, the first
letter of their long name. letter of their long name.
""" """
import datetime import datetime
import time import time
from bson import errors as bsonerror
from bson import objectid from bson import objectid
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import timeutils from oslo_utils import timeutils
import pymongo.errors import pymongo.errors
import pymongo.read_preferences import pymongo.read_preferences
from zaqar.i18n import _ from zaqar.i18n import _
from zaqar import storage from zaqar import storage
from zaqar.storage import errors from zaqar.storage import errors
from zaqar.storage.mongodb import utils from zaqar.storage.mongodb import utils
skipping to change at line 459 skipping to change at line 458
while True: while True:
try: try:
collection = self._collection(queue_name, project).stats collection = self._collection(queue_name, project).stats
doc = collection.find_one_and_update( doc = collection.find_one_and_update(
query, update, query, update,
return_document=pymongo.ReturnDocument.AFTER, return_document=pymongo.ReturnDocument.AFTER,
projection={'c.v': 1, '_id': 0}) projection={'c.v': 1, '_id': 0})
break break
except pymongo.errors.AutoReconnect as ex: except pymongo.errors.AutoReconnect:
LOG.exception(ex) LOG.exception('Auto reconnect error')
if doc is None: if doc is None:
if window is None: if window is None:
# NOTE(kgriffs): Since we did not filter by a time window, # NOTE(kgriffs): Since we did not filter by a time window,
# the queue should have been found and updated. Perhaps # the queue should have been found and updated. Perhaps
# the queue has been deleted? # the queue has been deleted?
message = (u'Failed to increment the message ' message = (u'Failed to increment the message '
u'counter for queue %(name)s and ' u'counter for queue %(name)s and '
u'project %(project)s') u'project %(project)s')
message %= dict(name=queue_name, project=project) message %= dict(name=queue_name, project=project)
skipping to change at line 524 skipping to change at line 523
query = _get_scoped_query(queue_name, project) query = _get_scoped_query(queue_name, project)
try: try:
collection = self._collection(queue_name, project).stats collection = self._collection(queue_name, project).stats
doc = collection.find_one_and_update( doc = collection.find_one_and_update(
query, update, upsert=True, query, update, upsert=True,
return_document=pymongo.ReturnDocument.AFTER, return_document=pymongo.ReturnDocument.AFTER,
projection={'c.v': 1, '_id': 0}) projection={'c.v': 1, '_id': 0})
return doc['c']['v'] return doc['c']['v']
except pymongo.errors.AutoReconnect as ex: except pymongo.errors.AutoReconnect:
LOG.exception(ex) LOG.exception('Auto reconnect error')
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
# Public interface # Public interface
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def list(self, queue_name, project=None, marker=None, def list(self, queue_name, project=None, marker=None,
limit=storage.DEFAULT_MESSAGES_PER_PAGE, limit=storage.DEFAULT_MESSAGES_PER_PAGE,
echo=False, client_uuid=None, include_claimed=False, echo=False, client_uuid=None, include_claimed=False,
include_delayed=False): include_delayed=False):
skipping to change at line 912 skipping to change at line 911
# atomic, assuming queries filter out any non-finalized # atomic, assuming queries filter out any non-finalized
# messages. # messages.
if transaction is not None: if transaction is not None:
collection.update_many({'tx': transaction}, collection.update_many({'tx': transaction},
{'$set': {'tx': None}}, {'$set': {'tx': None}},
upsert=False) upsert=False)
return [str(id_) for id_ in res.inserted_ids] return [str(id_) for id_ in res.inserted_ids]
except (pymongo.errors.DuplicateKeyError, except (pymongo.errors.DuplicateKeyError,
pymongo.errors.BulkWriteError) as ex: pymongo.errors.BulkWriteError):
# TODO(kgriffs): Record stats of how often retries happen, # TODO(kgriffs): Record stats of how often retries happen,
# and how many attempts, on average, are required to insert # and how many attempts, on average, are required to insert
# messages. # messages.
# NOTE(kgriffs): This can be used in conjunction with the # NOTE(kgriffs): This can be used in conjunction with the
# log line, above, that is emitted after all messages have # log line, above, that is emitted after all messages have
# been posted, to gauge how long it is taking for messages # been posted, to gauge how long it is taking for messages
# to be posted to a given queue, or overall. # to be posted to a given queue, or overall.
# #
# TODO(kgriffs): Add transaction ID to help match up loglines # TODO(kgriffs): Add transaction ID to help match up loglines
skipping to change at line 994 skipping to change at line 993
u'project %(project)s.' u'project %(project)s.'
u'The counter was incremented to %(value)d.') u'The counter was incremented to %(value)d.')
LOG.warning(msgtmpl, LOG.warning(msgtmpl,
dict(queue=queue_name, dict(queue=queue_name,
project=project, project=project,
value=next_marker)) value=next_marker))
for index, message in enumerate(prepared_messages): for index, message in enumerate(prepared_messages):
message['k'] = next_marker + index message['k'] = next_marker + index
except bsonerror.InvalidDocument as ex: except Exception:
LOG.exception(ex) LOG.exception('Error parsing document')
raise
except Exception as ex:
LOG.exception(ex)
raise raise
msgtmpl = (u'Hit maximum number of attempts (%(max)s) for queue ' msgtmpl = (u'Hit maximum number of attempts (%(max)s) for queue '
u'"%(queue)s" under project %(project)s') u'"%(queue)s" under project %(project)s')
LOG.warning(msgtmpl, LOG.warning(msgtmpl,
dict(max=self.driver.mongodb_conf.max_attempts, dict(max=self.driver.mongodb_conf.max_attempts,
queue=queue_name, queue=queue_name,
project=project)) project=project))
 End of changes. 5 change blocks. 
11 lines changed or deleted 7 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)