"Fossies" - the Fresh Open Source Software Archive

Member "keystone-16.0.2/keystone/tests/unit/test_backend_ldap.py" (7 Jun 2021, 146303 Bytes) of package /linux/misc/openstack/keystone-16.0.2.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_backend_ldap.py": 16.0.1_vs_16.0.2.

    1 # -*- coding: utf-8 -*-
    2 # Copyright 2012 OpenStack Foundation
    3 # Copyright 2013 IBM Corp.
    4 #
    5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    6 # not use this file except in compliance with the License. You may obtain
    7 # a copy of the License at
    8 #
    9 #      http://www.apache.org/licenses/LICENSE-2.0
   10 #
   11 # Unless required by applicable law or agreed to in writing, software
   12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   14 # License for the specific language governing permissions and limitations
   15 # under the License.
   16 
   17 import copy
   18 import uuid
   19 
   20 import fixtures
   21 import ldap
   22 import mock
   23 from oslo_log import versionutils
   24 import pkg_resources
   25 from six.moves import http_client
   26 from six.moves import range
   27 from testtools import matchers
   28 
   29 from keystone.common import cache
   30 from keystone.common import driver_hints
   31 from keystone.common import provider_api
   32 import keystone.conf
   33 from keystone import exception
   34 from keystone import identity
   35 from keystone.identity.backends import ldap as ldap_identity
   36 from keystone.identity.backends.ldap import common as common_ldap
   37 from keystone.identity.backends import sql as sql_identity
   38 from keystone.identity.mapping_backends import mapping as map
   39 from keystone.tests import unit
   40 from keystone.tests.unit.assignment import test_backends as assignment_tests
   41 from keystone.tests.unit import default_fixtures
   42 from keystone.tests.unit.identity import test_backends as identity_tests
   43 from keystone.tests.unit import identity_mapping as mapping_sql
   44 from keystone.tests.unit.ksfixtures import database
   45 from keystone.tests.unit.ksfixtures import ldapdb
   46 from keystone.tests.unit.resource import test_backends as resource_tests
   47 
   48 
   49 CONF = keystone.conf.CONF
   50 PROVIDERS = provider_api.ProviderAPIs
   51 
   52 
   53 def _assert_backends(testcase, **kwargs):
   54 
   55     def _get_backend_cls(testcase, subsystem):
   56         observed_backend = getattr(testcase, subsystem + '_api').driver
   57         return observed_backend.__class__
   58 
   59     def _get_domain_specific_backend_cls(manager, domain):
   60         observed_backend = manager.domain_configs.get_domain_driver(domain)
   61         return observed_backend.__class__
   62 
   63     def _get_entrypoint_cls(subsystem, name):
   64         entrypoint = entrypoint_map['keystone.' + subsystem][name]
   65         return entrypoint.resolve()
   66 
   67     def _load_domain_specific_configs(manager):
   68         if (not manager.domain_configs.configured and
   69                 CONF.identity.domain_specific_drivers_enabled):
   70             manager.domain_configs.setup_domain_drivers(
   71                 manager.driver, manager.resource_api)
   72 
   73     def _assert_equal(expected_cls, observed_cls, subsystem,
   74                       domain=None):
   75         msg = ('subsystem %(subsystem)s expected %(expected_cls)r, '
   76                'but observed %(observed_cls)r')
   77         if domain:
   78             subsystem = '%s[domain=%s]' % (subsystem, domain)
   79         assert expected_cls == observed_cls, msg % {
   80             'expected_cls': expected_cls,
   81             'observed_cls': observed_cls,
   82             'subsystem': subsystem,
   83         }
   84 
   85     env = pkg_resources.Environment()
   86     keystone_dist = env['keystone'][0]
   87     entrypoint_map = pkg_resources.get_entry_map(keystone_dist)
   88 
   89     for subsystem, entrypoint_name in kwargs.items():
   90         if isinstance(entrypoint_name, str):
   91             observed_cls = _get_backend_cls(testcase, subsystem)
   92             expected_cls = _get_entrypoint_cls(subsystem, entrypoint_name)
   93             _assert_equal(expected_cls, observed_cls, subsystem)
   94 
   95         elif isinstance(entrypoint_name, dict):
   96             manager = getattr(testcase, subsystem + '_api')
   97             _load_domain_specific_configs(manager)
   98 
   99             for domain, entrypoint_name in entrypoint_name.items():
  100                 if domain is None:
  101                     observed_cls = _get_backend_cls(testcase, subsystem)
  102                     expected_cls = _get_entrypoint_cls(
  103                         subsystem, entrypoint_name)
  104                     _assert_equal(expected_cls, observed_cls, subsystem)
  105                     continue
  106 
  107                 observed_cls = _get_domain_specific_backend_cls(
  108                     manager, domain)
  109                 expected_cls = _get_entrypoint_cls(subsystem, entrypoint_name)
  110                 _assert_equal(expected_cls, observed_cls, subsystem, domain)
  111 
  112         else:
  113             raise ValueError('%r is not an expected value for entrypoint name'
  114                              % entrypoint_name)
  115 
  116 
  117 class IdentityTests(identity_tests.IdentityTests):
  118 
  119     def test_update_domain_set_immutable(self):
  120         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  121 
  122     def test_cannot_delete_disabled_domain_with_immutable(self):
  123         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  124 
  125     def test_delete_immutable_domain(self):
  126         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  127 
  128     def test_create_domain_immutable(self):
  129         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  130 
  131     def test_update_domain_unset_immutable(self):
  132         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  133 
  134     def test_cannot_update_immutable_domain(self):
  135         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  136 
  137     def test_delete_user_with_group_project_domain_links(self):
  138         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  139 
  140     def test_delete_group_with_user_project_domain_links(self):
  141         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  142 
  143     def test_create_duplicate_user_name_in_different_domains(self):
  144         self.skip_test_overrides('Domains are read-only against LDAP')
  145 
  146     def test_create_duplicate_group_name_in_different_domains(self):
  147         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  148 
  149     def test_move_user_between_domains(self):
  150         self.skip_test_overrides('Domains are read-only against LDAP')
  151 
  152     def test_move_group_between_domains(self):
  153         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  154 
  155     def test_arbitrary_attributes_are_returned_from_get_user(self):
  156         self.skip_test_overrides(
  157             "Using arbitrary attributes doesn't work under LDAP")
  158 
  159     def test_new_arbitrary_attributes_are_returned_from_update_user(self):
  160         self.skip_test_overrides(
  161             "Using arbitrary attributes doesn't work under LDAP")
  162 
  163     def test_updated_arbitrary_attributes_are_returned_from_update_user(self):
  164         self.skip_test_overrides(
  165             "Using arbitrary attributes doesn't work under LDAP")
  166 
  167     def test_remove_user_from_group(self):
  168         self.skip_test_overrides('N/A: LDAP does not support write')
  169 
  170     def test_remove_user_from_group_returns_not_found(self):
  171         self.skip_test_overrides('N/A: LDAP does not support write')
  172 
  173     def test_delete_user_returns_not_found(self):
  174         self.skip_test_overrides('N/A: LDAP does not support write')
  175 
  176 
  177 class AssignmentTests(assignment_tests.AssignmentTests):
  178 
  179     def test_get_role_assignment_by_domain_not_found(self):
  180         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  181 
  182     def test_del_role_assignment_by_domain_not_found(self):
  183         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  184 
  185     def test_get_and_remove_role_grant_by_user_and_domain(self):
  186         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  187 
  188     def test_get_and_remove_correct_role_grant_from_a_mix(self):
  189         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  190 
  191     def test_get_and_remove_role_grant_by_group_and_cross_domain(self):
  192         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  193 
  194     def test_get_and_remove_role_grant_by_user_and_cross_domain(self):
  195         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  196 
  197     def test_role_grant_by_group_and_cross_domain_project(self):
  198         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  199 
  200     def test_role_grant_by_user_and_cross_domain_project(self):
  201         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  202 
  203     def test_multi_role_grant_by_user_group_on_project_domain(self):
  204         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  205 
  206     def test_delete_role_with_user_and_group_grants(self):
  207         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  208 
  209     def test_list_role_assignment_containing_names(self):
  210         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  211 
  212     def test_get_roles_for_user_and_domain(self):
  213         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  214 
  215     def test_get_roles_for_groups_on_domain(self):
  216         self.skip_test_overrides(
  217             'N/A: LDAP does not implement get_roles_for_groups; '
  218             'see bug 1333712 for details')
  219 
  220     def test_get_role_by_trustor_and_project(self):
  221         self.skip_test_overrides('Domains are read-only against LDAP')
  222 
  223     def test_get_roles_for_groups_on_project(self):
  224         self.skip_test_overrides(
  225             'N/A: LDAP does not implement get_roles_for_groups; '
  226             'see bug 1333712 for details')
  227 
  228     def test_list_domains_for_groups(self):
  229         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  230 
  231     def test_list_projects_for_groups(self):
  232         self.skip_test_overrides(
  233             'N/A: LDAP does not implement list_projects_for_groups; '
  234             'see bug 1333712 for details')
  235 
  236     def test_multi_group_grants_on_project_domain(self):
  237         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  238 
  239     def test_delete_user_grant_no_user(self):
  240         self.skip_test_overrides('N/A: LDAP has no write support')
  241 
  242     def test_delete_group_grant_no_group(self):
  243         self.skip_test_overrides('N/A: LDAP has no write support')
  244 
  245     def test_delete_user_with_project_roles(self):
  246         self.skip_test_overrides('N/A: LDAP has no write support')
  247 
  248     def test_delete_user_with_project_association(self):
  249         self.skip_test_overrides('N/A: LDAP has no write support')
  250 
  251     def test_delete_group_removes_role_assignments(self):
  252         self.skip_test_overrides('N/A: LDAP has no write support')
  253 
  254 
  255 class ResourceTests(resource_tests.ResourceTests):
  256 
  257     def test_create_duplicate_project_name_in_different_domains(self):
  258         self.skip_test_overrides('Domains are read-only against LDAP')
  259 
  260     def test_move_project_between_domains(self):
  261         self.skip_test_overrides('Domains are read-only against LDAP')
  262 
  263     def test_move_project_between_domains_with_clashing_names_fails(self):
  264         self.skip_test_overrides('Domains are read-only against LDAP')
  265 
  266     def test_domain_delete_hierarchy(self):
  267         self.skip_test_overrides('Domains are read-only against LDAP')
  268 
  269     def test_cache_layer_domain_crud(self):
  270         # TODO(morganfainberg): This also needs to be removed when full LDAP
  271         # implementation is submitted.  No need to duplicate the above test,
  272         # just skip this time.
  273         self.skip_test_overrides('Domains are read-only against LDAP')
  274 
  275     def test_domain_crud(self):
  276         self.skip_test_overrides('N/A: Not relevant for multi ldap testing')
  277 
  278     def test_delete_domain_call_db_time(self):
  279         self.skip_test_overrides('Domains are read-only against LDAP')
  280 
  281     def test_create_project_with_parent_id_and_without_domain_id(self):
  282         self.skip_test_overrides('Resource LDAP has been removed')
  283 
  284     def test_create_domain_under_regular_project_hierarchy_fails(self):
  285         self.skip_test_overrides('Resource LDAP has been removed')
  286 
  287     def test_create_project_passing_is_domain_flag_true(self):
  288         self.skip_test_overrides('Resource LDAP has been removed')
  289 
  290     def test_check_leaf_projects(self):
  291         self.skip_test_overrides('Resource LDAP has been removed')
  292 
  293     def test_list_projects_in_subtree(self):
  294         self.skip_test_overrides('Resource LDAP has been removed')
  295 
  296     def test_list_projects_in_subtree_with_circular_reference(self):
  297         self.skip_test_overrides('Resource LDAP has been removed')
  298 
  299     def test_list_project_parents(self):
  300         self.skip_test_overrides('Resource LDAP has been removed')
  301 
  302     def test_update_project_enabled_cascade(self):
  303         self.skip_test_overrides('Resource LDAP has been removed')
  304 
  305     def test_cannot_enable_cascade_with_parent_disabled(self):
  306         self.skip_test_overrides('Resource LDAP has been removed')
  307 
  308     def test_hierarchical_projects_crud(self):
  309         self.skip_test_overrides('Resource LDAP has been removed')
  310 
  311     def test_create_project_under_disabled_one(self):
  312         self.skip_test_overrides('Resource LDAP has been removed')
  313 
  314     def test_create_project_with_invalid_parent(self):
  315         self.skip_test_overrides('Resource LDAP has been removed')
  316 
  317     def test_update_project_parent(self):
  318         self.skip_test_overrides('Resource LDAP has been removed')
  319 
  320     def test_enable_project_with_disabled_parent(self):
  321         self.skip_test_overrides('Resource LDAP has been removed')
  322 
  323     def test_disable_hierarchical_leaf_project(self):
  324         self.skip_test_overrides('Resource LDAP has been removed')
  325 
  326     def test_disable_hierarchical_not_leaf_project(self):
  327         self.skip_test_overrides('Resource LDAP has been removed')
  328 
  329     def test_delete_hierarchical_leaf_project(self):
  330         self.skip_test_overrides('Resource LDAP has been removed')
  331 
  332     def test_delete_hierarchical_not_leaf_project(self):
  333         self.skip_test_overrides('Resource LDAP has been removed')
  334 
  335     def test_check_hierarchy_depth(self):
  336         self.skip_test_overrides('Resource LDAP has been removed')
  337 
  338     def test_list_projects_for_alternate_domain(self):
  339         self.skip_test_overrides('N/A: LDAP does not support multiple domains')
  340 
  341 
  342 class LDAPTestSetup(object):
  343     """Common setup for LDAP tests."""
  344 
  345     def setUp(self):
  346         super(LDAPTestSetup, self).setUp()
  347         self.ldapdb = self.useFixture(ldapdb.LDAPDatabase())
  348         self.useFixture(database.Database())
  349 
  350         self.load_backends()
  351         self.load_fixtures(default_fixtures)
  352         self.assert_backends()
  353 
  354 
  355 class BaseLDAPIdentity(LDAPTestSetup, IdentityTests, AssignmentTests,
  356                        ResourceTests):
  357 
  358     def _get_domain_fixture(self):
  359         """Return the static domain, since domains in LDAP are read-only."""
  360         return PROVIDERS.resource_api.get_domain(
  361             CONF.identity.default_domain_id
  362         )
  363 
  364     def get_config(self, domain_id):
  365         # Only one conf structure unless we are using separate domain backends
  366         return CONF
  367 
  368     def config_overrides(self):
  369         super(BaseLDAPIdentity, self).config_overrides()
  370         self.config_fixture.config(group='identity', driver='ldap')
  371 
  372     def config_files(self):
  373         config_files = super(BaseLDAPIdentity, self).config_files()
  374         config_files.append(unit.dirs.tests_conf('backend_ldap.conf'))
  375         return config_files
  376 
  377     def new_user_ref(self, domain_id, project_id=None, **kwargs):
  378         ref = unit.new_user_ref(domain_id=domain_id, project_id=project_id,
  379                                 **kwargs)
  380         if 'id' not in kwargs:
  381             del ref['id']
  382         return ref
  383 
  384     def get_user_enabled_vals(self, user):
  385         user_dn = (
  386             PROVIDERS.identity_api.driver.user._id_to_dn_string(user['id']))
  387         enabled_attr_name = CONF.ldap.user_enabled_attribute
  388 
  389         ldap_ = PROVIDERS.identity_api.driver.user.get_connection()
  390         res = ldap_.search_s(user_dn,
  391                              ldap.SCOPE_BASE,
  392                              u'(sn=%s)' % user['name'])
  393         if enabled_attr_name in res[0][1]:
  394             return res[0][1][enabled_attr_name]
  395         else:
  396             return None
  397 
  398     def test_build_tree(self):
  399         """Regression test for building the tree names."""
  400         user_api = identity.backends.ldap.UserApi(CONF)
  401         self.assertTrue(user_api)
  402         self.assertEqual("ou=Users,%s" % CONF.ldap.suffix, user_api.tree_dn)
  403 
  404     def test_configurable_allowed_user_actions(self):
  405         user = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
  406         user = PROVIDERS.identity_api.create_user(user)
  407         PROVIDERS.identity_api.get_user(user['id'])
  408 
  409         user['password'] = u'fäképass2'
  410         PROVIDERS.identity_api.update_user(user['id'], user)
  411 
  412         self.assertRaises(exception.Forbidden,
  413                           PROVIDERS.identity_api.delete_user,
  414                           user['id'])
  415 
  416     def test_user_filter(self):
  417         user_ref = PROVIDERS.identity_api.get_user(self.user_foo['id'])
  418         self.user_foo.pop('password')
  419         self.assertDictEqual(self.user_foo, user_ref)
  420 
  421         driver = PROVIDERS.identity_api._select_identity_driver(
  422             user_ref['domain_id'])
  423         driver.user.ldap_filter = '(CN=DOES_NOT_MATCH)'
  424         # invalidate the cache if the result is cached.
  425         PROVIDERS.identity_api.get_user.invalidate(
  426             PROVIDERS.identity_api, self.user_foo['id']
  427         )
  428         self.assertRaises(exception.UserNotFound,
  429                           PROVIDERS.identity_api.get_user,
  430                           self.user_foo['id'])
  431 
  432     def test_list_users_by_name_and_with_filter(self):
  433         # confirm that the user is not exposed when it does not match the
  434         # filter setting in conf even if it is requested by name in user list
  435         hints = driver_hints.Hints()
  436         hints.add_filter('name', self.user_foo['name'])
  437         domain_id = self.user_foo['domain_id']
  438         driver = PROVIDERS.identity_api._select_identity_driver(domain_id)
  439         driver.user.ldap_filter = ('(|(cn=%s)(cn=%s))' %
  440                                    (self.user_sna['id'], self.user_two['id']))
  441         users = PROVIDERS.identity_api.list_users(
  442             domain_scope=self._set_domain_scope(domain_id),
  443             hints=hints)
  444         self.assertEqual(0, len(users))
  445 
  446     def test_list_groups_by_name_and_with_filter(self):
  447         # Create some test groups.
  448         domain = self._get_domain_fixture()
  449         group_names = []
  450         numgroups = 3
  451         for _ in range(numgroups):
  452             group = unit.new_group_ref(domain_id=domain['id'])
  453             group = PROVIDERS.identity_api.create_group(group)
  454             group_names.append(group['name'])
  455         # confirm that the groups can all be listed
  456         groups = PROVIDERS.identity_api.list_groups(
  457             domain_scope=self._set_domain_scope(domain['id']))
  458         self.assertEqual(numgroups, len(groups))
  459         # configure the group filter
  460         driver = PROVIDERS.identity_api._select_identity_driver(domain['id'])
  461         driver.group.ldap_filter = ('(|(ou=%s)(ou=%s))' %
  462                                     tuple(group_names[:2]))
  463         # confirm that the group filter is working
  464         groups = PROVIDERS.identity_api.list_groups(
  465             domain_scope=self._set_domain_scope(domain['id']))
  466         self.assertEqual(2, len(groups))
  467         # confirm that a group is not exposed when it does not match the
  468         # filter setting in conf even if it is requested by name in group list
  469         hints = driver_hints.Hints()
  470         hints.add_filter('name', group_names[2])
  471         groups = PROVIDERS.identity_api.list_groups(
  472             domain_scope=self._set_domain_scope(domain['id']),
  473             hints=hints)
  474         self.assertEqual(0, len(groups))
  475 
  476     def test_remove_role_grant_from_user_and_project(self):
  477         PROVIDERS.assignment_api.create_grant(
  478             user_id=self.user_foo['id'],
  479             project_id=self.project_baz['id'],
  480             role_id=default_fixtures.MEMBER_ROLE_ID)
  481         roles_ref = PROVIDERS.assignment_api.list_grants(
  482             user_id=self.user_foo['id'],
  483             project_id=self.project_baz['id'])
  484         self.assertDictEqual(self.role_member, roles_ref[0])
  485 
  486         PROVIDERS.assignment_api.delete_grant(
  487             user_id=self.user_foo['id'],
  488             project_id=self.project_baz['id'],
  489             role_id=default_fixtures.MEMBER_ROLE_ID)
  490         roles_ref = PROVIDERS.assignment_api.list_grants(
  491             user_id=self.user_foo['id'],
  492             project_id=self.project_baz['id'])
  493         self.assertEqual(0, len(roles_ref))
  494         self.assertRaises(exception.RoleAssignmentNotFound,
  495                           PROVIDERS.assignment_api.delete_grant,
  496                           user_id=self.user_foo['id'],
  497                           project_id=self.project_baz['id'],
  498                           role_id=default_fixtures.MEMBER_ROLE_ID)
  499 
  500     def test_get_and_remove_role_grant_by_group_and_project(self):
  501         new_domain = self._get_domain_fixture()
  502         new_group = unit.new_group_ref(domain_id=new_domain['id'])
  503         new_group = PROVIDERS.identity_api.create_group(new_group)
  504         new_user = self.new_user_ref(domain_id=new_domain['id'])
  505         new_user = PROVIDERS.identity_api.create_user(new_user)
  506         PROVIDERS.identity_api.add_user_to_group(
  507             new_user['id'], new_group['id']
  508         )
  509 
  510         roles_ref = PROVIDERS.assignment_api.list_grants(
  511             group_id=new_group['id'],
  512             project_id=self.project_bar['id'])
  513         self.assertEqual([], roles_ref)
  514         self.assertEqual(0, len(roles_ref))
  515 
  516         PROVIDERS.assignment_api.create_grant(
  517             group_id=new_group['id'],
  518             project_id=self.project_bar['id'],
  519             role_id=default_fixtures.MEMBER_ROLE_ID)
  520         roles_ref = PROVIDERS.assignment_api.list_grants(
  521             group_id=new_group['id'],
  522             project_id=self.project_bar['id'])
  523         self.assertNotEmpty(roles_ref)
  524         self.assertDictEqual(self.role_member, roles_ref[0])
  525 
  526         PROVIDERS.assignment_api.delete_grant(
  527             group_id=new_group['id'],
  528             project_id=self.project_bar['id'],
  529             role_id=default_fixtures.MEMBER_ROLE_ID)
  530         roles_ref = PROVIDERS.assignment_api.list_grants(
  531             group_id=new_group['id'],
  532             project_id=self.project_bar['id'])
  533         self.assertEqual(0, len(roles_ref))
  534         self.assertRaises(exception.RoleAssignmentNotFound,
  535                           PROVIDERS.assignment_api.delete_grant,
  536                           group_id=new_group['id'],
  537                           project_id=self.project_bar['id'],
  538                           role_id=default_fixtures.MEMBER_ROLE_ID)
  539 
  540     def test_get_and_remove_role_grant_by_group_and_domain(self):
  541         # TODO(henry-nash): We should really rewrite the tests in
  542         # unit.resource.test_backends to be more flexible as to where the
  543         # domains are sourced from, so that we would not need to override such
  544         # tests here. This is raised as bug 1373865.
  545         new_domain = self._get_domain_fixture()
  546         new_group = unit.new_group_ref(domain_id=new_domain['id'],)
  547         new_group = PROVIDERS.identity_api.create_group(new_group)
  548         new_user = self.new_user_ref(domain_id=new_domain['id'])
  549         new_user = PROVIDERS.identity_api.create_user(new_user)
  550         PROVIDERS.identity_api.add_user_to_group(
  551             new_user['id'], new_group['id']
  552         )
  553 
  554         roles_ref = PROVIDERS.assignment_api.list_grants(
  555             group_id=new_group['id'],
  556             domain_id=new_domain['id'])
  557         self.assertEqual(0, len(roles_ref))
  558 
  559         PROVIDERS.assignment_api.create_grant(
  560             group_id=new_group['id'],
  561             domain_id=new_domain['id'],
  562             role_id=default_fixtures.MEMBER_ROLE_ID)
  563 
  564         roles_ref = PROVIDERS.assignment_api.list_grants(
  565             group_id=new_group['id'],
  566             domain_id=new_domain['id'])
  567         self.assertDictEqual(self.role_member, roles_ref[0])
  568 
  569         PROVIDERS.assignment_api.delete_grant(
  570             group_id=new_group['id'],
  571             domain_id=new_domain['id'],
  572             role_id=default_fixtures.MEMBER_ROLE_ID)
  573         roles_ref = PROVIDERS.assignment_api.list_grants(
  574             group_id=new_group['id'],
  575             domain_id=new_domain['id'])
  576         self.assertEqual(0, len(roles_ref))
  577         self.assertRaises(exception.NotFound,
  578                           PROVIDERS.assignment_api.delete_grant,
  579                           group_id=new_group['id'],
  580                           domain_id=new_domain['id'],
  581                           role_id=default_fixtures.MEMBER_ROLE_ID)
  582 
  583     def test_list_projects_for_user(self):
  584         domain = self._get_domain_fixture()
  585         user1 = self.new_user_ref(domain_id=domain['id'])
  586         user1 = PROVIDERS.identity_api.create_user(user1)
  587         user_projects = PROVIDERS.assignment_api.list_projects_for_user(
  588             user1['id']
  589         )
  590         self.assertThat(user_projects, matchers.HasLength(0))
  591 
  592         # new grant(user1, role_member, project_bar)
  593         PROVIDERS.assignment_api.create_grant(
  594             user_id=user1['id'], project_id=self.project_bar['id'],
  595             role_id=self.role_member['id']
  596         )
  597         # new grant(user1, role_member, project_baz)
  598         PROVIDERS.assignment_api.create_grant(
  599             user_id=user1['id'], project_id=self.project_baz['id'],
  600             role_id=self.role_member['id']
  601         )
  602         user_projects = PROVIDERS.assignment_api.list_projects_for_user(
  603             user1['id']
  604         )
  605         self.assertThat(user_projects, matchers.HasLength(2))
  606 
  607         # Now, check number of projects through groups
  608         user2 = self.new_user_ref(domain_id=domain['id'])
  609         user2 = PROVIDERS.identity_api.create_user(user2)
  610 
  611         group1 = unit.new_group_ref(domain_id=domain['id'])
  612         group1 = PROVIDERS.identity_api.create_group(group1)
  613 
  614         PROVIDERS.identity_api.add_user_to_group(user2['id'], group1['id'])
  615 
  616         # new grant(group1(user2), role_member, project_bar)
  617         PROVIDERS.assignment_api.create_grant(
  618             group_id=group1['id'], project_id=self.project_bar['id'],
  619             role_id=self.role_member['id']
  620         )
  621         # new grant(group1(user2), role_member, project_baz)
  622         PROVIDERS.assignment_api.create_grant(
  623             group_id=group1['id'], project_id=self.project_baz['id'],
  624             role_id=self.role_member['id']
  625         )
  626         user_projects = PROVIDERS.assignment_api.list_projects_for_user(
  627             user2['id']
  628         )
  629         self.assertThat(user_projects, matchers.HasLength(2))
  630 
  631         # new grant(group1(user2), role_other, project_bar)
  632         PROVIDERS.assignment_api.create_grant(
  633             group_id=group1['id'], project_id=self.project_bar['id'],
  634             role_id=self.role_other['id']
  635         )
  636         user_projects = PROVIDERS.assignment_api.list_projects_for_user(
  637             user2['id']
  638         )
  639         self.assertThat(user_projects, matchers.HasLength(2))
  640 
  641     def test_list_projects_for_user_and_groups(self):
  642         domain = self._get_domain_fixture()
  643         # Create user1
  644         user1 = self.new_user_ref(domain_id=domain['id'])
  645         user1 = PROVIDERS.identity_api.create_user(user1)
  646 
  647         # Create new group for user1
  648         group1 = unit.new_group_ref(domain_id=domain['id'])
  649         group1 = PROVIDERS.identity_api.create_group(group1)
  650 
  651         # Add user1 to group1
  652         PROVIDERS.identity_api.add_user_to_group(user1['id'], group1['id'])
  653 
  654         # Now, add grant to user1 and group1 in project_bar
  655         PROVIDERS.assignment_api.create_grant(
  656             user_id=user1['id'], project_id=self.project_bar['id'],
  657             role_id=self.role_member['id']
  658         )
  659         PROVIDERS.assignment_api.create_grant(
  660             group_id=group1['id'], project_id=self.project_bar['id'],
  661             role_id=self.role_member['id']
  662         )
  663 
  664         # The result is user1 has only one project granted
  665         user_projects = PROVIDERS.assignment_api.list_projects_for_user(
  666             user1['id']
  667         )
  668         self.assertThat(user_projects, matchers.HasLength(1))
  669 
  670         # Now, delete user1 grant into project_bar and check
  671         PROVIDERS.assignment_api.delete_grant(
  672             user_id=user1['id'], project_id=self.project_bar['id'],
  673             role_id=self.role_member['id']
  674         )
  675 
  676         # The result is user1 has only one project granted.
  677         # Granted through group1.
  678         user_projects = PROVIDERS.assignment_api.list_projects_for_user(
  679             user1['id']
  680         )
  681         self.assertThat(user_projects, matchers.HasLength(1))
  682 
  683     def test_list_projects_for_user_with_grants(self):
  684         domain = self._get_domain_fixture()
  685         new_user = self.new_user_ref(domain_id=domain['id'])
  686         new_user = PROVIDERS.identity_api.create_user(new_user)
  687 
  688         group1 = unit.new_group_ref(domain_id=domain['id'])
  689         group1 = PROVIDERS.identity_api.create_group(group1)
  690         group2 = unit.new_group_ref(domain_id=domain['id'])
  691         group2 = PROVIDERS.identity_api.create_group(group2)
  692 
  693         project1 = unit.new_project_ref(domain_id=domain['id'])
  694         PROVIDERS.resource_api.create_project(project1['id'], project1)
  695         project2 = unit.new_project_ref(domain_id=domain['id'])
  696         PROVIDERS.resource_api.create_project(project2['id'], project2)
  697 
  698         PROVIDERS.identity_api.add_user_to_group(
  699             new_user['id'], group1['id']
  700         )
  701         PROVIDERS.identity_api.add_user_to_group(
  702             new_user['id'], group2['id']
  703         )
  704 
  705         PROVIDERS.assignment_api.create_grant(
  706             user_id=new_user['id'], project_id=self.project_bar['id'],
  707             role_id=self.role_member['id']
  708         )
  709         PROVIDERS.assignment_api.create_grant(
  710             user_id=new_user['id'], project_id=project1['id'],
  711             role_id=self.role_admin['id']
  712         )
  713         PROVIDERS.assignment_api.create_grant(
  714             group_id=group2['id'], project_id=project2['id'],
  715             role_id=self.role_admin['id']
  716         )
  717 
  718         user_projects = PROVIDERS.assignment_api.list_projects_for_user(
  719             new_user['id'])
  720         self.assertEqual(3, len(user_projects))
  721 
  722     def test_list_role_assignments_unfiltered(self):
  723         new_domain = self._get_domain_fixture()
  724         new_user = self.new_user_ref(domain_id=new_domain['id'])
  725         new_user = PROVIDERS.identity_api.create_user(new_user)
  726         new_group = unit.new_group_ref(domain_id=new_domain['id'])
  727         new_group = PROVIDERS.identity_api.create_group(new_group)
  728         new_project = unit.new_project_ref(domain_id=new_domain['id'])
  729         PROVIDERS.resource_api.create_project(new_project['id'], new_project)
  730 
  731         # First check how many role grant already exist
  732         existing_assignments = len(
  733             PROVIDERS.assignment_api.list_role_assignments()
  734         )
  735 
  736         PROVIDERS.assignment_api.create_grant(
  737             user_id=new_user['id'],
  738             project_id=new_project['id'],
  739             role_id=default_fixtures.OTHER_ROLE_ID)
  740         PROVIDERS.assignment_api.create_grant(
  741             group_id=new_group['id'],
  742             project_id=new_project['id'],
  743             role_id=default_fixtures.ADMIN_ROLE_ID)
  744 
  745         # Read back the list of assignments - check it is gone up by 2
  746         after_assignments = len(
  747             PROVIDERS.assignment_api.list_role_assignments()
  748         )
  749         self.assertEqual(existing_assignments + 2, after_assignments)
  750 
  751     def test_list_group_members_when_no_members(self):
  752         # List group members when there is no member in the group.
  753         # No exception should be raised.
  754         group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  755         group = PROVIDERS.identity_api.create_group(group)
  756 
  757         # If this doesn't raise, then the test is successful.
  758         PROVIDERS.identity_api.list_users_in_group(group['id'])
  759 
  760     def test_list_domains(self):
  761         # We have more domains here than the parent class, check for the
  762         # correct number of domains for the multildap backend configs
  763         domain1 = unit.new_domain_ref()
  764         domain2 = unit.new_domain_ref()
  765         PROVIDERS.resource_api.create_domain(domain1['id'], domain1)
  766         PROVIDERS.resource_api.create_domain(domain2['id'], domain2)
  767         domains = PROVIDERS.resource_api.list_domains()
  768         self.assertEqual(7, len(domains))
  769         domain_ids = []
  770         for domain in domains:
  771             domain_ids.append(domain.get('id'))
  772         self.assertIn(CONF.identity.default_domain_id, domain_ids)
  773         self.assertIn(domain1['id'], domain_ids)
  774         self.assertIn(domain2['id'], domain_ids)
  775 
  776     def test_authenticate_requires_simple_bind(self):
  777         user = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
  778         user = PROVIDERS.identity_api.create_user(user)
  779         role_member = unit.new_role_ref()
  780         PROVIDERS.role_api.create_role(role_member['id'], role_member)
  781         PROVIDERS.assignment_api.add_role_to_user_and_project(
  782             user['id'], self.project_baz['id'], role_member['id']
  783         )
  784         driver = PROVIDERS.identity_api._select_identity_driver(
  785             user['domain_id'])
  786         driver.user.LDAP_USER = None
  787         driver.user.LDAP_PASSWORD = None
  788 
  789         with self.make_request():
  790             self.assertRaises(AssertionError,
  791                               PROVIDERS.identity_api.authenticate,
  792                               user_id=user['id'],
  793                               password=None)
  794 
  795     @mock.patch.object(versionutils, 'report_deprecated_feature')
  796     def test_user_crud(self, mock_deprecator):
  797         # NOTE(stevemar): As of the Mitaka release, we now check for calls that
  798         # the LDAP write functionality has been deprecated.
  799         user_dict = self.new_user_ref(
  800             domain_id=CONF.identity.default_domain_id)
  801         user = PROVIDERS.identity_api.create_user(user_dict)
  802         args, _kwargs = mock_deprecator.call_args
  803         self.assertIn("create_user for the LDAP identity backend", args[1])
  804 
  805         del user_dict['password']
  806         user_ref = PROVIDERS.identity_api.get_user(user['id'])
  807         user_ref_dict = {x: user_ref[x] for x in user_ref}
  808         self.assertDictContainsSubset(user_dict, user_ref_dict)
  809 
  810         user_dict['password'] = uuid.uuid4().hex
  811         PROVIDERS.identity_api.update_user(user['id'], user_dict)
  812         args, _kwargs = mock_deprecator.call_args
  813         self.assertIn("update_user for the LDAP identity backend", args[1])
  814 
  815         del user_dict['password']
  816         user_ref = PROVIDERS.identity_api.get_user(user['id'])
  817         user_ref_dict = {x: user_ref[x] for x in user_ref}
  818         self.assertDictContainsSubset(user_dict, user_ref_dict)
  819 
  820     # The group and domain CRUD tests below override the standard ones in
  821     # unit.identity.test_backends.py so that we can exclude the update name
  822     # test, since we do not (and will not) support the update of either group
  823     # or domain names with LDAP. In the tests below, the update is tested by
  824     # updating description.
  825     @mock.patch.object(versionutils, 'report_deprecated_feature')
  826     def test_group_crud(self, mock_deprecator):
  827         # NOTE(stevemar): As of the Mitaka release, we now check for calls that
  828         # the LDAP write functionality has been deprecated.
  829         group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  830         group = PROVIDERS.identity_api.create_group(group)
  831         args, _kwargs = mock_deprecator.call_args
  832         self.assertIn("create_group for the LDAP identity backend", args[1])
  833 
  834         group_ref = PROVIDERS.identity_api.get_group(group['id'])
  835         self.assertDictEqual(group, group_ref)
  836         group['description'] = uuid.uuid4().hex
  837         PROVIDERS.identity_api.update_group(group['id'], group)
  838         args, _kwargs = mock_deprecator.call_args
  839         self.assertIn("update_group for the LDAP identity backend", args[1])
  840 
  841         group_ref = PROVIDERS.identity_api.get_group(group['id'])
  842         self.assertDictEqual(group, group_ref)
  843 
  844     @mock.patch.object(versionutils, 'report_deprecated_feature')
  845     def test_add_user_group_deprecated(self, mock_deprecator):
  846         group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  847         group = PROVIDERS.identity_api.create_group(group)
  848         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  849         user = PROVIDERS.identity_api.create_user(user)
  850         PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
  851         args, _kwargs = mock_deprecator.call_args
  852         self.assertIn("add_user_to_group for the LDAP identity", args[1])
  853 
  854     @unit.skip_if_cache_disabled('identity')
  855     def test_cache_layer_group_crud(self):
  856         # Note(knikolla): Since delete logic has been deleted from LDAP,
  857         # this doesn't test caching on delete.
  858         group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  859         group = PROVIDERS.identity_api.create_group(group)
  860         # cache the result
  861         PROVIDERS.identity_api.get_group(group['id'])
  862         group['description'] = uuid.uuid4().hex
  863         group_ref = PROVIDERS.identity_api.update_group(group['id'], group)
  864         self.assertDictContainsSubset(
  865             PROVIDERS.identity_api.get_group(group['id']), group_ref
  866         )
  867 
  868     @unit.skip_if_cache_disabled('identity')
  869     def test_cache_layer_get_user(self):
  870         # Note(knikolla): Since delete logic has been deleted from LDAP,
  871         # this doesn't test caching on delete.
  872         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  873         user = PROVIDERS.identity_api.create_user(user)
  874         ref = PROVIDERS.identity_api.get_user_by_name(
  875             user['name'], user['domain_id']
  876         )
  877         user['description'] = uuid.uuid4().hex
  878         # cache the result.
  879         PROVIDERS.identity_api.get_user(ref['id'])
  880         # update using identity api and get back updated user.
  881         user_updated = PROVIDERS.identity_api.update_user(ref['id'], user)
  882         self.assertDictContainsSubset(
  883             PROVIDERS.identity_api.get_user(ref['id']), user_updated
  884         )
  885         self.assertDictContainsSubset(
  886             PROVIDERS.identity_api.get_user_by_name(
  887                 ref['name'], ref['domain_id']
  888             ),
  889             user_updated
  890         )
  891 
  892     @unit.skip_if_cache_disabled('identity')
  893     def test_cache_layer_get_user_by_name(self):
  894         # Note(knikolla): Since delete logic has been deleted from LDAP,
  895         # this doesn't test caching on delete.
  896         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  897         user = PROVIDERS.identity_api.create_user(user)
  898         ref = PROVIDERS.identity_api.get_user_by_name(
  899             user['name'], user['domain_id']
  900         )
  901         user['description'] = uuid.uuid4().hex
  902         user_updated = PROVIDERS.identity_api.update_user(ref['id'], user)
  903         self.assertDictContainsSubset(
  904             PROVIDERS.identity_api.get_user(ref['id']), user_updated
  905         )
  906         self.assertDictContainsSubset(
  907             PROVIDERS.identity_api.get_user_by_name(
  908                 ref['name'], ref['domain_id']
  909             ),
  910             user_updated
  911         )
  912 
  913     def test_create_user_none_mapping(self):
  914         # When create a user where an attribute maps to None, the entry is
  915         # created without that attribute and it doesn't fail with a TypeError.
  916         driver = PROVIDERS.identity_api._select_identity_driver(
  917             CONF.identity.default_domain_id)
  918         driver.user.attribute_ignore = ['enabled', 'email',
  919                                         'projects', 'projectId']
  920         user = self.new_user_ref(domain_id=CONF.identity.default_domain_id,
  921                                  project_id='maps_to_none')
  922 
  923         # If this doesn't raise, then the test is successful.
  924         user = PROVIDERS.identity_api.create_user(user)
  925 
  926     def test_unignored_user_none_mapping(self):
  927         # Ensure that an attribute that maps to None that is not explicitly
  928         # ignored in configuration is implicitly ignored without triggering
  929         # an error.
  930         driver = PROVIDERS.identity_api._select_identity_driver(
  931             CONF.identity.default_domain_id)
  932         driver.user.attribute_ignore = ['enabled', 'email',
  933                                         'projects', 'projectId']
  934 
  935         user = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
  936 
  937         user_ref = PROVIDERS.identity_api.create_user(user)
  938 
  939         # If this doesn't raise, then the test is successful.
  940         PROVIDERS.identity_api.get_user(user_ref['id'])
  941 
  942     def test_update_user_name(self):
  943         """A user's name cannot be changed through the LDAP driver."""
  944         self.assertRaises(exception.Conflict,
  945                           super(BaseLDAPIdentity, self).test_update_user_name)
  946 
  947     def test_user_id_comma(self):
  948         """Even if the user has a , in their ID, groups can be listed."""
  949         # Create a user with a , in their ID
  950         # NOTE(blk-u): the DN for this user is hard-coded in fakeldap!
  951 
  952         # Since we want to fake up this special ID, we'll squirt this
  953         # direct into the driver and bypass the manager layer.
  954         user_id = u'Doe, John'
  955         user = self.new_user_ref(id=user_id,
  956                                  domain_id=CONF.identity.default_domain_id)
  957         user = PROVIDERS.identity_api.driver.create_user(user_id, user)
  958 
  959         # Now we'll use the manager to discover it, which will create a
  960         # Public ID for it.
  961         ref_list = PROVIDERS.identity_api.list_users()
  962         public_user_id = None
  963         for ref in ref_list:
  964             if ref['name'] == user['name']:
  965                 public_user_id = ref['id']
  966                 break
  967 
  968         # Create a group
  969         group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  970         group_id = group['id']
  971         group = PROVIDERS.identity_api.driver.create_group(group_id, group)
  972         # Now we'll use the manager to discover it, which will create a
  973         # Public ID for it.
  974         ref_list = PROVIDERS.identity_api.list_groups()
  975         public_group_id = None
  976         for ref in ref_list:
  977             if ref['name'] == group['name']:
  978                 public_group_id = ref['id']
  979                 break
  980 
  981         # Put the user in the group
  982         PROVIDERS.identity_api.add_user_to_group(
  983             public_user_id, public_group_id
  984         )
  985 
  986         # List groups for user.
  987         ref_list = PROVIDERS.identity_api.list_groups_for_user(public_user_id)
  988 
  989         group['id'] = public_group_id
  990         self.assertThat(ref_list, matchers.Equals([group]))
  991 
  992     def test_user_id_comma_grants(self):
  993         """List user and group grants, even with a comma in the user's ID."""
  994         # Create a user with a , in their ID
  995         # NOTE(blk-u): the DN for this user is hard-coded in fakeldap!
  996 
  997         # Since we want to fake up this special ID, we'll squirt this
  998         # direct into the driver and bypass the manager layer
  999         user_id = u'Doe, John'
 1000         user = self.new_user_ref(id=user_id,
 1001                                  domain_id=CONF.identity.default_domain_id)
 1002         PROVIDERS.identity_api.driver.create_user(user_id, user)
 1003 
 1004         # Now we'll use the manager to discover it, which will create a
 1005         # Public ID for it.
 1006         ref_list = PROVIDERS.identity_api.list_users()
 1007         public_user_id = None
 1008         for ref in ref_list:
 1009             if ref['name'] == user['name']:
 1010                 public_user_id = ref['id']
 1011                 break
 1012 
 1013         # Grant the user a role on a project.
 1014 
 1015         role_id = default_fixtures.MEMBER_ROLE_ID
 1016         project_id = self.project_baz['id']
 1017 
 1018         PROVIDERS.assignment_api.create_grant(
 1019             role_id, user_id=public_user_id, project_id=project_id
 1020         )
 1021 
 1022         role_ref = PROVIDERS.assignment_api.get_grant(
 1023             role_id, user_id=public_user_id, project_id=project_id
 1024         )
 1025 
 1026         self.assertEqual(role_id, role_ref['id'])
 1027 
 1028     def test_user_enabled_ignored_disable_error(self):
 1029         # When the server is configured so that the enabled attribute is
 1030         # ignored for users, users cannot be disabled.
 1031 
 1032         self.config_fixture.config(group='ldap',
 1033                                    user_attribute_ignore=['enabled'])
 1034 
 1035         # Need to re-load backends for the config change to take effect.
 1036         self.load_backends()
 1037 
 1038         # Attempt to disable the user.
 1039         self.assertRaises(
 1040             exception.ForbiddenAction,
 1041             PROVIDERS.identity_api.update_user, self.user_foo['id'],
 1042             {'enabled': False}
 1043         )
 1044 
 1045         user_info = PROVIDERS.identity_api.get_user(self.user_foo['id'])
 1046 
 1047         # If 'enabled' is ignored then 'enabled' isn't returned as part of the
 1048         # ref.
 1049         self.assertNotIn('enabled', user_info)
 1050 
 1051     def test_group_enabled_ignored_disable_error(self):
 1052         # When the server is configured so that the enabled attribute is
 1053         # ignored for groups, groups cannot be disabled.
 1054 
 1055         self.config_fixture.config(group='ldap',
 1056                                    group_attribute_ignore=['enabled'])
 1057 
 1058         # Need to re-load backends for the config change to take effect.
 1059         self.load_backends()
 1060 
 1061         # There's no group fixture so create a group.
 1062         new_domain = self._get_domain_fixture()
 1063         new_group = unit.new_group_ref(domain_id=new_domain['id'])
 1064         new_group = PROVIDERS.identity_api.create_group(new_group)
 1065 
 1066         # Attempt to disable the group.
 1067         self.assertRaises(exception.ForbiddenAction,
 1068                           PROVIDERS.identity_api.update_group, new_group['id'],
 1069                           {'enabled': False})
 1070 
 1071         group_info = PROVIDERS.identity_api.get_group(new_group['id'])
 1072 
 1073         # If 'enabled' is ignored then 'enabled' isn't returned as part of the
 1074         # ref.
 1075         self.assertNotIn('enabled', group_info)
 1076 
 1077     def test_list_role_assignment_by_domain(self):
 1078         """Multiple domain assignments are not supported."""
 1079         self.assertRaises(
 1080             (exception.Forbidden, exception.DomainNotFound,
 1081              exception.ValidationError),
 1082             super(BaseLDAPIdentity, self).test_list_role_assignment_by_domain)
 1083 
 1084     def test_list_role_assignment_by_user_with_domain_group_roles(self):
 1085         """Multiple domain assignments are not supported."""
 1086         self.assertRaises(
 1087             (exception.Forbidden, exception.DomainNotFound,
 1088              exception.ValidationError),
 1089             super(BaseLDAPIdentity, self).
 1090             test_list_role_assignment_by_user_with_domain_group_roles)
 1091 
 1092     def test_list_role_assignment_using_sourced_groups_with_domains(self):
 1093         """Multiple domain assignments are not supported."""
 1094         self.assertRaises(
 1095             (exception.Forbidden, exception.ValidationError,
 1096              exception.DomainNotFound),
 1097             super(BaseLDAPIdentity, self).
 1098             test_list_role_assignment_using_sourced_groups_with_domains)
 1099 
 1100     def test_create_project_with_domain_id_and_without_parent_id(self):
 1101         """Multiple domains are not supported."""
 1102         self.assertRaises(
 1103             exception.ValidationError,
 1104             super(BaseLDAPIdentity, self).
 1105             test_create_project_with_domain_id_and_without_parent_id)
 1106 
 1107     def test_create_project_with_domain_id_mismatch_to_parent_domain(self):
 1108         """Multiple domains are not supported."""
 1109         self.assertRaises(
 1110             exception.ValidationError,
 1111             super(BaseLDAPIdentity, self).
 1112             test_create_project_with_domain_id_mismatch_to_parent_domain)
 1113 
 1114     def test_remove_foreign_assignments_when_deleting_a_domain(self):
 1115         """Multiple domains are not supported."""
 1116         self.assertRaises(
 1117             (exception.ValidationError, exception.DomainNotFound),
 1118             super(BaseLDAPIdentity,
 1119                   self).test_remove_foreign_assignments_when_deleting_a_domain)
 1120 
 1121 
 1122 class LDAPIdentity(BaseLDAPIdentity):
 1123 
 1124     def assert_backends(self):
 1125         _assert_backends(self,
 1126                          assignment='sql',
 1127                          identity='ldap')
 1128 
 1129     def test_list_domains(self):
 1130         domains = PROVIDERS.resource_api.list_domains()
 1131         default_domain = unit.new_domain_ref(
 1132             description=u'The default domain',
 1133             id=CONF.identity.default_domain_id,
 1134             name=u'Default')
 1135         self.assertEqual([default_domain], domains)
 1136 
 1137     def test_authenticate_wrong_credentials(self):
 1138         self.assertRaises(exception.LDAPInvalidCredentialsError,
 1139                           PROVIDERS.identity_api.driver.user.get_connection,
 1140                           user='demo',
 1141                           password='demo',
 1142                           end_user_auth=True)
 1143 
 1144     def test_configurable_allowed_project_actions(self):
 1145         domain = self._get_domain_fixture()
 1146         project = unit.new_project_ref(domain_id=domain['id'])
 1147         project = PROVIDERS.resource_api.create_project(project['id'], project)
 1148         project_ref = PROVIDERS.resource_api.get_project(project['id'])
 1149         self.assertEqual(project['id'], project_ref['id'])
 1150 
 1151         project['enabled'] = False
 1152         PROVIDERS.resource_api.update_project(project['id'], project)
 1153 
 1154         PROVIDERS.resource_api.delete_project(project['id'])
 1155         self.assertRaises(exception.ProjectNotFound,
 1156                           PROVIDERS.resource_api.get_project,
 1157                           project['id'])
 1158 
 1159     def test_user_enable_attribute_mask(self):
 1160         self.config_fixture.config(group='ldap', user_enabled_mask=2,
 1161                                    user_enabled_default='512')
 1162         self.ldapdb.clear()
 1163         self.load_backends()
 1164 
 1165         user = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
 1166 
 1167         user_ref = PROVIDERS.identity_api.create_user(user)
 1168 
 1169         # Use assertIs rather than assertTrue because assertIs will assert the
 1170         # value is a Boolean as expected.
 1171         self.assertIs(True, user_ref['enabled'])
 1172         self.assertNotIn('enabled_nomask', user_ref)
 1173 
 1174         enabled_vals = self.get_user_enabled_vals(user_ref)
 1175         self.assertEqual([512], enabled_vals)
 1176 
 1177         user_ref = PROVIDERS.identity_api.get_user(user_ref['id'])
 1178         self.assertIs(True, user_ref['enabled'])
 1179         self.assertNotIn('enabled_nomask', user_ref)
 1180 
 1181         user['enabled'] = False
 1182         user_ref = PROVIDERS.identity_api.update_user(user_ref['id'], user)
 1183         self.assertIs(False, user_ref['enabled'])
 1184         self.assertNotIn('enabled_nomask', user_ref)
 1185 
 1186         enabled_vals = self.get_user_enabled_vals(user_ref)
 1187         self.assertEqual([514], enabled_vals)
 1188 
 1189         user_ref = PROVIDERS.identity_api.get_user(user_ref['id'])
 1190         self.assertIs(False, user_ref['enabled'])
 1191         self.assertNotIn('enabled_nomask', user_ref)
 1192 
 1193         user['enabled'] = True
 1194         user_ref = PROVIDERS.identity_api.update_user(user_ref['id'], user)
 1195         self.assertIs(True, user_ref['enabled'])
 1196         self.assertNotIn('enabled_nomask', user_ref)
 1197 
 1198         enabled_vals = self.get_user_enabled_vals(user_ref)
 1199         self.assertEqual([512], enabled_vals)
 1200 
 1201         user_ref = PROVIDERS.identity_api.get_user(user_ref['id'])
 1202         self.assertIs(True, user_ref['enabled'])
 1203         self.assertNotIn('enabled_nomask', user_ref)
 1204 
 1205     def test_user_enabled_invert(self):
 1206         self.config_fixture.config(group='ldap', user_enabled_invert=True,
 1207                                    user_enabled_default='False')
 1208         self.ldapdb.clear()
 1209         self.load_backends()
 1210 
 1211         user1 = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
 1212 
 1213         user2 = self.new_user_ref(enabled=False,
 1214                                   domain_id=CONF.identity.default_domain_id)
 1215 
 1216         user3 = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
 1217 
 1218         # Ensure that the LDAP attribute is False for a newly created
 1219         # enabled user.
 1220         user_ref = PROVIDERS.identity_api.create_user(user1)
 1221         self.assertIs(True, user_ref['enabled'])
 1222         enabled_vals = self.get_user_enabled_vals(user_ref)
 1223         self.assertEqual([False], enabled_vals)
 1224         user_ref = PROVIDERS.identity_api.get_user(user_ref['id'])
 1225         self.assertIs(True, user_ref['enabled'])
 1226 
 1227         # Ensure that the LDAP attribute is True for a disabled user.
 1228         user1['enabled'] = False
 1229         user_ref = PROVIDERS.identity_api.update_user(user_ref['id'], user1)
 1230         self.assertIs(False, user_ref['enabled'])
 1231         enabled_vals = self.get_user_enabled_vals(user_ref)
 1232         self.assertEqual([True], enabled_vals)
 1233 
 1234         # Enable the user and ensure that the LDAP attribute is True again.
 1235         user1['enabled'] = True
 1236         user_ref = PROVIDERS.identity_api.update_user(user_ref['id'], user1)
 1237         self.assertIs(True, user_ref['enabled'])
 1238         enabled_vals = self.get_user_enabled_vals(user_ref)
 1239         self.assertEqual([False], enabled_vals)
 1240 
 1241         # Ensure that the LDAP attribute is True for a newly created
 1242         # disabled user.
 1243         user_ref = PROVIDERS.identity_api.create_user(user2)
 1244         self.assertIs(False, user_ref['enabled'])
 1245         enabled_vals = self.get_user_enabled_vals(user_ref)
 1246         self.assertEqual([True], enabled_vals)
 1247         user_ref = PROVIDERS.identity_api.get_user(user_ref['id'])
 1248         self.assertIs(False, user_ref['enabled'])
 1249 
 1250         # Ensure that the LDAP attribute is inverted for a newly created
 1251         # user when the user_enabled_default setting is used.
 1252         user_ref = PROVIDERS.identity_api.create_user(user3)
 1253         self.assertIs(True, user_ref['enabled'])
 1254         enabled_vals = self.get_user_enabled_vals(user_ref)
 1255         self.assertEqual([False], enabled_vals)
 1256         user_ref = PROVIDERS.identity_api.get_user(user_ref['id'])
 1257         self.assertIs(True, user_ref['enabled'])
 1258 
 1259     @mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
 1260     def test_user_enabled_invert_default_str_value(self, mock_ldap_get):
 1261         self.config_fixture.config(group='ldap', user_enabled_invert=True,
 1262                                    user_enabled_default='False')
 1263         # Mock the search results to return an entry with
 1264         # no enabled value.
 1265         mock_ldap_get.return_value = (
 1266             'cn=junk,dc=example,dc=com',
 1267             {
 1268                 'sn': [uuid.uuid4().hex],
 1269                 'email': [uuid.uuid4().hex],
 1270                 'cn': ['junk']
 1271             }
 1272         )
 1273 
 1274         user_api = identity.backends.ldap.UserApi(CONF)
 1275         user_ref = user_api.get('junk')
 1276         # Ensure that the model enabled attribute is inverted
 1277         # from the resource default.
 1278         self.assertIs(True, user_ref['enabled'])
 1279 
 1280     @mock.patch.object(common_ldap.KeystoneLDAPHandler, 'connect')
 1281     @mock.patch.object(common_ldap.KeystoneLDAPHandler, 'search_s')
 1282     @mock.patch.object(common_ldap.KeystoneLDAPHandler, 'simple_bind_s')
 1283     def test_filter_ldap_result_by_attr(self, mock_simple_bind_s,
 1284                                         mock_search_s, mock_connect):
 1285 
 1286         # Mock the ldap search results to return user entries with
 1287         # user_name_attribute('sn') value has emptyspaces, emptystring
 1288         # and attibute itself is not set.
 1289         mock_search_s.return_value = [(
 1290             'sn=junk1,dc=example,dc=com',
 1291             {
 1292                 'cn': [uuid.uuid4().hex],
 1293                 'email': [uuid.uuid4().hex],
 1294                 'sn': ['junk1']
 1295             }
 1296         ),
 1297             (
 1298             '',
 1299             {
 1300                 'cn': [uuid.uuid4().hex],
 1301                 'email': [uuid.uuid4().hex],
 1302             }
 1303         ),
 1304             (
 1305             'sn=,dc=example,dc=com',
 1306             {
 1307                 'cn': [uuid.uuid4().hex],
 1308                 'email': [uuid.uuid4().hex],
 1309                 'sn': ['']
 1310             }
 1311         ),
 1312             (
 1313             'sn=   ,dc=example,dc=com',
 1314             {
 1315                 'cn': [uuid.uuid4().hex],
 1316                 'email': [uuid.uuid4().hex],
 1317                 'sn': ['   ']
 1318             }
 1319         )]
 1320 
 1321         user_api = identity.backends.ldap.UserApi(CONF)
 1322         user_refs = user_api.get_all()
 1323         # validate that keystone.identity.backends.ldap.common.BaseLdap.
 1324         # _filter_ldap_result_by_attr() method filtered the ldap query results
 1325         # whose name attribute values has emptyspaces, emptystring
 1326         # and attibute itself is not set.
 1327         self.assertEqual(1, len(user_refs))
 1328 
 1329         self.assertEqual('junk1', user_refs[0]['name'])
 1330         self.assertEqual('sn=junk1,dc=example,dc=com', user_refs[0]['dn'])
 1331 
 1332     @mock.patch.object(common_ldap.KeystoneLDAPHandler, 'connect')
 1333     @mock.patch.object(common_ldap.KeystoneLDAPHandler, 'search_s')
 1334     @mock.patch.object(common_ldap.KeystoneLDAPHandler, 'simple_bind_s')
 1335     def test_filter_ldap_result_with_case_sensitive_attr(self,
 1336                                                          mock_simple_bind_s,
 1337                                                          mock_search_s,
 1338                                                          mock_connect):
 1339         # Mock the ldap search results to return user entries
 1340         # irrespective of lowercase and uppercase characters in
 1341         # ldap_result attribute keys e.g. {'Sn': ['junk1']} with
 1342         # user_name_attribute('sn')
 1343         mock_search_s.return_value = [(
 1344             'sn=junk1,dc=example,dc=com',
 1345             {
 1346                 'cn': [uuid.uuid4().hex],
 1347                 'email': [uuid.uuid4().hex],
 1348                 'sN': ['junk1']
 1349             }
 1350         ),
 1351             (
 1352             'sn=junk1,dc=example,dc=com',
 1353             {
 1354                 'cn': [uuid.uuid4().hex],
 1355                 'email': [uuid.uuid4().hex],
 1356                 'Sn': ['junk1']
 1357             }
 1358         ),
 1359             (
 1360             'sn=junk1,dc=example,dc=com',
 1361             {
 1362                 'cn': [uuid.uuid4().hex],
 1363                 'email': [uuid.uuid4().hex],
 1364                 'sn': ['    ']
 1365             }
 1366         )
 1367         ]
 1368 
 1369         user_api = identity.backends.ldap.UserApi(CONF)
 1370         user_refs = user_api.get_all()
 1371         # validate that keystone.identity.backends.ldap.common.BaseLdap.
 1372         # _filter_ldap_result_by_attr() method filtered the ldap query results
 1373         # whose name attribute keys having case insensitive characters.
 1374         self.assertEqual(2, len(user_refs))
 1375 
 1376         self.assertEqual('junk1', user_refs[0]['name'])
 1377         self.assertEqual('sn=junk1,dc=example,dc=com', user_refs[0]['dn'])
 1378 
 1379     @mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
 1380     def test_user_enabled_attribute_handles_expired(self, mock_ldap_get):
 1381         # If using 'passwordisexpired' as enabled attribute, and inverting it,
 1382         # Then an unauthorized user (expired password) should not be enabled.
 1383         self.config_fixture.config(group='ldap', user_enabled_invert=True,
 1384                                    user_enabled_attribute='passwordisexpired')
 1385         mock_ldap_get.return_value = (
 1386             u'uid=123456789,c=us,ou=our_ldap,o=acme.com',
 1387             {
 1388                 'uid': [123456789],
 1389                 'mail': ['shaun@acme.com'],
 1390                 'passwordisexpired': ['TRUE'],
 1391                 'cn': ['uid=123456789,c=us,ou=our_ldap,o=acme.com']
 1392             }
 1393         )
 1394 
 1395         user_api = identity.backends.ldap.UserApi(CONF)
 1396         user_ref = user_api.get('123456789')
 1397         self.assertIs(False, user_ref['enabled'])
 1398 
 1399     @mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
 1400     def test_user_enabled_attribute_handles_utf8(self, mock_ldap_get):
 1401         # If using 'passwordisexpired' as enabled attribute, and inverting it,
 1402         # and the result is utf8 encoded, then the an authorized user should
 1403         # be enabled.
 1404         self.config_fixture.config(group='ldap', user_enabled_invert=True,
 1405                                    user_enabled_attribute='passwordisexpired')
 1406         mock_ldap_get.return_value = (
 1407             u'uid=123456789,c=us,ou=our_ldap,o=acme.com',
 1408             {
 1409                 'uid': [123456789],
 1410                 'mail': [u'shaun@acme.com'],
 1411                 'passwordisexpired': [u'false'],
 1412                 'cn': [u'uid=123456789,c=us,ou=our_ldap,o=acme.com']
 1413             }
 1414         )
 1415 
 1416         user_api = identity.backends.ldap.UserApi(CONF)
 1417         user_ref = user_api.get('123456789')
 1418         self.assertIs(True, user_ref['enabled'])
 1419 
 1420     @mock.patch.object(common_ldap.KeystoneLDAPHandler, 'simple_bind_s')
 1421     def test_user_api_get_connection_no_user_password(self, mocked_method):
 1422         """Bind anonymously when the user and password are blank."""
 1423         # Ensure the username/password are in-fact blank
 1424         self.config_fixture.config(group='ldap', user=None, password=None)
 1425         user_api = identity.backends.ldap.UserApi(CONF)
 1426         user_api.get_connection(user=None, password=None)
 1427         self.assertTrue(mocked_method.called)
 1428 
 1429     @mock.patch.object(common_ldap.KeystoneLDAPHandler, 'connect')
 1430     def test_chase_referrals_off(self, mocked_fakeldap):
 1431         self.config_fixture.config(
 1432             group='ldap',
 1433             url='fake://memory',
 1434             chase_referrals=False)
 1435         user_api = identity.backends.ldap.UserApi(CONF)
 1436         user_api.get_connection(user=None, password=None)
 1437 
 1438         # The last call_arg should be a dictionary and should contain
 1439         # chase_referrals. Check to make sure the value of chase_referrals
 1440         # is as expected.
 1441         self.assertFalse(mocked_fakeldap.call_args[-1]['chase_referrals'])
 1442 
 1443     @mock.patch.object(common_ldap.KeystoneLDAPHandler, 'connect')
 1444     def test_chase_referrals_on(self, mocked_fakeldap):
 1445         self.config_fixture.config(
 1446             group='ldap',
 1447             url='fake://memory',
 1448             chase_referrals=True)
 1449         user_api = identity.backends.ldap.UserApi(CONF)
 1450         user_api.get_connection(user=None, password=None)
 1451 
 1452         # The last call_arg should be a dictionary and should contain
 1453         # chase_referrals. Check to make sure the value of chase_referrals
 1454         # is as expected.
 1455         self.assertTrue(mocked_fakeldap.call_args[-1]['chase_referrals'])
 1456 
 1457     @mock.patch.object(common_ldap.KeystoneLDAPHandler, 'connect')
 1458     def test_debug_level_set(self, mocked_fakeldap):
 1459         level = 12345
 1460         self.config_fixture.config(
 1461             group='ldap',
 1462             url='fake://memory',
 1463             debug_level=level)
 1464         user_api = identity.backends.ldap.UserApi(CONF)
 1465         user_api.get_connection(user=None, password=None)
 1466 
 1467         # The last call_arg should be a dictionary and should contain
 1468         # debug_level. Check to make sure the value of debug_level
 1469         # is as expected.
 1470         self.assertEqual(level, mocked_fakeldap.call_args[-1]['debug_level'])
 1471 
 1472     def test_user_extra_attribute_mapping(self):
 1473         self.config_fixture.config(
 1474             group='ldap',
 1475             user_additional_attribute_mapping=['description:name'])
 1476         self.load_backends()
 1477         user = self.new_user_ref(name='EXTRA_ATTRIBUTES',
 1478                                  password='extra',
 1479                                  domain_id=CONF.identity.default_domain_id)
 1480         user = PROVIDERS.identity_api.create_user(user)
 1481         dn, attrs = PROVIDERS.identity_api.driver.user._ldap_get(user['id'])
 1482         self.assertThat([user['name']], matchers.Equals(attrs['description']))
 1483 
 1484     def test_user_description_attribute_mapping(self):
 1485         self.config_fixture.config(
 1486             group='ldap',
 1487             user_description_attribute='displayName')
 1488         self.load_backends()
 1489 
 1490         user = self.new_user_ref(domain_id=CONF.identity.default_domain_id,
 1491                                  displayName=uuid.uuid4().hex)
 1492         description = user['displayName']
 1493         user = PROVIDERS.identity_api.create_user(user)
 1494         res = PROVIDERS.identity_api.driver.user.get_all()
 1495 
 1496         new_user = [u for u in res if u['id'] == user['id']][0]
 1497         self.assertThat(new_user['description'], matchers.Equals(description))
 1498 
 1499     def test_user_extra_attribute_mapping_description_is_returned(self):
 1500         # Given a mapping like description:description, the description is
 1501         # returned.
 1502 
 1503         self.config_fixture.config(
 1504             group='ldap',
 1505             user_additional_attribute_mapping=['description:description'])
 1506         self.load_backends()
 1507 
 1508         user = self.new_user_ref(domain_id=CONF.identity.default_domain_id,
 1509                                  description=uuid.uuid4().hex)
 1510         description = user['description']
 1511         user = PROVIDERS.identity_api.create_user(user)
 1512         res = PROVIDERS.identity_api.driver.user.get_all()
 1513 
 1514         new_user = [u for u in res if u['id'] == user['id']][0]
 1515         self.assertThat(new_user['description'], matchers.Equals(description))
 1516 
 1517     def test_user_with_missing_id(self):
 1518         # create a user that doesn't have the id attribute
 1519         ldap_ = PROVIDERS.identity_api.driver.user.get_connection()
 1520         # `sn` is used for the attribute in the DN because it's allowed by
 1521         # the entry's objectclasses so that this test could conceivably run in
 1522         # the live tests.
 1523         ldap_id_field = 'sn'
 1524         ldap_id_value = uuid.uuid4().hex
 1525         dn = '%s=%s,ou=Users,cn=example,cn=com' % (ldap_id_field,
 1526                                                    ldap_id_value)
 1527         modlist = [('objectClass', ['person', 'inetOrgPerson']),
 1528                    (ldap_id_field, [ldap_id_value]),
 1529                    ('mail', ['email@example.com']),
 1530                    ('userPassword', [uuid.uuid4().hex])]
 1531         ldap_.add_s(dn, modlist)
 1532 
 1533         # make sure the user doesn't break other users
 1534         users = PROVIDERS.identity_api.driver.user.get_all()
 1535         self.assertThat(users, matchers.HasLength(len(default_fixtures.USERS)))
 1536 
 1537     @mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
 1538     def test_user_mixed_case_attribute(self, mock_ldap_get):
 1539         # Mock the search results to return attribute names
 1540         # with unexpected case.
 1541         mock_ldap_get.return_value = (
 1542             'cn=junk,dc=example,dc=com',
 1543             {
 1544                 'sN': [uuid.uuid4().hex],
 1545                 'MaIl': [uuid.uuid4().hex],
 1546                 'cn': ['junk']
 1547             }
 1548         )
 1549         user = PROVIDERS.identity_api.get_user('junk')
 1550         self.assertEqual(mock_ldap_get.return_value[1]['sN'][0],
 1551                          user['name'])
 1552         self.assertEqual(mock_ldap_get.return_value[1]['MaIl'][0],
 1553                          user['email'])
 1554 
 1555     def test_parse_extra_attribute_mapping(self):
 1556         option_list = ['description:name', 'gecos:password',
 1557                        'fake:invalid', 'invalid1', 'invalid2:',
 1558                        'description:name:something']
 1559         mapping = PROVIDERS.identity_api.driver.user._parse_extra_attrs(
 1560             option_list
 1561         )
 1562         expected_dict = {'description': 'name', 'gecos': 'password',
 1563                          'fake': 'invalid', 'invalid2': ''}
 1564         self.assertDictEqual(expected_dict, mapping)
 1565 
 1566     def test_create_domain(self):
 1567         domain = unit.new_domain_ref()
 1568         self.assertRaises(exception.ValidationError,
 1569                           PROVIDERS.resource_api.create_domain,
 1570                           domain['id'],
 1571                           domain)
 1572 
 1573     @unit.skip_if_no_multiple_domains_support
 1574     def test_create_domain_case_sensitivity(self):
 1575         # domains are read-only, so case sensitivity isn't an issue
 1576         ref = unit.new_domain_ref()
 1577         self.assertRaises(exception.Forbidden,
 1578                           PROVIDERS.resource_api.create_domain,
 1579                           ref['id'],
 1580                           ref)
 1581 
 1582     def test_domain_rename_invalidates_get_domain_by_name_cache(self):
 1583         parent = super(LDAPIdentity, self)
 1584         self.assertRaises(
 1585             exception.Forbidden,
 1586             parent.test_domain_rename_invalidates_get_domain_by_name_cache)
 1587 
 1588     def test_project_rename_invalidates_get_project_by_name_cache(self):
 1589         parent = super(LDAPIdentity, self)
 1590         self.assertRaises(
 1591             exception.Forbidden,
 1592             parent.test_project_rename_invalidates_get_project_by_name_cache)
 1593 
 1594     def test_project_crud(self):
 1595         # NOTE(topol): LDAP implementation does not currently support the
 1596         #              updating of a project name so this method override
 1597         #              provides a different update test
 1598         project = unit.new_project_ref(
 1599             domain_id=CONF.identity.default_domain_id)
 1600 
 1601         project = PROVIDERS.resource_api.create_project(project['id'], project)
 1602         project_ref = PROVIDERS.resource_api.get_project(project['id'])
 1603 
 1604         self.assertDictEqual(project, project_ref)
 1605 
 1606         project['description'] = uuid.uuid4().hex
 1607         PROVIDERS.resource_api.update_project(project['id'], project)
 1608         project_ref = PROVIDERS.resource_api.get_project(project['id'])
 1609         self.assertDictEqual(project, project_ref)
 1610 
 1611         PROVIDERS.resource_api.delete_project(project['id'])
 1612         self.assertRaises(exception.ProjectNotFound,
 1613                           PROVIDERS.resource_api.get_project,
 1614                           project['id'])
 1615 
 1616     @unit.skip_if_cache_disabled('assignment')
 1617     def test_cache_layer_project_crud(self):
 1618         # NOTE(morganfainberg): LDAP implementation does not currently support
 1619         # updating project names.  This method override provides a different
 1620         # update test.
 1621         project = unit.new_project_ref(
 1622             domain_id=CONF.identity.default_domain_id)
 1623         project_id = project['id']
 1624         # Create a project
 1625         project = PROVIDERS.resource_api.create_project(project_id, project)
 1626         PROVIDERS.resource_api.get_project(project_id)
 1627         updated_project = copy.deepcopy(project)
 1628         updated_project['description'] = uuid.uuid4().hex
 1629         # Update project, bypassing resource manager
 1630         PROVIDERS.resource_api.driver.update_project(
 1631             project_id, updated_project
 1632         )
 1633         # Verify get_project still returns the original project_ref
 1634         self.assertDictContainsSubset(
 1635             project, PROVIDERS.resource_api.get_project(project_id))
 1636         # Invalidate cache
 1637         PROVIDERS.resource_api.get_project.invalidate(
 1638             PROVIDERS.resource_api, project_id
 1639         )
 1640         # Verify get_project now returns the new project
 1641         self.assertDictContainsSubset(
 1642             updated_project,
 1643             PROVIDERS.resource_api.get_project(project_id))
 1644         # Update project using the resource_api manager back to original
 1645         PROVIDERS.resource_api.update_project(project['id'], project)
 1646         # Verify get_project returns the original project_ref
 1647         self.assertDictContainsSubset(
 1648             project, PROVIDERS.resource_api.get_project(project_id))
 1649         # Delete project bypassing resource_api
 1650         PROVIDERS.resource_api.driver.delete_project(project_id)
 1651         # Verify get_project still returns the project_ref
 1652         self.assertDictContainsSubset(
 1653             project, PROVIDERS.resource_api.get_project(project_id))
 1654         # Invalidate cache
 1655         PROVIDERS.resource_api.get_project.invalidate(
 1656             PROVIDERS.resource_api, project_id
 1657         )
 1658         # Verify ProjectNotFound now raised
 1659         self.assertRaises(exception.ProjectNotFound,
 1660                           PROVIDERS.resource_api.get_project,
 1661                           project_id)
 1662         # recreate project
 1663         PROVIDERS.resource_api.create_project(project_id, project)
 1664         PROVIDERS.resource_api.get_project(project_id)
 1665         # delete project
 1666         PROVIDERS.resource_api.delete_project(project_id)
 1667         # Verify ProjectNotFound is raised
 1668         self.assertRaises(exception.ProjectNotFound,
 1669                           PROVIDERS.resource_api.get_project,
 1670                           project_id)
 1671 
 1672     def test_update_is_domain_field(self):
 1673         domain = self._get_domain_fixture()
 1674         project = unit.new_project_ref(domain_id=domain['id'])
 1675         project = PROVIDERS.resource_api.create_project(project['id'], project)
 1676 
 1677         # Try to update the is_domain field to True
 1678         project['is_domain'] = True
 1679         self.assertRaises(exception.ValidationError,
 1680                           PROVIDERS.resource_api.update_project,
 1681                           project['id'], project)
 1682 
 1683     def test_multi_role_grant_by_user_group_on_project_domain(self):
 1684         # This is a partial implementation of the standard test that
 1685         # is defined in unit.assignment.test_backends.py.  It omits
 1686         # both domain and group grants. since neither of these are
 1687         # yet supported by the ldap backend.
 1688 
 1689         role_list = []
 1690         for _ in range(2):
 1691             role = unit.new_role_ref()
 1692             PROVIDERS.role_api.create_role(role['id'], role)
 1693             role_list.append(role)
 1694 
 1695         user1 = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
 1696         user1 = PROVIDERS.identity_api.create_user(user1)
 1697         project1 = unit.new_project_ref(
 1698             domain_id=CONF.identity.default_domain_id)
 1699         PROVIDERS.resource_api.create_project(project1['id'], project1)
 1700 
 1701         PROVIDERS.assignment_api.add_role_to_user_and_project(
 1702             user_id=user1['id'],
 1703             project_id=project1['id'],
 1704             role_id=role_list[0]['id'])
 1705         PROVIDERS.assignment_api.add_role_to_user_and_project(
 1706             user_id=user1['id'],
 1707             project_id=project1['id'],
 1708             role_id=role_list[1]['id'])
 1709 
 1710         # Although list_grants are not yet supported, we can test the
 1711         # alternate way of getting back lists of grants, where user
 1712         # and group roles are combined.  Only directly assigned user
 1713         # roles are available, since group grants are not yet supported
 1714 
 1715         combined_list = (
 1716             PROVIDERS.assignment_api.get_roles_for_user_and_project(
 1717                 user1['id'], project1['id']
 1718             )
 1719         )
 1720         self.assertEqual(2, len(combined_list))
 1721         self.assertIn(role_list[0]['id'], combined_list)
 1722         self.assertIn(role_list[1]['id'], combined_list)
 1723 
 1724         # Finally, although domain roles are not implemented, check we can
 1725         # issue the combined get roles call with benign results, since thus is
 1726         # used in token generation
 1727 
 1728         combined_role_list = (
 1729             PROVIDERS.assignment_api.get_roles_for_user_and_domain(
 1730                 user1['id'], CONF.identity.default_domain_id
 1731             )
 1732         )
 1733         self.assertEqual(0, len(combined_role_list))
 1734 
 1735     def test_get_default_domain_by_name(self):
 1736         domain = self._get_domain_fixture()
 1737 
 1738         domain_ref = PROVIDERS.resource_api.get_domain_by_name(domain['name'])
 1739         self.assertEqual(domain_ref, domain)
 1740 
 1741     def test_base_ldap_connection_deref_option(self):
 1742         def get_conn(deref_name):
 1743             self.config_fixture.config(group='ldap',
 1744                                        alias_dereferencing=deref_name)
 1745             base_ldap = common_ldap.BaseLdap(CONF)
 1746             return base_ldap.get_connection()
 1747 
 1748         conn = get_conn('default')
 1749         self.assertEqual(ldap.get_option(ldap.OPT_DEREF),
 1750                          conn.get_option(ldap.OPT_DEREF))
 1751 
 1752         conn = get_conn('always')
 1753         self.assertEqual(ldap.DEREF_ALWAYS,
 1754                          conn.get_option(ldap.OPT_DEREF))
 1755 
 1756         conn = get_conn('finding')
 1757         self.assertEqual(ldap.DEREF_FINDING,
 1758                          conn.get_option(ldap.OPT_DEREF))
 1759 
 1760         conn = get_conn('never')
 1761         self.assertEqual(ldap.DEREF_NEVER,
 1762                          conn.get_option(ldap.OPT_DEREF))
 1763 
 1764         conn = get_conn('searching')
 1765         self.assertEqual(ldap.DEREF_SEARCHING,
 1766                          conn.get_option(ldap.OPT_DEREF))
 1767 
 1768     def test_list_users_no_dn(self):
 1769         users = PROVIDERS.identity_api.list_users()
 1770         self.assertEqual(len(default_fixtures.USERS), len(users))
 1771         user_ids = set(user['id'] for user in users)
 1772         expected_user_ids = set(getattr(self, 'user_%s' % user['name'])['id']
 1773                                 for user in default_fixtures.USERS)
 1774         for user_ref in users:
 1775             self.assertNotIn('dn', user_ref)
 1776         self.assertEqual(expected_user_ids, user_ids)
 1777 
 1778     def test_list_groups_no_dn(self):
 1779         # Create some test groups.
 1780         domain = self._get_domain_fixture()
 1781         expected_group_ids = []
 1782         numgroups = 3
 1783         for _ in range(numgroups):
 1784             group = unit.new_group_ref(domain_id=domain['id'])
 1785             group = PROVIDERS.identity_api.create_group(group)
 1786             expected_group_ids.append(group['id'])
 1787         # Fetch the test groups and ensure that they don't contain a dn.
 1788         groups = PROVIDERS.identity_api.list_groups()
 1789         self.assertEqual(numgroups, len(groups))
 1790         group_ids = set(group['id'] for group in groups)
 1791         for group_ref in groups:
 1792             self.assertNotIn('dn', group_ref)
 1793         self.assertEqual(set(expected_group_ids), group_ids)
 1794 
 1795     def test_list_groups_for_user_no_dn(self):
 1796         # Create a test user.
 1797         user = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
 1798         user = PROVIDERS.identity_api.create_user(user)
 1799         # Create some test groups and add the test user as a member.
 1800         domain = self._get_domain_fixture()
 1801         expected_group_ids = []
 1802         numgroups = 3
 1803         for _ in range(numgroups):
 1804             group = unit.new_group_ref(domain_id=domain['id'])
 1805             group = PROVIDERS.identity_api.create_group(group)
 1806             expected_group_ids.append(group['id'])
 1807             PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
 1808         # Fetch the groups for the test user
 1809         # and ensure they don't contain a dn.
 1810         groups = PROVIDERS.identity_api.list_groups_for_user(user['id'])
 1811         self.assertEqual(numgroups, len(groups))
 1812         group_ids = set(group['id'] for group in groups)
 1813         for group_ref in groups:
 1814             self.assertNotIn('dn', group_ref)
 1815         self.assertEqual(set(expected_group_ids), group_ids)
 1816 
 1817     def test_user_id_attribute_in_create(self):
 1818         driver = PROVIDERS.identity_api._select_identity_driver(
 1819             CONF.identity.default_domain_id)
 1820         driver.user.id_attr = 'mail'
 1821 
 1822         user = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
 1823         user = PROVIDERS.identity_api.create_user(user)
 1824         user_ref = PROVIDERS.identity_api.get_user(user['id'])
 1825         # 'email' attribute should've created because it is also being used
 1826         # as user_id
 1827         self.assertEqual(user_ref['id'], user_ref['email'])
 1828 
 1829     def test_user_id_attribute_map(self):
 1830         driver = PROVIDERS.identity_api._select_identity_driver(
 1831             CONF.identity.default_domain_id)
 1832         driver.user.id_attr = 'mail'
 1833 
 1834         user_ref = PROVIDERS.identity_api.get_user(self.user_foo['email'])
 1835         # the user_id_attribute map should be honored, which means
 1836         # user_ref['id'] should contains the email attribute
 1837         self.assertEqual(self.user_foo['email'], user_ref['id'])
 1838 
 1839     @mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
 1840     def test_get_multivalued_attribute_id_from_dn(self,
 1841                                                   mock_ldap_get):
 1842         driver = PROVIDERS.identity_api._select_identity_driver(
 1843             CONF.identity.default_domain_id)
 1844         driver.user.id_attr = 'mail'
 1845 
 1846         # make 'email' multivalued so we can test the error condition
 1847         email1 = uuid.uuid4().hex
 1848         email2 = uuid.uuid4().hex
 1849         # Mock the ldap search results to return user entries with
 1850         # user_name_attribute('sn') value has emptyspaces, emptystring
 1851         # and attibute itself is not set.
 1852         mock_ldap_get.return_value = (
 1853             'cn=users,dc=example,dc=com',
 1854             {
 1855                 'mail': [email1, email2],
 1856             }
 1857         )
 1858 
 1859         # This is not a valid scenario, since we do not support multiple value
 1860         # attribute id on DN.
 1861         self.assertRaises(exception.NotFound,
 1862                           PROVIDERS.identity_api.get_user, email1)
 1863 
 1864     @mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
 1865     def test_raise_not_found_dn_for_multivalued_attribute_id(self,
 1866                                                              mock_ldap_get):
 1867         driver = PROVIDERS.identity_api._select_identity_driver(
 1868             CONF.identity.default_domain_id)
 1869         driver.user.id_attr = 'mail'
 1870 
 1871         # make 'email' multivalued so we can test the error condition
 1872         email1 = uuid.uuid4().hex
 1873         email2 = uuid.uuid4().hex
 1874         mock_ldap_get.return_value = (
 1875             'cn=nobodycares,dc=example,dc=com',
 1876             {
 1877                 'sn': [uuid.uuid4().hex],
 1878                 'mail': [email1, email2],
 1879                 'cn': 'nobodycares'
 1880             }
 1881         )
 1882 
 1883         # This is not a valid scenario, since we do not support multiple value
 1884         # attribute id on DN.
 1885         self.assertRaises(exception.NotFound,
 1886                           PROVIDERS.identity_api.get_user, email1)
 1887 
 1888     @mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
 1889     def test_get_id_not_in_dn(self,
 1890                               mock_ldap_get):
 1891         driver = PROVIDERS.identity_api._select_identity_driver(
 1892             CONF.identity.default_domain_id)
 1893         driver.user.id_attr = 'sAMAccountName'
 1894 
 1895         user_id = uuid.uuid4().hex
 1896         mock_ldap_get.return_value = (
 1897             'cn=someuser,dc=example,dc=com',
 1898             {
 1899                 'cn': 'someuser',
 1900                 'sn': [uuid.uuid4().hex],
 1901                 'sAMAccountName': [user_id],
 1902             }
 1903         )
 1904         user_ref = PROVIDERS.identity_api.get_user(user_id)
 1905         self.assertEqual(user_id, user_ref['id'])
 1906 
 1907     @mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
 1908     def test_id_attribute_not_found(self, mock_ldap_get):
 1909         mock_ldap_get.return_value = (
 1910             'cn=nobodycares,dc=example,dc=com',
 1911             {
 1912                 'sn': [uuid.uuid4().hex],
 1913             }
 1914         )
 1915 
 1916         user_api = identity.backends.ldap.UserApi(CONF)
 1917         self.assertRaises(exception.NotFound,
 1918                           user_api.get,
 1919                           'nobodycares')
 1920 
 1921     @mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
 1922     def test_user_id_not_in_dn(self, mock_ldap_get):
 1923         driver = PROVIDERS.identity_api._select_identity_driver(
 1924             CONF.identity.default_domain_id)
 1925         driver.user.id_attr = 'uid'
 1926         driver.user.attribute_mapping['name'] = 'cn'
 1927 
 1928         mock_ldap_get.return_value = (
 1929             'foo=bar,dc=example,dc=com',
 1930             {
 1931                 'sn': [uuid.uuid4().hex],
 1932                 'foo': ['bar'],
 1933                 'cn': ['junk'],
 1934                 'uid': ['crap']
 1935             }
 1936         )
 1937         user_ref = PROVIDERS.identity_api.get_user('crap')
 1938         self.assertEqual('crap', user_ref['id'])
 1939         self.assertEqual('junk', user_ref['name'])
 1940 
 1941     @mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
 1942     def test_user_name_in_dn(self, mock_ldap_get):
 1943         driver = PROVIDERS.identity_api._select_identity_driver(
 1944             CONF.identity.default_domain_id)
 1945         driver.user.id_attr = 'SAMAccountName'
 1946         driver.user.attribute_mapping['name'] = 'cn'
 1947 
 1948         mock_ldap_get.return_value = (
 1949             'cn=Foo Bar,dc=example,dc=com',
 1950             {
 1951                 'sn': [uuid.uuid4().hex],
 1952                 'cn': ['Foo Bar'],
 1953                 'SAMAccountName': ['crap']
 1954             }
 1955         )
 1956         user_ref = PROVIDERS.identity_api.get_user('crap')
 1957         self.assertEqual('crap', user_ref['id'])
 1958         self.assertEqual('Foo Bar', user_ref['name'])
 1959 
 1960     def test_identity_manager_catches_forbidden_when_deleting_a_project(self):
 1961         # The identity API registers a callback that listens for notifications
 1962         # that a project has been deleted. When it receives one, it uses the ID
 1963         # and attempts to clear any users who have `default_project_id`
 1964         # attributes associated to that project. Since the LDAP backend is
 1965         # read-only, clearing the `default_project_id` requires a write which
 1966         # isn't possible.
 1967         project = unit.new_project_ref(
 1968             domain_id=CONF.identity.default_domain_id
 1969         )
 1970         project = PROVIDERS.resource_api.create_project(project['id'], project)
 1971         with mock.patch.object(
 1972             ldap_identity.Identity, '_disallow_write'
 1973         ) as mocked:
 1974             mocked.side_effect = exception.Forbidden()
 1975             PROVIDERS.resource_api.delete_project(project['id'])
 1976 
 1977         mocked.assert_called_once()
 1978 
 1979 
 1980 class LDAPLimitTests(unit.TestCase, identity_tests.LimitTests):
 1981     def setUp(self):
 1982         super(LDAPLimitTests, self).setUp()
 1983 
 1984         self.useFixture(ldapdb.LDAPDatabase())
 1985         self.useFixture(database.Database())
 1986         self.load_backends()
 1987         self.load_fixtures(default_fixtures)
 1988         identity_tests.LimitTests.setUp(self)
 1989         _assert_backends(self,
 1990                          assignment='sql',
 1991                          identity='ldap')
 1992 
 1993     def config_overrides(self):
 1994         super(LDAPLimitTests, self).config_overrides()
 1995         self.config_fixture.config(group='identity', driver='ldap')
 1996         self.config_fixture.config(group='identity',
 1997                                    list_limit=len(default_fixtures.USERS) - 1)
 1998 
 1999     def config_files(self):
 2000         config_files = super(LDAPLimitTests, self).config_files()
 2001         config_files.append(unit.dirs.tests_conf('backend_ldap.conf'))
 2002         return config_files
 2003 
 2004 
 2005 class LDAPIdentityEnabledEmulation(LDAPIdentity, unit.TestCase):
 2006     def setUp(self):
 2007         super(LDAPIdentityEnabledEmulation, self).setUp()
 2008         _assert_backends(self, identity='ldap')
 2009 
 2010     def load_fixtures(self, fixtures):
 2011         # Override super impl since need to create group container.
 2012         super(LDAPIdentity, self).load_fixtures(fixtures)
 2013         for obj in [self.project_bar, self.project_baz, self.user_foo,
 2014                     self.user_two, self.user_badguy]:
 2015             obj.setdefault('enabled', True)
 2016 
 2017     def config_files(self):
 2018         config_files = super(LDAPIdentityEnabledEmulation, self).config_files()
 2019         config_files.append(unit.dirs.tests_conf('backend_ldap.conf'))
 2020         return config_files
 2021 
 2022     def config_overrides(self):
 2023         super(LDAPIdentityEnabledEmulation, self).config_overrides()
 2024         self.config_fixture.config(group='ldap',
 2025                                    user_enabled_emulation=True)
 2026 
 2027     def test_project_crud(self):
 2028         # NOTE(topol): LDAPIdentityEnabledEmulation will create an
 2029         #              enabled key in the project dictionary so this
 2030         #              method override handles this side-effect
 2031         project = unit.new_project_ref(
 2032             domain_id=CONF.identity.default_domain_id)
 2033 
 2034         project = PROVIDERS.resource_api.create_project(project['id'], project)
 2035         project_ref = PROVIDERS.resource_api.get_project(project['id'])
 2036 
 2037         # PROVIDERS.resource_api.create_project adds an enabled
 2038         # key with a value of True when LDAPIdentityEnabledEmulation
 2039         # is used so we now add this expected key to the project dictionary
 2040         project['enabled'] = True
 2041         self.assertDictEqual(project, project_ref)
 2042 
 2043         project['description'] = uuid.uuid4().hex
 2044         PROVIDERS.resource_api.update_project(project['id'], project)
 2045         project_ref = PROVIDERS.resource_api.get_project(project['id'])
 2046         self.assertDictEqual(project, project_ref)
 2047 
 2048         PROVIDERS.resource_api.delete_project(project['id'])
 2049         self.assertRaises(exception.ProjectNotFound,
 2050                           PROVIDERS.resource_api.get_project,
 2051                           project['id'])
 2052 
 2053     def test_user_auth_emulated(self):
 2054         driver = PROVIDERS.identity_api._select_identity_driver(
 2055             CONF.identity.default_domain_id)
 2056         driver.user.enabled_emulation_dn = 'cn=test,dc=test'
 2057         with self.make_request():
 2058             PROVIDERS.identity_api.authenticate(
 2059                 user_id=self.user_foo['id'],
 2060                 password=self.user_foo['password'])
 2061 
 2062     def test_user_enable_attribute_mask(self):
 2063         self.skip_test_overrides(
 2064             "Enabled emulation conflicts with enabled mask")
 2065 
 2066     def test_user_enabled_use_group_config(self):
 2067         # Establish enabled-emulation group name to later query its members
 2068         group_name = 'enabled_users'
 2069         driver = PROVIDERS.identity_api._select_identity_driver(
 2070             CONF.identity.default_domain_id)
 2071         group_dn = 'cn=%s,%s' % (group_name, driver.group.tree_dn)
 2072 
 2073         self.config_fixture.config(
 2074             group='ldap',
 2075             user_enabled_emulation_use_group_config=True,
 2076             user_enabled_emulation_dn=group_dn,
 2077             group_name_attribute='cn',
 2078             group_member_attribute='uniqueMember',
 2079             group_objectclass='groupOfUniqueNames')
 2080         self.ldapdb.clear()
 2081         self.load_backends()
 2082 
 2083         # Create a user and ensure they are enabled.
 2084         user1 = unit.new_user_ref(enabled=True,
 2085                                   domain_id=CONF.identity.default_domain_id)
 2086         user_ref = PROVIDERS.identity_api.create_user(user1)
 2087         self.assertIs(True, user_ref['enabled'])
 2088 
 2089         # Get a user and ensure they are enabled.
 2090         user_ref = PROVIDERS.identity_api.get_user(user_ref['id'])
 2091         self.assertIs(True, user_ref['enabled'])
 2092 
 2093         # Ensure state matches the group config
 2094         group_ref = PROVIDERS.identity_api.get_group_by_name(
 2095             group_name, CONF.identity.default_domain_id)
 2096         PROVIDERS.identity_api.check_user_in_group(
 2097             user_ref['id'], group_ref['id'])
 2098 
 2099     def test_user_enabled_use_group_config_with_ids(self):
 2100         # Establish enabled-emulation group name to later query its members
 2101         group_name = 'enabled_users'
 2102         driver = PROVIDERS.identity_api._select_identity_driver(
 2103             CONF.identity.default_domain_id)
 2104         group_dn = 'cn=%s,%s' % (group_name, driver.group.tree_dn)
 2105 
 2106         self.config_fixture.config(
 2107             group='ldap',
 2108             user_enabled_emulation_use_group_config=True,
 2109             user_enabled_emulation_dn=group_dn,
 2110             group_name_attribute='cn',
 2111             group_member_attribute='memberUid',
 2112             group_members_are_ids=True,
 2113             group_objectclass='posixGroup')
 2114         self.ldapdb.clear()
 2115         self.load_backends()
 2116 
 2117         # Create a user and ensure they are enabled.
 2118         user1 = unit.new_user_ref(enabled=True,
 2119                                   domain_id=CONF.identity.default_domain_id)
 2120         user_ref = PROVIDERS.identity_api.create_user(user1)
 2121         self.assertIs(True, user_ref['enabled'])
 2122 
 2123         # Get a user and ensure they are enabled.
 2124         user_ref = PROVIDERS.identity_api.get_user(user_ref['id'])
 2125         self.assertIs(True, user_ref['enabled'])
 2126 
 2127         # Ensure state matches the group config
 2128         group_ref = PROVIDERS.identity_api.get_group_by_name(
 2129             group_name, CONF.identity.default_domain_id)
 2130         PROVIDERS.identity_api.check_user_in_group(
 2131             user_ref['id'], group_ref['id'])
 2132 
 2133     def test_user_enabled_invert(self):
 2134         self.config_fixture.config(group='ldap', user_enabled_invert=True,
 2135                                    user_enabled_default='False')
 2136         self.ldapdb.clear()
 2137         self.load_backends()
 2138 
 2139         user1 = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
 2140 
 2141         user2 = self.new_user_ref(enabled=False,
 2142                                   domain_id=CONF.identity.default_domain_id)
 2143 
 2144         user3 = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
 2145 
 2146         # Ensure that the enabled LDAP attribute is not set for a
 2147         # newly created enabled user.
 2148         user_ref = PROVIDERS.identity_api.create_user(user1)
 2149         self.assertIs(True, user_ref['enabled'])
 2150         self.assertIsNone(self.get_user_enabled_vals(user_ref))
 2151         user_ref = PROVIDERS.identity_api.get_user(user_ref['id'])
 2152         self.assertIs(True, user_ref['enabled'])
 2153 
 2154         # Ensure that an enabled LDAP attribute is not set for a disabled user.
 2155         user1['enabled'] = False
 2156         user_ref = PROVIDERS.identity_api.update_user(user_ref['id'], user1)
 2157         self.assertIs(False, user_ref['enabled'])
 2158         self.assertIsNone(self.get_user_enabled_vals(user_ref))
 2159 
 2160         # Enable the user and ensure that the LDAP enabled
 2161         # attribute is not set.
 2162         user1['enabled'] = True
 2163         user_ref = PROVIDERS.identity_api.update_user(user_ref['id'], user1)
 2164         self.assertIs(True, user_ref['enabled'])
 2165         self.assertIsNone(self.get_user_enabled_vals(user_ref))
 2166 
 2167         # Ensure that the LDAP enabled attribute is not set for a
 2168         # newly created disabled user.
 2169         user_ref = PROVIDERS.identity_api.create_user(user2)
 2170         self.assertIs(False, user_ref['enabled'])
 2171         self.assertIsNone(self.get_user_enabled_vals(user_ref))
 2172         user_ref = PROVIDERS.identity_api.get_user(user_ref['id'])
 2173         self.assertIs(False, user_ref['enabled'])
 2174 
 2175         # Ensure that the LDAP enabled attribute is not set for a newly created
 2176         # user when the user_enabled_default setting is used.
 2177         user_ref = PROVIDERS.identity_api.create_user(user3)
 2178         self.assertIs(True, user_ref['enabled'])
 2179         self.assertIsNone(self.get_user_enabled_vals(user_ref))
 2180         user_ref = PROVIDERS.identity_api.get_user(user_ref['id'])
 2181         self.assertIs(True, user_ref['enabled'])
 2182 
 2183     def test_user_enabled_invert_default_str_value(self):
 2184         self.skip_test_overrides(
 2185             "N/A: Covered by test_user_enabled_invert")
 2186 
 2187     @mock.patch.object(common_ldap.BaseLdap, '_ldap_get')
 2188     def test_user_enabled_attribute_handles_utf8(self, mock_ldap_get):
 2189         # Since user_enabled_emulation is enabled in this test, this test will
 2190         # fail since it's using user_enabled_invert.
 2191         self.config_fixture.config(group='ldap', user_enabled_invert=True,
 2192                                    user_enabled_attribute='passwordisexpired')
 2193         mock_ldap_get.return_value = (
 2194             u'uid=123456789,c=us,ou=our_ldap,o=acme.com',
 2195             {
 2196                 'uid': [123456789],
 2197                 'mail': [u'shaun@acme.com'],
 2198                 'passwordisexpired': [u'false'],
 2199                 'cn': [u'uid=123456789,c=us,ou=our_ldap,o=acme.com']
 2200             }
 2201         )
 2202 
 2203         user_api = identity.backends.ldap.UserApi(CONF)
 2204         user_ref = user_api.get('123456789')
 2205         self.assertIs(False, user_ref['enabled'])
 2206 
 2207     def test_escape_member_dn(self):
 2208         # The enabled member DN is properly escaped when querying for enabled
 2209         # user.
 2210 
 2211         object_id = uuid.uuid4().hex
 2212         driver = PROVIDERS.identity_api._select_identity_driver(
 2213             CONF.identity.default_domain_id)
 2214 
 2215         # driver.user is the EnabledEmuMixIn implementation used for this test.
 2216         mixin_impl = driver.user
 2217 
 2218         # ) is a special char in a filter and must be escaped.
 2219         sample_dn = 'cn=foo)bar'
 2220         # LDAP requires ) is escaped by being replaced with "\29"
 2221         sample_dn_filter_esc = r'cn=foo\29bar'
 2222 
 2223         # Override the tree_dn, it's used to build the enabled member filter
 2224         mixin_impl.tree_dn = sample_dn
 2225 
 2226         # The filter, which _is_id_enabled is going to build, contains the
 2227         # tree_dn, which better be escaped in this case.
 2228         exp_filter = '(%s=%s=%s,%s)' % (
 2229             mixin_impl.member_attribute, mixin_impl.id_attr, object_id,
 2230             sample_dn_filter_esc)
 2231 
 2232         with mixin_impl.get_connection() as conn:
 2233             m = self.useFixture(
 2234                 fixtures.MockPatchObject(conn, 'search_s')).mock
 2235             mixin_impl._is_id_enabled(object_id, conn)
 2236             # The 3rd argument is the DN.
 2237             self.assertEqual(exp_filter, m.call_args[0][2])
 2238 
 2239 
 2240 class LDAPPosixGroupsTest(LDAPTestSetup, unit.TestCase):
 2241 
 2242     def assert_backends(self):
 2243         _assert_backends(self, identity='ldap')
 2244 
 2245     def config_overrides(self):
 2246         super(LDAPPosixGroupsTest, self).config_overrides()
 2247         self.config_fixture.config(group='identity', driver='ldap')
 2248         self.config_fixture.config(group='ldap', group_members_are_ids=True,
 2249                                    group_member_attribute='memberUID')
 2250 
 2251     def config_files(self):
 2252         config_files = super(LDAPPosixGroupsTest, self).config_files()
 2253         config_files.append(unit.dirs.tests_conf('backend_ldap.conf'))
 2254         return config_files
 2255 
 2256     def _get_domain_fixture(self):
 2257         """Return the static domain, since domains in LDAP are read-only."""
 2258         return PROVIDERS.resource_api.get_domain(
 2259             CONF.identity.default_domain_id
 2260         )
 2261 
 2262     def test_posix_member_id(self):
 2263         domain = self._get_domain_fixture()
 2264         new_group = unit.new_group_ref(domain_id=domain['id'])
 2265         new_group = PROVIDERS.identity_api.create_group(new_group)
 2266         # Make sure we get an empty list back on a new group, not an error.
 2267         user_refs = PROVIDERS.identity_api.list_users_in_group(new_group['id'])
 2268         self.assertEqual([], user_refs)
 2269         # Make sure we get the correct users back once they have been added
 2270         # to the group.
 2271         new_user = unit.new_user_ref(domain_id=domain['id'])
 2272         new_user = PROVIDERS.identity_api.create_user(new_user)
 2273 
 2274         # NOTE(amakarov): Create the group directly using LDAP operations
 2275         # rather than going through the manager.
 2276         group_api = PROVIDERS.identity_api.driver.group
 2277         group_ref = group_api.get(new_group['id'])
 2278         mod = (ldap.MOD_ADD, group_api.member_attribute, new_user['id'])
 2279         conn = group_api.get_connection()
 2280         conn.modify_s(group_ref['dn'], [mod])
 2281 
 2282         # Testing the case "the group contains a user"
 2283         user_refs = PROVIDERS.identity_api.list_users_in_group(new_group['id'])
 2284         self.assertIn(new_user['id'], (x['id'] for x in user_refs))
 2285 
 2286         # Testing the case "the user is a member of a group"
 2287         group_refs = PROVIDERS.identity_api.list_groups_for_user(
 2288             new_user['id']
 2289         )
 2290         self.assertIn(new_group['id'], (x['id'] for x in group_refs))
 2291 
 2292 
 2293 class LdapIdentityWithMapping(
 2294         BaseLDAPIdentity, unit.SQLDriverOverrides, unit.TestCase):
 2295     """Class to test mapping of default LDAP backend.
 2296 
 2297     The default configuration is not to enable mapping when using a single
 2298     backend LDAP driver.  However, a cloud provider might want to enable
 2299     the mapping, hence hiding the LDAP IDs from any clients of keystone.
 2300     Setting backward_compatible_ids to False will enable this mapping.
 2301 
 2302     """
 2303 
 2304     def config_files(self):
 2305         config_files = super(LdapIdentityWithMapping, self).config_files()
 2306         config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf'))
 2307         return config_files
 2308 
 2309     def setUp(self):
 2310         super(LdapIdentityWithMapping, self).setUp()
 2311         cache.configure_cache()
 2312 
 2313     def assert_backends(self):
 2314         _assert_backends(self, identity='ldap')
 2315 
 2316     def config_overrides(self):
 2317         super(LdapIdentityWithMapping, self).config_overrides()
 2318         self.config_fixture.config(group='identity', driver='ldap')
 2319         self.config_fixture.config(group='identity_mapping',
 2320                                    backward_compatible_ids=False)
 2321 
 2322     def test_dynamic_mapping_build(self):
 2323         """Test to ensure entities not create via controller are mapped.
 2324 
 2325         Many LDAP backends will, essentially, by Read Only. In these cases
 2326         the mapping is not built by creating objects, rather from enumerating
 2327         the entries.  We test this here my manually deleting the mapping and
 2328         then trying to re-read the entries.
 2329 
 2330         """
 2331         initial_mappings = len(mapping_sql.list_id_mappings())
 2332         user1 = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
 2333         user1 = PROVIDERS.identity_api.create_user(user1)
 2334         user2 = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
 2335         user2 = PROVIDERS.identity_api.create_user(user2)
 2336         mappings = mapping_sql.list_id_mappings()
 2337         self.assertEqual(initial_mappings + 2, len(mappings))
 2338 
 2339         # Now delete the mappings for the two users above
 2340         PROVIDERS.id_mapping_api.purge_mappings({'public_id': user1['id']})
 2341         PROVIDERS.id_mapping_api.purge_mappings({'public_id': user2['id']})
 2342 
 2343         # We should no longer be able to get these users via their old IDs
 2344         self.assertRaises(exception.UserNotFound,
 2345                           PROVIDERS.identity_api.get_user,
 2346                           user1['id'])
 2347         self.assertRaises(exception.UserNotFound,
 2348                           PROVIDERS.identity_api.get_user,
 2349                           user2['id'])
 2350 
 2351         # Now enumerate all users...this should re-build the mapping, and
 2352         # we should be able to find the users via their original public IDs.
 2353         PROVIDERS.identity_api.list_users()
 2354         PROVIDERS.identity_api.get_user(user1['id'])
 2355         PROVIDERS.identity_api.get_user(user2['id'])
 2356 
 2357     def test_list_domains(self):
 2358         domains = PROVIDERS.resource_api.list_domains()
 2359         default_domain = unit.new_domain_ref(
 2360             description=u'The default domain',
 2361             id=CONF.identity.default_domain_id,
 2362             name=u'Default')
 2363         self.assertEqual([default_domain], domains)
 2364 
 2365 
 2366 class BaseMultiLDAPandSQLIdentity(object):
 2367     """Mixin class with support methods for domain-specific config testing."""
 2368 
 2369     def create_users_across_domains(self):
 2370         """Create a set of users, each with a role on their own domain."""
 2371         # We also will check that the right number of id mappings get created
 2372         initial_mappings = len(mapping_sql.list_id_mappings())
 2373 
 2374         users = {}
 2375 
 2376         users['user0'] = unit.create_user(
 2377             PROVIDERS.identity_api,
 2378             self.domain_default['id'])
 2379         PROVIDERS.assignment_api.create_grant(
 2380             user_id=users['user0']['id'],
 2381             domain_id=self.domain_default['id'],
 2382             role_id=self.role_member['id'])
 2383         for x in range(1, self.domain_count):
 2384             users['user%s' % x] = unit.create_user(
 2385                 PROVIDERS.identity_api,
 2386                 self.domains['domain%s' % x]['id'])
 2387             PROVIDERS.assignment_api.create_grant(
 2388                 user_id=users['user%s' % x]['id'],
 2389                 domain_id=self.domains['domain%s' % x]['id'],
 2390                 role_id=self.role_member['id'])
 2391 
 2392         # So how many new id mappings should have been created? One for each
 2393         # user created in a domain that is using the non default driver..
 2394         self.assertEqual(initial_mappings + self.domain_specific_count,
 2395                          len(mapping_sql.list_id_mappings()))
 2396 
 2397         return users
 2398 
 2399     def check_user(self, user, domain_id, expected_status):
 2400         """Check user is in correct backend.
 2401 
 2402         As part of the tests, we want to force ourselves to manually
 2403         select the driver for a given domain, to make sure the entity
 2404         ended up in the correct backend.
 2405 
 2406         """
 2407         driver = PROVIDERS.identity_api._select_identity_driver(domain_id)
 2408         unused, unused, entity_id = (
 2409             PROVIDERS.identity_api._get_domain_driver_and_entity_id(
 2410                 user['id']))
 2411 
 2412         if expected_status == http_client.OK:
 2413             ref = driver.get_user(entity_id)
 2414             ref = PROVIDERS.identity_api._set_domain_id_and_mapping(
 2415                 ref, domain_id, driver, map.EntityType.USER)
 2416             user = user.copy()
 2417             del user['password']
 2418             self.assertDictEqual(user, ref)
 2419         else:
 2420             # TODO(henry-nash): Use AssertRaises here, although
 2421             # there appears to be an issue with using driver.get_user
 2422             # inside that construct
 2423             try:
 2424                 driver.get_user(entity_id)
 2425             except expected_status:
 2426                 pass
 2427 
 2428     def setup_initial_domains(self):
 2429 
 2430         def create_domain(domain):
 2431             try:
 2432                 ref = PROVIDERS.resource_api.create_domain(
 2433                     domain['id'], domain)
 2434             except exception.Conflict:
 2435                 ref = (
 2436                     PROVIDERS.resource_api.get_domain_by_name(domain['name']))
 2437             return ref
 2438 
 2439         self.domains = {}
 2440         for x in range(1, self.domain_count):
 2441             domain = 'domain%s' % x
 2442             self.domains[domain] = create_domain(
 2443                 {'id': uuid.uuid4().hex, 'name': domain})
 2444 
 2445     def test_authenticate_to_each_domain(self):
 2446         """Test that a user in each domain can authenticate."""
 2447         users = self.create_users_across_domains()
 2448 
 2449         for user_num in range(self.domain_count):
 2450             user = 'user%s' % user_num
 2451             with self.make_request():
 2452                 PROVIDERS.identity_api.authenticate(
 2453                     user_id=users[user]['id'],
 2454                     password=users[user]['password'])
 2455 
 2456 
 2457 class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
 2458                               unit.TestCase, BaseMultiLDAPandSQLIdentity):
 2459     """Class to test common SQL plus individual LDAP backends.
 2460 
 2461     We define a set of domains and domain-specific backends:
 2462 
 2463     - A separate LDAP backend for the default domain
 2464     - A separate LDAP backend for domain1
 2465     - domain2 shares the same LDAP as domain1, but uses a different
 2466       tree attach point
 2467     - An SQL backend for all other domains (which will include domain3
 2468       and domain4)
 2469 
 2470     Normally one would expect that the default domain would be handled as
 2471     part of the "other domains" - however the above provides better
 2472     test coverage since most of the existing backend tests use the default
 2473     domain.
 2474 
 2475     """
 2476 
 2477     def load_fixtures(self, fixtures):
 2478         self.domain_count = 5
 2479         self.domain_specific_count = 3
 2480         PROVIDERS.resource_api.create_domain(
 2481             default_fixtures.ROOT_DOMAIN['id'], default_fixtures.ROOT_DOMAIN)
 2482         self.setup_initial_domains()
 2483 
 2484         # All initial test data setup complete, time to switch on support
 2485         # for separate backends per domain.
 2486         self.enable_multi_domain()
 2487 
 2488         super(MultiLDAPandSQLIdentity, self).load_fixtures(fixtures)
 2489 
 2490     def assert_backends(self):
 2491         _assert_backends(self,
 2492                          assignment='sql',
 2493                          identity={
 2494                              None: 'sql',
 2495                              self.domain_default['id']: 'ldap',
 2496                              self.domains['domain1']['id']: 'ldap',
 2497                              self.domains['domain2']['id']: 'ldap',
 2498                          })
 2499 
 2500     def config_overrides(self):
 2501         super(MultiLDAPandSQLIdentity, self).config_overrides()
 2502         # Make sure identity and assignment are actually SQL drivers,
 2503         # BaseLDAPIdentity sets these options to use LDAP.
 2504         self.config_fixture.config(group='identity', driver='sql')
 2505         self.config_fixture.config(group='resource', driver='sql')
 2506         self.config_fixture.config(group='assignment', driver='sql')
 2507 
 2508     def enable_multi_domain(self):
 2509         """Enable the chosen form of multi domain configuration support.
 2510 
 2511         This method enables the file-based configuration support. Child classes
 2512         that wish to use the database domain configuration support should
 2513         override this method and set the appropriate config_fixture option.
 2514 
 2515         """
 2516         self.config_fixture.config(
 2517             group='identity', domain_specific_drivers_enabled=True,
 2518             domain_config_dir=unit.TESTCONF + '/domain_configs_multi_ldap',
 2519             list_limit=1000)
 2520         self.config_fixture.config(group='identity_mapping',
 2521                                    backward_compatible_ids=False)
 2522 
 2523     def get_config(self, domain_id):
 2524         # Get the config for this domain, will return CONF
 2525         # if no specific config defined for this domain
 2526         return PROVIDERS.identity_api.domain_configs.get_domain_conf(domain_id)
 2527 
 2528     def test_list_users(self):
 2529         _users = self.create_users_across_domains()
 2530 
 2531         # Override the standard list users, since we have added an extra user
 2532         # to the default domain, so the number of expected users is one more
 2533         # than in the standard test.
 2534         users = PROVIDERS.identity_api.list_users(
 2535             domain_scope=self._set_domain_scope(
 2536                 CONF.identity.default_domain_id))
 2537         self.assertEqual(len(default_fixtures.USERS) + 1, len(users))
 2538         user_ids = set(user['id'] for user in users)
 2539         expected_user_ids = set(getattr(self, 'user_%s' % user['name'])['id']
 2540                                 for user in default_fixtures.USERS)
 2541         expected_user_ids.add(_users['user0']['id'])
 2542         for user_ref in users:
 2543             self.assertNotIn('password', user_ref)
 2544         self.assertEqual(expected_user_ids, user_ids)
 2545 
 2546     @mock.patch.object(common_ldap.BaseLdap, '_ldap_get_all')
 2547     def test_list_limit_domain_specific_inheritance(self, ldap_get_all):
 2548         # passiging hints is important, because if it's not passed, limiting
 2549         # is considered be disabled
 2550         hints = driver_hints.Hints()
 2551         PROVIDERS.identity_api.list_users(
 2552             domain_scope=self.domains['domain2']['id'],
 2553             hints=hints)
 2554         # since list_limit is not specified in keystone.domain2.conf, it should
 2555         # take the default, which is 1000
 2556         self.assertTrue(ldap_get_all.called)
 2557         args, kwargs = ldap_get_all.call_args
 2558         hints = args[0]
 2559         self.assertEqual(1000, hints.limit['limit'])
 2560 
 2561     @mock.patch.object(common_ldap.BaseLdap, '_ldap_get_all')
 2562     def test_list_limit_domain_specific_override(self, ldap_get_all):
 2563         # passiging hints is important, because if it's not passed, limiting
 2564         # is considered to be disabled
 2565         hints = driver_hints.Hints()
 2566         PROVIDERS.identity_api.list_users(
 2567             domain_scope=self.domains['domain1']['id'],
 2568             hints=hints)
 2569         # this should have the list_limit set in Keystone.domain1.conf, which
 2570         # is 101
 2571         self.assertTrue(ldap_get_all.called)
 2572         args, kwargs = ldap_get_all.call_args
 2573         hints = args[0]
 2574         self.assertEqual(101, hints.limit['limit'])
 2575 
 2576     def test_domain_segregation(self):
 2577         """Test that separate configs have segregated the domain.
 2578 
 2579         Test Plan:
 2580 
 2581         - Users were created in each domain as part of setup, now make sure
 2582           you can only find a given user in its relevant domain/backend
 2583         - Make sure that for a backend that supports multiple domains
 2584           you can get the users via any of its domains
 2585 
 2586         """
 2587         users = self.create_users_across_domains()
 2588 
 2589         # Check that I can read a user with the appropriate domain-selected
 2590         # driver, but won't find it via any other domain driver
 2591 
 2592         check_user = self.check_user
 2593         check_user(users['user0'],
 2594                    self.domain_default['id'], http_client.OK)
 2595         for domain in [self.domains['domain1']['id'],
 2596                        self.domains['domain2']['id'],
 2597                        self.domains['domain3']['id'],
 2598                        self.domains['domain4']['id']]:
 2599             check_user(users['user0'], domain, exception.UserNotFound)
 2600 
 2601         check_user(users['user1'], self.domains['domain1']['id'],
 2602                    http_client.OK)
 2603         for domain in [self.domain_default['id'],
 2604                        self.domains['domain2']['id'],
 2605                        self.domains['domain3']['id'],
 2606                        self.domains['domain4']['id']]:
 2607             check_user(users['user1'], domain, exception.UserNotFound)
 2608 
 2609         check_user(users['user2'], self.domains['domain2']['id'],
 2610                    http_client.OK)
 2611         for domain in [self.domain_default['id'],
 2612                        self.domains['domain1']['id'],
 2613                        self.domains['domain3']['id'],
 2614                        self.domains['domain4']['id']]:
 2615             check_user(users['user2'], domain, exception.UserNotFound)
 2616 
 2617         # domain3 and domain4 share the same backend, so you should be
 2618         # able to see user3 and user4 from either.
 2619 
 2620         check_user(users['user3'], self.domains['domain3']['id'],
 2621                    http_client.OK)
 2622         check_user(users['user3'], self.domains['domain4']['id'],
 2623                    http_client.OK)
 2624         check_user(users['user4'], self.domains['domain3']['id'],
 2625                    http_client.OK)
 2626         check_user(users['user4'], self.domains['domain4']['id'],
 2627                    http_client.OK)
 2628 
 2629         for domain in [self.domain_default['id'],
 2630                        self.domains['domain1']['id'],
 2631                        self.domains['domain2']['id']]:
 2632             check_user(users['user3'], domain, exception.UserNotFound)
 2633             check_user(users['user4'], domain, exception.UserNotFound)
 2634 
 2635         # Finally, going through the regular manager layer, make sure we
 2636         # only see the right number of users in each of the non-default
 2637         # domains.  One might have expected two users in domain1 (since we
 2638         # created one before we switched to multi-backend), however since
 2639         # that domain changed backends in the switch we don't find it anymore.
 2640         # This is as designed - we don't support moving domains between
 2641         # backends.
 2642         #
 2643         # The listing of the default domain is already handled in the
 2644         # test_lists_users() method.
 2645         for domain in [self.domains['domain1']['id'],
 2646                        self.domains['domain2']['id'],
 2647                        self.domains['domain4']['id']]:
 2648             self.assertThat(
 2649                 PROVIDERS.identity_api.list_users(domain_scope=domain),
 2650                 matchers.HasLength(1))
 2651 
 2652         # domain3 had a user created before we switched on
 2653         # multiple backends, plus one created afterwards - and its
 2654         # backend has not changed - so we should find two.
 2655         self.assertThat(
 2656             PROVIDERS.identity_api.list_users(
 2657                 domain_scope=self.domains['domain3']['id']),
 2658             matchers.HasLength(1))
 2659 
 2660     def test_existing_uuids_work(self):
 2661         """Test that 'uni-domain' created IDs still work.
 2662 
 2663         Throwing the switch to domain-specific backends should not cause
 2664         existing identities to be inaccessible via ID.
 2665 
 2666         """
 2667         userA = unit.create_user(
 2668             PROVIDERS.identity_api,
 2669             self.domain_default['id'])
 2670         userB = unit.create_user(
 2671             PROVIDERS.identity_api,
 2672             self.domains['domain1']['id'])
 2673         userC = unit.create_user(
 2674             PROVIDERS.identity_api,
 2675             self.domains['domain3']['id'])
 2676         PROVIDERS.identity_api.get_user(userA['id'])
 2677         PROVIDERS.identity_api.get_user(userB['id'])
 2678         PROVIDERS.identity_api.get_user(userC['id'])
 2679 
 2680     def test_scanning_of_config_dir(self):
 2681         """Test the Manager class scans the config directory.
 2682 
 2683         The setup for the main tests above load the domain configs directly
 2684         so that the test overrides can be included. This test just makes sure
 2685         that the standard config directory scanning does pick up the relevant
 2686         domain config files.
 2687 
 2688         """
 2689         # Confirm that config has drivers_enabled as True, which we will
 2690         # check has been set to False later in this test
 2691         self.assertTrue(CONF.identity.domain_specific_drivers_enabled)
 2692         self.load_backends()
 2693         # Execute any command to trigger the lazy loading of domain configs
 2694         PROVIDERS.identity_api.list_users(
 2695             domain_scope=self.domains['domain1']['id'])
 2696         # ...and now check the domain configs have been set up
 2697         self.assertIn('default', PROVIDERS.identity_api.domain_configs)
 2698         self.assertIn(self.domains['domain1']['id'],
 2699                       PROVIDERS.identity_api.domain_configs)
 2700         self.assertIn(self.domains['domain2']['id'],
 2701                       PROVIDERS.identity_api.domain_configs)
 2702         self.assertNotIn(self.domains['domain3']['id'],
 2703                          PROVIDERS.identity_api.domain_configs)
 2704         self.assertNotIn(self.domains['domain4']['id'],
 2705                          PROVIDERS.identity_api.domain_configs)
 2706 
 2707         # Finally check that a domain specific config contains items from both
 2708         # the primary config and the domain specific config
 2709         conf = PROVIDERS.identity_api.domain_configs.get_domain_conf(
 2710             self.domains['domain1']['id'])
 2711         # This should now be false, as is the default, since this is not
 2712         # set in the standard primary config file
 2713         self.assertFalse(conf.identity.domain_specific_drivers_enabled)
 2714         # ..and make sure a domain-specific options is also set
 2715         self.assertEqual('fake://memory1', conf.ldap.url)
 2716 
 2717     def test_delete_domain_with_user_added(self):
 2718         domain = unit.new_domain_ref()
 2719         project = unit.new_project_ref(domain_id=domain['id'])
 2720         PROVIDERS.resource_api.create_domain(domain['id'], domain)
 2721         project = PROVIDERS.resource_api.create_project(project['id'], project)
 2722         project_ref = PROVIDERS.resource_api.get_project(project['id'])
 2723         self.assertDictEqual(project, project_ref)
 2724 
 2725         PROVIDERS.assignment_api.create_grant(
 2726             user_id=self.user_foo['id'], project_id=project['id'],
 2727             role_id=self.role_member['id']
 2728         )
 2729         PROVIDERS.assignment_api.delete_grant(
 2730             user_id=self.user_foo['id'], project_id=project['id'],
 2731             role_id=self.role_member['id']
 2732         )
 2733         domain['enabled'] = False
 2734         PROVIDERS.resource_api.update_domain(domain['id'], domain)
 2735         PROVIDERS.resource_api.delete_domain(domain['id'])
 2736         self.assertRaises(exception.DomainNotFound,
 2737                           PROVIDERS.resource_api.get_domain,
 2738                           domain['id'])
 2739 
 2740     def test_user_enabled_ignored_disable_error(self):
 2741         # Override.
 2742         self.skip_test_overrides("Doesn't apply since LDAP config has no "
 2743                                  "affect on the SQL identity backend.")
 2744 
 2745     def test_group_enabled_ignored_disable_error(self):
 2746         # Override.
 2747         self.skip_test_overrides("Doesn't apply since LDAP config has no "
 2748                                  "affect on the SQL identity backend.")
 2749 
 2750     def test_list_role_assignments_filtered_by_role(self):
 2751         # Domain roles are supported by the SQL Assignment backend
 2752         base = super(BaseLDAPIdentity, self)
 2753         base.test_list_role_assignments_filtered_by_role()
 2754 
 2755     def test_list_role_assignment_by_domain(self):
 2756         # With multi LDAP this method should work, so override the override
 2757         # from BaseLDAPIdentity
 2758         super(BaseLDAPIdentity, self).test_list_role_assignment_by_domain()
 2759 
 2760     def test_list_role_assignment_by_user_with_domain_group_roles(self):
 2761         # With multi LDAP this method should work, so override the override
 2762         # from BaseLDAPIdentity
 2763         super(BaseLDAPIdentity, self).\
 2764             test_list_role_assignment_by_user_with_domain_group_roles()
 2765 
 2766     def test_list_role_assignment_using_sourced_groups_with_domains(self):
 2767         # With SQL Assignment this method should work, so override the override
 2768         # from BaseLDAPIdentity
 2769         base = super(BaseLDAPIdentity, self)
 2770         base.test_list_role_assignment_using_sourced_groups_with_domains()
 2771 
 2772     def test_create_project_with_domain_id_and_without_parent_id(self):
 2773         # With multi LDAP this method should work, so override the override
 2774         # from BaseLDAPIdentity
 2775         super(BaseLDAPIdentity, self).\
 2776             test_create_project_with_domain_id_and_without_parent_id()
 2777 
 2778     def test_create_project_with_domain_id_mismatch_to_parent_domain(self):
 2779         # With multi LDAP this method should work, so override the override
 2780         # from BaseLDAPIdentity
 2781         super(BaseLDAPIdentity, self).\
 2782             test_create_project_with_domain_id_mismatch_to_parent_domain()
 2783 
 2784     def test_remove_foreign_assignments_when_deleting_a_domain(self):
 2785         # With multi LDAP this method should work, so override the override
 2786         # from BaseLDAPIdentity
 2787         base = super(BaseLDAPIdentity, self)
 2788         base.test_remove_foreign_assignments_when_deleting_a_domain()
 2789 
 2790     @mock.patch.object(ldap_identity.Identity, 'unset_default_project_id')
 2791     @mock.patch.object(sql_identity.Identity, 'unset_default_project_id')
 2792     def test_delete_project_unset_project_ids_for_all_backends(self, sql_mock,
 2793                                                                ldap_mock):
 2794         ldap_mock.side_effect = exception.Forbidden
 2795         project = unit.new_project_ref(
 2796             domain_id=CONF.identity.default_domain_id
 2797         )
 2798         project = PROVIDERS.resource_api.create_project(project['id'], project)
 2799         PROVIDERS.resource_api.delete_project(project['id'])
 2800         ldap_mock.assert_called_with(project['id'])
 2801         sql_mock.assert_called_with(project['id'])
 2802 
 2803 
 2804 class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
 2805     """Class to test the use of domain configs stored in the database.
 2806 
 2807     Repeat the same tests as MultiLDAPandSQLIdentity, but instead of using the
 2808     domain specific config files, store the domain specific values in the
 2809     database.
 2810 
 2811     """
 2812 
 2813     def assert_backends(self):
 2814         _assert_backends(self,
 2815                          assignment='sql',
 2816                          identity={
 2817                              None: 'sql',
 2818                              self.domain_default['id']: 'ldap',
 2819                              self.domains['domain1']['id']: 'ldap',
 2820                              self.domains['domain2']['id']: 'ldap',
 2821                          })
 2822 
 2823     def enable_multi_domain(self):
 2824         # The values below are the same as in the domain_configs_multi_ldap
 2825         # directory of test config_files.
 2826         default_config = {
 2827             'ldap': {'url': 'fake://memory',
 2828                      'user': 'cn=Admin',
 2829                      'password': 'password',
 2830                      'suffix': 'cn=example,cn=com'},
 2831             'identity': {'driver': 'ldap'}
 2832         }
 2833         domain1_config = {
 2834             'ldap': {'url': 'fake://memory1',
 2835                      'user': 'cn=Admin',
 2836                      'password': 'password',
 2837                      'suffix': 'cn=example,cn=com'},
 2838             'identity': {'driver': 'ldap',
 2839                          'list_limit': 101}
 2840         }
 2841         domain2_config = {
 2842             'ldap': {'url': 'fake://memory',
 2843                      'user': 'cn=Admin',
 2844                      'password': 'password',
 2845                      'suffix': 'cn=myroot,cn=com',
 2846                      'group_tree_dn': 'ou=UserGroups,dc=myroot,dc=org',
 2847                      'user_tree_dn': 'ou=Users,dc=myroot,dc=org'},
 2848             'identity': {'driver': 'ldap'}
 2849         }
 2850 
 2851         PROVIDERS.domain_config_api.create_config(
 2852             CONF.identity.default_domain_id, default_config
 2853         )
 2854         PROVIDERS.domain_config_api.create_config(
 2855             self.domains['domain1']['id'], domain1_config
 2856         )
 2857         PROVIDERS.domain_config_api.create_config(
 2858             self.domains['domain2']['id'], domain2_config
 2859         )
 2860 
 2861         self.config_fixture.config(
 2862             group='identity', domain_specific_drivers_enabled=True,
 2863             domain_configurations_from_database=True,
 2864             list_limit=1000)
 2865         self.config_fixture.config(group='identity_mapping',
 2866                                    backward_compatible_ids=False)
 2867 
 2868     def test_domain_config_has_no_impact_if_database_support_disabled(self):
 2869         """Ensure database domain configs have no effect if disabled.
 2870 
 2871         Set reading from database configs to false, restart the backends
 2872         and then try and set and use database configs.
 2873 
 2874         """
 2875         self.config_fixture.config(
 2876             group='identity', domain_configurations_from_database=False)
 2877         self.load_backends()
 2878         new_config = {'ldap': {'url': uuid.uuid4().hex}}
 2879         PROVIDERS.domain_config_api.create_config(
 2880             CONF.identity.default_domain_id, new_config)
 2881         # Trigger the identity backend to initialise any domain specific
 2882         # configurations
 2883         PROVIDERS.identity_api.list_users()
 2884         # Check that the new config has not been passed to the driver for
 2885         # the default domain.
 2886         default_config = (
 2887             PROVIDERS.identity_api.domain_configs.get_domain_conf(
 2888                 CONF.identity.default_domain_id))
 2889         self.assertEqual(CONF.ldap.url, default_config.ldap.url)
 2890 
 2891     def test_reloading_domain_config(self):
 2892         """Ensure domain drivers are reloaded on a config modification."""
 2893         domain_cfgs = PROVIDERS.identity_api.domain_configs
 2894 
 2895         # Create a new config for the default domain, hence overwriting the
 2896         # current settings.
 2897         new_config = {
 2898             'ldap': {'url': uuid.uuid4().hex},
 2899             'identity': {'driver': 'ldap'}}
 2900         PROVIDERS.domain_config_api.create_config(
 2901             CONF.identity.default_domain_id, new_config)
 2902         default_config = (
 2903             domain_cfgs.get_domain_conf(CONF.identity.default_domain_id))
 2904         self.assertEqual(new_config['ldap']['url'], default_config.ldap.url)
 2905 
 2906         # Ensure updating is also honored
 2907         updated_config = {'url': uuid.uuid4().hex}
 2908         PROVIDERS.domain_config_api.update_config(
 2909             CONF.identity.default_domain_id, updated_config,
 2910             group='ldap', option='url')
 2911         default_config = (
 2912             domain_cfgs.get_domain_conf(CONF.identity.default_domain_id))
 2913         self.assertEqual(updated_config['url'], default_config.ldap.url)
 2914 
 2915         # ...and finally ensure delete causes the driver to get the standard
 2916         # config again.
 2917         PROVIDERS.domain_config_api.delete_config(
 2918             CONF.identity.default_domain_id
 2919         )
 2920         default_config = (
 2921             domain_cfgs.get_domain_conf(CONF.identity.default_domain_id))
 2922         self.assertEqual(CONF.ldap.url, default_config.ldap.url)
 2923 
 2924     def test_setting_multiple_sql_driver_raises_exception(self):
 2925         """Ensure setting multiple domain specific sql drivers is prevented."""
 2926         new_config = {'identity': {'driver': 'sql'}}
 2927         PROVIDERS.domain_config_api.create_config(
 2928             CONF.identity.default_domain_id, new_config)
 2929         PROVIDERS.identity_api.domain_configs.get_domain_conf(
 2930             CONF.identity.default_domain_id)
 2931         PROVIDERS.domain_config_api.create_config(
 2932             self.domains['domain1']['id'], new_config
 2933         )
 2934         self.assertRaises(
 2935             exception.MultipleSQLDriversInConfig,
 2936             PROVIDERS.identity_api.domain_configs.get_domain_conf,
 2937             self.domains['domain1']['id']
 2938         )
 2939 
 2940     def test_same_domain_gets_sql_driver(self):
 2941         """Ensure we can set an SQL driver if we have had it before."""
 2942         new_config = {'identity': {'driver': 'sql'}}
 2943         PROVIDERS.domain_config_api.create_config(
 2944             CONF.identity.default_domain_id, new_config)
 2945         PROVIDERS.identity_api.domain_configs.get_domain_conf(
 2946             CONF.identity.default_domain_id)
 2947 
 2948         # By using a slightly different config, we cause the driver to be
 2949         # reloaded...and hence check if we can reuse the sql driver
 2950         new_config = {'identity': {'driver': 'sql'},
 2951                       'ldap': {'url': 'fake://memory1'}}
 2952         PROVIDERS.domain_config_api.create_config(
 2953             CONF.identity.default_domain_id, new_config)
 2954         PROVIDERS.identity_api.domain_configs.get_domain_conf(
 2955             CONF.identity.default_domain_id)
 2956 
 2957     def test_delete_domain_clears_sql_registration(self):
 2958         """Ensure registration is deleted when a domain is deleted."""
 2959         domain = unit.new_domain_ref()
 2960         domain = PROVIDERS.resource_api.create_domain(domain['id'], domain)
 2961         new_config = {'identity': {'driver': 'sql'}}
 2962         PROVIDERS.domain_config_api.create_config(domain['id'], new_config)
 2963         PROVIDERS.identity_api.domain_configs.get_domain_conf(domain['id'])
 2964 
 2965         # First show that trying to set SQL for another driver fails
 2966         PROVIDERS.domain_config_api.create_config(
 2967             self.domains['domain1']['id'], new_config
 2968         )
 2969         self.assertRaises(
 2970             exception.MultipleSQLDriversInConfig,
 2971             PROVIDERS.identity_api.domain_configs.get_domain_conf,
 2972             self.domains['domain1']['id']
 2973         )
 2974         PROVIDERS.domain_config_api.delete_config(
 2975             self.domains['domain1']['id']
 2976         )
 2977 
 2978         # Now we delete the domain
 2979         domain['enabled'] = False
 2980         PROVIDERS.resource_api.update_domain(domain['id'], domain)
 2981         PROVIDERS.resource_api.delete_domain(domain['id'])
 2982 
 2983         # The registration should now be available
 2984         PROVIDERS.domain_config_api.create_config(
 2985             self.domains['domain1']['id'], new_config
 2986         )
 2987         PROVIDERS.identity_api.domain_configs.get_domain_conf(
 2988             self.domains['domain1']['id']
 2989         )
 2990 
 2991     def test_orphaned_registration_does_not_prevent_getting_sql_driver(self):
 2992         """Ensure we self heal an orphaned sql registration."""
 2993         domain = unit.new_domain_ref()
 2994         domain = PROVIDERS.resource_api.create_domain(domain['id'], domain)
 2995         new_config = {'identity': {'driver': 'sql'}}
 2996         PROVIDERS.domain_config_api.create_config(domain['id'], new_config)
 2997         PROVIDERS.identity_api.domain_configs.get_domain_conf(domain['id'])
 2998 
 2999         # First show that trying to set SQL for another driver fails
 3000         PROVIDERS.domain_config_api.create_config(
 3001             self.domains['domain1']['id'], new_config
 3002         )
 3003         self.assertRaises(
 3004             exception.MultipleSQLDriversInConfig,
 3005             PROVIDERS.identity_api.domain_configs.get_domain_conf,
 3006             self.domains['domain1']['id']
 3007         )
 3008 
 3009         # Now we delete the domain by using the backend driver directly,
 3010         # which causes the domain to be deleted without any of the cleanup
 3011         # that is in the manager (this is simulating a server process crashing
 3012         # in the middle of a delete domain operation, and somehow leaving the
 3013         # domain config settings in place, but the domain is deleted). We
 3014         # should still be able to set another domain to SQL, since we should
 3015         # self heal this issue.
 3016 
 3017         PROVIDERS.resource_api.driver.delete_project(domain['id'])
 3018         # Invalidate cache (so we will see the domain has gone)
 3019         PROVIDERS.resource_api.get_domain.invalidate(
 3020             PROVIDERS.resource_api, domain['id'])
 3021 
 3022         # The registration should now be available
 3023         PROVIDERS.domain_config_api.create_config(
 3024             self.domains['domain1']['id'], new_config
 3025         )
 3026         PROVIDERS.identity_api.domain_configs.get_domain_conf(
 3027             self.domains['domain1']['id'])
 3028 
 3029 
 3030 class DomainSpecificLDAPandSQLIdentity(
 3031     BaseLDAPIdentity, unit.SQLDriverOverrides, unit.TestCase,
 3032         BaseMultiLDAPandSQLIdentity):
 3033     """Class to test when all domains use specific configs, including SQL.
 3034 
 3035     We define a set of domains and domain-specific backends:
 3036 
 3037     - A separate LDAP backend for the default domain
 3038     - A separate SQL backend for domain1
 3039 
 3040     Although the default driver still exists, we don't use it.
 3041 
 3042     """
 3043 
 3044     DOMAIN_COUNT = 2
 3045     DOMAIN_SPECIFIC_COUNT = 2
 3046 
 3047     def setUp(self):
 3048         self.domain_count = self.DOMAIN_COUNT
 3049         self.domain_specific_count = self.DOMAIN_SPECIFIC_COUNT
 3050 
 3051         super(DomainSpecificLDAPandSQLIdentity, self).setUp()
 3052 
 3053     def load_fixtures(self, fixtures):
 3054         PROVIDERS.resource_api.create_domain(
 3055             default_fixtures.ROOT_DOMAIN['id'], default_fixtures.ROOT_DOMAIN)
 3056         self.setup_initial_domains()
 3057         super(DomainSpecificLDAPandSQLIdentity, self).load_fixtures(fixtures)
 3058 
 3059     def assert_backends(self):
 3060         _assert_backends(
 3061             self,
 3062             assignment='sql',
 3063             identity={
 3064                 None: 'ldap',
 3065                 'default': 'ldap',
 3066                 self.domains['domain1']['id']: 'sql',
 3067             })
 3068 
 3069     def config_overrides(self):
 3070         super(DomainSpecificLDAPandSQLIdentity, self).config_overrides()
 3071         # Make sure resource & assignment are actually SQL drivers,
 3072         # BaseLDAPIdentity causes this option to use LDAP.
 3073         self.config_fixture.config(group='resource', driver='sql')
 3074         self.config_fixture.config(group='assignment', driver='sql')
 3075 
 3076         # We aren't setting up any initial data ahead of switching to
 3077         # domain-specific operation, so make the switch straight away.
 3078         self.config_fixture.config(
 3079             group='identity', domain_specific_drivers_enabled=True,
 3080             domain_config_dir=(
 3081                 unit.TESTCONF + '/domain_configs_one_sql_one_ldap'))
 3082 
 3083         self.config_fixture.config(group='identity_mapping',
 3084                                    backward_compatible_ids=False)
 3085 
 3086     def get_config(self, domain_id):
 3087         # Get the config for this domain, will return CONF
 3088         # if no specific config defined for this domain
 3089         return PROVIDERS.identity_api.domain_configs.get_domain_conf(domain_id)
 3090 
 3091     def test_list_domains(self):
 3092         self.skip_test_overrides('N/A: Not relevant for multi ldap testing')
 3093 
 3094     def test_delete_domain(self):
 3095         # With this restricted multi LDAP class, tests that use multiple
 3096         # domains and identity, are still not supported
 3097         self.assertRaises(
 3098             exception.DomainNotFound,
 3099             super(BaseLDAPIdentity, self).test_delete_domain_with_project_api)
 3100 
 3101     def test_list_users(self):
 3102         _users = self.create_users_across_domains()
 3103 
 3104         # Override the standard list users, since we have added an extra user
 3105         # to the default domain, so the number of expected users is one more
 3106         # than in the standard test.
 3107         users = PROVIDERS.identity_api.list_users(
 3108             domain_scope=self._set_domain_scope(
 3109                 CONF.identity.default_domain_id))
 3110         self.assertEqual(len(default_fixtures.USERS) + 1, len(users))
 3111         user_ids = set(user['id'] for user in users)
 3112         expected_user_ids = set(getattr(self, 'user_%s' % user['name'])['id']
 3113                                 for user in default_fixtures.USERS)
 3114         expected_user_ids.add(_users['user0']['id'])
 3115         for user_ref in users:
 3116             self.assertNotIn('password', user_ref)
 3117         self.assertEqual(expected_user_ids, user_ids)
 3118 
 3119     def test_domain_segregation(self):
 3120         """Test that separate configs have segregated the domain.
 3121 
 3122         Test Plan:
 3123 
 3124         - Users were created in each domain as part of setup, now make sure
 3125           you can only find a given user in its relevant domain/backend
 3126         - Make sure that for a backend that supports multiple domains
 3127           you can get the users via any of its domains
 3128 
 3129         """
 3130         users = self.create_users_across_domains()
 3131 
 3132         # Check that I can read a user with the appropriate domain-selected
 3133         # driver, but won't find it via any other domain driver
 3134 
 3135         self.check_user(users['user0'],
 3136                         self.domain_default['id'], http_client.OK)
 3137         self.check_user(users['user0'],
 3138                         self.domains['domain1']['id'], exception.UserNotFound)
 3139 
 3140         self.check_user(users['user1'],
 3141                         self.domains['domain1']['id'], http_client.OK)
 3142         self.check_user(users['user1'],
 3143                         self.domain_default['id'],
 3144                         exception.UserNotFound)
 3145 
 3146         # Finally, going through the regular manager layer, make sure we
 3147         # only see the right number of users in the non-default domain.
 3148 
 3149         self.assertThat(
 3150             PROVIDERS.identity_api.list_users(
 3151                 domain_scope=self.domains['domain1']['id']),
 3152             matchers.HasLength(1))
 3153 
 3154     def test_get_domain_mapping_list_is_used(self):
 3155         # before get_domain_mapping_list was introduced, it was required to
 3156         # make N calls to the database for N users, and it was slow.
 3157         # get_domain_mapping_list solves this problem and should be used
 3158         # when multiple users are fetched from domain-specific backend.
 3159         for i in range(5):
 3160             unit.create_user(PROVIDERS.identity_api,
 3161                              domain_id=self.domains['domain1']['id'])
 3162 
 3163         with mock.patch.multiple(PROVIDERS.id_mapping_api,
 3164                                  get_domain_mapping_list=mock.DEFAULT,
 3165                                  get_id_mapping=mock.DEFAULT) as mocked:
 3166             PROVIDERS.identity_api.list_users(
 3167                 domain_scope=self.domains['domain1']['id'])
 3168             mocked['get_domain_mapping_list'].assert_called()
 3169             mocked['get_id_mapping'].assert_not_called()
 3170 
 3171     def test_user_id_comma(self):
 3172         self.skip_test_overrides('Only valid if it is guaranteed to be '
 3173                                  'talking to the fakeldap backend')
 3174 
 3175     def test_user_enabled_ignored_disable_error(self):
 3176         # Override.
 3177         self.skip_test_overrides("Doesn't apply since LDAP config has no "
 3178                                  "affect on the SQL identity backend.")
 3179 
 3180     def test_group_enabled_ignored_disable_error(self):
 3181         # Override.
 3182         self.skip_test_overrides("Doesn't apply since LDAP config has no "
 3183                                  "affect on the SQL identity backend.")
 3184 
 3185     def test_list_role_assignments_filtered_by_role(self):
 3186         # Domain roles are supported by the SQL Assignment backend
 3187         base = super(BaseLDAPIdentity, self)
 3188         base.test_list_role_assignments_filtered_by_role()
 3189 
 3190     def test_delete_domain_with_project_api(self):
 3191         # With this restricted multi LDAP class, tests that use multiple
 3192         # domains and identity, are still not supported
 3193         self.assertRaises(
 3194             exception.DomainNotFound,
 3195             super(BaseLDAPIdentity, self).test_delete_domain_with_project_api)
 3196 
 3197     def test_create_project_with_domain_id_and_without_parent_id(self):
 3198         # With restricted multi LDAP, tests that don't use identity, but do
 3199         # required aditional domains will work
 3200         base = super(BaseLDAPIdentity, self)
 3201         base.test_create_project_with_domain_id_and_without_parent_id()
 3202 
 3203     def test_create_project_with_domain_id_mismatch_to_parent_domain(self):
 3204         # With restricted multi LDAP, tests that don't use identity, but do
 3205         # required aditional domains will work
 3206         base = super(BaseLDAPIdentity, self)
 3207         base.test_create_project_with_domain_id_mismatch_to_parent_domain()
 3208 
 3209     def test_list_domains_filtered_and_limited(self):
 3210         # With this restricted multi LDAP class, tests that use multiple
 3211         # domains and identity, are still not supported
 3212         self.skip_test_overrides(
 3213             'Restricted multi LDAP class does not support multiple domains')
 3214 
 3215     def test_list_limit_for_domains(self):
 3216         # With this restricted multi LDAP class, tests that use multiple
 3217         # domains and identity, are still not supported
 3218         self.skip_test_overrides(
 3219             'Restricted multi LDAP class does not support multiple domains')
 3220 
 3221 
 3222 class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity):
 3223     """Class to test simplest use of domain-specific SQL driver.
 3224 
 3225     The simplest use of an SQL domain-specific backend is when it is used to
 3226     augment the standard case when LDAP is the default driver defined in the
 3227     main config file. This would allow, for example, service users to be
 3228     stored in SQL while LDAP handles the rest. Hence we define:
 3229 
 3230     - The default driver uses the LDAP backend for the default domain
 3231     - A separate SQL backend for domain1
 3232 
 3233     """
 3234 
 3235     DOMAIN_COUNT = 2
 3236     DOMAIN_SPECIFIC_COUNT = 1
 3237 
 3238     def assert_backends(self):
 3239         _assert_backends(self,
 3240                          assignment='sql',
 3241                          identity='ldap')
 3242 
 3243     def config_overrides(self):
 3244         super(DomainSpecificSQLIdentity, self).config_overrides()
 3245         self.config_fixture.config(group='identity', driver='ldap')
 3246 
 3247         # We aren't setting up any initial data ahead of switching to
 3248         # domain-specific operation, so make the switch straight away.
 3249         self.config_fixture.config(
 3250             group='identity', domain_specific_drivers_enabled=True,
 3251             domain_config_dir=(
 3252                 unit.TESTCONF + '/domain_configs_default_ldap_one_sql'))
 3253 
 3254         # Part of the testing counts how many new mappings get created as
 3255         # we create users, so ensure we are NOT using mapping for the default
 3256         # LDAP domain so this doesn't confuse the calculation.
 3257         self.config_fixture.config(group='identity_mapping',
 3258                                    backward_compatible_ids=True)
 3259 
 3260     def get_config(self, domain_id):
 3261         if domain_id == CONF.identity.default_domain_id:
 3262             return CONF
 3263         else:
 3264             return PROVIDERS.identity_api.domain_configs.get_domain_conf(
 3265                 domain_id
 3266             )
 3267 
 3268     def test_default_sql_plus_sql_specific_driver_fails(self):
 3269         # First confirm that if ldap is default driver, domain1 can be
 3270         # loaded as sql
 3271         self.config_fixture.config(group='identity', driver='ldap')
 3272         self.config_fixture.config(group='assignment', driver='sql')
 3273         self.load_backends()
 3274         # Make any identity call to initiate the lazy loading of configs
 3275         PROVIDERS.identity_api.list_users(
 3276             domain_scope=CONF.identity.default_domain_id)
 3277         self.assertIsNotNone(self.get_config(self.domains['domain1']['id']))
 3278 
 3279         # Now re-initialize, but with sql as the identity driver
 3280         self.config_fixture.config(group='identity', driver='sql')
 3281         self.config_fixture.config(group='assignment', driver='sql')
 3282         self.load_backends()
 3283         # Make any identity call to initiate the lazy loading of configs, which
 3284         # should fail since we would now have two sql drivers.
 3285         self.assertRaises(exception.MultipleSQLDriversInConfig,
 3286                           PROVIDERS.identity_api.list_users,
 3287                           domain_scope=CONF.identity.default_domain_id)
 3288 
 3289     def test_multiple_sql_specific_drivers_fails(self):
 3290         self.config_fixture.config(group='identity', driver='ldap')
 3291         self.config_fixture.config(group='assignment', driver='sql')
 3292         self.load_backends()
 3293         # Ensure default, domain1 and domain2 exist
 3294         self.domain_count = 3
 3295         self.setup_initial_domains()
 3296         # Make any identity call to initiate the lazy loading of configs
 3297         PROVIDERS.identity_api.list_users(
 3298             domain_scope=CONF.identity.default_domain_id)
 3299         # This will only load domain1, since the domain2 config file is
 3300         # not stored in the same location
 3301         self.assertIsNotNone(self.get_config(self.domains['domain1']['id']))
 3302 
 3303         # Now try and manually load a 2nd sql specific driver, for domain2,
 3304         # which should fail.
 3305         self.assertRaises(
 3306             exception.MultipleSQLDriversInConfig,
 3307             PROVIDERS.identity_api.domain_configs._load_config_from_file,
 3308             PROVIDERS.resource_api,
 3309             [unit.TESTCONF + '/domain_configs_one_extra_sql/' +
 3310              'keystone.domain2.conf'],
 3311             'domain2')
 3312 
 3313 
 3314 class LdapFilterTests(identity_tests.FilterTests, LDAPTestSetup,
 3315                       unit.TestCase):
 3316 
 3317     def assert_backends(self):
 3318         _assert_backends(self, identity='ldap')
 3319 
 3320     def config_overrides(self):
 3321         super(LdapFilterTests, self).config_overrides()
 3322         self.config_fixture.config(group='identity', driver='ldap')
 3323 
 3324     def config_files(self):
 3325         config_files = super(LdapFilterTests, self).config_files()
 3326         config_files.append(unit.dirs.tests_conf('backend_ldap.conf'))
 3327         return config_files
 3328 
 3329     def test_list_users_in_group_inexact_filtered(self):
 3330         # The LDAP identity driver currently does not support filtering on the
 3331         # listing users for a given group, so will fail this test.
 3332         self.skip_test_overrides('Not supported by LDAP identity driver')
 3333 
 3334     def test_list_users_in_group_exact_filtered(self):
 3335         # The LDAP identity driver currently does not support filtering on the
 3336         # listing users for a given group, so will fail this test.
 3337         self.skip_test_overrides('Not supported by LDAP identity driver')
 3338 
 3339 
 3340 class LDAPMatchingRuleInChainTests(LDAPTestSetup, unit.TestCase):
 3341 
 3342     def setUp(self):
 3343         super(LDAPMatchingRuleInChainTests, self).setUp()
 3344 
 3345         group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
 3346         self.group = PROVIDERS.identity_api.create_group(group)
 3347 
 3348         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
 3349         self.user = PROVIDERS.identity_api.create_user(user)
 3350 
 3351         PROVIDERS.identity_api.add_user_to_group(
 3352             self.user['id'], self.group['id']
 3353         )
 3354 
 3355     def assert_backends(self):
 3356         _assert_backends(self, identity='ldap')
 3357 
 3358     def config_overrides(self):
 3359         super(LDAPMatchingRuleInChainTests, self).config_overrides()
 3360         self.config_fixture.config(group='identity', driver='ldap')
 3361         self.config_fixture.config(
 3362             group='ldap',
 3363             group_ad_nesting=True,
 3364             url='fake://memory',
 3365             chase_referrals=False,
 3366             group_tree_dn='cn=UserGroups,cn=example,cn=com',
 3367             query_scope='one')
 3368 
 3369     def config_files(self):
 3370         config_files = super(LDAPMatchingRuleInChainTests, self).config_files()
 3371         config_files.append(unit.dirs.tests_conf('backend_ldap.conf'))
 3372         return config_files
 3373 
 3374     def test_get_group(self):
 3375         group_ref = PROVIDERS.identity_api.get_group(self.group['id'])
 3376         self.assertDictEqual(self.group, group_ref)
 3377 
 3378     def test_list_user_groups(self):
 3379         PROVIDERS.identity_api.list_groups_for_user(self.user['id'])
 3380 
 3381     def test_list_groups_for_user(self):
 3382         groups_ref = PROVIDERS.identity_api.list_groups_for_user(
 3383             self.user['id']
 3384         )
 3385         self.assertEqual(0, len(groups_ref))
 3386 
 3387     def test_list_groups(self):
 3388         groups_refs = PROVIDERS.identity_api.list_groups()
 3389         self.assertEqual(1, len(groups_refs))
 3390         self.assertEqual(self.group['id'], groups_refs[0]['id'])