"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-10.0.0/zaqar/storage/mongodb/utils.py" (13 May 2020, 10816 Bytes) of package /linux/misc/openstack/zaqar-10.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "utils.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 9.0.0_vs_10.0.0.

# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import division
import binascii
import collections
import datetime
import functools
import random
import time

from bson import errors as berrors
from bson import objectid
from bson import tz_util
from oslo_log import log as logging
from oslo_utils import timeutils
from pymongo import errors

from zaqar.storage import errors as storage_errors


# BSON ObjectId gives TZ-aware datetime, so we generate a
# TZ-aware UNIX epoch for convenience.
EPOCH = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=tz_util.utc)

# NOTE(cpp-cabrera): the authoritative form of project/queue keys.
PROJ_QUEUE_KEY = 'p_q'

PROJ_TOPIC_KEY = 'p_t'

LOG = logging.getLogger(__name__)


def cached_gen(iterable):
    """Converts the iterable into a caching generator.

    Returns a proxy that yields each item of iterable, while at
    the same time caching those items in a deque.

    :param iterable: an iterable to wrap in a caching generator

    :returns: (proxy(iterable), cached_items)
    """
    cached_items = collections.deque()

    def generator(iterable):
        for item in iterable:
            cached_items.append(item)
            yield item

    return generator(iterable), cached_items

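# Illustrative sketch (not in the original source): consuming the proxy
# fills the deque as a side effect, so the items can be replayed later.
#
#     proxy, cached = cached_gen(iter('abc'))
#     assert list(proxy) == ['a', 'b', 'c']
#     assert list(cached) == ['a', 'b', 'c']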

def calculate_backoff(attempt, max_attempts, max_sleep, max_jitter=0):
    """Calculates backoff time, in seconds, when retrying an operation.

    This function calculates a simple linear backoff time with
    optional jitter, useful for retrying a request under high
    concurrency.

    The result may be passed directly into time.sleep() in order to
    mitigate stampeding herd syndrome and introduce backpressure towards
    the clients, slowing them down.

    :param attempt: current value of the attempt counter (zero-based)
    :param max_attempts: maximum number of attempts that will be tried
    :param max_sleep: maximum sleep value to apply before jitter, assumed
        to be seconds. Fractional seconds are supported to 1 ms
        granularity.
    :param max_jitter: maximum jitter value to add to the baseline sleep
        time. Actual value will be chosen randomly.

    :raises ValueError: if any parameter is out of range
    :returns: float representing the number of seconds to sleep, within
        the interval [0, max_sleep), determined linearly according to
        the ratio attempt / max_attempts, with optional jitter.
    """

    if max_sleep < 0:
        raise ValueError(u'max_sleep must be >= 0')

    if max_jitter < 0:
        raise ValueError(u'max_jitter must be >= 0')

    if not (0 <= attempt < max_attempts):
        raise ValueError(u'attempt value is out of range')

    ratio = attempt / max_attempts
    backoff_sec = ratio * max_sleep
    jitter_sec = random.random() * max_jitter

    return backoff_sec + jitter_sec

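# Illustrative sketch (not in the original source): with five attempts and
# a 2-second cap, the baseline grows linearly as 0.0, 0.4, 0.8, 1.2, and
# 1.6 seconds, plus up to max_jitter seconds of random jitter.
#
#     for attempt in range(5):
#         time.sleep(calculate_backoff(attempt, 5, 2.0, max_jitter=0.5))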

def to_oid(obj):
    """Creates a new ObjectId based on the input.

    Returns None when TypeError or berrors.InvalidId
    is raised by the ObjectId class.

    :param obj: Anything that can be passed as an
        input to `objectid.ObjectId`
    """
    try:
        return objectid.ObjectId(obj)
    except (TypeError, berrors.InvalidId):
        return None


def oid_ts(oid):
    """Converts an ObjectId to a UNIX timestamp.

    :raises TypeError: if oid isn't an ObjectId
    """
    try:
        return timeutils.delta_seconds(EPOCH, oid.generation_time)
    except AttributeError:
        raise TypeError(u'Expected ObjectId and got %s' % type(oid))

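# Illustrative sketch (not in the original source): a message id round-trips
# through to_oid() and oid_ts() to recover its creation time as a UNIX
# timestamp; an invalid id simply comes back as None.
#
#     oid = to_oid('313233343536373839303132')
#     created = oid_ts(oid) if oid is not None else None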

def stat_message(message, now):
    """Creates a stat document from the given message, relative to now."""
    msg_id = message['id']
    created = oid_ts(to_oid(msg_id))
    age = now - created
    created_iso = datetime.datetime.utcfromtimestamp(created).strftime(
        '%Y-%m-%dT%H:%M:%SZ')
    return {
        'id': msg_id,
        'age': int(age),
        'created': created_iso,
    }

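# Illustrative sketch (not in the original source): for a message whose
# 'id' is a valid ObjectId string, the stat document reports the age in
# whole seconds along with an ISO 8601 creation time.
#
#     doc = stat_message({'id': str(objectid.ObjectId())},
#                        timeutils.utcnow_ts())
#     # -> {'id': '...', 'age': 0, 'created': '...T...Z'}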

def normalize_none_str(string_or_none):
    """Returns '' IFF given value is None, passthrough otherwise.

    This function normalizes None to the empty string to facilitate
    string concatenation when a variable could be None.
    """
    return '' if string_or_none is None else string_or_none


def scope_queue_name(queue=None, project=None):
    """Returns a scoped name for a queue based on project and queue.

    If only the project name is specified, a scope signifying "all queues"
    for that project is returned. If neither queue nor project is
    specified, a scope for "all global queues" is returned, which
    is to be interpreted as excluding queues scoped by project.

    :param queue: name of queue to seek
    :type queue: six.text_type
    :param project: namespace
    :type project: six.text_type
    :returns: '{project}/{queue}' if project and queue are given,
        '{project}/' if ONLY project is given, '/{queue}' if ONLY
        queue is given, and '/' if neither are given.
    """

    # NOTE(kgriffs): Concatenation is faster than format, and
    # put project first since it is guaranteed to be unique.
    return normalize_none_str(project) + '/' + normalize_none_str(queue)

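# Illustrative sketch (not in the original source): the four scoping cases.
#
#     scope_queue_name('q1', 'p1')    # -> 'p1/q1' (queue in a project)
#     scope_queue_name(None, 'p1')    # -> 'p1/'   (all queues in a project)
#     scope_queue_name('q1', None)    # -> '/q1'   (a global queue)
#     scope_queue_name()              # -> '/'     (all global queues)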

def descope_queue_name(scoped_name):
    """Returns the unscoped queue name, given a fully-scoped name."""

    # NOTE(kgriffs): scoped_name can be either '/', '/global-queue-name',
    # or 'project-id/queue-name'.
    return scoped_name.partition('/')[2] or None


def parse_scoped_project_queue(scoped_name):
    """Returns the project and queue name for a scoped catalogue entry.

    :param scoped_name: a project/queue as given by :scope_queue_name:
    :type scoped_name: six.text_type
    :returns: (project, queue)
    :rtype: (six.text_type, six.text_type)
    """
    return scoped_name.split('/')

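# Illustrative sketch (not in the original source): descoping inverts
# scope_queue_name(), and parsing recovers both components (as a list).
#
#     descope_queue_name('p1/q1')           # -> 'q1'
#     descope_queue_name('/')               # -> None
#     parse_scoped_project_queue('p1/q1')   # -> ['p1', 'q1']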

def scoped_query(queue, project, name=None, kfilter=None,
                 key_value=PROJ_QUEUE_KEY):
    """Returns a dict usable for querying for scoped project/queues.

    :param queue: name of queue to seek
    :type queue: six.text_type
    :param project: namespace
    :type project: six.text_type
    :param name: if given, a substring that matching queue names
        must contain
    :type name: six.text_type
    :param kfilter: metadata key/value pairs to filter on; each entry
        is matched against the corresponding 'm.'-prefixed field
    :type kfilter: dict
    :param key_value: the document key holding the scoped name
        (the project/queue key by default)
    :type key_value: six.text_type
    :returns: query to issue
    :rtype: dict
    """
    key = key_value
    query = {}
    scoped_name = scope_queue_name(queue, project)

    if not scoped_name.startswith('/'):
        # NOTE(kgriffs): scoped queue, e.g., 'project-id/queue-name'
        if name:
            project_prefix = '^' + project + '/.*' + name + '.*'
        else:
            project_prefix = '^' + project + '/'
        query[key] = {'$regex': project_prefix, '$gt': scoped_name}
    elif scoped_name == '/':
        # NOTE(kgriffs): list global queues, but exclude scoped ones
        if name:
            query[key] = {'$regex': '^/.*' + name + '.*'}
        else:
            query[key] = {'$regex': '^/'}
    else:
        # NOTE(kgriffs): unscoped queue, e.g., '/my-global-queue'
        if name:
            query[key] = {'$regex': '^/.*' + name + '.*', '$gt': scoped_name}
        else:
            query[key] = {'$regex': '^/', '$gt': scoped_name}

    # Handle any metadata filters given in the request.
    for key, value in (kfilter or {}).items():
        key = 'm.' + key
        query[key] = {'$eq': value}

    return query

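# Illustrative sketch (not in the original source): listing queues in a
# project after a marker yields a regex query on the scoped key, plus any
# metadata constraints prefixed with 'm.'.
#
#     scoped_query('q1', 'p1', kfilter={'ttl': 60})
#     # -> {'p_q': {'$regex': '^p1/', '$gt': 'p1/q1'},
#     #     'm.ttl': {'$eq': 60}}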

def get_partition(num_partitions, queue, project=None):
    """Get the partition number for a given queue and project.

    Hashes the queue to a partition number. The hash is stable,
    meaning given the same queue name and project ID, the same
    partition number will always be returned. Note also that
    queues will be uniformly distributed across partitions.

    The number of partitions is taken from the "partitions"
    property in the config file, under the [drivers:storage:mongodb]
    section.
    """

    name = project + queue if project is not None else queue

    # NOTE(kgriffs): For small numbers of partitions, crc32 will
    # provide a uniform distribution. This was verified experimentally
    # with up to 100 partitions.
    return binascii.crc32(name.encode('utf-8')) % num_partitions

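# Illustrative sketch (not in the original source): the mapping is a plain
# CRC32 modulo, so it can be reproduced outside this module.
#
#     get_partition(4, 'q1', 'p1')
#     # == binascii.crc32('p1q1'.encode('utf-8')) % 4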

def raises_conn_error(func):
    """Handles the MongoDB ConnectionFailure error.

    This decorator catches MongoDB's ConnectionFailure
    error and raises Zaqar's ConnectionError instead.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except errors.ConnectionFailure:
            LOG.exception('Connection failure.')
            raise storage_errors.ConnectionError()

    return wrapper

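# Illustrative sketch (not in the original source; 'ping' is a hypothetical
# helper): any pymongo call wrapped this way surfaces connection problems
# as Zaqar's storage-level ConnectionError.
#
#     @raises_conn_error
#     def ping(client):
#         return client.admin.command('ping')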

def retries_on_autoreconnect(func):
    """Causes the wrapped function to be re-called on AutoReconnect.

    This decorator catches MongoDB's AutoReconnect error and retries
    the function call.

    .. Note::
       Assumes that the decorated function has defined self.driver.mongodb_conf
       so that `max_reconnect_attempts` and `reconnect_sleep` can be taken
       into account.

    .. Warning:: The decorated function must be idempotent.
    """

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # TODO(kgriffs): Figure out a way to not have to rely on the
        # presence of `mongodb_conf`
        max_attempts = self.driver.mongodb_conf.max_reconnect_attempts
        sleep_sec = self.driver.mongodb_conf.reconnect_sleep

        last_ex = None
        for attempt in range(max_attempts):
            try:
                return func(self, *args, **kwargs)

            except errors.AutoReconnect as ex:
                LOG.warning(u'Caught AutoReconnect, retrying the '
                            'call to {0}'.format(func))

                last_ex = ex

                # Exponential backoff: the sleep doubles on each
                # failed attempt.
                time.sleep(sleep_sec * (2 ** attempt))

        # A successful call returns from inside the loop, so reaching
        # this point means every attempt raised AutoReconnect.
        LOG.error(u'Caught AutoReconnect, maximum attempts '
                  'to {0} exceeded.'.format(func))

        raise last_ex

    return wrapper

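# Illustrative sketch (not in the original source; the controller and
# method names are hypothetical): storage controllers decorate idempotent
# methods so transient replica-set failovers are retried transparently.
#
#     class QueueController(object):
#         def __init__(self, driver):
#             self.driver = driver
#
#         @retries_on_autoreconnect
#         def exists(self, name, project=None):
#             ...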

class HookedCursor(object):
    """Iterable wrapper around a pymongo cursor.

    Proxies attribute access to the underlying cursor, applying
    denormalizer to each item as it is yielded.
    """

    def __init__(self, cursor, denormalizer):
        self.cursor = cursor
        self.denormalizer = denormalizer

    def __getattr__(self, attr):
        return getattr(self.cursor, attr)

    def __iter__(self):
        return self

    def __len__(self):
        # NOTE: True == with_limit_and_skip, so len() respects any
        # limit/skip applied to the cursor.
        return self.cursor.count(True)

    @raises_conn_error
    def next(self):
        item = next(self.cursor)
        return self.denormalizer(item)

    def __next__(self):
        return self.next()