"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-10.0.0/zaqar/storage/pooling.py" (13 May 2020, 34001 Bytes) of package /linux/misc/openstack/zaqar-10.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "pooling.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 9.0.0_vs_10.0.0.

    1 # Copyright (c) 2013 Rackspace, Inc.
    2 # Copyright 2014 Catalyst IT Ltd
    3 #
    4 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
    5 # use this file except in compliance with the License.  You may obtain a copy
    6 # of the License at
    7 #
    8 #    http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 # Unless required by applicable law or agreed to in writing, software
   11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 # License for the specific language governing permissions and limitations under
   14 # the License.
   15 
   16 import heapq
   17 import itertools
   18 
   19 from oslo_log import log
   20 from osprofiler import profiler
   21 
   22 from zaqar.common import decorators
   23 from zaqar.common import errors as cerrors
   24 from zaqar.common.storage import select
   25 from zaqar.conf import pooling_catalog
   26 from zaqar.i18n import _
   27 from zaqar import storage
   28 from zaqar.storage import errors
   29 from zaqar.storage import pipeline
   30 from zaqar.storage import utils
   31 
   32 LOG = log.getLogger(__name__)
   33 
   34 
   35 # NOTE(kgriffs): E.g.: 'zaqar-pooling:5083853/my-queue'
   36 _POOL_CACHE_PREFIX = 'pooling:'
   37 
   38 # TODO(kgriffs): If a queue is migrated, everyone's
   39 # caches need to have the relevant entry invalidated
   40 # before "unfreezing" the queue, rather than waiting
   41 # on the TTL.
   42 #
   43 # TODO(kgriffs): Make configurable?
   44 _POOL_CACHE_TTL = 10
   45 
   46 
   47 def _pool_cache_key(queue, project=None):
   48     # NOTE(kgriffs): Use string concatenation for performance,
   49     # also put project first since it is guaranteed to be
   50     # unique, which should reduce lookup time.
   51     return _POOL_CACHE_PREFIX + str(project) + '/' + queue
   52 
   53 
   54 class DataDriver(storage.DataDriverBase):
   55     """Pooling meta-driver for routing requests to multiple backends.
   56 
   57     :param conf: Configuration from which to read pooling options
   58     :param cache: Cache instance that will be passed to individual
   59         storage driver instances that correspond to each pool. will
   60         also be used by the pool controller to reduce latency for
   61         some operations.
   62     """
   63 
   64     BASE_CAPABILITIES = tuple(storage.Capabilities)
   65 
   66     def __init__(self, conf, cache, control, control_driver=None):
   67         super(DataDriver, self).__init__(conf, cache, control_driver)
   68         catalog = Catalog(conf, cache, control)
   69         if self.conf.profiler.enabled:
   70             catalog = profiler.trace_cls("pooling_catalogue_"
   71                                          "controller")(catalog)
   72         self._pool_catalog = catalog
   73 
   74     @property
   75     def capabilities(self):
   76         # NOTE(flaper87): We can't know the capabilities
   77         # of this driver because pools are loaded based on
   78         # the queue and project of the request. Therefore,
   79         # we will just assume all capabilities are supported.
   80         # This shouldn't be an issue because the pooling driver
   81         # is neither used for pools creation nor flavor creation.
   82         return self.BASE_CAPABILITIES
   83 
   84     def close(self):
   85         cursor = self._pool_catalog._pools_ctrl.list(limit=0)
   86         # Messages of each pool
   87         for pool in next(cursor):
   88             driver = self._pool_catalog.get_driver(pool['name'])
   89             driver.close()
   90 
   91     def is_alive(self):
   92         cursor = self._pool_catalog._pools_ctrl.list(limit=0)
   93         pools = next(cursor)
   94         return all(self._pool_catalog.get_driver(pool['name']).is_alive()
   95                    for pool in pools)
   96 
   97     def _health(self):
   98         KPI = {}
   99         # Leverage the is_alive to indicate if the backend storage is
  100         # reachable or not
  101         KPI['catalog_reachable'] = self.is_alive()
  102 
  103         cursor = self._pool_catalog._pools_ctrl.list(limit=0)
  104         # Messages of each pool
  105         for pool in next(cursor):
  106             driver = self._pool_catalog.get_driver(pool['name'])
  107             KPI[pool['name']] = driver._health()
  108 
  109         return KPI
  110 
  111     def gc(self):
  112         cursor = self._pool_catalog._pools_ctrl.list(limit=0)
  113         for pool in next(cursor):
  114             driver = self._pool_catalog.get_driver(pool['name'])
  115             driver.gc()
  116 
  117     @decorators.lazy_property(write=False)
  118     def queue_controller(self):
  119         controller = QueueController(self._pool_catalog)
  120         if self.conf.profiler.enabled:
  121             return profiler.trace_cls("pooling_queue_controller")(controller)
  122         else:
  123             return controller
  124 
  125     @decorators.lazy_property(write=False)
  126     def message_controller(self):
  127         controller = MessageController(self._pool_catalog)
  128         if self.conf.profiler.enabled:
  129             return profiler.trace_cls("pooling_message_controller")(controller)
  130         else:
  131             return controller
  132 
  133     @decorators.lazy_property(write=False)
  134     def claim_controller(self):
  135         controller = ClaimController(self._pool_catalog)
  136         if self.conf.profiler.enabled:
  137             return profiler.trace_cls("pooling_claim_controller")(controller)
  138         else:
  139             return controller
  140 
  141     @decorators.lazy_property(write=False)
  142     def subscription_controller(self):
  143         controller = SubscriptionController(self._pool_catalog)
  144         if self.conf.profiler.enabled:
  145             return (profiler.trace_cls("pooling_subscription_controller")
  146                     (controller))
  147         else:
  148             return controller
  149 
  150     @decorators.lazy_property(write=False)
  151     def topic_controller(self):
  152         controller = TopicController(self._pool_catalog)
  153         if self.conf.profiler.enabled:
  154             return profiler.trace_cls("pooling_topic_controller")(controller)
  155         else:
  156             return controller
  157 
  158 
  159 class QueueController(storage.Queue):
  160     """Routes operations to get the appropriate queue controller.
  161 
  162     :param pool_catalog: a catalog of available pools
  163     :type pool_catalog: queues.pooling.base.Catalog
  164     """
  165 
  166     def __init__(self, pool_catalog):
  167         super(QueueController, self).__init__(None)
  168         self._pool_catalog = pool_catalog
  169         self._mgt_queue_ctrl = self._pool_catalog.control.queue_controller
  170         self._get_controller = self._pool_catalog.get_queue_controller
  171 
  172     def _list(self, project=None, kfilter={}, marker=None,
  173               limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False,
  174               name=None):
  175 
  176         def all_pages():
  177             yield next(self._mgt_queue_ctrl.list(
  178                 project=project,
  179                 kfilter=kfilter,
  180                 marker=marker,
  181                 limit=limit,
  182                 detailed=detailed,
  183                 name=name))
  184 
  185         # make a heap compared with 'name'
  186         ls = heapq.merge(*[
  187             utils.keyify('name', page)
  188             for page in all_pages()
  189         ])
  190 
  191         marker_name = {}
  192 
  193         # limit the iterator and strip out the comparison wrapper
  194         def it():
  195             for queue_cmp in itertools.islice(ls, limit):
  196                 marker_name['next'] = queue_cmp.obj['name']
  197                 yield queue_cmp.obj
  198 
  199         yield it()
  200         yield marker_name and marker_name['next']
  201 
  202     def _get(self, name, project=None):
  203         try:
  204             return self.get_metadata(name, project)
  205         except errors.QueueDoesNotExist:
  206             return {}
  207 
  208     def _create(self, name, metadata=None, project=None):
  209         flavor = None
  210         if isinstance(metadata, dict):
  211             flavor = metadata.get('_flavor')
  212 
  213         self._pool_catalog.register(name, project=project, flavor=flavor)
  214 
  215         # NOTE(cpp-cabrera): This should always succeed since we just
  216         # registered the project/queue. There is a race condition,
  217         # however. If between the time we register a queue and go to
  218         # look it up, the queue is deleted, then this assertion will
  219         # fail.
  220         pool = self._pool_catalog.lookup(name, project)
  221         if not pool:
  222             raise RuntimeError('Failed to register queue')
  223         return self._mgt_queue_ctrl.create(name, metadata=metadata,
  224                                            project=project)
  225 
  226     def _delete(self, name, project=None):
  227         mqHandler = self._get_controller(name, project)
  228         if mqHandler:
  229             # NOTE(cpp-cabrera): delete from the catalogue first. If
  230             # zaqar crashes in the middle of these two operations,
  231             # it is desirable that the entry be missing from the
  232             # catalogue and present in storage, rather than the
  233             # reverse. The former case leads to all operations
  234             # behaving as expected: 404s across the board, and a
  235             # functionally equivalent 204 on a create queue. The
  236             # latter case is more difficult to reason about, and may
  237             # yield 500s in some operations.
  238             self._pool_catalog.deregister(name, project)
  239             mqHandler.delete(name, project)
  240 
  241         return self._mgt_queue_ctrl.delete(name, project)
  242 
  243     def _exists(self, name, project=None):
  244         return self._mgt_queue_ctrl.exists(name, project=project)
  245 
  246     def get_metadata(self, name, project=None):
  247         return self._mgt_queue_ctrl.get_metadata(name, project=project)
  248 
  249     def set_metadata(self, name, metadata, project=None):
  250         # NOTE(gengchc2): If  flavor metadata is modified in queue,
  251         # The queue needs to be re-registered to pools, otherwise
  252         # the queue flavor parameter is not consistent with the pool.
  253         flavor = None
  254         if isinstance(metadata, dict):
  255             flavor = metadata.get('_flavor')
  256         self._pool_catalog.register(name, project=project, flavor=flavor)
  257 
  258         return self._mgt_queue_ctrl.set_metadata(name, metadata=metadata,
  259                                                  project=project)
  260 
  261     def _stats(self, name, project=None):
  262         mqHandler = self._get_controller(name, project)
  263         if mqHandler:
  264             return mqHandler.stats(name, project=project)
  265         raise errors.QueueDoesNotExist(name, project)
  266 
  267     def _calculate_resource_count(self, project=None):
  268         return self._mgt_queue_ctrl.calculate_resource_count(project=project)
  269 
  270 
  271 class MessageController(storage.Message):
  272     """Routes operations to a message controller in the appropriate pool.
  273 
  274     :param pool_catalog: a catalog of available pools
  275     :type pool_catalog: queues.pooling.base.Catalog
  276     """
  277 
  278     def __init__(self, pool_catalog):
  279         super(MessageController, self).__init__(None)
  280         self._pool_catalog = pool_catalog
  281         self._get_controller = self._pool_catalog.get_message_controller
  282 
  283     def post(self, queue, messages, client_uuid, project=None):
  284         control = self._get_controller(queue, project)
  285         if control:
  286             return control.post(queue, project=project,
  287                                 messages=messages,
  288                                 client_uuid=client_uuid)
  289         raise errors.QueueDoesNotExist(queue, project)
  290 
  291     def delete(self, queue, message_id, project=None, claim=None):
  292         control = self._get_controller(queue, project)
  293         if control:
  294             return control.delete(queue, project=project,
  295                                   message_id=message_id, claim=claim)
  296         return None
  297 
  298     def bulk_delete(self, queue, message_ids, project=None, claim_ids=None):
  299         control = self._get_controller(queue, project)
  300         if control:
  301             return control.bulk_delete(queue, project=project,
  302                                        message_ids=message_ids,
  303                                        claim_ids=claim_ids)
  304         return None
  305 
  306     def pop(self, queue, limit, project=None):
  307         control = self._get_controller(queue, project)
  308         if control:
  309             return control.pop(queue, project=project, limit=limit)
  310         return None
  311 
  312     def bulk_get(self, queue, message_ids, project=None):
  313         control = self._get_controller(queue, project)
  314         if control:
  315             return control.bulk_get(queue, project=project,
  316                                     message_ids=message_ids)
  317         return []
  318 
  319     def list(self, queue, project=None, marker=None,
  320              limit=storage.DEFAULT_MESSAGES_PER_PAGE,
  321              echo=False, client_uuid=None, include_claimed=False,
  322              include_delayed=False):
  323         control = self._get_controller(queue, project)
  324         if control:
  325             return control.list(queue, project=project,
  326                                 marker=marker, limit=limit,
  327                                 echo=echo, client_uuid=client_uuid,
  328                                 include_claimed=include_claimed,
  329                                 include_delayed=include_delayed)
  330         return iter([[]])
  331 
  332     def get(self, queue, message_id, project=None):
  333         control = self._get_controller(queue, project)
  334         if control:
  335             return control.get(queue, message_id=message_id,
  336                                project=project)
  337         raise errors.QueueDoesNotExist(queue, project)
  338 
  339     def first(self, queue, project=None, sort=1):
  340         control = self._get_controller(queue, project)
  341         if control:
  342             return control.first(queue, project=project, sort=sort)
  343         raise errors.QueueDoesNotExist(queue, project)
  344 
  345 
  346 class ClaimController(storage.Claim):
  347     """Routes operations to a claim controller in the appropriate pool.
  348 
  349     :param pool_catalog: a catalog of available pools
  350     :type pool_catalog: queues.pooling.base.Catalog
  351     """
  352 
  353     def __init__(self, pool_catalog):
  354         super(ClaimController, self).__init__(None)
  355         self._pool_catalog = pool_catalog
  356         self._get_controller = self._pool_catalog.get_claim_controller
  357 
  358     def create(self, queue, metadata, project=None,
  359                limit=storage.DEFAULT_MESSAGES_PER_CLAIM):
  360         control = self._get_controller(queue, project)
  361         if control:
  362             return control.create(queue, metadata=metadata,
  363                                   project=project, limit=limit)
  364         return [None, []]
  365 
  366     def get(self, queue, claim_id, project=None):
  367         control = self._get_controller(queue, project)
  368         if control:
  369             return control.get(queue, claim_id=claim_id,
  370                                project=project)
  371         raise errors.ClaimDoesNotExist(claim_id, queue, project)
  372 
  373     def update(self, queue, claim_id, metadata, project=None):
  374         control = self._get_controller(queue, project)
  375         if control:
  376             return control.update(queue, claim_id=claim_id,
  377                                   project=project, metadata=metadata)
  378         raise errors.ClaimDoesNotExist(claim_id, queue, project)
  379 
  380     def delete(self, queue, claim_id, project=None):
  381         control = self._get_controller(queue, project)
  382         if control:
  383             return control.delete(queue, claim_id=claim_id,
  384                                   project=project)
  385         return None
  386 
  387 
  388 class SubscriptionController(storage.Subscription):
  389     """Controller to facilitate processing for subscription operations."""
  390 
  391     _resource_name = 'subscription'
  392 
  393     def __init__(self, pool_catalog):
  394         super(SubscriptionController, self).__init__(pool_catalog)
  395         self._pool_catalog = pool_catalog
  396         self._get_controller = self._pool_catalog.get_subscription_controller
  397 
  398     def list(self, queue, project=None, marker=None,
  399              limit=storage.DEFAULT_SUBSCRIPTIONS_PER_PAGE):
  400         control = self._get_controller(queue, project)
  401         if control:
  402             return control.list(queue, project=project,
  403                                 marker=marker, limit=limit)
  404 
  405     def get(self, queue, subscription_id, project=None):
  406         control = self._get_controller(queue, project)
  407         if control:
  408             return control.get(queue, subscription_id, project=project)
  409 
  410     def create(self, queue, subscriber, ttl, options, project=None):
  411         control = self._get_controller(queue, project)
  412         if control:
  413             return control.create(queue, subscriber,
  414                                   ttl, options,
  415                                   project=project)
  416 
  417     def update(self, queue, subscription_id, project=None, **kwargs):
  418         control = self._get_controller(queue, project)
  419         if control:
  420             return control.update(queue, subscription_id,
  421                                   project=project, **kwargs)
  422 
  423     def delete(self, queue, subscription_id, project=None):
  424         control = self._get_controller(queue, project)
  425         if control:
  426             return control.delete(queue, subscription_id,
  427                                   project=project)
  428 
  429     def exists(self, queue, subscription_id, project=None):
  430         control = self._get_controller(queue, project)
  431         if control:
  432             return control.exists(queue, subscription_id,
  433                                   project=project)
  434 
  435     def confirm(self, queue, subscription_id, project=None, confirmed=None):
  436         control = self._get_controller(queue, project)
  437         if control:
  438             return control.confirm(queue, subscription_id,
  439                                    project=project, confirmed=confirmed)
  440 
  441     def get_with_subscriber(self, queue, subscriber, project=None):
  442         control = self._get_controller(queue, project)
  443         if control:
  444             return control.get_with_subscriber(queue, subscriber, project)
  445 
  446 
  447 class Catalog(object):
  448     """Represents the mapping between queues and pool drivers."""
  449 
  450     def __init__(self, conf, cache, control):
  451         self._drivers = {}
  452         self._conf = conf
  453         self._cache = cache
  454         self.control = control
  455 
  456         self._conf.register_opts(pooling_catalog.ALL_OPTS,
  457                                  group=pooling_catalog.GROUP_NAME)
  458         self._catalog_conf = self._conf[pooling_catalog.GROUP_NAME]
  459 
  460         self._pools_ctrl = control.pools_controller
  461         self._flavor_ctrl = control.flavors_controller
  462         self._catalogue_ctrl = control.catalogue_controller
  463 
  464     # FIXME(cpp-cabrera): https://bugs.launchpad.net/zaqar/+bug/1252791
  465     def _init_driver(self, pool_id, pool_conf=None):
  466         """Given a pool name, returns a storage driver.
  467 
  468         :param pool_id: The name of a pool.
  469         :type pool_id: six.text_type
  470         :returns: a storage driver
  471         :rtype: zaqar.storage.base.DataDriverBase
  472         """
  473         if pool_id is not None:
  474             pool = self._pools_ctrl.get(pool_id, detailed=True)
  475         else:
  476             pool = pool_conf
  477         conf = utils.dynamic_conf(pool['uri'], pool['options'],
  478                                   conf=self._conf)
  479         storage = utils.load_storage_driver(conf,
  480                                             self._cache,
  481                                             control_driver=self.control)
  482         return pipeline.DataDriver(conf, storage, self.control)
  483 
  484     @decorators.caches(_pool_cache_key, _POOL_CACHE_TTL)
  485     def _pool_id(self, queue, project=None):
  486         """Get the ID for the pool assigned to the given queue.
  487 
  488         :param queue: name of the queue
  489         :param project: project to which the queue belongs
  490 
  491         :returns: pool id
  492 
  493         :raises QueueNotMapped: if queue is not mapped
  494         """
  495         return self._catalogue_ctrl.get(project, queue)['pool']
  496 
  497     def register(self, queue, project=None, flavor=None):
  498         """Register a new queue in the pool catalog.
  499 
  500         This method should be called whenever a new queue is being
  501         created, and will create an entry in the pool catalog for
  502         the given queue.
  503 
  504         After using this method to register the queue in the
  505         catalog, the caller should call `lookup()` to get a reference
  506         to a storage driver which will allow interacting with the
  507         queue's assigned backend pool.
  508 
  509         :param queue: Name of the new queue to assign to a pool
  510         :type queue: six.text_type
  511         :param project: Project to which the queue belongs, or
  512             None for the "global" or "generic" project.
  513         :type project: six.text_type
  514         :param flavor: Flavor for the queue (OPTIONAL)
  515         :type flavor: six.text_type
  516 
  517         :raises NoPoolFound: if not found
  518 
  519         """
  520 
  521         # NOTE(gengchc): if exist, get queue's pool.flavor:
  522         # if queue's pool.flavor is different, first delete it and add it.
  523         #  Otherwise, if the flavor in the meteredata of the queue is
  524         #  modified, the catalog will be inconsistent.
  525         if self._catalogue_ctrl.exists(project, queue):
  526             catalogue = self._catalogue_ctrl.get(project, queue)
  527             oldpoolids = catalogue['pool']
  528             oldpool = self._pools_ctrl.get(oldpoolids)
  529             oldflavor = oldpool['flavor']
  530             msgtmpl = _(u'register queue to pool: old flavor: %(oldflavor)s '
  531                         ', new flavor: %(flavor)s')
  532             LOG.info(msgtmpl,
  533                      {'oldflavor': oldflavor, 'flavor': flavor})
  534             if oldpool['flavor'] != flavor:
  535                 self._catalogue_ctrl.delete(project, queue)
  536 
  537         if not self._catalogue_ctrl.exists(project, queue):
  538             if flavor is not None:
  539                 flavor = self._flavor_ctrl.get(flavor, project=project)
  540                 pools = self._pools_ctrl.get_pools_by_flavor(
  541                     flavor=flavor,
  542                     detailed=True)
  543                 pool = select.weighted(pools)
  544                 pool = pool and pool['name'] or None
  545                 msgtmpl = _(u'register queue to pool: new flavor:%(flavor)s')
  546                 LOG.info(msgtmpl,
  547                          {'flavor': flavor.get('name', None)})
  548             else:
  549                 # NOTE(flaper87): Get pools assigned to the default
  550                 # group `None`. We should consider adding a `default_group`
  551                 # option in the future.
  552                 pools = self._pools_ctrl.get_pools_by_flavor(detailed=True)
  553                 pool = select.weighted(pools)
  554                 pool = pool and pool['name'] or None
  555 
  556                 if not pool:
  557                     # NOTE(flaper87): We used to raise NoPoolFound in this
  558                     # case but we've decided to support automatic pool
  559                     # creation. Note that we're now returning and the queue
  560                     # is not being registered in the catalogue. This is done
  561                     # on purpose since no pool exists and the "dummy" pool
  562                     # doesn't exist in the storage
  563                     if self.lookup(queue, project) is not None:
  564                         return
  565                     raise errors.NoPoolFound()
  566                     msgtmpl = _(u'register queue to pool: new flavor: None')
  567                     LOG.info(msgtmpl)
  568 
  569             msgtmpl = _(u'register queue: project:%(project)s'
  570                         ' queue:%(queue)s pool:%(pool)s')
  571             LOG.info(msgtmpl,
  572                      {'project': project,
  573                       'queue': queue,
  574                       'pool': pool})
  575             self._catalogue_ctrl.insert(project, queue, pool)
  576 
  577     @_pool_id.purges
  578     def deregister(self, queue, project=None):
  579         """Removes a queue from the pool catalog.
  580 
  581         Call this method after successfully deleting it from a
  582         backend pool.
  583 
  584         :param queue: Name of the new queue to assign to a pool
  585         :type queue: six.text_type
  586         :param project: Project to which the queue belongs, or
  587             None for the "global" or "generic" project.
  588         :type project: six.text_type
  589         """
  590         self._catalogue_ctrl.delete(project, queue)
  591 
  592     def get_queue_controller(self, queue, project=None):
  593         """Lookup the queue controller for the given queue and project.
  594 
  595         :param queue: Name of the queue for which to find a pool
  596         :param project: Project to which the queue belongs, or
  597             None to specify the "global" or "generic" project.
  598 
  599         :returns: The queue controller associated with the data driver for
  600             the pool containing (queue, project) or None if this doesn't exist.
  601         :rtype: Maybe QueueController
  602         """
  603         target = self.lookup(queue, project)
  604         return target and target.queue_controller
  605 
  606     def get_message_controller(self, queue, project=None):
  607         """Lookup the message controller for the given queue and project.
  608 
  609         :param queue: Name of the queue for which to find a pool
  610         :param project: Project to which the queue belongs, or
  611             None to specify the "global" or "generic" project.
  612 
  613         :returns: The message controller associated with the data driver for
  614             the pool containing (queue, project) or None if this doesn't exist.
  615         :rtype: Maybe MessageController
  616         """
  617         target = self.lookup(queue, project)
  618         return target and target.message_controller
  619 
  620     def get_claim_controller(self, queue, project=None):
  621         """Lookup the claim controller for the given queue and project.
  622 
  623         :param queue: Name of the queue for which to find a pool
  624         :param project: Project to which the queue belongs, or
  625             None to specify the "global" or "generic" project.
  626 
  627         :returns: The claim controller associated with the data driver for
  628             the pool containing (queue, project) or None if this doesn't exist.
  629         :rtype: Maybe ClaimController
  630         """
  631         target = self.lookup(queue, project)
  632         return target and target.claim_controller
  633 
  634     def get_subscription_controller(self, queue, project=None):
  635         """Lookup the subscription controller for the given queue and project.
  636 
  637         :param queue: Name of the queue for which to find a pool
  638         :param project: Project to which the queue belongs, or
  639             None to specify the "global" or "generic" project.
  640 
  641         :returns: The subscription controller associated with the data driver
  642             for the pool containing (queue, project) or None if this doesn't
  643             exist.
  644         :rtype: Maybe SubscriptionController
  645         """
  646         target = self.lookup(queue, project)
  647         return target and target.subscription_controller
  648 
  649     def get_topic_controller(self, topic, project=None):
  650         """Lookup the topic controller for the given queue and project.
  651 
  652         :param topic: Name of the topic for which to find a pool
  653         :param project: Project to which the topic belongs, or
  654             None to specify the "global" or "generic" project.
  655 
  656         :returns: The topic controller associated with the data driver for
  657             the pool containing (queue, project) or None if this doesn't exist.
  658         :rtype: Maybe TopicController
  659         """
  660         target = self.lookup(topic, project)
  661         return target and target.topic_controller
  662 
  663     def get_default_pool(self, use_listing=True):
  664         if use_listing:
  665             cursor = self._pools_ctrl.list(limit=0)
  666             pools_list = list(next(cursor))
  667             if pools_list:
  668                 return self.get_driver(pools_list[0]['name'])
  669 
  670         if self._catalog_conf.enable_virtual_pool:
  671             conf_section = ('drivers:message_store:%s' %
  672                             self._conf.drivers.message_store)
  673 
  674             try:
  675                 # NOTE(flaper87): Try to load the driver to check
  676                 # whether it can be used as the default store for
  677                 # the default pool.
  678                 utils.load_storage_driver(self._conf, self._cache,
  679                                           control_driver=self.control)
  680             except cerrors.InvalidDriver:
  681                 # NOTE(kgriffs): Return `None`, rather than letting the
  682                 # exception bubble up, so that the higher layer doesn't
  683                 # have to duplicate the try..except..log code all over
  684                 # the place.
  685                 return None
  686 
  687             if conf_section not in self._conf:
  688                 # NOTE(flaper87): If there's no config section for this storage
  689                 # skip the pool registration entirely since we won't know how
  690                 # to connect to it.
  691                 return None
  692 
  693             # NOTE(flaper87): This assumes the storage driver type is the
  694             # same as the management.
  695             pool_conf = {'uri': self._conf[conf_section].uri,
  696                          'options': {}}
  697 
  698             # NOTE(flaper87): This will be using the config
  699             # storage configuration as the default one if no
  700             # default storage has been registered in the pool
  701             # store.
  702             return self.get_driver(None, pool_conf)
  703 
  704     def lookup(self, queue, project=None):
  705         """Lookup a pool driver for the given queue and project.
  706 
  707         :param queue: Name of the queue for which to find a pool
  708         :param project: Project to which the queue belongs, or
  709             None to specify the "global" or "generic" project.
  710 
  711         :returns: A storage driver instance for the appropriate pool. If
  712             the driver does not exist yet, it is created and cached. If the
  713             queue is not mapped, returns None.
  714         :rtype: Maybe DataDriver
  715         """
  716 
  717         try:
  718             pool_id = self._pool_id(queue, project)
  719         except errors.QueueNotMapped as ex:
  720             LOG.debug(ex)
  721 
  722             return self.get_default_pool(use_listing=False)
  723 
  724         return self.get_driver(pool_id)
  725 
  726     def get_driver(self, pool_id, pool_conf=None):
  727         """Get storage driver, preferably cached, from a pool name.
  728 
  729         :param pool_id: The name of a pool.
  730         :type pool_id: six.text_type
  731         :returns: a storage driver
  732         :rtype: zaqar.storage.base.DataDriver
  733         """
  734 
  735         try:
  736             return self._drivers[pool_id]
  737         except KeyError:
  738             # NOTE(cpp-cabrera): cache storage driver connection
  739             self._drivers[pool_id] = self._init_driver(pool_id, pool_conf)
  740 
  741             return self._drivers[pool_id]
  742 
  743 
  744 class TopicController(storage.Topic):
  745     """Routes operations to get the appropriate topic controller.
  746 
  747     :param pool_catalog: a catalog of available pools
  748     :type pool_catalog: queues.pooling.base.Catalog
  749     """
  750 
  751     def __init__(self, pool_catalog):
  752         super(TopicController, self).__init__(None)
  753         self._pool_catalog = pool_catalog
  754         self._mgt_topic_ctrl = self._pool_catalog.control.topic_controller
  755         self._get_controller = self._pool_catalog.get_topic_controller
  756 
  757     def _list(self, project=None, kfilter={}, marker=None,
  758               limit=storage.DEFAULT_TOPICS_PER_PAGE, detailed=False,
  759               name=None):
  760 
  761         def all_pages():
  762             yield next(self._mgt_topic_ctrl.list(
  763                 project=project,
  764                 kfilter=kfilter,
  765                 marker=marker,
  766                 limit=limit,
  767                 detailed=detailed,
  768                 name=name))
  769 
  770         # make a heap compared with 'name'
  771         ls = heapq.merge(*[
  772             utils.keyify('name', page)
  773             for page in all_pages()
  774         ])
  775 
  776         marker_name = {}
  777 
  778         # limit the iterator and strip out the comparison wrapper
  779         def it():
  780             for topic_cmp in itertools.islice(ls, limit):
  781                 marker_name['next'] = topic_cmp.obj['name']
  782                 yield topic_cmp.obj
  783 
  784         yield it()
  785         yield marker_name and marker_name['next']
  786 
  787     def _get(self, name, project=None):
  788         try:
  789             return self.get_metadata(name, project)
  790         except errors.TopicDoesNotExist:
  791             return {}
  792 
  793     def _create(self, name, metadata=None, project=None):
  794         flavor = None
  795         if isinstance(metadata, dict):
  796             flavor = metadata.get('_flavor')
  797 
  798         self._pool_catalog.register(name, project=project, flavor=flavor)
  799 
  800         # NOTE(cpp-cabrera): This should always succeed since we just
  801         # registered the project/topic. There is a race condition,
  802         # however. If between the time we register a topic and go to
  803         # look it up, the topic is deleted, then this assertion will
  804         # fail.
  805         pool = self._pool_catalog.lookup(name, project)
  806         if not pool:
  807             raise RuntimeError('Failed to register topic')
  808         return self._mgt_topic_ctrl.create(name, metadata=metadata,
  809                                            project=project)
  810 
  811     def _delete(self, name, project=None):
  812         mtHandler = self._get_controller(name, project)
  813         if mtHandler:
  814             # NOTE(cpp-cabrera): delete from the catalogue first. If
  815             # zaqar crashes in the middle of these two operations,
  816             # it is desirable that the entry be missing from the
  817             # catalogue and present in storage, rather than the
  818             # reverse. The former case leads to all operations
  819             # behaving as expected: 404s across the board, and a
  820             # functionally equivalent 204 on a create queue. The
  821             # latter case is more difficult to reason about, and may
  822             # yield 500s in some operations.
  823             self._pool_catalog.deregister(name, project)
  824             mtHandler.delete(name, project)
  825 
  826         return self._mgt_topic_ctrl.delete(name, project)
  827 
  828     def _exists(self, name, project=None):
  829         return self._mgt_topic_ctrl.exists(name, project=project)
  830 
  831     def get_metadata(self, name, project=None):
  832         return self._mgt_topic_ctrl.get_metadata(name, project=project)
  833 
  834     def set_metadata(self, name, metadata, project=None):
  835         # NOTE(gengchc2): If flavor metadata is modified in topic,
  836         # The topic needs to be re-registered to pools, otherwise
  837         # the topic flavor parameter is not consistent with the pool.
  838         flavor = None
  839         if isinstance(metadata, dict):
  840             flavor = metadata.get('_flavor')
  841         self._pool_catalog.register(name, project=project, flavor=flavor)
  842 
  843         return self._mgt_topic_ctrl.set_metadata(name, metadata=metadata,
  844                                                  project=project)
  845 
  846     def _stats(self, name, project=None):
  847         mtHandler = self._get_controller(name, project)
  848         if mtHandler:
  849             return mtHandler.stats(name, project=project)
  850         raise errors.TopicDoesNotExist(name, project)