"Fossies" - the Fresh Open Source Software Archive

Member "keystone-18.0.0/keystone/tests/unit/test_cli.py" (14 Oct 2020, 90242 Bytes) of package /linux/misc/openstack/keystone-18.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_cli.py": 17.0.0_vs_18.0.0.

    1 # Copyright 2014 IBM Corp.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 import copy
   16 import datetime
   17 import logging
   18 import os
   19 from unittest import mock
   20 import uuid
   21 
   22 import argparse
   23 import configparser
   24 import fixtures
   25 import freezegun
   26 import http.client
   27 import oslo_config.fixture
   28 from oslo_db.sqlalchemy import migration
   29 from oslo_log import log
   30 from oslo_serialization import jsonutils
   31 from oslo_upgradecheck import upgradecheck
   32 from testtools import matchers
   33 
   34 from keystone.cmd import cli
   35 from keystone.cmd.doctor import caching
   36 from keystone.cmd.doctor import credential
   37 from keystone.cmd.doctor import database as doc_database
   38 from keystone.cmd.doctor import debug
   39 from keystone.cmd.doctor import federation
   40 from keystone.cmd.doctor import ldap
   41 from keystone.cmd.doctor import security_compliance
   42 from keystone.cmd.doctor import tokens
   43 from keystone.cmd.doctor import tokens_fernet
   44 from keystone.cmd import status
   45 from keystone.common import provider_api
   46 from keystone.common.sql import upgrades
   47 import keystone.conf
   48 from keystone import exception
   49 from keystone.i18n import _
   50 from keystone.identity.mapping_backends import mapping as identity_mapping
   51 from keystone.tests import unit
   52 from keystone.tests.unit import default_fixtures
   53 from keystone.tests.unit.ksfixtures import database
   54 from keystone.tests.unit.ksfixtures import ldapdb
   55 from keystone.tests.unit.ksfixtures import policy
   56 from keystone.tests.unit.ksfixtures import temporaryfile
   57 from keystone.tests.unit import mapping_fixtures
   58 
   59 
# Module-level shortcut to keystone's global configuration object.
CONF = keystone.conf.CONF
# Module-level shortcut used by the tests to reach the provider (manager)
# APIs, e.g. ``PROVIDERS.resource_api``.
PROVIDERS = provider_api.ProviderAPIs
   62 
   63 
   64 class CliLoggingTestCase(unit.BaseTestCase):
   65 
   66     def setUp(self):
   67         self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
   68         self.config_fixture.register_cli_opt(cli.command_opt)
   69         self.useFixture(fixtures.MockPatch(
   70             'oslo_config.cfg.find_config_files', return_value=[]))
   71         fd = self.useFixture(temporaryfile.SecureTempFile())
   72         self.fake_config_file = fd.file_name
   73         super(CliLoggingTestCase, self).setUp()
   74 
   75         # NOTE(crinkle): the command call doesn't have to actually work,
   76         # that's what the other unit tests are for. So just mock it out.
   77         class FakeConfCommand(object):
   78             def __init__(self):
   79                 self.cmd_class = mock.Mock()
   80         self.useFixture(fixtures.MockPatchObject(
   81             CONF, 'command', FakeConfCommand()))
   82 
   83         self.logging = self.useFixture(fixtures.FakeLogger(level=log.WARN))
   84 
   85     def test_absent_config_logs_warning(self):
   86         expected_msg = 'Config file not found, using default configs.'
   87         cli.main(argv=['keystone-manage', 'db_sync'])
   88         self.assertThat(self.logging.output, matchers.Contains(expected_msg))
   89 
   90     def test_present_config_does_not_log_warning(self):
   91         fake_argv = [
   92             'keystone-manage', '--config-file', self.fake_config_file, 'doctor'
   93         ]
   94         cli.main(argv=fake_argv)
   95         expected_msg = 'Config file not found, using default configs.'
   96         self.assertNotIn(expected_msg, self.logging.output)
   97 
   98 
   99 class CliBootStrapTestCase(unit.SQLDriverOverrides, unit.TestCase):
  100 
  101     def setUp(self):
  102         self.useFixture(database.Database())
  103         super(CliBootStrapTestCase, self).setUp()
  104         self.bootstrap = cli.BootStrap()
  105 
  106     def config_files(self):
  107         self.config_fixture.register_cli_opt(cli.command_opt)
  108         config_files = super(CliBootStrapTestCase, self).config_files()
  109         config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
  110         return config_files
  111 
  112     def config(self, config_files):
  113         CONF(args=['bootstrap', '--bootstrap-password', uuid.uuid4().hex],
  114              project='keystone',
  115              default_config_files=config_files)
  116 
  117     def test_bootstrap(self):
  118         self._do_test_bootstrap(self.bootstrap)
  119 
  120     def _do_test_bootstrap(self, bootstrap):
  121         try:
  122             PROVIDERS.resource_api.create_domain(
  123                 default_fixtures.ROOT_DOMAIN['id'],
  124                 default_fixtures.ROOT_DOMAIN)
  125         except exception.Conflict:
  126             pass
  127 
  128         bootstrap.do_bootstrap()
  129         project = PROVIDERS.resource_api.get_project_by_name(
  130             bootstrap.project_name,
  131             'default')
  132         user = PROVIDERS.identity_api.get_user_by_name(
  133             bootstrap.username,
  134             'default')
  135         admin_role = PROVIDERS.role_api.get_role(bootstrap.role_id)
  136         reader_role = PROVIDERS.role_api.get_role(bootstrap.reader_role_id)
  137         member_role = PROVIDERS.role_api.get_role(bootstrap.member_role_id)
  138         role_list = (
  139             PROVIDERS.assignment_api.get_roles_for_user_and_project(
  140                 user['id'],
  141                 project['id']))
  142         self.assertIs(3, len(role_list))
  143         self.assertIn(admin_role['id'], role_list)
  144         self.assertIn(reader_role['id'], role_list)
  145         self.assertIn(member_role['id'], role_list)
  146         system_roles = (
  147             PROVIDERS.assignment_api.list_system_grants_for_user(
  148                 user['id']
  149             )
  150         )
  151         self.assertIs(1, len(system_roles))
  152         self.assertEqual(system_roles[0]['id'], admin_role['id'])
  153         # NOTE(morganfainberg): Pass an empty context, it isn't used by
  154         # `authenticate` method.
  155         with self.make_request():
  156             PROVIDERS.identity_api.authenticate(
  157                 user['id'],
  158                 bootstrap.password)
  159 
  160         if bootstrap.region_id:
  161             region = PROVIDERS.catalog_api.get_region(bootstrap.region_id)
  162             self.assertEqual(self.region_id, region['id'])
  163 
  164         if bootstrap.service_id:
  165             svc = PROVIDERS.catalog_api.get_service(bootstrap.service_id)
  166             self.assertEqual(self.service_name, svc['name'])
  167 
  168             self.assertEqual(set(['admin', 'public', 'internal']),
  169                              set(bootstrap.endpoints))
  170 
  171             urls = {'public': self.public_url,
  172                     'internal': self.internal_url,
  173                     'admin': self.admin_url}
  174 
  175             for interface, url in urls.items():
  176                 endpoint_id = bootstrap.endpoints[interface]
  177                 endpoint = PROVIDERS.catalog_api.get_endpoint(endpoint_id)
  178 
  179                 self.assertEqual(self.region_id, endpoint['region_id'])
  180                 self.assertEqual(url, endpoint['url'])
  181                 self.assertEqual(svc['id'], endpoint['service_id'])
  182                 self.assertEqual(interface, endpoint['interface'])
  183 
  184     def test_bootstrap_is_idempotent_when_password_does_not_change(self):
  185         # NOTE(morganfainberg): Ensure we can run bootstrap with the same
  186         # configuration multiple times without erroring.
  187         self._do_test_bootstrap(self.bootstrap)
  188         app = self.loadapp()
  189         v3_password_data = {
  190             'auth': {
  191                 'identity': {
  192                     "methods": ["password"],
  193                     "password": {
  194                         "user": {
  195                             "name": self.bootstrap.username,
  196                             "password": self.bootstrap.password,
  197                             "domain": {
  198                                 "id": CONF.identity.default_domain_id
  199                             }
  200                         }
  201                     }
  202                 }
  203             }
  204         }
  205         with app.test_client() as c:
  206             auth_response = c.post('/v3/auth/tokens',
  207                                    json=v3_password_data)
  208             token = auth_response.headers['X-Subject-Token']
  209         self._do_test_bootstrap(self.bootstrap)
  210         # build validation request
  211         with app.test_client() as c:
  212             # Get a new X-Auth-Token
  213             r = c.post(
  214                 '/v3/auth/tokens',
  215                 json=v3_password_data)
  216 
  217             # Validate the old token with our new X-Auth-Token.
  218             c.get('/v3/auth/tokens',
  219                   headers={'X-Auth-Token': r.headers['X-Subject-Token'],
  220                            'X-Subject-Token': token})
  221         admin_role = PROVIDERS.role_api.get_role(self.bootstrap.role_id)
  222         reader_role = PROVIDERS.role_api.get_role(
  223             self.bootstrap.reader_role_id)
  224         member_role = PROVIDERS.role_api.get_role(
  225             self.bootstrap.member_role_id)
  226         self.assertEqual(admin_role['options'], {'immutable': True})
  227         self.assertEqual(member_role['options'], {'immutable': True})
  228         self.assertEqual(reader_role['options'], {'immutable': True})
  229 
  230     def test_bootstrap_is_not_idempotent_when_password_does_change(self):
  231         # NOTE(lbragstad): Ensure bootstrap isn't idempotent when run with
  232         # different arguments or configuration values.
  233         self._do_test_bootstrap(self.bootstrap)
  234         app = self.loadapp()
  235         v3_password_data = {
  236             'auth': {
  237                 'identity': {
  238                     "methods": ["password"],
  239                     "password": {
  240                         "user": {
  241                             "name": self.bootstrap.username,
  242                             "password": self.bootstrap.password,
  243                             "domain": {
  244                                 "id": CONF.identity.default_domain_id
  245                             }
  246                         }
  247                     }
  248                 }
  249             }
  250         }
  251         time = datetime.datetime.utcnow()
  252         with freezegun.freeze_time(time) as frozen_time:
  253             with app.test_client() as c:
  254                 auth_response = c.post('/v3/auth/tokens',
  255                                        json=v3_password_data)
  256                 token = auth_response.headers['X-Subject-Token']
  257             new_passwd = uuid.uuid4().hex
  258             os.environ['OS_BOOTSTRAP_PASSWORD'] = new_passwd
  259             self._do_test_bootstrap(self.bootstrap)
  260             v3_password_data['auth']['identity']['password']['user'][
  261                 'password'] = new_passwd
  262             # Move time forward a second to avoid rev. event capturing the new
  263             # auth-token since we're within a single second (possibly) for the
  264             # test case.
  265             frozen_time.tick(delta=datetime.timedelta(seconds=1))
  266             # Validate the old token
  267             with app.test_client() as c:
  268                 # Get a new X-Auth-Token
  269                 r = c.post('/v3/auth/tokens', json=v3_password_data)
  270                 # Since the user account was recovered with a different
  271                 # password, we shouldn't be able to validate this token.
  272                 # Bootstrap should have persisted a revocation event because
  273                 # the user's password was updated. Since this token was
  274                 # obtained using the original password, it should now be
  275                 # invalid.
  276                 c.get('/v3/auth/tokens',
  277                       headers={'X-Auth-Token': r.headers['X-Subject-Token'],
  278                                'X-Subject-Token': token},
  279                       expected_status_code=http.client.NOT_FOUND)
  280 
  281     def test_bootstrap_recovers_user(self):
  282         self._do_test_bootstrap(self.bootstrap)
  283 
  284         # Completely lock the user out.
  285         user_id = PROVIDERS.identity_api.get_user_by_name(
  286             self.bootstrap.username,
  287             'default')['id']
  288         PROVIDERS.identity_api.update_user(
  289             user_id,
  290             {'enabled': False,
  291              'password': uuid.uuid4().hex})
  292 
  293         # The second bootstrap run will recover the account.
  294         self._do_test_bootstrap(self.bootstrap)
  295 
  296         # Sanity check that the original password works again.
  297         with self.make_request():
  298             PROVIDERS.identity_api.authenticate(
  299                 user_id,
  300                 self.bootstrap.password)
  301 
  302     def test_bootstrap_with_explicit_immutable_roles(self):
  303         CONF(args=['bootstrap',
  304                    '--bootstrap-password', uuid.uuid4().hex,
  305                    '--immutable-roles'],
  306              project='keystone')
  307         self._do_test_bootstrap(self.bootstrap)
  308         admin_role = PROVIDERS.role_api.get_role(self.bootstrap.role_id)
  309         reader_role = PROVIDERS.role_api.get_role(
  310             self.bootstrap.reader_role_id)
  311         member_role = PROVIDERS.role_api.get_role(
  312             self.bootstrap.member_role_id)
  313         self.assertTrue(admin_role['options']['immutable'])
  314         self.assertTrue(member_role['options']['immutable'])
  315         self.assertTrue(reader_role['options']['immutable'])
  316 
  317     def test_bootstrap_with_default_immutable_roles(self):
  318         CONF(args=['bootstrap',
  319                    '--bootstrap-password', uuid.uuid4().hex],
  320              project='keystone')
  321         self._do_test_bootstrap(self.bootstrap)
  322         admin_role = PROVIDERS.role_api.get_role(self.bootstrap.role_id)
  323         reader_role = PROVIDERS.role_api.get_role(
  324             self.bootstrap.reader_role_id)
  325         member_role = PROVIDERS.role_api.get_role(
  326             self.bootstrap.member_role_id)
  327         self.assertTrue(admin_role['options']['immutable'])
  328         self.assertTrue(member_role['options']['immutable'])
  329         self.assertTrue(reader_role['options']['immutable'])
  330 
  331     def test_bootstrap_with_no_immutable_roles(self):
  332         CONF(args=['bootstrap',
  333                    '--bootstrap-password', uuid.uuid4().hex,
  334                    '--no-immutable-roles'],
  335              project='keystone')
  336         self._do_test_bootstrap(self.bootstrap)
  337         admin_role = PROVIDERS.role_api.get_role(self.bootstrap.role_id)
  338         reader_role = PROVIDERS.role_api.get_role(
  339             self.bootstrap.reader_role_id)
  340         member_role = PROVIDERS.role_api.get_role(
  341             self.bootstrap.member_role_id)
  342         self.assertNotIn('immutable', admin_role['options'])
  343         self.assertNotIn('immutable', member_role['options'])
  344         self.assertNotIn('immutable', reader_role['options'])
  345 
  346     def test_bootstrap_with_ambiguous_role_names(self):
  347         # bootstrap system to create the default admin role
  348         self._do_test_bootstrap(self.bootstrap)
  349 
  350         # create a domain-specific roles that share the same names as the
  351         # default roles created by keystone-manage bootstrap
  352         domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
  353         domain = PROVIDERS.resource_api.create_domain(domain['id'], domain)
  354         domain_roles = {}
  355 
  356         for name in ['admin', 'member', 'reader']:
  357             domain_role = {
  358                 'domain_id': domain['id'],
  359                 'id': uuid.uuid4().hex,
  360                 'name': name
  361             }
  362             domain_roles[name] = PROVIDERS.role_api.create_role(
  363                 domain_role['id'], domain_role
  364             )
  365 
  366             # ensure subsequent bootstrap attempts don't fail because of
  367             # ambiguity
  368             self._do_test_bootstrap(self.bootstrap)
  369 
  370 
  371 class CliBootStrapTestCaseWithEnvironment(CliBootStrapTestCase):
  372 
  373     def config(self, config_files):
  374         CONF(args=['bootstrap'], project='keystone',
  375              default_config_files=config_files)
  376 
  377     def setUp(self):
  378         super(CliBootStrapTestCaseWithEnvironment, self).setUp()
  379         self.password = uuid.uuid4().hex
  380         self.username = uuid.uuid4().hex
  381         self.project_name = uuid.uuid4().hex
  382         self.role_name = uuid.uuid4().hex
  383         self.service_name = uuid.uuid4().hex
  384         self.public_url = uuid.uuid4().hex
  385         self.internal_url = uuid.uuid4().hex
  386         self.admin_url = uuid.uuid4().hex
  387         self.region_id = uuid.uuid4().hex
  388         self.default_domain = {
  389             'id': CONF.identity.default_domain_id,
  390             'name': 'Default',
  391         }
  392         self.useFixture(
  393             fixtures.EnvironmentVariable('OS_BOOTSTRAP_PASSWORD',
  394                                          newvalue=self.password))
  395         self.useFixture(
  396             fixtures.EnvironmentVariable('OS_BOOTSTRAP_USERNAME',
  397                                          newvalue=self.username))
  398         self.useFixture(
  399             fixtures.EnvironmentVariable('OS_BOOTSTRAP_PROJECT_NAME',
  400                                          newvalue=self.project_name))
  401         self.useFixture(
  402             fixtures.EnvironmentVariable('OS_BOOTSTRAP_ROLE_NAME',
  403                                          newvalue=self.role_name))
  404         self.useFixture(
  405             fixtures.EnvironmentVariable('OS_BOOTSTRAP_SERVICE_NAME',
  406                                          newvalue=self.service_name))
  407         self.useFixture(
  408             fixtures.EnvironmentVariable('OS_BOOTSTRAP_PUBLIC_URL',
  409                                          newvalue=self.public_url))
  410         self.useFixture(
  411             fixtures.EnvironmentVariable('OS_BOOTSTRAP_INTERNAL_URL',
  412                                          newvalue=self.internal_url))
  413         self.useFixture(
  414             fixtures.EnvironmentVariable('OS_BOOTSTRAP_ADMIN_URL',
  415                                          newvalue=self.admin_url))
  416         self.useFixture(
  417             fixtures.EnvironmentVariable('OS_BOOTSTRAP_REGION_ID',
  418                                          newvalue=self.region_id))
  419 
  420         PROVIDERS.resource_api.create_domain(
  421             default_fixtures.ROOT_DOMAIN['id'], default_fixtures.ROOT_DOMAIN)
  422 
  423     def test_assignment_created_with_user_exists(self):
  424         # test assignment can be created if user already exists.
  425         PROVIDERS.resource_api.create_domain(self.default_domain['id'],
  426                                              self.default_domain)
  427         user_ref = unit.new_user_ref(self.default_domain['id'],
  428                                      name=self.username,
  429                                      password=self.password)
  430         PROVIDERS.identity_api.create_user(user_ref)
  431 
  432         self._do_test_bootstrap(self.bootstrap)
  433 
  434     def test_assignment_created_with_project_exists(self):
  435         # test assignment can be created if project already exists.
  436         PROVIDERS.resource_api.create_domain(self.default_domain['id'],
  437                                              self.default_domain)
  438         project_ref = unit.new_project_ref(self.default_domain['id'],
  439                                            name=self.project_name)
  440         PROVIDERS.resource_api.create_project(project_ref['id'], project_ref)
  441         self._do_test_bootstrap(self.bootstrap)
  442 
  443     def test_assignment_created_with_role_exists(self):
  444         # test assignment can be created if role already exists.
  445         PROVIDERS.resource_api.create_domain(self.default_domain['id'],
  446                                              self.default_domain)
  447         role = unit.new_role_ref(name=self.role_name)
  448         PROVIDERS.role_api.create_role(role['id'], role)
  449         self._do_test_bootstrap(self.bootstrap)
  450 
  451     def test_assignment_created_with_region_exists(self):
  452         # test assignment can be created if region already exists.
  453         PROVIDERS.resource_api.create_domain(self.default_domain['id'],
  454                                              self.default_domain)
  455         region = unit.new_region_ref(id=self.region_id)
  456         PROVIDERS.catalog_api.create_region(region)
  457         self._do_test_bootstrap(self.bootstrap)
  458 
  459     def test_endpoints_created_with_service_exists(self):
  460         # test assignment can be created if service already exists.
  461         PROVIDERS.resource_api.create_domain(self.default_domain['id'],
  462                                              self.default_domain)
  463         service = unit.new_service_ref(name=self.service_name)
  464         PROVIDERS.catalog_api.create_service(service['id'], service)
  465         self._do_test_bootstrap(self.bootstrap)
  466 
  467     def test_endpoints_created_with_endpoint_exists(self):
  468         # test assignment can be created if endpoint already exists.
  469         PROVIDERS.resource_api.create_domain(self.default_domain['id'],
  470                                              self.default_domain)
  471         service = unit.new_service_ref(name=self.service_name)
  472         PROVIDERS.catalog_api.create_service(service['id'], service)
  473 
  474         region = unit.new_region_ref(id=self.region_id)
  475         PROVIDERS.catalog_api.create_region(region)
  476 
  477         endpoint = unit.new_endpoint_ref(interface='public',
  478                                          service_id=service['id'],
  479                                          url=self.public_url,
  480                                          region_id=self.region_id)
  481         PROVIDERS.catalog_api.create_endpoint(endpoint['id'], endpoint)
  482 
  483         self._do_test_bootstrap(self.bootstrap)
  484 
  485     def test_endpoints_created_with_new_endpoints(self):
  486         service = unit.new_service_ref(name=self.service_name, type='identity')
  487         PROVIDERS.catalog_api.create_service(service['id'], service)
  488         region = unit.new_region_ref(id=self.region_id)
  489         PROVIDERS.catalog_api.create_region(region)
  490         endpoint = unit.new_endpoint_ref(interface='public',
  491                                          service_id=service['id'],
  492                                          url=uuid.uuid4().hex,
  493                                          region_id=self.region_id)
  494         PROVIDERS.catalog_api.create_endpoint(endpoint['id'], endpoint)
  495 
  496         self._do_test_bootstrap(self.bootstrap)
  497         updated_endpoint = PROVIDERS.catalog_api.get_endpoint(endpoint['id'])
  498         self.assertEqual(updated_endpoint['url'], self.bootstrap.public_url)
  499 
  500 
  501 class CliDomainConfigAllTestCase(unit.SQLDriverOverrides, unit.TestCase):
  502 
  503     def setUp(self):
  504         self.useFixture(database.Database())
  505         super(CliDomainConfigAllTestCase, self).setUp()
  506         self.load_backends()
  507         self.config_fixture.config(
  508             group='identity',
  509             domain_config_dir=unit.TESTCONF + '/domain_configs_multi_ldap')
  510         self.domain_count = 3
  511         self.setup_initial_domains()
  512         self.logging = self.useFixture(
  513             fixtures.FakeLogger(level=logging.INFO))
  514 
  515     def config_files(self):
  516         self.config_fixture.register_cli_opt(cli.command_opt)
  517         config_files = super(CliDomainConfigAllTestCase, self).config_files()
  518         config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
  519         return config_files
  520 
  521     def cleanup_domains(self):
  522         for domain in self.domains:
  523             if domain == 'domain_default':
  524                 # Not allowed to delete the default domain, but should at least
  525                 # delete any domain-specific config for it.
  526                 PROVIDERS.domain_config_api.delete_config(
  527                     CONF.identity.default_domain_id)
  528                 continue
  529             this_domain = self.domains[domain]
  530             this_domain['enabled'] = False
  531             PROVIDERS.resource_api.update_domain(
  532                 this_domain['id'], this_domain
  533             )
  534             PROVIDERS.resource_api.delete_domain(this_domain['id'])
  535         self.domains = {}
  536 
  537     def config(self, config_files):
  538         CONF(args=['domain_config_upload', '--all'], project='keystone',
  539              default_config_files=config_files)
  540 
  541     def setup_initial_domains(self):
  542 
  543         def create_domain(domain):
  544             return PROVIDERS.resource_api.create_domain(domain['id'], domain)
  545 
  546         PROVIDERS.resource_api.create_domain(
  547             default_fixtures.ROOT_DOMAIN['id'], default_fixtures.ROOT_DOMAIN)
  548 
  549         self.domains = {}
  550         self.addCleanup(self.cleanup_domains)
  551         for x in range(1, self.domain_count):
  552             domain = 'domain%s' % x
  553             self.domains[domain] = create_domain(
  554                 {'id': uuid.uuid4().hex, 'name': domain})
  555         self.default_domain = unit.new_domain_ref(
  556             description=u'The default domain',
  557             id=CONF.identity.default_domain_id,
  558             name=u'Default')
  559         self.domains['domain_default'] = create_domain(self.default_domain)
  560 
  561     def test_config_upload(self):
  562         # The values below are the same as in the domain_configs_multi_ldap
  563         # directory of test config_files.
  564         default_config = {
  565             'ldap': {'url': 'fake://memory',
  566                      'user': 'cn=Admin',
  567                      'password': 'password',
  568                      'suffix': 'cn=example,cn=com'},
  569             'identity': {'driver': 'ldap'}
  570         }
  571         domain1_config = {
  572             'ldap': {'url': 'fake://memory1',
  573                      'user': 'cn=Admin',
  574                      'password': 'password',
  575                      'suffix': 'cn=example,cn=com'},
  576             'identity': {'driver': 'ldap',
  577                          'list_limit': '101'}
  578         }
  579         domain2_config = {
  580             'ldap': {'url': 'fake://memory',
  581                      'user': 'cn=Admin',
  582                      'password': 'password',
  583                      'suffix': 'cn=myroot,cn=com',
  584                      'group_tree_dn': 'ou=UserGroups,dc=myroot,dc=org',
  585                      'user_tree_dn': 'ou=Users,dc=myroot,dc=org'},
  586             'identity': {'driver': 'ldap'}
  587         }
  588 
  589         # Clear backend dependencies, since cli loads these manually
  590         provider_api.ProviderAPIs._clear_registry_instances()
  591         cli.DomainConfigUpload.main()
  592 
  593         res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
  594             CONF.identity.default_domain_id)
  595         self.assertEqual(default_config, res)
  596         res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
  597             self.domains['domain1']['id'])
  598         self.assertEqual(domain1_config, res)
  599         res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
  600             self.domains['domain2']['id'])
  601         self.assertEqual(domain2_config, res)
  602 
  603 
  604 class CliDomainConfigSingleDomainTestCase(CliDomainConfigAllTestCase):
  605 
  606     def config(self, config_files):
  607         CONF(args=['domain_config_upload', '--domain-name', 'Default'],
  608              project='keystone', default_config_files=config_files)
  609 
  610     def test_config_upload(self):
  611         # The values below are the same as in the domain_configs_multi_ldap
  612         # directory of test config_files.
  613         default_config = {
  614             'ldap': {'url': 'fake://memory',
  615                      'user': 'cn=Admin',
  616                      'password': 'password',
  617                      'suffix': 'cn=example,cn=com'},
  618             'identity': {'driver': 'ldap'}
  619         }
  620 
  621         # Clear backend dependencies, since cli loads these manually
  622         provider_api.ProviderAPIs._clear_registry_instances()
  623         cli.DomainConfigUpload.main()
  624 
  625         res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
  626             CONF.identity.default_domain_id)
  627         self.assertEqual(default_config, res)
  628         res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
  629             self.domains['domain1']['id'])
  630         self.assertEqual({}, res)
  631         res = PROVIDERS.domain_config_api.get_config_with_sensitive_info(
  632             self.domains['domain2']['id'])
  633         self.assertEqual({}, res)
  634 
  635     def test_no_overwrite_config(self):
  636         # Create a config for the default domain
  637         default_config = {
  638             'ldap': {'url': uuid.uuid4().hex},
  639             'identity': {'driver': 'ldap'}
  640         }
  641         PROVIDERS.domain_config_api.create_config(
  642             CONF.identity.default_domain_id, default_config)
  643 
  644         # Now try and upload the settings in the configuration file for the
  645         # default domain
  646         provider_api.ProviderAPIs._clear_registry_instances()
  647         with mock.patch('builtins.print') as mock_print:
  648             self.assertRaises(unit.UnexpectedExit, cli.DomainConfigUpload.main)
  649             file_name = ('keystone.%s.conf' % self.default_domain['name'])
  650             error_msg = _(
  651                 'Domain: %(domain)s already has a configuration defined - '
  652                 'ignoring file: %(file)s.') % {
  653                     'domain': self.default_domain['name'],
  654                     'file': os.path.join(CONF.identity.domain_config_dir,
  655                                          file_name)}
  656             mock_print.assert_has_calls([mock.call(error_msg)])
  657 
  658         res = PROVIDERS.domain_config_api.get_config(
  659             CONF.identity.default_domain_id)
  660         # The initial config should not have been overwritten
  661         self.assertEqual(default_config, res)
  662 
  663 
  664 class CliDomainConfigNoOptionsTestCase(CliDomainConfigAllTestCase):
  665 
  666     def config(self, config_files):
  667         CONF(args=['domain_config_upload'],
  668              project='keystone', default_config_files=config_files)
  669 
  670     def test_config_upload(self):
  671         provider_api.ProviderAPIs._clear_registry_instances()
  672         with mock.patch('builtins.print') as mock_print:
  673             self.assertRaises(unit.UnexpectedExit, cli.DomainConfigUpload.main)
  674             mock_print.assert_has_calls(
  675                 [mock.call(
  676                     _('At least one option must be provided, use either '
  677                       '--all or --domain-name'))])
  678 
  679 
  680 class CliDomainConfigTooManyOptionsTestCase(CliDomainConfigAllTestCase):
  681 
  682     def config(self, config_files):
  683         CONF(args=['domain_config_upload', '--all', '--domain-name',
  684                    'Default'],
  685              project='keystone', default_config_files=config_files)
  686 
  687     def test_config_upload(self):
  688         provider_api.ProviderAPIs._clear_registry_instances()
  689         with mock.patch('builtins.print') as mock_print:
  690             self.assertRaises(unit.UnexpectedExit, cli.DomainConfigUpload.main)
  691             mock_print.assert_has_calls(
  692                 [mock.call(_('The --all option cannot be used with '
  693                              'the --domain-name option'))])
  694 
  695 
  696 class CliDomainConfigInvalidDomainTestCase(CliDomainConfigAllTestCase):
  697 
  698     def config(self, config_files):
  699         self.invalid_domain_name = uuid.uuid4().hex
  700         CONF(args=['domain_config_upload', '--domain-name',
  701                    self.invalid_domain_name],
  702              project='keystone', default_config_files=config_files)
  703 
  704     def test_config_upload(self):
  705         provider_api.ProviderAPIs._clear_registry_instances()
  706         with mock.patch('builtins.print') as mock_print:
  707             self.assertRaises(unit.UnexpectedExit, cli.DomainConfigUpload.main)
  708             file_name = 'keystone.%s.conf' % self.invalid_domain_name
  709             error_msg = (_(
  710                 'Invalid domain name: %(domain)s found in config file name: '
  711                 '%(file)s - ignoring this file.') % {
  712                     'domain': self.invalid_domain_name,
  713                     'file': os.path.join(CONF.identity.domain_config_dir,
  714                                          file_name)})
  715             mock_print.assert_has_calls([mock.call(error_msg)])
  716 
  717 
  718 class TestDomainConfigFinder(unit.BaseTestCase):
  719 
  720     def setUp(self):
  721         super(TestDomainConfigFinder, self).setUp()
  722         self.logging = self.useFixture(fixtures.LoggerFixture())
  723 
  724     @mock.patch('os.walk')
  725     def test_finder_ignores_files(self, mock_walk):
  726         mock_walk.return_value = [
  727             ['.', [], ['file.txt', 'keystone.conf', 'keystone.domain0.conf']],
  728         ]
  729 
  730         domain_configs = list(cli._domain_config_finder('.'))
  731 
  732         expected_domain_configs = [('./keystone.domain0.conf', 'domain0')]
  733         self.assertThat(domain_configs,
  734                         matchers.Equals(expected_domain_configs))
  735 
  736         expected_msg_template = ('Ignoring file (%s) while scanning '
  737                                  'domain config directory')
  738         self.assertThat(
  739             self.logging.output,
  740             matchers.Contains(expected_msg_template % 'file.txt'))
  741         self.assertThat(
  742             self.logging.output,
  743             matchers.Contains(expected_msg_template % 'keystone.conf'))
  744 
  745 
  746 class CliDBSyncTestCase(unit.BaseTestCase):
  747 
  748     class FakeConfCommand(object):
  749         def __init__(self, parent):
  750             self.extension = False
  751             self.check = parent.command_check
  752             self.expand = parent.command_expand
  753             self.migrate = parent.command_migrate
  754             self.contract = parent.command_contract
  755             self.version = None
  756 
  757     def setUp(self):
  758         super(CliDBSyncTestCase, self).setUp()
  759         self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
  760         self.config_fixture.register_cli_opt(cli.command_opt)
  761         upgrades.offline_sync_database_to_version = mock.Mock()
  762         upgrades.expand_schema = mock.Mock()
  763         upgrades.migrate_data = mock.Mock()
  764         upgrades.contract_schema = mock.Mock()
  765         self.command_check = False
  766         self.command_expand = False
  767         self.command_migrate = False
  768         self.command_contract = False
  769 
  770     def _assert_correct_call(self, mocked_function):
  771         for func in [upgrades.offline_sync_database_to_version,
  772                      upgrades.expand_schema,
  773                      upgrades.migrate_data,
  774                      upgrades.contract_schema]:
  775             if func == mocked_function:
  776                 self.assertTrue(func.called)
  777             else:
  778                 self.assertFalse(func.called)
  779 
  780     def test_db_sync(self):
  781         self.useFixture(fixtures.MockPatchObject(
  782             CONF, 'command', self.FakeConfCommand(self)))
  783         cli.DbSync.main()
  784         self._assert_correct_call(
  785             upgrades.offline_sync_database_to_version)
  786 
  787     def test_db_sync_expand(self):
  788         self.command_expand = True
  789         self.useFixture(fixtures.MockPatchObject(
  790             CONF, 'command', self.FakeConfCommand(self)))
  791         cli.DbSync.main()
  792         self._assert_correct_call(upgrades.expand_schema)
  793 
  794     def test_db_sync_migrate(self):
  795         self.command_migrate = True
  796         self.useFixture(fixtures.MockPatchObject(
  797             CONF, 'command', self.FakeConfCommand(self)))
  798         cli.DbSync.main()
  799         self._assert_correct_call(upgrades.migrate_data)
  800 
  801     def test_db_sync_contract(self):
  802         self.command_contract = True
  803         self.useFixture(fixtures.MockPatchObject(
  804             CONF, 'command', self.FakeConfCommand(self)))
  805         cli.DbSync.main()
  806         self._assert_correct_call(upgrades.contract_schema)
  807 
  808     @mock.patch('keystone.cmd.cli.upgrades.get_db_version')
  809     def test_db_sync_check_when_database_is_empty(self, mocked_get_db_version):
  810         e = migration.exception.DBMigrationError("Invalid version")
  811         mocked_get_db_version.side_effect = e
  812         checker = cli.DbSync()
  813 
  814         log_info = self.useFixture(fixtures.FakeLogger(level=log.INFO))
  815         status = checker.check_db_sync_status()
  816         self.assertIn("not currently under version control", log_info.output)
  817         self.assertEqual(status, 2)
  818 
  819 
  820 class TestMappingPopulate(unit.SQLDriverOverrides, unit.TestCase):
  821 
  822     def setUp(self):
  823         sqldb = self.useFixture(database.Database())
  824         super(TestMappingPopulate, self).setUp()
  825         self.ldapdb = self.useFixture(ldapdb.LDAPDatabase())
  826         self.ldapdb.clear()
  827 
  828         self.load_backends()
  829 
  830         sqldb.recreate()
  831         self.load_fixtures(default_fixtures)
  832 
  833     def config_files(self):
  834         self.config_fixture.register_cli_opt(cli.command_opt)
  835         config_files = super(TestMappingPopulate, self).config_files()
  836         config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf'))
  837         return config_files
  838 
  839     def config_overrides(self):
  840         super(TestMappingPopulate, self).config_overrides()
  841         self.config_fixture.config(group='identity', driver='ldap')
  842         self.config_fixture.config(group='identity_mapping',
  843                                    backward_compatible_ids=False)
  844 
  845     def config(self, config_files):
  846         CONF(args=['mapping_populate', '--domain-name', 'Default'],
  847              project='keystone',
  848              default_config_files=config_files)
  849 
  850     def test_mapping_populate(self):
  851         # mapping_populate should create id mappings. Test plan:
  852         # 0. Purge mappings
  853         # 1. Fetch user list directly via backend. It will not create any
  854         #    mappings because it bypasses identity manager
  855         # 2. Verify that users have no public_id yet
  856         # 3. Execute mapping_populate. It should create id mappings
  857         # 4. For the same users verify that they have public_id now
  858         purge_filter = {}
  859         PROVIDERS.id_mapping_api.purge_mappings(purge_filter)
  860         hints = None
  861         users = PROVIDERS.identity_api.driver.list_users(hints)
  862         for user in users:
  863             local_entity = {
  864                 'domain_id': CONF.identity.default_domain_id,
  865                 'local_id': user['id'],
  866                 'entity_type': identity_mapping.EntityType.USER}
  867             self.assertIsNone(
  868                 PROVIDERS.id_mapping_api.get_public_id(local_entity)
  869             )
  870 
  871         # backends are loaded again in the command handler
  872         provider_api.ProviderAPIs._clear_registry_instances()
  873         cli.MappingPopulate.main()
  874 
  875         for user in users:
  876             local_entity = {
  877                 'domain_id': CONF.identity.default_domain_id,
  878                 'local_id': user['id'],
  879                 'entity_type': identity_mapping.EntityType.USER}
  880             self.assertIsNotNone(
  881                 PROVIDERS.id_mapping_api.get_public_id(local_entity))
  882 
  883     def test_bad_domain_name(self):
  884         CONF(args=['mapping_populate', '--domain-name', uuid.uuid4().hex],
  885              project='keystone')
  886         # backends are loaded again in the command handler
  887         provider_api.ProviderAPIs._clear_registry_instances()
  888         # NOTE: assertEqual is used on purpose. assertFalse passes with None.
  889         self.assertEqual(False, cli.MappingPopulate.main())
  890 
  891 
  892 class CliDomainConfigUploadNothing(unit.BaseTestCase):
  893 
  894     def setUp(self):
  895         super(CliDomainConfigUploadNothing, self).setUp()
  896 
  897         config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
  898         config_fixture.register_cli_opt(cli.command_opt)
  899 
  900         # NOTE(dstanek): since this is not testing any database
  901         # functionality there is no need to go through the motions and
  902         # setup a test database.
  903         def fake_load_backends(self):
  904             self.resource_manager = mock.Mock()
  905         self.useFixture(fixtures.MockPatchObject(
  906             cli.DomainConfigUploadFiles, 'load_backends', fake_load_backends))
  907 
  908         tempdir = self.useFixture(fixtures.TempDir())
  909         config_fixture.config(group='identity', domain_config_dir=tempdir.path)
  910 
  911         self.logging = self.useFixture(
  912             fixtures.FakeLogger(level=logging.DEBUG))
  913 
  914     def test_uploading_all_from_an_empty_directory(self):
  915         CONF(args=['domain_config_upload', '--all'], project='keystone',
  916              default_config_files=[])
  917         cli.DomainConfigUpload.main()
  918 
  919         expected_msg = ('No domain configs uploaded from %r' %
  920                         CONF.identity.domain_config_dir)
  921         self.assertThat(self.logging.output,
  922                         matchers.Contains(expected_msg))
  923 
  924 
  925 class CachingDoctorTests(unit.TestCase):
  926 
  927     def test_symptom_caching_disabled(self):
  928         # Symptom Detected: Caching disabled
  929         self.config_fixture.config(group='cache', enabled=False)
  930         self.assertTrue(caching.symptom_caching_disabled())
  931 
  932         # No Symptom Detected: Caching is enabled
  933         self.config_fixture.config(group='cache', enabled=True)
  934         self.assertFalse(caching.symptom_caching_disabled())
  935 
  936     def test_caching_symptom_caching_enabled_without_a_backend(self):
  937         # Success Case: Caching enabled and backend configured
  938         self.config_fixture.config(group='cache', enabled=True)
  939         self.config_fixture.config(group='cache', backend='dogpile.cache.null')
  940         self.assertTrue(caching.symptom_caching_enabled_without_a_backend())
  941 
  942         # Failure Case 1: Caching disabled and backend not configured
  943         self.config_fixture.config(group='cache', enabled=False)
  944         self.config_fixture.config(group='cache', backend='dogpile.cache.null')
  945         self.assertFalse(caching.symptom_caching_enabled_without_a_backend())
  946 
  947         # Failure Case 2: Caching disabled and backend configured
  948         self.config_fixture.config(group='cache', enabled=False)
  949         self.config_fixture.config(group='cache',
  950                                    backend='dogpile.cache.memory')
  951         self.assertFalse(caching.symptom_caching_enabled_without_a_backend())
  952 
  953         # Failure Case 3: Caching enabled and backend configured
  954         self.config_fixture.config(group='cache', enabled=True)
  955         self.config_fixture.config(group='cache',
  956                                    backend='dogpile.cache.memory')
  957         self.assertFalse(caching.symptom_caching_enabled_without_a_backend())
  958 
  959     @mock.patch('keystone.cmd.doctor.caching.cache.CACHE_REGION')
  960     def test_symptom_connection_to_memcached(self, cache_mock):
  961         self.config_fixture.config(group='cache', enabled=True)
  962         self.config_fixture.config(
  963             group='cache',
  964             memcache_servers=['alpha.com:11211', 'beta.com:11211']
  965         )
  966         self.config_fixture.config(
  967             group='cache', backend='dogpile.cache.memcached'
  968         )
  969 
  970         # No symptom detected: Caching driver can connect to both memcached
  971         # servers
  972         cache_mock.actual_backend.client.get_stats.return_value = (
  973             [('alpha.com', {}), ('beta.com', {})]
  974         )
  975         self.assertFalse(caching.symptom_connection_to_memcached())
  976 
  977         # Symptom detected: Caching driver can't connect to either memcached
  978         # server
  979         cache_mock.actual_backend.client.get_stats.return_value = []
  980         self.assertTrue(caching.symptom_connection_to_memcached())
  981 
  982         # Symptom detected: Caching driver can't connect to one memcached
  983         # server
  984         cache_mock.actual_backend.client.get_stats.return_value = [
  985             ('alpha.com', {})
  986         ]
  987         self.assertTrue(caching.symptom_connection_to_memcached())
  988 
  989         self.config_fixture.config(
  990             group='cache',
  991             memcache_servers=['alpha.com:11211', 'beta.com:11211']
  992         )
  993         self.config_fixture.config(
  994             group='cache', backend='oslo_cache.memcache_pool'
  995         )
  996 
  997         # No symptom detected: Caching driver can connect to both memcached
  998         # servers
  999         cache_mock.actual_backend.client.get_stats.return_value = (
 1000             [('alpha.com', {}), ('beta.com', {})]
 1001         )
 1002         self.assertFalse(caching.symptom_connection_to_memcached())
 1003 
 1004         # Symptom detected: Caching driver can't connect to either memcached
 1005         # server
 1006         cache_mock.actual_backend.client.get_stats.return_value = []
 1007         self.assertTrue(caching.symptom_connection_to_memcached())
 1008 
 1009         # Symptom detected: Caching driver can't connect to one memcached
 1010         # server
 1011         cache_mock.actual_backend.client.get_stats.return_value = [
 1012             ('alpha.com', {})
 1013         ]
 1014         self.assertTrue(caching.symptom_connection_to_memcached())
 1015 
 1016 
 1017 class CredentialDoctorTests(unit.TestCase):
 1018 
 1019     def test_credential_and_fernet_key_repositories_match(self):
 1020         # Symptom Detected: Key repository paths are not unique
 1021         directory = self.useFixture(fixtures.TempDir()).path
 1022         self.config_fixture.config(group='credential',
 1023                                    key_repository=directory)
 1024         self.config_fixture.config(group='fernet_tokens',
 1025                                    key_repository=directory)
 1026         self.assertTrue(credential.symptom_unique_key_repositories())
 1027 
 1028     def test_credential_and_fernet_key_repositories_are_unique(self):
 1029         # No Symptom Detected: Key repository paths are unique
 1030         self.config_fixture.config(group='credential',
 1031                                    key_repository='/etc/keystone/cred-repo')
 1032         self.config_fixture.config(group='fernet_tokens',
 1033                                    key_repository='/etc/keystone/fernet-repo')
 1034         self.assertFalse(credential.symptom_unique_key_repositories())
 1035 
 1036     @mock.patch('keystone.cmd.doctor.credential.utils')
 1037     def test_usability_of_cred_fernet_key_repo_raised(self, mock_utils):
 1038         # Symptom Detected: credential fernet key repository is world readable
 1039         self.config_fixture.config(group='credential', provider='fernet')
 1040         mock_utils.FernetUtils().validate_key_repository.return_value = False
 1041         self.assertTrue(
 1042             credential.symptom_usability_of_credential_fernet_key_repository())
 1043 
 1044     @mock.patch('keystone.cmd.doctor.credential.utils')
 1045     def test_usability_of_cred_fernet_key_repo_not_raised(self, mock_utils):
 1046         # No Symptom Detected: Custom driver is used
 1047         self.config_fixture.config(group='credential', provider='my-driver')
 1048         mock_utils.FernetUtils().validate_key_repository.return_value = True
 1049         self.assertFalse(
 1050             credential.symptom_usability_of_credential_fernet_key_repository())
 1051 
 1052         # No Symptom Detected: key repository is not world readable
 1053         self.config_fixture.config(group='credential', provider='fernet')
 1054         mock_utils.FernetUtils().validate_key_repository.return_value = True
 1055         self.assertFalse(
 1056             credential.symptom_usability_of_credential_fernet_key_repository())
 1057 
 1058     @mock.patch('keystone.cmd.doctor.credential.utils')
 1059     def test_keys_in_credential_fernet_key_repository_raised(self, mock_utils):
 1060         # Symptom Detected: Key repo is empty
 1061         self.config_fixture.config(group='credential', provider='fernet')
 1062         mock_utils.FernetUtils().load_keys.return_value = False
 1063         self.assertTrue(
 1064             credential.symptom_keys_in_credential_fernet_key_repository())
 1065 
 1066     @mock.patch('keystone.cmd.doctor.credential.utils')
 1067     def test_keys_in_credential_fernet_key_repository_not_raised(
 1068             self, mock_utils):
 1069         # No Symptom Detected: Custom driver is used
 1070         self.config_fixture.config(group='credential', provider='my-driver')
 1071         mock_utils.FernetUtils().load_keys.return_value = True
 1072         self.assertFalse(
 1073             credential.symptom_keys_in_credential_fernet_key_repository())
 1074 
 1075         # No Symptom Detected: Key repo is not empty, fernet is current driver
 1076         self.config_fixture.config(group='credential', provider='fernet')
 1077         mock_utils.FernetUtils().load_keys.return_value = True
 1078         self.assertFalse(
 1079             credential.symptom_keys_in_credential_fernet_key_repository())
 1080 
 1081 
 1082 class DatabaseDoctorTests(unit.TestCase):
 1083 
 1084     def test_symptom_is_raised_if_database_connection_is_SQLite(self):
 1085         # Symptom Detected: Database connection is sqlite
 1086         self.config_fixture.config(
 1087             group='database',
 1088             connection='sqlite:///mydb')
 1089         self.assertTrue(
 1090             doc_database.symptom_database_connection_is_not_SQLite())
 1091 
 1092         # No Symptom Detected: Database connection is MySQL
 1093         self.config_fixture.config(
 1094             group='database',
 1095             connection='mysql+mysqlconnector://admin:secret@localhost/mydb')
 1096         self.assertFalse(
 1097             doc_database.symptom_database_connection_is_not_SQLite())
 1098 
 1099 
 1100 class DebugDoctorTests(unit.TestCase):
 1101 
 1102     def test_symptom_debug_mode_is_enabled(self):
 1103         # Symptom Detected: Debug mode is enabled
 1104         self.config_fixture.config(debug=True)
 1105         self.assertTrue(debug.symptom_debug_mode_is_enabled())
 1106 
 1107         # No Symptom Detected: Debug mode is disabled
 1108         self.config_fixture.config(debug=False)
 1109         self.assertFalse(debug.symptom_debug_mode_is_enabled())
 1110 
 1111 
 1112 class FederationDoctorTests(unit.TestCase):
 1113 
 1114     def test_symptom_comma_in_SAML_public_certificate_path(self):
 1115         # Symptom Detected: There is a comma in path to public cert file
 1116         self.config_fixture.config(group='saml', certfile='file,cert.pem')
 1117         self.assertTrue(
 1118             federation.symptom_comma_in_SAML_public_certificate_path())
 1119 
 1120         # No Symptom Detected: There is no comma in the path
 1121         self.config_fixture.config(group='saml', certfile='signing_cert.pem')
 1122         self.assertFalse(
 1123             federation.symptom_comma_in_SAML_public_certificate_path())
 1124 
 1125     def test_symptom_comma_in_SAML_private_key_file_path(self):
 1126         # Symptom Detected: There is a comma in path to private key file
 1127         self.config_fixture.config(group='saml', keyfile='file,key.pem')
 1128         self.assertTrue(
 1129             federation.symptom_comma_in_SAML_private_key_file_path())
 1130 
 1131         # No Symptom Detected: There is no comma in the path
 1132         self.config_fixture.config(group='saml', keyfile='signing_key.pem')
 1133         self.assertFalse(
 1134             federation.symptom_comma_in_SAML_private_key_file_path())
 1135 
 1136 
 1137 class LdapDoctorTests(unit.TestCase):
 1138 
 1139     def test_user_enabled_emulation_dn_ignored_raised(self):
 1140         # Symptom when user_enabled_emulation_dn is being ignored because the
 1141         # user did not enable the user_enabled_emulation
 1142         self.config_fixture.config(group='ldap', user_enabled_emulation=False)
 1143         self.config_fixture.config(
 1144             group='ldap',
 1145             user_enabled_emulation_dn='cn=enabled_users,dc=example,dc=com')
 1146         self.assertTrue(
 1147             ldap.symptom_LDAP_user_enabled_emulation_dn_ignored())
 1148 
 1149     def test_user_enabled_emulation_dn_ignored_not_raised(self):
 1150         # No symptom when configuration set properly
 1151         self.config_fixture.config(group='ldap', user_enabled_emulation=True)
 1152         self.config_fixture.config(
 1153             group='ldap',
 1154             user_enabled_emulation_dn='cn=enabled_users,dc=example,dc=com')
 1155         self.assertFalse(
 1156             ldap.symptom_LDAP_user_enabled_emulation_dn_ignored())
 1157         # No symptom when both configurations disabled
 1158         self.config_fixture.config(group='ldap', user_enabled_emulation=False)
 1159         self.config_fixture.config(group='ldap',
 1160                                    user_enabled_emulation_dn=None)
 1161         self.assertFalse(
 1162             ldap.symptom_LDAP_user_enabled_emulation_dn_ignored())
 1163 
 1164     def test_user_enabled_emulation_use_group_config_ignored_raised(self):
 1165         # Symptom when user enabled emulation isn't enabled but group_config is
 1166         # enabled
 1167         self.config_fixture.config(group='ldap', user_enabled_emulation=False)
 1168         self.config_fixture.config(
 1169             group='ldap',
 1170             user_enabled_emulation_use_group_config=True)
 1171         self.assertTrue(
 1172             ldap.
 1173             symptom_LDAP_user_enabled_emulation_use_group_config_ignored())
 1174 
 1175     def test_user_enabled_emulation_use_group_config_ignored_not_raised(self):
 1176         # No symptom when configuration deactivated
 1177         self.config_fixture.config(group='ldap', user_enabled_emulation=False)
 1178         self.config_fixture.config(
 1179             group='ldap',
 1180             user_enabled_emulation_use_group_config=False)
 1181         self.assertFalse(
 1182             ldap.
 1183             symptom_LDAP_user_enabled_emulation_use_group_config_ignored())
 1184         # No symptom when configurations set properly
 1185         self.config_fixture.config(group='ldap', user_enabled_emulation=True)
 1186         self.config_fixture.config(
 1187             group='ldap',
 1188             user_enabled_emulation_use_group_config=True)
 1189         self.assertFalse(
 1190             ldap.
 1191             symptom_LDAP_user_enabled_emulation_use_group_config_ignored())
 1192 
 1193     def test_group_members_are_ids_disabled_raised(self):
 1194         # Symptom when objectclass is set to posixGroup but members_are_ids are
 1195         # not enabled
 1196         self.config_fixture.config(group='ldap',
 1197                                    group_objectclass='posixGroup')
 1198         self.config_fixture.config(group='ldap',
 1199                                    group_members_are_ids=False)
 1200         self.assertTrue(ldap.symptom_LDAP_group_members_are_ids_disabled())
 1201 
 1202     def test_group_members_are_ids_disabled_not_raised(self):
 1203         # No symptom when the configurations are set properly
 1204         self.config_fixture.config(group='ldap',
 1205                                    group_objectclass='posixGroup')
 1206         self.config_fixture.config(group='ldap',
 1207                                    group_members_are_ids=True)
 1208         self.assertFalse(ldap.symptom_LDAP_group_members_are_ids_disabled())
 1209         # No symptom when configuration deactivated
 1210         self.config_fixture.config(group='ldap',
 1211                                    group_objectclass='groupOfNames')
 1212         self.config_fixture.config(group='ldap',
 1213                                    group_members_are_ids=False)
 1214         self.assertFalse(ldap.symptom_LDAP_group_members_are_ids_disabled())
 1215 
 1216     @mock.patch('os.listdir')
 1217     @mock.patch('os.path.isdir')
 1218     def test_file_based_domain_specific_configs_raised(self, mocked_isdir,
 1219                                                        mocked_listdir):
 1220         self.config_fixture.config(
 1221             group='identity',
 1222             domain_specific_drivers_enabled=True)
 1223         self.config_fixture.config(
 1224             group='identity',
 1225             domain_configurations_from_database=False)
 1226 
 1227         # Symptom if there is no existing directory
 1228         mocked_isdir.return_value = False
 1229         self.assertTrue(ldap.symptom_LDAP_file_based_domain_specific_configs())
 1230 
 1231         # Symptom if there is an invalid filename inside the domain directory
 1232         mocked_isdir.return_value = True
 1233         mocked_listdir.return_value = ['openstack.domains.conf']
 1234         self.assertTrue(ldap.symptom_LDAP_file_based_domain_specific_configs())
 1235 
 1236     @mock.patch('os.listdir')
 1237     @mock.patch('os.path.isdir')
 1238     def test_file_based_domain_specific_configs_not_raised(self, mocked_isdir,
 1239                                                            mocked_listdir):
 1240         # No symptom if both configurations deactivated
 1241         self.config_fixture.config(
 1242             group='identity',
 1243             domain_specific_drivers_enabled=False)
 1244         self.config_fixture.config(
 1245             group='identity',
 1246             domain_configurations_from_database=False)
 1247         self.assertFalse(
 1248             ldap.symptom_LDAP_file_based_domain_specific_configs())
 1249 
 1250         # No symptom if directory exists with no invalid filenames
 1251         self.config_fixture.config(
 1252             group='identity',
 1253             domain_specific_drivers_enabled=True)
 1254         self.config_fixture.config(
 1255             group='identity',
 1256             domain_configurations_from_database=False)
 1257         mocked_isdir.return_value = True
 1258         mocked_listdir.return_value = ['keystone.domains.conf']
 1259         self.assertFalse(
 1260             ldap.symptom_LDAP_file_based_domain_specific_configs())
 1261 
 1262     @mock.patch('os.listdir')
 1263     @mock.patch('os.path.isdir')
 1264     @mock.patch('keystone.cmd.doctor.ldap.configparser.ConfigParser')
 1265     def test_file_based_domain_specific_configs_formatted_correctly_raised(
 1266             self, mocked_parser, mocked_isdir, mocked_listdir):
 1267         symptom = ('symptom_LDAP_file_based_domain_specific_configs'
 1268                    '_formatted_correctly')
 1269         # Symptom Detected: Ldap domain specific configuration files are not
 1270         # formatted correctly
 1271         self.config_fixture.config(
 1272             group='identity',
 1273             domain_specific_drivers_enabled=True)
 1274         self.config_fixture.config(
 1275             group='identity',
 1276             domain_configurations_from_database=False)
 1277         mocked_isdir.return_value = True
 1278 
 1279         mocked_listdir.return_value = ['keystone.domains.conf']
 1280         mock_instance = mock.MagicMock()
 1281         mock_instance.read.side_effect = configparser.Error('No Section')
 1282         mocked_parser.return_value = mock_instance
 1283 
 1284         self.assertTrue(getattr(ldap, symptom)())
 1285 
 1286     @mock.patch('os.listdir')
 1287     @mock.patch('os.path.isdir')
 1288     def test_file_based_domain_specific_configs_formatted_correctly_not_raised(
 1289             self, mocked_isdir, mocked_listdir):
 1290         symptom = ('symptom_LDAP_file_based_domain_specific_configs'
 1291                    '_formatted_correctly')
 1292         # No Symptom Detected: Domain_specific drivers is not enabled
 1293         self.config_fixture.config(
 1294             group='identity',
 1295             domain_specific_drivers_enabled=False)
 1296         self.assertFalse(getattr(ldap, symptom)())
 1297 
 1298         # No Symptom Detected: Domain configuration from database is enabled
 1299         self.config_fixture.config(
 1300             group='identity',
 1301             domain_specific_drivers_enabled=True)
 1302         self.assertFalse(getattr(ldap, symptom)())
 1303         self.config_fixture.config(
 1304             group='identity',
 1305             domain_configurations_from_database=True)
 1306         self.assertFalse(getattr(ldap, symptom)())
 1307 
 1308         # No Symptom Detected: The directory in domain_config_dir doesn't exist
 1309         mocked_isdir.return_value = False
 1310         self.assertFalse(getattr(ldap, symptom)())
 1311 
 1312         # No Symptom Detected: domain specific drivers are enabled, domain
 1313         # configurations from database are disabled, directory exists, and no
 1314         # exceptions found.
 1315         self.config_fixture.config(
 1316             group='identity',
 1317             domain_configurations_from_database=False)
 1318         mocked_isdir.return_value = True
 1319         # An empty directory should not raise this symptom
 1320         self.assertFalse(getattr(ldap, symptom)())
 1321 
 1322         # Test again with a file inside the directory
 1323         mocked_listdir.return_value = ['keystone.domains.conf']
 1324         self.assertFalse(getattr(ldap, symptom)())
 1325 
 1326 
 1327 class SecurityComplianceDoctorTests(unit.TestCase):
 1328 
 1329     def test_minimum_password_age_greater_than_password_expires_days(self):
 1330         # Symptom Detected: Minimum password age is greater than the password
 1331         # expires days. Both values are positive integers greater than zero.
 1332         self.config_fixture.config(group='security_compliance',
 1333                                    minimum_password_age=2)
 1334         self.config_fixture.config(group='security_compliance',
 1335                                    password_expires_days=1)
 1336         self.assertTrue(
 1337             security_compliance.
 1338             symptom_minimum_password_age_greater_than_expires_days())
 1339 
 1340     def test_minimum_password_age_equal_to_password_expires_days(self):
 1341         # Symptom Detected: Minimum password age is equal to the password
 1342         # expires days. Both values are positive integers greater than zero.
 1343         self.config_fixture.config(group='security_compliance',
 1344                                    minimum_password_age=1)
 1345         self.config_fixture.config(group='security_compliance',
 1346                                    password_expires_days=1)
 1347         self.assertTrue(
 1348             security_compliance.
 1349             symptom_minimum_password_age_greater_than_expires_days())
 1350 
 1351     def test_minimum_password_age_less_than_password_expires_days(self):
 1352         # No Symptom Detected: Minimum password age is less than password
 1353         # expires days. Both values are positive integers greater than zero.
 1354         self.config_fixture.config(group='security_compliance',
 1355                                    minimum_password_age=1)
 1356         self.config_fixture.config(group='security_compliance',
 1357                                    password_expires_days=2)
 1358         self.assertFalse(
 1359             security_compliance.
 1360             symptom_minimum_password_age_greater_than_expires_days())
 1361 
 1362     def test_minimum_password_age_and_password_expires_days_deactivated(self):
 1363         # No Symptom Detected: when minimum_password_age's default value is 0
 1364         # and password_expires_days' default value is None
 1365         self.assertFalse(
 1366             security_compliance.
 1367             symptom_minimum_password_age_greater_than_expires_days())
 1368 
 1369     def test_invalid_password_regular_expression(self):
 1370         # Symptom Detected: Regular expression is invalid
 1371         self.config_fixture.config(
 1372             group='security_compliance',
 1373             password_regex=r'^^(??=.*\d)$')
 1374         self.assertTrue(
 1375             security_compliance.symptom_invalid_password_regular_expression())
 1376 
 1377     def test_valid_password_regular_expression(self):
 1378         # No Symptom Detected: Regular expression is valid
 1379         self.config_fixture.config(
 1380             group='security_compliance',
 1381             password_regex=r'^(?=.*\d)(?=.*[a-zA-Z]).{7,}$')
 1382         self.assertFalse(
 1383             security_compliance.symptom_invalid_password_regular_expression())
 1384 
 1385     def test_password_regular_expression_deactivated(self):
 1386         # No Symptom Detected: Regular expression deactivated to None
 1387         self.config_fixture.config(
 1388             group='security_compliance',
 1389             password_regex=None)
 1390         self.assertFalse(
 1391             security_compliance.symptom_invalid_password_regular_expression())
 1392 
 1393     def test_password_regular_expression_description_not_set(self):
 1394         # Symptom Detected: Regular expression is set but description is not
 1395         self.config_fixture.config(
 1396             group='security_compliance',
 1397             password_regex=r'^(?=.*\d)(?=.*[a-zA-Z]).{7,}$')
 1398         self.config_fixture.config(
 1399             group='security_compliance',
 1400             password_regex_description=None)
 1401         self.assertTrue(
 1402             security_compliance.
 1403             symptom_password_regular_expression_description_not_set())
 1404 
 1405     def test_password_regular_expression_description_set(self):
 1406         # No Symptom Detected: Regular expression and description are set
 1407         desc = '1 letter, 1 digit, and a minimum length of 7 is required'
 1408         self.config_fixture.config(
 1409             group='security_compliance',
 1410             password_regex=r'^(?=.*\d)(?=.*[a-zA-Z]).{7,}$')
 1411         self.config_fixture.config(
 1412             group='security_compliance',
 1413             password_regex_description=desc)
 1414         self.assertFalse(
 1415             security_compliance.
 1416             symptom_password_regular_expression_description_not_set())
 1417 
 1418     def test_password_regular_expression_description_deactivated(self):
 1419         # No Symptom Detected: Regular expression and description are
 1420         # deactivated to None
 1421         self.config_fixture.config(
 1422             group='security_compliance', password_regex=None)
 1423         self.config_fixture.config(
 1424             group='security_compliance', password_regex_description=None)
 1425         self.assertFalse(
 1426             security_compliance.
 1427             symptom_password_regular_expression_description_not_set())
 1428 
 1429 
 1430 class TokensDoctorTests(unit.TestCase):
 1431 
 1432     def test_unreasonable_max_token_size_raised(self):
 1433         # Symptom Detected: the max_token_size for fernet is greater than 255
 1434         self.config_fixture.config(group='token', provider='fernet')
 1435         self.config_fixture.config(max_token_size=256)
 1436         self.assertTrue(tokens.symptom_unreasonable_max_token_size())
 1437 
 1438     def test_unreasonable_max_token_size_not_raised(self):
 1439         # No Symptom Detected: the max_token_size for uuid is 32
 1440         self.config_fixture.config(group='token', provider='uuid')
 1441         self.config_fixture.config(max_token_size=32)
 1442         self.assertFalse(tokens.symptom_unreasonable_max_token_size())
 1443 
 1444         # No Symptom Detected: the max_token_size for fernet is 255 or less
 1445         self.config_fixture.config(group='token', provider='fernet')
 1446         self.config_fixture.config(max_token_size=255)
 1447         self.assertFalse(tokens.symptom_unreasonable_max_token_size())
 1448 
 1449 
 1450 class TokenFernetDoctorTests(unit.TestCase):
 1451 
 1452     @mock.patch('keystone.cmd.doctor.tokens_fernet.utils')
 1453     def test_usability_of_Fernet_key_repository_raised(self, mock_utils):
 1454         # Symptom Detected: Fernet key repo is world readable
 1455         self.config_fixture.config(group='token', provider='fernet')
 1456         mock_utils.FernetUtils().validate_key_repository.return_value = False
 1457         self.assertTrue(
 1458             tokens_fernet.symptom_usability_of_Fernet_key_repository())
 1459 
 1460     @mock.patch('keystone.cmd.doctor.tokens_fernet.utils')
 1461     def test_usability_of_Fernet_key_repository_not_raised(self, mock_utils):
 1462         # No Symptom Detected: UUID is used instead of fernet
 1463         self.config_fixture.config(group='token', provider='uuid')
 1464         mock_utils.FernetUtils().validate_key_repository.return_value = False
 1465         self.assertFalse(
 1466             tokens_fernet.symptom_usability_of_Fernet_key_repository())
 1467 
 1468         # No Symptom Detected: configs set properly, key repo is not world
 1469         # readable but is user readable
 1470         self.config_fixture.config(group='token', provider='fernet')
 1471         mock_utils.FernetUtils().validate_key_repository.return_value = True
 1472         self.assertFalse(
 1473             tokens_fernet.symptom_usability_of_Fernet_key_repository())
 1474 
 1475     @mock.patch('keystone.cmd.doctor.tokens_fernet.utils')
 1476     def test_keys_in_Fernet_key_repository_raised(self, mock_utils):
 1477         # Symptom Detected: Fernet key repository is empty
 1478         self.config_fixture.config(group='token', provider='fernet')
 1479         mock_utils.FernetUtils().load_keys.return_value = False
 1480         self.assertTrue(
 1481             tokens_fernet.symptom_keys_in_Fernet_key_repository())
 1482 
 1483     @mock.patch('keystone.cmd.doctor.tokens_fernet.utils')
 1484     def test_keys_in_Fernet_key_repository_not_raised(self, mock_utils):
 1485         # No Symptom Detected: UUID is used instead of fernet
 1486         self.config_fixture.config(group='token', provider='uuid')
 1487         mock_utils.FernetUtils().load_keys.return_value = True
 1488         self.assertFalse(
 1489             tokens_fernet.symptom_usability_of_Fernet_key_repository())
 1490 
 1491         # No Symptom Detected: configs set properly, key repo has been
 1492         # populated with keys
 1493         self.config_fixture.config(group='token', provider='fernet')
 1494         mock_utils.FernetUtils().load_keys.return_value = True
 1495         self.assertFalse(
 1496             tokens_fernet.symptom_usability_of_Fernet_key_repository())
 1497 
 1498 
 1499 class TestMappingPurge(unit.SQLDriverOverrides, unit.BaseTestCase):
 1500 
 1501     class FakeConfCommand(object):
 1502         def __init__(self, parent):
 1503             self.extension = False
 1504             self.all = parent.command_all
 1505             self.type = parent.command_type
 1506             self.domain_name = parent.command_domain_name
 1507             self.local_id = parent.command_local_id
 1508             self.public_id = parent.command_public_id
 1509 
 1510     def setUp(self):
 1511         # Set up preset cli options and a parser
 1512         super(TestMappingPurge, self).setUp()
 1513         self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
 1514         self.config_fixture.register_cli_opt(cli.command_opt)
 1515         # For unit tests that should not throw any erorrs,
 1516         # Use the argument parser to test that the combinations work
 1517         parser_test = argparse.ArgumentParser()
 1518         subparsers = parser_test.add_subparsers()
 1519         self.parser = cli.MappingPurge.add_argument_parser(subparsers)
 1520 
 1521     def test_mapping_purge_with_no_arguments_fails(self):
 1522         # Make sure the logic in main() actually catches no argument error
 1523         self.command_type = None
 1524         self.command_all = False
 1525         self.command_domain_name = None
 1526         self.command_local_id = None
 1527         self.command_public_id = None
 1528         self.useFixture(fixtures.MockPatchObject(
 1529             CONF, 'command', self.FakeConfCommand(self)))
 1530         self.assertRaises(ValueError, cli.MappingPurge.main)
 1531 
 1532     def test_mapping_purge_with_all_and_other_argument_fails(self):
 1533         # Make sure the logic in main() actually catches invalid combinations
 1534         self.command_type = 'user'
 1535         self.command_all = True
 1536         self.command_domain_name = None
 1537         self.command_local_id = None
 1538         self.command_public_id = None
 1539         self.useFixture(fixtures.MockPatchObject(
 1540             CONF, 'command', self.FakeConfCommand(self)))
 1541         self.assertRaises(ValueError, cli.MappingPurge.main)
 1542 
 1543     def test_mapping_purge_with_only_all_passes(self):
 1544         args = (['--all'])
 1545         res = self.parser.parse_args(args)
 1546         self.assertTrue(vars(res)['all'])
 1547 
 1548     def test_mapping_purge_with_domain_name_argument_succeeds(self):
 1549         args = (['--domain-name', uuid.uuid4().hex])
 1550         self.parser.parse_args(args)
 1551 
 1552     def test_mapping_purge_with_public_id_argument_succeeds(self):
 1553         args = (['--public-id', uuid.uuid4().hex])
 1554         self.parser.parse_args(args)
 1555 
 1556     def test_mapping_purge_with_local_id_argument_succeeds(self):
 1557         args = (['--local-id', uuid.uuid4().hex])
 1558         self.parser.parse_args(args)
 1559 
 1560     def test_mapping_purge_with_type_argument_succeeds(self):
 1561         args = (['--type', 'user'])
 1562         self.parser.parse_args(args)
 1563         args = (['--type', 'group'])
 1564         self.parser.parse_args(args)
 1565 
 1566     def test_mapping_purge_with_invalid_argument_fails(self):
 1567         args = (['--invalid-option', 'some value'])
 1568         self.assertRaises(unit.UnexpectedExit, self.parser.parse_args, args)
 1569 
 1570     def test_mapping_purge_with_all_other_combinations_passes(self):
 1571         args = (['--type', 'user', '--local-id', uuid.uuid4().hex])
 1572         self.parser.parse_args(args)
 1573         args.append('--domain-name')
 1574         args.append('test')
 1575         self.parser.parse_args(args)
 1576         args.append('--public-id')
 1577         args.append(uuid.uuid4().hex)
 1578         self.parser.parse_args(args)
 1579 
 1580     @mock.patch.object(keystone.identity.MappingManager, 'purge_mappings')
 1581     def test_mapping_purge_type_user(self, purge_mock):
 1582         # Make sure the logic in main() actually catches no argument error
 1583         self.command_type = 'user'
 1584         self.command_all = False
 1585         self.command_domain_name = None
 1586         self.command_local_id = uuid.uuid4().hex
 1587         self.command_public_id = uuid.uuid4().hex
 1588         self.useFixture(fixtures.MockPatchObject(
 1589             CONF, 'command', self.FakeConfCommand(self)))
 1590 
 1591         def fake_load_backends():
 1592             return dict(
 1593                 id_mapping_api=keystone.identity.core.MappingManager,
 1594                 resource_api=None)
 1595 
 1596         self.useFixture(fixtures.MockPatch(
 1597             'keystone.server.backends.load_backends',
 1598             side_effect=fake_load_backends))
 1599 
 1600         cli.MappingPurge.main()
 1601         purge_mock.assert_called_with({'entity_type': 'user',
 1602                                        'local_id': self.command_local_id,
 1603                                        'public_id': self.command_public_id})
 1604 
 1605 
 1606 class TestUserMappingPurgeFunctional(unit.SQLDriverOverrides, unit.TestCase):
 1607 
 1608     def setUp(self):
 1609         sqldb = self.useFixture(database.Database())
 1610         super(TestUserMappingPurgeFunctional, self).setUp()
 1611         self.ldapdb = self.useFixture(ldapdb.LDAPDatabase())
 1612         self.ldapdb.clear()
 1613 
 1614         self.load_backends()
 1615 
 1616         sqldb.recreate()
 1617         self.load_fixtures(default_fixtures)
 1618 
 1619     def config_files(self):
 1620         self.config_fixture.register_cli_opt(cli.command_opt)
 1621         config_files = super(
 1622             TestUserMappingPurgeFunctional, self
 1623         ).config_files()
 1624         config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf'))
 1625         return config_files
 1626 
 1627     def config_overrides(self):
 1628         super(TestUserMappingPurgeFunctional, self).config_overrides()
 1629         self.config_fixture.config(group='identity', driver='ldap')
 1630         self.config_fixture.config(group='identity_mapping',
 1631                                    backward_compatible_ids=False)
 1632 
 1633     def config(self, config_files):
 1634         CONF(args=['mapping_purge', '--type', 'user'],
 1635              project='keystone',
 1636              default_config_files=config_files)
 1637 
 1638     def test_purge_by_user_type(self):
 1639         # Grab the list of the users from the backend directly to avoid
 1640         # populating the public_ids for each user. We do this so we can grab
 1641         # the local_id of a user before it's overwritten by the public_id.
 1642         hints = None
 1643         users = PROVIDERS.identity_api.driver.list_users(hints)
 1644 
 1645         # Create a new group in the backend directly. We do this so that we
 1646         # have control over the local_id, which is `id` here. After creating
 1647         # the group, let's list them so the id_mapping_api creates the public
 1648         # id appropriately.
 1649         group_ref = {
 1650             'id': uuid.uuid4().hex,
 1651             'name': uuid.uuid4().hex,
 1652             'domain_id': CONF.identity.default_domain_id
 1653         }
 1654         PROVIDERS.identity_api.driver.create_group(group_ref['id'], group_ref)
 1655         PROVIDERS.identity_api.list_groups()
 1656 
 1657         # Make sure all users and groups have public ids by querying the
 1658         # id_mapping_api.
 1659         for user in users:
 1660             local_entity = {
 1661                 'domain_id': CONF.identity.default_domain_id,
 1662                 'local_id': user['id'],
 1663                 'entity_type': identity_mapping.EntityType.USER}
 1664             self.assertIsNotNone(
 1665                 PROVIDERS.id_mapping_api.get_public_id(local_entity))
 1666 
 1667         group_entity = {
 1668             'domain_id': CONF.identity.default_domain_id,
 1669             'local_id': group_ref['id'],
 1670             'entity_type': identity_mapping.EntityType.GROUP}
 1671         self.assertIsNotNone(
 1672             PROVIDERS.id_mapping_api.get_public_id(group_entity)
 1673         )
 1674 
 1675         # Purge all users mappings
 1676         provider_api.ProviderAPIs._clear_registry_instances()
 1677         cli.MappingPurge.main()
 1678 
 1679         # Check that all the user mappings were purged
 1680         for user in users:
 1681             local_entity = {
 1682                 'domain_id': CONF.identity.default_domain_id,
 1683                 'local_id': user['id'],
 1684                 'entity_type': identity_mapping.EntityType.USER}
 1685             self.assertIsNone(
 1686                 PROVIDERS.id_mapping_api.get_public_id(local_entity)
 1687             )
 1688 
 1689         # Make sure the group mapping still exists
 1690         self.assertIsNotNone(
 1691             PROVIDERS.id_mapping_api.get_public_id(group_entity)
 1692         )
 1693 
 1694 
 1695 class TestGroupMappingPurgeFunctional(unit.SQLDriverOverrides, unit.TestCase):
 1696 
 1697     def setUp(self):
 1698         sqldb = self.useFixture(database.Database())
 1699         super(TestGroupMappingPurgeFunctional, self).setUp()
 1700         self.ldapdb = self.useFixture(ldapdb.LDAPDatabase())
 1701         self.ldapdb.clear()
 1702 
 1703         self.load_backends()
 1704 
 1705         sqldb.recreate()
 1706         self.load_fixtures(default_fixtures)
 1707 
 1708     def config_files(self):
 1709         self.config_fixture.register_cli_opt(cli.command_opt)
 1710         config_files = super(
 1711             TestGroupMappingPurgeFunctional, self
 1712         ).config_files()
 1713         config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf'))
 1714         return config_files
 1715 
 1716     def config_overrides(self):
 1717         super(TestGroupMappingPurgeFunctional, self).config_overrides()
 1718         self.config_fixture.config(group='identity', driver='ldap')
 1719         self.config_fixture.config(group='identity_mapping',
 1720                                    backward_compatible_ids=False)
 1721 
 1722     def config(self, config_files):
 1723         CONF(args=['mapping_purge', '--type', 'group'],
 1724              project='keystone',
 1725              default_config_files=config_files)
 1726 
 1727     def test_purge_by_group_type(self):
 1728         # Grab the list of the users from the backend directly to avoid
 1729         # populating the public_ids for each user. We do this so we can grab
 1730         # the local_id of a user before it's overwritten by the public_id.
 1731         hints = None
 1732         users = PROVIDERS.identity_api.driver.list_users(hints)
 1733 
 1734         # Create a new group in the backend directly. We do this so that we
 1735         # have control over the local_id, which is `id` here. After creating
 1736         # the group, let's list them so the id_mapping_api creates the public
 1737         # id appropriately.
 1738         group_ref = {
 1739             'id': uuid.uuid4().hex,
 1740             'name': uuid.uuid4().hex,
 1741             'domain_id': CONF.identity.default_domain_id
 1742         }
 1743         PROVIDERS.identity_api.driver.create_group(group_ref['id'], group_ref)
 1744         PROVIDERS.identity_api.list_groups()
 1745 
 1746         # Make sure all users and groups have public ids by querying the
 1747         # id_mapping_api.
 1748         for user in users:
 1749             local_entity = {
 1750                 'domain_id': CONF.identity.default_domain_id,
 1751                 'local_id': user['id'],
 1752                 'entity_type': identity_mapping.EntityType.USER}
 1753             self.assertIsNotNone(
 1754                 PROVIDERS.id_mapping_api.get_public_id(local_entity))
 1755 
 1756         group_entity = {
 1757             'domain_id': CONF.identity.default_domain_id,
 1758             'local_id': group_ref['id'],
 1759             'entity_type': identity_mapping.EntityType.GROUP}
 1760         self.assertIsNotNone(
 1761             PROVIDERS.id_mapping_api.get_public_id(group_entity)
 1762         )
 1763 
 1764         # Purge group mappings
 1765         provider_api.ProviderAPIs._clear_registry_instances()
 1766         cli.MappingPurge.main()
 1767 
 1768         # Make sure the group mapping was purged
 1769         self.assertIsNone(
 1770             PROVIDERS.id_mapping_api.get_public_id(group_entity)
 1771         )
 1772 
 1773         # Check that all the user mappings still exist
 1774         for user in users:
 1775             local_entity = {
 1776                 'domain_id': CONF.identity.default_domain_id,
 1777                 'local_id': user['id'],
 1778                 'entity_type': identity_mapping.EntityType.USER}
 1779             self.assertIsNotNone(
 1780                 PROVIDERS.id_mapping_api.get_public_id(local_entity)
 1781             )
 1782 
 1783 
 1784 class TestTrustFlush(unit.SQLDriverOverrides, unit.BaseTestCase):
 1785 
 1786     class FakeConfCommand(object):
 1787         def __init__(self, parent):
 1788             self.extension = False
 1789             self.project_id = parent.command_project_id
 1790             self.trustor_user_id = parent.command_trustor_user_id
 1791             self.trustee_user_id = parent.command_trustee_user_id
 1792             self.date = parent.command_date
 1793 
 1794     def setUp(self):
 1795         # Set up preset cli options and a parser
 1796         super(TestTrustFlush, self).setUp()
 1797         self.useFixture(database.Database())
 1798         self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
 1799         self.config_fixture.register_cli_opt(cli.command_opt)
 1800         # For unit tests that should not throw any errors,
 1801         # Use the argument parser to test that the combinations work
 1802         parser_test = argparse.ArgumentParser()
 1803         subparsers = parser_test.add_subparsers()
 1804         self.parser = cli.TrustFlush.add_argument_parser(subparsers)
 1805 
 1806     def config_files(self):
 1807         config_files = super(TestTrustFlush, self).config_files()
 1808         config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
 1809         return config_files
 1810 
 1811     def test_trust_flush(self):
 1812         self.command_project_id = None
 1813         self.command_trustor_user_id = None
 1814         self.command_trustee_user_id = None
 1815         self.command_date = datetime.datetime.utcnow()
 1816         self.useFixture(fixtures.MockPatchObject(
 1817             CONF, 'command', self.FakeConfCommand(self)))
 1818 
 1819         def fake_load_backends():
 1820             return dict(
 1821                 trust_api=keystone.trust.core.Manager())
 1822 
 1823         self.useFixture(fixtures.MockPatch(
 1824             'keystone.server.backends.load_backends',
 1825             side_effect=fake_load_backends))
 1826         trust = cli.TrustFlush()
 1827         trust.main()
 1828 
 1829     def test_trust_flush_with_invalid_date(self):
 1830         self.command_project_id = None
 1831         self.command_trustor_user_id = None
 1832         self.command_trustee_user_id = None
 1833         self.command_date = '4/10/92'
 1834         self.useFixture(fixtures.MockPatchObject(
 1835             CONF, 'command', self.FakeConfCommand(self)))
 1836 
 1837         def fake_load_backends():
 1838             return dict(
 1839                 trust_api=keystone.trust.core.Manager())
 1840 
 1841         self.useFixture(fixtures.MockPatch(
 1842             'keystone.server.backends.load_backends',
 1843             side_effect=fake_load_backends))
 1844         # Clear backend dependencies, since cli loads these manually
 1845         provider_api.ProviderAPIs._clear_registry_instances()
 1846         trust = cli.TrustFlush()
 1847         self.assertRaises(ValueError, trust.main)
 1848 
 1849 
 1850 class TestMappingEngineTester(unit.BaseTestCase):
 1851 
 1852     class FakeConfCommand(object):
 1853         def __init__(self, parent):
 1854             self.extension = False
 1855             self.rules = parent.command_rules
 1856             self.input = parent.command_input
 1857             self.prefix = parent.command_prefix
 1858             self.engine_debug = parent.command_engine_debug
 1859 
 1860     def setUp(self):
 1861         # Set up preset cli options and a parser
 1862         super(TestMappingEngineTester, self).setUp()
 1863         self.mapping_id = uuid.uuid4().hex
 1864         self.rules_pathname = None
 1865         self.rules = None
 1866         self.assertion_pathname = None
 1867         self.assertion = None
 1868         self.logging = self.useFixture(fixtures.LoggerFixture())
 1869         self.useFixture(database.Database())
 1870         self.config_fixture = self.useFixture(oslo_config.fixture.Config(CONF))
 1871         self.config_fixture.register_cli_opt(cli.command_opt)
 1872         # For unit tests that should not throw any erorrs,
 1873         # Use the argument parser to test that the combinations work
 1874         parser_test = argparse.ArgumentParser()
 1875         subparsers = parser_test.add_subparsers()
 1876         self.parser = cli.MappingEngineTester.add_argument_parser(subparsers)
 1877 
 1878     def config_files(self):
 1879         config_files = super(TestMappingEngineTester, self).config_files()
 1880         config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
 1881         return config_files
 1882 
 1883     def test_mapping_engine_tester_with_invalid_rules_file(self):
 1884         tempfilejson = self.useFixture(temporaryfile.SecureTempFile())
 1885         tmpinvalidfile = tempfilejson.file_name
 1886         # Here the data required for rules should be in JSON format
 1887         # whereas the file contains text.
 1888         with open(tmpinvalidfile, 'w') as f:
 1889             f.write("This is an invalid data")
 1890         self.command_rules = tmpinvalidfile
 1891         self.command_input = tmpinvalidfile
 1892         self.command_prefix = None
 1893         self.command_engine_debug = True
 1894         self.useFixture(fixtures.MockPatchObject(
 1895             CONF, 'command', self.FakeConfCommand(self)))
 1896         mapping_engine = cli.MappingEngineTester()
 1897         self.assertRaises(SystemExit, mapping_engine.main)
 1898 
 1899     def test_mapping_engine_tester_with_invalid_input_file(self):
 1900         tempfilejson = self.useFixture(temporaryfile.SecureTempFile())
 1901         tmpfilejsonname = tempfilejson.file_name
 1902         updated_mapping = copy.deepcopy(mapping_fixtures.MAPPING_SMALL)
 1903         with open(tmpfilejsonname, 'w') as f:
 1904             f.write(jsonutils.dumps(updated_mapping))
 1905         self.command_rules = tmpfilejsonname
 1906         # Here invalid.csv does not exist
 1907         self.command_input = "invalid.csv"
 1908         self.command_prefix = None
 1909         self.command_engine_debug = True
 1910         self.useFixture(fixtures.MockPatchObject(
 1911             CONF, 'command', self.FakeConfCommand(self)))
 1912         mapping_engine = cli.MappingEngineTester()
 1913         self.assertRaises(SystemExit, mapping_engine.main)
 1914 
 1915     def test_mapping_engine_tester(self):
 1916         tempfilejson = self.useFixture(temporaryfile.SecureTempFile())
 1917         tmpfilejsonname = tempfilejson.file_name
 1918         updated_mapping = copy.deepcopy(mapping_fixtures.MAPPING_SMALL)
 1919         with open(tmpfilejsonname, 'w') as f:
 1920             f.write(jsonutils.dumps(updated_mapping))
 1921         self.command_rules = tmpfilejsonname
 1922         tempfile = self.useFixture(temporaryfile.SecureTempFile())
 1923         tmpfilename = tempfile.file_name
 1924         with open(tmpfilename, 'w') as f:
 1925             f.write("\n")
 1926             f.write("UserName:me\n")
 1927             f.write("orgPersonType:NoContractor\n")
 1928             f.write("LastName:Bo\n")
 1929             f.write("FirstName:Jill\n")
 1930         self.command_input = tmpfilename
 1931         self.command_prefix = None
 1932         self.command_engine_debug = True
 1933         self.useFixture(fixtures.MockPatchObject(
 1934             CONF, 'command', self.FakeConfCommand(self)))
 1935         mapping_engine = cli.MappingEngineTester()
 1936         with mock.patch('builtins.print') as mock_print:
 1937             mapping_engine.main()
 1938             self.assertEqual(mock_print.call_count, 3)
 1939             call = mock_print.call_args_list[0]
 1940             args, kwargs = call
 1941             self.assertTrue(args[0].startswith('Using Rules:'))
 1942             call = mock_print.call_args_list[1]
 1943             args, kwargs = call
 1944             self.assertTrue(args[0].startswith('Using Assertion:'))
 1945             call = mock_print.call_args_list[2]
 1946             args, kwargs = call
 1947             expected = {
 1948                 "group_names": [],
 1949                 "user": {
 1950                     "type": "ephemeral",
 1951                     "name": "me"
 1952                 },
 1953                 "projects": [],
 1954                 "group_ids": ["0cd5e9"]
 1955             }
 1956             self.assertEqual(jsonutils.loads(args[0]), expected)
 1957 
 1958     def test_mapping_engine_tester_with_invalid_data(self):
 1959         tempfilejson = self.useFixture(temporaryfile.SecureTempFile())
 1960         tmpfilejsonname = tempfilejson.file_name
 1961         updated_mapping = copy.deepcopy(mapping_fixtures.MAPPING_SMALL)
 1962         with open(tmpfilejsonname, 'w') as f:
 1963             f.write(jsonutils.dumps(updated_mapping))
 1964         self.command_rules = tmpfilejsonname
 1965         tempfile = self.useFixture(temporaryfile.SecureTempFile())
 1966         tmpfilename = tempfile.file_name
 1967         # Here we do not have any value matching to type 'Email'
 1968         # and condition in mapping_engine_test_rules.json
 1969         with open(tmpfilename, 'w') as f:
 1970             f.write("\n")
 1971             f.write("UserName: me\n")
 1972             f.write("Email: No@example.com\n")
 1973         self.command_input = tmpfilename
 1974         self.command_prefix = None
 1975         self.command_engine_debug = True
 1976         self.useFixture(fixtures.MockPatchObject(
 1977             CONF, 'command', self.FakeConfCommand(self)))
 1978         mapping_engine = cli.MappingEngineTester()
 1979         self.assertRaises(exception.ValidationError,
 1980                           mapping_engine.main)
 1981 
 1982     def test_mapping_engine_tester_logs_direct_maps(self):
 1983         tempfilejson = self.useFixture(temporaryfile.SecureTempFile())
 1984         tmpfilejsonname = tempfilejson.file_name
 1985         updated_mapping = copy.deepcopy(mapping_fixtures.MAPPING_SMALL)
 1986         with open(tmpfilejsonname, 'w') as f:
 1987             f.write(jsonutils.dumps(updated_mapping))
 1988         self.command_rules = tmpfilejsonname
 1989         tempfile = self.useFixture(temporaryfile.SecureTempFile())
 1990         tmpfilename = tempfile.file_name
 1991         with open(tmpfilename, 'w') as f:
 1992             f.write("\n")
 1993             f.write("UserName:me\n")
 1994             f.write("orgPersonType:NoContractor\n")
 1995             f.write("LastName:Bo\n")
 1996             f.write("FirstName:Jill\n")
 1997         self.command_input = tmpfilename
 1998         self.command_prefix = None
 1999         self.command_engine_debug = True
 2000         self.useFixture(fixtures.MockPatchObject(
 2001             CONF, 'command', self.FakeConfCommand(self)))
 2002         mapping_engine = cli.MappingEngineTester()
 2003         logging = self.useFixture(fixtures.FakeLogger(level=log.DEBUG))
 2004         mapping_engine.main()
 2005         expected_msg = "direct_maps: [['me']]"
 2006         self.assertThat(logging.output, matchers.Contains(expected_msg))
 2007 
 2008 
 2009 class CliStatusTestCase(unit.SQLDriverOverrides, unit.TestCase):
 2010 
 2011     def setUp(self):
 2012         self.useFixture(database.Database())
 2013         super(CliStatusTestCase, self).setUp()
 2014         self.load_backends()
 2015         self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
 2016         self.policy_file_name = self.policy_file.file_name
 2017         self.useFixture(
 2018             policy.Policy(
 2019                 self.config_fixture, policy_file=self.policy_file_name
 2020             )
 2021         )
 2022         self.checks = status.Checks()
 2023 
 2024     def test_check_safe_trust_policies(self):
 2025         with open(self.policy_file_name, 'w') as f:
 2026             overridden_policies = {
 2027                 'identity:list_trusts': '',
 2028                 'identity:delete_trust': '',
 2029                 'identity:get_trust': '',
 2030                 'identity:list_roles_for_trust': '',
 2031                 'identity:get_role_for_trust': ''
 2032             }
 2033             f.write(jsonutils.dumps(overridden_policies))
 2034         result = self.checks.check_trust_policies_are_not_empty()
 2035         self.assertEqual(upgradecheck.Code.FAILURE, result.code)
 2036         with open(self.policy_file_name, 'w') as f:
 2037             overridden_policies = {
 2038                 'identity:list_trusts': 'rule:admin_required',
 2039                 'identity:delete_trust': 'rule:admin_required',
 2040                 'identity:get_trust': 'rule:admin_required',
 2041                 'identity:list_roles_for_trust': 'rule:admin_required',
 2042                 'identity:get_role_for_trust': 'rule:admin_required'
 2043             }
 2044             f.write(jsonutils.dumps(overridden_policies))
 2045         result = self.checks.check_trust_policies_are_not_empty()
 2046         self.assertEqual(upgradecheck.Code.SUCCESS, result.code)
 2047         with open(self.policy_file_name, 'w') as f:
 2048             overridden_policies = {}
 2049             f.write(jsonutils.dumps(overridden_policies))
 2050         result = self.checks.check_trust_policies_are_not_empty()
 2051         self.assertEqual(upgradecheck.Code.SUCCESS, result.code)
 2052 
 2053     def test_check_immutable_roles(self):
 2054         role_ref = unit.new_role_ref(name='admin')
 2055         PROVIDERS.role_api.create_role(role_ref['id'], role_ref)
 2056         result = self.checks.check_default_roles_are_immutable()
 2057         self.assertEqual(upgradecheck.Code.FAILURE, result.code)
 2058         role_ref['options'] = {'immutable': True}
 2059         PROVIDERS.role_api.update_role(role_ref['id'], role_ref)
 2060         result = self.checks.check_default_roles_are_immutable()
 2061         self.assertEqual(upgradecheck.Code.SUCCESS, result.code)
 2062         # Check domain-specific roles are not reported
 2063         PROVIDERS.resource_api.create_domain(
 2064             default_fixtures.ROOT_DOMAIN['id'],
 2065             default_fixtures.ROOT_DOMAIN)
 2066         domain_ref = unit.new_domain_ref()
 2067         domain = PROVIDERS.resource_api.create_domain(
 2068             domain_ref['id'], domain_ref)
 2069         role_ref = unit.new_role_ref(name='admin', domain_id=domain['id'])
 2070         PROVIDERS.role_api.create_role(role_ref['id'], role_ref)
 2071         result = self.checks.check_default_roles_are_immutable()
 2072         self.assertEqual(upgradecheck.Code.SUCCESS, result.code)