"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-10.0.0/zaqar/storage/redis/models.py" (13 May 2020, 10126 Bytes) of package /linux/misc/openstack/zaqar-10.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "models.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 9.0.0_vs_10.0.0.

    1 # Copyright (c) 2014 Prashanth Raghu.
    2 # Copyright (c) 2015 Catalyst IT Ltd.
    3 #
    4 # Licensed under the Apache License, Version 2.0 (the "License");
    5 # you may not use this file except in compliance with the License.
    6 # You may obtain a copy of the License at
    7 #
    8 #    http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 # Unless required by applicable law or agreed to in writing, software
   11 # distributed under the License is distributed on an "AS IS" BASIS,
   12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   13 # implied.
   14 # See the License for the specific language governing permissions and
   15 # limitations under the License.
   16 
   17 import datetime
   18 import functools
   19 import uuid
   20 
   21 import msgpack
   22 from oslo_utils import encodeutils
   23 from oslo_utils import uuidutils
   24 
   25 MSGENV_FIELD_KEYS = (b'id', b't', b'cr', b'e', b'u', b'c', b'c.e',
   26                      b'c.c', b'd', b'cs')
   27 SUBENV_FIELD_KEYS = (b'id', b's', b'u', b't', b'e', b'o', b'p', b'c')
   28 
   29 
   30 # TODO(kgriffs): Make similar classes for claims and queues
   31 class MessageEnvelope(object):
   32     """Encapsulates the message envelope (metadata only, no body).
   33 
   34     :param id: Message ID in the form of a hexadecimal UUID. If not
   35         given, one will be automatically generated.
   36     :param ttl: Message TTL in seconds
   37     :param created: Message creation time as a UNIX timestamp
   38     :param client_uuid: UUID of the client that posted the message
   39     :param claim_id: If claimed, the UUID of the claim. Set to None
   40         for messages that have never been claimed.
   41     :param claim_expires: Claim expiration as a UNIX timestamp
   42     """
   43 
   44     __slots__ = [
   45         'id',
   46         'ttl',
   47         'created',
   48         'expires',
   49         'client_uuid',
   50         'claim_id',
   51         'claim_expires',
   52         'claim_count',
   53         'delay_expires',
   54         'checksum',
   55     ]
   56 
   57     def __init__(self, **kwargs):
   58         self.id = _validate_uuid4(kwargs.get('id', uuidutils.generate_uuid()))
   59         self.ttl = kwargs['ttl']
   60         self.created = kwargs['created']
   61         self.expires = kwargs.get('expires', self.created + self.ttl)
   62 
   63         self.client_uuid = _validate_uuid4(str(kwargs['client_uuid']))
   64 
   65         self.claim_id = kwargs.get('claim_id')
   66         if self.claim_id:
   67             _validate_uuid4(self.claim_id)
   68         self.claim_expires = kwargs['claim_expires']
   69         self.claim_count = kwargs.get('claim_count', 0)
   70         self.delay_expires = kwargs.get('delay_expires', 0)
   71         self.checksum = kwargs.get('checksum')
   72 
   73     @staticmethod
   74     def from_hmap(hmap):
   75         kwargs = _hmap_to_msgenv_kwargs(hmap)
   76         return MessageEnvelope(**kwargs)
   77 
   78     @staticmethod
   79     def from_redis(mid, client):
   80         values = client.hmget(mid, MSGENV_FIELD_KEYS)
   81 
   82         # NOTE(kgriffs): If the key does not exist, redis-py returns
   83         # an array of None values.
   84         if values[0] is None:
   85             return None
   86 
   87         return _hmap_kv_to_msgenv(MSGENV_FIELD_KEYS, values)
   88 
   89     @staticmethod
   90     def from_redis_bulk(message_ids, client):
   91         with client.pipeline() as pipe:
   92             for mid in message_ids:
   93                 pipe.hmget(mid, MSGENV_FIELD_KEYS)
   94 
   95             results = pipe.execute()
   96 
   97         message_envs = []
   98         for value_list in results:
   99             if value_list is None:
  100                 env = None
  101             else:
  102                 env = _hmap_kv_to_msgenv(MSGENV_FIELD_KEYS, value_list)
  103 
  104             message_envs.append(env)
  105 
  106         return message_envs
  107 
  108     def to_redis(self, pipe):
  109         hmap = _msgenv_to_hmap(self)
  110 
  111         pipe.hmset(self.id, hmap)
  112         pipe.expire(self.id, self.ttl)
  113 
  114 
  115 class SubscriptionEnvelope(object):
  116     """Encapsulates the subscription envelope."""
  117 
  118     __slots__ = [
  119         'id',
  120         'source',
  121         'subscriber',
  122         'ttl',
  123         'expires',
  124         'options',
  125         'project',
  126         'confirmed',
  127     ]
  128 
  129     def __init__(self, **kwargs):
  130         self.id = kwargs.get('id', uuidutils.generate_uuid())
  131         self.source = kwargs['source']
  132         self.subscriber = kwargs['subscriber']
  133         self.ttl = kwargs['ttl']
  134         self.expires = kwargs.get('expires', float('inf'))
  135         self.options = kwargs['options']
  136         self.confirmed = kwargs.get('confirmed', 'True')
  137 
  138     @staticmethod
  139     def from_redis(sid, client):
  140         values = client.hmget(sid, SUBENV_FIELD_KEYS)
  141 
  142         # NOTE(kgriffs): If the key does not exist, redis-py returns
  143         # an array of None values.
  144         if values[0] is None:
  145             return None
  146 
  147         return _hmap_kv_to_subenv(SUBENV_FIELD_KEYS, values)
  148 
  149     def to_redis(self, pipe):
  150         hmap = _subenv_to_hmap(self)
  151 
  152         pipe.hmset(self.id, hmap)
  153         pipe.expire(self.id, self.ttl)
  154 
  155     def to_basic(self, now):
  156         created = self.expires - self.ttl
  157         is_confirmed = self.confirmed == str(True)
  158         basic_msg = {
  159             'id': self.id,
  160             'source': self.source.decode(),
  161             'subscriber': self.subscriber.decode(),
  162             'ttl': self.ttl,
  163             'age': now - created,
  164             'options': self.options,
  165             'confirmed': is_confirmed,
  166         }
  167 
  168         return basic_msg
  169 
  170 
  171 # NOTE(kgriffs): This could have implemented MessageEnvelope functionality
  172 # by adding an "include_body" param to all the methods, but then you end
  173 # up with tons of if statements that make the code rather ugly.
  174 class Message(MessageEnvelope):
  175     """Represents an entire message, including envelope and body.
  176 
  177     :param id: Message ID in the form of a hexadecimal UUID. If not
  178         given, one will be automatically generated.
  179     :param ttl: Message TTL in seconds
  180     :param created: Message creation time as a UNIX timestamp
  181     :param client_uuid: UUID of the client that posted the message
  182     :param claim_id: If claimed, the UUID of the claim. Set to None
  183         for messages that have never been claimed.
  184     :param claim_expires: Claim expiration as a UNIX timestamp
  185     :param body: Message payload. Must be serializable to mspack.
  186     """
  187 
  188     __slots__ = MessageEnvelope.__slots__ + ['body']
  189 
  190     def __init__(self, **kwargs):
  191         super(Message, self).__init__(**kwargs)
  192         self.body = kwargs['body']
  193 
  194     @staticmethod
  195     def from_hmap(hmap):
  196         kwargs = _hmap_to_msgenv_kwargs(hmap)
  197         kwargs['body'] = _unpack(hmap[b'b'])
  198 
  199         return Message(**kwargs)
  200 
  201     @staticmethod
  202     def from_redis(mid, client):
  203         hmap = client.hgetall(mid)
  204         return Message.from_hmap(hmap) if hmap else None
  205 
  206     @staticmethod
  207     def from_redis_bulk(message_ids, client):
  208         with client.pipeline() as pipe:
  209             for mid in message_ids:
  210                 pipe.hgetall(mid)
  211 
  212             results = pipe.execute()
  213 
  214         messages = [Message.from_hmap(hmap) if hmap else None
  215                     for hmap in results]
  216 
  217         return messages
  218 
  219     def to_redis(self, pipe, include_body=True):
  220         if not include_body:
  221             super(Message, self).to_redis(pipe)
  222 
  223         hmap = _msgenv_to_hmap(self)
  224         hmap['b'] = _pack(self.body)
  225 
  226         pipe.hmset(self.id, hmap)
  227         pipe.expire(self.id, self.ttl)
  228 
  229     def to_basic(self, now, include_created=False):
  230         basic_msg = {
  231             'id': self.id,
  232             'age': now - self.created,
  233             'ttl': self.ttl,
  234             'body': self.body,
  235             'claim_id': self.claim_id,
  236             'claim_count': self.claim_count,
  237         }
  238 
  239         if include_created:
  240             created_iso = datetime.datetime.utcfromtimestamp(
  241                 self.created).strftime('%Y-%m-%dT%H:%M:%SZ')
  242             basic_msg['created'] = created_iso
  243         if self.checksum:
  244             basic_msg['checksum'] = self.checksum
  245         return basic_msg
  246 
  247 
  248 # ==========================================================================
  249 # Helpers
  250 # ==========================================================================
  251 
  252 
  253 _pack = msgpack.Packer(encoding='utf-8', use_bin_type=True).pack
  254 _unpack = functools.partial(msgpack.unpackb, encoding='utf-8')
  255 
  256 
  257 def _hmap_kv_to_msgenv(keys, values):
  258     hmap = dict(zip(keys, values))
  259     kwargs = _hmap_to_msgenv_kwargs(hmap)
  260     return MessageEnvelope(**kwargs)
  261 
  262 
  263 def _hmap_to_msgenv_kwargs(hmap):
  264     claim_id = hmap[b'c']
  265     if claim_id:
  266         claim_id = encodeutils.safe_decode(claim_id)
  267     else:
  268         claim_id = None
  269 
  270     # NOTE(kgriffs): Under Py3K, redis-py converts all strings
  271     # into binary. Woohoo!
  272     res = {
  273         'id': encodeutils.safe_decode(hmap[b'id']),
  274         'ttl': int(hmap[b't']),
  275         'created': int(hmap[b'cr']),
  276         'expires': int(hmap[b'e']),
  277 
  278         'client_uuid': encodeutils.safe_decode(hmap[b'u']),
  279 
  280         'claim_id': claim_id,
  281         'claim_expires': int(hmap[b'c.e']),
  282         'claim_count': int(hmap[b'c.c']),
  283         'delay_expires': int(hmap.get(b'd', 0))
  284     }
  285 
  286     checksum = hmap.get(b'cs')
  287     if checksum:
  288         res['checksum'] = encodeutils.safe_decode(hmap[b'cs'])
  289 
  290     return res
  291 
  292 
  293 def _msgenv_to_hmap(msg):
  294     res = {
  295         'id': msg.id,
  296         't': msg.ttl,
  297         'cr': msg.created,
  298         'e': msg.expires,
  299         'u': msg.client_uuid,
  300         'c': msg.claim_id or '',
  301         'c.e': msg.claim_expires,
  302         'c.c': msg.claim_count,
  303         'd': msg.delay_expires
  304         }
  305     if msg.checksum:
  306         res['cs'] = msg.checksum
  307     return res
  308 
  309 
  310 def _hmap_kv_to_subenv(keys, values):
  311     hmap = dict(zip(keys, values))
  312     kwargs = _hmap_to_subenv_kwargs(hmap)
  313     return SubscriptionEnvelope(**kwargs)
  314 
  315 
  316 def _hmap_to_subenv_kwargs(hmap):
  317     # NOTE(kgriffs): Under Py3K, redis-py converts all strings
  318     # into binary. Woohoo!
  319     return {
  320         'id': encodeutils.safe_decode(hmap[b'id']),
  321         'source': hmap[b's'],
  322         'subscriber': hmap[b'u'],
  323         'ttl': int(hmap[b't']),
  324         'expires': int(hmap[b'e']),
  325         'options': _unpack(hmap[b'o']),
  326         'confirmed': hmap[b'c']
  327     }
  328 
  329 
  330 def _subenv_to_hmap(msg):
  331     return {
  332         'id': msg.id,
  333         's': msg.source,
  334         'u': msg.subscriber,
  335         't': msg.ttl,
  336         'e': msg.expires,
  337         'o': msg.options
  338     }
  339 
  340 
  341 def _validate_uuid4(_uuid):
  342     uuid.UUID(str(_uuid), version=4)
  343     return _uuid