"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/protection/v3/test_services.py" (13 May 2020, 14003 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_services.py": 16.0.1_vs_17.0.0.

    1 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 #    not use this file except in compliance with the License. You may obtain
    3 #    a copy of the License at
    4 #
    5 #         http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 #    Unless required by applicable law or agreed to in writing, software
    8 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 #    License for the specific language governing permissions and limitations
   11 #    under the License.
   12 
   13 import uuid
   14 
   15 import http.client
   16 
   17 from keystone.common import provider_api
   18 import keystone.conf
   19 from keystone.tests.common import auth as common_auth
   20 from keystone.tests import unit
   21 from keystone.tests.unit import base_classes
   22 from keystone.tests.unit import ksfixtures
   23 
# Keystone's global configuration object; the tests below read
# ``CONF.identity.default_domain_id`` from it.
CONF = keystone.conf.CONF
# Registry used throughout this module to reach the catalog, identity,
# assignment and resource APIs directly when preparing test data.
PROVIDERS = provider_api.ProviderAPIs
   26 
   27 
   28 class _SystemUserServiceTests(object):
   29     """Common default functionality for all system users."""
   30 
   31     def test_user_can_list_services(self):
   32         expected_service_ids = []
   33         for _ in range(2):
   34             s = unit.new_service_ref()
   35             service = PROVIDERS.catalog_api.create_service(s['id'], s)
   36             expected_service_ids.append(service['id'])
   37 
   38         with self.test_client() as c:
   39             r = c.get('/v3/services', headers=self.headers)
   40 
   41             actual_service_ids = []
   42             for service in r.json['services']:
   43                 actual_service_ids.append(service['id'])
   44 
   45             for service_id in expected_service_ids:
   46                 self.assertIn(service_id, actual_service_ids)
   47 
   48     def test_user_can_get_a_service(self):
   49         service = unit.new_service_ref()
   50         service = PROVIDERS.catalog_api.create_service(service['id'], service)
   51 
   52         with self.test_client() as c:
   53             r = c.get('/v3/services/%s' % service['id'], headers=self.headers)
   54             self.assertEqual(r.json['service']['id'], service['id'])
   55 
   56 
   57 class _SystemReaderAndMemberUserServiceTests(object):
   58     """Common default functionality for system readers and system members."""
   59 
   60     def test_user_cannot_create_services(self):
   61         create = {
   62             'service': {
   63                 'type': uuid.uuid4().hex,
   64                 'name': uuid.uuid4().hex,
   65             }
   66         }
   67 
   68         with self.test_client() as c:
   69             c.post(
   70                 '/v3/services', json=create, headers=self.headers,
   71                 expected_status_code=http.client.FORBIDDEN
   72             )
   73 
   74     def test_user_cannot_update_services(self):
   75         service = unit.new_service_ref()
   76         service = PROVIDERS.catalog_api.create_service(service['id'], service)
   77 
   78         update = {'service': {'description': uuid.uuid4().hex}}
   79 
   80         with self.test_client() as c:
   81             c.patch(
   82                 '/v3/services/%s' % service['id'], json=update,
   83                 headers=self.headers,
   84                 expected_status_code=http.client.FORBIDDEN
   85             )
   86 
   87     def test_user_cannot_delete_services(self):
   88         service = unit.new_service_ref()
   89         service = PROVIDERS.catalog_api.create_service(service['id'], service)
   90 
   91         with self.test_client() as c:
   92             c.delete(
   93                 '/v3/services/%s' % service['id'], headers=self.headers,
   94                 expected_status_code=http.client.FORBIDDEN
   95             )
   96 
   97 
   98 class _DomainAndProjectUserServiceTests(object):
   99 
  100     def test_user_cannot_create_services(self):
  101         create = {
  102             'service': {
  103                 'type': uuid.uuid4().hex,
  104                 'name': uuid.uuid4().hex,
  105             }
  106         }
  107 
  108         with self.test_client() as c:
  109             c.post(
  110                 '/v3/services', json=create, headers=self.headers,
  111                 expected_status_code=http.client.FORBIDDEN
  112             )
  113 
  114     def test_user_cannot_list_services(self):
  115         service = unit.new_service_ref()
  116         PROVIDERS.catalog_api.create_service(service['id'], service)
  117 
  118         with self.test_client() as c:
  119             c.get(
  120                 '/v3/services', headers=self.headers,
  121                 expected_status_code=http.client.FORBIDDEN
  122             )
  123 
  124     def test_user_cannot_get_a_service(self):
  125         service = unit.new_service_ref()
  126         service = PROVIDERS.catalog_api.create_service(service['id'], service)
  127 
  128         with self.test_client() as c:
  129             c.get(
  130                 '/v3/services/%s' % service['id'], headers=self.headers,
  131                 expected_status_code=http.client.FORBIDDEN
  132             )
  133 
  134     def test_user_cannot_update_services(self):
  135         service = unit.new_service_ref()
  136         service = PROVIDERS.catalog_api.create_service(service['id'], service)
  137 
  138         update = {'service': {'description': uuid.uuid4().hex}}
  139 
  140         with self.test_client() as c:
  141             c.patch(
  142                 '/v3/services/%s' % service['id'], json=update,
  143                 headers=self.headers,
  144                 expected_status_code=http.client.FORBIDDEN
  145             )
  146 
  147     def test_user_cannot_delete_services(self):
  148         service = unit.new_service_ref()
  149         service = PROVIDERS.catalog_api.create_service(service['id'], service)
  150 
  151         with self.test_client() as c:
  152             c.delete(
  153                 '/v3/services/%s' % service['id'], headers=self.headers,
  154                 expected_status_code=http.client.FORBIDDEN
  155             )
  156 
  157 
  158 class SystemReaderTests(base_classes.TestCaseWithBootstrap,
  159                         common_auth.AuthTestMixin,
  160                         _SystemUserServiceTests,
  161                         _SystemReaderAndMemberUserServiceTests):
  162 
  163     def setUp(self):
  164         super(SystemReaderTests, self).setUp()
  165         self.loadapp()
  166         self.useFixture(ksfixtures.Policy(self.config_fixture))
  167         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  168 
  169         system_reader = unit.new_user_ref(
  170             domain_id=CONF.identity.default_domain_id
  171         )
  172         self.user_id = PROVIDERS.identity_api.create_user(
  173             system_reader
  174         )['id']
  175         PROVIDERS.assignment_api.create_system_grant_for_user(
  176             self.user_id, self.bootstrapper.reader_role_id
  177         )
  178 
  179         auth = self.build_authentication_request(
  180             user_id=self.user_id, password=system_reader['password'],
  181             system=True
  182         )
  183 
  184         # Grab a token using the persona we're testing and prepare headers
  185         # for requests we'll be making in the tests.
  186         with self.test_client() as c:
  187             r = c.post('/v3/auth/tokens', json=auth)
  188             self.token_id = r.headers['X-Subject-Token']
  189             self.headers = {'X-Auth-Token': self.token_id}
  190 
  191 
  192 class SystemMemberTests(base_classes.TestCaseWithBootstrap,
  193                         common_auth.AuthTestMixin,
  194                         _SystemUserServiceTests,
  195                         _SystemReaderAndMemberUserServiceTests):
  196 
  197     def setUp(self):
  198         super(SystemMemberTests, self).setUp()
  199         self.loadapp()
  200         self.useFixture(ksfixtures.Policy(self.config_fixture))
  201         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  202 
  203         system_member = unit.new_user_ref(
  204             domain_id=CONF.identity.default_domain_id
  205         )
  206         self.user_id = PROVIDERS.identity_api.create_user(
  207             system_member
  208         )['id']
  209         PROVIDERS.assignment_api.create_system_grant_for_user(
  210             self.user_id, self.bootstrapper.member_role_id
  211         )
  212 
  213         auth = self.build_authentication_request(
  214             user_id=self.user_id, password=system_member['password'],
  215             system=True
  216         )
  217 
  218         # Grab a token using the persona we're testing and prepare headers
  219         # for requests we'll be making in the tests.
  220         with self.test_client() as c:
  221             r = c.post('/v3/auth/tokens', json=auth)
  222             self.token_id = r.headers['X-Subject-Token']
  223             self.headers = {'X-Auth-Token': self.token_id}
  224 
  225 
  226 class SystemAdminTests(base_classes.TestCaseWithBootstrap,
  227                        common_auth.AuthTestMixin,
  228                        _SystemUserServiceTests):
  229 
  230     def setUp(self):
  231         super(SystemAdminTests, self).setUp()
  232         self.loadapp()
  233         self.useFixture(ksfixtures.Policy(self.config_fixture))
  234         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  235 
  236         # Reuse the system administrator account created during
  237         # ``keystone-manage bootstrap``
  238         self.user_id = self.bootstrapper.admin_user_id
  239         auth = self.build_authentication_request(
  240             user_id=self.user_id,
  241             password=self.bootstrapper.admin_password,
  242             system=True
  243         )
  244 
  245         # Grab a token using the persona we're testing and prepare headers
  246         # for requests we'll be making in the tests.
  247         with self.test_client() as c:
  248             r = c.post('/v3/auth/tokens', json=auth)
  249             self.token_id = r.headers['X-Subject-Token']
  250             self.headers = {'X-Auth-Token': self.token_id}
  251 
  252     def test_user_can_create_services(self):
  253         create = {
  254             'service': {
  255                 'type': uuid.uuid4().hex,
  256                 'name': uuid.uuid4().hex,
  257             }
  258         }
  259 
  260         with self.test_client() as c:
  261             c.post('/v3/services', json=create, headers=self.headers)
  262 
  263     def test_user_can_update_services(self):
  264         service = unit.new_service_ref()
  265         service = PROVIDERS.catalog_api.create_service(service['id'], service)
  266 
  267         update = {'service': {'description': uuid.uuid4().hex}}
  268 
  269         with self.test_client() as c:
  270             c.patch(
  271                 '/v3/services/%s' % service['id'], json=update,
  272                 headers=self.headers
  273             )
  274 
  275     def test_user_can_delete_services(self):
  276         service = unit.new_service_ref()
  277         service = PROVIDERS.catalog_api.create_service(service['id'], service)
  278 
  279         with self.test_client() as c:
  280             c.delete('/v3/services/%s' % service['id'], headers=self.headers)
  281 
  282 
  283 class DomainUserTests(base_classes.TestCaseWithBootstrap,
  284                       common_auth.AuthTestMixin,
  285                       _DomainAndProjectUserServiceTests):
  286 
  287     def setUp(self):
  288         super(DomainUserTests, self).setUp()
  289         self.loadapp()
  290         self.useFixture(ksfixtures.Policy(self.config_fixture))
  291         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  292 
  293         domain = PROVIDERS.resource_api.create_domain(
  294             uuid.uuid4().hex, unit.new_domain_ref()
  295         )
  296         self.domain_id = domain['id']
  297         domain_admin = unit.new_user_ref(domain_id=self.domain_id)
  298         self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id']
  299         PROVIDERS.assignment_api.create_grant(
  300             self.bootstrapper.admin_role_id, user_id=self.user_id,
  301             domain_id=self.domain_id
  302         )
  303 
  304         auth = self.build_authentication_request(
  305             user_id=self.user_id,
  306             password=domain_admin['password'],
  307             domain_id=self.domain_id
  308         )
  309 
  310         # Grab a token using the persona we're testing and prepare headers
  311         # for requests we'll be making in the tests.
  312         with self.test_client() as c:
  313             r = c.post('/v3/auth/tokens', json=auth)
  314             self.token_id = r.headers['X-Subject-Token']
  315             self.headers = {'X-Auth-Token': self.token_id}
  316 
  317 
  318 class ProjectUserTests(base_classes.TestCaseWithBootstrap,
  319                        common_auth.AuthTestMixin,
  320                        _DomainAndProjectUserServiceTests):
  321 
  322     def setUp(self):
  323         super(ProjectUserTests, self).setUp()
  324         self.loadapp()
  325         self.useFixture(ksfixtures.Policy(self.config_fixture))
  326         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  327 
  328         self.user_id = self.bootstrapper.admin_user_id
  329         auth = self.build_authentication_request(
  330             user_id=self.user_id,
  331             password=self.bootstrapper.admin_password,
  332             project_id=self.bootstrapper.project_id
  333         )
  334 
  335         # Grab a token using the persona we're testing and prepare headers
  336         # for requests we'll be making in the tests.
  337         with self.test_client() as c:
  338             r = c.post('/v3/auth/tokens', json=auth)
  339             self.token_id = r.headers['X-Subject-Token']
  340             self.headers = {'X-Auth-Token': self.token_id}
  341 
  342 
  343 class ProjectUserTestsWithoutEnforceScope(
  344         base_classes.TestCaseWithBootstrap,
  345         common_auth.AuthTestMixin,
  346         _DomainAndProjectUserServiceTests):
  347 
  348     def setUp(self):
  349         super(ProjectUserTestsWithoutEnforceScope, self).setUp()
  350         self.loadapp()
  351         self.useFixture(ksfixtures.Policy(self.config_fixture))
  352 
  353         # Explicityly set enforce_scope to False to make sure we maintain
  354         # backwards compatibility with project users.
  355         self.config_fixture.config(group='oslo_policy', enforce_scope=False)
  356 
  357         domain = PROVIDERS.resource_api.create_domain(
  358             uuid.uuid4().hex, unit.new_domain_ref()
  359         )
  360         user = unit.new_user_ref(domain_id=domain['id'])
  361         self.user_id = PROVIDERS.identity_api.create_user(user)['id']
  362 
  363         self.project_id = PROVIDERS.resource_api.create_project(
  364             uuid.uuid4().hex, unit.new_project_ref(domain_id=domain['id'])
  365         )['id']
  366 
  367         PROVIDERS.assignment_api.create_grant(
  368             self.bootstrapper.member_role_id, user_id=self.user_id,
  369             project_id=self.project_id
  370         )
  371 
  372         auth = self.build_authentication_request(
  373             user_id=self.user_id,
  374             password=user['password'],
  375             project_id=self.project_id
  376         )
  377 
  378         # Grab a token using the persona we're testing and prepare headers
  379         # for requests we'll be making in the tests.
  380         with self.test_client() as c:
  381             r = c.post('/v3/auth/tokens', json=auth)
  382             self.token_id = r.headers['X-Subject-Token']
  383             self.headers = {'X-Auth-Token': self.token_id}