"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-7.0.0/zaqar/transport/wsgi/v2_0/messages.py" (30 Aug 2018, 15271 Bytes) of package /linux/misc/openstack/zaqar-7.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "messages.py": 5.0.0_vs_7.0.0.

    1 # Copyright (c) 2013 Rackspace, Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 
   16 import falcon
   17 from oslo_log import log as logging
   18 import six
   19 
   20 from zaqar.common import decorators
   21 from zaqar.common.transport.wsgi import helpers as wsgi_helpers
   22 from zaqar.i18n import _
   23 from zaqar.storage import errors as storage_errors
   24 from zaqar.transport import acl
   25 from zaqar.transport import utils
   26 from zaqar.transport import validation
   27 from zaqar.transport.wsgi import errors as wsgi_errors
   28 from zaqar.transport.wsgi import utils as wsgi_utils
   29 
   30 LOG = logging.getLogger(__name__)
   31 
   32 
   33 class CollectionResource(object):
   34 
   35     __slots__ = (
   36         '_message_controller',
   37         '_queue_controller',
   38         '_wsgi_conf',
   39         '_validate',
   40         '_default_message_ttl'
   41     )
   42 
   43     def __init__(self, wsgi_conf, validate,
   44                  message_controller, queue_controller,
   45                  default_message_ttl):
   46 
   47         self._wsgi_conf = wsgi_conf
   48         self._validate = validate
   49         self._message_controller = message_controller
   50         self._queue_controller = queue_controller
   51         self._default_message_ttl = default_message_ttl
   52 
   53     # ----------------------------------------------------------------------
   54     # Helpers
   55     # ----------------------------------------------------------------------
   56 
   57     def _get_by_id(self, base_path, project_id, queue_name, ids):
   58         """Returns one or more messages from the queue by ID."""
   59         try:
   60             self._validate.message_listing(limit=len(ids))
   61             messages = self._message_controller.bulk_get(
   62                 queue_name,
   63                 message_ids=ids,
   64                 project=project_id)
   65 
   66         except validation.ValidationFailed as ex:
   67             LOG.debug(ex)
   68             raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
   69 
   70         except Exception as ex:
   71             LOG.exception(ex)
   72             description = _(u'Message could not be retrieved.')
   73             raise wsgi_errors.HTTPServiceUnavailable(description)
   74 
   75         # Prepare response
   76         messages = list(messages)
   77         if not messages:
   78             return None
   79 
   80         messages = [wsgi_utils.format_message_v1_1(m, base_path, m['claim_id'])
   81                     for m in messages]
   82 
   83         return {'messages': messages}
   84 
   85     def _get(self, req, project_id, queue_name):
   86         client_uuid = wsgi_helpers.get_client_uuid(req)
   87         kwargs = {}
   88 
   89         # NOTE(kgriffs): This syntax ensures that
   90         # we don't clobber default values with None.
   91         req.get_param('marker', store=kwargs)
   92         req.get_param_as_int('limit', store=kwargs)
   93         req.get_param_as_bool('echo', store=kwargs)
   94         req.get_param_as_bool('include_claimed', store=kwargs)
   95         req.get_param_as_bool('include_delayed', store=kwargs)
   96 
   97         try:
   98             queue_meta = {}
   99             try:
  100                 # NOTE(cdyangzhenyu): In order to determine whether the
  101                 # queue has a delay attribute, the metadata of the queue
  102                 # is obtained here. This may have a little performance impact.
  103                 # So maybe a refactor is needed in the future.
  104                 queue_meta = self._queue_controller.get_metadata(queue_name,
  105                                                                  project_id)
  106             except storage_errors.DoesNotExist as ex:
  107                 LOG.exception(ex)
  108             queue_delay = queue_meta.get('_default_message_delay')
  109             if not queue_delay:
  110                 # NOTE(cdyangzhenyu): If the queue without the metadata
  111                 # attribute _default_message_delay, we don't filter
  112                 # for delay messages.
  113                 kwargs['include_delayed'] = True
  114 
  115             self._validate.message_listing(**kwargs)
  116             results = self._message_controller.list(
  117                 queue_name,
  118                 project=project_id,
  119                 client_uuid=client_uuid,
  120                 **kwargs)
  121 
  122             # Buffer messages
  123             cursor = next(results)
  124             messages = list(cursor)
  125 
  126         except validation.ValidationFailed as ex:
  127             LOG.debug(ex)
  128             raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
  129 
  130         except storage_errors.QueueDoesNotExist as ex:
  131             LOG.debug(ex)
  132             messages = None
  133 
  134         except Exception as ex:
  135             LOG.exception(ex)
  136             description = _(u'Messages could not be listed.')
  137             raise wsgi_errors.HTTPServiceUnavailable(description)
  138 
  139         if not messages:
  140             messages = []
  141 
  142         else:
  143             # Found some messages, so prepare the response
  144             kwargs['marker'] = next(results)
  145             base_path = req.path.rsplit('/', 1)[0]
  146             messages = [wsgi_utils.format_message_v1_1(m, base_path,
  147                                                        m['claim_id'])
  148                         for m in messages]
  149 
  150         links = []
  151         if messages:
  152             links = [
  153                 {
  154                     'rel': 'next',
  155                     'href': req.path + falcon.to_query_str(kwargs)
  156                 }
  157             ]
  158 
  159         return {
  160             'messages': messages,
  161             'links': links
  162         }
  163 
  164     # ----------------------------------------------------------------------
  165     # Interface
  166     # ----------------------------------------------------------------------
  167 
  168     @decorators.TransportLog("Messages collection")
  169     @acl.enforce("messages:create")
  170     def on_post(self, req, resp, project_id, queue_name):
  171         client_uuid = wsgi_helpers.get_client_uuid(req)
  172         try:
  173             # NOTE(flwang): Replace 'exists' with 'get_metadata' won't impact
  174             # the performance since both of them will call
  175             # collection.find_one()
  176             queue_meta = None
  177             try:
  178                 queue_meta = self._queue_controller.get_metadata(queue_name,
  179                                                                  project_id)
  180             except storage_errors.DoesNotExist as ex:
  181                 self._validate.queue_identification(queue_name, project_id)
  182                 self._queue_controller.create(queue_name, project=project_id)
  183                 # NOTE(flwang): Queue is created in lazy mode, so no metadata
  184                 # set.
  185                 queue_meta = {}
  186 
  187             queue_max_msg_size = queue_meta.get('_max_messages_post_size')
  188             queue_default_ttl = queue_meta.get('_default_message_ttl')
  189             queue_delay = queue_meta.get('_default_message_delay')
  190 
  191             if queue_default_ttl:
  192                 message_post_spec = (('ttl', int, queue_default_ttl),
  193                                      ('body', '*', None),)
  194             else:
  195                 message_post_spec = (('ttl', int, self._default_message_ttl),
  196                                      ('body', '*', None),)
  197             if queue_delay:
  198                 message_post_spec += (('delay', int, queue_delay),)
  199             # Place JSON size restriction before parsing
  200             self._validate.message_length(req.content_length,
  201                                           max_msg_post_size=queue_max_msg_size)
  202         except validation.ValidationFailed as ex:
  203             LOG.debug(ex)
  204             raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
  205 
  206         # Deserialize and validate the incoming messages
  207         document = wsgi_utils.deserialize(req.stream, req.content_length)
  208 
  209         if 'messages' not in document:
  210             description = _(u'No messages were found in the request body.')
  211             raise wsgi_errors.HTTPBadRequestAPI(description)
  212 
  213         messages = wsgi_utils.sanitize(document['messages'],
  214                                        message_post_spec,
  215                                        doctype=wsgi_utils.JSONArray)
  216 
  217         try:
  218             self._validate.message_posting(messages)
  219 
  220             message_ids = self._message_controller.post(
  221                 queue_name,
  222                 messages=messages,
  223                 project=project_id,
  224                 client_uuid=client_uuid)
  225 
  226         except validation.ValidationFailed as ex:
  227             LOG.debug(ex)
  228             raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
  229 
  230         except storage_errors.DoesNotExist as ex:
  231             LOG.debug(ex)
  232             raise wsgi_errors.HTTPNotFound(six.text_type(ex))
  233 
  234         except storage_errors.MessageConflict as ex:
  235             LOG.exception(ex)
  236             description = _(u'No messages could be enqueued.')
  237             raise wsgi_errors.HTTPServiceUnavailable(description)
  238 
  239         except Exception as ex:
  240             LOG.exception(ex)
  241             description = _(u'Messages could not be enqueued.')
  242             raise wsgi_errors.HTTPServiceUnavailable(description)
  243 
  244         # Prepare the response
  245         ids_value = ','.join(message_ids)
  246         resp.location = req.path + '?ids=' + ids_value
  247 
  248         hrefs = [req.path + '/' + id for id in message_ids]
  249         body = {'resources': hrefs}
  250         resp.body = utils.to_json(body)
  251         resp.status = falcon.HTTP_201
  252 
  253     @decorators.TransportLog("Messages collection")
  254     @acl.enforce("messages:get_all")
  255     def on_get(self, req, resp, project_id, queue_name):
  256         ids = req.get_param_as_list('ids')
  257 
  258         if ids is None:
  259             response = self._get(req, project_id, queue_name)
  260 
  261         else:
  262             response = self._get_by_id(req.path.rsplit('/', 1)[0], project_id,
  263                                        queue_name, ids)
  264 
  265         if response is None:
  266             # NOTE(TheSriram): Trying to get a message by id, should
  267             # return the message if its present, otherwise a 404 since
  268             # the message might have been deleted.
  269             msg = _(u'No messages with IDs: {ids} found in the queue {queue} '
  270                     u'for project {project}.')
  271             description = msg.format(queue=queue_name, project=project_id,
  272                                      ids=ids)
  273             raise wsgi_errors.HTTPNotFound(description)
  274 
  275         else:
  276             resp.body = utils.to_json(response)
  277         # status defaults to 200
  278 
  279     @decorators.TransportLog("Messages collection")
  280     @acl.enforce("messages:delete_all")
  281     def on_delete(self, req, resp, project_id, queue_name):
  282         ids = req.get_param_as_list('ids')
  283         pop_limit = req.get_param_as_int('pop')
  284         try:
  285             self._validate.message_deletion(ids, pop_limit)
  286 
  287         except validation.ValidationFailed as ex:
  288             LOG.debug(ex)
  289             raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
  290 
  291         if ids:
  292             resp.status = self._delete_messages_by_id(queue_name, ids,
  293                                                       project_id)
  294 
  295         elif pop_limit:
  296             resp.status, resp.body = self._pop_messages(queue_name,
  297                                                         project_id,
  298                                                         pop_limit)
  299 
  300     def _delete_messages_by_id(self, queue_name, ids, project_id):
  301         try:
  302             self._message_controller.bulk_delete(
  303                 queue_name,
  304                 message_ids=ids,
  305                 project=project_id)
  306 
  307         except Exception as ex:
  308             LOG.exception(ex)
  309             description = _(u'Messages could not be deleted.')
  310             raise wsgi_errors.HTTPServiceUnavailable(description)
  311 
  312         return falcon.HTTP_204
  313 
  314     def _pop_messages(self, queue_name, project_id, pop_limit):
  315         try:
  316             LOG.debug(u'POP messages - queue: %(queue)s, '
  317                       u'project: %(project)s',
  318                       {'queue': queue_name, 'project': project_id})
  319 
  320             messages = self._message_controller.pop(
  321                 queue_name,
  322                 project=project_id,
  323                 limit=pop_limit)
  324 
  325         except Exception as ex:
  326             LOG.exception(ex)
  327             description = _(u'Messages could not be popped.')
  328             raise wsgi_errors.HTTPServiceUnavailable(description)
  329 
  330         # Prepare response
  331         if not messages:
  332             messages = []
  333         body = {'messages': messages}
  334         body = utils.to_json(body)
  335 
  336         return falcon.HTTP_200, body
  337 
  338 
  339 class ItemResource(object):
  340 
  341     __slots__ = '_message_controller'
  342 
  343     def __init__(self, message_controller):
  344         self._message_controller = message_controller
  345 
  346     @decorators.TransportLog("Messages item")
  347     @acl.enforce("messages:get")
  348     def on_get(self, req, resp, project_id, queue_name, message_id):
  349         try:
  350             message = self._message_controller.get(
  351                 queue_name,
  352                 message_id,
  353                 project=project_id)
  354 
  355         except storage_errors.DoesNotExist as ex:
  356             LOG.debug(ex)
  357             raise wsgi_errors.HTTPNotFound(six.text_type(ex))
  358 
  359         except Exception as ex:
  360             LOG.exception(ex)
  361             description = _(u'Message could not be retrieved.')
  362             raise wsgi_errors.HTTPServiceUnavailable(description)
  363 
  364         # Prepare response
  365         message['href'] = req.path
  366         message = wsgi_utils.format_message_v1_1(message,
  367                                                  req.path.rsplit('/', 2)[0],
  368                                                  message['claim_id'])
  369 
  370         resp.body = utils.to_json(message)
  371         # status defaults to 200
  372 
  373     @decorators.TransportLog("Messages item")
  374     @acl.enforce("messages:delete")
  375     def on_delete(self, req, resp, project_id, queue_name, message_id):
  376         error_title = _(u'Unable to delete')
  377 
  378         try:
  379             self._message_controller.delete(
  380                 queue_name,
  381                 message_id=message_id,
  382                 project=project_id,
  383                 claim=req.get_param('claim_id'))
  384 
  385         except storage_errors.MessageNotClaimed as ex:
  386             LOG.debug(ex)
  387             description = _(u'A claim was specified, but the message '
  388                             u'is not currently claimed.')
  389             raise falcon.HTTPBadRequest(error_title, description)
  390 
  391         except storage_errors.ClaimDoesNotExist as ex:
  392             LOG.debug(ex)
  393             description = _(u'The specified claim does not exist or '
  394                             u'has expired.')
  395             raise falcon.HTTPBadRequest(error_title, description)
  396 
  397         except storage_errors.NotPermitted as ex:
  398             LOG.debug(ex)
  399             description = _(u'This message is claimed; it cannot be '
  400                             u'deleted without a valid claim ID.')
  401             raise falcon.HTTPForbidden(error_title, description)
  402 
  403         except Exception as ex:
  404             LOG.exception(ex)
  405             description = _(u'Message could not be deleted.')
  406             raise wsgi_errors.HTTPServiceUnavailable(description)
  407 
  408         # Alles guete
  409         resp.status = falcon.HTTP_204