"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/protection/v3/test_assignment.py" (13 May 2020, 62806 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_assignment.py": 16.0.1_vs_17.0.0.

    1 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 #    not use this file except in compliance with the License. You may obtain
    3 #    a copy of the License at
    4 #
    5 #         http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 #    Unless required by applicable law or agreed to in writing, software
    8 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 #    License for the specific language governing permissions and limitations
   11 #    under the License.
   12 
   13 import copy
   14 import http.client
   15 import uuid
   16 
   17 from oslo_serialization import jsonutils
   18 
   19 from keystone.common.policies import role_assignment as rp
   20 from keystone.common import provider_api
   21 import keystone.conf
   22 from keystone.tests.common import auth as common_auth
   23 from keystone.tests import unit
   24 from keystone.tests.unit import base_classes
   25 from keystone.tests.unit import ksfixtures
   26 from keystone.tests.unit.ksfixtures import temporaryfile
   27 
   28 CONF = keystone.conf.CONF
   29 PROVIDERS = provider_api.ProviderAPIs
   30 
   31 
   32 class _AssignmentTestUtilities(object):
   33     """Useful utilities for setting up test assignments and assertions."""
   34 
   35     def _setup_test_role_assignments(self):
   36         # Utility to create assignments and return important data for
   37         # assertions.
   38 
   39         # Since the role doesn't really matter too much, we can just re-use an
   40         # existing role instead of creating a new one.
   41         role_id = self.bootstrapper.reader_role_id
   42 
   43         user = PROVIDERS.identity_api.create_user(
   44             unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
   45         )
   46 
   47         group = PROVIDERS.identity_api.create_group(
   48             unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
   49         )
   50 
   51         domain = PROVIDERS.resource_api.create_domain(
   52             uuid.uuid4().hex, unit.new_domain_ref()
   53         )
   54 
   55         project = PROVIDERS.resource_api.create_project(
   56             uuid.uuid4().hex,
   57             unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
   58         )
   59 
   60         # create a user+project role assignment.
   61         PROVIDERS.assignment_api.create_grant(
   62             role_id, user_id=user['id'], project_id=project['id']
   63         )
   64 
   65         # create a user+domain role assignment.
   66         PROVIDERS.assignment_api.create_grant(
   67             role_id, user_id=user['id'], domain_id=domain['id']
   68         )
   69 
   70         # create a user+system role assignment.
   71         PROVIDERS.assignment_api.create_system_grant_for_user(
   72             user['id'], role_id
   73         )
   74 
   75         # create a group+project role assignment.
   76         PROVIDERS.assignment_api.create_grant(
   77             role_id, group_id=group['id'], project_id=project['id']
   78         )
   79 
   80         # create a group+domain role assignment.
   81         PROVIDERS.assignment_api.create_grant(
   82             role_id, group_id=group['id'], domain_id=domain['id']
   83         )
   84 
   85         # create a group+system role assignment.
   86         PROVIDERS.assignment_api.create_system_grant_for_group(
   87             group['id'], role_id
   88         )
   89 
   90         return {
   91             'user_id': user['id'],
   92             'group_id': group['id'],
   93             'domain_id': domain['id'],
   94             'project_id': project['id'],
   95             'role_id': role_id,
   96         }
   97 
   98     def _extract_role_assignments_from_response_body(self, r):
   99         # Condense the role assignment details into a set of key things we can
  100         # use in assertions.
  101         assignments = []
  102         for assignment in r.json['role_assignments']:
  103             a = {}
  104             if 'project' in assignment['scope']:
  105                 a['project_id'] = assignment['scope']['project']['id']
  106             elif 'domain' in assignment['scope']:
  107                 a['domain_id'] = assignment['scope']['domain']['id']
  108             elif 'system' in assignment['scope']:
  109                 a['system'] = 'all'
  110 
  111             if 'user' in assignment:
  112                 a['user_id'] = assignment['user']['id']
  113             elif 'group' in assignment:
  114                 a['group_id'] = assignment['group']['id']
  115 
  116             a['role_id'] = assignment['role']['id']
  117 
  118             assignments.append(a)
  119         return assignments
  120 
  121 
  122 class _SystemUserTests(object):
  123     """Common functionality for system users regardless of default role."""
  124 
  125     def test_user_can_list_all_role_assignments_in_the_deployment(self):
  126         assignments = self._setup_test_role_assignments()
  127 
  128         # this assignment is created by keystone-manage bootstrap
  129         self.expected.append({
  130             'user_id': self.bootstrapper.admin_user_id,
  131             'project_id': self.bootstrapper.project_id,
  132             'role_id': self.bootstrapper.admin_role_id
  133         })
  134 
  135         # this assignment is created by keystone-manage bootstrap
  136         self.expected.append({
  137             'user_id': self.bootstrapper.admin_user_id,
  138             'system': 'all',
  139             'role_id': self.bootstrapper.admin_role_id
  140         })
  141         self.expected.append({
  142             'user_id': assignments['user_id'],
  143             'project_id': assignments['project_id'],
  144             'role_id': assignments['role_id']
  145         })
  146         self.expected.append({
  147             'user_id': assignments['user_id'],
  148             'domain_id': assignments['domain_id'],
  149             'role_id': assignments['role_id']
  150         })
  151         self.expected.append({
  152             'user_id': assignments['user_id'],
  153             'system': 'all',
  154             'role_id': assignments['role_id']
  155         })
  156         self.expected.append({
  157             'group_id': assignments['group_id'],
  158             'project_id': assignments['project_id'],
  159             'role_id': assignments['role_id']
  160         })
  161         self.expected.append({
  162             'group_id': assignments['group_id'],
  163             'domain_id': assignments['domain_id'],
  164             'role_id': assignments['role_id']
  165         })
  166         self.expected.append({
  167             'group_id': assignments['group_id'],
  168             'system': 'all',
  169             'role_id': assignments['role_id']
  170         })
  171 
  172         with self.test_client() as c:
  173             r = c.get('/v3/role_assignments', headers=self.headers)
  174             self.assertEqual(
  175                 len(self.expected), len(r.json['role_assignments'])
  176             )
  177             actual = self._extract_role_assignments_from_response_body(r)
  178             for assignment in actual:
  179                 self.assertIn(assignment, self.expected)
  180 
  181     def test_user_can_list_all_role_names_assignments_in_the_deployment(self):
  182         assignments = self._setup_test_role_assignments()
  183 
  184         # this assignment is created by keystone-manage bootstrap
  185         self.expected.append({
  186             'user_id': self.bootstrapper.admin_user_id,
  187             'project_id': self.bootstrapper.project_id,
  188             'role_id': self.bootstrapper.admin_role_id
  189         })
  190 
  191         # this assignment is created by keystone-manage bootstrap
  192         self.expected.append({
  193             'user_id': self.bootstrapper.admin_user_id,
  194             'system': 'all',
  195             'role_id': self.bootstrapper.admin_role_id
  196         })
  197         self.expected.append({
  198             'user_id': assignments['user_id'],
  199             'project_id': assignments['project_id'],
  200             'role_id': assignments['role_id']
  201         })
  202         self.expected.append({
  203             'user_id': assignments['user_id'],
  204             'domain_id': assignments['domain_id'],
  205             'role_id': assignments['role_id']
  206         })
  207         self.expected.append({
  208             'user_id': assignments['user_id'],
  209             'system': 'all',
  210             'role_id': assignments['role_id']
  211         })
  212         self.expected.append({
  213             'group_id': assignments['group_id'],
  214             'project_id': assignments['project_id'],
  215             'role_id': assignments['role_id']
  216         })
  217         self.expected.append({
  218             'group_id': assignments['group_id'],
  219             'domain_id': assignments['domain_id'],
  220             'role_id': assignments['role_id']
  221         })
  222         self.expected.append({
  223             'group_id': assignments['group_id'],
  224             'system': 'all',
  225             'role_id': assignments['role_id']
  226         })
  227 
  228         with self.test_client() as c:
  229             r = c.get(
  230                 '/v3/role_assignments?include_names=True', headers=self.headers
  231             )
  232             self.assertEqual(
  233                 len(self.expected), len(r.json['role_assignments'])
  234             )
  235             actual = self._extract_role_assignments_from_response_body(r)
  236             for assignment in actual:
  237                 self.assertIn(assignment, self.expected)
  238 
  239     def test_user_can_filter_role_assignments_by_project(self):
  240         assignments = self._setup_test_role_assignments()
  241         expected = [
  242             {
  243                 'user_id': assignments['user_id'],
  244                 'project_id': assignments['project_id'],
  245                 'role_id': assignments['role_id']
  246             },
  247             {
  248                 'group_id': assignments['group_id'],
  249                 'project_id': assignments['project_id'],
  250                 'role_id': assignments['role_id']
  251             }
  252         ]
  253         project_id = assignments['project_id']
  254 
  255         with self.test_client() as c:
  256             r = c.get(
  257                 '/v3/role_assignments?scope.project.id=%s' % project_id,
  258                 headers=self.headers
  259             )
  260             self.assertEqual(len(expected), len(r.json['role_assignments']))
  261             actual = self._extract_role_assignments_from_response_body(r)
  262             for assignment in actual:
  263                 self.assertIn(assignment, expected)
  264 
  265     def test_user_can_filter_role_assignments_by_domain(self):
  266         assignments = self._setup_test_role_assignments()
  267         expected = [
  268             {
  269                 'user_id': assignments['user_id'],
  270                 'domain_id': assignments['domain_id'],
  271                 'role_id': assignments['role_id']
  272             },
  273             {
  274                 'group_id': assignments['group_id'],
  275                 'domain_id': assignments['domain_id'],
  276                 'role_id': assignments['role_id']
  277             }
  278         ]
  279         domain_id = assignments['domain_id']
  280 
  281         with self.test_client() as c:
  282             r = c.get(
  283                 '/v3/role_assignments?scope.domain.id=%s' % domain_id,
  284                 headers=self.headers
  285             )
  286             self.assertEqual(len(expected), len(r.json['role_assignments']))
  287             actual = self._extract_role_assignments_from_response_body(r)
  288             for assignment in actual:
  289                 self.assertIn(assignment, expected)
  290 
  291     def test_user_can_filter_role_assignments_by_system(self):
  292         assignments = self._setup_test_role_assignments()
  293 
  294         # this assignment is created by keystone-manage bootstrap
  295         self.expected.append({
  296             'user_id': self.bootstrapper.admin_user_id,
  297             'system': 'all',
  298             'role_id': self.bootstrapper.admin_role_id
  299         })
  300         self.expected.append({
  301             'user_id': assignments['user_id'],
  302             'system': 'all',
  303             'role_id': assignments['role_id']
  304         })
  305         self.expected.append({
  306             'group_id': assignments['group_id'],
  307             'system': 'all',
  308             'role_id': assignments['role_id']
  309         })
  310 
  311         with self.test_client() as c:
  312             r = c.get(
  313                 '/v3/role_assignments?scope.system=all',
  314                 headers=self.headers
  315             )
  316             self.assertEqual(
  317                 len(self.expected), len(r.json['role_assignments'])
  318             )
  319             actual = self._extract_role_assignments_from_response_body(r)
  320             for assignment in actual:
  321                 self.assertIn(assignment, self.expected)
  322 
  323     def test_user_can_filter_role_assignments_by_user(self):
  324         assignments = self._setup_test_role_assignments()
  325         expected = [
  326             # assignment of the user running the test case
  327             {
  328                 'user_id': assignments['user_id'],
  329                 'project_id': assignments['project_id'],
  330                 'role_id': assignments['role_id']
  331             },
  332             {
  333                 'user_id': assignments['user_id'],
  334                 'domain_id': assignments['domain_id'],
  335                 'role_id': assignments['role_id']
  336             },
  337             {
  338                 'user_id': assignments['user_id'],
  339                 'system': 'all',
  340                 'role_id': assignments['role_id']
  341             }
  342         ]
  343         user_id = assignments['user_id']
  344 
  345         with self.test_client() as c:
  346             r = c.get(
  347                 '/v3/role_assignments?user.id=%s' % user_id,
  348                 headers=self.headers
  349             )
  350             self.assertEqual(len(expected), len(r.json['role_assignments']))
  351             actual = self._extract_role_assignments_from_response_body(r)
  352             for assignment in actual:
  353                 self.assertIn(assignment, expected)
  354 
  355     def test_user_can_filter_role_assignments_by_group(self):
  356         assignments = self._setup_test_role_assignments()
  357         expected = [
  358             {
  359                 'group_id': assignments['group_id'],
  360                 'project_id': assignments['project_id'],
  361                 'role_id': assignments['role_id']
  362             },
  363             {
  364                 'group_id': assignments['group_id'],
  365                 'domain_id': assignments['domain_id'],
  366                 'role_id': assignments['role_id']
  367             },
  368             {
  369                 'group_id': assignments['group_id'],
  370                 'system': 'all',
  371                 'role_id': assignments['role_id']
  372             }
  373         ]
  374         group_id = assignments['group_id']
  375 
  376         with self.test_client() as c:
  377             r = c.get(
  378                 '/v3/role_assignments?group.id=%s' % group_id,
  379                 headers=self.headers
  380             )
  381             self.assertEqual(len(expected), len(r.json['role_assignments']))
  382             actual = self._extract_role_assignments_from_response_body(r)
  383             for assignment in actual:
  384                 self.assertIn(assignment, expected)
  385 
  386     def test_user_can_filter_role_assignments_by_role(self):
  387         assignments = self._setup_test_role_assignments()
  388         self.expected = [ra for ra in self.expected
  389                          if ra['role_id'] == assignments['role_id']]
  390         self.expected.append({
  391             'user_id': assignments['user_id'],
  392             'project_id': assignments['project_id'],
  393             'role_id': assignments['role_id']
  394         })
  395         self.expected.append({
  396             'user_id': assignments['user_id'],
  397             'domain_id': assignments['domain_id'],
  398             'role_id': assignments['role_id']
  399         })
  400         self.expected.append({
  401             'user_id': assignments['user_id'],
  402             'system': 'all',
  403             'role_id': assignments['role_id']
  404         })
  405         self.expected.append({
  406             'group_id': assignments['group_id'],
  407             'project_id': assignments['project_id'],
  408             'role_id': assignments['role_id']
  409         })
  410         self.expected.append({
  411             'group_id': assignments['group_id'],
  412             'domain_id': assignments['domain_id'],
  413             'role_id': assignments['role_id']
  414         })
  415         self.expected.append({
  416             'group_id': assignments['group_id'],
  417             'system': 'all',
  418             'role_id': assignments['role_id']
  419         })
  420 
  421         role_id = assignments['role_id']
  422 
  423         with self.test_client() as c:
  424             r = c.get(
  425                 '/v3/role_assignments?role.id=%s&include_names=True' % role_id,
  426                 headers=self.headers
  427             )
  428             self.assertEqual(
  429                 len(self.expected), len(r.json['role_assignments'])
  430             )
  431             actual = self._extract_role_assignments_from_response_body(r)
  432             for assignment in actual:
  433                 self.assertIn(assignment, self.expected)
  434 
  435     def test_user_can_filter_role_assignments_by_project_and_role(self):
  436         assignments = self._setup_test_role_assignments()
  437         expected = [
  438             {
  439                 'user_id': assignments['user_id'],
  440                 'project_id': assignments['project_id'],
  441                 'role_id': assignments['role_id']
  442             },
  443             {
  444                 'group_id': assignments['group_id'],
  445                 'project_id': assignments['project_id'],
  446                 'role_id': assignments['role_id']
  447             },
  448         ]
  449 
  450         with self.test_client() as c:
  451             qs = (assignments['project_id'], assignments['role_id'])
  452             r = c.get(
  453                 '/v3/role_assignments?scope.project.id=%s&role.id=%s' % qs,
  454                 headers=self.headers
  455             )
  456             self.assertEqual(len(expected), len(r.json['role_assignments']))
  457             actual = self._extract_role_assignments_from_response_body(r)
  458             for assignment in actual:
  459                 self.assertIn(assignment, expected)
  460 
  461     def test_user_can_filter_role_assignments_by_domain_and_role(self):
  462         assignments = self._setup_test_role_assignments()
  463         expected = [
  464             {
  465                 'user_id': assignments['user_id'],
  466                 'domain_id': assignments['domain_id'],
  467                 'role_id': assignments['role_id']
  468             },
  469             {
  470                 'group_id': assignments['group_id'],
  471                 'domain_id': assignments['domain_id'],
  472                 'role_id': assignments['role_id']
  473             },
  474         ]
  475         qs = (assignments['domain_id'], assignments['role_id'])
  476 
  477         with self.test_client() as c:
  478             r = c.get(
  479                 '/v3/role_assignments?scope.domain.id=%s&role.id=%s' % qs,
  480                 headers=self.headers
  481             )
  482             self.assertEqual(len(expected), len(r.json['role_assignments']))
  483             actual = self._extract_role_assignments_from_response_body(r)
  484             for assignment in actual:
  485                 self.assertIn(assignment, expected)
  486 
  487     def test_user_can_filter_role_assignments_by_system_and_role(self):
  488         assignments = self._setup_test_role_assignments()
  489         self.expected = [ra for ra in self.expected
  490                          if ra['role_id'] == assignments['role_id']]
  491         self.expected.append({
  492             'user_id': assignments['user_id'],
  493             'system': 'all',
  494             'role_id': assignments['role_id']
  495         })
  496         self.expected.append({
  497             'group_id': assignments['group_id'],
  498             'system': 'all',
  499             'role_id': assignments['role_id']
  500         })
  501         role_id = assignments['role_id']
  502 
  503         with self.test_client() as c:
  504             r = c.get(
  505                 '/v3/role_assignments?scope.system=all&role.id=%s' % role_id,
  506                 headers=self.headers
  507             )
  508             self.assertEqual(
  509                 len(self.expected), len(r.json['role_assignments'])
  510             )
  511             actual = self._extract_role_assignments_from_response_body(r)
  512             for assignment in actual:
  513                 self.assertIn(assignment, self.expected)
  514 
  515     def test_user_can_filter_role_assignments_by_user_and_role(self):
  516         assignments = self._setup_test_role_assignments()
  517         expected = [
  518             {
  519                 'user_id': assignments['user_id'],
  520                 'project_id': assignments['project_id'],
  521                 'role_id': assignments['role_id']
  522             },
  523             {
  524                 'user_id': assignments['user_id'],
  525                 'domain_id': assignments['domain_id'],
  526                 'role_id': assignments['role_id']
  527             },
  528             {
  529                 'user_id': assignments['user_id'],
  530                 'system': 'all',
  531                 'role_id': assignments['role_id']
  532             }
  533         ]
  534         qs = (assignments['user_id'], assignments['role_id'])
  535 
  536         with self.test_client() as c:
  537             r = c.get(
  538                 '/v3/role_assignments?user.id=%s&role.id=%s' % qs,
  539                 headers=self.headers
  540             )
  541             self.assertEqual(len(expected), len(r.json['role_assignments']))
  542             actual = self._extract_role_assignments_from_response_body(r)
  543             for assignment in actual:
  544                 self.assertIn(assignment, expected)
  545 
  546     def test_user_can_filter_role_assignments_by_group_and_role(self):
  547         assignments = self._setup_test_role_assignments()
  548         expected = [
  549             {
  550                 'group_id': assignments['group_id'],
  551                 'project_id': assignments['project_id'],
  552                 'role_id': assignments['role_id']
  553             },
  554             {
  555                 'group_id': assignments['group_id'],
  556                 'domain_id': assignments['domain_id'],
  557                 'role_id': assignments['role_id']
  558             },
  559             {
  560                 'group_id': assignments['group_id'],
  561                 'system': 'all',
  562                 'role_id': assignments['role_id']
  563             }
  564         ]
  565 
  566         with self.test_client() as c:
  567             qs = (assignments['group_id'], assignments['role_id'])
  568             r = c.get(
  569                 '/v3/role_assignments?group.id=%s&role.id=%s' % qs,
  570                 headers=self.headers
  571             )
  572             self.assertEqual(len(expected), len(r.json['role_assignments']))
  573             actual = self._extract_role_assignments_from_response_body(r)
  574             for assignment in actual:
  575                 self.assertIn(assignment, expected)
  576 
  577     def test_user_can_filter_role_assignments_by_project_and_user(self):
  578         assignments = self._setup_test_role_assignments()
  579         expected = [
  580             {
  581                 'user_id': assignments['user_id'],
  582                 'project_id': assignments['project_id'],
  583                 'role_id': assignments['role_id']
  584             }
  585         ]
  586         qs = (assignments['project_id'], assignments['user_id'])
  587 
  588         with self.test_client() as c:
  589             r = c.get(
  590                 '/v3/role_assignments?scope.project.id=%s&user.id=%s' % qs,
  591                 headers=self.headers
  592             )
  593             self.assertEqual(len(expected), len(r.json['role_assignments']))
  594             actual = self._extract_role_assignments_from_response_body(r)
  595             for assignment in actual:
  596                 self.assertIn(assignment, expected)
  597 
  598     def test_user_can_filter_role_assignments_by_project_and_group(self):
  599         assignments = self._setup_test_role_assignments()
  600         expected = [
  601             {
  602                 'group_id': assignments['group_id'],
  603                 'project_id': assignments['project_id'],
  604                 'role_id': assignments['role_id']
  605             }
  606         ]
  607         qs = (assignments['project_id'], assignments['group_id'])
  608 
  609         with self.test_client() as c:
  610             r = c.get(
  611                 '/v3/role_assignments?scope.project.id=%s&group.id=%s' % qs,
  612                 headers=self.headers
  613             )
  614             self.assertEqual(len(expected), len(r.json['role_assignments']))
  615             actual = self._extract_role_assignments_from_response_body(r)
  616             for assignment in actual:
  617                 self.assertIn(assignment, expected)
  618 
  619     def test_user_can_filter_role_assignments_by_domain_and_user(self):
  620         assignments = self._setup_test_role_assignments()
  621         expected = [
  622             {
  623                 'user_id': assignments['user_id'],
  624                 'domain_id': assignments['domain_id'],
  625                 'role_id': assignments['role_id']
  626             }
  627         ]
  628         qs = (assignments['domain_id'], assignments['user_id'])
  629 
  630         with self.test_client() as c:
  631             r = c.get(
  632                 '/v3/role_assignments?scope.domain.id=%s&user.id=%s' % qs,
  633                 headers=self.headers
  634             )
  635             self.assertEqual(len(expected), len(r.json['role_assignments']))
  636             actual = self._extract_role_assignments_from_response_body(r)
  637             for assignment in actual:
  638                 self.assertIn(assignment, expected)
  639 
  640     def test_user_can_filter_role_assignments_by_domain_and_group(self):
  641         assignments = self._setup_test_role_assignments()
  642         expected = [
  643             {
  644                 'group_id': assignments['group_id'],
  645                 'domain_id': assignments['domain_id'],
  646                 'role_id': assignments['role_id']
  647             }
  648         ]
  649         qs = (assignments['domain_id'], assignments['group_id'])
  650 
  651         with self.test_client() as c:
  652             r = c.get(
  653                 '/v3/role_assignments?scope.domain.id=%s&group.id=%s' % qs,
  654                 headers=self.headers
  655             )
  656             self.assertEqual(len(expected), len(r.json['role_assignments']))
  657             actual = self._extract_role_assignments_from_response_body(r)
  658             for assignment in actual:
  659                 self.assertIn(assignment, expected)
  660 
  661     def test_user_can_list_assignments_for_subtree(self):
  662         assignments = self._setup_test_role_assignments()
  663         user = PROVIDERS.identity_api.create_user(
  664             unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  665         )
  666         project = PROVIDERS.resource_api.create_project(
  667             uuid.uuid4().hex,
  668             unit.new_project_ref(domain_id=CONF.identity.default_domain_id,
  669                                  parent_id=assignments['project_id'])
  670         )
  671         PROVIDERS.assignment_api.create_grant(
  672             assignments['role_id'],
  673             user_id=user['id'],
  674             project_id=project['id']
  675         )
  676         expected = [
  677             {
  678                 'user_id': assignments['user_id'],
  679                 'project_id': assignments['project_id'],
  680                 'role_id': assignments['role_id']
  681             },
  682             {
  683                 'group_id': assignments['group_id'],
  684                 'project_id': assignments['project_id'],
  685                 'role_id': assignments['role_id']
  686             },
  687             {
  688                 'user_id': user['id'],
  689                 'project_id': project['id'],
  690                 'role_id': assignments['role_id']
  691             }
  692         ]
  693         with self.test_client() as c:
  694             r = c.get(
  695                 ('/v3/role_assignments?scope.project.id=%s&include_subtree' %
  696                  assignments['project_id']),
  697                 headers=self.headers
  698             )
  699             self.assertEqual(len(expected), len(r.json['role_assignments']))
  700             actual = self._extract_role_assignments_from_response_body(r)
  701             for assignment in actual:
  702                 self.assertIn(assignment, expected)
  703 
  704 
  705 class _DomainUserTests(object):
  706     """Common functionality for domain users."""
  707 
  708     def _setup_test_role_assignments_for_domain(self):
  709         # Populate role assignment within `self.domain_id` so that we can
  710         # assert users can view assignments within the domain they have
  711         # authorization on
  712         role_id = self.bootstrapper.reader_role_id
  713 
  714         user = PROVIDERS.identity_api.create_user(
  715             unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  716         )
  717 
  718         group = PROVIDERS.identity_api.create_group(
  719             unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  720         )
  721 
  722         project = PROVIDERS.resource_api.create_project(
  723             uuid.uuid4().hex,
  724             unit.new_project_ref(domain_id=self.domain_id)
  725         )
  726 
  727         # create a user+project role assignment.
  728         PROVIDERS.assignment_api.create_grant(
  729             role_id, user_id=user['id'], project_id=project['id']
  730         )
  731 
  732         # create a user+domain role assignment.
  733         PROVIDERS.assignment_api.create_grant(
  734             role_id, user_id=user['id'], domain_id=self.domain_id
  735         )
  736 
  737         # create a group+project role assignment.
  738         PROVIDERS.assignment_api.create_grant(
  739             role_id, group_id=group['id'], project_id=project['id']
  740         )
  741 
  742         # create a group+domain role assignment.
  743         PROVIDERS.assignment_api.create_grant(
  744             role_id, group_id=group['id'], domain_id=self.domain_id
  745         )
  746 
  747         return {
  748             'user_id': user['id'],
  749             'group_id': group['id'],
  750             'project_id': project['id'],
  751             'role_id': role_id,
  752         }
  753 
  754     def test_user_can_list_all_assignments_in_their_domain(self):
  755         self._setup_test_role_assignments()
  756         domain_assignments = self._setup_test_role_assignments_for_domain()
  757 
  758         self.expected.append({
  759             'user_id': domain_assignments['user_id'],
  760             'domain_id': self.domain_id,
  761             'role_id': domain_assignments['role_id']
  762         })
  763         self.expected.append({
  764             'user_id': domain_assignments['user_id'],
  765             'project_id': domain_assignments['project_id'],
  766             'role_id': domain_assignments['role_id']
  767         })
  768         self.expected.append({
  769             'group_id': domain_assignments['group_id'],
  770             'domain_id': self.domain_id,
  771             'role_id': domain_assignments['role_id']
  772         })
  773         self.expected.append({
  774             'group_id': domain_assignments['group_id'],
  775             'project_id': domain_assignments['project_id'],
  776             'role_id': domain_assignments['role_id']
  777         })
  778 
  779         with self.test_client() as c:
  780             r = c.get('/v3/role_assignments', headers=self.headers)
  781             self.assertEqual(
  782                 len(self.expected), len(r.json['role_assignments'])
  783             )
  784             actual = self._extract_role_assignments_from_response_body(r)
  785             for assignment in actual:
  786                 self.assertIn(assignment, self.expected)
  787 
  788     def test_user_can_filter_role_assignments_by_project_in_domain(self):
  789         self._setup_test_role_assignments()
  790         domain_assignments = self._setup_test_role_assignments_for_domain()
  791 
  792         expected = [
  793             {
  794                 'user_id': domain_assignments['user_id'],
  795                 'project_id': domain_assignments['project_id'],
  796                 'role_id': domain_assignments['role_id']
  797             },
  798             {
  799                 'group_id': domain_assignments['group_id'],
  800                 'project_id': domain_assignments['project_id'],
  801                 'role_id': domain_assignments['role_id']
  802             }
  803         ]
  804 
  805         project_id = domain_assignments['project_id']
  806 
  807         with self.test_client() as c:
  808             r = c.get(
  809                 '/v3/role_assignments?scope.project.id=%s' % project_id,
  810                 headers=self.headers
  811             )
  812             self.assertEqual(len(expected), len(r.json['role_assignments']))
  813             actual = self._extract_role_assignments_from_response_body(r)
  814             for assignment in actual:
  815                 self.assertIn(assignment, expected)
  816 
  817     def test_user_can_filter_role_assignments_by_domain(self):
  818         # This shouldn't really provide any more value than just calling GET
  819         # /v3/role_assignments with a domain-scoped token, but we test it
  820         # anyway.
  821         self._setup_test_role_assignments()
  822         domain_assignments = self._setup_test_role_assignments_for_domain()
  823 
  824         self.expected.append({
  825             'user_id': domain_assignments['user_id'],
  826             'domain_id': self.domain_id,
  827             'role_id': domain_assignments['role_id']
  828         })
  829         self.expected.append({
  830             'group_id': domain_assignments['group_id'],
  831             'domain_id': self.domain_id,
  832             'role_id': domain_assignments['role_id']
  833         })
  834 
  835         with self.test_client() as c:
  836             r = c.get(
  837                 '/v3/role_assignments?scope.domain.id=%s' % self.domain_id,
  838                 headers=self.headers
  839             )
  840             self.assertEqual(
  841                 len(self.expected), len(r.json['role_assignments'])
  842             )
  843             actual = self._extract_role_assignments_from_response_body(r)
  844             for assignment in actual:
  845                 self.assertIn(assignment, self.expected)
  846 
  847     def test_user_can_filter_role_assignments_by_user_of_domain(self):
  848         self._setup_test_role_assignments()
  849         domain_assignments = self._setup_test_role_assignments_for_domain()
  850 
  851         expected = [
  852             {
  853                 'user_id': domain_assignments['user_id'],
  854                 'domain_id': self.domain_id,
  855                 'role_id': domain_assignments['role_id']
  856             },
  857             {
  858                 'user_id': domain_assignments['user_id'],
  859                 'project_id': domain_assignments['project_id'],
  860                 'role_id': domain_assignments['role_id']
  861             }
  862         ]
  863 
  864         user_id = domain_assignments['user_id']
  865 
  866         with self.test_client() as c:
  867             r = c.get(
  868                 '/v3/role_assignments?user.id=%s' % user_id,
  869                 headers=self.headers
  870             )
  871             self.assertEqual(len(expected), len(r.json['role_assignments']))
  872             actual = self._extract_role_assignments_from_response_body(r)
  873             for assignment in actual:
  874                 self.assertIn(assignment, expected)
  875 
  876     def test_user_can_filter_role_assignments_by_group_of_domain(self):
  877         self._setup_test_role_assignments()
  878         domain_assignments = self._setup_test_role_assignments_for_domain()
  879 
  880         expected = [
  881             {
  882                 'group_id': domain_assignments['group_id'],
  883                 'domain_id': self.domain_id,
  884                 'role_id': domain_assignments['role_id']
  885             },
  886             {
  887                 'group_id': domain_assignments['group_id'],
  888                 'project_id': domain_assignments['project_id'],
  889                 'role_id': domain_assignments['role_id']
  890             }
  891         ]
  892 
  893         group_id = domain_assignments['group_id']
  894 
  895         with self.test_client() as c:
  896             r = c.get(
  897                 '/v3/role_assignments?group.id=%s' % group_id,
  898                 headers=self.headers
  899             )
  900             self.assertEqual(len(expected), len(r.json['role_assignments']))
  901             actual = self._extract_role_assignments_from_response_body(r)
  902             for assignment in actual:
  903                 self.assertIn(assignment, expected)
  904 
  905     def test_user_cannot_filter_role_assignments_by_system(self):
  906         self._setup_test_role_assignments()
  907         self._setup_test_role_assignments_for_domain()
  908 
  909         with self.test_client() as c:
  910             r = c.get(
  911                 '/v3/role_assignments?scope.system=all',
  912                 headers=self.headers
  913             )
  914             self.assertEqual(0, len(r.json['role_assignments']))
  915 
  916     def test_user_cannot_filter_role_assignments_by_other_domain(self):
  917         assignments = self._setup_test_role_assignments()
  918         domain = assignments['domain_id']
  919         with self.test_client() as c:
  920             r = c.get(
  921                 '/v3/role_assignments?scope.domain.id=%s' % domain,
  922                 headers=self.headers
  923             )
  924             self.assertEqual([], r.json['role_assignments'])
  925 
  926     def test_user_cannot_filter_role_assignments_by_other_domain_project(self):
  927         assignments = self._setup_test_role_assignments()
  928         self._setup_test_role_assignments_for_domain()
  929 
  930         # This project is in an entirely separate domain that this user doesn't
  931         # have authorization to access, so they should only see an empty list
  932         project_id = assignments['project_id']
  933 
  934         with self.test_client() as c:
  935             r = c.get(
  936                 '/v3/role_assignments?scope.project.id=%s' % project_id,
  937                 headers=self.headers
  938             )
  939             self.assertEqual(0, len(r.json['role_assignments']))
  940 
  941     def test_user_cannot_filter_role_assignments_by_other_domain_user(self):
  942         assignments = self._setup_test_role_assignments()
  943         self._setup_test_role_assignments_for_domain()
  944 
  945         # This user doesn't have any role assignments on self.domain_id, so the
  946         # domain user of self.domain_id should only see an empty list of role
  947         # assignments.
  948         user_id = assignments['user_id']
  949 
  950         with self.test_client() as c:
  951             r = c.get(
  952                 '/v3/role_assignments?user.id=%s' % user_id,
  953                 headers=self.headers
  954             )
  955             self.assertEqual(0, len(r.json['role_assignments']))
  956 
  957     def test_user_cannot_filter_role_assignments_by_other_domain_group(self):
  958         assignments = self._setup_test_role_assignments()
  959         self._setup_test_role_assignments_for_domain()
  960 
  961         # This group doesn't have any role assignments on self.domain_id, so
  962         # the domain user of self.domain_id should only see an empty list of
  963         # role assignments.
  964         group_id = assignments['group_id']
  965 
  966         with self.test_client() as c:
  967             r = c.get(
  968                 '/v3/role_assignments?group.id=%s' % group_id,
  969                 headers=self.headers,
  970             )
  971             self.assertEqual(0, len(r.json['role_assignments']))
  972 
  973     def test_user_can_list_assignments_for_subtree_in_their_domain(self):
  974         assignments = self._setup_test_role_assignments()
  975         domain_assignments = self._setup_test_role_assignments_for_domain()
  976         user = PROVIDERS.identity_api.create_user(
  977             unit.new_user_ref(domain_id=self.domain_id)
  978         )
  979         project = PROVIDERS.resource_api.create_project(
  980             uuid.uuid4().hex,
  981             unit.new_project_ref(domain_id=self.domain_id,
  982                                  parent_id=domain_assignments['project_id'])
  983         )
  984         PROVIDERS.assignment_api.create_grant(
  985             assignments['role_id'],
  986             user_id=user['id'],
  987             project_id=project['id']
  988         )
  989         expected = [
  990             {
  991                 'user_id': domain_assignments['user_id'],
  992                 'project_id': domain_assignments['project_id'],
  993                 'role_id': assignments['role_id']
  994             },
  995             {
  996                 'group_id': domain_assignments['group_id'],
  997                 'project_id': domain_assignments['project_id'],
  998                 'role_id': assignments['role_id']
  999             },
 1000             {
 1001                 'user_id': user['id'],
 1002                 'project_id': project['id'],
 1003                 'role_id': assignments['role_id']
 1004             }
 1005         ]
 1006         with self.test_client() as c:
 1007             r = c.get(
 1008                 ('/v3/role_assignments?scope.project.id=%s&include_subtree' %
 1009                  domain_assignments['project_id']),
 1010                 headers=self.headers
 1011             )
 1012             self.assertEqual(len(expected), len(r.json['role_assignments']))
 1013             actual = self._extract_role_assignments_from_response_body(r)
 1014             for assignment in actual:
 1015                 self.assertIn(assignment, expected)
 1016 
 1017     def test_user_cannot_list_assignments_for_subtree_in_other_domain(self):
 1018         assignments = self._setup_test_role_assignments()
 1019         with self.test_client() as c:
 1020             c.get(
 1021                 ('/v3/role_assignments?scope.project.id=%s&include_subtree' %
 1022                  assignments['project_id']),
 1023                 headers=self.headers,
 1024                 expected_status_code=http.client.FORBIDDEN
 1025             )
 1026 
 1027 
 1028 class _ProjectUserTests(object):
 1029 
 1030     def test_user_cannot_list_all_assignments_in_their_project(self):
 1031         with self.test_client() as c:
 1032             c.get(
 1033                 '/v3/role_assignments', headers=self.headers,
 1034                 expected_status_code=http.client.FORBIDDEN
 1035             )
 1036 
 1037     def test_user_cannot_filter_role_assignments_by_user_of_project(self):
 1038         assignments = self._setup_test_role_assignments()
 1039         user_id = assignments['user_id']
 1040 
 1041         with self.test_client() as c:
 1042             c.get(
 1043                 '/v3/role_assignments?user.id=%s' % user_id,
 1044                 headers=self.headers,
 1045                 expected_status_code=http.client.FORBIDDEN
 1046             )
 1047 
 1048     def test_user_cannot_filter_role_assignments_by_group_of_project(self):
 1049         assignments = self._setup_test_role_assignments()
 1050         group_id = assignments['group_id']
 1051 
 1052         with self.test_client() as c:
 1053             c.get(
 1054                 '/v3/role_assignments?group.id=%s' % group_id,
 1055                 headers=self.headers,
 1056                 expected_status_code=http.client.FORBIDDEN
 1057             )
 1058 
 1059     def test_user_cannot_filter_role_assignments_by_system(self):
 1060         with self.test_client() as c:
 1061             c.get(
 1062                 '/v3/role_assignments?scope.system=all',
 1063                 headers=self.headers,
 1064                 expected_status_code=http.client.FORBIDDEN
 1065             )
 1066 
 1067     def test_user_cannot_filter_role_assignments_by_domain(self):
 1068         with self.test_client() as c:
 1069             c.get(
 1070                 '/v3/role_assignments?scope.domain.id=%s'
 1071                 % self.domain_id,
 1072                 headers=self.headers,
 1073                 expected_status_code=http.client.FORBIDDEN
 1074             )
 1075 
 1076     def test_user_cannot_filter_role_assignments_by_other_project(self):
 1077         project1 = PROVIDERS.resource_api.create_project(
 1078             uuid.uuid4().hex,
 1079             unit.new_project_ref(domain_id=self.domain_id)
 1080         )
 1081 
 1082         with self.test_client() as c:
 1083             c.get(
 1084                 '/v3/role_assignments?scope.project.id=%s'
 1085                 % project1,
 1086                 headers=self.headers,
 1087                 expected_status_code=http.client.FORBIDDEN
 1088             )
 1089 
 1090     def test_user_cannot_filter_role_assignments_by_other_project_user(self):
 1091         assignments = self._setup_test_role_assignments()
 1092 
 1093         # This user doesn't have any role assignments on self.project_id, so
 1094         # the project user of self.project_id should only see an empty list of
 1095         # role assignments.
 1096         user_id = assignments['user_id']
 1097 
 1098         with self.test_client() as c:
 1099             c.get(
 1100                 '/v3/role_assignments?user.id=%s' % user_id,
 1101                 headers=self.headers,
 1102                 expected_status_code=http.client.FORBIDDEN
 1103             )
 1104 
 1105     def test_user_cannot_filter_role_assignments_by_other_project_group(self):
 1106         assignments = self._setup_test_role_assignments()
 1107 
 1108         # This group doesn't have any role assignments on self.project_id, so
 1109         # the project user of self.project_id should only see an empty list of
 1110         # role assignments.
 1111         group_id = assignments['group_id']
 1112 
 1113         with self.test_client() as c:
 1114             c.get(
 1115                 '/v3/role_assignments?group.id=%s' % group_id,
 1116                 headers=self.headers,
 1117                 expected_status_code=http.client.FORBIDDEN
 1118             )
 1119 
 1120 
 1121 class _ProjectReaderMemberTests(object):
 1122     def test_user_cannot_list_assignments_for_subtree(self):
 1123         user = PROVIDERS.identity_api.create_user(
 1124             unit.new_user_ref(domain_id=self.domain_id)
 1125         )
 1126         project = PROVIDERS.resource_api.create_project(
 1127             uuid.uuid4().hex,
 1128             unit.new_project_ref(domain_id=self.domain_id,
 1129                                  parent_id=self.project_id)
 1130         )
 1131         PROVIDERS.assignment_api.create_grant(
 1132             self.bootstrapper.reader_role_id,
 1133             user_id=user['id'],
 1134             project_id=project['id']
 1135         )
 1136         with self.test_client() as c:
 1137             c.get(
 1138                 ('/v3/role_assignments?scope.project.id=%s&include_subtree' %
 1139                  self.project_id),
 1140                 headers=self.headers,
 1141                 expected_status_code=http.client.FORBIDDEN
 1142             )
 1143 
 1144 
 1145 class SystemReaderTests(base_classes.TestCaseWithBootstrap,
 1146                         common_auth.AuthTestMixin,
 1147                         _AssignmentTestUtilities,
 1148                         _SystemUserTests):
 1149 
 1150     def setUp(self):
 1151         super(SystemReaderTests, self).setUp()
 1152         self.loadapp()
 1153         self.useFixture(ksfixtures.Policy(self.config_fixture))
 1154         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
 1155 
 1156         system_reader = unit.new_user_ref(
 1157             domain_id=CONF.identity.default_domain_id
 1158         )
 1159         self.user_id = PROVIDERS.identity_api.create_user(
 1160             system_reader
 1161         )['id']
 1162         PROVIDERS.assignment_api.create_system_grant_for_user(
 1163             self.user_id, self.bootstrapper.reader_role_id
 1164         )
 1165         self.expected = [
 1166             # assignment of the user running the test case
 1167             {
 1168                 'user_id': self.user_id,
 1169                 'system': 'all',
 1170                 'role_id': self.bootstrapper.reader_role_id
 1171             }
 1172         ]
 1173 
 1174         auth = self.build_authentication_request(
 1175             user_id=self.user_id, password=system_reader['password'],
 1176             system=True
 1177         )
 1178 
 1179         # Grab a token using the persona we're testing and prepare headers
 1180         # for requests we'll be making in the tests.
 1181         with self.test_client() as c:
 1182             r = c.post('/v3/auth/tokens', json=auth)
 1183             self.token_id = r.headers['X-Subject-Token']
 1184             self.headers = {'X-Auth-Token': self.token_id}
 1185 
 1186 
 1187 class SystemMemberTests(base_classes.TestCaseWithBootstrap,
 1188                         common_auth.AuthTestMixin,
 1189                         _AssignmentTestUtilities,
 1190                         _SystemUserTests):
 1191 
 1192     def setUp(self):
 1193         super(SystemMemberTests, self).setUp()
 1194         self.loadapp()
 1195         self.useFixture(ksfixtures.Policy(self.config_fixture))
 1196         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
 1197 
 1198         system_member = unit.new_user_ref(
 1199             domain_id=CONF.identity.default_domain_id
 1200         )
 1201         self.user_id = PROVIDERS.identity_api.create_user(
 1202             system_member
 1203         )['id']
 1204         PROVIDERS.assignment_api.create_system_grant_for_user(
 1205             self.user_id, self.bootstrapper.member_role_id
 1206         )
 1207         self.expected = [
 1208             # assignment of the user running the test case
 1209             {
 1210                 'user_id': self.user_id,
 1211                 'system': 'all',
 1212                 'role_id': self.bootstrapper.member_role_id
 1213             }
 1214         ]
 1215 
 1216         auth = self.build_authentication_request(
 1217             user_id=self.user_id, password=system_member['password'],
 1218             system=True
 1219         )
 1220 
 1221         # Grab a token using the persona we're testing and prepare headers
 1222         # for requests we'll be making in the tests.
 1223         with self.test_client() as c:
 1224             r = c.post('/v3/auth/tokens', json=auth)
 1225             self.token_id = r.headers['X-Subject-Token']
 1226             self.headers = {'X-Auth-Token': self.token_id}
 1227 
 1228 
 1229 class SystemAdminTests(base_classes.TestCaseWithBootstrap,
 1230                        common_auth.AuthTestMixin,
 1231                        _AssignmentTestUtilities,
 1232                        _SystemUserTests):
 1233 
 1234     def setUp(self):
 1235         super(SystemAdminTests, self).setUp()
 1236         self.loadapp()
 1237         self.useFixture(ksfixtures.Policy(self.config_fixture))
 1238         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
 1239 
 1240         self.user_id = self.bootstrapper.admin_user_id
 1241         self.expected = []
 1242 
 1243         auth = self.build_authentication_request(
 1244             user_id=self.user_id, password=self.bootstrapper.admin_password,
 1245             system=True
 1246         )
 1247 
 1248         # Grab a token using the persona we're testing and prepare headers
 1249         # for requests we'll be making in the tests.
 1250         with self.test_client() as c:
 1251             r = c.post('/v3/auth/tokens', json=auth)
 1252             self.token_id = r.headers['X-Subject-Token']
 1253             self.headers = {'X-Auth-Token': self.token_id}
 1254 
 1255 
 1256 class DomainReaderTests(base_classes.TestCaseWithBootstrap,
 1257                         common_auth.AuthTestMixin,
 1258                         _AssignmentTestUtilities,
 1259                         _DomainUserTests):
 1260 
 1261     def setUp(self):
 1262         super(DomainReaderTests, self).setUp()
 1263         self.loadapp()
 1264         self.useFixture(ksfixtures.Policy(self.config_fixture))
 1265         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
 1266 
 1267         domain = PROVIDERS.resource_api.create_domain(
 1268             uuid.uuid4().hex, unit.new_domain_ref()
 1269         )
 1270         self.domain_id = domain['id']
 1271         domain_reader = unit.new_user_ref(domain_id=self.domain_id)
 1272         self.user_id = PROVIDERS.identity_api.create_user(domain_reader)['id']
 1273         PROVIDERS.assignment_api.create_grant(
 1274             self.bootstrapper.reader_role_id, user_id=self.user_id,
 1275             domain_id=self.domain_id
 1276         )
 1277         self.expected = [
 1278             # assignment of the user running the test case
 1279             {
 1280                 'user_id': self.user_id,
 1281                 'domain_id': self.domain_id,
 1282                 'role_id': self.bootstrapper.reader_role_id
 1283             }]
 1284 
 1285         auth = self.build_authentication_request(
 1286             user_id=self.user_id, password=domain_reader['password'],
 1287             domain_id=self.domain_id,
 1288         )
 1289 
 1290         # Grab a token using the persona we're testing and prepare headers
 1291         # for requests we'll be making in the tests.
 1292         with self.test_client() as c:
 1293             r = c.post('/v3/auth/tokens', json=auth)
 1294             self.token_id = r.headers['X-Subject-Token']
 1295             self.headers = {'X-Auth-Token': self.token_id}
 1296 
 1297 
 1298 class DomainMemberTests(base_classes.TestCaseWithBootstrap,
 1299                         common_auth.AuthTestMixin,
 1300                         _AssignmentTestUtilities,
 1301                         _DomainUserTests):
 1302 
 1303     def setUp(self):
 1304         super(DomainMemberTests, self).setUp()
 1305         self.loadapp()
 1306         self.useFixture(ksfixtures.Policy(self.config_fixture))
 1307         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
 1308 
 1309         domain = PROVIDERS.resource_api.create_domain(
 1310             uuid.uuid4().hex, unit.new_domain_ref()
 1311         )
 1312         self.domain_id = domain['id']
 1313         domain_user = unit.new_user_ref(domain_id=self.domain_id)
 1314         self.user_id = PROVIDERS.identity_api.create_user(domain_user)['id']
 1315         PROVIDERS.assignment_api.create_grant(
 1316             self.bootstrapper.member_role_id, user_id=self.user_id,
 1317             domain_id=self.domain_id
 1318         )
 1319         self.expected = [
 1320             # assignment of the user running the test case
 1321             {
 1322                 'user_id': self.user_id,
 1323                 'domain_id': self.domain_id,
 1324                 'role_id': self.bootstrapper.member_role_id
 1325             }]
 1326 
 1327         auth = self.build_authentication_request(
 1328             user_id=self.user_id, password=domain_user['password'],
 1329             domain_id=self.domain_id
 1330         )
 1331 
 1332         # Grab a token using the persona we're testing and prepare headers
 1333         # for requests we'll be making in the tests.
 1334         with self.test_client() as c:
 1335             r = c.post('/v3/auth/tokens', json=auth)
 1336             self.token_id = r.headers['X-Subject-Token']
 1337             self.headers = {'X-Auth-Token': self.token_id}
 1338 
 1339 
 1340 class DomainAdminTests(base_classes.TestCaseWithBootstrap,
 1341                        common_auth.AuthTestMixin,
 1342                        _AssignmentTestUtilities,
 1343                        _DomainUserTests):
 1344 
 1345     def _override_policy(self):
 1346         # TODO(lbragstad): Remove this once the deprecated policies in
 1347         # keystone.common.policies.role_assignment have been removed. This is
 1348         # only here to make sure we test the new policies instead of the
 1349         # deprecated ones. Oslo.policy will OR deprecated policies with new
 1350         # policies to maintain compatibility and give operators a chance to
 1351         # update permissions or update policies without breaking users. This
 1352         # will cause these specific tests to fail since we're trying to correct
 1353         # this broken behavior with better scope checking.
 1354         with open(self.policy_file_name, 'w') as f:
 1355             overridden_policies = {
 1356                 'identity:list_role_assignments': (
 1357                     rp.SYSTEM_READER_OR_DOMAIN_READER
 1358                 ),
 1359                 'identity:list_role_assignments_for_tree': (
 1360                     rp.SYSTEM_READER_OR_PROJECT_DOMAIN_READER_OR_PROJECT_ADMIN
 1361                 )
 1362             }
 1363             f.write(jsonutils.dumps(overridden_policies))
 1364 
 1365     def setUp(self):
 1366         super(DomainAdminTests, self).setUp()
 1367         self.loadapp()
 1368         self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
 1369         self.policy_file_name = self.policy_file.file_name
 1370         self.useFixture(
 1371             ksfixtures.Policy(
 1372                 self.config_fixture, policy_file=self.policy_file_name
 1373             )
 1374         )
 1375         self._override_policy()
 1376         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
 1377 
 1378         domain = PROVIDERS.resource_api.create_domain(
 1379             uuid.uuid4().hex, unit.new_domain_ref()
 1380         )
 1381         self.domain_id = domain['id']
 1382         domain_admin = unit.new_user_ref(domain_id=self.domain_id)
 1383         self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id']
 1384         PROVIDERS.assignment_api.create_grant(
 1385             self.bootstrapper.admin_role_id, user_id=self.user_id,
 1386             domain_id=self.domain_id
 1387         )
 1388         self.expected = [
 1389             # assignment of the user running the test case
 1390             {
 1391                 'user_id': self.user_id,
 1392                 'domain_id': self.domain_id,
 1393                 'role_id': self.bootstrapper.admin_role_id
 1394             }]
 1395 
 1396         auth = self.build_authentication_request(
 1397             user_id=self.user_id, password=domain_admin['password'],
 1398             domain_id=self.domain_id,
 1399         )
 1400 
 1401         # Grab a token using the persona we're testing and prepare headers
 1402         # for requests we'll be making in the tests.
 1403         with self.test_client() as c:
 1404             r = c.post('/v3/auth/tokens', json=auth)
 1405             self.token_id = r.headers['X-Subject-Token']
 1406             self.headers = {'X-Auth-Token': self.token_id}
 1407 
 1408 
 1409 class ProjectReaderTests(base_classes.TestCaseWithBootstrap,
 1410                          common_auth.AuthTestMixin,
 1411                          _AssignmentTestUtilities,
 1412                          _ProjectUserTests,
 1413                          _ProjectReaderMemberTests):
 1414 
 1415     def setUp(self):
 1416         super(ProjectReaderTests, self).setUp()
 1417         self.loadapp()
 1418         self.useFixture(ksfixtures.Policy(self.config_fixture))
 1419         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
 1420 
 1421         domain = PROVIDERS.resource_api.create_domain(
 1422             uuid.uuid4().hex, unit.new_domain_ref()
 1423         )
 1424         self.domain_id = domain['id']
 1425 
 1426         project = unit.new_project_ref(domain_id=self.domain_id)
 1427         project = PROVIDERS.resource_api.create_project(project['id'], project)
 1428         self.project_id = project['id']
 1429 
 1430         project_reader = unit.new_user_ref(domain_id=self.domain_id)
 1431         self.user_id = PROVIDERS.identity_api.create_user(project_reader)['id']
 1432         PROVIDERS.assignment_api.create_grant(
 1433             self.bootstrapper.reader_role_id, user_id=self.user_id,
 1434             project_id=self.project_id
 1435         )
 1436 
 1437         self.expected = [
 1438             # assignment of the user running the test case
 1439             {
 1440                 'user_id': self.user_id,
 1441                 'project_id': self.project_id,
 1442                 'role_id': self.bootstrapper.reader_role_id
 1443             }]
 1444 
 1445         auth = self.build_authentication_request(
 1446             user_id=self.user_id, password=project_reader['password'],
 1447             project_id=self.project_id,
 1448         )
 1449 
 1450         # Grab a token using the persona we're testing and prepare headers
 1451         # for requests we'll be making in the tests.
 1452         with self.test_client() as c:
 1453             r = c.post('/v3/auth/tokens', json=auth)
 1454             self.token_id = r.headers['X-Subject-Token']
 1455             self.headers = {'X-Auth-Token': self.token_id}
 1456 
 1457 
 1458 class ProjectMemberTests(base_classes.TestCaseWithBootstrap,
 1459                          common_auth.AuthTestMixin,
 1460                          _AssignmentTestUtilities,
 1461                          _ProjectUserTests,
 1462                          _ProjectReaderMemberTests):
 1463 
 1464     def setUp(self):
 1465         super(ProjectMemberTests, self).setUp()
 1466         self.loadapp()
 1467         self.useFixture(ksfixtures.Policy(self.config_fixture))
 1468         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
 1469 
 1470         domain = PROVIDERS.resource_api.create_domain(
 1471             uuid.uuid4().hex, unit.new_domain_ref()
 1472         )
 1473         self.domain_id = domain['id']
 1474 
 1475         project = unit.new_project_ref(domain_id=self.domain_id)
 1476         project = PROVIDERS.resource_api.create_project(project['id'], project)
 1477         self.project_id = project['id']
 1478 
 1479         project_member = unit.new_user_ref(domain_id=self.domain_id)
 1480         self.user_id = PROVIDERS.identity_api.create_user(project_member)['id']
 1481         PROVIDERS.assignment_api.create_grant(
 1482             self.bootstrapper.member_role_id, user_id=self.user_id,
 1483             project_id=self.project_id
 1484         )
 1485 
 1486         self.expected = [
 1487             # assignment of the user running the test case
 1488             {
 1489                 'user_id': self.user_id,
 1490                 'project_id': self.project_id,
 1491                 'role_id': self.bootstrapper.member_role_id
 1492             }]
 1493 
 1494         auth = self.build_authentication_request(
 1495             user_id=self.user_id, password=project_member['password'],
 1496             project_id=self.project_id,
 1497         )
 1498 
 1499         # Grab a token using the persona we're testing and prepare headers
 1500         # for requests we'll be making in the tests.
 1501         with self.test_client() as c:
 1502             r = c.post('/v3/auth/tokens', json=auth)
 1503             self.token_id = r.headers['X-Subject-Token']
 1504             self.headers = {'X-Auth-Token': self.token_id}
 1505 
 1506 
 1507 class ProjectAdminTests(base_classes.TestCaseWithBootstrap,
 1508                         common_auth.AuthTestMixin,
 1509                         _AssignmentTestUtilities,
 1510                         _ProjectUserTests):
 1511 
 1512     def setUp(self):
 1513         super(ProjectAdminTests, self).setUp()
 1514         self.loadapp()
 1515         self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
 1516         self.policy_file_name = self.policy_file.file_name
 1517         self.useFixture(
 1518             ksfixtures.Policy(
 1519                 self.config_fixture, policy_file=self.policy_file_name
 1520             )
 1521         )
 1522         self._override_policy()
 1523         self.config_fixture.config(group='oslo_policy', enforce_scope=True)
 1524 
 1525         domain = PROVIDERS.resource_api.create_domain(
 1526             uuid.uuid4().hex, unit.new_domain_ref()
 1527         )
 1528         self.domain_id = domain['id']
 1529 
 1530         self.user_id = self.bootstrapper.admin_user_id
 1531 
 1532         project = unit.new_project_ref(domain_id=self.domain_id)
 1533         project = PROVIDERS.resource_api.create_project(project['id'], project)
 1534         self.project_id = project['id']
 1535 
 1536         PROVIDERS.assignment_api.create_grant(
 1537             self.bootstrapper.admin_role_id, user_id=self.user_id,
 1538             project_id=self.project_id
 1539         )
 1540 
 1541         self.expected = [
 1542             # assignment of the user running the test case
 1543             {
 1544                 'user_id': self.user_id,
 1545                 'project_id': self.project_id,
 1546                 'role_id': self.bootstrapper.admin_role_id
 1547             }]
 1548 
 1549         auth = self.build_authentication_request(
 1550             user_id=self.user_id,
 1551             password=self.bootstrapper.admin_password,
 1552             project_id=self.project_id
 1553         )
 1554 
 1555         # Grab a token using the persona we're testing and prepare headers
 1556         # for requests we'll be making in the tests.
 1557         with self.test_client() as c:
 1558             r = c.post('/v3/auth/tokens', json=auth)
 1559             self.token_id = r.headers['X-Subject-Token']
 1560             self.headers = {'X-Auth-Token': self.token_id}
 1561 
 1562     def _override_policy(self):
 1563         # TODO(lbragstad): Remove this once the deprecated policies in
 1564         # keystone.common.policies.role_assignment have been removed. This is
 1565         # only here to make sure we test the new policies instead of the
 1566         # deprecated ones. Oslo.policy will OR deprecated policies with new
 1567         # policies to maintain compatibility and give operators a chance to
 1568         # update permissions or update policies without breaking users. This
 1569         # will cause these specific tests to fail since we're trying to correct
 1570         # this broken behavior with better scope checking.
 1571         with open(self.policy_file_name, 'w') as f:
 1572             overridden_policies = {
 1573                 'identity:list_role_assignments': (
 1574                     rp.SYSTEM_READER_OR_DOMAIN_READER
 1575                 ),
 1576                 'identity:list_role_assignments_for_tree': (
 1577                     rp.SYSTEM_READER_OR_PROJECT_DOMAIN_READER_OR_PROJECT_ADMIN
 1578                 )
 1579             }
 1580             f.write(jsonutils.dumps(overridden_policies))
 1581 
 1582     def test_user_can_list_assignments_for_subtree_on_own_project(self):
 1583         user = PROVIDERS.identity_api.create_user(
 1584             unit.new_user_ref(domain_id=self.domain_id)
 1585         )
 1586         project = PROVIDERS.resource_api.create_project(
 1587             uuid.uuid4().hex,
 1588             unit.new_project_ref(domain_id=self.domain_id,
 1589                                  parent_id=self.project_id)
 1590         )
 1591         PROVIDERS.assignment_api.create_grant(
 1592             self.bootstrapper.reader_role_id,
 1593             user_id=user['id'],
 1594             project_id=project['id']
 1595         )
 1596         expected = copy.copy(self.expected)
 1597         expected.append({
 1598             'project_id': project['id'],
 1599             'user_id': user['id'],
 1600             'role_id': self.bootstrapper.reader_role_id
 1601         })
 1602         with self.test_client() as c:
 1603             r = c.get(
 1604                 ('/v3/role_assignments?scope.project.id=%s&include_subtree' %
 1605                  self.project_id),
 1606                 headers=self.headers
 1607             )
 1608             self.assertEqual(len(expected), len(r.json['role_assignments']))
 1609             actual = self._extract_role_assignments_from_response_body(r)
 1610             for assignment in actual:
 1611                 self.assertIn(assignment, expected)
 1612 
 1613     def test_user_cannot_list_assignments_for_subtree_on_other_project(self):
 1614         user = PROVIDERS.identity_api.create_user(
 1615             unit.new_user_ref(domain_id=self.domain_id)
 1616         )
 1617         project = PROVIDERS.resource_api.create_project(
 1618             uuid.uuid4().hex,
 1619             unit.new_project_ref(domain_id=self.domain_id)
 1620         )
 1621         PROVIDERS.assignment_api.create_grant(
 1622             self.bootstrapper.reader_role_id,
 1623             user_id=user['id'],
 1624             project_id=project['id']
 1625         )
 1626         with self.test_client() as c:
 1627             c.get(
 1628                 ('/v3/role_assignments?scope.project.id=%s&include_subtree' %
 1629                  project['id']),
 1630                 headers=self.headers,
 1631                 expected_status_code=http.client.FORBIDDEN
 1632             )