"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/unit/test_cli.py" (13 May 2020, 87969 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_cli.py": 16.0.1_vs_17.0.0.

    1 # Copyright 2014 IBM Corp.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 import copy
   16 import datetime
   17 import logging
   18 import os
   19 from unittest import mock
   20 import uuid
   21 
   22 import argparse
   23 import configparser
   24 import fixtures
   25 import freezegun
   26 import http.client
   27 import oslo_config.fixture
   28 from oslo_db.sqlalchemy import migration
   29 from oslo_log import log
   30 from oslo_serialization import jsonutils
   31 from oslo_upgradecheck import upgradecheck
   32 from testtools import matchers
   33 
   34 from keystone.cmd import cli
   35 from keystone.cmd.doctor import caching
   36 from keystone.cmd.doctor import credential
   37 from keystone.cmd.doctor import database as doc_database
   38 from keystone.cmd.doctor import debug
   39 from keystone.cmd.doctor import federation
   40 from keystone.cmd.doctor import ldap
   41 from keystone.cmd.doctor import security_compliance
   42 from keystone.cmd.doctor import tokens
   43 from keystone.cmd.doctor import tokens_fernet
   44 from keystone.cmd import status
   45 from keystone.common import provider_api
   46 from keystone.common.sql import upgrades
   47 import keystone.conf
   48 from keystone import exception
   49 from keystone.i18n import _
   50 from keystone.identity.mapping_backends import mapping as identity_mapping
   51 from keystone.tests import unit
   52 from keystone.tests.unit import default_fixtures
   53 from keystone.tests.unit.ksfixtures import database
   54 from keystone.tests.unit.ksfixtures import ldapdb
   55 from keystone.tests.unit.ksfixtures import policy
   56 from keystone.tests.unit.ksfixtures import temporaryfile
   57 from keystone.tests.unit import mapping_fixtures
   58 
   59 
# Module-level alias for keystone's configuration object; the tests below both
# read options from it and call it to parse fake command lines.
CONF = keystone.conf.CONF
# Alias for the ProviderAPIs registry; the tests reach the backend managers
# through it (e.g. PROVIDERS.resource_api, PROVIDERS.identity_api).
PROVIDERS = provider_api.ProviderAPIs
   62 
   63 
   64 class CliLoggingTestCase(unit.BaseTestCase):
   65 
   66     def setUp(self):
   67         self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
   68         self.config_fixture.register_cli_opt(cli.command_opt)
   69         self.useFixture(fixtures.MockPatch(
   70             'oslo_config.cfg.find_config_files', return_value=[]))
   71         fd = self.useFixture(temporaryfile.SecureTempFile())
   72         self.fake_config_file = fd.file_name
   73         super(CliLoggingTestCase, self).setUp()
   74 
   75         # NOTE(crinkle): the command call doesn't have to actually work,
   76         # that's what the other unit tests are for. So just mock it out.
   77         class FakeConfCommand(object):
   78             def __init__(self):
   79                 self.cmd_class = mock.Mock()
   80         self.useFixture(fixtures.MockPatchObject(
   81             CONF, 'command', FakeConfCommand()))
   82 
   83         self.logging = self.useFixture(fixtures.FakeLogger(level=log.WARN))
   84 
   85     def test_absent_config_logs_warning(self):
   86         expected_msg = 'Config file not found, using default configs.'
   87         cli.main(argv=['keystone-manage', 'db_sync'])
   88         self.assertThat(self.logging.output, matchers.Contains(expected_msg))
   89 
   90     def test_present_config_does_not_log_warning(self):
   91         fake_argv = [
   92             'keystone-manage', '--config-file', self.fake_config_file, 'doctor'
   93         ]
   94         cli.main(argv=fake_argv)
   95         expected_msg = 'Config file not found, using default configs.'
   96         self.assertNotIn(expected_msg, self.logging.output)
   97 
   98 
   99 class CliBootStrapTestCase(unit.SQLDriverOverrides, unit.TestCase):
  100 
  101     def setUp(self):
  102         self.useFixture(database.Database())
  103         super(CliBootStrapTestCase, self).setUp()
  104         self.bootstrap = cli.BootStrap()
  105 
  106     def config_files(self):
  107         self.config_fixture.register_cli_opt(cli.command_opt)
  108         config_files = super(CliBootStrapTestCase, self).config_files()
  109         config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
  110         return config_files
  111 
  112     def config(self, config_files):
  113         CONF(args=['bootstrap', '--bootstrap-password', uuid.uuid4().hex],
  114              project='keystone',
  115              default_config_files=config_files)
  116 
  117     def test_bootstrap(self):
  118         self._do_test_bootstrap(self.bootstrap)
  119 
  120     def _do_test_bootstrap(self, bootstrap):
  121         try:
  122             PROVIDERS.resource_api.create_domain(
  123                 default_fixtures.ROOT_DOMAIN['id'],
  124                 default_fixtures.ROOT_DOMAIN)
  125         except exception.Conflict:
  126             pass
  127 
  128         bootstrap.do_bootstrap()
  129         project = PROVIDERS.resource_api.get_project_by_name(
  130             bootstrap.project_name,
  131             'default')
  132         user = PROVIDERS.identity_api.get_user_by_name(
  133             bootstrap.username,
  134             'default')
  135         admin_role = PROVIDERS.role_api.get_role(bootstrap.role_id)
  136         reader_role = PROVIDERS.role_api.get_role(bootstrap.reader_role_id)
  137         member_role = PROVIDERS.role_api.get_role(bootstrap.member_role_id)
  138         role_list = (
  139             PROVIDERS.assignment_api.get_roles_for_user_and_project(
  140                 user['id'],
  141                 project['id']))
  142         self.assertIs(3, len(role_list))
  143         self.assertIn(admin_role['id'], role_list)
  144         self.assertIn(reader_role['id'], role_list)
  145         self.assertIn(member_role['id'], role_list)
  146         system_roles = (
  147             PROVIDERS.assignment_api.list_system_grants_for_user(
  148                 user['id']
  149             )
  150         )
  151         self.assertIs(1, len(system_roles))
  152         self.assertEqual(system_roles[0]['id'], admin_role['id'])
  153         # NOTE(morganfainberg): Pass an empty context, it isn't used by
  154         # `authenticate` method.
  155         with self.make_request():
  156             PROVIDERS.identity_api.authenticate(
  157                 user['id'],
  158                 bootstrap.password)
  159 
  160         if bootstrap.region_id:
  161             region = PROVIDERS.catalog_api.get_region(bootstrap.region_id)
  162             self.assertEqual(self.region_id, region['id'])
  163 
  164         if bootstrap.service_id:
  165             svc = PROVIDERS.catalog_api.get_service(bootstrap.service_id)
  166             self.assertEqual(self.service_name, svc['name'])
  167 
  168             self.assertEqual(set(['admin', 'public', 'internal']),
  169                              set(bootstrap.endpoints))
  170 
  171             urls = {'public': self.public_url,
  172                     'internal': self.internal_url,
  173                     'admin': self.admin_url}
  174 
  175             for interface, url in urls.items():
  176                 endpoint_id = bootstrap.endpoints[interface]
  177                 endpoint = PROVIDERS.catalog_api.get_endpoint(endpoint_id)
  178 
  179                 self.assertEqual(self.region_id, endpoint['region_id'])
  180                 self.assertEqual(url, endpoint['url'])
  181                 self.assertEqual(svc['id'], endpoint['service_id'])
  182                 self.assertEqual(interface, endpoint['interface'])
  183 
  184     def test_bootstrap_is_idempotent_when_password_does_not_change(self):
  185         # NOTE(morganfainberg): Ensure we can run bootstrap with the same
  186         # configuration multiple times without erroring.
  187         self._do_test_bootstrap(self.bootstrap)
  188         app = self.loadapp()
  189         v3_password_data = {
  190             'auth': {
  191                 'identity': {
  192                     "methods": ["password"],
  193                     "password": {
  194                         "user": {
  195                             "name": self.bootstrap.username,
  196                             "password": self.bootstrap.password,
  197                             "domain": {
  198                                 "id": CONF.identity.default_domain_id
  199                             }
  200                         }
  201                     }
  202                 }
  203             }
  204         }
  205         with app.test_client() as c:
  206             auth_response = c.post('/v3/auth/tokens',
  207                                    json=v3_password_data)
  208             token = auth_response.headers['X-Subject-Token']
  209         self._do_test_bootstrap(self.bootstrap)
  210         # build validation request
  211         with app.test_client() as c:
  212             # Get a new X-Auth-Token
  213             r = c.post(
  214                 '/v3/auth/tokens',
  215                 json=v3_password_data)
  216 
  217             # Validate the old token with our new X-Auth-Token.
  218             c.get('/v3/auth/tokens',
  219                   headers={'X-Auth-Token': r.headers['X-Subject-Token'],
  220                            'X-Subject-Token': token})
  221         admin_role = PROVIDERS.role_api.get_role(self.bootstrap.role_id)
  222         reader_role = PROVIDERS.role_api.get_role(
  223             self.bootstrap.reader_role_id)
  224         member_role = PROVIDERS.role_api.get_role(
  225             self.bootstrap.member_role_id)
  226         self.assertEqual(admin_role['options'], {'immutable': True})
  227         self.assertEqual(member_role['options'], {'immutable': True})
  228         self.assertEqual(reader_role['options'], {'immutable': True})
  229 
  230     def test_bootstrap_is_not_idempotent_when_password_does_change(self):
  231         # NOTE(lbragstad): Ensure bootstrap isn't idempotent when run with
  232         # different arguments or configuration values.
  233         self._do_test_bootstrap(self.bootstrap)
  234         app = self.loadapp()
  235         v3_password_data = {
  236             'auth': {
  237                 'identity': {
  238                     "methods": ["password"],
  239                     "password": {
  240                         "user": {
  241                             "name": self.bootstrap.username,
  242                             "password": self.bootstrap.password,
  243                             "domain": {
  244                                 "id": CONF.identity.default_domain_id
  245                             }
  246                         }
  247                     }
  248                 }
  249             }
  250         }
  251         time = datetime.datetime.utcnow()
  252         with freezegun.freeze_time(time) as frozen_time:
  253             with app.test_client() as c:
  254                 auth_response = c.post('/v3/auth/tokens',
  255                                        json=v3_password_data)
  256                 token = auth_response.headers['X-Subject-Token']
  257             new_passwd = uuid.uuid4().hex
  258             os.environ['OS_BOOTSTRAP_PASSWORD'] = new_passwd
  259             self._do_test_bootstrap(self.bootstrap)
  260             v3_password_data['auth']['identity']['password']['user'][
  261                 'password'] = new_passwd
  262             # Move time forward a second to avoid rev. event capturing the new
  263             # auth-token since we're within a single second (possibly) for the
  264             # test case.
  265             frozen_time.tick(delta=datetime.timedelta(seconds=1))
  266             # Validate the old token
  267             with app.test_client() as c:
  268                 # Get a new X-Auth-Token
  269                 r = c.post('/v3/auth/tokens', json=v3_password_data)
  270                 # Since the user account was recovered with a different
  271                 # password, we shouldn't be able to validate this token.
  272                 # Bootstrap should have persisted a revocation event because
  273                 # the user's password was updated. Since this token was
  274                 # obtained using the original password, it should now be
  275                 # invalid.
  276                 c.get('/v3/auth/tokens',
  277                       headers={'X-Auth-Token': r.headers['X-Subject-Token'],
  278                                'X-Subject-Token': token},
  279                       expected_status_code=http.client.NOT_FOUND)
  280 
  281     def test_bootstrap_recovers_user(self):
  282         self._do_test_bootstrap(self.bootstrap)
  283 
  284         # Completely lock the user out.
  285         user_id = PROVIDERS.identity_api.get_user_by_name(
  286             self.bootstrap.username,
  287             'default')['id']
  288         PROVIDERS.identity_api.update_user(
  289             user_id,
  290             {'enabled': False,
  291              'password': uuid.uuid4().hex})
  292 
  293         # The second bootstrap run will recover the account.
  294         self._do_test_bootstrap(self.bootstrap)
  295 
  296         # Sanity check that the original password works again.
  297         with self.make_request():
  298             PROVIDERS.identity_api.authenticate(
  299                 user_id,
  300                 self.bootstrap.password)
  301 
  302     def test_bootstrap_with_explicit_immutable_roles(self):
  303         CONF(args=['bootstrap',
  304                    '--bootstrap-password', uuid.uuid4().hex,
  305                    '--immutable-roles'],
  306              project='keystone')
  307         self._do_test_bootstrap(self.bootstrap)
  308         admin_role = PROVIDERS.role_api.get_role(self.bootstrap.role_id)
  309         reader_role = PROVIDERS.role_api.get_role(
  310             self.bootstrap.reader_role_id)
  311         member_role = PROVIDERS.role_api.get_role(
  312             self.bootstrap.member_role_id)
  313         self.assertTrue(admin_role['options']['immutable'])
  314         self.assertTrue(member_role['options']['immutable'])
  315         self.assertTrue(reader_role['options']['immutable'])
  316 
  317     def test_bootstrap_with_default_immutable_roles(self):
  318         CONF(args=['bootstrap',
  319                    '--bootstrap-password', uuid.uuid4().hex],
  320              project='keystone')
  321         self._do_test_bootstrap(self.bootstrap)
  322         admin_role = PROVIDERS.role_api.get_role(self.bootstrap.role_id)
  323         reader_role = PROVIDERS.role_api.get_role(
  324             self.bootstrap.reader_role_id)
  325         member_role = PROVIDERS.role_api.get_role(
  326             self.bootstrap.member_role_id)
  327         self.assertTrue(admin_role['options']['immutable'])
  328         self.assertTrue(member_role['options']['immutable'])
  329         self.assertTrue(reader_role['options']['immutable'])
  330 
  331     def test_bootstrap_with_no_immutable_roles(self):
  332         CONF(args=['bootstrap',
  333                    '--bootstrap-password', uuid.uuid4().hex,
  334                    '--no-immutable-roles'],
  335              project='keystone')
  336         self._do_test_bootstrap(self.bootstrap)
  337         admin_role = PROVIDERS.role_api.get_role(self.bootstrap.role_id)
  338         reader_role = PROVIDERS.role_api.get_role(
  339             self.bootstrap.reader_role_id)
  340         member_role = PROVIDERS.role_api.get_role(
  341             self.bootstrap.member_role_id)
  342         self.assertNotIn('immutable', admin_role['options'])
  343         self.assertNotIn('immutable', member_role['options'])
  344         self.assertNotIn('immutable', reader_role['options'])
  345 
  346     def test_bootstrap_with_ambiguous_role_names(self):
  347         # bootstrap system to create the default admin role
  348         self._do_test_bootstrap(self.bootstrap)
  349 
  350         # create a domain-specific roles that share the same names as the
  351         # default roles created by keystone-manage bootstrap
  352         domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
  353         domain = PROVIDERS.resource_api.create_domain(domain['id'], domain)
  354         domain_roles = {}
  355 
  356         for name in ['admin', 'member', 'reader']:
  357             domain_role = {
  358                 'domain_id': domain['id'],
  359                 'id': uuid.uuid4().hex,
  360                 'name': name
  361             }
  362             domain_roles[name] = PROVIDERS.role_api.create_role(
  363                 domain_role['id'], domain_role
  364             )
  365 
  366             # ensure subsequent bootstrap attempts don't fail because of
  367             # ambiguity
  368             self._do_test_bootstrap(self.bootstrap)
  369 
  370 
  371 class CliBootStrapTestCaseWithEnvironment(CliBootStrapTestCase):
  372 
  373     def config(self, config_files):
  374         CONF(args=['bootstrap'], project='keystone',
  375              default_config_files=config_files)
  376 
  377     def setUp(self):
  378         super(CliBootStrapTestCaseWithEnvironment, self).setUp()
  379         self.password = uuid.uuid4().hex
  380         self.username = uuid.uuid4().hex
  381         self.project_name = uuid.uuid4().hex
  382         self.role_name = uuid.uuid4().hex
  383         self.service_name = uuid.uuid4().hex
  384         self.public_url = uuid.uuid4().hex
  385         self.internal_url = uuid.uuid4().hex
  386         self.admin_url = uuid.uuid4().hex
  387         self.region_id = uuid.uuid4().hex
  388         self.default_domain = {
  389             'id': CONF.identity.default_domain_id,
  390             'name': 'Default',
  391         }
  392         self.useFixture(
  393             fixtures.EnvironmentVariable('OS_BOOTSTRAP_PASSWORD',
  394                                          newvalue=self.password))
  395         self.useFixture(
  396             fixtures.EnvironmentVariable('OS_BOOTSTRAP_USERNAME',
  397                                          newvalue=self.username))
  398         self.useFixture(
  399             fixtures.EnvironmentVariable('OS_BOOTSTRAP_PROJECT_NAME',
  400                                          newvalue=self.project_name))
  401         self.useFixture(
  402             fixtures.EnvironmentVariable('OS_BOOTSTRAP_ROLE_NAME',
  403                                          newvalue=self.role_name))
  404         self.useFixture(
  405             fixtures.EnvironmentVariable('OS_BOOTSTRAP_SERVICE_NAME',
  406                                          newvalue=self.service_name))
  407         self.useFixture(
  408             fixtures.EnvironmentVariable('OS_BOOTSTRAP_PUBLIC_URL',
  409                                          newvalue=self.public_url))
  410         self.useFixture(
  411             fixtures.EnvironmentVariable('OS_BOOTSTRAP_INTERNAL_URL',
  412                                          newvalue=self.internal_url))
  413         self.useFixture(
  414             fixtures.EnvironmentVariable('OS_BOOTSTRAP_ADMIN_URL',
  415                                          newvalue=self.admin_url))
  416         self.useFixture(
  417             fixtures.EnvironmentVariable('OS_BOOTSTRAP_REGION_ID',
  418                                          newvalue=self.region_id))
  419 
  420         PROVIDERS.resource_api.create_domain(
  421             default_fixtures.ROOT_DOMAIN['id'], default_fixtures.ROOT_DOMAIN)
  422 
  423     def test_assignment_created_with_user_exists(self):
  424         # test assignment can be created if user already exists.
  425         PROVIDERS.resource_api.create_domain(self.default_domain['id'],
  426                                              self.default_domain)
  427         user_ref = unit.new_user_ref(self.default_domain['id'],
  428                                      name=self.username,
  429                                      password=self.password)
  430         PROVIDERS.identity_api.create_user(user_ref)
  431 
  432         self._do_test_bootstrap(self.bootstrap)
  433 
  434     def test_assignment_created_with_project_exists(self):
  435         # test assignment can be created if project already exists.
  436         PROVIDERS.resource_api.create_domain(self.default_domain['id'],
  437                                              self.default_domain)
  438         project_ref = unit.new_project_ref(self.default_domain['id'],
  439                                            name=self.project_name)
  440         PROVIDERS.resource_api.create_project(project_ref['id'], project_ref)
  441         self._do_test_bootstrap(self.bootstrap)
  442 
  443     def test_assignment_created_with_role_exists(self):
  444         # test assignment can be created if role already exists.
  445         PROVIDERS.resource_api.create_domain(self.default_domain['id'],
  446                                              self.default_domain)
  447         role = unit.new_role_ref(name=self.role_name)
  448         PROVIDERS.role_api.create_role(role['id'], role)
  449         self._do_test_bootstrap(self.bootstrap)
  450 
  451     def test_assignment_created_with_region_exists(self):
  452         # test assignment can be created if region already exists.
  453         PROVIDERS.resource_api.create_domain(self.default_domain['id'],
  454                                              self.default_domain)
  455         region = unit.new_region_ref(id=self.region_id)
  456         PROVIDERS.catalog_api.create_region(region)
  457         self._do_test_bootstrap(self.bootstrap)
  458 
  459     def test_endpoints_created_with_service_exists(self):
  460         # test assignment can be created if service already exists.
  461         PROVIDERS.resource_api.create_domain(self.default_domain['id'],
  462                                              self.default_domain)
  463         service = unit.new_service_ref(name=self.service_name)
  464         PROVIDERS.catalog_api.create_service(service['id'], service)
  465         self._do_test_bootstrap(self.bootstrap)
  466 
  467     def test_endpoints_created_with_endpoint_exists(self):
  468         # test assignment can be created if endpoint already exists.
  469         PROVIDERS.resource_api.create_domain(self.default_domain['id'],
  470                                              self.default_domain)
  471         service = unit.new_service_ref(name=self.service_name)
  472         PROVIDERS.catalog_api.create_service(service['id'], service)
  473 
  474         region = unit.new_region_ref(id=self.region_id)
  475         PROVIDERS.catalog_api.create_region(region)
  476 
  477         endpoint = unit.new_endpoint_ref(interface='public',
  478                                          service_id=service['id'],
  479                                          url=self.public_url,
  480                                          region_id=self.region_id)
  481         PROVIDERS.catalog_api.create_endpoint(endpoint['id'], endpoint)
  482 
  483         self._do_test_bootstrap(self.bootstrap)
  484 
  485     def test_endpoints_created_with_new_endpoints(self):
  486         service = unit.new_service_ref(name=self.service_name, type='identity')
  487         PROVIDERS.catalog_api.create_service(service['id'], service)
  488         region = unit.new_region_ref(id=self.region_id)
  489         PROVIDERS.catalog_api.create_region(region)
  490         endpoint = unit.new_endpoint_ref(interface='public',
  491                                          service_id=service['id'],
  492                                          url=uuid.uuid4().hex,
  493                                          region_id=self.region_id)
  494         PROVIDERS.catalog_api.create_endpoint(endpoint['id'], endpoint)
  495 
  496         self._do_test_bootstrap(self.bootstrap)
  497         updated_endpoint = PROVIDERS.catalog_api.get_endpoint(endpoint['id'])
  498         self.assertEqual(updated_endpoint['url'], self.bootstrap.public_url)
  499 
  500 
  501 class CliDomainConfigAllTestCase(unit.SQLDriverOverrides, unit.TestCase):
  502 
  503     def setUp(self):
  504         self.useFixture(database.Database())
  505         super(CliDomainConfigAllTestCase, self).setUp()
  506         self.load_backends()
  507         self.config_fixture.config(
  508             group='identity',
  509             domain_config_dir=unit.TESTCONF + '/domain_configs_multi_ldap')
  510         self.domain_count = 3
  511         self.setup_initial_domains()
  512         self.logging = self.useFixture(
  513             fixtures.FakeLogger(level=logging.INFO))
  514 
  515     def config_files(self):
  516         self.config_fixture.register_cli_opt(cli.command_opt)
  517         config_files = super(CliDomainConfigAllTestCase, self).config_files()
  518         config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
  519         return config_files
  520 
  521     def cleanup_domains(self):
  522         for domain in self.domains:
  523             if domain == 'domain_default':
  524                 # Not allowed to delete the default domain, but should at least
  525                 # delete any domain-specific config for it.
  526                 PROVIDERS.domain_config_api.delete_config(
  527                     CONF.identity.default_domain_id)
  528                 continue
  529             this_domain = self.domains[domain]
  530             this_domain['enabled'] = False
  531             PROVIDERS.resource_api.update_domain(
  532                 this_domain['id'], this_domain
  533             )
  534             PROVIDERS.resource_api.delete_domain(this_domain['id'])
  535         self.domains = {}
  536 
  537     def config(self, config_files):
  538         CONF(args=['domain_config_upload', '--all'], project='keystone',
  539              default_config_files=config_files)
  540 
  541     def setup_initial_domains(self):
  542 
  543         def create_domain(domain):
  544             return PROVIDERS.resource_api.create_domain(domain['id'], domain)
  545 
  546         PROVIDERS.resource_api.create_domain(
  547             default_fixtures.ROOT_DOMAIN['id'], default_fixtures.ROOT_DOMAIN)
  548 
  549         self.domains = {}
  550         self.addCleanup(self.cleanup_domains)
  551         for x in range(1, self.domain_count):
  552             domain = 'domain%s' % x
  553             self.domains[domain] = create_domain(
  554                 {'id': uuid.uuid4().hex, 'name': domain})
  555         self.default_domain = unit.new_domain_ref(
  556             description=u'The default domain',
  557             id=CONF.identity.default_domain_id,
  558             name=u'Default')
  559         self.domains['domain_default'] = create_domain(self.default_domain)
  560 
  561     def test_config_upload(self):
  562         # The values below are the same as in the domain_configs_multi_ldap
  563         # directory of test config_files.
  564         default_config = {
  565             'ldap': {'url': 'fake://memory',
  566                      'user': 'cn=Admin',
  567                      'password': 'password',
  568                      'suffix': 'cn=example,cn=com'},
  569             'identity': {'driver': 'ldap'}
  570         }
  571         domain1_config = {
  572             'ldap': {'url': 'fake://memory1',
  573                      'user': 'cn=Admin',
  574                      'password': 'password',
  575                      'suffix': 'cn=example,cn=com'},
  576             'identity': {'driver': 'ldap',
  577                          'list_limit': '101'}
  578         }
  579         domain2_config = {
  580             'ldap': {'url': 'fake://memory',
  581                      'user': 'cn=Admin',
  582                      'password': 'password',
  583                      'suffix': 'cn=myroot,cn=com',
  584                      'group_tree_dn': 'ou=UserGroups,dc=myroot,dc=org',
  585                      'user_tree_dn': 'ou=Users,dc=myroot,dc=org'},
  586             'identity': {'driver': 'ldap'}
  587         }
  588 
  589         # Clear backend dependencies, since cli loads these manually
  590         provider_api.ProviderAPIs._clear_registry_instances()
  591         cli.DomainConfigUpload.main()
  592 
  593         res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
  594             CONF.identity.default_domain_id)
  595         self.assertEqual(default_config, res)
  596         res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
  597             self.domains['domain1']['id'])
  598         self.assertEqual(domain1_config, res)
  599         res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
  600             self.domains['domain2']['id'])
  601         self.assertEqual(domain2_config, res)
  602 
  603 
  604 class CliDomainConfigSingleDomainTestCase(CliDomainConfigAllTestCase):
  605 
  606     def config(self, config_files):
  607         CONF(args=['domain_config_upload', '--domain-name', 'Default'],
  608              project='keystone', default_config_files=config_files)
  609 
  610     def test_config_upload(self):
  611         # The values below are the same as in the domain_configs_multi_ldap
  612         # directory of test config_files.
  613         default_config = {
  614             'ldap': {'url': 'fake://memory',
  615                      'user': 'cn=Admin',
  616                      'password': 'password',
  617                      'suffix': 'cn=example,cn=com'},
  618             'identity': {'driver': 'ldap'}
  619         }
  620 
  621         # Clear backend dependencies, since cli loads these manually
  622         provider_api.ProviderAPIs._clear_registry_instances()
  623         cli.DomainConfigUpload.main()
  624 
  625         res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
  626             CONF.identity.default_domain_id)
  627         self.assertEqual(default_config, res)
  628         res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
  629             self.domains['domain1']['id'])
  630         self.assertEqual({}, res)
  631         res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
  632             self.domains['domain2']['id'])
  633         self.assertEqual({}, res)
  634 
  635     def test_no_overwrite_config(self):
  636         # Create a config for the default domain
  637         default_config = {
  638             'ldap': {'url': uuid.uuid4().hex},
  639             'identity': {'driver': 'ldap'}
  640         }
  641         PROVIDERS.domain_config_api.create_config(
  642             CONF.identity.default_domain_id, default_config)
  643 
  644         # Now try and upload the settings in the configuration file for the
  645         # default domain
  646         provider_api.ProviderAPIs._clear_registry_instances()
  647         with mock.patch('builtins.print') as mock_print:
  648             self.assertRaises(unit.UnexpectedExit, cli.DomainConfigUpload.main)
  649             file_name = ('keystone.%s.conf' % self.default_domain['name'])
  650             error_msg = _(
  651                 'Domain: %(domain)s already has a configuration defined - '
  652                 'ignoring file: %(file)s.') % {
  653                     'domain': self.default_domain['name'],
  654                     'file': os.path.join(CONF.identity.domain_config_dir,
  655                                          file_name)}
  656             mock_print.assert_has_calls([mock.call(error_msg)])
  657 
  658         res = PROVIDERS.domain_config_api.get_config(
  659             CONF.identity.default_domain_id)
  660         # The initial config should not have been overwritten
  661         self.assertEqual(default_config, res)
  662 
  663 
  664 class CliDomainConfigNoOptionsTestCase(CliDomainConfigAllTestCase):
  665 
  666     def config(self, config_files):
  667         CONF(args=['domain_config_upload'],
  668              project='keystone', default_config_files=config_files)
  669 
  670     def test_config_upload(self):
  671         provider_api.ProviderAPIs._clear_registry_instances()
  672         with mock.patch('builtins.print') as mock_print:
  673             self.assertRaises(unit.UnexpectedExit, cli.DomainConfigUpload.main)
  674             mock_print.assert_has_calls(
  675                 [mock.call(
  676                     _('At least one option must be provided, use either '
  677                       '--all or --domain-name'))])
  678 
  679 
  680 class CliDomainConfigTooManyOptionsTestCase(CliDomainConfigAllTestCase):
  681 
  682     def config(self, config_files):
  683         CONF(args=['domain_config_upload', '--all', '--domain-name',
  684                    'Default'],
  685              project='keystone', default_config_files=config_files)
  686 
  687     def test_config_upload(self):
  688         provider_api.ProviderAPIs._clear_registry_instances()
  689         with mock.patch('builtins.print') as mock_print:
  690             self.assertRaises(unit.UnexpectedExit, cli.DomainConfigUpload.main)
  691             mock_print.assert_has_calls(
  692                 [mock.call(_('The --all option cannot be used with '
  693                              'the --domain-name option'))])
  694 
  695 
  696 class CliDomainConfigInvalidDomainTestCase(CliDomainConfigAllTestCase):
  697 
  698     def config(self, config_files):
  699         self.invalid_domain_name = uuid.uuid4().hex
  700         CONF(args=['domain_config_upload', '--domain-name',
  701                    self.invalid_domain_name],
  702              project='keystone', default_config_files=config_files)
  703 
  704     def test_config_upload(self):
  705         provider_api.ProviderAPIs._clear_registry_instances()
  706         with mock.patch('builtins.print') as mock_print:
  707             self.assertRaises(unit.UnexpectedExit, cli.DomainConfigUpload.main)
  708             file_name = 'keystone.%s.conf' % self.invalid_domain_name
  709             error_msg = (_(
  710                 'Invalid domain name: %(domain)s found in config file name: '
  711                 '%(file)s - ignoring this file.') % {
  712                     'domain': self.invalid_domain_name,
  713                     'file': os.path.join(CONF.identity.domain_config_dir,
  714                                          file_name)})
  715             mock_print.assert_has_calls([mock.call(error_msg)])
  716 
  717 
  718 class TestDomainConfigFinder(unit.BaseTestCase):
  719 
  720     def setUp(self):
  721         super(TestDomainConfigFinder, self).setUp()
  722         self.logging = self.useFixture(fixtures.LoggerFixture())
  723 
  724     @mock.patch('os.walk')
  725     def test_finder_ignores_files(self, mock_walk):
  726         mock_walk.return_value = [
  727             ['.', [], ['file.txt', 'keystone.conf', 'keystone.domain0.conf']],
  728         ]
  729 
  730         domain_configs = list(cli._domain_config_finder('.'))
  731 
  732         expected_domain_configs = [('./keystone.domain0.conf', 'domain0')]
  733         self.assertThat(domain_configs,
  734                         matchers.Equals(expected_domain_configs))
  735 
  736         expected_msg_template = ('Ignoring file (%s) while scanning '
  737                                  'domain config directory')
  738         self.assertThat(
  739             self.logging.output,
  740             matchers.Contains(expected_msg_template % 'file.txt'))
  741         self.assertThat(
  742             self.logging.output,
  743             matchers.Contains(expected_msg_template % 'keystone.conf'))
  744 
  745 
  746 class CliDBSyncTestCase(unit.BaseTestCase):
  747 
  748     class FakeConfCommand(object):
  749         def __init__(self, parent):
  750             self.extension = False
  751             self.check = parent.command_check
  752             self.expand = parent.command_expand
  753             self.migrate = parent.command_migrate
  754             self.contract = parent.command_contract
  755             self.version = None
  756 
  757     def setUp(self):
  758         super(CliDBSyncTestCase, self).setUp()
  759         self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
  760         self.config_fixture.register_cli_opt(cli.command_opt)
  761         upgrades.offline_sync_database_to_version = mock.Mock()
  762         upgrades.expand_schema = mock.Mock()
  763         upgrades.migrate_data = mock.Mock()
  764         upgrades.contract_schema = mock.Mock()
  765         self.command_check = False
  766         self.command_expand = False
  767         self.command_migrate = False
  768         self.command_contract = False
  769 
  770     def _assert_correct_call(self, mocked_function):
  771         for func in [upgrades.offline_sync_database_to_version,
  772                      upgrades.expand_schema,
  773                      upgrades.migrate_data,
  774                      upgrades.contract_schema]:
  775             if func == mocked_function:
  776                 self.assertTrue(func.called)
  777             else:
  778                 self.assertFalse(func.called)
  779 
  780     def test_db_sync(self):
  781         self.useFixture(fixtures.MockPatchObject(
  782             CONF, 'command', self.FakeConfCommand(self)))
  783         cli.DbSync.main()
  784         self._assert_correct_call(
  785             upgrades.offline_sync_database_to_version)
  786 
  787     def test_db_sync_expand(self):
  788         self.command_expand = True
  789         self.useFixture(fixtures.MockPatchObject(
  790             CONF, 'command', self.FakeConfCommand(self)))
  791         cli.DbSync.main()
  792         self._assert_correct_call(upgrades.expand_schema)
  793 
  794     def test_db_sync_migrate(self):
  795         self.command_migrate = True
  796         self.useFixture(fixtures.MockPatchObject(
  797             CONF, 'command', self.FakeConfCommand(self)))
  798         cli.DbSync.main()
  799         self._assert_correct_call(upgrades.migrate_data)
  800 
  801     def test_db_sync_contract(self):
  802         self.command_contract = True
  803         self.useFixture(fixtures.MockPatchObject(
  804             CONF, 'command', self.FakeConfCommand(self)))
  805         cli.DbSync.main()
  806         self._assert_correct_call(upgrades.contract_schema)
  807 
  808     @mock.patch('keystone.cmd.cli.upgrades.get_db_version')
  809     def test_db_sync_check_when_database_is_empty(self, mocked_get_db_version):
  810         e = migration.exception.DBMigrationError("Invalid version")
  811         mocked_get_db_version.side_effect = e
  812         checker = cli.DbSync()
  813 
  814         log_info = self.useFixture(fixtures.FakeLogger(level=log.INFO))
  815         status = checker.check_db_sync_status()
  816         self.assertIn("not currently under version control", log_info.output)
  817         self.assertEqual(status, 2)
  818 
  819 
  820 class TestMappingPopulate(unit.SQLDriverOverrides, unit.TestCase):
  821 
  822     def setUp(self):
  823         sqldb = self.useFixture(database.Database())
  824         super(TestMappingPopulate, self).setUp()
  825         self.ldapdb = self.useFixture(ldapdb.LDAPDatabase())
  826         self.ldapdb.clear()
  827 
  828         self.load_backends()
  829 
  830         sqldb.recreate()
  831         self.load_fixtures(default_fixtures)
  832 
  833     def config_files(self):
  834         self.config_fixture.register_cli_opt(cli.command_opt)
  835         config_files = super(TestMappingPopulate, self).config_files()
  836         config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf'))
  837         return config_files
  838 
  839     def config_overrides(self):
  840         super(TestMappingPopulate, self).config_overrides()
  841         self.config_fixture.config(group='identity', driver='ldap')
  842         self.config_fixture.config(group='identity_mapping',
  843                                    backward_compatible_ids=False)
  844 
  845     def config(self, config_files):
  846         CONF(args=['mapping_populate', '--domain-name', 'Default'],
  847              project='keystone',
  848              default_config_files=config_files)
  849 
  850     def test_mapping_populate(self):
  851         # mapping_populate should create id mappings. Test plan:
  852         # 0. Purge mappings
  853         # 1. Fetch user list directly via backend. It will not create any
  854         #    mappings because it bypasses identity manager
  855         # 2. Verify that users have no public_id yet
  856         # 3. Execute mapping_populate. It should create id mappings
  857         # 4. For the same users verify that they have public_id now
  858         purge_filter = {}
  859         PROVIDERS.id_mapping_api.purge_mappings(purge_filter)
  860         hints = None
  861         users = PROVIDERS.identity_api.driver.list_users(hints)
  862         for user in users:
  863             local_entity = {
  864                 'domain_id': CONF.identity.default_domain_id,
  865                 'local_id': user['id'],
  866                 'entity_type': identity_mapping.EntityType.USER}
  867             self.assertIsNone(
  868                 PROVIDERS.id_mapping_api.get_public_id(local_entity)
  869             )
  870 
  871         # backends are loaded again in the command handler
  872         provider_api.ProviderAPIs._clear_registry_instances()
  873         cli.MappingPopulate.main()
  874 
  875         for user in users:
  876             local_entity = {
  877                 'domain_id': CONF.identity.default_domain_id,
  878                 'local_id': user['id'],
  879                 'entity_type': identity_mapping.EntityType.USER}
  880             self.assertIsNotNone(
  881                 PROVIDERS.id_mapping_api.get_public_id(local_entity))
  882 
  883     def test_bad_domain_name(self):
  884         CONF(args=['mapping_populate', '--domain-name', uuid.uuid4().hex],
  885              project='keystone')
  886         # backends are loaded again in the command handler
  887         provider_api.ProviderAPIs._clear_registry_instances()
  888         # NOTE: assertEqual is used on purpose. assertFalse passes with None.
  889         self.assertEqual(False, cli.MappingPopulate.main())
  890 
  891 
  892 class CliDomainConfigUploadNothing(unit.BaseTestCase):
  893 
  894     def setUp(self):
  895         super(CliDomainConfigUploadNothing, self).setUp()
  896 
  897         config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
  898         config_fixture.register_cli_opt(cli.command_opt)
  899 
  900         # NOTE(dstanek): since this is not testing any database
  901         # functionality there is no need to go through the motions and
  902         # setup a test database.
  903         def fake_load_backends(self):
  904             self.resource_manager = mock.Mock()
  905         self.useFixture(fixtures.MockPatchObject(
  906             cli.DomainConfigUploadFiles, 'load_backends', fake_load_backends))
  907 
  908         tempdir = self.useFixture(fixtures.TempDir())
  909         config_fixture.config(group='identity', domain_config_dir=tempdir.path)
  910 
  911         self.logging = self.useFixture(
  912             fixtures.FakeLogger(level=logging.DEBUG))
  913 
  914     def test_uploading_all_from_an_empty_directory(self):
  915         CONF(args=['domain_config_upload', '--all'], project='keystone',
  916              default_config_files=[])
  917         cli.DomainConfigUpload.main()
  918 
  919         expected_msg = ('No domain configs uploaded from %r' %
  920                         CONF.identity.domain_config_dir)
  921         self.assertThat(self.logging.output,
  922                         matchers.Contains(expected_msg))
  923 
  924 
  925 class CachingDoctorTests(unit.TestCase):
  926 
  927     def test_symptom_caching_disabled(self):
  928         # Symptom Detected: Caching disabled
  929         self.config_fixture.config(group='cache', enabled=False)
  930         self.assertTrue(caching.symptom_caching_disabled())
  931 
  932         # No Symptom Detected: Caching is enabled
  933         self.config_fixture.config(group='cache', enabled=True)
  934         self.assertFalse(caching.symptom_caching_disabled())
  935 
  936     def test_caching_symptom_caching_enabled_without_a_backend(self):
  937         # Success Case: Caching enabled and backend configured
  938         self.config_fixture.config(group='cache', enabled=True)
  939         self.config_fixture.config(group='cache', backend='dogpile.cache.null')
  940         self.assertTrue(caching.symptom_caching_enabled_without_a_backend())
  941 
  942         # Failure Case 1: Caching disabled and backend not configured
  943         self.config_fixture.config(group='cache', enabled=False)
  944         self.config_fixture.config(group='cache', backend='dogpile.cache.null')
  945         self.assertFalse(caching.symptom_caching_enabled_without_a_backend())
  946 
  947         # Failure Case 2: Caching disabled and backend configured
  948         self.config_fixture.config(group='cache', enabled=False)
  949         self.config_fixture.config(group='cache',
  950                                    backend='dogpile.cache.memory')
  951         self.assertFalse(caching.symptom_caching_enabled_without_a_backend())
  952 
  953         # Failure Case 3: Caching enabled and backend configured
  954         self.config_fixture.config(group='cache', enabled=True)
  955         self.config_fixture.config(group='cache',
  956                                    backend='dogpile.cache.memory')
  957         self.assertFalse(caching.symptom_caching_enabled_without_a_backend())
  958 
  959 
  960 class CredentialDoctorTests(unit.TestCase):
  961 
  962     def test_credential_and_fernet_key_repositories_match(self):
  963         # Symptom Detected: Key repository paths are not unique
  964         directory = self.useFixture(fixtures.TempDir()).path
  965         self.config_fixture.config(group='credential',
  966                                    key_repository=directory)
  967         self.config_fixture.config(group='fernet_tokens',
  968                                    key_repository=directory)
  969         self.assertTrue(credential.symptom_unique_key_repositories())
  970 
  971     def test_credential_and_fernet_key_repositories_are_unique(self):
  972         # No Symptom Detected: Key repository paths are unique
  973         self.config_fixture.config(group='credential',
  974                                    key_repository='/etc/keystone/cred-repo')
  975         self.config_fixture.config(group='fernet_tokens',
  976                                    key_repository='/etc/keystone/fernet-repo')
  977         self.assertFalse(credential.symptom_unique_key_repositories())
  978 
  979     @mock.patch('keystone.cmd.doctor.credential.utils')
  980     def test_usability_of_cred_fernet_key_repo_raised(self, mock_utils):
  981         # Symptom Detected: credential fernet key repository is world readable
  982         self.config_fixture.config(group='credential', provider='fernet')
  983         mock_utils.FernetUtils().validate_key_repository.return_value = False
  984         self.assertTrue(
  985             credential.symptom_usability_of_credential_fernet_key_repository())
  986 
  987     @mock.patch('keystone.cmd.doctor.credential.utils')
  988     def test_usability_of_cred_fernet_key_repo_not_raised(self, mock_utils):
  989         # No Symptom Detected: Custom driver is used
  990         self.config_fixture.config(group='credential', provider='my-driver')
  991         mock_utils.FernetUtils().validate_key_repository.return_value = True
  992         self.assertFalse(
  993             credential.symptom_usability_of_credential_fernet_key_repository())
  994 
  995         # No Symptom Detected: key repository is not world readable
  996         self.config_fixture.config(group='credential', provider='fernet')
  997         mock_utils.FernetUtils().validate_key_repository.return_value = True
  998         self.assertFalse(
  999             credential.symptom_usability_of_credential_fernet_key_repository())
 1000 
 1001     @mock.patch('keystone.cmd.doctor.credential.utils')
 1002     def test_keys_in_credential_fernet_key_repository_raised(self, mock_utils):
 1003         # Symptom Detected: Key repo is empty
 1004         self.config_fixture.config(group='credential', provider='fernet')
 1005         mock_utils.FernetUtils().load_keys.return_value = False
 1006         self.assertTrue(
 1007             credential.symptom_keys_in_credential_fernet_key_repository())
 1008 
 1009     @mock.patch('keystone.cmd.doctor.credential.utils')
 1010     def test_keys_in_credential_fernet_key_repository_not_raised(
 1011             self, mock_utils):
 1012         # No Symptom Detected: Custom driver is used
 1013         self.config_fixture.config(group='credential', provider='my-driver')
 1014         mock_utils.FernetUtils().load_keys.return_value = True
 1015         self.assertFalse(
 1016             credential.symptom_keys_in_credential_fernet_key_repository())
 1017 
 1018         # No Symptom Detected: Key repo is not empty, fernet is current driver
 1019         self.config_fixture.config(group='credential', provider='fernet')
 1020         mock_utils.FernetUtils().load_keys.return_value = True
 1021         self.assertFalse(
 1022             credential.symptom_keys_in_credential_fernet_key_repository())
 1023 
 1024 
 1025 class DatabaseDoctorTests(unit.TestCase):
 1026 
 1027     def test_symptom_is_raised_if_database_connection_is_SQLite(self):
 1028         # Symptom Detected: Database connection is sqlite
 1029         self.config_fixture.config(
 1030             group='database',
 1031             connection='sqlite:///mydb')
 1032         self.assertTrue(
 1033             doc_database.symptom_database_connection_is_not_SQLite())
 1034 
 1035         # No Symptom Detected: Database connection is MySQL
 1036         self.config_fixture.config(
 1037             group='database',
 1038             connection='mysql+mysqlconnector://admin:secret@localhost/mydb')
 1039         self.assertFalse(
 1040             doc_database.symptom_database_connection_is_not_SQLite())
 1041 
 1042 
 1043 class DebugDoctorTests(unit.TestCase):
 1044 
 1045     def test_symptom_debug_mode_is_enabled(self):
 1046         # Symptom Detected: Debug mode is enabled
 1047         self.config_fixture.config(debug=True)
 1048         self.assertTrue(debug.symptom_debug_mode_is_enabled())
 1049 
 1050         # No Symptom Detected: Debug mode is disabled
 1051         self.config_fixture.config(debug=False)
 1052         self.assertFalse(debug.symptom_debug_mode_is_enabled())
 1053 
 1054 
 1055 class FederationDoctorTests(unit.TestCase):
 1056 
 1057     def test_symptom_comma_in_SAML_public_certificate_path(self):
 1058         # Symptom Detected: There is a comma in path to public cert file
 1059         self.config_fixture.config(group='saml', certfile='file,cert.pem')
 1060         self.assertTrue(
 1061             federation.symptom_comma_in_SAML_public_certificate_path())
 1062 
 1063         # No Symptom Detected: There is no comma in the path
 1064         self.config_fixture.config(group='saml', certfile='signing_cert.pem')
 1065         self.assertFalse(
 1066             federation.symptom_comma_in_SAML_public_certificate_path())
 1067 
 1068     def test_symptom_comma_in_SAML_private_key_file_path(self):
 1069         # Symptom Detected: There is a comma in path to private key file
 1070         self.config_fixture.config(group='saml', keyfile='file,key.pem')
 1071         self.assertTrue(
 1072             federation.symptom_comma_in_SAML_private_key_file_path())
 1073 
 1074         # No Symptom Detected: There is no comma in the path
 1075         self.config_fixture.config(group='saml', keyfile='signing_key.pem')
 1076         self.assertFalse(
 1077             federation.symptom_comma_in_SAML_private_key_file_path())
 1078 
 1079 
 1080 class LdapDoctorTests(unit.TestCase):
 1081 
 1082     def test_user_enabled_emulation_dn_ignored_raised(self):
 1083         # Symptom when user_enabled_emulation_dn is being ignored because the
 1084         # user did not enable the user_enabled_emulation
 1085         self.config_fixture.config(group='ldap', user_enabled_emulation=False)
 1086         self.config_fixture.config(
 1087             group='ldap',
 1088             user_enabled_emulation_dn='cn=enabled_users,dc=example,dc=com')
 1089         self.assertTrue(
 1090             ldap.symptom_LDAP_user_enabled_emulation_dn_ignored())
 1091 
 1092     def test_user_enabled_emulation_dn_ignored_not_raised(self):
 1093         # No symptom when configuration set properly
 1094         self.config_fixture.config(group='ldap', user_enabled_emulation=True)
 1095         self.config_fixture.config(
 1096             group='ldap',
 1097             user_enabled_emulation_dn='cn=enabled_users,dc=example,dc=com')
 1098         self.assertFalse(
 1099             ldap.symptom_LDAP_user_enabled_emulation_dn_ignored())
 1100         # No symptom when both configurations disabled
 1101         self.config_fixture.config(group='ldap', user_enabled_emulation=False)
 1102         self.config_fixture.config(group='ldap',
 1103                                    user_enabled_emulation_dn=None)
 1104         self.assertFalse(
 1105             ldap.symptom_LDAP_user_enabled_emulation_dn_ignored())
 1106 
 1107     def test_user_enabled_emulation_use_group_config_ignored_raised(self):
 1108         # Symptom when user enabled emulation isn't enabled but group_config is
 1109         # enabled
 1110         self.config_fixture.config(group='ldap', user_enabled_emulation=False)
 1111         self.config_fixture.config(
 1112             group='ldap',
 1113             user_enabled_emulation_use_group_config=True)
 1114         self.assertTrue(
 1115             ldap.
 1116             symptom_LDAP_user_enabled_emulation_use_group_config_ignored())
 1117 
 1118     def test_user_enabled_emulation_use_group_config_ignored_not_raised(self):
 1119         # No symptom when configuration deactivated
 1120         self.config_fixture.config(group='ldap', user_enabled_emulation=False)
 1121         self.config_fixture.config(
 1122             group='ldap',
 1123             user_enabled_emulation_use_group_config=False)
 1124         self.assertFalse(
 1125             ldap.
 1126             symptom_LDAP_user_enabled_emulation_use_group_config_ignored())
 1127         # No symptom when configurations set properly
 1128         self.config_fixture.config(group='ldap', user_enabled_emulation=True)
 1129         self.config_fixture.config(
 1130             group='ldap',
 1131             user_enabled_emulation_use_group_config=True)
 1132         self.assertFalse(
 1133             ldap.
 1134             symptom_LDAP_user_enabled_emulation_use_group_config_ignored())
 1135 
 1136     def test_group_members_are_ids_disabled_raised(self):
 1137         # Symptom when objectclass is set to posixGroup but members_are_ids are
 1138         # not enabled
 1139         self.config_fixture.config(group='ldap',
 1140                                    group_objectclass='posixGroup')
 1141         self.config_fixture.config(group='ldap',
 1142                                    group_members_are_ids=False)
 1143         self.assertTrue(ldap.symptom_LDAP_group_members_are_ids_disabled())
 1144 
 1145     def test_group_members_are_ids_disabled_not_raised(self):
 1146         # No symptom when the configurations are set properly
 1147         self.config_fixture.config(group='ldap',
 1148                                    group_objectclass='posixGroup')
 1149         self.config_fixture.config(group='ldap',
 1150                                    group_members_are_ids=True)
 1151         self.assertFalse(ldap.symptom_LDAP_group_members_are_ids_disabled())
 1152         # No symptom when configuration deactivated
 1153         self.config_fixture.config(group='ldap',
 1154                                    group_objectclass='groupOfNames')
 1155         self.config_fixture.config(group='ldap',
 1156                                    group_members_are_ids=False)
 1157         self.assertFalse(ldap.symptom_LDAP_group_members_are_ids_disabled())
 1158 
 1159     @mock.patch('os.listdir')
 1160     @mock.patch('os.path.isdir')
 1161     def test_file_based_domain_specific_configs_raised(self, mocked_isdir,
 1162                                                        mocked_listdir):
 1163         self.config_fixture.config(
 1164             group='identity',
 1165             domain_specific_drivers_enabled=True)
 1166         self.config_fixture.config(
 1167             group='identity',
 1168             domain_configurations_from_database=False)
 1169 
 1170         # Symptom if there is no existing directory
 1171         mocked_isdir.return_value = False
 1172         self.assertTrue(ldap.symptom_LDAP_file_based_domain_specific_configs())
 1173 
 1174         # Symptom if there is an invalid filename inside the domain directory
 1175         mocked_isdir.return_value = True
 1176         mocked_listdir.return_value = ['openstack.domains.conf']
 1177         self.assertTrue(ldap.symptom_LDAP_file_based_domain_specific_configs())
 1178 
 1179     @mock.patch('os.listdir')
 1180     @mock.patch('os.path.isdir')
 1181     def test_file_based_domain_specific_configs_not_raised(self, mocked_isdir,
 1182                                                            mocked_listdir):
 1183         # No symptom if both configurations deactivated
 1184         self.config_fixture.config(
 1185             group='identity',
 1186             domain_specific_drivers_enabled=False)
 1187         self.config_fixture.config(
 1188             group='identity',
 1189             domain_configurations_from_database=False)
 1190         self.assertFalse(
 1191             ldap.symptom_LDAP_file_based_domain_specific_configs())
 1192 
 1193         # No symptom if directory exists with no invalid filenames
 1194         self.config_fixture.config(
 1195             group='identity',
 1196             domain_specific_drivers_enabled=True)
 1197         self.config_fixture.config(
 1198             group='identity',
 1199             domain_configurations_from_database=False)
 1200         mocked_isdir.return_value = True
 1201         mocked_listdir.return_value = ['keystone.domains.conf']
 1202         self.assertFalse(
 1203             ldap.symptom_LDAP_file_based_domain_specific_configs())
 1204 
 1205     @mock.patch('os.listdir')
 1206     @mock.patch('os.path.isdir')
 1207     @mock.patch('keystone.cmd.doctor.ldap.configparser.ConfigParser')
 1208     def test_file_based_domain_specific_configs_formatted_correctly_raised(
 1209             self, mocked_parser, mocked_isdir, mocked_listdir):
 1210         symptom = ('symptom_LDAP_file_based_domain_specific_configs'
 1211                    '_formatted_correctly')
 1212         # Symptom Detected: Ldap domain specific configuration files are not
 1213         # formatted correctly
 1214         self.config_fixture.config(
 1215             group='identity',
 1216             domain_specific_drivers_enabled=True)
 1217         self.config_fixture.config(
 1218             group='identity',
 1219             domain_configurations_from_database=False)
 1220         mocked_isdir.return_value = True
 1221 
 1222         mocked_listdir.return_value = ['keystone.domains.conf']
 1223         mock_instance = mock.MagicMock()
 1224         mock_instance.read.side_effect = configparser.Error('No Section')
 1225         mocked_parser.return_value = mock_instance
 1226 
 1227         self.assertTrue(getattr(ldap, symptom)())
 1228 
 1229     @mock.patch('os.listdir')
 1230     @mock.patch('os.path.isdir')
 1231     def test_file_based_domain_specific_configs_formatted_correctly_not_raised(
 1232             self, mocked_isdir, mocked_listdir):
 1233         symptom = ('symptom_LDAP_file_based_domain_specific_configs'
 1234                    '_formatted_correctly')
 1235         # No Symptom Detected: Domain_specific drivers is not enabled
 1236         self.config_fixture.config(
 1237             group='identity',
 1238             domain_specific_drivers_enabled=False)
 1239         self.assertFalse(getattr(ldap, symptom)())
 1240 
 1241         # No Symptom Detected: Domain configuration from database is enabled
 1242         self.config_fixture.config(
 1243             group='identity',
 1244             domain_specific_drivers_enabled=True)
 1245         self.assertFalse(getattr(ldap, symptom)())
 1246         self.config_fixture.config(
 1247             group='identity',
 1248             domain_configurations_from_database=True)
 1249         self.assertFalse(getattr(ldap, symptom)())
 1250 
 1251         # No Symptom Detected: The directory in domain_config_dir doesn't exist
 1252         mocked_isdir.return_value = False
 1253         self.assertFalse(getattr(ldap, symptom)())
 1254 
 1255         # No Symptom Detected: domain specific drivers are enabled, domain
 1256         # configurations from database are disabled, directory exists, and no
 1257         # exceptions found.
 1258         self.config_fixture.config(
 1259             group='identity',
 1260             domain_configurations_from_database=False)
 1261         mocked_isdir.return_value = True
 1262         # An empty directory should not raise this symptom
 1263         self.assertFalse(getattr(ldap, symptom)())
 1264 
 1265         # Test again with a file inside the directory
 1266         mocked_listdir.return_value = ['keystone.domains.conf']
 1267         self.assertFalse(getattr(ldap, symptom)())
 1268 
 1269 
 1270 class SecurityComplianceDoctorTests(unit.TestCase):
 1271 
 1272     def test_minimum_password_age_greater_than_password_expires_days(self):
 1273         # Symptom Detected: Minimum password age is greater than the password
 1274         # expires days. Both values are positive integers greater than zero.
 1275         self.config_fixture.config(group='security_compliance',
 1276                                    minimum_password_age=2)
 1277         self.config_fixture.config(group='security_compliance',
 1278                                    password_expires_days=1)
 1279         self.assertTrue(
 1280             security_compliance.
 1281             symptom_minimum_password_age_greater_than_expires_days())
 1282 
 1283     def test_minimum_password_age_equal_to_password_expires_days(self):
 1284         # Symptom Detected: Minimum password age is equal to the password
 1285         # expires days. Both values are positive integers greater than zero.
 1286         self.config_fixture.config(group='security_compliance',
 1287                                    minimum_password_age=1)
 1288         self.config_fixture.config(group='security_compliance',
 1289                                    password_expires_days=1)
 1290         self.assertTrue(
 1291             security_compliance.
 1292             symptom_minimum_password_age_greater_than_expires_days())
 1293 
 1294     def test_minimum_password_age_less_than_password_expires_days(self):
 1295         # No Symptom Detected: Minimum password age is less than password
 1296         # expires days. Both values are positive integers greater than zero.
 1297         self.config_fixture.config(group='security_compliance',
 1298                                    minimum_password_age=1)
 1299         self.config_fixture.config(group='security_compliance',
 1300                                    password_expires_days=2)
 1301         self.assertFalse(
 1302             security_compliance.
 1303             symptom_minimum_password_age_greater_than_expires_days())
 1304 
 1305     def test_minimum_password_age_and_password_expires_days_deactivated(self):
 1306         # No Symptom Detected: when minimum_password_age's default value is 0
 1307         # and password_expires_days' default value is None
 1308         self.assertFalse(
 1309             security_compliance.
 1310             symptom_minimum_password_age_greater_than_expires_days())
 1311 
 1312     def test_invalid_password_regular_expression(self):
 1313         # Symptom Detected: Regular expression is invalid
 1314         self.config_fixture.config(
 1315             group='security_compliance',
 1316             password_regex='^^(??=.*\d)$')
 1317         self.assertTrue(
 1318             security_compliance.symptom_invalid_password_regular_expression())
 1319 
 1320     def test_valid_password_regular_expression(self):
 1321         # No Symptom Detected: Regular expression is valid
 1322         self.config_fixture.config(
 1323             group='security_compliance',
 1324             password_regex='^(?=.*\d)(?=.*[a-zA-Z]).{7,}$')
 1325         self.assertFalse(
 1326             security_compliance.symptom_invalid_password_regular_expression())
 1327 
 1328     def test_password_regular_expression_deactivated(self):
 1329         # No Symptom Detected: Regular expression deactivated to None
 1330         self.config_fixture.config(
 1331             group='security_compliance',
 1332             password_regex=None)
 1333         self.assertFalse(
 1334             security_compliance.symptom_invalid_password_regular_expression())
 1335 
 1336     def test_password_regular_expression_description_not_set(self):
 1337         # Symptom Detected: Regular expression is set but description is not
 1338         self.config_fixture.config(
 1339             group='security_compliance',
 1340             password_regex='^(?=.*\d)(?=.*[a-zA-Z]).{7,}$')
 1341         self.config_fixture.config(
 1342             group='security_compliance',
 1343             password_regex_description=None)
 1344         self.assertTrue(
 1345             security_compliance.
 1346             symptom_password_regular_expression_description_not_set())
 1347 
 1348     def test_password_regular_expression_description_set(self):
 1349         # No Symptom Detected: Regular expression and description are set
 1350         desc = '1 letter, 1 digit, and a minimum length of 7 is required'
 1351         self.config_fixture.config(
 1352             group='security_compliance',
 1353             password_regex='^(?=.*\d)(?=.*[a-zA-Z]).{7,}$')
 1354         self.config_fixture.config(
 1355             group='security_compliance',
 1356             password_regex_description=desc)
 1357         self.assertFalse(
 1358             security_compliance.
 1359             symptom_password_regular_expression_description_not_set())
 1360 
 1361     def test_password_regular_expression_description_deactivated(self):
 1362         # No Symptom Detected: Regular expression and description are
 1363         # deactivated to None
 1364         self.config_fixture.config(
 1365             group='security_compliance', password_regex=None)
 1366         self.config_fixture.config(
 1367             group='security_compliance', password_regex_description=None)
 1368         self.assertFalse(
 1369             security_compliance.
 1370             symptom_password_regular_expression_description_not_set())
 1371 
 1372 
 1373 class TokensDoctorTests(unit.TestCase):
 1374 
 1375     def test_unreasonable_max_token_size_raised(self):
 1376         # Symptom Detected: the max_token_size for fernet is greater than 255
 1377         self.config_fixture.config(group='token', provider='fernet')
 1378         self.config_fixture.config(max_token_size=256)
 1379         self.assertTrue(tokens.symptom_unreasonable_max_token_size())
 1380 
 1381     def test_unreasonable_max_token_size_not_raised(self):
 1382         # No Symptom Detected: the max_token_size for uuid is 32
 1383         self.config_fixture.config(group='token', provider='uuid')
 1384         self.config_fixture.config(max_token_size=32)
 1385         self.assertFalse(tokens.symptom_unreasonable_max_token_size())
 1386 
 1387         # No Symptom Detected: the max_token_size for fernet is 255 or less
 1388         self.config_fixture.config(group='token', provider='fernet')
 1389         self.config_fixture.config(max_token_size=255)
 1390         self.assertFalse(tokens.symptom_unreasonable_max_token_size())
 1391 
 1392 
 1393 class TokenFernetDoctorTests(unit.TestCase):
 1394 
 1395     @mock.patch('keystone.cmd.doctor.tokens_fernet.utils')
 1396     def test_usability_of_Fernet_key_repository_raised(self, mock_utils):
 1397         # Symptom Detected: Fernet key repo is world readable
 1398         self.config_fixture.config(group='token', provider='fernet')
 1399         mock_utils.FernetUtils().validate_key_repository.return_value = False
 1400         self.assertTrue(
 1401             tokens_fernet.symptom_usability_of_Fernet_key_repository())
 1402 
 1403     @mock.patch('keystone.cmd.doctor.tokens_fernet.utils')
 1404     def test_usability_of_Fernet_key_repository_not_raised(self, mock_utils):
 1405         # No Symptom Detected: UUID is used instead of fernet
 1406         self.config_fixture.config(group='token', provider='uuid')
 1407         mock_utils.FernetUtils().validate_key_repository.return_value = False
 1408         self.assertFalse(
 1409             tokens_fernet.symptom_usability_of_Fernet_key_repository())
 1410 
 1411         # No Symptom Detected: configs set properly, key repo is not world
 1412         # readable but is user readable
 1413         self.config_fixture.config(group='token', provider='fernet')
 1414         mock_utils.FernetUtils().validate_key_repository.return_value = True
 1415         self.assertFalse(
 1416             tokens_fernet.symptom_usability_of_Fernet_key_repository())
 1417 
 1418     @mock.patch('keystone.cmd.doctor.tokens_fernet.utils')
 1419     def test_keys_in_Fernet_key_repository_raised(self, mock_utils):
 1420         # Symptom Detected: Fernet key repository is empty
 1421         self.config_fixture.config(group='token', provider='fernet')
 1422         mock_utils.FernetUtils().load_keys.return_value = False
 1423         self.assertTrue(
 1424             tokens_fernet.symptom_keys_in_Fernet_key_repository())
 1425 
 1426     @mock.patch('keystone.cmd.doctor.tokens_fernet.utils')
 1427     def test_keys_in_Fernet_key_repository_not_raised(self, mock_utils):
 1428         # No Symptom Detected: UUID is used instead of fernet
 1429         self.config_fixture.config(group='token', provider='uuid')
 1430         mock_utils.FernetUtils().load_keys.return_value = True
 1431         self.assertFalse(
 1432             tokens_fernet.symptom_usability_of_Fernet_key_repository())
 1433 
 1434         # No Symptom Detected: configs set properly, key repo has been
 1435         # populated with keys
 1436         self.config_fixture.config(group='token', provider='fernet')
 1437         mock_utils.FernetUtils().load_keys.return_value = True
 1438         self.assertFalse(
 1439             tokens_fernet.symptom_usability_of_Fernet_key_repository())
 1440 
 1441 
 1442 class TestMappingPurge(unit.SQLDriverOverrides, unit.BaseTestCase):
 1443 
 1444     class FakeConfCommand(object):
 1445         def __init__(self, parent):
 1446             self.extension = False
 1447             self.all = parent.command_all
 1448             self.type = parent.command_type
 1449             self.domain_name = parent.command_domain_name
 1450             self.local_id = parent.command_local_id
 1451             self.public_id = parent.command_public_id
 1452 
 1453     def setUp(self):
 1454         # Set up preset cli options and a parser
 1455         super(TestMappingPurge, self).setUp()
 1456         self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
 1457         self.config_fixture.register_cli_opt(cli.command_opt)
 1458         # For unit tests that should not throw any erorrs,
 1459         # Use the argument parser to test that the combinations work
 1460         parser_test = argparse.ArgumentParser()
 1461         subparsers = parser_test.add_subparsers()
 1462         self.parser = cli.MappingPurge.add_argument_parser(subparsers)
 1463 
 1464     def test_mapping_purge_with_no_arguments_fails(self):
 1465         # Make sure the logic in main() actually catches no argument error
 1466         self.command_type = None
 1467         self.command_all = False
 1468         self.command_domain_name = None
 1469         self.command_local_id = None
 1470         self.command_public_id = None
 1471         self.useFixture(fixtures.MockPatchObject(
 1472             CONF, 'command', self.FakeConfCommand(self)))
 1473         self.assertRaises(ValueError, cli.MappingPurge.main)
 1474 
 1475     def test_mapping_purge_with_all_and_other_argument_fails(self):
 1476         # Make sure the logic in main() actually catches invalid combinations
 1477         self.command_type = 'user'
 1478         self.command_all = True
 1479         self.command_domain_name = None
 1480         self.command_local_id = None
 1481         self.command_public_id = None
 1482         self.useFixture(fixtures.MockPatchObject(
 1483             CONF, 'command', self.FakeConfCommand(self)))
 1484         self.assertRaises(ValueError, cli.MappingPurge.main)
 1485 
 1486     def test_mapping_purge_with_only_all_passes(self):
 1487         args = (['--all'])
 1488         res = self.parser.parse_args(args)
 1489         self.assertTrue(vars(res)['all'])
 1490 
 1491     def test_mapping_purge_with_domain_name_argument_succeeds(self):
 1492         args = (['--domain-name', uuid.uuid4().hex])
 1493         self.parser.parse_args(args)
 1494 
 1495     def test_mapping_purge_with_public_id_argument_succeeds(self):
 1496         args = (['--public-id', uuid.uuid4().hex])
 1497         self.parser.parse_args(args)
 1498 
 1499     def test_mapping_purge_with_local_id_argument_succeeds(self):
 1500         args = (['--local-id', uuid.uuid4().hex])
 1501         self.parser.parse_args(args)
 1502 
 1503     def test_mapping_purge_with_type_argument_succeeds(self):
 1504         args = (['--type', 'user'])
 1505         self.parser.parse_args(args)
 1506         args = (['--type', 'group'])
 1507         self.parser.parse_args(args)
 1508 
 1509     def test_mapping_purge_with_invalid_argument_fails(self):
 1510         args = (['--invalid-option', 'some value'])
 1511         self.assertRaises(unit.UnexpectedExit, self.parser.parse_args, args)
 1512 
 1513     def test_mapping_purge_with_all_other_combinations_passes(self):
 1514         args = (['--type', 'user', '--local-id', uuid.uuid4().hex])
 1515         self.parser.parse_args(args)
 1516         args.append('--domain-name')
 1517         args.append('test')
 1518         self.parser.parse_args(args)
 1519         args.append('--public-id')
 1520         args.append(uuid.uuid4().hex)
 1521         self.parser.parse_args(args)
 1522 
 1523     @mock.patch.object(keystone.identity.MappingManager, 'purge_mappings')
 1524     def test_mapping_purge_type_user(self, purge_mock):
 1525         # Make sure the logic in main() actually catches no argument error
 1526         self.command_type = 'user'
 1527         self.command_all = False
 1528         self.command_domain_name = None
 1529         self.command_local_id = uuid.uuid4().hex
 1530         self.command_public_id = uuid.uuid4().hex
 1531         self.useFixture(fixtures.MockPatchObject(
 1532             CONF, 'command', self.FakeConfCommand(self)))
 1533 
 1534         def fake_load_backends():
 1535             return dict(
 1536                 id_mapping_api=keystone.identity.core.MappingManager,
 1537                 resource_api=None)
 1538 
 1539         self.useFixture(fixtures.MockPatch(
 1540             'keystone.server.backends.load_backends',
 1541             side_effect=fake_load_backends))
 1542 
 1543         cli.MappingPurge.main()
 1544         purge_mock.assert_called_with({'entity_type': 'user',
 1545                                        'local_id': self.command_local_id,
 1546                                        'public_id': self.command_public_id})
 1547 
 1548 
 1549 class TestUserMappingPurgeFunctional(unit.SQLDriverOverrides, unit.TestCase):
 1550 
 1551     def setUp(self):
 1552         sqldb = self.useFixture(database.Database())
 1553         super(TestUserMappingPurgeFunctional, self).setUp()
 1554         self.ldapdb = self.useFixture(ldapdb.LDAPDatabase())
 1555         self.ldapdb.clear()
 1556 
 1557         self.load_backends()
 1558 
 1559         sqldb.recreate()
 1560         self.load_fixtures(default_fixtures)
 1561 
 1562     def config_files(self):
 1563         self.config_fixture.register_cli_opt(cli.command_opt)
 1564         config_files = super(
 1565             TestUserMappingPurgeFunctional, self
 1566         ).config_files()
 1567         config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf'))
 1568         return config_files
 1569 
 1570     def config_overrides(self):
 1571         super(TestUserMappingPurgeFunctional, self).config_overrides()
 1572         self.config_fixture.config(group='identity', driver='ldap')
 1573         self.config_fixture.config(group='identity_mapping',
 1574                                    backward_compatible_ids=False)
 1575 
 1576     def config(self, config_files):
 1577         CONF(args=['mapping_purge', '--type', 'user'],
 1578              project='keystone',
 1579              default_config_files=config_files)
 1580 
 1581     def test_purge_by_user_type(self):
 1582         # Grab the list of the users from the backend directly to avoid
 1583         # populating the public_ids for each user. We do this so we can grab
 1584         # the local_id of a user before it's overwritten by the public_id.
 1585         hints = None
 1586         users = PROVIDERS.identity_api.driver.list_users(hints)
 1587 
 1588         # Create a new group in the backend directly. We do this so that we
 1589         # have control over the local_id, which is `id` here. After creating
 1590         # the group, let's list them so the id_mapping_api creates the public
 1591         # id appropriately.
 1592         group_ref = {
 1593             'id': uuid.uuid4().hex,
 1594             'name': uuid.uuid4().hex,
 1595             'domain_id': CONF.identity.default_domain_id
 1596         }
 1597         PROVIDERS.identity_api.driver.create_group(group_ref['id'], group_ref)
 1598         PROVIDERS.identity_api.list_groups()
 1599 
 1600         # Make sure all users and groups have public ids by querying the
 1601         # id_mapping_api.
 1602         for user in users:
 1603             local_entity = {
 1604                 'domain_id': CONF.identity.default_domain_id,
 1605                 'local_id': user['id'],
 1606                 'entity_type': identity_mapping.EntityType.USER}
 1607             self.assertIsNotNone(
 1608                 PROVIDERS.id_mapping_api.get_public_id(local_entity))
 1609 
 1610         group_entity = {
 1611             'domain_id': CONF.identity.default_domain_id,
 1612             'local_id': group_ref['id'],
 1613             'entity_type': identity_mapping.EntityType.GROUP}
 1614         self.assertIsNotNone(
 1615             PROVIDERS.id_mapping_api.get_public_id(group_entity)
 1616         )
 1617 
 1618         # Purge all users mappings
 1619         provider_api.ProviderAPIs._clear_registry_instances()
 1620         cli.MappingPurge.main()
 1621 
 1622         # Check that all the user mappings were purged
 1623         for user in users:
 1624             local_entity = {
 1625                 'domain_id': CONF.identity.default_domain_id,
 1626                 'local_id': user['id'],
 1627                 'entity_type': identity_mapping.EntityType.USER}
 1628             self.assertIsNone(
 1629                 PROVIDERS.id_mapping_api.get_public_id(local_entity)
 1630             )
 1631 
 1632         # Make sure the group mapping still exists
 1633         self.assertIsNotNone(
 1634             PROVIDERS.id_mapping_api.get_public_id(group_entity)
 1635         )
 1636 
 1637 
 1638 class TestGroupMappingPurgeFunctional(unit.SQLDriverOverrides, unit.TestCase):
 1639 
 1640     def setUp(self):
 1641         sqldb = self.useFixture(database.Database())
 1642         super(TestGroupMappingPurgeFunctional, self).setUp()
 1643         self.ldapdb = self.useFixture(ldapdb.LDAPDatabase())
 1644         self.ldapdb.clear()
 1645 
 1646         self.load_backends()
 1647 
 1648         sqldb.recreate()
 1649         self.load_fixtures(default_fixtures)
 1650 
 1651     def config_files(self):
 1652         self.config_fixture.register_cli_opt(cli.command_opt)
 1653         config_files = super(
 1654             TestGroupMappingPurgeFunctional, self
 1655         ).config_files()
 1656         config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf'))
 1657         return config_files
 1658 
 1659     def config_overrides(self):
 1660         super(TestGroupMappingPurgeFunctional, self).config_overrides()
 1661         self.config_fixture.config(group='identity', driver='ldap')
 1662         self.config_fixture.config(group='identity_mapping',
 1663                                    backward_compatible_ids=False)
 1664 
 1665     def config(self, config_files):
 1666         CONF(args=['mapping_purge', '--type', 'group'],
 1667              project='keystone',
 1668              default_config_files=config_files)
 1669 
 1670     def test_purge_by_group_type(self):
 1671         # Grab the list of the users from the backend directly to avoid
 1672         # populating the public_ids for each user. We do this so we can grab
 1673         # the local_id of a user before it's overwritten by the public_id.
 1674         hints = None
 1675         users = PROVIDERS.identity_api.driver.list_users(hints)
 1676 
 1677         # Create a new group in the backend directly. We do this so that we
 1678         # have control over the local_id, which is `id` here. After creating
 1679         # the group, let's list them so the id_mapping_api creates the public
 1680         # id appropriately.
 1681         group_ref = {
 1682             'id': uuid.uuid4().hex,
 1683             'name': uuid.uuid4().hex,
 1684             'domain_id': CONF.identity.default_domain_id
 1685         }
 1686         PROVIDERS.identity_api.driver.create_group(group_ref['id'], group_ref)
 1687         PROVIDERS.identity_api.list_groups()
 1688 
 1689         # Make sure all users and groups have public ids by querying the
 1690         # id_mapping_api.
 1691         for user in users:
 1692             local_entity = {
 1693                 'domain_id': CONF.identity.default_domain_id,
 1694                 'local_id': user['id'],
 1695                 'entity_type': identity_mapping.EntityType.USER}
 1696             self.assertIsNotNone(
 1697                 PROVIDERS.id_mapping_api.get_public_id(local_entity))
 1698 
 1699         group_entity = {
 1700             'domain_id': CONF.identity.default_domain_id,
 1701             'local_id': group_ref['id'],
 1702             'entity_type': identity_mapping.EntityType.GROUP}
 1703         self.assertIsNotNone(
 1704             PROVIDERS.id_mapping_api.get_public_id(group_entity)
 1705         )
 1706 
 1707         # Purge group mappings
 1708         provider_api.ProviderAPIs._clear_registry_instances()
 1709         cli.MappingPurge.main()
 1710 
 1711         # Make sure the group mapping was purged
 1712         self.assertIsNone(
 1713             PROVIDERS.id_mapping_api.get_public_id(group_entity)
 1714         )
 1715 
 1716         # Check that all the user mappings still exist
 1717         for user in users:
 1718             local_entity = {
 1719                 'domain_id': CONF.identity.default_domain_id,
 1720                 'local_id': user['id'],
 1721                 'entity_type': identity_mapping.EntityType.USER}
 1722             self.assertIsNotNone(
 1723                 PROVIDERS.id_mapping_api.get_public_id(local_entity)
 1724             )
 1725 
 1726 
 1727 class TestTrustFlush(unit.SQLDriverOverrides, unit.BaseTestCase):
 1728 
 1729     class FakeConfCommand(object):
 1730         def __init__(self, parent):
 1731             self.extension = False
 1732             self.project_id = parent.command_project_id
 1733             self.trustor_user_id = parent.command_trustor_user_id
 1734             self.trustee_user_id = parent.command_trustee_user_id
 1735             self.date = parent.command_date
 1736 
 1737     def setUp(self):
 1738         # Set up preset cli options and a parser
 1739         super(TestTrustFlush, self).setUp()
 1740         self.useFixture(database.Database())
 1741         self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
 1742         self.config_fixture.register_cli_opt(cli.command_opt)
 1743         # For unit tests that should not throw any errors,
 1744         # Use the argument parser to test that the combinations work
 1745         parser_test = argparse.ArgumentParser()
 1746         subparsers = parser_test.add_subparsers()
 1747         self.parser = cli.TrustFlush.add_argument_parser(subparsers)
 1748 
 1749     def config_files(self):
 1750         config_files = super(TestTrustFlush, self).config_files()
 1751         config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
 1752         return config_files
 1753 
 1754     def test_trust_flush(self):
 1755         self.command_project_id = None
 1756         self.command_trustor_user_id = None
 1757         self.command_trustee_user_id = None
 1758         self.command_date = datetime.datetime.utcnow()
 1759         self.useFixture(fixtures.MockPatchObject(
 1760             CONF, 'command', self.FakeConfCommand(self)))
 1761 
 1762         def fake_load_backends():
 1763             return dict(
 1764                 trust_api=keystone.trust.core.Manager())
 1765 
 1766         self.useFixture(fixtures.MockPatch(
 1767             'keystone.server.backends.load_backends',
 1768             side_effect=fake_load_backends))
 1769         trust = cli.TrustFlush()
 1770         trust.main()
 1771 
 1772     def test_trust_flush_with_invalid_date(self):
 1773         self.command_project_id = None
 1774         self.command_trustor_user_id = None
 1775         self.command_trustee_user_id = None
 1776         self.command_date = '4/10/92'
 1777         self.useFixture(fixtures.MockPatchObject(
 1778             CONF, 'command', self.FakeConfCommand(self)))
 1779 
 1780         def fake_load_backends():
 1781             return dict(
 1782                 trust_api=keystone.trust.core.Manager())
 1783 
 1784         self.useFixture(fixtures.MockPatch(
 1785             'keystone.server.backends.load_backends',
 1786             side_effect=fake_load_backends))
 1787         # Clear backend dependencies, since cli loads these manually
 1788         provider_api.ProviderAPIs._clear_registry_instances()
 1789         trust = cli.TrustFlush()
 1790         self.assertRaises(ValueError, trust.main)
 1791 
 1792 
 1793 class TestMappingEngineTester(unit.BaseTestCase):
 1794 
 1795     class FakeConfCommand(object):
 1796         def __init__(self, parent):
 1797             self.extension = False
 1798             self.rules = parent.command_rules
 1799             self.input = parent.command_input
 1800             self.prefix = parent.command_prefix
 1801             self.engine_debug = parent.command_engine_debug
 1802 
 1803     def setUp(self):
 1804         # Set up preset cli options and a parser
 1805         super(TestMappingEngineTester, self).setUp()
 1806         self.mapping_id = uuid.uuid4().hex
 1807         self.rules_pathname = None
 1808         self.rules = None
 1809         self.assertion_pathname = None
 1810         self.assertion = None
 1811         self.logging = self.useFixture(fixtures.LoggerFixture())
 1812         self.useFixture(database.Database())
 1813         self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
 1814         self.config_fixture.register_cli_opt(cli.command_opt)
 1815         # For unit tests that should not throw any erorrs,
 1816         # Use the argument parser to test that the combinations work
 1817         parser_test = argparse.ArgumentParser()
 1818         subparsers = parser_test.add_subparsers()
 1819         self.parser = cli.MappingEngineTester.add_argument_parser(subparsers)
 1820 
 1821     def config_files(self):
 1822         config_files = super(TestMappingEngineTester, self).config_files()
 1823         config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
 1824         return config_files
 1825 
 1826     def test_mapping_engine_tester_with_invalid_rules_file(self):
 1827         tempfilejson = self.useFixture(temporaryfile.SecureTempFile())
 1828         tmpinvalidfile = tempfilejson.file_name
 1829         # Here the data required for rules should be in JSON format
 1830         # whereas the file contains text.
 1831         with open(tmpinvalidfile, 'w') as f:
 1832             f.write("This is an invalid data")
 1833         self.command_rules = tmpinvalidfile
 1834         self.command_input = tmpinvalidfile
 1835         self.command_prefix = None
 1836         self.command_engine_debug = True
 1837         self.useFixture(fixtures.MockPatchObject(
 1838             CONF, 'command', self.FakeConfCommand(self)))
 1839         mapping_engine = cli.MappingEngineTester()
 1840         self.assertRaises(SystemExit, mapping_engine.main)
 1841 
 1842     def test_mapping_engine_tester_with_invalid_input_file(self):
 1843         tempfilejson = self.useFixture(temporaryfile.SecureTempFile())
 1844         tmpfilejsonname = tempfilejson.file_name
 1845         updated_mapping = copy.deepcopy(mapping_fixtures.MAPPING_SMALL)
 1846         with open(tmpfilejsonname, 'w') as f:
 1847             f.write(jsonutils.dumps(updated_mapping))
 1848         self.command_rules = tmpfilejsonname
 1849         # Here invalid.csv does not exist
 1850         self.command_input = "invalid.csv"
 1851         self.command_prefix = None
 1852         self.command_engine_debug = True
 1853         self.useFixture(fixtures.MockPatchObject(
 1854             CONF, 'command', self.FakeConfCommand(self)))
 1855         mapping_engine = cli.MappingEngineTester()
 1856         self.assertRaises(SystemExit, mapping_engine.main)
 1857 
 1858     def test_mapping_engine_tester(self):
 1859         tempfilejson = self.useFixture(temporaryfile.SecureTempFile())
 1860         tmpfilejsonname = tempfilejson.file_name
 1861         updated_mapping = copy.deepcopy(mapping_fixtures.MAPPING_SMALL)
 1862         with open(tmpfilejsonname, 'w') as f:
 1863             f.write(jsonutils.dumps(updated_mapping))
 1864         self.command_rules = tmpfilejsonname
 1865         tempfile = self.useFixture(temporaryfile.SecureTempFile())
 1866         tmpfilename = tempfile.file_name
 1867         with open(tmpfilename, 'w') as f:
 1868             f.write("\n")
 1869             f.write("UserName:me\n")
 1870             f.write("orgPersonType:NoContractor\n")
 1871             f.write("LastName:Bo\n")
 1872             f.write("FirstName:Jill\n")
 1873         self.command_input = tmpfilename
 1874         self.command_prefix = None
 1875         self.command_engine_debug = True
 1876         self.useFixture(fixtures.MockPatchObject(
 1877             CONF, 'command', self.FakeConfCommand(self)))
 1878         mapping_engine = cli.MappingEngineTester()
 1879         with mock.patch('builtins.print') as mock_print:
 1880             mapping_engine.main()
 1881             self.assertEqual(mock_print.call_count, 3)
 1882             call = mock_print.call_args_list[0]
 1883             args, kwargs = call
 1884             self.assertTrue(args[0].startswith('Using Rules:'))
 1885             call = mock_print.call_args_list[1]
 1886             args, kwargs = call
 1887             self.assertTrue(args[0].startswith('Using Assertion:'))
 1888             call = mock_print.call_args_list[2]
 1889             args, kwargs = call
 1890             expected = {
 1891                 "group_names": [],
 1892                 "user": {
 1893                     "type": "ephemeral",
 1894                     "name": "me"
 1895                 },
 1896                 "projects": [],
 1897                 "group_ids": ["0cd5e9"]
 1898             }
 1899             self.assertEqual(jsonutils.loads(args[0]), expected)
 1900 
 1901     def test_mapping_engine_tester_with_invalid_data(self):
 1902         tempfilejson = self.useFixture(temporaryfile.SecureTempFile())
 1903         tmpfilejsonname = tempfilejson.file_name
 1904         updated_mapping = copy.deepcopy(mapping_fixtures.MAPPING_SMALL)
 1905         with open(tmpfilejsonname, 'w') as f:
 1906             f.write(jsonutils.dumps(updated_mapping))
 1907         self.command_rules = tmpfilejsonname
 1908         tempfile = self.useFixture(temporaryfile.SecureTempFile())
 1909         tmpfilename = tempfile.file_name
 1910         # Here we do not have any value matching to type 'Email'
 1911         # and condition in mapping_engine_test_rules.json
 1912         with open(tmpfilename, 'w') as f:
 1913             f.write("\n")
 1914             f.write("UserName: me\n")
 1915             f.write("Email: No@example.com\n")
 1916         self.command_input = tmpfilename
 1917         self.command_prefix = None
 1918         self.command_engine_debug = True
 1919         self.useFixture(fixtures.MockPatchObject(
 1920             CONF, 'command', self.FakeConfCommand(self)))
 1921         mapping_engine = cli.MappingEngineTester()
 1922         self.assertRaises(exception.ValidationError,
 1923                           mapping_engine.main)
 1924 
 1925     def test_mapping_engine_tester_logs_direct_maps(self):
 1926         tempfilejson = self.useFixture(temporaryfile.SecureTempFile())
 1927         tmpfilejsonname = tempfilejson.file_name
 1928         updated_mapping = copy.deepcopy(mapping_fixtures.MAPPING_SMALL)
 1929         with open(tmpfilejsonname, 'w') as f:
 1930             f.write(jsonutils.dumps(updated_mapping))
 1931         self.command_rules = tmpfilejsonname
 1932         tempfile = self.useFixture(temporaryfile.SecureTempFile())
 1933         tmpfilename = tempfile.file_name
 1934         with open(tmpfilename, 'w') as f:
 1935             f.write("\n")
 1936             f.write("UserName:me\n")
 1937             f.write("orgPersonType:NoContractor\n")
 1938             f.write("LastName:Bo\n")
 1939             f.write("FirstName:Jill\n")
 1940         self.command_input = tmpfilename
 1941         self.command_prefix = None
 1942         self.command_engine_debug = True
 1943         self.useFixture(fixtures.MockPatchObject(
 1944             CONF, 'command', self.FakeConfCommand(self)))
 1945         mapping_engine = cli.MappingEngineTester()
 1946         logging = self.useFixture(fixtures.FakeLogger(level=log.DEBUG))
 1947         mapping_engine.main()
 1948         expected_msg = "direct_maps: [['me']]"
 1949         self.assertThat(logging.output, matchers.Contains(expected_msg))
 1950 
 1951 
 1952 class CliStatusTestCase(unit.SQLDriverOverrides, unit.TestCase):
 1953 
 1954     def setUp(self):
 1955         self.useFixture(database.Database())
 1956         super(CliStatusTestCase, self).setUp()
 1957         self.load_backends()
 1958         self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
 1959         self.policy_file_name = self.policy_file.file_name
 1960         self.useFixture(
 1961             policy.Policy(
 1962                 self.config_fixture, policy_file=self.policy_file_name
 1963             )
 1964         )
 1965         self.checks = status.Checks()
 1966 
 1967     def test_check_safe_trust_policies(self):
 1968         with open(self.policy_file_name, 'w') as f:
 1969             overridden_policies = {
 1970                 'identity:list_trusts': '',
 1971                 'identity:delete_trust': '',
 1972                 'identity:get_trust': '',
 1973                 'identity:list_roles_for_trust': '',
 1974                 'identity:get_role_for_trust': ''
 1975             }
 1976             f.write(jsonutils.dumps(overridden_policies))
 1977         result = self.checks.check_trust_policies_are_not_empty()
 1978         self.assertEqual(upgradecheck.Code.FAILURE, result.code)
 1979         with open(self.policy_file_name, 'w') as f:
 1980             overridden_policies = {
 1981                 'identity:list_trusts': 'rule:admin_required',
 1982                 'identity:delete_trust': 'rule:admin_required',
 1983                 'identity:get_trust': 'rule:admin_required',
 1984                 'identity:list_roles_for_trust': 'rule:admin_required',
 1985                 'identity:get_role_for_trust': 'rule:admin_required'
 1986             }
 1987             f.write(jsonutils.dumps(overridden_policies))
 1988         result = self.checks.check_trust_policies_are_not_empty()
 1989         self.assertEqual(upgradecheck.Code.SUCCESS, result.code)
 1990         with open(self.policy_file_name, 'w') as f:
 1991             overridden_policies = {}
 1992             f.write(jsonutils.dumps(overridden_policies))
 1993         result = self.checks.check_trust_policies_are_not_empty()
 1994         self.assertEqual(upgradecheck.Code.SUCCESS, result.code)
 1995 
 1996     def test_check_immutable_roles(self):
 1997         role_ref = unit.new_role_ref(name='admin')
 1998         PROVIDERS.role_api.create_role(role_ref['id'], role_ref)
 1999         result = self.checks.check_default_roles_are_immutable()
 2000         self.assertEqual(upgradecheck.Code.FAILURE, result.code)
 2001         role_ref['options'] = {'immutable': True}
 2002         PROVIDERS.role_api.update_role(role_ref['id'], role_ref)
 2003         result = self.checks.check_default_roles_are_immutable()
 2004         self.assertEqual(upgradecheck.Code.SUCCESS, result.code)
 2005         # Check domain-specific roles are not reported
 2006         PROVIDERS.resource_api.create_domain(
 2007             default_fixtures.ROOT_DOMAIN['id'],
 2008             default_fixtures.ROOT_DOMAIN)
 2009         domain_ref = unit.new_domain_ref()
 2010         domain = PROVIDERS.resource_api.create_domain(
 2011             domain_ref['id'], domain_ref)
 2012         role_ref = unit.new_role_ref(name='admin', domain_id=domain['id'])
 2013         PROVIDERS.role_api.create_role(role_ref['id'], role_ref)
 2014         result = self.checks.check_default_roles_are_immutable()
 2015         self.assertEqual(upgradecheck.Code.SUCCESS, result.code)