"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-10.0.0/zaqar/transport/wsgi/v2_0/topic.py" (13 May 2020, 12981 Bytes) of package /linux/misc/openstack/zaqar-10.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "topic.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 9.0.0_vs_10.0.0.

    1 # Copyright (c) 2019 Rackspace, Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 
   16 import copy
   17 import falcon
   18 from oslo_log import log as logging
   19 import six
   20 
   21 from zaqar.common import decorators
   22 from zaqar.i18n import _
   23 from zaqar.storage import errors as storage_errors
   24 from zaqar.transport import acl
   25 from zaqar.transport import utils
   26 from zaqar.transport import validation
   27 from zaqar.transport.wsgi import errors as wsgi_errors
   28 from zaqar.transport.wsgi import utils as wsgi_utils
   29 
   30 LOG = logging.getLogger(__name__)
   31 
   32 
   33 def _get_reserved_metadata(validate):
   34     _reserved_metadata = ['max_messages_post_size', 'default_message_ttl',
   35                           'default_message_delay']
   36     reserved_metadata = {
   37         '_%s' % meta:
   38             validate.get_limit_conf_value(meta)
   39         for meta in _reserved_metadata
   40     }
   41 
   42     return reserved_metadata
   43 
   44 
   45 class ItemResource(object):
   46 
   47     __slots__ = ('_validate', '_topic_controller', '_message_controller',
   48                  '_reserved_metadata')
   49 
   50     def __init__(self, validate, topic_controller, message_controller):
   51         self._validate = validate
   52         self._topic_controller = topic_controller
   53         self._message_controller = message_controller
   54 
   55     @decorators.TransportLog("Topics item")
   56     @acl.enforce("topics:get")
   57     def on_get(self, req, resp, project_id, topic_name):
   58         try:
   59             resp_dict = self._topic_controller.get(topic_name,
   60                                                    project=project_id)
   61             for meta, value in _get_reserved_metadata(self._validate).items():
   62                 if not resp_dict.get(meta):
   63                     resp_dict[meta] = value
   64         except storage_errors.DoesNotExist as ex:
   65             LOG.debug(ex)
   66             raise wsgi_errors.HTTPNotFound(six.text_type(ex))
   67 
   68         except Exception:
   69             description = _(u'Topic metadata could not be retrieved.')
   70             LOG.exception(description)
   71             raise wsgi_errors.HTTPServiceUnavailable(description)
   72 
   73         resp.body = utils.to_json(resp_dict)
   74         # status defaults to 200
   75 
   76     @decorators.TransportLog("Topics item")
   77     @acl.enforce("topics:create")
   78     def on_put(self, req, resp, project_id, topic_name):
   79         try:
   80             # Place JSON size restriction before parsing
   81             self._validate.queue_metadata_length(req.content_length)
   82             # Deserialize Topic metadata
   83             metadata = None
   84             if req.content_length:
   85                 document = wsgi_utils.deserialize(req.stream,
   86                                                   req.content_length)
   87                 metadata = wsgi_utils.sanitize(document)
   88             self._validate.queue_metadata_putting(metadata)
   89         except validation.ValidationFailed as ex:
   90             LOG.debug(ex)
   91             raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
   92 
   93         try:
   94             created = self._topic_controller.create(topic_name,
   95                                                     metadata=metadata,
   96                                                     project=project_id)
   97 
   98         except storage_errors.FlavorDoesNotExist as ex:
   99             LOG.exception('Flavor "%s" does not exist', topic_name)
  100             raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
  101         except Exception:
  102             description = _(u'Topic could not be created.')
  103             LOG.exception(description)
  104             raise wsgi_errors.HTTPServiceUnavailable(description)
  105 
  106         resp.status = falcon.HTTP_201 if created else falcon.HTTP_204
  107         resp.location = req.path
  108 
  109     @decorators.TransportLog("Topics item")
  110     @acl.enforce("topics:delete")
  111     def on_delete(self, req, resp, project_id, topic_name):
  112         LOG.debug(u'Topic item DELETE - topic: %(topic)s, '
  113                   u'project: %(project)s',
  114                   {'topic': topic_name, 'project': project_id})
  115         try:
  116             self._topic_controller.delete(topic_name, project=project_id)
  117 
  118         except Exception:
  119             description = _(u'Topic could not be deleted.')
  120             LOG.exception(description)
  121             raise wsgi_errors.HTTPServiceUnavailable(description)
  122 
  123         resp.status = falcon.HTTP_204
  124 
  125     @decorators.TransportLog("Topics item")
  126     @acl.enforce("topics:update")
  127     def on_patch(self, req, resp, project_id, topic_name):
  128         """Allows one to update a topic's metadata.
  129 
  130         This method expects the user to submit a JSON object. There is also
  131         strict format checking through the use of
  132         jsonschema. Appropriate errors are returned in each case for
  133         badly formatted input.
  134 
  135         :returns: HTTP | 200,400,409,503
  136         """
  137         LOG.debug(u'PATCH topic - name: %s', topic_name)
  138 
  139         try:
  140             # Place JSON size restriction before parsing
  141             self._validate.queue_metadata_length(req.content_length)
  142         except validation.ValidationFailed as ex:
  143             LOG.debug(ex)
  144             raise wsgi_errors.HTTPBadRequestBody(six.text_type(ex))
  145 
  146         # NOTE(flwang): See below link to get more details about draft 10,
  147         # tools.ietf.org/html/draft-ietf-appsawg-json-patch-10
  148         content_types = {
  149             'application/openstack-messaging-v2.0-json-patch': 10,
  150         }
  151 
  152         if req.content_type not in content_types:
  153             headers = {'Accept-Patch':
  154                        ', '.join(sorted(content_types.keys()))}
  155             msg = _("Accepted media type for PATCH: %s.")
  156             LOG.debug(msg, headers)
  157             raise wsgi_errors.HTTPUnsupportedMediaType(msg % headers)
  158 
  159         if req.content_length:
  160             try:
  161                 changes = utils.read_json(req.stream, req.content_length)
  162                 changes = wsgi_utils.sanitize(changes, doctype=list)
  163             except utils.MalformedJSON as ex:
  164                 LOG.debug(ex)
  165                 description = _(u'Request body could not be parsed.')
  166                 raise wsgi_errors.HTTPBadRequestBody(description)
  167 
  168             except utils.OverflowedJSONInteger as ex:
  169                 LOG.debug(ex)
  170                 description = _(u'JSON contains integer that is too large.')
  171                 raise wsgi_errors.HTTPBadRequestBody(description)
  172 
  173             except Exception:
  174                 # Error while reading from the network/server
  175                 description = _(u'Request body could not be read.')
  176                 LOG.exception(description)
  177                 raise wsgi_errors.HTTPServiceUnavailable(description)
  178         else:
  179             msg = _("PATCH body could not be empty for update.")
  180             LOG.debug(msg)
  181             raise wsgi_errors.HTTPBadRequestBody(msg)
  182 
  183         try:
  184             changes = self._validate.queue_patching(req, changes)
  185 
  186             # NOTE(Eva-i): using 'get_metadata' instead of 'get', so
  187             # QueueDoesNotExist error will be thrown in case of non-existent
  188             # queue.
  189             metadata = self._topic_controller.get_metadata(topic_name,
  190                                                            project=project_id)
  191             reserved_metadata = _get_reserved_metadata(self._validate)
  192             for change in changes:
  193                 change_method_name = '_do_%s' % change['op']
  194                 change_method = getattr(self, change_method_name)
  195                 change_method(req, metadata, reserved_metadata, change)
  196 
  197             self._validate.queue_metadata_putting(metadata)
  198 
  199             self._topic_controller.set_metadata(topic_name,
  200                                                 metadata,
  201                                                 project_id)
  202         except storage_errors.DoesNotExist as ex:
  203             LOG.debug(ex)
  204             raise wsgi_errors.HTTPNotFound(six.text_type(ex))
  205         except validation.ValidationFailed as ex:
  206             LOG.debug(ex)
  207             raise wsgi_errors.HTTPBadRequestBody(six.text_type(ex))
  208         except wsgi_errors.HTTPConflict as ex:
  209             raise
  210         except Exception:
  211             description = _(u'Topic could not be updated.')
  212             LOG.exception(description)
  213             raise wsgi_errors.HTTPServiceUnavailable(description)
  214         for meta, value in _get_reserved_metadata(self._validate).items():
  215             if not metadata.get(meta):
  216                 metadata[meta] = value
  217         resp.body = utils.to_json(metadata)
  218 
  219     def _do_replace(self, req, metadata, reserved_metadata, change):
  220         path = change['path']
  221         path_child = path[1]
  222         value = change['value']
  223         if path_child in metadata or path_child in reserved_metadata:
  224             metadata[path_child] = value
  225         else:
  226             msg = _("Can't replace non-existent object %s.")
  227             raise wsgi_errors.HTTPConflict(msg % path_child)
  228 
  229     def _do_add(self, req, metadata, reserved_metadata, change):
  230         path = change['path']
  231         path_child = path[1]
  232         value = change['value']
  233         metadata[path_child] = value
  234 
  235     def _do_remove(self, req, metadata, reserved_metadata, change):
  236         path = change['path']
  237         path_child = path[1]
  238         if path_child in metadata:
  239             metadata.pop(path_child)
  240         elif path_child not in reserved_metadata:
  241             msg = _("Can't remove non-existent object %s.")
  242             raise wsgi_errors.HTTPConflict(msg % path_child)
  243 
  244 
  245 class CollectionResource(object):
  246 
  247     __slots__ = ('_topic_controller', '_validate', '_reserved_metadata')
  248 
  249     def __init__(self, validate, topic_controller):
  250         self._topic_controller = topic_controller
  251         self._validate = validate
  252 
  253     def _topic_list(self, project_id, path, kfilter, **kwargs):
  254         try:
  255             self._validate.queue_listing(**kwargs)
  256             results = self._topic_controller.list(project=project_id,
  257                                                   kfilter=kfilter, **kwargs)
  258 
  259             # Buffer list of topics
  260             topics = list(next(results))
  261 
  262         except validation.ValidationFailed as ex:
  263             LOG.debug(ex)
  264             raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
  265 
  266         except Exception:
  267             description = _(u'Topics could not be listed.')
  268             LOG.exception(description)
  269             raise wsgi_errors.HTTPServiceUnavailable(description)
  270 
  271         # Got some. Prepare the response.
  272         kwargs['marker'] = next(results) or kwargs.get('marker', '')
  273         reserved_metadata = _get_reserved_metadata(self._validate).items()
  274         for each_topic in topics:
  275             each_topic['href'] = path + '/' + each_topic['name']
  276             if kwargs.get('detailed'):
  277                 for meta, value in reserved_metadata:
  278                     if not each_topic.get('metadata', {}).get(meta):
  279                         each_topic['metadata'][meta] = value
  280 
  281         return topics, kwargs['marker']
  282 
  283     def _on_get_with_kfilter(self, req, resp, project_id, kfilter={}):
  284         kwargs = {}
  285 
  286         # NOTE(kgriffs): This syntax ensures that
  287         # we don't clobber default values with None.
  288         req.get_param('marker', store=kwargs)
  289         req.get_param_as_int('limit', store=kwargs)
  290         req.get_param_as_bool('detailed', store=kwargs)
  291         req.get_param('name', store=kwargs)
  292 
  293         topics, marker = self._topic_list(project_id,
  294                                           req.path, kfilter, **kwargs)
  295 
  296         links = []
  297         kwargs['marker'] = marker
  298         if topics:
  299             links = [
  300                 {
  301                     'rel': 'next',
  302                     'href': req.path + falcon.to_query_str(kwargs)
  303                 }
  304             ]
  305 
  306         response_body = {
  307             'topics': topics,
  308             'links': links
  309         }
  310 
  311         resp.body = utils.to_json(response_body)
  312         # status defaults to 200
  313 
  314     @decorators.TransportLog("Topics collection")
  315     @acl.enforce("topics:get_all")
  316     def on_get(self, req, resp, project_id):
  317         field = ('marker', 'limit', 'detailed', 'name')
  318         kfilter = copy.deepcopy(req.params)
  319 
  320         for key in req.params.keys():
  321             if key in field:
  322                 kfilter.pop(key)
  323 
  324         kfilter = kfilter if len(kfilter) > 0 else {}
  325         for key in kfilter.keys():
  326             # Since we get the filter value from URL, so need to
  327             # turn the string to integer if using integer filter value.
  328             try:
  329                 kfilter[key] = int(kfilter[key])
  330             except ValueError:
  331                 continue
  332         self._on_get_with_kfilter(req, resp, project_id, kfilter)
  333         # status defaults to 200