"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/unit/identity/test_backends.py" (13 May 2020, 62699 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_backends.py": 16.0.1_vs_17.0.0.

    1 # Copyright 2012 OpenStack Foundation
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 import uuid
   16 
   17 from testtools import matchers
   18 
   19 from keystone.common import driver_hints
   20 from keystone.common import provider_api
   21 import keystone.conf
   22 from keystone import exception
   23 from keystone.tests import unit
   24 from keystone.tests.unit import default_fixtures
   25 from keystone.tests.unit import filtering
   26 
   27 
   28 CONF = keystone.conf.CONF
   29 PROVIDERS = provider_api.ProviderAPIs
   30 
   31 
   32 class IdentityTests(object):
   33 
   34     def _get_domain_fixture(self):
   35         domain = unit.new_domain_ref()
   36         PROVIDERS.resource_api.create_domain(domain['id'], domain)
   37         return domain
   38 
   39     def _set_domain_scope(self, domain_id):
   40         # We only provide a domain scope if we have multiple drivers
   41         if CONF.identity.domain_specific_drivers_enabled:
   42             return domain_id
   43 
   44     def test_authenticate_bad_user(self):
   45         with self.make_request():
   46             self.assertRaises(AssertionError,
   47                               PROVIDERS.identity_api.authenticate,
   48                               user_id=uuid.uuid4().hex,
   49                               password=self.user_foo['password'])
   50 
   51     def test_authenticate_bad_password(self):
   52         with self.make_request():
   53             self.assertRaises(AssertionError,
   54                               PROVIDERS.identity_api.authenticate,
   55                               user_id=self.user_foo['id'],
   56                               password=uuid.uuid4().hex)
   57 
   58     def test_authenticate(self):
   59         with self.make_request():
   60             user_ref = PROVIDERS.identity_api.authenticate(
   61                 user_id=self.user_sna['id'],
   62                 password=self.user_sna['password'])
   63             # NOTE(termie): the password field is left in user_sna to make
   64         #               it easier to authenticate in tests, but should
   65         #               not be returned by the api
   66         self.user_sna.pop('password')
   67         self.user_sna['enabled'] = True
   68         self.assertUserDictEqual(self.user_sna, user_ref)
   69 
   70     def test_authenticate_and_get_roles_no_metadata(self):
   71         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
   72 
   73         # Remove user id. It is ignored by create_user() and will break the
   74         # subset test below.
   75         del user['id']
   76 
   77         new_user = PROVIDERS.identity_api.create_user(user)
   78 
   79         role_member = unit.new_role_ref()
   80         PROVIDERS.role_api.create_role(role_member['id'], role_member)
   81 
   82         PROVIDERS.assignment_api.add_role_to_user_and_project(
   83             new_user['id'], self.project_baz['id'], role_member['id']
   84         )
   85         with self.make_request():
   86             user_ref = PROVIDERS.identity_api.authenticate(
   87                 user_id=new_user['id'],
   88                 password=user['password'])
   89         self.assertNotIn('password', user_ref)
   90         # NOTE(termie): the password field is left in user_sna to make
   91         #               it easier to authenticate in tests, but should
   92         #               not be returned by the api
   93         user.pop('password')
   94         self.assertDictContainsSubset(user, user_ref)
   95         role_list = PROVIDERS.assignment_api.get_roles_for_user_and_project(
   96             new_user['id'], self.project_baz['id'])
   97         self.assertEqual(1, len(role_list))
   98         self.assertIn(role_member['id'], role_list)
   99 
  100     def test_authenticate_if_no_password_set(self):
  101         id_ = uuid.uuid4().hex
  102         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  103         PROVIDERS.identity_api.create_user(user)
  104 
  105         with self.make_request():
  106             self.assertRaises(AssertionError,
  107                               PROVIDERS.identity_api.authenticate,
  108                               user_id=id_,
  109                               password='password')
  110 
  111     def test_create_unicode_user_name(self):
  112         unicode_name = u'name \u540d\u5b57'
  113         user = unit.new_user_ref(name=unicode_name,
  114                                  domain_id=CONF.identity.default_domain_id)
  115         ref = PROVIDERS.identity_api.create_user(user)
  116         self.assertEqual(unicode_name, ref['name'])
  117 
  118     def test_get_user(self):
  119         user_ref = PROVIDERS.identity_api.get_user(self.user_foo['id'])
  120         # NOTE(termie): the password field is left in user_foo to make
  121         #               it easier to authenticate in tests, but should
  122         #               not be returned by the api
  123         self.user_foo.pop('password')
  124         # NOTE(edmondsw): check that options is set, even if it's just an
  125         # empty dict, because otherwise auth will blow up for whatever
  126         # case misses this.
  127         self.assertIn('options', user_ref)
  128         self.assertDictEqual(self.user_foo, user_ref)
  129 
  130     def test_get_user_returns_required_attributes(self):
  131         user_ref = PROVIDERS.identity_api.get_user(self.user_foo['id'])
  132         self.assertIn('id', user_ref)
  133         self.assertIn('name', user_ref)
  134         self.assertIn('enabled', user_ref)
  135         self.assertIn('password_expires_at', user_ref)
  136 
  137     @unit.skip_if_cache_disabled('identity')
  138     def test_cache_layer_get_user(self):
  139         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  140         PROVIDERS.identity_api.create_user(user)
  141         ref = PROVIDERS.identity_api.get_user_by_name(
  142             user['name'], user['domain_id']
  143         )
  144         # cache the result.
  145         PROVIDERS.identity_api.get_user(ref['id'])
  146         # delete bypassing identity api
  147         domain_id, driver, entity_id = (
  148             PROVIDERS.identity_api._get_domain_driver_and_entity_id(ref['id']))
  149         driver.delete_user(entity_id)
  150 
  151         self.assertDictEqual(ref, PROVIDERS.identity_api.get_user(ref['id']))
  152         PROVIDERS.identity_api.get_user.invalidate(
  153             PROVIDERS.identity_api, ref['id']
  154         )
  155         self.assertRaises(exception.UserNotFound,
  156                           PROVIDERS.identity_api.get_user, ref['id'])
  157         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  158         user = PROVIDERS.identity_api.create_user(user)
  159         ref = PROVIDERS.identity_api.get_user_by_name(
  160             user['name'], user['domain_id']
  161         )
  162         user['description'] = uuid.uuid4().hex
  163         # cache the result.
  164         PROVIDERS.identity_api.get_user(ref['id'])
  165         # update using identity api and get back updated user.
  166         user_updated = PROVIDERS.identity_api.update_user(ref['id'], user)
  167         self.assertDictContainsSubset(
  168             PROVIDERS.identity_api.get_user(ref['id']), user_updated
  169         )
  170         self.assertDictContainsSubset(
  171             PROVIDERS.identity_api.get_user_by_name(
  172                 ref['name'], ref['domain_id']), user_updated
  173         )
  174 
  175     def test_get_user_returns_not_found(self):
  176         self.assertRaises(exception.UserNotFound,
  177                           PROVIDERS.identity_api.get_user,
  178                           uuid.uuid4().hex)
  179 
  180     def test_get_user_by_name(self):
  181         user_ref = PROVIDERS.identity_api.get_user_by_name(
  182             self.user_foo['name'], CONF.identity.default_domain_id)
  183         # NOTE(termie): the password field is left in user_foo to make
  184         #               it easier to authenticate in tests, but should
  185         #               not be returned by the api
  186         self.user_foo.pop('password')
  187         self.assertDictEqual(self.user_foo, user_ref)
  188 
  189     @unit.skip_if_cache_disabled('identity')
  190     def test_cache_layer_get_user_by_name(self):
  191         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  192         PROVIDERS.identity_api.create_user(user)
  193         ref = PROVIDERS.identity_api.get_user_by_name(
  194             user['name'], user['domain_id']
  195         )
  196         # delete bypassing the identity api.
  197         domain_id, driver, entity_id = (
  198             PROVIDERS.identity_api._get_domain_driver_and_entity_id(ref['id']))
  199         driver.delete_user(entity_id)
  200 
  201         self.assertDictEqual(ref, PROVIDERS.identity_api.get_user_by_name(
  202             user['name'], CONF.identity.default_domain_id))
  203         PROVIDERS.identity_api.get_user_by_name.invalidate(
  204             PROVIDERS.identity_api,
  205             user['name'],
  206             CONF.identity.default_domain_id
  207         )
  208         self.assertRaises(exception.UserNotFound,
  209                           PROVIDERS.identity_api.get_user_by_name,
  210                           user['name'], CONF.identity.default_domain_id)
  211         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  212         user = PROVIDERS.identity_api.create_user(user)
  213         ref = PROVIDERS.identity_api.get_user_by_name(
  214             user['name'], user['domain_id']
  215         )
  216         user['description'] = uuid.uuid4().hex
  217         user_updated = PROVIDERS.identity_api.update_user(ref['id'], user)
  218         self.assertDictContainsSubset(
  219             PROVIDERS.identity_api.get_user(ref['id']), user_updated
  220         )
  221         self.assertDictContainsSubset(
  222             PROVIDERS.identity_api.get_user_by_name(
  223                 ref['name'],
  224                 ref['domain_id']
  225             ),
  226             user_updated
  227         )
  228 
  229     def test_get_user_by_name_returns_not_found(self):
  230         self.assertRaises(exception.UserNotFound,
  231                           PROVIDERS.identity_api.get_user_by_name,
  232                           uuid.uuid4().hex,
  233                           CONF.identity.default_domain_id)
  234 
  235     def test_create_duplicate_user_name_fails(self):
  236         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  237         user = PROVIDERS.identity_api.create_user(user)
  238         self.assertRaises(exception.Conflict,
  239                           PROVIDERS.identity_api.create_user,
  240                           user)
  241 
  242     def test_create_duplicate_user_name_in_different_domains(self):
  243         new_domain = unit.new_domain_ref()
  244         PROVIDERS.resource_api.create_domain(new_domain['id'], new_domain)
  245         user1 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  246 
  247         user2 = unit.new_user_ref(name=user1['name'],
  248                                   domain_id=new_domain['id'])
  249 
  250         PROVIDERS.identity_api.create_user(user1)
  251         PROVIDERS.identity_api.create_user(user2)
  252 
  253     def test_move_user_between_domains(self):
  254         domain1 = unit.new_domain_ref()
  255         PROVIDERS.resource_api.create_domain(domain1['id'], domain1)
  256         domain2 = unit.new_domain_ref()
  257         PROVIDERS.resource_api.create_domain(domain2['id'], domain2)
  258         user = unit.new_user_ref(domain_id=domain1['id'])
  259         user = PROVIDERS.identity_api.create_user(user)
  260         user['domain_id'] = domain2['id']
  261         self.assertRaises(exception.ValidationError,
  262                           PROVIDERS.identity_api.update_user, user['id'], user)
  263 
  264     def test_rename_duplicate_user_name_fails(self):
  265         user1 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  266         user2 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  267         PROVIDERS.identity_api.create_user(user1)
  268         user2 = PROVIDERS.identity_api.create_user(user2)
  269         user2['name'] = user1['name']
  270         self.assertRaises(exception.Conflict,
  271                           PROVIDERS.identity_api.update_user,
  272                           user2['id'],
  273                           user2)
  274 
  275     def test_update_user_id_fails(self):
  276         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  277         user = PROVIDERS.identity_api.create_user(user)
  278         original_id = user['id']
  279         user['id'] = 'fake2'
  280         self.assertRaises(exception.ValidationError,
  281                           PROVIDERS.identity_api.update_user,
  282                           original_id,
  283                           user)
  284         user_ref = PROVIDERS.identity_api.get_user(original_id)
  285         self.assertEqual(original_id, user_ref['id'])
  286         self.assertRaises(exception.UserNotFound,
  287                           PROVIDERS.identity_api.get_user,
  288                           'fake2')
  289 
  290     def test_delete_user_with_group_project_domain_links(self):
  291         role1 = unit.new_role_ref()
  292         PROVIDERS.role_api.create_role(role1['id'], role1)
  293         domain1 = unit.new_domain_ref()
  294         PROVIDERS.resource_api.create_domain(domain1['id'], domain1)
  295         project1 = unit.new_project_ref(domain_id=domain1['id'])
  296         PROVIDERS.resource_api.create_project(project1['id'], project1)
  297         user1 = unit.new_user_ref(domain_id=domain1['id'])
  298         user1 = PROVIDERS.identity_api.create_user(user1)
  299         group1 = unit.new_group_ref(domain_id=domain1['id'])
  300         group1 = PROVIDERS.identity_api.create_group(group1)
  301         PROVIDERS.assignment_api.create_grant(
  302             user_id=user1['id'], project_id=project1['id'], role_id=role1['id']
  303         )
  304         PROVIDERS.assignment_api.create_grant(
  305             user_id=user1['id'], domain_id=domain1['id'], role_id=role1['id']
  306         )
  307         PROVIDERS.identity_api.add_user_to_group(
  308             user_id=user1['id'], group_id=group1['id']
  309         )
  310         roles_ref = PROVIDERS.assignment_api.list_grants(
  311             user_id=user1['id'],
  312             project_id=project1['id'])
  313         self.assertEqual(1, len(roles_ref))
  314         roles_ref = PROVIDERS.assignment_api.list_grants(
  315             user_id=user1['id'],
  316             domain_id=domain1['id'])
  317         self.assertEqual(1, len(roles_ref))
  318         PROVIDERS.identity_api.check_user_in_group(
  319             user_id=user1['id'],
  320             group_id=group1['id'])
  321         PROVIDERS.identity_api.delete_user(user1['id'])
  322         self.assertRaises(exception.NotFound,
  323                           PROVIDERS.identity_api.check_user_in_group,
  324                           user1['id'],
  325                           group1['id'])
  326 
  327     def test_delete_group_with_user_project_domain_links(self):
  328         role1 = unit.new_role_ref()
  329         PROVIDERS.role_api.create_role(role1['id'], role1)
  330         domain1 = unit.new_domain_ref()
  331         PROVIDERS.resource_api.create_domain(domain1['id'], domain1)
  332         project1 = unit.new_project_ref(domain_id=domain1['id'])
  333         PROVIDERS.resource_api.create_project(project1['id'], project1)
  334         user1 = unit.new_user_ref(domain_id=domain1['id'])
  335         user1 = PROVIDERS.identity_api.create_user(user1)
  336         group1 = unit.new_group_ref(domain_id=domain1['id'])
  337         group1 = PROVIDERS.identity_api.create_group(group1)
  338 
  339         PROVIDERS.assignment_api.create_grant(
  340             group_id=group1['id'], project_id=project1['id'],
  341             role_id=role1['id']
  342         )
  343         PROVIDERS.assignment_api.create_grant(
  344             group_id=group1['id'], domain_id=domain1['id'], role_id=role1['id']
  345         )
  346         PROVIDERS.identity_api.add_user_to_group(
  347             user_id=user1['id'], group_id=group1['id']
  348         )
  349         roles_ref = PROVIDERS.assignment_api.list_grants(
  350             group_id=group1['id'],
  351             project_id=project1['id'])
  352         self.assertEqual(1, len(roles_ref))
  353         roles_ref = PROVIDERS.assignment_api.list_grants(
  354             group_id=group1['id'],
  355             domain_id=domain1['id'])
  356         self.assertEqual(1, len(roles_ref))
  357         PROVIDERS.identity_api.check_user_in_group(
  358             user_id=user1['id'],
  359             group_id=group1['id'])
  360         PROVIDERS.identity_api.delete_group(group1['id'])
  361         PROVIDERS.identity_api.get_user(user1['id'])
  362 
  363     def test_update_user_returns_not_found(self):
  364         user_id = uuid.uuid4().hex
  365         self.assertRaises(exception.UserNotFound,
  366                           PROVIDERS.identity_api.update_user,
  367                           user_id,
  368                           {'id': user_id,
  369                            'domain_id': CONF.identity.default_domain_id})
  370 
  371     def test_delete_user_returns_not_found(self):
  372         self.assertRaises(exception.UserNotFound,
  373                           PROVIDERS.identity_api.delete_user,
  374                           uuid.uuid4().hex)
  375 
  376     def test_create_user_with_long_password(self):
  377         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id,
  378                                  password='a' * 2000)
  379         # success create a user with long password
  380         PROVIDERS.identity_api.create_user(user)
  381 
  382     def test_create_user_missed_password(self):
  383         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  384         user = PROVIDERS.identity_api.create_user(user)
  385         PROVIDERS.identity_api.get_user(user['id'])
  386         # Make sure  the user is not allowed to login
  387         # with a password that  is empty string or None
  388         with self.make_request():
  389             self.assertRaises(AssertionError,
  390                               PROVIDERS.identity_api.authenticate,
  391                               user_id=user['id'],
  392                               password='')
  393             self.assertRaises(AssertionError,
  394                               PROVIDERS.identity_api.authenticate,
  395                               user_id=user['id'],
  396                               password=None)
  397 
  398     def test_create_user_none_password(self):
  399         user = unit.new_user_ref(password=None,
  400                                  domain_id=CONF.identity.default_domain_id)
  401         user = PROVIDERS.identity_api.create_user(user)
  402         PROVIDERS.identity_api.get_user(user['id'])
  403         # Make sure  the user is not allowed to login
  404         # with a password that  is empty string or None
  405         with self.make_request():
  406             self.assertRaises(AssertionError,
  407                               PROVIDERS.identity_api.authenticate,
  408                               user_id=user['id'],
  409                               password='')
  410             self.assertRaises(AssertionError,
  411                               PROVIDERS.identity_api.authenticate,
  412                               user_id=user['id'],
  413                               password=None)
  414 
  415     def test_list_users(self):
  416         users = PROVIDERS.identity_api.list_users(
  417             domain_scope=self._set_domain_scope(
  418                 CONF.identity.default_domain_id))
  419         self.assertEqual(len(default_fixtures.USERS), len(users))
  420         user_ids = set(user['id'] for user in users)
  421         expected_user_ids = set(getattr(self, 'user_%s' % user['name'])['id']
  422                                 for user in default_fixtures.USERS)
  423         for user_ref in users:
  424             self.assertNotIn('password', user_ref)
  425         self.assertEqual(expected_user_ids, user_ids)
  426 
  427     def _build_hints(self, hints, filters, fed_dict):
  428         for key in filters:
  429             hints.add_filter(key,
  430                              fed_dict[key],
  431                              comparator='equals')
  432         return hints
  433 
  434     def _build_fed_resource(self):
  435         # create one test mapping, two idps and two protocols for federation
  436         # test.
  437         new_mapping = unit.new_mapping_ref()
  438         PROVIDERS.federation_api.create_mapping(new_mapping['id'], new_mapping)
  439         for idp_id, protocol_id in [('ORG_IDP', 'saml2'),
  440                                     ('myidp', 'mapped')]:
  441             new_idp = unit.new_identity_provider_ref(idp_id=idp_id,
  442                                                      domain_id='default')
  443             new_protocol = unit.new_protocol_ref(protocol_id=protocol_id,
  444                                                  idp_id=idp_id,
  445                                                  mapping_id=new_mapping['id'])
  446 
  447             PROVIDERS.federation_api.create_idp(new_idp['id'], new_idp)
  448             PROVIDERS.federation_api.create_protocol(new_idp['id'],
  449                                                      new_protocol['id'],
  450                                                      new_protocol)
  451 
  452     def _test_list_users_with_attribute(self, filters, fed_dict):
  453         self._build_fed_resource()
  454         domain = self._get_domain_fixture()
  455         # Call list_users while no match exists for the federated user
  456         hints = driver_hints.Hints()
  457         hints = self._build_hints(hints, filters, fed_dict)
  458         users = PROVIDERS.identity_api.list_users(hints=hints)
  459         self.assertEqual(0, len(users))
  460 
  461         # list_users with a new relational user and federated user
  462         hints = self._build_hints(hints, filters, fed_dict)
  463         PROVIDERS.shadow_users_api.create_federated_user(
  464             domain['id'], fed_dict
  465         )
  466         users = PROVIDERS.identity_api.list_users(hints=hints)
  467         self.assertEqual(1, len(users))
  468 
  469         # create another federated user that shouldnt be matched and ensure
  470         # that still only one match is found
  471         hints = self._build_hints(hints, filters, fed_dict)
  472         fed_dict2 = unit.new_federated_user_ref()
  473         fed_dict2['idp_id'] = 'myidp'
  474         fed_dict2['protocol_id'] = 'mapped'
  475         PROVIDERS.shadow_users_api.create_federated_user(
  476             domain['id'], fed_dict2
  477         )
  478         users = PROVIDERS.identity_api.list_users(hints=hints)
  479         self.assertEqual(1, len(users))
  480 
  481         # create another federated user that should also be matched and ensure
  482         # that there are now two matches in the users list. Unless there is a
  483         # unique id in the filter since unique_ids must be unique and would
  484         # therefore cause a duplicate error.
  485         hints = self._build_hints(hints, filters, fed_dict)
  486         if not any('unique_id' in x['name'] for x in hints.filters):
  487             hints = self._build_hints(hints, filters, fed_dict)
  488             fed_dict3 = unit.new_federated_user_ref()
  489             # check which filters are here and create another match
  490             for filters_ in hints.filters:
  491                 if filters_['name'] == 'idp_id':
  492                     fed_dict3['idp_id'] = fed_dict['idp_id']
  493                 elif filters_['name'] == 'protocol_id':
  494                     fed_dict3['protocol_id'] = fed_dict['protocol_id']
  495             PROVIDERS.shadow_users_api.create_federated_user(
  496                 domain['id'], fed_dict3
  497             )
  498             users = PROVIDERS.identity_api.list_users(hints=hints)
  499             self.assertEqual(2, len(users))
  500 
  501     def test_list_users_with_unique_id(self):
  502         federated_dict = unit.new_federated_user_ref()
  503         filters = ['unique_id']
  504         self._test_list_users_with_attribute(filters, federated_dict)
  505 
  506     def test_list_users_with_idp_id(self):
  507         federated_dict = unit.new_federated_user_ref()
  508         filters = ['idp_id']
  509         self._test_list_users_with_attribute(filters, federated_dict)
  510 
  511     def test_list_users_with_protocol_id(self):
  512         federated_dict = unit.new_federated_user_ref()
  513         filters = ['protocol_id']
  514         self._test_list_users_with_attribute(filters, federated_dict)
  515 
  516     def test_list_users_with_unique_id_and_idp_id(self):
  517         federated_dict = unit.new_federated_user_ref()
  518         filters = ['unique_id', 'idp_id']
  519         self._test_list_users_with_attribute(filters, federated_dict)
  520 
  521     def test_list_users_with_unique_id_and_protocol_id(self):
  522         federated_dict = unit.new_federated_user_ref()
  523         filters = ['unique_id', 'protocol_id']
  524         self._test_list_users_with_attribute(filters, federated_dict)
  525 
  526     def test_list_users_with_idp_id_protocol_id(self):
  527         federated_dict = unit.new_federated_user_ref()
  528         filters = ['idp_id', 'protocol_id']
  529         self._test_list_users_with_attribute(filters, federated_dict)
  530 
  531     def test_list_users_with_all_federated_attributes(self):
  532         federated_dict = unit.new_federated_user_ref()
  533         filters = ['unique_id', 'idp_id', 'protocol_id']
  534         self._test_list_users_with_attribute(filters, federated_dict)
  535 
  536     def test_list_users_with_name(self):
  537         self._build_fed_resource()
  538         federated_dict_1 = unit.new_federated_user_ref(
  539             display_name='test1@federation.org')
  540         federated_dict_2 = unit.new_federated_user_ref(
  541             display_name='test2@federation.org')
  542         domain = self._get_domain_fixture()
  543 
  544         hints = driver_hints.Hints()
  545         hints.add_filter('name', 'test1@federation.org')
  546         users = self.identity_api.list_users(hints=hints)
  547         self.assertEqual(0, len(users))
  548 
  549         self.shadow_users_api.create_federated_user(domain['id'],
  550                                                     federated_dict_1)
  551         self.shadow_users_api.create_federated_user(domain['id'],
  552                                                     federated_dict_2)
  553         hints = driver_hints.Hints()
  554         hints.add_filter('name', 'test1@federation.org')
  555         users = self.identity_api.list_users(hints=hints)
  556         self.assertEqual(1, len(users))
  557 
  558         hints = driver_hints.Hints()
  559         hints.add_filter('name', 'test1@federation.org')
  560         hints.add_filter('idp_id', 'ORG_IDP')
  561         users = self.identity_api.list_users(hints=hints)
  562         self.assertEqual(1, len(users))
  563 
  564     def test_list_groups(self):
  565         group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  566         group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  567         group1 = PROVIDERS.identity_api.create_group(group1)
  568         group2 = PROVIDERS.identity_api.create_group(group2)
  569         groups = PROVIDERS.identity_api.list_groups(
  570             domain_scope=self._set_domain_scope(
  571                 CONF.identity.default_domain_id))
  572         self.assertEqual(2, len(groups))
  573         group_ids = []
  574         for group in groups:
  575             group_ids.append(group.get('id'))
  576         self.assertIn(group1['id'], group_ids)
  577         self.assertIn(group2['id'], group_ids)
  578 
  579     def test_create_user_doesnt_modify_passed_in_dict(self):
  580         new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  581         original_user = new_user.copy()
  582         PROVIDERS.identity_api.create_user(new_user)
  583         self.assertDictEqual(original_user, new_user)
  584 
  585     def test_update_user_enable(self):
  586         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  587         user = PROVIDERS.identity_api.create_user(user)
  588         user_ref = PROVIDERS.identity_api.get_user(user['id'])
  589         self.assertTrue(user_ref['enabled'])
  590 
  591         user['enabled'] = False
  592         PROVIDERS.identity_api.update_user(user['id'], user)
  593         user_ref = PROVIDERS.identity_api.get_user(user['id'])
  594         self.assertEqual(user['enabled'], user_ref['enabled'])
  595 
  596         # If not present, enabled field should not be updated
  597         del user['enabled']
  598         PROVIDERS.identity_api.update_user(user['id'], user)
  599         user_ref = PROVIDERS.identity_api.get_user(user['id'])
  600         self.assertFalse(user_ref['enabled'])
  601 
  602         user['enabled'] = True
  603         PROVIDERS.identity_api.update_user(user['id'], user)
  604         user_ref = PROVIDERS.identity_api.get_user(user['id'])
  605         self.assertEqual(user['enabled'], user_ref['enabled'])
  606 
  607         del user['enabled']
  608         PROVIDERS.identity_api.update_user(user['id'], user)
  609         user_ref = PROVIDERS.identity_api.get_user(user['id'])
  610         self.assertTrue(user_ref['enabled'])
  611 
  612     def test_update_user_name(self):
  613         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  614         user = PROVIDERS.identity_api.create_user(user)
  615         user_ref = PROVIDERS.identity_api.get_user(user['id'])
  616         self.assertEqual(user['name'], user_ref['name'])
  617 
  618         changed_name = user_ref['name'] + '_changed'
  619         user_ref['name'] = changed_name
  620         updated_user = PROVIDERS.identity_api.update_user(
  621             user_ref['id'], user_ref
  622         )
  623 
  624         # NOTE(dstanek): the SQL backend adds an 'extra' field containing a
  625         #                dictionary of the extra fields in addition to the
  626         #                fields in the object. For the details see:
  627         #                SqlIdentity.test_update_project_returns_extra
  628         updated_user.pop('extra', None)
  629 
  630         self.assertDictEqual(user_ref, updated_user)
  631 
  632         user_ref = PROVIDERS.identity_api.get_user(user_ref['id'])
  633         self.assertEqual(changed_name, user_ref['name'])
  634 
  635     def test_add_user_to_group(self):
  636         domain = self._get_domain_fixture()
  637         new_group = unit.new_group_ref(domain_id=domain['id'])
  638         new_group = PROVIDERS.identity_api.create_group(new_group)
  639         new_user = unit.new_user_ref(domain_id=domain['id'])
  640         new_user = PROVIDERS.identity_api.create_user(new_user)
  641         PROVIDERS.identity_api.add_user_to_group(
  642             new_user['id'], new_group['id']
  643         )
  644         groups = PROVIDERS.identity_api.list_groups_for_user(new_user['id'])
  645 
  646         found = False
  647         for x in groups:
  648             if (x['id'] == new_group['id']):
  649                 found = True
  650         self.assertTrue(found)
  651 
  652     def test_add_user_to_group_returns_not_found(self):
  653         domain = self._get_domain_fixture()
  654         new_user = unit.new_user_ref(domain_id=domain['id'])
  655         new_user = PROVIDERS.identity_api.create_user(new_user)
  656         self.assertRaises(exception.GroupNotFound,
  657                           PROVIDERS.identity_api.add_user_to_group,
  658                           new_user['id'],
  659                           uuid.uuid4().hex)
  660 
  661         new_group = unit.new_group_ref(domain_id=domain['id'])
  662         new_group = PROVIDERS.identity_api.create_group(new_group)
  663         self.assertRaises(exception.UserNotFound,
  664                           PROVIDERS.identity_api.add_user_to_group,
  665                           uuid.uuid4().hex,
  666                           new_group['id'])
  667 
  668         self.assertRaises(exception.NotFound,
  669                           PROVIDERS.identity_api.add_user_to_group,
  670                           uuid.uuid4().hex,
  671                           uuid.uuid4().hex)
  672 
  673     def test_check_user_in_group(self):
  674         domain = self._get_domain_fixture()
  675         new_group = unit.new_group_ref(domain_id=domain['id'])
  676         new_group = PROVIDERS.identity_api.create_group(new_group)
  677         new_user = unit.new_user_ref(domain_id=domain['id'])
  678         new_user = PROVIDERS.identity_api.create_user(new_user)
  679         PROVIDERS.identity_api.add_user_to_group(
  680             new_user['id'], new_group['id']
  681         )
  682         PROVIDERS.identity_api.check_user_in_group(
  683             new_user['id'], new_group['id']
  684         )
  685 
  686     def test_check_user_not_in_group(self):
  687         new_group = unit.new_group_ref(
  688             domain_id=CONF.identity.default_domain_id)
  689         new_group = PROVIDERS.identity_api.create_group(new_group)
  690 
  691         new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  692         new_user = PROVIDERS.identity_api.create_user(new_user)
  693 
  694         self.assertRaises(exception.NotFound,
  695                           PROVIDERS.identity_api.check_user_in_group,
  696                           new_user['id'],
  697                           new_group['id'])
  698 
  699     def test_check_user_in_group_returns_not_found(self):
  700         new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  701         new_user = PROVIDERS.identity_api.create_user(new_user)
  702 
  703         new_group = unit.new_group_ref(
  704             domain_id=CONF.identity.default_domain_id)
  705         new_group = PROVIDERS.identity_api.create_group(new_group)
  706 
  707         self.assertRaises(exception.UserNotFound,
  708                           PROVIDERS.identity_api.check_user_in_group,
  709                           uuid.uuid4().hex,
  710                           new_group['id'])
  711 
  712         self.assertRaises(exception.GroupNotFound,
  713                           PROVIDERS.identity_api.check_user_in_group,
  714                           new_user['id'],
  715                           uuid.uuid4().hex)
  716 
  717         self.assertRaises(exception.NotFound,
  718                           PROVIDERS.identity_api.check_user_in_group,
  719                           uuid.uuid4().hex,
  720                           uuid.uuid4().hex)
  721 
  722     def test_list_users_in_group(self):
  723         domain = self._get_domain_fixture()
  724         new_group = unit.new_group_ref(domain_id=domain['id'])
  725         new_group = PROVIDERS.identity_api.create_group(new_group)
  726         # Make sure we get an empty list back on a new group, not an error.
  727         user_refs = PROVIDERS.identity_api.list_users_in_group(new_group['id'])
  728         self.assertEqual([], user_refs)
  729         # Make sure we get the correct users back once they have been added
  730         # to the group.
  731         new_user = unit.new_user_ref(domain_id=domain['id'])
  732         new_user = PROVIDERS.identity_api.create_user(new_user)
  733         PROVIDERS.identity_api.add_user_to_group(
  734             new_user['id'], new_group['id']
  735         )
  736         user_refs = PROVIDERS.identity_api.list_users_in_group(new_group['id'])
  737         found = False
  738         for x in user_refs:
  739             if (x['id'] == new_user['id']):
  740                 found = True
  741             self.assertNotIn('password', x)
  742         self.assertTrue(found)
  743 
  744     def test_list_users_in_group_returns_not_found(self):
  745         self.assertRaises(exception.GroupNotFound,
  746                           PROVIDERS.identity_api.list_users_in_group,
  747                           uuid.uuid4().hex)
  748 
  749     def test_list_groups_for_user(self):
  750         domain = self._get_domain_fixture()
  751         test_groups = []
  752         test_users = []
  753         GROUP_COUNT = 3
  754         USER_COUNT = 2
  755 
  756         for x in range(0, USER_COUNT):
  757             new_user = unit.new_user_ref(domain_id=domain['id'])
  758             new_user = PROVIDERS.identity_api.create_user(new_user)
  759             test_users.append(new_user)
  760         positive_user = test_users[0]
  761         negative_user = test_users[1]
  762 
  763         for x in range(0, USER_COUNT):
  764             group_refs = PROVIDERS.identity_api.list_groups_for_user(
  765                 test_users[x]['id'])
  766             self.assertEqual(0, len(group_refs))
  767 
  768         for x in range(0, GROUP_COUNT):
  769             before_count = x
  770             after_count = x + 1
  771             new_group = unit.new_group_ref(domain_id=domain['id'])
  772             new_group = PROVIDERS.identity_api.create_group(new_group)
  773             test_groups.append(new_group)
  774 
  775             # add the user to the group and ensure that the
  776             # group count increases by one for each
  777             group_refs = PROVIDERS.identity_api.list_groups_for_user(
  778                 positive_user['id'])
  779             self.assertEqual(before_count, len(group_refs))
  780             PROVIDERS.identity_api.add_user_to_group(
  781                 positive_user['id'],
  782                 new_group['id'])
  783             group_refs = PROVIDERS.identity_api.list_groups_for_user(
  784                 positive_user['id'])
  785             self.assertEqual(after_count, len(group_refs))
  786 
  787             # Make sure the group count for the unrelated user did not change
  788             group_refs = PROVIDERS.identity_api.list_groups_for_user(
  789                 negative_user['id'])
  790             self.assertEqual(0, len(group_refs))
  791 
  792     def test_remove_user_from_group(self):
  793         domain = self._get_domain_fixture()
  794         new_group = unit.new_group_ref(domain_id=domain['id'])
  795         new_group = PROVIDERS.identity_api.create_group(new_group)
  796         new_user = unit.new_user_ref(domain_id=domain['id'])
  797         new_user = PROVIDERS.identity_api.create_user(new_user)
  798         PROVIDERS.identity_api.add_user_to_group(
  799             new_user['id'], new_group['id']
  800         )
  801         groups = PROVIDERS.identity_api.list_groups_for_user(new_user['id'])
  802         self.assertIn(new_group['id'], [x['id'] for x in groups])
  803         PROVIDERS.identity_api.remove_user_from_group(
  804             new_user['id'], new_group['id']
  805         )
  806         groups = PROVIDERS.identity_api.list_groups_for_user(new_user['id'])
  807         self.assertNotIn(new_group['id'], [x['id'] for x in groups])
  808 
  809     def test_remove_user_from_group_returns_not_found(self):
  810         domain = self._get_domain_fixture()
  811         new_user = unit.new_user_ref(domain_id=domain['id'])
  812         new_user = PROVIDERS.identity_api.create_user(new_user)
  813         new_group = unit.new_group_ref(domain_id=domain['id'])
  814         new_group = PROVIDERS.identity_api.create_group(new_group)
  815         self.assertRaises(exception.GroupNotFound,
  816                           PROVIDERS.identity_api.remove_user_from_group,
  817                           new_user['id'],
  818                           uuid.uuid4().hex)
  819 
  820         self.assertRaises(exception.UserNotFound,
  821                           PROVIDERS.identity_api.remove_user_from_group,
  822                           uuid.uuid4().hex,
  823                           new_group['id'])
  824 
  825         self.assertRaises(exception.NotFound,
  826                           PROVIDERS.identity_api.remove_user_from_group,
  827                           uuid.uuid4().hex,
  828                           uuid.uuid4().hex)
  829 
  830     def test_group_crud(self):
  831         domain = unit.new_domain_ref()
  832         PROVIDERS.resource_api.create_domain(domain['id'], domain)
  833         group = unit.new_group_ref(domain_id=domain['id'])
  834         group = PROVIDERS.identity_api.create_group(group)
  835         group_ref = PROVIDERS.identity_api.get_group(group['id'])
  836         self.assertDictContainsSubset(group, group_ref)
  837 
  838         group['name'] = uuid.uuid4().hex
  839         PROVIDERS.identity_api.update_group(group['id'], group)
  840         group_ref = PROVIDERS.identity_api.get_group(group['id'])
  841         self.assertDictContainsSubset(group, group_ref)
  842 
  843         PROVIDERS.identity_api.delete_group(group['id'])
  844         self.assertRaises(exception.GroupNotFound,
  845                           PROVIDERS.identity_api.get_group,
  846                           group['id'])
  847 
  848     def test_create_group_name_with_trailing_whitespace(self):
  849         group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  850         group_name = group['name'] = (group['name'] + '    ')
  851         group_returned = PROVIDERS.identity_api.create_group(group)
  852         self.assertEqual(group_returned['name'], group_name.strip())
  853 
  854     def test_update_group_name_with_trailing_whitespace(self):
  855         group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  856         group_create = PROVIDERS.identity_api.create_group(group)
  857         group_name = group['name'] = (group['name'] + '    ')
  858         group_update = PROVIDERS.identity_api.update_group(
  859             group_create['id'], group
  860         )
  861         self.assertEqual(group_update['id'], group_create['id'])
  862         self.assertEqual(group_update['name'], group_name.strip())
  863 
  864     def test_get_group_by_name(self):
  865         group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  866         group_name = group['name']
  867         group = PROVIDERS.identity_api.create_group(group)
  868         spoiler = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  869         PROVIDERS.identity_api.create_group(spoiler)
  870 
  871         group_ref = PROVIDERS.identity_api.get_group_by_name(
  872             group_name, CONF.identity.default_domain_id)
  873         self.assertDictEqual(group, group_ref)
  874 
  875     def test_get_group_by_name_returns_not_found(self):
  876         self.assertRaises(exception.GroupNotFound,
  877                           PROVIDERS.identity_api.get_group_by_name,
  878                           uuid.uuid4().hex,
  879                           CONF.identity.default_domain_id)
  880 
  881     @unit.skip_if_cache_disabled('identity')
  882     def test_cache_layer_group_crud(self):
  883         group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  884         group = PROVIDERS.identity_api.create_group(group)
  885         # cache the result
  886         group_ref = PROVIDERS.identity_api.get_group(group['id'])
  887         # delete the group bypassing identity api.
  888         domain_id, driver, entity_id = (
  889             PROVIDERS.identity_api._get_domain_driver_and_entity_id(
  890                 group['id']
  891             )
  892         )
  893         driver.delete_group(entity_id)
  894 
  895         self.assertEqual(
  896             group_ref, PROVIDERS.identity_api.get_group(group['id'])
  897         )
  898         PROVIDERS.identity_api.get_group.invalidate(
  899             PROVIDERS.identity_api, group['id']
  900         )
  901         self.assertRaises(exception.GroupNotFound,
  902                           PROVIDERS.identity_api.get_group, group['id'])
  903 
  904         group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  905         group = PROVIDERS.identity_api.create_group(group)
  906         # cache the result
  907         PROVIDERS.identity_api.get_group(group['id'])
  908         group['name'] = uuid.uuid4().hex
  909         group_ref = PROVIDERS.identity_api.update_group(group['id'], group)
  910         # after updating through identity api, get updated group
  911         self.assertDictContainsSubset(
  912             PROVIDERS.identity_api.get_group(group['id']), group_ref
  913         )
  914 
  915     def test_create_duplicate_group_name_fails(self):
  916         group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  917         group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id,
  918                                     name=group1['name'])
  919         group1 = PROVIDERS.identity_api.create_group(group1)
  920         self.assertRaises(exception.Conflict,
  921                           PROVIDERS.identity_api.create_group,
  922                           group2)
  923 
  924     def test_create_duplicate_group_name_in_different_domains(self):
  925         new_domain = unit.new_domain_ref()
  926         PROVIDERS.resource_api.create_domain(new_domain['id'], new_domain)
  927         group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  928         group2 = unit.new_group_ref(domain_id=new_domain['id'],
  929                                     name=group1['name'])
  930         group1 = PROVIDERS.identity_api.create_group(group1)
  931         group2 = PROVIDERS.identity_api.create_group(group2)
  932 
  933     def test_move_group_between_domains(self):
  934         domain1 = unit.new_domain_ref()
  935         PROVIDERS.resource_api.create_domain(domain1['id'], domain1)
  936         domain2 = unit.new_domain_ref()
  937         PROVIDERS.resource_api.create_domain(domain2['id'], domain2)
  938         group = unit.new_group_ref(domain_id=domain1['id'])
  939         group = PROVIDERS.identity_api.create_group(group)
  940         group['domain_id'] = domain2['id']
  941         self.assertRaises(exception.ValidationError,
  942                           PROVIDERS.identity_api.update_group,
  943                           group['id'], group)
  944 
  945     def test_user_crud(self):
  946         user_dict = unit.new_user_ref(
  947             domain_id=CONF.identity.default_domain_id)
  948         del user_dict['id']
  949         user = PROVIDERS.identity_api.create_user(user_dict)
  950         user_ref = PROVIDERS.identity_api.get_user(user['id'])
  951         del user_dict['password']
  952         user_ref_dict = {x: user_ref[x] for x in user_ref}
  953         self.assertDictContainsSubset(user_dict, user_ref_dict)
  954 
  955         user_dict['password'] = uuid.uuid4().hex
  956         PROVIDERS.identity_api.update_user(user['id'], user_dict)
  957         user_ref = PROVIDERS.identity_api.get_user(user['id'])
  958         del user_dict['password']
  959         user_ref_dict = {x: user_ref[x] for x in user_ref}
  960         self.assertDictContainsSubset(user_dict, user_ref_dict)
  961 
  962         PROVIDERS.identity_api.delete_user(user['id'])
  963         self.assertRaises(exception.UserNotFound,
  964                           PROVIDERS.identity_api.get_user,
  965                           user['id'])
  966 
  967     def test_arbitrary_attributes_are_returned_from_create_user(self):
  968         attr_value = uuid.uuid4().hex
  969         user_data = unit.new_user_ref(
  970             domain_id=CONF.identity.default_domain_id,
  971             arbitrary_attr=attr_value)
  972 
  973         user = PROVIDERS.identity_api.create_user(user_data)
  974 
  975         self.assertEqual(attr_value, user['arbitrary_attr'])
  976 
  977     def test_arbitrary_attributes_are_returned_from_get_user(self):
  978         attr_value = uuid.uuid4().hex
  979         user_data = unit.new_user_ref(
  980             domain_id=CONF.identity.default_domain_id,
  981             arbitrary_attr=attr_value)
  982 
  983         user_data = PROVIDERS.identity_api.create_user(user_data)
  984 
  985         user = PROVIDERS.identity_api.get_user(user_data['id'])
  986         self.assertEqual(attr_value, user['arbitrary_attr'])
  987 
  988     def test_new_arbitrary_attributes_are_returned_from_update_user(self):
  989         user_data = unit.new_user_ref(
  990             domain_id=CONF.identity.default_domain_id)
  991 
  992         user = PROVIDERS.identity_api.create_user(user_data)
  993         attr_value = uuid.uuid4().hex
  994         user['arbitrary_attr'] = attr_value
  995         updated_user = PROVIDERS.identity_api.update_user(user['id'], user)
  996 
  997         self.assertEqual(attr_value, updated_user['arbitrary_attr'])
  998 
  999     def test_updated_arbitrary_attributes_are_returned_from_update_user(self):
 1000         attr_value = uuid.uuid4().hex
 1001         user_data = unit.new_user_ref(
 1002             domain_id=CONF.identity.default_domain_id,
 1003             arbitrary_attr=attr_value)
 1004 
 1005         new_attr_value = uuid.uuid4().hex
 1006         user = PROVIDERS.identity_api.create_user(user_data)
 1007         user['arbitrary_attr'] = new_attr_value
 1008         updated_user = PROVIDERS.identity_api.update_user(user['id'], user)
 1009 
 1010         self.assertEqual(new_attr_value, updated_user['arbitrary_attr'])
 1011 
 1012     def test_user_update_and_user_get_return_same_response(self):
 1013         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
 1014 
 1015         user = PROVIDERS.identity_api.create_user(user)
 1016 
 1017         updated_user = {'enabled': False}
 1018         updated_user_ref = PROVIDERS.identity_api.update_user(
 1019             user['id'], updated_user)
 1020 
 1021         # SQL backend adds 'extra' field
 1022         updated_user_ref.pop('extra', None)
 1023 
 1024         self.assertIs(False, updated_user_ref['enabled'])
 1025 
 1026         user_ref = PROVIDERS.identity_api.get_user(user['id'])
 1027         self.assertDictEqual(updated_user_ref, user_ref)
 1028 
 1029     @unit.skip_if_no_multiple_domains_support
 1030     def test_list_domains_filtered_and_limited(self):
 1031         # The test is designed for multiple domains only
 1032         def create_domains(domain_count, domain_name_prefix):
 1033             for _ in range(domain_count):
 1034                 domain_name = '%s-%s' % (domain_name_prefix, uuid.uuid4().hex)
 1035                 domain = unit.new_domain_ref(name=domain_name)
 1036                 self.domain_list[domain_name] = \
 1037                     PROVIDERS.resource_api.create_domain(domain['id'], domain)
 1038 
 1039         def clean_up_domains():
 1040             for _, domain in self.domain_list.items():
 1041                 domain['enabled'] = False
 1042                 PROVIDERS.resource_api.update_domain(domain['id'], domain)
 1043                 PROVIDERS.resource_api.delete_domain(domain['id'])
 1044 
 1045         self.domain_list = {}
 1046         create_domains(2, 'domaingroup1')
 1047         create_domains(3, 'domaingroup2')
 1048 
 1049         self.addCleanup(clean_up_domains)
 1050         unfiltered_domains = PROVIDERS.resource_api.list_domains()
 1051 
 1052         # Should get back just 4 entities
 1053         self.config_fixture.config(list_limit=4)
 1054         hints = driver_hints.Hints()
 1055         entities = PROVIDERS.resource_api.list_domains(hints=hints)
 1056         self.assertThat(entities, matchers.HasLength(hints.limit['limit']))
 1057         self.assertTrue(hints.limit['truncated'])
 1058 
 1059         # Get one exact item from the list
 1060         hints = driver_hints.Hints()
 1061         hints.add_filter('name', unfiltered_domains[3]['name'])
 1062         entities = PROVIDERS.resource_api.list_domains(hints=hints)
 1063         self.assertThat(entities, matchers.HasLength(1))
 1064         self.assertEqual(entities[0], unfiltered_domains[3])
 1065 
 1066         # Get 2 entries
 1067         hints = driver_hints.Hints()
 1068         hints.add_filter('name', 'domaingroup1', comparator='startswith')
 1069         entities = PROVIDERS.resource_api.list_domains(hints=hints)
 1070         self.assertThat(entities, matchers.HasLength(2))
 1071         self.assertThat(entities[0]['name'],
 1072                         matchers.StartsWith('domaingroup1'))
 1073         self.assertThat(entities[1]['name'],
 1074                         matchers.StartsWith('domaingroup1'))
 1075 
 1076     @unit.skip_if_no_multiple_domains_support
 1077     def test_list_limit_for_domains(self):
 1078         def create_domains(count):
 1079             for _ in range(count):
 1080                 domain = unit.new_domain_ref()
 1081                 self.domain_list.append(
 1082                     PROVIDERS.resource_api.create_domain(domain['id'], domain))
 1083 
 1084         def clean_up_domains():
 1085             for domain in self.domain_list:
 1086                 PROVIDERS.resource_api.update_domain(
 1087                     domain['id'], {'enabled': False})
 1088                 PROVIDERS.resource_api.delete_domain(domain['id'])
 1089 
 1090         self.domain_list = []
 1091         create_domains(6)
 1092         self.addCleanup(clean_up_domains)
 1093 
 1094         for x in range(1, 7):
 1095             self.config_fixture.config(group='resource', list_limit=x)
 1096             hints = driver_hints.Hints()
 1097             entities = PROVIDERS.resource_api.list_domains(hints=hints)
 1098             self.assertThat(entities, matchers.HasLength(hints.limit['limit']))
 1099 
 1100 
 1101 class FilterTests(filtering.FilterTests):
 1102     def test_list_entities_filtered(self):
 1103         for entity in ['user', 'group', 'project']:
 1104             # Create 20 entities
 1105             entity_list = self._create_test_data(entity, 20)
 1106 
 1107             # Try filtering to get one an exact item out of the list
 1108             hints = driver_hints.Hints()
 1109             hints.add_filter('name', entity_list[10]['name'])
 1110             entities = self._list_entities(entity)(hints=hints)
 1111             self.assertEqual(1, len(entities))
 1112             self.assertEqual(entity_list[10]['id'], entities[0]['id'])
 1113             # Check the driver has removed the filter from the list hints
 1114             self.assertFalse(hints.get_exact_filter_by_name('name'))
 1115             self._delete_test_data(entity, entity_list)
 1116 
 1117     def test_list_users_inexact_filtered(self):
 1118         # Create 20 users, some with specific names. We set the names at create
 1119         # time (rather than updating them), since the LDAP driver does not
 1120         # support name updates.
 1121         user_name_data = {
 1122             # user index: name for user
 1123             5: 'The',
 1124             6: 'The Ministry',
 1125             7: 'The Ministry of',
 1126             8: 'The Ministry of Silly',
 1127             9: 'The Ministry of Silly Walks',
 1128             # ...and one for useful case insensitivity testing
 1129             10: 'The ministry of silly walks OF'
 1130         }
 1131         user_list = self._create_test_data(
 1132             'user', 20, domain_id=CONF.identity.default_domain_id,
 1133             name_dict=user_name_data)
 1134 
 1135         hints = driver_hints.Hints()
 1136         hints.add_filter('name', 'ministry', comparator='contains')
 1137         users = PROVIDERS.identity_api.list_users(hints=hints)
 1138         self.assertEqual(5, len(users))
 1139         self._match_with_list(users, user_list,
 1140                               list_start=6, list_end=11)
 1141         # TODO(henry-nash) Check inexact filter has been removed.
 1142 
 1143         hints = driver_hints.Hints()
 1144         hints.add_filter('name', 'The', comparator='startswith')
 1145         users = PROVIDERS.identity_api.list_users(hints=hints)
 1146         self.assertEqual(6, len(users))
 1147         self._match_with_list(users, user_list,
 1148                               list_start=5, list_end=11)
 1149         # TODO(henry-nash) Check inexact filter has been removed.
 1150 
 1151         hints = driver_hints.Hints()
 1152         hints.add_filter('name', 'of', comparator='endswith')
 1153         users = PROVIDERS.identity_api.list_users(hints=hints)
 1154         self.assertEqual(2, len(users))
 1155         # We can't assume we will get back the users in any particular order
 1156         self.assertIn(user_list[7]['id'], [users[0]['id'], users[1]['id']])
 1157         self.assertIn(user_list[10]['id'], [users[0]['id'], users[1]['id']])
 1158         # TODO(henry-nash) Check inexact filter has been removed.
 1159 
 1160         # TODO(henry-nash): Add some case sensitive tests.  However,
 1161         # these would be hard to validate currently, since:
 1162         #
 1163         # For SQL, the issue is that MySQL 0.7, by default, is installed in
 1164         # case insensitive mode (which is what is run by default for our
 1165         # SQL backend tests).  For production deployments. OpenStack
 1166         # assumes a case sensitive database.  For these tests, therefore, we
 1167         # need to be able to check the sensitivity of the database so as to
 1168         # know whether to run case sensitive tests here.
 1169         #
 1170         # For LDAP/AD, although dependent on the schema being used, attributes
 1171         # are typically configured to be case aware, but not case sensitive.
 1172 
 1173         self._delete_test_data('user', user_list)
 1174 
 1175     def _groups_for_user_data(self):
 1176         number_of_groups = 10
 1177         group_name_data = {
 1178             # entity index: name for entity
 1179             5: 'The',
 1180             6: 'The Ministry',
 1181             9: 'The Ministry of Silly Walks',
 1182         }
 1183         group_list = self._create_test_data(
 1184             'group', number_of_groups,
 1185             domain_id=CONF.identity.default_domain_id,
 1186             name_dict=group_name_data)
 1187         user_list = self._create_test_data('user', 2)
 1188 
 1189         for group in range(7):
 1190             # Create membership, including with two out of the three groups
 1191             # with well know names
 1192             PROVIDERS.identity_api.add_user_to_group(
 1193                 user_list[0]['id'], group_list[group]['id']
 1194             )
 1195         # ...and some spoiler memberships
 1196         for group in range(7, number_of_groups):
 1197             PROVIDERS.identity_api.add_user_to_group(
 1198                 user_list[1]['id'], group_list[group]['id']
 1199             )
 1200 
 1201         return group_list, user_list
 1202 
 1203     def test_groups_for_user_inexact_filtered(self):
 1204         """Test use of filtering doesn't break groups_for_user listing.
 1205 
 1206         Some backends may use filtering to achieve the list of groups for a
 1207         user, so test that it can combine a second filter.
 1208 
 1209         Test Plan:
 1210 
 1211         - Create 10 groups, some with names we can filter on
 1212         - Create 2 users
 1213         - Assign 1 of those users to most of the groups, including some of the
 1214           well known named ones
 1215         - Assign the other user to other groups as spoilers
 1216         - Ensure that when we list groups for users with a filter on the group
 1217           name, both restrictions have been enforced on what is returned.
 1218 
 1219         """
 1220         group_list, user_list = self._groups_for_user_data()
 1221 
 1222         hints = driver_hints.Hints()
 1223         hints.add_filter('name', 'Ministry', comparator='contains')
 1224         groups = PROVIDERS.identity_api.list_groups_for_user(
 1225             user_list[0]['id'], hints=hints)
 1226         # We should only get back one group, since of the two that contain
 1227         # 'Ministry' the user only belongs to one.
 1228         self.assertThat(len(groups), matchers.Equals(1))
 1229         self.assertEqual(group_list[6]['id'], groups[0]['id'])
 1230 
 1231         hints = driver_hints.Hints()
 1232         hints.add_filter('name', 'The', comparator='startswith')
 1233         groups = PROVIDERS.identity_api.list_groups_for_user(
 1234             user_list[0]['id'], hints=hints)
 1235         # We should only get back 2 out of the 3 groups that start with 'The'
 1236         # hence showing that both "filters" have been applied
 1237         self.assertThat(len(groups), matchers.Equals(2))
 1238         self.assertIn(group_list[5]['id'], [groups[0]['id'], groups[1]['id']])
 1239         self.assertIn(group_list[6]['id'], [groups[0]['id'], groups[1]['id']])
 1240 
 1241         hints.add_filter('name', 'The', comparator='endswith')
 1242         groups = PROVIDERS.identity_api.list_groups_for_user(
 1243             user_list[0]['id'], hints=hints)
 1244         # We should only get back one group since it is the only one that
 1245         # ends with 'The'
 1246         self.assertThat(len(groups), matchers.Equals(1))
 1247         self.assertEqual(group_list[5]['id'], groups[0]['id'])
 1248 
 1249         self._delete_test_data('user', user_list)
 1250         self._delete_test_data('group', group_list)
 1251 
 1252     def test_groups_for_user_exact_filtered(self):
 1253         """Test exact filters doesn't break groups_for_user listing."""
 1254         group_list, user_list = self._groups_for_user_data()
 1255         hints = driver_hints.Hints()
 1256         hints.add_filter('name', 'The Ministry', comparator='equals')
 1257         groups = PROVIDERS.identity_api.list_groups_for_user(
 1258             user_list[0]['id'], hints=hints)
 1259         # We should only get back 1 out of the 3 groups with name 'The
 1260         # Ministry' hence showing that both "filters" have been applied.
 1261         self.assertEqual(1, len(groups))
 1262         self.assertEqual(group_list[6]['id'], groups[0]['id'])
 1263         self._delete_test_data('user', user_list)
 1264         self._delete_test_data('group', group_list)
 1265 
 1266     def _get_user_name_field_size(self):
 1267         """Return the size of the user name field for the backend.
 1268 
 1269         Subclasses can override this method to indicate that the user name
 1270         field is limited in length. The user name is the field used in the test
 1271         that validates that a filter value works even if it's longer than a
 1272         field.
 1273 
 1274         If the backend doesn't limit the value length then return None.
 1275 
 1276         """
 1277         return None
 1278 
 1279     def test_filter_value_wider_than_field(self):
 1280         # If a filter value is given that's larger than the field in the
 1281         # backend then no values are returned.
 1282 
 1283         user_name_field_size = self._get_user_name_field_size()
 1284 
 1285         if user_name_field_size is None:
 1286             # The backend doesn't limit the size of the user name, so pass this
 1287             # test.
 1288             return
 1289 
 1290         # Create some users just to make sure would return something if the
 1291         # filter was ignored.
 1292         self._create_test_data('user', 2)
 1293 
 1294         hints = driver_hints.Hints()
 1295         value = 'A' * (user_name_field_size + 1)
 1296         hints.add_filter('name', value)
 1297         users = PROVIDERS.identity_api.list_users(hints=hints)
 1298         self.assertEqual([], users)
 1299 
 1300     def _list_users_in_group_data(self):
 1301         number_of_users = 10
 1302         user_name_data = {
 1303             1: 'Arthur Conan Doyle',
 1304             3: 'Arthur Rimbaud',
 1305             9: 'Arthur Schopenhauer',
 1306         }
 1307         user_list = self._create_test_data(
 1308             'user', number_of_users,
 1309             domain_id=CONF.identity.default_domain_id,
 1310             name_dict=user_name_data)
 1311         group = self._create_one_entity(
 1312             'group', CONF.identity.default_domain_id, 'Great Writers')
 1313         for i in range(7):
 1314             PROVIDERS.identity_api.add_user_to_group(
 1315                 user_list[i]['id'], group['id']
 1316             )
 1317 
 1318         return user_list, group
 1319 
 1320     def test_list_users_in_group_inexact_filtered(self):
 1321         user_list, group = self._list_users_in_group_data()
 1322 
 1323         hints = driver_hints.Hints()
 1324         hints.add_filter('name', 'Arthur', comparator='contains')
 1325         users = PROVIDERS.identity_api.list_users_in_group(
 1326             group['id'], hints=hints
 1327         )
 1328         self.assertThat(len(users), matchers.Equals(2))
 1329         self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']])
 1330         self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']])
 1331 
 1332         hints = driver_hints.Hints()
 1333         hints.add_filter('name', 'Arthur', comparator='startswith')
 1334         users = PROVIDERS.identity_api.list_users_in_group(
 1335             group['id'], hints=hints
 1336         )
 1337         self.assertThat(len(users), matchers.Equals(2))
 1338         self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']])
 1339         self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']])
 1340 
 1341         hints = driver_hints.Hints()
 1342         hints.add_filter('name', 'Doyle', comparator='endswith')
 1343         users = PROVIDERS.identity_api.list_users_in_group(
 1344             group['id'], hints=hints
 1345         )
 1346         self.assertThat(len(users), matchers.Equals(1))
 1347         self.assertEqual(user_list[1]['id'], users[0]['id'])
 1348 
 1349         self._delete_test_data('user', user_list)
 1350         self._delete_entity('group')(group['id'])
 1351 
 1352     def test_list_users_in_group_exact_filtered(self):
 1353         hints = driver_hints.Hints()
 1354         user_list, group = self._list_users_in_group_data()
 1355         hints.add_filter('name', 'Arthur Rimbaud', comparator='equals')
 1356         users = PROVIDERS.identity_api.list_users_in_group(
 1357             group['id'], hints=hints
 1358         )
 1359         self.assertEqual(1, len(users))
 1360         self.assertEqual(user_list[3]['id'], users[0]['id'])
 1361         self._delete_test_data('user', user_list)
 1362         self._delete_entity('group')(group['id'])
 1363 
 1364 
 1365 class LimitTests(filtering.FilterTests):
 1366     ENTITIES = ['user', 'group', 'project']
 1367 
 1368     def setUp(self):
 1369         """Setup for Limit Test Cases."""
 1370         self.entity_lists = {}
 1371 
 1372         for entity in self.ENTITIES:
 1373             # Create 20 entities
 1374             self.entity_lists[entity] = self._create_test_data(entity, 20)
 1375         self.addCleanup(self.clean_up_entities)
 1376 
 1377     def clean_up_entities(self):
 1378         """Clean up entity test data from Limit Test Cases."""
 1379         for entity in self.ENTITIES:
 1380             self._delete_test_data(entity, self.entity_lists[entity])
 1381         del self.entity_lists
 1382 
 1383     def _test_list_entity_filtered_and_limited(self, entity):
 1384         self.config_fixture.config(list_limit=10)
 1385         # Should get back just 10 entities
 1386         hints = driver_hints.Hints()
 1387         entities = self._list_entities(entity)(hints=hints)
 1388         self.assertEqual(hints.limit['limit'], len(entities))
 1389         self.assertTrue(hints.limit['truncated'])
 1390 
 1391         # Override with driver specific limit
 1392         if entity == 'project':
 1393             self.config_fixture.config(group='resource', list_limit=5)
 1394         else:
 1395             self.config_fixture.config(group='identity', list_limit=5)
 1396 
 1397         # Should get back just 5 users
 1398         hints = driver_hints.Hints()
 1399         entities = self._list_entities(entity)(hints=hints)
 1400         self.assertEqual(hints.limit['limit'], len(entities))
 1401 
 1402         # Finally, let's pretend we want to get the full list of entities,
 1403         # even with the limits set, as part of some internal calculation.
 1404         # Calling the API without a hints list should achieve this, and
 1405         # return at least the 20 entries we created (there may be other
 1406         # entities lying around created by other tests/setup).
 1407         entities = self._list_entities(entity)()
 1408         self.assertGreaterEqual(len(entities), 20)
 1409         self._match_with_list(self.entity_lists[entity], entities)
 1410 
 1411     def test_list_users_filtered_and_limited(self):
 1412         self._test_list_entity_filtered_and_limited('user')
 1413 
 1414     def test_list_groups_filtered_and_limited(self):
 1415         self._test_list_entity_filtered_and_limited('group')
 1416 
 1417     def test_list_projects_filtered_and_limited(self):
 1418         self._test_list_entity_filtered_and_limited('project')