"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-7.0.0/zaqar/storage/redis/utils.py" (30 Aug 2018, 12597 Bytes) of package /linux/misc/openstack/zaqar-7.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "utils.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 5.0.0_vs_7.0.0.

    1 # Copyright (c) 2014 Prashanth Raghu.
    2 # Copyright (c) 2015 Catalyst IT Ltd.
    3 #
    4 # Licensed under the Apache License, Version 2.0 (the "License");
    5 # you may not use this file except in compliance with the License.
    6 # You may obtain a copy of the License at
    7 #
    8 #    http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 # Unless required by applicable law or agreed to in writing, software
   11 # distributed under the License is distributed on an "AS IS" BASIS,
   12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   13 # implied.
   14 # See the License for the specific language governing permissions and
   15 # limitations under the License.
   16 
   17 import functools
   18 import sys
   19 import time
   20 
   21 from oslo_log import log as logging
   22 from oslo_utils import encodeutils
   23 import redis
   24 import six
   25 
   26 from zaqar.storage import errors
   27 
# Module-level logger used by the connection-error decorators below.
LOG = logging.getLogger(__name__)
# Suffixes appended to scoped names to build the keys of the id sets
# for messages, subscriptions, flavors and pools respectively.
MESSAGE_IDS_SUFFIX = 'messages'
SUBSCRIPTION_IDS_SUFFIX = 'subscriptions'
FLAVORS_IDS_SUFFIX = 'flavors'
POOLS_IDS_SUFFIX = 'pools'
   33 
   34 
   35 def descope_queue_name(scoped_name):
   36     """Descope Queue name with '.'.
   37 
   38     Returns the queue name from the scoped name
   39     which is of the form project-id.queue-name
   40     """
   41 
   42     return scoped_name.split('.')[1]
   43 
   44 
   45 def normalize_none_str(string_or_none):
   46     """Returns '' IFF given value is None, passthrough otherwise.
   47 
   48     This function normalizes None to the empty string to facilitate
   49     string concatenation when a variable could be None.
   50     """
   51 
   52     # TODO(prashanthr_) : Try to reuse this utility. Violates DRY
   53     return '' if string_or_none is None else string_or_none
   54 
   55 
   56 def scope_queue_name(queue=None, project=None):
   57     """Returns a scoped name for a queue based on project and queue.
   58 
   59     If only the project name is specified, a scope signifying "all queues"
   60     for that project is returned. If neither queue nor project are
   61     specified, a scope for "all global queues" is returned, which
   62     is to be interpreted as excluding queues scoped by project.
   63 
   64     :returns: '{project}.{queue}' if project and queue are given,
   65         '{project}.' if ONLY project is given, '.{queue}' if ONLY
   66         queue is given, and '.' if neither are given.
   67     """
   68 
   69     # TODO(prashanthr_) : Try to reuse this utility. Violates DRY
   70     return normalize_none_str(project) + '.' + normalize_none_str(queue)
   71 
# NOTE(prashanthr_): Alias the scope_queue_name function
# to be used in the pools and claims controllers, as similar
# functionality is required to scope redis ids.
scope_pool_catalogue = scope_claim_messages = scope_queue_name
   76 
   77 
   78 def scope_message_ids_set(queue=None, project=None, message_suffix=''):
   79     """Scope messages set with '.'
   80 
   81     Returns a scoped name for the list of messages in the form
   82     project-id_queue-name_suffix
   83     """
   84 
   85     return (normalize_none_str(project) + '.' +
   86             normalize_none_str(queue) + '.' +
   87             message_suffix)
   88 
   89 
   90 def descope_message_ids_set(msgset_key):
   91     """Descope messages set with '.'
   92 
   93     :returns: (queue, project)
   94     """
   95 
   96     tokens = msgset_key.split('.')
   97 
   98     return tokens[1] or None, tokens[0] or None
   99 
  100 
  101 def scope_subscription_ids_set(queue=None, project=None,
  102                                subscription_suffix=''):
  103     """Scope subscriptions set with '.'
  104 
  105     Returns a scoped name for the list of subscriptions in the form
  106     project-id_queue-name_suffix
  107     """
  108 
  109     return (normalize_none_str(project) + '.' +
  110             normalize_none_str(queue) + '.' +
  111             subscription_suffix)
  112 
  113 
  114 def descope_subscription_ids_set(subset_key):
  115     """Descope subscriptions set with '.'
  116 
  117     :returns: (queue, project)
  118     """
  119 
  120     tokens = subset_key.split('.')
  121 
  122     return (tokens[1] or None, tokens[0] or None)
  123 
  124 
# NOTE(prashanthr_): Alias the scope_message_ids_set function
# to be used in the pools and claims controllers, as similar
# functionality is required to scope redis ids.
scope_queue_catalogue = scope_claims_set = scope_message_ids_set
scope_queue_index = scope_message_ids_set
  130 
  131 
  132 def msgset_key(queue, project=None):
  133     return scope_message_ids_set(queue, project, MESSAGE_IDS_SUFFIX)
  134 
  135 
  136 def subset_key(queue, project=None):
  137     return scope_subscription_ids_set(queue, project, SUBSCRIPTION_IDS_SUFFIX)
  138 
  139 
  140 def raises_conn_error(func):
  141     """Handles the Redis ConnectionFailure error.
  142 
  143     This decorator catches Redis's ConnectionError
  144     and raises Zaqar's ConnectionError instead.
  145     """
  146 
  147     # Note(prashanthr_) : Try to reuse this utility. Violates DRY
  148     # Can pass exception type into the decorator and create a
  149     # storage level utility.
  150 
  151     @functools.wraps(func)
  152     def wrapper(*args, **kwargs):
  153         try:
  154             return func(*args, **kwargs)
  155         except redis.exceptions.ConnectionError as ex:
  156             LOG.exception(ex)
  157             raise errors.ConnectionError()
  158 
  159     return wrapper
  160 
  161 
  162 def retries_on_connection_error(func):
  163     """Causes the wrapped function to be re-called on ConnectionError.
  164 
  165     This decorator catches Redis ConnectionError and retries
  166     the function call.
  167 
  168     .. Note::
  169        Assumes that the decorated function has defined self.driver.redis_cinf
  170        so that `max_reconnect_attempts` and `reconnect_sleep` can be taken
  171        into account.
  172 
  173     .. Warning:: The decorated function must be idempotent.
  174     """
  175 
  176     @functools.wraps(func)
  177     def wrapper(self, *args, **kwargs):
  178         # TODO(prashanthr_) : Try to reuse this utility. Violates DRY
  179         # Can pass config parameters into the decorator and create a
  180         # storage level utility.
  181 
  182         max_attemps = self.driver.redis_conf.max_reconnect_attempts
  183         sleep_sec = self.driver.redis_conf.reconnect_sleep
  184 
  185         for attempt in range(max_attemps):
  186             try:
  187                 return func(self, *args, **kwargs)
  188 
  189             except redis.exceptions.ConnectionError:
  190                 # NOTE(kgriffs): redis-py will retry once itself,
  191                 # but if the command cannot be sent the second time after
  192                 # disconnecting and reconnecting, the error is raised
  193                 # and we will catch it here.
  194                 #
  195                 # NOTE(kgriffs): When using a sentinel, if a master fails
  196                 # the initial retry will gracefully fail over to the
  197                 # new master if the sentinel failover delay is low enough;
  198                 # if the delay is too long, then redis-py will get a
  199                 # MasterNotFoundError (a subclass of ConnectionError) on
  200                 # it's retry, which will then just get raised and caught
  201                 # here, in which case we will keep retrying until the
  202                 # sentinel completes the failover and stops raising
  203                 # MasterNotFoundError.
  204 
  205                 ex = sys.exc_info()[1]
  206                 LOG.warning(u'Caught ConnectionError, retrying the '
  207                             'call to {0}'.format(func))
  208 
  209                 time.sleep(sleep_sec * (2 ** attempt))
  210         else:
  211             LOG.error(u'Caught ConnectionError, maximum attempts '
  212                       'to {0} exceeded.'.format(func))
  213             raise ex
  214 
  215     return wrapper
  216 
  217 
  218 def msg_claimed_filter(message, now):
  219     """Return True IFF the message is currently claimed."""
  220 
  221     return message.claim_id and (now < message.claim_expires)
  222 
  223 
  224 def msg_delayed_filter(message, now):
  225     """Return True IFF the message is currently delayed."""
  226 
  227     return now < message.delay_expires
  228 
  229 
  230 def msg_echo_filter(message, client_uuid):
  231     """Return True IFF the specified client posted the message."""
  232 
  233     return message.client_uuid == six.text_type(client_uuid)
  234 
  235 
  236 def msg_expired_filter(message, now):
  237     """Return True IFF the message has expired."""
  238 
  239     return message.expires <= now
  240 
  241 
  242 class QueueListCursor(object):
  243 
  244     def __init__(self, client, queues, denormalizer):
  245         self.queue_iter = queues
  246         self.denormalizer = denormalizer
  247         self.client = client
  248 
  249     def __iter__(self):
  250         return self
  251 
  252     @raises_conn_error
  253     def next(self):
  254         curr = next(self.queue_iter)
  255         queue = self.client.hmget(curr, ['c', 'm'])
  256         return self.denormalizer(queue, encodeutils.safe_decode(curr))
  257 
  258     def __next__(self):
  259         return self.next()
  260 
  261 
  262 class SubscriptionListCursor(object):
  263 
  264     def __init__(self, client, subscriptions, denormalizer):
  265         self.subscription_iter = subscriptions
  266         self.denormalizer = denormalizer
  267         self.client = client
  268 
  269     def __iter__(self):
  270         return self
  271 
  272     @raises_conn_error
  273     def next(self):
  274         curr = next(self.subscription_iter)
  275         subscription = self.client.hmget(curr, ['s', 'u', 't', 'e', 'o', 'c'])
  276         # NOTE(flwang): The expired subscription will be removed
  277         # automatically, but the key can't be deleted automatically as well.
  278         # Though we clean up those expired ids when create new subscription,
  279         # we still need to filter them out before a new subscription creation.
  280         if not subscription[0]:
  281             return self.next()
  282         return self.denormalizer(subscription, encodeutils.safe_decode(curr))
  283 
  284     def __next__(self):
  285         return self.next()
  286 
  287 
  288 def scope_flavors_ids_set(flavors_suffix=''):
  289     """Scope flavors set with '.'
  290 
  291     Returns a scoped name for the list of flavors in the form
  292     suffix
  293     """
  294 
  295     return flavors_suffix
  296 
  297 
  298 def scope_project_flavors_ids_set(project=None,
  299                                   flavors_suffix=''):
  300     """Scope flavors set with '.'
  301 
  302     Returns a scoped name for the list of flavors in the form
  303     project-id_suffix
  304     """
  305 
  306     return (normalize_none_str(project) + '.' + flavors_suffix)
  307 
  308 
  309 def scope_name_flavors_ids_set(name=None,
  310                                flavors_suffix=''):
  311     """Scope flavors set with '.'
  312 
  313     Returns a scoped name for the list of flavors in the form
  314     flavors_name_suffix
  315     """
  316 
  317     return (normalize_none_str(name) + '.' + flavors_suffix)
  318 
  319 
  320 def flavor_set_key():
  321     return scope_flavors_ids_set(FLAVORS_IDS_SUFFIX)
  322 
  323 
  324 def flavor_project_subset_key(project=None):
  325     return scope_project_flavors_ids_set(project,
  326                                          FLAVORS_IDS_SUFFIX)
  327 
  328 
  329 def flavor_name_hash_key(name=None):
  330     return scope_name_flavors_ids_set(name,
  331                                       FLAVORS_IDS_SUFFIX)
  332 
  333 
  334 class FlavorListCursor(object):
  335 
  336     def __init__(self, client, flavors, denormalizer):
  337         self.flavor_iter = flavors
  338         self.denormalizer = denormalizer
  339         self.client = client
  340 
  341     def __iter__(self):
  342         return self
  343 
  344     @raises_conn_error
  345     def next(self):
  346         curr = next(self.flavor_iter)
  347         flavor = self.client.hmget(curr, ['f', 'p', 'c'])
  348         flavor_dict = {}
  349         flavor_dict['f'] = flavor[0]
  350         flavor_dict['p'] = flavor[1]
  351         flavor_dict['c'] = flavor[2]
  352         return self.denormalizer(flavor_dict)
  353 
  354     def __next__(self):
  355         return self.next()
  356 
  357 
  358 def scope_pools_ids_set(pools_suffix=''):
  359     """Scope pools set with '.'
  360 
  361     Returns a scoped name for the list of pools in the form
  362     suffix
  363     """
  364 
  365     return pools_suffix
  366 
  367 
  368 def scope_flavor_pools_ids_set(flavor=None,
  369                                pools_suffix=''):
  370     """Scope pools set with '.'
  371 
  372     Returns a scoped name for the list of pools in the form
  373     project-id_suffix
  374     """
  375     return (normalize_none_str(flavor) + '.' +
  376             pools_suffix)
  377 
  378 
  379 def scope_name_pools_ids_set(name=None,
  380                              pools_suffix=''):
  381     """Scope pools set with '.'
  382 
  383     Returns a scoped name for the list of pools in the form
  384     pools_name_suffix
  385     """
  386     return (normalize_none_str(name) + '.' +
  387             pools_suffix)
  388 
  389 
  390 def pools_set_key():
  391     return scope_pools_ids_set(POOLS_IDS_SUFFIX)
  392 
  393 
  394 def pools_subset_key(flavor=None):
  395     return scope_flavor_pools_ids_set(flavor,
  396                                       POOLS_IDS_SUFFIX)
  397 
  398 
  399 def pools_name_hash_key(name=None):
  400     return scope_name_pools_ids_set(name,
  401                                     POOLS_IDS_SUFFIX)
  402 
  403 
  404 class PoolsListCursor(object):
  405 
  406     def __init__(self, client, pools, denormalizer):
  407         self.pools_iter = pools
  408         self.denormalizer = denormalizer
  409         self.client = client
  410 
  411     def __iter__(self):
  412         return self
  413 
  414     @raises_conn_error
  415     def next(self):
  416         curr = next(self.pools_iter)
  417         pools = self.client.hmget(curr, ['pl', 'u', 'w', 'f', 'o'])
  418         pool_dict = {}
  419         pool_dict['pl'] = pools[0]
  420         pool_dict['u'] = pools[1]
  421         pool_dict['w'] = pools[2]
  422         pool_dict['f'] = pools[3]
  423         pool_dict['o'] = pools[4]
  424         return self.denormalizer(pool_dict)
  425 
  426     def __next__(self):
  427         return self.next()