"Fossies" - the Fresh Open Source Software Archive

Member "zaqar-7.0.0/zaqar/tests/functional/base.py" (30 Aug 2018, 14573 Bytes) of package /linux/misc/openstack/zaqar-7.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "base.py": 5.0.0_vs_7.0.0.

    1 # Copyright (c) 2013 Rackspace, Inc.
    2 # Copyright (c) 2013 Red Hat, Inc.
    3 #
    4 # Licensed under the Apache License, Version 2.0 (the "License");
    5 # you may not use this file except in compliance with the License.
    6 # You may obtain a copy of the License at
    7 #
    8 #    http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 # Unless required by applicable law or agreed to in writing, software
   11 # distributed under the License is distributed on an "AS IS" BASIS,
   12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   13 # implied.
   14 # See the License for the specific language governing permissions and
   15 # limitations under the License.
   16 
   17 import abc
   18 import multiprocessing
   19 import os
   20 
   21 import jsonschema
   22 from oslo_utils import timeutils
   23 import six
   24 
   25 from zaqar.api.v1 import response as response_v1
   26 from zaqar.api.v1_1 import response as response_v1_1
   27 from zaqar.api.v2 import response as response_v2
   28 from zaqar import bootstrap
   29 from zaqar.storage import mongodb
   30 from zaqar.storage.redis import driver as redis
   31 from zaqar import tests as testing
   32 from zaqar.tests.functional import config
   33 from zaqar.tests.functional import helpers
   34 from zaqar.tests.functional import http
   35 from zaqar.tests import helpers as base_helpers
   36 from zaqar.transport import base as transport_base
   37 # TODO(flaper87): This is necessary to register,
   38 # wsgi configs and won't be permanent. It'll be
   39 # refactored as part of the work for this blueprint
   40 from zaqar.transport import validation
   41 from zaqar.transport import wsgi  # noqa
   42 
   43 # TODO(kgriffs): Run functional tests to a devstack gate job and
   44 # set this using an environment variable or something.
   45 #
   46 # TODO(kgriffs): Find a more general way to do this; we seem to be
   47 # using this environ flag pattern over and over againg.
   48 _TEST_INTEGRATION = os.environ.get('ZAQAR_TEST_INTEGRATION') is not None
   49 
   50 
   51 class FunctionalTestBase(testing.TestBase):
   52 
   53     server = None
   54     server_class = None
   55     config_file = None
   56     class_bootstrap = None
   57     # NOTE(Eva-i): ttl_gc_interval is the known maximum time interval between
   58     # automatic resource TTL expirations. Depends on message store back end.
   59     class_ttl_gc_interval = None
   60     wipe_dbs_projects = set([])
   61 
   62     def setUp(self):
   63         super(FunctionalTestBase, self).setUp()
   64         # NOTE(flaper87): Config can't be a class
   65         # attribute because it may be necessary to
   66         # modify it at runtime which will affect
   67         # other instances running instances.
   68         self.cfg = config.load_config()
   69 
   70         if not self.cfg.run_tests:
   71             self.skipTest("Functional tests disabled")
   72 
   73         config_file = self.config_file or self.cfg.zaqar.config
   74 
   75         config_file = base_helpers.override_mongo_conf(config_file, self)
   76 
   77         self.mconf = self.load_conf(config_file)
   78 
   79         validator = validation.Validator(self.mconf)
   80         self.limits = validator._limits_conf
   81 
   82         self.resource_defaults = transport_base.ResourceDefaults(self.mconf)
   83 
   84         # Always register options
   85         self.__class__.class_bootstrap = bootstrap.Bootstrap(self.mconf)
   86         self.class_bootstrap.transport
   87 
   88         datadriver = self.class_bootstrap.storage._storage
   89         if isinstance(datadriver, redis.DataDriver):
   90             self.__class__.class_ttl_gc_interval = 1
   91         if isinstance(datadriver, mongodb.DataDriver):
   92             # NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute
   93             self.__class__.class_ttl_gc_interval = 60
   94 
   95         if _TEST_INTEGRATION:
   96             if not (self.server and self.server.is_alive()):
   97                 self.server = self.server_class()
   98                 self.server.start(self.mconf)
   99                 self.addCleanup(self.server.process.terminate)
  100 
  101             self.client = http.Client()
  102         else:
  103             if self.server_class == ZaqarAdminServer:
  104                 self.mconf.pooling = True
  105                 self.mconf.admin_mode = True
  106 
  107             self.addCleanup(self.class_bootstrap.storage.close)
  108             self.addCleanup(self.class_bootstrap.control.close)
  109             self.client = http.WSGIClient(self.class_bootstrap.transport.app)
  110 
  111         self.headers = helpers.create_zaqar_headers(self.cfg)
  112 
  113         self.headers_response_with_body = {'location', 'content-type'}
  114 
  115         self.client.set_headers(self.headers)
  116 
  117         # Store information required for cleaning databases after
  118         # execution of test class
  119         self.wipe_dbs_projects.add(self.headers["X-Project-ID"])
  120 
  121     def tearDown(self):
  122         super(FunctionalTestBase, self).tearDown()
  123         # Project might has changed during test case execution.
  124         # Lets add it again to the set.
  125         self.wipe_dbs_projects.add(self.headers["X-Project-ID"])
  126 
  127     @staticmethod
  128     def _if_mongo_datadriver_drop_dbs(driver):
  129         """Drops MongoDB datadriver's databases.
  130 
  131         :param driver: instance of zaqar.storage.mongodb.driver.DataDriver
  132         """
  133         if not isinstance(driver, mongodb.DataDriver):
  134             return
  135         for db in driver.message_databases:
  136             driver.connection.drop_database(db)
  137         subscription_db = driver.subscriptions_database
  138         driver.connection.drop_database(subscription_db)
  139 
  140     @staticmethod
  141     def _if_mongo_controldriver_drop_dbs(driver):
  142         """Drops all MongoDB controldriver's databases.
  143 
  144         :param driver: instance of zaqar.storage.mongodb.driver.ControlDriver
  145         """
  146         if not isinstance(driver, mongodb.ControlDriver):
  147             return
  148         driver.connection.drop_database(driver.database)
  149         driver.connection.drop_database(driver.queues_database)
  150 
  151     @classmethod
  152     def _pooling_drop_dbs_by_project(cls, xproject):
  153         """Finds all pool drivers by project, drops all their databases.
  154 
  155         Assumes that pooling is enabled.
  156 
  157         :param xproject: project name to use for pool drivers search
  158         """
  159         datadriver = cls.class_bootstrap.storage._storage
  160         controldriver = cls.class_bootstrap.control
  161         # Let's get list of all queues by project
  162         queue_generator = controldriver.queue_controller.list(project=xproject)
  163         queues = list(next(queue_generator))
  164         # Let's extract all queue names from the list of queues
  165         queue_names = [q['name'] for q in queues]
  166         # Finally let's use queues names to get each one of pool datadrivers
  167         catalog = datadriver._pool_catalog
  168         for queue_name in queue_names:
  169             pool_pipe_driver = catalog.lookup(queue_name, project=xproject)
  170             pool_datadriver = pool_pipe_driver._storage
  171             if pool_datadriver is not None:
  172                 # Let's delete the queue, so the next invocation of
  173                 # pooling_catalog.lookup() will not recreate pool driver
  174                 controldriver.queue_controller.delete(queue_name)
  175                 # Let's drop pool's databases
  176                 cls._if_mongo_datadriver_drop_dbs(pool_datadriver)
  177 
  178     @classmethod
  179     def tearDownClass(cls):
  180         """Cleans up after test class execution.
  181 
  182         Drops all databases left.
  183         Closes connections to databases.
  184         """
  185         # Bootstrap can be None if all test cases were skipped, so nothing to
  186         # clean
  187         if cls.class_bootstrap is None:
  188             return
  189 
  190         datadriver = cls.class_bootstrap.storage._storage
  191         controldriver = cls.class_bootstrap.control
  192 
  193         if cls.class_bootstrap.conf.pooling:
  194             # Pooling detected, let's drop pooling-specific databases
  195             for p in cls.wipe_dbs_projects:
  196                 # This will find all pool databases by project and drop them
  197                 cls._pooling_drop_dbs_by_project(p)
  198             controldriver.pools_controller.drop_all()
  199             controldriver.flavors_controller.drop_all()
  200         else:
  201             # No pooling detected, let's just drop datadriver's databases
  202             cls._if_mongo_datadriver_drop_dbs(datadriver)
  203 
  204         cls.class_bootstrap.storage.close()
  205 
  206         # Let's drop controldriver's databases
  207         controldriver.catalogue_controller.drop_all()
  208         cls._if_mongo_controldriver_drop_dbs(controldriver)
  209 
  210         controldriver.close()
  211 
  212     def assertIsSubset(self, required_values, actual_values):
  213         """Checks if a list is subset of another.
  214 
  215         :param required_values: superset list.
  216         :param required_values: subset list.
  217         """
  218 
  219         form = 'Missing Header(s) - {0}'
  220         self.assertTrue(required_values.issubset(actual_values),
  221                         msg=form.format((required_values - actual_values)))
  222 
  223     def assertMessageCount(self, actualCount, expectedCount):
  224         """Checks if number of messages returned <= limit
  225 
  226         :param expectedCount: limit value passed in the url (OR) default(10).
  227         :param actualCount: number of messages returned in the API response.
  228         """
  229         msg = ('More Messages returned than allowed: expected count = {0}'
  230                ', actual count = {1}'.format(expectedCount, actualCount))
  231         self.assertLessEqual(actualCount, expectedCount, msg)
  232 
  233     def assertQueueStats(self, result_json, claimed):
  234         """Checks the Queue Stats results
  235 
  236         :param result_json: json response returned for Queue Stats.
  237         :param claimed: expected number of claimed messages.
  238         """
  239         total = self.limits.max_messages_per_claim_or_pop
  240         free = total - claimed
  241 
  242         self.assertEqual(claimed, result_json['messages']['claimed'])
  243         self.assertEqual(free, result_json['messages']['free'])
  244         self.assertEqual(total, result_json['messages']['total'])
  245 
  246         if 'oldest' in result_json['messages']:
  247             oldest_message = result_json['messages']['oldest']
  248             self.verify_message_stats(oldest_message)
  249 
  250             newest_message = result_json['messages']['newest']
  251             self.verify_message_stats(newest_message)
  252 
  253     def assertSchema(self, response, expectedSchemaName):
  254         """Compares the json response with the expected schema
  255 
  256         :param response: response json returned by the API.
  257         :type response: dict
  258         :param expectedSchema: expected schema definition for response.
  259         :type expectedSchema: string
  260         """
  261         try:
  262             expectedSchema = self.response.get_schema(expectedSchemaName)
  263             jsonschema.validate(response, expectedSchema)
  264         except jsonschema.ValidationError as message:
  265             assert False, message
  266 
  267     def verify_message_stats(self, message):
  268         """Verifies the oldest & newest message stats
  269 
  270         :param message: oldest (or) newest message returned by
  271                         queue_name/stats.
  272         """
  273         expected_keys = ['age', 'created', 'href']
  274 
  275         response_keys = message.keys()
  276         response_keys = sorted(response_keys)
  277         self.assertEqual(expected_keys, response_keys)
  278 
  279         # Verify that age has valid values
  280         age = message['age']
  281         msg = 'Invalid Age {0}'.format(age)
  282         self.assertLessEqual(0, age, msg)
  283         self.assertLessEqual(age, self.limits.max_message_ttl, msg)
  284 
  285         # Verify that GET on href returns 200
  286         path = message['href']
  287         result = self.client.get(path)
  288         self.assertEqual(200, result.status_code)
  289 
  290         # Verify that created time falls within the last 10 minutes
  291         # NOTE(malini): The messages are created during the test.
  292         created_time = message['created']
  293         created_time = timeutils.normalize_time(
  294             timeutils.parse_isotime(created_time))
  295         now = timeutils.utcnow()
  296 
  297         delta = timeutils.delta_seconds(before=created_time, after=now)
  298         # NOTE(malini): The 'int()' below is a work around  for the small time
  299         # difference between julianday & UTC.
  300         # (needed to pass this test on sqlite driver)
  301         delta = int(delta)
  302 
  303         msg = ('Invalid Time Delta {0}, Created time {1}, Now {2}'
  304                .format(delta, created_time, now))
  305         self.assertLessEqual(0, delta, msg)
  306         self.assertLessEqual(delta, 6000, msg)
  307 
  308 
  309 @six.add_metaclass(abc.ABCMeta)
  310 class Server(object):
  311 
  312     name = "zaqar-functional-test-server"
  313 
  314     def __init__(self):
  315         self.process = None
  316 
  317     @abc.abstractmethod
  318     def get_target(self, conf):
  319         """Prepares the target object
  320 
  321         This method is meant to initialize server's
  322         bootstrap and return a callable to run the
  323         server.
  324 
  325         :param conf: The config instance for the
  326             bootstrap class
  327         :returns: A callable object
  328         """
  329 
  330     def is_alive(self):
  331         """Returns True IFF the server is running."""
  332 
  333         if self.process is None:
  334             return False
  335 
  336         return self.process.is_alive()
  337 
  338     def start(self, conf):
  339         """Starts the server process.
  340 
  341         :param conf: The config instance to use for
  342             the new process
  343         :returns: A `multiprocessing.Process` instance
  344         """
  345 
  346         # TODO(flaper87): Re-use running instances.
  347         target = self.get_target(conf)
  348 
  349         if not callable(target):
  350             raise RuntimeError("Target not callable")
  351 
  352         self.process = multiprocessing.Process(target=target,
  353                                                name=self.name)
  354         self.process.daemon = True
  355         self.process.start()
  356 
  357         # NOTE(flaper87): Give it a second
  358         # to boot.
  359         self.process.join(1)
  360         return self.process
  361 
  362     def stop(self):
  363         """Terminates a process
  364 
  365         This method kills a process by
  366         calling `terminate`. Note that
  367         children of this process won't be
  368         terminated but become orphaned.
  369         """
  370         self.process.terminate()
  371 
  372 
  373 class ZaqarServer(Server):
  374 
  375     name = "zaqar-wsgiref-test-server"
  376 
  377     def get_target(self, conf):
  378         server = bootstrap.Bootstrap(conf)
  379         return server.run
  380 
  381 
  382 class ZaqarAdminServer(Server):
  383 
  384     name = "zaqar-admin-wsgiref-test-server"
  385 
  386     def get_target(self, conf):
  387         conf.admin_mode = True
  388         server = bootstrap.Bootstrap(conf)
  389         return server.run
  390 
  391 
  392 class V1FunctionalTestBase(FunctionalTestBase):
  393     def setUp(self):
  394         super(V1FunctionalTestBase, self).setUp()
  395         self.response = response_v1.ResponseSchema(self.limits)
  396 
  397 
  398 class V1_1FunctionalTestBase(FunctionalTestBase):
  399     def setUp(self):
  400         super(V1_1FunctionalTestBase, self).setUp()
  401         self.response = response_v1_1.ResponseSchema(self.limits)
  402 
  403 
  404 class V2FunctionalTestBase(FunctionalTestBase):
  405     def setUp(self):
  406         super(V2FunctionalTestBase, self).setUp()
  407         self.response = response_v2.ResponseSchema(self.limits)