"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/protection/v3/test_users.py" (13 May 2020, 37590 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_users.py": 16.0.1_vs_17.0.0.

    1 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 #    not use this file except in compliance with the License. You may obtain
    3 #    a copy of the License at
    4 #
    5 #         http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 #    Unless required by applicable law or agreed to in writing, software
    8 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 #    License for the specific language governing permissions and limitations
   11 #    under the License.
   12 
   13 import uuid
   14 
   15 import http.client
   16 from oslo_serialization import jsonutils
   17 
   18 from keystone.common.policies import user as up
   19 from keystone.common import provider_api
   20 import keystone.conf
   21 from keystone.tests.common import auth as common_auth
   22 from keystone.tests import unit
   23 from keystone.tests.unit import base_classes
   24 from keystone.tests.unit import ksfixtures
   25 from keystone.tests.unit.ksfixtures import temporaryfile
   26 
   27 CONF = keystone.conf.CONF
   28 PROVIDERS = provider_api.ProviderAPIs
   29 
   30 
   31 class _CommonUserTests(object):
   32     """Common default functionality for all users."""
   33 
   34     def test_user_can_get_their_own_user_reference(self):
   35         with self.test_client() as c:
   36             r = c.get('/v3/users/%s' % self.user_id, headers=self.headers)
   37             self.assertEqual(self.user_id, r.json['user']['id'])
   38 
   39 
   40 class _SystemUserTests(object):
   41     """Common default functionality for all system users."""
   42 
   43     def test_user_can_get_other_users(self):
   44         user = PROVIDERS.identity_api.create_user(
   45             unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
   46         )
   47 
   48         with self.test_client() as c:
   49             r = c.get('/v3/users/%s' % user['id'], headers=self.headers)
   50             self.assertEqual(user['id'], r.json['user']['id'])
   51 
   52     def test_user_cannot_get_non_existent_user_not_found(self):
   53         with self.test_client() as c:
   54             c.get(
   55                 '/v3/users/%s' % uuid.uuid4().hex, headers=self.headers,
   56                 expected_status_code=http.client.NOT_FOUND
   57             )
   58 
   59     def test_user_can_list_users(self):
   60         expected_user_ids = []
   61         for _ in range(3):
   62             user = PROVIDERS.identity_api.create_user(
   63                 unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
   64             )
   65             expected_user_ids.append(user['id'])
   66 
   67         with self.test_client() as c:
   68             r = c.get('/v3/users', headers=self.headers)
   69             returned_user_ids = []
   70             for user in r.json['users']:
   71                 returned_user_ids.append(user['id'])
   72 
   73             for user_id in expected_user_ids:
   74                 self.assertIn(user_id, returned_user_ids)
   75 
   76 
   77 class _SystemMemberAndReaderUserTests(object):
   78     """Common functionality for system readers and system members."""
   79 
   80     def test_user_cannot_create_users(self):
   81         create = {
   82             'user': {
   83                 'name': uuid.uuid4().hex,
   84                 'domain': CONF.identity.default_domain_id
   85             }
   86         }
   87 
   88         with self.test_client() as c:
   89             c.post(
   90                 '/v3/users', json=create, headers=self.headers,
   91                 expected_status_code=http.client.FORBIDDEN
   92             )
   93 
   94     def test_user_cannot_update_users(self):
   95         user = PROVIDERS.identity_api.create_user(
   96             unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
   97         )
   98 
   99         with self.test_client() as c:
  100             update = {'user': {'email': uuid.uuid4().hex}}
  101 
  102             c.patch(
  103                 '/v3/users/%s' % user['id'], json=update, headers=self.headers,
  104                 expected_status_code=http.client.FORBIDDEN
  105             )
  106 
  107     def test_user_cannot_update_non_existent_user_forbidden(self):
  108         user = PROVIDERS.identity_api.create_user(
  109             unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  110         )
  111 
  112         update = {'user': {'email': uuid.uuid4().hex}}
  113         with self.test_client() as c:
  114             c.patch(
  115                 '/v3/users/%s' % user['id'], json=update, headers=self.headers,
  116                 expected_status_code=http.client.FORBIDDEN
  117             )
  118 
  119     def test_user_cannot_delete_users(self):
  120         user = PROVIDERS.identity_api.create_user(
  121             unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  122         )
  123 
  124         with self.test_client() as c:
  125             c.delete(
  126                 '/v3/users/%s' % user['id'], headers=self.headers,
  127                 expected_status_code=http.client.FORBIDDEN
  128             )
  129 
  130     def test_user_cannot_delete_non_existent_user_forbidden(self):
  131         with self.test_client() as c:
  132             c.delete(
  133                 '/v3/users/%s' % uuid.uuid4().hex, headers=self.headers,
  134                 expected_status_code=http.client.FORBIDDEN
  135             )
  136 
  137 
  138 class _DomainUserTests(object):
  139     """Commont default functionality for all domain users."""
  140 
  141     def test_user_can_get_user_within_domain(self):
  142         user = PROVIDERS.identity_api.create_user(
  143             unit.new_user_ref(domain_id=self.domain_id)
  144         )
  145 
  146         with self.test_client() as c:
  147             r = c.get('/v3/users/%s' % user['id'], headers=self.headers)
  148             self.assertEqual(user['id'], r.json['user']['id'])
  149 
  150     def test_user_cannot_get_user_in_other_domain(self):
  151         domain = PROVIDERS.resource_api.create_domain(
  152             uuid.uuid4().hex, unit.new_domain_ref()
  153         )
  154 
  155         user = PROVIDERS.identity_api.create_user(
  156             unit.new_user_ref(domain_id=domain['id'])
  157         )
  158 
  159         with self.test_client() as c:
  160             c.get(
  161                 '/v3/users/%s' % user['id'], headers=self.headers,
  162                 expected_status_code=http.client.FORBIDDEN
  163             )
  164 
  165     def test_user_can_list_users_within_domain(self):
  166         user = PROVIDERS.identity_api.create_user(
  167             unit.new_user_ref(domain_id=self.domain_id)
  168         )
  169 
  170         with self.test_client() as c:
  171             r = c.get('/v3/users', headers=self.headers)
  172             self.assertEqual(2, len(r.json['users']))
  173             user_ids = []
  174             for user in r.json['users']:
  175                 user_ids.append(user['id'])
  176             self.assertIn(self.user_id, user_ids)
  177             self.assertIn(user['id'], user_ids)
  178 
  179     def test_user_cannot_list_users_in_other_domain(self):
  180         domain = PROVIDERS.resource_api.create_domain(
  181             uuid.uuid4().hex, unit.new_domain_ref()
  182         )
  183 
  184         user = PROVIDERS.identity_api.create_user(
  185             unit.new_user_ref(domain_id=domain['id'])
  186         )
  187 
  188         with self.test_client() as c:
  189             r = c.get('/v3/users', headers=self.headers)
  190             user_ids = []
  191             for u in r.json['users']:
  192                 user_ids.append(u['id'])
  193             self.assertNotIn(user['id'], user_ids)
  194 
  195 
  196 class _DomainMemberAndReaderUserTests(object):
  197     """Functionality for all domain members and domain readers."""
  198 
  199     def test_user_cannot_create_users_within_domain(self):
  200         create = {
  201             'user': {
  202                 'domain_id': self.domain_id,
  203                 'name': uuid.uuid4().hex
  204             }
  205         }
  206 
  207         with self.test_client() as c:
  208             c.post(
  209                 '/v3/users', json=create, headers=self.headers,
  210                 expected_status_code=http.client.FORBIDDEN
  211             )
  212 
  213     def test_user_cannot_create_users_in_other_domain(self):
  214         domain = PROVIDERS.resource_api.create_domain(
  215             uuid.uuid4().hex, unit.new_domain_ref()
  216         )
  217 
  218         create = {
  219             'user': {
  220                 'domain_id': domain['id'],
  221                 'name': uuid.uuid4().hex
  222             }
  223         }
  224 
  225         with self.test_client() as c:
  226             c.post(
  227                 '/v3/users', json=create, headers=self.headers,
  228                 expected_status_code=http.client.FORBIDDEN
  229             )
  230 
  231     def test_user_cannot_update_users_within_domain(self):
  232         user = PROVIDERS.identity_api.create_user(
  233             unit.new_user_ref(domain_id=self.domain_id)
  234         )
  235 
  236         update = {'user': {'email': uuid.uuid4().hex}}
  237         with self.test_client() as c:
  238             c.patch(
  239                 '/v3/users/%s' % user['id'], json=update, headers=self.headers,
  240                 expected_status_code=http.client.FORBIDDEN
  241             )
  242 
  243     def test_user_cannot_update_users_in_other_domain(self):
  244         domain = PROVIDERS.resource_api.create_domain(
  245             uuid.uuid4().hex, unit.new_domain_ref()
  246         )
  247         user = PROVIDERS.identity_api.create_user(
  248             unit.new_user_ref(domain_id=domain['id'])
  249         )
  250 
  251         update = {'user': {'email': uuid.uuid4().hex}}
  252         with self.test_client() as c:
  253             c.patch(
  254                 '/v3/users/%s' % user['id'], json=update, headers=self.headers,
  255                 expected_status_code=http.client.FORBIDDEN
  256             )
  257 
  258     def test_user_cannot_update_non_existent_user_forbidden(self):
  259         user = PROVIDERS.identity_api.create_user(
  260             unit.new_user_ref(domain_id=self.domain_id)
  261         )
  262 
  263         update = {'user': {'email': uuid.uuid4().hex}}
  264         with self.test_client() as c:
  265             c.patch(
  266                 '/v3/users/%s' % user['id'], json=update, headers=self.headers,
  267                 expected_status_code=http.client.FORBIDDEN
  268             )
  269 
  270     def test_user_cannot_delete_users_within_domain(self):
  271         user = PROVIDERS.identity_api.create_user(
  272             unit.new_user_ref(domain_id=self.domain_id)
  273         )
  274 
  275         with self.test_client() as c:
  276             c.delete(
  277                 '/v3/users/%s' % user['id'], headers=self.headers,
  278                 expected_status_code=http.client.FORBIDDEN
  279             )
  280 
  281     def test_user_cannot_delete_users_in_other_domain(self):
  282         domain = PROVIDERS.resource_api.create_domain(
  283             uuid.uuid4().hex, unit.new_domain_ref()
  284         )
  285         user = PROVIDERS.identity_api.create_user(
  286             unit.new_user_ref(domain_id=domain['id'])
  287         )
  288 
  289         with self.test_client() as c:
  290             c.delete(
  291                 '/v3/users/%s' % user['id'], headers=self.headers,
  292                 expected_status_code=http.client.FORBIDDEN
  293             )
  294 
  295     def test_user_cannot_delete_non_existent_user_forbidden(self):
  296         with self.test_client() as c:
  297             c.delete(
  298                 '/v3/users/%s' % uuid.uuid4().hex, headers=self.headers,
  299                 expected_status_code=http.client.FORBIDDEN
  300             )
  301 
  302 
  303 class _ProjectUserTests(object):
  304     """Common tests cases for all project users."""
  305 
  306     def test_user_cannot_get_users_within_their_domain(self):
  307         user = PROVIDERS.identity_api.create_user(
  308             unit.new_user_ref(domain_id=self.domain_id)
  309         )
  310 
  311         with self.test_client() as c:
  312             c.get(
  313                 '/v3/users/%s' % user['id'], headers=self.headers,
  314                 expected_status_code=http.client.FORBIDDEN
  315             )
  316 
  317     def test_user_cannot_get_users_in_other_domains(self):
  318         domain = PROVIDERS.resource_api.create_domain(
  319             uuid.uuid4().hex, unit.new_domain_ref()
  320         )
  321         user = PROVIDERS.identity_api.create_user(
  322             unit.new_user_ref(domain_id=domain['id'])
  323         )
  324 
  325         with self.test_client() as c:
  326             c.get(
  327                 '/v3/users/%s' % user['id'], headers=self.headers,
  328                 expected_status_code=http.client.FORBIDDEN
  329             )
  330 
  331     def test_user_cannot_get_non_existent_user_forbidden(self):
  332         with self.test_client() as c:
  333             c.get(
  334                 '/v3/users/%s' % uuid.uuid4().hex, headers=self.headers,
  335                 expected_status_code=http.client.FORBIDDEN
  336             )
  337 
  338     def test_user_cannot_list_users_within_domain(self):
  339         with self.test_client() as c:
  340             c.get(
  341                 '/v3/users?domain_id=%s' % self.domain_id,
  342                 headers=self.headers,
  343                 expected_status_code=http.client.FORBIDDEN
  344             )
  345 
  346     def test_user_cannot_list_users_in_other_domains(self):
  347         domain = PROVIDERS.resource_api.create_domain(
  348             uuid.uuid4().hex, unit.new_domain_ref()
  349         )
  350         PROVIDERS.identity_api.create_user(
  351             unit.new_user_ref(domain_id=domain['id'])
  352         )
  353 
  354         with self.test_client() as c:
  355             c.get(
  356                 '/v3/users?domain_id=%s' % domain['id'],
  357                 headers=self.headers,
  358                 expected_status_code=http.client.FORBIDDEN
  359             )
  360 
  361     def test_user_cannot_create_users_within_domain(self):
  362         create = {
  363             'user': {
  364                 'domain_id': self.domain_id,
  365                 'name': uuid.uuid4().hex
  366             }
  367         }
  368 
  369         with self.test_client() as c:
  370             c.post(
  371                 '/v3/users', json=create, headers=self.headers,
  372                 expected_status_code=http.client.FORBIDDEN
  373             )
  374 
  375     def test_user_cannot_create_users_in_other_domains(self):
  376         domain = PROVIDERS.resource_api.create_domain(
  377             uuid.uuid4().hex, unit.new_domain_ref()
  378         )
  379 
  380         create = {
  381             'user': {
  382                 'domain_id': domain['id'],
  383                 'name': uuid.uuid4().hex
  384             }
  385         }
  386 
  387         with self.test_client() as c:
  388             c.post(
  389                 '/v3/users', json=create, headers=self.headers,
  390                 expected_status_code=http.client.FORBIDDEN
  391             )
  392 
  393     def test_user_cannot_update_users_within_domain(self):
  394         user = PROVIDERS.identity_api.create_user(
  395             unit.new_user_ref(domain_id=self.domain_id)
  396         )
  397 
  398         update = {'user': {'email': uuid.uuid4().hex}}
  399         with self.test_client() as c:
  400             c.patch(
  401                 '/v3/users/%s' % user['id'], json=update, headers=self.headers,
  402                 expected_status_code=http.client.FORBIDDEN
  403             )
  404 
  405     def test_user_cannot_update_users_in_other_domain(self):
  406         domain = PROVIDERS.resource_api.create_domain(
  407             uuid.uuid4().hex, unit.new_domain_ref()
  408         )
  409         user = PROVIDERS.identity_api.create_user(
  410             unit.new_user_ref(domain_id=domain['id'])
  411         )
  412 
  413         update = {'user': {'email': uuid.uuid4().hex}}
  414         with self.test_client() as c:
  415             c.patch(
  416                 '/v3/users/%s' % user['id'], json=update, headers=self.headers,
  417                 expected_status_code=http.client.FORBIDDEN
  418             )
  419 
  420     def test_user_cannot_update_non_existent_user_forbidden(self):
  421         update = {'user': {'email': uuid.uuid4().hex}}
  422         with self.test_client() as c:
  423             c.patch(
  424                 '/v3/users/%s' % uuid.uuid4().hex, json=update,
  425                 headers=self.headers,
  426                 expected_status_code=http.client.FORBIDDEN
  427             )
  428 
  429     def test_user_cannot_delete_users_within_domain(self):
  430         user = PROVIDERS.identity_api.create_user(
  431             unit.new_user_ref(domain_id=self.domain_id)
  432         )
  433 
  434         with self.test_client() as c:
  435             c.delete(
  436                 '/v3/users/%s' % user['id'], headers=self.headers,
  437                 expected_status_code=http.client.FORBIDDEN
  438             )
  439 
  440     def test_user_cannot_delete_users_in_other_domains(self):
  441         domain = PROVIDERS.resource_api.create_domain(
  442             uuid.uuid4().hex, unit.new_domain_ref()
  443         )
  444         user = PROVIDERS.identity_api.create_user(
  445             unit.new_user_ref(domain_id=domain['id'])
  446         )
  447 
  448         with self.test_client() as c:
  449             c.delete(
  450                 '/v3/users/%s' % user['id'], headers=self.headers,
  451                 expected_status_code=http.client.FORBIDDEN
  452             )
  453 
  454     def test_user_cannot_delete_non_existent_user_forbidden(self):
  455         with self.test_client() as c:
  456             c.delete(
  457                 '/v3/users/%s' % uuid.uuid4().hex, headers=self.headers,
  458                 expected_status_code=http.client.FORBIDDEN
  459             )
  460 
  461 
  462 class SystemReaderTests(base_classes.TestCaseWithBootstrap,
  463                         common_auth.AuthTestMixin,
  464                         _CommonUserTests,
  465                         _SystemUserTests,
  466                         _SystemMemberAndReaderUserTests):
  467 
  468     def setUp(self):
  469         super(SystemReaderTests, self).setUp()
  470         self.loadapp()
  471         self.useFixture(ksfixtures.Policy(self.config_fixture))
  472         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  473 
  474         system_reader = unit.new_user_ref(
  475             domain_id=CONF.identity.default_domain_id
  476         )
  477         self.user_id = PROVIDERS.identity_api.create_user(
  478             system_reader
  479         )['id']
  480         PROVIDERS.assignment_api.create_system_grant_for_user(
  481             self.user_id, self.bootstrapper.reader_role_id
  482         )
  483 
  484         auth = self.build_authentication_request(
  485             user_id=self.user_id, password=system_reader['password'],
  486             system=True
  487         )
  488 
  489         # Grab a token using the persona we're testing and prepare headers
  490         # for requests we'll be making in the tests.
  491         with self.test_client() as c:
  492             r = c.post('/v3/auth/tokens', json=auth)
  493             self.token_id = r.headers['X-Subject-Token']
  494             self.headers = {'X-Auth-Token': self.token_id}
  495 
  496 
  497 class SystemMemberTests(base_classes.TestCaseWithBootstrap,
  498                         common_auth.AuthTestMixin,
  499                         _CommonUserTests,
  500                         _SystemUserTests,
  501                         _SystemMemberAndReaderUserTests):
  502 
  503     def setUp(self):
  504         super(SystemMemberTests, self).setUp()
  505         self.loadapp()
  506         self.useFixture(ksfixtures.Policy(self.config_fixture))
  507         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  508 
  509         system_member = unit.new_user_ref(
  510             domain_id=CONF.identity.default_domain_id
  511         )
  512         self.user_id = PROVIDERS.identity_api.create_user(
  513             system_member
  514         )['id']
  515         PROVIDERS.assignment_api.create_system_grant_for_user(
  516             self.user_id, self.bootstrapper.member_role_id
  517         )
  518 
  519         auth = self.build_authentication_request(
  520             user_id=self.user_id, password=system_member['password'],
  521             system=True
  522         )
  523 
  524         # Grab a token using the persona we're testing and prepare headers
  525         # for requests we'll be making in the tests.
  526         with self.test_client() as c:
  527             r = c.post('/v3/auth/tokens', json=auth)
  528             self.token_id = r.headers['X-Subject-Token']
  529             self.headers = {'X-Auth-Token': self.token_id}
  530 
  531 
  532 class SystemAdminTests(base_classes.TestCaseWithBootstrap,
  533                        common_auth.AuthTestMixin,
  534                        _CommonUserTests,
  535                        _SystemUserTests):
  536 
  537     def setUp(self):
  538         super(SystemAdminTests, self).setUp()
  539         self.loadapp()
  540         self.useFixture(ksfixtures.Policy(self.config_fixture))
  541         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  542 
  543         self.user_id = self.bootstrapper.admin_user_id
  544         auth = self.build_authentication_request(
  545             user_id=self.user_id,
  546             password=self.bootstrapper.admin_password,
  547             system=True
  548         )
  549 
  550         # Grab a token using the persona we're testing and prepare headers
  551         # for requests we'll be making in the tests.
  552         with self.test_client() as c:
  553             r = c.post('/v3/auth/tokens', json=auth)
  554             self.token_id = r.headers['X-Subject-Token']
  555             self.headers = {'X-Auth-Token': self.token_id}
  556 
  557     def test_user_can_create_users(self):
  558         create = {
  559             'user': {
  560                 'name': uuid.uuid4().hex,
  561                 'domain': CONF.identity.default_domain_id
  562             }
  563         }
  564 
  565         with self.test_client() as c:
  566             c.post('/v3/users', json=create, headers=self.headers)
  567 
  568     def test_user_can_update_users(self):
  569         user = PROVIDERS.identity_api.create_user(
  570             unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  571         )
  572 
  573         update = {'user': {'email': uuid.uuid4().hex}}
  574         with self.test_client() as c:
  575             c.patch(
  576                 '/v3/users/%s' % user['id'], json=update, headers=self.headers
  577             )
  578 
  579     def test_user_cannot_update_non_existent_user_not_found(self):
  580         update = {'user': {'email': uuid.uuid4().hex}}
  581         with self.test_client() as c:
  582             c.patch(
  583                 '/v3/users/%s' % uuid.uuid4().hex, json=update,
  584                 headers=self.headers,
  585                 expected_status_code=http.client.NOT_FOUND
  586             )
  587 
  588     def test_user_can_delete_users(self):
  589         user = PROVIDERS.identity_api.create_user(
  590             unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  591         )
  592 
  593         with self.test_client() as c:
  594             c.delete('/v3/users/%s' % user['id'], headers=self.headers)
  595 
  596     def test_user_cannot_delete_non_existent_user_not_found(self):
  597         with self.test_client() as c:
  598             c.delete(
  599                 '/v3/users/%s' % uuid.uuid4().hex, headers=self.headers,
  600                 expected_status_code=http.client.NOT_FOUND
  601             )
  602 
  603 
  604 class DomainReaderTests(base_classes.TestCaseWithBootstrap,
  605                         common_auth.AuthTestMixin,
  606                         _CommonUserTests,
  607                         _DomainUserTests,
  608                         _DomainMemberAndReaderUserTests):
  609 
  610     def setUp(self):
  611         super(DomainReaderTests, self).setUp()
  612         self.loadapp()
  613         self.useFixture(ksfixtures.Policy(self.config_fixture))
  614         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  615 
  616         domain = PROVIDERS.resource_api.create_domain(
  617             uuid.uuid4().hex, unit.new_domain_ref()
  618         )
  619         self.domain_id = domain['id']
  620         domain_reader = unit.new_user_ref(domain_id=self.domain_id)
  621         self.user_id = PROVIDERS.identity_api.create_user(domain_reader)['id']
  622         PROVIDERS.assignment_api.create_grant(
  623             self.bootstrapper.reader_role_id, user_id=self.user_id,
  624             domain_id=self.domain_id
  625         )
  626 
  627         auth = self.build_authentication_request(
  628             user_id=self.user_id, password=domain_reader['password'],
  629             domain_id=self.domain_id,
  630         )
  631 
  632         # Grab a token using the persona we're testing and prepare headers
  633         # for requests we'll be making in the tests.
  634         with self.test_client() as c:
  635             r = c.post('/v3/auth/tokens', json=auth)
  636             self.token_id = r.headers['X-Subject-Token']
  637             self.headers = {'X-Auth-Token': self.token_id}
  638 
  639 
  640 class DomainMemberTests(base_classes.TestCaseWithBootstrap,
  641                         common_auth.AuthTestMixin,
  642                         _CommonUserTests,
  643                         _DomainUserTests,
  644                         _DomainMemberAndReaderUserTests):
  645 
  646     def setUp(self):
  647         super(DomainMemberTests, self).setUp()
  648         self.loadapp()
  649         self.useFixture(ksfixtures.Policy(self.config_fixture))
  650         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  651 
  652         domain = PROVIDERS.resource_api.create_domain(
  653             uuid.uuid4().hex, unit.new_domain_ref()
  654         )
  655         self.domain_id = domain['id']
  656         domain_user = unit.new_user_ref(domain_id=self.domain_id)
  657         self.user_id = PROVIDERS.identity_api.create_user(domain_user)['id']
  658         PROVIDERS.assignment_api.create_grant(
  659             self.bootstrapper.member_role_id, user_id=self.user_id,
  660             domain_id=self.domain_id
  661         )
  662 
  663         auth = self.build_authentication_request(
  664             user_id=self.user_id, password=domain_user['password'],
  665             domain_id=self.domain_id
  666         )
  667 
  668         # Grab a token using the persona we're testing and prepare headers
  669         # for requests we'll be making in the tests.
  670         with self.test_client() as c:
  671             r = c.post('/v3/auth/tokens', json=auth)
  672             self.token_id = r.headers['X-Subject-Token']
  673             self.headers = {'X-Auth-Token': self.token_id}
  674 
  675 
  676 class DomainAdminTests(base_classes.TestCaseWithBootstrap,
  677                        common_auth.AuthTestMixin,
  678                        _CommonUserTests,
  679                        _DomainUserTests):
  680 
  681     def setUp(self):
  682         super(DomainAdminTests, self).setUp()
  683         self.loadapp()
  684 
  685         self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
  686         self.policy_file_name = self.policy_file.file_name
  687         self.useFixture(
  688             ksfixtures.Policy(
  689                 self.config_fixture, policy_file=self.policy_file_name
  690             )
  691         )
  692 
  693         self._override_policy()
  694         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  695 
  696         domain = PROVIDERS.resource_api.create_domain(
  697             uuid.uuid4().hex, unit.new_domain_ref()
  698         )
  699         self.domain_id = domain['id']
  700         domain_admin = unit.new_user_ref(domain_id=self.domain_id)
  701         self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id']
  702         PROVIDERS.assignment_api.create_grant(
  703             self.bootstrapper.admin_role_id, user_id=self.user_id,
  704             domain_id=self.domain_id
  705         )
  706 
  707         auth = self.build_authentication_request(
  708             user_id=self.user_id, password=domain_admin['password'],
  709             domain_id=self.domain_id,
  710         )
  711 
  712         # Grab a token using the persona we're testing and prepare headers
  713         # for requests we'll be making in the tests.
  714         with self.test_client() as c:
  715             r = c.post('/v3/auth/tokens', json=auth)
  716             self.token_id = r.headers['X-Subject-Token']
  717             self.headers = {'X-Auth-Token': self.token_id}
  718 
  719     def _override_policy(self):
  720         # TODO(lbragstad): Remove this once the deprecated policies in
  721         # keystone.common.policies.users have been removed. This is only
  722         # here to make sure we test the new policies instead of the deprecated
  723         # ones. Oslo.policy will apply a logical OR to deprecated policies with
  724         # new policies to maintain compatibility and give operators a chance to
  725         # update permissions or update policies without breaking users. This
  726         # will cause these specific tests to fail since we're trying to correct
  727         # this broken behavior with better scope checking.
  728         with open(self.policy_file_name, 'w') as f:
  729             overridden_policies = {
  730                 'identity:get_user': up.SYSTEM_READER_OR_DOMAIN_READER_OR_USER,
  731                 'identity:list_users': up.SYSTEM_READER_OR_DOMAIN_READER,
  732                 'identity:create_user': up.SYSTEM_ADMIN_OR_DOMAIN_ADMIN,
  733                 'identity:update_user': up.SYSTEM_ADMIN_OR_DOMAIN_ADMIN,
  734                 'identity:delete_user': up.SYSTEM_ADMIN_OR_DOMAIN_ADMIN
  735             }
  736             f.write(jsonutils.dumps(overridden_policies))
  737 
  738     def test_user_can_create_users_within_domain(self):
  739         create = {
  740             'user': {
  741                 'domain_id': self.domain_id,
  742                 'name': uuid.uuid4().hex
  743             }
  744         }
  745 
  746         with self.test_client() as c:
  747             c.post('/v3/users', json=create, headers=self.headers)
  748 
  749     def test_user_cannot_create_users_within_domain_hyphened_domain_id(self):
  750         # Finally, show that we can create a new user without any surprises.
  751         # But if we specify a 'domain-id' instead of a 'domain_id', we get a
  752         # Forbidden response because we fail a policy check before
  753         # normalization occurs.
  754         domain = PROVIDERS.resource_api.create_domain(
  755             uuid.uuid4().hex, unit.new_domain_ref()
  756         )
  757 
  758         create = {
  759             'user': {
  760                 'domain-id': domain['id'],
  761                 'name': uuid.uuid4().hex
  762             }
  763         }
  764 
  765         with self.test_client() as c:
  766             c.post(
  767                 '/v3/users', json=create, headers=self.headers,
  768                 expected_status_code=http.client.FORBIDDEN
  769             )
  770 
  771     def test_user_cannot_create_users_in_other_domain(self):
  772         domain = PROVIDERS.resource_api.create_domain(
  773             uuid.uuid4().hex, unit.new_domain_ref()
  774         )
  775 
  776         create = {
  777             'user': {
  778                 'domain_id': domain['id'],
  779                 'name': uuid.uuid4().hex
  780             }
  781         }
  782 
  783         with self.test_client() as c:
  784             c.post(
  785                 '/v3/users', json=create, headers=self.headers,
  786                 expected_status_code=http.client.FORBIDDEN
  787             )
  788 
  789     def test_user_can_update_users_within_domain(self):
  790         user = PROVIDERS.identity_api.create_user(
  791             unit.new_user_ref(domain_id=self.domain_id)
  792         )
  793 
  794         update = {'user': {'email': uuid.uuid4().hex}}
  795         with self.test_client() as c:
  796             c.patch(
  797                 '/v3/users/%s' % user['id'], json=update, headers=self.headers
  798             )
  799 
  800     def test_user_can_update_users_within_domain_hyphened_domain_id(self):
  801         # If we try updating the user's 'domain_id' by specifying a
  802         # 'domain-id', then it'll be stored into extras rather than normalized,
  803         # and the user's actual 'domain_id' is not affected.
  804         domain = PROVIDERS.resource_api.create_domain(
  805             uuid.uuid4().hex, unit.new_domain_ref()
  806         )
  807 
  808         user = PROVIDERS.identity_api.create_user(
  809             unit.new_user_ref(domain_id=self.domain_id)
  810         )
  811 
  812         update = {'user': {'domain-id': domain['id']}}
  813         with self.test_client() as c:
  814             r = c.patch(
  815                 '/v3/users/%s' % user['id'], json=update, headers=self.headers
  816             )
  817             self.assertEqual(domain['id'], r.json['user']['domain-id'])
  818             self.assertEqual(self.domain_id, r.json['user']['domain_id'])
  819 
  820     def test_user_cannot_update_users_in_other_domain(self):
  821         domain = PROVIDERS.resource_api.create_domain(
  822             uuid.uuid4().hex, unit.new_domain_ref()
  823         )
  824         user = PROVIDERS.identity_api.create_user(
  825             unit.new_user_ref(domain_id=domain['id'])
  826         )
  827 
  828         update = {'user': {'email': uuid.uuid4().hex}}
  829         with self.test_client() as c:
  830             c.patch(
  831                 '/v3/users/%s' % user['id'], json=update, headers=self.headers,
  832                 expected_status_code=http.client.FORBIDDEN
  833             )
  834 
  835     def test_user_cannot_update_non_existent_user_forbidden(self):
  836         update = {'user': {'email': uuid.uuid4().hex}}
  837         with self.test_client() as c:
  838             c.patch(
  839                 '/v3/users/%s' % uuid.uuid4().hex, json=update,
  840                 headers=self.headers,
  841                 expected_status_code=http.client.FORBIDDEN
  842             )
  843 
  844     def test_user_can_delete_users_within_domain(self):
  845         user = PROVIDERS.identity_api.create_user(
  846             unit.new_user_ref(domain_id=self.domain_id)
  847         )
  848 
  849         with self.test_client() as c:
  850             c.delete(
  851                 '/v3/users/%s' % user['id'], headers=self.headers
  852             )
  853 
  854     def test_user_cannot_delete_users_in_other_domain(self):
  855         domain = PROVIDERS.resource_api.create_domain(
  856             uuid.uuid4().hex, unit.new_domain_ref()
  857         )
  858         user = PROVIDERS.identity_api.create_user(
  859             unit.new_user_ref(domain_id=domain['id'])
  860         )
  861 
  862         with self.test_client() as c:
  863             c.delete(
  864                 '/v3/users/%s' % user['id'], headers=self.headers,
  865                 expected_status_code=http.client.FORBIDDEN
  866             )
  867 
  868     def test_user_cannot_delete_non_existent_user_forbidden(self):
  869         with self.test_client() as c:
  870             c.delete(
  871                 '/v3/users/%s' % uuid.uuid4().hex, headers=self.headers,
  872                 expected_status_code=http.client.FORBIDDEN
  873             )
  874 
  875 
  876 class ProjectReaderTests(base_classes.TestCaseWithBootstrap,
  877                          common_auth.AuthTestMixin,
  878                          _CommonUserTests,
  879                          _ProjectUserTests):
  880 
  881     def setUp(self):
  882         super(ProjectReaderTests, self).setUp()
  883         self.loadapp()
  884         self.useFixture(ksfixtures.Policy(self.config_fixture))
  885         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  886 
  887         domain = PROVIDERS.resource_api.create_domain(
  888             uuid.uuid4().hex, unit.new_domain_ref()
  889         )
  890         self.domain_id = domain['id']
  891 
  892         project = unit.new_project_ref(domain_id=self.domain_id)
  893         project = PROVIDERS.resource_api.create_project(project['id'], project)
  894         self.project_id = project['id']
  895 
  896         project_reader = unit.new_user_ref(domain_id=self.domain_id)
  897         self.user_id = PROVIDERS.identity_api.create_user(project_reader)['id']
  898         PROVIDERS.assignment_api.create_grant(
  899             self.bootstrapper.reader_role_id, user_id=self.user_id,
  900             project_id=self.project_id
  901         )
  902 
  903         auth = self.build_authentication_request(
  904             user_id=self.user_id, password=project_reader['password'],
  905             project_id=self.project_id,
  906         )
  907 
  908         # Grab a token using the persona we're testing and prepare headers
  909         # for requests we'll be making in the tests.
  910         with self.test_client() as c:
  911             r = c.post('/v3/auth/tokens', json=auth)
  912             self.token_id = r.headers['X-Subject-Token']
  913             self.headers = {'X-Auth-Token': self.token_id}
  914 
  915 
  916 class ProjectMemberTests(base_classes.TestCaseWithBootstrap,
  917                          common_auth.AuthTestMixin,
  918                          _CommonUserTests,
  919                          _ProjectUserTests):
  920 
  921     def setUp(self):
  922         super(ProjectMemberTests, self).setUp()
  923         self.loadapp()
  924         self.useFixture(ksfixtures.Policy(self.config_fixture))
  925         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  926 
  927         domain = PROVIDERS.resource_api.create_domain(
  928             uuid.uuid4().hex, unit.new_domain_ref()
  929         )
  930         self.domain_id = domain['id']
  931 
  932         project = unit.new_project_ref(domain_id=self.domain_id)
  933         project = PROVIDERS.resource_api.create_project(project['id'], project)
  934         self.project_id = project['id']
  935 
  936         project_member = unit.new_user_ref(domain_id=self.domain_id)
  937         self.user_id = PROVIDERS.identity_api.create_user(project_member)['id']
  938         PROVIDERS.assignment_api.create_grant(
  939             self.bootstrapper.member_role_id, user_id=self.user_id,
  940             project_id=self.project_id
  941         )
  942 
  943         auth = self.build_authentication_request(
  944             user_id=self.user_id, password=project_member['password'],
  945             project_id=self.project_id,
  946         )
  947 
  948         # Grab a token using the persona we're testing and prepare headers
  949         # for requests we'll be making in the tests.
  950         with self.test_client() as c:
  951             r = c.post('/v3/auth/tokens', json=auth)
  952             self.token_id = r.headers['X-Subject-Token']
  953             self.headers = {'X-Auth-Token': self.token_id}
  954 
  955 
  956 class ProjectAdminTests(base_classes.TestCaseWithBootstrap,
  957                         common_auth.AuthTestMixin,
  958                         _CommonUserTests,
  959                         _ProjectUserTests):
  960 
  961     def setUp(self):
  962         super(ProjectAdminTests, self).setUp()
  963         self.loadapp()
  964 
  965         self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
  966         self.policy_file_name = self.policy_file.file_name
  967         self.useFixture(
  968             ksfixtures.Policy(
  969                 self.config_fixture, policy_file=self.policy_file_name
  970             )
  971         )
  972 
  973         self._override_policy()
  974         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  975 
  976         domain = PROVIDERS.resource_api.create_domain(
  977             uuid.uuid4().hex, unit.new_domain_ref()
  978         )
  979         self.domain_id = domain['id']
  980 
  981         self.user_id = self.bootstrapper.admin_user_id
  982         auth = self.build_authentication_request(
  983             user_id=self.user_id,
  984             password=self.bootstrapper.admin_password,
  985             project_id=self.bootstrapper.project_id
  986         )
  987 
  988         # Grab a token using the persona we're testing and prepare headers
  989         # for requests we'll be making in the tests.
  990         with self.test_client() as c:
  991             r = c.post('/v3/auth/tokens', json=auth)
  992             self.token_id = r.headers['X-Subject-Token']
  993             self.headers = {'X-Auth-Token': self.token_id}
  994 
  995     def _override_policy(self):
  996         # TODO(lbragstad): Remove this once the deprecated policies in
  997         # keystone.common.policies.users have been removed. This is only
  998         # here to make sure we test the new policies instead of the deprecated
  999         # ones. Oslo.policy will OR deprecated policies with new policies to
 1000         # maintain compatibility and give operators a chance to update
 1001         # permissions or update policies without breaking users. This will
 1002         # cause these specific tests to fail since we're trying to correct this
 1003         # broken behavior with better scope checking.
 1004         with open(self.policy_file_name, 'w') as f:
 1005             overridden_policies = {
 1006                 'identity:get_user': up.SYSTEM_READER_OR_DOMAIN_READER_OR_USER,
 1007                 'identity:list_users': up.SYSTEM_READER_OR_DOMAIN_READER,
 1008                 'identity:create_user': up.SYSTEM_ADMIN_OR_DOMAIN_ADMIN,
 1009                 'identity:update_user': up.SYSTEM_ADMIN_OR_DOMAIN_ADMIN,
 1010                 'identity:delete_user': up.SYSTEM_ADMIN_OR_DOMAIN_ADMIN
 1011             }
 1012             f.write(jsonutils.dumps(overridden_policies))