"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-10.0.0/zaqar/storage/redis/queues.py" (13 May 2020, 6811 Bytes) of package /linux/misc/openstack/zaqar-10.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can view or download the uninterpreted source code file here. For more information about "queues.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 9.0.0_vs_10.0.0.

    1 # Copyright (c) 2014 Prashanth Raghu.
    2 # Licensed under the Apache License, Version 2.0 (the "License");
    3 # you may not use this file except in compliance with the License.
    4 # You may obtain a copy of the License at
    5 #
    6 #    http://www.apache.org/licenses/LICENSE-2.0
    7 #
    8 # Unless required by applicable law or agreed to in writing, software
    9 # distributed under the License is distributed on an "AS IS" BASIS,
   10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   11 # implied.
   12 # See the License for the specific language governing permissions and
   13 # limitations under the License.
   14 
   15 import functools
   16 
   17 import msgpack
   18 from oslo_utils import timeutils
   19 import redis
   20 
   21 from zaqar.common import decorators
   22 from zaqar import storage
   23 from zaqar.storage import errors
   24 from zaqar.storage.redis import utils
   25 
   26 QUEUES_SET_STORE_NAME = 'queues_set'
   27 MESSAGE_IDS_SUFFIX = 'messages'
   28 
   29 
   30 class QueueController(storage.Queue):
   31     """Implements queue resource operations using Redis.
   32 
   33     Queues are scoped by project, which is prefixed to the
   34     queue name.
   35 
   36     Redis Data Structures:
   37 
   38     1. Queue Index (Redis sorted set):
   39 
   40         Set of all queues for the given project, ordered by name.
   41 
   42         Key: <project_id>.queues_set
   43 
   44         +--------+-----------------------------+
   45         |  Id    |  Value                      |
   46         +========+=============================+
   47         |  name  |  <project_id>.<queue_name>  |
   48         +--------+-----------------------------+
   49 
   50     2. Queue Information (Redis hash):
   51 
   52         Key: <project_id>.<queue_name>
   53 
   54         +----------------------+---------+
   55         |  Name                |  Field  |
   56         +======================+=========+
   57         |  metadata            |  m      |
   58         +----------------------+---------+
   59         |  creation timestamp  |  t      |
   60         +----------------------+---------+
   61     """
   62 
   63     def __init__(self, *args, **kwargs):
   64         super(QueueController, self).__init__(*args, **kwargs)
   65         self._client = self.driver.connection
   66         self._packer = msgpack.Packer(encoding='utf-8',
   67                                       use_bin_type=True).pack
   68         self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
   69 
   70     @decorators.lazy_property(write=False)
   71     def _claim_ctrl(self):
   72         return self.driver.claim_controller
   73 
   74     @decorators.lazy_property(write=False)
   75     def _subscription_ctrl(self):
   76         return self.driver.subscription_controller
   77 
   78     def _get_queue_info(self, queue_key, fields, transform=str):
   79         """Get one or more fields from Queue Info."""
   80 
   81         values = self._client.hmget(queue_key, fields)
   82         return [transform(v) for v in values] if transform else values
   83 
   84     @utils.raises_conn_error
   85     @utils.retries_on_connection_error
   86     def _list(self, project=None, kfilter={}, marker=None,
   87               limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False,
   88               name=None):
   89         client = self._client
   90         qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
   91         marker = utils.scope_queue_name(marker, project)
   92         if marker:
   93             rank = client.zrank(qset_key, marker)
   94         else:
   95             rank = None
   96         start = rank + 1 if rank else 0
   97 
   98         cursor = (q for q in client.zrange(qset_key, start,
   99                                            start + limit - 1))
  100         marker_next = {}
  101 
  102         def denormalizer(info, name):
  103             queue = {'name': utils.descope_queue_name(name)}
  104             marker_next['next'] = queue['name']
  105             if detailed:
  106                 queue['metadata'] = self._unpacker(info[1])
  107 
  108             return queue
  109 
  110         yield utils.QueueListCursor(self._client, cursor, denormalizer)
  111         yield marker_next and marker_next['next']
  112 
  113     def _get(self, name, project=None):
  114         """Obtain the metadata from the queue."""
  115         try:
  116             return self.get_metadata(name, project)
  117         except errors.QueueDoesNotExist:
  118             return {}
  119 
  120     @utils.raises_conn_error
  121     def _create(self, name, metadata=None, project=None):
  122         # TODO(prashanthr_): Implement as a lua script.
  123         queue_key = utils.scope_queue_name(name, project)
  124         qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
  125 
  126         # Check if the queue already exists.
  127         if self._exists(name, project):
  128             return False
  129 
  130         queue = {
  131             'c': 0,
  132             'cl': 0,
  133             'm': self._packer(metadata or {}),
  134             't': timeutils.utcnow_ts()
  135         }
  136 
  137         # Pipeline ensures atomic inserts.
  138         with self._client.pipeline() as pipe:
  139             pipe.zadd(qset_key, {queue_key: 1}).hmset(queue_key, queue)
  140 
  141             try:
  142                 pipe.execute()
  143             except redis.exceptions.ResponseError:
  144                 return False
  145 
  146         return True
  147 
  148     @utils.raises_conn_error
  149     @utils.retries_on_connection_error
  150     def _exists(self, name, project=None):
  151         # TODO(prashanthr_): Cache this lookup
  152         queue_key = utils.scope_queue_name(name, project)
  153         qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
  154 
  155         return self._client.zrank(qset_key, queue_key) is not None
  156 
  157     @utils.raises_conn_error
  158     @utils.retries_on_connection_error
  159     def set_metadata(self, name, metadata, project=None):
  160         if not self.exists(name, project):
  161             raise errors.QueueDoesNotExist(name, project)
  162 
  163         key = utils.scope_queue_name(name, project)
  164         fields = {'m': self._packer(metadata)}
  165 
  166         self._client.hmset(key, fields)
  167 
  168     @utils.raises_conn_error
  169     @utils.retries_on_connection_error
  170     def get_metadata(self, name, project=None):
  171         if not self.exists(name, project):
  172             raise errors.QueueDoesNotExist(name, project)
  173 
  174         queue_key = utils.scope_queue_name(name, project)
  175         metadata = self._get_queue_info(queue_key, b'm', None)[0]
  176 
  177         return self._unpacker(metadata)
  178 
  179     @utils.raises_conn_error
  180     @utils.retries_on_connection_error
  181     def _delete(self, name, project=None):
  182         queue_key = utils.scope_queue_name(name, project)
  183         qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
  184 
  185         # NOTE(prashanthr_): Pipelining is used to mitigate race conditions
  186         with self._client.pipeline() as pipe:
  187             pipe.zrem(qset_key, queue_key)
  188             pipe.delete(queue_key)
  189             pipe.execute()
  190 
  191     @utils.raises_conn_error
  192     @utils.retries_on_connection_error
  193     def _stats(self, name, project=None):
  194         pass
  195 
  196     @utils.raises_conn_error
  197     @utils.retries_on_connection_error
  198     def _calculate_resource_count(self, project=None):
  199         client = self._client
  200         qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
  201         return client.zlexcount(qset_key, '-', '+')