"Fossies" - the Fresh Open Source Software Archive

Member "keystone-18.0.0/keystone/tests/unit/test_shadow_users.py" (14 Oct 2020, 8022 Bytes) of package /linux/misc/openstack/keystone-18.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_shadow_users.py": 17.0.0_vs_18.0.0.

    1 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 # not use this file except in compliance with the License. You may obtain
    3 # a copy of the License at
    4 #
    5 #      http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 # Unless required by applicable law or agreed to in writing, software
    8 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 # License for the specific language governing permissions and limitations
   11 # under the License.
   12 
   13 import uuid
   14 
   15 from keystone.common import provider_api
   16 from keystone import exception
   17 from keystone.tests import unit
   18 from keystone.tests.unit import default_fixtures
   19 from keystone.tests.unit.identity.shadow_users import test_backend
   20 from keystone.tests.unit.identity.shadow_users import test_core
   21 from keystone.tests.unit.ksfixtures import database
   22 
   23 PROVIDERS = provider_api.ProviderAPIs
   24 
   25 
   26 class ShadowUsersTests(unit.TestCase,
   27                        test_backend.ShadowUsersBackendTests,
   28                        test_core.ShadowUsersCoreTests):
   29     def setUp(self):
   30         super(ShadowUsersTests, self).setUp()
   31         self.useFixture(database.Database())
   32         self.load_backends()
   33         PROVIDERS.resource_api.create_domain(
   34             default_fixtures.ROOT_DOMAIN['id'], default_fixtures.ROOT_DOMAIN)
   35         self.idp = {
   36             'id': uuid.uuid4().hex,
   37             'enabled': True,
   38             'description': uuid.uuid4().hex
   39         }
   40         self.mapping = {
   41             'id': uuid.uuid4().hex,
   42         }
   43         self.protocol = {
   44             'id': uuid.uuid4().hex,
   45             'idp_id': self.idp['id'],
   46             'mapping_id': self.mapping['id']
   47         }
   48         self.federated_user = {
   49             'idp_id': self.idp['id'],
   50             'protocol_id': self.protocol['id'],
   51             'unique_id': uuid.uuid4().hex,
   52             'display_name': uuid.uuid4().hex
   53         }
   54         self.email = uuid.uuid4().hex
   55         PROVIDERS.federation_api.create_idp(self.idp['id'], self.idp)
   56         PROVIDERS.federation_api.create_mapping(
   57             self.mapping['id'], self.mapping
   58         )
   59         PROVIDERS.federation_api.create_protocol(
   60             self.idp['id'], self.protocol['id'], self.protocol)
   61         self.domain_id = (
   62             PROVIDERS.federation_api.get_idp(self.idp['id'])['domain_id'])
   63 
   64 
   65 class TestUserWithFederatedUser(ShadowUsersTests):
   66 
   67     def setUp(self):
   68         super(TestUserWithFederatedUser, self).setUp()
   69         self.useFixture(database.Database())
   70         self.load_backends()
   71 
   72     def assertFederatedDictsEqual(self, fed_dict, fed_object):
   73         self.assertEqual(fed_dict['idp_id'], fed_object['idp_id'])
   74         self.assertEqual(fed_dict['protocol_id'],
   75                          fed_object['protocols'][0]['protocol_id'])
   76         self.assertEqual(fed_dict['unique_id'],
   77                          fed_object['protocols'][0]['unique_id'])
   78 
   79     def test_get_user_when_user_has_federated_object(self):
   80         fed_dict = unit.new_federated_user_ref(idp_id=self.idp['id'],
   81                                                protocol_id=self.protocol['id'])
   82         user = self.shadow_users_api.create_federated_user(
   83             self.domain_id, fed_dict)
   84 
   85         # test that the user returns a federated object and that there is only
   86         # one returned
   87         user_ref = self.identity_api.get_user(user['id'])
   88         self.assertIn('federated', user_ref)
   89         self.assertEqual(1, len(user_ref['federated']))
   90 
   91         self.assertFederatedDictsEqual(fed_dict, user_ref['federated'][0])
   92 
   93     def test_create_user_with_invalid_idp_and_protocol_fails(self):
   94         baduser = unit.new_user_ref(domain_id=self.domain_id)
   95         baduser['federated'] = [
   96             {
   97                 'idp_id': 'fakeidp',
   98                 'protocols': [
   99                     {
  100                         'protocol_id': 'nonexistent',
  101                         'unique_id': 'unknown'
  102                     }
  103                 ]
  104             }
  105         ]
  106         # Check validation works by throwing a federated object with
  107         # invalid idp_id, protocol_id inside the user passed to create_user.
  108         self.assertRaises(exception.ValidationError,
  109                           self.identity_api.create_user,
  110                           baduser)
  111 
  112         baduser['federated'][0]['idp_id'] = self.idp['id']
  113         self.assertRaises(exception.ValidationError,
  114                           self.identity_api.create_user,
  115                           baduser)
  116 
  117     def test_create_user_with_federated_attributes(self):
  118         # Create the schema of a federated attribute being passed in with a
  119         # user.
  120         user = unit.new_user_ref(domain_id=self.domain_id)
  121         unique_id = uuid.uuid4().hex
  122         user['federated'] = [
  123             {
  124                 'idp_id': self.idp['id'],
  125                 'protocols': [
  126                     {
  127                         'protocol_id': self.protocol['id'],
  128                         'unique_id': unique_id
  129                     }
  130                 ]
  131             }
  132         ]
  133 
  134         # Test that there are no current federated_users that match our users
  135         # federated object and create the user
  136         self.assertRaises(exception.UserNotFound,
  137                           self.shadow_users_api.get_federated_user,
  138                           self.idp['id'],
  139                           self.protocol['id'],
  140                           unique_id)
  141 
  142         ref = self.identity_api.create_user(user)
  143 
  144         # Test that the user and federated object now exists
  145         self.assertEqual(user['name'], ref['name'])
  146         self.assertEqual(user['federated'], ref['federated'])
  147         fed_user = self.shadow_users_api.get_federated_user(
  148             self.idp['id'],
  149             self.protocol['id'],
  150             unique_id)
  151         self.assertIsNotNone(fed_user)
  152 
  153     def test_update_user_with_invalid_idp_and_protocol_fails(self):
  154         baduser = unit.new_user_ref(domain_id=self.domain_id)
  155         baduser['federated'] = [
  156             {
  157                 'idp_id': 'fakeidp',
  158                 'protocols': [
  159                     {
  160                         'protocol_id': 'nonexistent',
  161                         'unique_id': 'unknown'
  162                     }
  163                 ]
  164             }
  165         ]
  166         # Check validation works by throwing a federated object with
  167         # invalid idp_id, protocol_id inside the user passed to create_user.
  168         self.assertRaises(exception.ValidationError,
  169                           self.identity_api.create_user,
  170                           baduser)
  171 
  172         baduser['federated'][0]['idp_id'] = self.idp['id']
  173         self.assertRaises(exception.ValidationError,
  174                           self.identity_api.create_user,
  175                           baduser)
  176 
  177     def test_update_user_with_federated_attributes(self):
  178         user = self.shadow_users_api.create_federated_user(
  179             self.domain_id, self.federated_user)
  180         user = self.identity_api.get_user(user['id'])
  181 
  182         # Test that update user can return a federated object with the user as
  183         # a response if the user has any
  184         user = self.identity_api.update_user(user['id'], user)
  185         self.assertFederatedDictsEqual(self.federated_user,
  186                                        user['federated'][0])
  187 
  188         # Test that update user can replace a users federated objects if added
  189         # in the request and that its response is that new federated objects
  190         new_fed = [
  191             {
  192                 'idp_id': self.idp['id'],
  193                 'protocols': [
  194                     {
  195                         'protocol_id': self.protocol['id'],
  196                         'unique_id': uuid.uuid4().hex
  197                     }
  198                 ]
  199             }
  200         ]
  201         user['federated'] = new_fed
  202         user = self.identity_api.update_user(user['id'], user)
  203         self.assertTrue('federated' in user)
  204         self.assertEqual(len(user['federated']), 1)
  205         self.assertEqual(user['federated'][0], new_fed[0])