"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/unit/application_credential/test_backends.py" (13 May 2020, 17912 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_backends.py": 16.0.1_vs_17.0.0.

    1 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 # not use this file except in compliance with the License. You may obtain
    3 # a copy of the License at
    4 #
    5 #      http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 # Unless required by applicable law or agreed to in writing, software
    8 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 # License for the specific language governing permissions and limitations
   11 # under the License.
   12 
   13 import datetime
   14 import uuid
   15 
   16 from oslo_config import fixture as config_fixture
   17 
   18 from keystone.common import driver_hints
   19 from keystone.common import provider_api
   20 import keystone.conf
   21 from keystone import exception
   22 
   23 
   24 CONF = keystone.conf.CONF
   25 PROVIDERS = provider_api.ProviderAPIs
   26 
   27 
   28 class ApplicationCredentialTests(object):
   29 
   30     def _new_app_cred_data(self, user_id, project_id=None, name=None,
   31                            expires=None, system=None):
   32         if not name:
   33             name = uuid.uuid4().hex
   34         if not expires:
   35             expires = datetime.datetime.utcnow() + datetime.timedelta(days=365)
   36         if not system:
   37             system = uuid.uuid4().hex
   38         if not project_id:
   39             project_id = uuid.uuid4().hex
   40         app_cred_data = {
   41             'id': uuid.uuid4().hex,
   42             'name': name,
   43             'description': uuid.uuid4().hex,
   44             'user_id': user_id,
   45             'project_id': project_id,
   46             'system': system,
   47             'expires_at': expires,
   48             'roles': [
   49                 {'id': self.role__member_['id']},
   50             ],
   51             'secret': uuid.uuid4().hex,
   52             'unrestricted': False
   53         }
   54         return app_cred_data
   55 
   56     def test_create_application_credential(self):
   57         app_cred = self._new_app_cred_data(self.user_foo['id'],
   58                                            project_id=self.project_bar['id'])
   59         resp = self.app_cred_api.create_application_credential(app_cred)
   60         resp_roles = resp.pop('roles')
   61         orig_roles = app_cred.pop('roles')
   62         self.assertDictEqual(app_cred, resp)
   63         self.assertEqual(orig_roles[0]['id'], resp_roles[0]['id'])
   64 
   65     def test_create_duplicate_application_credential_fails(self):
   66         # Ensure a user can't create two application credentials with the same
   67         # name
   68         app_cred = self._new_app_cred_data(self.user_foo['id'],
   69                                            project_id=self.project_bar['id'])
   70         name = app_cred['name']
   71         self.app_cred_api.create_application_credential(app_cred)
   72         app_cred = self._new_app_cred_data(self.user_foo['id'],
   73                                            project_id=self.project_bar['id'],
   74                                            name=name)
   75         self.assertRaises(exception.Conflict,
   76                           self.app_cred_api.create_application_credential,
   77                           app_cred)
   78 
   79     def test_create_application_credential_require_role_assignments(self):
   80         # Ensure a user can't create an application credential for a project
   81         # they don't have a role assignment on
   82         app_cred = self._new_app_cred_data(self.user_foo['id'],
   83                                            project_id=self.project_baz['id'])
   84         self.assertRaises(exception.RoleAssignmentNotFound,
   85                           self.app_cred_api.create_application_credential,
   86                           app_cred)
   87 
   88     def test_application_credential_allow_recursion(self):
   89         app_cred = self._new_app_cred_data(self.user_foo['id'],
   90                                            project_id=self.project_bar['id'])
   91         app_cred['unrestricted'] = True
   92         resp = self.app_cred_api.create_application_credential(app_cred)
   93         resp.pop('roles')
   94         app_cred.pop('roles')
   95         self.assertDictEqual(app_cred, resp)
   96 
   97     def test_application_credential_limits(self):
   98         config_fixture_ = self.user = self.useFixture(config_fixture.Config())
   99         config_fixture_.config(group='application_credential', user_limit=2)
  100         app_cred = self._new_app_cred_data(self.user_foo['id'],
  101                                            self.project_bar['id'])
  102         self.app_cred_api.create_application_credential(app_cred)
  103         app_cred['name'] = 'two'
  104         self.app_cred_api.create_application_credential(app_cred)
  105         app_cred['name'] = 'three'
  106         self.assertRaises(exception.ApplicationCredentialLimitExceeded,
  107                           self.app_cred_api.create_application_credential,
  108                           app_cred)
  109 
  110     def test_create_application_credential_with_access_rules(self):
  111         app_cred = self._new_app_cred_data(self.user_foo['id'],
  112                                            project_id=self.project_bar['id'])
  113         app_cred['access_rules'] = [{
  114             'id': uuid.uuid4().hex,
  115             'service': uuid.uuid4().hex,
  116             'path': uuid.uuid4().hex,
  117             'method': uuid.uuid4().hex[16:]
  118         }]
  119         resp = self.app_cred_api.create_application_credential(app_cred)
  120         resp.pop('roles')
  121         resp_access_rules = resp.pop('access_rules')
  122         app_cred.pop('roles')
  123         orig_access_rules = app_cred.pop('access_rules')
  124         self.assertDictEqual(app_cred, resp)
  125         for i, ar in enumerate(resp_access_rules):
  126             self.assertDictEqual(orig_access_rules[i], ar)
  127 
  128     def test_create_application_credential_with_preexisting_access_rules(self):
  129         app_cred_1 = self._new_app_cred_data(self.user_foo['id'],
  130                                              project_id=self.project_bar['id'])
  131         app_cred_1['access_rules'] = [{
  132             'id': uuid.uuid4().hex,
  133             'service': uuid.uuid4().hex,
  134             'path': uuid.uuid4().hex,
  135             'method': uuid.uuid4().hex[16:]
  136         }]
  137         resp = self.app_cred_api.create_application_credential(app_cred_1)
  138         resp_access_rules_1 = resp.pop('access_rules')
  139         app_cred_2 = self._new_app_cred_data(self.user_foo['id'],
  140                                              project_id=self.project_bar['id'])
  141         app_cred_2['access_rules'] = [{'id': resp_access_rules_1[0]['id']}]
  142         resp = self.app_cred_api.create_application_credential(app_cred_2)
  143         resp_access_rules_2 = resp.pop('access_rules')
  144         self.assertDictEqual(resp_access_rules_1[0], resp_access_rules_2[0])
  145 
  146     def test_get_application_credential(self):
  147         app_cred = self._new_app_cred_data(self.user_foo['id'],
  148                                            project_id=self.project_bar['id'])
  149         create_resp = self.app_cred_api.create_application_credential(app_cred)
  150         app_cred_id = create_resp['id']
  151         get_resp = self.app_cred_api.get_application_credential(app_cred_id)
  152         create_resp.pop('secret')
  153         self.assertDictEqual(create_resp, get_resp)
  154 
  155     def test_get_application_credential_not_found(self):
  156         self.assertRaises(exception.ApplicationCredentialNotFound,
  157                           self.app_cred_api.get_application_credential,
  158                           uuid.uuid4().hex)
  159 
  160     def test_list_application_credentials(self):
  161         app_cred_1 = self._new_app_cred_data(self.user_foo['id'],
  162                                              project_id=self.project_bar['id'],
  163                                              name='app1')
  164         app_cred_2 = self._new_app_cred_data(self.user_foo['id'],
  165                                              project_id=self.project_bar['id'],
  166                                              name='app2')
  167         app_cred_3 = self._new_app_cred_data(self.user_two['id'],
  168                                              project_id=self.project_baz['id'],
  169                                              name='app3')
  170         resp1 = self.app_cred_api.create_application_credential(app_cred_1)
  171         resp2 = self.app_cred_api.create_application_credential(app_cred_2)
  172         resp3 = self.app_cred_api.create_application_credential(app_cred_3)
  173         hints = driver_hints.Hints()
  174         resp = self.app_cred_api.list_application_credentials(
  175             self.user_foo['id'], hints)
  176         resp_ids = [ac['id'] for ac in resp]
  177         self.assertIn(resp1['id'], resp_ids)
  178         self.assertIn(resp2['id'], resp_ids)
  179         self.assertNotIn(resp3['id'], resp_ids)
  180         for ac in resp:
  181             self.assertNotIn('secret_hash', ac)
  182 
  183     def _list_ids(self, user):
  184         hints = driver_hints.Hints()
  185         resp = self.app_cred_api.list_application_credentials(user['id'],
  186                                                               hints)
  187         return [ac['id'] for ac in resp]
  188 
  189     def test_delete_application_credential(self):
  190         app_cred = self._new_app_cred_data(self.user_foo['id'],
  191                                            project_id=self.project_bar['id'])
  192         self.app_cred_api.create_application_credential(app_cred)
  193 
  194         # cache the information
  195         self.app_cred_api.get_application_credential(app_cred['id'])
  196 
  197         self.assertIn(app_cred['id'], self._list_ids(self.user_foo))
  198         self.app_cred_api.delete_application_credential(app_cred['id'])
  199         self.assertNotIn(app_cred['id'], self._list_ids(self.user_foo))
  200 
  201         # the cache information has been invalidated.
  202         self.assertRaises(exception.ApplicationCredentialNotFound,
  203                           self.app_cred_api.get_application_credential,
  204                           app_cred['id'])
  205 
  206     def test_delete_application_credential_not_found(self):
  207         self.assertRaises(exception.ApplicationCredentialNotFound,
  208                           self.app_cred_api.delete_application_credential,
  209                           uuid.uuid4().hex)
  210 
  211     def test_deleting_a_user_deletes_application_credentials(self):
  212         app_cred_1 = self._new_app_cred_data(self.user_foo['id'],
  213                                              project_id=self.project_bar['id'],
  214                                              name='app1')
  215         app_cred_2 = self._new_app_cred_data(self.user_foo['id'],
  216                                              project_id=self.project_bar['id'],
  217                                              name='app2')
  218         self.app_cred_api.create_application_credential(app_cred_1)
  219         self.app_cred_api.create_application_credential(app_cred_2)
  220         self.assertIn(app_cred_1['id'], self._list_ids(self.user_foo))
  221         self.assertIn(app_cred_2['id'], self._list_ids(self.user_foo))
  222 
  223         # cache the information
  224         self.app_cred_api.get_application_credential(app_cred_1['id'])
  225         self.app_cred_api.get_application_credential(app_cred_2['id'])
  226 
  227         # This should trigger a notification which should invoke a callback in
  228         # the application credential Manager to cleanup user_foo's application
  229         # credentials.
  230         PROVIDERS.identity_api.delete_user(self.user_foo['id'])
  231         hints = driver_hints.Hints()
  232         self.assertListEqual(
  233             [], self.app_cred_api.list_application_credentials(
  234                 self.user_foo['id'], hints))
  235 
  236         # the cache information has been invalidated.
  237         self.assertRaises(exception.ApplicationCredentialNotFound,
  238                           self.app_cred_api.get_application_credential,
  239                           app_cred_1['id'])
  240         self.assertRaises(exception.ApplicationCredentialNotFound,
  241                           self.app_cred_api.get_application_credential,
  242                           app_cred_2['id'])
  243 
  244     def test_removing_user_from_project_deletes_application_credentials(self):
  245         app_cred_proj_A_1 = self._new_app_cred_data(
  246             self.user_foo['id'], project_id=self.project_bar['id'],
  247             name='app1')
  248         app_cred_proj_A_2 = self._new_app_cred_data(
  249             self.user_foo['id'], project_id=self.project_bar['id'],
  250             name='app2')
  251         app_cred_proj_B = self._new_app_cred_data(
  252             self.user_foo['id'], project_id=self.project_baz['id'],
  253             name='app3')
  254         PROVIDERS.assignment_api.add_role_to_user_and_project(
  255             project_id=self.project_baz['id'],
  256             user_id=self.user_foo['id'],
  257             role_id=self.role__member_['id'])
  258         self.app_cred_api.create_application_credential(app_cred_proj_A_1)
  259         self.app_cred_api.create_application_credential(app_cred_proj_A_2)
  260         self.app_cred_api.create_application_credential(app_cred_proj_B)
  261         self.assertIn(app_cred_proj_A_1['id'], self._list_ids(self.user_foo))
  262         self.assertIn(app_cred_proj_A_2['id'], self._list_ids(self.user_foo))
  263         self.assertIn(app_cred_proj_B['id'], self._list_ids(self.user_foo))
  264 
  265         # cache the information
  266         self.app_cred_api.get_application_credential(app_cred_proj_A_1['id'])
  267         self.app_cred_api.get_application_credential(app_cred_proj_A_2['id'])
  268         self.app_cred_api.get_application_credential(app_cred_proj_B['id'])
  269 
  270         # This should trigger a notification which should invoke a callback in
  271         # the application credential Manager to cleanup all of user_foo's
  272         # application credentials on project bar.
  273         PROVIDERS.assignment_api.remove_role_from_user_and_project(
  274             user_id=self.user_foo['id'],
  275             project_id=self.project_bar['id'],
  276             role_id=self.role__member_['id'])
  277         self.assertNotIn(app_cred_proj_A_1['id'],
  278                          self._list_ids(self.user_foo))
  279         self.assertNotIn(app_cred_proj_A_2['id'],
  280                          self._list_ids(self.user_foo))
  281         self.assertIn(app_cred_proj_B['id'], self._list_ids(self.user_foo))
  282 
  283         # the cache information has been invalidated only for the deleted
  284         # application credential.
  285         self.assertRaises(exception.ApplicationCredentialNotFound,
  286                           self.app_cred_api.get_application_credential,
  287                           app_cred_proj_A_1['id'])
  288         self.assertRaises(exception.ApplicationCredentialNotFound,
  289                           self.app_cred_api.get_application_credential,
  290                           app_cred_proj_A_2['id'])
  291         self.assertEqual(app_cred_proj_B['id'],
  292                          self.app_cred_api.get_application_credential(
  293                              app_cred_proj_B['id'])['id'])
  294 
  295     def test_authenticate(self):
  296         app_cred = self._new_app_cred_data(self.user_foo['id'],
  297                                            project_id=self.project_bar['id'])
  298         resp = self.app_cred_api.create_application_credential(app_cred)
  299         self.app_cred_api.authenticate(resp['id'], resp['secret'])
  300 
  301     def test_authenticate_not_found(self):
  302         self.assertRaises(AssertionError,
  303                           self.app_cred_api.authenticate,
  304                           uuid.uuid4().hex,
  305                           uuid.uuid4().hex)
  306 
  307     def test_authenticate_expired(self):
  308         yesterday = datetime.datetime.utcnow() - datetime.timedelta(days=1)
  309         app_cred = self._new_app_cred_data(self.user_foo['id'],
  310                                            project_id=self.project_bar['id'],
  311                                            expires=yesterday)
  312         resp = self.app_cred_api.create_application_credential(app_cred)
  313         self.assertRaises(AssertionError,
  314                           self.app_cred_api.authenticate,
  315                           resp['id'],
  316                           resp['secret'])
  317 
  318     def test_authenticate_bad_secret(self):
  319         app_cred = self._new_app_cred_data(self.user_foo['id'],
  320                                            project_id=self.project_bar['id'])
  321         resp = self.app_cred_api.create_application_credential(app_cred)
  322         badpass = 'badpass'
  323         self.assertNotEqual(badpass, resp['secret'])
  324         self.assertRaises(AssertionError,
  325                           self.app_cred_api.authenticate,
  326                           resp['id'],
  327                           badpass)
  328 
  329     def test_get_delete_access_rules(self):
  330         app_cred = self._new_app_cred_data(self.user_foo['id'],
  331                                            project_id=self.project_bar['id'])
  332         access_rule_id = uuid.uuid4().hex
  333         app_cred['access_rules'] = [{
  334             'id': access_rule_id,
  335             'service': uuid.uuid4().hex,
  336             'path': uuid.uuid4().hex,
  337             'method': uuid.uuid4().hex[16:]
  338         }]
  339         self.app_cred_api.create_application_credential(app_cred)
  340         self.assertDictEqual(app_cred['access_rules'][0],
  341                              self.app_cred_api.get_access_rule(access_rule_id))
  342         self.app_cred_api.delete_application_credential(app_cred['id'])
  343         self.app_cred_api.delete_access_rule(access_rule_id)
  344         self.assertRaises(exception.AccessRuleNotFound,
  345                           self.app_cred_api.get_access_rule,
  346                           access_rule_id)
  347 
  348     def test_list_delete_access_rule_for_user(self):
  349         app_cred = self._new_app_cred_data(self.user_foo['id'],
  350                                            project_id=self.project_bar['id'])
  351         access_rule_id = uuid.uuid4().hex
  352         app_cred['access_rules'] = [{
  353             'id': access_rule_id,
  354             'service': uuid.uuid4().hex,
  355             'path': uuid.uuid4().hex,
  356             'method': uuid.uuid4().hex[16:]
  357         }]
  358         self.app_cred_api.create_application_credential(app_cred)
  359         self.assertEqual(1, len(self.app_cred_api.list_access_rules_for_user(
  360             self.user_foo['id'])))
  361         self.app_cred_api.delete_application_credential(app_cred['id'])
  362         # access rule should still exist
  363         self.assertEqual(1, len(self.app_cred_api.list_access_rules_for_user(
  364             self.user_foo['id'])))
  365         self.app_cred_api.delete_access_rules_for_user(self.user_foo['id'])
  366         self.assertEqual(0, len(self.app_cred_api.list_access_rules_for_user(
  367             self.user_foo['id'])))