"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/protection/v3/test_domains.py" (13 May 2020, 20270 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_domains.py": 16.0.1_vs_17.0.0.

    1 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 #    not use this file except in compliance with the License. You may obtain
    3 #    a copy of the License at
    4 #
    5 #         http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 #    Unless required by applicable law or agreed to in writing, software
    8 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 #    License for the specific language governing permissions and limitations
   11 #    under the License.
   12 
   13 import uuid
   14 
   15 import http.client
   16 from oslo_serialization import jsonutils
   17 
   18 from keystone.common.policies import domain as dp
   19 from keystone.common import provider_api
   20 import keystone.conf
   21 from keystone.tests.common import auth as common_auth
   22 from keystone.tests import unit
   23 from keystone.tests.unit import base_classes
   24 from keystone.tests.unit import ksfixtures
   25 from keystone.tests.unit.ksfixtures import temporaryfile
   26 
   27 CONF = keystone.conf.CONF  # shorthand; read below for identity.default_domain_id
   28 PROVIDERS = provider_api.ProviderAPIs  # tests reach resource_api, identity_api and assignment_api through this
   29 
   30 
   31 class _SystemUserDomainTests(object):
   32 
   33     def test_user_can_list_domains(self):
   34         domain = PROVIDERS.resource_api.create_domain(
   35             uuid.uuid4().hex, unit.new_domain_ref()
   36         )
   37 
   38         with self.test_client() as c:
   39             r = c.get('/v3/domains', headers=self.headers)
   40             domain_ids = []
   41             for domain in r.json['domains']:
   42                 domain_ids.append(domain['id'])
   43             self.assertIn(domain['id'], domain_ids)
   44 
   45     def test_user_can_filter_domains_by_name(self):
   46         domain_name = uuid.uuid4().hex
   47         domain = unit.new_domain_ref(name=domain_name)
   48         domain = PROVIDERS.resource_api.create_domain(domain['id'], domain)
   49 
   50         PROVIDERS.resource_api.create_domain(
   51             uuid.uuid4().hex, unit.new_domain_ref()
   52         )
   53 
   54         with self.test_client() as c:
   55             r = c.get(
   56                 '/v3/domains?name=%s' % domain_name,
   57                 headers=self.headers
   58             )
   59             self.assertEqual(1, len(r.json['domains']))
   60             self.assertEqual(domain['id'], r.json['domains'][0]['id'])
   61 
   62     def test_user_can_filter_domains_by_enabled(self):
   63         enabled_domain = PROVIDERS.resource_api.create_domain(
   64             uuid.uuid4().hex, unit.new_domain_ref()
   65         )
   66         disabled_domain = PROVIDERS.resource_api.create_domain(
   67             uuid.uuid4().hex, unit.new_domain_ref(enabled=False)
   68         )
   69 
   70         with self.test_client() as c:
   71             r = c.get('/v3/domains?enabled=true', headers=self.headers)
   72             enabled_domain_ids = []
   73             for domain in r.json['domains']:
   74                 enabled_domain_ids.append(domain['id'])
   75             self.assertIn(enabled_domain['id'], enabled_domain_ids)
   76             self.assertNotIn(disabled_domain['id'], enabled_domain_ids)
   77 
   78             r = c.get('/v3/domains?enabled=false', headers=self.headers)
   79             disabled_domain_ids = []
   80             for domain in r.json['domains']:
   81                 disabled_domain_ids.append(domain['id'])
   82             self.assertIn(disabled_domain['id'], disabled_domain_ids)
   83             self.assertNotIn(enabled_domain['id'], disabled_domain_ids)
   84 
   85     def test_user_can_get_a_domain(self):
   86         domain = PROVIDERS.resource_api.create_domain(
   87             uuid.uuid4().hex, unit.new_domain_ref()
   88         )
   89 
   90         with self.test_client() as c:
   91             r = c.get('/v3/domains/%s' % domain['id'], headers=self.headers)
   92             self.assertEqual(domain['id'], r.json['domain']['id'])
   93 
   94 
   95 class _SystemMemberAndReaderDomainTests(object):
   96 
   97     def test_user_cannot_create_a_domain(self):
   98         create = {'domain': {'name': uuid.uuid4().hex}}
   99 
  100         with self.test_client() as c:
  101             c.post(
  102                 '/v3/domains', json=create, headers=self.headers,
  103                 expected_status_code=http.client.FORBIDDEN
  104             )
  105 
  106     def test_user_cannot_update_a_domain(self):
  107         domain = PROVIDERS.resource_api.create_domain(
  108             uuid.uuid4().hex, unit.new_domain_ref()
  109         )
  110 
  111         update = {'domain': {'description': uuid.uuid4().hex}}
  112         with self.test_client() as c:
  113             c.patch(
  114                 '/v3/domains/%s' % domain['id'], json=update,
  115                 headers=self.headers,
  116                 expected_status_code=http.client.FORBIDDEN
  117             )
  118 
  119     def test_user_cannot_delete_a_domain(self):
  120         domain = PROVIDERS.resource_api.create_domain(
  121             uuid.uuid4().hex, unit.new_domain_ref()
  122         )
  123 
  124         with self.test_client() as c:
  125             c.delete(
  126                 '/v3/domains/%s' % domain['id'], headers=self.headers,
  127                 expected_status_code=http.client.FORBIDDEN
  128             )
  129 
  130 
  131 class _DomainAndProjectUserDomainTests(object):
  132 
  133     def test_user_can_get_a_domain(self):
  134         with self.test_client() as c:
  135             r = c.get('/v3/domains/%s' % self.domain_id, headers=self.headers)
  136             self.assertEqual(self.domain_id, r.json['domain']['id'])
  137 
  138     def test_user_cannot_get_a_domain_they_are_not_authorized_to_access(self):
  139         domain = PROVIDERS.resource_api.create_domain(
  140             uuid.uuid4().hex, unit.new_domain_ref()
  141         )
  142 
  143         with self.test_client() as c:
  144             c.get(
  145                 '/v3/domains/%s' % domain['id'], headers=self.headers,
  146                 expected_status_code=http.client.FORBIDDEN
  147             )
  148 
  149     def test_user_cannot_list_domains(self):
  150         with self.test_client() as c:
  151             c.get(
  152                 '/v3/domains', headers=self.headers,
  153                 expected_status_code=http.client.FORBIDDEN
  154             )
  155 
  156     def test_user_cannot_filter_domains_by_name(self):
  157         domain_name = uuid.uuid4().hex
  158         domain = unit.new_domain_ref(name=domain_name)
  159         domain = PROVIDERS.resource_api.create_domain(domain['id'], domain)
  160 
  161         PROVIDERS.resource_api.create_domain(
  162             uuid.uuid4().hex, unit.new_domain_ref()
  163         )
  164 
  165         with self.test_client() as c:
  166             c.get(
  167                 '/v3/domains?name=%s' % domain_name,
  168                 headers=self.headers,
  169                 expected_status_code=http.client.FORBIDDEN
  170             )
  171 
  172     def test_user_cannot_filter_domains_by_enabled(self):
  173         with self.test_client() as c:
  174             c.get(
  175                 '/v3/domains?enabled=true', headers=self.headers,
  176                 expected_status_code=http.client.FORBIDDEN
  177             )
  178             c.get(
  179                 '/v3/domains?enabled=false', headers=self.headers,
  180                 expected_status_code=http.client.FORBIDDEN
  181             )
  182 
  183     def test_user_cannot_update_a_domain(self):
  184         domain = PROVIDERS.resource_api.create_domain(
  185             uuid.uuid4().hex, unit.new_domain_ref()
  186         )
  187 
  188         update = {'domain': {'description': uuid.uuid4().hex}}
  189         with self.test_client() as c:
  190             c.patch(
  191                 '/v3/domains/%s' % domain['id'], json=update,
  192                 headers=self.headers,
  193                 expected_status_code=http.client.FORBIDDEN
  194             )
  195 
  196     def test_user_cannot_create_a_domain(self):
  197         create = {'domain': {'name': uuid.uuid4().hex}}
  198 
  199         with self.test_client() as c:
  200             c.post(
  201                 '/v3/domains', json=create, headers=self.headers,
  202                 expected_status_code=http.client.FORBIDDEN
  203             )
  204 
  205     def test_user_cannot_delete_a_domain(self):
  206         domain = PROVIDERS.resource_api.create_domain(
  207             uuid.uuid4().hex, unit.new_domain_ref()
  208         )
  209 
  210         with self.test_client() as c:
  211             update = {'domain': {'enabled': False}}
  212             path = '/v3/domains/%s' % domain['id']
  213             c.patch(
  214                 path, json=update, headers=self.headers,
  215                 expected_status_code=http.client.FORBIDDEN
  216             )
  217             c.delete(
  218                 path, headers=self.headers,
  219                 expected_status_code=http.client.FORBIDDEN
  220             )
  221 
  222     def test_user_cannot_get_non_existant_domain_forbidden(self):
  223 
  224         with self.test_client() as c:
  225             c.get(
  226                 '/v3/domains/%s' % uuid.uuid4().hex,
  227                 headers=self.headers,
  228                 expected_status_code=http.client.FORBIDDEN
  229             )
  230 
  231 
  232 class SystemReaderTests(base_classes.TestCaseWithBootstrap,
  233                         common_auth.AuthTestMixin,
  234                         _SystemUserDomainTests,
  235                         _SystemMemberAndReaderDomainTests):
  236 
  237     def setUp(self):
  238         super(SystemReaderTests, self).setUp()
  239         self.loadapp()
  240         self.useFixture(ksfixtures.Policy(self.config_fixture))
  241         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  242 
  243         system_reader = unit.new_user_ref(
  244             domain_id=CONF.identity.default_domain_id
  245         )
  246         self.system_reader_id = PROVIDERS.identity_api.create_user(
  247             system_reader
  248         )['id']
  249         PROVIDERS.assignment_api.create_system_grant_for_user(
  250             self.system_reader_id, self.bootstrapper.reader_role_id
  251         )
  252 
  253         auth = self.build_authentication_request(
  254             user_id=self.system_reader_id, password=system_reader['password'],
  255             system=True
  256         )
  257 
  258         # Grab a token using the persona we're testing and prepare headers
  259         # for requests we'll be making in the tests.
  260         with self.test_client() as c:
  261             r = c.post('/v3/auth/tokens', json=auth)
  262             self.token_id = r.headers['X-Subject-Token']
  263             self.headers = {'X-Auth-Token': self.token_id}
  264 
  265 
  266 class SystemMemberTests(base_classes.TestCaseWithBootstrap,
  267                         common_auth.AuthTestMixin,
  268                         _SystemUserDomainTests,
  269                         _SystemMemberAndReaderDomainTests):
  270 
  271     def setUp(self):
  272         super(SystemMemberTests, self).setUp()
  273         self.loadapp()
  274         self.useFixture(ksfixtures.Policy(self.config_fixture))
  275         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  276 
  277         system_member = unit.new_user_ref(
  278             domain_id=CONF.identity.default_domain_id
  279         )
  280         self.system_member_id = PROVIDERS.identity_api.create_user(
  281             system_member
  282         )['id']
  283         PROVIDERS.assignment_api.create_system_grant_for_user(
  284             self.system_member_id, self.bootstrapper.member_role_id
  285         )
  286 
  287         auth = self.build_authentication_request(
  288             user_id=self.system_member_id, password=system_member['password'],
  289             system=True
  290         )
  291 
  292         # Grab a token using the persona we're testing and prepare headers
  293         # for requests we'll be making in the tests.
  294         with self.test_client() as c:
  295             r = c.post('/v3/auth/tokens', json=auth)
  296             self.token_id = r.headers['X-Subject-Token']
  297             self.headers = {'X-Auth-Token': self.token_id}
  298 
  299 
  300 class SystemAdminTests(base_classes.TestCaseWithBootstrap,
  301                        common_auth.AuthTestMixin,
  302                        _SystemUserDomainTests):
  303 
  304     def setUp(self):
  305         super(SystemAdminTests, self).setUp()
  306         self.loadapp()
  307         self.useFixture(ksfixtures.Policy(self.config_fixture))
  308         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  309 
  310         self.system_admin_id = self.bootstrapper.admin_user_id
  311         auth = self.build_authentication_request(
  312             user_id=self.system_admin_id,
  313             password=self.bootstrapper.admin_password,
  314             system=True
  315         )
  316 
  317         # Grab a token using the persona we're testing and prepare headers
  318         # for requests we'll be making in the tests.
  319         with self.test_client() as c:
  320             r = c.post('/v3/auth/tokens', json=auth)
  321             self.token_id = r.headers['X-Subject-Token']
  322             self.headers = {'X-Auth-Token': self.token_id}
  323 
  324     def test_user_can_update_a_domain(self):
  325         domain = PROVIDERS.resource_api.create_domain(
  326             uuid.uuid4().hex, unit.new_domain_ref()
  327         )
  328 
  329         update = {'domain': {'description': uuid.uuid4().hex}}
  330         with self.test_client() as c:
  331             c.patch(
  332                 '/v3/domains/%s' % domain['id'], json=update,
  333                 headers=self.headers
  334             )
  335 
  336     def test_user_can_create_a_domain(self):
  337         create = {'domain': {'name': uuid.uuid4().hex}}
  338 
  339         with self.test_client() as c:
  340             c.post(
  341                 '/v3/domains', json=create, headers=self.headers
  342             )
  343 
  344     def test_user_can_delete_a_domain(self):
  345         domain = PROVIDERS.resource_api.create_domain(
  346             uuid.uuid4().hex, unit.new_domain_ref()
  347         )
  348 
  349         with self.test_client() as c:
  350             update = {'domain': {'enabled': False}}
  351             path = '/v3/domains/%s' % domain['id']
  352             c.patch(path, json=update, headers=self.headers)
  353             c.delete(path, headers=self.headers)
  354 
  355 
  356 class DomainUserTests(base_classes.TestCaseWithBootstrap,
  357                       common_auth.AuthTestMixin,
  358                       _DomainAndProjectUserDomainTests):
  359 
  360     def setUp(self):
  361         super(DomainUserTests, self).setUp()
  362         self.loadapp()
  363         self.useFixture(ksfixtures.Policy(self.config_fixture))
  364         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  365 
  366         domain = PROVIDERS.resource_api.create_domain(
  367             uuid.uuid4().hex, unit.new_domain_ref()
  368         )
  369         self.domain_id = domain['id']
  370         domain_user = unit.new_user_ref(domain_id=self.domain_id)
  371         self.domain_user_id = PROVIDERS.identity_api.create_user(
  372             domain_user
  373         )['id']
  374         PROVIDERS.assignment_api.create_grant(
  375             self.bootstrapper.member_role_id, user_id=self.domain_user_id,
  376             domain_id=self.domain_id
  377         )
  378 
  379         auth = self.build_authentication_request(
  380             user_id=self.domain_user_id, password=domain_user['password'],
  381             domain_id=self.domain_id
  382         )
  383 
  384         # Grab a token using the persona we're testing and prepare headers
  385         # for requests we'll be making in the tests.
  386         with self.test_client() as c:
  387             r = c.post('/v3/auth/tokens', json=auth)
  388             self.token_id = r.headers['X-Subject-Token']
  389             self.headers = {'X-Auth-Token': self.token_id}
  390 
  391 
  392 class ProjectReaderTests(base_classes.TestCaseWithBootstrap,
  393                          common_auth.AuthTestMixin,
  394                          _DomainAndProjectUserDomainTests):
  395 
  396     def setUp(self):
  397         super(ProjectReaderTests, self).setUp()
  398         self.loadapp()
  399         self.useFixture(ksfixtures.Policy(self.config_fixture))
  400         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  401 
  402         domain = PROVIDERS.resource_api.create_domain(
  403             uuid.uuid4().hex, unit.new_domain_ref()
  404         )
  405         self.domain_id = domain['id']
  406 
  407         project_reader = unit.new_user_ref(domain_id=self.domain_id)
  408         project_reader_id = PROVIDERS.identity_api.create_user(
  409             project_reader
  410         )['id']
  411         project = unit.new_project_ref(domain_id=self.domain_id)
  412         project_id = PROVIDERS.resource_api.create_project(
  413             project['id'], project
  414         )['id']
  415 
  416         PROVIDERS.assignment_api.create_grant(
  417             self.bootstrapper.reader_role_id, user_id=project_reader_id,
  418             project_id=project_id
  419         )
  420 
  421         auth = self.build_authentication_request(
  422             user_id=project_reader_id,
  423             password=project_reader['password'],
  424             project_id=project_id
  425         )
  426 
  427         # Grab a token using the persona we're testing and prepare headers
  428         # for requests we'll be making in the tests.
  429         with self.test_client() as c:
  430             r = c.post('/v3/auth/tokens', json=auth)
  431             self.token_id = r.headers['X-Subject-Token']
  432             self.headers = {'X-Auth-Token': self.token_id}
  433 
  434 
  435 class ProjectMemberTests(base_classes.TestCaseWithBootstrap,
  436                          common_auth.AuthTestMixin,
  437                          _DomainAndProjectUserDomainTests):
  438 
  439     def setUp(self):
  440         super(ProjectMemberTests, self).setUp()
  441         self.loadapp()
  442         self.useFixture(ksfixtures.Policy(self.config_fixture))
  443         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  444 
  445         domain = PROVIDERS.resource_api.create_domain(
  446             uuid.uuid4().hex, unit.new_domain_ref()
  447         )
  448         self.domain_id = domain['id']
  449 
  450         project_member = unit.new_user_ref(domain_id=self.domain_id)
  451         project_member_id = PROVIDERS.identity_api.create_user(
  452             project_member
  453         )['id']
  454         project = unit.new_project_ref(domain_id=self.domain_id)
  455         project_id = PROVIDERS.resource_api.create_project(
  456             project['id'], project
  457         )['id']
  458 
  459         PROVIDERS.assignment_api.create_grant(
  460             self.bootstrapper.member_role_id, user_id=project_member_id,
  461             project_id=project_id
  462         )
  463 
  464         auth = self.build_authentication_request(
  465             user_id=project_member_id,
  466             password=project_member['password'],
  467             project_id=project_id
  468         )
  469 
  470         # Grab a token using the persona we're testing and prepare headers
  471         # for requests we'll be making in the tests.
  472         with self.test_client() as c:
  473             r = c.post('/v3/auth/tokens', json=auth)
  474             self.token_id = r.headers['X-Subject-Token']
  475             self.headers = {'X-Auth-Token': self.token_id}
  476 
  477 
  478 class ProjectAdminTests(base_classes.TestCaseWithBootstrap,
  479                         common_auth.AuthTestMixin,
  480                         _DomainAndProjectUserDomainTests):
  481 
  482     def setUp(self):
  483         super(ProjectAdminTests, self).setUp()
  484         self.loadapp()
  485         self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
  486         self.policy_file_name = self.policy_file.file_name
  487         self.useFixture(
  488             ksfixtures.Policy(
  489                 self.config_fixture, policy_file=self.policy_file_name
  490             )
  491         )
  492         self._override_policy()
  493         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  494 
  495         domain = PROVIDERS.resource_api.create_domain(
  496             uuid.uuid4().hex, unit.new_domain_ref()
  497         )
  498         self.domain_id = domain['id']
  499 
  500         project_admin = unit.new_user_ref(domain_id=self.domain_id)
  501         project_admin_id = PROVIDERS.identity_api.create_user(
  502             project_admin
  503         )['id']
  504         project = unit.new_project_ref(domain_id=self.domain_id)
  505         project_id = PROVIDERS.resource_api.create_project(
  506             project['id'], project
  507         )['id']
  508 
  509         PROVIDERS.assignment_api.create_grant(
  510             self.bootstrapper.admin_role_id, user_id=project_admin_id,
  511             project_id=project_id
  512         )
  513 
  514         auth = self.build_authentication_request(
  515             user_id=project_admin_id,
  516             password=project_admin['password'],
  517             project_id=project_id
  518         )
  519 
  520         # Grab a token using the persona we're testing and prepare headers
  521         # for requests we'll be making in the tests.
  522         with self.test_client() as c:
  523             r = c.post('/v3/auth/tokens', json=auth)
  524             self.token_id = r.headers['X-Subject-Token']
  525             self.headers = {'X-Auth-Token': self.token_id}
  526 
  527     def _override_policy(self):
  528         # TODO(lbragstad): Remove this once the deprecated policies in
  529         # keystone.common.policies.domains have been removed. This is only
  530         # here to make sure we test the new policies instead of the deprecated
  531         # ones. Oslo.policy will OR deprecated policies with new policies to
  532         # maintain compatibility and give operators a chance to update
  533         # permissions or update policies without breaking users. This will
  534         # cause these specific tests to fail since we're trying to correct this
  535         # broken behavior with better scope checking.
  536         with open(self.policy_file_name, 'w') as f:
  537             overridden_policies = {
  538                 'identity:get_domain': (
  539                     dp.SYSTEM_USER_OR_DOMAIN_USER_OR_PROJECT_USER
  540                 )
  541             }
  542             f.write(jsonutils.dumps(overridden_policies))