"Fossies" - the Fresh Open Source Software Archive

Member "keystone-19.0.0/keystone/tests/unit/core.py" (14 Apr 2021, 36887 Bytes) of package /linux/misc/openstack/keystone-19.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "core.py": 18.0.0_vs_19.0.0.

    1 # Copyright 2012 OpenStack Foundation
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 import atexit
   15 import base64
   16 import contextlib
   17 import datetime
   18 import functools
   19 import hashlib
   20 import json
   21 import ldap
   22 import os
   23 import shutil
   24 import socket
   25 import sys
   26 import uuid
   27 import warnings
   28 
   29 import fixtures
   30 import flask
   31 from flask import testing as flask_testing
   32 import http.client
   33 from oslo_config import fixture as config_fixture
   34 from oslo_context import context as oslo_context
   35 from oslo_context import fixture as oslo_ctx_fixture
   36 from oslo_log import fixture as log_fixture
   37 from oslo_log import log
   38 from oslo_utils import timeutils
   39 from sqlalchemy import exc
   40 import testtools
   41 from testtools import testcase
   42 
   43 import keystone.api
   44 from keystone.common import context
   45 from keystone.common import json_home
   46 from keystone.common import provider_api
   47 from keystone.common import sql
   48 import keystone.conf
   49 from keystone import exception
   50 from keystone.identity.backends.ldap import common as ks_ldap
   51 from keystone import notifications
   52 from keystone.resource.backends import base as resource_base
   53 from keystone.server.flask import application as flask_app
   54 from keystone.server.flask import core as keystone_flask
   55 from keystone.tests.unit import ksfixtures
   56 
   57 
   58 keystone.conf.configure()
   59 keystone.conf.set_config_defaults()
   60 
   61 PID = str(os.getpid())
   62 TESTSDIR = os.path.dirname(os.path.abspath(__file__))
   63 TESTCONF = os.path.join(TESTSDIR, 'config_files')
   64 ROOTDIR = os.path.normpath(os.path.join(TESTSDIR, '..', '..', '..'))
   65 VENDOR = os.path.join(ROOTDIR, 'vendor')
   66 ETCDIR = os.path.join(ROOTDIR, 'etc')
   67 
   68 
   69 def _calc_tmpdir():
   70     env_val = os.environ.get('KEYSTONE_TEST_TEMP_DIR')
   71     if not env_val:
   72         return os.path.join(TESTSDIR, 'tmp', PID)
   73     return os.path.join(env_val, PID)
   74 
   75 
   76 TMPDIR = _calc_tmpdir()
   77 
   78 CONF = keystone.conf.CONF
   79 PROVIDERS = provider_api.ProviderAPIs
   80 log.register_options(CONF)
   81 
   82 IN_MEM_DB_CONN_STRING = 'sqlite://'
   83 
   84 # Strictly matches ISO 8601 timestamps with subsecond precision like:
   85 # 2016-06-28T20:48:56.000000Z
   86 TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
   87 TIME_FORMAT_REGEX = r'^\d{4}-[0-1]\d-[0-3]\dT[0-2]\d:[0-5]\d:[0-5]\d\.\d{6}Z$'
   88 
   89 exception._FATAL_EXCEPTION_FORMAT_ERRORS = True
   90 os.makedirs(TMPDIR)
   91 atexit.register(shutil.rmtree, TMPDIR)
   92 
   93 
   94 class dirs(object):
   95     @staticmethod
   96     def root(*p):
   97         return os.path.join(ROOTDIR, *p)
   98 
   99     @staticmethod
  100     def etc(*p):
  101         return os.path.join(ETCDIR, *p)
  102 
  103     @staticmethod
  104     def tests(*p):
  105         return os.path.join(TESTSDIR, *p)
  106 
  107     @staticmethod
  108     def tmp(*p):
  109         return os.path.join(TMPDIR, *p)
  110 
  111     @staticmethod
  112     def tests_conf(*p):
  113         return os.path.join(TESTCONF, *p)
  114 
  115 
  116 @atexit.register
  117 def remove_test_databases():
  118     db = dirs.tmp('test.db')
  119     if os.path.exists(db):
  120         os.unlink(db)
  121     pristine = dirs.tmp('test.db.pristine')
  122     if os.path.exists(pristine):
  123         os.unlink(pristine)
  124 
  125 
  126 def skip_if_cache_disabled(*sections):
  127     """Skip a test if caching is disabled, this is a decorator.
  128 
  129     Caching can be disabled either globally or for a specific section.
  130 
  131     In the code fragment::
  132 
  133         @skip_if_cache_is_disabled('assignment', 'token')
  134         def test_method(*args):
  135             ...
  136 
  137     The method test_method would be skipped if caching is disabled globally via
  138     the `enabled` option in the `cache` section of the configuration or if
  139     the `caching` option is set to false in either `assignment` or `token`
  140     sections of the configuration.  This decorator can be used with no
  141     arguments to only check global caching.
  142 
  143     If a specified configuration section does not define the `caching` option,
  144     this decorator makes the caching enabled if `enabled` option in the `cache`
  145     section of the configuration is true.
  146 
  147     """
  148     def wrapper(f):
  149         @functools.wraps(f)
  150         def inner(*args, **kwargs):
  151             if not CONF.cache.enabled:
  152                 raise testcase.TestSkipped('Cache globally disabled.')
  153             for s in sections:
  154                 conf_sec = getattr(CONF, s, None)
  155                 if conf_sec is not None:
  156                     if not getattr(conf_sec, 'caching', True):
  157                         raise testcase.TestSkipped('%s caching disabled.' % s)
  158             return f(*args, **kwargs)
  159         return inner
  160     return wrapper
  161 
  162 
  163 def skip_if_cache_is_enabled(*sections):
  164     def wrapper(f):
  165         @functools.wraps(f)
  166         def inner(*args, **kwargs):
  167             if CONF.cache.enabled:
  168                 for s in sections:
  169                     conf_sec = getattr(CONF, s, None)
  170                     if conf_sec is not None:
  171                         if getattr(conf_sec, 'caching', True):
  172                             raise testcase.TestSkipped('%s caching enabled.' %
  173                                                        s)
  174             return f(*args, **kwargs)
  175         return inner
  176     return wrapper
  177 
  178 
  179 def skip_if_no_multiple_domains_support(f):
  180     """Decorator to skip tests for identity drivers limited to one domain."""
  181     @functools.wraps(f)
  182     def wrapper(*args, **kwargs):
  183         test_obj = args[0]
  184         if not test_obj.identity_api.multiple_domains_supported:
  185             raise testcase.TestSkipped('No multiple domains support')
  186         return f(*args, **kwargs)
  187     return wrapper
  188 
  189 
  190 class UnexpectedExit(Exception):
  191     pass
  192 
  193 
  194 def new_region_ref(parent_region_id=None, **kwargs):
  195     ref = {
  196         'id': uuid.uuid4().hex,
  197         'description': uuid.uuid4().hex,
  198         'parent_region_id': parent_region_id}
  199 
  200     ref.update(kwargs)
  201     return ref
  202 
  203 
  204 def new_service_ref(**kwargs):
  205     ref = {
  206         'id': uuid.uuid4().hex,
  207         'name': uuid.uuid4().hex,
  208         'description': uuid.uuid4().hex,
  209         'enabled': True,
  210         'type': uuid.uuid4().hex,
  211     }
  212     ref.update(kwargs)
  213     return ref
  214 
  215 
  216 NEEDS_REGION_ID = object()
  217 
  218 
  219 def new_endpoint_ref(service_id, interface='public',
  220                      region_id=NEEDS_REGION_ID, **kwargs):
  221 
  222     ref = {
  223         'id': uuid.uuid4().hex,
  224         'name': uuid.uuid4().hex,
  225         'description': uuid.uuid4().hex,
  226         'interface': interface,
  227         'service_id': service_id,
  228         'url': 'https://' + uuid.uuid4().hex + '.com',
  229     }
  230 
  231     if region_id is NEEDS_REGION_ID:
  232         ref['region_id'] = uuid.uuid4().hex
  233     elif region_id is None and kwargs.get('region') is not None:
  234         # pre-3.2 form endpoints are not supported by this function
  235         raise NotImplementedError("use new_endpoint_ref_with_region")
  236     else:
  237         ref['region_id'] = region_id
  238     ref.update(kwargs)
  239     return ref
  240 
  241 
  242 def new_endpoint_group_ref(filters, **kwargs):
  243     ref = {
  244         'id': uuid.uuid4().hex,
  245         'description': uuid.uuid4().hex,
  246         'filters': filters,
  247         'name': uuid.uuid4().hex
  248     }
  249     ref.update(kwargs)
  250     return ref
  251 
  252 
  253 def new_endpoint_ref_with_region(service_id, region, interface='public',
  254                                  **kwargs):
  255     """Define an endpoint_ref having a pre-3.2 form.
  256 
  257     Contains the deprecated 'region' instead of 'region_id'.
  258     """
  259     ref = new_endpoint_ref(service_id, interface, region=region,
  260                            region_id='invalid', **kwargs)
  261     del ref['region_id']
  262     return ref
  263 
  264 
  265 def new_domain_ref(**kwargs):
  266     ref = {
  267         'id': uuid.uuid4().hex,
  268         'name': uuid.uuid4().hex,
  269         'description': uuid.uuid4().hex,
  270         'enabled': True,
  271         'tags': [],
  272         'options': {}
  273     }
  274     ref.update(kwargs)
  275     return ref
  276 
  277 
  278 def new_project_ref(domain_id=None, is_domain=False, **kwargs):
  279     ref = {
  280         'id': uuid.uuid4().hex,
  281         'name': uuid.uuid4().hex,
  282         'description': uuid.uuid4().hex,
  283         'enabled': True,
  284         'domain_id': domain_id,
  285         'is_domain': is_domain,
  286         'tags': [],
  287         'options': {}
  288     }
  289     # NOTE(henry-nash): We don't include parent_id in the initial list above
  290     # since specifying it is optional depending on where the project sits in
  291     # the hierarchy (and a parent_id of None has meaning - i.e. it's a top
  292     # level project).
  293     ref.update(kwargs)
  294     return ref
  295 
  296 
  297 def new_user_ref(domain_id, project_id=None, **kwargs):
  298     ref = {
  299         'id': uuid.uuid4().hex,
  300         'name': uuid.uuid4().hex,
  301         'enabled': True,
  302         'domain_id': domain_id,
  303         'email': uuid.uuid4().hex,
  304         'password': uuid.uuid4().hex,
  305     }
  306     if project_id:
  307         ref['default_project_id'] = project_id
  308     ref.update(kwargs)
  309     return ref
  310 
  311 
  312 def new_federated_user_ref(idp_id=None, protocol_id=None, **kwargs):
  313     ref = {
  314         'idp_id': idp_id or 'ORG_IDP',
  315         'protocol_id': protocol_id or 'saml2',
  316         'unique_id': uuid.uuid4().hex,
  317         'display_name': uuid.uuid4().hex,
  318     }
  319     ref.update(kwargs)
  320     return ref
  321 
  322 
  323 def new_mapping_ref(mapping_id=None, rules=None, **kwargs):
  324     ref = {
  325         'id': mapping_id or uuid.uuid4().hex,
  326         'rules': rules or []
  327     }
  328     ref.update(kwargs)
  329     return ref
  330 
  331 
  332 def new_protocol_ref(protocol_id=None, idp_id=None, mapping_id=None, **kwargs):
  333     ref = {
  334         'id': protocol_id or 'saml2',
  335         'idp_id': idp_id or 'ORG_IDP',
  336         'mapping_id': mapping_id or uuid.uuid4().hex
  337     }
  338     ref.update(kwargs)
  339     return ref
  340 
  341 
  342 def new_identity_provider_ref(idp_id=None, **kwargs):
  343     ref = {
  344         'id': idp_id or 'ORG_IDP',
  345         'enabled': True,
  346         'description': '',
  347     }
  348     ref.update(kwargs)
  349     return ref
  350 
  351 
  352 def new_service_provider_ref(**kwargs):
  353     ref = {
  354         'auth_url': 'https://' + uuid.uuid4().hex + '.com',
  355         'enabled': True,
  356         'description': uuid.uuid4().hex,
  357         'sp_url': 'https://' + uuid.uuid4().hex + '.com',
  358         'relay_state_prefix': CONF.saml.relay_state_prefix
  359     }
  360     ref.update(kwargs)
  361     return ref
  362 
  363 
  364 def new_group_ref(domain_id, **kwargs):
  365     ref = {
  366         'id': uuid.uuid4().hex,
  367         'name': uuid.uuid4().hex,
  368         'description': uuid.uuid4().hex,
  369         'domain_id': domain_id
  370     }
  371     ref.update(kwargs)
  372     return ref
  373 
  374 
  375 def new_credential_ref(user_id, project_id=None, type='cert', **kwargs):
  376     ref = {
  377         'id': uuid.uuid4().hex,
  378         'user_id': user_id,
  379         'type': type,
  380     }
  381 
  382     if project_id:
  383         ref['project_id'] = project_id
  384     if 'blob' not in kwargs:
  385         ref['blob'] = uuid.uuid4().hex
  386 
  387     ref.update(kwargs)
  388     return ref
  389 
  390 
  391 def new_cert_credential(user_id, project_id=None, blob=None, **kwargs):
  392     if blob is None:
  393         blob = {'access': uuid.uuid4().hex, 'secret': uuid.uuid4().hex}
  394 
  395     credential = new_credential_ref(user_id=user_id,
  396                                     project_id=project_id,
  397                                     blob=json.dumps(blob),
  398                                     type='cert',
  399                                     **kwargs)
  400     return blob, credential
  401 
  402 
  403 def new_ec2_credential(user_id, project_id=None, blob=None, **kwargs):
  404     if blob is None:
  405         blob = {
  406             'access': uuid.uuid4().hex,
  407             'secret': uuid.uuid4().hex,
  408             'trust_id': None
  409         }
  410 
  411     if 'id' not in kwargs:
  412         access = blob['access'].encode('utf-8')
  413         kwargs['id'] = hashlib.sha256(access).hexdigest()
  414 
  415     credential = new_credential_ref(user_id=user_id,
  416                                     project_id=project_id,
  417                                     blob=json.dumps(blob),
  418                                     type='ec2',
  419                                     **kwargs)
  420     return blob, credential
  421 
  422 
  423 def new_totp_credential(user_id, project_id=None, blob=None):
  424     if not blob:
  425         # NOTE(notmorgan): 20 bytes of data from os.urandom for
  426         # a totp secret.
  427         blob = base64.b32encode(os.urandom(20)).decode('utf-8')
  428     credential = new_credential_ref(user_id=user_id,
  429                                     project_id=project_id,
  430                                     blob=blob,
  431                                     type='totp')
  432     return credential
  433 
  434 
  435 def new_application_credential_ref(roles=None,
  436                                    name=None,
  437                                    expires=None,
  438                                    secret=None):
  439     ref = {
  440         'id': uuid.uuid4().hex,
  441         'name': uuid.uuid4().hex,
  442         'description': uuid.uuid4().hex,
  443     }
  444     if roles:
  445         ref['roles'] = roles
  446     if secret:
  447         ref['secret'] = secret
  448 
  449     if isinstance(expires, str):
  450         ref['expires_at'] = expires
  451     elif isinstance(expires, dict):
  452         ref['expires_at'] = (
  453             timeutils.utcnow() + datetime.timedelta(**expires)
  454         ).strftime(TIME_FORMAT)
  455     elif expires is None:
  456         pass
  457     else:
  458         raise NotImplementedError('Unexpected value for "expires"')
  459 
  460     return ref
  461 
  462 
  463 def new_role_ref(**kwargs):
  464     ref = {
  465         'id': uuid.uuid4().hex,
  466         'name': uuid.uuid4().hex,
  467         'description': uuid.uuid4().hex,
  468         'domain_id': None,
  469         'options': {},
  470     }
  471     ref.update(kwargs)
  472     return ref
  473 
  474 
  475 def new_policy_ref(**kwargs):
  476     ref = {
  477         'id': uuid.uuid4().hex,
  478         'name': uuid.uuid4().hex,
  479         'description': uuid.uuid4().hex,
  480         'enabled': True,
  481         # Store serialized JSON data as the blob to mimic real world usage.
  482         'blob': json.dumps({'data': uuid.uuid4().hex, }),
  483         'type': uuid.uuid4().hex,
  484     }
  485 
  486     ref.update(kwargs)
  487     return ref
  488 
  489 
  490 def new_domain_config_ref(**kwargs):
  491     ref = {
  492         "identity": {
  493             "driver": "ldap"
  494         },
  495         "ldap": {
  496             "url": "ldap://myldap.com:389/",
  497             "user_tree_dn": "ou=Users,dc=my_new_root,dc=org"
  498         }
  499     }
  500     ref.update(kwargs)
  501     return ref
  502 
  503 
  504 def new_trust_ref(trustor_user_id, trustee_user_id, project_id=None,
  505                   impersonation=None, expires=None, role_ids=None,
  506                   role_names=None, remaining_uses=None,
  507                   allow_redelegation=False, redelegation_count=None, **kwargs):
  508     ref = {
  509         'id': uuid.uuid4().hex,
  510         'trustor_user_id': trustor_user_id,
  511         'trustee_user_id': trustee_user_id,
  512         'impersonation': impersonation or False,
  513         'project_id': project_id,
  514         'remaining_uses': remaining_uses,
  515         'allow_redelegation': allow_redelegation,
  516     }
  517 
  518     if isinstance(redelegation_count, int):
  519         ref.update(redelegation_count=redelegation_count)
  520 
  521     if isinstance(expires, str):
  522         ref['expires_at'] = expires
  523     elif isinstance(expires, dict):
  524         ref['expires_at'] = (
  525             timeutils.utcnow() + datetime.timedelta(**expires)
  526         ).strftime(TIME_FORMAT)
  527     elif expires is None:
  528         pass
  529     else:
  530         raise NotImplementedError('Unexpected value for "expires"')
  531 
  532     role_ids = role_ids or []
  533     role_names = role_names or []
  534     if role_ids or role_names:
  535         ref['roles'] = []
  536         for role_id in role_ids:
  537             ref['roles'].append({'id': role_id})
  538         for role_name in role_names:
  539             ref['roles'].append({'name': role_name})
  540 
  541     ref.update(kwargs)
  542     return ref
  543 
  544 
  545 def new_registered_limit_ref(**kwargs):
  546     ref = {
  547         'service_id': uuid.uuid4().hex,
  548         'resource_name': uuid.uuid4().hex,
  549         'default_limit': 10,
  550         'description': uuid.uuid4().hex
  551     }
  552 
  553     ref.update(kwargs)
  554     return ref
  555 
  556 
  557 def new_limit_ref(**kwargs):
  558     ref = {
  559         'service_id': uuid.uuid4().hex,
  560         'resource_name': uuid.uuid4().hex,
  561         'resource_limit': 10,
  562         'description': uuid.uuid4().hex
  563     }
  564 
  565     ref.update(kwargs)
  566     return ref
  567 
  568 
  569 def create_user(api, domain_id, **kwargs):
  570     """Create a user via the API. Keep the created password.
  571 
  572     The password is saved and restored when api.create_user() is called.
  573     Only use this routine if there is a requirement for the user object to
  574     have a valid password after api.create_user() is called.
  575     """
  576     user = new_user_ref(domain_id=domain_id, **kwargs)
  577     password = user['password']
  578     user = api.create_user(user)
  579     user['password'] = password
  580     return user
  581 
  582 
  583 def _assert_expected_status(f):
  584     """Add `expected_status_code` as an argument to the test_client methods.
  585 
  586     `expected_status_code` must be passed as a kwarg.
  587     """
  588     TEAPOT_HTTP_STATUS = 418
  589 
  590     _default_expected_responses = {
  591         'get': http.client.OK,
  592         'head': http.client.OK,
  593         'post': http.client.CREATED,
  594         'put': http.client.NO_CONTENT,
  595         'patch': http.client.OK,
  596         'delete': http.client.NO_CONTENT,
  597     }
  598 
  599     @functools.wraps(f)
  600     def inner(*args, **kwargs):
  601         # Get the "expected_status_code" kwarg if supplied. If not supplied use
  602         # the `_default_expected_response` mapping, or fall through to
  603         # "HTTP OK" if the method is somehow unknown.
  604         expected_status_code = kwargs.pop(
  605             'expected_status_code',
  606             _default_expected_responses.get(
  607                 f.__name__.lower(), http.client.OK))
  608         response = f(*args, **kwargs)
  609 
  610         # Logic to verify the response object is sane. Expand as needed
  611         if response.status_code == TEAPOT_HTTP_STATUS:
  612             # NOTE(morgan): We use 418 internally during tests to indicate
  613             # an un-routed HTTP call was made. This allows us to avoid
  614             # misinterpreting HTTP 404 from Flask and HTTP 404 from a
  615             # resource that is not found (e.g. USER NOT FOUND) programmatically
  616             raise AssertionError("I AM A TEAPOT(418): %s" % response.data)
  617 
  618         if response.status_code != expected_status_code:
  619             raise AssertionError(
  620                 'Expected HTTP Status does not match observed HTTP '
  621                 'Status: %(expected)s != %(observed)s (%(data)s)' % {
  622                     'expected': expected_status_code,
  623                     'observed': response.status_code,
  624                     'data': response.data})
  625 
  626         # return the original response object
  627         return response
  628     return inner
  629 
  630 
  631 class KeystoneFlaskTestClient(flask_testing.FlaskClient):
  632     """Subclass of flask.testing.FlaskClient implementing assertions.
  633 
  634     Implements custom "expected" HTTP Status assertion for
  635     GET/HEAD/PUT/PATCH/DELETE.
  636     """
  637 
  638     @_assert_expected_status
  639     def get(self, *args, **kwargs):
  640         return super(KeystoneFlaskTestClient, self).get(*args, **kwargs)
  641 
  642     @_assert_expected_status
  643     def head(self, *args, **kwargs):
  644         return super(KeystoneFlaskTestClient, self).head(*args, **kwargs)
  645 
  646     @_assert_expected_status
  647     def post(self, *args, **kwargs):
  648         return super(KeystoneFlaskTestClient, self).post(*args, **kwargs)
  649 
  650     @_assert_expected_status
  651     def patch(self, *args, **kwargs):
  652         return super(KeystoneFlaskTestClient, self).patch(*args, **kwargs)
  653 
  654     @_assert_expected_status
  655     def put(self, *args, **kwargs):
  656         return super(KeystoneFlaskTestClient, self).put(*args, **kwargs)
  657 
  658     @_assert_expected_status
  659     def delete(self, *args, **kwargs):
  660         return super(KeystoneFlaskTestClient, self).delete(*args, **kwargs)
  661 
  662 
  663 class BaseTestCase(testtools.TestCase):
  664     """Light weight base test class.
  665 
  666     This is a placeholder that will eventually go away once the
  667     setup/teardown in TestCase is properly trimmed down to the bare
  668     essentials. This is really just a play to speed up the tests by
  669     eliminating unnecessary work.
  670     """
  671 
  672     def setUp(self):
  673         super(BaseTestCase, self).setUp()
  674 
  675         self.useFixture(fixtures.NestedTempfile())
  676         self.useFixture(fixtures.TempHomeDir())
  677 
  678         self.useFixture(fixtures.MockPatchObject(sys, 'exit',
  679                                                  side_effect=UnexpectedExit))
  680         self.useFixture(log_fixture.get_logging_handle_error_fixture())
  681 
  682         warnings.filterwarnings('error', category=DeprecationWarning,
  683                                 module='^keystone\\.')
  684         warnings.filterwarnings(
  685             'ignore', category=DeprecationWarning,
  686             message=r"Using function/method 'db_version\(\)' is deprecated")
  687         warnings.simplefilter('error', exc.SAWarning)
  688         if hasattr(exc, "RemovedIn20Warning"):
  689             warnings.simplefilter('ignore', exc.RemovedIn20Warning)
  690 
  691         self.addCleanup(warnings.resetwarnings)
  692         # Ensure we have an empty threadlocal context at the start of each
  693         # test.
  694         self.assertIsNone(oslo_context.get_current())
  695         self.useFixture(oslo_ctx_fixture.ClearRequestContext())
  696 
  697         orig_debug_level = ldap.get_option(ldap.OPT_DEBUG_LEVEL)
  698         self.addCleanup(ldap.set_option, ldap.OPT_DEBUG_LEVEL,
  699                         orig_debug_level)
  700         orig_tls_cacertfile = ldap.get_option(ldap.OPT_X_TLS_CACERTFILE)
  701         if orig_tls_cacertfile is None:
  702             orig_tls_cacertfile = ''
  703         self.addCleanup(ldap.set_option, ldap.OPT_X_TLS_CACERTFILE,
  704                         orig_tls_cacertfile)
  705         orig_tls_cacertdir = ldap.get_option(ldap.OPT_X_TLS_CACERTDIR)
  706         # Setting orig_tls_cacertdir to None is not allowed.
  707         if orig_tls_cacertdir is None:
  708             orig_tls_cacertdir = ''
  709         self.addCleanup(ldap.set_option, ldap.OPT_X_TLS_CACERTDIR,
  710                         orig_tls_cacertdir)
  711         orig_tls_require_cert = ldap.get_option(ldap.OPT_X_TLS_REQUIRE_CERT)
  712         self.addCleanup(ldap.set_option, ldap.OPT_X_TLS_REQUIRE_CERT,
  713                         orig_tls_require_cert)
  714         self.addCleanup(ks_ldap.PooledLDAPHandler.connection_pools.clear)
  715 
  716     def cleanup_instance(self, *names):
  717         """Create a function suitable for use with self.addCleanup.
  718 
  719         :returns: a callable that uses a closure to delete instance attributes
  720 
  721         """
  722         def cleanup():
  723             for name in names:
  724                 # TODO(dstanek): remove this 'if' statement once
  725                 # load_backend in test_backend_ldap is only called once
  726                 # per test
  727                 if hasattr(self, name):
  728                     delattr(self, name)
  729         return cleanup
  730 
  731     def skip_if_env_not_set(self, env_var):
  732         if not os.environ.get(env_var):
  733             self.skipTest('Env variable %s is not set.' % env_var)
  734 
  735     def skip_test_overrides(self, *args, **kwargs):
  736         if self._check_for_method_in_parents(self._testMethodName):
  737             return super(BaseTestCase, self).skipTest(*args, **kwargs)
  738         raise Exception('%r is not a previously defined test method'
  739                         % self._testMethodName)
  740 
  741     def _check_for_method_in_parents(self, name):
  742         # skip first to get to parents
  743         for cls in self.__class__.__mro__[1:]:
  744             if hasattr(cls, name):
  745                 return True
  746         return False
  747 
  748     def loadapp(self, name='public'):
  749         app = flask_app.application_factory(name)
  750         app.testing = True
  751         app.test_client_class = KeystoneFlaskTestClient
  752 
  753         # NOTE(morgan): any unexpected 404s, not handled by the routed apis,
  754         # is a hard error and should not pass testing.
  755         def page_not_found_teapot(e):
  756             content = (
  757                 'TEST PROGRAMMING ERROR - Reached a 404 from an unrouted (`%s`'
  758                 ') path. Be sure the test is requesting the right resource '
  759                 'and that all blueprints are registered with the flask app.' %
  760                 flask.request.url)
  761             return content, 418
  762 
  763         app.register_error_handler(404, page_not_found_teapot)
  764 
  765         self.test_client = app.test_client
  766         self.test_request_context = app.test_request_context
  767         self.cleanup_instance('test_request_context')
  768         self.cleanup_instance('test_client')
  769         return keystone_flask.setup_app_middleware(app)
  770 
  771 
  772 class TestCase(BaseTestCase):
  773 
  774     def config_files(self):
  775         return []
  776 
  777     def _policy_fixture(self):
  778         return ksfixtures.Policy(self.config_fixture)
  779 
  780     @contextlib.contextmanager
  781     def make_request(self, path='/', **kwargs):
  782         # standup a fake app and request context with a passed in/known
  783         # environment.
  784 
  785         is_admin = kwargs.pop('is_admin', False)
  786         environ = kwargs.setdefault('environ', {})
  787         query_string = kwargs.pop('query_string', None)
  788         if query_string:
  789             # Make sure query string is properly added to the context
  790             path = '{path}?{qs}'.format(path=path, qs=query_string)
  791 
  792         if not environ.get(context.REQUEST_CONTEXT_ENV):
  793             environ[context.REQUEST_CONTEXT_ENV] = context.RequestContext(
  794                 is_admin=is_admin,
  795                 authenticated=kwargs.pop('authenticated', True))
  796 
  797         # Create a dummy flask app to work with
  798         app = flask.Flask(__name__)
  799         with app.test_request_context(path=path, environ_overrides=environ):
  800             yield
  801 
  802     def config_overrides(self):
  803         # NOTE(morganfainberg): enforce config_overrides can only ever be
  804         # called a single time.
  805         assert self.__config_overrides_called is False
  806         self.__config_overrides_called = True
  807 
  808         signing_certfile = 'examples/pki/certs/signing_cert.pem'
  809         signing_keyfile = 'examples/pki/private/signing_key.pem'
  810 
  811         self.useFixture(self._policy_fixture())
  812 
  813         self.config_fixture.config(
  814             # TODO(morganfainberg): Make Cache Testing a separate test case
  815             # in tempest, and move it out of the base unit tests.
  816             group='cache',
  817             backend='dogpile.cache.memory',
  818             enabled=True,
  819             proxies=['oslo_cache.testing.CacheIsolatingProxy'])
  820         self.config_fixture.config(
  821             group='catalog',
  822             driver='sql',
  823             template_file=dirs.tests('default_catalog.templates'))
  824         self.config_fixture.config(
  825             group='saml', certfile=signing_certfile, keyfile=signing_keyfile)
  826         self.config_fixture.config(
  827             default_log_levels=[
  828                 'amqp=WARN',
  829                 'amqplib=WARN',
  830                 'boto=WARN',
  831                 'qpid=WARN',
  832                 'sqlalchemy=WARN',
  833                 'suds=INFO',
  834                 'oslo.messaging=INFO',
  835                 'iso8601=WARN',
  836                 'requests.packages.urllib3.connectionpool=WARN',
  837                 'routes.middleware=INFO',
  838                 'stevedore.extension=INFO',
  839                 'keystone.notifications=INFO',
  840                 'keystone.identity.backends.ldap.common=INFO',
  841             ])
  842         # NOTE(notmorgan): Set password rounds low here to ensure speedy
  843         # tests. This is explicitly set because the tests here are not testing
  844         # the integrity of the password hashing, just that the correct form
  845         # of hashing has been used. Note that 4 is the lowest for bcrypt
  846         # allowed in the `[identity] password_hash_rounds` setting
  847         self.config_fixture.config(group='identity', password_hash_rounds=4)
  848 
  849         self.useFixture(
  850             ksfixtures.KeyRepository(
  851                 self.config_fixture,
  852                 'fernet_tokens',
  853                 CONF.fernet_tokens.max_active_keys
  854             )
  855         )
  856 
  857         self.useFixture(
  858             ksfixtures.KeyRepository(
  859                 self.config_fixture,
  860                 'fernet_receipts',
  861                 CONF.fernet_receipts.max_active_keys
  862             )
  863         )
  864 
  865     def _assert_config_overrides_called(self):
  866         assert self.__config_overrides_called is True
  867 
  868     def setUp(self):
  869         super(TestCase, self).setUp()
  870         self.__config_overrides_called = False
  871         self.__load_backends_called = False
  872         self.config_fixture = self.useFixture(config_fixture.Config(CONF))
  873         self.addCleanup(delattr, self, 'config_fixture')
  874         self.config(self.config_files())
  875 
  876         # NOTE(morganfainberg): mock the auth plugin setup to use the config
  877         # fixture which automatically unregisters options when performing
  878         # cleanup.
  879         def mocked_register_auth_plugin_opt(conf, opt):
  880             self.config_fixture.register_opt(opt, group='auth')
  881         self.useFixture(fixtures.MockPatchObject(
  882             keystone.conf.auth, '_register_auth_plugin_opt',
  883             new=mocked_register_auth_plugin_opt))
  884 
  885         self.config_overrides()
  886         # explicitly load auth configuration
  887         keystone.conf.auth.setup_authentication()
  888         # NOTE(morganfainberg): ensure config_overrides has been called.
  889         self.addCleanup(self._assert_config_overrides_called)
  890 
  891         self.useFixture(fixtures.FakeLogger(level=log.DEBUG))
  892 
  893         # NOTE(morganfainberg): This code is a copy from the oslo-incubator
  894         # log module. This is not in a function or otherwise available to use
  895         # without having a CONF object to setup logging. This should help to
  896         # reduce the log size by limiting what we log (similar to how Keystone
  897         # would run under mod_wsgi).
  898         for pair in CONF.default_log_levels:
  899             mod, _sep, level_name = pair.partition('=')
  900             logger = log.getLogger(mod)
  901             logger.logger.setLevel(level_name)
  902 
  903         self.useFixture(ksfixtures.Cache())
  904 
  905         # Clear the registry of providers so that providers from previous
  906         # tests aren't used.
  907         self.addCleanup(provider_api.ProviderAPIs._clear_registry_instances)
  908 
  909         # Clear the registry of JSON Home Resources
  910         self.addCleanup(json_home.JsonHomeResources._reset)
  911 
  912         # Ensure Notification subscriptions and resource types are empty
  913         self.addCleanup(notifications.clear_subscribers)
  914         self.addCleanup(notifications.reset_notifier)
  915 
  916     def config(self, config_files):
  917         sql.initialize()
  918         CONF(args=[], project='keystone', default_config_files=config_files)
  919 
  920     def load_backends(self):
  921         """Initialize each manager and assigns them to an attribute."""
  922         # TODO(morgan): Ensure our tests only ever call load_backends
  923         # a single time via this method. for now just clear the registry
  924         # if we are reloading.
  925         provider_api.ProviderAPIs._clear_registry_instances()
  926         self.useFixture(ksfixtures.BackendLoader(self))
  927 
  928     def load_fixtures(self, fixtures):
  929         """Hacky basic and naive fixture loading based on a python module.
  930 
  931         Expects that the various APIs into the various services are already
  932         defined on `self`.
  933 
  934         """
  935         # NOTE(dstanek): create a list of attribute names to be removed
  936         # from this instance during cleanup
  937         fixtures_to_cleanup = []
  938 
  939         # TODO(termie): doing something from json, probably based on Django's
  940         #               loaddata will be much preferred.
  941         if (hasattr(self, 'identity_api') and
  942             hasattr(self, 'assignment_api') and
  943                 hasattr(self, 'resource_api')):
  944             try:
  945                 PROVIDERS.resource_api.create_domain(
  946                     resource_base.NULL_DOMAIN_ID, fixtures.ROOT_DOMAIN)
  947             except exception.Conflict:
  948                 # the root domain already exists, skip now.
  949                 pass
  950             for domain in fixtures.DOMAINS:
  951                 rv = PROVIDERS.resource_api.create_domain(domain['id'], domain)
  952                 attrname = 'domain_%s' % domain['id']
  953                 setattr(self, attrname, rv)
  954                 fixtures_to_cleanup.append(attrname)
  955 
  956             for project in fixtures.PROJECTS:
  957                 project_attr_name = 'project_%s' % project['name'].lower()
  958                 rv = PROVIDERS.resource_api.create_project(
  959                     project['id'], project)
  960                 setattr(self, project_attr_name, rv)
  961                 fixtures_to_cleanup.append(project_attr_name)
  962 
  963             for role in fixtures.ROLES:
  964                 rv = PROVIDERS.role_api.create_role(role['id'], role)
  965                 attrname = 'role_%s' % role['name']
  966                 setattr(self, attrname, rv)
  967                 fixtures_to_cleanup.append(attrname)
  968 
  969             for user in fixtures.USERS:
  970                 user_copy = user.copy()
  971                 projects = user_copy.pop('projects')
  972 
  973                 # For users, the manager layer will generate the ID
  974                 user_copy = PROVIDERS.identity_api.create_user(user_copy)
  975                 # Our tests expect that the password is still in the user
  976                 # record so that they can reference it, so put it back into
  977                 # the dict returned.
  978                 user_copy['password'] = user['password']
  979 
  980                 # fixtures.ROLES[2] is the _member_ role.
  981                 for project_id in projects:
  982                     PROVIDERS.assignment_api.add_role_to_user_and_project(
  983                         user_copy['id'], project_id, fixtures.ROLES[2]['id'])
  984 
  985                 # Use the ID from the fixture as the attribute name, so
  986                 # that our tests can easily reference each user dict, while
  987                 # the ID in the dict will be the real public ID.
  988                 attrname = 'user_%s' % user['name']
  989                 setattr(self, attrname, user_copy)
  990                 fixtures_to_cleanup.append(attrname)
  991 
  992             for role_assignment in fixtures.ROLE_ASSIGNMENTS:
  993                 role_id = role_assignment['role_id']
  994                 user = role_assignment['user']
  995                 project_id = role_assignment['project_id']
  996                 user_id = getattr(self, 'user_%s' % user)['id']
  997                 PROVIDERS.assignment_api.add_role_to_user_and_project(
  998                     user_id, project_id, role_id)
  999 
 1000             self.addCleanup(self.cleanup_instance(*fixtures_to_cleanup))
 1001 
 1002     def assertCloseEnoughForGovernmentWork(self, a, b, delta=3):
 1003         """Assert that two datetimes are nearly equal within a small delta.
 1004 
 1005         :param delta: Maximum allowable time delta, defined in seconds.
 1006         """
 1007         if a == b:
 1008             # Short-circuit if the values are the same.
 1009             return
 1010 
 1011         msg = '%s != %s within %s delta' % (a, b, delta)
 1012 
 1013         self.assertLessEqual(abs(a - b).seconds, delta, msg)
 1014 
 1015     def assertTimestampEqual(self, expected, value):
 1016         # Compare two timestamps but ignore the microseconds part
 1017         # of the expected timestamp. Keystone does not track microseconds and
 1018         # is working to eliminate microseconds from it's datetimes used.
 1019         expected = timeutils.parse_isotime(expected).replace(microsecond=0)
 1020         value = timeutils.parse_isotime(value).replace(microsecond=0)
 1021         self.assertEqual(
 1022             expected,
 1023             value,
 1024             "%s != %s" % (expected, value))
 1025 
 1026     def assertNotEmpty(self, l):
 1027         self.assertGreater(len(l), 0)
 1028 
 1029     def assertUserDictEqual(self, expected, observed, message=''):
 1030         """Assert that a user dict is equal to another user dict.
 1031 
 1032         User dictionaries have some variable values that should be ignored in
 1033         the comparison. This method is a helper that strips those elements out
 1034         when comparing the user dictionary. This normalized these differences
 1035         that should not change the comparison.
 1036         """
 1037         # NOTE(notmorgan): An empty option list is the same as no options being
 1038         # specified in the user_ref. This removes options if it is empty in
 1039         # observed if options is not specified in the expected value.
 1040         if ('options' in observed and not observed['options'] and
 1041                 'options' not in expected):
 1042             observed = observed.copy()
 1043             del observed['options']
 1044 
 1045         self.assertDictEqual(expected, observed, message)
 1046 
 1047     @property
 1048     def ipv6_enabled(self):
 1049         if socket.has_ipv6:
 1050             sock = None
 1051             try:
 1052                 sock = socket.socket(socket.AF_INET6)
 1053                 # NOTE(Mouad): Try to bind to IPv6 loopback ip address.
 1054                 sock.bind(("::1", 0))
 1055                 return True
 1056             except socket.error:
 1057                 pass
 1058             finally:
 1059                 if sock:
 1060                     sock.close()
 1061         return False
 1062 
 1063     def skip_if_no_ipv6(self):
 1064         if not self.ipv6_enabled:
 1065             raise self.skipTest("IPv6 is not enabled in the system")
 1066 
 1067 
 1068 class SQLDriverOverrides(object):
 1069     """A mixin for consolidating sql-specific test overrides."""
 1070 
 1071     def config_overrides(self):
 1072         super(SQLDriverOverrides, self).config_overrides()
 1073         # SQL specific driver overrides
 1074         self.config_fixture.config(group='catalog', driver='sql')
 1075         self.config_fixture.config(group='identity', driver='sql')
 1076         self.config_fixture.config(group='policy', driver='sql')
 1077         self.config_fixture.config(group='trust', driver='sql')