"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/protection/v3/test_project_endpoint.py" (13 May 2020, 19092 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_project_endpoint.py": 16.0.1_vs_17.0.0.

    1 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 #    not use this file except in compliance with the License. You may obtain
    3 #    a copy of the License at
    4 #
    5 #         http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 #    Unless required by applicable law or agreed to in writing, software
    8 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 #    License for the specific language governing permissions and limitations
   11 #    under the License.
   12 
   13 import uuid
   14 
   15 import http.client
   16 from oslo_serialization import jsonutils
   17 
   18 from keystone.common.policies import base as bp
   19 from keystone.common import provider_api
   20 import keystone.conf
   21 from keystone.tests.common import auth as common_auth
   22 from keystone.tests import unit
   23 from keystone.tests.unit import base_classes
   24 from keystone.tests.unit import ksfixtures
   25 from keystone.tests.unit.ksfixtures import temporaryfile
   26 
   27 CONF = keystone.conf.CONF
   28 PROVIDERS = provider_api.ProviderAPIs
   29 
   30 
   31 class _SystemUserProjectEndpointTests(object):
   32     """Common default functionality for all system users."""
   33 
   34     def test_user_can_list_projects_for_endpoint(self):
   35         project = PROVIDERS.resource_api.create_project(
   36             uuid.uuid4().hex,
   37             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
   38         )
   39 
   40         service = PROVIDERS.catalog_api.create_service(
   41             uuid.uuid4().hex, unit.new_service_ref()
   42         )
   43         endpoint = unit.new_endpoint_ref(service['id'], region_id=None)
   44         endpoint = PROVIDERS.catalog_api.create_endpoint(
   45             endpoint['id'], endpoint
   46         )
   47 
   48         PROVIDERS.catalog_api.add_endpoint_to_project(
   49             endpoint['id'], project['id'])
   50         with self.test_client() as c:
   51             r = c.get('/v3/OS-EP-FILTER/endpoints/%s/projects'
   52                       % endpoint['id'],
   53                       headers=self.headers)
   54             for project_itr in r.json['projects']:
   55                 self.assertIn(project['id'], project_itr['id'])
   56 
   57     def test_user_can_check_endpoint_in_project(self):
   58         project = PROVIDERS.resource_api.create_project(
   59             uuid.uuid4().hex,
   60             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
   61         )
   62 
   63         service = PROVIDERS.catalog_api.create_service(
   64             uuid.uuid4().hex, unit.new_service_ref()
   65         )
   66         endpoint = unit.new_endpoint_ref(service['id'], region_id=None)
   67         endpoint = PROVIDERS.catalog_api.create_endpoint(
   68             endpoint['id'], endpoint
   69         )
   70 
   71         PROVIDERS.catalog_api.add_endpoint_to_project(
   72             endpoint['id'], project['id'])
   73         with self.test_client() as c:
   74             c.get('/v3/OS-EP-FILTER/projects/%s/endpoints/%s'
   75                   % (project['id'], endpoint['id']),
   76                   headers=self.headers,
   77                   expected_status_code=http.client.NO_CONTENT)
   78 
   79     def test_user_can_list_endpoints_for_project(self):
   80         project = PROVIDERS.resource_api.create_project(
   81             uuid.uuid4().hex,
   82             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
   83         )
   84 
   85         service = PROVIDERS.catalog_api.create_service(
   86             uuid.uuid4().hex, unit.new_service_ref()
   87         )
   88         endpoint = unit.new_endpoint_ref(service['id'], region_id=None)
   89         endpoint = PROVIDERS.catalog_api.create_endpoint(
   90             endpoint['id'], endpoint
   91         )
   92 
   93         PROVIDERS.catalog_api.add_endpoint_to_project(
   94             endpoint['id'], project['id'])
   95         with self.test_client() as c:
   96             r = c.get('/v3/OS-EP-FILTER/projects/%s/endpoints' % project['id'],
   97                       headers=self.headers)
   98             for endpoint_itr in r.json['endpoints']:
   99                 self.assertIn(endpoint['id'], endpoint_itr['id'])
  100 
  101 
  102 class _SystemReaderAndMemberProjectEndpointTests(object):
  103 
  104     def test_user_cannot_add_endpoint_to_project(self):
  105         project = PROVIDERS.resource_api.create_project(
  106             uuid.uuid4().hex,
  107             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  108         )
  109 
  110         service = PROVIDERS.catalog_api.create_service(
  111             uuid.uuid4().hex, unit.new_service_ref()
  112         )
  113         endpoint = unit.new_endpoint_ref(service['id'], region_id=None)
  114         endpoint = PROVIDERS.catalog_api.create_endpoint(
  115             endpoint['id'], endpoint
  116         )
  117         with self.test_client() as c:
  118             c.put('/v3/OS-EP-FILTER/projects/%s/endpoints/%s'
  119                   % (project['id'], endpoint['id']),
  120                   headers=self.headers,
  121                   expected_status_code=http.client.FORBIDDEN)
  122 
  123     def test_user_cannot_remove_endpoint_from_project(self):
  124         project = PROVIDERS.resource_api.create_project(
  125             uuid.uuid4().hex,
  126             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  127         )
  128 
  129         service = PROVIDERS.catalog_api.create_service(
  130             uuid.uuid4().hex, unit.new_service_ref()
  131         )
  132         endpoint = unit.new_endpoint_ref(service['id'], region_id=None)
  133         endpoint = PROVIDERS.catalog_api.create_endpoint(
  134             endpoint['id'], endpoint
  135         )
  136         with self.test_client() as c:
  137             c.delete('/v3/OS-EP-FILTER/projects/%s/endpoints/%s'
  138                      % (project['id'], endpoint['id']),
  139                      headers=self.headers,
  140                      expected_status_code=http.client.FORBIDDEN)
  141 
  142 
  143 class _DomainAndProjectUserProjectEndpointTests(object):
  144 
  145     def test_user_cannot_list_projects_for_endpoint(self):
  146         project = PROVIDERS.resource_api.create_project(
  147             uuid.uuid4().hex,
  148             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  149         )
  150 
  151         service = PROVIDERS.catalog_api.create_service(
  152             uuid.uuid4().hex, unit.new_service_ref()
  153         )
  154         endpoint = unit.new_endpoint_ref(service['id'], region_id=None)
  155         endpoint = PROVIDERS.catalog_api.create_endpoint(
  156             endpoint['id'], endpoint
  157         )
  158 
  159         PROVIDERS.catalog_api.add_endpoint_to_project(
  160             endpoint['id'], project['id'])
  161         with self.test_client() as c:
  162             c.get('/v3/OS-EP-FILTER/endpoints/%s/projects' % endpoint['id'],
  163                   headers=self.headers,
  164                   expected_status_code=http.client.FORBIDDEN)
  165 
  166     def test_user_cannot_check_endpoint_in_project(self):
  167         project = PROVIDERS.resource_api.create_project(
  168             uuid.uuid4().hex,
  169             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  170         )
  171 
  172         service = PROVIDERS.catalog_api.create_service(
  173             uuid.uuid4().hex, unit.new_service_ref()
  174         )
  175         endpoint = unit.new_endpoint_ref(service['id'], region_id=None)
  176         endpoint = PROVIDERS.catalog_api.create_endpoint(
  177             endpoint['id'], endpoint
  178         )
  179 
  180         PROVIDERS.catalog_api.add_endpoint_to_project(
  181             endpoint['id'], project['id'])
  182         with self.test_client() as c:
  183             c.get('/v3/OS-EP-FILTER/projects/%s/endpoints/%s'
  184                   % (project['id'], endpoint['id']),
  185                   headers=self.headers,
  186                   expected_status_code=http.client.FORBIDDEN)
  187 
  188     def test_user_cannot_list_endpoints_for_project(self):
  189         project = PROVIDERS.resource_api.create_project(
  190             uuid.uuid4().hex,
  191             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  192         )
  193 
  194         service = PROVIDERS.catalog_api.create_service(
  195             uuid.uuid4().hex, unit.new_service_ref()
  196         )
  197         endpoint = unit.new_endpoint_ref(service['id'], region_id=None)
  198         endpoint = PROVIDERS.catalog_api.create_endpoint(
  199             endpoint['id'], endpoint
  200         )
  201 
  202         PROVIDERS.catalog_api.add_endpoint_to_project(
  203             endpoint['id'], project['id'])
  204         with self.test_client() as c:
  205             c.get('/v3/OS-EP-FILTER/projects/%s/endpoints' % project['id'],
  206                   headers=self.headers,
  207                   expected_status_code=http.client.FORBIDDEN)
  208 
  209 
  210 class SystemReaderTests(base_classes.TestCaseWithBootstrap,
  211                         common_auth.AuthTestMixin,
  212                         _SystemUserProjectEndpointTests,
  213                         _SystemReaderAndMemberProjectEndpointTests):
  214 
  215     def setUp(self):
  216         super(SystemReaderTests, self).setUp()
  217         self.loadapp()
  218         self.useFixture(ksfixtures.Policy(self.config_fixture))
  219         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  220 
  221         system_reader = unit.new_user_ref(
  222             domain_id=CONF.identity.default_domain_id
  223         )
  224         self.user_id = PROVIDERS.identity_api.create_user(
  225             system_reader
  226         )['id']
  227         PROVIDERS.assignment_api.create_system_grant_for_user(
  228             self.user_id, self.bootstrapper.reader_role_id
  229         )
  230 
  231         auth = self.build_authentication_request(
  232             user_id=self.user_id, password=system_reader['password'],
  233             system=True
  234         )
  235 
  236         # Grab a token using the persona we're testing and prepare headers
  237         # for requests we'll be making in the tests.
  238         with self.test_client() as c:
  239             r = c.post('/v3/auth/tokens', json=auth)
  240             self.token_id = r.headers['X-Subject-Token']
  241             self.headers = {'X-Auth-Token': self.token_id}
  242 
  243 
  244 class SystemMemberTests(base_classes.TestCaseWithBootstrap,
  245                         common_auth.AuthTestMixin,
  246                         _SystemUserProjectEndpointTests,
  247                         _SystemReaderAndMemberProjectEndpointTests):
  248 
  249     def setUp(self):
  250         super(SystemMemberTests, self).setUp()
  251         self.loadapp()
  252         self.useFixture(ksfixtures.Policy(self.config_fixture))
  253         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  254 
  255         system_member = unit.new_user_ref(
  256             domain_id=CONF.identity.default_domain_id
  257         )
  258         self.user_id = PROVIDERS.identity_api.create_user(
  259             system_member
  260         )['id']
  261         PROVIDERS.assignment_api.create_system_grant_for_user(
  262             self.user_id, self.bootstrapper.member_role_id
  263         )
  264 
  265         auth = self.build_authentication_request(
  266             user_id=self.user_id, password=system_member['password'],
  267             system=True
  268         )
  269 
  270         # Grab a token using the persona we're testing and prepare headers
  271         # for requests we'll be making in the tests.
  272         with self.test_client() as c:
  273             r = c.post('/v3/auth/tokens', json=auth)
  274             self.token_id = r.headers['X-Subject-Token']
  275             self.headers = {'X-Auth-Token': self.token_id}
  276 
  277 
  278 class SystemAdminTests(base_classes.TestCaseWithBootstrap,
  279                        common_auth.AuthTestMixin,
  280                        _SystemUserProjectEndpointTests):
  281 
  282     def setUp(self):
  283         super(SystemAdminTests, self).setUp()
  284         self.loadapp()
  285         self.useFixture(ksfixtures.Policy(self.config_fixture))
  286         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  287 
  288         # Reuse the system administrator account created during
  289         # ``keystone-manage bootstrap``
  290         self.user_id = self.bootstrapper.admin_user_id
  291         auth = self.build_authentication_request(
  292             user_id=self.user_id,
  293             password=self.bootstrapper.admin_password,
  294             system=True
  295         )
  296 
  297         # Grab a token using the persona we're testing and prepare headers
  298         # for requests we'll be making in the tests.
  299         with self.test_client() as c:
  300             r = c.post('/v3/auth/tokens', json=auth)
  301             self.token_id = r.headers['X-Subject-Token']
  302             self.headers = {'X-Auth-Token': self.token_id}
  303 
  304     def test_user_can_add_endpoint_to_project(self):
  305         project = PROVIDERS.resource_api.create_project(
  306             uuid.uuid4().hex,
  307             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  308         )
  309 
  310         service = PROVIDERS.catalog_api.create_service(
  311             uuid.uuid4().hex, unit.new_service_ref()
  312         )
  313         endpoint = unit.new_endpoint_ref(service['id'], region_id=None)
  314         endpoint = PROVIDERS.catalog_api.create_endpoint(
  315             endpoint['id'], endpoint
  316         )
  317         with self.test_client() as c:
  318             c.put('/v3/OS-EP-FILTER/projects/%s/endpoints/%s'
  319                   % (project['id'], endpoint['id']),
  320                   headers=self.headers,
  321                   expected_status_code=http.client.NO_CONTENT)
  322 
  323     def test_user_can_remove_endpoint_from_project(self):
  324         project = PROVIDERS.resource_api.create_project(
  325             uuid.uuid4().hex,
  326             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  327         )
  328 
  329         service = PROVIDERS.catalog_api.create_service(
  330             uuid.uuid4().hex, unit.new_service_ref()
  331         )
  332         endpoint = unit.new_endpoint_ref(service['id'], region_id=None)
  333         endpoint = PROVIDERS.catalog_api.create_endpoint(
  334             endpoint['id'], endpoint
  335         )
  336         PROVIDERS.catalog_api.add_endpoint_to_project(
  337             endpoint['id'], project['id'])
  338         with self.test_client() as c:
  339             c.delete('/v3/OS-EP-FILTER/projects/%s/endpoints/%s'
  340                      % (project['id'], endpoint['id']),
  341                      headers=self.headers,
  342                      expected_status_code=http.client.NO_CONTENT)
  343 
  344 
  345 class DomainUserTests(base_classes.TestCaseWithBootstrap,
  346                       common_auth.AuthTestMixin,
  347                       _DomainAndProjectUserProjectEndpointTests,
  348                       _SystemReaderAndMemberProjectEndpointTests):
  349 
  350     def setUp(self):
  351         super(DomainUserTests, self).setUp()
  352         self.loadapp()
  353         self.useFixture(ksfixtures.Policy(self.config_fixture))
  354         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  355 
  356         domain = PROVIDERS.resource_api.create_domain(
  357             uuid.uuid4().hex, unit.new_domain_ref()
  358         )
  359         self.domain_id = domain['id']
  360         domain_admin = unit.new_user_ref(domain_id=self.domain_id)
  361         self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id']
  362         PROVIDERS.assignment_api.create_grant(
  363             self.bootstrapper.admin_role_id, user_id=self.user_id,
  364             domain_id=self.domain_id
  365         )
  366 
  367         auth = self.build_authentication_request(
  368             user_id=self.user_id,
  369             password=domain_admin['password'],
  370             domain_id=self.domain_id
  371         )
  372 
  373         # Grab a token using the persona we're testing and prepare headers
  374         # for requests we'll be making in the tests.
  375         with self.test_client() as c:
  376             r = c.post('/v3/auth/tokens', json=auth)
  377             self.token_id = r.headers['X-Subject-Token']
  378             self.headers = {'X-Auth-Token': self.token_id}
  379 
  380 
  381 class ProjectUserTests(base_classes.TestCaseWithBootstrap,
  382                        common_auth.AuthTestMixin,
  383                        _DomainAndProjectUserProjectEndpointTests,
  384                        _SystemReaderAndMemberProjectEndpointTests):
  385 
  386     def setUp(self):
  387         super(ProjectUserTests, self).setUp()
  388         self.loadapp()
  389         self.useFixture(ksfixtures.Policy(self.config_fixture))
  390         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  391 
  392         self.user_id = self.bootstrapper.admin_user_id
  393         auth = self.build_authentication_request(
  394             user_id=self.user_id,
  395             password=self.bootstrapper.admin_password,
  396             project_id=self.bootstrapper.project_id
  397         )
  398 
  399         # Grab a token using the persona we're testing and prepare headers
  400         # for requests we'll be making in the tests.
  401         with self.test_client() as c:
  402             r = c.post('/v3/auth/tokens', json=auth)
  403             self.token_id = r.headers['X-Subject-Token']
  404             self.headers = {'X-Auth-Token': self.token_id}
  405 
  406 
  407 class ProjectUserTestsWithoutEnforceScope(
  408         base_classes.TestCaseWithBootstrap,
  409         common_auth.AuthTestMixin,
  410         _DomainAndProjectUserProjectEndpointTests,
  411         _SystemReaderAndMemberProjectEndpointTests):
  412 
  413     def _override_policy(self):
  414         # TODO(cmurphy): Remove this once the deprecated policies in
  415         # keystone.common.policies.project_endpoint have been removed. This is
  416         # only here to make sure we test the new policies instead of the
  417         # deprecated ones. Oslo.policy will OR deprecated policies with new
  418         # policies to maintain compatibility and give operators a chance to
  419         # update permissions or update policies without breaking users. This
  420         # will cause these specific tests to fail since we're trying to correct
  421         # this broken behavior with better scope checking.
  422         with open(self.policy_file_name, 'w') as f:
  423             overridden_policies = {
  424                 'identity:list_projects_for_endpoint': bp.SYSTEM_READER,
  425                 'identity:add_endpoint_to_project': bp.SYSTEM_ADMIN,
  426                 'identity:check_endpoint_in_project': bp.SYSTEM_READER,
  427                 'identity:list_endpoints_for_project': bp.SYSTEM_READER,
  428                 'identity:remove_endpoint_from_project': bp.SYSTEM_ADMIN
  429             }
  430             f.write(jsonutils.dumps(overridden_policies))
  431 
  432     def setUp(self):
  433         super(ProjectUserTestsWithoutEnforceScope, self).setUp()
  434         self.loadapp()
  435         self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
  436         self.policy_file_name = self.policy_file.file_name
  437         self.useFixture(
  438             ksfixtures.Policy(
  439                 self.config_fixture, policy_file=self.policy_file_name
  440             )
  441         )
  442         self._override_policy()
  443         # Explicity set enforce_scope to False to make sure we maintain
  444         # backwards compatibility with project users.
  445         self.config_fixture.config(group='oslo_policy', enforce_scope=False)
  446 
  447         domain = PROVIDERS.resource_api.create_domain(
  448             uuid.uuid4().hex, unit.new_domain_ref()
  449         )
  450         user = unit.new_user_ref(domain_id=domain['id'])
  451         self.user_id = PROVIDERS.identity_api.create_user(user)['id']
  452 
  453         self.project_id = PROVIDERS.resource_api.create_project(
  454             uuid.uuid4().hex, unit.new_project_ref(domain_id=domain['id'])
  455         )['id']
  456 
  457         PROVIDERS.assignment_api.create_grant(
  458             self.bootstrapper.admin_role_id, user_id=self.user_id,
  459             project_id=self.project_id
  460         )
  461 
  462         auth = self.build_authentication_request(
  463             user_id=self.user_id,
  464             password=user['password'],
  465             project_id=self.project_id
  466         )
  467 
  468         # Grab a token using the persona we're testing and prepare headers
  469         # for requests we'll be making in the tests.
  470         with self.test_client() as c:
  471             r = c.post('/v3/auth/tokens', json=auth)
  472             self.token_id = r.headers['X-Subject-Token']
  473             self.headers = {'X-Auth-Token': self.token_id}