"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-7.0.0/zaqar/transport/wsgi/v2_0/queues.py" (30 Aug 2018, 13099 Bytes) of package /linux/misc/openstack/zaqar-7.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "queues.py": 5.0.0_vs_7.0.0.

    1 # Copyright (c) 2013 Rackspace, Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 
   16 import copy
   17 import falcon
   18 from oslo_log import log as logging
   19 import six
   20 
   21 from zaqar.common import decorators
   22 from zaqar.i18n import _
   23 from zaqar.storage import errors as storage_errors
   24 from zaqar.transport import acl
   25 from zaqar.transport import utils
   26 from zaqar.transport import validation
   27 from zaqar.transport.wsgi import errors as wsgi_errors
   28 from zaqar.transport.wsgi import utils as wsgi_utils
   29 
   30 LOG = logging.getLogger(__name__)
   31 
   32 
   33 def _get_reserved_metadata(validate):
   34     _reserved_metadata = ['max_messages_post_size', 'default_message_ttl',
   35                           'default_message_delay']
   36     reserved_metadata = {
   37         '_%s' % meta:
   38             validate.get_limit_conf_value(meta)
   39         for meta in _reserved_metadata
   40     }
   41 
   42     for metadata in ['_dead_letter_queue', '_dead_letter_queue_messages_ttl',
   43                      '_max_claim_count']:
   44         reserved_metadata.update({metadata: None})
   45     return reserved_metadata
   46 
   47 
   48 class ItemResource(object):
   49 
   50     __slots__ = ('_validate', '_queue_controller', '_message_controller',
   51                  '_reserved_metadata')
   52 
   53     def __init__(self, validate, queue_controller, message_controller):
   54         self._validate = validate
   55         self._queue_controller = queue_controller
   56         self._message_controller = message_controller
   57 
   58     @decorators.TransportLog("Queues item")
   59     @acl.enforce("queues:get")
   60     def on_get(self, req, resp, project_id, queue_name):
   61         try:
   62             resp_dict = self._queue_controller.get(queue_name,
   63                                                    project=project_id)
   64             for meta, value in _get_reserved_metadata(self._validate).items():
   65                 if not resp_dict.get(meta):
   66                     resp_dict[meta] = value
   67         except storage_errors.DoesNotExist as ex:
   68             LOG.debug(ex)
   69             raise wsgi_errors.HTTPNotFound(six.text_type(ex))
   70 
   71         except Exception as ex:
   72             LOG.exception(ex)
   73             description = _(u'Queue metadata could not be retrieved.')
   74             raise wsgi_errors.HTTPServiceUnavailable(description)
   75 
   76         resp.body = utils.to_json(resp_dict)
   77         # status defaults to 200
   78 
   79     @decorators.TransportLog("Queues item")
   80     @acl.enforce("queues:create")
   81     def on_put(self, req, resp, project_id, queue_name):
   82         try:
   83             # Place JSON size restriction before parsing
   84             self._validate.queue_metadata_length(req.content_length)
   85             # Deserialize queue metadata
   86             metadata = None
   87             if req.content_length:
   88                 document = wsgi_utils.deserialize(req.stream,
   89                                                   req.content_length)
   90                 metadata = wsgi_utils.sanitize(document)
   91             self._validate.queue_metadata_putting(metadata)
   92         except validation.ValidationFailed as ex:
   93             LOG.debug(ex)
   94             raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
   95 
   96         try:
   97             created = self._queue_controller.create(queue_name,
   98                                                     metadata=metadata,
   99                                                     project=project_id)
  100 
  101         except storage_errors.FlavorDoesNotExist as ex:
  102             LOG.exception(ex)
  103             raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
  104         except Exception as ex:
  105             LOG.exception(ex)
  106             description = _(u'Queue could not be created.')
  107             raise wsgi_errors.HTTPServiceUnavailable(description)
  108 
  109         resp.status = falcon.HTTP_201 if created else falcon.HTTP_204
  110         resp.location = req.path
  111 
  112     @decorators.TransportLog("Queues item")
  113     @acl.enforce("queues:delete")
  114     def on_delete(self, req, resp, project_id, queue_name):
  115         LOG.debug(u'Queue item DELETE - queue: %(queue)s, '
  116                   u'project: %(project)s',
  117                   {'queue': queue_name, 'project': project_id})
  118         try:
  119             self._queue_controller.delete(queue_name, project=project_id)
  120 
  121         except Exception as ex:
  122             LOG.exception(ex)
  123             description = _(u'Queue could not be deleted.')
  124             raise wsgi_errors.HTTPServiceUnavailable(description)
  125 
  126         resp.status = falcon.HTTP_204
  127 
  128     @decorators.TransportLog("Queues item")
  129     @acl.enforce("queues:update")
  130     def on_patch(self, req, resp, project_id, queue_name):
  131         """Allows one to update a queue's metadata.
  132 
  133         This method expects the user to submit a JSON object. There is also
  134         strict format checking through the use of
  135         jsonschema. Appropriate errors are returned in each case for
  136         badly formatted input.
  137 
  138         :returns: HTTP | 200,400,409,503
  139         """
  140         LOG.debug(u'PATCH queue - name: %s', queue_name)
  141 
  142         try:
  143             # Place JSON size restriction before parsing
  144             self._validate.queue_metadata_length(req.content_length)
  145         except validation.ValidationFailed as ex:
  146             LOG.debug(ex)
  147             raise wsgi_errors.HTTPBadRequestBody(six.text_type(ex))
  148 
  149         # NOTE(flwang): See below link to get more details about draft 10,
  150         # tools.ietf.org/html/draft-ietf-appsawg-json-patch-10
  151         content_types = {
  152             'application/openstack-messaging-v2.0-json-patch': 10,
  153         }
  154 
  155         if req.content_type not in content_types:
  156             headers = {'Accept-Patch':
  157                        ', '.join(sorted(content_types.keys()))}
  158             msg = _("Accepted media type for PATCH: %s.")
  159             LOG.debug(msg, headers)
  160             raise wsgi_errors.HTTPUnsupportedMediaType(msg % headers)
  161 
  162         if req.content_length:
  163             try:
  164                 changes = utils.read_json(req.stream, req.content_length)
  165                 changes = wsgi_utils.sanitize(changes, doctype=list)
  166             except utils.MalformedJSON as ex:
  167                 LOG.debug(ex)
  168                 description = _(u'Request body could not be parsed.')
  169                 raise wsgi_errors.HTTPBadRequestBody(description)
  170 
  171             except utils.OverflowedJSONInteger as ex:
  172                 LOG.debug(ex)
  173                 description = _(u'JSON contains integer that is too large.')
  174                 raise wsgi_errors.HTTPBadRequestBody(description)
  175 
  176             except Exception as ex:
  177                 # Error while reading from the network/server
  178                 LOG.exception(ex)
  179                 description = _(u'Request body could not be read.')
  180                 raise wsgi_errors.HTTPServiceUnavailable(description)
  181         else:
  182             msg = _("PATCH body could not be empty for update.")
  183             LOG.debug(msg)
  184             raise wsgi_errors.HTTPBadRequestBody(msg)
  185 
  186         try:
  187             changes = self._validate.queue_patching(req, changes)
  188 
  189             # NOTE(Eva-i): using 'get_metadata' instead of 'get', so
  190             # QueueDoesNotExist error will be thrown in case of non-existent
  191             # queue.
  192             metadata = self._queue_controller.get_metadata(queue_name,
  193                                                            project=project_id)
  194             reserved_metadata = _get_reserved_metadata(self._validate)
  195             for change in changes:
  196                 change_method_name = '_do_%s' % change['op']
  197                 change_method = getattr(self, change_method_name)
  198                 change_method(req, metadata, reserved_metadata, change)
  199 
  200             self._validate.queue_metadata_putting(metadata)
  201 
  202             self._queue_controller.set_metadata(queue_name,
  203                                                 metadata,
  204                                                 project_id)
  205         except storage_errors.DoesNotExist as ex:
  206             LOG.debug(ex)
  207             raise wsgi_errors.HTTPNotFound(six.text_type(ex))
  208         except validation.ValidationFailed as ex:
  209             LOG.debug(ex)
  210             raise wsgi_errors.HTTPBadRequestBody(six.text_type(ex))
  211         except wsgi_errors.HTTPConflict as ex:
  212             raise ex
  213         except Exception as ex:
  214             LOG.exception(ex)
  215             description = _(u'Queue could not be updated.')
  216             raise wsgi_errors.HTTPServiceUnavailable(description)
  217         for meta, value in _get_reserved_metadata(self._validate).items():
  218             if not metadata.get(meta):
  219                 metadata[meta] = value
  220         resp.body = utils.to_json(metadata)
  221 
  222     def _do_replace(self, req, metadata, reserved_metadata, change):
  223         path = change['path']
  224         path_child = path[1]
  225         value = change['value']
  226         if path_child in metadata or path_child in reserved_metadata:
  227             metadata[path_child] = value
  228         else:
  229             msg = _("Can't replace non-existent object %s.")
  230             raise wsgi_errors.HTTPConflict(msg % path_child)
  231 
  232     def _do_add(self, req, metadata, reserved_metadata, change):
  233         path = change['path']
  234         path_child = path[1]
  235         value = change['value']
  236         metadata[path_child] = value
  237 
  238     def _do_remove(self, req, metadata, reserved_metadata, change):
  239         path = change['path']
  240         path_child = path[1]
  241         if path_child in metadata:
  242             metadata.pop(path_child)
  243         elif path_child not in reserved_metadata:
  244             msg = _("Can't remove non-existent object %s.")
  245             raise wsgi_errors.HTTPConflict(msg % path_child)
  246 
  247 
  248 class CollectionResource(object):
  249 
  250     __slots__ = ('_queue_controller', '_validate', '_reserved_metadata')
  251 
  252     def __init__(self, validate, queue_controller):
  253         self._queue_controller = queue_controller
  254         self._validate = validate
  255 
  256     def _queue_list(self, project_id, path, kfilter, **kwargs):
  257         try:
  258             self._validate.queue_listing(**kwargs)
  259             results = self._queue_controller.list(project=project_id,
  260                                                   kfilter=kfilter, **kwargs)
  261 
  262             # Buffer list of queues
  263             queues = list(next(results))
  264 
  265         except validation.ValidationFailed as ex:
  266             LOG.debug(ex)
  267             raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
  268 
  269         except Exception as ex:
  270             LOG.exception(ex)
  271             description = _(u'Queues could not be listed.')
  272             raise wsgi_errors.HTTPServiceUnavailable(description)
  273 
  274         # Got some. Prepare the response.
  275         kwargs['marker'] = next(results) or kwargs.get('marker', '')
  276         reserved_metadata = _get_reserved_metadata(self._validate).items()
  277         for each_queue in queues:
  278             each_queue['href'] = path + '/' + each_queue['name']
  279             if kwargs.get('detailed'):
  280                 for meta, value in reserved_metadata:
  281                     if not each_queue.get('metadata', {}).get(meta):
  282                         each_queue['metadata'][meta] = value
  283 
  284         return queues, kwargs['marker']
  285 
  286     def _on_get_with_kfilter(self, req, resp, project_id, kfilter={}):
  287         kwargs = {}
  288 
  289         # NOTE(kgriffs): This syntax ensures that
  290         # we don't clobber default values with None.
  291         req.get_param('marker', store=kwargs)
  292         req.get_param_as_int('limit', store=kwargs)
  293         req.get_param_as_bool('detailed', store=kwargs)
  294         req.get_param('name', store=kwargs)
  295 
  296         queues, marker = self._queue_list(project_id,
  297                                           req.path, kfilter, **kwargs)
  298 
  299         links = []
  300         kwargs['marker'] = marker
  301         if queues:
  302             links = [
  303                 {
  304                     'rel': 'next',
  305                     'href': req.path + falcon.to_query_str(kwargs)
  306                 }
  307             ]
  308 
  309         response_body = {
  310             'queues': queues,
  311             'links': links
  312         }
  313 
  314         resp.body = utils.to_json(response_body)
  315         # status defaults to 200
  316 
  317     @decorators.TransportLog("Queues collection")
  318     @acl.enforce("queues:get_all")
  319     def on_get(self, req, resp, project_id):
  320         field = ('marker', 'limit', 'detailed', 'name')
  321         kfilter = copy.deepcopy(req.params)
  322 
  323         for key in req.params.keys():
  324             if key in field:
  325                 kfilter.pop(key)
  326 
  327         kfilter = kfilter if len(kfilter) > 0 else {}
  328         for key in kfilter.keys():
  329             # Since we get the filter value from URL, so need to
  330             # turn the string to integer if using integer filter value.
  331             try:
  332                 kfilter[key] = int(kfilter[key])
  333             except ValueError:
  334                 continue
  335         self._on_get_with_kfilter(req, resp, project_id, kfilter)
  336         # status defaults to 200