"Fossies" - the Fresh Open Source Software Archive

Member "keystone-19.0.0/keystone/tests/unit/test_backend_id_mapping_sql.py" (14 Apr 2021, 18421 Bytes) of package /linux/misc/openstack/keystone-19.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_backend_id_mapping_sql.py": 18.0.0_vs_19.0.0.

    1 # -*- coding: utf-8 -*-
    2 # Copyright 2014 IBM Corp.
    3 #
    4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 # not use this file except in compliance with the License. You may obtain
    6 # a copy of the License at
    7 #
    8 #      http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 # Unless required by applicable law or agreed to in writing, software
   11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 # License for the specific language governing permissions and limitations
   14 # under the License.
   15 
   16 import uuid
   17 
   18 from testtools import matchers
   19 
   20 from keystone.common import provider_api
   21 from keystone.common import sql
   22 from keystone.identity.mapping_backends import mapping
   23 from keystone.tests import unit
   24 from keystone.tests.unit import identity_mapping as mapping_sql
   25 from keystone.tests.unit import test_backend_sql
   26 
# Shorthand alias for provider_api.ProviderAPIs; the tests in this module
# reach resource_api and id_mapping_api through it.
PROVIDERS = provider_api.ProviderAPIs
   28 
   29 
   30 class SqlIDMappingTable(test_backend_sql.SqlModels):
   31     """Set of tests for checking SQL Identity ID Mapping."""
   32 
   33     def test_id_mapping(self):
   34         cols = (('public_id', sql.String, 64),
   35                 ('domain_id', sql.String, 64),
   36                 ('local_id', sql.String, 64),
   37                 ('entity_type', sql.Enum, None))
   38         self.assertExpectedSchema('id_mapping', cols)
   39 
   40 
   41 class SqlIDMapping(test_backend_sql.SqlTests):
   42 
   43     def setUp(self):
   44         super(SqlIDMapping, self).setUp()
   45         self.load_sample_data()
   46 
   47     def load_sample_data(self):
   48         self.addCleanup(self.clean_sample_data)
   49         domainA = unit.new_domain_ref()
   50         self.domainA = PROVIDERS.resource_api.create_domain(
   51             domainA['id'], domainA
   52         )
   53         domainB = unit.new_domain_ref()
   54         self.domainB = PROVIDERS.resource_api.create_domain(
   55             domainB['id'], domainB
   56         )
   57 
   58     def clean_sample_data(self):
   59         if hasattr(self, 'domainA'):
   60             self.domainA['enabled'] = False
   61             PROVIDERS.resource_api.update_domain(
   62                 self.domainA['id'], self.domainA
   63             )
   64             PROVIDERS.resource_api.delete_domain(self.domainA['id'])
   65         if hasattr(self, 'domainB'):
   66             self.domainB['enabled'] = False
   67             PROVIDERS.resource_api.update_domain(
   68                 self.domainB['id'], self.domainB
   69             )
   70             PROVIDERS.resource_api.delete_domain(self.domainB['id'])
   71 
   72     def test_invalid_public_key(self):
   73         self.assertIsNone(
   74             PROVIDERS.id_mapping_api.get_id_mapping(uuid.uuid4().hex)
   75         )
   76 
   77     def test_id_mapping_crud(self):
   78         initial_mappings = len(mapping_sql.list_id_mappings())
   79         local_id1 = uuid.uuid4().hex
   80         local_id2 = uuid.uuid4().hex
   81         local_entity1 = {'domain_id': self.domainA['id'],
   82                          'local_id': local_id1,
   83                          'entity_type': mapping.EntityType.USER}
   84         local_entity2 = {'domain_id': self.domainB['id'],
   85                          'local_id': local_id2,
   86                          'entity_type': mapping.EntityType.GROUP}
   87 
   88         # Check no mappings for the new local entities
   89         self.assertIsNone(
   90             PROVIDERS.id_mapping_api.get_public_id(local_entity1)
   91         )
   92         self.assertIsNone(
   93             PROVIDERS.id_mapping_api.get_public_id(local_entity2)
   94         )
   95 
   96         # Create the new mappings and then read them back
   97         public_id1 = PROVIDERS.id_mapping_api.create_id_mapping(local_entity1)
   98         public_id2 = PROVIDERS.id_mapping_api.create_id_mapping(local_entity2)
   99         self.assertThat(mapping_sql.list_id_mappings(),
  100                         matchers.HasLength(initial_mappings + 2))
  101         self.assertEqual(
  102             public_id1, PROVIDERS.id_mapping_api.get_public_id(local_entity1))
  103         self.assertEqual(
  104             public_id2, PROVIDERS.id_mapping_api.get_public_id(local_entity2))
  105 
  106         local_id_ref = PROVIDERS.id_mapping_api.get_id_mapping(public_id1)
  107         self.assertEqual(self.domainA['id'], local_id_ref['domain_id'])
  108         self.assertEqual(local_id1, local_id_ref['local_id'])
  109         self.assertEqual(mapping.EntityType.USER, local_id_ref['entity_type'])
  110         # Check we have really created a new external ID
  111         self.assertNotEqual(local_id1, public_id1)
  112 
  113         local_id_ref = PROVIDERS.id_mapping_api.get_id_mapping(public_id2)
  114         self.assertEqual(self.domainB['id'], local_id_ref['domain_id'])
  115         self.assertEqual(local_id2, local_id_ref['local_id'])
  116         self.assertEqual(mapping.EntityType.GROUP, local_id_ref['entity_type'])
  117         # Check we have really created a new external ID
  118         self.assertNotEqual(local_id2, public_id2)
  119 
  120         # Create another mappings, this time specifying a public ID to use
  121         new_public_id = uuid.uuid4().hex
  122         public_id3 = PROVIDERS.id_mapping_api.create_id_mapping(
  123             {'domain_id': self.domainB['id'], 'local_id': local_id2,
  124              'entity_type': mapping.EntityType.USER},
  125             public_id=new_public_id)
  126         self.assertEqual(new_public_id, public_id3)
  127         self.assertThat(mapping_sql.list_id_mappings(),
  128                         matchers.HasLength(initial_mappings + 3))
  129 
  130         # Delete the mappings we created, and make sure the mapping count
  131         # goes back to where it was
  132         PROVIDERS.id_mapping_api.delete_id_mapping(public_id1)
  133         PROVIDERS.id_mapping_api.delete_id_mapping(public_id2)
  134         PROVIDERS.id_mapping_api.delete_id_mapping(public_id3)
  135         self.assertThat(mapping_sql.list_id_mappings(),
  136                         matchers.HasLength(initial_mappings))
  137 
  138     def test_id_mapping_handles_unicode(self):
  139         initial_mappings = len(mapping_sql.list_id_mappings())
  140         local_id = u'fäké1'
  141         local_entity = {'domain_id': self.domainA['id'],
  142                         'local_id': local_id,
  143                         'entity_type': mapping.EntityType.USER}
  144 
  145         # Check no mappings for the new local entity
  146         self.assertIsNone(PROVIDERS.id_mapping_api.get_public_id(local_entity))
  147 
  148         # Create the new mapping and then read it back
  149         public_id = PROVIDERS.id_mapping_api.create_id_mapping(local_entity)
  150         self.assertThat(mapping_sql.list_id_mappings(),
  151                         matchers.HasLength(initial_mappings + 1))
  152         self.assertEqual(
  153             public_id, PROVIDERS.id_mapping_api.get_public_id(local_entity))
  154 
  155     def test_id_mapping_handles_bytes(self):
  156         initial_mappings = len(mapping_sql.list_id_mappings())
  157         local_id = b'FaKeID'
  158         local_entity = {'domain_id': self.domainA['id'],
  159                         'local_id': local_id,
  160                         'entity_type': mapping.EntityType.USER}
  161 
  162         # Check no mappings for the new local entity
  163         self.assertIsNone(PROVIDERS.id_mapping_api.get_public_id(local_entity))
  164 
  165         # Create the new mapping and then read it back
  166         public_id = PROVIDERS.id_mapping_api.create_id_mapping(local_entity)
  167         self.assertThat(mapping_sql.list_id_mappings(),
  168                         matchers.HasLength(initial_mappings + 1))
  169         self.assertEqual(
  170             public_id, PROVIDERS.id_mapping_api.get_public_id(local_entity))
  171 
  172     def test_delete_public_id_is_silent(self):
  173         # Test that deleting an invalid public key is silent
  174         PROVIDERS.id_mapping_api.delete_id_mapping(uuid.uuid4().hex)
  175 
  176     def test_purge_mappings(self):
  177         initial_mappings = len(mapping_sql.list_id_mappings())
  178         local_id1 = uuid.uuid4().hex
  179         local_id2 = uuid.uuid4().hex
  180         local_id3 = uuid.uuid4().hex
  181         local_id4 = uuid.uuid4().hex
  182         local_id5 = uuid.uuid4().hex
  183 
  184         # Create five mappings,two in domainA, three in domainB
  185         PROVIDERS.id_mapping_api.create_id_mapping(
  186             {'domain_id': self.domainA['id'], 'local_id': local_id1,
  187              'entity_type': mapping.EntityType.USER})
  188         PROVIDERS.id_mapping_api.create_id_mapping(
  189             {'domain_id': self.domainA['id'], 'local_id': local_id2,
  190              'entity_type': mapping.EntityType.USER})
  191         public_id3 = PROVIDERS.id_mapping_api.create_id_mapping(
  192             {'domain_id': self.domainB['id'], 'local_id': local_id3,
  193              'entity_type': mapping.EntityType.GROUP})
  194         public_id4 = PROVIDERS.id_mapping_api.create_id_mapping(
  195             {'domain_id': self.domainB['id'], 'local_id': local_id4,
  196              'entity_type': mapping.EntityType.USER})
  197         public_id5 = PROVIDERS.id_mapping_api.create_id_mapping(
  198             {'domain_id': self.domainB['id'], 'local_id': local_id5,
  199              'entity_type': mapping.EntityType.USER})
  200 
  201         self.assertThat(mapping_sql.list_id_mappings(),
  202                         matchers.HasLength(initial_mappings + 5))
  203 
  204         # Purge mappings for domainA, should be left with those in B
  205         PROVIDERS.id_mapping_api.purge_mappings(
  206             {'domain_id': self.domainA['id']})
  207         self.assertThat(mapping_sql.list_id_mappings(),
  208                         matchers.HasLength(initial_mappings + 3))
  209         PROVIDERS.id_mapping_api.get_id_mapping(public_id3)
  210         PROVIDERS.id_mapping_api.get_id_mapping(public_id4)
  211         PROVIDERS.id_mapping_api.get_id_mapping(public_id5)
  212 
  213         # Purge mappings for type Group, should purge one more
  214         PROVIDERS.id_mapping_api.purge_mappings(
  215             {'entity_type': mapping.EntityType.GROUP})
  216         self.assertThat(mapping_sql.list_id_mappings(),
  217                         matchers.HasLength(initial_mappings + 2))
  218         PROVIDERS.id_mapping_api.get_id_mapping(public_id4)
  219         PROVIDERS.id_mapping_api.get_id_mapping(public_id5)
  220 
  221         # Purge mapping for a specific local identifier
  222         PROVIDERS.id_mapping_api.purge_mappings(
  223             {'domain_id': self.domainB['id'], 'local_id': local_id4,
  224              'entity_type': mapping.EntityType.USER})
  225         self.assertThat(mapping_sql.list_id_mappings(),
  226                         matchers.HasLength(initial_mappings + 1))
  227         PROVIDERS.id_mapping_api.get_id_mapping(public_id5)
  228 
  229         # Purge mappings the remaining mappings
  230         PROVIDERS.id_mapping_api.purge_mappings({})
  231         self.assertThat(mapping_sql.list_id_mappings(),
  232                         matchers.HasLength(initial_mappings))
  233 
  234     def test_create_duplicate_mapping(self):
  235         local_entity = {
  236             'domain_id': self.domainA['id'],
  237             'local_id': uuid.uuid4().hex,
  238             'entity_type': mapping.EntityType.USER}
  239         public_id1 = PROVIDERS.id_mapping_api.create_id_mapping(local_entity)
  240 
  241         # second call should be successful and return the same
  242         # public_id as above
  243         public_id2 = PROVIDERS.id_mapping_api.create_id_mapping(local_entity)
  244         self.assertEqual(public_id1, public_id2)
  245 
  246         # even if public_id was specified, it should not be used,
  247         # and still the same public_id should be returned
  248         public_id3 = PROVIDERS.id_mapping_api.create_id_mapping(
  249             local_entity, public_id=uuid.uuid4().hex)
  250         self.assertEqual(public_id1, public_id3)
  251 
  252     @unit.skip_if_cache_disabled('identity')
  253     def test_cache_when_id_mapping_crud(self):
  254         local_id = uuid.uuid4().hex
  255         local_entity = {'domain_id': self.domainA['id'],
  256                         'local_id': local_id,
  257                         'entity_type': mapping.EntityType.USER}
  258 
  259         # Check no mappings for the new local entity
  260         self.assertIsNone(PROVIDERS.id_mapping_api.get_public_id(local_entity))
  261 
  262         # Create new mappings, and it should be in the cache after created
  263         public_id = PROVIDERS.id_mapping_api.create_id_mapping(local_entity)
  264         self.assertEqual(
  265             public_id, PROVIDERS.id_mapping_api.get_public_id(local_entity))
  266         local_id_ref = PROVIDERS.id_mapping_api.get_id_mapping(public_id)
  267         self.assertEqual(self.domainA['id'], local_id_ref['domain_id'])
  268         self.assertEqual(local_id, local_id_ref['local_id'])
  269         self.assertEqual(mapping.EntityType.USER, local_id_ref['entity_type'])
  270 
  271         # After delete the mapping, should be deleted from cache too
  272         PROVIDERS.id_mapping_api.delete_id_mapping(public_id)
  273         self.assertIsNone(PROVIDERS.id_mapping_api.get_public_id(local_entity))
  274         self.assertIsNone(PROVIDERS.id_mapping_api.get_id_mapping(public_id))
  275 
  276     @unit.skip_if_cache_disabled('identity')
  277     def test_invalidate_cache_when_purge_mappings(self):
  278         local_id1 = uuid.uuid4().hex
  279         local_id2 = uuid.uuid4().hex
  280         local_id3 = uuid.uuid4().hex
  281         local_id4 = uuid.uuid4().hex
  282         local_id5 = uuid.uuid4().hex
  283 
  284         # Create five mappings,two in domainA, three in domainB
  285         local_entity1 = {'domain_id': self.domainA['id'],
  286                          'local_id': local_id1,
  287                          'entity_type': mapping.EntityType.USER}
  288         local_entity2 = {'domain_id': self.domainA['id'],
  289                          'local_id': local_id2,
  290                          'entity_type': mapping.EntityType.USER}
  291         local_entity3 = {'domain_id': self.domainB['id'],
  292                          'local_id': local_id3,
  293                          'entity_type': mapping.EntityType.GROUP}
  294         local_entity4 = {'domain_id': self.domainB['id'],
  295                          'local_id': local_id4,
  296                          'entity_type': mapping.EntityType.USER}
  297         local_entity5 = {'domain_id': self.domainB['id'],
  298                          'local_id': local_id5,
  299                          'entity_type': mapping.EntityType.USER}
  300 
  301         PROVIDERS.id_mapping_api.create_id_mapping(local_entity1)
  302         PROVIDERS.id_mapping_api.create_id_mapping(local_entity2)
  303         PROVIDERS.id_mapping_api.create_id_mapping(local_entity3)
  304         PROVIDERS.id_mapping_api.create_id_mapping(local_entity4)
  305         PROVIDERS.id_mapping_api.create_id_mapping(local_entity5)
  306 
  307         # Purge mappings for domainA, should be left with those in B
  308         PROVIDERS.id_mapping_api.purge_mappings(
  309             {'domain_id': self.domainA['id']})
  310         self.assertIsNone(
  311             PROVIDERS.id_mapping_api.get_public_id(local_entity1)
  312         )
  313         self.assertIsNone(
  314             PROVIDERS.id_mapping_api.get_public_id(local_entity2)
  315         )
  316 
  317         # Purge mappings for type Group, should purge one more
  318         PROVIDERS.id_mapping_api.purge_mappings(
  319             {'entity_type': mapping.EntityType.GROUP})
  320         self.assertIsNone(
  321             PROVIDERS.id_mapping_api.get_public_id(local_entity3)
  322         )
  323 
  324         # Purge mapping for a specific local identifier
  325         PROVIDERS.id_mapping_api.purge_mappings(
  326             {'domain_id': self.domainB['id'], 'local_id': local_id4,
  327              'entity_type': mapping.EntityType.USER})
  328         self.assertIsNone(
  329             PROVIDERS.id_mapping_api.get_public_id(local_entity4)
  330         )
  331 
  332         # Purge mappings the remaining mappings
  333         PROVIDERS.id_mapping_api.purge_mappings({})
  334         self.assertIsNone(
  335             PROVIDERS.id_mapping_api.get_public_id(local_entity5)
  336         )
  337 
  338     def _prepare_domain_mappings_for_list(self):
  339         # Create five mappings:
  340         # two users in domainA, one group and two users in domainB
  341         local_entities = [
  342             {'domain_id': self.domainA['id'],
  343              'entity_type': mapping.EntityType.USER},
  344             {'domain_id': self.domainA['id'],
  345              'entity_type': mapping.EntityType.USER},
  346             {'domain_id': self.domainB['id'],
  347              'entity_type': mapping.EntityType.GROUP},
  348             {'domain_id': self.domainB['id'],
  349              'entity_type': mapping.EntityType.USER},
  350             {'domain_id': self.domainB['id'],
  351              'entity_type': mapping.EntityType.USER}
  352         ]
  353         for e in local_entities:
  354             e['local_id'] = uuid.uuid4().hex
  355             e['public_id'] = PROVIDERS.id_mapping_api.create_id_mapping(e)
  356         return local_entities
  357 
  358     def test_get_domain_mapping_list(self):
  359         local_entities = self._prepare_domain_mappings_for_list()
  360         # NOTE(notmorgan): Always call to_dict in an active session context to
  361         # ensure that lazy-loaded relationships succeed. Edge cases could cause
  362         # issues especially in attribute mappers.
  363         with sql.session_for_read():
  364             # list mappings for domainA
  365             domain_a_mappings = (
  366                 PROVIDERS.id_mapping_api.get_domain_mapping_list(
  367                     self.domainA['id']
  368                 )
  369             )
  370             domain_a_mappings = [m.to_dict() for m in domain_a_mappings]
  371         self.assertItemsEqual(local_entities[:2], domain_a_mappings)
  372 
  373     def test_get_domain_mapping_list_by_user_entity_type(self):
  374         local_entities = self._prepare_domain_mappings_for_list()
  375         # NOTE(notmorgan): Always call to_dict in an active session context to
  376         # ensure that lazy-loaded relationships succeed. Edge cases could cause
  377         # issues especially in attribute mappers.
  378         with sql.session_for_read():
  379             # list user mappings for domainB
  380             domain_b_mappings_user = (
  381                 PROVIDERS.id_mapping_api.get_domain_mapping_list(
  382                     self.domainB['id'], entity_type=mapping.EntityType.USER
  383                 )
  384             )
  385             domain_b_mappings_user = [m.to_dict()
  386                                       for m in domain_b_mappings_user]
  387         self.assertItemsEqual(local_entities[-2:], domain_b_mappings_user)
  388 
  389     def test_get_domain_mapping_list_by_group_entity_type(self):
  390         local_entities = self._prepare_domain_mappings_for_list()
  391         # NOTE(notmorgan): Always call to_dict in an active session context to
  392         # ensure that lazy-loaded relationships succeed. Edge cases could cause
  393         # issues especially in attribute mappers.
  394         with sql.session_for_read():
  395             # List group mappings for domainB. Given the data set, this should
  396             # only return a single reference, so don't both iterating the query
  397             # response.
  398             domain_b_mappings_group = (
  399                 PROVIDERS.id_mapping_api.get_domain_mapping_list(
  400                     self.domainB['id'], entity_type=mapping.EntityType.GROUP
  401                 )
  402             )
  403             domain_b_mappings_group = domain_b_mappings_group.first().to_dict()
  404         self.assertItemsEqual(local_entities[2], domain_b_mappings_group)