"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-10.0.0/zaqar/transport/wsgi/v1_1/claims.py" (13 May 2020, 7301 Bytes) of package /linux/misc/openstack/zaqar-10.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "claims.py": 9.0.0_vs_10.0.0.

    1 # Copyright (c) 2013 Rackspace, Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 
   16 import falcon
   17 from oslo_log import log as logging
   18 import six
   19 
   20 from zaqar.common import decorators
   21 from zaqar.i18n import _
   22 from zaqar.storage import errors as storage_errors
   23 from zaqar.transport import utils
   24 from zaqar.transport import validation
   25 from zaqar.transport.wsgi import errors as wsgi_errors
   26 from zaqar.transport.wsgi import utils as wsgi_utils
   27 
   28 LOG = logging.getLogger(__name__)
   29 
   30 
   31 class CollectionResource(object):
   32     __slots__ = (
   33         '_claim_controller',
   34         '_validate',
   35         '_claim_post_spec',
   36         '_default_meta',
   37     )
   38 
   39     def __init__(self, wsgi_conf, validate, claim_controller,
   40                  default_claim_ttl, default_grace_ttl):
   41 
   42         self._claim_controller = claim_controller
   43         self._validate = validate
   44 
   45         self._claim_post_spec = (
   46             ('ttl', int, default_claim_ttl),
   47             ('grace', int, default_grace_ttl),
   48         )
   49 
   50         # NOTE(kgriffs): Create this once up front, rather than creating
   51         # a new dict every time, for the sake of performance.
   52         self._default_meta = {
   53             'ttl': default_claim_ttl,
   54             'grace': default_grace_ttl,
   55         }
   56 
   57     @decorators.TransportLog("Claims collection")
   58     def on_post(self, req, resp, project_id, queue_name):
   59         # Check for an explicit limit on the # of messages to claim
   60         limit = req.get_param_as_int('limit')
   61         claim_options = {} if limit is None else {'limit': limit}
   62 
   63         # NOTE(kgriffs): Clients may or may not actually include the
   64         # Content-Length header when the body is empty; the following
   65         # check works for both 0 and None.
   66         if not req.content_length:
   67             # No values given, so use defaults
   68             metadata = self._default_meta
   69         else:
   70             # Read claim metadata (e.g., TTL) and raise appropriate
   71             # HTTP errors as needed.
   72             document = wsgi_utils.deserialize(req.stream, req.content_length)
   73             metadata = wsgi_utils.sanitize(document, self._claim_post_spec)
   74 
   75         # Claim some messages
   76         try:
   77             self._validate.claim_creation(metadata, limit=limit)
   78 
   79             cid, msgs = self._claim_controller.create(
   80                 queue_name,
   81                 metadata=metadata,
   82                 project=project_id,
   83                 **claim_options)
   84 
   85             # Buffer claimed messages
   86             # TODO(kgriffs): optimize, along with serialization (below)
   87             resp_msgs = list(msgs)
   88 
   89         except validation.ValidationFailed as ex:
   90             LOG.debug(ex)
   91             raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
   92 
   93         except Exception:
   94             description = _(u'Claim could not be created.')
   95             LOG.exception(description)
   96             raise wsgi_errors.HTTPServiceUnavailable(description)
   97 
   98         # Serialize claimed messages, if any. This logic assumes
   99         # the storage driver returned well-formed messages.
  100         if len(resp_msgs) != 0:
  101             base_path = req.path.rpartition('/')[0]
  102             resp_msgs = [wsgi_utils.format_message_v1_1(msg, base_path, cid)
  103                          for msg in resp_msgs]
  104 
  105             resp.location = req.path + '/' + cid
  106             resp.body = utils.to_json({'messages': resp_msgs})
  107             resp.status = falcon.HTTP_201
  108         else:
  109             resp.status = falcon.HTTP_204
  110 
  111 
  112 class ItemResource(object):
  113 
  114     __slots__ = ('_claim_controller', '_validate', '_claim_patch_spec')
  115 
  116     def __init__(self, wsgi_conf, validate, claim_controller,
  117                  default_claim_ttl, default_grace_ttl):
  118         self._claim_controller = claim_controller
  119         self._validate = validate
  120 
  121         self._claim_patch_spec = (
  122             ('ttl', int, default_claim_ttl),
  123             ('grace', int, default_grace_ttl),
  124         )
  125 
  126     @decorators.TransportLog("Claim item")
  127     def on_get(self, req, resp, project_id, queue_name, claim_id):
  128         try:
  129             meta, msgs = self._claim_controller.get(
  130                 queue_name,
  131                 claim_id=claim_id,
  132                 project=project_id)
  133 
  134             # Buffer claimed messages
  135             # TODO(kgriffs): Optimize along with serialization (see below)
  136             meta['messages'] = list(msgs)
  137 
  138         except storage_errors.DoesNotExist as ex:
  139             LOG.debug(ex)
  140             raise wsgi_errors.HTTPNotFound(six.text_type(ex))
  141         except Exception:
  142             description = _(u'Claim could not be queried.')
  143             LOG.exception(description)
  144             raise wsgi_errors.HTTPServiceUnavailable(description)
  145 
  146         # Serialize claimed messages
  147         # TODO(kgriffs): Optimize
  148         base_path = req.path.rsplit('/', 2)[0]
  149         meta['messages'] = [wsgi_utils.format_message_v1_1(msg, base_path,
  150                                                            claim_id)
  151                             for msg in meta['messages']]
  152 
  153         meta['href'] = req.path
  154         del meta['id']
  155 
  156         resp.body = utils.to_json(meta)
  157         # status defaults to 200
  158 
  159     @decorators.TransportLog("Claim item")
  160     def on_patch(self, req, resp, project_id, queue_name, claim_id):
  161         # Read claim metadata (e.g., TTL) and raise appropriate
  162         # HTTP errors as needed.
  163         document = wsgi_utils.deserialize(req.stream, req.content_length)
  164         metadata = wsgi_utils.sanitize(document, self._claim_patch_spec)
  165 
  166         try:
  167             self._validate.claim_updating(metadata)
  168             self._claim_controller.update(queue_name,
  169                                           claim_id=claim_id,
  170                                           metadata=metadata,
  171                                           project=project_id)
  172 
  173             resp.status = falcon.HTTP_204
  174 
  175         except validation.ValidationFailed as ex:
  176             LOG.debug(ex)
  177             raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
  178 
  179         except storage_errors.DoesNotExist as ex:
  180             LOG.debug(ex)
  181             raise wsgi_errors.HTTPNotFound(six.text_type(ex))
  182 
  183         except Exception:
  184             description = _(u'Claim could not be updated.')
  185             LOG.exception(description)
  186             raise wsgi_errors.HTTPServiceUnavailable(description)
  187 
  188     @decorators.TransportLog("Claim item")
  189     def on_delete(self, req, resp, project_id, queue_name, claim_id):
  190         try:
  191             self._claim_controller.delete(queue_name,
  192                                           claim_id=claim_id,
  193                                           project=project_id)
  194 
  195             resp.status = falcon.HTTP_204
  196 
  197         except Exception:
  198             description = _(u'Claim could not be deleted.')
  199             LOG.exception(description)
  200             raise wsgi_errors.HTTPServiceUnavailable(description)