"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-10.0.0/zaqar/storage/base.py" (13 May 2020, 40472 Bytes) of package /linux/misc/openstack/zaqar-10.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "base.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 9.0.0_vs_10.0.0.

    1 # Copyright (c) 2013 Red Hat, Inc.
    2 # Copyright 2014 Catalyst IT Ltd
    3 #
    4 # Licensed under the Apache License, Version 2.0 (the "License");
    5 # you may not use this file except in compliance with the License.
    6 # You may obtain a copy of the License at
    7 #
    8 #    http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 # Unless required by applicable law or agreed to in writing, software
   11 # distributed under the License is distributed on an "AS IS" BASIS,
   12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   13 # implied.
   14 # See the License for the specific language governing permissions and
   15 # limitations under the License.
   16 
   17 """Implements the DriverBase abstract class for Zaqar storage drivers."""
   18 
   19 import abc
   20 import functools
   21 import time
   22 
   23 import enum
   24 from oslo_config import cfg
   25 from oslo_log import log as logging
   26 from oslo_utils import uuidutils
   27 import six
   28 
   29 from zaqar.common import decorators
   30 from zaqar.storage import errors
   31 from zaqar.storage import utils
   32 
   33 
   34 DEFAULT_QUEUES_PER_PAGE = 10
   35 DEFAULT_MESSAGES_PER_PAGE = 10
   36 DEFAULT_POOLS_PER_PAGE = 10
   37 DEFAULT_SUBSCRIPTIONS_PER_PAGE = 10
   38 DEFAULT_TOPICS_PER_PAGE = 10
   39 
   40 DEFAULT_MESSAGES_PER_CLAIM = 10
   41 
   42 LOG = logging.getLogger(__name__)
   43 
   44 
   45 @enum.unique
   46 class Capabilities(enum.IntEnum):
   47     """Enum of storage capabilities."""
   48     FIFO = 1
   49     CLAIMS = 2
   50     DURABILITY = 3
   51     AOD = 4  # At least once delivery
   52     HIGH_THROUGHPUT = 5
   53 
   54 
   55 @six.add_metaclass(abc.ABCMeta)
   56 class DriverBase(object):
   57     """Base class for both data and control plane drivers
   58 
   59     :param conf: Configuration containing options for this driver.
   60     :type conf: `oslo_config.ConfigOpts`
   61     :param cache: Cache instance to use for reducing latency
   62         for certain lookups.
   63     :type cache: `dogpile.cache.region.CacheRegion`
   64     """
   65     _DRIVER_OPTIONS = []
   66 
   67     def __init__(self, conf, cache):
   68         self.conf = conf
   69         self.cache = cache
   70         self._register_opts()
   71 
   72     def _register_opts(self):
   73         for group, options in self._DRIVER_OPTIONS:
   74             for opt in options:
   75                 try:
   76                     self.conf.register_opt(opt, group=group)
   77                 except cfg.DuplicateOptError:
   78                     pass
   79 
   80 
   81 @six.add_metaclass(abc.ABCMeta)
   82 class DataDriverBase(DriverBase):
   83     """Interface definition for storage drivers.
   84 
   85     Data plane storage drivers are responsible for implementing the
   86     core functionality of the system.
   87 
   88     Connection information and driver-specific options are
   89     loaded from the config file or the pool catalog.
   90 
   91     :param conf: Configuration containing options for this driver.
   92     :type conf: `oslo_config.ConfigOpts`
   93     :param cache: Cache instance to use for reducing latency
   94         for certain lookups.
   95     :type cache: `dogpile.cache.region.CacheRegion`
   96     """
   97 
   98     BASE_CAPABILITIES = []
   99 
  100     def __init__(self, conf, cache, control_driver):
  101         super(DataDriverBase, self).__init__(conf, cache)
  102         # creating ControlDriver instance for accessing QueueController's
  103         # data from DataDriver
  104         self.control_driver = control_driver
  105 
  106     @abc.abstractmethod
  107     def is_alive(self):
  108         """Check whether the storage is ready."""
  109         raise NotImplementedError
  110 
  111     @abc.abstractproperty
  112     def capabilities(self):
  113         """Returns storage's capabilities."""
  114         return self.BASE_CAPABILITIES
  115 
  116     def health(self):
  117         """Return the health status of service."""
  118         overall_health = {}
  119         # NOTE(flwang): KPI extracted from different storage backends,
  120         # _health() will be implemented by different storage drivers.
  121         backend_health = self._health()
  122         if backend_health:
  123             overall_health.update(backend_health)
  124 
  125         return overall_health
  126 
  127     @abc.abstractmethod
  128     def _health(self):
  129         """Return the health status based on different backends."""
  130         raise NotImplementedError
  131 
  132     @abc.abstractmethod
  133     def close(self):
  134         """Close connections to the backend."""
  135         raise NotImplementedError
  136 
  137     def _get_operation_status(self):
  138         op_status = {}
  139         status_template = lambda s, t, r: {'succeeded': s,
  140                                            'seconds': t,
  141                                            'ref': r}
  142         project = uuidutils.generate_uuid()
  143         queue = uuidutils.generate_uuid()
  144         client = uuidutils.generate_uuid()
  145         msg_template = lambda s: {'ttl': 600, 'body': {'event': 'p_%s' % s}}
  146         messages = [msg_template(i) for i in range(100)]
  147         claim_metadata = {'ttl': 60, 'grace': 300}
  148 
  149         # NOTE (flwang): Using time.time() instead of timeit since timeit will
  150         # make the method calling be complicated.
  151         def _handle_status(operation_type, callable_operation):
  152             succeeded = True
  153             ref = None
  154             result = None
  155             try:
  156                 start = time.time()
  157                 result = callable_operation()
  158             except Exception:
  159                 ref = uuidutils.generate_uuid()
  160                 LOG.exception('Error calling operation.',
  161                               extra={'instance_uuid': ref})
  162                 succeeded = False
  163             status = status_template(succeeded, time.time() - start, ref)
  164             op_status[operation_type] = status
  165             return succeeded, result
  166 
  167         # create queue
  168         func = functools.partial(self.queue_controller.create,
  169                                  queue, project=project)
  170         succeeded, _ = _handle_status('create_queue', func)
  171 
  172         # post messages
  173         if succeeded:
  174             func = functools.partial(self.message_controller.post,
  175                                      queue, messages, client, project=project)
  176             _, msg_ids = _handle_status('post_messages', func)
  177 
  178             # claim messages
  179             if msg_ids:
  180                 func = functools.partial(self.claim_controller.create,
  181                                          queue, claim_metadata,
  182                                          project=project)
  183                 _, (claim_id, claim_msgs) = _handle_status('claim_messages',
  184                                                            func)
  185 
  186                 # list messages
  187                 func = functools.partial(self.message_controller.list,
  188                                          queue, project, echo=True,
  189                                          client_uuid=client,
  190                                          include_claimed=True)
  191                 _handle_status('list_messages', func)
  192 
  193                 # delete messages
  194                 if claim_id and claim_msgs:
  195                     for message in claim_msgs:
  196                         func = functools.partial(self.
  197                                                  message_controller.delete,
  198                                                  queue, message['id'],
  199                                                  project, claim=claim_id)
  200                         succeeded, _ = _handle_status('delete_messages', func)
  201                         if not succeeded:
  202                             break
  203                     # delete claim
  204                     func = functools.partial(self.claim_controller.delete,
  205                                              queue, claim_id, project)
  206                     _handle_status('delete_claim', func)
  207 
  208             # delete queue
  209             func = functools.partial(self.message_controller.bulk_delete,
  210                                      queue, msg_ids, project=project)
  211             _handle_status('bulk_delete_messages', func)
  212             func = functools.partial(self.queue_controller.delete,
  213                                      queue, project=project)
  214             _handle_status('delete_queue', func)
  215         return op_status
  216 
  217     def gc(self):
  218         """Perform manual garbage collection of claims and messages.
  219 
  220         This method can be overridden in order to provide a trigger
  221         that can be called by so-called "garbage collection" scripts
  222         that are required by some drivers.
  223 
  224         By default, this method does nothing.
  225         """
  226         pass
  227 
  228     @decorators.lazy_property(write=False)
  229     def queue_controller(self):
  230         return self.control_driver.queue_controller
  231 
  232     @abc.abstractproperty
  233     def message_controller(self):
  234         """Returns the driver's message controller."""
  235         raise NotImplementedError
  236 
  237     @abc.abstractproperty
  238     def claim_controller(self):
  239         """Returns the driver's claim controller."""
  240         raise NotImplementedError
  241 
  242     @abc.abstractproperty
  243     def subscription_controller(self):
  244         """Returns the driver's subscription controller."""
  245         raise NotImplementedError
  246 
  247     @decorators.lazy_property(write=False)
  248     def topic_controller(self):
  249         """Returns the driver's topic controller."""
  250         return self.control_driver.topic_controller
  251 
  252 
  253 @six.add_metaclass(abc.ABCMeta)
  254 class ControlDriverBase(DriverBase):
  255     """Interface definition for control plane storage drivers.
  256 
  257     Storage drivers that work at the control plane layer allow one to
  258     modify aspects of the functionality of the system. This is ideal
  259     for administrative purposes.
  260 
  261     Allows access to the pool registry through a catalogue and a
  262     pool controller.
  263 
  264     :param conf: Configuration containing options for this driver.
  265     :type conf: `oslo_config.ConfigOpts`
  266     :param cache: Cache instance to use for reducing latency
  267         for certain lookups.
  268     :type cache: `dogpile.cache.region.CacheRegion`
  269     """
  270 
  271     @abc.abstractproperty
  272     def catalogue_controller(self):
  273         """Returns the driver's catalogue controller."""
  274         raise NotImplementedError
  275 
  276     @abc.abstractproperty
  277     def pools_controller(self):
  278         """Returns storage's pool management controller."""
  279         raise NotImplementedError
  280 
  281     @abc.abstractproperty
  282     def flavors_controller(self):
  283         """Returns storage's flavor management controller."""
  284         raise NotImplementedError
  285 
  286     @abc.abstractproperty
  287     def queue_controller(self):
  288         """Returns the driver's queue controller."""
  289         raise NotImplementedError
  290 
  291     @abc.abstractproperty
  292     def topic_controller(self):
  293         """Returns the driver's topic controller."""
  294         raise NotImplementedError
  295 
  296     @abc.abstractmethod
  297     def close(self):
  298         """Close connections to the backend."""
  299         raise NotImplementedError
  300 
  301 
  302 class ControllerBase(object):
  303     """Top-level class for controllers.
  304 
  305     :param driver: Instance of the driver
  306         instantiating this controller.
  307     """
  308 
  309     def __init__(self, driver):
  310         self.driver = driver
  311 
  312 
  313 @six.add_metaclass(abc.ABCMeta)
  314 class Queue(ControllerBase):
  315     """This class is responsible for managing queues.
  316 
  317     Queue operations include CRUD, monitoring, etc.
  318 
  319     Storage driver implementations of this class should
  320     be capable of handling high workloads and huge
  321     numbers of queues.
  322     """
  323 
  324     def list(self, project=None, kfilter={}, marker=None,
  325              limit=DEFAULT_QUEUES_PER_PAGE, detailed=False, name=None):
  326         """Base method for listing queues.
  327 
  328         :param project: Project id
  329         :param kfilter: The key-value of metadata which user want to filter
  330         :param marker: The last queue name
  331         :param limit: (Default 10) Max number of queues to return
  332         :param detailed: Whether metadata is included
  333         :param name: The queue name which user want to filter
  334 
  335         :returns: An iterator giving a sequence of queues
  336             and the marker of the next page.
  337         """
  338         return self._list(project, kfilter, marker, limit, detailed, name)
  339 
  340     _list = abc.abstractmethod(lambda x: None)
  341 
  342     def get(self, name, project=None):
  343         """Base method for queue metadata retrieval.
  344 
  345         :param name: The queue name
  346         :param project: Project id
  347 
  348         :returns: Dictionary containing queue metadata
  349         :raises DoesNotExist: if queue metadata does not exist
  350         """
  351         return self._get(name, project)
  352 
  353     _get = abc.abstractmethod(lambda x: None)
  354 
  355     def get_metadata(self, name, project=None):
  356         """Base method for queue metadata retrieval.
  357 
  358         :param name: The queue name
  359         :param project: Project id
  360 
  361         :returns: Dictionary containing queue metadata
  362         :raises DoesNotExist: if queue metadata does not exist
  363         """
  364         raise NotImplementedError
  365 
  366     def set_metadata(self, name, metadata, project=None):
  367         """Base method for updating a queue metadata.
  368 
  369         :param name: The queue name
  370         :param metadata: Queue metadata as a dict
  371         :param project: Project id
  372         :raises DoesNotExist: if queue metadata can not be updated
  373         """
  374         raise NotImplementedError
  375 
  376     def create(self, name, metadata=None, project=None):
  377         """Base method for queue creation.
  378 
  379         :param name: The queue name
  380         :param project: Project id
  381         :returns: True if a queue was created and False
  382             if it was updated.
  383         """
  384         return self._create(name, metadata, project)
  385 
  386     _create = abc.abstractmethod(lambda x: None)
  387 
  388     def exists(self, name, project=None):
  389         """Base method for testing queue existence.
  390 
  391         :param name: The queue name
  392         :param project: Project id
  393         :returns: True if a queue exists and False
  394             if it does not.
  395         """
  396         return self._exists(name, project)
  397 
  398     _exists = abc.abstractmethod(lambda x: None)
  399 
  400     def delete(self, name, project=None):
  401         """Base method for deleting a queue.
  402 
  403         :param name: The queue name
  404         :param project: Project id
  405         """
  406         return self._delete(name, project)
  407 
  408     _delete = abc.abstractmethod(lambda x: None)
  409 
  410     def stats(self, name, project=None):
  411         """Base method for queue stats.
  412 
  413         :param name: The queue name
  414         :param project: Project id
  415         :returns: Dictionary with the
  416             queue stats
  417         """
  418         return self._stats(name, project)
  419 
  420     _stats = abc.abstractmethod(lambda x: None)
  421 
  422     def calculate_resource_count(self, project=None):
  423         """Base method for calculate queues amount.
  424 
  425         :param project: Project id
  426         :returns: The number of queues.
  427         """
  428         return self._calculate_resource_count(project)
  429 
  430     _calculate_resource_count = abc.abstractmethod(lambda x: None)
  431 
  432 
  433 @six.add_metaclass(abc.ABCMeta)
  434 class Message(ControllerBase):
  435     """This class is responsible for managing message CRUD."""
  436 
  437     @abc.abstractmethod
  438     def list(self, queue, project=None, marker=None,
  439              limit=DEFAULT_MESSAGES_PER_PAGE,
  440              echo=False, client_uuid=None,
  441              include_claimed=False, include_delayed=False):
  442         """Base method for listing messages.
  443 
  444         :param queue: Name of the queue to get the
  445             message from.
  446         :param project: Project id
  447         :param marker: Tail identifier
  448         :param limit: (Default 10) Max number of messages to return.
  449         :type limit: Maybe int
  450         :param echo: (Default False) Boolean expressing whether
  451             or not this client should receive its own messages.
  452         :param client_uuid: A UUID object. Required when echo=False.
  453         :param include_claimed: omit claimed messages from listing?
  454         :type include_claimed: bool
  455         :param include_delayed: omit delayed messages from listing
  456         :type include_delayed: bool
  457 
  458         :returns: An iterator giving a sequence of messages and
  459             the marker of the next page.
  460         """
  461         raise NotImplementedError
  462 
  463     @abc.abstractmethod
  464     def first(self, queue, project=None, sort=1):
  465         """Get first message in the queue (including claimed).
  466 
  467         :param queue: Name of the queue to list
  468         :param sort: (Default 1) Sort order for the listing. Pass 1 for
  469             ascending (oldest message first), or -1 for descending (newest
  470             message first).
  471 
  472         :returns: First message in the queue, or None if the queue is
  473             empty
  474         """
  475         raise NotImplementedError
  476 
  477     @abc.abstractmethod
  478     def get(self, queue, message_id, project=None):
  479         """Base method for getting a message.
  480 
  481         :param queue: Name of the queue to get the
  482             message from.
  483         :param project: Project id
  484         :param message_id: Message ID
  485 
  486         :returns: Dictionary containing message data
  487         :raises DoesNotExist: if message data can not be got
  488         """
  489         raise NotImplementedError
  490 
  491     @abc.abstractmethod
  492     def bulk_get(self, queue, message_ids, project=None):
  493         """Base method for getting multiple messages.
  494 
  495         :param queue: Name of the queue to get the
  496             message from.
  497         :param project: Project id
  498         :param message_ids: A sequence of message IDs.
  499 
  500         :returns: An iterable, yielding dicts containing
  501             message details
  502         """
  503         raise NotImplementedError
  504 
  505     @abc.abstractmethod
  506     def post(self, queue, messages, client_uuid, project=None):
  507         """Base method for posting one or more messages.
  508 
  509         Implementations of this method should guarantee
  510         and preserve the order, in the returned list, of
  511         incoming messages.
  512 
  513         :param queue: Name of the queue to post message to.
  514         :param messages: Messages to post to queue, an iterable
  515             yielding 1 or more elements. An empty iterable
  516             results in undefined behavior.
  517         :param client_uuid: A UUID object.
  518         :param project: Project id
  519 
  520         :returns: List of message ids
  521         """
  522         raise NotImplementedError
  523 
  524     @abc.abstractmethod
  525     def delete(self, queue, message_id, project=None, claim=None):
  526         """Base method for deleting a single message.
  527 
  528         :param queue: Name of the queue to post
  529             message to.
  530         :param message_id: Message to be deleted
  531         :param project: Project id
  532         :param claim: Claim this message
  533             belongs to. When specified, claim must
  534             be valid and message_id must belong to
  535             it.
  536         """
  537         raise NotImplementedError
  538 
  539     @abc.abstractmethod
  540     def bulk_delete(self, queue, message_ids, project=None, claim_ids=None):
  541         """Base method for deleting multiple messages.
  542 
  543         :param queue: Name of the queue to post
  544             message to.
  545         :param message_ids: A sequence of message IDs
  546             to be deleted.
  547         :param project: Project id
  548         :param claim_ids: claim IDs passed in by the delete request
  549         """
  550         raise NotImplementedError
  551 
  552     @abc.abstractmethod
  553     def pop(self, queue, limit, project=None):
  554         """Base method for popping messages.
  555 
  556         :param queue: Name of the queue to pop
  557             message from.
  558         :param limit: Number of messages to pop.
  559         :param project: Project id
  560         """
  561         raise NotImplementedError
  562 
  563 
  564 @six.add_metaclass(abc.ABCMeta)
  565 class Claim(ControllerBase):
  566 
  567     @abc.abstractmethod
  568     def get(self, queue, claim_id, project=None):
  569         """Base method for getting a claim.
  570 
  571         :param queue: Name of the queue this
  572             claim belongs to.
  573         :param claim_id: The claim id
  574         :param project: Project id
  575 
  576         :returns: (Claim's metadata, claimed messages)
  577         :raises DoesNotExist: if claimed messages can not be got
  578         """
  579         raise NotImplementedError
  580 
  581     @abc.abstractmethod
  582     def create(self, queue, metadata, project=None,
  583                limit=DEFAULT_MESSAGES_PER_CLAIM):
  584         """Base method for creating a claim.
  585 
  586         :param queue: Name of the queue this
  587             claim belongs to.
  588         :param metadata: Claim's parameters
  589             to be stored.
  590         :param project: Project id
  591         :param limit: (Default 10) Max number
  592             of messages to claim.
  593 
  594         :returns: (Claim ID, claimed messages)
  595         """
  596         raise NotImplementedError
  597 
  598     @abc.abstractmethod
  599     def update(self, queue, claim_id, metadata, project=None):
  600         """Base method for updating a claim.
  601 
  602         :param queue: Name of the queue this
  603             claim belongs to.
  604         :param claim_id: Claim to be updated
  605         :param metadata: Claim's parameters
  606             to be updated.
  607         :param project: Project id
  608         """
  609         raise NotImplementedError
  610 
  611     @abc.abstractmethod
  612     def delete(self, queue, claim_id, project=None):
  613         """Base method for deleting a claim.
  614 
  615         :param queue: Name of the queue this
  616             claim belongs to.
  617         :param claim_id: Claim to be deleted
  618         :param project: Project id
  619         """
  620         raise NotImplementedError
  621 
  622 
  623 @six.add_metaclass(abc.ABCMeta)
  624 class Subscription(ControllerBase):
  625     """This class is responsible for managing subscriptions of notification.
  626 
  627     """
  628 
  629     @abc.abstractmethod
  630     def list(self, queue, project=None, marker=None,
  631              limit=DEFAULT_SUBSCRIPTIONS_PER_PAGE):
  632         """Base method for listing subscriptions.
  633 
  634         :param queue: Name of the queue to get the subscriptions from.
  635         :type queue: six.text_type
  636         :param project: Project this subscription belongs to.
  637         :type project: six.text_type
  638         :param marker: used to determine which subscription to start with
  639         :type marker: six.text_type
  640         :param limit: (Default 10) Max number of results to return
  641         :type limit: int
  642         :returns: An iterator giving a sequence of subscriptions
  643             and the marker of the next page.
  644         :rtype: [{}]
  645         """
  646         raise NotImplementedError
  647 
  648     @abc.abstractmethod
  649     def get(self, queue, subscription_id, project=None):
  650         """Returns a single subscription entry.
  651 
  652         :param queue: Name of the queue subscription belongs to.
  653         :type queue: six.text_type
  654         :param subscription_id: ID of this subscription
  655         :type subscription_id: six.text_type
  656         :param project: Project this subscription belongs to.
  657         :type project: six.text_type
  658         :returns: Dictionary containing subscription data
  659         :rtype: {}
  660         :raises SubscriptionDoesNotExist: if not found
  661         """
  662         raise NotImplementedError
  663 
  664     @abc.abstractmethod
  665     def create(self, queue, subscriber, ttl, options, project=None):
  666         """Create a new subscription.
  667 
  668         :param queue:The source queue for notifications
  669         :type queue: six.text_type
  670         :param subscriber: The subscriber URI
  671         :type subscriber: six.text_type
  672         :param ttl: time to live for this subscription
  673         :type ttl: int
  674         :param options: Options used to configure this subscription
  675         :type options: dict
  676         :param project: Project id
  677         :type project: six.text_type
  678         :returns: True if a subscription was created and False
  679         if it is failed.
  680         :rtype: boolean
  681         """
  682         raise NotImplementedError
  683 
  684     @abc.abstractmethod
  685     def update(self, queue, subscription_id, project=None, **kwargs):
  686         """Updates the weight, uris, and/or options of this subscription
  687 
  688         :param queue: Name of the queue subscription belongs to.
  689         :type queue: six.text_type
  690         :param name: ID of the subscription
  691         :type name: text
  692         :param kwargs: one of: `source`, `subscriber`, `ttl`, `options`
  693         :type kwargs: dict
  694         :raises SubscriptionDoesNotExist: if not found
  695         :raises SubscriptionAlreadyExists: if attempt to update in a way to
  696             create duplicate subscription
  697         """
  698 
  699         raise NotImplementedError
  700 
  701     @abc.abstractmethod
  702     def exists(self, queue, subscription_id, project=None):
  703         """Base method for testing subscription existence.
  704 
  705         :param queue: Name of the queue subscription belongs to.
  706         :type queue: six.text_type
  707         :param subscription_id: ID of subscription
  708         :type subscription_id: six.text_type
  709         :param project: Project id
  710         :type project: six.text_type
  711         :returns: True if a subscription exists and False
  712             if it does not.
  713         """
  714         raise NotImplementedError
  715 
  716     @abc.abstractmethod
  717     def delete(self, queue, subscription_id, project=None):
  718         """Base method for deleting a subscription.
  719 
  720         :param queue: Name of the queue subscription belongs to.
  721         :type queue: six.text_type
  722         :param subscription_id: ID of the subscription to be deleted.
  723         :type subscription_id: six.text_type
  724         :param project: Project id
  725         :type project: six.text_type
  726         """
  727         raise NotImplementedError
  728 
  729     @abc.abstractmethod
  730     def get_with_subscriber(self, queue, subscriber, project=None):
  731         """Base method for get a subscription with the subscriber.
  732 
  733         :param queue: Name of the queue subscription belongs to.
  734         :type queue: six.text_type
  735         :param subscriber: link of the subscription to be notified.
  736         :type subscriber: six.text_type
  737         :param project: Project id
  738         :type project: six.text_type
  739         :returns: Dictionary containing subscription data
  740         :rtype: dict
  741         """
  742         raise NotImplementedError
  743 
  744     @abc.abstractmethod
  745     def confirm(self, queue, subscription_id, project=None, confirmed=True):
  746         """Base method for confirming a subscription.
  747 
  748         :param queue: Name of the queue subscription belongs to.
  749         :type queue: six.text_type
  750         :param subscription_id: ID of the subscription to be deleted.
  751         :type subscription_id: six.text_type
  752         :param project: Project id
  753         :type project: six.text_type
  754         :param confirmed: Confirm a subscription or cancel the confirmation of
  755             a subscription.
  756         :type confirmed: boolean
  757         """
  758         raise NotImplementedError
  759 
  760 
  761 @six.add_metaclass(abc.ABCMeta)
  762 class PoolsBase(ControllerBase):
  763     """A controller for managing pools."""
  764 
  765     def _check_capabilities(self, uri, flavor=None, name=None):
  766         default_store = self.driver.conf.drivers.message_store
  767         pool_caps = self.capabilities(flavor=flavor, name=name)
  768 
  769         if not pool_caps:
  770             return True
  771 
  772         new_store = utils.load_storage_impl(uri,
  773                                             default_store=default_store)
  774 
  775         # NOTE(flaper87): Since all pools in a pool flavor
  776         # are assumed to have the same capabilities, it's
  777         # fine to check against just 1
  778         return pool_caps == new_store.BASE_CAPABILITIES
  779 
  780     def capabilities(self, flavor=None, name=None):
  781         """Gets the set of capabilities for this flavor/name
  782 
  783         :param flavor: The pool flavor to get capabilities for
  784         :type flavor: six.text_type
  785         :param name: The pool name to get capabilities for
  786         :type name: six.text_type
  787         """
  788         pllt = []
  789         if name:
  790             pool = self.get(name)
  791             pllt.append(pool)
  792         else:
  793             pllt = list(self._get_pools_by_flavor(flavor))
  794 
  795         if not len(pllt) > 0:
  796             return ()
  797 
  798         default_store = self.driver.conf.drivers.message_store
  799 
  800         pool_store = utils.load_storage_impl(pllt[0]['uri'],
  801                                              default_store=default_store)
  802 
  803         return pool_store.BASE_CAPABILITIES
  804 
  805     def list(self, marker=None, limit=DEFAULT_POOLS_PER_PAGE,
  806              detailed=False):
  807         """Lists all registered pools.
  808 
  809         :param marker: used to determine which pool to start with
  810         :type marker: six.text_type
  811         :param limit: (Default 10) Max number of results to return
  812         :type limit: int
  813         :param detailed: whether to include options
  814         :type detailed: bool
  815         :returns: A list of pools - name, weight, uri
  816         :rtype: [{}]
  817         """
  818 
  819         return self._list(marker, limit, detailed)
  820 
  821     _list = abc.abstractmethod(lambda x: None)
  822 
  823     def create(self, name, weight, uri, flavor=None, options=None):
  824         """Registers a pool entry.
  825 
  826         :param name: The name of this pool
  827         :type name: six.text_type
  828         :param weight: the likelihood that this pool will be used
  829         :type weight: int
  830         :param uri: A URI that can be used by a storage client
  831             (e.g., pymongo) to access this pool.
  832         :type uri: six.text_type
  833         :param flavor: The flavor of this pool
  834         :type flavor: six.text_type
  835         :param options: Options used to configure this pool
  836         :type options: dict
  837         """
  838         flavor_obj = {}
  839         if flavor is not None:
  840             flavor_obj["name"] = flavor
  841         if not self._check_capabilities(uri, flavor=flavor_obj):
  842             raise errors.PoolCapabilitiesMismatch()
  843 
  844         return self._create(name, weight, uri, flavor, options)
  845 
  846     _create = abc.abstractmethod(lambda x: None)
  847 
  848     def get_pools_by_flavor(self, flavor=None, detailed=False):
  849         """Returns a pool list filtered by given pool flavor.
  850 
  851         :param flavor: The flavor to filter on. `None` returns
  852             pools that are not assigned to any pool flavor.
  853         :type flavor: six.text_type
  854         :param detailed: Should the options data be included?
  855         :type detailed: bool
  856         :returns: weight, uri, and options for this pool
  857         :rtype: {}
  858         :raises PoolDoesNotExist: if not found
  859         """
  860         return self._get_pools_by_flavor(flavor, detailed)
  861 
  862     _get_pools_by_flavor = abc.abstractmethod(lambda x: None)
  863 
  864     def get(self, name, detailed=False):
  865         """Returns a single pool entry.
  866 
  867         :param name: The name of this pool
  868         :type name: six.text_type
  869         :param detailed: Should the options data be included?
  870         :type detailed: bool
  871         :returns: weight, uri, and options for this pool
  872         :rtype: {}
  873         :raises PoolDoesNotExist: if not found
  874         """
  875         return self._get(name, detailed)
  876 
  877     _get = abc.abstractmethod(lambda x: None)
  878 
  879     def exists(self, name):
  880         """Returns a single pool entry.
  881 
  882         :param name: The name of this pool
  883         :type name: six.text_type
  884         :returns: True if the pool exists
  885         :rtype: bool
  886         """
  887         return self._exists(name)
  888 
  889     _exists = abc.abstractmethod(lambda x: None)
  890 
  891     def delete(self, name):
  892         """Removes a pool entry.
  893 
  894         :param name: The name of this pool
  895         :type name: six.text_type
  896         :rtype: None
  897         """
  898         return self._delete(name)
  899 
  900     _delete = abc.abstractmethod(lambda x: None)
  901 
  902     def update(self, name, **kwargs):
  903         """Updates the weight, uris, and/or options of this pool
  904 
  905         :param name: Name of the pool
  906         :type name: text
  907         :param kwargs: one of: `uri`, `weight`, `options`
  908         :type kwargs: dict
  909         :raises PoolDoesNotExist: if not found
  910         """
  911         uri = kwargs.get('uri')
  912         if uri and not self._check_capabilities(uri, name=name):
  913             raise errors.PoolCapabilitiesMismatch()
  914 
  915         return self._update(name, **kwargs)
  916 
  917     _update = abc.abstractmethod(lambda x: None)
  918 
  919     def drop_all(self):
  920         """Deletes all pools from storage."""
  921         return self._drop_all()
  922 
  923     _drop_all = abc.abstractmethod(lambda x: None)
  924 
  925 
  926 @six.add_metaclass(abc.ABCMeta)
  927 class CatalogueBase(ControllerBase):
  928     """A controller for managing the catalogue.
  929 
  930     The catalogue is responsible for maintaining a mapping
  931     between project.queue entries to their pool.
  932     """
  933 
  934     @abc.abstractmethod
  935     def list(self, project):
  936         """Get a list of queues from the catalogue.
  937 
  938         :param project: The project to use when filtering through queue
  939                         entries.
  940         :type project: six.text_type
  941         :returns: [{'project': ..., 'queue': ..., 'pool': ...},]
  942         :rtype: [dict]
  943         """
  944 
  945         raise NotImplementedError
  946 
  947     @abc.abstractmethod
  948     def get(self, project, queue):
  949         """Returns the pool identifier for the given queue.
  950 
  951         :param project: Namespace to search for the given queue
  952         :type project: six.text_type
  953         :param queue: The name of the queue to search for
  954         :type queue: six.text_type
  955         :returns: {'pool': ...}
  956         :rtype: dict
  957         :raises QueueNotMapped: if queue is not mapped
  958         """
  959 
  960         raise NotImplementedError
  961 
  962     @abc.abstractmethod
  963     def exists(self, project, queue):
  964         """Determines whether the given queue exists under project.
  965 
  966         :param project: Namespace to check.
  967         :type project: six.text_type
  968         :param queue: str - Particular queue to check for
  969         :type queue: six.text_type
  970         :return: True if the queue exists under this project
  971         :rtype: bool
  972         """
  973 
  974     @abc.abstractmethod
  975     def insert(self, project, queue, pool):
  976         """Creates a new catalogue entry, or updates it if it already exists.
  977 
  978         :param project: str - Namespace to insert the given queue into
  979         :type project: six.text_type
  980         :param queue: str - The name of the queue to insert
  981         :type queue: six.text_type
  982         :param pool: pool identifier to associate this queue with
  983         :type pool: six.text_type
  984         """
  985 
  986         raise NotImplementedError
  987 
  988     @abc.abstractmethod
  989     def delete(self, project, queue):
  990         """Removes this entry from the catalogue.
  991 
  992         :param project: The namespace to search for this queue
  993         :type project: six.text_type
  994         :param queue: The queue name to remove
  995         :type queue: six.text_type
  996         """
  997 
  998         raise NotImplementedError
  999 
 1000     @abc.abstractmethod
 1001     def update(self, project, queue, pools=None):
 1002         """Updates the pool identifier for this queue.
 1003 
 1004         :param project: Namespace to search
 1005         :type project: six.text_type
 1006         :param queue: The name of the queue
 1007         :type queue: six.text_type
 1008         :param pools: The name of the pool where this project/queue lives.
 1009         :type pools: six.text_type
 1010         :raises QueueNotMapped: if queue is not mapped
 1011         """
 1012 
 1013         raise NotImplementedError
 1014 
 1015     @abc.abstractmethod
 1016     def drop_all(self):
 1017         """Drops all catalogue entries from storage."""
 1018 
 1019         raise NotImplementedError
 1020 
 1021 
 1022 @six.add_metaclass(abc.ABCMeta)
 1023 class FlavorsBase(ControllerBase):
 1024     """A controller for managing flavors."""
 1025 
 1026     @abc.abstractmethod
 1027     def list(self, project=None, marker=None, limit=10):
 1028         """Lists all registered flavors.
 1029 
 1030         :param project: Project this flavor belongs to.
 1031         :type project: six.text_type
 1032         :param marker: used to determine which flavor to start with
 1033         :type marker: six.text_type
 1034         :param limit: (Default 10) Max number of results to return
 1035         :type limit: int
 1036         :returns: A list of flavors - name, project, flavor
 1037         :rtype: [{}]
 1038         """
 1039 
 1040         raise NotImplementedError
 1041 
 1042     @abc.abstractmethod
 1043     def create(self, name, project=None, capabilities=None):
 1044         """Registers a flavor entry.
 1045 
 1046         :param name: The name of this flavor
 1047         :type name: six.text_type
 1048         :param project: Project this flavor belongs to.
 1049         :type project: six.text_type
 1050         :param pool: The name of the pool to use for this flavor.
 1051         :type pool: six.text_type
 1052         :param capabilities: Flavor capabilities
 1053         :type capabilities: dict
 1054         """
 1055 
 1056         raise NotImplementedError
 1057 
 1058     @abc.abstractmethod
 1059     def get(self, name, project=None):
 1060         """Returns a single flavor entry.
 1061 
 1062         :param name: The name of this flavor
 1063         :type name: six.text_type
 1064         :param project: Project this flavor belongs to.
 1065         :type project: six.text_type
 1066         :rtype: {}
 1067         :raises FlavorDoesNotExist: if not found
 1068         """
 1069 
 1070         raise NotImplementedError
 1071 
 1072     @abc.abstractmethod
 1073     def exists(self, name, project=None):
 1074         """Verifies whether the flavor exists.
 1075 
 1076         :param name: The name of this flavor
 1077         :type name: six.text_type
 1078         :param project: Project this flavor belongs to.
 1079         :type project: six.text_type
 1080         :returns: True if the flavor exists
 1081         :rtype: bool
 1082         """
 1083 
 1084         raise NotImplementedError
 1085 
 1086     @abc.abstractmethod
 1087     def delete(self, name, project=None):
 1088         """Removes a flavor entry.
 1089 
 1090         :param name: The name of this flavor
 1091         :type name: six.text_type
 1092         :param project: Project this flavor belongs to.
 1093         :type project: six.text_type
 1094         :rtype: None
 1095         """
 1096 
 1097         raise NotImplementedError
 1098 
 1099     @abc.abstractmethod
 1100     def update(self, name, project=None, **kwargs):
 1101         """Updates the flavor and/or capabilities of this flavor
 1102 
 1103         :param name: Name of the flavor
 1104         :type name: text
 1105         :param project: Project this flavor belongs to.
 1106         :type project: six.text_type
 1107         :param kwargs: one of: `uri`, `weight`, `options`
 1108         :type kwargs: dict
 1109         :raises FlavorDoesNotExist: if not found
 1110         """
 1111 
 1112         raise NotImplementedError
 1113 
 1114     @abc.abstractmethod
 1115     def drop_all(self):
 1116         """Deletes all flavors from storage."""
 1117 
 1118         raise NotImplementedError
 1119 
 1120 
 1121 @six.add_metaclass(abc.ABCMeta)
 1122 class Topic(ControllerBase):
 1123     """This class is responsible for managing topics.
 1124 
 1125     Topic operations include CRUD, etc.
 1126 
 1127     Storage driver implementations of this class should
 1128     be capable of handling high workloads and huge
 1129     numbers of topics.
 1130     """
 1131 
 1132     def list(self, project=None, kfilter={}, marker=None,
 1133              limit=DEFAULT_TOPICS_PER_PAGE, detailed=False, name=None):
 1134         """Base method for listing topics.
 1135 
 1136         :param project: Project id
 1137         :param kfilter: The key-value of metadata which user want to filter
 1138         :param marker: The last topic name
 1139         :param limit: (Default 10) Max number of topics to return
 1140         :param detailed: Whether metadata is included
 1141         :param name: The topic name which user want to filter
 1142 
 1143         :returns: An iterator giving a sequence of topics
 1144             and the marker of the next page.
 1145         """
 1146         return self._list(project, kfilter, marker, limit, detailed, name)
 1147 
 1148     _list = abc.abstractmethod(lambda x: None)
 1149 
 1150     def get(self, name, project=None):
 1151         """Base method for topic metadata retrieval.
 1152 
 1153         :param name: The topic name
 1154         :param project: Project id
 1155 
 1156         :returns: Dictionary containing topic metadata
 1157         :raises DoesNotExist: if topic metadata does not exist
 1158         """
 1159         return self._get(name, project)
 1160 
 1161     _get = abc.abstractmethod(lambda x: None)
 1162 
 1163     def get_metadata(self, name, project=None):
 1164         """Base method for topic metadata retrieval.
 1165 
 1166         :param name: The topic name
 1167         :param project: Project id
 1168 
 1169         :returns: Dictionary containing topic metadata
 1170         :raises DoesNotExist: if topic metadata does not exist
 1171         """
 1172         raise NotImplementedError
 1173 
 1174     def set_metadata(self, name, metadata, project=None):
 1175         """Base method for updating a topic metadata.
 1176 
 1177         :param name: The topic name
 1178         :param metadata: Topic metadata as a dict
 1179         :param project: Project id
 1180         :raises DoesNotExist: if topic metadata can not be updated
 1181         """
 1182         raise NotImplementedError
 1183 
 1184     def create(self, name, metadata=None, project=None):
 1185         """Base method for topic creation.
 1186 
 1187         :param name: The topic name
 1188         :param project: Project id
 1189         :returns: True if a topic was created and False
 1190             if it was updated.
 1191         """
 1192         return self._create(name, metadata, project)
 1193 
 1194     _create = abc.abstractmethod(lambda x: None)
 1195 
 1196     def exists(self, name, project=None):
 1197         """Base method for testing topic existence.
 1198 
 1199         :param name: The topic name
 1200         :param project: Project id
 1201         :returns: True if a topic exists and False
 1202             if it does not.
 1203         """
 1204         return self._exists(name, project)
 1205 
 1206     _exists = abc.abstractmethod(lambda x: None)
 1207 
 1208     def delete(self, name, project=None):
 1209         """Base method for deleting a topic.
 1210 
 1211         :param name: The topic name
 1212         :param project: Project id
 1213         """
 1214         return self._delete(name, project)
 1215 
 1216     _delete = abc.abstractmethod(lambda x: None)
 1217 
 1218     def stats(self, name, project=None):
 1219         """Base method for topic stats.
 1220 
 1221         :param name: The topic name
 1222         :param project: Project id
 1223         :returns: Dictionary with the
 1224             queue stats
 1225         """
 1226         return self._stats(name, project)
 1227 
 1228     _stats = abc.abstractmethod(lambda x: None)