"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-10.0.0/zaqar/storage/redis/subscriptions.py" (13 May 2020, 11328 Bytes) of package /linux/misc/openstack/zaqar-10.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "subscriptions.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 9.0.0_vs_10.0.0.

    1 # Copyright (c) 2015 Catalyst IT Ltd.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
    4 # use this file except in compliance with the License.  You may obtain a copy
    5 # of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations under
   13 # the License.
   14 
   15 import functools
   16 
   17 import msgpack
   18 from oslo_utils import timeutils
   19 from oslo_utils import uuidutils
   20 import redis
   21 
   22 from zaqar.common import utils as common_utils
   23 from zaqar.storage import base
   24 from zaqar.storage import errors
   25 from zaqar.storage.redis import models
   26 from zaqar.storage.redis import utils
   27 
   28 
   29 SubscriptionEnvelope = models.SubscriptionEnvelope
   30 
   31 SUBSET_INDEX_KEY = 'subset_index'
   32 SUBSCRIPTION_IDS_SUFFIX = 'subscriptions'
   33 
   34 
   35 class SubscriptionController(base.Subscription):
   36     """Implements subscription resource operations using Redis.
   37 
   38     Subscriptions are unique by project + queue + subscriber.
   39 
   40     Schema:
   41       's': source :: six.text_type
   42       'u': subscriber:: six.text_type
   43       't': ttl:: int
   44       'e': expires: int
   45       'o': options :: dict
   46       'p': project :: six.text_type
   47     """
   48     def __init__(self, *args, **kwargs):
   49         super(SubscriptionController, self).__init__(*args, **kwargs)
   50         self._client = self.driver.connection
   51         self._packer = msgpack.Packer(encoding='utf-8',
   52                                       use_bin_type=True).pack
   53         self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
   54 
   55     @utils.raises_conn_error
   56     @utils.retries_on_connection_error
   57     def list(self, queue, project=None, marker=None, limit=10):
   58         client = self._client
   59         subset_key = utils.scope_subscription_ids_set(queue,
   60                                                       project,
   61                                                       SUBSCRIPTION_IDS_SUFFIX)
   62         if marker:
   63             rank = client.zrank(subset_key, marker)
   64         else:
   65             rank = None
   66         start = rank + 1 if rank is not None else 0
   67 
   68         cursor = (q for q in client.zrange(subset_key, start,
   69                                            start + limit - 1))
   70         marker_next = {}
   71 
   72         def denormalizer(record, sid):
   73             now = timeutils.utcnow_ts()
   74             ttl = int(record[2])
   75             expires = int(record[3])
   76             created = expires - ttl
   77             is_confirmed = 1
   78             if len(record) == 6:
   79                 is_confirmed = record[5]
   80             ret = {
   81                 'id': sid,
   82                 'source': record[0].decode(),
   83                 'subscriber': record[1].decode(),
   84                 'ttl': ttl,
   85                 'age': now - created,
   86                 'options': self._unpacker(record[4]),
   87                 'confirmed': is_confirmed.decode(),
   88             }
   89             marker_next['next'] = sid
   90 
   91             return ret
   92 
   93         yield utils.SubscriptionListCursor(self._client, cursor, denormalizer)
   94         yield marker_next and marker_next['next']
   95 
   96     @utils.raises_conn_error
   97     @utils.retries_on_connection_error
   98     def get(self, queue, subscription_id, project=None):
   99         subscription = None
  100         if self.exists(queue, subscription_id, project):
  101             subscription = SubscriptionEnvelope.from_redis(subscription_id,
  102                                                            self._client)
  103         if subscription:
  104             now = timeutils.utcnow_ts()
  105             return subscription.to_basic(now)
  106         else:
  107             raise errors.SubscriptionDoesNotExist(subscription_id)
  108 
  109     @utils.raises_conn_error
  110     @utils.retries_on_connection_error
  111     def create(self, queue, subscriber, ttl, options, project=None):
  112         subscription_id = uuidutils.generate_uuid()
  113         subset_key = utils.scope_subscription_ids_set(queue,
  114                                                       project,
  115                                                       SUBSCRIPTION_IDS_SUFFIX)
  116 
  117         source = queue
  118         now = timeutils.utcnow_ts()
  119         expires = now + ttl
  120         confirmed = 0
  121 
  122         subscription = {'id': subscription_id,
  123                         's': source,
  124                         'u': subscriber,
  125                         't': ttl,
  126                         'e': expires,
  127                         'o': self._packer(options),
  128                         'p': project,
  129                         'c': confirmed}
  130 
  131         try:
  132             # Pipeline ensures atomic inserts.
  133             with self._client.pipeline() as pipe:
  134                 if not self._is_duplicated_subscriber(subscriber,
  135                                                       queue,
  136                                                       project):
  137                     pipe.zadd(subset_key, {subscription_id: 1}).hmset(
  138                         subscription_id, subscription)
  139                     pipe.expire(subscription_id, ttl)
  140                     pipe.execute()
  141                 else:
  142                     return None
  143             return subscription_id
  144         except redis.exceptions.ResponseError:
  145             return None
  146 
  147     def _is_duplicated_subscriber(self, subscriber, queue, project):
  148         """Check if the subscriber is existing or not.
  149 
  150         Given the limitation of Redis' expires(), it's hard to auto expire
  151         subscriber from the set and subscription id from the sorted set, so
  152         this method is used to do a ugly duplication check when adding a new
  153         subscription so that we don't need the set for subscriber. And as a
  154         side effect, this method will remove the unreachable subscription's id
  155         from the sorted set.
  156         """
  157         subset_key = utils.scope_subscription_ids_set(queue,
  158                                                       project,
  159                                                       SUBSCRIPTION_IDS_SUFFIX)
  160         try:
  161             sub_ids = (q for q in self._client.zrange(subset_key, 0, -1))
  162             for s_id in sub_ids:
  163                 subscription = self._client.hmget(s_id,
  164                                                   ['s', 'u', 't', 'o', 'c'])
  165                 if subscription == [None, None, None, None, None]:
  166                     # NOTE(flwang): Under this check, that means the
  167                     # subscription has been expired. So redis can't get
  168                     # the subscription but the id is still there. So let's
  169                     # delete the id for clean up.
  170                     self._client.zrem(subset_key, s_id)
  171                 if subscription[1].decode() == subscriber:
  172                     return True
  173             return False
  174         except redis.exceptions.ResponseError:
  175             return True
  176 
  177     @utils.raises_conn_error
  178     @utils.retries_on_connection_error
  179     def exists(self, queue, subscription_id, project=None):
  180         subset_key = utils.scope_subscription_ids_set(queue, project,
  181                                                       SUBSCRIPTION_IDS_SUFFIX)
  182 
  183         return self._client.zrank(subset_key, subscription_id) is not None
  184 
  185     @utils.raises_conn_error
  186     @utils.retries_on_connection_error
  187     def update(self, queue, subscription_id, project=None, **kwargs):
  188         names = ('subscriber', 'ttl', 'options')
  189         key_transform = lambda x: 'u' if x == 'subscriber' else x[0]
  190         fields = common_utils.fields(kwargs, names,
  191                                      pred=lambda x: x is not None,
  192                                      key_transform=key_transform)
  193         assert fields, ('`subscriber`, `ttl`, '
  194                         'or `options` not found in kwargs')
  195 
  196         # Let's get our subscription by ID. If it does not exist,
  197         # SubscriptionDoesNotExist error will be raised internally.
  198         subscription_to_update = self.get(queue, subscription_id,
  199                                           project=project)
  200 
  201         new_subscriber = fields.get('u')
  202 
  203         # Let's do some checks to prevent subscription duplication.
  204         if new_subscriber:
  205             # Check if 'new_subscriber' is really new for our subscription.
  206             if subscription_to_update['subscriber'] != new_subscriber:
  207                 # It's new. We should raise error if this subscriber already
  208                 # exists for the queue and project.
  209                 if self._is_duplicated_subscriber(new_subscriber, queue,
  210                                                   project):
  211                     raise errors.SubscriptionAlreadyExists()
  212 
  213         # NOTE(Eva-i): if there are new options, we need to pack them before
  214         # sending to the database.
  215         new_options = fields.get('o')
  216         if new_options is not None:
  217             fields['o'] = self._packer(new_options)
  218 
  219         new_ttl = fields.get('t')
  220         if new_ttl is not None:
  221             now = timeutils.utcnow_ts()
  222             expires = now + new_ttl
  223             fields['e'] = expires
  224 
  225         # Pipeline ensures atomic inserts.
  226         with self._client.pipeline() as pipe:
  227             pipe.hmset(subscription_id, fields)
  228             if new_ttl is not None:
  229                 pipe.expire(subscription_id, new_ttl)
  230             pipe.execute()
  231 
  232     @utils.raises_conn_error
  233     @utils.retries_on_connection_error
  234     def delete(self, queue, subscription_id, project=None):
  235         subset_key = utils.scope_subscription_ids_set(queue, project,
  236                                                       SUBSCRIPTION_IDS_SUFFIX)
  237 
  238         if self._client.zrank(subset_key, subscription_id) is not None:
  239             # NOTE(prashanthr_): Pipelining is used to mitigate race conditions
  240             with self._client.pipeline() as pipe:
  241                 pipe.zrem(subset_key, subscription_id)
  242                 pipe.delete(subscription_id)
  243                 pipe.execute()
  244 
  245     @utils.raises_conn_error
  246     @utils.retries_on_connection_error
  247     def get_with_subscriber(self, queue, subscriber, project=None):
  248         subset_key = utils.scope_subscription_ids_set(queue,
  249                                                       project,
  250                                                       SUBSCRIPTION_IDS_SUFFIX)
  251         sub_ids = (q for q in self._client.zrange(subset_key, 0, -1))
  252         for s_id in sub_ids:
  253             subscription = self._client.hmget(s_id,
  254                                               ['s', 'u', 't', 'o', 'c'])
  255             if subscription[1].decode() == subscriber:
  256                 subscription = SubscriptionEnvelope.from_redis(s_id,
  257                                                                self._client)
  258                 now = timeutils.utcnow_ts()
  259                 return subscription.to_basic(now)
  260 
  261     @utils.raises_conn_error
  262     @utils.retries_on_connection_error
  263     def confirm(self, queue, subscription_id, project=None, confirmed=True):
  264         # Let's get our subscription by ID. If it does not exist,
  265         # SubscriptionDoesNotExist error will be raised internally.
  266         self.get(queue, subscription_id, project=project)
  267         confirmed = 1 if confirmed else 0
  268         fields = {'c': confirmed}
  269         with self._client.pipeline() as pipe:
  270             pipe.hmset(subscription_id, fields)
  271             pipe.execute()