"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-9.0.0/zaqar/storage/mongodb/subscriptions.py" (16 Oct 2019, 7417 Bytes) of package /linux/misc/openstack/zaqar-9.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "subscriptions.py", see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 5.0.0_vs_7.0.0.

    1 # Copyright (c) 2014 Catalyst IT Ltd.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
    4 # use this file except in compliance with the License.  You may obtain a copy
    5 # of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations under
   13 # the License.
   14 import datetime
   15 
   16 from oslo_utils import timeutils
   17 import pymongo.errors
   18 
   19 from zaqar.common import utils as common_utils
   20 from zaqar import storage
   21 from zaqar.storage import base
   22 from zaqar.storage import errors
   23 from zaqar.storage.mongodb import utils
   24 
   25 ID_INDEX_FIELDS = [('_id', 1)]
   26 
   27 SUBSCRIPTIONS_INDEX = [
   28     ('s', 1),
   29     ('u', 1),
   30     ('p', 1),
   31 ]
   32 
   33 # For removing expired subscriptions
   34 TTL_INDEX_FIELDS = [
   35     ('e', 1),
   36 ]
   37 
   38 
   39 class SubscriptionController(base.Subscription):
   40     """Implements subscription resource operations using MongoDB.
   41 
   42     Subscriptions are unique by project + queue/topic + subscriber.
   43 
   44     Schema:
   45       's': source :: six.text_type
   46       'u': subscriber:: six.text_type
   47       't': ttl:: int
   48       'e': expires: datetime.datetime
   49       'o': options :: dict
   50       'p': project :: six.text_type
   51       'c': confirmed :: boolean
   52     """
   53 
   54     def __init__(self, *args, **kwargs):
   55         super(SubscriptionController, self).__init__(*args, **kwargs)
   56         self._collection = self.driver.subscriptions_database.subscriptions
   57         self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True)
   58         # NOTE(flwang): MongoDB will automatically delete the subscription
   59         # from the subscriptions collection when the subscription's 'e' value
   60         # is older than the number of seconds specified in expireAfterSeconds,
   61         # i.e. 0 seconds older in this case. As such, the data expires at the
   62         # specified 'e' value.
   63         self._collection.ensure_index(TTL_INDEX_FIELDS, name='ttl',
   64                                       expireAfterSeconds=0,
   65                                       background=True)
   66 
   67     @utils.raises_conn_error
   68     def list(self, queue, project=None, marker=None,
   69              limit=storage.DEFAULT_SUBSCRIPTIONS_PER_PAGE):
   70         query = {'s': queue, 'p': project}
   71         if marker is not None:
   72             query['_id'] = {'$gt': utils.to_oid(marker)}
   73 
   74         projection = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1, 'c': 1}
   75 
   76         cursor = self._collection.find(query, projection=projection)
   77         cursor = cursor.limit(limit).sort('_id')
   78         marker_name = {}
   79 
   80         now = timeutils.utcnow_ts()
   81 
   82         def normalizer(record):
   83             marker_name['next'] = record['_id']
   84 
   85             return _basic_subscription(record, now)
   86 
   87         yield utils.HookedCursor(cursor, normalizer)
   88         yield marker_name and marker_name['next']
   89 
   90     @utils.raises_conn_error
   91     def get(self, queue, subscription_id, project=None):
   92         res = self._collection.find_one({'_id': utils.to_oid(subscription_id),
   93                                          'p': project,
   94                                          's': queue})
   95 
   96         if not res:
   97             raise errors.SubscriptionDoesNotExist(subscription_id)
   98 
   99         now = timeutils.utcnow_ts()
  100         return _basic_subscription(res, now)
  101 
  102     @utils.raises_conn_error
  103     def create(self, queue, subscriber, ttl, options, project=None):
  104         source = queue
  105         now = timeutils.utcnow_ts()
  106         now_dt = datetime.datetime.utcfromtimestamp(now)
  107         expires = now_dt + datetime.timedelta(seconds=ttl)
  108         confirmed = False
  109 
  110         try:
  111             res = self._collection.insert_one({'s': source,
  112                                                'u': subscriber,
  113                                                't': ttl,
  114                                                'e': expires,
  115                                                'o': options,
  116                                                'p': project,
  117                                                'c': confirmed})
  118             return res.inserted_id
  119         except pymongo.errors.DuplicateKeyError:
  120             return None
  121 
  122     @utils.raises_conn_error
  123     def exists(self, queue, subscription_id, project=None):
  124         return self._collection.find_one({'_id': utils.to_oid(subscription_id),
  125                                           'p': project}) is not None
  126 
  127     @utils.raises_conn_error
  128     def update(self, queue, subscription_id, project=None, **kwargs):
  129         names = ('subscriber', 'ttl', 'options')
  130         key_transform = lambda x: 'u' if x == 'subscriber' else x[0]
  131         fields = common_utils.fields(kwargs, names,
  132                                      pred=lambda x: x is not None,
  133                                      key_transform=key_transform)
  134         assert fields, ('`subscriber`, `ttl`, '
  135                         'or `options` not found in kwargs')
  136 
  137         new_ttl = fields.get('t')
  138         if new_ttl is not None:
  139             now = timeutils.utcnow_ts()
  140             now_dt = datetime.datetime.utcfromtimestamp(now)
  141             expires = now_dt + datetime.timedelta(seconds=new_ttl)
  142             fields['e'] = expires
  143 
  144         try:
  145             res = self._collection.update_one(
  146                 {'_id': utils.to_oid(subscription_id),
  147                  'p': project,
  148                  's': queue},
  149                 {'$set': fields},
  150                 upsert=False)
  151         except pymongo.errors.DuplicateKeyError:
  152             raise errors.SubscriptionAlreadyExists()
  153         if res.matched_count == 0:
  154             raise errors.SubscriptionDoesNotExist(subscription_id)
  155 
  156     @utils.raises_conn_error
  157     def delete(self, queue, subscription_id, project=None):
  158         self._collection.delete_one({'_id': utils.to_oid(subscription_id),
  159                                      'p': project,
  160                                      's': queue})
  161 
  162     @utils.raises_conn_error
  163     def get_with_subscriber(self, queue, subscriber, project=None):
  164         res = self._collection.find_one({'u': subscriber,
  165                                          's': queue,
  166                                          'p': project})
  167         now = timeutils.utcnow_ts()
  168         return _basic_subscription(res, now)
  169 
  170     @utils.raises_conn_error
  171     def confirm(self, queue, subscription_id, project=None, confirmed=True):
  172 
  173         res = self._collection.update_one(
  174             {'_id': utils.to_oid(subscription_id),
  175              'p': project},
  176             {'$set': {'c': confirmed}},
  177             upsert=False)
  178         if res.matched_count == 0:
  179             raise errors.SubscriptionDoesNotExist(subscription_id)
  180 
  181 
  182 def _basic_subscription(record, now):
  183     # NOTE(Eva-i): unused here record's field 'e' (expires) has changed it's
  184     # format from int (timestamp) to datetime since patch
  185     # 1d122b1671792aff0055ed5396111cd441fb8269. Any future change about
  186     # starting using 'e' field should make sure support both of the formats.
  187     oid = record['_id']
  188     age = now - utils.oid_ts(oid)
  189     confirmed = record.get('c', True)
  190     return {
  191         'id': str(oid),
  192         'source': record['s'],
  193         'subscriber': record['u'],
  194         'ttl': record['t'],
  195         'age': int(age),
  196         'options': record['o'],
  197         'confirmed': confirmed,
  198     }