"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/unit/test_v3_identity.py" (13 May 2020, 56024 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_v3_identity.py": 16.0.1_vs_17.0.0.

    1 # Copyright 2012 OpenStack Foundation
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 import datetime
   16 from unittest import mock
   17 import uuid
   18 
   19 import fixtures
   20 import freezegun
   21 import http.client
   22 from oslo_db import exception as oslo_db_exception
   23 from oslo_log import log
   24 from testtools import matchers
   25 
   26 from keystone.common import provider_api
   27 from keystone.common import sql
   28 import keystone.conf
   29 from keystone.credential.providers import fernet as credential_fernet
   30 from keystone import exception
   31 from keystone.identity.backends import base as identity_base
   32 from keystone.identity.backends import resource_options as options
   33 from keystone.identity.backends import sql_model as model
   34 from keystone.tests import unit
   35 from keystone.tests.unit import ksfixtures
   36 from keystone.tests.unit.ksfixtures import database
   37 from keystone.tests.unit import mapping_fixtures
   38 from keystone.tests.unit import test_v3
   39 
   40 
   41 CONF = keystone.conf.CONF
   42 PROVIDERS = provider_api.ProviderAPIs
   43 
   44 
   45 class IdentityTestCase(test_v3.RestfulTestCase):
   46     """Test users and groups."""
   47 
   48     def setUp(self):
   49         super(IdentityTestCase, self).setUp()
   50         self.useFixture(
   51             ksfixtures.KeyRepository(
   52                 self.config_fixture,
   53                 'credential',
   54                 credential_fernet.MAX_ACTIVE_KEYS
   55             )
   56         )
   57 
   58         self.group = unit.new_group_ref(domain_id=self.domain_id)
   59         self.group = PROVIDERS.identity_api.create_group(self.group)
   60         self.group_id = self.group['id']
   61 
   62         self.credential = unit.new_credential_ref(
   63             user_id=self.user['id'],
   64             project_id=self.project_id)
   65 
   66         PROVIDERS.credential_api.create_credential(
   67             self.credential['id'], self.credential
   68         )
   69 
   70     # user crud tests
   71 
   72     def test_create_user(self):
   73         """Call ``POST /users``."""
   74         ref = unit.new_user_ref(domain_id=self.domain_id)
   75         r = self.post(
   76             '/users',
   77             body={'user': ref})
   78         return self.assertValidUserResponse(r, ref)
   79 
   80     def test_create_user_without_domain(self):
   81         """Call ``POST /users`` without specifying domain.
   82 
   83         According to the identity-api specification, if you do not
   84         explicitly specific the domain_id in the entity, it should
   85         take the domain scope of the token as the domain_id.
   86 
   87         """
   88         # Create a user with a role on the domain so we can get a
   89         # domain scoped token
   90         domain = unit.new_domain_ref()
   91         PROVIDERS.resource_api.create_domain(domain['id'], domain)
   92         user = unit.create_user(PROVIDERS.identity_api, domain_id=domain['id'])
   93         PROVIDERS.assignment_api.create_grant(
   94             role_id=self.role_id, user_id=user['id'],
   95             domain_id=domain['id'])
   96 
   97         ref = unit.new_user_ref(domain_id=domain['id'])
   98         ref_nd = ref.copy()
   99         ref_nd.pop('domain_id')
  100         auth = self.build_authentication_request(
  101             user_id=user['id'],
  102             password=user['password'],
  103             domain_id=domain['id'])
  104         r = self.post('/users', body={'user': ref_nd}, auth=auth)
  105         self.assertValidUserResponse(r, ref)
  106 
  107         # Now try the same thing without a domain token - which should fail
  108         ref = unit.new_user_ref(domain_id=domain['id'])
  109         ref_nd = ref.copy()
  110         ref_nd.pop('domain_id')
  111         auth = self.build_authentication_request(
  112             user_id=self.user['id'],
  113             password=self.user['password'],
  114             project_id=self.project['id'])
  115 
  116         # TODO(henry-nash): Due to bug #1283539 we currently automatically
  117         # use the default domain_id if a domain scoped token is not being
  118         # used. For now we just check that a deprecation warning has been
  119         # issued. Change the code below to expect a failure once this bug is
  120         # fixed.
  121         with mock.patch(
  122                 'oslo_log.versionutils.report_deprecated_feature') as mock_dep:
  123             r = self.post('/users', body={'user': ref_nd}, auth=auth)
  124             self.assertTrue(mock_dep.called)
  125 
  126         ref['domain_id'] = CONF.identity.default_domain_id
  127         return self.assertValidUserResponse(r, ref)
  128 
  129     def test_create_user_with_admin_token_and_domain(self):
  130         """Call ``POST /users`` with admin token and domain id."""
  131         ref = unit.new_user_ref(domain_id=self.domain_id)
  132         self.post('/users', body={'user': ref}, token=self.get_admin_token(),
  133                   expected_status=http.client.CREATED)
  134 
  135     def test_user_management_normalized_keys(self):
  136         """Illustrate the inconsistent handling of hyphens in keys.
  137 
  138         To quote Morgan in bug 1526244:
  139 
  140             the reason this is converted from "domain-id" to "domain_id" is
  141             because of how we process/normalize data. The way we have to handle
  142             specific data types for known columns requires avoiding "-" in the
  143             actual python code since "-" is not valid for attributes in python
  144             w/o significant use of "getattr" etc.
  145 
  146             In short, historically we handle some things in conversions. The
  147             use of "extras" has long been a poor design choice that leads to
  148             odd/strange inconsistent behaviors because of other choices made in
  149             handling data from within the body. (In many cases we convert from
  150             "-" to "_" throughout openstack)
  151 
  152         Source: https://bugs.launchpad.net/keystone/+bug/1526244/comments/9
  153 
  154         """
  155         # Create two domains to work with.
  156         domain1 = unit.new_domain_ref()
  157         PROVIDERS.resource_api.create_domain(domain1['id'], domain1)
  158         domain2 = unit.new_domain_ref()
  159         PROVIDERS.resource_api.create_domain(domain2['id'], domain2)
  160 
  161         # We can successfully create a normal user without any surprises.
  162         user = unit.new_user_ref(domain_id=domain1['id'])
  163         r = self.post(
  164             '/users',
  165             body={'user': user})
  166         self.assertValidUserResponse(r, user)
  167         user['id'] = r.json['user']['id']
  168 
  169         # Query strings are not normalized: so we get all users back (like
  170         # self.user), not just the ones in the specified domain.
  171         r = self.get(
  172             '/users?domain-id=%s' % domain1['id'])
  173         self.assertValidUserListResponse(r, ref=self.user)
  174         self.assertNotEqual(domain1['id'], self.user['domain_id'])
  175 
  176         # When creating a new user, if we move the 'domain_id' into the
  177         # 'domain-id' attribute, the server will normalize the request
  178         # attribute, and effectively "move it back" for us.
  179         user = unit.new_user_ref(domain_id=domain1['id'])
  180         user['domain-id'] = user.pop('domain_id')
  181         r = self.post(
  182             '/users',
  183             body={'user': user})
  184         self.assertNotIn('domain-id', r.json['user'])
  185         self.assertEqual(domain1['id'], r.json['user']['domain_id'])
  186         # (move this attribute back so we can use assertValidUserResponse)
  187         user['domain_id'] = user.pop('domain-id')
  188         self.assertValidUserResponse(r, user)
  189         user['id'] = r.json['user']['id']
  190 
  191         # If we try updating the user's 'domain_id' by specifying a
  192         # 'domain-id', then it'll be stored into extras rather than normalized,
  193         # and the user's actual 'domain_id' is not affected.
  194         r = self.patch(
  195             '/users/%s' % user['id'],
  196             body={'user': {'domain-id': domain2['id']}})
  197         self.assertEqual(domain2['id'], r.json['user']['domain-id'])
  198         self.assertEqual(user['domain_id'], r.json['user']['domain_id'])
  199         self.assertNotEqual(domain2['id'], user['domain_id'])
  200         self.assertValidUserResponse(r, user)
  201 
  202     def test_create_user_bad_request(self):
  203         """Call ``POST /users``."""
  204         self.post('/users', body={'user': {}},
  205                   expected_status=http.client.BAD_REQUEST)
  206 
  207     def test_create_user_bad_domain_id(self):
  208         """Call ``POST /users``."""
  209         # create user with 'DEFaUlT' domain_id instead if 'default'
  210         # and verify it fails
  211         self.post('/users',
  212                   body={'user': {"name": "baddomain", "domain_id":
  213                         "DEFaUlT"}},
  214                   expected_status=http.client.NOT_FOUND)
  215 
  216     def test_list_head_users(self):
  217         """Call ``GET & HEAD /users``."""
  218         resource_url = '/users'
  219         r = self.get(resource_url)
  220         self.assertValidUserListResponse(r, ref=self.user,
  221                                          resource_url=resource_url)
  222         self.head(resource_url, expected_status=http.client.OK)
  223 
  224     def test_list_users_with_multiple_backends(self):
  225         """Call ``GET /users`` when multiple backends is enabled.
  226 
  227         In this scenario, the controller requires a domain to be specified
  228         either as a filter or by using a domain scoped token.
  229 
  230         """
  231         self.config_fixture.config(group='identity',
  232                                    domain_specific_drivers_enabled=True)
  233 
  234         # Create a new domain with a new project and user
  235         domain = unit.new_domain_ref()
  236         PROVIDERS.resource_api.create_domain(domain['id'], domain)
  237 
  238         project = unit.new_project_ref(domain_id=domain['id'])
  239         PROVIDERS.resource_api.create_project(project['id'], project)
  240 
  241         user = unit.create_user(PROVIDERS.identity_api, domain_id=domain['id'])
  242 
  243         # Create both project and domain role grants for the user so we
  244         # can get both project and domain scoped tokens
  245         PROVIDERS.assignment_api.create_grant(
  246             role_id=self.role_id, user_id=user['id'],
  247             domain_id=domain['id'])
  248         PROVIDERS.assignment_api.create_grant(
  249             role_id=self.role_id, user_id=user['id'],
  250             project_id=project['id'])
  251 
  252         dom_auth = self.build_authentication_request(
  253             user_id=user['id'],
  254             password=user['password'],
  255             domain_id=domain['id'])
  256         project_auth = self.build_authentication_request(
  257             user_id=user['id'],
  258             password=user['password'],
  259             project_id=project['id'])
  260 
  261         # First try using a domain scoped token
  262         resource_url = '/users'
  263         r = self.get(resource_url, auth=dom_auth)
  264         self.assertValidUserListResponse(r, ref=user,
  265                                          resource_url=resource_url)
  266 
  267         # Now try using a project scoped token
  268         resource_url = '/users'
  269         r = self.get(resource_url, auth=project_auth)
  270         self.assertValidUserListResponse(r, ref=user,
  271                                          resource_url=resource_url)
  272 
  273         # Now try with an explicit filter
  274         resource_url = ('/users?domain_id=%(domain_id)s' %
  275                         {'domain_id': domain['id']})
  276         r = self.get(resource_url)
  277         self.assertValidUserListResponse(r, ref=user,
  278                                          resource_url=resource_url)
  279 
  280     def test_list_users_no_default_project(self):
  281         """Call ``GET /users`` making sure no default_project_id."""
  282         user = unit.new_user_ref(self.domain_id)
  283         user = PROVIDERS.identity_api.create_user(user)
  284         resource_url = '/users'
  285         r = self.get(resource_url)
  286         self.assertValidUserListResponse(r, ref=user,
  287                                          resource_url=resource_url)
  288 
  289     def test_get_head_user(self):
  290         """Call ``GET & HEAD /users/{user_id}``."""
  291         resource_url = '/users/%(user_id)s' % {
  292             'user_id': self.user['id']}
  293         r = self.get(resource_url)
  294         self.assertValidUserResponse(r, self.user)
  295         self.head(resource_url, expected_status=http.client.OK)
  296 
  297     def test_get_user_does_not_include_extra_attributes(self):
  298         """Call ``GET /users/{user_id}`` extra attributes are not included."""
  299         user = unit.new_user_ref(domain_id=self.domain_id,
  300                                  project_id=self.project_id)
  301         user = PROVIDERS.identity_api.create_user(user)
  302         self.assertNotIn('created_at', user)
  303         self.assertNotIn('last_active_at', user)
  304 
  305     def test_get_user_includes_required_attributes(self):
  306         """Call ``GET /users/{user_id}`` required attributes are included."""
  307         user = unit.new_user_ref(domain_id=self.domain_id,
  308                                  project_id=self.project_id)
  309         user = PROVIDERS.identity_api.create_user(user)
  310         self.assertIn('id', user)
  311         self.assertIn('name', user)
  312         self.assertIn('enabled', user)
  313         self.assertIn('password_expires_at', user)
  314         r = self.get('/users/%(user_id)s' % {'user_id': user['id']})
  315         self.assertValidUserResponse(r, user)
  316 
  317     def test_get_user_with_default_project(self):
  318         """Call ``GET /users/{user_id}`` making sure of default_project_id."""
  319         user = unit.new_user_ref(domain_id=self.domain_id,
  320                                  project_id=self.project_id)
  321         user = PROVIDERS.identity_api.create_user(user)
  322         r = self.get('/users/%(user_id)s' % {'user_id': user['id']})
  323         self.assertValidUserResponse(r, user)
  324 
  325     def test_add_user_to_group(self):
  326         """Call ``PUT /groups/{group_id}/users/{user_id}``."""
  327         self.put('/groups/%(group_id)s/users/%(user_id)s' % {
  328             'group_id': self.group_id, 'user_id': self.user['id']})
  329 
  330     def test_list_head_groups_for_user(self):
  331         """Call ``GET & HEAD /users/{user_id}/groups``."""
  332         user1 = unit.create_user(PROVIDERS.identity_api,
  333                                  domain_id=self.domain['id'])
  334         user2 = unit.create_user(PROVIDERS.identity_api,
  335                                  domain_id=self.domain['id'])
  336 
  337         self.put('/groups/%(group_id)s/users/%(user_id)s' % {
  338             'group_id': self.group_id, 'user_id': user1['id']})
  339 
  340         # Scenarios below are written to test the default policy configuration
  341 
  342         # One should be allowed to list one's own groups
  343         auth = self.build_authentication_request(
  344             user_id=user1['id'],
  345             password=user1['password'])
  346         resource_url = ('/users/%(user_id)s/groups' %
  347                         {'user_id': user1['id']})
  348         r = self.get(resource_url, auth=auth)
  349         self.assertValidGroupListResponse(r, ref=self.group,
  350                                           resource_url=resource_url)
  351         self.head(resource_url, auth=auth, expected_status=http.client.OK)
  352 
  353         # Administrator is allowed to list others' groups
  354         resource_url = ('/users/%(user_id)s/groups' %
  355                         {'user_id': user1['id']})
  356         r = self.get(resource_url)
  357         self.assertValidGroupListResponse(r, ref=self.group,
  358                                           resource_url=resource_url)
  359         self.head(resource_url, expected_status=http.client.OK)
  360 
  361         # Ordinary users should not be allowed to list other's groups
  362         auth = self.build_authentication_request(
  363             user_id=user2['id'],
  364             password=user2['password'])
  365         resource_url = '/users/%(user_id)s/groups' % {
  366             'user_id': user1['id']}
  367         self.get(resource_url, auth=auth,
  368                  expected_status=exception.ForbiddenAction.code)
  369         self.head(resource_url, auth=auth,
  370                   expected_status=exception.ForbiddenAction.code)
  371 
  372     def test_check_user_in_group(self):
  373         """Call ``HEAD /groups/{group_id}/users/{user_id}``."""
  374         self.put('/groups/%(group_id)s/users/%(user_id)s' % {
  375             'group_id': self.group_id, 'user_id': self.user['id']})
  376         self.head('/groups/%(group_id)s/users/%(user_id)s' % {
  377             'group_id': self.group_id, 'user_id': self.user['id']})
  378 
  379     def test_list_head_users_in_group(self):
  380         """Call ``GET & HEAD /groups/{group_id}/users``."""
  381         self.put('/groups/%(group_id)s/users/%(user_id)s' % {
  382             'group_id': self.group_id, 'user_id': self.user['id']})
  383         resource_url = ('/groups/%(group_id)s/users' %
  384                         {'group_id': self.group_id})
  385         r = self.get(resource_url)
  386         self.assertValidUserListResponse(r, ref=self.user,
  387                                          resource_url=resource_url)
  388         self.assertIn('/groups/%(group_id)s/users' % {
  389             'group_id': self.group_id}, r.result['links']['self'])
  390         self.head(resource_url, expected_status=http.client.OK)
  391 
  392     def test_remove_user_from_group(self):
  393         """Call ``DELETE /groups/{group_id}/users/{user_id}``."""
  394         self.put('/groups/%(group_id)s/users/%(user_id)s' % {
  395             'group_id': self.group_id, 'user_id': self.user['id']})
  396         self.delete('/groups/%(group_id)s/users/%(user_id)s' % {
  397             'group_id': self.group_id, 'user_id': self.user['id']})
  398 
  399     def test_update_ephemeral_user(self):
  400         federated_user_a = model.FederatedUser()
  401         federated_user_b = model.FederatedUser()
  402         federated_user_a.idp_id = 'a_idp'
  403         federated_user_b.idp_id = 'b_idp'
  404         federated_user_a.display_name = 'federated_a'
  405         federated_user_b.display_name = 'federated_b'
  406         federated_users = [federated_user_a, federated_user_b]
  407 
  408         user_a = model.User()
  409         user_a.federated_users = federated_users
  410 
  411         self.assertEqual(federated_user_a.display_name, user_a.name)
  412         self.assertIsNone(user_a.password)
  413 
  414         user_a.name = 'new_federated_a'
  415 
  416         self.assertEqual('new_federated_a', user_a.name)
  417         self.assertIsNone(user_a.local_user)
  418 
  419     def test_update_user(self):
  420         """Call ``PATCH /users/{user_id}``."""
  421         user = unit.new_user_ref(domain_id=self.domain_id)
  422         del user['id']
  423         r = self.patch('/users/%(user_id)s' % {
  424             'user_id': self.user['id']},
  425             body={'user': user})
  426         self.assertValidUserResponse(r, user)
  427 
  428     def test_admin_password_reset(self):
  429         # bootstrap a user as admin
  430         user_ref = unit.create_user(PROVIDERS.identity_api,
  431                                     domain_id=self.domain['id'])
  432 
  433         # auth as user should work before a password change
  434         old_password_auth = self.build_authentication_request(
  435             user_id=user_ref['id'],
  436             password=user_ref['password'])
  437         r = self.v3_create_token(old_password_auth)
  438         old_token = r.headers.get('X-Subject-Token')
  439 
  440         # auth as user with a token should work before a password change
  441         old_token_auth = self.build_authentication_request(token=old_token)
  442         self.v3_create_token(old_token_auth)
  443 
  444         # administrative password reset
  445         new_password = uuid.uuid4().hex
  446         self.patch('/users/%s' % user_ref['id'],
  447                    body={'user': {'password': new_password}})
  448 
  449         # auth as user with original password should not work after change
  450         self.v3_create_token(old_password_auth,
  451                              expected_status=http.client.UNAUTHORIZED)
  452 
  453         # auth as user with an old token should not work after change
  454         self.v3_create_token(old_token_auth,
  455                              expected_status=http.client.NOT_FOUND)
  456 
  457         # new password should work
  458         new_password_auth = self.build_authentication_request(
  459             user_id=user_ref['id'],
  460             password=new_password)
  461         self.v3_create_token(new_password_auth)
  462 
  463     def test_admin_password_reset_with_min_password_age_enabled(self):
  464         # enable minimum_password_age, this should have no effect on admin
  465         # password reset
  466         self.config_fixture.config(group='security_compliance',
  467                                    minimum_password_age=1)
  468         # create user
  469         user_ref = unit.create_user(PROVIDERS.identity_api,
  470                                     domain_id=self.domain['id'])
  471         # administrative password reset
  472         new_password = uuid.uuid4().hex
  473         r = self.patch('/users/%s' % user_ref['id'],
  474                        body={'user': {'password': new_password}})
  475         self.assertValidUserResponse(r, user_ref)
  476         # authenticate with new password
  477         new_password_auth = self.build_authentication_request(
  478             user_id=user_ref['id'],
  479             password=new_password)
  480         self.v3_create_token(new_password_auth)
  481 
  482     def test_admin_password_reset_with_password_lock(self):
  483         # create user
  484         user_ref = unit.create_user(PROVIDERS.identity_api,
  485                                     domain_id=self.domain['id'])
  486         lock_pw_opt = options.LOCK_PASSWORD_OPT.option_name
  487         update_user_body = {'user': {'options': {lock_pw_opt: True}}}
  488         self.patch('/users/%s' % user_ref['id'], body=update_user_body)
  489 
  490         # administrative password reset
  491         new_password = uuid.uuid4().hex
  492         r = self.patch('/users/%s' % user_ref['id'],
  493                        body={'user': {'password': new_password}})
  494         self.assertValidUserResponse(r, user_ref)
  495         # authenticate with new password
  496         new_password_auth = self.build_authentication_request(
  497             user_id=user_ref['id'],
  498             password=new_password)
  499         self.v3_create_token(new_password_auth)
  500 
  501     def test_update_user_domain_id(self):
  502         """Call ``PATCH /users/{user_id}`` with domain_id.
  503 
  504         A user's `domain_id` is immutable. Ensure that any attempts to update
  505         the `domain_id` of a user fails.
  506         """
  507         user = unit.new_user_ref(domain_id=self.domain['id'])
  508         user = PROVIDERS.identity_api.create_user(user)
  509         user['domain_id'] = CONF.identity.default_domain_id
  510         self.patch('/users/%(user_id)s' % {
  511             'user_id': user['id']},
  512             body={'user': user},
  513             expected_status=exception.ValidationError.code)
  514 
  515     def test_delete_user(self):
  516         """Call ``DELETE /users/{user_id}``.
  517 
  518         As well as making sure the delete succeeds, we ensure
  519         that any credentials that reference this user are
  520         also deleted, while other credentials are unaffected.
  521         In addition, no tokens should remain valid for this user.
  522 
  523         """
  524         # First check the credential for this user is present
  525         r = PROVIDERS.credential_api.get_credential(self.credential['id'])
  526         self.assertDictEqual(self.credential, r)
  527         # Create a second credential with a different user
  528 
  529         user2 = unit.new_user_ref(domain_id=self.domain['id'],
  530                                   project_id=self.project['id'])
  531         user2 = PROVIDERS.identity_api.create_user(user2)
  532         credential2 = unit.new_credential_ref(user_id=user2['id'],
  533                                               project_id=self.project['id'])
  534         PROVIDERS.credential_api.create_credential(
  535             credential2['id'], credential2
  536         )
  537 
  538         # Create a token for this user which we can check later
  539         # gets deleted
  540         auth_data = self.build_authentication_request(
  541             user_id=self.user['id'],
  542             password=self.user['password'],
  543             project_id=self.project['id'])
  544         token = self.get_requested_token(auth_data)
  545         # Confirm token is valid for now
  546         self.head('/auth/tokens',
  547                   headers={'X-Subject-Token': token},
  548                   expected_status=http.client.OK)
  549 
  550         # Now delete the user
  551         self.delete('/users/%(user_id)s' % {
  552             'user_id': self.user['id']})
  553 
  554         # Deleting the user should have deleted any credentials
  555         # that reference this project
  556         self.assertRaises(exception.CredentialNotFound,
  557                           PROVIDERS.credential_api.get_credential,
  558                           self.credential['id'])
  559         # But the credential for user2 is unaffected
  560         r = PROVIDERS.credential_api.get_credential(credential2['id'])
  561         self.assertDictEqual(credential2, r)
  562 
  563     def test_delete_user_retries_on_deadlock(self):
  564         patcher = mock.patch('sqlalchemy.orm.query.Query.delete',
  565                              autospec=True)
  566 
  567         class FakeDeadlock(object):
  568             def __init__(self, mock_patcher):
  569                 self.deadlock_count = 2
  570                 self.mock_patcher = mock_patcher
  571                 self.patched = True
  572 
  573             def __call__(self, *args, **kwargs):
  574                 if self.deadlock_count > 1:
  575                     self.deadlock_count -= 1
  576                 else:
  577                     self.mock_patcher.stop()
  578                     self.patched = False
  579                 raise oslo_db_exception.DBDeadlock
  580 
  581         sql_delete_mock = patcher.start()
  582         side_effect = FakeDeadlock(patcher)
  583         sql_delete_mock.side_effect = side_effect
  584 
  585         user_ref = unit.create_user(PROVIDERS.identity_api,
  586                                     domain_id=self.domain['id'])
  587 
  588         try:
  589             PROVIDERS.identity_api.delete_user(user_id=user_ref['id'])
  590         finally:
  591             if side_effect.patched:
  592                 patcher.stop()
  593 
  594         call_count = sql_delete_mock.call_count
  595 
  596         # initial attempt + 1 retry
  597         delete_user_attempt_count = 2
  598         self.assertEqual(call_count, delete_user_attempt_count)
  599 
  600     # group crud tests
  601 
  602     def test_create_group(self):
  603         """Call ``POST /groups``."""
  604         # Create a new group to avoid a duplicate check failure
  605         ref = unit.new_group_ref(domain_id=self.domain_id)
  606         r = self.post(
  607             '/groups',
  608             body={'group': ref})
  609         return self.assertValidGroupResponse(r, ref)
  610 
  611     def test_create_group_bad_request(self):
  612         """Call ``POST /groups``."""
  613         self.post('/groups', body={'group': {}},
  614                   expected_status=http.client.BAD_REQUEST)
  615 
  616     def test_list_head_groups(self):
  617         """Call ``GET & HEAD /groups``."""
  618         resource_url = '/groups'
  619         r = self.get(resource_url)
  620         self.assertValidGroupListResponse(r, ref=self.group,
  621                                           resource_url=resource_url)
  622         self.head(resource_url, expected_status=http.client.OK)
  623 
  624     def test_get_head_group(self):
  625         """Call ``GET & HEAD /groups/{group_id}``."""
  626         resource_url = '/groups/%(group_id)s' % {
  627             'group_id': self.group_id}
  628         r = self.get(resource_url)
  629         self.assertValidGroupResponse(r, self.group)
  630         self.head(resource_url, expected_status=http.client.OK)
  631 
  632     def test_update_group(self):
  633         """Call ``PATCH /groups/{group_id}``."""
  634         group = unit.new_group_ref(domain_id=self.domain_id)
  635         del group['id']
  636         r = self.patch('/groups/%(group_id)s' % {
  637             'group_id': self.group_id},
  638             body={'group': group})
  639         self.assertValidGroupResponse(r, group)
  640 
  641     def test_update_group_domain_id(self):
  642         """Call ``PATCH /groups/{group_id}`` with domain_id.
  643 
  644         A group's `domain_id` is immutable. Ensure that any attempts to update
  645         the `domain_id` of a group fails.
  646         """
  647         self.group['domain_id'] = CONF.identity.default_domain_id
  648         self.patch('/groups/%(group_id)s' % {
  649             'group_id': self.group['id']},
  650             body={'group': self.group},
  651             expected_status=exception.ValidationError.code)
  652 
  653     def test_delete_group(self):
  654         """Call ``DELETE /groups/{group_id}``."""
  655         self.delete('/groups/%(group_id)s' % {
  656             'group_id': self.group_id})
  657 
  658     def test_create_user_password_not_logged(self):
  659         # When a user is created, the password isn't logged at any level.
  660 
  661         log_fix = self.useFixture(fixtures.FakeLogger(level=log.DEBUG))
  662 
  663         ref = unit.new_user_ref(domain_id=self.domain_id)
  664         self.post(
  665             '/users',
  666             body={'user': ref})
  667 
  668         self.assertNotIn(ref['password'], log_fix.output)
  669 
  670     def test_update_password_not_logged(self):
  671         # When admin modifies user password, the password isn't logged at any
  672         # level.
  673 
  674         log_fix = self.useFixture(fixtures.FakeLogger(level=log.DEBUG))
  675 
  676         # bootstrap a user as admin
  677         user_ref = unit.create_user(PROVIDERS.identity_api,
  678                                     domain_id=self.domain['id'])
  679 
  680         self.assertNotIn(user_ref['password'], log_fix.output)
  681 
  682         # administrative password reset
  683         new_password = uuid.uuid4().hex
  684         self.patch('/users/%s' % user_ref['id'],
  685                    body={'user': {'password': new_password}})
  686 
  687         self.assertNotIn(new_password, log_fix.output)
  688 
  689     def test_setting_default_project_id_to_domain_failed(self):
  690         """Call ``POST and PATCH /users`` default_project_id=domain_id.
  691 
  692         Make sure we validate the default_project_id if it is specified.
  693         It cannot be set to a domain_id, even for a project acting as domain
  694         right now. That's because we haven't sort out the issuing
  695         project-scoped token for project acting as domain bit yet. Once we
  696         got that sorted out, we can relax this constraint.
  697 
  698         """
  699         # creating a new user with default_project_id set to a
  700         # domain_id should result in HTTP 400
  701         ref = unit.new_user_ref(domain_id=self.domain_id,
  702                                 project_id=self.domain_id)
  703         self.post('/users', body={'user': ref}, token=CONF.admin_token,
  704                   expected_status=http.client.BAD_REQUEST)
  705 
  706         # updating user's default_project_id to a domain_id should result
  707         # in HTTP 400
  708         user = {'default_project_id': self.domain_id}
  709         self.patch('/users/%(user_id)s' % {
  710             'user_id': self.user['id']},
  711             body={'user': user},
  712             token=CONF.admin_token,
  713             expected_status=http.client.BAD_REQUEST)
  714 
  715 
  716 class ChangePasswordTestCase(test_v3.RestfulTestCase):
  717 
  718     def setUp(self):
  719         super(ChangePasswordTestCase, self).setUp()
  720         self.user_ref = unit.create_user(PROVIDERS.identity_api,
  721                                          domain_id=self.domain['id'])
  722         self.token = self.get_request_token(self.user_ref['password'],
  723                                             http.client.CREATED)
  724 
  725     def get_request_token(self, password, expected_status):
  726         auth_data = self.build_authentication_request(
  727             user_id=self.user_ref['id'],
  728             password=password)
  729         r = self.v3_create_token(auth_data,
  730                                  expected_status=expected_status)
  731         return r.headers.get('X-Subject-Token')
  732 
  733     def change_password(self, expected_status, **kwargs):
  734         """Return a test response for a change password request."""
  735         return self.post('/users/%s/password' % self.user_ref['id'],
  736                          body={'user': kwargs},
  737                          token=self.token,
  738                          expected_status=expected_status)
  739 
  740 
  741 class UserSelfServiceChangingPasswordsTestCase(ChangePasswordTestCase):
  742 
  743     def _create_user_with_expired_password(self):
  744         expire_days = CONF.security_compliance.password_expires_days + 1
  745         time = (
  746             datetime.datetime.utcnow() -
  747             datetime.timedelta(expire_days)
  748         )
  749         password = uuid.uuid4().hex
  750         user_ref = unit.new_user_ref(domain_id=self.domain_id,
  751                                      password=password)
  752         with freezegun.freeze_time(time):
  753             self.user_ref = PROVIDERS.identity_api.create_user(user_ref)
  754 
  755         return password
  756 
  757     def test_changing_password(self):
  758         # original password works
  759         token_id = self.get_request_token(self.user_ref['password'],
  760                                           expected_status=http.client.CREATED)
  761         # original token works
  762         old_token_auth = self.build_authentication_request(token=token_id)
  763         self.v3_create_token(old_token_auth)
  764 
  765         # change password
  766         new_password = uuid.uuid4().hex
  767         self.change_password(password=new_password,
  768                              original_password=self.user_ref['password'],
  769                              expected_status=http.client.NO_CONTENT)
  770 
  771         # old password fails
  772         self.get_request_token(self.user_ref['password'],
  773                                expected_status=http.client.UNAUTHORIZED)
  774 
  775         # old token fails
  776         self.v3_create_token(old_token_auth,
  777                              expected_status=http.client.NOT_FOUND)
  778 
  779         # new password works
  780         self.get_request_token(new_password,
  781                                expected_status=http.client.CREATED)
  782 
  783     def test_changing_password_with_min_password_age(self):
  784         time = datetime.datetime.utcnow()
  785         with freezegun.freeze_time(time) as frozen_datetime:
  786             # enable minimum_password_age and attempt to change password
  787             new_password = uuid.uuid4().hex
  788             self.config_fixture.config(group='security_compliance',
  789                                        minimum_password_age=1)
  790             # able to change password after create user
  791             self.change_password(password=new_password,
  792                                  original_password=self.user_ref['password'],
  793                                  expected_status=http.client.NO_CONTENT)
  794             # 2nd change password should fail due to minimum password age and
  795             # make sure we wait one second to avoid race conditions with Fernet
  796             frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
  797             self.token = self.get_request_token(
  798                 new_password,
  799                 http.client.CREATED
  800             )
  801             self.change_password(password=uuid.uuid4().hex,
  802                                  original_password=new_password,
  803                                  expected_status=http.client.BAD_REQUEST)
  804             # disable minimum_password_age and attempt to change password
  805             self.config_fixture.config(group='security_compliance',
  806                                        minimum_password_age=0)
  807             self.change_password(password=uuid.uuid4().hex,
  808                                  original_password=new_password,
  809                                  expected_status=http.client.NO_CONTENT)
  810 
  811     def test_changing_password_with_password_lock(self):
  812         password = uuid.uuid4().hex
  813         ref = unit.new_user_ref(domain_id=self.domain_id, password=password)
  814         response = self.post('/users', body={'user': ref})
  815         user_id = response.json_body['user']['id']
  816 
  817         time = datetime.datetime.utcnow()
  818         with freezegun.freeze_time(time) as frozen_datetime:
  819             # Lock the user's password
  820             lock_pw_opt = options.LOCK_PASSWORD_OPT.option_name
  821             user_patch = {'user': {'options': {lock_pw_opt: True}}}
  822             self.patch('/users/%s' % user_id, body=user_patch)
  823 
  824             # Fail, password is locked
  825             new_password = uuid.uuid4().hex
  826             body = {
  827                 'user': {
  828                     'original_password': password,
  829                     'password': new_password
  830                 }
  831             }
  832             path = '/users/%s/password' % user_id
  833             self.post(path, body=body, expected_status=http.client.BAD_REQUEST)
  834 
  835             # Unlock the password, and change should work
  836             user_patch['user']['options'][lock_pw_opt] = False
  837             self.patch('/users/%s' % user_id, body=user_patch)
  838 
  839             path = '/users/%s/password' % user_id
  840             self.post(path, body=body, expected_status=http.client.NO_CONTENT)
  841 
  842             frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
  843 
  844             auth_data = self.build_authentication_request(
  845                 user_id=user_id,
  846                 password=new_password
  847             )
  848             self.v3_create_token(
  849                 auth_data, expected_status=http.client.CREATED
  850             )
  851 
  852             path = '/users/%s' % user_id
  853             user = self.get(path).json_body['user']
  854             self.assertIn(lock_pw_opt, user['options'])
  855             self.assertFalse(user['options'][lock_pw_opt])
  856 
  857             # Completely unset the option from the user's reference
  858             user_patch['user']['options'][lock_pw_opt] = None
  859             self.patch('/users/%s' % user_id, body=user_patch)
  860             path = '/users/%s' % user_id
  861             user = self.get(path).json_body['user']
  862             self.assertNotIn(lock_pw_opt, user['options'])
  863 
  864     def test_changing_password_with_missing_original_password_fails(self):
  865         r = self.change_password(password=uuid.uuid4().hex,
  866                                  expected_status=http.client.BAD_REQUEST)
  867         self.assertThat(r.result['error']['message'],
  868                         matchers.Contains('original_password'))
  869 
  870     def test_changing_password_with_missing_password_fails(self):
  871         r = self.change_password(original_password=self.user_ref['password'],
  872                                  expected_status=http.client.BAD_REQUEST)
  873         self.assertThat(r.result['error']['message'],
  874                         matchers.Contains('password'))
  875 
  876     def test_changing_password_with_incorrect_password_fails(self):
  877         self.change_password(password=uuid.uuid4().hex,
  878                              original_password=uuid.uuid4().hex,
  879                              expected_status=http.client.UNAUTHORIZED)
  880 
  881     def test_changing_password_with_disabled_user_fails(self):
  882         # disable the user account
  883         self.user_ref['enabled'] = False
  884         self.patch('/users/%s' % self.user_ref['id'],
  885                    body={'user': self.user_ref})
  886 
  887         self.change_password(password=uuid.uuid4().hex,
  888                              original_password=self.user_ref['password'],
  889                              expected_status=http.client.UNAUTHORIZED)
  890 
  891     def test_changing_password_not_logged(self):
  892         # When a user changes their password, the password isn't logged at any
  893         # level.
  894 
  895         log_fix = self.useFixture(fixtures.FakeLogger(level=log.DEBUG))
  896 
  897         # change password
  898         new_password = uuid.uuid4().hex
  899         self.change_password(password=new_password,
  900                              original_password=self.user_ref['password'],
  901                              expected_status=http.client.NO_CONTENT)
  902 
  903         self.assertNotIn(self.user_ref['password'], log_fix.output)
  904         self.assertNotIn(new_password, log_fix.output)
  905 
  906     def test_changing_expired_password_succeeds(self):
  907         self.config_fixture.config(group='security_compliance',
  908                                    password_expires_days=2)
  909         password = self._create_user_with_expired_password()
  910 
  911         new_password = uuid.uuid4().hex
  912         self.change_password(password=new_password,
  913                              original_password=password,
  914                              expected_status=http.client.NO_CONTENT)
  915         # new password works
  916         self.get_request_token(new_password,
  917                                expected_status=http.client.CREATED)
  918 
  919     def test_changing_expired_password_with_disabled_user_fails(self):
  920         self.config_fixture.config(group='security_compliance',
  921                                    password_expires_days=2)
  922 
  923         password = self._create_user_with_expired_password()
  924         # disable the user account
  925         self.user_ref['enabled'] = False
  926         self.patch('/users/%s' % self.user_ref['id'],
  927                    body={'user': self.user_ref})
  928 
  929         new_password = uuid.uuid4().hex
  930         self.change_password(password=new_password,
  931                              original_password=password,
  932                              expected_status=http.client.UNAUTHORIZED)
  933 
  934     def test_change_password_required_upon_first_use_for_create(self):
  935         self.config_fixture.config(group='security_compliance',
  936                                    change_password_upon_first_use=True)
  937 
  938         # create user
  939         self.user_ref = unit.create_user(PROVIDERS.identity_api,
  940                                          domain_id=self.domain['id'])
  941 
  942         # attempt to authenticate with create user password
  943         self.get_request_token(self.user_ref['password'],
  944                                expected_status=http.client.UNAUTHORIZED)
  945 
  946         # self-service change password
  947         new_password = uuid.uuid4().hex
  948         self.change_password(password=new_password,
  949                              original_password=self.user_ref['password'],
  950                              expected_status=http.client.NO_CONTENT)
  951 
  952         # authenticate with the new password
  953         self.token = self.get_request_token(new_password, http.client.CREATED)
  954 
  955     def test_change_password_required_upon_first_use_for_admin_reset(self):
  956         self.config_fixture.config(group='security_compliance',
  957                                    change_password_upon_first_use=True)
  958 
  959         # admin reset
  960         reset_password = uuid.uuid4().hex
  961         user_password = {'password': reset_password}
  962         PROVIDERS.identity_api.update_user(self.user_ref['id'], user_password)
  963 
  964         # attempt to authenticate with admin reset password
  965         self.get_request_token(reset_password,
  966                                expected_status=http.client.UNAUTHORIZED)
  967 
  968         # self-service change password
  969         new_password = uuid.uuid4().hex
  970         self.change_password(password=new_password,
  971                              original_password=reset_password,
  972                              expected_status=http.client.NO_CONTENT)
  973 
  974         # authenticate with the new password
  975         self.token = self.get_request_token(new_password, http.client.CREATED)
  976 
  977     def test_change_password_required_upon_first_use_ignore_user(self):
  978         self.config_fixture.config(group='security_compliance',
  979                                    change_password_upon_first_use=True)
  980 
  981         # ignore user and reset password
  982         reset_password = uuid.uuid4().hex
  983         self.user_ref['password'] = reset_password
  984         ignore_opt_name = options.IGNORE_CHANGE_PASSWORD_OPT.option_name
  985         self.user_ref['options'][ignore_opt_name] = True
  986         PROVIDERS.identity_api.update_user(self.user_ref['id'], self.user_ref)
  987 
  988         # authenticate with the reset password
  989         self.token = self.get_request_token(reset_password,
  990                                             http.client.CREATED)
  991 
  992     def test_lockout_exempt(self):
  993         self.config_fixture.config(group='security_compliance',
  994                                    lockout_failure_attempts=1)
  995 
  996         # create user
  997         self.user_ref = unit.create_user(PROVIDERS.identity_api,
  998                                          domain_id=self.domain['id'])
  999 
 1000         # update the user, mark her as exempt from lockout
 1001         ignore_opt_name = options.IGNORE_LOCKOUT_ATTEMPT_OPT.option_name
 1002         self.user_ref['options'][ignore_opt_name] = True
 1003         PROVIDERS.identity_api.update_user(self.user_ref['id'], self.user_ref)
 1004 
 1005         # fail to auth, this should lockout the user, since we're allowed
 1006         # one failure, but we're exempt from lockout!
 1007         bad_password = uuid.uuid4().hex
 1008         self.token = self.get_request_token(bad_password,
 1009                                             http.client.UNAUTHORIZED)
 1010 
 1011         # attempt to authenticate with correct password
 1012         self.get_request_token(self.user_ref['password'],
 1013                                expected_status=http.client.CREATED)
 1014 
 1015 
 1016 class PasswordValidationTestCase(ChangePasswordTestCase):
 1017 
 1018     def setUp(self):
 1019         super(PasswordValidationTestCase, self).setUp()
 1020         # passwords requires: 1 letter, 1 digit, 7 chars
 1021         self.config_fixture.config(group='security_compliance',
 1022                                    password_regex=(
 1023                                        '^(?=.*\d)(?=.*[a-zA-Z]).{7,}$'))
 1024 
 1025     def test_create_user_with_invalid_password(self):
 1026         user = unit.new_user_ref(domain_id=self.domain_id)
 1027         user['password'] = 'simple'
 1028         self.post('/users', body={'user': user}, token=self.get_admin_token(),
 1029                   expected_status=http.client.BAD_REQUEST)
 1030 
 1031     def test_update_user_with_invalid_password(self):
 1032         user = unit.create_user(PROVIDERS.identity_api,
 1033                                 domain_id=self.domain['id'])
 1034         user['password'] = 'simple'
 1035         self.patch('/users/%(user_id)s' % {
 1036             'user_id': user['id']},
 1037             body={'user': user},
 1038             expected_status=http.client.BAD_REQUEST)
 1039 
 1040     def test_changing_password_with_simple_password_strength(self):
 1041         # password requires: any non-whitespace character
 1042         self.config_fixture.config(group='security_compliance',
 1043                                    password_regex='[\S]+')
 1044         self.change_password(password='simple',
 1045                              original_password=self.user_ref['password'],
 1046                              expected_status=http.client.NO_CONTENT)
 1047 
 1048     def test_changing_password_with_strong_password_strength(self):
 1049         self.change_password(password='mypassword2',
 1050                              original_password=self.user_ref['password'],
 1051                              expected_status=http.client.NO_CONTENT)
 1052 
 1053     def test_changing_password_with_strong_password_strength_fails(self):
 1054         # no digit
 1055         self.change_password(password='mypassword',
 1056                              original_password=self.user_ref['password'],
 1057                              expected_status=http.client.BAD_REQUEST)
 1058 
 1059         # no letter
 1060         self.change_password(password='12345678',
 1061                              original_password=self.user_ref['password'],
 1062                              expected_status=http.client.BAD_REQUEST)
 1063 
 1064         # less than 7 chars
 1065         self.change_password(password='mypas2',
 1066                              original_password=self.user_ref['password'],
 1067                              expected_status=http.client.BAD_REQUEST)
 1068 
 1069 
 1070 class UserFederatedAttributesTests(test_v3.RestfulTestCase):
 1071     def _create_federated_attributes(self):
 1072         # Create the idp
 1073         idp = {
 1074             'id': uuid.uuid4().hex,
 1075             'enabled': True,
 1076             'description': uuid.uuid4().hex
 1077         }
 1078         PROVIDERS.federation_api.create_idp(idp['id'], idp)
 1079         # Create the mapping
 1080         mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER
 1081         mapping['id'] = uuid.uuid4().hex
 1082         PROVIDERS.federation_api.create_mapping(mapping['id'], mapping)
 1083         # Create the protocol
 1084         protocol = {
 1085             'id': uuid.uuid4().hex,
 1086             'mapping_id': mapping['id']
 1087         }
 1088         PROVIDERS.federation_api.create_protocol(
 1089             idp['id'], protocol['id'], protocol
 1090         )
 1091         return idp, protocol
 1092 
 1093     def _create_user_with_federated_user(self, user, fed_dict):
 1094         with sql.session_for_write() as session:
 1095             federated_ref = model.FederatedUser.from_dict(fed_dict)
 1096             user_ref = model.User.from_dict(user)
 1097             user_ref.created_at = datetime.datetime.utcnow()
 1098             user_ref.federated_users.append(federated_ref)
 1099             session.add(user_ref)
 1100             return identity_base.filter_user(user_ref.to_dict())
 1101 
 1102     def setUp(self):
 1103         super(UserFederatedAttributesTests, self).setUp()
 1104         self.useFixture(database.Database())
 1105         self.load_backends()
 1106         # Create the federated object
 1107         idp, protocol = self._create_federated_attributes()
 1108         self.fed_dict = unit.new_federated_user_ref()
 1109         self.fed_dict['idp_id'] = idp['id']
 1110         self.fed_dict['protocol_id'] = protocol['id']
 1111         self.fed_dict['unique_id'] = "jdoe"
 1112         # Create the domain_id, user, and federated_user relationship
 1113         self.domain = unit.new_domain_ref()
 1114         PROVIDERS.resource_api.create_domain(self.domain['id'], self.domain)
 1115         self.fed_user = unit.new_user_ref(domain_id=self.domain['id'])
 1116         self.fed_user = self._create_user_with_federated_user(self.fed_user,
 1117                                                               self.fed_dict)
 1118         # Create two new fed_users which will have the same idp and protocol
 1119         # but be completely different from the first fed_user
 1120         # Create a new idp and protocol for fed_user2 and 3
 1121         idp, protocol = self._create_federated_attributes()
 1122         self.fed_dict2 = unit.new_federated_user_ref()
 1123         self.fed_dict2['idp_id'] = idp['id']
 1124         self.fed_dict2['protocol_id'] = protocol['id']
 1125         self.fed_dict2['unique_id'] = "ravelar"
 1126         self.fed_user2 = unit.new_user_ref(domain_id=self.domain['id'])
 1127         self.fed_user2 = self._create_user_with_federated_user(self.fed_user2,
 1128                                                                self.fed_dict2)
 1129         self.fed_dict3 = unit.new_federated_user_ref()
 1130         self.fed_dict3['idp_id'] = idp['id']
 1131         self.fed_dict3['protocol_id'] = protocol['id']
 1132         self.fed_dict3['unique_id'] = "jsmith"
 1133         self.fed_user3 = unit.new_user_ref(domain_id=self.domain['id'])
 1134         self.fed_user3 = self._create_user_with_federated_user(self.fed_user3,
 1135                                                                self.fed_dict3)
 1136 
 1137     def _test_list_users_with_federated_parameter(self, parameter):
 1138         # construct the resource url based off what's passed in parameter
 1139         resource_url = ('/users?%s=%s'
 1140                         % (parameter[0], self.fed_dict[parameter[0]]))
 1141         for attr in parameter[1:]:
 1142             resource_url += '&%s=%s' % (attr, self.fed_dict[attr])
 1143         r = self.get(resource_url)
 1144         # Check that only one out of 3 fed_users is matched by calling the api
 1145         # and that it is a valid response
 1146         self.assertEqual(1, len(r.result['users']))
 1147         self.assertValidUserListResponse(r, ref=self.fed_user,
 1148                                          resource_url=resource_url)
 1149         # Since unique_id will always return one user if matching for unique_id
 1150         # in the query, we rule out unique_id for the next tests
 1151         if not any('unique_id' in x for x in parameter):
 1152             # Check that we get two matches here since fed_user2 and fed_user3
 1153             # both have the same idp and protocol
 1154             resource_url = ('/users?%s=%s'
 1155                             % (parameter[0], self.fed_dict2[parameter[0]]))
 1156             for attr in parameter[1:]:
 1157                 resource_url += '&%s=%s' % (attr, self.fed_dict2[attr])
 1158             r = self.get(resource_url)
 1159             self.assertEqual(2, len(r.result['users']))
 1160             self.assertValidUserListResponse(r, ref=self.fed_user2,
 1161                                              resource_url=resource_url)
 1162 
 1163     def test_list_users_with_idp_id(self):
 1164         attribute = ['idp_id']
 1165         self._test_list_users_with_federated_parameter(attribute)
 1166 
 1167     def test_list_users_with_protocol_id(self):
 1168         attribute = ['protocol_id']
 1169         self._test_list_users_with_federated_parameter(attribute)
 1170 
 1171     def test_list_users_with_unique_id(self):
 1172         attribute = ['unique_id']
 1173         self._test_list_users_with_federated_parameter(attribute)
 1174 
 1175     def test_list_users_with_idp_id_and_unique_id(self):
 1176         attribute = ['idp_id', 'unique_id']
 1177         self._test_list_users_with_federated_parameter(attribute)
 1178 
 1179     def test_list_users_with_idp_id_and_protocol_id(self):
 1180         attribute = ['idp_id', 'protocol_id']
 1181         self._test_list_users_with_federated_parameter(attribute)
 1182 
 1183     def test_list_users_with_protocol_id_and_unique_id(self):
 1184         attribute = ['protocol_id', 'unique_id']
 1185         self._test_list_users_with_federated_parameter(attribute)
 1186 
 1187     def test_list_users_with_all_federated_attributes(self):
 1188         attribute = ['idp_id', 'protocol_id', 'unique_id']
 1189         self._test_list_users_with_federated_parameter(attribute)
 1190 
 1191     def test_get_user_includes_required_federated_attributes(self):
 1192         user = self.identity_api.get_user(self.fed_user['id'])
 1193         self.assertIn('federated', user)
 1194         self.assertIn('idp_id', user['federated'][0])
 1195         self.assertIn('protocols', user['federated'][0])
 1196         self.assertIn('protocol_id', user['federated'][0]['protocols'][0])
 1197         self.assertIn('unique_id', user['federated'][0]['protocols'][0])
 1198         r = self.get('/users/%(user_id)s' % {'user_id': user['id']})
 1199         self.assertValidUserResponse(r, user)
 1200 
 1201     def test_create_user_with_federated_attributes(self):
 1202         """Call ``POST /users``."""
 1203         idp, protocol = self._create_federated_attributes()
 1204         ref = unit.new_user_ref(domain_id=self.domain_id)
 1205         ref['federated'] = [
 1206             {
 1207                 'idp_id': idp['id'],
 1208                 'protocols': [
 1209                     {
 1210                         'protocol_id': protocol['id'],
 1211                         'unique_id': uuid.uuid4().hex
 1212                     }
 1213                 ]
 1214             }
 1215         ]
 1216         r = self.post(
 1217             '/users',
 1218             body={'user': ref})
 1219         user = r.result['user']
 1220         self.assertEqual(user['name'], ref['name'])
 1221         self.assertEqual(user['federated'], ref['federated'])
 1222         self.assertValidUserResponse(r, ref)
 1223 
 1224     def test_create_user_fails_when_given_invalid_idp_and_protocols(self):
 1225         """Call ``POST /users`` with invalid idp and protocol to fail."""
 1226         idp, protocol = self._create_federated_attributes()
 1227         ref = unit.new_user_ref(domain_id=self.domain_id)
 1228         ref['federated'] = [
 1229             {
 1230                 'idp_id': 'fakeidp',
 1231                 'protocols': [
 1232                     {
 1233                         'protocol_id': 'fakeprotocol_id',
 1234                         'unique_id': uuid.uuid4().hex
 1235                     }
 1236                 ]
 1237             }
 1238         ]
 1239 
 1240         self.post('/users', body={'user': ref}, token=self.get_admin_token(),
 1241                   expected_status=http.client.BAD_REQUEST)
 1242         ref['federated'][0]['idp_id'] = idp['id']
 1243         self.post('/users', body={'user': ref}, token=self.get_admin_token(),
 1244                   expected_status=http.client.BAD_REQUEST)
 1245 
 1246     def test_update_user_with_federated_attributes(self):
 1247         """Call ``PATCH /users/{user_id}``."""
 1248         user = self.fed_user.copy()
 1249         del user['id']
 1250         user['name'] = 'James Doe'
 1251         idp, protocol = self._create_federated_attributes()
 1252         user['federated'] = [
 1253             {
 1254                 'idp_id': idp['id'],
 1255                 'protocols': [
 1256                     {
 1257                         'protocol_id': protocol['id'],
 1258                         'unique_id': 'jdoe'
 1259                     }
 1260                 ]
 1261             }
 1262         ]
 1263         r = self.patch('/users/%(user_id)s' % {
 1264             'user_id': self.fed_user['id']},
 1265             body={'user': user})
 1266         resp_user = r.result['user']
 1267         self.assertEqual(user['name'], resp_user['name'])
 1268         self.assertEqual(user['federated'], resp_user['federated'])
 1269         self.assertValidUserResponse(r, user)
 1270 
 1271     def test_update_user_fails_when_given_invalid_idp_and_protocols(self):
 1272         """Call ``PATCH /users/{user_id}``."""
 1273         user = self.fed_user.copy()
 1274         del user['id']
 1275         idp, protocol = self._create_federated_attributes()
 1276         user['federated'] = [
 1277             {
 1278                 'idp_id': 'fakeidp',
 1279                 'protocols': [
 1280                     {
 1281                         'protocol_id': 'fakeprotocol_id',
 1282                         'unique_id': uuid.uuid4().hex
 1283                     }
 1284                 ]
 1285             }
 1286         ]
 1287 
 1288         self.patch('/users/%(user_id)s' % {
 1289             'user_id': self.fed_user['id']},
 1290             body={'user': user},
 1291             expected_status=http.client.BAD_REQUEST)
 1292         user['federated'][0]['idp_id'] = idp['id']
 1293         self.patch('/users/%(user_id)s' % {
 1294             'user_id': self.fed_user['id']},
 1295             body={'user': user},
 1296             expected_status=http.client.BAD_REQUEST)