"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/unit/test_v3_auth.py" (13 May 2020, 242631 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_v3_auth.py": 16.0.1_vs_17.0.0.

    1 # Copyright 2012 OpenStack Foundation
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 import copy
   16 import datetime
   17 import fixtures
   18 import itertools
   19 import operator
   20 import re
   21 from unittest import mock
   22 import uuid
   23 
   24 import freezegun
   25 import http.client
   26 from oslo_serialization import jsonutils as json
   27 from oslo_utils import fixture
   28 from oslo_utils import timeutils
   29 from testtools import matchers
   30 from testtools import testcase
   31 
   32 from keystone import auth
   33 from keystone.auth.plugins import totp
   34 from keystone.common import authorization
   35 from keystone.common import provider_api
   36 from keystone.common.rbac_enforcer import policy
   37 from keystone.common import utils
   38 import keystone.conf
   39 from keystone.credential.providers import fernet as credential_fernet
   40 from keystone import exception
   41 from keystone.identity.backends import resource_options as ro
   42 from keystone.tests.common import auth as common_auth
   43 from keystone.tests import unit
   44 from keystone.tests.unit import ksfixtures
   45 from keystone.tests.unit import test_v3
   46 
   47 
   48 CONF = keystone.conf.CONF
   49 PROVIDERS = provider_api.ProviderAPIs
   50 
   51 
   52 class TestMFARules(test_v3.RestfulTestCase):
   53     def config_overrides(self):
   54         super(TestMFARules, self).config_overrides()
   55 
   56         self.useFixture(
   57             ksfixtures.KeyRepository(
   58                 self.config_fixture,
   59                 'fernet_tokens',
   60                 CONF.fernet_tokens.max_active_keys
   61             )
   62         )
   63 
   64         self.useFixture(
   65             ksfixtures.KeyRepository(
   66                 self.config_fixture,
   67                 'credential',
   68                 credential_fernet.MAX_ACTIVE_KEYS
   69             )
   70         )
   71 
   72     def assertValidErrorResponse(self, r):
   73         resp = r.result
   74         if r.headers.get(authorization.AUTH_RECEIPT_HEADER):
   75             self.assertIsNotNone(resp.get('receipt'))
   76             self.assertIsNotNone(resp.get('receipt').get('methods'))
   77         else:
   78             self.assertIsNotNone(resp.get('error'))
   79             self.assertIsNotNone(resp['error'].get('code'))
   80             self.assertIsNotNone(resp['error'].get('title'))
   81             self.assertIsNotNone(resp['error'].get('message'))
   82             self.assertEqual(int(resp['error']['code']), r.status_code)
   83 
   84     def _create_totp_cred(self):
   85         totp_cred = unit.new_totp_credential(self.user_id, self.project_id)
   86         PROVIDERS.credential_api.create_credential(uuid.uuid4().hex, totp_cred)
   87 
   88         def cleanup(testcase):
   89             totp_creds = testcase.credential_api.list_credentials_for_user(
   90                 testcase.user['id'], type='totp')
   91 
   92             for cred in totp_creds:
   93                 testcase.credential_api.delete_credential(cred['id'])
   94 
   95         self.addCleanup(cleanup, testcase=self)
   96         return totp_cred
   97 
   98     def auth_plugin_config_override(self, methods=None, **method_classes):
   99         methods = ['totp', 'token', 'password']
  100         super(TestMFARules, self).auth_plugin_config_override(methods)
  101 
  102     def _update_user_with_MFA_rules(self, rule_list, rules_enabled=True):
  103         user = self.user.copy()
  104         # Do not update password
  105         user.pop('password')
  106         user['options'][ro.MFA_RULES_OPT.option_name] = rule_list
  107         user['options'][ro.MFA_ENABLED_OPT.option_name] = rules_enabled
  108         PROVIDERS.identity_api.update_user(user['id'], user)
  109 
  110     def test_MFA_single_method_rules_requirements_met_succeeds(self):
  111         # ensure that a simple password works if a password-only rules exists
  112         rule_list = [['password'], ['password', 'totp']]
  113         self._update_user_with_MFA_rules(rule_list=rule_list)
  114         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  115         # issues with revocation events that occur at the same time as the
  116         # token issuance. This is a bug with the limited resolution that
  117         # tokens and revocation events have.
  118         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  119         with freezegun.freeze_time(time):
  120             self.v3_create_token(
  121                 self.build_authentication_request(
  122                     user_id=self.user_id,
  123                     password=self.user['password'],
  124                     user_domain_id=self.domain_id,
  125                     project_id=self.project_id))
  126 
  127     def test_MFA_multi_method_rules_requirements_met_succeeds(self):
  128         # validate that multiple auth-methods function if all are specified
  129         # and the rules requires it
  130         rule_list = [['password', 'totp']]
  131         totp_cred = self._create_totp_cred()
  132         self._update_user_with_MFA_rules(rule_list=rule_list)
  133         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  134         # issues with revocation events that occur at the same time as the
  135         # token issuance. This is a bug with the limited resolution that
  136         # tokens and revocation events have.
  137         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  138         with freezegun.freeze_time(time):
  139             auth_req = self.build_authentication_request(
  140                 user_id=self.user_id,
  141                 password=self.user['password'],
  142                 user_domain_id=self.domain_id,
  143                 passcode=totp._generate_totp_passcodes(totp_cred['blob'])[0])
  144             self.v3_create_token(auth_req)
  145 
  146     def test_MFA_single_method_rules_requirements_not_met_fails(self):
  147         # if a rule matching a single auth type is specified and is not matched
  148         # the result should be unauthorized
  149         rule_list = [['totp']]
  150         self._update_user_with_MFA_rules(rule_list=rule_list)
  151         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  152         # issues with revocation events that occur at the same time as the
  153         # token issuance. This is a bug with the limited resolution that
  154         # tokens and revocation events have.
  155         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  156         with freezegun.freeze_time(time):
  157             self.v3_create_token(
  158                 self.build_authentication_request(
  159                     user_id=self.user_id,
  160                     password=self.user['password'],
  161                     user_domain_id=self.domain_id,
  162                     project_id=self.project_id),
  163                 expected_status=http.client.UNAUTHORIZED)
  164 
  165     def test_MFA_multi_method_rules_requirements_not_met_fails(self):
  166         # if multiple rules are specified and only one is passed,
  167         # unauthorized is expected
  168         rule_list = [['password', 'totp']]
  169         self._update_user_with_MFA_rules(rule_list=rule_list)
  170         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  171         # issues with revocation events that occur at the same time as the
  172         # token issuance. This is a bug with the limited resolution that
  173         # tokens and revocation events have.
  174         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  175         with freezegun.freeze_time(time):
  176             self.v3_create_token(
  177                 self.build_authentication_request(
  178                     user_id=self.user_id,
  179                     password=self.user['password'],
  180                     user_domain_id=self.domain_id,
  181                     project_id=self.project_id),
  182                 expected_status=http.client.UNAUTHORIZED)
  183 
  184     def test_MFA_rules_bogus_non_existing_auth_method_succeeds(self):
  185         # Bogus auth methods are thrown out from rules.
  186         rule_list = [['password'], ['BoGusAuThMeTh0dHandl3r']]
  187         self._update_user_with_MFA_rules(rule_list=rule_list)
  188         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  189         # issues with revocation events that occur at the same time as the
  190         # token issuance. This is a bug with the limited resolution that
  191         # tokens and revocation events have.
  192         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  193         with freezegun.freeze_time(time):
  194             self.v3_create_token(
  195                 self.build_authentication_request(
  196                     user_id=self.user_id,
  197                     password=self.user['password'],
  198                     user_domain_id=self.domain_id,
  199                     project_id=self.project_id))
  200 
  201     def test_MFA_rules_disabled_MFA_succeeeds(self):
  202         # ensure that if MFA is "disableD" authentication succeeds, even if
  203         # not enough auth methods are specified
  204         rule_list = [['password', 'totp']]
  205         self._update_user_with_MFA_rules(rule_list=rule_list,
  206                                          rules_enabled=False)
  207         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  208         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  209         # issues with revocation events that occur at the same time as the
  210         # token issuance. This is a bug with the limited resolution that
  211         # tokens and revocation events have.
  212         with freezegun.freeze_time(time):
  213             self.v3_create_token(
  214                 self.build_authentication_request(
  215                     user_id=self.user_id,
  216                     password=self.user['password'],
  217                     user_domain_id=self.domain_id,
  218                     project_id=self.project_id))
  219 
  220     def test_MFA_rules_all_bogus_rules_results_in_default_behavior(self):
  221         # if all the rules are bogus, the result is the same as the default
  222         # behavior, any single password method is sufficient
  223         rule_list = [[uuid.uuid4().hex, uuid.uuid4().hex],
  224                      ['BoGus'],
  225                      ['NonExistantMethod']]
  226         self._update_user_with_MFA_rules(rule_list=rule_list)
  227         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  228         # issues with revocation events that occur at the same time as the
  229         # token issuance. This is a bug with the limited resolution that
  230         # tokens and revocation events have.
  231         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  232         with freezegun.freeze_time(time):
  233             self.v3_create_token(
  234                 self.build_authentication_request(
  235                     user_id=self.user_id,
  236                     password=self.user['password'],
  237                     user_domain_id=self.domain_id,
  238                     project_id=self.project_id))
  239 
  240     def test_MFA_rules_rescope_works_without_token_method_in_rules(self):
  241         rule_list = [['password', 'totp']]
  242         totp_cred = self._create_totp_cred()
  243         self._update_user_with_MFA_rules(rule_list=rule_list)
  244         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  245         # issues with revocation events that occur at the same time as the
  246         # token issuance. This is a bug with the limited resolution that
  247         # tokens and revocation events have.
  248         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  249         with freezegun.freeze_time(time):
  250             auth_data = self.build_authentication_request(
  251                 user_id=self.user_id,
  252                 password=self.user['password'],
  253                 user_domain_id=self.domain_id,
  254                 passcode=totp._generate_totp_passcodes(totp_cred['blob'])[0])
  255             r = self.v3_create_token(auth_data)
  256             auth_data = self.build_authentication_request(
  257                 token=r.headers.get('X-Subject-Token'),
  258                 project_id=self.project_id)
  259             self.v3_create_token(auth_data)
  260 
  261     def test_MFA_requirements_makes_correct_receipt_for_password(self):
  262         # if multiple rules are specified and only one is passed,
  263         # unauthorized is expected
  264         rule_list = [['password', 'totp']]
  265         self._update_user_with_MFA_rules(rule_list=rule_list)
  266         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  267         # issues with revocation events that occur at the same time as the
  268         # token issuance. This is a bug with the limited resolution that
  269         # tokens and revocation events have.
  270         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  271         with freezegun.freeze_time(time):
  272             response = self.admin_request(
  273                 method='POST',
  274                 path='/v3/auth/tokens',
  275                 body=self.build_authentication_request(
  276                     user_id=self.user_id,
  277                     password=self.user['password'],
  278                     user_domain_id=self.domain_id,
  279                     project_id=self.project_id),
  280                 expected_status=http.client.UNAUTHORIZED)
  281 
  282         self.assertIsNotNone(
  283             response.headers.get(authorization.AUTH_RECEIPT_HEADER))
  284         resp_data = response.result
  285         # NOTE(adriant): We convert to sets to avoid any potential sorting
  286         # related failures since order isn't important, just content.
  287         self.assertEqual(
  288             {'password'}, set(resp_data.get('receipt').get('methods')))
  289         self.assertEqual(
  290             set(frozenset(r) for r in rule_list),
  291             set(frozenset(r) for r in resp_data.get('required_auth_methods')))
  292 
  293     def test_MFA_requirements_makes_correct_receipt_for_totp(self):
  294         # if multiple rules are specified and only one is passed,
  295         # unauthorized is expected
  296         totp_cred = self._create_totp_cred()
  297         rule_list = [['password', 'totp']]
  298         self._update_user_with_MFA_rules(rule_list=rule_list)
  299         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  300         # issues with revocation events that occur at the same time as the
  301         # token issuance. This is a bug with the limited resolution that
  302         # tokens and revocation events have.
  303         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  304         with freezegun.freeze_time(time):
  305             response = self.admin_request(
  306                 method='POST',
  307                 path='/v3/auth/tokens',
  308                 body=self.build_authentication_request(
  309                     user_id=self.user_id,
  310                     user_domain_id=self.domain_id,
  311                     project_id=self.project_id,
  312                     passcode=totp._generate_totp_passcodes(
  313                         totp_cred['blob'])[0]),
  314                 expected_status=http.client.UNAUTHORIZED)
  315 
  316         self.assertIsNotNone(
  317             response.headers.get(authorization.AUTH_RECEIPT_HEADER))
  318         resp_data = response.result
  319         # NOTE(adriant): We convert to sets to avoid any potential sorting
  320         # related failures since order isn't important, just content.
  321         self.assertEqual(
  322             {'totp'}, set(resp_data.get('receipt').get('methods')))
  323         self.assertEqual(
  324             set(frozenset(r) for r in rule_list),
  325             set(frozenset(r) for r in resp_data.get('required_auth_methods')))
  326 
  327     def test_MFA_requirements_makes_correct_receipt_for_pass_and_totp(self):
  328         # if multiple rules are specified and only one is passed,
  329         # unauthorized is expected
  330         totp_cred = self._create_totp_cred()
  331         rule_list = [['password', 'totp', 'token']]
  332         self._update_user_with_MFA_rules(rule_list=rule_list)
  333         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  334         # issues with revocation events that occur at the same time as the
  335         # token issuance. This is a bug with the limited resolution that
  336         # tokens and revocation events have.
  337         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  338         with freezegun.freeze_time(time):
  339             response = self.admin_request(
  340                 method='POST',
  341                 path='/v3/auth/tokens',
  342                 body=self.build_authentication_request(
  343                     user_id=self.user_id,
  344                     password=self.user['password'],
  345                     user_domain_id=self.domain_id,
  346                     project_id=self.project_id,
  347                     passcode=totp._generate_totp_passcodes(
  348                         totp_cred['blob'])[0]),
  349                 expected_status=http.client.UNAUTHORIZED)
  350 
  351         self.assertIsNotNone(
  352             response.headers.get(authorization.AUTH_RECEIPT_HEADER))
  353         resp_data = response.result
  354         # NOTE(adriant): We convert to sets to avoid any potential sorting
  355         # related failures since order isn't important, just content.
  356         self.assertEqual(
  357             {'password', 'totp'}, set(resp_data.get('receipt').get('methods')))
  358         self.assertEqual(
  359             set(frozenset(r) for r in rule_list),
  360             set(frozenset(r) for r in resp_data.get('required_auth_methods')))
  361 
  362     def test_MFA_requirements_returns_correct_required_auth_methods(self):
  363         # if multiple rules are specified and only one is passed,
  364         # unauthorized is expected
  365         rule_list = [
  366             ['password', 'totp', 'token'],
  367             ['password', 'totp'],
  368             ['token', 'totp'],
  369             ['BoGusAuThMeTh0dHandl3r']
  370         ]
  371         expect_rule_list = rule_list = [
  372             ['password', 'totp', 'token'],
  373             ['password', 'totp'],
  374         ]
  375         self._update_user_with_MFA_rules(rule_list=rule_list)
  376         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  377         # issues with revocation events that occur at the same time as the
  378         # token issuance. This is a bug with the limited resolution that
  379         # tokens and revocation events have.
  380         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  381         with freezegun.freeze_time(time):
  382             response = self.admin_request(
  383                 method='POST',
  384                 path='/v3/auth/tokens',
  385                 body=self.build_authentication_request(
  386                     user_id=self.user_id,
  387                     password=self.user['password'],
  388                     user_domain_id=self.domain_id,
  389                     project_id=self.project_id),
  390                 expected_status=http.client.UNAUTHORIZED)
  391 
  392         self.assertIsNotNone(
  393             response.headers.get(authorization.AUTH_RECEIPT_HEADER))
  394         resp_data = response.result
  395         # NOTE(adriant): We convert to sets to avoid any potential sorting
  396         # related failures since order isn't important, just content.
  397         self.assertEqual(
  398             {'password'}, set(resp_data.get('receipt').get('methods')))
  399         self.assertEqual(
  400             set(frozenset(r) for r in expect_rule_list),
  401             set(frozenset(r) for r in resp_data.get('required_auth_methods')))
  402 
  403     def test_MFA_consuming_receipt_with_totp(self):
  404         # if multiple rules are specified and only one is passed,
  405         # unauthorized is expected
  406         totp_cred = self._create_totp_cred()
  407         rule_list = [['password', 'totp']]
  408         self._update_user_with_MFA_rules(rule_list=rule_list)
  409         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  410         # issues with revocation events that occur at the same time as the
  411         # token issuance. This is a bug with the limited resolution that
  412         # tokens and revocation events have.
  413         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  414         with freezegun.freeze_time(time):
  415             response = self.admin_request(
  416                 method='POST',
  417                 path='/v3/auth/tokens',
  418                 body=self.build_authentication_request(
  419                     user_id=self.user_id,
  420                     password=self.user['password'],
  421                     user_domain_id=self.domain_id,
  422                     project_id=self.project_id),
  423                 expected_status=http.client.UNAUTHORIZED)
  424 
  425         self.assertIsNotNone(
  426             response.headers.get(authorization.AUTH_RECEIPT_HEADER))
  427         receipt = response.headers.get(authorization.AUTH_RECEIPT_HEADER)
  428         resp_data = response.result
  429         # NOTE(adriant): We convert to sets to avoid any potential sorting
  430         # related failures since order isn't important, just content.
  431         self.assertEqual(
  432             {'password'}, set(resp_data.get('receipt').get('methods')))
  433         self.assertEqual(
  434             set(frozenset(r) for r in rule_list),
  435             set(frozenset(r) for r in resp_data.get('required_auth_methods')))
  436 
  437         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  438         with freezegun.freeze_time(time):
  439             response = self.admin_request(
  440                 method='POST',
  441                 path='/v3/auth/tokens',
  442                 headers={authorization.AUTH_RECEIPT_HEADER: receipt},
  443                 body=self.build_authentication_request(
  444                     user_id=self.user_id,
  445                     user_domain_id=self.domain_id,
  446                     project_id=self.project_id,
  447                     passcode=totp._generate_totp_passcodes(
  448                         totp_cred['blob'])[0]))
  449 
  450     def test_MFA_consuming_receipt_not_found(self):
  451         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  452         with freezegun.freeze_time(time):
  453             response = self.admin_request(
  454                 method='POST',
  455                 path='/v3/auth/tokens',
  456                 headers={authorization.AUTH_RECEIPT_HEADER: "bogus-receipt"},
  457                 body=self.build_authentication_request(
  458                     user_id=self.user_id,
  459                     user_domain_id=self.domain_id,
  460                     project_id=self.project_id),
  461                 expected_status=http.client.UNAUTHORIZED)
  462         self.assertEqual(401, response.result['error']['code'])
  463 
  464 
  465 class TestAuthInfo(common_auth.AuthTestMixin, testcase.TestCase):
  466     def setUp(self):
  467         super(TestAuthInfo, self).setUp()
  468         auth.core.load_auth_methods()
  469 
  470     def test_unsupported_auth_method(self):
  471         auth_data = {'methods': ['abc']}
  472         auth_data['abc'] = {'test': 'test'}
  473         auth_data = {'identity': auth_data}
  474         self.assertRaises(exception.AuthMethodNotSupported,
  475                           auth.core.AuthInfo.create,
  476                           auth_data)
  477 
  478     def test_missing_auth_method_data(self):
  479         auth_data = {'methods': ['password']}
  480         auth_data = {'identity': auth_data}
  481         self.assertRaises(exception.ValidationError,
  482                           auth.core.AuthInfo.create,
  483                           auth_data)
  484 
  485     def test_project_name_no_domain(self):
  486         auth_data = self.build_authentication_request(
  487             username='test',
  488             password='test',
  489             project_name='abc')['auth']
  490         self.assertRaises(exception.ValidationError,
  491                           auth.core.AuthInfo.create,
  492                           auth_data)
  493 
  494     def test_both_project_and_domain_in_scope(self):
  495         auth_data = self.build_authentication_request(
  496             user_id='test',
  497             password='test',
  498             project_name='test',
  499             domain_name='test')['auth']
  500         self.assertRaises(exception.ValidationError,
  501                           auth.core.AuthInfo.create,
  502                           auth_data)
  503 
  504     def test_get_method_names_duplicates(self):
  505         auth_data = self.build_authentication_request(
  506             token='test',
  507             user_id='test',
  508             password='test')['auth']
  509         auth_data['identity']['methods'] = ['password', 'token',
  510                                             'password', 'password']
  511         auth_info = auth.core.AuthInfo.create(auth_data)
  512         self.assertEqual(['password', 'token'],
  513                          auth_info.get_method_names())
  514 
  515     def test_get_method_data_invalid_method(self):
  516         auth_data = self.build_authentication_request(
  517             user_id='test',
  518             password='test')['auth']
  519         auth_info = auth.core.AuthInfo.create(auth_data)
  520 
  521         method_name = uuid.uuid4().hex
  522         self.assertRaises(exception.ValidationError,
  523                           auth_info.get_method_data,
  524                           method_name)
  525 
  526 
  527 class TokenAPITests(object):
  528     # Why is this not just setUp? Because TokenAPITests is not a test class
  529     # itself. If TokenAPITests became a subclass of the testcase, it would get
  530     # called by the enumerate-tests-in-file code. The way the functions get
  531     # resolved in Python for multiple inheritance means that a setUp in this
  532     # would get skipped by the testrunner.
  533     def doSetUp(self):
  534         r = self.v3_create_token(self.build_authentication_request(
  535             username=self.user['name'],
  536             user_domain_id=self.domain_id,
  537             password=self.user['password']))
  538         self.v3_token_data = r.result
  539         self.v3_token = r.headers.get('X-Subject-Token')
  540         self.headers = {'X-Subject-Token': r.headers.get('X-Subject-Token')}
  541 
  542     def _get_unscoped_token(self):
  543         auth_data = self.build_authentication_request(
  544             user_id=self.user['id'],
  545             password=self.user['password'])
  546         r = self.post('/auth/tokens', body=auth_data)
  547         self.assertValidUnscopedTokenResponse(r)
  548         return r.headers.get('X-Subject-Token')
  549 
  550     def _get_domain_scoped_token(self):
  551         auth_data = self.build_authentication_request(
  552             user_id=self.user['id'],
  553             password=self.user['password'],
  554             domain_id=self.domain_id)
  555         r = self.post('/auth/tokens', body=auth_data)
  556         self.assertValidDomainScopedTokenResponse(r)
  557         return r.headers.get('X-Subject-Token')
  558 
  559     def _get_project_scoped_token(self):
  560         auth_data = self.build_authentication_request(
  561             user_id=self.user['id'],
  562             password=self.user['password'],
  563             project_id=self.project_id)
  564         r = self.post('/auth/tokens', body=auth_data)
  565         self.assertValidProjectScopedTokenResponse(r)
  566         return r.headers.get('X-Subject-Token')
  567 
  568     def _get_trust_scoped_token(self, trustee_user, trust):
  569         auth_data = self.build_authentication_request(
  570             user_id=trustee_user['id'],
  571             password=trustee_user['password'],
  572             trust_id=trust['id'])
  573         r = self.post('/auth/tokens', body=auth_data)
  574         self.assertValidProjectScopedTokenResponse(r)
  575         return r.headers.get('X-Subject-Token')
  576 
  577     def _create_trust(self, impersonation=False):
  578         # Create a trustee user
  579         trustee_user = unit.create_user(PROVIDERS.identity_api,
  580                                         domain_id=self.domain_id)
  581         ref = unit.new_trust_ref(
  582             trustor_user_id=self.user_id,
  583             trustee_user_id=trustee_user['id'],
  584             project_id=self.project_id,
  585             impersonation=impersonation,
  586             role_ids=[self.role_id])
  587 
  588         # Create a trust
  589         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
  590         trust = self.assertValidTrustResponse(r)
  591         return (trustee_user, trust)
  592 
  593     def _validate_token(self, token,
  594                         expected_status=http.client.OK, allow_expired=False):
  595         path = '/v3/auth/tokens'
  596 
  597         if allow_expired:
  598             path += '?allow_expired=1'
  599 
  600         return self.admin_request(
  601             path=path,
  602             headers={'X-Auth-Token': self.get_admin_token(),
  603                      'X-Subject-Token': token},
  604             method='GET',
  605             expected_status=expected_status
  606         )
  607 
  608     def _revoke_token(self, token, expected_status=http.client.NO_CONTENT):
  609         return self.delete(
  610             '/auth/tokens',
  611             headers={'x-subject-token': token},
  612             expected_status=expected_status)
  613 
  614     def _set_user_enabled(self, user, enabled=True):
  615         user['enabled'] = enabled
  616         PROVIDERS.identity_api.update_user(user['id'], user)
  617 
  618     def _create_project_and_set_as_default_project(self):
  619         # create a new project
  620         ref = unit.new_project_ref(domain_id=self.domain_id)
  621         r = self.post('/projects', body={'project': ref})
  622         project = self.assertValidProjectResponse(r, ref)
  623 
  624         # grant the user a role on the project
  625         self.put(
  626             '/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % {
  627                 'user_id': self.user['id'],
  628                 'project_id': project['id'],
  629                 'role_id': self.role['id']})
  630 
  631         # make the new project the user's default project
  632         body = {'user': {'default_project_id': project['id']}}
  633         r = self.patch('/users/%(user_id)s' % {
  634             'user_id': self.user['id']},
  635             body=body)
  636         self.assertValidUserResponse(r)
  637 
  638         return project
  639 
  640     def test_auth_with_token_as_different_user_fails(self):
  641         # get the token for a user. This is self.user which is different from
  642         # self.default_domain_user.
  643         token = self.get_scoped_token()
  644         # try both password and token methods with different identities and it
  645         # should fail
  646         auth_data = self.build_authentication_request(
  647             token=token,
  648             user_id=self.default_domain_user['id'],
  649             password=self.default_domain_user['password'])
  650         self.v3_create_token(auth_data,
  651                              expected_status=http.client.UNAUTHORIZED)
  652 
  653     def test_create_token_for_user_without_password_fails(self):
  654         user = unit.new_user_ref(domain_id=self.domain['id'])
  655         del user['password']  # can't have a password for this test
  656         user = PROVIDERS.identity_api.create_user(user)
  657 
  658         auth_data = self.build_authentication_request(
  659             user_id=user['id'],
  660             password='password')
  661 
  662         self.v3_create_token(auth_data,
  663                              expected_status=http.client.UNAUTHORIZED)
  664 
  665     def test_create_unscoped_token_by_authenticating_with_unscoped_token(self):
  666         auth_data = self.build_authentication_request(
  667             user_id=self.user['id'],
  668             password=self.user['password'])
  669         r = self.v3_create_token(auth_data)
  670         self.assertValidUnscopedTokenResponse(r)
  671         token_id = r.headers.get('X-Subject-Token')
  672 
  673         auth_data = self.build_authentication_request(token=token_id)
  674         r = self.v3_create_token(auth_data)
  675         self.assertValidUnscopedTokenResponse(r)
  676 
  677     def test_create_unscoped_token_with_user_id(self):
  678         auth_data = self.build_authentication_request(
  679             user_id=self.user['id'],
  680             password=self.user['password'])
  681         r = self.v3_create_token(auth_data)
  682         self.assertValidUnscopedTokenResponse(r)
  683 
  684     def test_create_unscoped_token_with_user_domain_id(self):
  685         auth_data = self.build_authentication_request(
  686             username=self.user['name'],
  687             user_domain_id=self.domain['id'],
  688             password=self.user['password'])
  689         r = self.v3_create_token(auth_data)
  690         self.assertValidUnscopedTokenResponse(r)
  691 
  692     def test_create_unscoped_token_with_user_domain_name(self):
  693         auth_data = self.build_authentication_request(
  694             username=self.user['name'],
  695             user_domain_name=self.domain['name'],
  696             password=self.user['password'])
  697         r = self.v3_create_token(auth_data)
  698         self.assertValidUnscopedTokenResponse(r)
  699 
  700     def test_validate_unscoped_token(self):
  701         unscoped_token = self._get_unscoped_token()
  702         r = self._validate_token(unscoped_token)
  703         self.assertValidUnscopedTokenResponse(r)
  704 
  705     def test_validate_expired_unscoped_token_returns_not_found(self):
  706         # NOTE(lbragstad): We set token expiration to 10 seconds so that we can
  707         # use the context manager of freezegun without sqlite issues.
  708         self.config_fixture.config(group='token',
  709                                    expiration=10)
  710         time = datetime.datetime.utcnow()
  711         with freezegun.freeze_time(time) as frozen_datetime:
  712             unscoped_token = self._get_unscoped_token()
  713             frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
  714             self._validate_token(
  715                 unscoped_token,
  716                 expected_status=http.client.NOT_FOUND
  717             )
  718 
  719     def test_revoke_unscoped_token(self):
  720         unscoped_token = self._get_unscoped_token()
  721         r = self._validate_token(unscoped_token)
  722         self.assertValidUnscopedTokenResponse(r)
  723         self._revoke_token(unscoped_token)
  724         self._validate_token(unscoped_token,
  725                              expected_status=http.client.NOT_FOUND)
  726 
  727     def test_create_explicit_unscoped_token(self):
  728         self._create_project_and_set_as_default_project()
  729 
  730         # explicitly ask for an unscoped token
  731         auth_data = self.build_authentication_request(
  732             user_id=self.user['id'],
  733             password=self.user['password'],
  734             unscoped="unscoped")
  735         r = self.post('/auth/tokens', body=auth_data, noauth=True)
  736         self.assertValidUnscopedTokenResponse(r)
  737 
  738     def test_disabled_users_default_project_result_in_unscoped_token(self):
  739         # create a disabled project to work with
  740         project = self.create_new_default_project_for_user(
  741             self.user['id'], self.domain_id, enable_project=False)
  742 
  743         # assign a role to user for the new project
  744         PROVIDERS.assignment_api.add_role_to_user_and_project(
  745             self.user['id'], project['id'], self.role_id
  746         )
  747 
  748         # attempt to authenticate without requesting a project
  749         auth_data = self.build_authentication_request(
  750             user_id=self.user['id'],
  751             password=self.user['password'])
  752         r = self.v3_create_token(auth_data)
  753         self.assertValidUnscopedTokenResponse(r)
  754 
  755     def test_disabled_default_project_domain_result_in_unscoped_token(self):
  756         domain_ref = unit.new_domain_ref()
  757         r = self.post('/domains', body={'domain': domain_ref})
  758         domain = self.assertValidDomainResponse(r, domain_ref)
  759 
  760         project = self.create_new_default_project_for_user(
  761             self.user['id'], domain['id'])
  762 
  763         # assign a role to user for the new project
  764         PROVIDERS.assignment_api.add_role_to_user_and_project(
  765             self.user['id'], project['id'], self.role_id
  766         )
  767 
  768         # now disable the project domain
  769         body = {'domain': {'enabled': False}}
  770         r = self.patch('/domains/%(domain_id)s' % {'domain_id': domain['id']},
  771                        body=body)
  772         self.assertValidDomainResponse(r)
  773 
  774         # attempt to authenticate without requesting a project
  775         auth_data = self.build_authentication_request(
  776             user_id=self.user['id'],
  777             password=self.user['password'])
  778         r = self.v3_create_token(auth_data)
  779         self.assertValidUnscopedTokenResponse(r)
  780 
  781     def test_unscoped_token_is_invalid_after_disabling_user(self):
  782         unscoped_token = self._get_unscoped_token()
  783         # Make sure the token is valid
  784         r = self._validate_token(unscoped_token)
  785         self.assertValidUnscopedTokenResponse(r)
  786         # Disable the user
  787         self._set_user_enabled(self.user, enabled=False)
  788         # Ensure validating a token for a disabled user fails
  789         self._validate_token(
  790             unscoped_token,
  791             expected_status=http.client.NOT_FOUND
  792         )
  793 
  794     def test_unscoped_token_is_invalid_after_enabling_disabled_user(self):
  795         unscoped_token = self._get_unscoped_token()
  796         # Make sure the token is valid
  797         r = self._validate_token(unscoped_token)
  798         self.assertValidUnscopedTokenResponse(r)
  799         # Disable the user
  800         self._set_user_enabled(self.user, enabled=False)
  801         # Ensure validating a token for a disabled user fails
  802         self._validate_token(
  803             unscoped_token,
  804             expected_status=http.client.NOT_FOUND
  805         )
  806         # Enable the user
  807         self._set_user_enabled(self.user)
  808         # Ensure validating a token for a re-enabled user fails
  809         self._validate_token(
  810             unscoped_token,
  811             expected_status=http.client.NOT_FOUND
  812         )
  813 
  814     def test_unscoped_token_is_invalid_after_disabling_user_domain(self):
  815         unscoped_token = self._get_unscoped_token()
  816         # Make sure the token is valid
  817         r = self._validate_token(unscoped_token)
  818         self.assertValidUnscopedTokenResponse(r)
  819         # Disable the user's domain
  820         self.domain['enabled'] = False
  821         PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain)
  822         # Ensure validating a token for a disabled user fails
  823         self._validate_token(
  824             unscoped_token,
  825             expected_status=http.client.NOT_FOUND
  826         )
  827 
  828     def test_unscoped_token_is_invalid_after_changing_user_password(self):
  829         unscoped_token = self._get_unscoped_token()
  830         # Make sure the token is valid
  831         r = self._validate_token(unscoped_token)
  832         self.assertValidUnscopedTokenResponse(r)
  833         # Change user's password
  834         self.user['password'] = 'Password1'
  835         PROVIDERS.identity_api.update_user(self.user['id'], self.user)
  836         # Ensure updating user's password revokes existing user's tokens
  837         self._validate_token(
  838             unscoped_token,
  839             expected_status=http.client.NOT_FOUND
  840         )
  841 
  842     def test_create_system_token_with_user_id(self):
  843         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
  844             'user_id': self.user['id'],
  845             'role_id': self.role_id
  846         }
  847         self.put(path=path)
  848 
  849         auth_request_body = self.build_authentication_request(
  850             user_id=self.user['id'],
  851             password=self.user['password'],
  852             system=True
  853         )
  854 
  855         response = self.v3_create_token(auth_request_body)
  856         self.assertValidSystemScopedTokenResponse(response)
  857 
  858     def test_create_system_token_with_username(self):
  859         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
  860             'user_id': self.user['id'],
  861             'role_id': self.role_id
  862         }
  863         self.put(path=path)
  864 
  865         auth_request_body = self.build_authentication_request(
  866             username=self.user['name'],
  867             password=self.user['password'],
  868             user_domain_id=self.domain['id'],
  869             system=True
  870         )
  871 
  872         response = self.v3_create_token(auth_request_body)
  873         self.assertValidSystemScopedTokenResponse(response)
  874 
  875     def test_create_system_token_fails_without_system_assignment(self):
  876         auth_request_body = self.build_authentication_request(
  877             user_id=self.user['id'],
  878             password=self.user['password'],
  879             system=True
  880         )
  881         self.v3_create_token(
  882             auth_request_body,
  883             expected_status=http.client.UNAUTHORIZED
  884         )
  885 
  886     def test_system_token_is_invalid_after_disabling_user(self):
  887         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
  888             'user_id': self.user['id'],
  889             'role_id': self.role_id
  890         }
  891         self.put(path=path)
  892 
  893         auth_request_body = self.build_authentication_request(
  894             username=self.user['name'],
  895             password=self.user['password'],
  896             user_domain_id=self.domain['id'],
  897             system=True
  898         )
  899 
  900         response = self.v3_create_token(auth_request_body)
  901         self.assertValidSystemScopedTokenResponse(response)
  902         token = response.headers.get('X-Subject-Token')
  903         self._validate_token(token)
  904 
  905         # NOTE(lbragstad): This would make a good test for groups, but
  906         # apparently it's not possible to disable a group.
  907         user_ref = {
  908             'user': {
  909                 'enabled': False
  910             }
  911         }
  912         self.patch(
  913             '/users/%(user_id)s' % {'user_id': self.user['id']},
  914             body=user_ref
  915         )
  916 
  917         self.admin_request(
  918             path='/v3/auth/tokens',
  919             headers={'X-Auth-Token': token,
  920                      'X-Subject-Token': token},
  921             method='GET',
  922             expected_status=http.client.UNAUTHORIZED
  923         )
  924         self.admin_request(
  925             path='/v3/auth/tokens',
  926             headers={'X-Auth-Token': token,
  927                      'X-Subject-Token': token},
  928             method='HEAD',
  929             expected_status=http.client.UNAUTHORIZED
  930         )
  931 
  932     def test_create_system_token_via_system_group_assignment(self):
  933         ref = {
  934             'group': unit.new_group_ref(
  935                 domain_id=CONF.identity.default_domain_id
  936             )
  937         }
  938 
  939         group = self.post('/groups', body=ref).json_body['group']
  940         path = '/system/groups/%(group_id)s/roles/%(role_id)s' % {
  941             'group_id': group['id'],
  942             'role_id': self.role_id
  943         }
  944         self.put(path=path)
  945 
  946         path = '/groups/%(group_id)s/users/%(user_id)s' % {
  947             'group_id': group['id'],
  948             'user_id': self.user['id']
  949         }
  950         self.put(path=path)
  951 
  952         auth_request_body = self.build_authentication_request(
  953             user_id=self.user['id'],
  954             password=self.user['password'],
  955             system=True
  956         )
  957         response = self.v3_create_token(auth_request_body)
  958         self.assertValidSystemScopedTokenResponse(response)
  959         token = response.headers.get('X-Subject-Token')
  960         self._validate_token(token)
  961 
  962     def test_revoke_system_token(self):
  963         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
  964             'user_id': self.user['id'],
  965             'role_id': self.role_id
  966         }
  967         self.put(path=path)
  968 
  969         auth_request_body = self.build_authentication_request(
  970             username=self.user['name'],
  971             password=self.user['password'],
  972             user_domain_id=self.domain['id'],
  973             system=True
  974         )
  975 
  976         response = self.v3_create_token(auth_request_body)
  977         self.assertValidSystemScopedTokenResponse(response)
  978         token = response.headers.get('X-Subject-Token')
  979         self._validate_token(token)
  980         self._revoke_token(token)
  981         self._validate_token(token, expected_status=http.client.NOT_FOUND)
  982 
  983     def test_system_token_is_invalid_after_deleting_system_role(self):
  984         ref = {'role': unit.new_role_ref()}
  985         system_role = self.post('/roles', body=ref).json_body['role']
  986 
  987         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
  988             'user_id': self.user['id'],
  989             'role_id': system_role['id']
  990         }
  991         self.put(path=path)
  992 
  993         auth_request_body = self.build_authentication_request(
  994             username=self.user['name'],
  995             password=self.user['password'],
  996             user_domain_id=self.domain['id'],
  997             system=True
  998         )
  999 
 1000         response = self.v3_create_token(auth_request_body)
 1001         self.assertValidSystemScopedTokenResponse(response)
 1002         token = response.headers.get('X-Subject-Token')
 1003         self._validate_token(token)
 1004 
 1005         self.delete('/roles/%(role_id)s' % {'role_id': system_role['id']})
 1006         self._validate_token(token, expected_status=http.client.NOT_FOUND)
 1007 
 1008     def test_rescoping_a_system_token_for_a_project_token_fails(self):
 1009         ref = {'role': unit.new_role_ref()}
 1010         system_role = self.post('/roles', body=ref).json_body['role']
 1011 
 1012         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
 1013             'user_id': self.user['id'],
 1014             'role_id': system_role['id']
 1015         }
 1016         self.put(path=path)
 1017 
 1018         auth_request_body = self.build_authentication_request(
 1019             username=self.user['name'],
 1020             password=self.user['password'],
 1021             user_domain_id=self.domain['id'],
 1022             system=True
 1023         )
 1024         response = self.v3_create_token(auth_request_body)
 1025         self.assertValidSystemScopedTokenResponse(response)
 1026         system_token = response.headers.get('X-Subject-Token')
 1027 
 1028         auth_request_body = self.build_authentication_request(
 1029             token=system_token, project_id=self.project_id
 1030         )
 1031         self.v3_create_token(
 1032             auth_request_body, expected_status=http.client.FORBIDDEN
 1033         )
 1034 
 1035     def test_rescoping_a_system_token_for_a_domain_token_fails(self):
 1036         ref = {'role': unit.new_role_ref()}
 1037         system_role = self.post('/roles', body=ref).json_body['role']
 1038 
 1039         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
 1040             'user_id': self.user['id'],
 1041             'role_id': system_role['id']
 1042         }
 1043         self.put(path=path)
 1044 
 1045         auth_request_body = self.build_authentication_request(
 1046             username=self.user['name'],
 1047             password=self.user['password'],
 1048             user_domain_id=self.domain['id'],
 1049             system=True
 1050         )
 1051         response = self.v3_create_token(auth_request_body)
 1052         self.assertValidSystemScopedTokenResponse(response)
 1053         system_token = response.headers.get('X-Subject-Token')
 1054 
 1055         auth_request_body = self.build_authentication_request(
 1056             token=system_token, domain_id=CONF.identity.default_domain_id
 1057         )
 1058         self.v3_create_token(
 1059             auth_request_body, expected_status=http.client.FORBIDDEN
 1060         )
 1061 
 1062     def test_create_domain_token_scoped_with_domain_id_and_user_id(self):
 1063         # grant the user a role on the domain
 1064         path = '/domains/%s/users/%s/roles/%s' % (
 1065             self.domain['id'], self.user['id'], self.role['id'])
 1066         self.put(path=path)
 1067 
 1068         auth_data = self.build_authentication_request(
 1069             user_id=self.user['id'],
 1070             password=self.user['password'],
 1071             domain_id=self.domain['id'])
 1072         r = self.v3_create_token(auth_data)
 1073         self.assertValidDomainScopedTokenResponse(r)
 1074 
 1075     def test_create_domain_token_scoped_with_domain_id_and_username(self):
 1076         # grant the user a role on the domain
 1077         path = '/domains/%s/users/%s/roles/%s' % (
 1078             self.domain['id'], self.user['id'], self.role['id'])
 1079         self.put(path=path)
 1080 
 1081         auth_data = self.build_authentication_request(
 1082             username=self.user['name'],
 1083             user_domain_id=self.domain['id'],
 1084             password=self.user['password'],
 1085             domain_id=self.domain['id'])
 1086         r = self.v3_create_token(auth_data)
 1087         self.assertValidDomainScopedTokenResponse(r)
 1088 
 1089     def test_create_domain_token_scoped_with_domain_id(self):
 1090         # grant the user a role on the domain
 1091         path = '/domains/%s/users/%s/roles/%s' % (
 1092             self.domain['id'], self.user['id'], self.role['id'])
 1093         self.put(path=path)
 1094 
 1095         auth_data = self.build_authentication_request(
 1096             username=self.user['name'],
 1097             user_domain_name=self.domain['name'],
 1098             password=self.user['password'],
 1099             domain_id=self.domain['id'])
 1100         r = self.v3_create_token(auth_data)
 1101         self.assertValidDomainScopedTokenResponse(r)
 1102 
 1103     def test_create_domain_token_scoped_with_domain_name(self):
 1104         # grant the user a role on the domain
 1105         path = '/domains/%s/users/%s/roles/%s' % (
 1106             self.domain['id'], self.user['id'], self.role['id'])
 1107         self.put(path=path)
 1108 
 1109         auth_data = self.build_authentication_request(
 1110             user_id=self.user['id'],
 1111             password=self.user['password'],
 1112             domain_name=self.domain['name'])
 1113         r = self.v3_create_token(auth_data)
 1114         self.assertValidDomainScopedTokenResponse(r)
 1115 
 1116     def test_create_domain_token_scoped_with_domain_name_and_username(self):
 1117         # grant the user a role on the domain
 1118         path = '/domains/%s/users/%s/roles/%s' % (
 1119             self.domain['id'], self.user['id'], self.role['id'])
 1120         self.put(path=path)
 1121 
 1122         auth_data = self.build_authentication_request(
 1123             username=self.user['name'],
 1124             user_domain_id=self.domain['id'],
 1125             password=self.user['password'],
 1126             domain_name=self.domain['name'])
 1127         r = self.v3_create_token(auth_data)
 1128         self.assertValidDomainScopedTokenResponse(r)
 1129 
 1130     def test_create_domain_token_with_only_domain_name_and_username(self):
 1131         # grant the user a role on the domain
 1132         path = '/domains/%s/users/%s/roles/%s' % (
 1133             self.domain['id'], self.user['id'], self.role['id'])
 1134         self.put(path=path)
 1135 
 1136         auth_data = self.build_authentication_request(
 1137             username=self.user['name'],
 1138             user_domain_name=self.domain['name'],
 1139             password=self.user['password'],
 1140             domain_name=self.domain['name'])
 1141         r = self.v3_create_token(auth_data)
 1142         self.assertValidDomainScopedTokenResponse(r)
 1143 
 1144     def test_create_domain_token_with_group_role(self):
 1145         group = unit.new_group_ref(domain_id=self.domain_id)
 1146         group = PROVIDERS.identity_api.create_group(group)
 1147 
 1148         # add user to group
 1149         PROVIDERS.identity_api.add_user_to_group(self.user['id'], group['id'])
 1150 
 1151         # grant the domain role to group
 1152         path = '/domains/%s/groups/%s/roles/%s' % (
 1153             self.domain['id'], group['id'], self.role['id'])
 1154         self.put(path=path)
 1155 
 1156         # now get a domain-scoped token
 1157         auth_data = self.build_authentication_request(
 1158             user_id=self.user['id'],
 1159             password=self.user['password'],
 1160             domain_id=self.domain['id'])
 1161         r = self.v3_create_token(auth_data)
 1162         self.assertValidDomainScopedTokenResponse(r)
 1163 
 1164     def test_create_domain_token_fails_if_domain_name_unsafe(self):
 1165         """Verify authenticate to a domain with unsafe name fails."""
 1166         # Start with url name restrictions off, so we can create the unsafe
 1167         # named domain
 1168         self.config_fixture.config(group='resource',
 1169                                    domain_name_url_safe='off')
 1170         unsafe_name = 'i am not / safe'
 1171         domain = unit.new_domain_ref(name=unsafe_name)
 1172         PROVIDERS.resource_api.create_domain(domain['id'], domain)
 1173         role_member = unit.new_role_ref()
 1174         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 1175         PROVIDERS.assignment_api.create_grant(
 1176             role_member['id'],
 1177             user_id=self.user['id'],
 1178             domain_id=domain['id'])
 1179 
 1180         auth_data = self.build_authentication_request(
 1181             user_id=self.user['id'],
 1182             password=self.user['password'],
 1183             domain_name=domain['name'])
 1184 
 1185         # Since name url restriction is off, we should be able to authenticate
 1186         self.v3_create_token(auth_data)
 1187 
 1188         # Set the name url restriction to new, which should still allow us to
 1189         # authenticate
 1190         self.config_fixture.config(group='resource',
 1191                                    project_name_url_safe='new')
 1192         self.v3_create_token(auth_data)
 1193 
 1194         # Set the name url restriction to strict and we should fail to
 1195         # authenticate
 1196         self.config_fixture.config(group='resource',
 1197                                    domain_name_url_safe='strict')
 1198         self.v3_create_token(auth_data,
 1199                              expected_status=http.client.UNAUTHORIZED)
 1200 
 1201     def test_create_domain_token_without_grant_returns_unauthorized(self):
 1202         auth_data = self.build_authentication_request(
 1203             user_id=self.user['id'],
 1204             password=self.user['password'],
 1205             domain_id=self.domain['id'])
 1206         # this fails because the user does not have a role on self.domain
 1207         self.v3_create_token(auth_data,
 1208                              expected_status=http.client.UNAUTHORIZED)
 1209 
 1210     def test_validate_domain_scoped_token(self):
 1211         # Grant user access to domain
 1212         PROVIDERS.assignment_api.create_grant(
 1213             self.role['id'], user_id=self.user['id'],
 1214             domain_id=self.domain['id']
 1215         )
 1216         domain_scoped_token = self._get_domain_scoped_token()
 1217         r = self._validate_token(domain_scoped_token)
 1218         self.assertValidDomainScopedTokenResponse(r)
 1219         resp_json = json.loads(r.body)
 1220         self.assertIsNotNone(resp_json['token']['catalog'])
 1221         self.assertIsNotNone(resp_json['token']['roles'])
 1222         self.assertIsNotNone(resp_json['token']['domain'])
 1223 
 1224     def test_validate_expired_domain_scoped_token_returns_not_found(self):
 1225         # Grant user access to domain
 1226         PROVIDERS.assignment_api.create_grant(
 1227             self.role['id'], user_id=self.user['id'],
 1228             domain_id=self.domain['id']
 1229         )
 1230         # NOTE(lbragstad): We set token expiration to 10 seconds so that we can
 1231         # use the context manager of freezegun without sqlite issues.
 1232         self.config_fixture.config(group='token',
 1233                                    expiration=10)
 1234         time = datetime.datetime.utcnow()
 1235         with freezegun.freeze_time(time) as frozen_datetime:
 1236             domain_scoped_token = self._get_domain_scoped_token()
 1237             frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
 1238             self._validate_token(
 1239                 domain_scoped_token,
 1240                 expected_status=http.client.NOT_FOUND
 1241             )
 1242 
 1243     def test_domain_scoped_token_is_invalid_after_disabling_user(self):
 1244         # Grant user access to domain
 1245         PROVIDERS.assignment_api.create_grant(
 1246             self.role['id'], user_id=self.user['id'],
 1247             domain_id=self.domain['id']
 1248         )
 1249         domain_scoped_token = self._get_domain_scoped_token()
 1250         # Make sure the token is valid
 1251         r = self._validate_token(domain_scoped_token)
 1252         self.assertValidDomainScopedTokenResponse(r)
 1253         # Disable user
 1254         self._set_user_enabled(self.user, enabled=False)
 1255         # Ensure validating a token for a disabled user fails
 1256         self._validate_token(
 1257             domain_scoped_token,
 1258             expected_status=http.client.NOT_FOUND
 1259         )
 1260 
 1261     def test_domain_scoped_token_is_invalid_after_deleting_grant(self):
 1262         # Grant user access to domain
 1263         PROVIDERS.assignment_api.create_grant(
 1264             self.role['id'], user_id=self.user['id'],
 1265             domain_id=self.domain['id']
 1266         )
 1267         domain_scoped_token = self._get_domain_scoped_token()
 1268         # Make sure the token is valid
 1269         r = self._validate_token(domain_scoped_token)
 1270         self.assertValidDomainScopedTokenResponse(r)
 1271         # Delete access to domain
 1272         PROVIDERS.assignment_api.delete_grant(
 1273             self.role['id'], user_id=self.user['id'],
 1274             domain_id=self.domain['id']
 1275         )
 1276         # Ensure validating a token for a disabled user fails
 1277         self._validate_token(
 1278             domain_scoped_token,
 1279             expected_status=http.client.NOT_FOUND
 1280         )
 1281 
 1282     def test_domain_scoped_token_invalid_after_disabling_domain(self):
 1283         # Grant user access to domain
 1284         PROVIDERS.assignment_api.create_grant(
 1285             self.role['id'], user_id=self.user['id'],
 1286             domain_id=self.domain['id']
 1287         )
 1288         domain_scoped_token = self._get_domain_scoped_token()
 1289         # Make sure the token is valid
 1290         r = self._validate_token(domain_scoped_token)
 1291         self.assertValidDomainScopedTokenResponse(r)
 1292         # Disable domain
 1293         self.domain['enabled'] = False
 1294         PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain)
 1295         # Ensure validating a token for a disabled domain fails
 1296         self._validate_token(
 1297             domain_scoped_token,
 1298             expected_status=http.client.NOT_FOUND
 1299         )
 1300 
 1301     def test_create_project_scoped_token_with_project_id_and_user_id(self):
 1302         auth_data = self.build_authentication_request(
 1303             user_id=self.user['id'],
 1304             password=self.user['password'],
 1305             project_id=self.project['id'])
 1306         r = self.v3_create_token(auth_data)
 1307         self.assertValidProjectScopedTokenResponse(r)
 1308 
 1309     def test_validate_project_scoped_token(self):
 1310         project_scoped_token = self._get_project_scoped_token()
 1311         r = self._validate_token(project_scoped_token)
 1312         self.assertValidProjectScopedTokenResponse(r)
 1313 
 1314     def test_validate_expired_project_scoped_token_returns_not_found(self):
 1315         # NOTE(lbragstad): We set token expiration to 10 seconds so that we can
 1316         # use the context manager of freezegun without sqlite issues.
 1317         self.config_fixture.config(group='token',
 1318                                    expiration=10)
 1319         time = datetime.datetime.utcnow()
 1320         with freezegun.freeze_time(time) as frozen_datetime:
 1321             project_scoped_token = self._get_project_scoped_token()
 1322             frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
 1323             self._validate_token(
 1324                 project_scoped_token,
 1325                 expected_status=http.client.NOT_FOUND
 1326             )
 1327 
 1328     def test_revoke_project_scoped_token(self):
 1329         project_scoped_token = self._get_project_scoped_token()
 1330         r = self._validate_token(project_scoped_token)
 1331         self.assertValidProjectScopedTokenResponse(r)
 1332         self._revoke_token(project_scoped_token)
 1333         self._validate_token(project_scoped_token,
 1334                              expected_status=http.client.NOT_FOUND)
 1335 
 1336     def test_project_scoped_token_is_scoped_to_default_project(self):
 1337         project = self._create_project_and_set_as_default_project()
 1338 
 1339         # attempt to authenticate without requesting a project
 1340         auth_data = self.build_authentication_request(
 1341             user_id=self.user['id'],
 1342             password=self.user['password'])
 1343         r = self.v3_create_token(auth_data)
 1344 
 1345         # ensure the project id in the token matches the default project id
 1346         self.assertValidProjectScopedTokenResponse(r)
 1347         self.assertEqual(project['id'], r.result['token']['project']['id'])
 1348 
 1349     def test_project_scoped_token_no_catalog_is_scoped_to_default_project(
 1350             self):
 1351         project = self._create_project_and_set_as_default_project()
 1352 
 1353         # attempt to authenticate without requesting a project or catalog
 1354         auth_data = self.build_authentication_request(
 1355             user_id=self.user['id'],
 1356             password=self.user['password'])
 1357         r = self.post('/auth/tokens?nocatalog', body=auth_data, noauth=True)
 1358 
 1359         # ensure the project id in the token matches the default project id
 1360         self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
 1361         self.assertEqual(project['id'], r.result['token']['project']['id'])
 1362 
 1363     def test_implicit_project_id_scoped_token_with_user_id_no_catalog(self):
 1364         self._create_project_and_set_as_default_project()
 1365 
 1366         # create a project scoped token that isn't scoped to the default
 1367         # project
 1368         auth_data = self.build_authentication_request(
 1369             user_id=self.user['id'],
 1370             password=self.user['password'],
 1371             project_id=self.project['id'])
 1372         r = self.post('/auth/tokens?nocatalog', body=auth_data, noauth=True)
 1373 
 1374         # ensure the project id in the token matches the one we as for
 1375         self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
 1376         self.assertEqual(self.project['id'],
 1377                          r.result['token']['project']['id'])
 1378 
 1379     def test_project_scoped_token_catalog_attributes(self):
 1380         auth_data = self.build_authentication_request(
 1381             user_id=self.user['id'],
 1382             password=self.user['password'],
 1383             project_id=self.project['id'])
 1384         r = self.v3_create_token(auth_data)
 1385 
 1386         catalog = r.result['token']['catalog']
 1387         self.assertEqual(1, len(catalog))
 1388         catalog = catalog[0]
 1389 
 1390         self.assertEqual(self.service['id'], catalog['id'])
 1391         self.assertEqual(self.service['name'], catalog['name'])
 1392         self.assertEqual(self.service['type'], catalog['type'])
 1393 
 1394         endpoint = catalog['endpoints']
 1395         self.assertEqual(1, len(endpoint))
 1396         endpoint = endpoint[0]
 1397 
 1398         self.assertEqual(self.endpoint['id'], endpoint['id'])
 1399         self.assertEqual(self.endpoint['interface'], endpoint['interface'])
 1400         self.assertEqual(self.endpoint['region_id'], endpoint['region_id'])
 1401         self.assertEqual(self.endpoint['url'], endpoint['url'])
 1402 
 1403     def test_project_scoped_token_catalog_excludes_disabled_endpoint(self):
 1404         # Create a disabled endpoint
 1405         disabled_endpoint_ref = copy.copy(self.endpoint)
 1406         disabled_endpoint_id = uuid.uuid4().hex
 1407         disabled_endpoint_ref.update({
 1408             'id': disabled_endpoint_id,
 1409             'enabled': False,
 1410             'interface': 'internal'
 1411         })
 1412         PROVIDERS.catalog_api.create_endpoint(
 1413             disabled_endpoint_id, disabled_endpoint_ref
 1414         )
 1415 
 1416         auth_data = self.build_authentication_request(
 1417             user_id=self.user['id'],
 1418             password=self.user['password'],
 1419             project_id=self.project['id'])
 1420         resp = self.v3_create_token(auth_data)
 1421 
 1422         # make sure the disabled endpoint id isn't in the list of endpoints
 1423         endpoints = resp.result['token']['catalog'][0]['endpoints']
 1424         endpoint_ids = [endpoint['id'] for endpoint in endpoints]
 1425         self.assertNotIn(disabled_endpoint_id, endpoint_ids)
 1426 
 1427     def test_project_scoped_token_catalog_excludes_disabled_service(self):
 1428         """On authenticate, get a catalog that excludes disabled services."""
 1429         # although the endpoint associated with the service is enabled, the
 1430         # service is disabled
 1431         self.assertTrue(self.endpoint['enabled'])
 1432         PROVIDERS.catalog_api.update_service(
 1433             self.endpoint['service_id'], {'enabled': False})
 1434         service = PROVIDERS.catalog_api.get_service(
 1435             self.endpoint['service_id']
 1436         )
 1437         self.assertFalse(service['enabled'])
 1438 
 1439         auth_data = self.build_authentication_request(
 1440             user_id=self.user['id'],
 1441             password=self.user['password'],
 1442             project_id=self.project['id'])
 1443         r = self.v3_create_token(auth_data)
 1444 
 1445         self.assertEqual([], r.result['token']['catalog'])
 1446 
 1447     def test_scope_to_project_without_grant_returns_unauthorized(self):
 1448         project = unit.new_project_ref(domain_id=self.domain_id)
 1449         PROVIDERS.resource_api.create_project(project['id'], project)
 1450 
 1451         auth_data = self.build_authentication_request(
 1452             user_id=self.user['id'],
 1453             password=self.user['password'],
 1454             project_id=project['id'])
 1455         self.v3_create_token(auth_data,
 1456                              expected_status=http.client.UNAUTHORIZED)
 1457 
 1458     def test_create_project_scoped_token_with_username_and_domain_id(self):
 1459         auth_data = self.build_authentication_request(
 1460             username=self.user['name'],
 1461             user_domain_id=self.domain['id'],
 1462             password=self.user['password'],
 1463             project_id=self.project['id'])
 1464         r = self.v3_create_token(auth_data)
 1465         self.assertValidProjectScopedTokenResponse(r)
 1466 
 1467     def test_create_project_scoped_token_with_username_and_domain_name(self):
 1468         auth_data = self.build_authentication_request(
 1469             username=self.user['name'],
 1470             user_domain_name=self.domain['name'],
 1471             password=self.user['password'],
 1472             project_id=self.project['id'])
 1473         r = self.v3_create_token(auth_data)
 1474         self.assertValidProjectScopedTokenResponse(r)
 1475 
 1476     def test_create_project_scoped_token_fails_if_project_name_unsafe(self):
 1477         """Verify authenticate to a project with unsafe name fails."""
 1478         # Start with url name restrictions off, so we can create the unsafe
 1479         # named project
 1480         self.config_fixture.config(group='resource',
 1481                                    project_name_url_safe='off')
 1482         unsafe_name = 'i am not / safe'
 1483         project = unit.new_project_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID,
 1484                                        name=unsafe_name)
 1485         PROVIDERS.resource_api.create_project(project['id'], project)
 1486         role_member = unit.new_role_ref()
 1487         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 1488         PROVIDERS.assignment_api.add_role_to_user_and_project(
 1489             self.user['id'], project['id'], role_member['id'])
 1490 
 1491         auth_data = self.build_authentication_request(
 1492             user_id=self.user['id'],
 1493             password=self.user['password'],
 1494             project_name=project['name'],
 1495             project_domain_id=test_v3.DEFAULT_DOMAIN_ID)
 1496 
 1497         # Since name url restriction is off, we should be able to authenticate
 1498         self.v3_create_token(auth_data)
 1499 
 1500         # Set the name url restriction to new, which should still allow us to
 1501         # authenticate
 1502         self.config_fixture.config(group='resource',
 1503                                    project_name_url_safe='new')
 1504         self.v3_create_token(auth_data)
 1505 
 1506         # Set the name url restriction to strict and we should fail to
 1507         # authenticate
 1508         self.config_fixture.config(group='resource',
 1509                                    project_name_url_safe='strict')
 1510         self.v3_create_token(auth_data,
 1511                              expected_status=http.client.UNAUTHORIZED)
 1512 
 1513     def test_create_project_scoped_token_fails_if_domain_name_unsafe(self):
 1514         """Verify authenticate to a project using unsafe domain name fails."""
 1515         # Start with url name restrictions off, so we can create the unsafe
 1516         # named domain
 1517         self.config_fixture.config(group='resource',
 1518                                    domain_name_url_safe='off')
 1519         unsafe_name = 'i am not / safe'
 1520         domain = unit.new_domain_ref(name=unsafe_name)
 1521         PROVIDERS.resource_api.create_domain(domain['id'], domain)
 1522         # Add a (safely named) project to that domain
 1523         project = unit.new_project_ref(domain_id=domain['id'])
 1524         PROVIDERS.resource_api.create_project(project['id'], project)
 1525         role_member = unit.new_role_ref()
 1526         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 1527         PROVIDERS.assignment_api.create_grant(
 1528             role_member['id'],
 1529             user_id=self.user['id'],
 1530             project_id=project['id'])
 1531 
 1532         # An auth request via project ID, but specifying domain by name
 1533         auth_data = self.build_authentication_request(
 1534             user_id=self.user['id'],
 1535             password=self.user['password'],
 1536             project_name=project['name'],
 1537             project_domain_name=domain['name'])
 1538 
 1539         # Since name url restriction is off, we should be able to authenticate
 1540         self.v3_create_token(auth_data)
 1541 
 1542         # Set the name url restriction to new, which should still allow us to
 1543         # authenticate
 1544         self.config_fixture.config(group='resource',
 1545                                    project_name_url_safe='new')
 1546         self.v3_create_token(auth_data)
 1547 
 1548         # Set the name url restriction to strict and we should fail to
 1549         # authenticate
 1550         self.config_fixture.config(group='resource',
 1551                                    domain_name_url_safe='strict')
 1552         self.v3_create_token(auth_data,
 1553                              expected_status=http.client.UNAUTHORIZED)
 1554 
 1555     def test_create_project_token_with_same_domain_and_project_name(self):
 1556         """Authenticate to a project with the same name as its domain."""
 1557         domain = unit.new_project_ref(is_domain=True)
 1558         domain = PROVIDERS.resource_api.create_project(domain['id'], domain)
 1559         project = unit.new_project_ref(domain_id=domain['id'],
 1560                                        name=domain['name'])
 1561         PROVIDERS.resource_api.create_project(project['id'], project)
 1562         role_member = unit.new_role_ref()
 1563         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 1564         PROVIDERS.assignment_api.add_role_to_user_and_project(
 1565             self.user['id'], project['id'], role_member['id'])
 1566 
 1567         auth_data = self.build_authentication_request(
 1568             user_id=self.user['id'],
 1569             password=self.user['password'],
 1570             project_name=project['name'],
 1571             project_domain_name=domain['name'])
 1572 
 1573         r = self.v3_create_token(auth_data)
 1574         self.assertEqual(project['id'], r.result['token']['project']['id'])
 1575 
 1576     def test_create_project_token_fails_with_project_acting_as_domain(self):
 1577         domain = unit.new_project_ref(is_domain=True)
 1578         domain = PROVIDERS.resource_api.create_project(domain['id'], domain)
 1579         role_member = unit.new_role_ref()
 1580         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 1581         PROVIDERS.assignment_api.create_grant(
 1582             role_member['id'],
 1583             user_id=self.user['id'],
 1584             domain_id=domain['id'])
 1585 
 1586         # authentication will fail because the project name is incorrect
 1587         auth_data = self.build_authentication_request(
 1588             user_id=self.user['id'],
 1589             password=self.user['password'],
 1590             project_name=domain['name'],
 1591             project_domain_name=domain['name'])
 1592         self.v3_create_token(auth_data,
 1593                              expected_status=http.client.UNAUTHORIZED)
 1594 
 1595     def test_create_project_token_with_disabled_project_domain_fails(self):
 1596         # create a disabled domain
 1597         domain = unit.new_domain_ref()
 1598         domain = PROVIDERS.resource_api.create_domain(domain['id'], domain)
 1599 
 1600         # create a project in the domain
 1601         project = unit.new_project_ref(domain_id=domain['id'])
 1602         PROVIDERS.resource_api.create_project(project['id'], project)
 1603 
 1604         # assign some role to self.user for the project in the domain
 1605         PROVIDERS.assignment_api.add_role_to_user_and_project(
 1606             self.user['id'],
 1607             project['id'],
 1608             self.role_id)
 1609 
 1610         # Disable the domain
 1611         domain['enabled'] = False
 1612         PROVIDERS.resource_api.update_domain(domain['id'], domain)
 1613 
 1614         # user should not be able to auth with project_id
 1615         auth_data = self.build_authentication_request(
 1616             user_id=self.user['id'],
 1617             password=self.user['password'],
 1618             project_id=project['id'])
 1619         self.v3_create_token(auth_data,
 1620                              expected_status=http.client.UNAUTHORIZED)
 1621 
 1622         # user should not be able to auth with project_name & domain
 1623         auth_data = self.build_authentication_request(
 1624             user_id=self.user['id'],
 1625             password=self.user['password'],
 1626             project_name=project['name'],
 1627             project_domain_id=domain['id'])
 1628         self.v3_create_token(auth_data,
 1629                              expected_status=http.client.UNAUTHORIZED)
 1630 
 1631     def test_create_project_token_with_default_domain_as_project(self):
 1632         # Authenticate to a project with the default domain as project
 1633         auth_data = self.build_authentication_request(
 1634             user_id=self.user['id'],
 1635             password=self.user['password'],
 1636             project_id=test_v3.DEFAULT_DOMAIN_ID)
 1637         self.v3_create_token(auth_data,
 1638                              expected_status=http.client.UNAUTHORIZED)
 1639 
 1640     def test_project_scoped_token_is_invalid_after_disabling_user(self):
 1641         project_scoped_token = self._get_project_scoped_token()
 1642         # Make sure the token is valid
 1643         r = self._validate_token(project_scoped_token)
 1644         self.assertValidProjectScopedTokenResponse(r)
 1645         # Disable the user
 1646         self._set_user_enabled(self.user, enabled=False)
 1647         # Ensure validating a token for a disabled user fails
 1648         self._validate_token(
 1649             project_scoped_token,
 1650             expected_status=http.client.NOT_FOUND
 1651         )
 1652 
 1653     def test_project_scoped_token_invalid_after_changing_user_password(self):
 1654         project_scoped_token = self._get_project_scoped_token()
 1655         # Make sure the token is valid
 1656         r = self._validate_token(project_scoped_token)
 1657         self.assertValidProjectScopedTokenResponse(r)
 1658         # Update user's password
 1659         self.user['password'] = 'Password1'
 1660         PROVIDERS.identity_api.update_user(self.user['id'], self.user)
 1661         # Ensure updating user's password revokes existing tokens
 1662         self._validate_token(
 1663             project_scoped_token,
 1664             expected_status=http.client.NOT_FOUND
 1665         )
 1666 
 1667     def test_project_scoped_token_invalid_after_disabling_project(self):
 1668         project_scoped_token = self._get_project_scoped_token()
 1669         # Make sure the token is valid
 1670         r = self._validate_token(project_scoped_token)
 1671         self.assertValidProjectScopedTokenResponse(r)
 1672         # Disable project
 1673         self.project['enabled'] = False
 1674         PROVIDERS.resource_api.update_project(self.project['id'], self.project)
 1675         # Ensure validating a token for a disabled project fails
 1676         self._validate_token(
 1677             project_scoped_token,
 1678             expected_status=http.client.NOT_FOUND
 1679         )
 1680 
 1681     def test_project_scoped_token_is_invalid_after_deleting_grant(self):
 1682         # disable caching so that user grant deletion is not hidden
 1683         # by token caching
 1684         self.config_fixture.config(
 1685             group='cache',
 1686             enabled=False)
 1687         # Grant user access to project
 1688         PROVIDERS.assignment_api.create_grant(
 1689             self.role['id'], user_id=self.user['id'],
 1690             project_id=self.project['id']
 1691         )
 1692         project_scoped_token = self._get_project_scoped_token()
 1693         # Make sure the token is valid
 1694         r = self._validate_token(project_scoped_token)
 1695         self.assertValidProjectScopedTokenResponse(r)
 1696         # Delete access to project
 1697         PROVIDERS.assignment_api.delete_grant(
 1698             self.role['id'], user_id=self.user['id'],
 1699             project_id=self.project['id']
 1700         )
 1701         # Ensure the token has been revoked
 1702         self._validate_token(
 1703             project_scoped_token,
 1704             expected_status=http.client.NOT_FOUND
 1705         )
 1706 
 1707     def test_no_access_to_default_project_result_in_unscoped_token(self):
 1708         # create a disabled project to work with
 1709         self.create_new_default_project_for_user(self.user['id'],
 1710                                                  self.domain_id)
 1711 
 1712         # attempt to authenticate without requesting a project
 1713         auth_data = self.build_authentication_request(
 1714             user_id=self.user['id'],
 1715             password=self.user['password'])
 1716         r = self.v3_create_token(auth_data)
 1717         self.assertValidUnscopedTokenResponse(r)
 1718 
 1719     def test_rescope_unscoped_token_with_trust(self):
 1720         trustee_user, trust = self._create_trust()
 1721         self._get_trust_scoped_token(trustee_user, trust)
 1722 
 1723     def test_validate_a_trust_scoped_token(self):
 1724         trustee_user, trust = self._create_trust()
 1725         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1726         # Validate a trust scoped token
 1727         r = self._validate_token(trust_scoped_token)
 1728         self.assertValidProjectScopedTokenResponse(r)
 1729 
 1730     def test_validate_expired_trust_scoped_token_returns_not_found(self):
 1731         # NOTE(lbragstad): We set token expiration to 10 seconds so that we can
 1732         # use the context manager of freezegun without sqlite issues.
 1733         self.config_fixture.config(group='token',
 1734                                    expiration=10)
 1735         time = datetime.datetime.utcnow()
 1736         with freezegun.freeze_time(time) as frozen_datetime:
 1737             trustee_user, trust = self._create_trust()
 1738             trust_scoped_token = self._get_trust_scoped_token(
 1739                 trustee_user, trust
 1740             )
 1741             frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
 1742             self._validate_token(
 1743                 trust_scoped_token,
 1744                 expected_status=http.client.NOT_FOUND
 1745             )
 1746 
 1747     def test_validate_a_trust_scoped_token_impersonated(self):
 1748         trustee_user, trust = self._create_trust(impersonation=True)
 1749         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1750         # Validate a trust scoped token
 1751         r = self._validate_token(trust_scoped_token)
 1752         self.assertValidProjectScopedTokenResponse(r)
 1753 
 1754     def test_revoke_trust_scoped_token(self):
 1755         trustee_user, trust = self._create_trust()
 1756         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1757         # Validate a trust scoped token
 1758         r = self._validate_token(trust_scoped_token)
 1759         self.assertValidProjectScopedTokenResponse(r)
 1760         self._revoke_token(trust_scoped_token)
 1761         self._validate_token(trust_scoped_token,
 1762                              expected_status=http.client.NOT_FOUND)
 1763 
 1764     def test_trust_scoped_token_is_invalid_after_disabling_trustee(self):
 1765         trustee_user, trust = self._create_trust()
 1766         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1767         # Validate a trust scoped token
 1768         r = self._validate_token(trust_scoped_token)
 1769         self.assertValidProjectScopedTokenResponse(r)
 1770 
 1771         # Disable trustee
 1772         trustee_update_ref = dict(enabled=False)
 1773         PROVIDERS.identity_api.update_user(
 1774             trustee_user['id'], trustee_update_ref
 1775         )
 1776         # Ensure validating a token for a disabled user fails
 1777         self._validate_token(
 1778             trust_scoped_token,
 1779             expected_status=http.client.NOT_FOUND
 1780         )
 1781 
 1782     def test_trust_token_is_invalid_when_trustee_domain_disabled(self):
 1783         # create a new domain with new user in that domain
 1784         new_domain_ref = unit.new_domain_ref()
 1785         PROVIDERS.resource_api.create_domain(
 1786             new_domain_ref['id'], new_domain_ref
 1787         )
 1788 
 1789         trustee_ref = unit.create_user(PROVIDERS.identity_api,
 1790                                        domain_id=new_domain_ref['id'])
 1791 
 1792         new_project_ref = unit.new_project_ref(domain_id=self.domain_id)
 1793         PROVIDERS.resource_api.create_project(
 1794             new_project_ref['id'], new_project_ref
 1795         )
 1796 
 1797         # grant the trustor access to the new project
 1798         PROVIDERS.assignment_api.create_grant(
 1799             self.role['id'],
 1800             user_id=self.user_id,
 1801             project_id=new_project_ref['id'])
 1802 
 1803         trust_ref = unit.new_trust_ref(trustor_user_id=self.user_id,
 1804                                        trustee_user_id=trustee_ref['id'],
 1805                                        expires=dict(minutes=1),
 1806                                        project_id=new_project_ref['id'],
 1807                                        impersonation=True,
 1808                                        role_ids=[self.role['id']])
 1809 
 1810         resp = self.post('/OS-TRUST/trusts', body={'trust': trust_ref})
 1811         self.assertValidTrustResponse(resp, trust_ref)
 1812         trust_id = resp.json_body['trust']['id']
 1813 
 1814         # get a project-scoped token using the trust
 1815         trust_auth_data = self.build_authentication_request(
 1816             user_id=trustee_ref['id'],
 1817             password=trustee_ref['password'],
 1818             trust_id=trust_id)
 1819         trust_scoped_token = self.get_requested_token(trust_auth_data)
 1820 
 1821         # ensure the project-scoped token from the trust is valid
 1822         self._validate_token(trust_scoped_token)
 1823 
 1824         disable_body = {'domain': {'enabled': False}}
 1825         self.patch(
 1826             '/domains/%(domain_id)s' % {'domain_id': new_domain_ref['id']},
 1827             body=disable_body)
 1828 
 1829         # ensure the project-scoped token from the trust is invalid
 1830         self._validate_token(trust_scoped_token,
 1831                              expected_status=http.client.NOT_FOUND)
 1832 
 1833     def test_trust_scoped_token_invalid_after_changing_trustee_password(self):
 1834         trustee_user, trust = self._create_trust()
 1835         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1836         # Validate a trust scoped token
 1837         r = self._validate_token(trust_scoped_token)
 1838         self.assertValidProjectScopedTokenResponse(r)
 1839         # Change trustee's password
 1840         trustee_update_ref = dict(password='Password1')
 1841         PROVIDERS.identity_api.update_user(
 1842             trustee_user['id'], trustee_update_ref
 1843         )
 1844         # Ensure updating trustee's password revokes existing tokens
 1845         self._validate_token(
 1846             trust_scoped_token,
 1847             expected_status=http.client.NOT_FOUND
 1848         )
 1849 
 1850     def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
 1851         trustee_user, trust = self._create_trust()
 1852         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1853         # Validate a trust scoped token
 1854         r = self._validate_token(trust_scoped_token)
 1855         self.assertValidProjectScopedTokenResponse(r)
 1856 
 1857         # Disable the trustor
 1858         trustor_update_ref = dict(enabled=False)
 1859         PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
 1860         # Ensure validating a token for a disabled user fails
 1861         self._validate_token(
 1862             trust_scoped_token,
 1863             expected_status=http.client.NOT_FOUND
 1864         )
 1865 
 1866     def test_trust_scoped_token_invalid_after_changing_trustor_password(self):
 1867         trustee_user, trust = self._create_trust()
 1868         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1869         # Validate a trust scoped token
 1870         r = self._validate_token(trust_scoped_token)
 1871         self.assertValidProjectScopedTokenResponse(r)
 1872 
 1873         # Change trustor's password
 1874         trustor_update_ref = dict(password='Password1')
 1875         PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
 1876         # Ensure updating trustor's password revokes existing user's tokens
 1877         self._validate_token(
 1878             trust_scoped_token,
 1879             expected_status=http.client.NOT_FOUND
 1880         )
 1881 
 1882     def test_trust_scoped_token_invalid_after_disabled_trustor_domain(self):
 1883         trustee_user, trust = self._create_trust()
 1884         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1885         # Validate a trust scoped token
 1886         r = self._validate_token(trust_scoped_token)
 1887         self.assertValidProjectScopedTokenResponse(r)
 1888 
 1889         # Disable trustor's domain
 1890         self.domain['enabled'] = False
 1891         PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain)
 1892 
 1893         trustor_update_ref = dict(password='Password1')
 1894         PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
 1895         # Ensure updating trustor's password revokes existing user's tokens
 1896         self._validate_token(
 1897             trust_scoped_token,
 1898             expected_status=http.client.NOT_FOUND
 1899         )
 1900 
 1901     def test_default_fixture_scope_token(self):
 1902         self.assertIsNotNone(self.get_scoped_token())
 1903 
 1904     def test_rescoping_token(self):
 1905         expires = self.v3_token_data['token']['expires_at']
 1906 
 1907         # rescope the token
 1908         r = self.v3_create_token(self.build_authentication_request(
 1909             token=self.v3_token,
 1910             project_id=self.project_id))
 1911         self.assertValidProjectScopedTokenResponse(r)
 1912 
 1913         # ensure token expiration stayed the same
 1914         self.assertTimestampEqual(expires, r.result['token']['expires_at'])
 1915 
 1916     def test_check_token(self):
 1917         self.head('/auth/tokens', headers=self.headers,
 1918                   expected_status=http.client.OK)
 1919 
 1920     def test_validate_token(self):
 1921         r = self.get('/auth/tokens', headers=self.headers)
 1922         self.assertValidUnscopedTokenResponse(r)
 1923 
 1924     def test_validate_missing_subject_token(self):
 1925         self.get('/auth/tokens',
 1926                  expected_status=http.client.NOT_FOUND)
 1927 
 1928     def test_validate_missing_auth_token(self):
 1929         self.admin_request(
 1930             method='GET',
 1931             path='/v3/projects',
 1932             token=None,
 1933             expected_status=http.client.UNAUTHORIZED)
 1934 
 1935     def test_validate_token_nocatalog(self):
 1936         v3_token = self.get_requested_token(self.build_authentication_request(
 1937             user_id=self.user['id'],
 1938             password=self.user['password'],
 1939             project_id=self.project['id']))
 1940         r = self.get(
 1941             '/auth/tokens?nocatalog',
 1942             headers={'X-Subject-Token': v3_token})
 1943         self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
 1944 
 1945     def test_is_admin_token_by_ids(self):
 1946         self.config_fixture.config(
 1947             group='resource',
 1948             admin_project_domain_name=self.domain['name'],
 1949             admin_project_name=self.project['name'])
 1950         r = self.v3_create_token(self.build_authentication_request(
 1951             user_id=self.user['id'],
 1952             password=self.user['password'],
 1953             project_id=self.project['id']))
 1954         self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
 1955         v3_token = r.headers.get('X-Subject-Token')
 1956         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 1957         self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
 1958 
 1959     def test_is_admin_token_by_names(self):
 1960         self.config_fixture.config(
 1961             group='resource',
 1962             admin_project_domain_name=self.domain['name'],
 1963             admin_project_name=self.project['name'])
 1964         r = self.v3_create_token(self.build_authentication_request(
 1965             user_id=self.user['id'],
 1966             password=self.user['password'],
 1967             project_domain_name=self.domain['name'],
 1968             project_name=self.project['name']))
 1969         self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
 1970         v3_token = r.headers.get('X-Subject-Token')
 1971         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 1972         self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
 1973 
 1974     def test_token_for_non_admin_project_is_not_admin(self):
 1975         self.config_fixture.config(
 1976             group='resource',
 1977             admin_project_domain_name=self.domain['name'],
 1978             admin_project_name=uuid.uuid4().hex)
 1979         r = self.v3_create_token(self.build_authentication_request(
 1980             user_id=self.user['id'],
 1981             password=self.user['password'],
 1982             project_id=self.project['id']))
 1983         self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
 1984         v3_token = r.headers.get('X-Subject-Token')
 1985         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 1986         self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
 1987 
 1988     def test_token_for_non_admin_domain_same_project_name_is_not_admin(self):
 1989         self.config_fixture.config(
 1990             group='resource',
 1991             admin_project_domain_name=uuid.uuid4().hex,
 1992             admin_project_name=self.project['name'])
 1993         r = self.v3_create_token(self.build_authentication_request(
 1994             user_id=self.user['id'],
 1995             password=self.user['password'],
 1996             project_id=self.project['id']))
 1997         self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
 1998         v3_token = r.headers.get('X-Subject-Token')
 1999         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 2000         self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
 2001 
 2002     def test_only_admin_project_set_acts_as_non_admin(self):
 2003         self.config_fixture.config(
 2004             group='resource',
 2005             admin_project_name=self.project['name'])
 2006         r = self.v3_create_token(self.build_authentication_request(
 2007             user_id=self.user['id'],
 2008             password=self.user['password'],
 2009             project_id=self.project['id']))
 2010         self.assertValidProjectScopedTokenResponse(r, is_admin_project=None)
 2011         v3_token = r.headers.get('X-Subject-Token')
 2012         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 2013         self.assertValidProjectScopedTokenResponse(r, is_admin_project=None)
 2014 
 2015     def _create_role(self, domain_id=None):
 2016         """Call ``POST /roles``."""
 2017         ref = unit.new_role_ref(domain_id=domain_id)
 2018         r = self.post('/roles', body={'role': ref})
 2019         return self.assertValidRoleResponse(r, ref)
 2020 
 2021     def _create_implied_role(self, prior_id):
 2022         implied = self._create_role()
 2023         url = '/roles/%s/implies/%s' % (prior_id, implied['id'])
 2024         self.put(url, expected_status=http.client.CREATED)
 2025         return implied
 2026 
 2027     def _delete_implied_role(self, prior_role_id, implied_role_id):
 2028         url = '/roles/%s/implies/%s' % (prior_role_id, implied_role_id)
 2029         self.delete(url)
 2030 
 2031     def _get_scoped_token_roles(self, is_domain=False):
 2032         if is_domain:
 2033             v3_token = self.get_domain_scoped_token()
 2034         else:
 2035             v3_token = self.get_scoped_token()
 2036 
 2037         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 2038         v3_token_data = r.result
 2039         token_roles = v3_token_data['token']['roles']
 2040         return token_roles
 2041 
 2042     def _create_implied_role_shows_in_v3_token(self, is_domain):
 2043         token_roles = self._get_scoped_token_roles(is_domain)
 2044         self.assertEqual(1, len(token_roles))
 2045 
 2046         prior = token_roles[0]['id']
 2047         implied1 = self._create_implied_role(prior)
 2048 
 2049         token_roles = self._get_scoped_token_roles(is_domain)
 2050         self.assertEqual(2, len(token_roles))
 2051 
 2052         implied2 = self._create_implied_role(prior)
 2053         token_roles = self._get_scoped_token_roles(is_domain)
 2054         self.assertEqual(3, len(token_roles))
 2055 
 2056         token_role_ids = [role['id'] for role in token_roles]
 2057         self.assertIn(prior, token_role_ids)
 2058         self.assertIn(implied1['id'], token_role_ids)
 2059         self.assertIn(implied2['id'], token_role_ids)
 2060 
 2061     def test_create_implied_role_shows_in_v3_project_token(self):
 2062         # regardless of the default chosen, this should always
 2063         # test with the option set.
 2064         self.config_fixture.config(group='token')
 2065         self._create_implied_role_shows_in_v3_token(False)
 2066 
 2067     def test_create_implied_role_shows_in_v3_domain_token(self):
 2068         self.config_fixture.config(group='token')
 2069         PROVIDERS.assignment_api.create_grant(
 2070             self.role['id'], user_id=self.user['id'],
 2071             domain_id=self.domain['id']
 2072         )
 2073 
 2074         self._create_implied_role_shows_in_v3_token(True)
 2075 
 2076     def test_create_implied_role_shows_in_v3_system_token(self):
 2077         self.config_fixture.config(group='token')
 2078         PROVIDERS.assignment_api.create_system_grant_for_user(
 2079             self.user['id'], self.role['id']
 2080         )
 2081 
 2082         token_id = self.get_system_scoped_token()
 2083         r = self.get('/auth/tokens', headers={'X-Subject-Token': token_id})
 2084         token_roles = r.result['token']['roles']
 2085 
 2086         prior = token_roles[0]['id']
 2087         self._create_implied_role(prior)
 2088 
 2089         r = self.get('/auth/tokens', headers={'X-Subject-Token': token_id})
 2090         token_roles = r.result['token']['roles']
 2091         self.assertEqual(2, len(token_roles))
 2092 
 2093     def test_group_assigned_implied_role_shows_in_v3_token(self):
 2094         self.config_fixture.config(group='token')
 2095         is_domain = False
 2096         token_roles = self._get_scoped_token_roles(is_domain)
 2097         self.assertEqual(1, len(token_roles))
 2098 
 2099         new_role = self._create_role()
 2100         prior = new_role['id']
 2101 
 2102         new_group_ref = unit.new_group_ref(domain_id=self.domain['id'])
 2103         new_group = PROVIDERS.identity_api.create_group(new_group_ref)
 2104         PROVIDERS.assignment_api.create_grant(
 2105             prior, group_id=new_group['id'], project_id=self.project['id']
 2106         )
 2107 
 2108         token_roles = self._get_scoped_token_roles(is_domain)
 2109         self.assertEqual(1, len(token_roles))
 2110 
 2111         PROVIDERS.identity_api.add_user_to_group(
 2112             self.user['id'], new_group['id']
 2113         )
 2114 
 2115         token_roles = self._get_scoped_token_roles(is_domain)
 2116         self.assertEqual(2, len(token_roles))
 2117 
 2118         implied1 = self._create_implied_role(prior)
 2119 
 2120         token_roles = self._get_scoped_token_roles(is_domain)
 2121         self.assertEqual(3, len(token_roles))
 2122 
 2123         implied2 = self._create_implied_role(prior)
 2124         token_roles = self._get_scoped_token_roles(is_domain)
 2125         self.assertEqual(4, len(token_roles))
 2126 
 2127         token_role_ids = [role['id'] for role in token_roles]
 2128         self.assertIn(prior, token_role_ids)
 2129         self.assertIn(implied1['id'], token_role_ids)
 2130         self.assertIn(implied2['id'], token_role_ids)
 2131 
 2132     def test_multiple_implied_roles_show_in_v3_token(self):
 2133         self.config_fixture.config(group='token')
 2134         token_roles = self._get_scoped_token_roles()
 2135         self.assertEqual(1, len(token_roles))
 2136 
 2137         prior = token_roles[0]['id']
 2138         implied1 = self._create_implied_role(prior)
 2139         implied2 = self._create_implied_role(prior)
 2140         implied3 = self._create_implied_role(prior)
 2141 
 2142         token_roles = self._get_scoped_token_roles()
 2143         self.assertEqual(4, len(token_roles))
 2144 
 2145         token_role_ids = [role['id'] for role in token_roles]
 2146         self.assertIn(prior, token_role_ids)
 2147         self.assertIn(implied1['id'], token_role_ids)
 2148         self.assertIn(implied2['id'], token_role_ids)
 2149         self.assertIn(implied3['id'], token_role_ids)
 2150 
 2151     def test_chained_implied_role_shows_in_v3_token(self):
 2152         self.config_fixture.config(group='token')
 2153         token_roles = self._get_scoped_token_roles()
 2154         self.assertEqual(1, len(token_roles))
 2155 
 2156         prior = token_roles[0]['id']
 2157         implied1 = self._create_implied_role(prior)
 2158         implied2 = self._create_implied_role(implied1['id'])
 2159         implied3 = self._create_implied_role(implied2['id'])
 2160 
 2161         token_roles = self._get_scoped_token_roles()
 2162         self.assertEqual(4, len(token_roles))
 2163 
 2164         token_role_ids = [role['id'] for role in token_roles]
 2165 
 2166         self.assertIn(prior, token_role_ids)
 2167         self.assertIn(implied1['id'], token_role_ids)
 2168         self.assertIn(implied2['id'], token_role_ids)
 2169         self.assertIn(implied3['id'], token_role_ids)
 2170 
 2171     def test_implied_role_disabled_by_config(self):
 2172         self.config_fixture.config(group='token')
 2173         token_roles = self._get_scoped_token_roles()
 2174         self.assertEqual(1, len(token_roles))
 2175 
 2176         prior = token_roles[0]['id']
 2177         implied1 = self._create_implied_role(prior)
 2178         implied2 = self._create_implied_role(implied1['id'])
 2179         self._create_implied_role(implied2['id'])
 2180 
 2181         token_roles = self._get_scoped_token_roles()
 2182         self.assertEqual(4, len(token_roles))
 2183         token_role_ids = [role['id'] for role in token_roles]
 2184         self.assertIn(prior, token_role_ids)
 2185 
 2186     def test_delete_implied_role_do_not_show_in_v3_token(self):
 2187         self.config_fixture.config(group='token')
 2188         token_roles = self._get_scoped_token_roles()
 2189         prior = token_roles[0]['id']
 2190         implied = self._create_implied_role(prior)
 2191 
 2192         token_roles = self._get_scoped_token_roles()
 2193         self.assertEqual(2, len(token_roles))
 2194         self._delete_implied_role(prior, implied['id'])
 2195 
 2196         token_roles = self._get_scoped_token_roles()
 2197         self.assertEqual(1, len(token_roles))
 2198 
 2199     def test_unrelated_implied_roles_do_not_change_v3_token(self):
 2200         self.config_fixture.config(group='token')
 2201         token_roles = self._get_scoped_token_roles()
 2202         prior = token_roles[0]['id']
 2203         implied = self._create_implied_role(prior)
 2204 
 2205         token_roles = self._get_scoped_token_roles()
 2206         self.assertEqual(2, len(token_roles))
 2207 
 2208         unrelated = self._create_role()
 2209         url = '/roles/%s/implies/%s' % (unrelated['id'], implied['id'])
 2210         self.put(url, expected_status=http.client.CREATED)
 2211 
 2212         token_roles = self._get_scoped_token_roles()
 2213         self.assertEqual(2, len(token_roles))
 2214 
 2215         self._delete_implied_role(unrelated['id'], implied['id'])
 2216         token_roles = self._get_scoped_token_roles()
 2217         self.assertEqual(2, len(token_roles))
 2218 
 2219     def test_domain_specific_roles_do_not_show_v3_token(self):
 2220         self.config_fixture.config(group='token')
 2221         initial_token_roles = self._get_scoped_token_roles()
 2222 
 2223         new_role = self._create_role(domain_id=self.domain_id)
 2224         PROVIDERS.assignment_api.create_grant(
 2225             new_role['id'], user_id=self.user['id'],
 2226             project_id=self.project['id']
 2227         )
 2228         implied = self._create_implied_role(new_role['id'])
 2229 
 2230         token_roles = self._get_scoped_token_roles()
 2231         self.assertEqual(len(initial_token_roles) + 1, len(token_roles))
 2232 
 2233         # The implied role from the domain specific role should be in the
 2234         # token, but not the domain specific role itself.
 2235         token_role_ids = [role['id'] for role in token_roles]
 2236         self.assertIn(implied['id'], token_role_ids)
 2237         self.assertNotIn(new_role['id'], token_role_ids)
 2238 
 2239     def test_remove_all_roles_from_scope_result_in_404(self):
 2240         # create a new user
 2241         new_user = unit.create_user(PROVIDERS.identity_api,
 2242                                     domain_id=self.domain['id'])
 2243 
 2244         # give the new user a role on a project
 2245         path = '/projects/%s/users/%s/roles/%s' % (
 2246             self.project['id'], new_user['id'], self.role['id'])
 2247         self.put(path=path)
 2248 
 2249         # authenticate as the new user and get a project-scoped token
 2250         auth_data = self.build_authentication_request(
 2251             user_id=new_user['id'],
 2252             password=new_user['password'],
 2253             project_id=self.project['id'])
 2254         subject_token_id = self.v3_create_token(auth_data).headers.get(
 2255             'X-Subject-Token')
 2256 
 2257         # make sure the project-scoped token is valid
 2258         headers = {'X-Subject-Token': subject_token_id}
 2259         r = self.get('/auth/tokens', headers=headers)
 2260         self.assertValidProjectScopedTokenResponse(r)
 2261 
 2262         # remove the roles from the user for the given scope
 2263         path = '/projects/%s/users/%s/roles/%s' % (
 2264             self.project['id'], new_user['id'], self.role['id'])
 2265         self.delete(path=path)
 2266 
 2267         # token validation should now result in 404
 2268         self.get('/auth/tokens', headers=headers,
 2269                  expected_status=http.client.NOT_FOUND)
 2270 
 2271     def test_create_token_with_nonexistant_user_id_fails(self):
 2272         auth_data = self.build_authentication_request(
 2273             user_id=uuid.uuid4().hex,
 2274             password=self.user['password'])
 2275         self.v3_create_token(auth_data,
 2276                              expected_status=http.client.UNAUTHORIZED)
 2277 
 2278     def test_create_token_with_nonexistant_username_fails(self):
 2279         auth_data = self.build_authentication_request(
 2280             username=uuid.uuid4().hex,
 2281             user_domain_id=self.domain['id'],
 2282             password=self.user['password'])
 2283         self.v3_create_token(auth_data,
 2284                              expected_status=http.client.UNAUTHORIZED)
 2285 
 2286     def test_create_token_with_nonexistant_domain_id_fails(self):
 2287         auth_data = self.build_authentication_request(
 2288             username=self.user['name'],
 2289             user_domain_id=uuid.uuid4().hex,
 2290             password=self.user['password'])
 2291         self.v3_create_token(auth_data,
 2292                              expected_status=http.client.UNAUTHORIZED)
 2293 
 2294     def test_create_token_with_nonexistant_domain_name_fails(self):
 2295         auth_data = self.build_authentication_request(
 2296             username=self.user['name'],
 2297             user_domain_name=uuid.uuid4().hex,
 2298             password=self.user['password'])
 2299         self.v3_create_token(auth_data,
 2300                              expected_status=http.client.UNAUTHORIZED)
 2301 
 2302     def test_create_token_with_wrong_password_fails(self):
 2303         auth_data = self.build_authentication_request(
 2304             user_id=self.user['id'],
 2305             password=uuid.uuid4().hex)
 2306         self.v3_create_token(auth_data,
 2307                              expected_status=http.client.UNAUTHORIZED)
 2308 
 2309     def test_user_and_group_roles_scoped_token(self):
 2310         """Test correct roles are returned in scoped token.
 2311 
 2312         Test Plan:
 2313 
 2314         - Create a domain, with 1 project, 2 users (user1 and user2)
 2315           and 2 groups (group1 and group2)
 2316         - Make user1 a member of group1, user2 a member of group2
 2317         - Create 8 roles, assigning them to each of the 8 combinations
 2318           of users/groups on domain/project
 2319         - Get a project scoped token for user1, checking that the right
 2320           two roles are returned (one directly assigned, one by virtue
 2321           of group membership)
 2322         - Repeat this for a domain scoped token
 2323         - Make user1 also a member of group2
 2324         - Get another scoped token making sure the additional role
 2325           shows up
 2326         - User2 is just here as a spoiler, to make sure we don't get
 2327           any roles uniquely assigned to it returned in any of our
 2328           tokens
 2329 
 2330         """
 2331         domainA = unit.new_domain_ref()
 2332         PROVIDERS.resource_api.create_domain(domainA['id'], domainA)
 2333         projectA = unit.new_project_ref(domain_id=domainA['id'])
 2334         PROVIDERS.resource_api.create_project(projectA['id'], projectA)
 2335 
 2336         user1 = unit.create_user(
 2337             PROVIDERS.identity_api, domain_id=domainA['id']
 2338         )
 2339 
 2340         user2 = unit.create_user(
 2341             PROVIDERS.identity_api, domain_id=domainA['id']
 2342         )
 2343 
 2344         group1 = unit.new_group_ref(domain_id=domainA['id'])
 2345         group1 = PROVIDERS.identity_api.create_group(group1)
 2346 
 2347         group2 = unit.new_group_ref(domain_id=domainA['id'])
 2348         group2 = PROVIDERS.identity_api.create_group(group2)
 2349 
 2350         PROVIDERS.identity_api.add_user_to_group(
 2351             user1['id'], group1['id']
 2352         )
 2353         PROVIDERS.identity_api.add_user_to_group(
 2354             user2['id'], group2['id']
 2355         )
 2356 
 2357         # Now create all the roles and assign them
 2358         role_list = []
 2359         for _ in range(8):
 2360             role = unit.new_role_ref()
 2361             PROVIDERS.role_api.create_role(role['id'], role)
 2362             role_list.append(role)
 2363 
 2364         PROVIDERS.assignment_api.create_grant(
 2365             role_list[0]['id'], user_id=user1['id'], domain_id=domainA['id']
 2366         )
 2367         PROVIDERS.assignment_api.create_grant(
 2368             role_list[1]['id'], user_id=user1['id'], project_id=projectA['id']
 2369         )
 2370         PROVIDERS.assignment_api.create_grant(
 2371             role_list[2]['id'], user_id=user2['id'], domain_id=domainA['id']
 2372         )
 2373         PROVIDERS.assignment_api.create_grant(
 2374             role_list[3]['id'], user_id=user2['id'], project_id=projectA['id']
 2375         )
 2376         PROVIDERS.assignment_api.create_grant(
 2377             role_list[4]['id'], group_id=group1['id'], domain_id=domainA['id']
 2378         )
 2379         PROVIDERS.assignment_api.create_grant(
 2380             role_list[5]['id'], group_id=group1['id'],
 2381             project_id=projectA['id']
 2382         )
 2383         PROVIDERS.assignment_api.create_grant(
 2384             role_list[6]['id'], group_id=group2['id'], domain_id=domainA['id']
 2385         )
 2386         PROVIDERS.assignment_api.create_grant(
 2387             role_list[7]['id'], group_id=group2['id'],
 2388             project_id=projectA['id']
 2389         )
 2390 
 2391         # First, get a project scoped token - which should
 2392         # contain the direct user role and the one by virtue
 2393         # of group membership
 2394         auth_data = self.build_authentication_request(
 2395             user_id=user1['id'],
 2396             password=user1['password'],
 2397             project_id=projectA['id'])
 2398         r = self.v3_create_token(auth_data)
 2399         token = self.assertValidScopedTokenResponse(r)
 2400         roles_ids = []
 2401         for ref in token['roles']:
 2402             roles_ids.append(ref['id'])
 2403         self.assertEqual(2, len(token['roles']))
 2404         self.assertIn(role_list[1]['id'], roles_ids)
 2405         self.assertIn(role_list[5]['id'], roles_ids)
 2406 
 2407         # Now the same thing for a domain scoped token
 2408         auth_data = self.build_authentication_request(
 2409             user_id=user1['id'],
 2410             password=user1['password'],
 2411             domain_id=domainA['id'])
 2412         r = self.v3_create_token(auth_data)
 2413         token = self.assertValidScopedTokenResponse(r)
 2414         roles_ids = []
 2415         for ref in token['roles']:
 2416             roles_ids.append(ref['id'])
 2417         self.assertEqual(2, len(token['roles']))
 2418         self.assertIn(role_list[0]['id'], roles_ids)
 2419         self.assertIn(role_list[4]['id'], roles_ids)
 2420 
 2421         # Finally, add user1 to the 2nd group, and get a new
 2422         # scoped token - the extra role should now be included
 2423         # by virtue of the 2nd group
 2424         PROVIDERS.identity_api.add_user_to_group(
 2425             user1['id'], group2['id']
 2426         )
 2427         auth_data = self.build_authentication_request(
 2428             user_id=user1['id'],
 2429             password=user1['password'],
 2430             project_id=projectA['id'])
 2431         r = self.v3_create_token(auth_data)
 2432         token = self.assertValidScopedTokenResponse(r)
 2433         roles_ids = []
 2434         for ref in token['roles']:
 2435             roles_ids.append(ref['id'])
 2436         self.assertEqual(3, len(token['roles']))
 2437         self.assertIn(role_list[1]['id'], roles_ids)
 2438         self.assertIn(role_list[5]['id'], roles_ids)
 2439         self.assertIn(role_list[7]['id'], roles_ids)
 2440 
 2441     def test_auth_token_cross_domain_group_and_project(self):
 2442         """Verify getting a token in cross domain group/project roles."""
 2443         # create domain, project and group and grant roles to user
 2444         domain1 = unit.new_domain_ref()
 2445         PROVIDERS.resource_api.create_domain(domain1['id'], domain1)
 2446         project1 = unit.new_project_ref(domain_id=domain1['id'])
 2447         PROVIDERS.resource_api.create_project(project1['id'], project1)
 2448         user_foo = unit.create_user(PROVIDERS.identity_api,
 2449                                     domain_id=test_v3.DEFAULT_DOMAIN_ID)
 2450         role_member = unit.new_role_ref()
 2451         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 2452         role_admin = unit.new_role_ref()
 2453         PROVIDERS.role_api.create_role(role_admin['id'], role_admin)
 2454         role_foo_domain1 = unit.new_role_ref()
 2455         PROVIDERS.role_api.create_role(
 2456             role_foo_domain1['id'], role_foo_domain1
 2457         )
 2458         role_group_domain1 = unit.new_role_ref()
 2459         PROVIDERS.role_api.create_role(
 2460             role_group_domain1['id'], role_group_domain1
 2461         )
 2462         new_group = unit.new_group_ref(domain_id=domain1['id'])
 2463         new_group = PROVIDERS.identity_api.create_group(new_group)
 2464         PROVIDERS.identity_api.add_user_to_group(
 2465             user_foo['id'], new_group['id']
 2466         )
 2467         PROVIDERS.assignment_api.create_grant(
 2468             user_id=user_foo['id'],
 2469             project_id=project1['id'],
 2470             role_id=role_member['id'])
 2471         PROVIDERS.assignment_api.create_grant(
 2472             group_id=new_group['id'],
 2473             project_id=project1['id'],
 2474             role_id=role_admin['id'])
 2475         PROVIDERS.assignment_api.create_grant(
 2476             user_id=user_foo['id'],
 2477             domain_id=domain1['id'],
 2478             role_id=role_foo_domain1['id'])
 2479         PROVIDERS.assignment_api.create_grant(
 2480             group_id=new_group['id'],
 2481             domain_id=domain1['id'],
 2482             role_id=role_group_domain1['id'])
 2483 
 2484         # Get a scoped token for the project
 2485         auth_data = self.build_authentication_request(
 2486             username=user_foo['name'],
 2487             user_domain_id=test_v3.DEFAULT_DOMAIN_ID,
 2488             password=user_foo['password'],
 2489             project_name=project1['name'],
 2490             project_domain_id=domain1['id'])
 2491 
 2492         r = self.v3_create_token(auth_data)
 2493         scoped_token = self.assertValidScopedTokenResponse(r)
 2494         project = scoped_token["project"]
 2495         roles_ids = []
 2496         for ref in scoped_token['roles']:
 2497             roles_ids.append(ref['id'])
 2498         self.assertEqual(project1['id'], project["id"])
 2499         self.assertIn(role_member['id'], roles_ids)
 2500         self.assertIn(role_admin['id'], roles_ids)
 2501         self.assertNotIn(role_foo_domain1['id'], roles_ids)
 2502         self.assertNotIn(role_group_domain1['id'], roles_ids)
 2503 
 2504     def test_remote_user_no_realm(self):
 2505         app = self.loadapp()
 2506 
 2507         auth_contexts = []
 2508 
 2509         # NOTE(morgan): This __init__ is used to inject the auth context into
 2510         # the auth_contexts list so that we can perform introspection. This way
 2511         # we do not need to try and mock out anything deep within keystone's
 2512         # auth pipeline. Note that we are using MockPatch to ensure we undo
 2513         # the mock after the fact.
 2514         def new_init(self, *args, **kwargs):
 2515             super(auth.core.AuthContext, self).__init__(*args, **kwargs)
 2516             auth_contexts.append(self)
 2517 
 2518         self.useFixture(fixtures.MockPatch(
 2519             'keystone.auth.core.AuthContext.__init__', new_init))
 2520         with app.test_client() as c:
 2521             c.environ_base.update(self.build_external_auth_environ(
 2522                 self.default_domain_user['name']))
 2523             auth_req = self.build_authentication_request()
 2524             c.post('/v3/auth/tokens', json=auth_req)
 2525             self.assertEqual(self.default_domain_user['id'],
 2526                              auth_contexts[-1]['user_id'])
 2527 
 2528         # Now test to make sure the user name can, itself, contain the
 2529         # '@' character.
 2530         user = {'name': 'myname@mydivision'}
 2531         PROVIDERS.identity_api.update_user(
 2532             self.default_domain_user['id'], user
 2533         )
 2534         with app.test_client() as c:
 2535             c.environ_base.update(self.build_external_auth_environ(
 2536                 user['name']))
 2537             auth_req = self.build_authentication_request()
 2538             c.post('/v3/auth/tokens', json=auth_req)
 2539             self.assertEqual(self.default_domain_user['id'],
 2540                              auth_contexts[-1]['user_id'])
 2541         self.assertEqual(self.default_domain_user['id'],
 2542                          auth_contexts[-1]['user_id'])
 2543 
 2544     def test_remote_user_no_domain(self):
 2545         app = self.loadapp()
 2546         with app.test_client() as c:
 2547             c.environ_base.update(self.build_external_auth_environ(
 2548                 self.user['name']))
 2549             auth_request = self.build_authentication_request()
 2550             c.post('/v3/auth/tokens', json=auth_request,
 2551                    expected_status_code=http.client.UNAUTHORIZED)
 2552 
 2553     def test_remote_user_and_password(self):
 2554         # both REMOTE_USER and password methods must pass.
 2555         # note that they do not have to match
 2556         app = self.loadapp()
 2557         with app.test_client() as c:
 2558             auth_data = self.build_authentication_request(
 2559                 user_domain_id=self.default_domain_user['domain_id'],
 2560                 username=self.default_domain_user['name'],
 2561                 password=self.default_domain_user['password'])
 2562             c.post('/v3/auth/tokens', json=auth_data)
 2563 
 2564     def test_remote_user_and_explicit_external(self):
 2565         # both REMOTE_USER and password methods must pass.
 2566         # note that they do not have to match
 2567         auth_data = self.build_authentication_request(
 2568             user_domain_id=self.domain['id'],
 2569             username=self.user['name'],
 2570             password=self.user['password'])
 2571         auth_data['auth']['identity']['methods'] = ["password", "external"]
 2572         auth_data['auth']['identity']['external'] = {}
 2573         app = self.loadapp()
 2574         with app.test_client() as c:
 2575             c.post('/v3/auth/tokens', json=auth_data,
 2576                    expected_status_code=http.client.UNAUTHORIZED)
 2577 
 2578     def test_remote_user_bad_password(self):
 2579         # both REMOTE_USER and password methods must pass.
 2580         app = self.loadapp()
 2581         auth_data = self.build_authentication_request(
 2582             user_domain_id=self.domain['id'],
 2583             username=self.user['name'],
 2584             password='badpassword')
 2585         with app.test_client() as c:
 2586             c.post('/v3/auth/tokens', json=auth_data,
 2587                    expected_status_code=http.client.UNAUTHORIZED)
 2588 
 2589     def test_fetch_expired_allow_expired(self):
 2590         self.config_fixture.config(group='token',
 2591                                    expiration=10,
 2592                                    allow_expired_window=20)
 2593         time = datetime.datetime.utcnow()
 2594         with freezegun.freeze_time(time) as frozen_datetime:
 2595             token = self._get_project_scoped_token()
 2596 
 2597             # initially it validates because it's within time
 2598             frozen_datetime.tick(delta=datetime.timedelta(seconds=2))
 2599             self._validate_token(token)
 2600 
 2601             # after passing expiry time validation fails
 2602             frozen_datetime.tick(delta=datetime.timedelta(seconds=12))
 2603             self._validate_token(token, expected_status=http.client.NOT_FOUND)
 2604 
 2605             # but if we pass allow_expired it validates
 2606             self._validate_token(token, allow_expired=True)
 2607 
 2608             # and then if we're passed the allow_expired_window it will fail
 2609             # anyway raises expired when now > expiration + window
 2610             frozen_datetime.tick(delta=datetime.timedelta(seconds=22))
 2611             self._validate_token(token,
 2612                                  allow_expired=True,
 2613                                  expected_status=http.client.NOT_FOUND)
 2614 
 2615     def test_system_scoped_token_works_with_domain_specific_drivers(self):
 2616         self.config_fixture.config(
 2617             group='identity', domain_specific_drivers_enabled=True
 2618         )
 2619 
 2620         PROVIDERS.assignment_api.create_system_grant_for_user(
 2621             self.user['id'], self.role['id']
 2622         )
 2623 
 2624         token_id = self.get_system_scoped_token()
 2625         headers = {'X-Auth-Token': token_id}
 2626 
 2627         app = self.loadapp()
 2628         with app.test_client() as c:
 2629             c.get('/v3/users', headers=headers)
 2630 
 2631 
 2632 class TokenDataTests(object):
 2633     """Test the data in specific token types."""
 2634 
 2635     def test_unscoped_token_format(self):
 2636         # ensure the unscoped token response contains the appropriate data
 2637         r = self.get('/auth/tokens', headers=self.headers)
 2638         self.assertValidUnscopedTokenResponse(r)
 2639 
 2640     def test_domain_scoped_token_format(self):
 2641         # ensure the domain scoped token response contains the appropriate data
 2642         PROVIDERS.assignment_api.create_grant(
 2643             self.role['id'],
 2644             user_id=self.default_domain_user['id'],
 2645             domain_id=self.domain['id'])
 2646 
 2647         domain_scoped_token = self.get_requested_token(
 2648             self.build_authentication_request(
 2649                 user_id=self.default_domain_user['id'],
 2650                 password=self.default_domain_user['password'],
 2651                 domain_id=self.domain['id'])
 2652         )
 2653         self.headers['X-Subject-Token'] = domain_scoped_token
 2654         r = self.get('/auth/tokens', headers=self.headers)
 2655         self.assertValidDomainScopedTokenResponse(r)
 2656 
 2657     def test_project_scoped_token_format(self):
 2658         # ensure project scoped token responses contains the appropriate data
 2659         project_scoped_token = self.get_requested_token(
 2660             self.build_authentication_request(
 2661                 user_id=self.default_domain_user['id'],
 2662                 password=self.default_domain_user['password'],
 2663                 project_id=self.default_domain_project['id'])
 2664         )
 2665         self.headers['X-Subject-Token'] = project_scoped_token
 2666         r = self.get('/auth/tokens', headers=self.headers)
 2667         self.assertValidProjectScopedTokenResponse(r)
 2668 
 2669     def test_extra_data_in_unscoped_token_fails_validation(self):
 2670         # ensure unscoped token response contains the appropriate data
 2671         r = self.get('/auth/tokens', headers=self.headers)
 2672 
 2673         # populate the response result with some extra data
 2674         r.result['token'][u'extra'] = str(uuid.uuid4().hex)
 2675         self.assertRaises(exception.SchemaValidationError,
 2676                           self.assertValidUnscopedTokenResponse,
 2677                           r)
 2678 
 2679     def test_extra_data_in_domain_scoped_token_fails_validation(self):
 2680         # ensure domain scoped token response contains the appropriate data
 2681         PROVIDERS.assignment_api.create_grant(
 2682             self.role['id'],
 2683             user_id=self.default_domain_user['id'],
 2684             domain_id=self.domain['id'])
 2685 
 2686         domain_scoped_token = self.get_requested_token(
 2687             self.build_authentication_request(
 2688                 user_id=self.default_domain_user['id'],
 2689                 password=self.default_domain_user['password'],
 2690                 domain_id=self.domain['id'])
 2691         )
 2692         self.headers['X-Subject-Token'] = domain_scoped_token
 2693         r = self.get('/auth/tokens', headers=self.headers)
 2694 
 2695         # populate the response result with some extra data
 2696         r.result['token'][u'extra'] = str(uuid.uuid4().hex)
 2697         self.assertRaises(exception.SchemaValidationError,
 2698                           self.assertValidDomainScopedTokenResponse,
 2699                           r)
 2700 
 2701     def test_extra_data_in_project_scoped_token_fails_validation(self):
 2702         # ensure project scoped token responses contains the appropriate data
 2703         project_scoped_token = self.get_requested_token(
 2704             self.build_authentication_request(
 2705                 user_id=self.default_domain_user['id'],
 2706                 password=self.default_domain_user['password'],
 2707                 project_id=self.default_domain_project['id'])
 2708         )
 2709         self.headers['X-Subject-Token'] = project_scoped_token
 2710         resp = self.get('/auth/tokens', headers=self.headers)
 2711 
 2712         # populate the response result with some extra data
 2713         resp.result['token'][u'extra'] = str(uuid.uuid4().hex)
 2714         self.assertRaises(exception.SchemaValidationError,
 2715                           self.assertValidProjectScopedTokenResponse,
 2716                           resp)
 2717 
 2718 
 2719 class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase):
 2720     def config_overrides(self):
 2721         super(AllowRescopeScopedTokenDisabledTests, self).config_overrides()
 2722         self.config_fixture.config(
 2723             group='token',
 2724             allow_rescope_scoped_token=False)
 2725 
 2726     def test_rescoping_v3_to_v3_disabled(self):
 2727         self.v3_create_token(
 2728             self.build_authentication_request(
 2729                 token=self.get_scoped_token(),
 2730                 project_id=self.project_id),
 2731             expected_status=http.client.FORBIDDEN)
 2732 
 2733     def test_rescoped_domain_token_disabled(self):
 2734 
 2735         self.domainA = unit.new_domain_ref()
 2736         PROVIDERS.resource_api.create_domain(self.domainA['id'], self.domainA)
 2737         PROVIDERS.assignment_api.create_grant(
 2738             self.role['id'], user_id=self.user['id'],
 2739             domain_id=self.domainA['id']
 2740         )
 2741         unscoped_token = self.get_requested_token(
 2742             self.build_authentication_request(
 2743                 user_id=self.user['id'],
 2744                 password=self.user['password']))
 2745         # Get a domain-scoped token from the unscoped token
 2746         domain_scoped_token = self.get_requested_token(
 2747             self.build_authentication_request(
 2748                 token=unscoped_token,
 2749                 domain_id=self.domainA['id']))
 2750         self.v3_create_token(
 2751             self.build_authentication_request(
 2752                 token=domain_scoped_token,
 2753                 project_id=self.project_id),
 2754             expected_status=http.client.FORBIDDEN)
 2755 
 2756 
 2757 class TestFernetTokenAPIs(test_v3.RestfulTestCase, TokenAPITests,
 2758                           TokenDataTests):
 2759     def config_overrides(self):
 2760         super(TestFernetTokenAPIs, self).config_overrides()
 2761         self.config_fixture.config(group='token', provider='fernet',
 2762                                    cache_on_issue=True)
 2763         self.useFixture(
 2764             ksfixtures.KeyRepository(
 2765                 self.config_fixture,
 2766                 'fernet_tokens',
 2767                 CONF.fernet_tokens.max_active_keys
 2768             )
 2769         )
 2770 
 2771     def setUp(self):
 2772         super(TestFernetTokenAPIs, self).setUp()
 2773         self.doSetUp()
 2774 
 2775     def _make_auth_request(self, auth_data):
 2776         token = super(TestFernetTokenAPIs, self)._make_auth_request(auth_data)
 2777         self.assertLess(len(token), 255)
 2778         return token
 2779 
 2780     def test_validate_tampered_unscoped_token_fails(self):
 2781         unscoped_token = self._get_unscoped_token()
 2782         tampered_token = (unscoped_token[:50] + uuid.uuid4().hex +
 2783                           unscoped_token[50 + 32:])
 2784         self._validate_token(tampered_token,
 2785                              expected_status=http.client.NOT_FOUND)
 2786 
 2787     def test_validate_tampered_project_scoped_token_fails(self):
 2788         project_scoped_token = self._get_project_scoped_token()
 2789         tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex +
 2790                           project_scoped_token[50 + 32:])
 2791         self._validate_token(tampered_token,
 2792                              expected_status=http.client.NOT_FOUND)
 2793 
 2794     def test_validate_tampered_trust_scoped_token_fails(self):
 2795         trustee_user, trust = self._create_trust()
 2796         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 2797         # Get a trust scoped token
 2798         tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex +
 2799                           trust_scoped_token[50 + 32:])
 2800         self._validate_token(tampered_token,
 2801                              expected_status=http.client.NOT_FOUND)
 2802 
 2803     def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
 2804         # NOTE(amakarov): have to override this test for non-persistent tokens
 2805         # as TokenNotFound exception makes no sense for those.
 2806         trustee_user, trust = self._create_trust()
 2807         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 2808         # Validate a trust scoped token
 2809         r = self._validate_token(trust_scoped_token)
 2810         self.assertValidProjectScopedTokenResponse(r)
 2811 
 2812         # Disable the trustor
 2813         trustor_update_ref = dict(enabled=False)
 2814         PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
 2815         # Ensure validating a token for a disabled user fails
 2816         self._validate_token(
 2817             trust_scoped_token,
 2818             expected_status=http.client.FORBIDDEN
 2819         )
 2820 
 2821 
 2822 class TestJWSTokenAPIs(test_v3.RestfulTestCase, TokenAPITests, TokenDataTests):
 2823     def config_overrides(self):
 2824         super(TestJWSTokenAPIs, self).config_overrides()
 2825         self.config_fixture.config(group='token', provider='jws',
 2826                                    cache_on_issue=True)
 2827         self.useFixture(ksfixtures.JWSKeyRepository(self.config_fixture))
 2828 
 2829     def setUp(self):
 2830         super(TestJWSTokenAPIs, self).setUp()
 2831         self.doSetUp()
 2832 
 2833     def _make_auth_request(self, auth_data):
 2834         token = super(TestJWSTokenAPIs, self)._make_auth_request(auth_data)
 2835         self.assertLess(len(token), 350)
 2836         return token
 2837 
 2838     def test_validate_tampered_unscoped_token_fails(self):
 2839         unscoped_token = self._get_unscoped_token()
 2840         tampered_token = (unscoped_token[:50] + uuid.uuid4().hex +
 2841                           unscoped_token[50 + 32:])
 2842         self._validate_token(tampered_token,
 2843                              expected_status=http.client.NOT_FOUND)
 2844 
 2845     def test_validate_tampered_project_scoped_token_fails(self):
 2846         project_scoped_token = self._get_project_scoped_token()
 2847         tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex +
 2848                           project_scoped_token[50 + 32:])
 2849         self._validate_token(tampered_token,
 2850                              expected_status=http.client.NOT_FOUND)
 2851 
 2852     def test_validate_tampered_trust_scoped_token_fails(self):
 2853         trustee_user, trust = self._create_trust()
 2854         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 2855         # Get a trust scoped token
 2856         tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex +
 2857                           trust_scoped_token[50 + 32:])
 2858         self._validate_token(tampered_token,
 2859                              expected_status=http.client.NOT_FOUND)
 2860 
 2861     def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
 2862         # NOTE(amakarov): have to override this test for non-persistent tokens
 2863         # as TokenNotFound exception makes no sense for those.
 2864         trustee_user, trust = self._create_trust()
 2865         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 2866         # Validate a trust scoped token
 2867         r = self._validate_token(trust_scoped_token)
 2868         self.assertValidProjectScopedTokenResponse(r)
 2869 
 2870         # Disable the trustor
 2871         trustor_update_ref = dict(enabled=False)
 2872         PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
 2873         # Ensure validating a token for a disabled user fails
 2874         self._validate_token(
 2875             trust_scoped_token,
 2876             expected_status=http.client.FORBIDDEN
 2877         )
 2878 
 2879 
 2880 class TestTokenRevokeById(test_v3.RestfulTestCase):
 2881     """Test token revocation on the v3 Identity API."""
 2882 
 2883     def config_overrides(self):
 2884         super(TestTokenRevokeById, self).config_overrides()
 2885         self.config_fixture.config(
 2886             group='token',
 2887             provider='fernet',
 2888             revoke_by_id=False)
 2889         self.useFixture(
 2890             ksfixtures.KeyRepository(
 2891                 self.config_fixture,
 2892                 'fernet_tokens',
 2893                 CONF.fernet_tokens.max_active_keys
 2894             )
 2895         )
 2896 
 2897     def setUp(self):
 2898         """Setup for Token Revoking Test Cases.
 2899 
 2900         As well as the usual housekeeping, create a set of domains,
 2901         users, groups, roles and projects for the subsequent tests:
 2902 
 2903         - Two domains: A & B
 2904         - Three users (1, 2 and 3)
 2905         - Three groups (1, 2 and 3)
 2906         - Two roles (1 and 2)
 2907         - DomainA owns user1, domainB owns user2 and user3
 2908         - DomainA owns group1 and group2, domainB owns group3
 2909         - User1 and user2 are members of group1
 2910         - User3 is a member of group2
 2911         - Two projects: A & B, both in domainA
 2912         - Group1 has role1 on Project A and B, meaning that user1 and user2
 2913           will get these roles by virtue of membership
 2914         - User1, 2 and 3 have role1 assigned to projectA
 2915         - Group1 has role1 on Project A and B, meaning that user1 and user2
 2916           will get role1 (duplicated) by virtue of membership
 2917         - User1 has role2 assigned to domainA
 2918 
 2919         """
 2920         super(TestTokenRevokeById, self).setUp()
 2921 
 2922         # Start by creating a couple of domains and projects
 2923         self.domainA = unit.new_domain_ref()
 2924         PROVIDERS.resource_api.create_domain(self.domainA['id'], self.domainA)
 2925         self.domainB = unit.new_domain_ref()
 2926         PROVIDERS.resource_api.create_domain(self.domainB['id'], self.domainB)
 2927         self.projectA = unit.new_project_ref(domain_id=self.domainA['id'])
 2928         PROVIDERS.resource_api.create_project(
 2929             self.projectA['id'], self.projectA
 2930         )
 2931         self.projectB = unit.new_project_ref(domain_id=self.domainA['id'])
 2932         PROVIDERS.resource_api.create_project(
 2933             self.projectB['id'], self.projectB
 2934         )
 2935 
 2936         # Now create some users
 2937         self.user1 = unit.create_user(PROVIDERS.identity_api,
 2938                                       domain_id=self.domainA['id'])
 2939 
 2940         self.user2 = unit.create_user(PROVIDERS.identity_api,
 2941                                       domain_id=self.domainB['id'])
 2942 
 2943         self.user3 = unit.create_user(PROVIDERS.identity_api,
 2944                                       domain_id=self.domainB['id'])
 2945 
 2946         self.group1 = unit.new_group_ref(domain_id=self.domainA['id'])
 2947         self.group1 = PROVIDERS.identity_api.create_group(self.group1)
 2948 
 2949         self.group2 = unit.new_group_ref(domain_id=self.domainA['id'])
 2950         self.group2 = PROVIDERS.identity_api.create_group(self.group2)
 2951 
 2952         self.group3 = unit.new_group_ref(domain_id=self.domainB['id'])
 2953         self.group3 = PROVIDERS.identity_api.create_group(self.group3)
 2954 
 2955         PROVIDERS.identity_api.add_user_to_group(
 2956             self.user1['id'], self.group1['id']
 2957         )
 2958         PROVIDERS.identity_api.add_user_to_group(
 2959             self.user2['id'], self.group1['id']
 2960         )
 2961         PROVIDERS.identity_api.add_user_to_group(
 2962             self.user3['id'], self.group2['id']
 2963         )
 2964 
 2965         self.role1 = unit.new_role_ref()
 2966         PROVIDERS.role_api.create_role(self.role1['id'], self.role1)
 2967         self.role2 = unit.new_role_ref()
 2968         PROVIDERS.role_api.create_role(self.role2['id'], self.role2)
 2969 
 2970         PROVIDERS.assignment_api.create_grant(
 2971             self.role2['id'], user_id=self.user1['id'],
 2972             domain_id=self.domainA['id']
 2973         )
 2974         PROVIDERS.assignment_api.create_grant(
 2975             self.role1['id'], user_id=self.user1['id'],
 2976             project_id=self.projectA['id']
 2977         )
 2978         PROVIDERS.assignment_api.create_grant(
 2979             self.role1['id'], user_id=self.user2['id'],
 2980             project_id=self.projectA['id']
 2981         )
 2982         PROVIDERS.assignment_api.create_grant(
 2983             self.role1['id'], user_id=self.user3['id'],
 2984             project_id=self.projectA['id']
 2985         )
 2986         PROVIDERS.assignment_api.create_grant(
 2987             self.role1['id'], group_id=self.group1['id'],
 2988             project_id=self.projectA['id']
 2989         )
 2990 
 2991     def test_unscoped_token_remains_valid_after_role_assignment(self):
 2992         unscoped_token = self.get_requested_token(
 2993             self.build_authentication_request(
 2994                 user_id=self.user1['id'],
 2995                 password=self.user1['password']))
 2996 
 2997         scoped_token = self.get_requested_token(
 2998             self.build_authentication_request(
 2999                 token=unscoped_token,
 3000                 project_id=self.projectA['id']))
 3001 
 3002         # confirm both tokens are valid
 3003         self.head('/auth/tokens',
 3004                   headers={'X-Subject-Token': unscoped_token},
 3005                   expected_status=http.client.OK)
 3006         self.head('/auth/tokens',
 3007                   headers={'X-Subject-Token': scoped_token},
 3008                   expected_status=http.client.OK)
 3009 
 3010         # create a new role
 3011         role = unit.new_role_ref()
 3012         PROVIDERS.role_api.create_role(role['id'], role)
 3013 
 3014         # assign a new role
 3015         self.put(
 3016             '/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % {
 3017                 'project_id': self.projectA['id'],
 3018                 'user_id': self.user1['id'],
 3019                 'role_id': role['id']})
 3020 
 3021         # both tokens should remain valid
 3022         self.head('/auth/tokens',
 3023                   headers={'X-Subject-Token': unscoped_token},
 3024                   expected_status=http.client.OK)
 3025         self.head('/auth/tokens',
 3026                   headers={'X-Subject-Token': scoped_token},
 3027                   expected_status=http.client.OK)
 3028 
 3029     def test_deleting_user_grant_revokes_token(self):
 3030         """Test deleting a user grant revokes token.
 3031 
 3032         Test Plan:
 3033 
 3034         - Get a token for user, scoped to Project
 3035         - Delete the grant user has on Project
 3036         - Check token is no longer valid
 3037 
 3038         """
 3039         auth_data = self.build_authentication_request(
 3040             user_id=self.user['id'],
 3041             password=self.user['password'],
 3042             project_id=self.project['id'])
 3043         token = self.get_requested_token(auth_data)
 3044         # Confirm token is valid
 3045         self.head('/auth/tokens',
 3046                   headers={'X-Subject-Token': token},
 3047                   expected_status=http.client.OK)
 3048         # Delete the grant, which should invalidate the token
 3049         grant_url = (
 3050             '/projects/%(project_id)s/users/%(user_id)s/'
 3051             'roles/%(role_id)s' % {
 3052                 'project_id': self.project['id'],
 3053                 'user_id': self.user['id'],
 3054                 'role_id': self.role['id']})
 3055         self.delete(grant_url)
 3056         self.head('/auth/tokens', token=token,
 3057                   expected_status=http.client.UNAUTHORIZED)
 3058 
 3059     def role_data_fixtures(self):
 3060         self.projectC = unit.new_project_ref(domain_id=self.domainA['id'])
 3061         PROVIDERS.resource_api.create_project(
 3062             self.projectC['id'], self.projectC
 3063         )
 3064         self.user4 = unit.create_user(PROVIDERS.identity_api,
 3065                                       domain_id=self.domainB['id'])
 3066         self.user5 = unit.create_user(PROVIDERS.identity_api,
 3067                                       domain_id=self.domainA['id'])
 3068         self.user6 = unit.create_user(PROVIDERS.identity_api,
 3069                                       domain_id=self.domainA['id'])
 3070         PROVIDERS.identity_api.add_user_to_group(
 3071             self.user5['id'], self.group1['id']
 3072         )
 3073         PROVIDERS.assignment_api.create_grant(
 3074             self.role1['id'], group_id=self.group1['id'],
 3075             project_id=self.projectB['id']
 3076         )
 3077         PROVIDERS.assignment_api.create_grant(
 3078             self.role2['id'], user_id=self.user4['id'],
 3079             project_id=self.projectC['id']
 3080         )
 3081         PROVIDERS.assignment_api.create_grant(
 3082             self.role1['id'], user_id=self.user6['id'],
 3083             project_id=self.projectA['id']
 3084         )
 3085         PROVIDERS.assignment_api.create_grant(
 3086             self.role1['id'], user_id=self.user6['id'],
 3087             domain_id=self.domainA['id']
 3088         )
 3089 
 3090     def test_deleting_role_revokes_token(self):
 3091         """Test deleting a role revokes token.
 3092 
 3093         Add some additional test data, namely:
 3094 
 3095         - A third project (project C)
 3096         - Three additional users - user4 owned by domainB and user5 and 6 owned
 3097           by domainA (different domain ownership should not affect the test
 3098           results, just provided to broaden test coverage)
 3099         - User5 is a member of group1
 3100         - Group1 gets an additional assignment - role1 on projectB as well as
 3101           its existing role1 on projectA
 3102         - User4 has role2 on Project C
 3103         - User6 has role1 on projectA and domainA
 3104         - This allows us to create 5 tokens by virtue of different types of
 3105           role assignment:
 3106           - user1, scoped to ProjectA by virtue of user role1 assignment
 3107           - user5, scoped to ProjectB by virtue of group role1 assignment
 3108           - user4, scoped to ProjectC by virtue of user role2 assignment
 3109           - user6, scoped to ProjectA by virtue of user role1 assignment
 3110           - user6, scoped to DomainA by virtue of user role1 assignment
 3111         - role1 is then deleted
 3112         - Check the tokens on Project A and B, and DomainA are revoked, but not
 3113           the one for Project C
 3114 
 3115         """
 3116         self.role_data_fixtures()
 3117 
 3118         # Now we are ready to start issuing requests
 3119         auth_data = self.build_authentication_request(
 3120             user_id=self.user1['id'],
 3121             password=self.user1['password'],
 3122             project_id=self.projectA['id'])
 3123         tokenA = self.get_requested_token(auth_data)
 3124         auth_data = self.build_authentication_request(
 3125             user_id=self.user5['id'],
 3126             password=self.user5['password'],
 3127             project_id=self.projectB['id'])
 3128         tokenB = self.get_requested_token(auth_data)
 3129         auth_data = self.build_authentication_request(
 3130             user_id=self.user4['id'],
 3131             password=self.user4['password'],
 3132             project_id=self.projectC['id'])
 3133         tokenC = self.get_requested_token(auth_data)
 3134         auth_data = self.build_authentication_request(
 3135             user_id=self.user6['id'],
 3136             password=self.user6['password'],
 3137             project_id=self.projectA['id'])
 3138         tokenD = self.get_requested_token(auth_data)
 3139         auth_data = self.build_authentication_request(
 3140             user_id=self.user6['id'],
 3141             password=self.user6['password'],
 3142             domain_id=self.domainA['id'])
 3143         tokenE = self.get_requested_token(auth_data)
 3144         # Confirm tokens are valid
 3145         self.head('/auth/tokens',
 3146                   headers={'X-Subject-Token': tokenA},
 3147                   expected_status=http.client.OK)
 3148         self.head('/auth/tokens',
 3149                   headers={'X-Subject-Token': tokenB},
 3150                   expected_status=http.client.OK)
 3151         self.head('/auth/tokens',
 3152                   headers={'X-Subject-Token': tokenC},
 3153                   expected_status=http.client.OK)
 3154         self.head('/auth/tokens',
 3155                   headers={'X-Subject-Token': tokenD},
 3156                   expected_status=http.client.OK)
 3157         self.head('/auth/tokens',
 3158                   headers={'X-Subject-Token': tokenE},
 3159                   expected_status=http.client.OK)
 3160 
 3161         # Delete the role, which should invalidate the tokens
 3162         role_url = '/roles/%s' % self.role1['id']
 3163         self.delete(role_url)
 3164 
 3165         # Check the tokens that used role1 is invalid
 3166         self.head('/auth/tokens',
 3167                   headers={'X-Subject-Token': tokenA},
 3168                   expected_status=http.client.NOT_FOUND)
 3169         self.head('/auth/tokens',
 3170                   headers={'X-Subject-Token': tokenB},
 3171                   expected_status=http.client.NOT_FOUND)
 3172         self.head('/auth/tokens',
 3173                   headers={'X-Subject-Token': tokenD},
 3174                   expected_status=http.client.NOT_FOUND)
 3175         self.head('/auth/tokens',
 3176                   headers={'X-Subject-Token': tokenE},
 3177                   expected_status=http.client.NOT_FOUND)
 3178 
 3179         # ...but the one using role2 is still valid
 3180         self.head('/auth/tokens',
 3181                   headers={'X-Subject-Token': tokenC},
 3182                   expected_status=http.client.OK)
 3183 
 3184     def test_domain_user_role_assignment_maintains_token(self):
 3185         """Test user-domain role assignment maintains existing token.
 3186 
 3187         Test Plan:
 3188 
 3189         - Get a token for user1, scoped to ProjectA
 3190         - Create a grant for user1 on DomainB
 3191         - Check token is still valid
 3192 
 3193         """
 3194         auth_data = self.build_authentication_request(
 3195             user_id=self.user1['id'],
 3196             password=self.user1['password'],
 3197             project_id=self.projectA['id'])
 3198         token = self.get_requested_token(auth_data)
 3199         # Confirm token is valid
 3200         self.head('/auth/tokens',
 3201                   headers={'X-Subject-Token': token},
 3202                   expected_status=http.client.OK)
 3203         # Assign a role, which should not affect the token
 3204         grant_url = (
 3205             '/domains/%(domain_id)s/users/%(user_id)s/'
 3206             'roles/%(role_id)s' % {
 3207                 'domain_id': self.domainB['id'],
 3208                 'user_id': self.user1['id'],
 3209                 'role_id': self.role1['id']})
 3210         self.put(grant_url)
 3211         self.head('/auth/tokens',
 3212                   headers={'X-Subject-Token': token},
 3213                   expected_status=http.client.OK)
 3214 
 3215     def test_disabling_project_revokes_token(self):
 3216         token = self.get_requested_token(
 3217             self.build_authentication_request(
 3218                 user_id=self.user3['id'],
 3219                 password=self.user3['password'],
 3220                 project_id=self.projectA['id']))
 3221 
 3222         # confirm token is valid
 3223         self.head('/auth/tokens',
 3224                   headers={'X-Subject-Token': token},
 3225                   expected_status=http.client.OK)
 3226 
 3227         # disable the project, which should invalidate the token
 3228         self.patch(
 3229             '/projects/%(project_id)s' % {'project_id': self.projectA['id']},
 3230             body={'project': {'enabled': False}})
 3231 
 3232         # user should no longer have access to the project
 3233         self.head('/auth/tokens',
 3234                   headers={'X-Subject-Token': token},
 3235                   expected_status=http.client.NOT_FOUND)
 3236         self.v3_create_token(
 3237             self.build_authentication_request(
 3238                 user_id=self.user3['id'],
 3239                 password=self.user3['password'],
 3240                 project_id=self.projectA['id']),
 3241             expected_status=http.client.UNAUTHORIZED)
 3242 
 3243     def test_deleting_project_revokes_token(self):
 3244         token = self.get_requested_token(
 3245             self.build_authentication_request(
 3246                 user_id=self.user3['id'],
 3247                 password=self.user3['password'],
 3248                 project_id=self.projectA['id']))
 3249 
 3250         # confirm token is valid
 3251         self.head('/auth/tokens',
 3252                   headers={'X-Subject-Token': token},
 3253                   expected_status=http.client.OK)
 3254 
 3255         # delete the project, which should invalidate the token
 3256         self.delete(
 3257             '/projects/%(project_id)s' % {'project_id': self.projectA['id']})
 3258 
 3259         # user should no longer have access to the project
 3260         self.head('/auth/tokens',
 3261                   headers={'X-Subject-Token': token},
 3262                   expected_status=http.client.NOT_FOUND)
 3263         self.v3_create_token(
 3264             self.build_authentication_request(
 3265                 user_id=self.user3['id'],
 3266                 password=self.user3['password'],
 3267                 project_id=self.projectA['id']),
 3268             expected_status=http.client.UNAUTHORIZED)
 3269 
 3270     def test_deleting_group_grant_revokes_tokens(self):
 3271         """Test deleting a group grant revokes tokens.
 3272 
 3273         Test Plan:
 3274 
 3275         - Get a token for user1, scoped to ProjectA
 3276         - Get a token for user2, scoped to ProjectA
 3277         - Get a token for user3, scoped to ProjectA
 3278         - Delete the grant group1 has on ProjectA
 3279         - Check tokens for user1 & user2 are no longer valid,
 3280           since user1 and user2 are members of group1
 3281         - Check token for user3 is invalid too
 3282 
 3283         """
 3284         auth_data = self.build_authentication_request(
 3285             user_id=self.user1['id'],
 3286             password=self.user1['password'],
 3287             project_id=self.projectA['id'])
 3288         token1 = self.get_requested_token(auth_data)
 3289         auth_data = self.build_authentication_request(
 3290             user_id=self.user2['id'],
 3291             password=self.user2['password'],
 3292             project_id=self.projectA['id'])
 3293         token2 = self.get_requested_token(auth_data)
 3294         auth_data = self.build_authentication_request(
 3295             user_id=self.user3['id'],
 3296             password=self.user3['password'],
 3297             project_id=self.projectA['id'])
 3298         token3 = self.get_requested_token(auth_data)
 3299         # Confirm tokens are valid
 3300         self.head('/auth/tokens',
 3301                   headers={'X-Subject-Token': token1},
 3302                   expected_status=http.client.OK)
 3303         self.head('/auth/tokens',
 3304                   headers={'X-Subject-Token': token2},
 3305                   expected_status=http.client.OK)
 3306         self.head('/auth/tokens',
 3307                   headers={'X-Subject-Token': token3},
 3308                   expected_status=http.client.OK)
 3309         # Delete the group grant, which should invalidate the
 3310         # tokens for user1 and user2
 3311         grant_url = (
 3312             '/projects/%(project_id)s/groups/%(group_id)s/'
 3313             'roles/%(role_id)s' % {
 3314                 'project_id': self.projectA['id'],
 3315                 'group_id': self.group1['id'],
 3316                 'role_id': self.role1['id']})
 3317         self.delete(grant_url)
 3318         PROVIDERS.assignment_api.delete_grant(
 3319             role_id=self.role1['id'], project_id=self.projectA['id'],
 3320             user_id=self.user1['id']
 3321         )
 3322         PROVIDERS.assignment_api.delete_grant(
 3323             role_id=self.role1['id'], project_id=self.projectA['id'],
 3324             user_id=self.user2['id']
 3325         )
 3326         self.head('/auth/tokens', token=token1,
 3327                   expected_status=http.client.UNAUTHORIZED)
 3328         self.head('/auth/tokens', token=token2,
 3329                   expected_status=http.client.UNAUTHORIZED)
 3330         # But user3's token should be invalid too as revocation is done for
 3331         # scope role & project
 3332         self.head('/auth/tokens',
 3333                   headers={'X-Subject-Token': token3},
 3334                   expected_status=http.client.OK)
 3335 
 3336     def test_domain_group_role_assignment_maintains_token(self):
 3337         """Test domain-group role assignment maintains existing token.
 3338 
 3339         Test Plan:
 3340 
 3341         - Get a token for user1, scoped to ProjectA
 3342         - Create a grant for group1 on DomainB
 3343         - Check token is still longer valid
 3344 
 3345         """
 3346         auth_data = self.build_authentication_request(
 3347             user_id=self.user1['id'],
 3348             password=self.user1['password'],
 3349             project_id=self.projectA['id'])
 3350         token = self.get_requested_token(auth_data)
 3351         # Confirm token is valid
 3352         self.head('/auth/tokens',
 3353                   headers={'X-Subject-Token': token},
 3354                   expected_status=http.client.OK)
 3355         # Delete the grant, which should invalidate the token
 3356         grant_url = (
 3357             '/domains/%(domain_id)s/groups/%(group_id)s/'
 3358             'roles/%(role_id)s' % {
 3359                 'domain_id': self.domainB['id'],
 3360                 'group_id': self.group1['id'],
 3361                 'role_id': self.role1['id']})
 3362         self.put(grant_url)
 3363         self.head('/auth/tokens',
 3364                   headers={'X-Subject-Token': token},
 3365                   expected_status=http.client.OK)
 3366 
 3367     def test_group_membership_changes_revokes_token(self):
 3368         """Test add/removal to/from group revokes token.
 3369 
 3370         Test Plan:
 3371 
 3372         - Get a token for user1, scoped to ProjectA
 3373         - Get a token for user2, scoped to ProjectA
 3374         - Remove user1 from group1
 3375         - Check token for user1 is no longer valid
 3376         - Check token for user2 is still valid, even though
 3377           user2 is also part of group1
 3378         - Add user2 to group2
 3379         - Check token for user2 is now no longer valid
 3380 
 3381         """
 3382         auth_data = self.build_authentication_request(
 3383             user_id=self.user1['id'],
 3384             password=self.user1['password'],
 3385             project_id=self.projectA['id'])
 3386         token1 = self.get_requested_token(auth_data)
 3387         auth_data = self.build_authentication_request(
 3388             user_id=self.user2['id'],
 3389             password=self.user2['password'],
 3390             project_id=self.projectA['id'])
 3391         token2 = self.get_requested_token(auth_data)
 3392         # Confirm tokens are valid
 3393         self.head('/auth/tokens',
 3394                   headers={'X-Subject-Token': token1},
 3395                   expected_status=http.client.OK)
 3396         self.head('/auth/tokens',
 3397                   headers={'X-Subject-Token': token2},
 3398                   expected_status=http.client.OK)
 3399         # Remove user1 from group1, which should invalidate
 3400         # the token
 3401         self.delete('/groups/%(group_id)s/users/%(user_id)s' % {
 3402             'group_id': self.group1['id'],
 3403             'user_id': self.user1['id']})
 3404         self.head('/auth/tokens',
 3405                   headers={'X-Subject-Token': token1},
 3406                   expected_status=http.client.NOT_FOUND)
 3407         # But user2's token should still be valid
 3408         self.head('/auth/tokens',
 3409                   headers={'X-Subject-Token': token2},
 3410                   expected_status=http.client.OK)
 3411         # Adding user2 to a group should not invalidate token
 3412         self.put('/groups/%(group_id)s/users/%(user_id)s' % {
 3413             'group_id': self.group2['id'],
 3414             'user_id': self.user2['id']})
 3415         self.head('/auth/tokens',
 3416                   headers={'X-Subject-Token': token2},
 3417                   expected_status=http.client.OK)
 3418 
 3419     def test_removing_role_assignment_does_not_affect_other_users(self):
 3420         """Revoking a role from one user should not affect other users."""
 3421         time = datetime.datetime.utcnow()
 3422         with freezegun.freeze_time(time) as frozen_datetime:
 3423             # This group grant is not needed for the test
 3424             self.delete(
 3425                 '/projects/%(p_id)s/groups/%(g_id)s/roles/%(r_id)s' %
 3426                 {'p_id': self.projectA['id'],
 3427                  'g_id': self.group1['id'],
 3428                  'r_id': self.role1['id']})
 3429 
 3430             # NOTE(lbragstad): Here we advance the clock one second to pass
 3431             # into the threshold of a new second because we just persisted a
 3432             # revocation event for removing a role from a group on a project.
 3433             # One thing to note about that revocation event is that it has no
 3434             # context about the group, so even though user3 might not be in
 3435             # group1, they could have their token revoked because the
 3436             # revocation event is very general.
 3437             frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
 3438 
 3439             user1_token = self.get_requested_token(
 3440                 self.build_authentication_request(
 3441                     user_id=self.user1['id'],
 3442                     password=self.user1['password'],
 3443                     project_id=self.projectA['id']))
 3444 
 3445             user3_token = self.get_requested_token(
 3446                 self.build_authentication_request(
 3447                     user_id=self.user3['id'],
 3448                     password=self.user3['password'],
 3449                     project_id=self.projectA['id']))
 3450 
 3451             # delete relationships between user1 and projectA from setUp
 3452             self.delete(
 3453                 '/projects/%(p_id)s/users/%(u_id)s/roles/%(r_id)s' % {
 3454                     'p_id': self.projectA['id'],
 3455                     'u_id': self.user1['id'],
 3456                     'r_id': self.role1['id']})
 3457             # authorization for the first user should now fail
 3458             self.head('/auth/tokens',
 3459                       headers={'X-Subject-Token': user1_token},
 3460                       expected_status=http.client.NOT_FOUND)
 3461             self.v3_create_token(
 3462                 self.build_authentication_request(
 3463                     user_id=self.user1['id'],
 3464                     password=self.user1['password'],
 3465                     project_id=self.projectA['id']),
 3466                 expected_status=http.client.UNAUTHORIZED)
 3467 
 3468             # authorization for the second user should still succeed
 3469             self.head('/auth/tokens',
 3470                       headers={'X-Subject-Token': user3_token},
 3471                       expected_status=http.client.OK)
 3472             self.v3_create_token(
 3473                 self.build_authentication_request(
 3474                     user_id=self.user3['id'],
 3475                     password=self.user3['password'],
 3476                     project_id=self.projectA['id']))
 3477 
 3478     def test_deleting_project_deletes_grants(self):
 3479         # This is to make it a little bit more pretty with PEP8
 3480         role_path = ('/projects/%(project_id)s/users/%(user_id)s/'
 3481                      'roles/%(role_id)s')
 3482         role_path = role_path % {'user_id': self.user['id'],
 3483                                  'project_id': self.projectA['id'],
 3484                                  'role_id': self.role['id']}
 3485 
 3486         # grant the user a role on the project
 3487         self.put(role_path)
 3488 
 3489         # delete the project, which should remove the roles
 3490         self.delete(
 3491             '/projects/%(project_id)s' % {'project_id': self.projectA['id']})
 3492 
 3493         # Make sure that we get a 404 Not Found when heading that role.
 3494         self.head(role_path, expected_status=http.client.NOT_FOUND)
 3495 
 3496     def test_revoke_token_from_token(self):
 3497         # Test that a scoped token can be requested from an unscoped token,
 3498         # the scoped token can be revoked, and the unscoped token remains
 3499         # valid.
 3500 
 3501         unscoped_token = self.get_requested_token(
 3502             self.build_authentication_request(
 3503                 user_id=self.user1['id'],
 3504                 password=self.user1['password']))
 3505 
 3506         # Get a project-scoped token from the unscoped token
 3507         project_scoped_token = self.get_requested_token(
 3508             self.build_authentication_request(
 3509                 token=unscoped_token,
 3510                 project_id=self.projectA['id']))
 3511 
 3512         # Get a domain-scoped token from the unscoped token
 3513         domain_scoped_token = self.get_requested_token(
 3514             self.build_authentication_request(
 3515                 token=unscoped_token,
 3516                 domain_id=self.domainA['id']))
 3517 
 3518         # revoke the project-scoped token.
 3519         self.delete('/auth/tokens',
 3520                     headers={'X-Subject-Token': project_scoped_token})
 3521 
 3522         # The project-scoped token is invalidated.
 3523         self.head('/auth/tokens',
 3524                   headers={'X-Subject-Token': project_scoped_token},
 3525                   expected_status=http.client.NOT_FOUND)
 3526 
 3527         # The unscoped token should still be valid.
 3528         self.head('/auth/tokens',
 3529                   headers={'X-Subject-Token': unscoped_token},
 3530                   expected_status=http.client.OK)
 3531 
 3532         # The domain-scoped token should still be valid.
 3533         self.head('/auth/tokens',
 3534                   headers={'X-Subject-Token': domain_scoped_token},
 3535                   expected_status=http.client.OK)
 3536 
 3537         # revoke the domain-scoped token.
 3538         self.delete('/auth/tokens',
 3539                     headers={'X-Subject-Token': domain_scoped_token})
 3540 
 3541         # The domain-scoped token is invalid.
 3542         self.head('/auth/tokens',
 3543                   headers={'X-Subject-Token': domain_scoped_token},
 3544                   expected_status=http.client.NOT_FOUND)
 3545 
 3546         # The unscoped token should still be valid.
 3547         self.head('/auth/tokens',
 3548                   headers={'X-Subject-Token': unscoped_token},
 3549                   expected_status=http.client.OK)
 3550 
 3551 
 3552 class TestTokenRevokeApi(TestTokenRevokeById):
 3553     """Test token revocation on the v3 Identity API."""
 3554 
 3555     def config_overrides(self):
 3556         super(TestTokenRevokeApi, self).config_overrides()
 3557         self.config_fixture.config(
 3558             group='token',
 3559             provider='fernet',
 3560             revoke_by_id=False)
 3561         self.useFixture(
 3562             ksfixtures.KeyRepository(
 3563                 self.config_fixture,
 3564                 'fernet_tokens',
 3565                 CONF.fernet_tokens.max_active_keys
 3566             )
 3567         )
 3568 
 3569     def assertValidDeletedProjectResponse(self, events_response, project_id):
 3570         events = events_response['events']
 3571         self.assertEqual(1, len(events))
 3572         self.assertEqual(project_id, events[0]['project_id'])
 3573         self.assertIsNotNone(events[0]['issued_before'])
 3574         self.assertIsNotNone(events_response['links'])
 3575         del (events_response['events'][0]['issued_before'])
 3576         del (events_response['events'][0]['revoked_at'])
 3577         del (events_response['links'])
 3578         expected_response = {'events': [{'project_id': project_id}]}
 3579         self.assertEqual(expected_response, events_response)
 3580 
 3581     def assertValidRevokedTokenResponse(self, events_response, **kwargs):
 3582         events = events_response['events']
 3583         self.assertEqual(1, len(events))
 3584         for k, v in kwargs.items():
 3585             self.assertEqual(v, events[0].get(k))
 3586         self.assertIsNotNone(events[0]['issued_before'])
 3587         self.assertIsNotNone(events_response['links'])
 3588         del (events_response['events'][0]['issued_before'])
 3589         del (events_response['events'][0]['revoked_at'])
 3590         del (events_response['links'])
 3591 
 3592         expected_response = {'events': [kwargs]}
 3593         self.assertEqual(expected_response, events_response)
 3594 
 3595     def test_revoke_token(self):
 3596         scoped_token = self.get_scoped_token()
 3597         headers = {'X-Subject-Token': scoped_token}
 3598         response = self.get('/auth/tokens', headers=headers).json_body['token']
 3599 
 3600         self.delete('/auth/tokens', headers=headers)
 3601         self.head('/auth/tokens', headers=headers,
 3602                   expected_status=http.client.NOT_FOUND)
 3603         events_response = self.get('/OS-REVOKE/events').json_body
 3604         self.assertValidRevokedTokenResponse(events_response,
 3605                                              audit_id=response['audit_ids'][0])
 3606 
 3607     def test_get_revoke_by_id_false_returns_gone(self):
 3608         self.get('/auth/tokens/OS-PKI/revoked',
 3609                  expected_status=http.client.GONE)
 3610 
 3611     def test_head_revoke_by_id_false_returns_gone(self):
 3612         self.head('/auth/tokens/OS-PKI/revoked',
 3613                   expected_status=http.client.GONE)
 3614 
 3615     def test_revoke_by_id_true_returns_forbidden(self):
 3616         self.config_fixture.config(
 3617             group='token',
 3618             revoke_by_id=True)
 3619         self.get(
 3620             '/auth/tokens/OS-PKI/revoked',
 3621             expected_status=http.client.FORBIDDEN
 3622         )
 3623         self.head(
 3624             '/auth/tokens/OS-PKI/revoked',
 3625             expected_status=http.client.FORBIDDEN
 3626         )
 3627 
 3628     def test_list_delete_project_shows_in_event_list(self):
 3629         self.role_data_fixtures()
 3630         events = self.get('/OS-REVOKE/events').json_body['events']
 3631         self.assertEqual([], events)
 3632         self.delete(
 3633             '/projects/%(project_id)s' % {'project_id': self.projectA['id']})
 3634         events_response = self.get('/OS-REVOKE/events').json_body
 3635 
 3636         self.assertValidDeletedProjectResponse(events_response,
 3637                                                self.projectA['id'])
 3638 
 3639     def assertEventDataInList(self, events, **kwargs):
 3640         found = False
 3641         for e in events:
 3642             for key, value in kwargs.items():
 3643                 try:
 3644                     if e[key] != value:
 3645                         break
 3646                 except KeyError:
 3647                     # Break the loop and present a nice error instead of
 3648                     # KeyError
 3649                     break
 3650             else:
 3651                 # If the value of the event[key] matches the value of the kwarg
 3652                 # for each item in kwargs, the event was fully matched and
 3653                 # the assertTrue below should succeed.
 3654                 found = True
 3655         self.assertTrue(found,
 3656                         'event with correct values not in list, expected to '
 3657                         'find event with key-value pairs. Expected: '
 3658                         '"%(expected)s" Events: "%(events)s"' %
 3659                         {'expected': ','.join(
 3660                             ["'%s=%s'" % (k, v) for k, v in kwargs.items()]),
 3661                          'events': events})
 3662 
 3663     def test_list_delete_token_shows_in_event_list(self):
 3664         self.role_data_fixtures()
 3665         events = self.get('/OS-REVOKE/events').json_body['events']
 3666         self.assertEqual([], events)
 3667 
 3668         scoped_token = self.get_scoped_token()
 3669         headers = {'X-Subject-Token': scoped_token}
 3670         auth_req = self.build_authentication_request(token=scoped_token)
 3671         response = self.v3_create_token(auth_req)
 3672         token2 = response.json_body['token']
 3673         headers2 = {'X-Subject-Token': response.headers['X-Subject-Token']}
 3674 
 3675         response = self.v3_create_token(auth_req)
 3676         response.json_body['token']
 3677         headers3 = {'X-Subject-Token': response.headers['X-Subject-Token']}
 3678 
 3679         self.head('/auth/tokens', headers=headers,
 3680                   expected_status=http.client.OK)
 3681         self.head('/auth/tokens', headers=headers2,
 3682                   expected_status=http.client.OK)
 3683         self.head('/auth/tokens', headers=headers3,
 3684                   expected_status=http.client.OK)
 3685 
 3686         self.delete('/auth/tokens', headers=headers)
 3687         # NOTE(ayoung): not deleting token3, as it should be deleted
 3688         # by previous
 3689         events_response = self.get('/OS-REVOKE/events').json_body
 3690         events = events_response['events']
 3691         self.assertEqual(1, len(events))
 3692         self.assertEventDataInList(
 3693             events,
 3694             audit_id=token2['audit_ids'][1])
 3695         self.head('/auth/tokens', headers=headers,
 3696                   expected_status=http.client.NOT_FOUND)
 3697         self.head('/auth/tokens', headers=headers2,
 3698                   expected_status=http.client.OK)
 3699         self.head('/auth/tokens', headers=headers3,
 3700                   expected_status=http.client.OK)
 3701 
 3702     def test_list_with_filter(self):
 3703 
 3704         self.role_data_fixtures()
 3705         events = self.get('/OS-REVOKE/events').json_body['events']
 3706         self.assertEqual(0, len(events))
 3707 
 3708         scoped_token = self.get_scoped_token()
 3709         headers = {'X-Subject-Token': scoped_token}
 3710         auth = self.build_authentication_request(token=scoped_token)
 3711         headers2 = {'X-Subject-Token': self.get_requested_token(auth)}
 3712         self.delete('/auth/tokens', headers=headers)
 3713         self.delete('/auth/tokens', headers=headers2)
 3714 
 3715         events = self.get('/OS-REVOKE/events').json_body['events']
 3716 
 3717         self.assertEqual(2, len(events))
 3718         future = utils.isotime(timeutils.utcnow() +
 3719                                datetime.timedelta(seconds=1000))
 3720 
 3721         events = self.get('/OS-REVOKE/events?since=%s' % (future)
 3722                           ).json_body['events']
 3723         self.assertEqual(0, len(events))
 3724 
 3725 
 3726 class TestAuthExternalDisabled(test_v3.RestfulTestCase):
 3727     def config_overrides(self):
 3728         super(TestAuthExternalDisabled, self).config_overrides()
 3729         self.config_fixture.config(
 3730             group='auth',
 3731             methods=['password', 'token'])
 3732 
 3733     def test_remote_user_disabled(self):
 3734         app = self.loadapp()
 3735         remote_user = '%s@%s' % (self.user['name'], self.domain['name'])
 3736         with app.test_client() as c:
 3737             c.environ_base.update(self.build_external_auth_environ(
 3738                 remote_user))
 3739             auth_data = self.build_authentication_request()
 3740             c.post('/v3/auth/tokens', json=auth_data,
 3741                    expected_status_code=http.client.UNAUTHORIZED)
 3742 
 3743 # FIXME(morgan): This test case must be re-worked to function under flask. It
 3744 # has been commented out until it is re-worked ensuring no issues when webob
 3745 # classes are removed.
 3746 # https://bugs.launchpad.net/keystone/+bug/1793756
 3747 # class AuthExternalDomainBehavior(object):
 3748 #     content_type = 'json'
 3749 #
 3750 #     def test_remote_user_with_realm(self):
 3751 #         api = auth.controllers.Auth()
 3752 #         remote_user = self.user['name']
 3753 #         remote_domain = self.domain['name']
 3754 #         request, auth_info, auth_context = self.build_external_auth_request(
 3755 #             remote_user, remote_domain=remote_domain, kerberos=self.kerberos)
 3756 #
 3757 #         api.authenticate(request, auth_info, auth_context)
 3758 #         self.assertEqual(self.user['id'], auth_context['user_id'])
 3759 #
 3760 #         # Now test to make sure the user name can, itself, contain the
 3761 #         # '@' character.
 3762 #         user = {'name': 'myname@mydivision'}
 3763 #         PROVIDERS.identity_api.update_user(self.user['id'], user)
 3764 #         remote_user = user['name']
 3765 #         request, auth_info, auth_context = self.build_external_auth_request(
 3766 #             remote_user, remote_domain=remote_domain, kerberos=self.kerberos)
 3767 #
 3768 #         api.authenticate(request, auth_info, auth_context)
 3769 #         self.assertEqual(self.user['id'], auth_context['user_id'])
 3770 #
 3771 #
 3772 # FIXME(morgan): This test case must be re-worked to function under flask. It
 3773 # has been commented out until it is re-worked ensuring no issues when webob
 3774 # classes are removed.
 3775 # https://bugs.launchpad.net/keystone/+bug/1793756
 3776 # class TestAuthExternalDefaultDomain(object):
 3777 #     content_type = 'json'
 3778 #
 3779 #     def config_overrides(self):
 3780 #         super(TestAuthExternalDefaultDomain, self).config_overrides()
 3781 #         self.kerberos = False
 3782 #         self.auth_plugin_config_override(external='DefaultDomain')
 3783 #
 3784 #     def test_remote_user_with_default_domain(self):
 3785 #         api = auth.controllers.Auth()
 3786 #         remote_user = self.default_domain_user['name']
 3787 #         request, auth_info, auth_context = self.build_external_auth_request(
 3788 #             remote_user, kerberos=self.kerberos)
 3789 #
 3790 #         api.authenticate(request, auth_info, auth_context)
 3791 #         self.assertEqual(self.default_domain_user['id'],
 3792 #                          auth_context['user_id'])
 3793 #
 3794 #         # Now test to make sure the user name can, itself, contain the
 3795 #         # '@' character.
 3796 #         user = {'name': 'myname@mydivision'}
 3797 #         PROVIDERS.identity_api.update_user(
 3798 #             self.default_domain_user['id'], user
 3799 #         )
 3800 #         remote_user = user['name']
 3801 #         request, auth_info, auth_context = self.build_external_auth_request(
 3802 #             remote_user, kerberos=self.kerberos)
 3803 #
 3804 #         api.authenticate(request, auth_info, auth_context)
 3805 #         self.assertEqual(self.default_domain_user['id'],
 3806 #                          auth_context['user_id'])
 3807 #
 3808 
 3809 
 3810 class TestAuthJSONExternal(test_v3.RestfulTestCase):
 3811     content_type = 'json'
 3812 
 3813     def auth_plugin_config_override(self, methods=None, **method_classes):
 3814         self.config_fixture.config(group='auth', methods=[])
 3815 
 3816     def test_remote_user_no_method(self):
 3817         app = self.loadapp()
 3818         with app.test_client() as c:
 3819             c.environ_base.update(self.build_external_auth_environ(
 3820                 self.default_domain_user['name']))
 3821             auth_data = self.build_authentication_request()
 3822             c.post('/v3/auth/tokens', json=auth_data,
 3823                    expected_status_code=http.client.UNAUTHORIZED)
 3824 
 3825 
 3826 class TrustAPIBehavior(test_v3.RestfulTestCase):
 3827     """Redelegation valid and secure.
 3828 
 3829     Redelegation is a hierarchical structure of trusts between initial trustor
 3830     and a group of users allowed to impersonate trustor and act in his name.
 3831     Hierarchy is created in a process of trusting already trusted permissions
 3832     and organized as an adjacency list using 'redelegated_trust_id' field.
 3833     Redelegation is valid if each subsequent trust in a chain passes 'not more'
 3834     permissions than being redelegated.
 3835 
 3836     Trust constraints are:
 3837      * roles - set of roles trusted by trustor
 3838      * expiration_time
 3839      * allow_redelegation - a flag
 3840      * redelegation_count - decreasing value restricting length of trust chain
 3841      * remaining_uses - DISALLOWED when allow_redelegation == True
 3842 
 3843     Trust becomes invalid in case:
 3844      * trust roles were revoked from trustor
 3845      * one of the users in the delegation chain was disabled or deleted
 3846      * expiration time passed
 3847      * one of the parent trusts has become invalid
 3848      * one of the parent trusts was deleted
 3849 
 3850     """
 3851 
 3852     def config_overrides(self):
 3853         super(TrustAPIBehavior, self).config_overrides()
 3854         self.config_fixture.config(
 3855             group='trust',
 3856             allow_redelegation=True,
 3857             max_redelegation_count=10
 3858         )
 3859 
 3860     def setUp(self):
 3861         super(TrustAPIBehavior, self).setUp()
 3862         # Create a trustee to delegate stuff to
 3863         self.trustee_user = unit.create_user(PROVIDERS.identity_api,
 3864                                              domain_id=self.domain_id)
 3865 
 3866         # trustor->trustee
 3867         self.redelegated_trust_ref = unit.new_trust_ref(
 3868             trustor_user_id=self.user_id,
 3869             trustee_user_id=self.trustee_user['id'],
 3870             project_id=self.project_id,
 3871             impersonation=True,
 3872             expires=dict(minutes=1),
 3873             role_ids=[self.role_id],
 3874             allow_redelegation=True)
 3875 
 3876         # trustor->trustee (no redelegation)
 3877         self.chained_trust_ref = unit.new_trust_ref(
 3878             trustor_user_id=self.user_id,
 3879             trustee_user_id=self.trustee_user['id'],
 3880             project_id=self.project_id,
 3881             impersonation=True,
 3882             role_ids=[self.role_id],
 3883             allow_redelegation=True)
 3884 
 3885     def _get_trust_token(self, trust):
 3886         trust_id = trust['id']
 3887         auth_data = self.build_authentication_request(
 3888             user_id=self.trustee_user['id'],
 3889             password=self.trustee_user['password'],
 3890             trust_id=trust_id)
 3891         trust_token = self.get_requested_token(auth_data)
 3892         return trust_token
 3893 
 3894     def test_depleted_redelegation_count_error(self):
 3895         self.redelegated_trust_ref['redelegation_count'] = 0
 3896         r = self.post('/OS-TRUST/trusts',
 3897                       body={'trust': self.redelegated_trust_ref})
 3898         trust = self.assertValidTrustResponse(r)
 3899         trust_token = self._get_trust_token(trust)
 3900 
 3901         # Attempt to create a redelegated trust.
 3902         self.post('/OS-TRUST/trusts',
 3903                   body={'trust': self.chained_trust_ref},
 3904                   token=trust_token,
 3905                   expected_status=http.client.FORBIDDEN)
 3906 
 3907     def test_modified_redelegation_count_error(self):
 3908         r = self.post('/OS-TRUST/trusts',
 3909                       body={'trust': self.redelegated_trust_ref})
 3910         trust = self.assertValidTrustResponse(r)
 3911         trust_token = self._get_trust_token(trust)
 3912 
 3913         # Attempt to create a redelegated trust with incorrect
 3914         # redelegation_count.
 3915         correct = trust['redelegation_count'] - 1
 3916         incorrect = correct - 1
 3917         self.chained_trust_ref['redelegation_count'] = incorrect
 3918         self.post('/OS-TRUST/trusts',
 3919                   body={'trust': self.chained_trust_ref},
 3920                   token=trust_token,
 3921                   expected_status=http.client.FORBIDDEN)
 3922 
 3923     def test_max_redelegation_count_constraint(self):
 3924         incorrect = CONF.trust.max_redelegation_count + 1
 3925         self.redelegated_trust_ref['redelegation_count'] = incorrect
 3926         self.post('/OS-TRUST/trusts',
 3927                   body={'trust': self.redelegated_trust_ref},
 3928                   expected_status=http.client.FORBIDDEN)
 3929 
 3930     def test_redelegation_expiry(self):
 3931         r = self.post('/OS-TRUST/trusts',
 3932                       body={'trust': self.redelegated_trust_ref})
 3933         trust = self.assertValidTrustResponse(r)
 3934         trust_token = self._get_trust_token(trust)
 3935 
 3936         # Attempt to create a redelegated trust supposed to last longer
 3937         # than the parent trust: let's give it 10 minutes (>1 minute).
 3938         too_long_live_chained_trust_ref = unit.new_trust_ref(
 3939             trustor_user_id=self.user_id,
 3940             trustee_user_id=self.trustee_user['id'],
 3941             project_id=self.project_id,
 3942             impersonation=True,
 3943             expires=dict(minutes=10),
 3944             role_ids=[self.role_id])
 3945         self.post('/OS-TRUST/trusts',
 3946                   body={'trust': too_long_live_chained_trust_ref},
 3947                   token=trust_token,
 3948                   expected_status=http.client.FORBIDDEN)
 3949 
 3950     def test_redelegation_remaining_uses(self):
 3951         r = self.post('/OS-TRUST/trusts',
 3952                       body={'trust': self.redelegated_trust_ref})
 3953         trust = self.assertValidTrustResponse(r)
 3954         trust_token = self._get_trust_token(trust)
 3955 
 3956         # Attempt to create a redelegated trust with remaining_uses defined.
 3957         # It must fail according to specification: remaining_uses must be
 3958         # omitted for trust redelegation. Any number here.
 3959         self.chained_trust_ref['remaining_uses'] = 5
 3960         self.post('/OS-TRUST/trusts',
 3961                   body={'trust': self.chained_trust_ref},
 3962                   token=trust_token,
 3963                   expected_status=http.client.BAD_REQUEST)
 3964 
 3965     def test_roles_subset(self):
 3966         # Build second role
 3967         role = unit.new_role_ref()
 3968         PROVIDERS.role_api.create_role(role['id'], role)
 3969         # assign a new role to the user
 3970         PROVIDERS.assignment_api.create_grant(
 3971             role_id=role['id'], user_id=self.user_id,
 3972             project_id=self.project_id
 3973         )
 3974 
 3975         # Create first trust with extended set of roles
 3976         ref = self.redelegated_trust_ref
 3977         ref['expires_at'] = datetime.datetime.utcnow().replace(
 3978             year=2032).strftime(unit.TIME_FORMAT)
 3979         ref['roles'].append({'id': role['id']})
 3980         r = self.post('/OS-TRUST/trusts',
 3981                       body={'trust': ref})
 3982         trust = self.assertValidTrustResponse(r)
 3983         # Trust created with exact set of roles (checked by role id)
 3984         role_id_set = set(r['id'] for r in ref['roles'])
 3985         trust_role_id_set = set(r['id'] for r in trust['roles'])
 3986         self.assertEqual(role_id_set, trust_role_id_set)
 3987 
 3988         trust_token = self._get_trust_token(trust)
 3989 
 3990         # Chain second trust with roles subset
 3991         self.chained_trust_ref['expires_at'] = (
 3992             datetime.datetime.utcnow().replace(year=2028).strftime(
 3993                 unit.TIME_FORMAT))
 3994         r = self.post('/OS-TRUST/trusts',
 3995                       body={'trust': self.chained_trust_ref},
 3996                       token=trust_token)
 3997         trust2 = self.assertValidTrustResponse(r)
 3998         # First trust contains roles superset
 3999         # Second trust contains roles subset
 4000         role_id_set1 = set(r['id'] for r in trust['roles'])
 4001         role_id_set2 = set(r['id'] for r in trust2['roles'])
 4002         self.assertThat(role_id_set1, matchers.GreaterThan(role_id_set2))
 4003 
 4004     def test_trust_with_implied_roles(self):
 4005         # Create some roles
 4006         role1 = unit.new_role_ref()
 4007         PROVIDERS.role_api.create_role(role1['id'], role1)
 4008         role2 = unit.new_role_ref()
 4009         PROVIDERS.role_api.create_role(role2['id'], role2)
 4010 
 4011         # Implication
 4012         PROVIDERS.role_api.create_implied_role(role1['id'], role2['id'])
 4013 
 4014         # Assign new roles to the user (with role2 implied)
 4015         PROVIDERS.assignment_api.create_grant(
 4016             role_id=role1['id'], user_id=self.user_id,
 4017             project_id=self.project_id
 4018         )
 4019 
 4020         # Create trust
 4021         ref = self.redelegated_trust_ref
 4022         ref['roles'] = [{'id': role1['id']}, {'id': role2['id']}]
 4023         resp = self.post('/OS-TRUST/trusts',
 4024                          body={'trust': ref})
 4025         trust = self.assertValidTrustResponse(resp)
 4026 
 4027         # Trust created with exact set of roles (checked by role id)
 4028         role_ids = [r['id'] for r in ref['roles']]
 4029         trust_role_ids = [r['id'] for r in trust['roles']]
 4030         # Compare requested roles with roles in response
 4031         self.assertEqual(role_ids, trust_role_ids)
 4032 
 4033         # Get a trust-scoped token
 4034         auth_data = self.build_authentication_request(
 4035             user_id=self.trustee_user['id'],
 4036             password=self.trustee_user['password'],
 4037             trust_id=trust['id']
 4038         )
 4039         resp = self.post('/auth/tokens', body=auth_data)
 4040         trust_token_role_ids = [r['id'] for r in resp.json['token']['roles']]
 4041         # Compare requested roles with roles given in token data
 4042         self.assertEqual(sorted(role_ids), sorted(trust_token_role_ids))
 4043 
 4044     def test_redelegate_with_role_by_name(self):
 4045         # For role by name testing
 4046         ref = unit.new_trust_ref(
 4047             trustor_user_id=self.user_id,
 4048             trustee_user_id=self.trustee_user['id'],
 4049             project_id=self.project_id,
 4050             impersonation=True,
 4051             expires=dict(minutes=1),
 4052             role_names=[self.role['name']],
 4053             allow_redelegation=True)
 4054         ref['expires_at'] = datetime.datetime.utcnow().replace(
 4055             year=2032).strftime(unit.TIME_FORMAT)
 4056         r = self.post('/OS-TRUST/trusts',
 4057                       body={'trust': ref})
 4058         trust = self.assertValidTrustResponse(r)
 4059         # Ensure we can get a token with this trust
 4060         trust_token = self._get_trust_token(trust)
 4061         # Chain second trust with roles subset
 4062         ref = unit.new_trust_ref(
 4063             trustor_user_id=self.user_id,
 4064             trustee_user_id=self.trustee_user['id'],
 4065             project_id=self.project_id,
 4066             impersonation=True,
 4067             role_names=[self.role['name']],
 4068             allow_redelegation=True)
 4069         ref['expires_at'] = datetime.datetime.utcnow().replace(
 4070             year=2028).strftime(unit.TIME_FORMAT)
 4071         r = self.post('/OS-TRUST/trusts',
 4072                       body={'trust': ref},
 4073                       token=trust_token)
 4074         trust = self.assertValidTrustResponse(r)
 4075         # Ensure we can get a token with this trust
 4076         self._get_trust_token(trust)
 4077 
 4078     def test_redelegate_new_role_fails(self):
 4079         r = self.post('/OS-TRUST/trusts',
 4080                       body={'trust': self.redelegated_trust_ref})
 4081         trust = self.assertValidTrustResponse(r)
 4082         trust_token = self._get_trust_token(trust)
 4083 
 4084         # Build second trust with a role not in parent's roles
 4085         role = unit.new_role_ref()
 4086         PROVIDERS.role_api.create_role(role['id'], role)
 4087         # assign a new role to the user
 4088         PROVIDERS.assignment_api.create_grant(
 4089             role_id=role['id'], user_id=self.user_id,
 4090             project_id=self.project_id
 4091         )
 4092 
 4093         # Try to chain a trust with the role not from parent trust
 4094         self.chained_trust_ref['roles'] = [{'id': role['id']}]
 4095 
 4096         # Bypass policy enforcement
 4097         with mock.patch.object(policy, 'enforce', return_value=True):
 4098             self.post('/OS-TRUST/trusts',
 4099                       body={'trust': self.chained_trust_ref},
 4100                       token=trust_token,
 4101                       expected_status=http.client.FORBIDDEN)
 4102 
 4103     def test_redelegation_terminator(self):
 4104         self.redelegated_trust_ref['expires_at'] = (
 4105             datetime.datetime.utcnow().replace(year=2032).strftime(
 4106                 unit.TIME_FORMAT))
 4107         r = self.post('/OS-TRUST/trusts',
 4108                       body={'trust': self.redelegated_trust_ref})
 4109         trust = self.assertValidTrustResponse(r)
 4110         trust_token = self._get_trust_token(trust)
 4111 
 4112         # Build second trust - the terminator
 4113         self.chained_trust_ref['expires_at'] = (
 4114             datetime.datetime.utcnow().replace(year=2028).strftime(
 4115                 unit.TIME_FORMAT))
 4116         ref = dict(self.chained_trust_ref,
 4117                    redelegation_count=1,
 4118                    allow_redelegation=False)
 4119 
 4120         r = self.post('/OS-TRUST/trusts',
 4121                       body={'trust': ref},
 4122                       token=trust_token)
 4123 
 4124         trust = self.assertValidTrustResponse(r)
 4125         # Check that allow_redelegation == False caused redelegation_count
 4126         # to be set to 0, while allow_redelegation is removed
 4127         self.assertNotIn('allow_redelegation', trust)
 4128         self.assertEqual(0, trust['redelegation_count'])
 4129         trust_token = self._get_trust_token(trust)
 4130 
 4131         # Build third trust, same as second
 4132         self.post('/OS-TRUST/trusts',
 4133                   body={'trust': ref},
 4134                   token=trust_token,
 4135                   expected_status=http.client.FORBIDDEN)
 4136 
 4137     def test_redelegation_without_impersonation(self):
 4138         # Update trust to not allow impersonation
 4139         self.redelegated_trust_ref['impersonation'] = False
 4140 
 4141         # Create trust
 4142         resp = self.post('/OS-TRUST/trusts',
 4143                          body={'trust': self.redelegated_trust_ref},
 4144                          expected_status=http.client.CREATED)
 4145         trust = self.assertValidTrustResponse(resp)
 4146 
 4147         # Get trusted token without impersonation
 4148         auth_data = self.build_authentication_request(
 4149             user_id=self.trustee_user['id'],
 4150             password=self.trustee_user['password'],
 4151             trust_id=trust['id'])
 4152         trust_token = self.get_requested_token(auth_data)
 4153 
 4154         # Create second user for redelegation
 4155         trustee_user_2 = unit.create_user(PROVIDERS.identity_api,
 4156                                           domain_id=self.domain_id)
 4157 
 4158         # Trust for redelegation
 4159         trust_ref_2 = unit.new_trust_ref(
 4160             trustor_user_id=self.trustee_user['id'],
 4161             trustee_user_id=trustee_user_2['id'],
 4162             project_id=self.project_id,
 4163             impersonation=False,
 4164             expires=dict(minutes=1),
 4165             role_ids=[self.role_id],
 4166             allow_redelegation=False)
 4167 
 4168         # Creating a second trust should not be allowed since trustor does not
 4169         # have the role to delegate thus returning 404 NOT FOUND.
 4170         resp = self.post('/OS-TRUST/trusts',
 4171                          body={'trust': trust_ref_2},
 4172                          token=trust_token,
 4173                          expected_status=http.client.NOT_FOUND)
 4174 
 4175     def test_create_unscoped_trust(self):
 4176         ref = unit.new_trust_ref(
 4177             trustor_user_id=self.user_id,
 4178             trustee_user_id=self.trustee_user['id'])
 4179         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4180         self.assertValidTrustResponse(r, ref)
 4181 
 4182     def test_create_trust_no_roles(self):
 4183         ref = unit.new_trust_ref(
 4184             trustor_user_id=self.user_id,
 4185             trustee_user_id=self.trustee_user['id'],
 4186             project_id=self.project_id)
 4187         self.post('/OS-TRUST/trusts', body={'trust': ref},
 4188                   expected_status=http.client.FORBIDDEN)
 4189 
 4190     def _initialize_test_consume_trust(self, count):
 4191         # Make sure remaining_uses is decremented as we consume the trust
 4192         ref = unit.new_trust_ref(
 4193             trustor_user_id=self.user_id,
 4194             trustee_user_id=self.trustee_user['id'],
 4195             project_id=self.project_id,
 4196             remaining_uses=count,
 4197             role_ids=[self.role_id])
 4198         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4199         # make sure the trust exists
 4200         trust = self.assertValidTrustResponse(r, ref)
 4201         r = self.get(
 4202             '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
 4203         # get a token for the trustee
 4204         auth_data = self.build_authentication_request(
 4205             user_id=self.trustee_user['id'],
 4206             password=self.trustee_user['password'])
 4207         r = self.v3_create_token(auth_data)
 4208         token = r.headers.get('X-Subject-Token')
 4209         # get a trust token, consume one use
 4210         auth_data = self.build_authentication_request(
 4211             token=token,
 4212             trust_id=trust['id'])
 4213         r = self.v3_create_token(auth_data)
 4214         return trust
 4215 
 4216     def test_authenticate_without_trust_dict_returns_bad_request(self):
 4217         # Authenticate for a token to use in the request
 4218         token = self.v3_create_token(
 4219             self.build_authentication_request(
 4220                 user_id=self.trustee_user['id'],
 4221                 password=self.trustee_user['password']
 4222             )
 4223         ).headers.get('X-Subject-Token')
 4224 
 4225         auth_data = {
 4226             'auth': {
 4227                 'identity': {
 4228                     'methods': ['token'],
 4229                     'token': {'id': token}
 4230                 },
 4231                 # We don't need a trust to execute this test, the
 4232                 # OS-TRUST:trust key of the request body just has to be a
 4233                 # string instead of a dictionary in order to throw a 500 when
 4234                 # it should a 400 Bad Request.
 4235                 'scope': {'OS-TRUST:trust': ''}
 4236             }
 4237         }
 4238         self.admin_request(
 4239             method='POST', path='/v3/auth/tokens', body=auth_data,
 4240             expected_status=http.client.BAD_REQUEST
 4241         )
 4242 
 4243     def test_consume_trust_once(self):
 4244         trust = self._initialize_test_consume_trust(2)
 4245         # check decremented value
 4246         r = self.get(
 4247             '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
 4248         trust = r.result.get('trust')
 4249         self.assertIsNotNone(trust)
 4250         self.assertEqual(1, trust['remaining_uses'])
 4251         self.assertEqual(self.role['name'], trust['roles'][0]['name'])
 4252         self.assertEqual(self.role['id'], trust['roles'][0]['id'])
 4253 
 4254     def test_create_one_time_use_trust(self):
 4255         trust = self._initialize_test_consume_trust(1)
 4256         # No more uses, the trust is made unavailable
 4257         self.get(
 4258             '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
 4259             expected_status=http.client.NOT_FOUND)
 4260         # this time we can't get a trust token
 4261         auth_data = self.build_authentication_request(
 4262             user_id=self.trustee_user['id'],
 4263             password=self.trustee_user['password'],
 4264             trust_id=trust['id'])
 4265         self.v3_create_token(auth_data,
 4266                              expected_status=http.client.UNAUTHORIZED)
 4267 
 4268     def test_create_unlimited_use_trust(self):
 4269         # by default trusts are unlimited in terms of tokens that can be
 4270         # generated from them, this test creates such a trust explicitly
 4271         ref = unit.new_trust_ref(
 4272             trustor_user_id=self.user_id,
 4273             trustee_user_id=self.trustee_user['id'],
 4274             project_id=self.project_id,
 4275             remaining_uses=None,
 4276             role_ids=[self.role_id])
 4277         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4278         trust = self.assertValidTrustResponse(r, ref)
 4279 
 4280         r = self.get(
 4281             '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
 4282         auth_data = self.build_authentication_request(
 4283             user_id=self.trustee_user['id'],
 4284             password=self.trustee_user['password'])
 4285         r = self.v3_create_token(auth_data)
 4286         token = r.headers.get('X-Subject-Token')
 4287         auth_data = self.build_authentication_request(
 4288             token=token,
 4289             trust_id=trust['id'])
 4290         r = self.v3_create_token(auth_data)
 4291         r = self.get(
 4292             '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
 4293         trust = r.result.get('trust')
 4294         self.assertIsNone(trust['remaining_uses'])
 4295 
 4296     def test_impersonation_token_cannot_create_new_trust(self):
 4297         ref = unit.new_trust_ref(
 4298             trustor_user_id=self.user_id,
 4299             trustee_user_id=self.trustee_user['id'],
 4300             project_id=self.project_id,
 4301             impersonation=True,
 4302             expires=dict(minutes=1),
 4303             role_ids=[self.role_id])
 4304 
 4305         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4306         trust = self.assertValidTrustResponse(r)
 4307 
 4308         auth_data = self.build_authentication_request(
 4309             user_id=self.trustee_user['id'],
 4310             password=self.trustee_user['password'],
 4311             trust_id=trust['id'])
 4312 
 4313         trust_token = self.get_requested_token(auth_data)
 4314 
 4315         # Build second trust
 4316         ref = unit.new_trust_ref(
 4317             trustor_user_id=self.user_id,
 4318             trustee_user_id=self.trustee_user['id'],
 4319             project_id=self.project_id,
 4320             impersonation=True,
 4321             expires=dict(minutes=1),
 4322             role_ids=[self.role_id])
 4323 
 4324         self.post('/OS-TRUST/trusts',
 4325                   body={'trust': ref},
 4326                   token=trust_token,
 4327                   expected_status=http.client.FORBIDDEN)
 4328 
 4329     def test_trust_deleted_grant(self):
 4330         # create a new role
 4331         role = unit.new_role_ref()
 4332         PROVIDERS.role_api.create_role(role['id'], role)
 4333 
 4334         grant_url = (
 4335             '/projects/%(project_id)s/users/%(user_id)s/'
 4336             'roles/%(role_id)s' % {
 4337                 'project_id': self.project_id,
 4338                 'user_id': self.user_id,
 4339                 'role_id': role['id']})
 4340 
 4341         # assign a new role
 4342         self.put(grant_url)
 4343 
 4344         # create a trust that delegates the new role
 4345         ref = unit.new_trust_ref(
 4346             trustor_user_id=self.user_id,
 4347             trustee_user_id=self.trustee_user['id'],
 4348             project_id=self.project_id,
 4349             impersonation=False,
 4350             expires=dict(minutes=1),
 4351             role_ids=[role['id']])
 4352 
 4353         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4354         trust = self.assertValidTrustResponse(r)
 4355 
 4356         # delete the grant
 4357         self.delete(grant_url)
 4358 
 4359         # attempt to get a trust token with the deleted grant
 4360         # and ensure it's unauthorized
 4361         auth_data = self.build_authentication_request(
 4362             user_id=self.trustee_user['id'],
 4363             password=self.trustee_user['password'],
 4364             trust_id=trust['id'])
 4365         r = self.v3_create_token(auth_data,
 4366                                  expected_status=http.client.FORBIDDEN)
 4367 
 4368     def test_trust_chained(self):
 4369         """Test that a trust token can't be used to execute another trust.
 4370 
 4371         To do this, we create an A->B->C hierarchy of trusts, then attempt to
 4372         execute the trusts in series (C->B->A).
 4373 
 4374         """
 4375         # create a sub-trustee user
 4376         sub_trustee_user = unit.create_user(
 4377             PROVIDERS.identity_api,
 4378             domain_id=test_v3.DEFAULT_DOMAIN_ID)
 4379         sub_trustee_user_id = sub_trustee_user['id']
 4380 
 4381         # create a new role
 4382         role = unit.new_role_ref()
 4383         PROVIDERS.role_api.create_role(role['id'], role)
 4384 
 4385         # assign the new role to trustee
 4386         self.put(
 4387             '/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % {
 4388                 'project_id': self.project_id,
 4389                 'user_id': self.trustee_user['id'],
 4390                 'role_id': role['id']})
 4391 
 4392         # create a trust from trustor -> trustee
 4393         ref = unit.new_trust_ref(
 4394             trustor_user_id=self.user_id,
 4395             trustee_user_id=self.trustee_user['id'],
 4396             project_id=self.project_id,
 4397             impersonation=True,
 4398             expires=dict(minutes=1),
 4399             role_ids=[self.role_id])
 4400         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4401         trust1 = self.assertValidTrustResponse(r)
 4402 
 4403         # authenticate as trustee so we can create a second trust
 4404         auth_data = self.build_authentication_request(
 4405             user_id=self.trustee_user['id'],
 4406             password=self.trustee_user['password'],
 4407             project_id=self.project_id)
 4408         token = self.get_requested_token(auth_data)
 4409 
 4410         # create a trust from trustee -> sub-trustee
 4411         ref = unit.new_trust_ref(
 4412             trustor_user_id=self.trustee_user['id'],
 4413             trustee_user_id=sub_trustee_user_id,
 4414             project_id=self.project_id,
 4415             impersonation=True,
 4416             expires=dict(minutes=1),
 4417             role_ids=[role['id']])
 4418         r = self.post('/OS-TRUST/trusts', token=token, body={'trust': ref})
 4419         trust2 = self.assertValidTrustResponse(r)
 4420 
 4421         # authenticate as sub-trustee and get a trust token
 4422         auth_data = self.build_authentication_request(
 4423             user_id=sub_trustee_user['id'],
 4424             password=sub_trustee_user['password'],
 4425             trust_id=trust2['id'])
 4426         trust_token = self.get_requested_token(auth_data)
 4427 
 4428         # attempt to get the second trust using a trust token
 4429         auth_data = self.build_authentication_request(
 4430             token=trust_token,
 4431             trust_id=trust1['id'])
 4432         r = self.v3_create_token(auth_data,
 4433                                  expected_status=http.client.FORBIDDEN)
 4434 
 4435     def assertTrustTokensRevoked(self, trust_id):
 4436         revocation_response = self.get('/OS-REVOKE/events')
 4437         revocation_events = revocation_response.json_body['events']
 4438         found = False
 4439         for event in revocation_events:
 4440             if event.get('OS-TRUST:trust_id') == trust_id:
 4441                 found = True
 4442         self.assertTrue(found, 'event with trust_id %s not found in list' %
 4443                         trust_id)
 4444 
 4445     def test_delete_trust_revokes_tokens(self):
 4446         ref = unit.new_trust_ref(
 4447             trustor_user_id=self.user_id,
 4448             trustee_user_id=self.trustee_user['id'],
 4449             project_id=self.project_id,
 4450             impersonation=False,
 4451             expires=dict(minutes=1),
 4452             role_ids=[self.role_id])
 4453         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4454         trust = self.assertValidTrustResponse(r)
 4455         trust_id = trust['id']
 4456         auth_data = self.build_authentication_request(
 4457             user_id=self.trustee_user['id'],
 4458             password=self.trustee_user['password'],
 4459             trust_id=trust_id)
 4460         r = self.v3_create_token(auth_data)
 4461         self.assertValidProjectScopedTokenResponse(
 4462             r, self.trustee_user)
 4463         trust_token = r.headers['X-Subject-Token']
 4464         self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
 4465             'trust_id': trust_id})
 4466         headers = {'X-Subject-Token': trust_token}
 4467         self.head('/auth/tokens', headers=headers,
 4468                   expected_status=http.client.NOT_FOUND)
 4469         self.assertTrustTokensRevoked(trust_id)
 4470 
 4471     def disable_user(self, user):
 4472         user['enabled'] = False
 4473         PROVIDERS.identity_api.update_user(user['id'], user)
 4474 
 4475     def test_trust_get_token_fails_if_trustor_disabled(self):
 4476         ref = unit.new_trust_ref(
 4477             trustor_user_id=self.user_id,
 4478             trustee_user_id=self.trustee_user['id'],
 4479             project_id=self.project_id,
 4480             impersonation=False,
 4481             expires=dict(minutes=1),
 4482             role_ids=[self.role_id])
 4483 
 4484         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4485 
 4486         trust = self.assertValidTrustResponse(r, ref)
 4487 
 4488         auth_data = self.build_authentication_request(
 4489             user_id=self.trustee_user['id'],
 4490             password=self.trustee_user['password'],
 4491             trust_id=trust['id'])
 4492         self.v3_create_token(auth_data)
 4493 
 4494         self.disable_user(self.user)
 4495 
 4496         auth_data = self.build_authentication_request(
 4497             user_id=self.trustee_user['id'],
 4498             password=self.trustee_user['password'],
 4499             trust_id=trust['id'])
 4500         self.v3_create_token(auth_data,
 4501                              expected_status=http.client.FORBIDDEN)
 4502 
 4503     def test_trust_get_token_fails_if_trustee_disabled(self):
 4504         ref = unit.new_trust_ref(
 4505             trustor_user_id=self.user_id,
 4506             trustee_user_id=self.trustee_user['id'],
 4507             project_id=self.project_id,
 4508             impersonation=False,
 4509             expires=dict(minutes=1),
 4510             role_ids=[self.role_id])
 4511 
 4512         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4513 
 4514         trust = self.assertValidTrustResponse(r, ref)
 4515 
 4516         auth_data = self.build_authentication_request(
 4517             user_id=self.trustee_user['id'],
 4518             password=self.trustee_user['password'],
 4519             trust_id=trust['id'])
 4520         self.v3_create_token(auth_data)
 4521 
 4522         self.disable_user(self.trustee_user)
 4523 
 4524         auth_data = self.build_authentication_request(
 4525             user_id=self.trustee_user['id'],
 4526             password=self.trustee_user['password'],
 4527             trust_id=trust['id'])
 4528         self.v3_create_token(auth_data,
 4529                              expected_status=http.client.UNAUTHORIZED)
 4530 
 4531     def test_delete_trust(self):
 4532         ref = unit.new_trust_ref(
 4533             trustor_user_id=self.user_id,
 4534             trustee_user_id=self.trustee_user['id'],
 4535             project_id=self.project_id,
 4536             impersonation=False,
 4537             expires=dict(minutes=1),
 4538             role_ids=[self.role_id])
 4539 
 4540         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4541 
 4542         trust = self.assertValidTrustResponse(r, ref)
 4543 
 4544         self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
 4545             'trust_id': trust['id']})
 4546 
 4547         auth_data = self.build_authentication_request(
 4548             user_id=self.trustee_user['id'],
 4549             password=self.trustee_user['password'],
 4550             trust_id=trust['id'])
 4551         self.v3_create_token(auth_data,
 4552                              expected_status=http.client.UNAUTHORIZED)
 4553 
 4554     def test_change_password_invalidates_trust_tokens(self):
 4555         ref = unit.new_trust_ref(
 4556             trustor_user_id=self.user_id,
 4557             trustee_user_id=self.trustee_user['id'],
 4558             project_id=self.project_id,
 4559             impersonation=True,
 4560             expires=dict(minutes=1),
 4561             role_ids=[self.role_id])
 4562 
 4563         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4564         trust = self.assertValidTrustResponse(r)
 4565 
 4566         auth_data = self.build_authentication_request(
 4567             user_id=self.trustee_user['id'],
 4568             password=self.trustee_user['password'],
 4569             trust_id=trust['id'])
 4570         r = self.v3_create_token(auth_data)
 4571 
 4572         self.assertValidProjectScopedTokenResponse(r, self.user)
 4573         trust_token = r.headers.get('X-Subject-Token')
 4574 
 4575         self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
 4576                  self.user_id, token=trust_token)
 4577 
 4578         self.assertValidUserResponse(
 4579             self.patch('/users/%s' % self.trustee_user['id'],
 4580                        body={'user': {'password': uuid.uuid4().hex}}))
 4581 
 4582         self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
 4583                  self.user_id, expected_status=http.client.UNAUTHORIZED,
 4584                  token=trust_token)
 4585 
 4586     def test_trustee_can_do_role_ops(self):
 4587         resp = self.post('/OS-TRUST/trusts',
 4588                          body={'trust': self.redelegated_trust_ref})
 4589         trust = self.assertValidTrustResponse(resp)
 4590         trust_token = self._get_trust_token(trust)
 4591 
 4592         resp = self.get(
 4593             '/OS-TRUST/trusts/%(trust_id)s/roles' % {
 4594                 'trust_id': trust['id']},
 4595             token=trust_token)
 4596         self.assertValidRoleListResponse(resp, self.role)
 4597 
 4598         self.head(
 4599             '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % {
 4600                 'trust_id': trust['id'],
 4601                 'role_id': self.role['id']},
 4602             token=trust_token,
 4603             expected_status=http.client.OK)
 4604 
 4605         resp = self.get(
 4606             '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % {
 4607                 'trust_id': trust['id'],
 4608                 'role_id': self.role['id']},
 4609             token=trust_token)
 4610         self.assertValidRoleResponse(resp, self.role)
 4611 
 4612     def test_do_not_consume_remaining_uses_when_get_token_fails(self):
 4613         ref = unit.new_trust_ref(
 4614             trustor_user_id=self.user_id,
 4615             trustee_user_id=self.trustee_user['id'],
 4616             project_id=self.project_id,
 4617             impersonation=False,
 4618             expires=dict(minutes=1),
 4619             role_ids=[self.role_id],
 4620             remaining_uses=3)
 4621         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4622 
 4623         new_trust = r.result.get('trust')
 4624         trust_id = new_trust.get('id')
 4625         # Pass in another user's ID as the trustee, the result being a failed
 4626         # token authenticate and the remaining_uses of the trust should not be
 4627         # decremented.
 4628         auth_data = self.build_authentication_request(
 4629             user_id=self.default_domain_user['id'],
 4630             password=self.default_domain_user['password'],
 4631             trust_id=trust_id)
 4632         self.v3_create_token(auth_data,
 4633                              expected_status=http.client.FORBIDDEN)
 4634 
 4635         r = self.get('/OS-TRUST/trusts/%s' % trust_id)
 4636         self.assertEqual(3, r.result.get('trust').get('remaining_uses'))
 4637 
 4638 
 4639 class TestTrustChain(test_v3.RestfulTestCase):
 4640 
 4641     def config_overrides(self):
 4642         super(TestTrustChain, self).config_overrides()
 4643         self.config_fixture.config(
 4644             group='trust',
 4645             allow_redelegation=True,
 4646             max_redelegation_count=10
 4647         )
 4648 
 4649     def setUp(self):
 4650         super(TestTrustChain, self).setUp()
 4651         """Create a trust chain using redelegation.
 4652 
 4653         A trust chain is a series of trusts that are redelegated. For example,
 4654         self.user_list consists of userA, userB, and userC. The first trust in
 4655         the trust chain is going to be established between self.user and userA,
 4656         call it trustA. Then, userA is going to obtain a trust scoped token
 4657         using trustA, and with that token create a trust between userA and
 4658         userB called trustB. This pattern will continue with userB creating a
 4659         trust with userC.
 4660         So the trust chain should look something like:
 4661             trustA -> trustB -> trustC
 4662         Where:
 4663             self.user is trusting userA with trustA
 4664             userA is trusting userB with trustB
 4665             userB is trusting userC with trustC
 4666 
 4667         """
 4668         self.user_list = list()
 4669         self.trust_chain = list()
 4670         for _ in range(3):
 4671             user = unit.create_user(PROVIDERS.identity_api,
 4672                                     domain_id=self.domain_id)
 4673             self.user_list.append(user)
 4674 
 4675         # trustor->trustee redelegation with impersonation
 4676         trustee = self.user_list[0]
 4677         trust_ref = unit.new_trust_ref(
 4678             trustor_user_id=self.user_id,
 4679             trustee_user_id=trustee['id'],
 4680             project_id=self.project_id,
 4681             impersonation=True,
 4682             expires=dict(minutes=1),
 4683             role_ids=[self.role_id],
 4684             allow_redelegation=True,
 4685             redelegation_count=3)
 4686 
 4687         # Create a trust between self.user and the first user in the list
 4688         r = self.post('/OS-TRUST/trusts',
 4689                       body={'trust': trust_ref})
 4690 
 4691         trust = self.assertValidTrustResponse(r)
 4692         auth_data = self.build_authentication_request(
 4693             user_id=trustee['id'],
 4694             password=trustee['password'],
 4695             trust_id=trust['id'])
 4696 
 4697         # Generate a trusted token for the first user
 4698         trust_token = self.get_requested_token(auth_data)
 4699         self.trust_chain