"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/protection/v3/test_protocols.py" (13 May 2020, 17093 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_protocols.py": 16.0.1_vs_17.0.0.

    1 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 #    not use this file except in compliance with the License. You may obtain
    3 #    a copy of the License at
    4 #
    5 #         http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 #    Unless required by applicable law or agreed to in writing, software
    8 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 #    License for the specific language governing permissions and limitations
   11 #    under the License.
   12 
   13 import uuid
   14 
   15 import http.client
   16 
   17 from keystone.common import provider_api
   18 import keystone.conf
   19 from keystone.tests.common import auth as common_auth
   20 from keystone.tests import unit
   21 from keystone.tests.unit import base_classes
   22 from keystone.tests.unit import ksfixtures
   23 
   # Module-level alias for keystone's configuration object; the setUp methods
   # below read ``CONF.identity.default_domain_id`` from it.
   CONF = keystone.conf.CONF
   # Object through which the tests below reach the backend APIs they use for
   # fixture setup (``federation_api``, ``identity_api``, ``assignment_api``
   # and ``resource_api``).
   PROVIDERS = provider_api.ProviderAPIs
   26 
   27 
   class _CommonUtilities(object):
       """Fixture helpers shared by the protocol test cases in this module."""

       def _create_protocol_and_deps(self):
           """Create an identity provider, a mapping, and a protocol.

           The protocol is created under the new identity provider and is
           built with the new mapping's ID.

           :returns: a ``(protocol, mapping, identity_provider)`` tuple of
                     the values handed back by the federation API.
           """
           federation = PROVIDERS.federation_api

           idp_ref = unit.new_identity_provider_ref()
           idp = federation.create_idp(idp_ref['id'], idp_ref)

           mapping_id = uuid.uuid4().hex
           mapping = federation.create_mapping(
               mapping_id, unit.new_mapping_ref()
           )

           protocol_ref = unit.new_protocol_ref(mapping_id=mapping['id'])
           protocol = federation.create_protocol(
               idp['id'], protocol_ref['id'], protocol_ref
           )
           return protocol, mapping, idp
   44 
   45 
   class _SystemUserProtocolTests(object):
       """Protocol read checks mixed into every system-scoped test case."""

       def test_user_can_list_protocols(self):
           protocol, _mapping, idp = self._create_protocol_and_deps()
           url = (
               '/v3/OS-FEDERATION/identity_providers/%s/protocols' % idp['id']
           )

           with self.test_client() as client:
               response = client.get(url, headers=self.headers)
               # Exactly the protocol created above should be listed.
               listed = response.json['protocols']
               self.assertEqual(1, len(listed))
               self.assertEqual(protocol['id'], listed[0]['id'])

       def test_user_can_get_a_protocol(self):
           protocol, _mapping, idp = self._create_protocol_and_deps()
           url = '/v3/OS-FEDERATION/identity_providers/%s/protocols/%s' % (
               idp['id'], protocol['id']
           )

           with self.test_client() as client:
               client.get(url, headers=self.headers)
   70 
   71 
   class _SystemReaderAndMemberProtocolTests(object):
       """Protocol write requests that must come back as 403 Forbidden.

       Mixed into the system reader and system member test cases.
       """

       def test_user_cannot_create_protocols(self):
           # Set up the identity provider and mapping through the federation
           # API; the HTTP request under test is the PUT below.
           federation = PROVIDERS.federation_api
           idp_ref = unit.new_identity_provider_ref()
           idp = federation.create_idp(idp_ref['id'], idp_ref)
           mapping = federation.create_mapping(
               uuid.uuid4().hex, unit.new_mapping_ref()
           )

           url = '/v3/OS-FEDERATION/identity_providers/%s/protocols/%s' % (
               idp['id'], 'saml2'
           )
           body = {'protocol': {'mapping_id': mapping['id']}}

           with self.test_client() as client:
               client.put(
                   url,
                   json=body,
                   headers=self.headers,
                   expected_status_code=http.client.FORBIDDEN,
               )

       def test_user_cannot_update_protocols(self):
           protocol, _mapping, idp = self._create_protocol_and_deps()

           # A second mapping gives the PATCH something to switch to.
           replacement = PROVIDERS.federation_api.create_mapping(
               uuid.uuid4().hex, unit.new_mapping_ref()
           )
           body = {'protocol': {'mapping_id': replacement['id']}}
           url = '/v3/OS-FEDERATION/identity_providers/%s/protocols/%s' % (
               idp['id'], protocol['id']
           )

           with self.test_client() as client:
               client.patch(
                   url,
                   json=body,
                   headers=self.headers,
                   expected_status_code=http.client.FORBIDDEN,
               )

       def test_user_cannot_delete_protocol(self):
           protocol, _mapping, idp = self._create_protocol_and_deps()
           url = '/v3/OS-FEDERATION/identity_providers/%s/protocols/%s' % (
               idp['id'], protocol['id']
           )

           with self.test_client() as client:
               client.delete(
                   url,
                   headers=self.headers,
                   expected_status_code=http.client.FORBIDDEN,
               )
  127 
  128 
  class _DomainAndProjectUserProtocolTests(object):
      """Protocol requests that must come back as 403 Forbidden.

      Mixed into the domain-scoped and project-scoped test cases; covers
      create, update, delete, list and get.
      """

      def test_user_cannot_create_protocols(self):
          # Set up the identity provider and mapping through the federation
          # API; the HTTP request under test is the PUT below.
          federation = PROVIDERS.federation_api
          idp_ref = unit.new_identity_provider_ref()
          idp = federation.create_idp(idp_ref['id'], idp_ref)
          mapping = federation.create_mapping(
              uuid.uuid4().hex, unit.new_mapping_ref()
          )

          url = '/v3/OS-FEDERATION/identity_providers/%s/protocols/%s' % (
              idp['id'], 'saml2'
          )
          body = {'protocol': {'mapping_id': mapping['id']}}

          with self.test_client() as client:
              client.put(
                  url,
                  json=body,
                  headers=self.headers,
                  expected_status_code=http.client.FORBIDDEN,
              )

      def test_user_cannot_update_protocols(self):
          protocol, _mapping, idp = self._create_protocol_and_deps()

          # A second mapping gives the PATCH something to switch to.
          replacement = PROVIDERS.federation_api.create_mapping(
              uuid.uuid4().hex, unit.new_mapping_ref()
          )
          body = {'protocol': {'mapping_id': replacement['id']}}
          url = '/v3/OS-FEDERATION/identity_providers/%s/protocols/%s' % (
              idp['id'], protocol['id']
          )

          with self.test_client() as client:
              client.patch(
                  url,
                  json=body,
                  headers=self.headers,
                  expected_status_code=http.client.FORBIDDEN,
              )

      def test_user_cannot_delete_protocol(self):
          protocol, _mapping, idp = self._create_protocol_and_deps()
          url = '/v3/OS-FEDERATION/identity_providers/%s/protocols/%s' % (
              idp['id'], protocol['id']
          )

          with self.test_client() as client:
              client.delete(
                  url,
                  headers=self.headers,
                  expected_status_code=http.client.FORBIDDEN,
              )

      def test_user_cannot_list_protocols(self):
          _protocol, _mapping, idp = self._create_protocol_and_deps()
          url = (
              '/v3/OS-FEDERATION/identity_providers/%s/protocols' % idp['id']
          )

          with self.test_client() as client:
              client.get(
                  url,
                  headers=self.headers,
                  expected_status_code=http.client.FORBIDDEN,
              )

      def test_user_cannot_get_a_protocol(self):
          protocol, _mapping, idp = self._create_protocol_and_deps()
          url = '/v3/OS-FEDERATION/identity_providers/%s/protocols/%s' % (
              idp['id'], protocol['id']
          )

          with self.test_client() as client:
              client.get(
                  url,
                  headers=self.headers,
                  expected_status_code=http.client.FORBIDDEN,
              )
  210 
  211 
  class SystemReaderTests(
          base_classes.TestCaseWithBootstrap,
          common_auth.AuthTestMixin,
          _CommonUtilities,
          _SystemUserProtocolTests,
          _SystemReaderAndMemberProtocolTests):
      """Run the protocol tests as a user holding the reader role on the system."""

      def setUp(self):
          """Create the reader user and store a system-scoped token for it."""
          super().setUp()
          self.loadapp()
          self.useFixture(ksfixtures.Policy(self.config_fixture))
          self.config_fixture.config(group='oslo_policy', enforce_scope=True)

          # New user in the default domain, granted the bootstrap reader
          # role on the system.
          reader_ref = unit.new_user_ref(
              domain_id=CONF.identity.default_domain_id
          )
          created_user = PROVIDERS.identity_api.create_user(reader_ref)
          self.user_id = created_user['id']
          PROVIDERS.assignment_api.create_system_grant_for_user(
              self.user_id, self.bootstrapper.reader_role_id
          )

          token_request = self.build_authentication_request(
              user_id=self.user_id,
              password=reader_ref['password'],
              system=True,
          )

          # Authenticate as this persona once and keep the token header
          # around for the requests issued by the individual tests.
          with self.test_client() as client:
              response = client.post('/v3/auth/tokens', json=token_request)
              self.token_id = response.headers['X-Subject-Token']
              self.headers = {'X-Auth-Token': self.token_id}
  245 
  246 
  class SystemMemberTests(
          base_classes.TestCaseWithBootstrap,
          common_auth.AuthTestMixin,
          _CommonUtilities,
          _SystemUserProtocolTests,
          _SystemReaderAndMemberProtocolTests):
      """Run the protocol tests as a user holding the member role on the system."""

      def setUp(self):
          """Create the member user and store a system-scoped token for it."""
          super().setUp()
          self.loadapp()
          self.useFixture(ksfixtures.Policy(self.config_fixture))
          self.config_fixture.config(group='oslo_policy', enforce_scope=True)

          # New user in the default domain, granted the bootstrap member
          # role on the system.
          member_ref = unit.new_user_ref(
              domain_id=CONF.identity.default_domain_id
          )
          created_user = PROVIDERS.identity_api.create_user(member_ref)
          self.user_id = created_user['id']
          PROVIDERS.assignment_api.create_system_grant_for_user(
              self.user_id, self.bootstrapper.member_role_id
          )

          token_request = self.build_authentication_request(
              user_id=self.user_id,
              password=member_ref['password'],
              system=True,
          )

          # Authenticate as this persona once and keep the token header
          # around for the requests issued by the individual tests.
          with self.test_client() as client:
              response = client.post('/v3/auth/tokens', json=token_request)
              self.token_id = response.headers['X-Subject-Token']
              self.headers = {'X-Auth-Token': self.token_id}
  280 
  281 
  class SystemAdminTests(
          base_classes.TestCaseWithBootstrap,
          common_auth.AuthTestMixin,
          _CommonUtilities,
          _SystemUserProtocolTests):
      """Run the protocol tests as the bootstrapped system administrator.

      Adds create, update and delete tests on top of the shared read tests.
      """

      def setUp(self):
          """Store a system-scoped token for the bootstrap admin user."""
          super().setUp()
          self.loadapp()
          self.useFixture(ksfixtures.Policy(self.config_fixture))
          self.config_fixture.config(group='oslo_policy', enforce_scope=True)

          # No new user is needed: the administrator account set up by
          # ``keystone-manage bootstrap`` is reused.
          self.user_id = self.bootstrapper.admin_user_id
          token_request = self.build_authentication_request(
              user_id=self.user_id,
              password=self.bootstrapper.admin_password,
              system=True,
          )

          # Authenticate as this persona once and keep the token header
          # around for the requests issued by the individual tests.
          with self.test_client() as client:
              response = client.post('/v3/auth/tokens', json=token_request)
              self.token_id = response.headers['X-Subject-Token']
              self.headers = {'X-Auth-Token': self.token_id}

      def test_user_can_create_protocols(self):
          # Set up the identity provider and mapping through the federation
          # API; the HTTP request under test is the PUT below.
          federation = PROVIDERS.federation_api
          idp_ref = unit.new_identity_provider_ref()
          idp = federation.create_idp(idp_ref['id'], idp_ref)
          mapping = federation.create_mapping(
              uuid.uuid4().hex, unit.new_mapping_ref()
          )

          url = '/v3/OS-FEDERATION/identity_providers/%s/protocols/%s' % (
              idp['id'], 'saml2'
          )
          body = {'protocol': {'mapping_id': mapping['id']}}

          with self.test_client() as client:
              client.put(
                  url,
                  json=body,
                  headers=self.headers,
                  expected_status_code=http.client.CREATED,
              )

      def test_user_can_update_protocols(self):
          protocol, _mapping, idp = self._create_protocol_and_deps()

          # A second mapping gives the PATCH something to switch to.
          replacement = PROVIDERS.federation_api.create_mapping(
              uuid.uuid4().hex, unit.new_mapping_ref()
          )
          body = {'protocol': {'mapping_id': replacement['id']}}
          url = '/v3/OS-FEDERATION/identity_providers/%s/protocols/%s' % (
              idp['id'], protocol['id']
          )

          with self.test_client() as client:
              client.patch(url, json=body, headers=self.headers)

      def test_user_can_delete_protocol(self):
          protocol, _mapping, idp = self._create_protocol_and_deps()
          url = '/v3/OS-FEDERATION/identity_providers/%s/protocols/%s' % (
              idp['id'], protocol['id']
          )

          with self.test_client() as client:
              client.delete(url, headers=self.headers)
  356 
  357 
  class DomainUserTests(
          base_classes.TestCaseWithBootstrap,
          common_auth.AuthTestMixin,
          _CommonUtilities,
          _DomainAndProjectUserProtocolTests):
      """Run the protocol tests with a domain-scoped token.

      The user holds the bootstrap admin role on a freshly created domain.
      """

      def setUp(self):
          """Create a domain and user, then store a domain-scoped token."""
          super().setUp()
          self.loadapp()
          self.useFixture(ksfixtures.Policy(self.config_fixture))
          self.config_fixture.config(group='oslo_policy', enforce_scope=True)

          new_domain = PROVIDERS.resource_api.create_domain(
              uuid.uuid4().hex, unit.new_domain_ref()
          )
          self.domain_id = new_domain['id']

          # User lives in the new domain and is granted the admin role on it.
          user_ref = unit.new_user_ref(domain_id=self.domain_id)
          created_user = PROVIDERS.identity_api.create_user(user_ref)
          self.user_id = created_user['id']
          PROVIDERS.assignment_api.create_grant(
              self.bootstrapper.admin_role_id,
              user_id=self.user_id,
              domain_id=self.domain_id,
          )

          token_request = self.build_authentication_request(
              user_id=self.user_id,
              password=user_ref['password'],
              domain_id=self.domain_id,
          )

          # Authenticate as this persona once and keep the token header
          # around for the requests issued by the individual tests.
          with self.test_client() as client:
              response = client.post('/v3/auth/tokens', json=token_request)
              self.token_id = response.headers['X-Subject-Token']
              self.headers = {'X-Auth-Token': self.token_id}
  392 
  393 
  class ProjectUserTests(
          base_classes.TestCaseWithBootstrap,
          common_auth.AuthTestMixin,
          _CommonUtilities,
          _DomainAndProjectUserProtocolTests):
      """Run the protocol tests with a project-scoped token.

      Uses the bootstrap admin user, scoped to the bootstrap project, with
      scope enforcement switched on.
      """

      def setUp(self):
          """Store a project-scoped token for the bootstrap admin user."""
          super().setUp()
          self.loadapp()
          self.useFixture(ksfixtures.Policy(self.config_fixture))
          self.config_fixture.config(group='oslo_policy', enforce_scope=True)

          self.user_id = self.bootstrapper.admin_user_id
          token_request = self.build_authentication_request(
              user_id=self.user_id,
              password=self.bootstrapper.admin_password,
              project_id=self.bootstrapper.project_id,
          )

          # Authenticate as this persona once and keep the token header
          # around for the requests issued by the individual tests.
          with self.test_client() as client:
              response = client.post('/v3/auth/tokens', json=token_request)
              self.token_id = response.headers['X-Subject-Token']
              self.headers = {'X-Auth-Token': self.token_id}
  418 
  419 
  class ProjectUserTestsWithoutEnforceScope(
          base_classes.TestCaseWithBootstrap,
          common_auth.AuthTestMixin,
          _CommonUtilities,
          _DomainAndProjectUserProtocolTests):
      """Run the protocol tests as a project member with enforce_scope off."""

      def setUp(self):
          """Create a domain, user and project, then store a project token."""
          super().setUp()
          self.loadapp()
          self.useFixture(ksfixtures.Policy(self.config_fixture))

          # Explicitly set enforce_scope to False to make sure we maintain
          # backwards compatibility with project users.
          self.config_fixture.config(group='oslo_policy', enforce_scope=False)

          new_domain = PROVIDERS.resource_api.create_domain(
              uuid.uuid4().hex, unit.new_domain_ref()
          )
          domain_id = new_domain['id']

          user_ref = unit.new_user_ref(domain_id=domain_id)
          created_user = PROVIDERS.identity_api.create_user(user_ref)
          self.user_id = created_user['id']

          created_project = PROVIDERS.resource_api.create_project(
              uuid.uuid4().hex, unit.new_project_ref(domain_id=domain_id)
          )
          self.project_id = created_project['id']

          # The user gets the bootstrap member role on the new project.
          PROVIDERS.assignment_api.create_grant(
              self.bootstrapper.member_role_id,
              user_id=self.user_id,
              project_id=self.project_id,
          )

          token_request = self.build_authentication_request(
              user_id=self.user_id,
              password=user_ref['password'],
              project_id=self.project_id,
          )

          # Authenticate as this persona once and keep the token header
          # around for the requests issued by the individual tests.
          with self.test_client() as client:
              response = client.post('/v3/auth/tokens', json=token_request)
              self.token_id = response.headers['X-Subject-Token']
              self.headers = {'X-Auth-Token': self.token_id}