"Fossies" - the Fresh Open Source Software Archive

Member "keystone-18.0.0/keystone/tests/unit/core.py" (14 Oct 2020, 36726 Bytes) of package /linux/misc/openstack/keystone-18.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "core.py": 17.0.0_vs_18.0.0.

    1 # Copyright 2012 OpenStack Foundation
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 import atexit
   15 import base64
   16 import contextlib
   17 import datetime
   18 import functools
   19 import hashlib
   20 import json
   21 import ldap
   22 import os
   23 import shutil
   24 import socket
   25 import sys
   26 import uuid
   27 import warnings
   28 
   29 import fixtures
   30 import flask
   31 from flask import testing as flask_testing
   32 import http.client
   33 from oslo_config import fixture as config_fixture
   34 from oslo_context import context as oslo_context
   35 from oslo_context import fixture as oslo_ctx_fixture
   36 from oslo_log import fixture as log_fixture
   37 from oslo_log import log
   38 from oslo_utils import timeutils
   39 from sqlalchemy import exc
   40 import testtools
   41 from testtools import testcase
   42 
   43 import keystone.api
   44 from keystone.common import context
   45 from keystone.common import json_home
   46 from keystone.common import provider_api
   47 from keystone.common import sql
   48 import keystone.conf
   49 from keystone import exception
   50 from keystone.identity.backends.ldap import common as ks_ldap
   51 from keystone import notifications
   52 from keystone.resource.backends import base as resource_base
   53 from keystone.server.flask import application as flask_app
   54 from keystone.server.flask import core as keystone_flask
   55 from keystone.tests.unit import ksfixtures
   56 
   57 
   58 keystone.conf.configure()
   59 keystone.conf.set_config_defaults()
   60 
   61 PID = str(os.getpid())
   62 TESTSDIR = os.path.dirname(os.path.abspath(__file__))
   63 TESTCONF = os.path.join(TESTSDIR, 'config_files')
   64 ROOTDIR = os.path.normpath(os.path.join(TESTSDIR, '..', '..', '..'))
   65 VENDOR = os.path.join(ROOTDIR, 'vendor')
   66 ETCDIR = os.path.join(ROOTDIR, 'etc')
   67 
   68 
   69 def _calc_tmpdir():
   70     env_val = os.environ.get('KEYSTONE_TEST_TEMP_DIR')
   71     if not env_val:
   72         return os.path.join(TESTSDIR, 'tmp', PID)
   73     return os.path.join(env_val, PID)
   74 
   75 
   76 TMPDIR = _calc_tmpdir()
   77 
   78 CONF = keystone.conf.CONF
   79 PROVIDERS = provider_api.ProviderAPIs
   80 log.register_options(CONF)
   81 
   82 IN_MEM_DB_CONN_STRING = 'sqlite://'
   83 
   84 # Strictly matches ISO 8601 timestamps with subsecond precision like:
   85 # 2016-06-28T20:48:56.000000Z
   86 TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
   87 TIME_FORMAT_REGEX = r'^\d{4}-[0-1]\d-[0-3]\dT[0-2]\d:[0-5]\d:[0-5]\d\.\d{6}Z$'
   88 
   89 exception._FATAL_EXCEPTION_FORMAT_ERRORS = True
   90 os.makedirs(TMPDIR)
   91 atexit.register(shutil.rmtree, TMPDIR)
   92 
   93 
   94 class dirs(object):
   95     @staticmethod
   96     def root(*p):
   97         return os.path.join(ROOTDIR, *p)
   98 
   99     @staticmethod
  100     def etc(*p):
  101         return os.path.join(ETCDIR, *p)
  102 
  103     @staticmethod
  104     def tests(*p):
  105         return os.path.join(TESTSDIR, *p)
  106 
  107     @staticmethod
  108     def tmp(*p):
  109         return os.path.join(TMPDIR, *p)
  110 
  111     @staticmethod
  112     def tests_conf(*p):
  113         return os.path.join(TESTCONF, *p)
  114 
  115 
  116 @atexit.register
  117 def remove_test_databases():
  118     db = dirs.tmp('test.db')
  119     if os.path.exists(db):
  120         os.unlink(db)
  121     pristine = dirs.tmp('test.db.pristine')
  122     if os.path.exists(pristine):
  123         os.unlink(pristine)
  124 
  125 
  126 def skip_if_cache_disabled(*sections):
  127     """Skip a test if caching is disabled, this is a decorator.
  128 
  129     Caching can be disabled either globally or for a specific section.
  130 
  131     In the code fragment::
  132 
  133         @skip_if_cache_is_disabled('assignment', 'token')
  134         def test_method(*args):
  135             ...
  136 
  137     The method test_method would be skipped if caching is disabled globally via
  138     the `enabled` option in the `cache` section of the configuration or if
  139     the `caching` option is set to false in either `assignment` or `token`
  140     sections of the configuration.  This decorator can be used with no
  141     arguments to only check global caching.
  142 
  143     If a specified configuration section does not define the `caching` option,
  144     this decorator makes the caching enabled if `enabled` option in the `cache`
  145     section of the configuration is true.
  146 
  147     """
  148     def wrapper(f):
  149         @functools.wraps(f)
  150         def inner(*args, **kwargs):
  151             if not CONF.cache.enabled:
  152                 raise testcase.TestSkipped('Cache globally disabled.')
  153             for s in sections:
  154                 conf_sec = getattr(CONF, s, None)
  155                 if conf_sec is not None:
  156                     if not getattr(conf_sec, 'caching', True):
  157                         raise testcase.TestSkipped('%s caching disabled.' % s)
  158             return f(*args, **kwargs)
  159         return inner
  160     return wrapper
  161 
  162 
  163 def skip_if_cache_is_enabled(*sections):
  164     def wrapper(f):
  165         @functools.wraps(f)
  166         def inner(*args, **kwargs):
  167             if CONF.cache.enabled:
  168                 for s in sections:
  169                     conf_sec = getattr(CONF, s, None)
  170                     if conf_sec is not None:
  171                         if getattr(conf_sec, 'caching', True):
  172                             raise testcase.TestSkipped('%s caching enabled.' %
  173                                                        s)
  174             return f(*args, **kwargs)
  175         return inner
  176     return wrapper
  177 
  178 
  179 def skip_if_no_multiple_domains_support(f):
  180     """Decorator to skip tests for identity drivers limited to one domain."""
  181     @functools.wraps(f)
  182     def wrapper(*args, **kwargs):
  183         test_obj = args[0]
  184         if not test_obj.identity_api.multiple_domains_supported:
  185             raise testcase.TestSkipped('No multiple domains support')
  186         return f(*args, **kwargs)
  187     return wrapper
  188 
  189 
  190 class UnexpectedExit(Exception):
  191     pass
  192 
  193 
  194 def new_region_ref(parent_region_id=None, **kwargs):
  195     ref = {
  196         'id': uuid.uuid4().hex,
  197         'description': uuid.uuid4().hex,
  198         'parent_region_id': parent_region_id}
  199 
  200     ref.update(kwargs)
  201     return ref
  202 
  203 
  204 def new_service_ref(**kwargs):
  205     ref = {
  206         'id': uuid.uuid4().hex,
  207         'name': uuid.uuid4().hex,
  208         'description': uuid.uuid4().hex,
  209         'enabled': True,
  210         'type': uuid.uuid4().hex,
  211     }
  212     ref.update(kwargs)
  213     return ref
  214 
  215 
  216 NEEDS_REGION_ID = object()
  217 
  218 
  219 def new_endpoint_ref(service_id, interface='public',
  220                      region_id=NEEDS_REGION_ID, **kwargs):
  221 
  222     ref = {
  223         'id': uuid.uuid4().hex,
  224         'name': uuid.uuid4().hex,
  225         'description': uuid.uuid4().hex,
  226         'interface': interface,
  227         'service_id': service_id,
  228         'url': 'https://' + uuid.uuid4().hex + '.com',
  229     }
  230 
  231     if region_id is NEEDS_REGION_ID:
  232         ref['region_id'] = uuid.uuid4().hex
  233     elif region_id is None and kwargs.get('region') is not None:
  234         # pre-3.2 form endpoints are not supported by this function
  235         raise NotImplementedError("use new_endpoint_ref_with_region")
  236     else:
  237         ref['region_id'] = region_id
  238     ref.update(kwargs)
  239     return ref
  240 
  241 
  242 def new_endpoint_group_ref(filters, **kwargs):
  243     ref = {
  244         'id': uuid.uuid4().hex,
  245         'description': uuid.uuid4().hex,
  246         'filters': filters,
  247         'name': uuid.uuid4().hex
  248     }
  249     ref.update(kwargs)
  250     return ref
  251 
  252 
  253 def new_endpoint_ref_with_region(service_id, region, interface='public',
  254                                  **kwargs):
  255     """Define an endpoint_ref having a pre-3.2 form.
  256 
  257     Contains the deprecated 'region' instead of 'region_id'.
  258     """
  259     ref = new_endpoint_ref(service_id, interface, region=region,
  260                            region_id='invalid', **kwargs)
  261     del ref['region_id']
  262     return ref
  263 
  264 
  265 def new_domain_ref(**kwargs):
  266     ref = {
  267         'id': uuid.uuid4().hex,
  268         'name': uuid.uuid4().hex,
  269         'description': uuid.uuid4().hex,
  270         'enabled': True,
  271         'tags': [],
  272         'options': {}
  273     }
  274     ref.update(kwargs)
  275     return ref
  276 
  277 
  278 def new_project_ref(domain_id=None, is_domain=False, **kwargs):
  279     ref = {
  280         'id': uuid.uuid4().hex,
  281         'name': uuid.uuid4().hex,
  282         'description': uuid.uuid4().hex,
  283         'enabled': True,
  284         'domain_id': domain_id,
  285         'is_domain': is_domain,
  286         'tags': [],
  287         'options': {}
  288     }
  289     # NOTE(henry-nash): We don't include parent_id in the initial list above
  290     # since specifying it is optional depending on where the project sits in
  291     # the hierarchy (and a parent_id of None has meaning - i.e. it's a top
  292     # level project).
  293     ref.update(kwargs)
  294     return ref
  295 
  296 
  297 def new_user_ref(domain_id, project_id=None, **kwargs):
  298     ref = {
  299         'id': uuid.uuid4().hex,
  300         'name': uuid.uuid4().hex,
  301         'enabled': True,
  302         'domain_id': domain_id,
  303         'email': uuid.uuid4().hex,
  304         'password': uuid.uuid4().hex,
  305     }
  306     if project_id:
  307         ref['default_project_id'] = project_id
  308     ref.update(kwargs)
  309     return ref
  310 
  311 
  312 def new_federated_user_ref(idp_id=None, protocol_id=None, **kwargs):
  313     ref = {
  314         'idp_id': idp_id or 'ORG_IDP',
  315         'protocol_id': protocol_id or 'saml2',
  316         'unique_id': uuid.uuid4().hex,
  317         'display_name': uuid.uuid4().hex,
  318     }
  319     ref.update(kwargs)
  320     return ref
  321 
  322 
  323 def new_mapping_ref(mapping_id=None, rules=None, **kwargs):
  324     ref = {
  325         'id': mapping_id or uuid.uuid4().hex,
  326         'rules': rules or []
  327     }
  328     ref.update(kwargs)
  329     return ref
  330 
  331 
  332 def new_protocol_ref(protocol_id=None, idp_id=None, mapping_id=None, **kwargs):
  333     ref = {
  334         'id': protocol_id or 'saml2',
  335         'idp_id': idp_id or 'ORG_IDP',
  336         'mapping_id': mapping_id or uuid.uuid4().hex
  337     }
  338     ref.update(kwargs)
  339     return ref
  340 
  341 
  342 def new_identity_provider_ref(idp_id=None, **kwargs):
  343     ref = {
  344         'id': idp_id or 'ORG_IDP',
  345         'enabled': True,
  346         'description': '',
  347     }
  348     ref.update(kwargs)
  349     return ref
  350 
  351 
  352 def new_service_provider_ref(**kwargs):
  353     ref = {
  354         'auth_url': 'https://' + uuid.uuid4().hex + '.com',
  355         'enabled': True,
  356         'description': uuid.uuid4().hex,
  357         'sp_url': 'https://' + uuid.uuid4().hex + '.com',
  358         'relay_state_prefix': CONF.saml.relay_state_prefix
  359     }
  360     ref.update(kwargs)
  361     return ref
  362 
  363 
  364 def new_group_ref(domain_id, **kwargs):
  365     ref = {
  366         'id': uuid.uuid4().hex,
  367         'name': uuid.uuid4().hex,
  368         'description': uuid.uuid4().hex,
  369         'domain_id': domain_id
  370     }
  371     ref.update(kwargs)
  372     return ref
  373 
  374 
  375 def new_credential_ref(user_id, project_id=None, type='cert', **kwargs):
  376     ref = {
  377         'id': uuid.uuid4().hex,
  378         'user_id': user_id,
  379         'type': type,
  380     }
  381 
  382     if project_id:
  383         ref['project_id'] = project_id
  384     if 'blob' not in kwargs:
  385         ref['blob'] = uuid.uuid4().hex
  386 
  387     ref.update(kwargs)
  388     return ref
  389 
  390 
  391 def new_cert_credential(user_id, project_id=None, blob=None, **kwargs):
  392     if blob is None:
  393         blob = {'access': uuid.uuid4().hex, 'secret': uuid.uuid4().hex}
  394 
  395     credential = new_credential_ref(user_id=user_id,
  396                                     project_id=project_id,
  397                                     blob=json.dumps(blob),
  398                                     type='cert',
  399                                     **kwargs)
  400     return blob, credential
  401 
  402 
  403 def new_ec2_credential(user_id, project_id=None, blob=None, **kwargs):
  404     if blob is None:
  405         blob = {
  406             'access': uuid.uuid4().hex,
  407             'secret': uuid.uuid4().hex,
  408             'trust_id': None
  409         }
  410 
  411     if 'id' not in kwargs:
  412         access = blob['access'].encode('utf-8')
  413         kwargs['id'] = hashlib.sha256(access).hexdigest()
  414 
  415     credential = new_credential_ref(user_id=user_id,
  416                                     project_id=project_id,
  417                                     blob=json.dumps(blob),
  418                                     type='ec2',
  419                                     **kwargs)
  420     return blob, credential
  421 
  422 
  423 def new_totp_credential(user_id, project_id=None, blob=None):
  424     if not blob:
  425         # NOTE(notmorgan): 20 bytes of data from os.urandom for
  426         # a totp secret.
  427         blob = base64.b32encode(os.urandom(20)).decode('utf-8')
  428     credential = new_credential_ref(user_id=user_id,
  429                                     project_id=project_id,
  430                                     blob=blob,
  431                                     type='totp')
  432     return credential
  433 
  434 
  435 def new_application_credential_ref(roles=None,
  436                                    name=None,
  437                                    expires=None,
  438                                    secret=None):
  439     ref = {
  440         'id': uuid.uuid4().hex,
  441         'name': uuid.uuid4().hex,
  442         'description': uuid.uuid4().hex,
  443     }
  444     if roles:
  445         ref['roles'] = roles
  446     if secret:
  447         ref['secret'] = secret
  448 
  449     if isinstance(expires, str):
  450         ref['expires_at'] = expires
  451     elif isinstance(expires, dict):
  452         ref['expires_at'] = (
  453             timeutils.utcnow() + datetime.timedelta(**expires)
  454         ).strftime(TIME_FORMAT)
  455     elif expires is None:
  456         pass
  457     else:
  458         raise NotImplementedError('Unexpected value for "expires"')
  459 
  460     return ref
  461 
  462 
  463 def new_role_ref(**kwargs):
  464     ref = {
  465         'id': uuid.uuid4().hex,
  466         'name': uuid.uuid4().hex,
  467         'description': uuid.uuid4().hex,
  468         'domain_id': None,
  469         'options': {},
  470     }
  471     ref.update(kwargs)
  472     return ref
  473 
  474 
  475 def new_policy_ref(**kwargs):
  476     ref = {
  477         'id': uuid.uuid4().hex,
  478         'name': uuid.uuid4().hex,
  479         'description': uuid.uuid4().hex,
  480         'enabled': True,
  481         # Store serialized JSON data as the blob to mimic real world usage.
  482         'blob': json.dumps({'data': uuid.uuid4().hex, }),
  483         'type': uuid.uuid4().hex,
  484     }
  485 
  486     ref.update(kwargs)
  487     return ref
  488 
  489 
  490 def new_domain_config_ref(**kwargs):
  491     ref = {
  492         "identity": {
  493             "driver": "ldap"
  494         },
  495         "ldap": {
  496             "url": "ldap://myldap.com:389/",
  497             "user_tree_dn": "ou=Users,dc=my_new_root,dc=org"
  498         }
  499     }
  500     ref.update(kwargs)
  501     return ref
  502 
  503 
  504 def new_trust_ref(trustor_user_id, trustee_user_id, project_id=None,
  505                   impersonation=None, expires=None, role_ids=None,
  506                   role_names=None, remaining_uses=None,
  507                   allow_redelegation=False, redelegation_count=None, **kwargs):
  508     ref = {
  509         'id': uuid.uuid4().hex,
  510         'trustor_user_id': trustor_user_id,
  511         'trustee_user_id': trustee_user_id,
  512         'impersonation': impersonation or False,
  513         'project_id': project_id,
  514         'remaining_uses': remaining_uses,
  515         'allow_redelegation': allow_redelegation,
  516     }
  517 
  518     if isinstance(redelegation_count, int):
  519         ref.update(redelegation_count=redelegation_count)
  520 
  521     if isinstance(expires, str):
  522         ref['expires_at'] = expires
  523     elif isinstance(expires, dict):
  524         ref['expires_at'] = (
  525             timeutils.utcnow() + datetime.timedelta(**expires)
  526         ).strftime(TIME_FORMAT)
  527     elif expires is None:
  528         pass
  529     else:
  530         raise NotImplementedError('Unexpected value for "expires"')
  531 
  532     role_ids = role_ids or []
  533     role_names = role_names or []
  534     if role_ids or role_names:
  535         ref['roles'] = []
  536         for role_id in role_ids:
  537             ref['roles'].append({'id': role_id})
  538         for role_name in role_names:
  539             ref['roles'].append({'name': role_name})
  540 
  541     ref.update(kwargs)
  542     return ref
  543 
  544 
  545 def new_registered_limit_ref(**kwargs):
  546     ref = {
  547         'service_id': uuid.uuid4().hex,
  548         'resource_name': uuid.uuid4().hex,
  549         'default_limit': 10,
  550         'description': uuid.uuid4().hex
  551     }
  552 
  553     ref.update(kwargs)
  554     return ref
  555 
  556 
  557 def new_limit_ref(**kwargs):
  558     ref = {
  559         'service_id': uuid.uuid4().hex,
  560         'resource_name': uuid.uuid4().hex,
  561         'resource_limit': 10,
  562         'description': uuid.uuid4().hex
  563     }
  564 
  565     ref.update(kwargs)
  566     return ref
  567 
  568 
  569 def create_user(api, domain_id, **kwargs):
  570     """Create a user via the API. Keep the created password.
  571 
  572     The password is saved and restored when api.create_user() is called.
  573     Only use this routine if there is a requirement for the user object to
  574     have a valid password after api.create_user() is called.
  575     """
  576     user = new_user_ref(domain_id=domain_id, **kwargs)
  577     password = user['password']
  578     user = api.create_user(user)
  579     user['password'] = password
  580     return user
  581 
  582 
  583 def _assert_expected_status(f):
  584     """Add `expected_status_code` as an argument to the test_client methods.
  585 
  586     `expected_status_code` must be passed as a kwarg.
  587     """
  588     TEAPOT_HTTP_STATUS = 418
  589 
  590     _default_expected_responses = {
  591         'get': http.client.OK,
  592         'head': http.client.OK,
  593         'post': http.client.CREATED,
  594         'put': http.client.NO_CONTENT,
  595         'patch': http.client.OK,
  596         'delete': http.client.NO_CONTENT,
  597     }
  598 
  599     @functools.wraps(f)
  600     def inner(*args, **kwargs):
  601         # Get the "expected_status_code" kwarg if supplied. If not supplied use
  602         # the `_default_expected_response` mapping, or fall through to
  603         # "HTTP OK" if the method is somehow unknown.
  604         expected_status_code = kwargs.pop(
  605             'expected_status_code',
  606             _default_expected_responses.get(
  607                 f.__name__.lower(), http.client.OK))
  608         response = f(*args, **kwargs)
  609 
  610         # Logic to verify the response object is sane. Expand as needed
  611         if response.status_code == TEAPOT_HTTP_STATUS:
  612             # NOTE(morgan): We use 418 internally during tests to indicate
  613             # an un-routed HTTP call was made. This allows us to avoid
  614             # misinterpreting HTTP 404 from Flask and HTTP 404 from a
  615             # resource that is not found (e.g. USER NOT FOUND) programmatically
  616             raise AssertionError("I AM A TEAPOT(418): %s" % response.data)
  617 
  618         if response.status_code != expected_status_code:
  619             raise AssertionError(
  620                 'Expected HTTP Status does not match observed HTTP '
  621                 'Status: %(expected)s != %(observed)s (%(data)s)' % {
  622                     'expected': expected_status_code,
  623                     'observed': response.status_code,
  624                     'data': response.data})
  625 
  626         # return the original response object
  627         return response
  628     return inner
  629 
  630 
  631 class KeystoneFlaskTestClient(flask_testing.FlaskClient):
  632     """Subclass of flask.testing.FlaskClient implementing assertions.
  633 
  634     Implements custom "expected" HTTP Status assertion for
  635     GET/HEAD/PUT/PATCH/DELETE.
  636     """
  637 
  638     @_assert_expected_status
  639     def get(self, *args, **kwargs):
  640         return super(KeystoneFlaskTestClient, self).get(*args, **kwargs)
  641 
  642     @_assert_expected_status
  643     def head(self, *args, **kwargs):
  644         return super(KeystoneFlaskTestClient, self).head(*args, **kwargs)
  645 
  646     @_assert_expected_status
  647     def post(self, *args, **kwargs):
  648         return super(KeystoneFlaskTestClient, self).post(*args, **kwargs)
  649 
  650     @_assert_expected_status
  651     def patch(self, *args, **kwargs):
  652         return super(KeystoneFlaskTestClient, self).patch(*args, **kwargs)
  653 
  654     @_assert_expected_status
  655     def put(self, *args, **kwargs):
  656         return super(KeystoneFlaskTestClient, self).put(*args, **kwargs)
  657 
  658     @_assert_expected_status
  659     def delete(self, *args, **kwargs):
  660         return super(KeystoneFlaskTestClient, self).delete(*args, **kwargs)
  661 
  662 
  663 class BaseTestCase(testtools.TestCase):
  664     """Light weight base test class.
  665 
  666     This is a placeholder that will eventually go away once the
  667     setup/teardown in TestCase is properly trimmed down to the bare
  668     essentials. This is really just a play to speed up the tests by
  669     eliminating unnecessary work.
  670     """
  671 
  672     def setUp(self):
  673         super(BaseTestCase, self).setUp()
  674 
  675         self.useFixture(fixtures.NestedTempfile())
  676         self.useFixture(fixtures.TempHomeDir())
  677 
  678         self.useFixture(fixtures.MockPatchObject(sys, 'exit',
  679                                                  side_effect=UnexpectedExit))
  680         self.useFixture(log_fixture.get_logging_handle_error_fixture())
  681 
  682         warnings.filterwarnings('error', category=DeprecationWarning,
  683                                 module='^keystone\\.')
  684         warnings.simplefilter('error', exc.SAWarning)
  685         if hasattr(exc, "RemovedIn20Warning"):
  686             warnings.simplefilter('ignore', exc.RemovedIn20Warning)
  687 
  688         self.addCleanup(warnings.resetwarnings)
  689         # Ensure we have an empty threadlocal context at the start of each
  690         # test.
  691         self.assertIsNone(oslo_context.get_current())
  692         self.useFixture(oslo_ctx_fixture.ClearRequestContext())
  693 
  694         orig_debug_level = ldap.get_option(ldap.OPT_DEBUG_LEVEL)
  695         self.addCleanup(ldap.set_option, ldap.OPT_DEBUG_LEVEL,
  696                         orig_debug_level)
  697         orig_tls_cacertfile = ldap.get_option(ldap.OPT_X_TLS_CACERTFILE)
  698         if orig_tls_cacertfile is None:
  699             orig_tls_cacertfile = ''
  700         self.addCleanup(ldap.set_option, ldap.OPT_X_TLS_CACERTFILE,
  701                         orig_tls_cacertfile)
  702         orig_tls_cacertdir = ldap.get_option(ldap.OPT_X_TLS_CACERTDIR)
  703         # Setting orig_tls_cacertdir to None is not allowed.
  704         if orig_tls_cacertdir is None:
  705             orig_tls_cacertdir = ''
  706         self.addCleanup(ldap.set_option, ldap.OPT_X_TLS_CACERTDIR,
  707                         orig_tls_cacertdir)
  708         orig_tls_require_cert = ldap.get_option(ldap.OPT_X_TLS_REQUIRE_CERT)
  709         self.addCleanup(ldap.set_option, ldap.OPT_X_TLS_REQUIRE_CERT,
  710                         orig_tls_require_cert)
  711         self.addCleanup(ks_ldap.PooledLDAPHandler.connection_pools.clear)
  712 
  713     def cleanup_instance(self, *names):
  714         """Create a function suitable for use with self.addCleanup.
  715 
  716         :returns: a callable that uses a closure to delete instance attributes
  717 
  718         """
  719         def cleanup():
  720             for name in names:
  721                 # TODO(dstanek): remove this 'if' statement once
  722                 # load_backend in test_backend_ldap is only called once
  723                 # per test
  724                 if hasattr(self, name):
  725                     delattr(self, name)
  726         return cleanup
  727 
  728     def skip_if_env_not_set(self, env_var):
  729         if not os.environ.get(env_var):
  730             self.skipTest('Env variable %s is not set.' % env_var)
  731 
  732     def skip_test_overrides(self, *args, **kwargs):
  733         if self._check_for_method_in_parents(self._testMethodName):
  734             return super(BaseTestCase, self).skipTest(*args, **kwargs)
  735         raise Exception('%r is not a previously defined test method'
  736                         % self._testMethodName)
  737 
  738     def _check_for_method_in_parents(self, name):
  739         # skip first to get to parents
  740         for cls in self.__class__.__mro__[1:]:
  741             if hasattr(cls, name):
  742                 return True
  743         return False
  744 
  745     def loadapp(self, name='public'):
  746         app = flask_app.application_factory(name)
  747         app.testing = True
  748         app.test_client_class = KeystoneFlaskTestClient
  749 
  750         # NOTE(morgan): any unexpected 404s, not handled by the routed apis,
  751         # is a hard error and should not pass testing.
  752         def page_not_found_teapot(e):
  753             content = (
  754                 'TEST PROGRAMMING ERROR - Reached a 404 from an unrouted (`%s`'
  755                 ') path. Be sure the test is requesting the right resource '
  756                 'and that all blueprints are registered with the flask app.' %
  757                 flask.request.url)
  758             return content, 418
  759 
  760         app.register_error_handler(404, page_not_found_teapot)
  761 
  762         self.test_client = app.test_client
  763         self.test_request_context = app.test_request_context
  764         self.cleanup_instance('test_request_context')
  765         self.cleanup_instance('test_client')
  766         return keystone_flask.setup_app_middleware(app)
  767 
  768 
  769 class TestCase(BaseTestCase):
  770 
  771     def config_files(self):
  772         return []
  773 
  774     def _policy_fixture(self):
  775         return ksfixtures.Policy(self.config_fixture)
  776 
  777     @contextlib.contextmanager
  778     def make_request(self, path='/', **kwargs):
  779         # standup a fake app and request context with a passed in/known
  780         # environment.
  781 
  782         is_admin = kwargs.pop('is_admin', False)
  783         environ = kwargs.setdefault('environ', {})
  784         query_string = kwargs.pop('query_string', None)
  785         if query_string:
  786             # Make sure query string is properly added to the context
  787             path = '{path}?{qs}'.format(path=path, qs=query_string)
  788 
  789         if not environ.get(context.REQUEST_CONTEXT_ENV):
  790             environ[context.REQUEST_CONTEXT_ENV] = context.RequestContext(
  791                 is_admin=is_admin,
  792                 authenticated=kwargs.pop('authenticated', True))
  793 
  794         # Create a dummy flask app to work with
  795         app = flask.Flask(__name__)
  796         with app.test_request_context(path=path, environ_overrides=environ):
  797             yield
  798 
  799     def config_overrides(self):
  800         # NOTE(morganfainberg): enforce config_overrides can only ever be
  801         # called a single time.
  802         assert self.__config_overrides_called is False
  803         self.__config_overrides_called = True
  804 
  805         signing_certfile = 'examples/pki/certs/signing_cert.pem'
  806         signing_keyfile = 'examples/pki/private/signing_key.pem'
  807 
  808         self.useFixture(self._policy_fixture())
  809 
  810         self.config_fixture.config(
  811             # TODO(morganfainberg): Make Cache Testing a separate test case
  812             # in tempest, and move it out of the base unit tests.
  813             group='cache',
  814             backend='dogpile.cache.memory',
  815             enabled=True,
  816             proxies=['oslo_cache.testing.CacheIsolatingProxy'])
  817         self.config_fixture.config(
  818             group='catalog',
  819             driver='sql',
  820             template_file=dirs.tests('default_catalog.templates'))
  821         self.config_fixture.config(
  822             group='saml', certfile=signing_certfile, keyfile=signing_keyfile)
  823         self.config_fixture.config(
  824             default_log_levels=[
  825                 'amqp=WARN',
  826                 'amqplib=WARN',
  827                 'boto=WARN',
  828                 'qpid=WARN',
  829                 'sqlalchemy=WARN',
  830                 'suds=INFO',
  831                 'oslo.messaging=INFO',
  832                 'iso8601=WARN',
  833                 'requests.packages.urllib3.connectionpool=WARN',
  834                 'routes.middleware=INFO',
  835                 'stevedore.extension=INFO',
  836                 'keystone.notifications=INFO',
  837                 'keystone.identity.backends.ldap.common=INFO',
  838             ])
  839         # NOTE(notmorgan): Set password rounds low here to ensure speedy
  840         # tests. This is explicitly set because the tests here are not testing
  841         # the integrity of the password hashing, just that the correct form
  842         # of hashing has been used. Note that 4 is the lowest for bcrypt
  843         # allowed in the `[identity] password_hash_rounds` setting
  844         self.config_fixture.config(group='identity', password_hash_rounds=4)
  845 
  846         self.useFixture(
  847             ksfixtures.KeyRepository(
  848                 self.config_fixture,
  849                 'fernet_tokens',
  850                 CONF.fernet_tokens.max_active_keys
  851             )
  852         )
  853 
  854         self.useFixture(
  855             ksfixtures.KeyRepository(
  856                 self.config_fixture,
  857                 'fernet_receipts',
  858                 CONF.fernet_receipts.max_active_keys
  859             )
  860         )
  861 
  862     def _assert_config_overrides_called(self):
  863         assert self.__config_overrides_called is True
  864 
  865     def setUp(self):
  866         super(TestCase, self).setUp()
  867         self.__config_overrides_called = False
  868         self.__load_backends_called = False
  869         self.config_fixture = self.useFixture(config_fixture.Config(CONF))
  870         self.addCleanup(delattr, self, 'config_fixture')
  871         self.config(self.config_files())
  872 
  873         # NOTE(morganfainberg): mock the auth plugin setup to use the config
  874         # fixture which automatically unregisters options when performing
  875         # cleanup.
  876         def mocked_register_auth_plugin_opt(conf, opt):
  877             self.config_fixture.register_opt(opt, group='auth')
  878         self.useFixture(fixtures.MockPatchObject(
  879             keystone.conf.auth, '_register_auth_plugin_opt',
  880             new=mocked_register_auth_plugin_opt))
  881 
  882         self.config_overrides()
  883         # explicitly load auth configuration
  884         keystone.conf.auth.setup_authentication()
  885         # NOTE(morganfainberg): ensure config_overrides has been called.
  886         self.addCleanup(self._assert_config_overrides_called)
  887 
  888         self.useFixture(fixtures.FakeLogger(level=log.DEBUG))
  889 
  890         # NOTE(morganfainberg): This code is a copy from the oslo-incubator
  891         # log module. This is not in a function or otherwise available to use
  892         # without having a CONF object to setup logging. This should help to
  893         # reduce the log size by limiting what we log (similar to how Keystone
  894         # would run under mod_wsgi).
  895         for pair in CONF.default_log_levels:
  896             mod, _sep, level_name = pair.partition('=')
  897             logger = log.getLogger(mod)
  898             logger.logger.setLevel(level_name)
  899 
  900         self.useFixture(ksfixtures.Cache())
  901 
  902         # Clear the registry of providers so that providers from previous
  903         # tests aren't used.
  904         self.addCleanup(provider_api.ProviderAPIs._clear_registry_instances)
  905 
  906         # Clear the registry of JSON Home Resources
  907         self.addCleanup(json_home.JsonHomeResources._reset)
  908 
  909         # Ensure Notification subscriptions and resource types are empty
  910         self.addCleanup(notifications.clear_subscribers)
  911         self.addCleanup(notifications.reset_notifier)
  912 
  913     def config(self, config_files):
  914         sql.initialize()
  915         CONF(args=[], project='keystone', default_config_files=config_files)
  916 
  917     def load_backends(self):
  918         """Initialize each manager and assigns them to an attribute."""
  919         # TODO(morgan): Ensure our tests only ever call load_backends
  920         # a single time via this method. for now just clear the registry
  921         # if we are reloading.
  922         provider_api.ProviderAPIs._clear_registry_instances()
  923         self.useFixture(ksfixtures.BackendLoader(self))
  924 
  925     def load_fixtures(self, fixtures):
  926         """Hacky basic and naive fixture loading based on a python module.
  927 
  928         Expects that the various APIs into the various services are already
  929         defined on `self`.
  930 
  931         """
  932         # NOTE(dstanek): create a list of attribute names to be removed
  933         # from this instance during cleanup
  934         fixtures_to_cleanup = []
  935 
  936         # TODO(termie): doing something from json, probably based on Django's
  937         #               loaddata will be much preferred.
  938         if (hasattr(self, 'identity_api') and
  939             hasattr(self, 'assignment_api') and
  940                 hasattr(self, 'resource_api')):
  941             try:
  942                 PROVIDERS.resource_api.create_domain(
  943                     resource_base.NULL_DOMAIN_ID, fixtures.ROOT_DOMAIN)
  944             except exception.Conflict:
  945                 # the root domain already exists, skip now.
  946                 pass
  947             for domain in fixtures.DOMAINS:
  948                 rv = PROVIDERS.resource_api.create_domain(domain['id'], domain)
  949                 attrname = 'domain_%s' % domain['id']
  950                 setattr(self, attrname, rv)
  951                 fixtures_to_cleanup.append(attrname)
  952 
  953             for project in fixtures.PROJECTS:
  954                 project_attr_name = 'project_%s' % project['name'].lower()
  955                 rv = PROVIDERS.resource_api.create_project(
  956                     project['id'], project)
  957                 setattr(self, project_attr_name, rv)
  958                 fixtures_to_cleanup.append(project_attr_name)
  959 
  960             for role in fixtures.ROLES:
  961                 rv = PROVIDERS.role_api.create_role(role['id'], role)
  962                 attrname = 'role_%s' % role['name']
  963                 setattr(self, attrname, rv)
  964                 fixtures_to_cleanup.append(attrname)
  965 
  966             for user in fixtures.USERS:
  967                 user_copy = user.copy()
  968                 projects = user_copy.pop('projects')
  969 
  970                 # For users, the manager layer will generate the ID
  971                 user_copy = PROVIDERS.identity_api.create_user(user_copy)
  972                 # Our tests expect that the password is still in the user
  973                 # record so that they can reference it, so put it back into
  974                 # the dict returned.
  975                 user_copy['password'] = user['password']
  976 
  977                 # fixtures.ROLES[2] is the _member_ role.
  978                 for project_id in projects:
  979                     PROVIDERS.assignment_api.add_role_to_user_and_project(
  980                         user_copy['id'], project_id, fixtures.ROLES[2]['id'])
  981 
  982                 # Use the ID from the fixture as the attribute name, so
  983                 # that our tests can easily reference each user dict, while
  984                 # the ID in the dict will be the real public ID.
  985                 attrname = 'user_%s' % user['name']
  986                 setattr(self, attrname, user_copy)
  987                 fixtures_to_cleanup.append(attrname)
  988 
  989             for role_assignment in fixtures.ROLE_ASSIGNMENTS:
  990                 role_id = role_assignment['role_id']
  991                 user = role_assignment['user']
  992                 project_id = role_assignment['project_id']
  993                 user_id = getattr(self, 'user_%s' % user)['id']
  994                 PROVIDERS.assignment_api.add_role_to_user_and_project(
  995                     user_id, project_id, role_id)
  996 
  997             self.addCleanup(self.cleanup_instance(*fixtures_to_cleanup))
  998 
  999     def assertCloseEnoughForGovernmentWork(self, a, b, delta=3):
 1000         """Assert that two datetimes are nearly equal within a small delta.
 1001 
 1002         :param delta: Maximum allowable time delta, defined in seconds.
 1003         """
 1004         if a == b:
 1005             # Short-circuit if the values are the same.
 1006             return
 1007 
 1008         msg = '%s != %s within %s delta' % (a, b, delta)
 1009 
 1010         self.assertLessEqual(abs(a - b).seconds, delta, msg)
 1011 
 1012     def assertTimestampEqual(self, expected, value):
 1013         # Compare two timestamps but ignore the microseconds part
 1014         # of the expected timestamp. Keystone does not track microseconds and
 1015         # is working to eliminate microseconds from it's datetimes used.
 1016         expected = timeutils.parse_isotime(expected).replace(microsecond=0)
 1017         value = timeutils.parse_isotime(value).replace(microsecond=0)
 1018         self.assertEqual(
 1019             expected,
 1020             value,
 1021             "%s != %s" % (expected, value))
 1022 
 1023     def assertNotEmpty(self, l):
 1024         self.assertGreater(len(l), 0)
 1025 
 1026     def assertUserDictEqual(self, expected, observed, message=''):
 1027         """Assert that a user dict is equal to another user dict.
 1028 
 1029         User dictionaries have some variable values that should be ignored in
 1030         the comparison. This method is a helper that strips those elements out
 1031         when comparing the user dictionary. This normalized these differences
 1032         that should not change the comparison.
 1033         """
 1034         # NOTE(notmorgan): An empty option list is the same as no options being
 1035         # specified in the user_ref. This removes options if it is empty in
 1036         # observed if options is not specified in the expected value.
 1037         if ('options' in observed and not observed['options'] and
 1038                 'options' not in expected):
 1039             observed = observed.copy()
 1040             del observed['options']
 1041 
 1042         self.assertDictEqual(expected, observed, message)
 1043 
 1044     @property
 1045     def ipv6_enabled(self):
 1046         if socket.has_ipv6:
 1047             sock = None
 1048             try:
 1049                 sock = socket.socket(socket.AF_INET6)
 1050                 # NOTE(Mouad): Try to bind to IPv6 loopback ip address.
 1051                 sock.bind(("::1", 0))
 1052                 return True
 1053             except socket.error:
 1054                 pass
 1055             finally:
 1056                 if sock:
 1057                     sock.close()
 1058         return False
 1059 
 1060     def skip_if_no_ipv6(self):
 1061         if not self.ipv6_enabled:
 1062             raise self.skipTest("IPv6 is not enabled in the system")
 1063 
 1064 
 1065 class SQLDriverOverrides(object):
 1066     """A mixin for consolidating sql-specific test overrides."""
 1067 
 1068     def config_overrides(self):
 1069         super(SQLDriverOverrides, self).config_overrides()
 1070         # SQL specific driver overrides
 1071         self.config_fixture.config(group='catalog', driver='sql')
 1072         self.config_fixture.config(group='identity', driver='sql')
 1073         self.config_fixture.config(group='policy', driver='sql')
 1074         self.config_fixture.config(group='trust', driver='sql')