"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-10.0.0/zaqar/api/v2/endpoints.py" (13 May 2020, 37941 Bytes) of package /linux/misc/openstack/zaqar-10.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "endpoints.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 9.0.0_vs_10.0.0.

    1 # Copyright (c) 2015 Red Hat, Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
    4 # use this file except in compliance with the License.  You may obtain a copy
    5 # of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations under
   13 # the License.
   14 
   15 from stevedore import driver
   16 
   17 from oslo_log import log as logging
   18 from oslo_utils import netutils
   19 
   20 from zaqar.common.api import errors as api_errors
   21 from zaqar.common.api import response
   22 from zaqar.common.api import utils as api_utils
   23 from zaqar.i18n import _
   24 from zaqar.storage import errors as storage_errors
   25 from zaqar.transport import validation
   26 
   27 LOG = logging.getLogger(__name__)
   28 
   29 
   30 class Endpoints(object):
   31     """v2 API Endpoints."""
   32 
   33     def __init__(self, storage, control, validate, defaults):
   34         self._queue_controller = storage.queue_controller
   35         self._message_controller = storage.message_controller
   36         self._claim_controller = storage.claim_controller
   37         self._subscription_controller = storage.subscription_controller
   38 
   39         self._pools_controller = control.pools_controller
   40         self._flavors_controller = control.flavors_controller
   41 
   42         self._validate = validate
   43 
   44         self._defaults = defaults
   45         self._subscription_url = None
   46 
   47     # Queues
   48     @api_utils.on_exception_sends_500
   49     def queue_list(self, req):
   50         """Gets a list of queues
   51 
   52         :param req: Request instance ready to be sent.
   53         :type req: `api.common.Request`
   54         :return: resp: Response instance
   55         :type: resp: `api.common.Response`
   56         """
   57         project_id = req._headers.get('X-Project-ID')
   58 
   59         LOG.debug(u'Queue list - project: %(project)s',
   60                   {'project': project_id})
   61 
   62         try:
   63             kwargs = api_utils.get_headers(req)
   64 
   65             self._validate.queue_listing(**kwargs)
   66             results = self._queue_controller.list(
   67                 project=project_id, **kwargs)
   68             # Buffer list of queues. Can raise NoPoolFound error.
   69             queues = list(next(results))
   70         except (ValueError, validation.ValidationFailed) as ex:
   71             LOG.debug(ex)
   72             headers = {'status': 400}
   73             return api_utils.error_response(req, ex, headers)
   74         except storage_errors.ExceptionBase as ex:
   75             error = 'Queues could not be listed.'
   76             headers = {'status': 503}
   77             LOG.exception(error)
   78             return api_utils.error_response(req, ex, headers, error)
   79 
   80         # Got some. Prepare the response.
   81         body = {'queues': queues}
   82         headers = {'status': 200}
   83 
   84         return response.Response(req, body, headers)
   85 
   86     @api_utils.on_exception_sends_500
   87     def queue_create(self, req):
   88         """Creates a queue
   89 
   90         :param req: Request instance ready to be sent.
   91         :type req: `api.common.Request`
   92         :return: resp: Response instance
   93         :type: resp: `api.common.Response`
   94         """
   95         project_id = req._headers.get('X-Project-ID')
   96         queue_name = req._body.get('queue_name')
   97         metadata = req._body.get('metadata', {})
   98 
   99         LOG.debug(u'Queue create - queue: %(queue)s, project: %(project)s',
  100                   {'queue': queue_name,
  101                    'project': project_id})
  102 
  103         try:
  104             self._validate.queue_identification(queue_name, project_id)
  105             self._validate.queue_metadata_length(len(str(metadata)))
  106             self._validate.queue_metadata_putting(metadata)
  107             created = self._queue_controller.create(queue_name,
  108                                                     metadata=metadata,
  109                                                     project=project_id)
  110         except validation.ValidationFailed as ex:
  111             LOG.debug(ex)
  112             headers = {'status': 400}
  113             return api_utils.error_response(req, ex, headers)
  114         except storage_errors.ExceptionBase as ex:
  115             error = _('Queue %s could not be created.') % queue_name
  116             headers = {'status': 503}
  117             LOG.exception(error)
  118             return api_utils.error_response(req, ex, headers, error)
  119         else:
  120             body = _('Queue %s created.') % queue_name
  121             headers = {'status': 201} if created else {'status': 204}
  122             return response.Response(req, body, headers)
  123 
  124     @api_utils.on_exception_sends_500
  125     def queue_delete(self, req):
  126         """Deletes a queue
  127 
  128         :param req: Request instance ready to be sent.
  129         :type req: `api.common.Request`
  130         :return: resp: Response instance
  131         :type: resp: `api.common.Response`
  132         """
  133         project_id = req._headers.get('X-Project-ID')
  134         queue_name = req._body.get('queue_name')
  135 
  136         LOG.debug(u'Queue delete - queue: %(queue)s, project: %(project)s',
  137                   {'queue': queue_name, 'project': project_id})
  138         try:
  139             self._queue_controller.delete(queue_name, project=project_id)
  140         except storage_errors.ExceptionBase as ex:
  141             error = _('Queue %s could not be deleted.') % queue_name
  142             headers = {'status': 503}
  143             LOG.exception(error)
  144             return api_utils.error_response(req, ex, headers, error)
  145         else:
  146             body = _('Queue %s removed.') % queue_name
  147             headers = {'status': 204}
  148             return response.Response(req, body, headers)
  149 
  150     @api_utils.on_exception_sends_500
  151     def queue_get(self, req):
  152         """Gets a queue
  153 
  154         :param req: Request instance ready to be sent.
  155         :type req: `api.common.Request`
  156         :return: resp: Response instance
  157         :type: resp: `api.common.Response`
  158         """
  159         project_id = req._headers.get('X-Project-ID')
  160         queue_name = req._body.get('queue_name')
  161 
  162         LOG.debug(u'Queue get - queue: %(queue)s, '
  163                   u'project: %(project)s',
  164                   {'queue': queue_name, 'project': project_id})
  165 
  166         try:
  167             resp_dict = self._queue_controller.get(queue_name,
  168                                                    project=project_id)
  169         except storage_errors.DoesNotExist as ex:
  170             LOG.debug(ex)
  171             error = _('Queue %s does not exist.') % queue_name
  172             headers = {'status': 404}
  173             return api_utils.error_response(req, ex, headers, error)
  174         except storage_errors.ExceptionBase as ex:
  175             headers = {'status': 503}
  176             error = _('Cannot retrieve queue %s.') % queue_name
  177             LOG.exception(error)
  178             return api_utils.error_response(req, ex, headers, error)
  179         else:
  180             body = resp_dict
  181             headers = {'status': 200}
  182             return response.Response(req, body, headers)
  183 
  184     @api_utils.on_exception_sends_500
  185     def queue_get_stats(self, req):
  186         """Gets queue stats
  187 
  188         :param req: Request instance ready to be sent.
  189         :type req: `api.common.Request`
  190         :return: resp: Response instance
  191         :type: resp: `api.common.Response`
  192         """
  193         project_id = req._headers.get('X-Project-ID')
  194         queue_name = req._body.get('queue_name')
  195 
  196         LOG.debug(u'Get queue stats - queue: %(queue)s, '
  197                   u'project: %(project)s',
  198                   {'queue': queue_name, 'project': project_id})
  199 
  200         try:
  201             resp_dict = self._queue_controller.stats(queue_name,
  202                                                      project=project_id)
  203             body = resp_dict
  204         except storage_errors.QueueDoesNotExist:
  205             LOG.exception('Queue "%s" does not exist', queue_name)
  206             resp_dict = {
  207                 'messages': {
  208                     'claimed': 0,
  209                     'free': 0,
  210                     'total': 0
  211                 }
  212             }
  213             body = resp_dict
  214             headers = {'status': 404}
  215             return response.Response(req, body, headers)
  216         except storage_errors.ExceptionBase as ex:
  217             error = _('Cannot retrieve queue %s stats.') % queue_name
  218             headers = {'status': 503}
  219             LOG.exception(error)
  220             return api_utils.error_response(req, ex, headers, error)
  221         else:
  222             headers = {'status': 200}
  223             return response.Response(req, body, headers)
  224 
  225     @api_utils.on_exception_sends_500
  226     def queue_purge(self, req):
  227         """Purge queue
  228 
  229         :param req: Request instance ready to be sent.
  230         :type req: `api.common.Request`
  231         :return: resp: Response instance
  232         :type: resp: `api.common.Response`
  233         """
  234         project_id = req._headers.get('X-Project-ID')
  235         queue_name = req._body.get('queue_name')
  236         resource_types = req._body.get('resource_types', ["messages",
  237                                                           "subscriptions"])
  238 
  239         LOG.debug(u'Purge queue - queue: %(queue)s, '
  240                   u'project: %(project)s',
  241                   {'queue': queue_name, 'project': project_id})
  242 
  243         try:
  244             pop_limit = 100
  245             if "messages" in resource_types:
  246                 LOG.debug("Purge all messages under queue %s", queue_name)
  247                 resp = self._pop_messages(req, queue_name,
  248                                           project_id, pop_limit)
  249                 while resp.get_response()['body']['messages']:
  250                     resp = self._pop_messages(req, queue_name,
  251                                               project_id, pop_limit)
  252 
  253             if "subscriptions" in resource_types:
  254                 LOG.debug("Purge all subscriptions under queue %s",
  255                           queue_name)
  256                 resp = self._subscription_controller.list(queue_name,
  257                                                           project=project_id)
  258                 subscriptions = list(next(resp))
  259                 for sub in subscriptions:
  260                     self._subscription_controller.delete(queue_name,
  261                                                          sub['id'],
  262                                                          project=project_id)
  263 
  264         except storage_errors.QueueDoesNotExist as ex:
  265             LOG.exception('Queue "%s" does not exist', queue_name)
  266             headers = {'status': 404}
  267             return api_utils.error_response(req, ex, headers)
  268         except storage_errors.ExceptionBase as ex:
  269             LOG.exception('Error deleting queue "%s".', queue_name)
  270             headers = {'status': 503}
  271             return api_utils.error_response(req, ex, headers)
  272         else:
  273             headers = {'status': 204}
  274             return response.Response(req, {}, headers)
  275 
  276     # Messages
  277     @api_utils.on_exception_sends_500
  278     def message_list(self, req):
  279         """Gets a list of messages on a queue
  280 
  281         :param req: Request instance ready to be sent.
  282         :type req: `api.common.Request`
  283         :return: resp: Response instance
  284         :type: resp: `api.common.Response`
  285         """
  286         project_id = req._headers.get('X-Project-ID')
  287         queue_name = req._body.get('queue_name')
  288 
  289         LOG.debug(u'Message list - queue: %(queue)s, '
  290                   u'project: %(project)s',
  291                   {'queue': queue_name, 'project': project_id})
  292 
  293         try:
  294             kwargs = api_utils.get_headers(req)
  295 
  296             self._validate.client_id_uuid_safe(req._headers.get('Client-ID'))
  297             client_uuid = api_utils.get_client_uuid(req)
  298 
  299             self._validate.message_listing(**kwargs)
  300             results = self._message_controller.list(
  301                 queue_name,
  302                 project=project_id,
  303                 client_uuid=client_uuid,
  304                 **kwargs)
  305 
  306             # Buffer messages
  307             cursor = next(results)
  308             messages = list(cursor)
  309         except (ValueError, api_errors.BadRequest,
  310                 validation.ValidationFailed) as ex:
  311             LOG.debug(ex)
  312             headers = {'status': 400}
  313             return api_utils.error_response(req, ex, headers)
  314         except storage_errors.DoesNotExist as ex:
  315             LOG.debug(ex)
  316             headers = {'status': 404}
  317             return api_utils.error_response(req, ex, headers)
  318 
  319         if messages:
  320             # Found some messages, so prepare the response
  321             kwargs['marker'] = next(results)
  322             messages = [api_utils.format_message(message)
  323                         for message in messages]
  324 
  325         headers = {'status': 200}
  326         body = {'messages': messages}
  327 
  328         return response.Response(req, body, headers)
  329 
  330     @api_utils.on_exception_sends_500
  331     def message_get(self, req):
  332         """Gets a message from a queue
  333 
  334         :param req: Request instance ready to be sent.
  335         :type req: `api.common.Request`
  336         :return: resp: Response instance
  337         :type: resp: `api.common.Response`
  338         """
  339         project_id = req._headers.get('X-Project-ID')
  340         queue_name = req._body.get('queue_name')
  341         message_id = req._body.get('message_id')
  342 
  343         LOG.debug(u'Message get - message: %(message)s, '
  344                   u'queue: %(queue)s, project: %(project)s',
  345                   {'message': message_id,
  346                    'queue': queue_name,
  347                    'project': project_id})
  348         try:
  349             message = self._message_controller.get(
  350                 queue_name,
  351                 message_id,
  352                 project=project_id)
  353 
  354         except storage_errors.DoesNotExist as ex:
  355             LOG.debug(ex)
  356             headers = {'status': 404}
  357             return api_utils.error_response(req, ex, headers)
  358 
  359         # Prepare response
  360         message = api_utils.format_message(message)
  361 
  362         headers = {'status': 200}
  363         body = {'messages': message}
  364 
  365         return response.Response(req, body, headers)
  366 
  367     @api_utils.on_exception_sends_500
  368     def message_get_many(self, req):
  369         """Gets a set of messages from a queue
  370 
  371         :param req: Request instance ready to be sent.
  372         :type req: `api.common.Request`
  373         :return: resp: Response instance
  374         :type: resp: `api.common.Response`
  375         """
  376         project_id = req._headers.get('X-Project-ID')
  377         queue_name = req._body.get('queue_name')
  378         message_ids = list(req._body.get('message_ids'))
  379 
  380         LOG.debug(u'Message get - queue: %(queue)s, '
  381                   u'project: %(project)s',
  382                   {'queue': queue_name, 'project': project_id})
  383 
  384         try:
  385             self._validate.message_listing(limit=len(message_ids))
  386             messages = self._message_controller.bulk_get(
  387                 queue_name,
  388                 message_ids=message_ids,
  389                 project=project_id)
  390         except validation.ValidationFailed as ex:
  391             LOG.debug(ex)
  392             headers = {'status': 400}
  393             return api_utils.error_response(req, ex, headers)
  394 
  395         # Prepare response
  396         messages = list(messages)
  397         messages = [api_utils.format_message(message)
  398                     for message in messages]
  399 
  400         headers = {'status': 200}
  401         body = {'messages': messages}
  402 
  403         return response.Response(req, body, headers)
  404 
  405     @api_utils.on_exception_sends_500
  406     def message_post(self, req):
  407         """Post a set of messages to a queue
  408 
  409         :param req: Request instance ready to be sent.
  410         :type req: `api.common.Request`
  411         :return: resp: Response instance
  412         :type: resp: `api.common.Response`
  413         """
  414         project_id = req._headers.get('X-Project-ID')
  415         queue_name = req._body.get('queue_name')
  416 
  417         LOG.debug(u'Messages post - queue:  %(queue)s, '
  418                   u'project: %(project)s',
  419                   {'queue': queue_name, 'project': project_id})
  420 
  421         messages = req._body.get('messages')
  422 
  423         if messages is None:
  424             ex = _(u'Invalid request.')
  425             error = _(u'No messages were found in the request body.')
  426             headers = {'status': 400}
  427             return api_utils.error_response(req, ex, headers, error)
  428 
  429         try:
  430             # NOTE(flwang): Replace 'exists' with 'get_metadata' won't impact
  431             # the performance since both of them will call
  432             # collection.find_one()
  433             queue_meta = None
  434             try:
  435                 queue_meta = self._queue_controller.get_metadata(queue_name,
  436                                                                  project_id)
  437             except storage_errors.DoesNotExist as ex:
  438                 self._validate.queue_identification(queue_name, project_id)
  439                 self._queue_controller.create(queue_name, project=project_id)
  440                 # NOTE(flwang): Queue is created in lazy mode, so no metadata
  441                 # set.
  442                 queue_meta = {}
  443 
  444             queue_max_msg_size = queue_meta.get('_max_messages_post_size',
  445                                                 None)
  446             queue_default_ttl = queue_meta.get('_default_message_ttl')
  447 
  448             if queue_default_ttl:
  449                 _message_post_spec = (('ttl', int, queue_default_ttl),
  450                                       ('body', '*', None),)
  451             else:
  452                 _message_post_spec = (('ttl', int, self._defaults.message_ttl),
  453                                       ('body', '*', None),)
  454             # Place JSON size restriction before parsing
  455             self._validate.message_length(len(str(messages)),
  456                                           max_msg_post_size=queue_max_msg_size)
  457         except validation.ValidationFailed as ex:
  458             LOG.debug(ex)
  459             headers = {'status': 400}
  460             return api_utils.error_response(req, ex, headers)
  461 
  462         try:
  463             messages = api_utils.sanitize(messages,
  464                                           _message_post_spec,
  465                                           doctype=list)
  466         except api_errors.BadRequest as ex:
  467             LOG.debug(ex)
  468             headers = {'status': 400}
  469             return api_utils.error_response(req, ex, headers)
  470 
  471         try:
  472             self._validate.client_id_uuid_safe(req._headers.get('Client-ID'))
  473             client_uuid = api_utils.get_client_uuid(req)
  474 
  475             self._validate.message_posting(messages)
  476 
  477             message_ids = self._message_controller.post(
  478                 queue_name,
  479                 messages=messages,
  480                 project=project_id,
  481                 client_uuid=client_uuid)
  482         except (ValueError, api_errors.BadRequest,
  483                 validation.ValidationFailed) as ex:
  484             LOG.debug(ex)
  485             headers = {'status': 400}
  486             return api_utils.error_response(req, ex, headers)
  487         except storage_errors.DoesNotExist as ex:
  488             LOG.debug(ex)
  489             headers = {'status': 404}
  490             return api_utils.error_response(req, ex, headers)
  491         except storage_errors.MessageConflict as ex:
  492             error = _(u'No messages could be enqueued.')
  493             headers = {'status': 500}
  494             LOG.exception(error)
  495             return api_utils.error_response(req, ex, headers, error)
  496 
  497         # Prepare the response
  498         headers = {'status': 201}
  499         body = {'message_ids': message_ids}
  500 
  501         return response.Response(req, body, headers)
  502 
  503     @api_utils.on_exception_sends_500
  504     def message_delete(self, req):
  505         """Delete a message from a queue
  506 
  507         :param req: Request instance ready to be sent.
  508         :type req: `api.common.Request`
  509         :return: resp: Response instance
  510         :type: resp: `api.common.Response`
  511         """
  512         project_id = req._headers.get('X-Project-ID')
  513         queue_name = req._body.get('queue_name')
  514         message_id = req._body.get('message_id')
  515 
  516         LOG.debug(u'Messages item DELETE - message: %(message)s, '
  517                   u'queue: %(queue)s, project: %(project)s',
  518                   {'message': message_id,
  519                    'queue': queue_name,
  520                    'project': project_id})
  521 
  522         claim_id = req._body.get('claim_id')
  523 
  524         try:
  525             self._message_controller.delete(
  526                 queue_name,
  527                 message_id=message_id,
  528                 project=project_id,
  529                 claim=claim_id)
  530         except storage_errors.MessageNotClaimed as ex:
  531             LOG.debug(ex)
  532             error = _(u'A claim was specified, but the message '
  533                       u'is not currently claimed.')
  534             headers = {'status': 400}
  535             return api_utils.error_response(req, ex, headers, error)
  536         except storage_errors.ClaimDoesNotExist as ex:
  537             LOG.debug(ex)
  538             error = _(u'The specified claim does not exist or '
  539                       u'has expired.')
  540             headers = {'status': 400}
  541             return api_utils.error_response(req, ex, headers, error)
  542         except storage_errors.NotPermitted as ex:
  543             LOG.debug(ex)
  544             error = _(u'This message is claimed; it cannot be '
  545                       u'deleted without a valid claim ID.')
  546             headers = {'status': 403}
  547             return api_utils.error_response(req, ex, headers, error)
  548 
  549         headers = {'status': 204}
  550         body = {}
  551 
  552         return response.Response(req, body, headers)
  553 
  554     @api_utils.on_exception_sends_500
  555     def message_delete_many(self, req):
  556         """Deletes a set of messages from a queue
  557 
  558         :param req: Request instance ready to be sent.
  559         :type req: `api.common.Request`
  560         :return: resp: Response instance
  561         :type: resp: `api.common.Response`
  562         """
  563         project_id = req._headers.get('X-Project-ID')
  564         queue_name = req._body.get('queue_name')
  565         message_ids = req._body.get('message_ids')
  566         claim_ids = None
  567         if self._validate.get_limit_conf_value('message_delete_with_claim_id'):
  568             claim_ids = req._body.get('claim_ids')
  569         pop_limit = req._body.get('pop')
  570 
  571         LOG.debug(u'Messages collection DELETE - queue: %(queue)s,'
  572                   u'project: %(project)s, messages: %(message_ids)s',
  573                   {'queue': queue_name, 'project': project_id,
  574                    'message_ids': message_ids})
  575 
  576         try:
  577             self._validate.message_deletion(message_ids, pop_limit, claim_ids)
  578 
  579         except validation.ValidationFailed as ex:
  580             LOG.debug(ex)
  581             headers = {'status': 400}
  582             return api_utils.error_response(req, ex, headers)
  583 
  584         if message_ids:
  585             return self._delete_messages_by_id(req, queue_name, message_ids,
  586                                                project_id, claim_ids)
  587         elif pop_limit:
  588             return self._pop_messages(req, queue_name, project_id, pop_limit)
  589 
  590     @api_utils.on_exception_sends_500
  591     def _delete_messages_by_id(self, req, queue_name, ids, project_id,
  592                                claim_ids=None):
  593         self._message_controller.bulk_delete(queue_name, message_ids=ids,
  594                                              project=project_id,
  595                                              claim_ids=claim_ids)
  596 
  597         headers = {'status': 204}
  598         body = {}
  599 
  600         return response.Response(req, body, headers)
  601 
  602     @api_utils.on_exception_sends_500
  603     def _pop_messages(self, req, queue_name, project_id, pop_limit):
  604 
  605         LOG.debug(u'Pop messages - queue: %(queue)s, project: %(project)s',
  606                   {'queue': queue_name, 'project': project_id})
  607 
  608         messages = self._message_controller.pop(
  609             queue_name,
  610             project=project_id,
  611             limit=pop_limit)
  612 
  613         # Prepare response
  614         if not messages:
  615             messages = []
  616 
  617         headers = {'status': 200}
  618         body = {'messages': messages}
  619 
  620         return response.Response(req, body, headers)
  621 
  622     # Claims
  623     @api_utils.on_exception_sends_500
  624     def claim_create(self, req):
  625         """Creates a claim
  626 
  627         :param req: Request instance ready to be sent.
  628         :type req: `api.common.Request`
  629         :return: resp: Response instance
  630         :type: resp: `api.common.Response`
  631         """
  632         project_id = req._headers.get('X-Project-ID')
  633         queue_name = req._body.get('queue_name')
  634 
  635         LOG.debug(u'Claims create - queue: %(queue)s, '
  636                   u'project: %(project)s',
  637                   {'queue': queue_name, 'project': project_id})
  638 
  639         self._claim_post_spec = (
  640             ('ttl', int, self._defaults.claim_ttl),
  641             ('grace', int, self._defaults.claim_grace),
  642         )
  643 
  644         # Claim some messages
  645 
  646         # NOTE(vkmc): We build a dict with the ttl and grace
  647         # This is the metadata the storage is waiting for
  648         kwargs = api_utils.get_headers(req)
  649         # Read claim metadata (e.g., ttl) and raise appropriate
  650         # errors as needed.
  651         metadata = api_utils.sanitize(kwargs, self._claim_post_spec)
  652 
  653         limit = (None if kwargs.get('limit') is None
  654                  else kwargs.get('limit'))
  655 
  656         claim_options = {} if limit is None else {'limit': limit}
  657 
  658         try:
  659             self._validate.claim_creation(metadata, limit=limit)
  660         except (ValueError, validation.ValidationFailed) as ex:
  661             LOG.debug(ex)
  662             headers = {'status': 400}
  663             return api_utils.error_response(req, ex, headers)
  664 
  665         cid, msgs = self._claim_controller.create(
  666             queue_name,
  667             metadata=metadata,
  668             project=project_id,
  669             **claim_options)
  670 
  671         # Buffer claimed messages
  672         # TODO(vkmc): optimize, along with serialization (below)
  673         resp_msgs = list(msgs)
  674 
  675         # Serialize claimed messages, if any. This logic assumes
  676         # the storage driver returned well-formed messages.
  677         if len(resp_msgs) != 0:
  678             resp_msgs = [api_utils.format_message(msg, cid)
  679                          for msg in resp_msgs]
  680 
  681             headers = {'status': 201}
  682             body = {'claim_id': cid, 'messages': resp_msgs}
  683         else:
  684             headers = {'status': 204}
  685             body = {'claim_id': cid}
  686 
  687         return response.Response(req, body, headers)
  688 
  689     @api_utils.on_exception_sends_500
  690     def claim_get(self, req):
  691         """Gets a claim
  692 
  693         :param req: Request instance ready to be sent.
  694         :type req: `api.common.Request`
  695         :return: resp: Response instance
  696         :type: resp: `api.common.Response`
  697         """
  698         project_id = req._headers.get('X-Project-ID')
  699         queue_name = req._body.get('queue_name')
  700         claim_id = req._body.get('claim_id')
  701 
  702         LOG.debug(u'Claim get - claim: %(claim_id)s, '
  703                   u'queue: %(queue_name)s, project: %(project_id)s',
  704                   {'queue_name': queue_name,
  705                    'project_id': project_id,
  706                    'claim_id': claim_id})
  707         try:
  708             meta, msgs = self._claim_controller.get(
  709                 queue_name,
  710                 claim_id=claim_id,
  711                 project=project_id)
  712 
  713             # Buffer claimed messages
  714             # TODO(vkmc): Optimize along with serialization (see below)
  715             meta['messages'] = list(msgs)
  716         except storage_errors.DoesNotExist as ex:
  717             LOG.debug(ex)
  718             error = _('Claim %s does not exist.') % claim_id
  719             headers = {'status': 404}
  720             return api_utils.error_response(req, ex, headers, error)
  721 
  722         # Serialize claimed messages
  723         # TODO(vkmc): Optimize
  724         meta['messages'] = [api_utils.format_message(msg, claim_id)
  725                             for msg in meta['messages']]
  726 
  727         del meta['id']
  728 
  729         headers = {'status': 200}
  730         body = meta
  731 
  732         return response.Response(req, body, headers)
  733 
  734     @api_utils.on_exception_sends_500
  735     def claim_update(self, req):
  736         """Updates a claim
  737 
  738         :param req: Request instance ready to be sent.
  739         :type req: `api.common.Request`
  740         :return: resp: Response instance
  741         :type: resp: `api.common.Response`
  742         """
  743         project_id = req._headers.get('X-Project-ID')
  744         queue_name = req._body.get('queue_name')
  745         claim_id = req._body.get('claim_id')
  746 
  747         LOG.debug(u'Claim update - claim: %(claim_id)s, '
  748                   u'queue: %(queue_name)s, project:%(project_id)s',
  749                   {'queue_name': queue_name,
  750                    'project_id': project_id,
  751                    'claim_id': claim_id})
  752 
  753         self._claim_patch_spec = (
  754             ('ttl', int, self._defaults.claim_ttl),
  755             ('grace', int, self._defaults.claim_grace),
  756         )
  757 
  758         # Read claim metadata (e.g., TTL) and raise appropriate
  759         # HTTP errors as needed.
  760         metadata = api_utils.sanitize(req._body, self._claim_patch_spec)
  761 
  762         try:
  763             self._validate.claim_updating(metadata)
  764             self._claim_controller.update(queue_name,
  765                                           claim_id=claim_id,
  766                                           metadata=metadata,
  767                                           project=project_id)
  768             headers = {'status': 204}
  769             body = _('Claim %s updated.') % claim_id
  770             return response.Response(req, body, headers)
  771         except validation.ValidationFailed as ex:
  772             LOG.debug(ex)
  773             headers = {'status': 400}
  774             return api_utils.error_response(req, ex, headers)
  775         except storage_errors.DoesNotExist as ex:
  776             LOG.debug(ex)
  777             error = _('Claim %s does not exist.') % claim_id
  778             headers = {'status': 404}
  779             return api_utils.error_response(req, ex, headers, error)
  780 
  781     @api_utils.on_exception_sends_500
  782     def claim_delete(self, req):
  783         """Deletes a claim
  784 
  785         :param req: Request instance ready to be sent.
  786         :type req: `api.common.Request`
  787         :return: resp: Response instance
  788         :type: resp: `api.common.Response`
  789         """
  790         project_id = req._headers.get('X-Project-ID')
  791         queue_name = req._body.get('queue_name')
  792         claim_id = req._body.get('claim_id')
  793 
  794         LOG.debug(u'Claim delete - claim: %(claim_id)s, '
  795                   u'queue: %(queue_name)s, project: %(project_id)s',
  796                   {'queue_name': queue_name,
  797                    'project_id': project_id,
  798                    'claim_id': claim_id})
  799 
  800         self._claim_controller.delete(queue_name,
  801                                       claim_id=claim_id,
  802                                       project=project_id)
  803 
  804         headers = {'status': 204}
  805         body = _('Claim %s deleted.') % claim_id
  806 
  807         return response.Response(req, body, headers)
  808 
  809     # Subscriptions
  810     @api_utils.on_exception_sends_500
  811     def subscription_list(self, req):
  812         """List all subscriptions for a queue.
  813 
  814         :param req: Request instance ready to be sent.
  815         :type req: `api.common.Request`
  816         :return: resp: Response instance
  817         :type: resp: `api.common.Response`
  818         """
  819         project_id = req._headers.get('X-Project-ID')
  820         queue_name = req._body.get('queue_name')
  821 
  822         LOG.debug(u'Subscription list - project: %(project)s',
  823                   {'project': project_id})
  824 
  825         try:
  826             kwargs = api_utils.get_headers(req)
  827 
  828             self._validate.subscription_listing(**kwargs)
  829             results = self._subscription_controller.list(
  830                 queue_name, project=project_id, **kwargs)
  831             # Buffer list of subscriptions. Can raise NoPoolFound error.
  832             subscriptions = list(next(results))
  833         except (ValueError, validation.ValidationFailed) as ex:
  834             LOG.debug(ex)
  835             headers = {'status': 400}
  836             return api_utils.error_response(req, ex, headers)
  837         except storage_errors.ExceptionBase as ex:
  838             error = 'Subscriptions could not be listed.'
  839             headers = {'status': 503}
  840             LOG.exception(error)
  841             return api_utils.error_response(req, ex, headers, error)
  842 
  843         # Got some. Prepare the response.
  844         body = {'subscriptions': subscriptions}
  845         headers = {'status': 200}
  846 
  847         return response.Response(req, body, headers)
  848 
  849     @api_utils.on_exception_sends_500
  850     def subscription_create(self, req, subscriber):
  851         """Create a subscription for a queue.
  852 
  853         :param req: Request instance ready to be sent.
  854         :type req: `api.common.Request`
  855         :return: resp: Response instance
  856         :type: resp: `api.common.Response`
  857         """
  858         project_id = req._headers.get('X-Project-ID')
  859         queue_name = req._body.get('queue_name')
  860         options = req._body.get('options', {})
  861         ttl = req._body.get('ttl', self._defaults.subscription_ttl)
  862 
  863         LOG.debug(
  864             u'Subscription create - queue: %(queue)s, project: %(project)s',
  865             {'queue': queue_name,
  866              'project': project_id})
  867 
  868         try:
  869             url = netutils.urlsplit(subscriber)
  870             mgr = driver.DriverManager('zaqar.notification.tasks', url.scheme,
  871                                        invoke_on_load=True)
  872             req_data = req._env.copy()
  873             mgr.driver.register(subscriber, options, ttl, project_id, req_data)
  874 
  875             data = {'subscriber': subscriber,
  876                     'options': options,
  877                     'ttl': ttl}
  878             self._validate.subscription_posting(data)
  879             self._validate.queue_identification(queue_name, project_id)
  880             if not self._queue_controller.exists(queue_name, project_id):
  881                 self._queue_controller.create(queue_name, project=project_id)
  882             created = self._subscription_controller.create(queue_name,
  883                                                            subscriber,
  884                                                            data['ttl'],
  885                                                            data['options'],
  886                                                            project=project_id)
  887         except validation.ValidationFailed as ex:
  888             LOG.debug(ex)
  889             headers = {'status': 400}
  890             return api_utils.error_response(req, ex, headers)
  891         except storage_errors.ExceptionBase as ex:
  892             error = _('Subscription %s could not be created.') % queue_name
  893             headers = {'status': 503}
  894             LOG.exception(error)
  895             return api_utils.error_response(req, ex, headers, error)
  896         else:
  897             if created:
  898                 msg = _('Subscription %s created.') % queue_name
  899                 body = {'subscription_id': str(created), 'message': msg}
  900                 headers = {'status': 201}
  901             else:
  902                 body = _('Subscription %s not created.') % queue_name
  903                 headers = {'status': 409}
  904             return response.Response(req, body, headers)
  905 
  906     @api_utils.on_exception_sends_500
  907     def subscription_delete(self, req):
  908         """Delete a specific subscription by ID.
  909 
  910         :param req: Request instance ready to be sent.
  911         :type req: `api.common.Request`
  912         :return: resp: Response instance
  913         :type: resp: `api.common.Response`
  914         """
  915         project_id = req._headers.get('X-Project-ID')
  916         queue_name = req._body.get('queue_name')
  917         subscription_id = req._body.get('subscription_id')
  918 
  919         LOG.debug(
  920             u'Subscription delete - queue: %(queue)s, project: %(project)s',
  921             {'queue': queue_name, 'project': project_id})
  922         try:
  923             self._subscription_controller.delete(queue_name,
  924                                                  subscription_id,
  925                                                  project=project_id)
  926         except storage_errors.ExceptionBase as ex:
  927             error = _('Subscription %(subscription)s for queue %(queue)s '
  928                       'could not be deleted.') % {
  929                 'subscription': subscription_id, 'queue': queue_name}
  930             headers = {'status': 503}
  931             LOG.exception(error)
  932             return api_utils.error_response(req, ex, headers, error)
  933         else:
  934             body = _('Subscription %s removed.') % subscription_id
  935             headers = {'status': 204}
  936             return response.Response(req, body, headers)
  937 
  938     @api_utils.on_exception_sends_500
  939     def subscription_get(self, req):
  940         """Retrieve details about an existing subscription.
  941 
  942         :param req: Request instance ready to be sent.
  943         :type req: `api.common.Request`
  944         :return: resp: Response instance
  945         :type: resp: `api.common.Response`
  946         """
  947         project_id = req._headers.get('X-Project-ID')
  948         queue_name = req._body.get('queue_name')
  949         subscription_id = req._body.get('subscription_id')
  950 
  951         LOG.debug(u'Subscription get - queue: %(queue)s, '
  952                   u'project: %(project)s',
  953                   {'queue': queue_name, 'project': project_id})
  954 
  955         try:
  956             resp_dict = self._subscription_controller.get(queue_name,
  957                                                           subscription_id,
  958                                                           project=project_id)
  959         except storage_errors.DoesNotExist as ex:
  960             LOG.debug(ex)
  961             error = _('Subscription %(subscription)s for queue %(queue)s '
  962                       'does not exist.') % {
  963                 'subscription': subscription_id, 'queue': queue_name}
  964             headers = {'status': 404}
  965             return api_utils.error_response(req, ex, headers, error)
  966         except storage_errors.ExceptionBase as ex:
  967             headers = {'status': 503}
  968             error = _('Cannot retrieve subscription %s.') % subscription_id
  969             LOG.exception(error)
  970             return api_utils.error_response(req, ex, headers, error)
  971         else:
  972             body = resp_dict
  973             headers = {'status': 200}
  974             return response.Response(req, body, headers)