"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/unit/core.py" (13 May 2020, 36774 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "core.py": 16.0.1_vs_17.0.0.

    1 # Copyright 2012 OpenStack Foundation
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 from __future__ import absolute_import
   16 import atexit
   17 import base64
   18 import contextlib
   19 import datetime
   20 import functools
   21 import hashlib
   22 import json
   23 import ldap
   24 import os
   25 import shutil
   26 import socket
   27 import sys
   28 import uuid
   29 import warnings
   30 
   31 import fixtures
   32 import flask
   33 from flask import testing as flask_testing
   34 import http.client
   35 from oslo_config import fixture as config_fixture
   36 from oslo_context import context as oslo_context
   37 from oslo_context import fixture as oslo_ctx_fixture
   38 from oslo_log import fixture as log_fixture
   39 from oslo_log import log
   40 from oslo_utils import timeutils
   41 from sqlalchemy import exc
   42 import testtools
   43 from testtools import testcase
   44 
   45 import keystone.api
   46 from keystone.common import context
   47 from keystone.common import json_home
   48 from keystone.common import provider_api
   49 from keystone.common import sql
   50 import keystone.conf
   51 from keystone import exception
   52 from keystone.identity.backends.ldap import common as ks_ldap
   53 from keystone import notifications
   54 from keystone.resource.backends import base as resource_base
   55 from keystone.server.flask import application as flask_app
   56 from keystone.server.flask import core as keystone_flask
   57 from keystone.tests.unit import ksfixtures
   58 
   59 
   60 keystone.conf.configure()
   61 keystone.conf.set_config_defaults()
   62 
   63 PID = str(os.getpid())
   64 TESTSDIR = os.path.dirname(os.path.abspath(__file__))
   65 TESTCONF = os.path.join(TESTSDIR, 'config_files')
   66 ROOTDIR = os.path.normpath(os.path.join(TESTSDIR, '..', '..', '..'))
   67 VENDOR = os.path.join(ROOTDIR, 'vendor')
   68 ETCDIR = os.path.join(ROOTDIR, 'etc')
   69 
   70 
   71 def _calc_tmpdir():
   72     env_val = os.environ.get('KEYSTONE_TEST_TEMP_DIR')
   73     if not env_val:
   74         return os.path.join(TESTSDIR, 'tmp', PID)
   75     return os.path.join(env_val, PID)
   76 
   77 
   78 TMPDIR = _calc_tmpdir()
   79 
   80 CONF = keystone.conf.CONF
   81 PROVIDERS = provider_api.ProviderAPIs
   82 log.register_options(CONF)
   83 
   84 IN_MEM_DB_CONN_STRING = 'sqlite://'
   85 
   86 # Strictly matches ISO 8601 timestamps with subsecond precision like:
   87 # 2016-06-28T20:48:56.000000Z
   88 TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
   89 TIME_FORMAT_REGEX = '^\d{4}-[0-1]\d-[0-3]\dT[0-2]\d:[0-5]\d:[0-5]\d\.\d{6}Z$'
   90 
   91 exception._FATAL_EXCEPTION_FORMAT_ERRORS = True
   92 os.makedirs(TMPDIR)
   93 atexit.register(shutil.rmtree, TMPDIR)
   94 
   95 
   96 class dirs(object):
   97     @staticmethod
   98     def root(*p):
   99         return os.path.join(ROOTDIR, *p)
  100 
  101     @staticmethod
  102     def etc(*p):
  103         return os.path.join(ETCDIR, *p)
  104 
  105     @staticmethod
  106     def tests(*p):
  107         return os.path.join(TESTSDIR, *p)
  108 
  109     @staticmethod
  110     def tmp(*p):
  111         return os.path.join(TMPDIR, *p)
  112 
  113     @staticmethod
  114     def tests_conf(*p):
  115         return os.path.join(TESTCONF, *p)
  116 
  117 
  118 @atexit.register
  119 def remove_test_databases():
  120     db = dirs.tmp('test.db')
  121     if os.path.exists(db):
  122         os.unlink(db)
  123     pristine = dirs.tmp('test.db.pristine')
  124     if os.path.exists(pristine):
  125         os.unlink(pristine)
  126 
  127 
  128 def skip_if_cache_disabled(*sections):
  129     """Skip a test if caching is disabled, this is a decorator.
  130 
  131     Caching can be disabled either globally or for a specific section.
  132 
  133     In the code fragment::
  134 
  135         @skip_if_cache_is_disabled('assignment', 'token')
  136         def test_method(*args):
  137             ...
  138 
  139     The method test_method would be skipped if caching is disabled globally via
  140     the `enabled` option in the `cache` section of the configuration or if
  141     the `caching` option is set to false in either `assignment` or `token`
  142     sections of the configuration.  This decorator can be used with no
  143     arguments to only check global caching.
  144 
  145     If a specified configuration section does not define the `caching` option,
  146     this decorator makes the same assumption as the `should_cache_fn` in
  147     keystone.common.cache that caching should be enabled.
  148 
  149     """
  150     def wrapper(f):
  151         @functools.wraps(f)
  152         def inner(*args, **kwargs):
  153             if not CONF.cache.enabled:
  154                 raise testcase.TestSkipped('Cache globally disabled.')
  155             for s in sections:
  156                 conf_sec = getattr(CONF, s, None)
  157                 if conf_sec is not None:
  158                     if not getattr(conf_sec, 'caching', True):
  159                         raise testcase.TestSkipped('%s caching disabled.' % s)
  160             return f(*args, **kwargs)
  161         return inner
  162     return wrapper
  163 
  164 
  165 def skip_if_cache_is_enabled(*sections):
  166     def wrapper(f):
  167         @functools.wraps(f)
  168         def inner(*args, **kwargs):
  169             if CONF.cache.enabled:
  170                 for s in sections:
  171                     conf_sec = getattr(CONF, s, None)
  172                     if conf_sec is not None:
  173                         if getattr(conf_sec, 'caching', True):
  174                             raise testcase.TestSkipped('%s caching enabled.' %
  175                                                        s)
  176             return f(*args, **kwargs)
  177         return inner
  178     return wrapper
  179 
  180 
  181 def skip_if_no_multiple_domains_support(f):
  182     """Decorator to skip tests for identity drivers limited to one domain."""
  183     @functools.wraps(f)
  184     def wrapper(*args, **kwargs):
  185         test_obj = args[0]
  186         if not test_obj.identity_api.multiple_domains_supported:
  187             raise testcase.TestSkipped('No multiple domains support')
  188         return f(*args, **kwargs)
  189     return wrapper
  190 
  191 
  192 class UnexpectedExit(Exception):
  193     pass
  194 
  195 
  196 def new_region_ref(parent_region_id=None, **kwargs):
  197     ref = {
  198         'id': uuid.uuid4().hex,
  199         'description': uuid.uuid4().hex,
  200         'parent_region_id': parent_region_id}
  201 
  202     ref.update(kwargs)
  203     return ref
  204 
  205 
  206 def new_service_ref(**kwargs):
  207     ref = {
  208         'id': uuid.uuid4().hex,
  209         'name': uuid.uuid4().hex,
  210         'description': uuid.uuid4().hex,
  211         'enabled': True,
  212         'type': uuid.uuid4().hex,
  213     }
  214     ref.update(kwargs)
  215     return ref
  216 
  217 
  218 NEEDS_REGION_ID = object()
  219 
  220 
  221 def new_endpoint_ref(service_id, interface='public',
  222                      region_id=NEEDS_REGION_ID, **kwargs):
  223 
  224     ref = {
  225         'id': uuid.uuid4().hex,
  226         'name': uuid.uuid4().hex,
  227         'description': uuid.uuid4().hex,
  228         'interface': interface,
  229         'service_id': service_id,
  230         'url': 'https://' + uuid.uuid4().hex + '.com',
  231     }
  232 
  233     if region_id is NEEDS_REGION_ID:
  234         ref['region_id'] = uuid.uuid4().hex
  235     elif region_id is None and kwargs.get('region') is not None:
  236         # pre-3.2 form endpoints are not supported by this function
  237         raise NotImplementedError("use new_endpoint_ref_with_region")
  238     else:
  239         ref['region_id'] = region_id
  240     ref.update(kwargs)
  241     return ref
  242 
  243 
  244 def new_endpoint_group_ref(filters, **kwargs):
  245     ref = {
  246         'id': uuid.uuid4().hex,
  247         'description': uuid.uuid4().hex,
  248         'filters': filters,
  249         'name': uuid.uuid4().hex
  250     }
  251     ref.update(kwargs)
  252     return ref
  253 
  254 
  255 def new_endpoint_ref_with_region(service_id, region, interface='public',
  256                                  **kwargs):
  257     """Define an endpoint_ref having a pre-3.2 form.
  258 
  259     Contains the deprecated 'region' instead of 'region_id'.
  260     """
  261     ref = new_endpoint_ref(service_id, interface, region=region,
  262                            region_id='invalid', **kwargs)
  263     del ref['region_id']
  264     return ref
  265 
  266 
  267 def new_domain_ref(**kwargs):
  268     ref = {
  269         'id': uuid.uuid4().hex,
  270         'name': uuid.uuid4().hex,
  271         'description': uuid.uuid4().hex,
  272         'enabled': True,
  273         'tags': [],
  274         'options': {}
  275     }
  276     ref.update(kwargs)
  277     return ref
  278 
  279 
  280 def new_project_ref(domain_id=None, is_domain=False, **kwargs):
  281     ref = {
  282         'id': uuid.uuid4().hex,
  283         'name': uuid.uuid4().hex,
  284         'description': uuid.uuid4().hex,
  285         'enabled': True,
  286         'domain_id': domain_id,
  287         'is_domain': is_domain,
  288         'tags': [],
  289         'options': {}
  290     }
  291     # NOTE(henry-nash): We don't include parent_id in the initial list above
  292     # since specifying it is optional depending on where the project sits in
  293     # the hierarchy (and a parent_id of None has meaning - i.e. it's a top
  294     # level project).
  295     ref.update(kwargs)
  296     return ref
  297 
  298 
  299 def new_user_ref(domain_id, project_id=None, **kwargs):
  300     ref = {
  301         'id': uuid.uuid4().hex,
  302         'name': uuid.uuid4().hex,
  303         'enabled': True,
  304         'domain_id': domain_id,
  305         'email': uuid.uuid4().hex,
  306         'password': uuid.uuid4().hex,
  307     }
  308     if project_id:
  309         ref['default_project_id'] = project_id
  310     ref.update(kwargs)
  311     return ref
  312 
  313 
  314 def new_federated_user_ref(idp_id=None, protocol_id=None, **kwargs):
  315     ref = {
  316         'idp_id': idp_id or 'ORG_IDP',
  317         'protocol_id': protocol_id or 'saml2',
  318         'unique_id': uuid.uuid4().hex,
  319         'display_name': uuid.uuid4().hex,
  320     }
  321     ref.update(kwargs)
  322     return ref
  323 
  324 
  325 def new_mapping_ref(mapping_id=None, rules=None, **kwargs):
  326     ref = {
  327         'id': mapping_id or uuid.uuid4().hex,
  328         'rules': rules or []
  329     }
  330     ref.update(kwargs)
  331     return ref
  332 
  333 
  334 def new_protocol_ref(protocol_id=None, idp_id=None, mapping_id=None, **kwargs):
  335     ref = {
  336         'id': protocol_id or 'saml2',
  337         'idp_id': idp_id or 'ORG_IDP',
  338         'mapping_id': mapping_id or uuid.uuid4().hex
  339     }
  340     ref.update(kwargs)
  341     return ref
  342 
  343 
  344 def new_identity_provider_ref(idp_id=None, **kwargs):
  345     ref = {
  346         'id': idp_id or 'ORG_IDP',
  347         'enabled': True,
  348         'description': '',
  349     }
  350     ref.update(kwargs)
  351     return ref
  352 
  353 
  354 def new_service_provider_ref(**kwargs):
  355     ref = {
  356         'auth_url': 'https://' + uuid.uuid4().hex + '.com',
  357         'enabled': True,
  358         'description': uuid.uuid4().hex,
  359         'sp_url': 'https://' + uuid.uuid4().hex + '.com',
  360         'relay_state_prefix': CONF.saml.relay_state_prefix
  361     }
  362     ref.update(kwargs)
  363     return ref
  364 
  365 
  366 def new_group_ref(domain_id, **kwargs):
  367     ref = {
  368         'id': uuid.uuid4().hex,
  369         'name': uuid.uuid4().hex,
  370         'description': uuid.uuid4().hex,
  371         'domain_id': domain_id
  372     }
  373     ref.update(kwargs)
  374     return ref
  375 
  376 
  377 def new_credential_ref(user_id, project_id=None, type='cert', **kwargs):
  378     ref = {
  379         'id': uuid.uuid4().hex,
  380         'user_id': user_id,
  381         'type': type,
  382     }
  383 
  384     if project_id:
  385         ref['project_id'] = project_id
  386     if 'blob' not in kwargs:
  387         ref['blob'] = uuid.uuid4().hex
  388 
  389     ref.update(kwargs)
  390     return ref
  391 
  392 
  393 def new_cert_credential(user_id, project_id=None, blob=None, **kwargs):
  394     if blob is None:
  395         blob = {'access': uuid.uuid4().hex, 'secret': uuid.uuid4().hex}
  396 
  397     credential = new_credential_ref(user_id=user_id,
  398                                     project_id=project_id,
  399                                     blob=json.dumps(blob),
  400                                     type='cert',
  401                                     **kwargs)
  402     return blob, credential
  403 
  404 
  405 def new_ec2_credential(user_id, project_id=None, blob=None, **kwargs):
  406     if blob is None:
  407         blob = {
  408             'access': uuid.uuid4().hex,
  409             'secret': uuid.uuid4().hex,
  410             'trust_id': None
  411         }
  412 
  413     if 'id' not in kwargs:
  414         access = blob['access'].encode('utf-8')
  415         kwargs['id'] = hashlib.sha256(access).hexdigest()
  416 
  417     credential = new_credential_ref(user_id=user_id,
  418                                     project_id=project_id,
  419                                     blob=json.dumps(blob),
  420                                     type='ec2',
  421                                     **kwargs)
  422     return blob, credential
  423 
  424 
  425 def new_totp_credential(user_id, project_id=None, blob=None):
  426     if not blob:
  427         # NOTE(notmorgan): 20 bytes of data from os.urandom for
  428         # a totp secret.
  429         blob = base64.b32encode(os.urandom(20)).decode('utf-8')
  430     credential = new_credential_ref(user_id=user_id,
  431                                     project_id=project_id,
  432                                     blob=blob,
  433                                     type='totp')
  434     return credential
  435 
  436 
  437 def new_application_credential_ref(roles=None,
  438                                    name=None,
  439                                    expires=None,
  440                                    secret=None):
  441     ref = {
  442         'id': uuid.uuid4().hex,
  443         'name': uuid.uuid4().hex,
  444         'description': uuid.uuid4().hex,
  445     }
  446     if roles:
  447         ref['roles'] = roles
  448     if secret:
  449         ref['secret'] = secret
  450 
  451     if isinstance(expires, str):
  452         ref['expires_at'] = expires
  453     elif isinstance(expires, dict):
  454         ref['expires_at'] = (
  455             timeutils.utcnow() + datetime.timedelta(**expires)
  456         ).strftime(TIME_FORMAT)
  457     elif expires is None:
  458         pass
  459     else:
  460         raise NotImplementedError('Unexpected value for "expires"')
  461 
  462     return ref
  463 
  464 
  465 def new_role_ref(**kwargs):
  466     ref = {
  467         'id': uuid.uuid4().hex,
  468         'name': uuid.uuid4().hex,
  469         'description': uuid.uuid4().hex,
  470         'domain_id': None,
  471         'options': {},
  472     }
  473     ref.update(kwargs)
  474     return ref
  475 
  476 
  477 def new_policy_ref(**kwargs):
  478     ref = {
  479         'id': uuid.uuid4().hex,
  480         'name': uuid.uuid4().hex,
  481         'description': uuid.uuid4().hex,
  482         'enabled': True,
  483         # Store serialized JSON data as the blob to mimic real world usage.
  484         'blob': json.dumps({'data': uuid.uuid4().hex, }),
  485         'type': uuid.uuid4().hex,
  486     }
  487 
  488     ref.update(kwargs)
  489     return ref
  490 
  491 
  492 def new_domain_config_ref(**kwargs):
  493     ref = {
  494         "identity": {
  495             "driver": "ldap"
  496         },
  497         "ldap": {
  498             "url": "ldap://myldap.com:389/",
  499             "user_tree_dn": "ou=Users,dc=my_new_root,dc=org"
  500         }
  501     }
  502     ref.update(kwargs)
  503     return ref
  504 
  505 
  506 def new_trust_ref(trustor_user_id, trustee_user_id, project_id=None,
  507                   impersonation=None, expires=None, role_ids=None,
  508                   role_names=None, remaining_uses=None,
  509                   allow_redelegation=False, redelegation_count=None, **kwargs):
  510     ref = {
  511         'id': uuid.uuid4().hex,
  512         'trustor_user_id': trustor_user_id,
  513         'trustee_user_id': trustee_user_id,
  514         'impersonation': impersonation or False,
  515         'project_id': project_id,
  516         'remaining_uses': remaining_uses,
  517         'allow_redelegation': allow_redelegation,
  518     }
  519 
  520     if isinstance(redelegation_count, int):
  521         ref.update(redelegation_count=redelegation_count)
  522 
  523     if isinstance(expires, str):
  524         ref['expires_at'] = expires
  525     elif isinstance(expires, dict):
  526         ref['expires_at'] = (
  527             timeutils.utcnow() + datetime.timedelta(**expires)
  528         ).strftime(TIME_FORMAT)
  529     elif expires is None:
  530         pass
  531     else:
  532         raise NotImplementedError('Unexpected value for "expires"')
  533 
  534     role_ids = role_ids or []
  535     role_names = role_names or []
  536     if role_ids or role_names:
  537         ref['roles'] = []
  538         for role_id in role_ids:
  539             ref['roles'].append({'id': role_id})
  540         for role_name in role_names:
  541             ref['roles'].append({'name': role_name})
  542 
  543     ref.update(kwargs)
  544     return ref
  545 
  546 
  547 def new_registered_limit_ref(**kwargs):
  548     ref = {
  549         'service_id': uuid.uuid4().hex,
  550         'resource_name': uuid.uuid4().hex,
  551         'default_limit': 10,
  552         'description': uuid.uuid4().hex
  553     }
  554 
  555     ref.update(kwargs)
  556     return ref
  557 
  558 
  559 def new_limit_ref(**kwargs):
  560     ref = {
  561         'service_id': uuid.uuid4().hex,
  562         'resource_name': uuid.uuid4().hex,
  563         'resource_limit': 10,
  564         'description': uuid.uuid4().hex
  565     }
  566 
  567     ref.update(kwargs)
  568     return ref
  569 
  570 
  571 def create_user(api, domain_id, **kwargs):
  572     """Create a user via the API. Keep the created password.
  573 
  574     The password is saved and restored when api.create_user() is called.
  575     Only use this routine if there is a requirement for the user object to
  576     have a valid password after api.create_user() is called.
  577     """
  578     user = new_user_ref(domain_id=domain_id, **kwargs)
  579     password = user['password']
  580     user = api.create_user(user)
  581     user['password'] = password
  582     return user
  583 
  584 
  585 def _assert_expected_status(f):
  586     """Add `expected_status_code` as an argument to the test_client methods.
  587 
  588     `expected_status_code` must be passed as a kwarg.
  589     """
  590     TEAPOT_HTTP_STATUS = 418
  591 
  592     _default_expected_responses = {
  593         'get': http.client.OK,
  594         'head': http.client.OK,
  595         'post': http.client.CREATED,
  596         'put': http.client.NO_CONTENT,
  597         'patch': http.client.OK,
  598         'delete': http.client.NO_CONTENT,
  599     }
  600 
  601     @functools.wraps(f)
  602     def inner(*args, **kwargs):
  603         # Get the "expected_status_code" kwarg if supplied. If not supplied use
  604         # the `_default_expected_response` mapping, or fall through to
  605         # "HTTP OK" if the method is somehow unknown.
  606         expected_status_code = kwargs.pop(
  607             'expected_status_code',
  608             _default_expected_responses.get(
  609                 f.__name__.lower(), http.client.OK))
  610         response = f(*args, **kwargs)
  611 
  612         # Logic to verify the response object is sane. Expand as needed
  613         if response.status_code == TEAPOT_HTTP_STATUS:
  614             # NOTE(morgan): We use 418 internally during tests to indicate
  615             # an un-routed HTTP call was made. This allows us to avoid
  616             # misinterpreting HTTP 404 from Flask and HTTP 404 from a
  617             # resource that is not found (e.g. USER NOT FOUND) programmatically
  618             raise AssertionError("I AM A TEAPOT(418): %s" % response.data)
  619 
  620         if response.status_code != expected_status_code:
  621             raise AssertionError(
  622                 'Expected HTTP Status does not match observed HTTP '
  623                 'Status: %(expected)s != %(observed)s (%(data)s)' % {
  624                     'expected': expected_status_code,
  625                     'observed': response.status_code,
  626                     'data': response.data})
  627 
  628         # return the original response object
  629         return response
  630     return inner
  631 
  632 
  633 class KeystoneFlaskTestClient(flask_testing.FlaskClient):
  634     """Subclass of flask.testing.FlaskClient implementing assertions.
  635 
  636     Implements custom "expected" HTTP Status assertion for
  637     GET/HEAD/PUT/PATCH/DELETE.
  638     """
  639 
  640     @_assert_expected_status
  641     def get(self, *args, **kwargs):
  642         return super(KeystoneFlaskTestClient, self).get(*args, **kwargs)
  643 
  644     @_assert_expected_status
  645     def head(self, *args, **kwargs):
  646         return super(KeystoneFlaskTestClient, self).head(*args, **kwargs)
  647 
  648     @_assert_expected_status
  649     def post(self, *args, **kwargs):
  650         return super(KeystoneFlaskTestClient, self).post(*args, **kwargs)
  651 
  652     @_assert_expected_status
  653     def patch(self, *args, **kwargs):
  654         return super(KeystoneFlaskTestClient, self).patch(*args, **kwargs)
  655 
  656     @_assert_expected_status
  657     def put(self, *args, **kwargs):
  658         return super(KeystoneFlaskTestClient, self).put(*args, **kwargs)
  659 
  660     @_assert_expected_status
  661     def delete(self, *args, **kwargs):
  662         return super(KeystoneFlaskTestClient, self).delete(*args, **kwargs)
  663 
  664 
  665 class BaseTestCase(testtools.TestCase):
  666     """Light weight base test class.
  667 
  668     This is a placeholder that will eventually go away once the
  669     setup/teardown in TestCase is properly trimmed down to the bare
  670     essentials. This is really just a play to speed up the tests by
  671     eliminating unnecessary work.
  672     """
  673 
  674     def setUp(self):
  675         super(BaseTestCase, self).setUp()
  676 
  677         self.useFixture(fixtures.NestedTempfile())
  678         self.useFixture(fixtures.TempHomeDir())
  679 
  680         self.useFixture(fixtures.MockPatchObject(sys, 'exit',
  681                                                  side_effect=UnexpectedExit))
  682         self.useFixture(log_fixture.get_logging_handle_error_fixture())
  683 
  684         warnings.filterwarnings('error', category=DeprecationWarning,
  685                                 module='^keystone\\.')
  686         warnings.simplefilter('error', exc.SAWarning)
  687         if hasattr(exc, "RemovedIn20Warning"):
  688             warnings.simplefilter('ignore', exc.RemovedIn20Warning)
  689 
  690         self.addCleanup(warnings.resetwarnings)
  691         # Ensure we have an empty threadlocal context at the start of each
  692         # test.
  693         self.assertIsNone(oslo_context.get_current())
  694         self.useFixture(oslo_ctx_fixture.ClearRequestContext())
  695 
  696         orig_debug_level = ldap.get_option(ldap.OPT_DEBUG_LEVEL)
  697         self.addCleanup(ldap.set_option, ldap.OPT_DEBUG_LEVEL,
  698                         orig_debug_level)
  699         orig_tls_cacertfile = ldap.get_option(ldap.OPT_X_TLS_CACERTFILE)
  700         if orig_tls_cacertfile is None:
  701             orig_tls_cacertfile = ''
  702         self.addCleanup(ldap.set_option, ldap.OPT_X_TLS_CACERTFILE,
  703                         orig_tls_cacertfile)
  704         orig_tls_cacertdir = ldap.get_option(ldap.OPT_X_TLS_CACERTDIR)
  705         # Setting orig_tls_cacertdir to None is not allowed.
  706         if orig_tls_cacertdir is None:
  707             orig_tls_cacertdir = ''
  708         self.addCleanup(ldap.set_option, ldap.OPT_X_TLS_CACERTDIR,
  709                         orig_tls_cacertdir)
  710         orig_tls_require_cert = ldap.get_option(ldap.OPT_X_TLS_REQUIRE_CERT)
  711         self.addCleanup(ldap.set_option, ldap.OPT_X_TLS_REQUIRE_CERT,
  712                         orig_tls_require_cert)
  713         self.addCleanup(ks_ldap.PooledLDAPHandler.connection_pools.clear)
  714 
  715     def cleanup_instance(self, *names):
  716         """Create a function suitable for use with self.addCleanup.
  717 
  718         :returns: a callable that uses a closure to delete instance attributes
  719 
  720         """
  721         def cleanup():
  722             for name in names:
  723                 # TODO(dstanek): remove this 'if' statement once
  724                 # load_backend in test_backend_ldap is only called once
  725                 # per test
  726                 if hasattr(self, name):
  727                     delattr(self, name)
  728         return cleanup
  729 
  730     def skip_if_env_not_set(self, env_var):
  731         if not os.environ.get(env_var):
  732             self.skipTest('Env variable %s is not set.' % env_var)
  733 
  734     def skip_test_overrides(self, *args, **kwargs):
  735         if self._check_for_method_in_parents(self._testMethodName):
  736             return super(BaseTestCase, self).skipTest(*args, **kwargs)
  737         raise Exception('%r is not a previously defined test method'
  738                         % self._testMethodName)
  739 
  740     def _check_for_method_in_parents(self, name):
  741         # skip first to get to parents
  742         for cls in self.__class__.__mro__[1:]:
  743             if hasattr(cls, name):
  744                 return True
  745         return False
  746 
  747     def loadapp(self, name='public'):
  748         app = flask_app.application_factory(name)
  749         app.testing = True
  750         app.test_client_class = KeystoneFlaskTestClient
  751 
  752         # NOTE(morgan): any unexpected 404s, not handled by the routed apis,
  753         # is a hard error and should not pass testing.
  754         def page_not_found_teapot(e):
  755             content = (
  756                 'TEST PROGRAMMING ERROR - Reached a 404 from an unrouted (`%s`'
  757                 ') path. Be sure the test is requesting the right resource '
  758                 'and that all blueprints are registered with the flask app.' %
  759                 flask.request.url)
  760             return content, 418
  761 
  762         app.register_error_handler(404, page_not_found_teapot)
  763 
  764         self.test_client = app.test_client
  765         self.test_request_context = app.test_request_context
  766         self.cleanup_instance('test_request_context')
  767         self.cleanup_instance('test_client')
  768         return keystone_flask.setup_app_middleware(app)
  769 
  770 
  771 class TestCase(BaseTestCase):
  772 
  773     def config_files(self):
  774         return []
  775 
  776     def _policy_fixture(self):
  777         return ksfixtures.Policy(self.config_fixture)
  778 
  779     @contextlib.contextmanager
  780     def make_request(self, path='/', **kwargs):
  781         # standup a fake app and request context with a passed in/known
  782         # environment.
  783 
  784         is_admin = kwargs.pop('is_admin', False)
  785         environ = kwargs.setdefault('environ', {})
  786         query_string = kwargs.pop('query_string', None)
  787         if query_string:
  788             # Make sure query string is properly added to the context
  789             path = '{path}?{qs}'.format(path=path, qs=query_string)
  790 
  791         if not environ.get(context.REQUEST_CONTEXT_ENV):
  792             environ[context.REQUEST_CONTEXT_ENV] = context.RequestContext(
  793                 is_admin=is_admin,
  794                 authenticated=kwargs.pop('authenticated', True))
  795 
  796         # Create a dummy flask app to work with
  797         app = flask.Flask(__name__)
  798         with app.test_request_context(path=path, environ_overrides=environ):
  799             yield
  800 
  801     def config_overrides(self):
  802         # NOTE(morganfainberg): enforce config_overrides can only ever be
  803         # called a single time.
  804         assert self.__config_overrides_called is False
  805         self.__config_overrides_called = True
  806 
  807         signing_certfile = 'examples/pki/certs/signing_cert.pem'
  808         signing_keyfile = 'examples/pki/private/signing_key.pem'
  809 
  810         self.useFixture(self._policy_fixture())
  811 
  812         self.config_fixture.config(
  813             # TODO(morganfainberg): Make Cache Testing a separate test case
  814             # in tempest, and move it out of the base unit tests.
  815             group='cache',
  816             backend='dogpile.cache.memory',
  817             enabled=True,
  818             proxies=['oslo_cache.testing.CacheIsolatingProxy'])
  819         self.config_fixture.config(
  820             group='catalog',
  821             driver='sql',
  822             template_file=dirs.tests('default_catalog.templates'))
  823         self.config_fixture.config(
  824             group='saml', certfile=signing_certfile, keyfile=signing_keyfile)
  825         self.config_fixture.config(
  826             default_log_levels=[
  827                 'amqp=WARN',
  828                 'amqplib=WARN',
  829                 'boto=WARN',
  830                 'qpid=WARN',
  831                 'sqlalchemy=WARN',
  832                 'suds=INFO',
  833                 'oslo.messaging=INFO',
  834                 'iso8601=WARN',
  835                 'requests.packages.urllib3.connectionpool=WARN',
  836                 'routes.middleware=INFO',
  837                 'stevedore.extension=INFO',
  838                 'keystone.notifications=INFO',
  839                 'keystone.identity.backends.ldap.common=INFO',
  840             ])
  841         # NOTE(notmorgan): Set password rounds low here to ensure speedy
  842         # tests. This is explicitly set because the tests here are not testing
  843         # the integrity of the password hashing, just that the correct form
  844         # of hashing has been used. Note that 4 is the lowest for bcrypt
  845         # allowed in the `[identity] password_hash_rounds` setting
  846         self.config_fixture.config(group='identity', password_hash_rounds=4)
  847 
  848         self.useFixture(
  849             ksfixtures.KeyRepository(
  850                 self.config_fixture,
  851                 'fernet_tokens',
  852                 CONF.fernet_tokens.max_active_keys
  853             )
  854         )
  855 
  856         self.useFixture(
  857             ksfixtures.KeyRepository(
  858                 self.config_fixture,
  859                 'fernet_receipts',
  860                 CONF.fernet_receipts.max_active_keys
  861             )
  862         )
  863 
  864     def _assert_config_overrides_called(self):
  865         assert self.__config_overrides_called is True
  866 
  867     def setUp(self):
  868         super(TestCase, self).setUp()
  869         self.__config_overrides_called = False
  870         self.__load_backends_called = False
  871         self.config_fixture = self.useFixture(config_fixture.Config(CONF))
  872         self.addCleanup(delattr, self, 'config_fixture')
  873         self.config(self.config_files())
  874 
  875         # NOTE(morganfainberg): mock the auth plugin setup to use the config
  876         # fixture which automatically unregisters options when performing
  877         # cleanup.
  878         def mocked_register_auth_plugin_opt(conf, opt):
  879             self.config_fixture.register_opt(opt, group='auth')
  880         self.useFixture(fixtures.MockPatchObject(
  881             keystone.conf.auth, '_register_auth_plugin_opt',
  882             new=mocked_register_auth_plugin_opt))
  883 
  884         self.config_overrides()
  885         # explicitly load auth configuration
  886         keystone.conf.auth.setup_authentication()
  887         # NOTE(morganfainberg): ensure config_overrides has been called.
  888         self.addCleanup(self._assert_config_overrides_called)
  889 
  890         self.useFixture(fixtures.FakeLogger(level=log.DEBUG))
  891 
  892         # NOTE(morganfainberg): This code is a copy from the oslo-incubator
  893         # log module. This is not in a function or otherwise available to use
  894         # without having a CONF object to setup logging. This should help to
  895         # reduce the log size by limiting what we log (similar to how Keystone
  896         # would run under mod_wsgi).
  897         for pair in CONF.default_log_levels:
  898             mod, _sep, level_name = pair.partition('=')
  899             logger = log.getLogger(mod)
  900             logger.logger.setLevel(level_name)
  901 
  902         self.useFixture(ksfixtures.Cache())
  903 
  904         # Clear the registry of providers so that providers from previous
  905         # tests aren't used.
  906         self.addCleanup(provider_api.ProviderAPIs._clear_registry_instances)
  907 
  908         # Clear the registry of JSON Home Resources
  909         self.addCleanup(json_home.JsonHomeResources._reset)
  910 
  911         # Ensure Notification subscriptions and resource types are empty
  912         self.addCleanup(notifications.clear_subscribers)
  913         self.addCleanup(notifications.reset_notifier)
  914 
  915     def config(self, config_files):
  916         sql.initialize()
  917         CONF(args=[], project='keystone', default_config_files=config_files)
  918 
  919     def load_backends(self):
  920         """Initialize each manager and assigns them to an attribute."""
  921         # TODO(morgan): Ensure our tests only ever call load_backends
  922         # a single time via this method. for now just clear the registry
  923         # if we are reloading.
  924         provider_api.ProviderAPIs._clear_registry_instances()
  925         self.useFixture(ksfixtures.BackendLoader(self))
  926 
  927     def load_fixtures(self, fixtures):
  928         """Hacky basic and naive fixture loading based on a python module.
  929 
  930         Expects that the various APIs into the various services are already
  931         defined on `self`.
  932 
  933         """
  934         # NOTE(dstanek): create a list of attribute names to be removed
  935         # from this instance during cleanup
  936         fixtures_to_cleanup = []
  937 
  938         # TODO(termie): doing something from json, probably based on Django's
  939         #               loaddata will be much preferred.
  940         if (hasattr(self, 'identity_api') and
  941             hasattr(self, 'assignment_api') and
  942                 hasattr(self, 'resource_api')):
  943             try:
  944                 PROVIDERS.resource_api.create_domain(
  945                     resource_base.NULL_DOMAIN_ID, fixtures.ROOT_DOMAIN)
  946             except exception.Conflict:
  947                 # the root domain already exists, skip now.
  948                 pass
  949             for domain in fixtures.DOMAINS:
  950                 rv = PROVIDERS.resource_api.create_domain(domain['id'], domain)
  951                 attrname = 'domain_%s' % domain['id']
  952                 setattr(self, attrname, rv)
  953                 fixtures_to_cleanup.append(attrname)
  954 
  955             for project in fixtures.PROJECTS:
  956                 project_attr_name = 'project_%s' % project['name'].lower()
  957                 rv = PROVIDERS.resource_api.create_project(
  958                     project['id'], project)
  959                 setattr(self, project_attr_name, rv)
  960                 fixtures_to_cleanup.append(project_attr_name)
  961 
  962             for role in fixtures.ROLES:
  963                 rv = PROVIDERS.role_api.create_role(role['id'], role)
  964                 attrname = 'role_%s' % role['name']
  965                 setattr(self, attrname, rv)
  966                 fixtures_to_cleanup.append(attrname)
  967 
  968             for user in fixtures.USERS:
  969                 user_copy = user.copy()
  970                 projects = user_copy.pop('projects')
  971 
  972                 # For users, the manager layer will generate the ID
  973                 user_copy = PROVIDERS.identity_api.create_user(user_copy)
  974                 # Our tests expect that the password is still in the user
  975                 # record so that they can reference it, so put it back into
  976                 # the dict returned.
  977                 user_copy['password'] = user['password']
  978 
  979                 # fixtures.ROLES[2] is the _member_ role.
  980                 for project_id in projects:
  981                     PROVIDERS.assignment_api.add_role_to_user_and_project(
  982                         user_copy['id'], project_id, fixtures.ROLES[2]['id'])
  983 
  984                 # Use the ID from the fixture as the attribute name, so
  985                 # that our tests can easily reference each user dict, while
  986                 # the ID in the dict will be the real public ID.
  987                 attrname = 'user_%s' % user['name']
  988                 setattr(self, attrname, user_copy)
  989                 fixtures_to_cleanup.append(attrname)
  990 
  991             for role_assignment in fixtures.ROLE_ASSIGNMENTS:
  992                 role_id = role_assignment['role_id']
  993                 user = role_assignment['user']
  994                 project_id = role_assignment['project_id']
  995                 user_id = getattr(self, 'user_%s' % user)['id']
  996                 PROVIDERS.assignment_api.add_role_to_user_and_project(
  997                     user_id, project_id, role_id)
  998 
  999             self.addCleanup(self.cleanup_instance(*fixtures_to_cleanup))
 1000 
 1001     def assertCloseEnoughForGovernmentWork(self, a, b, delta=3):
 1002         """Assert that two datetimes are nearly equal within a small delta.
 1003 
 1004         :param delta: Maximum allowable time delta, defined in seconds.
 1005         """
 1006         if a == b:
 1007             # Short-circuit if the values are the same.
 1008             return
 1009 
 1010         msg = '%s != %s within %s delta' % (a, b, delta)
 1011 
 1012         self.assertLessEqual(abs(a - b).seconds, delta, msg)
 1013 
 1014     def assertTimestampEqual(self, expected, value):
 1015         # Compare two timestamps but ignore the microseconds part
 1016         # of the expected timestamp. Keystone does not track microseconds and
 1017         # is working to eliminate microseconds from it's datetimes used.
 1018         expected = timeutils.parse_isotime(expected).replace(microsecond=0)
 1019         value = timeutils.parse_isotime(value).replace(microsecond=0)
 1020         self.assertEqual(
 1021             expected,
 1022             value,
 1023             "%s != %s" % (expected, value))
 1024 
 1025     def assertNotEmpty(self, l):
 1026         self.assertGreater(len(l), 0)
 1027 
 1028     def assertUserDictEqual(self, expected, observed, message=''):
 1029         """Assert that a user dict is equal to another user dict.
 1030 
 1031         User dictionaries have some variable values that should be ignored in
 1032         the comparison. This method is a helper that strips those elements out
 1033         when comparing the user dictionary. This normalized these differences
 1034         that should not change the comparison.
 1035         """
 1036         # NOTE(notmorgan): An empty option list is the same as no options being
 1037         # specified in the user_ref. This removes options if it is empty in
 1038         # observed if options is not specified in the expected value.
 1039         if ('options' in observed and not observed['options'] and
 1040                 'options' not in expected):
 1041             observed = observed.copy()
 1042             del observed['options']
 1043 
 1044         self.assertDictEqual(expected, observed, message)
 1045 
 1046     @property
 1047     def ipv6_enabled(self):
 1048         if socket.has_ipv6:
 1049             sock = None
 1050             try:
 1051                 sock = socket.socket(socket.AF_INET6)
 1052                 # NOTE(Mouad): Try to bind to IPv6 loopback ip address.
 1053                 sock.bind(("::1", 0))
 1054                 return True
 1055             except socket.error:
 1056                 pass
 1057             finally:
 1058                 if sock:
 1059                     sock.close()
 1060         return False
 1061 
 1062     def skip_if_no_ipv6(self):
 1063         if not self.ipv6_enabled:
 1064             raise self.skipTest("IPv6 is not enabled in the system")
 1065 
 1066 
 1067 class SQLDriverOverrides(object):
 1068     """A mixin for consolidating sql-specific test overrides."""
 1069 
 1070     def config_overrides(self):
 1071         super(SQLDriverOverrides, self).config_overrides()
 1072         # SQL specific driver overrides
 1073         self.config_fixture.config(group='catalog', driver='sql')
 1074         self.config_fixture.config(group='identity', driver='sql')
 1075         self.config_fixture.config(group='policy', driver='sql')
 1076         self.config_fixture.config(group='trust', driver='sql')