"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/protection/v3/test_policy.py" (13 May 2020, 14341 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_policy.py": 16.0.1_vs_17.0.0.

    1 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 #    not use this file except in compliance with the License. You may obtain
    3 #    a copy of the License at
    4 #
    5 #         http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 #    Unless required by applicable law or agreed to in writing, software
    8 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 #    License for the specific language governing permissions and limitations
   11 #    under the License.
   12 
   13 import json
   14 import uuid
   15 
   16 import http.client
   17 
   18 from keystone.common import provider_api
   19 import keystone.conf
   20 from keystone.tests.common import auth as common_auth
   21 from keystone.tests import unit
   22 from keystone.tests.unit import base_classes
   23 from keystone.tests.unit import ksfixtures
   24 
# Keystone configuration object; the tests below read
# ``CONF.identity.default_domain_id`` from it when creating users.
CONF = keystone.conf.CONF
# Alias for ``provider_api.ProviderAPIs``; the tests reach backends through
# it (e.g. ``PROVIDERS.policy_api``, ``PROVIDERS.identity_api``).
PROVIDERS = provider_api.ProviderAPIs
   27 
   28 
   29 class _SystemUserPoliciesTests(object):
   30     """Common default functionality for all system users."""
   31 
   32     def test_user_can_list_policies(self):
   33         policy = unit.new_policy_ref()
   34         policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
   35 
   36         with self.test_client() as c:
   37             r = c.get('/v3/policies', headers=self.headers)
   38             policies = []
   39             for policy in r.json['policies']:
   40                 policies.append(policy['id'])
   41 
   42             self.assertIn(policy['id'], policies)
   43 
   44     def test_user_can_get_policy(self):
   45         policy = unit.new_policy_ref()
   46         policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
   47 
   48         with self.test_client() as c:
   49             c.get('/v3/policies/%s' % policy['id'],
   50                   headers=self.headers)
   51 
   52 
   53 class _SystemReaderAndMemberPoliciesTests(object):
   54     """Common default functionality for system readers and system members."""
   55 
   56     def test_user_cannot_create_policy(self):
   57         create = {
   58             'id': uuid.uuid4().hex,
   59             'name': uuid.uuid4().hex,
   60             'description': uuid.uuid4().hex,
   61             'enabled': True,
   62             # Store serialized JSON data as the blob to mimic real world usage.
   63             'blob': json.dumps({'data': uuid.uuid4().hex, }),
   64             'type': uuid.uuid4().hex,
   65         }
   66         with self.test_client() as c:
   67             c.post(
   68                 '/v3/policies', json=create, headers=self.headers,
   69                 expected_status_code=http.client.FORBIDDEN
   70             )
   71 
   72     def test_user_cannot_update_policy(self):
   73         policy = unit.new_policy_ref()
   74         policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
   75 
   76         update = {'policy': {'name': uuid.uuid4().hex}}
   77 
   78         with self.test_client() as c:
   79             c.patch(
   80                 '/v3/policies/%s' % policy['id'], json=update,
   81                 headers=self.headers,
   82                 expected_status_code=http.client.FORBIDDEN
   83             )
   84 
   85     def test_user_cannot_delete_policy(self):
   86         policy = unit.new_policy_ref()
   87         policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
   88 
   89         with self.test_client() as c:
   90             c.delete(
   91                 '/v3/policies/%s' % policy['id'], headers=self.headers,
   92                 expected_status_code=http.client.FORBIDDEN
   93             )
   94 
   95 
   96 class _DomainAndProjectUserPolicyTests(object):
   97 
   98     def test_user_cannot_list_policies(self):
   99         policy = unit.new_policy_ref()
  100         policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
  101 
  102         with self.test_client() as c:
  103             c.get('/v3/policies', headers=self.headers,
  104                   expected_status_code=http.client.FORBIDDEN)
  105 
  106     def test_user_cannot_get_policy(self):
  107         policy = unit.new_policy_ref()
  108         policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
  109 
  110         with self.test_client() as c:
  111             c.get('/v3/policies/%s' % policy['id'], headers=self.headers,
  112                   expected_status_code=http.client.FORBIDDEN)
  113 
  114     def test_user_cannot_create_policy(self):
  115         create = {
  116             'id': uuid.uuid4().hex,
  117             'name': uuid.uuid4().hex,
  118             'description': uuid.uuid4().hex,
  119             'enabled': True,
  120             # Store serialized JSON data as the blob to mimic real world usage.
  121             'blob': json.dumps({'data': uuid.uuid4().hex, }),
  122             'type': uuid.uuid4().hex,
  123         }
  124         with self.test_client() as c:
  125             c.post(
  126                 '/v3/policies', json=create, headers=self.headers,
  127                 expected_status_code=http.client.FORBIDDEN
  128             )
  129 
  130     def test_user_cannot_update_policy(self):
  131         policy = unit.new_policy_ref()
  132         policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
  133 
  134         update = {'policy': {'name': uuid.uuid4().hex}}
  135 
  136         with self.test_client() as c:
  137             c.patch(
  138                 '/v3/policies/%s' % policy['id'], json=update,
  139                 headers=self.headers,
  140                 expected_status_code=http.client.FORBIDDEN
  141             )
  142 
  143     def test_user_cannot_delete_policy(self):
  144         policy = unit.new_policy_ref()
  145         policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
  146 
  147         with self.test_client() as c:
  148             c.delete(
  149                 '/v3/policies/%s' % policy['id'], headers=self.headers,
  150                 expected_status_code=http.client.FORBIDDEN
  151             )
  152 
  153 
  154 class SystemReaderTests(base_classes.TestCaseWithBootstrap,
  155                         common_auth.AuthTestMixin,
  156                         _SystemUserPoliciesTests,
  157                         _SystemReaderAndMemberPoliciesTests):
  158 
  159     def setUp(self):
  160         super(SystemReaderTests, self).setUp()
  161         self.loadapp()
  162         self.useFixture(ksfixtures.Policy(self.config_fixture))
  163         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  164 
  165         system_reader = unit.new_user_ref(
  166             domain_id=CONF.identity.default_domain_id
  167         )
  168         self.user_id = PROVIDERS.identity_api.create_user(
  169             system_reader
  170         )['id']
  171         PROVIDERS.assignment_api.create_system_grant_for_user(
  172             self.user_id, self.bootstrapper.reader_role_id
  173         )
  174 
  175         auth = self.build_authentication_request(
  176             user_id=self.user_id, password=system_reader['password'],
  177             system=True
  178         )
  179 
  180         # Grab a token using the persona we're testing and prepare headers
  181         # for requests we'll be making in the tests.
  182         with self.test_client() as c:
  183             r = c.post('/v3/auth/tokens', json=auth)
  184             self.token_id = r.headers['X-Subject-Token']
  185             self.headers = {'X-Auth-Token': self.token_id}
  186 
  187 
  188 class SystemMemberTests(base_classes.TestCaseWithBootstrap,
  189                         common_auth.AuthTestMixin,
  190                         _SystemUserPoliciesTests,
  191                         _SystemReaderAndMemberPoliciesTests):
  192 
  193     def setUp(self):
  194         super(SystemMemberTests, self).setUp()
  195         self.loadapp()
  196         self.useFixture(ksfixtures.Policy(self.config_fixture))
  197         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  198 
  199         system_member = unit.new_user_ref(
  200             domain_id=CONF.identity.default_domain_id
  201         )
  202         self.user_id = PROVIDERS.identity_api.create_user(
  203             system_member
  204         )['id']
  205         PROVIDERS.assignment_api.create_system_grant_for_user(
  206             self.user_id, self.bootstrapper.member_role_id
  207         )
  208 
  209         auth = self.build_authentication_request(
  210             user_id=self.user_id, password=system_member['password'],
  211             system=True
  212         )
  213 
  214         # Grab a token using the persona we're testing and prepare headers
  215         # for requests we'll be making in the tests.
  216         with self.test_client() as c:
  217             r = c.post('/v3/auth/tokens', json=auth)
  218             self.token_id = r.headers['X-Subject-Token']
  219             self.headers = {'X-Auth-Token': self.token_id}
  220 
  221 
  222 class SystemAdminTests(base_classes.TestCaseWithBootstrap,
  223                        common_auth.AuthTestMixin,
  224                        _SystemUserPoliciesTests):
  225 
  226     def setUp(self):
  227         super(SystemAdminTests, self).setUp()
  228         self.loadapp()
  229         self.useFixture(ksfixtures.Policy(self.config_fixture))
  230         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  231 
  232         # Reuse the system administrator account created during
  233         # ``keystone-manage bootstrap``
  234         self.user_id = self.bootstrapper.admin_user_id
  235         auth = self.build_authentication_request(
  236             user_id=self.user_id,
  237             password=self.bootstrapper.admin_password,
  238             system=True
  239         )
  240 
  241         # Grab a token using the persona we're testing and prepare headers
  242         # for requests we'll be making in the tests.
  243         with self.test_client() as c:
  244             r = c.post('/v3/auth/tokens', json=auth)
  245             self.token_id = r.headers['X-Subject-Token']
  246             self.headers = {'X-Auth-Token': self.token_id}
  247 
  248     def test_user_can_create_policy(self):
  249         create = {
  250             'policy': {
  251                 'id': uuid.uuid4().hex,
  252                 'name': uuid.uuid4().hex,
  253                 'description': uuid.uuid4().hex,
  254                 'enabled': True,
  255                 # Store serialized JSON data as the blob to mimic real world
  256                 # usage.
  257                 'blob': json.dumps({'data': uuid.uuid4().hex, }),
  258                 'type': uuid.uuid4().hex
  259             }
  260         }
  261         with self.test_client() as c:
  262             c.post(
  263                 '/v3/policies', json=create, headers=self.headers
  264             )
  265 
  266     def test_user_can_update_policy(self):
  267         policy = unit.new_policy_ref()
  268         policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
  269 
  270         update = {'policy': {'name': uuid.uuid4().hex}}
  271 
  272         with self.test_client() as c:
  273             c.patch(
  274                 '/v3/policies/%s' % policy['id'], json=update,
  275                 headers=self.headers
  276             )
  277 
  278     def test_user_can_delete_policy(self):
  279         policy = unit.new_policy_ref()
  280         policy = PROVIDERS.policy_api.create_policy(policy['id'], policy)
  281 
  282         with self.test_client() as c:
  283             c.delete(
  284                 '/v3/policies/%s' % policy['id'], headers=self.headers
  285             )
  286 
  287 
  288 class DomainUserTests(base_classes.TestCaseWithBootstrap,
  289                       common_auth.AuthTestMixin,
  290                       _DomainAndProjectUserPolicyTests):
  291 
  292     def setUp(self):
  293         super(DomainUserTests, self).setUp()
  294         self.loadapp()
  295         self.useFixture(ksfixtures.Policy(self.config_fixture))
  296         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  297 
  298         domain = PROVIDERS.resource_api.create_domain(
  299             uuid.uuid4().hex, unit.new_domain_ref()
  300         )
  301         self.domain_id = domain['id']
  302         domain_admin = unit.new_user_ref(domain_id=self.domain_id)
  303         self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id']
  304         PROVIDERS.assignment_api.create_grant(
  305             self.bootstrapper.admin_role_id, user_id=self.user_id,
  306             domain_id=self.domain_id
  307         )
  308 
  309         auth = self.build_authentication_request(
  310             user_id=self.user_id,
  311             password=domain_admin['password'],
  312             domain_id=self.domain_id
  313         )
  314 
  315         # Grab a token using the persona we're testing and prepare headers
  316         # for requests we'll be making in the tests.
  317         with self.test_client() as c:
  318             r = c.post('/v3/auth/tokens', json=auth)
  319             self.token_id = r.headers['X-Subject-Token']
  320             self.headers = {'X-Auth-Token': self.token_id}
  321 
  322 
  323 class ProjectUserTests(base_classes.TestCaseWithBootstrap,
  324                        common_auth.AuthTestMixin,
  325                        _DomainAndProjectUserPolicyTests):
  326 
  327     def setUp(self):
  328         super(ProjectUserTests, self).setUp()
  329         self.loadapp()
  330         self.useFixture(ksfixtures.Policy(self.config_fixture))
  331         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  332 
  333         self.user_id = self.bootstrapper.admin_user_id
  334         auth = self.build_authentication_request(
  335             user_id=self.user_id,
  336             password=self.bootstrapper.admin_password,
  337             project_id=self.bootstrapper.project_id
  338         )
  339 
  340         # Grab a token using the persona we're testing and prepare headers
  341         # for requests we'll be making in the tests.
  342         with self.test_client() as c:
  343             r = c.post('/v3/auth/tokens', json=auth)
  344             self.token_id = r.headers['X-Subject-Token']
  345             self.headers = {'X-Auth-Token': self.token_id}
  346 
  347 
  348 class ProjectUserTestsWithoutEnforceScope(
  349         base_classes.TestCaseWithBootstrap,
  350         common_auth.AuthTestMixin,
  351         _DomainAndProjectUserPolicyTests):
  352 
  353     def setUp(self):
  354         super(ProjectUserTestsWithoutEnforceScope, self).setUp()
  355         self.loadapp()
  356         self.useFixture(ksfixtures.Policy(self.config_fixture))
  357 
  358         # Explicityly set enforce_scope to False to make sure we maintain
  359         # backwards compatibility with project users.
  360         self.config_fixture.config(group='oslo_policy', enforce_scope=False)
  361 
  362         domain = PROVIDERS.resource_api.create_domain(
  363             uuid.uuid4().hex, unit.new_domain_ref()
  364         )
  365         user = unit.new_user_ref(domain_id=domain['id'])
  366         self.user_id = PROVIDERS.identity_api.create_user(user)['id']
  367 
  368         self.project_id = PROVIDERS.resource_api.create_project(
  369             uuid.uuid4().hex, unit.new_project_ref(domain_id=domain['id'])
  370         )['id']
  371 
  372         PROVIDERS.assignment_api.create_grant(
  373             self.bootstrapper.member_role_id, user_id=self.user_id,
  374             project_id=self.project_id
  375         )
  376 
  377         auth = self.build_authentication_request(
  378             user_id=self.user_id,
  379             password=user['password'],
  380             project_id=self.project_id
  381         )
  382 
  383         # Grab a token using the persona we're testing and prepare headers
  384         # for requests we'll be making in the tests.
  385         with self.test_client() as c:
  386             r = c.post('/v3/auth/tokens', json=auth)
  387             self.token_id = r.headers['X-Subject-Token']
  388             self.headers = {'X-Auth-Token': self.token_id}