"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-10.0.0/zaqar/transport/wsgi/v2_0/claims.py" (13 May 2020, 7469 Bytes) of package /linux/misc/openstack/zaqar-10.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "claims.py": 9.0.0_vs_10.0.0.

    1 # Copyright (c) 2013 Rackspace, Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 
   16 import falcon
   17 from oslo_log import log as logging
   18 import six
   19 
   20 from zaqar.common import decorators
   21 from zaqar.i18n import _
   22 from zaqar.storage import errors as storage_errors
   23 from zaqar.transport import acl
   24 from zaqar.transport import utils
   25 from zaqar.transport import validation
   26 from zaqar.transport.wsgi import errors as wsgi_errors
   27 from zaqar.transport.wsgi import utils as wsgi_utils
   28 
   29 LOG = logging.getLogger(__name__)
   30 
   31 
   32 class CollectionResource(object):
   33     __slots__ = (
   34         '_claim_controller',
   35         '_validate',
   36         '_claim_post_spec',
   37         '_default_meta',
   38     )
   39 
   40     def __init__(self, wsgi_conf, validate, claim_controller,
   41                  default_claim_ttl, default_grace_ttl):
   42 
   43         self._claim_controller = claim_controller
   44         self._validate = validate
   45 
   46         self._claim_post_spec = (
   47             ('ttl', int, default_claim_ttl),
   48             ('grace', int, default_grace_ttl),
   49         )
   50 
   51         # NOTE(kgriffs): Create this once up front, rather than creating
   52         # a new dict every time, for the sake of performance.
   53         self._default_meta = {
   54             'ttl': default_claim_ttl,
   55             'grace': default_grace_ttl,
   56         }
   57 
   58     @decorators.TransportLog("Claims collection")
   59     @acl.enforce("claims:create")
   60     def on_post(self, req, resp, project_id, queue_name):
   61         # Check for an explicit limit on the # of messages to claim
   62         limit = req.get_param_as_int('limit')
   63         claim_options = {} if limit is None else {'limit': limit}
   64 
   65         # NOTE(kgriffs): Clients may or may not actually include the
   66         # Content-Length header when the body is empty; the following
   67         # check works for both 0 and None.
   68         if not req.content_length:
   69             # No values given, so use defaults
   70             metadata = self._default_meta
   71         else:
   72             # Read claim metadata (e.g., TTL) and raise appropriate
   73             # HTTP errors as needed.
   74             document = wsgi_utils.deserialize(req.stream, req.content_length)
   75             metadata = wsgi_utils.sanitize(document, self._claim_post_spec)
   76 
   77         # Claim some messages
   78         try:
   79             self._validate.claim_creation(metadata, limit=limit)
   80 
   81             cid, msgs = self._claim_controller.create(
   82                 queue_name,
   83                 metadata=metadata,
   84                 project=project_id,
   85                 **claim_options)
   86 
   87             # Buffer claimed messages
   88             # TODO(kgriffs): optimize, along with serialization (below)
   89             resp_msgs = list(msgs)
   90 
   91         except validation.ValidationFailed as ex:
   92             LOG.debug(ex)
   93             raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
   94 
   95         except Exception:
   96             description = _(u'Claim could not be created.')
   97             LOG.exception(description)
   98             raise wsgi_errors.HTTPServiceUnavailable(description)
   99 
  100         # Serialize claimed messages, if any. This logic assumes
  101         # the storage driver returned well-formed messages.
  102         if len(resp_msgs) != 0:
  103             base_path = req.path.rpartition('/')[0]
  104             resp_msgs = [wsgi_utils.format_message_v1_1(msg, base_path, cid)
  105                          for msg in resp_msgs]
  106 
  107             resp.location = req.path + '/' + cid
  108             resp.body = utils.to_json({'messages': resp_msgs})
  109             resp.status = falcon.HTTP_201
  110         else:
  111             resp.status = falcon.HTTP_204
  112 
  113 
  114 class ItemResource(object):
  115 
  116     __slots__ = ('_claim_controller', '_validate', '_claim_patch_spec')
  117 
  118     def __init__(self, wsgi_conf, validate, claim_controller,
  119                  default_claim_ttl, default_grace_ttl):
  120         self._claim_controller = claim_controller
  121         self._validate = validate
  122 
  123         self._claim_patch_spec = (
  124             ('ttl', int, default_claim_ttl),
  125             ('grace', int, default_grace_ttl),
  126         )
  127 
  128     @decorators.TransportLog("Claims item")
  129     @acl.enforce("claims:get")
  130     def on_get(self, req, resp, project_id, queue_name, claim_id):
  131         try:
  132             meta, msgs = self._claim_controller.get(
  133                 queue_name,
  134                 claim_id=claim_id,
  135                 project=project_id)
  136 
  137             # Buffer claimed messages
  138             # TODO(kgriffs): Optimize along with serialization (see below)
  139             meta['messages'] = list(msgs)
  140 
  141         except storage_errors.DoesNotExist as ex:
  142             LOG.debug(ex)
  143             raise wsgi_errors.HTTPNotFound(six.text_type(ex))
  144         except Exception:
  145             description = _(u'Claim could not be queried.')
  146             LOG.exception(description)
  147             raise wsgi_errors.HTTPServiceUnavailable(description)
  148 
  149         # Serialize claimed messages
  150         # TODO(kgriffs): Optimize
  151         base_path = req.path.rsplit('/', 2)[0]
  152         meta['messages'] = [wsgi_utils.format_message_v1_1(msg, base_path,
  153                                                            claim_id)
  154                             for msg in meta['messages']]
  155 
  156         meta['href'] = req.path
  157         del meta['id']
  158 
  159         resp.body = utils.to_json(meta)
  160         # status defaults to 200
  161 
  162     @decorators.TransportLog("Claims item")
  163     @acl.enforce("claims:update")
  164     def on_patch(self, req, resp, project_id, queue_name, claim_id):
  165         # Read claim metadata (e.g., TTL) and raise appropriate
  166         # HTTP errors as needed.
  167         document = wsgi_utils.deserialize(req.stream, req.content_length)
  168         metadata = wsgi_utils.sanitize(document, self._claim_patch_spec)
  169 
  170         try:
  171             self._validate.claim_updating(metadata)
  172             self._claim_controller.update(queue_name,
  173                                           claim_id=claim_id,
  174                                           metadata=metadata,
  175                                           project=project_id)
  176 
  177             resp.status = falcon.HTTP_204
  178 
  179         except validation.ValidationFailed as ex:
  180             LOG.debug(ex)
  181             raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
  182 
  183         except storage_errors.DoesNotExist as ex:
  184             LOG.debug(ex)
  185             raise wsgi_errors.HTTPNotFound(six.text_type(ex))
  186 
  187         except Exception:
  188             description = _(u'Claim could not be updated.')
  189             LOG.exception(description)
  190             raise wsgi_errors.HTTPServiceUnavailable(description)
  191 
  192     @decorators.TransportLog("Claims item")
  193     @acl.enforce("claims:delete")
  194     def on_delete(self, req, resp, project_id, queue_name, claim_id):
  195         try:
  196             self._claim_controller.delete(queue_name,
  197                                           claim_id=claim_id,
  198                                           project=project_id)
  199 
  200             resp.status = falcon.HTTP_204
  201 
  202         except Exception:
  203             description = _(u'Claim could not be deleted.')
  204             LOG.exception(description)
  205             raise wsgi_errors.HTTPServiceUnavailable(description)