"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/unit/test_v3.py" (13 May 2020, 58379 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_v3.py": 16.0.1_vs_17.0.0.

    1 # Copyright 2013 OpenStack Foundation
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 import datetime
   16 import uuid
   17 
   18 import http.client
   19 import oslo_context.context
   20 from oslo_serialization import jsonutils
   21 from testtools import matchers
   22 import webtest
   23 
   24 from keystone.common import authorization
   25 from keystone.common import cache
   26 from keystone.common import provider_api
   27 from keystone.common.validation import validators
   28 from keystone import exception
   29 from keystone.resource.backends import base as resource_base
   30 from keystone.server.flask.request_processing.middleware import auth_context
   31 from keystone.tests.common import auth as common_auth
   32 from keystone.tests import unit
   33 from keystone.tests.unit import rest
   34 
   35 
   36 PROVIDERS = provider_api.ProviderAPIs
   37 DEFAULT_DOMAIN_ID = 'default'
   38 
   39 TIME_FORMAT = unit.TIME_FORMAT
   40 
   41 
   42 class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
   43                       common_auth.AuthTestMixin):
   44 
   45     def generate_token_schema(self, system_scoped=False, domain_scoped=False,
   46                               project_scoped=False):
   47         """Return a dictionary of token properties to validate against."""
   48         ROLES_SCHEMA = {
   49             'type': 'array',
   50             'items': {
   51                 'type': 'object',
   52                 'properties': {
   53                     'id': {'type': 'string', },
   54                     'name': {'type': 'string', },
   55                     'description': {'type': 'string', },
   56                     'options': {'type': 'object', }
   57                 },
   58                 'required': ['id', 'name', ],
   59                 'additionalProperties': False,
   60             },
   61             'minItems': 1,
   62         }
   63 
   64         properties = {
   65             'audit_ids': {
   66                 'type': 'array',
   67                 'items': {
   68                     'type': 'string',
   69                 },
   70                 'minItems': 1,
   71                 'maxItems': 2,
   72             },
   73             'expires_at': {
   74                 'type': 'string',
   75                 'pattern': unit.TIME_FORMAT_REGEX,
   76             },
   77             'issued_at': {
   78                 'type': 'string',
   79                 'pattern': unit.TIME_FORMAT_REGEX,
   80             },
   81             'methods': {
   82                 'type': 'array',
   83                 'items': {
   84                     'type': 'string',
   85                 },
   86             },
   87             'user': {
   88                 'type': 'object',
   89                 'required': ['id', 'name', 'domain', 'password_expires_at'],
   90                 'properties': {
   91                     'id': {'type': 'string'},
   92                     'name': {'type': 'string'},
   93                     'domain': {
   94                         'type': 'object',
   95                         'properties': {
   96                             'id': {'type': 'string'},
   97                             'name': {'type': 'string'}
   98                         },
   99                         'required': ['id', 'name'],
  100                         'additonalProperties': False,
  101                     },
  102                     'password_expires_at': {
  103                         'type': ['string', 'null'],
  104                         'pattern': unit.TIME_FORMAT_REGEX,
  105                     }
  106                 },
  107                 'additionalProperties': False,
  108             }
  109         }
  110 
  111         if system_scoped:
  112             properties['catalog'] = {'type': 'array'}
  113             properties['system'] = {
  114                 'type': 'object',
  115                 'properties': {
  116                     'all': {'type': 'boolean'}
  117                 }
  118             }
  119             properties['roles'] = ROLES_SCHEMA
  120         elif domain_scoped:
  121             properties['catalog'] = {'type': 'array'}
  122             properties['roles'] = ROLES_SCHEMA
  123             properties['domain'] = {
  124                 'type': 'object',
  125                 'required': ['id', 'name'],
  126                 'properties': {
  127                     'id': {'type': 'string'},
  128                     'name': {'type': 'string'}
  129                 },
  130                 'additionalProperties': False
  131             }
  132         elif project_scoped:
  133             properties['is_admin_project'] = {'type': 'boolean'}
  134             properties['catalog'] = {'type': 'array'}
  135             # FIXME(lbragstad): Remove this in favor of the predefined
  136             # ROLES_SCHEMA dictionary once bug 1763510 is fixed.
  137             ROLES_SCHEMA['items']['properties']['domain_id'] = {
  138                 'type': ['null', 'string', ],
  139             }
  140             properties['roles'] = ROLES_SCHEMA
  141             properties['is_domain'] = {'type': 'boolean'}
  142             properties['project'] = {
  143                 'type': ['object'],
  144                 'required': ['id', 'name', 'domain'],
  145                 'properties': {
  146                     'id': {'type': 'string'},
  147                     'name': {'type': 'string'},
  148                     'domain': {
  149                         'type': ['object'],
  150                         'required': ['id', 'name'],
  151                         'properties': {
  152                             'id': {'type': 'string'},
  153                             'name': {'type': 'string'}
  154                         },
  155                         'additionalProperties': False
  156                     }
  157                 },
  158                 'additionalProperties': False
  159             }
  160 
  161         schema = {
  162             'type': 'object',
  163             'properties': properties,
  164             'required': ['audit_ids', 'expires_at', 'issued_at', 'methods',
  165                          'user'],
  166             'optional': [],
  167             'additionalProperties': False
  168         }
  169 
  170         if system_scoped:
  171             schema['required'].extend(['system', 'roles'])
  172             schema['optional'].append('catalog')
  173         elif domain_scoped:
  174             schema['required'].extend(['domain', 'roles'])
  175             schema['optional'].append('catalog')
  176         elif project_scoped:
  177             schema['required'].append('project')
  178             schema['optional'].append('catalog')
  179             schema['optional'].append('OS-TRUST:trust')
  180             schema['optional'].append('is_admin_project')
  181             schema['optional'].append('is_domain')
  182 
  183         return schema
  184 
  185     def config_files(self):
  186         config_files = super(RestfulTestCase, self).config_files()
  187         config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
  188         return config_files
  189 
  190     def setUp(self):
  191         """Setup for v3 Restful Test Cases."""
  192         super(RestfulTestCase, self).setUp()
  193 
  194         self.empty_context = {'environment': {}}
  195 
  196     def load_backends(self):
  197         # ensure the cache region instance is setup
  198         cache.configure_cache()
  199 
  200         super(RestfulTestCase, self).load_backends()
  201 
  202     def load_fixtures(self, fixtures):
  203         self.load_sample_data()
  204 
  205     def _populate_default_domain(self):
  206         try:
  207             PROVIDERS.resource_api.get_domain(DEFAULT_DOMAIN_ID)
  208         except exception.DomainNotFound:
  209             root_domain = unit.new_domain_ref(
  210                 id=resource_base.NULL_DOMAIN_ID,
  211                 name=resource_base.NULL_DOMAIN_ID
  212             )
  213             PROVIDERS.resource_api.create_domain(resource_base.NULL_DOMAIN_ID,
  214                                                  root_domain)
  215             domain = unit.new_domain_ref(
  216                 description=(u'The default domain'),
  217                 id=DEFAULT_DOMAIN_ID,
  218                 name=u'Default')
  219             PROVIDERS.resource_api.create_domain(DEFAULT_DOMAIN_ID, domain)
  220 
  221     def load_sample_data(self, create_region_and_endpoints=True):
  222         self._populate_default_domain()
  223         self.domain = unit.new_domain_ref()
  224         self.domain_id = self.domain['id']
  225         PROVIDERS.resource_api.create_domain(self.domain_id, self.domain)
  226 
  227         self.project = unit.new_project_ref(domain_id=self.domain_id)
  228         self.project_id = self.project['id']
  229         self.project = PROVIDERS.resource_api.create_project(
  230             self.project_id, self.project
  231         )
  232 
  233         self.user = unit.create_user(PROVIDERS.identity_api,
  234                                      domain_id=self.domain_id)
  235         self.user_id = self.user['id']
  236 
  237         self.default_domain_project_id = uuid.uuid4().hex
  238         self.default_domain_project = unit.new_project_ref(
  239             domain_id=DEFAULT_DOMAIN_ID)
  240         self.default_domain_project['id'] = self.default_domain_project_id
  241         PROVIDERS.resource_api.create_project(
  242             self.default_domain_project_id, self.default_domain_project
  243         )
  244 
  245         self.default_domain_user = unit.create_user(
  246             PROVIDERS.identity_api,
  247             domain_id=DEFAULT_DOMAIN_ID)
  248         self.default_domain_user_id = self.default_domain_user['id']
  249 
  250         # create & grant policy.json's default role for admin_required
  251         self.role = unit.new_role_ref(name='admin')
  252         self.role_id = self.role['id']
  253         PROVIDERS.role_api.create_role(self.role_id, self.role)
  254         PROVIDERS.assignment_api.add_role_to_user_and_project(
  255             self.user_id, self.project_id, self.role_id)
  256         PROVIDERS.assignment_api.add_role_to_user_and_project(
  257             self.default_domain_user_id, self.default_domain_project_id,
  258             self.role_id)
  259         PROVIDERS.assignment_api.add_role_to_user_and_project(
  260             self.default_domain_user_id, self.project_id,
  261             self.role_id)
  262 
  263         # Create "req_admin" user for simulating a real user instead of the
  264         # admin_token_auth middleware
  265         self.user_reqadmin = unit.create_user(PROVIDERS.identity_api,
  266                                               DEFAULT_DOMAIN_ID)
  267         PROVIDERS.assignment_api.add_role_to_user_and_project(
  268             self.user_reqadmin['id'],
  269             self.default_domain_project_id,
  270             self.role_id)
  271 
  272         if create_region_and_endpoints:
  273             self.region = unit.new_region_ref()
  274             self.region_id = self.region['id']
  275             PROVIDERS.catalog_api.create_region(self.region)
  276 
  277             self.service = unit.new_service_ref()
  278             self.service_id = self.service['id']
  279             PROVIDERS.catalog_api.create_service(
  280                 self.service_id, self.service.copy()
  281             )
  282 
  283             self.endpoint = unit.new_endpoint_ref(service_id=self.service_id,
  284                                                   interface='public',
  285                                                   region_id=self.region_id)
  286             self.endpoint_id = self.endpoint['id']
  287             PROVIDERS.catalog_api.create_endpoint(
  288                 self.endpoint_id, self.endpoint.copy()
  289             )
  290             # The server adds 'enabled' and defaults to True.
  291             self.endpoint['enabled'] = True
  292 
  293     def create_new_default_project_for_user(self, user_id, domain_id,
  294                                             enable_project=True):
  295         ref = unit.new_project_ref(domain_id=domain_id, enabled=enable_project)
  296         r = self.post('/projects', body={'project': ref})
  297         project = self.assertValidProjectResponse(r, ref)
  298         # set the user's preferred project
  299         body = {'user': {'default_project_id': project['id']}}
  300         r = self.patch('/users/%(user_id)s' % {
  301             'user_id': user_id},
  302             body=body)
  303         self.assertValidUserResponse(r)
  304 
  305         return project
  306 
  307     def get_admin_token(self):
  308         """Convenience method so that we can test authenticated requests."""
  309         r = self.admin_request(
  310             method='POST',
  311             path='/v3/auth/tokens',
  312             body={
  313                 'auth': {
  314                     'identity': {
  315                         'methods': ['password'],
  316                         'password': {
  317                             'user': {
  318                                 'name': self.user_reqadmin['name'],
  319                                 'password': self.user_reqadmin['password'],
  320                                 'domain': {
  321                                     'id': self.user_reqadmin['domain_id']
  322                                 }
  323                             }
  324                         }
  325                     },
  326                     'scope': {
  327                         'project': {
  328                             'id': self.default_domain_project_id,
  329                         }
  330                     }
  331                 }
  332             })
  333         return r.headers.get('X-Subject-Token')
  334 
  335     def get_unscoped_token(self):
  336         """Convenience method so that we can test authenticated requests."""
  337         r = self.admin_request(
  338             method='POST',
  339             path='/v3/auth/tokens',
  340             body={
  341                 'auth': {
  342                     'identity': {
  343                         'methods': ['password'],
  344                         'password': {
  345                             'user': {
  346                                 'name': self.user['name'],
  347                                 'password': self.user['password'],
  348                                 'domain': {
  349                                     'id': self.user['domain_id']
  350                                 }
  351                             }
  352                         }
  353                     }
  354                 }
  355             })
  356         return r.headers.get('X-Subject-Token')
  357 
  358     def get_scoped_token(self):
  359         """Convenience method so that we can test authenticated requests."""
  360         r = self.admin_request(
  361             method='POST',
  362             path='/v3/auth/tokens',
  363             body={
  364                 'auth': {
  365                     'identity': {
  366                         'methods': ['password'],
  367                         'password': {
  368                             'user': {
  369                                 'name': self.user['name'],
  370                                 'password': self.user['password'],
  371                                 'domain': {
  372                                     'id': self.user['domain_id']
  373                                 }
  374                             }
  375                         }
  376                     },
  377                     'scope': {
  378                         'project': {
  379                             'id': self.project['id'],
  380                         }
  381                     }
  382                 }
  383             })
  384         return r.headers.get('X-Subject-Token')
  385 
  386     def get_system_scoped_token(self):
  387         """Convenience method for requesting system scoped tokens."""
  388         r = self.admin_request(
  389             method='POST',
  390             path='/v3/auth/tokens',
  391             body={
  392                 'auth': {
  393                     'identity': {
  394                         'methods': ['password'],
  395                         'password': {
  396                             'user': {
  397                                 'name': self.user['name'],
  398                                 'password': self.user['password'],
  399                                 'domain': {
  400                                     'id': self.user['domain_id']
  401                                 }
  402                             }
  403                         }
  404                     },
  405                     'scope': {
  406                         'system': {'all': True}
  407                     }
  408                 }
  409             })
  410         return r.headers.get('X-Subject-Token')
  411 
  412     def get_domain_scoped_token(self):
  413         """Convenience method for requesting domain scoped token."""
  414         r = self.admin_request(
  415             method='POST',
  416             path='/v3/auth/tokens',
  417             body={
  418                 'auth': {
  419                     'identity': {
  420                         'methods': ['password'],
  421                         'password': {
  422                             'user': {
  423                                 'name': self.user['name'],
  424                                 'password': self.user['password'],
  425                                 'domain': {
  426                                     'id': self.user['domain_id']
  427                                 }
  428                             }
  429                         }
  430                     },
  431                     'scope': {
  432                         'domain': {
  433                             'id': self.domain['id'],
  434                         }
  435                     }
  436                 }
  437             })
  438         return r.headers.get('X-Subject-Token')
  439 
  440     def get_requested_token(self, auth):
  441         """Request the specific token we want."""
  442         r = self.v3_create_token(auth)
  443         return r.headers.get('X-Subject-Token')
  444 
  445     def v3_create_token(self, auth, expected_status=http.client.CREATED):
  446         return self.admin_request(method='POST',
  447                                   path='/v3/auth/tokens',
  448                                   body=auth,
  449                                   expected_status=expected_status)
  450 
  451     def v3_noauth_request(self, path, **kwargs):
  452         # request does not require auth token header
  453         path = '/v3' + path
  454         return self.admin_request(path=path, **kwargs)
  455 
  456     def v3_request(self, path, **kwargs):
  457         # check to see if caller requires token for the API call.
  458         if kwargs.pop('noauth', None):
  459             return self.v3_noauth_request(path, **kwargs)
  460 
  461         # Check if the caller has passed in auth details for
  462         # use in requesting the token
  463         auth_arg = kwargs.pop('auth', None)
  464         if auth_arg:
  465             token = self.get_requested_token(auth_arg)
  466         else:
  467             token = kwargs.pop('token', None)
  468             if not token:
  469                 token = self.get_scoped_token()
  470         path = '/v3' + path
  471 
  472         return self.admin_request(path=path, token=token, **kwargs)
  473 
  474     def get(self, path, expected_status=http.client.OK, **kwargs):
  475         return self.v3_request(path, method='GET',
  476                                expected_status=expected_status, **kwargs)
  477 
  478     def head(self, path, expected_status=http.client.NO_CONTENT, **kwargs):
  479         r = self.v3_request(path, method='HEAD',
  480                             expected_status=expected_status, **kwargs)
  481         self.assertEqual(b'', r.body)
  482         return r
  483 
  484     def post(self, path, expected_status=http.client.CREATED, **kwargs):
  485         return self.v3_request(path, method='POST',
  486                                expected_status=expected_status, **kwargs)
  487 
  488     def put(self, path, expected_status=http.client.NO_CONTENT, **kwargs):
  489         return self.v3_request(path, method='PUT',
  490                                expected_status=expected_status, **kwargs)
  491 
  492     def patch(self, path, expected_status=http.client.OK, **kwargs):
  493         return self.v3_request(path, method='PATCH',
  494                                expected_status=expected_status, **kwargs)
  495 
  496     def delete(self, path, expected_status=http.client.NO_CONTENT, **kwargs):
  497         return self.v3_request(path, method='DELETE',
  498                                expected_status=expected_status, **kwargs)
  499 
  500     def assertValidErrorResponse(self, r):
  501         resp = r.result
  502         self.assertIsNotNone(resp.get('error'))
  503         self.assertIsNotNone(resp['error'].get('code'))
  504         self.assertIsNotNone(resp['error'].get('title'))
  505         self.assertIsNotNone(resp['error'].get('message'))
  506         self.assertEqual(int(resp['error']['code']), r.status_code)
  507 
  508     def assertValidListLinks(self, links, resource_url=None):
  509         self.assertIsNotNone(links)
  510         self.assertIsNotNone(links.get('self'))
  511         self.assertThat(links['self'], matchers.StartsWith('http://localhost'))
  512 
  513         if resource_url:
  514             self.assertThat(links['self'], matchers.EndsWith(resource_url))
  515 
  516         self.assertIn('next', links)
  517         if links['next'] is not None:
  518             self.assertThat(links['next'],
  519                             matchers.StartsWith('http://localhost'))
  520 
  521         self.assertIn('previous', links)
  522         if links['previous'] is not None:
  523             self.assertThat(links['previous'],
  524                             matchers.StartsWith('http://localhost'))
  525 
  526     def assertValidListResponse(self, resp, key, entity_validator, ref=None,
  527                                 expected_length=None, keys_to_check=None,
  528                                 resource_url=None):
  529         """Make assertions common to all API list responses.
  530 
  531         If a reference is provided, it's ID will be searched for in the
  532         response, and asserted to be equal.
  533 
  534         """
  535         entities = resp.result.get(key)
  536         self.assertIsNotNone(entities)
  537 
  538         if expected_length is not None:
  539             self.assertEqual(expected_length, len(entities))
  540         elif ref is not None:
  541             # we're at least expecting the ref
  542             self.assertNotEmpty(entities)
  543 
  544         # collections should have relational links
  545         self.assertValidListLinks(resp.result.get('links'),
  546                                   resource_url=resource_url)
  547 
  548         for entity in entities:
  549             self.assertIsNotNone(entity)
  550             self.assertValidEntity(entity, keys_to_check=keys_to_check)
  551             entity_validator(entity)
  552         if ref:
  553             entity = [x for x in entities if x['id'] == ref['id']][0]
  554             self.assertValidEntity(entity, ref=ref,
  555                                    keys_to_check=keys_to_check)
  556             entity_validator(entity, ref)
  557         return entities
  558 
  559     def assertValidResponse(self, resp, key, entity_validator, *args,
  560                             **kwargs):
  561         """Make assertions common to all API responses."""
  562         entity = resp.result.get(key)
  563         self.assertIsNotNone(entity)
  564         keys = kwargs.pop('keys_to_check', None)
  565         self.assertValidEntity(entity, keys_to_check=keys, *args, **kwargs)
  566         entity_validator(entity, *args, **kwargs)
  567         return entity
  568 
  569     def assertValidEntity(self, entity, ref=None, keys_to_check=None):
  570         """Make assertions common to all API entities.
  571 
  572         If a reference is provided, the entity will also be compared against
  573         the reference.
  574         """
  575         if keys_to_check is not None:
  576             keys = keys_to_check
  577         else:
  578             keys = ['name', 'description', 'enabled']
  579 
  580         for k in ['id'] + keys:
  581             msg = '%s unexpectedly None in %s' % (k, entity)
  582             self.assertIsNotNone(entity.get(k), msg)
  583 
  584         self.assertIsNotNone(entity.get('links'))
  585         self.assertIsNotNone(entity['links'].get('self'))
  586         self.assertThat(entity['links']['self'],
  587                         matchers.StartsWith('http://localhost'))
  588         self.assertIn(entity['id'], entity['links']['self'])
  589 
  590         if ref:
  591             for k in keys:
  592                 msg = '%s not equal: %s != %s' % (k, ref[k], entity[k])
  593                 self.assertEqual(ref[k], entity[k])
  594 
  595         return entity
  596 
  597     # auth validation
  598 
  599     def assertValidISO8601ExtendedFormatDatetime(self, dt):
  600         try:
  601             return datetime.datetime.strptime(dt, TIME_FORMAT)
  602         except Exception:
  603             msg = '%s is not a valid ISO 8601 extended format date time.' % dt
  604             raise AssertionError(msg)
  605 
  606     def assertValidTokenResponse(self, r, user=None, forbid_token_id=False):
  607         if forbid_token_id:
  608             self.assertNotIn('X-Subject-Token', r.headers)
  609         else:
  610             self.assertTrue(r.headers.get('X-Subject-Token'))
  611         token = r.result['token']
  612 
  613         self.assertIsNotNone(token.get('expires_at'))
  614         expires_at = self.assertValidISO8601ExtendedFormatDatetime(
  615             token['expires_at'])
  616         self.assertIsNotNone(token.get('issued_at'))
  617         issued_at = self.assertValidISO8601ExtendedFormatDatetime(
  618             token['issued_at'])
  619         self.assertLess(issued_at, expires_at)
  620 
  621         self.assertIn('user', token)
  622         self.assertIn('id', token['user'])
  623         self.assertIn('name', token['user'])
  624         self.assertIn('domain', token['user'])
  625         self.assertIn('id', token['user']['domain'])
  626 
  627         if user is not None:
  628             self.assertEqual(user['id'], token['user']['id'])
  629             self.assertEqual(user['name'], token['user']['name'])
  630             self.assertEqual(user['domain_id'], token['user']['domain']['id'])
  631 
  632         return token
  633 
  634     def assertValidUnscopedTokenResponse(self, r, *args, **kwargs):
  635         token = self.assertValidTokenResponse(r, *args, **kwargs)
  636         validator_object = validators.SchemaValidator(
  637             self.generate_token_schema()
  638         )
  639         validator_object.validate(token)
  640 
  641         return token
  642 
  643     def assertValidScopedTokenResponse(self, r, *args, **kwargs):
  644         require_catalog = kwargs.pop('require_catalog', True)
  645         endpoint_filter = kwargs.pop('endpoint_filter', False)
  646         ep_filter_assoc = kwargs.pop('ep_filter_assoc', 0)
  647         is_admin_project = kwargs.pop('is_admin_project', None)
  648         token = self.assertValidTokenResponse(r, *args, **kwargs)
  649 
  650         if require_catalog:
  651             endpoint_num = 0
  652             self.assertIn('catalog', token)
  653 
  654             if isinstance(token['catalog'], list):
  655                 # only test JSON
  656                 for service in token['catalog']:
  657                     for endpoint in service['endpoints']:
  658                         self.assertNotIn('enabled', endpoint)
  659                         self.assertNotIn('legacy_endpoint_id', endpoint)
  660                         self.assertNotIn('service_id', endpoint)
  661                         endpoint_num += 1
  662 
  663             # sub test for the OS-EP-FILTER extension enabled
  664             if endpoint_filter:
  665                 self.assertEqual(ep_filter_assoc, endpoint_num)
  666         else:
  667             self.assertNotIn('catalog', token)
  668 
  669         self.assertIn('roles', token)
  670         self.assertTrue(token['roles'])
  671         for role in token['roles']:
  672             self.assertIn('id', role)
  673             self.assertIn('name', role)
  674 
  675         # NOTE(samueldmq): We want to explicitly test for boolean or None
  676         self.assertIs(is_admin_project, token.get('is_admin_project'))
  677 
  678         return token
  679 
  680     def assertValidProjectScopedTokenResponse(self, r, *args, **kwargs):
  681         token = self.assertValidScopedTokenResponse(r, *args, **kwargs)
  682 
  683         project_scoped_token_schema = self.generate_token_schema(
  684             project_scoped=True)
  685 
  686         if token.get('OS-TRUST:trust'):
  687             trust_properties = {
  688                 'OS-TRUST:trust': {
  689                     'type': ['object'],
  690                     'required': ['id', 'impersonation', 'trustor_user',
  691                                  'trustee_user'],
  692                     'properties': {
  693                         'id': {'type': 'string'},
  694                         'impersonation': {'type': 'boolean'},
  695                         'trustor_user': {
  696                             'type': 'object',
  697                             'required': ['id'],
  698                             'properties': {
  699                                 'id': {'type': 'string'}
  700                             },
  701                             'additionalProperties': False
  702                         },
  703                         'trustee_user': {
  704                             'type': 'object',
  705                             'required': ['id'],
  706                             'properties': {
  707                                 'id': {'type': 'string'}
  708                             },
  709                             'additionalProperties': False
  710                         }
  711                     },
  712                     'additionalProperties': False
  713                 }
  714             }
  715             project_scoped_token_schema['properties'].update(trust_properties)
  716 
  717         validator_object = validators.SchemaValidator(
  718             project_scoped_token_schema)
  719         validator_object.validate(token)
  720 
  721         self.assertEqual(self.role_id, token['roles'][0]['id'])
  722 
  723         return token
  724 
  725     def assertValidDomainScopedTokenResponse(self, r, *args, **kwargs):
  726         token = self.assertValidScopedTokenResponse(r, *args, **kwargs)
  727 
  728         validator_object = validators.SchemaValidator(
  729             self.generate_token_schema(domain_scoped=True)
  730         )
  731         validator_object.validate(token)
  732 
  733         return token
  734 
  735     def assertValidSystemScopedTokenResponse(self, r, *args, **kwargs):
  736         token = self.assertValidTokenResponse(r)
  737         self.assertTrue(token['system']['all'])
  738 
  739         system_scoped_token_schema = self.generate_token_schema(
  740             system_scoped=True
  741         )
  742         validator_object = validators.SchemaValidator(
  743             system_scoped_token_schema
  744         )
  745         validator_object.validate(token)
  746 
  747         return token
  748 
  749     # catalog validation
  750 
  751     def assertValidCatalogResponse(self, resp, *args, **kwargs):
  752         self.assertEqual(set(['catalog', 'links']), set(resp.json.keys()))
  753         self.assertValidCatalog(resp.json['catalog'])
  754         self.assertIn('links', resp.json)
  755         self.assertIsInstance(resp.json['links'], dict)
  756         self.assertEqual(['self'], list(resp.json['links'].keys()))
  757         self.assertEqual(
  758             'http://localhost/v3/auth/catalog',
  759             resp.json['links']['self'])
  760 
  761     def assertValidCatalog(self, entity):
  762         self.assertIsInstance(entity, list)
  763         self.assertGreater(len(entity), 0)
  764         for service in entity:
  765             self.assertIsNotNone(service.get('id'))
  766             self.assertIsNotNone(service.get('name'))
  767             self.assertIsNotNone(service.get('type'))
  768             self.assertNotIn('enabled', service)
  769             self.assertGreater(len(service['endpoints']), 0)
  770             for endpoint in service['endpoints']:
  771                 self.assertIsNotNone(endpoint.get('id'))
  772                 self.assertIsNotNone(endpoint.get('interface'))
  773                 self.assertIsNotNone(endpoint.get('url'))
  774                 self.assertNotIn('enabled', endpoint)
  775                 self.assertNotIn('legacy_endpoint_id', endpoint)
  776                 self.assertNotIn('service_id', endpoint)
  777 
  778     # region validation
  779 
  780     def assertValidRegionListResponse(self, resp, *args, **kwargs):
  781         # NOTE(jaypipes): I have to pass in a blank keys_to_check parameter
  782         #                 below otherwise the base assertValidEntity method
  783         #                 tries to find a "name" and an "enabled" key in the
  784         #                 returned ref dicts. The issue is, I don't understand
  785         #                 how the service and endpoint entity assertions below
  786         #                 actually work (they don't raise assertions), since
  787         #                 AFAICT, the service and endpoint tables don't have
  788         #                 a "name" column either... :(
  789         return self.assertValidListResponse(
  790             resp,
  791             'regions',
  792             self.assertValidRegion,
  793             keys_to_check=[],
  794             *args,
  795             **kwargs)
  796 
  797     def assertValidRegionResponse(self, resp, *args, **kwargs):
  798         return self.assertValidResponse(
  799             resp,
  800             'region',
  801             self.assertValidRegion,
  802             keys_to_check=[],
  803             *args,
  804             **kwargs)
  805 
  806     def assertValidRegion(self, entity, ref=None):
  807         self.assertIsNotNone(entity.get('description'))
  808         if ref:
  809             self.assertEqual(ref['description'], entity['description'])
  810         return entity
  811 
  812     # service validation
  813 
  814     def assertValidServiceListResponse(self, resp, *args, **kwargs):
  815         return self.assertValidListResponse(
  816             resp,
  817             'services',
  818             self.assertValidService,
  819             *args,
  820             **kwargs)
  821 
  822     def assertValidServiceResponse(self, resp, *args, **kwargs):
  823         return self.assertValidResponse(
  824             resp,
  825             'service',
  826             self.assertValidService,
  827             *args,
  828             **kwargs)
  829 
  830     def assertValidService(self, entity, ref=None):
  831         self.assertIsNotNone(entity.get('type'))
  832         self.assertIsInstance(entity.get('enabled'), bool)
  833         if ref:
  834             self.assertEqual(ref['type'], entity['type'])
  835         return entity
  836 
  837     # endpoint validation
  838 
  839     def assertValidEndpointListResponse(self, resp, *args, **kwargs):
  840         return self.assertValidListResponse(
  841             resp,
  842             'endpoints',
  843             self.assertValidEndpoint,
  844             *args,
  845             **kwargs)
  846 
  847     def assertValidEndpointResponse(self, resp, *args, **kwargs):
  848         return self.assertValidResponse(
  849             resp,
  850             'endpoint',
  851             self.assertValidEndpoint,
  852             *args,
  853             **kwargs)
  854 
  855     def assertValidEndpoint(self, entity, ref=None):
  856         self.assertIsNotNone(entity.get('interface'))
  857         self.assertIsNotNone(entity.get('service_id'))
  858         self.assertIsInstance(entity['enabled'], bool)
  859 
  860         # this is intended to be an unexposed implementation detail
  861         self.assertNotIn('legacy_endpoint_id', entity)
  862 
  863         if ref:
  864             self.assertEqual(ref['interface'], entity['interface'])
  865             self.assertEqual(ref['service_id'], entity['service_id'])
  866             if ref.get('region') is not None:
  867                 self.assertEqual(ref['region_id'], entity.get('region_id'))
  868 
  869         return entity
  870 
  871     # domain validation
  872 
  873     def assertValidDomainListResponse(self, resp, *args, **kwargs):
  874         return self.assertValidListResponse(
  875             resp,
  876             'domains',
  877             self.assertValidDomain,
  878             *args,
  879             **kwargs)
  880 
  881     def assertValidDomainResponse(self, resp, *args, **kwargs):
  882         return self.assertValidResponse(
  883             resp,
  884             'domain',
  885             self.assertValidDomain,
  886             *args,
  887             **kwargs)
  888 
  889     def assertValidDomain(self, entity, ref=None):
  890         if ref:
  891             pass
  892         return entity
  893 
  894     # project validation
  895 
  896     def assertValidProjectListResponse(self, resp, *args, **kwargs):
  897         return self.assertValidListResponse(
  898             resp,
  899             'projects',
  900             self.assertValidProject,
  901             *args,
  902             **kwargs)
  903 
  904     def assertValidProjectResponse(self, resp, *args, **kwargs):
  905         return self.assertValidResponse(
  906             resp,
  907             'project',
  908             self.assertValidProject,
  909             *args,
  910             **kwargs)
  911 
  912     def assertValidProject(self, entity, ref=None):
  913         if ref:
  914             self.assertEqual(ref['domain_id'], entity['domain_id'])
  915         return entity
  916 
  917     # user validation
  918 
  919     def assertValidUserListResponse(self, resp, *args, **kwargs):
  920         return self.assertValidListResponse(
  921             resp,
  922             'users',
  923             self.assertValidUser,
  924             keys_to_check=['name', 'enabled'],
  925             *args,
  926             **kwargs)
  927 
  928     def assertValidUserResponse(self, resp, *args, **kwargs):
  929         return self.assertValidResponse(
  930             resp,
  931             'user',
  932             self.assertValidUser,
  933             keys_to_check=['name', 'enabled'],
  934             *args,
  935             **kwargs)
  936 
  937     def assertValidUser(self, entity, ref=None):
  938         self.assertIsNotNone(entity.get('domain_id'))
  939         self.assertIsNotNone(entity.get('email'))
  940         self.assertNotIn('password', entity)
  941         self.assertNotIn('projectId', entity)
  942         self.assertIn('password_expires_at', entity)
  943         if ref:
  944             self.assertEqual(ref['domain_id'], entity['domain_id'])
  945             self.assertEqual(ref['email'], entity['email'])
  946             if 'default_project_id' in ref:
  947                 self.assertIsNotNone(ref['default_project_id'])
  948                 self.assertEqual(ref['default_project_id'],
  949                                  entity['default_project_id'])
  950         return entity
  951 
  952     # group validation
  953 
  954     def assertValidGroupListResponse(self, resp, *args, **kwargs):
  955         return self.assertValidListResponse(
  956             resp,
  957             'groups',
  958             self.assertValidGroup,
  959             keys_to_check=['name', 'description', 'domain_id'],
  960             *args,
  961             **kwargs)
  962 
  963     def assertValidGroupResponse(self, resp, *args, **kwargs):
  964         return self.assertValidResponse(
  965             resp,
  966             'group',
  967             self.assertValidGroup,
  968             keys_to_check=['name', 'description', 'domain_id'],
  969             *args,
  970             **kwargs)
  971 
  972     def assertValidGroup(self, entity, ref=None):
  973         self.assertIsNotNone(entity.get('name'))
  974         if ref:
  975             self.assertEqual(ref['name'], entity['name'])
  976         return entity
  977 
  978     # credential validation
  979 
  980     def assertValidCredentialListResponse(self, resp, *args, **kwargs):
  981         return self.assertValidListResponse(
  982             resp,
  983             'credentials',
  984             self.assertValidCredential,
  985             keys_to_check=['blob', 'user_id', 'type'],
  986             *args,
  987             **kwargs)
  988 
  989     def assertValidCredentialResponse(self, resp, *args, **kwargs):
  990         return self.assertValidResponse(
  991             resp,
  992             'credential',
  993             self.assertValidCredential,
  994             keys_to_check=['blob', 'user_id', 'type'],
  995             *args,
  996             **kwargs)
  997 
  998     def assertValidCredential(self, entity, ref=None):
  999         self.assertIsNotNone(entity.get('user_id'))
 1000         self.assertIsNotNone(entity.get('blob'))
 1001         self.assertIsNotNone(entity.get('type'))
 1002         self.assertNotIn('key_hash', entity)
 1003         self.assertNotIn('encrypted_blob', entity)
 1004         if ref:
 1005             self.assertEqual(ref['user_id'], entity['user_id'])
 1006             self.assertEqual(ref['blob'], entity['blob'])
 1007             self.assertEqual(ref['type'], entity['type'])
 1008             self.assertEqual(ref.get('project_id'), entity.get('project_id'))
 1009         return entity
 1010 
 1011     # role validation
 1012 
 1013     def assertValidRoleListResponse(self, resp, *args, **kwargs):
 1014         return self.assertValidListResponse(
 1015             resp,
 1016             'roles',
 1017             self.assertValidRole,
 1018             keys_to_check=['name'],
 1019             *args,
 1020             **kwargs)
 1021 
 1022     def assertRoleInListResponse(self, resp, ref, expected=1):
 1023         found_count = 0
 1024         for entity in resp.result.get('roles'):
 1025             try:
 1026                 self.assertValidRole(entity, ref=ref)
 1027             except Exception:
 1028                 # It doesn't match, so let's go onto the next one
 1029                 pass
 1030             else:
 1031                 found_count += 1
 1032         self.assertEqual(expected, found_count)
 1033 
 1034     def assertRoleNotInListResponse(self, resp, ref):
 1035         self.assertRoleInListResponse(resp, ref=ref, expected=0)
 1036 
 1037     def assertValidRoleResponse(self, resp, *args, **kwargs):
 1038         return self.assertValidResponse(
 1039             resp,
 1040             'role',
 1041             self.assertValidRole,
 1042             keys_to_check=['name'],
 1043             *args,
 1044             **kwargs)
 1045 
 1046     def assertValidRole(self, entity, ref=None):
 1047         self.assertIsNotNone(entity.get('name'))
 1048         if ref:
 1049             self.assertEqual(ref['name'], entity['name'])
 1050             self.assertEqual(ref['domain_id'], entity['domain_id'])
 1051         return entity
 1052 
 1053     # role assignment validation
 1054 
 1055     def assertValidRoleAssignmentListResponse(self, resp, expected_length=None,
 1056                                               resource_url=None):
 1057         entities = resp.result.get('role_assignments')
 1058 
 1059         if expected_length or expected_length == 0:
 1060             self.assertEqual(expected_length, len(entities))
 1061 
 1062         # Collections should have relational links
 1063         self.assertValidListLinks(resp.result.get('links'),
 1064                                   resource_url=resource_url)
 1065 
 1066         for entity in entities:
 1067             self.assertIsNotNone(entity)
 1068             self.assertValidRoleAssignment(entity)
 1069         return entities
 1070 
 1071     def assertValidRoleAssignment(self, entity, ref=None):
 1072         # A role should be present
 1073         self.assertIsNotNone(entity.get('role'))
 1074         self.assertIsNotNone(entity['role'].get('id'))
 1075 
 1076         # Only one of user or group should be present
 1077         if entity.get('user'):
 1078             self.assertNotIn('group', entity)
 1079             self.assertIsNotNone(entity['user'].get('id'))
 1080         else:
 1081             self.assertIsNotNone(entity.get('group'))
 1082             self.assertIsNotNone(entity['group'].get('id'))
 1083 
 1084         # A scope should be present and have only one of domain or project
 1085         self.assertIsNotNone(entity.get('scope'))
 1086 
 1087         if entity['scope'].get('project'):
 1088             self.assertNotIn('domain', entity['scope'])
 1089             self.assertIsNotNone(entity['scope']['project'].get('id'))
 1090         elif entity['scope'].get('domain'):
 1091             self.assertIsNotNone(entity['scope'].get('domain'))
 1092             self.assertIsNotNone(entity['scope']['domain'].get('id'))
 1093         else:
 1094             self.assertIsNotNone(entity['scope'].get('system'))
 1095             self.assertTrue(entity['scope']['system']['all'])
 1096 
 1097         # An assignment link should be present
 1098         self.assertIsNotNone(entity.get('links'))
 1099         self.assertIsNotNone(entity['links'].get('assignment'))
 1100 
 1101         if ref:
 1102             links = ref.pop('links')
 1103             try:
 1104                 self.assertDictContainsSubset(ref, entity)
 1105                 self.assertIn(links['assignment'],
 1106                               entity['links']['assignment'])
 1107             finally:
 1108                 if links:
 1109                     ref['links'] = links
 1110 
 1111     def assertRoleAssignmentInListResponse(self, resp, ref, expected=1):
 1112 
 1113         found_count = 0
 1114         for entity in resp.result.get('role_assignments'):
 1115             try:
 1116                 self.assertValidRoleAssignment(entity, ref=ref)
 1117             except Exception:
 1118                 # It doesn't match, so let's go onto the next one
 1119                 pass
 1120             else:
 1121                 found_count += 1
 1122         self.assertEqual(expected, found_count)
 1123 
 1124     def assertRoleAssignmentNotInListResponse(self, resp, ref):
 1125         self.assertRoleAssignmentInListResponse(resp, ref=ref, expected=0)
 1126 
 1127     # policy validation
 1128 
 1129     def assertValidPolicyListResponse(self, resp, *args, **kwargs):
 1130         return self.assertValidListResponse(
 1131             resp,
 1132             'policies',
 1133             self.assertValidPolicy,
 1134             *args,
 1135             **kwargs)
 1136 
 1137     def assertValidPolicyResponse(self, resp, *args, **kwargs):
 1138         return self.assertValidResponse(
 1139             resp,
 1140             'policy',
 1141             self.assertValidPolicy,
 1142             *args,
 1143             **kwargs)
 1144 
 1145     def assertValidPolicy(self, entity, ref=None):
 1146         self.assertIsNotNone(entity.get('blob'))
 1147         self.assertIsNotNone(entity.get('type'))
 1148         if ref:
 1149             self.assertEqual(ref['blob'], entity['blob'])
 1150             self.assertEqual(ref['type'], entity['type'])
 1151         return entity
 1152 
 1153     # trust validation
 1154 
 1155     def assertValidTrustListResponse(self, resp, *args, **kwargs):
 1156         return self.assertValidListResponse(
 1157             resp,
 1158             'trusts',
 1159             self.assertValidTrustSummary,
 1160             keys_to_check=['trustor_user_id',
 1161                            'trustee_user_id',
 1162                            'impersonation'],
 1163             *args,
 1164             **kwargs)
 1165 
 1166     def assertValidTrustResponse(self, resp, *args, **kwargs):
 1167         return self.assertValidResponse(
 1168             resp,
 1169             'trust',
 1170             self.assertValidTrust,
 1171             keys_to_check=['trustor_user_id',
 1172                            'trustee_user_id',
 1173                            'impersonation'],
 1174             *args,
 1175             **kwargs)
 1176 
 1177     def assertValidTrustSummary(self, entity, ref=None):
 1178         return self.assertValidTrust(entity, ref, summary=True)
 1179 
 1180     def assertValidTrust(self, entity, ref=None, summary=False):
 1181         self.assertIsNotNone(entity.get('trustor_user_id'))
 1182         self.assertIsNotNone(entity.get('trustee_user_id'))
 1183         self.assertIsNotNone(entity.get('impersonation'))
 1184 
 1185         self.assertIn('expires_at', entity)
 1186         if entity['expires_at'] is not None:
 1187             self.assertValidISO8601ExtendedFormatDatetime(entity['expires_at'])
 1188 
 1189         if summary:
 1190             # Trust list contains no roles, but getting a specific
 1191             # trust by ID provides the detailed response containing roles
 1192             self.assertNotIn('roles', entity)
 1193             self.assertIn('project_id', entity)
 1194         else:
 1195             for role in entity['roles']:
 1196                 self.assertIsNotNone(role)
 1197                 self.assertValidEntity(role, keys_to_check=['name'])
 1198                 self.assertValidRole(role)
 1199 
 1200             self.assertValidListLinks(entity.get('roles_links'))
 1201             # Mirror the test tempest does to ensure the self-link is correct
 1202             self.assertIn('v3/OS-TRUST/trusts', entity.get('links')['self'])
 1203 
 1204             # always disallow role xor project_id (neither or both is allowed)
 1205             has_roles = bool(entity.get('roles'))
 1206             has_project = bool(entity.get('project_id'))
 1207             self.assertFalse(has_roles ^ has_project)
 1208 
 1209         if ref:
 1210             self.assertEqual(ref['trustor_user_id'], entity['trustor_user_id'])
 1211             self.assertEqual(ref['trustee_user_id'], entity['trustee_user_id'])
 1212             self.assertEqual(ref['project_id'], entity['project_id'])
 1213             if entity.get('expires_at') or ref.get('expires_at'):
 1214                 entity_exp = self.assertValidISO8601ExtendedFormatDatetime(
 1215                     entity['expires_at'])
 1216                 ref_exp = self.assertValidISO8601ExtendedFormatDatetime(
 1217                     ref['expires_at'])
 1218                 self.assertCloseEnoughForGovernmentWork(entity_exp, ref_exp)
 1219             else:
 1220                 self.assertEqual(ref.get('expires_at'),
 1221                                  entity.get('expires_at'))
 1222 
 1223         return entity
 1224 
 1225     # Service providers (federation)
 1226 
 1227     def assertValidServiceProvider(self, entity, ref=None, *args, **kwargs):
 1228 
 1229         attributes = frozenset(['auth_url', 'id', 'enabled', 'description',
 1230                                 'links', 'relay_state_prefix', 'sp_url'])
 1231         for attribute in attributes:
 1232             self.assertIsNotNone(entity.get(attribute))
 1233 
 1234     def build_external_auth_environ(self, remote_user, remote_domain=None):
 1235         environment = {'REMOTE_USER': remote_user, 'AUTH_TYPE': 'Negotiate'}
 1236         if remote_domain:
 1237             environment['REMOTE_DOMAIN'] = remote_domain
 1238         return environment
 1239 
 1240 
class VersionTestCase(RestfulTestCase):
    """Placeholder test case; its only test currently checks nothing."""

    def test_get_version(self):
        # Intentionally empty: the test body performs no requests or
        # assertions.
        pass
 1244 
 1245 
 1246 # NOTE(gyee): test AuthContextMiddleware here instead of test_middleware.py
 1247 # because we need the token
 1248 class AuthContextMiddlewareTestCase(RestfulTestCase):
 1249 
 1250     def _middleware_request(self, token, extra_environ=None):
 1251 
 1252         def application(environ, start_response):
 1253             body = b'body'
 1254             headers = [('Content-Type', 'text/html; charset=utf8'),
 1255                        ('Content-Length', str(len(body)))]
 1256             start_response('200 OK', headers)
 1257             return [body]
 1258 
 1259         app = webtest.TestApp(auth_context.AuthContextMiddleware(application),
 1260                               extra_environ=extra_environ)
 1261         resp = app.get('/', headers={authorization.AUTH_TOKEN_HEADER: token})
 1262         self.assertEqual(b'body', resp.body)  # just to make sure it worked
 1263         return resp.request
 1264 
 1265     def test_auth_context_build_by_middleware(self):
 1266         # test to make sure AuthContextMiddleware successful build the auth
 1267         # context from the incoming auth token
 1268         admin_token = self.get_scoped_token()
 1269         req = self._middleware_request(admin_token)
 1270         self.assertEqual(
 1271             self.user['id'],
 1272             req.environ.get(authorization.AUTH_CONTEXT_ENV)['user_id'])
 1273 
 1274     def test_auth_context_override(self):
 1275         overridden_context = 'OVERRIDDEN_CONTEXT'
 1276         # this token should not be used
 1277         token = uuid.uuid4().hex
 1278 
 1279         extra_environ = {authorization.AUTH_CONTEXT_ENV: overridden_context}
 1280         req = self._middleware_request(token, extra_environ=extra_environ)
 1281         # make sure overridden context take precedence
 1282         self.assertEqual(overridden_context,
 1283                          req.environ.get(authorization.AUTH_CONTEXT_ENV))
 1284 
 1285     def test_unscoped_token_auth_context(self):
 1286         unscoped_token = self.get_unscoped_token()
 1287         req = self._middleware_request(unscoped_token)
 1288         # This check originally looked that the value was unset
 1289         # but that was an artifact of the custom context keystone
 1290         # used to create.  Oslo-context will always provide the
 1291         # same set of keys, but the values will be None in an
 1292         # unscoped token
 1293         for key in ['project_id', 'domain_id', 'domain_name']:
 1294             self.assertIsNone(
 1295                 req.environ.get(authorization.AUTH_CONTEXT_ENV)[key])
 1296 
 1297     def test_project_scoped_token_auth_context(self):
 1298         project_scoped_token = self.get_scoped_token()
 1299         req = self._middleware_request(project_scoped_token)
 1300         self.assertEqual(
 1301             self.project['id'],
 1302             req.environ.get(authorization.AUTH_CONTEXT_ENV)['project_id'])
 1303 
 1304     def test_domain_scoped_token_auth_context(self):
 1305         # grant the domain role to user
 1306         path = '/domains/%s/users/%s/roles/%s' % (
 1307             self.domain['id'], self.user['id'], self.role['id'])
 1308         self.put(path=path)
 1309 
 1310         domain_scoped_token = self.get_domain_scoped_token()
 1311         req = self._middleware_request(domain_scoped_token)
 1312         self.assertEqual(
 1313             self.domain['id'],
 1314             req.environ.get(authorization.AUTH_CONTEXT_ENV)['domain_id'])
 1315         self.assertEqual(
 1316             self.domain['name'],
 1317             req.environ.get(authorization.AUTH_CONTEXT_ENV)['domain_name'])
 1318 
 1319     def test_oslo_context(self):
 1320         # After AuthContextMiddleware runs, an
 1321         # oslo_context.context.RequestContext was created so that its fields
 1322         # can be logged. This test validates that the RequestContext was
 1323         # created and the fields are set as expected.
 1324 
 1325         # Use a scoped token so more fields can be set.
 1326         token = self.get_scoped_token()
 1327 
 1328         # oslo_middleware RequestId middleware sets openstack.request_id.
 1329         request_id = uuid.uuid4().hex
 1330         environ = {'openstack.request_id': request_id}
 1331         self._middleware_request(token, extra_environ=environ)
 1332 
 1333         req_context = oslo_context.context.get_current()
 1334         self.assertEqual(request_id, req_context.request_id)
 1335         self.assertEqual(token, req_context.auth_token)
 1336         self.assertEqual(self.user['id'], req_context.user_id)
 1337         self.assertEqual(self.project['id'], req_context.project_id)
 1338         self.assertIsNone(req_context.domain_id)
 1339         self.assertEqual(self.user['domain_id'], req_context.user_domain_id)
 1340         self.assertEqual(self.project['domain_id'],
 1341                          req_context.project_domain_id)
 1342         self.assertFalse(req_context.is_admin)
 1343 
 1344 
 1345 class JsonHomeTestMixin(object):
 1346     """JSON Home test.
 1347 
 1348     Mixin this class to provide a test for the JSON-Home response for an
 1349     extension.
 1350 
 1351     The base class must set JSON_HOME_DATA to a dict of relationship URLs
 1352     (rels) to the JSON-Home data for the relationship. The rels and associated
 1353     data must be in the response.
 1354 
 1355     """
 1356 
 1357     def test_get_json_home(self):
 1358         resp = self.get('/', convert=False,
 1359                         headers={'Accept': 'application/json-home'})
 1360         self.assertThat(resp.headers['Content-Type'],
 1361                         matchers.Equals('application/json-home'))
 1362         resp_data = jsonutils.loads(resp.body)
 1363 
 1364         # Check that the example relationships are present.
 1365         for rel in self.JSON_HOME_DATA:
 1366             self.assertThat(resp_data['resources'][rel],
 1367                             matchers.Equals(self.JSON_HOME_DATA[rel]))
 1368 
 1369 
 1370 class AssignmentTestMixin(object):
 1371     """To hold assignment helper functions."""
 1372 
 1373     def build_role_assignment_query_url(self, effective=False, **filters):
 1374         """Build and return a role assignment query url with provided params.
 1375 
 1376         Available filters are: domain_id, project_id, user_id, group_id,
 1377         role_id and inherited_to_projects.
 1378         """
 1379         query_params = '?effective' if effective else ''
 1380 
 1381         for k, v in filters.items():
 1382             query_params += '?' if not query_params else '&'
 1383 
 1384             if k == 'inherited_to_projects':
 1385                 query_params += 'scope.OS-INHERIT:inherited_to=projects'
 1386             else:
 1387                 if k in ['domain_id', 'project_id']:
 1388                     query_params += 'scope.'
 1389                 elif k not in ['user_id', 'group_id', 'role_id']:
 1390                     raise ValueError(
 1391                         'Invalid key \'%s\' in provided filters.' % k)
 1392 
 1393                 query_params += '%s=%s' % (k.replace('_', '.'), v)
 1394 
 1395         return '/role_assignments%s' % query_params
 1396 
 1397     def build_role_assignment_link(self, **attribs):
 1398         """Build and return a role assignment link with provided attributes.
 1399 
 1400         Provided attributes are expected to contain: domain_id or project_id,
 1401         user_id or group_id, role_id and, optionally, inherited_to_projects.
 1402         """
 1403         if attribs.get('domain_id'):
 1404             link = '/domains/' + attribs['domain_id']
 1405         elif attribs.get('system'):
 1406             link = '/system'
 1407         else:
 1408             link = '/projects/' + attribs['project_id']
 1409 
 1410         if attribs.get('user_id'):
 1411             link += '/users/' + attribs['user_id']
 1412         else:
 1413             link += '/groups/' + attribs['group_id']
 1414 
 1415         link += '/roles/' + attribs['role_id']
 1416 
 1417         if attribs.get('inherited_to_projects'):
 1418             return '/OS-INHERIT%s/inherited_to_projects' % link
 1419 
 1420         return link
 1421 
 1422     def build_role_assignment_entity(
 1423             self, link=None, prior_role_link=None, **attribs):
 1424         """Build and return a role assignment entity with provided attributes.
 1425 
 1426         Provided attributes are expected to contain: domain_id or project_id,
 1427         user_id or group_id, role_id and, optionally, inherited_to_projects.
 1428         """
 1429         entity = {'links': {'assignment': (
 1430             link or self.build_role_assignment_link(**attribs))}}
 1431 
 1432         if attribs.get('domain_id'):
 1433             entity['scope'] = {'domain': {'id': attribs['domain_id']}}
 1434         elif attribs.get('system'):
 1435             entity['scope'] = {'system': {'all': True}}
 1436         else:
 1437             entity['scope'] = {'project': {'id': attribs['project_id']}}
 1438 
 1439         if attribs.get('user_id'):
 1440             entity['user'] = {'id': attribs['user_id']}
 1441 
 1442             if attribs.get('group_id'):
 1443                 entity['links']['membership'] = ('/groups/%s/users/%s' %
 1444                                                  (attribs['group_id'],
 1445                                                   attribs['user_id']))
 1446         else:
 1447             entity['group'] = {'id': attribs['group_id']}
 1448 
 1449         entity['role'] = {'id': attribs['role_id']}
 1450 
 1451         if attribs.get('inherited_to_projects'):
 1452             entity['scope']['OS-INHERIT:inherited_to'] = 'projects'
 1453 
 1454         if prior_role_link:
 1455             entity['links']['prior_role'] = prior_role_link
 1456 
 1457         return entity
 1458 
 1459     def build_role_assignment_entity_include_names(self,
 1460                                                    domain_ref=None,
 1461                                                    role_ref=None,
 1462                                                    group_ref=None,
 1463                                                    user_ref=None,
 1464                                                    project_ref=None,
 1465                                                    inherited_assignment=None):
 1466         """Build and return a role assignment entity with provided attributes.
 1467 
 1468         The expected attributes are: domain_ref or project_ref,
 1469         user_ref or group_ref, role_ref and, optionally, inherited_to_projects.
 1470         """
 1471         entity = {'links': {}}
 1472         attributes_for_links = {}
 1473         if project_ref:
 1474             dmn_name = PROVIDERS.resource_api.get_domain(
 1475                 project_ref['domain_id'])['name']
 1476 
 1477             entity['scope'] = {'project': {
 1478                                'id': project_ref['id'],
 1479                                'name': project_ref['name'],
 1480                                'domain': {
 1481                                    'id': project_ref['domain_id'],
 1482                                    'name': dmn_name}}}
 1483             attributes_for_links['project_id'] = project_ref['id']
 1484         else:
 1485             entity['scope'] = {'domain': {'id': domain_ref['id'],
 1486                                           'name': domain_ref['name']}}
 1487             attributes_for_links['domain_id'] = domain_ref['id']
 1488         if user_ref:
 1489             dmn_name = PROVIDERS.resource_api.get_domain(
 1490                 user_ref['domain_id'])['name']
 1491             entity['user'] = {'id': user_ref['id'],
 1492                               'name': user_ref['name'],
 1493                               'domain': {'id': user_ref['domain_id'],
 1494                                          'name': dmn_name}}
 1495             attributes_for_links['user_id'] = user_ref['id']
 1496         else:
 1497             dmn_name = PROVIDERS.resource_api.get_domain(
 1498                 group_ref['domain_id'])['name']
 1499             entity['group'] = {'id': group_ref['id'],
 1500                                'name': group_ref['name'],
 1501                                'domain': {
 1502                                    'id': group_ref['domain_id'],
 1503                                    'name': dmn_name}}
 1504             attributes_for_links['group_id'] = group_ref['id']
 1505 
 1506         if role_ref:
 1507             entity['role'] = {'id': role_ref['id'],
 1508                               'name': role_ref['name']}
 1509             if role_ref['domain_id']:
 1510                 dmn_name = PROVIDERS.resource_api.get_domain(
 1511                     role_ref['domain_id'])['name']
 1512                 entity['role']['domain'] = {'id': role_ref['domain_id'],
 1513                                             'name': dmn_name}
 1514             attributes_for_links['role_id'] = role_ref['id']
 1515 
 1516         if inherited_assignment:
 1517             entity['scope']['OS-INHERIT:inherited_to'] = 'projects'
 1518             attributes_for_links['inherited_to_projects'] = True
 1519 
 1520         entity['links']['assignment'] = self.build_role_assignment_link(
 1521             **attributes_for_links)
 1522 
 1523         return entity