"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/protection/v3/test_projects.py" (13 May 2020, 35131 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_projects.py": 16.0.1_vs_17.0.0.

    1 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 #    not use this file except in compliance with the License. You may obtain
    3 #    a copy of the License at
    4 #
    5 #         http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 #    Unless required by applicable law or agreed to in writing, software
    8 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 #    License for the specific language governing permissions and limitations
   11 #    under the License.
   12 
   13 import uuid
   14 
   15 import http.client
   16 from oslo_serialization import jsonutils
   17 
   18 from keystone.common.policies import project as pp
   19 from keystone.common import provider_api
   20 import keystone.conf
   21 from keystone.tests.common import auth as common_auth
   22 from keystone.tests import unit
   23 from keystone.tests.unit import base_classes
   24 from keystone.tests.unit import ksfixtures
   25 from keystone.tests.unit.ksfixtures import temporaryfile
   26 
   27 CONF = keystone.conf.CONF
   28 PROVIDERS = provider_api.ProviderAPIs
   29 
   30 
   31 class _SystemUserTests(object):
   32     """Common default functionality for all system users."""
   33 
   34     def test_user_can_list_projects(self):
   35         PROVIDERS.resource_api.create_project(
   36             uuid.uuid4().hex,
   37             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
   38         )
   39 
   40         with self.test_client() as c:
   41             r = c.get('/v3/projects', headers=self.headers)
   42             self.assertEqual(2, len(r.json['projects']))
   43 
   44     def test_user_can_list_projects_for_other_users(self):
   45         project = PROVIDERS.resource_api.create_project(
   46             uuid.uuid4().hex,
   47             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
   48         )
   49 
   50         user = PROVIDERS.identity_api.create_user(
   51             unit.new_user_ref(
   52                 CONF.identity.default_domain_id,
   53                 id=uuid.uuid4().hex
   54             )
   55         )
   56 
   57         PROVIDERS.assignment_api.create_grant(
   58             self.bootstrapper.reader_role_id, user_id=user['id'],
   59             project_id=project['id']
   60         )
   61 
   62         with self.test_client() as c:
   63             r = c.get(
   64                 '/v3/users/%s/projects' % user['id'], headers=self.headers,
   65             )
   66             self.assertEqual(1, len(r.json['projects']))
   67             self.assertEqual(project['id'], r.json['projects'][0]['id'])
   68 
   69     def test_user_can_get_a_project(self):
   70         project = PROVIDERS.resource_api.create_project(
   71             uuid.uuid4().hex,
   72             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
   73         )
   74 
   75         with self.test_client() as c:
   76             r = c.get('/v3/projects/%s' % project['id'], headers=self.headers)
   77             self.assertEqual(project['id'], r.json['project']['id'])
   78 
   79     def test_user_cannot_get_non_existent_project_not_found(self):
   80         with self.test_client() as c:
   81             c.get(
   82                 '/v3/projects/%s' % uuid.uuid4().hex, headers=self.headers,
   83                 expected_status_code=http.client.NOT_FOUND
   84             )
   85 
   86 
   87 class _SystemMemberAndReaderProjectTests(object):
   88     """Common default functionality for system members and system readers."""
   89 
   90     def test_user_cannot_create_projects(self):
   91         create = {
   92             'project': unit.new_project_ref(
   93                 domain_id=CONF.identity.default_domain_id
   94             )
   95         }
   96 
   97         with self.test_client() as c:
   98             c.post(
   99                 '/v3/projects', json=create, headers=self.headers,
  100                 expected_status_code=http.client.FORBIDDEN
  101             )
  102 
  103     def test_user_cannot_update_projects(self):
  104         project = PROVIDERS.resource_api.create_project(
  105             uuid.uuid4().hex,
  106             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  107         )
  108 
  109         update = {'project': {'description': uuid.uuid4().hex}}
  110 
  111         with self.test_client() as c:
  112             c.patch(
  113                 '/v3/projects/%s' % project['id'], json=update,
  114                 headers=self.headers,
  115                 expected_status_code=http.client.FORBIDDEN
  116             )
  117 
  118     def test_user_cannot_update_non_existent_project_forbidden(self):
  119         update = {'project': {'description': uuid.uuid4().hex}}
  120 
  121         with self.test_client() as c:
  122             c.patch(
  123                 '/v3/projects/%s' % uuid.uuid4().hex, json=update,
  124                 headers=self.headers,
  125                 expected_status_code=http.client.FORBIDDEN
  126             )
  127 
  128     def test_user_cannot_delete_projects(self):
  129         project = PROVIDERS.resource_api.create_project(
  130             uuid.uuid4().hex,
  131             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  132         )
  133 
  134         with self.test_client() as c:
  135             c.delete(
  136                 '/v3/projects/%s' % project['id'], headers=self.headers,
  137                 expected_status_code=http.client.FORBIDDEN
  138             )
  139 
  140     def test_user_cannot_delete_non_existent_project_forbidden(self):
  141         with self.test_client() as c:
  142             c.delete(
  143                 '/v3/projects/%s' % uuid.uuid4().hex, headers=self.headers,
  144                 expected_status_code=http.client.FORBIDDEN
  145             )
  146 
  147 
  148 class _DomainUsersTests(object):
  149     """Common default functionality for all domain users."""
  150 
  151     def test_user_can_list_projects_within_domain(self):
  152         project = PROVIDERS.resource_api.create_project(
  153             uuid.uuid4().hex,
  154             unit.new_project_ref(domain_id=self.domain_id)
  155         )
  156 
  157         with self.test_client() as c:
  158             r = c.get('/v3/projects', headers=self.headers)
  159             self.assertEqual(1, len(r.json['projects']))
  160             self.assertEqual(project['id'], r.json['projects'][0]['id'])
  161 
  162     def test_user_cannot_list_projects_in_other_domain(self):
  163         PROVIDERS.resource_api.create_project(
  164             uuid.uuid4().hex,
  165             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  166         )
  167 
  168         with self.test_client() as c:
  169             r = c.get('/v3/projects', headers=self.headers)
  170             self.assertEqual(0, len(r.json['projects']))
  171 
  172     def test_user_can_get_a_project_within_domain(self):
  173         project = PROVIDERS.resource_api.create_project(
  174             uuid.uuid4().hex,
  175             unit.new_project_ref(domain_id=self.domain_id)
  176         )
  177 
  178         with self.test_client() as c:
  179             r = c.get('/v3/projects/%s' % project['id'], headers=self.headers)
  180             self.assertEqual(project['id'], r.json['project']['id'])
  181 
  182     def test_user_cannot_get_a_project_in_other_domain(self):
  183         project = PROVIDERS.resource_api.create_project(
  184             uuid.uuid4().hex,
  185             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  186         )
  187 
  188         with self.test_client() as c:
  189             c.get(
  190                 '/v3/projects/%s' % project['id'], headers=self.headers,
  191                 expected_status_code=http.client.FORBIDDEN
  192             )
  193 
  194     def test_user_can_list_projects_for_user_in_domain(self):
  195         user = PROVIDERS.identity_api.create_user(
  196             unit.new_user_ref(
  197                 self.domain_id,
  198                 id=uuid.uuid4().hex
  199             )
  200         )
  201 
  202         project = PROVIDERS.resource_api.create_project(
  203             uuid.uuid4().hex,
  204             unit.new_project_ref(domain_id=self.domain_id)
  205         )
  206 
  207         PROVIDERS.assignment_api.create_grant(
  208             self.bootstrapper.reader_role_id, user_id=user['id'],
  209             project_id=project['id']
  210         )
  211 
  212         with self.test_client() as c:
  213             r = c.get(
  214                 '/v3/users/%s/projects' % user['id'], headers=self.headers
  215             )
  216             self.assertEqual(1, len(r.json['projects']))
  217             self.assertEqual(project['id'], r.json['projects'][0]['id'])
  218 
  219     def test_user_cannot_list_projects_for_user_in_other_domain(self):
  220         user = PROVIDERS.identity_api.create_user(
  221             unit.new_user_ref(
  222                 CONF.identity.default_domain_id,
  223                 id=uuid.uuid4().hex
  224             )
  225         )
  226 
  227         project = PROVIDERS.resource_api.create_project(
  228             uuid.uuid4().hex,
  229             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  230         )
  231 
  232         PROVIDERS.assignment_api.create_grant(
  233             self.bootstrapper.reader_role_id, user_id=user['id'],
  234             project_id=project['id']
  235         )
  236 
  237         with self.test_client() as c:
  238             c.get(
  239                 '/v3/users/%s/projects' % user['id'], headers=self.headers,
  240                 expected_status_code=http.client.FORBIDDEN
  241             )
  242 
  243 
  244 class _DomainMemberAndReaderProjectTests(object):
  245     """Common default functionality for domain member and domain readers."""
  246 
  247     def test_user_cannot_create_projects_within_domain(self):
  248         create = {'project': unit.new_project_ref(domain_id=self.domain_id)}
  249 
  250         with self.test_client() as c:
  251             c.post(
  252                 '/v3/projects', json=create, headers=self.headers,
  253                 expected_status_code=http.client.FORBIDDEN
  254             )
  255 
  256     def test_user_cannot_create_projects_in_other_domains(self):
  257         create = {
  258             'project': unit.new_project_ref(
  259                 domain_id=CONF.identity.default_domain_id
  260             )
  261         }
  262 
  263         with self.test_client() as c:
  264             c.post(
  265                 '/v3/projects', json=create, headers=self.headers,
  266                 expected_status_code=http.client.FORBIDDEN
  267             )
  268 
  269     def test_user_cannot_update_projects_within_domain(self):
  270         project = PROVIDERS.resource_api.create_project(
  271             uuid.uuid4().hex,
  272             unit.new_project_ref(domain_id=self.domain_id)
  273         )
  274 
  275         update = {'project': {'description': uuid.uuid4().hex}}
  276 
  277         with self.test_client() as c:
  278             c.patch(
  279                 '/v3/projects/%s' % project['id'], json=update,
  280                 headers=self.headers,
  281                 expected_status_code=http.client.FORBIDDEN
  282             )
  283 
  284     def test_user_cannot_update_projects_in_other_domain(self):
  285         project = PROVIDERS.resource_api.create_project(
  286             uuid.uuid4().hex,
  287             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  288         )
  289 
  290         update = {'project': {'description': uuid.uuid4().hex}}
  291 
  292         with self.test_client() as c:
  293             c.patch(
  294                 '/v3/projects/%s' % project['id'], json=update,
  295                 headers=self.headers,
  296                 expected_status_code=http.client.FORBIDDEN
  297             )
  298 
  299     def test_user_cannot_update_non_existent_project_forbidden(self):
  300         update = {'project': {'description': uuid.uuid4().hex}}
  301 
  302         with self.test_client() as c:
  303             c.patch(
  304                 '/v3/projects/%s' % uuid.uuid4().hex, json=update,
  305                 headers=self.headers,
  306                 expected_status_code=http.client.FORBIDDEN
  307             )
  308 
  309     def test_user_cannot_delete_projects_within_domain(self):
  310         project = PROVIDERS.resource_api.create_project(
  311             uuid.uuid4().hex,
  312             unit.new_project_ref(domain_id=self.domain_id)
  313         )
  314 
  315         with self.test_client() as c:
  316             c.delete(
  317                 '/v3/projects/%s' % project['id'], headers=self.headers,
  318                 expected_status_code=http.client.FORBIDDEN
  319             )
  320 
  321     def test_user_cannot_delete_projects_in_other_domain(self):
  322         project = PROVIDERS.resource_api.create_project(
  323             uuid.uuid4().hex,
  324             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  325         )
  326 
  327         with self.test_client() as c:
  328             c.delete(
  329                 '/v3/projects/%s' % project['id'], headers=self.headers,
  330                 expected_status_code=http.client.FORBIDDEN
  331             )
  332 
  333     def test_user_cannot_delete_non_existent_projects_forbidden(self):
  334         with self.test_client() as c:
  335             c.delete(
  336                 '/v3/projects/%s' % uuid.uuid4().hex, headers=self.headers,
  337                 expected_status_code=http.client.FORBIDDEN
  338             )
  339 
  340 
  341 class SystemReaderTests(base_classes.TestCaseWithBootstrap,
  342                         common_auth.AuthTestMixin,
  343                         _SystemUserTests,
  344                         _SystemMemberAndReaderProjectTests):
  345 
  346     def setUp(self):
  347         super(SystemReaderTests, self).setUp()
  348         self.loadapp()
  349         self.useFixture(ksfixtures.Policy(self.config_fixture))
  350         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  351 
  352         system_reader = unit.new_user_ref(
  353             domain_id=CONF.identity.default_domain_id
  354         )
  355         self.user_id = PROVIDERS.identity_api.create_user(
  356             system_reader
  357         )['id']
  358         PROVIDERS.assignment_api.create_system_grant_for_user(
  359             self.user_id, self.bootstrapper.reader_role_id
  360         )
  361 
  362         auth = self.build_authentication_request(
  363             user_id=self.user_id, password=system_reader['password'],
  364             system=True
  365         )
  366 
  367         # Grab a token using the persona we're testing and prepare headers
  368         # for requests we'll be making in the tests.
  369         with self.test_client() as c:
  370             r = c.post('/v3/auth/tokens', json=auth)
  371             self.token_id = r.headers['X-Subject-Token']
  372             self.headers = {'X-Auth-Token': self.token_id}
  373 
  374 
  375 class SystemMemberTests(base_classes.TestCaseWithBootstrap,
  376                         common_auth.AuthTestMixin,
  377                         _SystemUserTests,
  378                         _SystemMemberAndReaderProjectTests):
  379 
  380     def setUp(self):
  381         super(SystemMemberTests, self).setUp()
  382         self.loadapp()
  383         self.useFixture(ksfixtures.Policy(self.config_fixture))
  384         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  385 
  386         system_member = unit.new_user_ref(
  387             domain_id=CONF.identity.default_domain_id
  388         )
  389         self.user_id = PROVIDERS.identity_api.create_user(
  390             system_member
  391         )['id']
  392         PROVIDERS.assignment_api.create_system_grant_for_user(
  393             self.user_id, self.bootstrapper.member_role_id
  394         )
  395 
  396         auth = self.build_authentication_request(
  397             user_id=self.user_id, password=system_member['password'],
  398             system=True
  399         )
  400 
  401         # Grab a token using the persona we're testing and prepare headers
  402         # for requests we'll be making in the tests.
  403         with self.test_client() as c:
  404             r = c.post('/v3/auth/tokens', json=auth)
  405             self.token_id = r.headers['X-Subject-Token']
  406             self.headers = {'X-Auth-Token': self.token_id}
  407 
  408 
  409 class SystemAdminTests(base_classes.TestCaseWithBootstrap,
  410                        common_auth.AuthTestMixin,
  411                        _SystemUserTests):
  412 
  413     def setUp(self):
  414         super(SystemAdminTests, self).setUp()
  415         self.loadapp()
  416         self.useFixture(ksfixtures.Policy(self.config_fixture))
  417         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  418 
  419         self.user_id = self.bootstrapper.admin_user_id
  420         auth = self.build_authentication_request(
  421             user_id=self.user_id,
  422             password=self.bootstrapper.admin_password,
  423             system=True
  424         )
  425 
  426         # Grab a token using the persona we're testing and prepare headers
  427         # for requests we'll be making in the tests.
  428         with self.test_client() as c:
  429             r = c.post('/v3/auth/tokens', json=auth)
  430             self.token_id = r.headers['X-Subject-Token']
  431             self.headers = {'X-Auth-Token': self.token_id}
  432 
  433     def test_user_can_create_projects(self):
  434         create = {
  435             'project': unit.new_project_ref(
  436                 domain_id=CONF.identity.default_domain_id
  437             )
  438         }
  439 
  440         with self.test_client() as c:
  441             c.post('/v3/projects', json=create, headers=self.headers)
  442 
  443     def test_user_can_update_projects(self):
  444         project = PROVIDERS.resource_api.create_project(
  445             uuid.uuid4().hex,
  446             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  447         )
  448 
  449         update = {'project': {'description': uuid.uuid4().hex}}
  450 
  451         with self.test_client() as c:
  452             c.patch(
  453                 '/v3/projects/%s' % project['id'], json=update,
  454                 headers=self.headers
  455             )
  456 
  457     def test_user_can_update_non_existent_project_not_found(self):
  458         update = {'project': {'description': uuid.uuid4().hex}}
  459 
  460         with self.test_client() as c:
  461             c.patch(
  462                 '/v3/projects/%s' % uuid.uuid4().hex, json=update,
  463                 headers=self.headers,
  464                 expected_status_code=http.client.NOT_FOUND
  465             )
  466 
  467     def test_user_can_delete_projects(self):
  468         project = PROVIDERS.resource_api.create_project(
  469             uuid.uuid4().hex,
  470             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  471         )
  472 
  473         with self.test_client() as c:
  474             c.delete('/v3/projects/%s' % project['id'], headers=self.headers)
  475 
  476     def test_user_can_delete_non_existent_project_not_found(self):
  477         with self.test_client() as c:
  478             c.delete(
  479                 '/v3/projects/%s' % uuid.uuid4().hex, headers=self.headers,
  480                 expected_status_code=http.client.NOT_FOUND
  481             )
  482 
  483     def test_user_can_list_their_projects(self):
  484         other_project = PROVIDERS.resource_api.create_project(
  485             uuid.uuid4().hex,
  486             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  487         )
  488 
  489         user_project = PROVIDERS.resource_api.create_project(
  490             uuid.uuid4().hex,
  491             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  492         )
  493 
  494         PROVIDERS.assignment_api.create_grant(
  495             self.bootstrapper.reader_role_id, user_id=self.user_id,
  496             project_id=user_project['id']
  497         )
  498 
  499         with self.test_client() as c:
  500             r = c.get(
  501                 '/v3/users/%s/projects' % self.user_id, headers=self.headers,
  502             )
  503             self.assertEqual(2, len(r.json['projects']))
  504             project_ids = []
  505             for project in r.json['projects']:
  506                 project_ids.append(project['id'])
  507 
  508             self.assertIn(user_project['id'], project_ids)
  509             self.assertIn(self.bootstrapper.project_id, project_ids)
  510             self.assertNotIn(other_project['id'], project_ids)
  511 
  512 
  513 class DomainReaderTests(base_classes.TestCaseWithBootstrap,
  514                         common_auth.AuthTestMixin,
  515                         _DomainUsersTests,
  516                         _DomainMemberAndReaderProjectTests):
  517 
  518     def setUp(self):
  519         super(DomainReaderTests, self).setUp()
  520         self.loadapp()
  521         self.useFixture(ksfixtures.Policy(self.config_fixture))
  522         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  523 
  524         domain = PROVIDERS.resource_api.create_domain(
  525             uuid.uuid4().hex, unit.new_domain_ref()
  526         )
  527         self.domain_id = domain['id']
  528         domain_user = unit.new_user_ref(domain_id=self.domain_id)
  529         self.user_id = PROVIDERS.identity_api.create_user(domain_user)['id']
  530         PROVIDERS.assignment_api.create_grant(
  531             self.bootstrapper.reader_role_id, user_id=self.user_id,
  532             domain_id=self.domain_id
  533         )
  534 
  535         auth = self.build_authentication_request(
  536             user_id=self.user_id, password=domain_user['password'],
  537             domain_id=self.domain_id
  538         )
  539 
  540         # Grab a token using the persona we're testing and prepare headers
  541         # for requests we'll be making in the tests.
  542         with self.test_client() as c:
  543             r = c.post('/v3/auth/tokens', json=auth)
  544             self.token_id = r.headers['X-Subject-Token']
  545             self.headers = {'X-Auth-Token': self.token_id}
  546 
  547 
  548 class DomainMemberTests(base_classes.TestCaseWithBootstrap,
  549                         common_auth.AuthTestMixin,
  550                         _DomainUsersTests,
  551                         _DomainMemberAndReaderProjectTests):
  552 
  553     def setUp(self):
  554         super(DomainMemberTests, self).setUp()
  555         self.loadapp()
  556         self.useFixture(ksfixtures.Policy(self.config_fixture))
  557         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  558 
  559         domain = PROVIDERS.resource_api.create_domain(
  560             uuid.uuid4().hex, unit.new_domain_ref()
  561         )
  562         self.domain_id = domain['id']
  563         domain_user = unit.new_user_ref(domain_id=self.domain_id)
  564         self.user_id = PROVIDERS.identity_api.create_user(domain_user)['id']
  565         PROVIDERS.assignment_api.create_grant(
  566             self.bootstrapper.member_role_id, user_id=self.user_id,
  567             domain_id=self.domain_id
  568         )
  569 
  570         auth = self.build_authentication_request(
  571             user_id=self.user_id, password=domain_user['password'],
  572             domain_id=self.domain_id
  573         )
  574 
  575         # Grab a token using the persona we're testing and prepare headers
  576         # for requests we'll be making in the tests.
  577         with self.test_client() as c:
  578             r = c.post('/v3/auth/tokens', json=auth)
  579             self.token_id = r.headers['X-Subject-Token']
  580             self.headers = {'X-Auth-Token': self.token_id}
  581 
  582 
  583 class DomainAdminTests(base_classes.TestCaseWithBootstrap,
  584                        common_auth.AuthTestMixin,
  585                        _DomainUsersTests):
  586 
  587     def setUp(self):
  588         super(DomainAdminTests, self).setUp()
  589         self.loadapp()
  590 
  591         self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
  592         self.policy_file_name = self.policy_file.file_name
  593         self.useFixture(
  594             ksfixtures.Policy(
  595                 self.config_fixture, policy_file=self.policy_file_name
  596             )
  597         )
  598 
  599         self._override_policy()
  600         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  601 
  602         domain = PROVIDERS.resource_api.create_domain(
  603             uuid.uuid4().hex, unit.new_domain_ref()
  604         )
  605         self.domain_id = domain['id']
  606         domain_admin = unit.new_user_ref(domain_id=self.domain_id)
  607         self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id']
  608         PROVIDERS.assignment_api.create_grant(
  609             self.bootstrapper.admin_role_id, user_id=self.user_id,
  610             domain_id=self.domain_id
  611         )
  612 
  613         auth = self.build_authentication_request(
  614             user_id=self.user_id, password=domain_admin['password'],
  615             domain_id=self.domain_id,
  616         )
  617 
  618         # Grab a token using the persona we're testing and prepare headers
  619         # for requests we'll be making in the tests.
  620         with self.test_client() as c:
  621             r = c.post('/v3/auth/tokens', json=auth)
  622             self.token_id = r.headers['X-Subject-Token']
  623             self.headers = {'X-Auth-Token': self.token_id}
  624 
  625     def _override_policy(self):
  626         # TODO(lbragstad): Remove this once the deprecated policies in
  627         # keystone.common.policies.project have been removed. This is only
  628         # here to make sure we test the new policies instead of the deprecated
  629         # ones. Oslo.policy will OR deprecated policies with new policies to
  630         # maintain compatibility and give operators a chance to update
  631         # permissions or update policies without breaking users. This will
  632         # cause these specific tests to fail since we're trying to correct this
  633         # broken behavior with better scope checking.
  634         with open(self.policy_file_name, 'w') as f:
  635             overridden_policies = {
  636                 'identity:get_project': (
  637                     pp.SYSTEM_READER_OR_DOMAIN_READER_OR_PROJECT_USER),
  638                 'identity:list_user_projects': (
  639                     pp.SYSTEM_READER_OR_DOMAIN_READER_OR_OWNER),
  640                 'identity:list_projects': (
  641                     pp.SYSTEM_READER_OR_DOMAIN_READER),
  642                 'identity:create_project': pp.SYSTEM_ADMIN_OR_DOMAIN_ADMIN,
  643                 'identity:update_project': pp.SYSTEM_ADMIN_OR_DOMAIN_ADMIN,
  644                 'identity:delete_project': pp.SYSTEM_ADMIN_OR_DOMAIN_ADMIN
  645             }
  646             f.write(jsonutils.dumps(overridden_policies))
  647 
  648     def test_user_can_create_projects_within_domain(self):
  649         create = {'project': unit.new_project_ref(domain_id=self.domain_id)}
  650 
  651         with self.test_client() as c:
  652             c.post('/v3/projects', json=create, headers=self.headers)
  653 
  654     def test_user_cannot_create_projects_in_other_domains(self):
  655         create = {
  656             'project': unit.new_project_ref(
  657                 domain_id=CONF.identity.default_domain_id
  658             )
  659         }
  660 
  661         with self.test_client() as c:
  662             c.post(
  663                 '/v3/projects', json=create, headers=self.headers,
  664                 expected_status_code=http.client.FORBIDDEN
  665             )
  666 
  667     def test_user_can_update_projects_within_domain(self):
  668         project = PROVIDERS.resource_api.create_project(
  669             uuid.uuid4().hex,
  670             unit.new_project_ref(domain_id=self.domain_id)
  671         )
  672 
  673         update = {'project': {'description': uuid.uuid4().hex}}
  674 
  675         with self.test_client() as c:
  676             c.patch(
  677                 '/v3/projects/%s' % project['id'], json=update,
  678                 headers=self.headers
  679             )
  680 
  681     def test_user_cannot_update_projects_in_other_domain(self):
  682         project = PROVIDERS.resource_api.create_project(
  683             uuid.uuid4().hex,
  684             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  685         )
  686 
  687         update = {'project': {'description': uuid.uuid4().hex}}
  688 
  689         with self.test_client() as c:
  690             c.patch(
  691                 '/v3/projects/%s' % project['id'], json=update,
  692                 headers=self.headers,
  693                 expected_status_code=http.client.FORBIDDEN
  694             )
  695 
  696     def test_user_cannot_update_non_existent_project_forbidden(self):
  697         # Because domain users operate outside of system scope, we can't
  698         # confidently return a Not Found here because they aren't system users.
  699         # The best we can do is return a Forbidden because we need the
  700         # project's domain in order to resolve the policy check, and the
  701         # project doesn't exist. This errors on the side of opacity and returns
  702         # a 403 instead of a 404.
  703         update = {'project': {'description': uuid.uuid4().hex}}
  704 
  705         with self.test_client() as c:
  706             c.patch(
  707                 '/v3/projects/%s' % uuid.uuid4().hex, json=update,
  708                 headers=self.headers,
  709                 expected_status_code=http.client.FORBIDDEN
  710             )
  711 
  712     def test_user_can_delete_projects_within_domain(self):
  713         project = PROVIDERS.resource_api.create_project(
  714             uuid.uuid4().hex,
  715             unit.new_project_ref(domain_id=self.domain_id)
  716         )
  717 
  718         with self.test_client() as c:
  719             c.delete('/v3/projects/%s' % project['id'], headers=self.headers)
  720 
  721     def test_user_cannot_delete_projects_in_other_domain(self):
  722         project = PROVIDERS.resource_api.create_project(
  723             uuid.uuid4().hex,
  724             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  725         )
  726 
  727         with self.test_client() as c:
  728             c.delete(
  729                 '/v3/projects/%s' % project['id'], headers=self.headers,
  730                 expected_status_code=http.client.FORBIDDEN
  731             )
  732 
  733     def test_user_cannot_delete_non_existent_projects_forbidden(self):
  734         # Because domain users operate outside of system scope, we can't
  735         # confidently return a Not Found here because they aren't system users.
  736         # The best we can do is return a Forbidden because we need the
  737         # project's domain in order to resolve the policy check, and the
  738         # project doesn't exist. This errors on the side of opacity and returns
  739         # a 403 instead of a 404.
  740         with self.test_client() as c:
  741             c.delete(
  742                 '/v3/projects/%s' % uuid.uuid4().hex, headers=self.headers,
  743                 expected_status_code=http.client.FORBIDDEN
  744             )
  745 
  746 
  747 class ProjectUserTests(base_classes.TestCaseWithBootstrap,
  748                        common_auth.AuthTestMixin):
  749 
  750     def setUp(self):
  751         super(ProjectUserTests, self).setUp()
  752         self.loadapp()
  753 
  754         self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
  755         self.policy_file_name = self.policy_file.file_name
  756         self.useFixture(
  757             ksfixtures.Policy(
  758                 self.config_fixture, policy_file=self.policy_file_name
  759             )
  760         )
  761         self._override_policy()
  762         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  763 
  764         self.user_id = self.bootstrapper.admin_user_id
  765         PROVIDERS.assignment_api.create_grant(
  766             self.bootstrapper.reader_role_id, user_id=self.user_id,
  767             project_id=self.bootstrapper.project_id
  768         )
  769         self.project_id = self.bootstrapper.project_id
  770 
  771         auth = self.build_authentication_request(
  772             user_id=self.user_id, password=self.bootstrapper.admin_password,
  773             project_id=self.bootstrapper.project_id
  774         )
  775 
  776         # Grab a token using the persona we're testing and prepare headers
  777         # for requests we'll be making in the tests.
  778         with self.test_client() as c:
  779             r = c.post('/v3/auth/tokens', json=auth)
  780             self.token_id = r.headers['X-Subject-Token']
  781             self.headers = {'X-Auth-Token': self.token_id}
  782 
  783     def _override_policy(self):
  784         # TODO(lbragstad): Remove this once the deprecated policies in
  785         # keystone.common.policies.project have been removed. This is only
  786         # here to make sure we test the new policies instead of the deprecated
  787         # ones. Oslo.policy will OR deprecated policies with new policies to
  788         # maintain compatibility and give operators a chance to update
  789         # permissions or update policies without breaking users. This will
  790         # cause these specific tests to fail since we're trying to correct this
  791         # broken behavior with better scope checking.
  792         with open(self.policy_file_name, 'w') as f:
  793             overridden_policies = {
  794                 'identity:get_project': (
  795                     pp.SYSTEM_READER_OR_DOMAIN_READER_OR_PROJECT_USER),
  796                 'identity:list_user_projects': (
  797                     pp.SYSTEM_READER_OR_DOMAIN_READER_OR_OWNER),
  798                 'identity:list_projects': (
  799                     pp.SYSTEM_READER_OR_DOMAIN_READER),
  800                 'identity:create_project': pp.SYSTEM_ADMIN_OR_DOMAIN_ADMIN,
  801                 'identity:update_project': pp.SYSTEM_ADMIN_OR_DOMAIN_ADMIN,
  802                 'identity:delete_project': pp.SYSTEM_ADMIN_OR_DOMAIN_ADMIN
  803             }
  804             f.write(jsonutils.dumps(overridden_policies))
  805 
  806     def test_user_cannot_list_projects(self):
  807         # This test is assuming the user calling the API has a role assignment
  808         # on the project created by ``keystone-manage bootstrap``.
  809         with self.test_client() as c:
  810             c.get(
  811                 '/v3/projects', headers=self.headers,
  812                 expected_status_code=http.client.FORBIDDEN
  813             )
  814 
  815     def test_user_cannot_list_projects_for_others(self):
  816         user = PROVIDERS.identity_api.create_user(
  817             unit.new_user_ref(
  818                 CONF.identity.default_domain_id,
  819                 id=uuid.uuid4().hex
  820             )
  821         )
  822 
  823         project = PROVIDERS.resource_api.create_project(
  824             uuid.uuid4().hex,
  825             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  826         )
  827 
  828         PROVIDERS.assignment_api.create_grant(
  829             self.bootstrapper.reader_role_id, user_id=user['id'],
  830             project_id=project['id']
  831         )
  832 
  833         with self.test_client() as c:
  834             c.get(
  835                 '/v3/users/%s/projects' % user['id'], headers=self.headers,
  836                 expected_status_code=http.client.FORBIDDEN
  837             )
  838 
  839     def test_user_can_list_their_projects(self):
  840         # Users can get this information from the GET /v3/auth/projects API or
  841         # the GET /v3/users/{user_id}/projects API. The GET /v3/projects API is
  842         # administrative, reserved for system and domain users.
  843         with self.test_client() as c:
  844             r = c.get(
  845                 '/v3/users/%s/projects' % self.user_id, headers=self.headers,
  846             )
  847             self.assertEqual(1, len(r.json['projects']))
  848             self.assertEqual(self.project_id, r.json['projects'][0]['id'])
  849 
  850     def test_user_can_get_their_project(self):
  851         with self.test_client() as c:
  852             c.get('/v3/projects/%s' % self.project_id, headers=self.headers)
  853 
  854     def test_user_cannot_get_other_projects(self):
  855         project = PROVIDERS.resource_api.create_project(
  856             uuid.uuid4().hex,
  857             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  858         )
  859 
  860         with self.test_client() as c:
  861             c.get(
  862                 '/v3/projects/%s' % project['id'], headers=self.headers,
  863                 expected_status_code=http.client.FORBIDDEN
  864             )
  865 
  866     def test_user_cannot_create_projects(self):
  867         create = {
  868             'project': unit.new_project_ref(
  869                 domain_id=CONF.identity.default_domain_id
  870             )
  871         }
  872 
  873         with self.test_client() as c:
  874             c.post(
  875                 '/v3/projects', json=create, headers=self.headers,
  876                 expected_status_code=http.client.FORBIDDEN
  877             )
  878 
  879     def test_user_cannot_update_projects(self):
  880         project = PROVIDERS.resource_api.create_project(
  881             uuid.uuid4().hex,
  882             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  883         )
  884 
  885         update = {'project': {'description': uuid.uuid4().hex}}
  886 
  887         with self.test_client() as c:
  888             c.patch(
  889                 '/v3/projects/%s' % project['id'], json=update,
  890                 headers=self.headers,
  891                 expected_status_code=http.client.FORBIDDEN
  892             )
  893 
  894     def test_user_cannot_update_non_existent_project_forbidden(self):
  895         update = {'project': {'description': uuid.uuid4().hex}}
  896 
  897         with self.test_client() as c:
  898             c.patch(
  899                 '/v3/projects/%s' % uuid.uuid4().hex, json=update,
  900                 headers=self.headers,
  901                 expected_status_code=http.client.FORBIDDEN
  902             )
  903 
  904     def test_user_cannot_delete_projects(self):
  905         project = PROVIDERS.resource_api.create_project(
  906             uuid.uuid4().hex,
  907             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  908         )
  909 
  910         with self.test_client() as c:
  911             c.delete(
  912                 '/v3/projects/%s' % project['id'], headers=self.headers,
  913                 expected_status_code=http.client.FORBIDDEN
  914             )
  915 
  916     def test_user_cannot_delete_non_existent_project_forbidden(self):
  917         with self.test_client() as c:
  918             c.delete(
  919                 '/v3/projects/%s' % uuid.uuid4().hex, headers=self.headers,
  920                 expected_status_code=http.client.FORBIDDEN
  921             )