"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-10.0.0/zaqar/storage/swift/claims.py" (13 May 2020, 9895 Bytes) of package /linux/misc/openstack/zaqar-10.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "claims.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 9.0.0_vs_10.0.0.

    1 # Licensed under the Apache License, Version 2.0 (the "License");
    2 # you may not use this file except in compliance with the License.
    3 # You may obtain a copy of the License at
    4 #
    5 #    http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 # Unless required by applicable law or agreed to in writing, software
    8 # distributed under the License is distributed on an "AS IS" BASIS,
    9 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   10 # implied.
   11 # See the License for the specific language governing permissions and
   12 # limitations under the License.
   13 
   14 import hashlib
   15 import math
   16 
   17 from oslo_serialization import jsonutils
   18 from oslo_utils import timeutils
   19 from oslo_utils import uuidutils
   20 import swiftclient
   21 
   22 from zaqar.common import decorators
   23 from zaqar import storage
   24 from zaqar.storage import errors
   25 from zaqar.storage.swift import utils
   26 
   27 
   28 class ClaimController(storage.Claim):
   29     """Implements claims resource operations with swift backend
   30 
   31     Claims are scoped by project + queue.
   32     """
   33     def __init__(self, *args, **kwargs):
   34         super(ClaimController, self).__init__(*args, **kwargs)
   35         self._client = self.driver.connection
   36 
   37     @decorators.lazy_property(write=False)
   38     def _queue_ctrl(self):
   39         return self.driver.queue_controller
   40 
   41     def _exists(self, queue, claim_id, project=None):
   42         try:
   43             return self._client.head_object(
   44                 utils._claim_container(queue, project),
   45                 claim_id)
   46         except swiftclient.ClientException as exc:
   47             if exc.http_status == 404:
   48                 raise errors.ClaimDoesNotExist(claim_id, queue, project)
   49             raise
   50 
   51     def _get(self, queue, claim_id, project=None):
   52         try:
   53             container = utils._claim_container(queue, project)
   54             headers, claim = self._client.get_object(container, claim_id)
   55         except swiftclient.ClientException as exc:
   56             if exc.http_status != 404:
   57                 raise
   58             return
   59         now = timeutils.utcnow_ts(True)
   60         return {
   61             'id': claim_id,
   62             'age': now - float(headers['x-timestamp']),
   63             'ttl': int(headers['x-delete-at']) - math.floor(now),
   64         }
   65 
   66     def get(self, queue, claim_id, project=None):
   67         message_ctrl = self.driver.message_controller
   68         now = timeutils.utcnow_ts(True)
   69         self._exists(queue, claim_id, project)
   70 
   71         container = utils._claim_container(queue, project)
   72 
   73         headers, claim_obj = self._client.get_object(container, claim_id)
   74 
   75         def g():
   76             for msg_id in jsonutils.loads(claim_obj):
   77                 try:
   78                     headers, msg = message_ctrl._find_message(queue, msg_id,
   79                                                               project)
   80                 except errors.MessageDoesNotExist:
   81                     continue
   82                 else:
   83                     yield utils._message_to_json(msg_id, msg, headers, now)
   84 
   85         claim_meta = {
   86             'id': claim_id,
   87             'age': now - float(headers['x-timestamp']),
   88             'ttl': int(headers['x-delete-at']) - math.floor(now),
   89         }
   90 
   91         return claim_meta, g()
   92 
   93     def create(self, queue, metadata, project=None,
   94                limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
   95         message_ctrl = self.driver.message_controller
   96         queue_ctrl = self.driver.queue_controller
   97         try:
   98             queue_meta = queue_ctrl.get_metadata(queue, project=project)
   99         except errors.QueueDoesNotExist:
  100             return None, iter([])
  101         ttl = metadata['ttl']
  102         grace = metadata['grace']
  103         msg_ts = ttl + grace
  104         claim_id = uuidutils.generate_uuid()
  105 
  106         dlq = True if ('_max_claim_count' in queue_meta and
  107                        '_dead_letter_queue' in queue_meta) else False
  108 
  109         include_delayed = False if queue_meta.get('_default_message_delay',
  110                                                   0) else True
  111 
  112         messages, marker = message_ctrl._list(queue, project, limit=limit,
  113                                               include_claimed=False,
  114                                               include_delayed=include_delayed)
  115 
  116         claimed = []
  117         for msg in messages:
  118             claim_count = msg.get('claim_count', 0)
  119             md5 = hashlib.md5()
  120             md5.update(
  121                 jsonutils.dumps(
  122                     {'body': msg['body'], 'claim_id': None,
  123                      'ttl': msg['ttl'],
  124                      'claim_count': claim_count}).encode('utf-8'))
  125             md5 = md5.hexdigest()
  126             msg_ttl = max(msg['ttl'], msg_ts)
  127             move_to_dlq = False
  128             if dlq:
  129                 if claim_count < queue_meta['_max_claim_count']:
  130                     # Check if the message's claim count has exceeded the
  131                     # max claim count defined in the queue, if not ,
  132                     # Save the new max claim count for message
  133                     claim_count = claim_count + 1
  134                 else:
  135                     # if the message's claim count has exceeded the
  136                     # max claim count defined in the queue, move the
  137                     # message to the dead letter queue.
  138                     # NOTE: We're moving message by changing the
  139                     # project info directly. That means, the queue and dead
  140                     # letter queue must be created on the same pool.
  141                     dlq_ttl = queue_meta.get("_dead_letter_queue_messages_ttl")
  142                     move_to_dlq = True
  143                     if dlq_ttl:
  144                         msg_ttl = dlq_ttl
  145 
  146             content = jsonutils.dumps(
  147                 {'body': msg['body'], 'claim_id': claim_id,
  148                  'ttl': msg_ttl,
  149                  'claim_count': claim_count})
  150 
  151             if move_to_dlq:
  152                 dead_letter_queue = queue_meta.get("_dead_letter_queue")
  153                 utils._put_or_create_container(
  154                     self._client,
  155                     utils._message_container(dead_letter_queue, project),
  156                     msg['id'],
  157                     content,
  158                     content_type='application/json',
  159                     headers={'x-object-meta-clientid': msg['client_uuid'],
  160                              'if-match': md5,
  161                              'x-object-meta-claimid': claim_id,
  162                              'x-delete-after': msg_ttl})
  163 
  164                 message_ctrl._delete(queue, msg['id'], project)
  165 
  166             else:
  167                 try:
  168                     self._client.put_object(
  169                         utils._message_container(queue, project),
  170                         msg['id'],
  171                         content,
  172                         content_type='application/json',
  173                         headers={'x-object-meta-clientid': msg['client_uuid'],
  174                                  'if-match': md5,
  175                                  'x-object-meta-claimid': claim_id,
  176                                  'x-delete-after': msg_ttl})
  177                 except swiftclient.ClientException as exc:
  178                     if exc.http_status == 412:
  179                         continue
  180                     raise
  181                 else:
  182                     msg['claim_id'] = claim_id
  183                     msg['ttl'] = msg_ttl
  184                     msg['claim_count'] = claim_count
  185                     claimed.append(msg)
  186 
  187         utils._put_or_create_container(
  188             self._client,
  189             utils._claim_container(queue, project),
  190             claim_id,
  191             jsonutils.dumps([msg['id'] for msg in claimed]),
  192             content_type='application/json',
  193             headers={'x-delete-after': ttl}
  194         )
  195 
  196         return claim_id, claimed
  197 
  198     def update(self, queue, claim_id, metadata, project=None):
  199         if not self._queue_ctrl.exists(queue, project):
  200             raise errors.QueueDoesNotExist(queue, project)
  201 
  202         container = utils._claim_container(queue, project)
  203         try:
  204             headers, obj = self._client.get_object(container, claim_id)
  205         except swiftclient.ClientException as exc:
  206             if exc.http_status == 404:
  207                 raise errors.ClaimDoesNotExist(claim_id, queue, project)
  208             raise
  209 
  210         self._client.put_object(container, claim_id, obj,
  211                                 content_type='application/json',
  212                                 headers={'x-delete-after': metadata['ttl']})
  213 
  214     def delete(self, queue, claim_id, project=None):
  215         message_ctrl = self.driver.message_controller
  216         try:
  217             header, obj = self._client.get_object(
  218                 utils._claim_container(queue, project),
  219                 claim_id)
  220             for msg_id in jsonutils.loads(obj):
  221                 try:
  222                     headers, msg = message_ctrl._find_message(queue, msg_id,
  223                                                               project)
  224                 except errors.MessageDoesNotExist:
  225                     continue
  226                 md5 = hashlib.md5()
  227                 md5.update(msg)
  228                 md5 = md5.hexdigest()
  229                 msg = jsonutils.loads(msg)
  230                 content = jsonutils.dumps(
  231                     {'body': msg['body'], 'claim_id': None, 'ttl': msg['ttl']})
  232                 client_id = headers['x-object-meta-clientid']
  233                 self._client.put_object(
  234                     utils._message_container(queue, project),
  235                     msg_id,
  236                     content,
  237                     content_type='application/json',
  238                     headers={'x-object-meta-clientid': client_id,
  239                              'if-match': md5,
  240                              'x-delete-at': headers['x-delete-at']})
  241 
  242             self._client.delete_object(
  243                 utils._claim_container(queue, project),
  244                 claim_id)
  245         except swiftclient.ClientException as exc:
  246             if exc.http_status != 404:
  247                 raise