"Fossies" - the Fresh Open Source Software Archive

Member "keystone-18.0.0/keystone/tests/unit/test_v3_auth.py" (14 Oct 2020, 243406 Bytes) of package /linux/misc/openstack/keystone-18.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_v3_auth.py": 17.0.0_vs_18.0.0.

    1 # Copyright 2012 OpenStack Foundation
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 import copy
   16 import datetime
   17 import fixtures
   18 import itertools
   19 import operator
   20 import re
   21 from unittest import mock
   22 import uuid
   23 
   24 import freezegun
   25 import http.client
   26 from oslo_serialization import jsonutils as json
   27 from oslo_utils import fixture
   28 from oslo_utils import timeutils
   29 from testtools import matchers
   30 from testtools import testcase
   31 
   32 from keystone import auth
   33 from keystone.auth.plugins import totp
   34 from keystone.common import authorization
   35 from keystone.common import provider_api
   36 from keystone.common.rbac_enforcer import policy
   37 from keystone.common import utils
   38 import keystone.conf
   39 from keystone.credential.providers import fernet as credential_fernet
   40 from keystone import exception
   41 from keystone.identity.backends import resource_options as ro
   42 from keystone.tests.common import auth as common_auth
   43 from keystone.tests import unit
   44 from keystone.tests.unit import ksfixtures
   45 from keystone.tests.unit import test_v3
   46 
   47 
   48 CONF = keystone.conf.CONF
   49 PROVIDERS = provider_api.ProviderAPIs
   50 
   51 
   52 class TestMFARules(test_v3.RestfulTestCase):
   53     def config_overrides(self):
   54         super(TestMFARules, self).config_overrides()
   55 
   56         self.useFixture(
   57             ksfixtures.KeyRepository(
   58                 self.config_fixture,
   59                 'fernet_tokens',
   60                 CONF.fernet_tokens.max_active_keys
   61             )
   62         )
   63 
   64         self.useFixture(
   65             ksfixtures.KeyRepository(
   66                 self.config_fixture,
   67                 'credential',
   68                 credential_fernet.MAX_ACTIVE_KEYS
   69             )
   70         )
   71 
   72     def assertValidErrorResponse(self, r):
   73         resp = r.result
   74         if r.headers.get(authorization.AUTH_RECEIPT_HEADER):
   75             self.assertIsNotNone(resp.get('receipt'))
   76             self.assertIsNotNone(resp.get('receipt').get('methods'))
   77         else:
   78             self.assertIsNotNone(resp.get('error'))
   79             self.assertIsNotNone(resp['error'].get('code'))
   80             self.assertIsNotNone(resp['error'].get('title'))
   81             self.assertIsNotNone(resp['error'].get('message'))
   82             self.assertEqual(int(resp['error']['code']), r.status_code)
   83 
   84     def _create_totp_cred(self):
   85         totp_cred = unit.new_totp_credential(self.user_id, self.project_id)
   86         PROVIDERS.credential_api.create_credential(uuid.uuid4().hex, totp_cred)
   87 
   88         def cleanup(testcase):
   89             totp_creds = testcase.credential_api.list_credentials_for_user(
   90                 testcase.user['id'], type='totp')
   91 
   92             for cred in totp_creds:
   93                 testcase.credential_api.delete_credential(cred['id'])
   94 
   95         self.addCleanup(cleanup, testcase=self)
   96         return totp_cred
   97 
   98     def auth_plugin_config_override(self, methods=None, **method_classes):
   99         methods = ['totp', 'token', 'password']
  100         super(TestMFARules, self).auth_plugin_config_override(methods)
  101 
  102     def _update_user_with_MFA_rules(self, rule_list, rules_enabled=True):
  103         user = self.user.copy()
  104         # Do not update password
  105         user.pop('password')
  106         user['options'][ro.MFA_RULES_OPT.option_name] = rule_list
  107         user['options'][ro.MFA_ENABLED_OPT.option_name] = rules_enabled
  108         PROVIDERS.identity_api.update_user(user['id'], user)
  109 
  110     def test_MFA_single_method_rules_requirements_met_succeeds(self):
  111         # ensure that a simple password works if a password-only rules exists
  112         rule_list = [['password'], ['password', 'totp']]
  113         self._update_user_with_MFA_rules(rule_list=rule_list)
  114         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  115         # issues with revocation events that occur at the same time as the
  116         # token issuance. This is a bug with the limited resolution that
  117         # tokens and revocation events have.
  118         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  119         with freezegun.freeze_time(time):
  120             self.v3_create_token(
  121                 self.build_authentication_request(
  122                     user_id=self.user_id,
  123                     password=self.user['password'],
  124                     user_domain_id=self.domain_id,
  125                     project_id=self.project_id))
  126 
  127     def test_MFA_multi_method_rules_requirements_met_succeeds(self):
  128         # validate that multiple auth-methods function if all are specified
  129         # and the rules requires it
  130         rule_list = [['password', 'totp']]
  131         totp_cred = self._create_totp_cred()
  132         self._update_user_with_MFA_rules(rule_list=rule_list)
  133         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  134         # issues with revocation events that occur at the same time as the
  135         # token issuance. This is a bug with the limited resolution that
  136         # tokens and revocation events have.
  137         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  138         with freezegun.freeze_time(time):
  139             auth_req = self.build_authentication_request(
  140                 user_id=self.user_id,
  141                 password=self.user['password'],
  142                 user_domain_id=self.domain_id,
  143                 passcode=totp._generate_totp_passcodes(totp_cred['blob'])[0])
  144             self.v3_create_token(auth_req)
  145 
  146     def test_MFA_single_method_rules_requirements_not_met_fails(self):
  147         # if a rule matching a single auth type is specified and is not matched
  148         # the result should be unauthorized
  149         rule_list = [['totp']]
  150         self._update_user_with_MFA_rules(rule_list=rule_list)
  151         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  152         # issues with revocation events that occur at the same time as the
  153         # token issuance. This is a bug with the limited resolution that
  154         # tokens and revocation events have.
  155         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  156         with freezegun.freeze_time(time):
  157             self.v3_create_token(
  158                 self.build_authentication_request(
  159                     user_id=self.user_id,
  160                     password=self.user['password'],
  161                     user_domain_id=self.domain_id,
  162                     project_id=self.project_id),
  163                 expected_status=http.client.UNAUTHORIZED)
  164 
  165     def test_MFA_multi_method_rules_requirements_not_met_fails(self):
  166         # if multiple rules are specified and only one is passed,
  167         # unauthorized is expected
  168         rule_list = [['password', 'totp']]
  169         self._update_user_with_MFA_rules(rule_list=rule_list)
  170         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  171         # issues with revocation events that occur at the same time as the
  172         # token issuance. This is a bug with the limited resolution that
  173         # tokens and revocation events have.
  174         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  175         with freezegun.freeze_time(time):
  176             self.v3_create_token(
  177                 self.build_authentication_request(
  178                     user_id=self.user_id,
  179                     password=self.user['password'],
  180                     user_domain_id=self.domain_id,
  181                     project_id=self.project_id),
  182                 expected_status=http.client.UNAUTHORIZED)
  183 
  184     def test_MFA_rules_bogus_non_existing_auth_method_succeeds(self):
  185         # Bogus auth methods are thrown out from rules.
  186         rule_list = [['password'], ['BoGusAuThMeTh0dHandl3r']]
  187         self._update_user_with_MFA_rules(rule_list=rule_list)
  188         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  189         # issues with revocation events that occur at the same time as the
  190         # token issuance. This is a bug with the limited resolution that
  191         # tokens and revocation events have.
  192         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  193         with freezegun.freeze_time(time):
  194             self.v3_create_token(
  195                 self.build_authentication_request(
  196                     user_id=self.user_id,
  197                     password=self.user['password'],
  198                     user_domain_id=self.domain_id,
  199                     project_id=self.project_id))
  200 
  201     def test_MFA_rules_disabled_MFA_succeeeds(self):
  202         # ensure that if MFA is "disableD" authentication succeeds, even if
  203         # not enough auth methods are specified
  204         rule_list = [['password', 'totp']]
  205         self._update_user_with_MFA_rules(rule_list=rule_list,
  206                                          rules_enabled=False)
  207         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  208         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  209         # issues with revocation events that occur at the same time as the
  210         # token issuance. This is a bug with the limited resolution that
  211         # tokens and revocation events have.
  212         with freezegun.freeze_time(time):
  213             self.v3_create_token(
  214                 self.build_authentication_request(
  215                     user_id=self.user_id,
  216                     password=self.user['password'],
  217                     user_domain_id=self.domain_id,
  218                     project_id=self.project_id))
  219 
  220     def test_MFA_rules_all_bogus_rules_results_in_default_behavior(self):
  221         # if all the rules are bogus, the result is the same as the default
  222         # behavior, any single password method is sufficient
  223         rule_list = [[uuid.uuid4().hex, uuid.uuid4().hex],
  224                      ['BoGus'],
  225                      ['NonExistantMethod']]
  226         self._update_user_with_MFA_rules(rule_list=rule_list)
  227         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  228         # issues with revocation events that occur at the same time as the
  229         # token issuance. This is a bug with the limited resolution that
  230         # tokens and revocation events have.
  231         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  232         with freezegun.freeze_time(time):
  233             self.v3_create_token(
  234                 self.build_authentication_request(
  235                     user_id=self.user_id,
  236                     password=self.user['password'],
  237                     user_domain_id=self.domain_id,
  238                     project_id=self.project_id))
  239 
  240     def test_MFA_rules_rescope_works_without_token_method_in_rules(self):
  241         rule_list = [['password', 'totp']]
  242         totp_cred = self._create_totp_cred()
  243         self._update_user_with_MFA_rules(rule_list=rule_list)
  244         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  245         # issues with revocation events that occur at the same time as the
  246         # token issuance. This is a bug with the limited resolution that
  247         # tokens and revocation events have.
  248         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  249         with freezegun.freeze_time(time):
  250             auth_data = self.build_authentication_request(
  251                 user_id=self.user_id,
  252                 password=self.user['password'],
  253                 user_domain_id=self.domain_id,
  254                 passcode=totp._generate_totp_passcodes(totp_cred['blob'])[0])
  255             r = self.v3_create_token(auth_data)
  256             auth_data = self.build_authentication_request(
  257                 token=r.headers.get('X-Subject-Token'),
  258                 project_id=self.project_id)
  259             self.v3_create_token(auth_data)
  260 
  261     def test_MFA_requirements_makes_correct_receipt_for_password(self):
  262         # if multiple rules are specified and only one is passed,
  263         # unauthorized is expected
  264         rule_list = [['password', 'totp']]
  265         self._update_user_with_MFA_rules(rule_list=rule_list)
  266         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  267         # issues with revocation events that occur at the same time as the
  268         # token issuance. This is a bug with the limited resolution that
  269         # tokens and revocation events have.
  270         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  271         with freezegun.freeze_time(time):
  272             response = self.admin_request(
  273                 method='POST',
  274                 path='/v3/auth/tokens',
  275                 body=self.build_authentication_request(
  276                     user_id=self.user_id,
  277                     password=self.user['password'],
  278                     user_domain_id=self.domain_id,
  279                     project_id=self.project_id),
  280                 expected_status=http.client.UNAUTHORIZED)
  281 
  282         self.assertIsNotNone(
  283             response.headers.get(authorization.AUTH_RECEIPT_HEADER))
  284         resp_data = response.result
  285         # NOTE(adriant): We convert to sets to avoid any potential sorting
  286         # related failures since order isn't important, just content.
  287         self.assertEqual(
  288             {'password'}, set(resp_data.get('receipt').get('methods')))
  289         self.assertEqual(
  290             set(frozenset(r) for r in rule_list),
  291             set(frozenset(r) for r in resp_data.get('required_auth_methods')))
  292 
  293     def test_MFA_requirements_makes_correct_receipt_for_totp(self):
  294         # if multiple rules are specified and only one is passed,
  295         # unauthorized is expected
  296         totp_cred = self._create_totp_cred()
  297         rule_list = [['password', 'totp']]
  298         self._update_user_with_MFA_rules(rule_list=rule_list)
  299         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  300         # issues with revocation events that occur at the same time as the
  301         # token issuance. This is a bug with the limited resolution that
  302         # tokens and revocation events have.
  303         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  304         with freezegun.freeze_time(time):
  305             response = self.admin_request(
  306                 method='POST',
  307                 path='/v3/auth/tokens',
  308                 body=self.build_authentication_request(
  309                     user_id=self.user_id,
  310                     user_domain_id=self.domain_id,
  311                     project_id=self.project_id,
  312                     passcode=totp._generate_totp_passcodes(
  313                         totp_cred['blob'])[0]),
  314                 expected_status=http.client.UNAUTHORIZED)
  315 
  316         self.assertIsNotNone(
  317             response.headers.get(authorization.AUTH_RECEIPT_HEADER))
  318         resp_data = response.result
  319         # NOTE(adriant): We convert to sets to avoid any potential sorting
  320         # related failures since order isn't important, just content.
  321         self.assertEqual(
  322             {'totp'}, set(resp_data.get('receipt').get('methods')))
  323         self.assertEqual(
  324             set(frozenset(r) for r in rule_list),
  325             set(frozenset(r) for r in resp_data.get('required_auth_methods')))
  326 
  327     def test_MFA_requirements_makes_correct_receipt_for_pass_and_totp(self):
  328         # if multiple rules are specified and only one is passed,
  329         # unauthorized is expected
  330         totp_cred = self._create_totp_cred()
  331         rule_list = [['password', 'totp', 'token']]
  332         self._update_user_with_MFA_rules(rule_list=rule_list)
  333         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  334         # issues with revocation events that occur at the same time as the
  335         # token issuance. This is a bug with the limited resolution that
  336         # tokens and revocation events have.
  337         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  338         with freezegun.freeze_time(time):
  339             response = self.admin_request(
  340                 method='POST',
  341                 path='/v3/auth/tokens',
  342                 body=self.build_authentication_request(
  343                     user_id=self.user_id,
  344                     password=self.user['password'],
  345                     user_domain_id=self.domain_id,
  346                     project_id=self.project_id,
  347                     passcode=totp._generate_totp_passcodes(
  348                         totp_cred['blob'])[0]),
  349                 expected_status=http.client.UNAUTHORIZED)
  350 
  351         self.assertIsNotNone(
  352             response.headers.get(authorization.AUTH_RECEIPT_HEADER))
  353         resp_data = response.result
  354         # NOTE(adriant): We convert to sets to avoid any potential sorting
  355         # related failures since order isn't important, just content.
  356         self.assertEqual(
  357             {'password', 'totp'}, set(resp_data.get('receipt').get('methods')))
  358         self.assertEqual(
  359             set(frozenset(r) for r in rule_list),
  360             set(frozenset(r) for r in resp_data.get('required_auth_methods')))
  361 
  362     def test_MFA_requirements_returns_correct_required_auth_methods(self):
  363         # if multiple rules are specified and only one is passed,
  364         # unauthorized is expected
  365         rule_list = [
  366             ['password', 'totp', 'token'],
  367             ['password', 'totp'],
  368             ['token', 'totp'],
  369             ['BoGusAuThMeTh0dHandl3r']
  370         ]
  371         expect_rule_list = rule_list = [
  372             ['password', 'totp', 'token'],
  373             ['password', 'totp'],
  374         ]
  375         self._update_user_with_MFA_rules(rule_list=rule_list)
  376         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  377         # issues with revocation events that occur at the same time as the
  378         # token issuance. This is a bug with the limited resolution that
  379         # tokens and revocation events have.
  380         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  381         with freezegun.freeze_time(time):
  382             response = self.admin_request(
  383                 method='POST',
  384                 path='/v3/auth/tokens',
  385                 body=self.build_authentication_request(
  386                     user_id=self.user_id,
  387                     password=self.user['password'],
  388                     user_domain_id=self.domain_id,
  389                     project_id=self.project_id),
  390                 expected_status=http.client.UNAUTHORIZED)
  391 
  392         self.assertIsNotNone(
  393             response.headers.get(authorization.AUTH_RECEIPT_HEADER))
  394         resp_data = response.result
  395         # NOTE(adriant): We convert to sets to avoid any potential sorting
  396         # related failures since order isn't important, just content.
  397         self.assertEqual(
  398             {'password'}, set(resp_data.get('receipt').get('methods')))
  399         self.assertEqual(
  400             set(frozenset(r) for r in expect_rule_list),
  401             set(frozenset(r) for r in resp_data.get('required_auth_methods')))
  402 
  403     def test_MFA_consuming_receipt_with_totp(self):
  404         # if multiple rules are specified and only one is passed,
  405         # unauthorized is expected
  406         totp_cred = self._create_totp_cred()
  407         rule_list = [['password', 'totp']]
  408         self._update_user_with_MFA_rules(rule_list=rule_list)
  409         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  410         # issues with revocation events that occur at the same time as the
  411         # token issuance. This is a bug with the limited resolution that
  412         # tokens and revocation events have.
  413         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  414         with freezegun.freeze_time(time):
  415             response = self.admin_request(
  416                 method='POST',
  417                 path='/v3/auth/tokens',
  418                 body=self.build_authentication_request(
  419                     user_id=self.user_id,
  420                     password=self.user['password'],
  421                     user_domain_id=self.domain_id,
  422                     project_id=self.project_id),
  423                 expected_status=http.client.UNAUTHORIZED)
  424 
  425         self.assertIsNotNone(
  426             response.headers.get(authorization.AUTH_RECEIPT_HEADER))
  427         receipt = response.headers.get(authorization.AUTH_RECEIPT_HEADER)
  428         resp_data = response.result
  429         # NOTE(adriant): We convert to sets to avoid any potential sorting
  430         # related failures since order isn't important, just content.
  431         self.assertEqual(
  432             {'password'}, set(resp_data.get('receipt').get('methods')))
  433         self.assertEqual(
  434             set(frozenset(r) for r in rule_list),
  435             set(frozenset(r) for r in resp_data.get('required_auth_methods')))
  436 
  437         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  438         with freezegun.freeze_time(time):
  439             response = self.admin_request(
  440                 method='POST',
  441                 path='/v3/auth/tokens',
  442                 headers={authorization.AUTH_RECEIPT_HEADER: receipt},
  443                 body=self.build_authentication_request(
  444                     user_id=self.user_id,
  445                     user_domain_id=self.domain_id,
  446                     project_id=self.project_id,
  447                     passcode=totp._generate_totp_passcodes(
  448                         totp_cred['blob'])[0]))
  449 
  450     def test_MFA_consuming_receipt_not_found(self):
  451         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  452         with freezegun.freeze_time(time):
  453             response = self.admin_request(
  454                 method='POST',
  455                 path='/v3/auth/tokens',
  456                 headers={authorization.AUTH_RECEIPT_HEADER: "bogus-receipt"},
  457                 body=self.build_authentication_request(
  458                     user_id=self.user_id,
  459                     user_domain_id=self.domain_id,
  460                     project_id=self.project_id),
  461                 expected_status=http.client.UNAUTHORIZED)
  462         self.assertEqual(401, response.result['error']['code'])
  463 
  464 
  465 class TestAuthInfo(common_auth.AuthTestMixin, testcase.TestCase):
  466     def setUp(self):
  467         super(TestAuthInfo, self).setUp()
  468         auth.core.load_auth_methods()
  469 
  470     def test_unsupported_auth_method(self):
  471         auth_data = {'methods': ['abc']}
  472         auth_data['abc'] = {'test': 'test'}
  473         auth_data = {'identity': auth_data}
  474         self.assertRaises(exception.AuthMethodNotSupported,
  475                           auth.core.AuthInfo.create,
  476                           auth_data)
  477 
  478     def test_missing_auth_method_data(self):
  479         auth_data = {'methods': ['password']}
  480         auth_data = {'identity': auth_data}
  481         self.assertRaises(exception.ValidationError,
  482                           auth.core.AuthInfo.create,
  483                           auth_data)
  484 
  485     def test_project_name_no_domain(self):
  486         auth_data = self.build_authentication_request(
  487             username='test',
  488             password='test',
  489             project_name='abc')['auth']
  490         self.assertRaises(exception.ValidationError,
  491                           auth.core.AuthInfo.create,
  492                           auth_data)
  493 
  494     def test_both_project_and_domain_in_scope(self):
  495         auth_data = self.build_authentication_request(
  496             user_id='test',
  497             password='test',
  498             project_name='test',
  499             domain_name='test')['auth']
  500         self.assertRaises(exception.ValidationError,
  501                           auth.core.AuthInfo.create,
  502                           auth_data)
  503 
  504     def test_get_method_names_duplicates(self):
  505         auth_data = self.build_authentication_request(
  506             token='test',
  507             user_id='test',
  508             password='test')['auth']
  509         auth_data['identity']['methods'] = ['password', 'token',
  510                                             'password', 'password']
  511         auth_info = auth.core.AuthInfo.create(auth_data)
  512         self.assertEqual(['password', 'token'],
  513                          auth_info.get_method_names())
  514 
  515     def test_get_method_data_invalid_method(self):
  516         auth_data = self.build_authentication_request(
  517             user_id='test',
  518             password='test')['auth']
  519         auth_info = auth.core.AuthInfo.create(auth_data)
  520 
  521         method_name = uuid.uuid4().hex
  522         self.assertRaises(exception.ValidationError,
  523                           auth_info.get_method_data,
  524                           method_name)
  525 
  526 
  527 class TokenAPITests(object):
  528     # Why is this not just setUp? Because TokenAPITests is not a test class
  529     # itself. If TokenAPITests became a subclass of the testcase, it would get
  530     # called by the enumerate-tests-in-file code. The way the functions get
  531     # resolved in Python for multiple inheritance means that a setUp in this
  532     # would get skipped by the testrunner.
  533     def doSetUp(self):
  534         r = self.v3_create_token(self.build_authentication_request(
  535             username=self.user['name'],
  536             user_domain_id=self.domain_id,
  537             password=self.user['password']))
  538         self.v3_token_data = r.result
  539         self.v3_token = r.headers.get('X-Subject-Token')
  540         self.headers = {'X-Subject-Token': r.headers.get('X-Subject-Token')}
  541 
  542     def _get_unscoped_token(self):
  543         auth_data = self.build_authentication_request(
  544             user_id=self.user['id'],
  545             password=self.user['password'])
  546         r = self.post('/auth/tokens', body=auth_data)
  547         self.assertValidUnscopedTokenResponse(r)
  548         return r.headers.get('X-Subject-Token')
  549 
  550     def _get_domain_scoped_token(self):
  551         auth_data = self.build_authentication_request(
  552             user_id=self.user['id'],
  553             password=self.user['password'],
  554             domain_id=self.domain_id)
  555         r = self.post('/auth/tokens', body=auth_data)
  556         self.assertValidDomainScopedTokenResponse(r)
  557         return r.headers.get('X-Subject-Token')
  558 
  559     def _get_project_scoped_token(self):
  560         auth_data = self.build_authentication_request(
  561             user_id=self.user['id'],
  562             password=self.user['password'],
  563             project_id=self.project_id)
  564         r = self.post('/auth/tokens', body=auth_data)
  565         self.assertValidProjectScopedTokenResponse(r)
  566         return r.headers.get('X-Subject-Token')
  567 
  568     def _get_trust_scoped_token(self, trustee_user, trust):
  569         auth_data = self.build_authentication_request(
  570             user_id=trustee_user['id'],
  571             password=trustee_user['password'],
  572             trust_id=trust['id'])
  573         r = self.post('/auth/tokens', body=auth_data)
  574         self.assertValidProjectScopedTokenResponse(r)
  575         return r.headers.get('X-Subject-Token')
  576 
  577     def _create_trust(self, impersonation=False):
  578         # Create a trustee user
  579         trustee_user = unit.create_user(PROVIDERS.identity_api,
  580                                         domain_id=self.domain_id)
  581         ref = unit.new_trust_ref(
  582             trustor_user_id=self.user_id,
  583             trustee_user_id=trustee_user['id'],
  584             project_id=self.project_id,
  585             impersonation=impersonation,
  586             role_ids=[self.role_id])
  587 
  588         # Create a trust
  589         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
  590         trust = self.assertValidTrustResponse(r)
  591         return (trustee_user, trust)
  592 
  593     def _validate_token(self, token,
  594                         expected_status=http.client.OK, allow_expired=False):
  595         path = '/v3/auth/tokens'
  596 
  597         if allow_expired:
  598             path += '?allow_expired=1'
  599 
  600         return self.admin_request(
  601             path=path,
  602             headers={'X-Auth-Token': self.get_admin_token(),
  603                      'X-Subject-Token': token},
  604             method='GET',
  605             expected_status=expected_status
  606         )
  607 
  608     def _revoke_token(self, token, expected_status=http.client.NO_CONTENT):
  609         return self.delete(
  610             '/auth/tokens',
  611             headers={'x-subject-token': token},
  612             expected_status=expected_status)
  613 
  614     def _set_user_enabled(self, user, enabled=True):
  615         user['enabled'] = enabled
  616         PROVIDERS.identity_api.update_user(user['id'], user)
  617 
  618     def _create_project_and_set_as_default_project(self):
  619         # create a new project
  620         ref = unit.new_project_ref(domain_id=self.domain_id)
  621         r = self.post('/projects', body={'project': ref})
  622         project = self.assertValidProjectResponse(r, ref)
  623 
  624         # grant the user a role on the project
  625         self.put(
  626             '/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % {
  627                 'user_id': self.user['id'],
  628                 'project_id': project['id'],
  629                 'role_id': self.role['id']})
  630 
  631         # make the new project the user's default project
  632         body = {'user': {'default_project_id': project['id']}}
  633         r = self.patch('/users/%(user_id)s' % {
  634             'user_id': self.user['id']},
  635             body=body)
  636         self.assertValidUserResponse(r)
  637 
  638         return project
  639 
  640     def test_auth_with_token_as_different_user_fails(self):
  641         # get the token for a user. This is self.user which is different from
  642         # self.default_domain_user.
  643         token = self.get_scoped_token()
  644         # try both password and token methods with different identities and it
  645         # should fail
  646         auth_data = self.build_authentication_request(
  647             token=token,
  648             user_id=self.default_domain_user['id'],
  649             password=self.default_domain_user['password'])
  650         self.v3_create_token(auth_data,
  651                              expected_status=http.client.UNAUTHORIZED)
  652 
  653     def test_create_token_for_user_without_password_fails(self):
  654         user = unit.new_user_ref(domain_id=self.domain['id'])
  655         del user['password']  # can't have a password for this test
  656         user = PROVIDERS.identity_api.create_user(user)
  657 
  658         auth_data = self.build_authentication_request(
  659             user_id=user['id'],
  660             password='password')
  661 
  662         self.v3_create_token(auth_data,
  663                              expected_status=http.client.UNAUTHORIZED)
  664 
  665     def test_create_unscoped_token_by_authenticating_with_unscoped_token(self):
  666         auth_data = self.build_authentication_request(
  667             user_id=self.user['id'],
  668             password=self.user['password'])
  669         r = self.v3_create_token(auth_data)
  670         self.assertValidUnscopedTokenResponse(r)
  671         token_id = r.headers.get('X-Subject-Token')
  672 
  673         auth_data = self.build_authentication_request(token=token_id)
  674         r = self.v3_create_token(auth_data)
  675         self.assertValidUnscopedTokenResponse(r)
  676 
  677     def test_create_unscoped_token_with_user_id(self):
  678         auth_data = self.build_authentication_request(
  679             user_id=self.user['id'],
  680             password=self.user['password'])
  681         r = self.v3_create_token(auth_data)
  682         self.assertValidUnscopedTokenResponse(r)
  683 
  684     def test_create_unscoped_token_with_user_domain_id(self):
  685         auth_data = self.build_authentication_request(
  686             username=self.user['name'],
  687             user_domain_id=self.domain['id'],
  688             password=self.user['password'])
  689         r = self.v3_create_token(auth_data)
  690         self.assertValidUnscopedTokenResponse(r)
  691 
  692     def test_create_unscoped_token_with_user_domain_name(self):
  693         auth_data = self.build_authentication_request(
  694             username=self.user['name'],
  695             user_domain_name=self.domain['name'],
  696             password=self.user['password'])
  697         r = self.v3_create_token(auth_data)
  698         self.assertValidUnscopedTokenResponse(r)
  699 
  700     def test_validate_unscoped_token(self):
  701         unscoped_token = self._get_unscoped_token()
  702         r = self._validate_token(unscoped_token)
  703         self.assertValidUnscopedTokenResponse(r)
  704 
  705     def test_validate_expired_unscoped_token_returns_not_found(self):
  706         # NOTE(lbragstad): We set token expiration to 10 seconds so that we can
  707         # use the context manager of freezegun without sqlite issues.
  708         self.config_fixture.config(group='token',
  709                                    expiration=10)
  710         time = datetime.datetime.utcnow()
  711         with freezegun.freeze_time(time) as frozen_datetime:
  712             unscoped_token = self._get_unscoped_token()
  713             frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
  714             self._validate_token(
  715                 unscoped_token,
  716                 expected_status=http.client.NOT_FOUND
  717             )
  718 
  719     def test_revoke_unscoped_token(self):
  720         unscoped_token = self._get_unscoped_token()
  721         r = self._validate_token(unscoped_token)
  722         self.assertValidUnscopedTokenResponse(r)
  723         self._revoke_token(unscoped_token)
  724         self._validate_token(unscoped_token,
  725                              expected_status=http.client.NOT_FOUND)
  726 
  727     def test_create_explicit_unscoped_token(self):
  728         self._create_project_and_set_as_default_project()
  729 
  730         # explicitly ask for an unscoped token
  731         auth_data = self.build_authentication_request(
  732             user_id=self.user['id'],
  733             password=self.user['password'],
  734             unscoped="unscoped")
  735         r = self.post('/auth/tokens', body=auth_data, noauth=True)
  736         self.assertValidUnscopedTokenResponse(r)
  737 
  738     def test_disabled_users_default_project_result_in_unscoped_token(self):
  739         # create a disabled project to work with
  740         project = self.create_new_default_project_for_user(
  741             self.user['id'], self.domain_id, enable_project=False)
  742 
  743         # assign a role to user for the new project
  744         PROVIDERS.assignment_api.add_role_to_user_and_project(
  745             self.user['id'], project['id'], self.role_id
  746         )
  747 
  748         # attempt to authenticate without requesting a project
  749         auth_data = self.build_authentication_request(
  750             user_id=self.user['id'],
  751             password=self.user['password'])
  752         r = self.v3_create_token(auth_data)
  753         self.assertValidUnscopedTokenResponse(r)
  754 
  755     def test_disabled_default_project_domain_result_in_unscoped_token(self):
  756         domain_ref = unit.new_domain_ref()
  757         r = self.post('/domains', body={'domain': domain_ref})
  758         domain = self.assertValidDomainResponse(r, domain_ref)
  759 
  760         project = self.create_new_default_project_for_user(
  761             self.user['id'], domain['id'])
  762 
  763         # assign a role to user for the new project
  764         PROVIDERS.assignment_api.add_role_to_user_and_project(
  765             self.user['id'], project['id'], self.role_id
  766         )
  767 
  768         # now disable the project domain
  769         body = {'domain': {'enabled': False}}
  770         r = self.patch('/domains/%(domain_id)s' % {'domain_id': domain['id']},
  771                        body=body)
  772         self.assertValidDomainResponse(r)
  773 
  774         # attempt to authenticate without requesting a project
  775         auth_data = self.build_authentication_request(
  776             user_id=self.user['id'],
  777             password=self.user['password'])
  778         r = self.v3_create_token(auth_data)
  779         self.assertValidUnscopedTokenResponse(r)
  780 
  781     def test_unscoped_token_is_invalid_after_disabling_user(self):
  782         unscoped_token = self._get_unscoped_token()
  783         # Make sure the token is valid
  784         r = self._validate_token(unscoped_token)
  785         self.assertValidUnscopedTokenResponse(r)
  786         # Disable the user
  787         self._set_user_enabled(self.user, enabled=False)
  788         # Ensure validating a token for a disabled user fails
  789         self._validate_token(
  790             unscoped_token,
  791             expected_status=http.client.NOT_FOUND
  792         )
  793 
  794     def test_unscoped_token_is_invalid_after_enabling_disabled_user(self):
  795         unscoped_token = self._get_unscoped_token()
  796         # Make sure the token is valid
  797         r = self._validate_token(unscoped_token)
  798         self.assertValidUnscopedTokenResponse(r)
  799         # Disable the user
  800         self._set_user_enabled(self.user, enabled=False)
  801         # Ensure validating a token for a disabled user fails
  802         self._validate_token(
  803             unscoped_token,
  804             expected_status=http.client.NOT_FOUND
  805         )
  806         # Enable the user
  807         self._set_user_enabled(self.user)
  808         # Ensure validating a token for a re-enabled user fails
  809         self._validate_token(
  810             unscoped_token,
  811             expected_status=http.client.NOT_FOUND
  812         )
  813 
  814     def test_unscoped_token_is_invalid_after_disabling_user_domain(self):
  815         unscoped_token = self._get_unscoped_token()
  816         # Make sure the token is valid
  817         r = self._validate_token(unscoped_token)
  818         self.assertValidUnscopedTokenResponse(r)
  819         # Disable the user's domain
  820         self.domain['enabled'] = False
  821         PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain)
  822         # Ensure validating a token for a disabled user fails
  823         self._validate_token(
  824             unscoped_token,
  825             expected_status=http.client.NOT_FOUND
  826         )
  827 
  828     def test_unscoped_token_is_invalid_after_changing_user_password(self):
  829         unscoped_token = self._get_unscoped_token()
  830         # Make sure the token is valid
  831         r = self._validate_token(unscoped_token)
  832         self.assertValidUnscopedTokenResponse(r)
  833         # Change user's password
  834         self.user['password'] = 'Password1'
  835         PROVIDERS.identity_api.update_user(self.user['id'], self.user)
  836         # Ensure updating user's password revokes existing user's tokens
  837         self._validate_token(
  838             unscoped_token,
  839             expected_status=http.client.NOT_FOUND
  840         )
  841 
  842     def test_create_system_token_with_user_id(self):
  843         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
  844             'user_id': self.user['id'],
  845             'role_id': self.role_id
  846         }
  847         self.put(path=path)
  848 
  849         auth_request_body = self.build_authentication_request(
  850             user_id=self.user['id'],
  851             password=self.user['password'],
  852             system=True
  853         )
  854 
  855         response = self.v3_create_token(auth_request_body)
  856         self.assertValidSystemScopedTokenResponse(response)
  857 
  858     def test_create_system_token_with_username(self):
  859         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
  860             'user_id': self.user['id'],
  861             'role_id': self.role_id
  862         }
  863         self.put(path=path)
  864 
  865         auth_request_body = self.build_authentication_request(
  866             username=self.user['name'],
  867             password=self.user['password'],
  868             user_domain_id=self.domain['id'],
  869             system=True
  870         )
  871 
  872         response = self.v3_create_token(auth_request_body)
  873         self.assertValidSystemScopedTokenResponse(response)
  874 
  875     def test_create_system_token_fails_without_system_assignment(self):
  876         auth_request_body = self.build_authentication_request(
  877             user_id=self.user['id'],
  878             password=self.user['password'],
  879             system=True
  880         )
  881         self.v3_create_token(
  882             auth_request_body,
  883             expected_status=http.client.UNAUTHORIZED
  884         )
  885 
  886     def test_system_token_is_invalid_after_disabling_user(self):
  887         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
  888             'user_id': self.user['id'],
  889             'role_id': self.role_id
  890         }
  891         self.put(path=path)
  892 
  893         auth_request_body = self.build_authentication_request(
  894             username=self.user['name'],
  895             password=self.user['password'],
  896             user_domain_id=self.domain['id'],
  897             system=True
  898         )
  899 
  900         response = self.v3_create_token(auth_request_body)
  901         self.assertValidSystemScopedTokenResponse(response)
  902         token = response.headers.get('X-Subject-Token')
  903         self._validate_token(token)
  904 
  905         # NOTE(lbragstad): This would make a good test for groups, but
  906         # apparently it's not possible to disable a group.
  907         user_ref = {
  908             'user': {
  909                 'enabled': False
  910             }
  911         }
  912         self.patch(
  913             '/users/%(user_id)s' % {'user_id': self.user['id']},
  914             body=user_ref
  915         )
  916 
  917         self.admin_request(
  918             path='/v3/auth/tokens',
  919             headers={'X-Auth-Token': token,
  920                      'X-Subject-Token': token},
  921             method='GET',
  922             expected_status=http.client.UNAUTHORIZED
  923         )
  924         self.admin_request(
  925             path='/v3/auth/tokens',
  926             headers={'X-Auth-Token': token,
  927                      'X-Subject-Token': token},
  928             method='HEAD',
  929             expected_status=http.client.UNAUTHORIZED
  930         )
  931 
  932     def test_create_system_token_via_system_group_assignment(self):
  933         ref = {
  934             'group': unit.new_group_ref(
  935                 domain_id=CONF.identity.default_domain_id
  936             )
  937         }
  938 
  939         group = self.post('/groups', body=ref).json_body['group']
  940         path = '/system/groups/%(group_id)s/roles/%(role_id)s' % {
  941             'group_id': group['id'],
  942             'role_id': self.role_id
  943         }
  944         self.put(path=path)
  945 
  946         path = '/groups/%(group_id)s/users/%(user_id)s' % {
  947             'group_id': group['id'],
  948             'user_id': self.user['id']
  949         }
  950         self.put(path=path)
  951 
  952         auth_request_body = self.build_authentication_request(
  953             user_id=self.user['id'],
  954             password=self.user['password'],
  955             system=True
  956         )
  957         response = self.v3_create_token(auth_request_body)
  958         self.assertValidSystemScopedTokenResponse(response)
  959         token = response.headers.get('X-Subject-Token')
  960         self._validate_token(token)
  961 
  962     def test_revoke_system_token(self):
  963         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
  964             'user_id': self.user['id'],
  965             'role_id': self.role_id
  966         }
  967         self.put(path=path)
  968 
  969         auth_request_body = self.build_authentication_request(
  970             username=self.user['name'],
  971             password=self.user['password'],
  972             user_domain_id=self.domain['id'],
  973             system=True
  974         )
  975 
  976         response = self.v3_create_token(auth_request_body)
  977         self.assertValidSystemScopedTokenResponse(response)
  978         token = response.headers.get('X-Subject-Token')
  979         self._validate_token(token)
  980         self._revoke_token(token)
  981         self._validate_token(token, expected_status=http.client.NOT_FOUND)
  982 
  983     def test_system_token_is_invalid_after_deleting_system_role(self):
  984         ref = {'role': unit.new_role_ref()}
  985         system_role = self.post('/roles', body=ref).json_body['role']
  986 
  987         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
  988             'user_id': self.user['id'],
  989             'role_id': system_role['id']
  990         }
  991         self.put(path=path)
  992 
  993         auth_request_body = self.build_authentication_request(
  994             username=self.user['name'],
  995             password=self.user['password'],
  996             user_domain_id=self.domain['id'],
  997             system=True
  998         )
  999 
 1000         response = self.v3_create_token(auth_request_body)
 1001         self.assertValidSystemScopedTokenResponse(response)
 1002         token = response.headers.get('X-Subject-Token')
 1003         self._validate_token(token)
 1004 
 1005         self.delete('/roles/%(role_id)s' % {'role_id': system_role['id']})
 1006         self._validate_token(token, expected_status=http.client.NOT_FOUND)
 1007 
 1008     def test_rescoping_a_system_token_for_a_project_token_fails(self):
 1009         ref = {'role': unit.new_role_ref()}
 1010         system_role = self.post('/roles', body=ref).json_body['role']
 1011 
 1012         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
 1013             'user_id': self.user['id'],
 1014             'role_id': system_role['id']
 1015         }
 1016         self.put(path=path)
 1017 
 1018         auth_request_body = self.build_authentication_request(
 1019             username=self.user['name'],
 1020             password=self.user['password'],
 1021             user_domain_id=self.domain['id'],
 1022             system=True
 1023         )
 1024         response = self.v3_create_token(auth_request_body)
 1025         self.assertValidSystemScopedTokenResponse(response)
 1026         system_token = response.headers.get('X-Subject-Token')
 1027 
 1028         auth_request_body = self.build_authentication_request(
 1029             token=system_token, project_id=self.project_id
 1030         )
 1031         self.v3_create_token(
 1032             auth_request_body, expected_status=http.client.FORBIDDEN
 1033         )
 1034 
 1035     def test_rescoping_a_system_token_for_a_domain_token_fails(self):
 1036         ref = {'role': unit.new_role_ref()}
 1037         system_role = self.post('/roles', body=ref).json_body['role']
 1038 
 1039         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
 1040             'user_id': self.user['id'],
 1041             'role_id': system_role['id']
 1042         }
 1043         self.put(path=path)
 1044 
 1045         auth_request_body = self.build_authentication_request(
 1046             username=self.user['name'],
 1047             password=self.user['password'],
 1048             user_domain_id=self.domain['id'],
 1049             system=True
 1050         )
 1051         response = self.v3_create_token(auth_request_body)
 1052         self.assertValidSystemScopedTokenResponse(response)
 1053         system_token = response.headers.get('X-Subject-Token')
 1054 
 1055         auth_request_body = self.build_authentication_request(
 1056             token=system_token, domain_id=CONF.identity.default_domain_id
 1057         )
 1058         self.v3_create_token(
 1059             auth_request_body, expected_status=http.client.FORBIDDEN
 1060         )
 1061 
 1062     def test_create_domain_token_scoped_with_domain_id_and_user_id(self):
 1063         # grant the user a role on the domain
 1064         path = '/domains/%s/users/%s/roles/%s' % (
 1065             self.domain['id'], self.user['id'], self.role['id'])
 1066         self.put(path=path)
 1067 
 1068         auth_data = self.build_authentication_request(
 1069             user_id=self.user['id'],
 1070             password=self.user['password'],
 1071             domain_id=self.domain['id'])
 1072         r = self.v3_create_token(auth_data)
 1073         self.assertValidDomainScopedTokenResponse(r)
 1074 
 1075     def test_create_domain_token_scoped_with_domain_id_and_username(self):
 1076         # grant the user a role on the domain
 1077         path = '/domains/%s/users/%s/roles/%s' % (
 1078             self.domain['id'], self.user['id'], self.role['id'])
 1079         self.put(path=path)
 1080 
 1081         auth_data = self.build_authentication_request(
 1082             username=self.user['name'],
 1083             user_domain_id=self.domain['id'],
 1084             password=self.user['password'],
 1085             domain_id=self.domain['id'])
 1086         r = self.v3_create_token(auth_data)
 1087         self.assertValidDomainScopedTokenResponse(r)
 1088 
 1089     def test_create_domain_token_scoped_with_domain_id(self):
 1090         # grant the user a role on the domain
 1091         path = '/domains/%s/users/%s/roles/%s' % (
 1092             self.domain['id'], self.user['id'], self.role['id'])
 1093         self.put(path=path)
 1094 
 1095         auth_data = self.build_authentication_request(
 1096             username=self.user['name'],
 1097             user_domain_name=self.domain['name'],
 1098             password=self.user['password'],
 1099             domain_id=self.domain['id'])
 1100         r = self.v3_create_token(auth_data)
 1101         self.assertValidDomainScopedTokenResponse(r)
 1102 
 1103     def test_create_domain_token_scoped_with_domain_name(self):
 1104         # grant the user a role on the domain
 1105         path = '/domains/%s/users/%s/roles/%s' % (
 1106             self.domain['id'], self.user['id'], self.role['id'])
 1107         self.put(path=path)
 1108 
 1109         auth_data = self.build_authentication_request(
 1110             user_id=self.user['id'],
 1111             password=self.user['password'],
 1112             domain_name=self.domain['name'])
 1113         r = self.v3_create_token(auth_data)
 1114         self.assertValidDomainScopedTokenResponse(r)
 1115 
 1116     def test_create_domain_token_scoped_with_domain_name_and_username(self):
 1117         # grant the user a role on the domain
 1118         path = '/domains/%s/users/%s/roles/%s' % (
 1119             self.domain['id'], self.user['id'], self.role['id'])
 1120         self.put(path=path)
 1121 
 1122         auth_data = self.build_authentication_request(
 1123             username=self.user['name'],
 1124             user_domain_id=self.domain['id'],
 1125             password=self.user['password'],
 1126             domain_name=self.domain['name'])
 1127         r = self.v3_create_token(auth_data)
 1128         self.assertValidDomainScopedTokenResponse(r)
 1129 
 1130     def test_create_domain_token_with_only_domain_name_and_username(self):
 1131         # grant the user a role on the domain
 1132         path = '/domains/%s/users/%s/roles/%s' % (
 1133             self.domain['id'], self.user['id'], self.role['id'])
 1134         self.put(path=path)
 1135 
 1136         auth_data = self.build_authentication_request(
 1137             username=self.user['name'],
 1138             user_domain_name=self.domain['name'],
 1139             password=self.user['password'],
 1140             domain_name=self.domain['name'])
 1141         r = self.v3_create_token(auth_data)
 1142         self.assertValidDomainScopedTokenResponse(r)
 1143 
 1144     def test_create_domain_token_with_group_role(self):
 1145         group = unit.new_group_ref(domain_id=self.domain_id)
 1146         group = PROVIDERS.identity_api.create_group(group)
 1147 
 1148         # add user to group
 1149         PROVIDERS.identity_api.add_user_to_group(self.user['id'], group['id'])
 1150 
 1151         # grant the domain role to group
 1152         path = '/domains/%s/groups/%s/roles/%s' % (
 1153             self.domain['id'], group['id'], self.role['id'])
 1154         self.put(path=path)
 1155 
 1156         # now get a domain-scoped token
 1157         auth_data = self.build_authentication_request(
 1158             user_id=self.user['id'],
 1159             password=self.user['password'],
 1160             domain_id=self.domain['id'])
 1161         r = self.v3_create_token(auth_data)
 1162         self.assertValidDomainScopedTokenResponse(r)
 1163 
 1164     def test_create_domain_token_fails_if_domain_name_unsafe(self):
 1165         """Verify authenticate to a domain with unsafe name fails."""
 1166         # Start with url name restrictions off, so we can create the unsafe
 1167         # named domain
 1168         self.config_fixture.config(group='resource',
 1169                                    domain_name_url_safe='off')
 1170         unsafe_name = 'i am not / safe'
 1171         domain = unit.new_domain_ref(name=unsafe_name)
 1172         PROVIDERS.resource_api.create_domain(domain['id'], domain)
 1173         role_member = unit.new_role_ref()
 1174         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 1175         PROVIDERS.assignment_api.create_grant(
 1176             role_member['id'],
 1177             user_id=self.user['id'],
 1178             domain_id=domain['id'])
 1179 
 1180         auth_data = self.build_authentication_request(
 1181             user_id=self.user['id'],
 1182             password=self.user['password'],
 1183             domain_name=domain['name'])
 1184 
 1185         # Since name url restriction is off, we should be able to authenticate
 1186         self.v3_create_token(auth_data)
 1187 
 1188         # Set the name url restriction to new, which should still allow us to
 1189         # authenticate
 1190         self.config_fixture.config(group='resource',
 1191                                    project_name_url_safe='new')
 1192         self.v3_create_token(auth_data)
 1193 
 1194         # Set the name url restriction to strict and we should fail to
 1195         # authenticate
 1196         self.config_fixture.config(group='resource',
 1197                                    domain_name_url_safe='strict')
 1198         self.v3_create_token(auth_data,
 1199                              expected_status=http.client.UNAUTHORIZED)
 1200 
 1201     def test_create_domain_token_without_grant_returns_unauthorized(self):
 1202         auth_data = self.build_authentication_request(
 1203             user_id=self.user['id'],
 1204             password=self.user['password'],
 1205             domain_id=self.domain['id'])
 1206         # this fails because the user does not have a role on self.domain
 1207         self.v3_create_token(auth_data,
 1208                              expected_status=http.client.UNAUTHORIZED)
 1209 
 1210     def test_validate_domain_scoped_token(self):
 1211         # Grant user access to domain
 1212         PROVIDERS.assignment_api.create_grant(
 1213             self.role['id'], user_id=self.user['id'],
 1214             domain_id=self.domain['id']
 1215         )
 1216         domain_scoped_token = self._get_domain_scoped_token()
 1217         r = self._validate_token(domain_scoped_token)
 1218         self.assertValidDomainScopedTokenResponse(r)
 1219         resp_json = json.loads(r.body)
 1220         self.assertIsNotNone(resp_json['token']['catalog'])
 1221         self.assertIsNotNone(resp_json['token']['roles'])
 1222         self.assertIsNotNone(resp_json['token']['domain'])
 1223 
 1224     def test_validate_expired_domain_scoped_token_returns_not_found(self):
 1225         # Grant user access to domain
 1226         PROVIDERS.assignment_api.create_grant(
 1227             self.role['id'], user_id=self.user['id'],
 1228             domain_id=self.domain['id']
 1229         )
 1230         # NOTE(lbragstad): We set token expiration to 10 seconds so that we can
 1231         # use the context manager of freezegun without sqlite issues.
 1232         self.config_fixture.config(group='token',
 1233                                    expiration=10)
 1234         time = datetime.datetime.utcnow()
 1235         with freezegun.freeze_time(time) as frozen_datetime:
 1236             domain_scoped_token = self._get_domain_scoped_token()
 1237             frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
 1238             self._validate_token(
 1239                 domain_scoped_token,
 1240                 expected_status=http.client.NOT_FOUND
 1241             )
 1242 
 1243     def test_domain_scoped_token_is_invalid_after_disabling_user(self):
 1244         # Grant user access to domain
 1245         PROVIDERS.assignment_api.create_grant(
 1246             self.role['id'], user_id=self.user['id'],
 1247             domain_id=self.domain['id']
 1248         )
 1249         domain_scoped_token = self._get_domain_scoped_token()
 1250         # Make sure the token is valid
 1251         r = self._validate_token(domain_scoped_token)
 1252         self.assertValidDomainScopedTokenResponse(r)
 1253         # Disable user
 1254         self._set_user_enabled(self.user, enabled=False)
 1255         # Ensure validating a token for a disabled user fails
 1256         self._validate_token(
 1257             domain_scoped_token,
 1258             expected_status=http.client.NOT_FOUND
 1259         )
 1260 
 1261     def test_domain_scoped_token_is_invalid_after_deleting_grant(self):
 1262         # Grant user access to domain
 1263         PROVIDERS.assignment_api.create_grant(
 1264             self.role['id'], user_id=self.user['id'],
 1265             domain_id=self.domain['id']
 1266         )
 1267         domain_scoped_token = self._get_domain_scoped_token()
 1268         # Make sure the token is valid
 1269         r = self._validate_token(domain_scoped_token)
 1270         self.assertValidDomainScopedTokenResponse(r)
 1271         # Delete access to domain
 1272         PROVIDERS.assignment_api.delete_grant(
 1273             self.role['id'], user_id=self.user['id'],
 1274             domain_id=self.domain['id']
 1275         )
 1276         # Ensure validating a token for a disabled user fails
 1277         self._validate_token(
 1278             domain_scoped_token,
 1279             expected_status=http.client.NOT_FOUND
 1280         )
 1281 
 1282     def test_domain_scoped_token_invalid_after_disabling_domain(self):
 1283         # Grant user access to domain
 1284         PROVIDERS.assignment_api.create_grant(
 1285             self.role['id'], user_id=self.user['id'],
 1286             domain_id=self.domain['id']
 1287         )
 1288         domain_scoped_token = self._get_domain_scoped_token()
 1289         # Make sure the token is valid
 1290         r = self._validate_token(domain_scoped_token)
 1291         self.assertValidDomainScopedTokenResponse(r)
 1292         # Disable domain
 1293         self.domain['enabled'] = False
 1294         PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain)
 1295         # Ensure validating a token for a disabled domain fails
 1296         self._validate_token(
 1297             domain_scoped_token,
 1298             expected_status=http.client.NOT_FOUND
 1299         )
 1300 
 1301     def test_create_project_scoped_token_with_project_id_and_user_id(self):
 1302         auth_data = self.build_authentication_request(
 1303             user_id=self.user['id'],
 1304             password=self.user['password'],
 1305             project_id=self.project['id'])
 1306         r = self.v3_create_token(auth_data)
 1307         self.assertValidProjectScopedTokenResponse(r)
 1308 
 1309     def test_validate_project_scoped_token(self):
 1310         project_scoped_token = self._get_project_scoped_token()
 1311         r = self._validate_token(project_scoped_token)
 1312         self.assertValidProjectScopedTokenResponse(r)
 1313 
 1314     def test_validate_expired_project_scoped_token_returns_not_found(self):
 1315         # NOTE(lbragstad): We set token expiration to 10 seconds so that we can
 1316         # use the context manager of freezegun without sqlite issues.
 1317         self.config_fixture.config(group='token',
 1318                                    expiration=10)
 1319         time = datetime.datetime.utcnow()
 1320         with freezegun.freeze_time(time) as frozen_datetime:
 1321             project_scoped_token = self._get_project_scoped_token()
 1322             frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
 1323             self._validate_token(
 1324                 project_scoped_token,
 1325                 expected_status=http.client.NOT_FOUND
 1326             )
 1327 
 1328     def test_revoke_project_scoped_token(self):
 1329         project_scoped_token = self._get_project_scoped_token()
 1330         r = self._validate_token(project_scoped_token)
 1331         self.assertValidProjectScopedTokenResponse(r)
 1332         self._revoke_token(project_scoped_token)
 1333         self._validate_token(project_scoped_token,
 1334                              expected_status=http.client.NOT_FOUND)
 1335 
 1336     def test_project_scoped_token_is_scoped_to_default_project(self):
 1337         project = self._create_project_and_set_as_default_project()
 1338 
 1339         # attempt to authenticate without requesting a project
 1340         auth_data = self.build_authentication_request(
 1341             user_id=self.user['id'],
 1342             password=self.user['password'])
 1343         r = self.v3_create_token(auth_data)
 1344 
 1345         # ensure the project id in the token matches the default project id
 1346         self.assertValidProjectScopedTokenResponse(r)
 1347         self.assertEqual(project['id'], r.result['token']['project']['id'])
 1348 
 1349     def test_project_scoped_token_no_catalog_is_scoped_to_default_project(
 1350             self):
 1351         project = self._create_project_and_set_as_default_project()
 1352 
 1353         # attempt to authenticate without requesting a project or catalog
 1354         auth_data = self.build_authentication_request(
 1355             user_id=self.user['id'],
 1356             password=self.user['password'])
 1357         r = self.post('/auth/tokens?nocatalog', body=auth_data, noauth=True)
 1358 
 1359         # ensure the project id in the token matches the default project id
 1360         self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
 1361         self.assertEqual(project['id'], r.result['token']['project']['id'])
 1362 
 1363     def test_implicit_project_id_scoped_token_with_user_id_no_catalog(self):
 1364         self._create_project_and_set_as_default_project()
 1365 
 1366         # create a project scoped token that isn't scoped to the default
 1367         # project
 1368         auth_data = self.build_authentication_request(
 1369             user_id=self.user['id'],
 1370             password=self.user['password'],
 1371             project_id=self.project['id'])
 1372         r = self.post('/auth/tokens?nocatalog', body=auth_data, noauth=True)
 1373 
 1374         # ensure the project id in the token matches the one we as for
 1375         self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
 1376         self.assertEqual(self.project['id'],
 1377                          r.result['token']['project']['id'])
 1378 
 1379     def test_project_scoped_token_catalog_attributes(self):
 1380         auth_data = self.build_authentication_request(
 1381             user_id=self.user['id'],
 1382             password=self.user['password'],
 1383             project_id=self.project['id'])
 1384         r = self.v3_create_token(auth_data)
 1385 
 1386         catalog = r.result['token']['catalog']
 1387         self.assertEqual(1, len(catalog))
 1388         catalog = catalog[0]
 1389 
 1390         self.assertEqual(self.service['id'], catalog['id'])
 1391         self.assertEqual(self.service['name'], catalog['name'])
 1392         self.assertEqual(self.service['type'], catalog['type'])
 1393 
 1394         endpoint = catalog['endpoints']
 1395         self.assertEqual(1, len(endpoint))
 1396         endpoint = endpoint[0]
 1397 
 1398         self.assertEqual(self.endpoint['id'], endpoint['id'])
 1399         self.assertEqual(self.endpoint['interface'], endpoint['interface'])
 1400         self.assertEqual(self.endpoint['region_id'], endpoint['region_id'])
 1401         self.assertEqual(self.endpoint['url'], endpoint['url'])
 1402 
 1403     def test_project_scoped_token_catalog_excludes_disabled_endpoint(self):
 1404         # Create a disabled endpoint
 1405         disabled_endpoint_ref = copy.copy(self.endpoint)
 1406         disabled_endpoint_id = uuid.uuid4().hex
 1407         disabled_endpoint_ref.update({
 1408             'id': disabled_endpoint_id,
 1409             'enabled': False,
 1410             'interface': 'internal'
 1411         })
 1412         PROVIDERS.catalog_api.create_endpoint(
 1413             disabled_endpoint_id, disabled_endpoint_ref
 1414         )
 1415 
 1416         auth_data = self.build_authentication_request(
 1417             user_id=self.user['id'],
 1418             password=self.user['password'],
 1419             project_id=self.project['id'])
 1420         resp = self.v3_create_token(auth_data)
 1421 
 1422         # make sure the disabled endpoint id isn't in the list of endpoints
 1423         endpoints = resp.result['token']['catalog'][0]['endpoints']
 1424         endpoint_ids = [endpoint['id'] for endpoint in endpoints]
 1425         self.assertNotIn(disabled_endpoint_id, endpoint_ids)
 1426 
 1427     def test_project_scoped_token_catalog_excludes_disabled_service(self):
 1428         """On authenticate, get a catalog that excludes disabled services."""
 1429         # although the endpoint associated with the service is enabled, the
 1430         # service is disabled
 1431         self.assertTrue(self.endpoint['enabled'])
 1432         PROVIDERS.catalog_api.update_service(
 1433             self.endpoint['service_id'], {'enabled': False})
 1434         service = PROVIDERS.catalog_api.get_service(
 1435             self.endpoint['service_id']
 1436         )
 1437         self.assertFalse(service['enabled'])
 1438 
 1439         auth_data = self.build_authentication_request(
 1440             user_id=self.user['id'],
 1441             password=self.user['password'],
 1442             project_id=self.project['id'])
 1443         r = self.v3_create_token(auth_data)
 1444 
 1445         self.assertEqual([], r.result['token']['catalog'])
 1446 
 1447     def test_scope_to_project_without_grant_returns_unauthorized(self):
 1448         project = unit.new_project_ref(domain_id=self.domain_id)
 1449         PROVIDERS.resource_api.create_project(project['id'], project)
 1450 
 1451         auth_data = self.build_authentication_request(
 1452             user_id=self.user['id'],
 1453             password=self.user['password'],
 1454             project_id=project['id'])
 1455         self.v3_create_token(auth_data,
 1456                              expected_status=http.client.UNAUTHORIZED)
 1457 
 1458     def test_create_project_scoped_token_with_username_and_domain_id(self):
 1459         auth_data = self.build_authentication_request(
 1460             username=self.user['name'],
 1461             user_domain_id=self.domain['id'],
 1462             password=self.user['password'],
 1463             project_id=self.project['id'])
 1464         r = self.v3_create_token(auth_data)
 1465         self.assertValidProjectScopedTokenResponse(r)
 1466 
 1467     def test_create_project_scoped_token_with_username_and_domain_name(self):
 1468         auth_data = self.build_authentication_request(
 1469             username=self.user['name'],
 1470             user_domain_name=self.domain['name'],
 1471             password=self.user['password'],
 1472             project_id=self.project['id'])
 1473         r = self.v3_create_token(auth_data)
 1474         self.assertValidProjectScopedTokenResponse(r)
 1475 
 1476     def test_create_project_scoped_token_fails_if_project_name_unsafe(self):
 1477         """Verify authenticate to a project with unsafe name fails."""
 1478         # Start with url name restrictions off, so we can create the unsafe
 1479         # named project
 1480         self.config_fixture.config(group='resource',
 1481                                    project_name_url_safe='off')
 1482         unsafe_name = 'i am not / safe'
 1483         project = unit.new_project_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID,
 1484                                        name=unsafe_name)
 1485         PROVIDERS.resource_api.create_project(project['id'], project)
 1486         role_member = unit.new_role_ref()
 1487         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 1488         PROVIDERS.assignment_api.add_role_to_user_and_project(
 1489             self.user['id'], project['id'], role_member['id'])
 1490 
 1491         auth_data = self.build_authentication_request(
 1492             user_id=self.user['id'],
 1493             password=self.user['password'],
 1494             project_name=project['name'],
 1495             project_domain_id=test_v3.DEFAULT_DOMAIN_ID)
 1496 
 1497         # Since name url restriction is off, we should be able to authenticate
 1498         self.v3_create_token(auth_data)
 1499 
 1500         # Set the name url restriction to new, which should still allow us to
 1501         # authenticate
 1502         self.config_fixture.config(group='resource',
 1503                                    project_name_url_safe='new')
 1504         self.v3_create_token(auth_data)
 1505 
 1506         # Set the name url restriction to strict and we should fail to
 1507         # authenticate
 1508         self.config_fixture.config(group='resource',
 1509                                    project_name_url_safe='strict')
 1510         self.v3_create_token(auth_data,
 1511                              expected_status=http.client.UNAUTHORIZED)
 1512 
 1513     def test_create_project_scoped_token_fails_if_domain_name_unsafe(self):
 1514         """Verify authenticate to a project using unsafe domain name fails."""
 1515         # Start with url name restrictions off, so we can create the unsafe
 1516         # named domain
 1517         self.config_fixture.config(group='resource',
 1518                                    domain_name_url_safe='off')
 1519         unsafe_name = 'i am not / safe'
 1520         domain = unit.new_domain_ref(name=unsafe_name)
 1521         PROVIDERS.resource_api.create_domain(domain['id'], domain)
 1522         # Add a (safely named) project to that domain
 1523         project = unit.new_project_ref(domain_id=domain['id'])
 1524         PROVIDERS.resource_api.create_project(project['id'], project)
 1525         role_member = unit.new_role_ref()
 1526         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 1527         PROVIDERS.assignment_api.create_grant(
 1528             role_member['id'],
 1529             user_id=self.user['id'],
 1530             project_id=project['id'])
 1531 
 1532         # An auth request via project ID, but specifying domain by name
 1533         auth_data = self.build_authentication_request(
 1534             user_id=self.user['id'],
 1535             password=self.user['password'],
 1536             project_name=project['name'],
 1537             project_domain_name=domain['name'])
 1538 
 1539         # Since name url restriction is off, we should be able to authenticate
 1540         self.v3_create_token(auth_data)
 1541 
 1542         # Set the name url restriction to new, which should still allow us to
 1543         # authenticate
 1544         self.config_fixture.config(group='resource',
 1545                                    project_name_url_safe='new')
 1546         self.v3_create_token(auth_data)
 1547 
 1548         # Set the name url restriction to strict and we should fail to
 1549         # authenticate
 1550         self.config_fixture.config(group='resource',
 1551                                    domain_name_url_safe='strict')
 1552         self.v3_create_token(auth_data,
 1553                              expected_status=http.client.UNAUTHORIZED)
 1554 
 1555     def test_create_project_token_with_same_domain_and_project_name(self):
 1556         """Authenticate to a project with the same name as its domain."""
 1557         domain = unit.new_project_ref(is_domain=True)
 1558         domain = PROVIDERS.resource_api.create_project(domain['id'], domain)
 1559         project = unit.new_project_ref(domain_id=domain['id'],
 1560                                        name=domain['name'])
 1561         PROVIDERS.resource_api.create_project(project['id'], project)
 1562         role_member = unit.new_role_ref()
 1563         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 1564         PROVIDERS.assignment_api.add_role_to_user_and_project(
 1565             self.user['id'], project['id'], role_member['id'])
 1566 
 1567         auth_data = self.build_authentication_request(
 1568             user_id=self.user['id'],
 1569             password=self.user['password'],
 1570             project_name=project['name'],
 1571             project_domain_name=domain['name'])
 1572 
 1573         r = self.v3_create_token(auth_data)
 1574         self.assertEqual(project['id'], r.result['token']['project']['id'])
 1575 
 1576     def test_create_project_token_fails_with_project_acting_as_domain(self):
 1577         domain = unit.new_project_ref(is_domain=True)
 1578         domain = PROVIDERS.resource_api.create_project(domain['id'], domain)
 1579         role_member = unit.new_role_ref()
 1580         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 1581         PROVIDERS.assignment_api.create_grant(
 1582             role_member['id'],
 1583             user_id=self.user['id'],
 1584             domain_id=domain['id'])
 1585 
 1586         # authentication will fail because the project name is incorrect
 1587         auth_data = self.build_authentication_request(
 1588             user_id=self.user['id'],
 1589             password=self.user['password'],
 1590             project_name=domain['name'],
 1591             project_domain_name=domain['name'])
 1592         self.v3_create_token(auth_data,
 1593                              expected_status=http.client.UNAUTHORIZED)
 1594 
 1595     def test_create_project_token_with_disabled_project_domain_fails(self):
 1596         # create a disabled domain
 1597         domain = unit.new_domain_ref()
 1598         domain = PROVIDERS.resource_api.create_domain(domain['id'], domain)
 1599 
 1600         # create a project in the domain
 1601         project = unit.new_project_ref(domain_id=domain['id'])
 1602         PROVIDERS.resource_api.create_project(project['id'], project)
 1603 
 1604         # assign some role to self.user for the project in the domain
 1605         PROVIDERS.assignment_api.add_role_to_user_and_project(
 1606             self.user['id'],
 1607             project['id'],
 1608             self.role_id)
 1609 
 1610         # Disable the domain
 1611         domain['enabled'] = False
 1612         PROVIDERS.resource_api.update_domain(domain['id'], domain)
 1613 
 1614         # user should not be able to auth with project_id
 1615         auth_data = self.build_authentication_request(
 1616             user_id=self.user['id'],
 1617             password=self.user['password'],
 1618             project_id=project['id'])
 1619         self.v3_create_token(auth_data,
 1620                              expected_status=http.client.UNAUTHORIZED)
 1621 
 1622         # user should not be able to auth with project_name & domain
 1623         auth_data = self.build_authentication_request(
 1624             user_id=self.user['id'],
 1625             password=self.user['password'],
 1626             project_name=project['name'],
 1627             project_domain_id=domain['id'])
 1628         self.v3_create_token(auth_data,
 1629                              expected_status=http.client.UNAUTHORIZED)
 1630 
 1631     def test_create_project_token_with_default_domain_as_project(self):
 1632         # Authenticate to a project with the default domain as project
 1633         auth_data = self.build_authentication_request(
 1634             user_id=self.user['id'],
 1635             password=self.user['password'],
 1636             project_id=test_v3.DEFAULT_DOMAIN_ID)
 1637         self.v3_create_token(auth_data,
 1638                              expected_status=http.client.UNAUTHORIZED)
 1639 
 1640     def test_project_scoped_token_is_invalid_after_disabling_user(self):
 1641         project_scoped_token = self._get_project_scoped_token()
 1642         # Make sure the token is valid
 1643         r = self._validate_token(project_scoped_token)
 1644         self.assertValidProjectScopedTokenResponse(r)
 1645         # Disable the user
 1646         self._set_user_enabled(self.user, enabled=False)
 1647         # Ensure validating a token for a disabled user fails
 1648         self._validate_token(
 1649             project_scoped_token,
 1650             expected_status=http.client.NOT_FOUND
 1651         )
 1652 
 1653     def test_project_scoped_token_invalid_after_changing_user_password(self):
 1654         project_scoped_token = self._get_project_scoped_token()
 1655         # Make sure the token is valid
 1656         r = self._validate_token(project_scoped_token)
 1657         self.assertValidProjectScopedTokenResponse(r)
 1658         # Update user's password
 1659         self.user['password'] = 'Password1'
 1660         PROVIDERS.identity_api.update_user(self.user['id'], self.user)
 1661         # Ensure updating user's password revokes existing tokens
 1662         self._validate_token(
 1663             project_scoped_token,
 1664             expected_status=http.client.NOT_FOUND
 1665         )
 1666 
 1667     def test_project_scoped_token_invalid_after_disabling_project(self):
 1668         project_scoped_token = self._get_project_scoped_token()
 1669         # Make sure the token is valid
 1670         r = self._validate_token(project_scoped_token)
 1671         self.assertValidProjectScopedTokenResponse(r)
 1672         # Disable project
 1673         self.project['enabled'] = False
 1674         PROVIDERS.resource_api.update_project(self.project['id'], self.project)
 1675         # Ensure validating a token for a disabled project fails
 1676         self._validate_token(
 1677             project_scoped_token,
 1678             expected_status=http.client.NOT_FOUND
 1679         )
 1680 
 1681     def test_project_scoped_token_is_invalid_after_deleting_grant(self):
 1682         # disable caching so that user grant deletion is not hidden
 1683         # by token caching
 1684         self.config_fixture.config(
 1685             group='cache',
 1686             enabled=False)
 1687         # Grant user access to project
 1688         PROVIDERS.assignment_api.create_grant(
 1689             self.role['id'], user_id=self.user['id'],
 1690             project_id=self.project['id']
 1691         )
 1692         project_scoped_token = self._get_project_scoped_token()
 1693         # Make sure the token is valid
 1694         r = self._validate_token(project_scoped_token)
 1695         self.assertValidProjectScopedTokenResponse(r)
 1696         # Delete access to project
 1697         PROVIDERS.assignment_api.delete_grant(
 1698             self.role['id'], user_id=self.user['id'],
 1699             project_id=self.project['id']
 1700         )
 1701         # Ensure the token has been revoked
 1702         self._validate_token(
 1703             project_scoped_token,
 1704             expected_status=http.client.NOT_FOUND
 1705         )
 1706 
 1707     def test_no_access_to_default_project_result_in_unscoped_token(self):
 1708         # create a disabled project to work with
 1709         self.create_new_default_project_for_user(self.user['id'],
 1710                                                  self.domain_id)
 1711 
 1712         # attempt to authenticate without requesting a project
 1713         auth_data = self.build_authentication_request(
 1714             user_id=self.user['id'],
 1715             password=self.user['password'])
 1716         r = self.v3_create_token(auth_data)
 1717         self.assertValidUnscopedTokenResponse(r)
 1718 
 1719     def test_rescope_unscoped_token_with_trust(self):
 1720         trustee_user, trust = self._create_trust()
 1721         self._get_trust_scoped_token(trustee_user, trust)
 1722 
 1723     def test_validate_a_trust_scoped_token(self):
 1724         trustee_user, trust = self._create_trust()
 1725         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1726         # Validate a trust scoped token
 1727         r = self._validate_token(trust_scoped_token)
 1728         self.assertValidProjectScopedTokenResponse(r)
 1729 
 1730     def test_validate_expired_trust_scoped_token_returns_not_found(self):
 1731         # NOTE(lbragstad): We set token expiration to 10 seconds so that we can
 1732         # use the context manager of freezegun without sqlite issues.
 1733         self.config_fixture.config(group='token',
 1734                                    expiration=10)
 1735         time = datetime.datetime.utcnow()
 1736         with freezegun.freeze_time(time) as frozen_datetime:
 1737             trustee_user, trust = self._create_trust()
 1738             trust_scoped_token = self._get_trust_scoped_token(
 1739                 trustee_user, trust
 1740             )
 1741             frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
 1742             self._validate_token(
 1743                 trust_scoped_token,
 1744                 expected_status=http.client.NOT_FOUND
 1745             )
 1746 
 1747     def test_validate_a_trust_scoped_token_impersonated(self):
 1748         trustee_user, trust = self._create_trust(impersonation=True)
 1749         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1750         # Validate a trust scoped token
 1751         r = self._validate_token(trust_scoped_token)
 1752         self.assertValidProjectScopedTokenResponse(r)
 1753 
 1754     def test_revoke_trust_scoped_token(self):
 1755         trustee_user, trust = self._create_trust()
 1756         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1757         # Validate a trust scoped token
 1758         r = self._validate_token(trust_scoped_token)
 1759         self.assertValidProjectScopedTokenResponse(r)
 1760         self._revoke_token(trust_scoped_token)
 1761         self._validate_token(trust_scoped_token,
 1762                              expected_status=http.client.NOT_FOUND)
 1763 
 1764     def test_trust_scoped_token_is_invalid_after_disabling_trustee(self):
 1765         trustee_user, trust = self._create_trust()
 1766         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1767         # Validate a trust scoped token
 1768         r = self._validate_token(trust_scoped_token)
 1769         self.assertValidProjectScopedTokenResponse(r)
 1770 
 1771         # Disable trustee
 1772         trustee_update_ref = dict(enabled=False)
 1773         PROVIDERS.identity_api.update_user(
 1774             trustee_user['id'], trustee_update_ref
 1775         )
 1776         # Ensure validating a token for a disabled user fails
 1777         self._validate_token(
 1778             trust_scoped_token,
 1779             expected_status=http.client.NOT_FOUND
 1780         )
 1781 
 1782     def test_trust_token_is_invalid_when_trustee_domain_disabled(self):
 1783         # create a new domain with new user in that domain
 1784         new_domain_ref = unit.new_domain_ref()
 1785         PROVIDERS.resource_api.create_domain(
 1786             new_domain_ref['id'], new_domain_ref
 1787         )
 1788 
 1789         trustee_ref = unit.create_user(PROVIDERS.identity_api,
 1790                                        domain_id=new_domain_ref['id'])
 1791 
 1792         new_project_ref = unit.new_project_ref(domain_id=self.domain_id)
 1793         PROVIDERS.resource_api.create_project(
 1794             new_project_ref['id'], new_project_ref
 1795         )
 1796 
 1797         # grant the trustor access to the new project
 1798         PROVIDERS.assignment_api.create_grant(
 1799             self.role['id'],
 1800             user_id=self.user_id,
 1801             project_id=new_project_ref['id'])
 1802 
 1803         trust_ref = unit.new_trust_ref(trustor_user_id=self.user_id,
 1804                                        trustee_user_id=trustee_ref['id'],
 1805                                        expires=dict(minutes=1),
 1806                                        project_id=new_project_ref['id'],
 1807                                        impersonation=True,
 1808                                        role_ids=[self.role['id']])
 1809 
 1810         resp = self.post('/OS-TRUST/trusts', body={'trust': trust_ref})
 1811         self.assertValidTrustResponse(resp, trust_ref)
 1812         trust_id = resp.json_body['trust']['id']
 1813 
 1814         # get a project-scoped token using the trust
 1815         trust_auth_data = self.build_authentication_request(
 1816             user_id=trustee_ref['id'],
 1817             password=trustee_ref['password'],
 1818             trust_id=trust_id)
 1819         trust_scoped_token = self.get_requested_token(trust_auth_data)
 1820 
 1821         # ensure the project-scoped token from the trust is valid
 1822         self._validate_token(trust_scoped_token)
 1823 
 1824         disable_body = {'domain': {'enabled': False}}
 1825         self.patch(
 1826             '/domains/%(domain_id)s' % {'domain_id': new_domain_ref['id']},
 1827             body=disable_body)
 1828 
 1829         # ensure the project-scoped token from the trust is invalid
 1830         self._validate_token(trust_scoped_token,
 1831                              expected_status=http.client.NOT_FOUND)
 1832 
 1833     def test_trust_scoped_token_invalid_after_changing_trustee_password(self):
 1834         trustee_user, trust = self._create_trust()
 1835         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1836         # Validate a trust scoped token
 1837         r = self._validate_token(trust_scoped_token)
 1838         self.assertValidProjectScopedTokenResponse(r)
 1839         # Change trustee's password
 1840         trustee_update_ref = dict(password='Password1')
 1841         PROVIDERS.identity_api.update_user(
 1842             trustee_user['id'], trustee_update_ref
 1843         )
 1844         # Ensure updating trustee's password revokes existing tokens
 1845         self._validate_token(
 1846             trust_scoped_token,
 1847             expected_status=http.client.NOT_FOUND
 1848         )
 1849 
 1850     def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
 1851         trustee_user, trust = self._create_trust()
 1852         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1853         # Validate a trust scoped token
 1854         r = self._validate_token(trust_scoped_token)
 1855         self.assertValidProjectScopedTokenResponse(r)
 1856 
 1857         # Disable the trustor
 1858         trustor_update_ref = dict(enabled=False)
 1859         PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
 1860         # Ensure validating a token for a disabled user fails
 1861         self._validate_token(
 1862             trust_scoped_token,
 1863             expected_status=http.client.NOT_FOUND
 1864         )
 1865 
 1866     def test_trust_scoped_token_invalid_after_changing_trustor_password(self):
 1867         trustee_user, trust = self._create_trust()
 1868         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1869         # Validate a trust scoped token
 1870         r = self._validate_token(trust_scoped_token)
 1871         self.assertValidProjectScopedTokenResponse(r)
 1872 
 1873         # Change trustor's password
 1874         trustor_update_ref = dict(password='Password1')
 1875         PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
 1876         # Ensure updating trustor's password revokes existing user's tokens
 1877         self._validate_token(
 1878             trust_scoped_token,
 1879             expected_status=http.client.NOT_FOUND
 1880         )
 1881 
 1882     def test_trust_scoped_token_invalid_after_disabled_trustor_domain(self):
 1883         trustee_user, trust = self._create_trust()
 1884         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1885         # Validate a trust scoped token
 1886         r = self._validate_token(trust_scoped_token)
 1887         self.assertValidProjectScopedTokenResponse(r)
 1888 
 1889         # Disable trustor's domain
 1890         self.domain['enabled'] = False
 1891         PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain)
 1892 
 1893         trustor_update_ref = dict(password='Password1')
 1894         PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
 1895         # Ensure updating trustor's password revokes existing user's tokens
 1896         self._validate_token(
 1897             trust_scoped_token,
 1898             expected_status=http.client.NOT_FOUND
 1899         )
 1900 
 1901     def test_default_fixture_scope_token(self):
 1902         self.assertIsNotNone(self.get_scoped_token())
 1903 
 1904     def test_rescoping_token(self):
 1905         expires = self.v3_token_data['token']['expires_at']
 1906 
 1907         # rescope the token
 1908         r = self.v3_create_token(self.build_authentication_request(
 1909             token=self.v3_token,
 1910             project_id=self.project_id))
 1911         self.assertValidProjectScopedTokenResponse(r)
 1912 
 1913         # ensure token expiration stayed the same
 1914         self.assertTimestampEqual(expires, r.result['token']['expires_at'])
 1915 
 1916     def test_check_token(self):
 1917         self.head('/auth/tokens', headers=self.headers,
 1918                   expected_status=http.client.OK)
 1919 
 1920     def test_validate_token(self):
 1921         r = self.get('/auth/tokens', headers=self.headers)
 1922         self.assertValidUnscopedTokenResponse(r)
 1923 
 1924     def test_validate_missing_subject_token(self):
 1925         self.get('/auth/tokens',
 1926                  expected_status=http.client.NOT_FOUND)
 1927 
 1928     def test_validate_missing_auth_token(self):
 1929         self.admin_request(
 1930             method='GET',
 1931             path='/v3/projects',
 1932             token=None,
 1933             expected_status=http.client.UNAUTHORIZED)
 1934 
 1935     def test_validate_token_nocatalog(self):
 1936         v3_token = self.get_requested_token(self.build_authentication_request(
 1937             user_id=self.user['id'],
 1938             password=self.user['password'],
 1939             project_id=self.project['id']))
 1940         r = self.get(
 1941             '/auth/tokens?nocatalog',
 1942             headers={'X-Subject-Token': v3_token})
 1943         self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
 1944 
 1945     def test_is_admin_token_by_ids(self):
 1946         self.config_fixture.config(
 1947             group='resource',
 1948             admin_project_domain_name=self.domain['name'],
 1949             admin_project_name=self.project['name'])
 1950         r = self.v3_create_token(self.build_authentication_request(
 1951             user_id=self.user['id'],
 1952             password=self.user['password'],
 1953             project_id=self.project['id']))
 1954         self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
 1955         v3_token = r.headers.get('X-Subject-Token')
 1956         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 1957         self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
 1958 
 1959     def test_is_admin_token_by_names(self):
 1960         self.config_fixture.config(
 1961             group='resource',
 1962             admin_project_domain_name=self.domain['name'],
 1963             admin_project_name=self.project['name'])
 1964         r = self.v3_create_token(self.build_authentication_request(
 1965             user_id=self.user['id'],
 1966             password=self.user['password'],
 1967             project_domain_name=self.domain['name'],
 1968             project_name=self.project['name']))
 1969         self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
 1970         v3_token = r.headers.get('X-Subject-Token')
 1971         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 1972         self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
 1973 
 1974     def test_token_for_non_admin_project_is_not_admin(self):
 1975         self.config_fixture.config(
 1976             group='resource',
 1977             admin_project_domain_name=self.domain['name'],
 1978             admin_project_name=uuid.uuid4().hex)
 1979         r = self.v3_create_token(self.build_authentication_request(
 1980             user_id=self.user['id'],
 1981             password=self.user['password'],
 1982             project_id=self.project['id']))
 1983         self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
 1984         v3_token = r.headers.get('X-Subject-Token')
 1985         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 1986         self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
 1987 
 1988     def test_token_for_non_admin_domain_same_project_name_is_not_admin(self):
 1989         self.config_fixture.config(
 1990             group='resource',
 1991             admin_project_domain_name=uuid.uuid4().hex,
 1992             admin_project_name=self.project['name'])
 1993         r = self.v3_create_token(self.build_authentication_request(
 1994             user_id=self.user['id'],
 1995             password=self.user['password'],
 1996             project_id=self.project['id']))
 1997         self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
 1998         v3_token = r.headers.get('X-Subject-Token')
 1999         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 2000         self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
 2001 
 2002     def test_only_admin_project_set_acts_as_non_admin(self):
 2003         self.config_fixture.config(
 2004             group='resource',
 2005             admin_project_name=self.project['name'])
 2006         r = self.v3_create_token(self.build_authentication_request(
 2007             user_id=self.user['id'],
 2008             password=self.user['password'],
 2009             project_id=self.project['id']))
 2010         self.assertValidProjectScopedTokenResponse(r, is_admin_project=None)
 2011         v3_token = r.headers.get('X-Subject-Token')
 2012         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 2013         self.assertValidProjectScopedTokenResponse(r, is_admin_project=None)
 2014 
 2015     def _create_role(self, domain_id=None):
 2016         """Call ``POST /roles``."""
 2017         ref = unit.new_role_ref(domain_id=domain_id)
 2018         r = self.post('/roles', body={'role': ref})
 2019         return self.assertValidRoleResponse(r, ref)
 2020 
 2021     def _create_implied_role(self, prior_id):
 2022         implied = self._create_role()
 2023         url = '/roles/%s/implies/%s' % (prior_id, implied['id'])
 2024         self.put(url, expected_status=http.client.CREATED)
 2025         return implied
 2026 
 2027     def _delete_implied_role(self, prior_role_id, implied_role_id):
 2028         url = '/roles/%s/implies/%s' % (prior_role_id, implied_role_id)
 2029         self.delete(url)
 2030 
 2031     def _get_scoped_token_roles(self, is_domain=False):
 2032         if is_domain:
 2033             v3_token = self.get_domain_scoped_token()
 2034         else:
 2035             v3_token = self.get_scoped_token()
 2036 
 2037         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 2038         v3_token_data = r.result
 2039         token_roles = v3_token_data['token']['roles']
 2040         return token_roles
 2041 
 2042     def _create_implied_role_shows_in_v3_token(self, is_domain):
 2043         token_roles = self._get_scoped_token_roles(is_domain)
 2044         self.assertEqual(1, len(token_roles))
 2045 
 2046         prior = token_roles[0]['id']
 2047         implied1 = self._create_implied_role(prior)
 2048 
 2049         token_roles = self._get_scoped_token_roles(is_domain)
 2050         self.assertEqual(2, len(token_roles))
 2051 
 2052         implied2 = self._create_implied_role(prior)
 2053         token_roles = self._get_scoped_token_roles(is_domain)
 2054         self.assertEqual(3, len(token_roles))
 2055 
 2056         token_role_ids = [role['id'] for role in token_roles]
 2057         self.assertIn(prior, token_role_ids)
 2058         self.assertIn(implied1['id'], token_role_ids)
 2059         self.assertIn(implied2['id'], token_role_ids)
 2060 
 2061     def test_create_implied_role_shows_in_v3_project_token(self):
 2062         # regardless of the default chosen, this should always
 2063         # test with the option set.
 2064         self.config_fixture.config(group='token')
 2065         self._create_implied_role_shows_in_v3_token(False)
 2066 
 2067     def test_create_implied_role_shows_in_v3_domain_token(self):
 2068         self.config_fixture.config(group='token')
 2069         PROVIDERS.assignment_api.create_grant(
 2070             self.role['id'], user_id=self.user['id'],
 2071             domain_id=self.domain['id']
 2072         )
 2073 
 2074         self._create_implied_role_shows_in_v3_token(True)
 2075 
 2076     def test_create_implied_role_shows_in_v3_system_token(self):
 2077         self.config_fixture.config(group='token')
 2078         PROVIDERS.assignment_api.create_system_grant_for_user(
 2079             self.user['id'], self.role['id']
 2080         )
 2081 
 2082         token_id = self.get_system_scoped_token()
 2083         r = self.get('/auth/tokens', headers={'X-Subject-Token': token_id})
 2084         token_roles = r.result['token']['roles']
 2085 
 2086         prior = token_roles[0]['id']
 2087         self._create_implied_role(prior)
 2088 
 2089         r = self.get('/auth/tokens', headers={'X-Subject-Token': token_id})
 2090         token_roles = r.result['token']['roles']
 2091         self.assertEqual(2, len(token_roles))
 2092 
 2093     def test_group_assigned_implied_role_shows_in_v3_token(self):
 2094         self.config_fixture.config(group='token')
 2095         is_domain = False
 2096         token_roles = self._get_scoped_token_roles(is_domain)
 2097         self.assertEqual(1, len(token_roles))
 2098 
 2099         new_role = self._create_role()
 2100         prior = new_role['id']
 2101 
 2102         new_group_ref = unit.new_group_ref(domain_id=self.domain['id'])
 2103         new_group = PROVIDERS.identity_api.create_group(new_group_ref)
 2104         PROVIDERS.assignment_api.create_grant(
 2105             prior, group_id=new_group['id'], project_id=self.project['id']
 2106         )
 2107 
 2108         token_roles = self._get_scoped_token_roles(is_domain)
 2109         self.assertEqual(1, len(token_roles))
 2110 
 2111         PROVIDERS.identity_api.add_user_to_group(
 2112             self.user['id'], new_group['id']
 2113         )
 2114 
 2115         token_roles = self._get_scoped_token_roles(is_domain)
 2116         self.assertEqual(2, len(token_roles))
 2117 
 2118         implied1 = self._create_implied_role(prior)
 2119 
 2120         token_roles = self._get_scoped_token_roles(is_domain)
 2121         self.assertEqual(3, len(token_roles))
 2122 
 2123         implied2 = self._create_implied_role(prior)
 2124         token_roles = self._get_scoped_token_roles(is_domain)
 2125         self.assertEqual(4, len(token_roles))
 2126 
 2127         token_role_ids = [role['id'] for role in token_roles]
 2128         self.assertIn(prior, token_role_ids)
 2129         self.assertIn(implied1['id'], token_role_ids)
 2130         self.assertIn(implied2['id'], token_role_ids)
 2131 
 2132     def test_multiple_implied_roles_show_in_v3_token(self):
 2133         self.config_fixture.config(group='token')
 2134         token_roles = self._get_scoped_token_roles()
 2135         self.assertEqual(1, len(token_roles))
 2136 
 2137         prior = token_roles[0]['id']
 2138         implied1 = self._create_implied_role(prior)
 2139         implied2 = self._create_implied_role(prior)
 2140         implied3 = self._create_implied_role(prior)
 2141 
 2142         token_roles = self._get_scoped_token_roles()
 2143         self.assertEqual(4, len(token_roles))
 2144 
 2145         token_role_ids = [role['id'] for role in token_roles]
 2146         self.assertIn(prior, token_role_ids)
 2147         self.assertIn(implied1['id'], token_role_ids)
 2148         self.assertIn(implied2['id'], token_role_ids)
 2149         self.assertIn(implied3['id'], token_role_ids)
 2150 
 2151     def test_chained_implied_role_shows_in_v3_token(self):
 2152         self.config_fixture.config(group='token')
 2153         token_roles = self._get_scoped_token_roles()
 2154         self.assertEqual(1, len(token_roles))
 2155 
 2156         prior = token_roles[0]['id']
 2157         implied1 = self._create_implied_role(prior)
 2158         implied2 = self._create_implied_role(implied1['id'])
 2159         implied3 = self._create_implied_role(implied2['id'])
 2160 
 2161         token_roles = self._get_scoped_token_roles()
 2162         self.assertEqual(4, len(token_roles))
 2163 
 2164         token_role_ids = [role['id'] for role in token_roles]
 2165 
 2166         self.assertIn(prior, token_role_ids)
 2167         self.assertIn(implied1['id'], token_role_ids)
 2168         self.assertIn(implied2['id'], token_role_ids)
 2169         self.assertIn(implied3['id'], token_role_ids)
 2170 
 2171     def test_implied_role_disabled_by_config(self):
 2172         self.config_fixture.config(group='token')
 2173         token_roles = self._get_scoped_token_roles()
 2174         self.assertEqual(1, len(token_roles))
 2175 
 2176         prior = token_roles[0]['id']
 2177         implied1 = self._create_implied_role(prior)
 2178         implied2 = self._create_implied_role(implied1['id'])
 2179         self._create_implied_role(implied2['id'])
 2180 
 2181         token_roles = self._get_scoped_token_roles()
 2182         self.assertEqual(4, len(token_roles))
 2183         token_role_ids = [role['id'] for role in token_roles]
 2184         self.assertIn(prior, token_role_ids)
 2185 
 2186     def test_delete_implied_role_do_not_show_in_v3_token(self):
 2187         self.config_fixture.config(group='token')
 2188         token_roles = self._get_scoped_token_roles()
 2189         prior = token_roles[0]['id']
 2190         implied = self._create_implied_role(prior)
 2191 
 2192         token_roles = self._get_scoped_token_roles()
 2193         self.assertEqual(2, len(token_roles))
 2194         self._delete_implied_role(prior, implied['id'])
 2195 
 2196         token_roles = self._get_scoped_token_roles()
 2197         self.assertEqual(1, len(token_roles))
 2198 
 2199     def test_unrelated_implied_roles_do_not_change_v3_token(self):
 2200         self.config_fixture.config(group='token')
 2201         token_roles = self._get_scoped_token_roles()
 2202         prior = token_roles[0]['id']
 2203         implied = self._create_implied_role(prior)
 2204 
 2205         token_roles = self._get_scoped_token_roles()
 2206         self.assertEqual(2, len(token_roles))
 2207 
 2208         unrelated = self._create_role()
 2209         url = '/roles/%s/implies/%s' % (unrelated['id'], implied['id'])
 2210         self.put(url, expected_status=http.client.CREATED)
 2211 
 2212         token_roles = self._get_scoped_token_roles()
 2213         self.assertEqual(2, len(token_roles))
 2214 
 2215         self._delete_implied_role(unrelated['id'], implied['id'])
 2216         token_roles = self._get_scoped_token_roles()
 2217         self.assertEqual(2, len(token_roles))
 2218 
 2219     def test_domain_specific_roles_do_not_show_v3_token(self):
 2220         self.config_fixture.config(group='token')
 2221         initial_token_roles = self._get_scoped_token_roles()
 2222 
 2223         new_role = self._create_role(domain_id=self.domain_id)
 2224         PROVIDERS.assignment_api.create_grant(
 2225             new_role['id'], user_id=self.user['id'],
 2226             project_id=self.project['id']
 2227         )
 2228         implied = self._create_implied_role(new_role['id'])
 2229 
 2230         token_roles = self._get_scoped_token_roles()
 2231         self.assertEqual(len(initial_token_roles) + 1, len(token_roles))
 2232 
 2233         # The implied role from the domain specific role should be in the
 2234         # token, but not the domain specific role itself.
 2235         token_role_ids = [role['id'] for role in token_roles]
 2236         self.assertIn(implied['id'], token_role_ids)
 2237         self.assertNotIn(new_role['id'], token_role_ids)
 2238 
 2239     def test_remove_all_roles_from_scope_result_in_404(self):
 2240         # create a new user
 2241         new_user = unit.create_user(PROVIDERS.identity_api,
 2242                                     domain_id=self.domain['id'])
 2243 
 2244         # give the new user a role on a project
 2245         path = '/projects/%s/users/%s/roles/%s' % (
 2246             self.project['id'], new_user['id'], self.role['id'])
 2247         self.put(path=path)
 2248 
 2249         # authenticate as the new user and get a project-scoped token
 2250         auth_data = self.build_authentication_request(
 2251             user_id=new_user['id'],
 2252             password=new_user['password'],
 2253             project_id=self.project['id'])
 2254         subject_token_id = self.v3_create_token(auth_data).headers.get(
 2255             'X-Subject-Token')
 2256 
 2257         # make sure the project-scoped token is valid
 2258         headers = {'X-Subject-Token': subject_token_id}
 2259         r = self.get('/auth/tokens', headers=headers)
 2260         self.assertValidProjectScopedTokenResponse(r)
 2261 
 2262         # remove the roles from the user for the given scope
 2263         path = '/projects/%s/users/%s/roles/%s' % (
 2264             self.project['id'], new_user['id'], self.role['id'])
 2265         self.delete(path=path)
 2266 
 2267         # token validation should now result in 404
 2268         self.get('/auth/tokens', headers=headers,
 2269                  expected_status=http.client.NOT_FOUND)
 2270 
 2271     def test_create_token_with_nonexistant_user_id_fails(self):
 2272         auth_data = self.build_authentication_request(
 2273             user_id=uuid.uuid4().hex,
 2274             password=self.user['password'])
 2275         self.v3_create_token(auth_data,
 2276                              expected_status=http.client.UNAUTHORIZED)
 2277 
 2278     def test_create_token_with_nonexistant_username_fails(self):
 2279         auth_data = self.build_authentication_request(
 2280             username=uuid.uuid4().hex,
 2281             user_domain_id=self.domain['id'],
 2282             password=self.user['password'])
 2283         self.v3_create_token(auth_data,
 2284                              expected_status=http.client.UNAUTHORIZED)
 2285 
 2286     def test_create_token_with_nonexistant_domain_id_fails(self):
 2287         auth_data = self.build_authentication_request(
 2288             username=self.user['name'],
 2289             user_domain_id=uuid.uuid4().hex,
 2290             password=self.user['password'])
 2291         self.v3_create_token(auth_data,
 2292                              expected_status=http.client.UNAUTHORIZED)
 2293 
 2294     def test_create_token_with_nonexistant_domain_name_fails(self):
 2295         auth_data = self.build_authentication_request(
 2296             username=self.user['name'],
 2297             user_domain_name=uuid.uuid4().hex,
 2298             password=self.user['password'])
 2299         self.v3_create_token(auth_data,
 2300                              expected_status=http.client.UNAUTHORIZED)
 2301 
 2302     def test_create_token_with_wrong_password_fails(self):
 2303         auth_data = self.build_authentication_request(
 2304             user_id=self.user['id'],
 2305             password=uuid.uuid4().hex)
 2306         self.v3_create_token(auth_data,
 2307                              expected_status=http.client.UNAUTHORIZED)
 2308 
 2309     def test_user_and_group_roles_scoped_token(self):
 2310         """Test correct roles are returned in scoped token.
 2311 
 2312         Test Plan:
 2313 
 2314         - Create a domain, with 1 project, 2 users (user1 and user2)
 2315           and 2 groups (group1 and group2)
 2316         - Make user1 a member of group1, user2 a member of group2
 2317         - Create 8 roles, assigning them to each of the 8 combinations
 2318           of users/groups on domain/project
 2319         - Get a project scoped token for user1, checking that the right
 2320           two roles are returned (one directly assigned, one by virtue
 2321           of group membership)
 2322         - Repeat this for a domain scoped token
 2323         - Make user1 also a member of group2
 2324         - Get another scoped token making sure the additional role
 2325           shows up
 2326         - User2 is just here as a spoiler, to make sure we don't get
 2327           any roles uniquely assigned to it returned in any of our
 2328           tokens
 2329 
 2330         """
 2331         domainA = unit.new_domain_ref()
 2332         PROVIDERS.resource_api.create_domain(domainA['id'], domainA)
 2333         projectA = unit.new_project_ref(domain_id=domainA['id'])
 2334         PROVIDERS.resource_api.create_project(projectA['id'], projectA)
 2335 
 2336         user1 = unit.create_user(
 2337             PROVIDERS.identity_api, domain_id=domainA['id']
 2338         )
 2339 
 2340         user2 = unit.create_user(
 2341             PROVIDERS.identity_api, domain_id=domainA['id']
 2342         )
 2343 
 2344         group1 = unit.new_group_ref(domain_id=domainA['id'])
 2345         group1 = PROVIDERS.identity_api.create_group(group1)
 2346 
 2347         group2 = unit.new_group_ref(domain_id=domainA['id'])
 2348         group2 = PROVIDERS.identity_api.create_group(group2)
 2349 
 2350         PROVIDERS.identity_api.add_user_to_group(
 2351             user1['id'], group1['id']
 2352         )
 2353         PROVIDERS.identity_api.add_user_to_group(
 2354             user2['id'], group2['id']
 2355         )
 2356 
 2357         # Now create all the roles and assign them
 2358         role_list = []
 2359         for _ in range(8):
 2360             role = unit.new_role_ref()
 2361             PROVIDERS.role_api.create_role(role['id'], role)
 2362             role_list.append(role)
 2363 
 2364         PROVIDERS.assignment_api.create_grant(
 2365             role_list[0]['id'], user_id=user1['id'], domain_id=domainA['id']
 2366         )
 2367         PROVIDERS.assignment_api.create_grant(
 2368             role_list[1]['id'], user_id=user1['id'], project_id=projectA['id']
 2369         )
 2370         PROVIDERS.assignment_api.create_grant(
 2371             role_list[2]['id'], user_id=user2['id'], domain_id=domainA['id']
 2372         )
 2373         PROVIDERS.assignment_api.create_grant(
 2374             role_list[3]['id'], user_id=user2['id'], project_id=projectA['id']
 2375         )
 2376         PROVIDERS.assignment_api.create_grant(
 2377             role_list[4]['id'], group_id=group1['id'], domain_id=domainA['id']
 2378         )
 2379         PROVIDERS.assignment_api.create_grant(
 2380             role_list[5]['id'], group_id=group1['id'],
 2381             project_id=projectA['id']
 2382         )
 2383         PROVIDERS.assignment_api.create_grant(
 2384             role_list[6]['id'], group_id=group2['id'], domain_id=domainA['id']
 2385         )
 2386         PROVIDERS.assignment_api.create_grant(
 2387             role_list[7]['id'], group_id=group2['id'],
 2388             project_id=projectA['id']
 2389         )
 2390 
 2391         # First, get a project scoped token - which should
 2392         # contain the direct user role and the one by virtue
 2393         # of group membership
 2394         auth_data = self.build_authentication_request(
 2395             user_id=user1['id'],
 2396             password=user1['password'],
 2397             project_id=projectA['id'])
 2398         r = self.v3_create_token(auth_data)
 2399         token = self.assertValidScopedTokenResponse(r)
 2400         roles_ids = []
 2401         for ref in token['roles']:
 2402             roles_ids.append(ref['id'])
 2403         self.assertEqual(2, len(token['roles']))
 2404         self.assertIn(role_list[1]['id'], roles_ids)
 2405         self.assertIn(role_list[5]['id'], roles_ids)
 2406 
 2407         # Now the same thing for a domain scoped token
 2408         auth_data = self.build_authentication_request(
 2409             user_id=user1['id'],
 2410             password=user1['password'],
 2411             domain_id=domainA['id'])
 2412         r = self.v3_create_token(auth_data)
 2413         token = self.assertValidScopedTokenResponse(r)
 2414         roles_ids = []
 2415         for ref in token['roles']:
 2416             roles_ids.append(ref['id'])
 2417         self.assertEqual(2, len(token['roles']))
 2418         self.assertIn(role_list[0]['id'], roles_ids)
 2419         self.assertIn(role_list[4]['id'], roles_ids)
 2420 
 2421         # Finally, add user1 to the 2nd group, and get a new
 2422         # scoped token - the extra role should now be included
 2423         # by virtue of the 2nd group
 2424         PROVIDERS.identity_api.add_user_to_group(
 2425             user1['id'], group2['id']
 2426         )
 2427         auth_data = self.build_authentication_request(
 2428             user_id=user1['id'],
 2429             password=user1['password'],
 2430             project_id=projectA['id'])
 2431         r = self.v3_create_token(auth_data)
 2432         token = self.assertValidScopedTokenResponse(r)
 2433         roles_ids = []
 2434         for ref in token['roles']:
 2435             roles_ids.append(ref['id'])
 2436         self.assertEqual(3, len(token['roles']))
 2437         self.assertIn(role_list[1]['id'], roles_ids)
 2438         self.assertIn(role_list[5]['id'], roles_ids)
 2439         self.assertIn(role_list[7]['id'], roles_ids)
 2440 
 2441     def test_auth_token_cross_domain_group_and_project(self):
 2442         """Verify getting a token in cross domain group/project roles."""
 2443         # create domain, project and group and grant roles to user
 2444         domain1 = unit.new_domain_ref()
 2445         PROVIDERS.resource_api.create_domain(domain1['id'], domain1)
 2446         project1 = unit.new_project_ref(domain_id=domain1['id'])
 2447         PROVIDERS.resource_api.create_project(project1['id'], project1)
 2448         user_foo = unit.create_user(PROVIDERS.identity_api,
 2449                                     domain_id=test_v3.DEFAULT_DOMAIN_ID)
 2450         role_member = unit.new_role_ref()
 2451         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 2452         role_admin = unit.new_role_ref()
 2453         PROVIDERS.role_api.create_role(role_admin['id'], role_admin)
 2454         role_foo_domain1 = unit.new_role_ref()
 2455         PROVIDERS.role_api.create_role(
 2456             role_foo_domain1['id'], role_foo_domain1
 2457         )
 2458         role_group_domain1 = unit.new_role_ref()
 2459         PROVIDERS.role_api.create_role(
 2460             role_group_domain1['id'], role_group_domain1
 2461         )
 2462         new_group = unit.new_group_ref(domain_id=domain1['id'])
 2463         new_group = PROVIDERS.identity_api.create_group(new_group)
 2464         PROVIDERS.identity_api.add_user_to_group(
 2465             user_foo['id'], new_group['id']
 2466         )
 2467         PROVIDERS.assignment_api.create_grant(
 2468             user_id=user_foo['id'],
 2469             project_id=project1['id'],
 2470             role_id=role_member['id'])
 2471         PROVIDERS.assignment_api.create_grant(
 2472             group_id=new_group['id'],
 2473             project_id=project1['id'],
 2474             role_id=role_admin['id'])
 2475         PROVIDERS.assignment_api.create_grant(
 2476             user_id=user_foo['id'],
 2477             domain_id=domain1['id'],
 2478             role_id=role_foo_domain1['id'])
 2479         PROVIDERS.assignment_api.create_grant(
 2480             group_id=new_group['id'],
 2481             domain_id=domain1['id'],
 2482             role_id=role_group_domain1['id'])
 2483 
 2484         # Get a scoped token for the project
 2485         auth_data = self.build_authentication_request(
 2486             username=user_foo['name'],
 2487             user_domain_id=test_v3.DEFAULT_DOMAIN_ID,
 2488             password=user_foo['password'],
 2489             project_name=project1['name'],
 2490             project_domain_id=domain1['id'])
 2491 
 2492         r = self.v3_create_token(auth_data)
 2493         scoped_token = self.assertValidScopedTokenResponse(r)
 2494         project = scoped_token["project"]
 2495         roles_ids = []
 2496         for ref in scoped_token['roles']:
 2497             roles_ids.append(ref['id'])
 2498         self.assertEqual(project1['id'], project["id"])
 2499         self.assertIn(role_member['id'], roles_ids)
 2500         self.assertIn(role_admin['id'], roles_ids)
 2501         self.assertNotIn(role_foo_domain1['id'], roles_ids)
 2502         self.assertNotIn(role_group_domain1['id'], roles_ids)
 2503 
 2504     def test_remote_user_no_realm(self):
 2505         app = self.loadapp()
 2506 
 2507         auth_contexts = []
 2508 
 2509         # NOTE(morgan): This __init__ is used to inject the auth context into
 2510         # the auth_contexts list so that we can perform introspection. This way
 2511         # we do not need to try and mock out anything deep within keystone's
 2512         # auth pipeline. Note that we are using MockPatch to ensure we undo
 2513         # the mock after the fact.
 2514         def new_init(self, *args, **kwargs):
 2515             super(auth.core.AuthContext, self).__init__(*args, **kwargs)
 2516             auth_contexts.append(self)
 2517 
 2518         self.useFixture(fixtures.MockPatch(
 2519             'keystone.auth.core.AuthContext.__init__', new_init))
 2520         with app.test_client() as c:
 2521             c.environ_base.update(self.build_external_auth_environ(
 2522                 self.default_domain_user['name']))
 2523             auth_req = self.build_authentication_request()
 2524             c.post('/v3/auth/tokens', json=auth_req)
 2525             self.assertEqual(self.default_domain_user['id'],
 2526                              auth_contexts[-1]['user_id'])
 2527 
 2528         # Now test to make sure the user name can, itself, contain the
 2529         # '@' character.
 2530         user = {'name': 'myname@mydivision'}
 2531         PROVIDERS.identity_api.update_user(
 2532             self.default_domain_user['id'], user
 2533         )
 2534         with app.test_client() as c:
 2535             c.environ_base.update(self.build_external_auth_environ(
 2536                 user['name']))
 2537             auth_req = self.build_authentication_request()
 2538             c.post('/v3/auth/tokens', json=auth_req)
 2539             self.assertEqual(self.default_domain_user['id'],
 2540                              auth_contexts[-1]['user_id'])
 2541         self.assertEqual(self.default_domain_user['id'],
 2542                          auth_contexts[-1]['user_id'])
 2543 
 2544     def test_remote_user_no_domain(self):
 2545         app = self.loadapp()
 2546         with app.test_client() as c:
 2547             c.environ_base.update(self.build_external_auth_environ(
 2548                 self.user['name']))
 2549             auth_request = self.build_authentication_request()
 2550             c.post('/v3/auth/tokens', json=auth_request,
 2551                    expected_status_code=http.client.UNAUTHORIZED)
 2552 
 2553     def test_remote_user_and_password(self):
 2554         # both REMOTE_USER and password methods must pass.
 2555         # note that they do not have to match
 2556         app = self.loadapp()
 2557         with app.test_client() as c:
 2558             auth_data = self.build_authentication_request(
 2559                 user_domain_id=self.default_domain_user['domain_id'],
 2560                 username=self.default_domain_user['name'],
 2561                 password=self.default_domain_user['password'])
 2562             c.post('/v3/auth/tokens', json=auth_data)
 2563 
 2564     def test_remote_user_and_explicit_external(self):
 2565         # both REMOTE_USER and password methods must pass.
 2566         # note that they do not have to match
 2567         auth_data = self.build_authentication_request(
 2568             user_domain_id=self.domain['id'],
 2569             username=self.user['name'],
 2570             password=self.user['password'])
 2571         auth_data['auth']['identity']['methods'] = ["password", "external"]
 2572         auth_data['auth']['identity']['external'] = {}
 2573         app = self.loadapp()
 2574         with app.test_client() as c:
 2575             c.post('/v3/auth/tokens', json=auth_data,
 2576                    expected_status_code=http.client.UNAUTHORIZED)
 2577 
 2578     def test_remote_user_bad_password(self):
 2579         # both REMOTE_USER and password methods must pass.
 2580         app = self.loadapp()
 2581         auth_data = self.build_authentication_request(
 2582             user_domain_id=self.domain['id'],
 2583             username=self.user['name'],
 2584             password='badpassword')
 2585         with app.test_client() as c:
 2586             c.post('/v3/auth/tokens', json=auth_data,
 2587                    expected_status_code=http.client.UNAUTHORIZED)
 2588 
 2589     def test_fetch_expired_allow_expired(self):
 2590         self.config_fixture.config(group='token',
 2591                                    expiration=10,
 2592                                    allow_expired_window=20)
 2593         time = datetime.datetime.utcnow()
 2594         with freezegun.freeze_time(time) as frozen_datetime:
 2595             token = self._get_project_scoped_token()
 2596 
 2597             # initially it validates because it's within time
 2598             frozen_datetime.tick(delta=datetime.timedelta(seconds=2))
 2599             self._validate_token(token)
 2600 
 2601             # after passing expiry time validation fails
 2602             frozen_datetime.tick(delta=datetime.timedelta(seconds=12))
 2603             self._validate_token(token, expected_status=http.client.NOT_FOUND)
 2604 
 2605             # but if we pass allow_expired it validates
 2606             self._validate_token(token, allow_expired=True)
 2607 
 2608             # and then if we're passed the allow_expired_window it will fail
 2609             # anyway raises expired when now > expiration + window
 2610             frozen_datetime.tick(delta=datetime.timedelta(seconds=22))
 2611             self._validate_token(token,
 2612                                  allow_expired=True,
 2613                                  expected_status=http.client.NOT_FOUND)
 2614 
 2615     def test_system_scoped_token_works_with_domain_specific_drivers(self):
 2616         self.config_fixture.config(
 2617             group='identity', domain_specific_drivers_enabled=True
 2618         )
 2619 
 2620         PROVIDERS.assignment_api.create_system_grant_for_user(
 2621             self.user['id'], self.role['id']
 2622         )
 2623 
 2624         token_id = self.get_system_scoped_token()
 2625         headers = {'X-Auth-Token': token_id}
 2626 
 2627         app = self.loadapp()
 2628         with app.test_client() as c:
 2629             c.get('/v3/users', headers=headers)
 2630 
 2631     def test_fetch_expired_allow_expired_in_expired_window(self):
 2632         self.config_fixture.config(group='token',
 2633                                    expiration=10,
 2634                                    allow_expired_window=20)
 2635         time = datetime.datetime.utcnow()
 2636         with freezegun.freeze_time(time):
 2637             token = self._get_project_scoped_token()
 2638 
 2639         tick = datetime.timedelta(seconds=15)
 2640         with freezegun.freeze_time(time + tick):
 2641             # after passing expiry time validation fails
 2642             self._validate_token(token, expected_status=http.client.NOT_FOUND)
 2643 
 2644             # but if we pass allow_expired it validates
 2645             r = self._validate_token(token, allow_expired=True)
 2646             self.assertValidProjectScopedTokenResponse(r)
 2647 
 2648 
 2649 class TokenDataTests(object):
 2650     """Test the data in specific token types."""
 2651 
 2652     def test_unscoped_token_format(self):
 2653         # ensure the unscoped token response contains the appropriate data
 2654         r = self.get('/auth/tokens', headers=self.headers)
 2655         self.assertValidUnscopedTokenResponse(r)
 2656 
 2657     def test_domain_scoped_token_format(self):
 2658         # ensure the domain scoped token response contains the appropriate data
 2659         PROVIDERS.assignment_api.create_grant(
 2660             self.role['id'],
 2661             user_id=self.default_domain_user['id'],
 2662             domain_id=self.domain['id'])
 2663 
 2664         domain_scoped_token = self.get_requested_token(
 2665             self.build_authentication_request(
 2666                 user_id=self.default_domain_user['id'],
 2667                 password=self.default_domain_user['password'],
 2668                 domain_id=self.domain['id'])
 2669         )
 2670         self.headers['X-Subject-Token'] = domain_scoped_token
 2671         r = self.get('/auth/tokens', headers=self.headers)
 2672         self.assertValidDomainScopedTokenResponse(r)
 2673 
 2674     def test_project_scoped_token_format(self):
 2675         # ensure project scoped token responses contains the appropriate data
 2676         project_scoped_token = self.get_requested_token(
 2677             self.build_authentication_request(
 2678                 user_id=self.default_domain_user['id'],
 2679                 password=self.default_domain_user['password'],
 2680                 project_id=self.default_domain_project['id'])
 2681         )
 2682         self.headers['X-Subject-Token'] = project_scoped_token
 2683         r = self.get('/auth/tokens', headers=self.headers)
 2684         self.assertValidProjectScopedTokenResponse(r)
 2685 
 2686     def test_extra_data_in_unscoped_token_fails_validation(self):
 2687         # ensure unscoped token response contains the appropriate data
 2688         r = self.get('/auth/tokens', headers=self.headers)
 2689 
 2690         # populate the response result with some extra data
 2691         r.result['token'][u'extra'] = str(uuid.uuid4().hex)
 2692         self.assertRaises(exception.SchemaValidationError,
 2693                           self.assertValidUnscopedTokenResponse,
 2694                           r)
 2695 
 2696     def test_extra_data_in_domain_scoped_token_fails_validation(self):
 2697         # ensure domain scoped token response contains the appropriate data
 2698         PROVIDERS.assignment_api.create_grant(
 2699             self.role['id'],
 2700             user_id=self.default_domain_user['id'],
 2701             domain_id=self.domain['id'])
 2702 
 2703         domain_scoped_token = self.get_requested_token(
 2704             self.build_authentication_request(
 2705                 user_id=self.default_domain_user['id'],
 2706                 password=self.default_domain_user['password'],
 2707                 domain_id=self.domain['id'])
 2708         )
 2709         self.headers['X-Subject-Token'] = domain_scoped_token
 2710         r = self.get('/auth/tokens', headers=self.headers)
 2711 
 2712         # populate the response result with some extra data
 2713         r.result['token'][u'extra'] = str(uuid.uuid4().hex)
 2714         self.assertRaises(exception.SchemaValidationError,
 2715                           self.assertValidDomainScopedTokenResponse,
 2716                           r)
 2717 
 2718     def test_extra_data_in_project_scoped_token_fails_validation(self):
 2719         # ensure project scoped token responses contains the appropriate data
 2720         project_scoped_token = self.get_requested_token(
 2721             self.build_authentication_request(
 2722                 user_id=self.default_domain_user['id'],
 2723                 password=self.default_domain_user['password'],
 2724                 project_id=self.default_domain_project['id'])
 2725         )
 2726         self.headers['X-Subject-Token'] = project_scoped_token
 2727         resp = self.get('/auth/tokens', headers=self.headers)
 2728 
 2729         # populate the response result with some extra data
 2730         resp.result['token'][u'extra'] = str(uuid.uuid4().hex)
 2731         self.assertRaises(exception.SchemaValidationError,
 2732                           self.assertValidProjectScopedTokenResponse,
 2733                           resp)
 2734 
 2735 
 2736 class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase):
 2737     def config_overrides(self):
 2738         super(AllowRescopeScopedTokenDisabledTests, self).config_overrides()
 2739         self.config_fixture.config(
 2740             group='token',
 2741             allow_rescope_scoped_token=False)
 2742 
 2743     def test_rescoping_v3_to_v3_disabled(self):
 2744         self.v3_create_token(
 2745             self.build_authentication_request(
 2746                 token=self.get_scoped_token(),
 2747                 project_id=self.project_id),
 2748             expected_status=http.client.FORBIDDEN)
 2749 
 2750     def test_rescoped_domain_token_disabled(self):
 2751 
 2752         self.domainA = unit.new_domain_ref()
 2753         PROVIDERS.resource_api.create_domain(self.domainA['id'], self.domainA)
 2754         PROVIDERS.assignment_api.create_grant(
 2755             self.role['id'], user_id=self.user['id'],
 2756             domain_id=self.domainA['id']
 2757         )
 2758         unscoped_token = self.get_requested_token(
 2759             self.build_authentication_request(
 2760                 user_id=self.user['id'],
 2761                 password=self.user['password']))
 2762         # Get a domain-scoped token from the unscoped token
 2763         domain_scoped_token = self.get_requested_token(
 2764             self.build_authentication_request(
 2765                 token=unscoped_token,
 2766                 domain_id=self.domainA['id']))
 2767         self.v3_create_token(
 2768             self.build_authentication_request(
 2769                 token=domain_scoped_token,
 2770                 project_id=self.project_id),
 2771             expected_status=http.client.FORBIDDEN)
 2772 
 2773 
 2774 class TestFernetTokenAPIs(test_v3.RestfulTestCase, TokenAPITests,
 2775                           TokenDataTests):
 2776     def config_overrides(self):
 2777         super(TestFernetTokenAPIs, self).config_overrides()
 2778         self.config_fixture.config(group='token', provider='fernet',
 2779                                    cache_on_issue=True)
 2780         self.useFixture(
 2781             ksfixtures.KeyRepository(
 2782                 self.config_fixture,
 2783                 'fernet_tokens',
 2784                 CONF.fernet_tokens.max_active_keys
 2785             )
 2786         )
 2787 
 2788     def setUp(self):
 2789         super(TestFernetTokenAPIs, self).setUp()
 2790         self.doSetUp()
 2791 
 2792     def _make_auth_request(self, auth_data):
 2793         token = super(TestFernetTokenAPIs, self)._make_auth_request(auth_data)
 2794         self.assertLess(len(token), 255)
 2795         return token
 2796 
 2797     def test_validate_tampered_unscoped_token_fails(self):
 2798         unscoped_token = self._get_unscoped_token()
 2799         tampered_token = (unscoped_token[:50] + uuid.uuid4().hex +
 2800                           unscoped_token[50 + 32:])
 2801         self._validate_token(tampered_token,
 2802                              expected_status=http.client.NOT_FOUND)
 2803 
 2804     def test_validate_tampered_project_scoped_token_fails(self):
 2805         project_scoped_token = self._get_project_scoped_token()
 2806         tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex +
 2807                           project_scoped_token[50 + 32:])
 2808         self._validate_token(tampered_token,
 2809                              expected_status=http.client.NOT_FOUND)
 2810 
 2811     def test_validate_tampered_trust_scoped_token_fails(self):
 2812         trustee_user, trust = self._create_trust()
 2813         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 2814         # Get a trust scoped token
 2815         tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex +
 2816                           trust_scoped_token[50 + 32:])
 2817         self._validate_token(tampered_token,
 2818                              expected_status=http.client.NOT_FOUND)
 2819 
 2820     def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
 2821         # NOTE(amakarov): have to override this test for non-persistent tokens
 2822         # as TokenNotFound exception makes no sense for those.
 2823         trustee_user, trust = self._create_trust()
 2824         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 2825         # Validate a trust scoped token
 2826         r = self._validate_token(trust_scoped_token)
 2827         self.assertValidProjectScopedTokenResponse(r)
 2828 
 2829         # Disable the trustor
 2830         trustor_update_ref = dict(enabled=False)
 2831         PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
 2832         # Ensure validating a token for a disabled user fails
 2833         self._validate_token(
 2834             trust_scoped_token,
 2835             expected_status=http.client.FORBIDDEN
 2836         )
 2837 
 2838 
 2839 class TestJWSTokenAPIs(test_v3.RestfulTestCase, TokenAPITests, TokenDataTests):
 2840     def config_overrides(self):
 2841         super(TestJWSTokenAPIs, self).config_overrides()
 2842         self.config_fixture.config(group='token', provider='jws',
 2843                                    cache_on_issue=True)
 2844         self.useFixture(ksfixtures.JWSKeyRepository(self.config_fixture))
 2845 
 2846     def setUp(self):
 2847         super(TestJWSTokenAPIs, self).setUp()
 2848         self.doSetUp()
 2849 
 2850     def _make_auth_request(self, auth_data):
 2851         token = super(TestJWSTokenAPIs, self)._make_auth_request(auth_data)
 2852         self.assertLess(len(token), 350)
 2853         return token
 2854 
 2855     def test_validate_tampered_unscoped_token_fails(self):
 2856         unscoped_token = self._get_unscoped_token()
 2857         tampered_token = (unscoped_token[:50] + uuid.uuid4().hex +
 2858                           unscoped_token[50 + 32:])
 2859         self._validate_token(tampered_token,
 2860                              expected_status=http.client.NOT_FOUND)
 2861 
 2862     def test_validate_tampered_project_scoped_token_fails(self):
 2863         project_scoped_token = self._get_project_scoped_token()
 2864         tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex +
 2865                           project_scoped_token[50 + 32:])
 2866         self._validate_token(tampered_token,
 2867                              expected_status=http.client.NOT_FOUND)
 2868 
 2869     def test_validate_tampered_trust_scoped_token_fails(self):
 2870         trustee_user, trust = self._create_trust()
 2871         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 2872         # Get a trust scoped token
 2873         tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex +
 2874                           trust_scoped_token[50 + 32:])
 2875         self._validate_token(tampered_token,
 2876                              expected_status=http.client.NOT_FOUND)
 2877 
 2878     def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
 2879         # NOTE(amakarov): have to override this test for non-persistent tokens
 2880         # as TokenNotFound exception makes no sense for those.
 2881         trustee_user, trust = self._create_trust()
 2882         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 2883         # Validate a trust scoped token
 2884         r = self._validate_token(trust_scoped_token)
 2885         self.assertValidProjectScopedTokenResponse(r)
 2886 
 2887         # Disable the trustor
 2888         trustor_update_ref = dict(enabled=False)
 2889         PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
 2890         # Ensure validating a token for a disabled user fails
 2891         self._validate_token(
 2892             trust_scoped_token,
 2893             expected_status=http.client.FORBIDDEN
 2894         )
 2895 
 2896 
 2897 class TestTokenRevokeById(test_v3.RestfulTestCase):
 2898     """Test token revocation on the v3 Identity API."""
 2899 
 2900     def config_overrides(self):
 2901         super(TestTokenRevokeById, self).config_overrides()
 2902         self.config_fixture.config(
 2903             group='token',
 2904             provider='fernet',
 2905             revoke_by_id=False)
 2906         self.useFixture(
 2907             ksfixtures.KeyRepository(
 2908                 self.config_fixture,
 2909                 'fernet_tokens',
 2910                 CONF.fernet_tokens.max_active_keys
 2911             )
 2912         )
 2913 
 2914     def setUp(self):
 2915         """Setup for Token Revoking Test Cases.
 2916 
 2917         As well as the usual housekeeping, create a set of domains,
 2918         users, groups, roles and projects for the subsequent tests:
 2919 
 2920         - Two domains: A & B
 2921         - Three users (1, 2 and 3)
 2922         - Three groups (1, 2 and 3)
 2923         - Two roles (1 and 2)
 2924         - DomainA owns user1, domainB owns user2 and user3
 2925         - DomainA owns group1 and group2, domainB owns group3
 2926         - User1 and user2 are members of group1
 2927         - User3 is a member of group2
 2928         - Two projects: A & B, both in domainA
 2929         - Group1 has role1 on Project A and B, meaning that user1 and user2
 2930           will get these roles by virtue of membership
 2931         - User1, 2 and 3 have role1 assigned to projectA
 2932         - Group1 has role1 on Project A and B, meaning that user1 and user2
 2933           will get role1 (duplicated) by virtue of membership
 2934         - User1 has role2 assigned to domainA
 2935 
 2936         """
 2937         super(TestTokenRevokeById, self).setUp()
 2938 
 2939         # Start by creating a couple of domains and projects
 2940         self.domainA = unit.new_domain_ref()
 2941         PROVIDERS.resource_api.create_domain(self.domainA['id'], self.domainA)
 2942         self.domainB = unit.new_domain_ref()
 2943         PROVIDERS.resource_api.create_domain(self.domainB['id'], self.domainB)
 2944         self.projectA = unit.new_project_ref(domain_id=self.domainA['id'])
 2945         PROVIDERS.resource_api.create_project(
 2946             self.projectA['id'], self.projectA
 2947         )
 2948         self.projectB = unit.new_project_ref(domain_id=self.domainA['id'])
 2949         PROVIDERS.resource_api.create_project(
 2950             self.projectB['id'], self.projectB
 2951         )
 2952 
 2953         # Now create some users
 2954         self.user1 = unit.create_user(PROVIDERS.identity_api,
 2955                                       domain_id=self.domainA['id'])
 2956 
 2957         self.user2 = unit.create_user(PROVIDERS.identity_api,
 2958                                       domain_id=self.domainB['id'])
 2959 
 2960         self.user3 = unit.create_user(PROVIDERS.identity_api,
 2961                                       domain_id=self.domainB['id'])
 2962 
 2963         self.group1 = unit.new_group_ref(domain_id=self.domainA['id'])
 2964         self.group1 = PROVIDERS.identity_api.create_group(self.group1)
 2965 
 2966         self.group2 = unit.new_group_ref(domain_id=self.domainA['id'])
 2967         self.group2 = PROVIDERS.identity_api.create_group(self.group2)
 2968 
 2969         self.group3 = unit.new_group_ref(domain_id=self.domainB['id'])
 2970         self.group3 = PROVIDERS.identity_api.create_group(self.group3)
 2971 
 2972         PROVIDERS.identity_api.add_user_to_group(
 2973             self.user1['id'], self.group1['id']
 2974         )
 2975         PROVIDERS.identity_api.add_user_to_group(
 2976             self.user2['id'], self.group1['id']
 2977         )
 2978         PROVIDERS.identity_api.add_user_to_group(
 2979             self.user3['id'], self.group2['id']
 2980         )
 2981 
 2982         self.role1 = unit.new_role_ref()
 2983         PROVIDERS.role_api.create_role(self.role1['id'], self.role1)
 2984         self.role2 = unit.new_role_ref()
 2985         PROVIDERS.role_api.create_role(self.role2['id'], self.role2)
 2986 
 2987         PROVIDERS.assignment_api.create_grant(
 2988             self.role2['id'], user_id=self.user1['id'],
 2989             domain_id=self.domainA['id']
 2990         )
 2991         PROVIDERS.assignment_api.create_grant(
 2992             self.role1['id'], user_id=self.user1['id'],
 2993             project_id=self.projectA['id']
 2994         )
 2995         PROVIDERS.assignment_api.create_grant(
 2996             self.role1['id'], user_id=self.user2['id'],
 2997             project_id=self.projectA['id']
 2998         )
 2999         PROVIDERS.assignment_api.create_grant(
 3000             self.role1['id'], user_id=self.user3['id'],
 3001             project_id=self.projectA['id']
 3002         )
 3003         PROVIDERS.assignment_api.create_grant(
 3004             self.role1['id'], group_id=self.group1['id'],
 3005             project_id=self.projectA['id']
 3006         )
 3007 
 3008     def test_unscoped_token_remains_valid_after_role_assignment(self):
 3009         unscoped_token = self.get_requested_token(
 3010             self.build_authentication_request(
 3011                 user_id=self.user1['id'],
 3012                 password=self.user1['password']))
 3013 
 3014         scoped_token = self.get_requested_token(
 3015             self.build_authentication_request(
 3016                 token=unscoped_token,
 3017                 project_id=self.projectA['id']))
 3018 
 3019         # confirm both tokens are valid
 3020         self.head('/auth/tokens',
 3021                   headers={'X-Subject-Token': unscoped_token},
 3022                   expected_status=http.client.OK)
 3023         self.head('/auth/tokens',
 3024                   headers={'X-Subject-Token': scoped_token},
 3025                   expected_status=http.client.OK)
 3026 
 3027         # create a new role
 3028         role = unit.new_role_ref()
 3029         PROVIDERS.role_api.create_role(role['id'], role)
 3030 
 3031         # assign a new role
 3032         self.put(
 3033             '/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % {
 3034                 'project_id': self.projectA['id'],
 3035                 'user_id': self.user1['id'],
 3036                 'role_id': role['id']})
 3037 
 3038         # both tokens should remain valid
 3039         self.head('/auth/tokens',
 3040                   headers={'X-Subject-Token': unscoped_token},
 3041                   expected_status=http.client.OK)
 3042         self.head('/auth/tokens',
 3043                   headers={'X-Subject-Token': scoped_token},
 3044                   expected_status=http.client.OK)
 3045 
 3046     def test_deleting_user_grant_revokes_token(self):
 3047         """Test deleting a user grant revokes token.
 3048 
 3049         Test Plan:
 3050 
 3051         - Get a token for user, scoped to Project
 3052         - Delete the grant user has on Project
 3053         - Check token is no longer valid
 3054 
 3055         """
 3056         auth_data = self.build_authentication_request(
 3057             user_id=self.user['id'],
 3058             password=self.user['password'],
 3059             project_id=self.project['id'])
 3060         token = self.get_requested_token(auth_data)
 3061         # Confirm token is valid
 3062         self.head('/auth/tokens',
 3063                   headers={'X-Subject-Token': token},
 3064                   expected_status=http.client.OK)
 3065         # Delete the grant, which should invalidate the token
 3066         grant_url = (
 3067             '/projects/%(project_id)s/users/%(user_id)s/'
 3068             'roles/%(role_id)s' % {
 3069                 'project_id': self.project['id'],
 3070                 'user_id': self.user['id'],
 3071                 'role_id': self.role['id']})
 3072         self.delete(grant_url)
 3073         self.head('/auth/tokens', token=token,
 3074                   expected_status=http.client.UNAUTHORIZED)
 3075 
 3076     def role_data_fixtures(self):
 3077         self.projectC = unit.new_project_ref(domain_id=self.domainA['id'])
 3078         PROVIDERS.resource_api.create_project(
 3079             self.projectC['id'], self.projectC
 3080         )
 3081         self.user4 = unit.create_user(PROVIDERS.identity_api,
 3082                                       domain_id=self.domainB['id'])
 3083         self.user5 = unit.create_user(PROVIDERS.identity_api,
 3084                                       domain_id=self.domainA['id'])
 3085         self.user6 = unit.create_user(PROVIDERS.identity_api,
 3086                                       domain_id=self.domainA['id'])
 3087         PROVIDERS.identity_api.add_user_to_group(
 3088             self.user5['id'], self.group1['id']
 3089         )
 3090         PROVIDERS.assignment_api.create_grant(
 3091             self.role1['id'], group_id=self.group1['id'],
 3092             project_id=self.projectB['id']
 3093         )
 3094         PROVIDERS.assignment_api.create_grant(
 3095             self.role2['id'], user_id=self.user4['id'],
 3096             project_id=self.projectC['id']
 3097         )
 3098         PROVIDERS.assignment_api.create_grant(
 3099             self.role1['id'], user_id=self.user6['id'],
 3100             project_id=self.projectA['id']
 3101         )
 3102         PROVIDERS.assignment_api.create_grant(
 3103             self.role1['id'], user_id=self.user6['id'],
 3104             domain_id=self.domainA['id']
 3105         )
 3106 
 3107     def test_deleting_role_revokes_token(self):
 3108         """Test deleting a role revokes token.
 3109 
 3110         Add some additional test data, namely:
 3111 
 3112         - A third project (project C)
 3113         - Three additional users - user4 owned by domainB and user5 and 6 owned
 3114           by domainA (different domain ownership should not affect the test
 3115           results, just provided to broaden test coverage)
 3116         - User5 is a member of group1
 3117         - Group1 gets an additional assignment - role1 on projectB as well as
 3118           its existing role1 on projectA
 3119         - User4 has role2 on Project C
 3120         - User6 has role1 on projectA and domainA
 3121         - This allows us to create 5 tokens by virtue of different types of
 3122           role assignment:
 3123           - user1, scoped to ProjectA by virtue of user role1 assignment
 3124           - user5, scoped to ProjectB by virtue of group role1 assignment
 3125           - user4, scoped to ProjectC by virtue of user role2 assignment
 3126           - user6, scoped to ProjectA by virtue of user role1 assignment
 3127           - user6, scoped to DomainA by virtue of user role1 assignment
 3128         - role1 is then deleted
 3129         - Check the tokens on Project A and B, and DomainA are revoked, but not
 3130           the one for Project C
 3131 
 3132         """
 3133         self.role_data_fixtures()
 3134 
 3135         # Now we are ready to start issuing requests
 3136         auth_data = self.build_authentication_request(
 3137             user_id=self.user1['id'],
 3138             password=self.user1['password'],
 3139             project_id=self.projectA['id'])
 3140         tokenA = self.get_requested_token(auth_data)
 3141         auth_data = self.build_authentication_request(
 3142             user_id=self.user5['id'],
 3143             password=self.user5['password'],
 3144             project_id=self.projectB['id'])
 3145         tokenB = self.get_requested_token(auth_data)
 3146         auth_data = self.build_authentication_request(
 3147             user_id=self.user4['id'],
 3148             password=self.user4['password'],
 3149             project_id=self.projectC['id'])
 3150         tokenC = self.get_requested_token(auth_data)
 3151         auth_data = self.build_authentication_request(
 3152             user_id=self.user6['id'],
 3153             password=self.user6['password'],
 3154             project_id=self.projectA['id'])
 3155         tokenD = self.get_requested_token(auth_data)
 3156         auth_data = self.build_authentication_request(
 3157             user_id=self.user6['id'],
 3158             password=self.user6['password'],
 3159             domain_id=self.domainA['id'])
 3160         tokenE = self.get_requested_token(auth_data)
 3161         # Confirm tokens are valid
 3162         self.head('/auth/tokens',
 3163                   headers={'X-Subject-Token': tokenA},
 3164                   expected_status=http.client.OK)
 3165         self.head('/auth/tokens',
 3166                   headers={'X-Subject-Token': tokenB},
 3167                   expected_status=http.client.OK)
 3168         self.head('/auth/tokens',
 3169                   headers={'X-Subject-Token': tokenC},
 3170                   expected_status=http.client.OK)
 3171         self.head('/auth/tokens',
 3172                   headers={'X-Subject-Token': tokenD},
 3173                   expected_status=http.client.OK)
 3174         self.head('/auth/tokens',
 3175                   headers={'X-Subject-Token': tokenE},
 3176                   expected_status=http.client.OK)
 3177 
 3178         # Delete the role, which should invalidate the tokens
 3179         role_url = '/roles/%s' % self.role1['id']
 3180         self.delete(role_url)
 3181 
 3182         # Check the tokens that used role1 is invalid
 3183         self.head('/auth/tokens',
 3184                   headers={'X-Subject-Token': tokenA},
 3185                   expected_status=http.client.NOT_FOUND)
 3186         self.head('/auth/tokens',
 3187                   headers={'X-Subject-Token': tokenB},
 3188                   expected_status=http.client.NOT_FOUND)
 3189         self.head('/auth/tokens',
 3190                   headers={'X-Subject-Token': tokenD},
 3191                   expected_status=http.client.NOT_FOUND)
 3192         self.head('/auth/tokens',
 3193                   headers={'X-Subject-Token': tokenE},
 3194                   expected_status=http.client.NOT_FOUND)
 3195 
 3196         # ...but the one using role2 is still valid
 3197         self.head('/auth/tokens',
 3198                   headers={'X-Subject-Token': tokenC},
 3199                   expected_status=http.client.OK)
 3200 
 3201     def test_domain_user_role_assignment_maintains_token(self):
 3202         """Test user-domain role assignment maintains existing token.
 3203 
 3204         Test Plan:
 3205 
 3206         - Get a token for user1, scoped to ProjectA
 3207         - Create a grant for user1 on DomainB
 3208         - Check token is still valid
 3209 
 3210         """
 3211         auth_data = self.build_authentication_request(
 3212             user_id=self.user1['id'],
 3213             password=self.user1['password'],
 3214             project_id=self.projectA['id'])
 3215         token = self.get_requested_token(auth_data)
 3216         # Confirm token is valid
 3217         self.head('/auth/tokens',
 3218                   headers={'X-Subject-Token': token},
 3219                   expected_status=http.client.OK)
 3220         # Assign a role, which should not affect the token
 3221         grant_url = (
 3222             '/domains/%(domain_id)s/users/%(user_id)s/'
 3223             'roles/%(role_id)s' % {
 3224                 'domain_id': self.domainB['id'],
 3225                 'user_id': self.user1['id'],
 3226                 'role_id': self.role1['id']})
 3227         self.put(grant_url)
 3228         self.head('/auth/tokens',
 3229                   headers={'X-Subject-Token': token},
 3230                   expected_status=http.client.OK)
 3231 
 3232     def test_disabling_project_revokes_token(self):
 3233         token = self.get_requested_token(
 3234             self.build_authentication_request(
 3235                 user_id=self.user3['id'],
 3236                 password=self.user3['password'],
 3237                 project_id=self.projectA['id']))
 3238 
 3239         # confirm token is valid
 3240         self.head('/auth/tokens',
 3241                   headers={'X-Subject-Token': token},
 3242                   expected_status=http.client.OK)
 3243 
 3244         # disable the project, which should invalidate the token
 3245         self.patch(
 3246             '/projects/%(project_id)s' % {'project_id': self.projectA['id']},
 3247             body={'project': {'enabled': False}})
 3248 
 3249         # user should no longer have access to the project
 3250         self.head('/auth/tokens',
 3251                   headers={'X-Subject-Token': token},
 3252                   expected_status=http.client.NOT_FOUND)
 3253         self.v3_create_token(
 3254             self.build_authentication_request(
 3255                 user_id=self.user3['id'],
 3256                 password=self.user3['password'],
 3257                 project_id=self.projectA['id']),
 3258             expected_status=http.client.UNAUTHORIZED)
 3259 
 3260     def test_deleting_project_revokes_token(self):
 3261         token = self.get_requested_token(
 3262             self.build_authentication_request(
 3263                 user_id=self.user3['id'],
 3264                 password=self.user3['password'],
 3265                 project_id=self.projectA['id']))
 3266 
 3267         # confirm token is valid
 3268         self.head('/auth/tokens',
 3269                   headers={'X-Subject-Token': token},
 3270                   expected_status=http.client.OK)
 3271 
 3272         # delete the project, which should invalidate the token
 3273         self.delete(
 3274             '/projects/%(project_id)s' % {'project_id': self.projectA['id']})
 3275 
 3276         # user should no longer have access to the project
 3277         self.head('/auth/tokens',
 3278                   headers={'X-Subject-Token': token},
 3279                   expected_status=http.client.NOT_FOUND)
 3280         self.v3_create_token(
 3281             self.build_authentication_request(
 3282                 user_id=self.user3['id'],
 3283                 password=self.user3['password'],
 3284                 project_id=self.projectA['id']),
 3285             expected_status=http.client.UNAUTHORIZED)
 3286 
 3287     def test_deleting_group_grant_revokes_tokens(self):
 3288         """Test deleting a group grant revokes tokens.
 3289 
 3290         Test Plan:
 3291 
 3292         - Get a token for user1, scoped to ProjectA
 3293         - Get a token for user2, scoped to ProjectA
 3294         - Get a token for user3, scoped to ProjectA
 3295         - Delete the grant group1 has on ProjectA
 3296         - Check tokens for user1 & user2 are no longer valid,
 3297           since user1 and user2 are members of group1
 3298         - Check token for user3 is invalid too
 3299 
 3300         """
 3301         auth_data = self.build_authentication_request(
 3302             user_id=self.user1['id'],
 3303             password=self.user1['password'],
 3304             project_id=self.projectA['id'])
 3305         token1 = self.get_requested_token(auth_data)
 3306         auth_data = self.build_authentication_request(
 3307             user_id=self.user2['id'],
 3308             password=self.user2['password'],
 3309             project_id=self.projectA['id'])
 3310         token2 = self.get_requested_token(auth_data)
 3311         auth_data = self.build_authentication_request(
 3312             user_id=self.user3['id'],
 3313             password=self.user3['password'],
 3314             project_id=self.projectA['id'])
 3315         token3 = self.get_requested_token(auth_data)
 3316         # Confirm tokens are valid
 3317         self.head('/auth/tokens',
 3318                   headers={'X-Subject-Token': token1},
 3319                   expected_status=http.client.OK)
 3320         self.head('/auth/tokens',
 3321                   headers={'X-Subject-Token': token2},
 3322                   expected_status=http.client.OK)
 3323         self.head('/auth/tokens',
 3324                   headers={'X-Subject-Token': token3},
 3325                   expected_status=http.client.OK)
 3326         # Delete the group grant, which should invalidate the
 3327         # tokens for user1 and user2
 3328         grant_url = (
 3329             '/projects/%(project_id)s/groups/%(group_id)s/'
 3330             'roles/%(role_id)s' % {
 3331                 'project_id': self.projectA['id'],
 3332                 'group_id': self.group1['id'],
 3333                 'role_id': self.role1['id']})
 3334         self.delete(grant_url)
 3335         PROVIDERS.assignment_api.delete_grant(
 3336             role_id=self.role1['id'], project_id=self.projectA['id'],
 3337             user_id=self.user1['id']
 3338         )
 3339         PROVIDERS.assignment_api.delete_grant(
 3340             role_id=self.role1['id'], project_id=self.projectA['id'],
 3341             user_id=self.user2['id']
 3342         )
 3343         self.head('/auth/tokens', token=token1,
 3344                   expected_status=http.client.UNAUTHORIZED)
 3345         self.head('/auth/tokens', token=token2,
 3346                   expected_status=http.client.UNAUTHORIZED)
 3347         # But user3's token should be invalid too as revocation is done for
 3348         # scope role & project
 3349         self.head('/auth/tokens',
 3350                   headers={'X-Subject-Token': token3},
 3351                   expected_status=http.client.OK)
 3352 
 3353     def test_domain_group_role_assignment_maintains_token(self):
 3354         """Test domain-group role assignment maintains existing token.
 3355 
 3356         Test Plan:
 3357 
 3358         - Get a token for user1, scoped to ProjectA
 3359         - Create a grant for group1 on DomainB
 3360         - Check token is still longer valid
 3361 
 3362         """
 3363         auth_data = self.build_authentication_request(
 3364             user_id=self.user1['id'],
 3365             password=self.user1['password'],
 3366             project_id=self.projectA['id'])
 3367         token = self.get_requested_token(auth_data)
 3368         # Confirm token is valid
 3369         self.head('/auth/tokens',
 3370                   headers={'X-Subject-Token': token},
 3371                   expected_status=http.client.OK)
 3372         # Delete the grant, which should invalidate the token
 3373         grant_url = (
 3374             '/domains/%(domain_id)s/groups/%(group_id)s/'
 3375             'roles/%(role_id)s' % {
 3376                 'domain_id': self.domainB['id'],
 3377                 'group_id': self.group1['id'],
 3378                 'role_id': self.role1['id']})
 3379         self.put(grant_url)
 3380         self.head('/auth/tokens',
 3381                   headers={'X-Subject-Token': token},
 3382                   expected_status=http.client.OK)
 3383 
 3384     def test_group_membership_changes_revokes_token(self):
 3385         """Test add/removal to/from group revokes token.
 3386 
 3387         Test Plan:
 3388 
 3389         - Get a token for user1, scoped to ProjectA
 3390         - Get a token for user2, scoped to ProjectA
 3391         - Remove user1 from group1
 3392         - Check token for user1 is no longer valid
 3393         - Check token for user2 is still valid, even though
 3394           user2 is also part of group1
 3395         - Add user2 to group2
 3396         - Check token for user2 is now no longer valid
 3397 
 3398         """
 3399         auth_data = self.build_authentication_request(
 3400             user_id=self.user1['id'],
 3401             password=self.user1['password'],
 3402             project_id=self.projectA['id'])
 3403         token1 = self.get_requested_token(auth_data)
 3404         auth_data = self.build_authentication_request(
 3405             user_id=self.user2['id'],
 3406             password=self.user2['password'],
 3407             project_id=self.projectA['id'])
 3408         token2 = self.get_requested_token(auth_data)
 3409         # Confirm tokens are valid
 3410         self.head('/auth/tokens',
 3411                   headers={'X-Subject-Token': token1},
 3412                   expected_status=http.client.OK)
 3413         self.head('/auth/tokens',
 3414                   headers={'X-Subject-Token': token2},
 3415                   expected_status=http.client.OK)
 3416         # Remove user1 from group1, which should invalidate
 3417         # the token
 3418         self.delete('/groups/%(group_id)s/users/%(user_id)s' % {
 3419             'group_id': self.group1['id'],
 3420             'user_id': self.user1['id']})
 3421         self.head('/auth/tokens',
 3422                   headers={'X-Subject-Token': token1},
 3423                   expected_status=http.client.NOT_FOUND)
 3424         # But user2's token should still be valid
 3425         self.head('/auth/tokens',
 3426                   headers={'X-Subject-Token': token2},
 3427                   expected_status=http.client.OK)
 3428         # Adding user2 to a group should not invalidate token
 3429         self.put('/groups/%(group_id)s/users/%(user_id)s' % {
 3430             'group_id': self.group2['id'],
 3431             'user_id': self.user2['id']})
 3432         self.head('/auth/tokens',
 3433                   headers={'X-Subject-Token': token2},
 3434                   expected_status=http.client.OK)
 3435 
 3436     def test_removing_role_assignment_does_not_affect_other_users(self):
 3437         """Revoking a role from one user should not affect other users."""
 3438         time = datetime.datetime.utcnow()
 3439         with freezegun.freeze_time(time) as frozen_datetime:
 3440             # This group grant is not needed for the test
 3441             self.delete(
 3442                 '/projects/%(p_id)s/groups/%(g_id)s/roles/%(r_id)s' %
 3443                 {'p_id': self.projectA['id'],
 3444                  'g_id': self.group1['id'],
 3445                  'r_id': self.role1['id']})
 3446 
 3447             # NOTE(lbragstad): Here we advance the clock one second to pass
 3448             # into the threshold of a new second because we just persisted a
 3449             # revocation event for removing a role from a group on a project.
 3450             # One thing to note about that revocation event is that it has no
 3451             # context about the group, so even though user3 might not be in
 3452             # group1, they could have their token revoked because the
 3453             # revocation event is very general.
 3454             frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
 3455 
 3456             user1_token = self.get_requested_token(
 3457                 self.build_authentication_request(
 3458                     user_id=self.user1['id'],
 3459                     password=self.user1['password'],
 3460                     project_id=self.projectA['id']))
 3461 
 3462             user3_token = self.get_requested_token(
 3463                 self.build_authentication_request(
 3464                     user_id=self.user3['id'],
 3465                     password=self.user3['password'],
 3466                     project_id=self.projectA['id']))
 3467 
 3468             # delete relationships between user1 and projectA from setUp
 3469             self.delete(
 3470                 '/projects/%(p_id)s/users/%(u_id)s/roles/%(r_id)s' % {
 3471                     'p_id': self.projectA['id'],
 3472                     'u_id': self.user1['id'],
 3473                     'r_id': self.role1['id']})
 3474             # authorization for the first user should now fail
 3475             self.head('/auth/tokens',
 3476                       headers={'X-Subject-Token': user1_token},
 3477                       expected_status=http.client.NOT_FOUND)
 3478             self.v3_create_token(
 3479                 self.build_authentication_request(
 3480                     user_id=self.user1['id'],
 3481                     password=self.user1['password'],
 3482                     project_id=self.projectA['id']),
 3483                 expected_status=http.client.UNAUTHORIZED)
 3484 
 3485             # authorization for the second user should still succeed
 3486             self.head('/auth/tokens',
 3487                       headers={'X-Subject-Token': user3_token},
 3488                       expected_status=http.client.OK)
 3489             self.v3_create_token(
 3490                 self.build_authentication_request(
 3491                     user_id=self.user3['id'],
 3492                     password=self.user3['password'],
 3493                     project_id=self.projectA['id']))
 3494 
 3495     def test_deleting_project_deletes_grants(self):
 3496         # This is to make it a little bit more pretty with PEP8
 3497         role_path = ('/projects/%(project_id)s/users/%(user_id)s/'
 3498                      'roles/%(role_id)s')
 3499         role_path = role_path % {'user_id': self.user['id'],
 3500                                  'project_id': self.projectA['id'],
 3501                                  'role_id': self.role['id']}
 3502 
 3503         # grant the user a role on the project
 3504         self.put(role_path)
 3505 
 3506         # delete the project, which should remove the roles
 3507         self.delete(
 3508             '/projects/%(project_id)s' % {'project_id': self.projectA['id']})
 3509 
 3510         # Make sure that we get a 404 Not Found when heading that role.
 3511         self.head(role_path, expected_status=http.client.NOT_FOUND)
 3512 
 3513     def test_revoke_token_from_token(self):
 3514         # Test that a scoped token can be requested from an unscoped token,
 3515         # the scoped token can be revoked, and the unscoped token remains
 3516         # valid.
 3517 
 3518         unscoped_token = self.get_requested_token(
 3519             self.build_authentication_request(
 3520                 user_id=self.user1['id'],
 3521                 password=self.user1['password']))
 3522 
 3523         # Get a project-scoped token from the unscoped token
 3524         project_scoped_token = self.get_requested_token(
 3525             self.build_authentication_request(
 3526                 token=unscoped_token,
 3527                 project_id=self.projectA['id']))
 3528 
 3529         # Get a domain-scoped token from the unscoped token
 3530         domain_scoped_token = self.get_requested_token(
 3531             self.build_authentication_request(
 3532                 token=unscoped_token,
 3533                 domain_id=self.domainA['id']))
 3534 
 3535         # revoke the project-scoped token.
 3536         self.delete('/auth/tokens',
 3537                     headers={'X-Subject-Token': project_scoped_token})
 3538 
 3539         # The project-scoped token is invalidated.
 3540         self.head('/auth/tokens',
 3541                   headers={'X-Subject-Token': project_scoped_token},
 3542                   expected_status=http.client.NOT_FOUND)
 3543 
 3544         # The unscoped token should still be valid.
 3545         self.head('/auth/tokens',
 3546                   headers={'X-Subject-Token': unscoped_token},
 3547                   expected_status=http.client.OK)
 3548 
 3549         # The domain-scoped token should still be valid.
 3550         self.head('/auth/tokens',
 3551                   headers={'X-Subject-Token': domain_scoped_token},
 3552                   expected_status=http.client.OK)
 3553 
 3554         # revoke the domain-scoped token.
 3555         self.delete('/auth/tokens',
 3556                     headers={'X-Subject-Token': domain_scoped_token})
 3557 
 3558         # The domain-scoped token is invalid.
 3559         self.head('/auth/tokens',
 3560                   headers={'X-Subject-Token': domain_scoped_token},
 3561                   expected_status=http.client.NOT_FOUND)
 3562 
 3563         # The unscoped token should still be valid.
 3564         self.head('/auth/tokens',
 3565                   headers={'X-Subject-Token': unscoped_token},
 3566                   expected_status=http.client.OK)
 3567 
 3568 
 3569 class TestTokenRevokeApi(TestTokenRevokeById):
 3570     """Test token revocation on the v3 Identity API."""
 3571 
 3572     def config_overrides(self):
 3573         super(TestTokenRevokeApi, self).config_overrides()
 3574         self.config_fixture.config(
 3575             group='token',
 3576             provider='fernet',
 3577             revoke_by_id=False)
 3578         self.useFixture(
 3579             ksfixtures.KeyRepository(
 3580                 self.config_fixture,
 3581                 'fernet_tokens',
 3582                 CONF.fernet_tokens.max_active_keys
 3583             )
 3584         )
 3585 
 3586     def assertValidDeletedProjectResponse(self, events_response, project_id):
 3587         events = events_response['events']
 3588         self.assertEqual(1, len(events))
 3589         self.assertEqual(project_id, events[0]['project_id'])
 3590         self.assertIsNotNone(events[0]['issued_before'])
 3591         self.assertIsNotNone(events_response['links'])
 3592         del (events_response['events'][0]['issued_before'])
 3593         del (events_response['events'][0]['revoked_at'])
 3594         del (events_response['links'])
 3595         expected_response = {'events': [{'project_id': project_id}]}
 3596         self.assertEqual(expected_response, events_response)
 3597 
 3598     def assertValidRevokedTokenResponse(self, events_response, **kwargs):
 3599         events = events_response['events']
 3600         self.assertEqual(1, len(events))
 3601         for k, v in kwargs.items():
 3602             self.assertEqual(v, events[0].get(k))
 3603         self.assertIsNotNone(events[0]['issued_before'])
 3604         self.assertIsNotNone(events_response['links'])
 3605         del (events_response['events'][0]['issued_before'])
 3606         del (events_response['events'][0]['revoked_at'])
 3607         del (events_response['links'])
 3608 
 3609         expected_response = {'events': [kwargs]}
 3610         self.assertEqual(expected_response, events_response)
 3611 
 3612     def test_revoke_token(self):
 3613         scoped_token = self.get_scoped_token()
 3614         headers = {'X-Subject-Token': scoped_token}
 3615         response = self.get('/auth/tokens', headers=headers).json_body['token']
 3616 
 3617         self.delete('/auth/tokens', headers=headers)
 3618         self.head('/auth/tokens', headers=headers,
 3619                   expected_status=http.client.NOT_FOUND)
 3620         events_response = self.get('/OS-REVOKE/events').json_body
 3621         self.assertValidRevokedTokenResponse(events_response,
 3622                                              audit_id=response['audit_ids'][0])
 3623 
 3624     def test_get_revoke_by_id_false_returns_gone(self):
 3625         self.get('/auth/tokens/OS-PKI/revoked',
 3626                  expected_status=http.client.GONE)
 3627 
 3628     def test_head_revoke_by_id_false_returns_gone(self):
 3629         self.head('/auth/tokens/OS-PKI/revoked',
 3630                   expected_status=http.client.GONE)
 3631 
 3632     def test_revoke_by_id_true_returns_forbidden(self):
 3633         self.config_fixture.config(
 3634             group='token',
 3635             revoke_by_id=True)
 3636         self.get(
 3637             '/auth/tokens/OS-PKI/revoked',
 3638             expected_status=http.client.FORBIDDEN
 3639         )
 3640         self.head(
 3641             '/auth/tokens/OS-PKI/revoked',
 3642             expected_status=http.client.FORBIDDEN
 3643         )
 3644 
 3645     def test_list_delete_project_shows_in_event_list(self):
 3646         self.role_data_fixtures()
 3647         events = self.get('/OS-REVOKE/events').json_body['events']
 3648         self.assertEqual([], events)
 3649         self.delete(
 3650             '/projects/%(project_id)s' % {'project_id': self.projectA['id']})
 3651         events_response = self.get('/OS-REVOKE/events').json_body
 3652 
 3653         self.assertValidDeletedProjectResponse(events_response,
 3654                                                self.projectA['id'])
 3655 
 3656     def assertEventDataInList(self, events, **kwargs):
 3657         found = False
 3658         for e in events:
 3659             for key, value in kwargs.items():
 3660                 try:
 3661                     if e[key] != value:
 3662                         break
 3663                 except KeyError:
 3664                     # Break the loop and present a nice error instead of
 3665                     # KeyError
 3666                     break
 3667             else:
 3668                 # If the value of the event[key] matches the value of the kwarg
 3669                 # for each item in kwargs, the event was fully matched and
 3670                 # the assertTrue below should succeed.
 3671                 found = True
 3672         self.assertTrue(found,
 3673                         'event with correct values not in list, expected to '
 3674                         'find event with key-value pairs. Expected: '
 3675                         '"%(expected)s" Events: "%(events)s"' %
 3676                         {'expected': ','.join(
 3677                             ["'%s=%s'" % (k, v) for k, v in kwargs.items()]),
 3678                          'events': events})
 3679 
 3680     def test_list_delete_token_shows_in_event_list(self):
 3681         self.role_data_fixtures()
 3682         events = self.get('/OS-REVOKE/events').json_body['events']
 3683         self.assertEqual([], events)
 3684 
 3685         scoped_token = self.get_scoped_token()
 3686         headers = {'X-Subject-Token': scoped_token}
 3687         auth_req = self.build_authentication_request(token=scoped_token)
 3688         response = self.v3_create_token(auth_req)
 3689         token2 = response.json_body['token']
 3690         headers2 = {'X-Subject-Token': response.headers['X-Subject-Token']}
 3691 
 3692         response = self.v3_create_token(auth_req)
 3693         response.json_body['token']
 3694         headers3 = {'X-Subject-Token': response.headers['X-Subject-Token']}
 3695 
 3696         self.head('/auth/tokens', headers=headers,
 3697                   expected_status=http.client.OK)
 3698         self.head('/auth/tokens', headers=headers2,
 3699                   expected_status=http.client.OK)
 3700         self.head('/auth/tokens', headers=headers3,
 3701                   expected_status=http.client.OK)
 3702 
 3703         self.delete('/auth/tokens', headers=headers)
 3704         # NOTE(ayoung): not deleting token3, as it should be deleted
 3705         # by previous
 3706         events_response = self.get('/OS-REVOKE/events').json_body
 3707         events = events_response['events']
 3708         self.assertEqual(1, len(events))
 3709         self.assertEventDataInList(
 3710             events,
 3711             audit_id=token2['audit_ids'][1])
 3712         self.head('/auth/tokens', headers=headers,
 3713                   expected_status=http.client.NOT_FOUND)
 3714         self.head('/auth/tokens', headers=headers2,
 3715                   expected_status=http.client.OK)
 3716         self.head('/auth/tokens', headers=headers3,
 3717                   expected_status=http.client.OK)
 3718 
 3719     def test_list_with_filter(self):
 3720 
 3721         self.role_data_fixtures()
 3722         events = self.get('/OS-REVOKE/events').json_body['events']
 3723         self.assertEqual(0, len(events))
 3724 
 3725         scoped_token = self.get_scoped_token()
 3726         headers = {'X-Subject-Token': scoped_token}
 3727         auth = self.build_authentication_request(token=scoped_token)
 3728         headers2 = {'X-Subject-Token': self.get_requested_token(auth)}
 3729         self.delete('/auth/tokens', headers=headers)
 3730         self.delete('/auth/tokens', headers=headers2)
 3731 
 3732         events = self.get('/OS-REVOKE/events').json_body['events']
 3733 
 3734         self.assertEqual(2, len(events))
 3735         future = utils.isotime(timeutils.utcnow() +
 3736                                datetime.timedelta(seconds=1000))
 3737 
 3738         events = self.get('/OS-REVOKE/events?since=%s' % (future)
 3739                           ).json_body['events']
 3740         self.assertEqual(0, len(events))
 3741 
 3742 
 3743 class TestAuthExternalDisabled(test_v3.RestfulTestCase):
 3744     def config_overrides(self):
 3745         super(TestAuthExternalDisabled, self).config_overrides()
 3746         self.config_fixture.config(
 3747             group='auth',
 3748             methods=['password', 'token'])
 3749 
 3750     def test_remote_user_disabled(self):
 3751         app = self.loadapp()
 3752         remote_user = '%s@%s' % (self.user['name'], self.domain['name'])
 3753         with app.test_client() as c:
 3754             c.environ_base.update(self.build_external_auth_environ(
 3755                 remote_user))
 3756             auth_data = self.build_authentication_request()
 3757             c.post('/v3/auth/tokens', json=auth_data,
 3758                    expected_status_code=http.client.UNAUTHORIZED)
 3759 
 3760 # FIXME(morgan): This test case must be re-worked to function under flask. It
 3761 # has been commented out until it is re-worked ensuring no issues when webob
 3762 # classes are removed.
 3763 # https://bugs.launchpad.net/keystone/+bug/1793756
 3764 # class AuthExternalDomainBehavior(object):
 3765 #     content_type = 'json'
 3766 #
 3767 #     def test_remote_user_with_realm(self):
 3768 #         api = auth.controllers.Auth()
 3769 #         remote_user = self.user['name']
 3770 #         remote_domain = self.domain['name']
 3771 #         request, auth_info, auth_context = self.build_external_auth_request(
 3772 #             remote_user, remote_domain=remote_domain, kerberos=self.kerberos)
 3773 #
 3774 #         api.authenticate(request, auth_info, auth_context)
 3775 #         self.assertEqual(self.user['id'], auth_context['user_id'])
 3776 #
 3777 #         # Now test to make sure the user name can, itself, contain the
 3778 #         # '@' character.
 3779 #         user = {'name': 'myname@mydivision'}
 3780 #         PROVIDERS.identity_api.update_user(self.user['id'], user)
 3781 #         remote_user = user['name']
 3782 #         request, auth_info, auth_context = self.build_external_auth_request(
 3783 #             remote_user, remote_domain=remote_domain, kerberos=self.kerberos)
 3784 #
 3785 #         api.authenticate(request, auth_info, auth_context)
 3786 #         self.assertEqual(self.user['id'], auth_context['user_id'])
 3787 #
 3788 #
 3789 # FIXME(morgan): This test case must be re-worked to function under flask. It
 3790 # has been commented out until it is re-worked ensuring no issues when webob
 3791 # classes are removed.
 3792 # https://bugs.launchpad.net/keystone/+bug/1793756
 3793 # class TestAuthExternalDefaultDomain(object):
 3794 #     content_type = 'json'
 3795 #
 3796 #     def config_overrides(self):
 3797 #         super(TestAuthExternalDefaultDomain, self).config_overrides()
 3798 #         self.kerberos = False
 3799 #         self.auth_plugin_config_override(external='DefaultDomain')
 3800 #
 3801 #     def test_remote_user_with_default_domain(self):
 3802 #         api = auth.controllers.Auth()
 3803 #         remote_user = self.default_domain_user['name']
 3804 #         request, auth_info, auth_context = self.build_external_auth_request(
 3805 #             remote_user, kerberos=self.kerberos)
 3806 #
 3807 #         api.authenticate(request, auth_info, auth_context)
 3808 #         self.assertEqual(self.default_domain_user['id'],
 3809 #                          auth_context['user_id'])
 3810 #
 3811 #         # Now test to make sure the user name can, itself, contain the
 3812 #         # '@' character.
 3813 #         user = {'name': 'myname@mydivision'}
 3814 #         PROVIDERS.identity_api.update_user(
 3815 #             self.default_domain_user['id'], user
 3816 #         )
 3817 #         remote_user = user['name']
 3818 #         request, auth_info, auth_context = self.build_external_auth_request(
 3819 #             remote_user, kerberos=self.kerberos)
 3820 #
 3821 #         api.authenticate(request, auth_info, auth_context)
 3822 #         self.assertEqual(self.default_domain_user['id'],
 3823 #                          auth_context['user_id'])
 3824 #
 3825 
 3826 
 3827 class TestAuthJSONExternal(test_v3.RestfulTestCase):
 3828     content_type = 'json'
 3829 
 3830     def auth_plugin_config_override(self, methods=None, **method_classes):
 3831         self.config_fixture.config(group='auth', methods=[])
 3832 
 3833     def test_remote_user_no_method(self):
 3834         app = self.loadapp()
 3835         with app.test_client() as c:
 3836             c.environ_base.update(self.build_external_auth_environ(
 3837                 self.default_domain_user['name']))
 3838             auth_data = self.build_authentication_request()
 3839             c.post('/v3/auth/tokens', json=auth_data,
 3840                    expected_status_code=http.client.UNAUTHORIZED)
 3841 
 3842 
 3843 class TrustAPIBehavior(test_v3.RestfulTestCase):
 3844     """Redelegation valid and secure.
 3845 
 3846     Redelegation is a hierarchical structure of trusts between initial trustor
 3847     and a group of users allowed to impersonate trustor and act in his name.
 3848     Hierarchy is created in a process of trusting already trusted permissions
 3849     and organized as an adjacency list using 'redelegated_trust_id' field.
 3850     Redelegation is valid if each subsequent trust in a chain passes 'not more'
 3851     permissions than being redelegated.
 3852 
 3853     Trust constraints are:
 3854      * roles - set of roles trusted by trustor
 3855      * expiration_time
 3856      * allow_redelegation - a flag
 3857      * redelegation_count - decreasing value restricting length of trust chain
 3858      * remaining_uses - DISALLOWED when allow_redelegation == True
 3859 
 3860     Trust becomes invalid in case:
 3861      * trust roles were revoked from trustor
 3862      * one of the users in the delegation chain was disabled or deleted
 3863      * expiration time passed
 3864      * one of the parent trusts has become invalid
 3865      * one of the parent trusts was deleted
 3866 
 3867     """
 3868 
 3869     def config_overrides(self):
 3870         super(TrustAPIBehavior, self).config_overrides()
 3871         self.config_fixture.config(
 3872             group='trust',
 3873             allow_redelegation=True,
 3874             max_redelegation_count=10
 3875         )
 3876 
 3877     def setUp(self):
 3878         super(TrustAPIBehavior, self).setUp()
 3879         # Create a trustee to delegate stuff to
 3880         self.trustee_user = unit.create_user(PROVIDERS.identity_api,
 3881                                              domain_id=self.domain_id)
 3882 
 3883         # trustor->trustee
 3884         self.redelegated_trust_ref = unit.new_trust_ref(
 3885             trustor_user_id=self.user_id,
 3886             trustee_user_id=self.trustee_user['id'],
 3887             project_id=self.project_id,
 3888             impersonation=True,
 3889             expires=dict(minutes=1),
 3890             role_ids=[self.role_id],
 3891             allow_redelegation=True)
 3892 
 3893         # trustor->trustee (no redelegation)
 3894         self.chained_trust_ref = unit.new_trust_ref(
 3895             trustor_user_id=self.user_id,
 3896             trustee_user_id=self.trustee_user['id'],
 3897             project_id=self.project_id,
 3898             impersonation=True,
 3899             role_ids=[self.role_id],
 3900             allow_redelegation=True)
 3901 
 3902     def _get_trust_token(self, trust):
 3903         trust_id = trust['id']
 3904         auth_data = self.build_authentication_request(
 3905             user_id=self.trustee_user['id'],
 3906             password=self.trustee_user['password'],
 3907             trust_id=trust_id)
 3908         trust_token = self.get_requested_token(auth_data)
 3909         return trust_token
 3910 
 3911     def test_depleted_redelegation_count_error(self):
 3912         self.redelegated_trust_ref['redelegation_count'] = 0
 3913         r = self.post('/OS-TRUST/trusts',
 3914                       body={'trust': self.redelegated_trust_ref})
 3915         trust = self.assertValidTrustResponse(r)
 3916         trust_token = self._get_trust_token(trust)
 3917 
 3918         # Attempt to create a redelegated trust.
 3919         self.post('/OS-TRUST/trusts',
 3920                   body={'trust': self.chained_trust_ref},
 3921                   token=trust_token,
 3922                   expected_status=http.client.FORBIDDEN)
 3923 
 3924     def test_modified_redelegation_count_error(self):
 3925         r = self.post('/OS-TRUST/trusts',
 3926                       body={'trust': self.redelegated_trust_ref})
 3927         trust = self.assertValidTrustResponse(r)
 3928         trust_token = self._get_trust_token(trust)
 3929 
 3930         # Attempt to create a redelegated trust with incorrect
 3931         # redelegation_count.
 3932         correct = trust['redelegation_count'] - 1
 3933         incorrect = correct - 1
 3934         self.chained_trust_ref['redelegation_count'] = incorrect
 3935         self.post('/OS-TRUST/trusts',
 3936                   body={'trust': self.chained_trust_ref},
 3937                   token=trust_token,
 3938                   expected_status=http.client.FORBIDDEN)
 3939 
 3940     def test_max_redelegation_count_constraint(self):
 3941         incorrect = CONF.trust.max_redelegation_count + 1
 3942         self.redelegated_trust_ref['redelegation_count'] = incorrect
 3943         self.post('/OS-TRUST/trusts',
 3944                   body={'trust': self.redelegated_trust_ref},
 3945                   expected_status=http.client.FORBIDDEN)
 3946 
 3947     def test_redelegation_expiry(self):
 3948         r = self.post('/OS-TRUST/trusts',
 3949                       body={'trust': self.redelegated_trust_ref})
 3950         trust = self.assertValidTrustResponse(r)
 3951         trust_token = self._get_trust_token(trust)
 3952 
 3953         # Attempt to create a redelegated trust supposed to last longer
 3954         # than the parent trust: let's give it 10 minutes (>1 minute).
 3955         too_long_live_chained_trust_ref = unit.new_trust_ref(
 3956             trustor_user_id=self.user_id,
 3957             trustee_user_id=self.trustee_user['id'],
 3958             project_id=self.project_id,
 3959             impersonation=True,
 3960             expires=dict(minutes=10),
 3961             role_ids=[self.role_id])
 3962         self.post('/OS-TRUST/trusts',
 3963                   body={'trust': too_long_live_chained_trust_ref},
 3964                   token=trust_token,
 3965                   expected_status=http.client.FORBIDDEN)
 3966 
 3967     def test_redelegation_remaining_uses(self):
 3968         r = self.post('/OS-TRUST/trusts',
 3969                       body={'trust': self.redelegated_trust_ref})
 3970         trust = self.assertValidTrustResponse(r)
 3971         trust_token = self._get_trust_token(trust)
 3972 
 3973         # Attempt to create a redelegated trust with remaining_uses defined.
 3974         # It must fail according to specification: remaining_uses must be
 3975         # omitted for trust redelegation. Any number here.
 3976         self.chained_trust_ref['remaining_uses'] = 5
 3977         self.post('/OS-TRUST/trusts',
 3978                   body={'trust': self.chained_trust_ref},
 3979                   token=trust_token,
 3980                   expected_status=http.client.BAD_REQUEST)
 3981 
 3982     def test_roles_subset(self):
 3983         # Build second role
 3984         role = unit.new_role_ref()
 3985         PROVIDERS.role_api.create_role(role['id'], role)
 3986         # assign a new role to the user
 3987         PROVIDERS.assignment_api.create_grant(
 3988             role_id=role['id'], user_id=self.user_id,
 3989             project_id=self.project_id
 3990         )
 3991 
 3992         # Create first trust with extended set of roles
 3993         ref = self.redelegated_trust_ref
 3994         ref['expires_at'] = datetime.datetime.utcnow().replace(
 3995             year=2032).strftime(unit.TIME_FORMAT)
 3996         ref['roles'].append({'id': role['id']})
 3997         r = self.post('/OS-TRUST/trusts',
 3998                       body={'trust': ref})
 3999         trust = self.assertValidTrustResponse(r)
 4000         # Trust created with exact set of roles (checked by role id)
 4001         role_id_set = set(r['id'] for r in ref['roles'])
 4002         trust_role_id_set = set(r['id'] for r in trust['roles'])
 4003         self.assertEqual(role_id_set, trust_role_id_set)
 4004 
 4005         trust_token = self._get_trust_token(trust)
 4006 
 4007         # Chain second trust with roles subset
 4008         self.chained_trust_ref['expires_at'] = (
 4009             datetime.datetime.utcnow().replace(year=2028).strftime(
 4010                 unit.TIME_FORMAT))
 4011         r = self.post('/OS-TRUST/trusts',
 4012                       body={'trust': self.chained_trust_ref},
 4013                       token=trust_token)
 4014         trust2 = self.assertValidTrustResponse(r)
 4015         # First trust contains roles superset
 4016         # Second trust contains roles subset
 4017         role_id_set1 = set(r['id'] for r in trust['roles'])
 4018         role_id_set2 = set(r['id'] for r in trust2['roles'])
 4019         self.assertThat(role_id_set1, matchers.GreaterThan(role_id_set2))
 4020 
 4021     def test_trust_with_implied_roles(self):
 4022         # Create some roles
 4023         role1 = unit.new_role_ref()
 4024         PROVIDERS.role_api.create_role(role1['id'], role1)
 4025         role2 = unit.new_role_ref()
 4026         PROVIDERS.role_api.create_role(role2['id'], role2)
 4027 
 4028         # Implication
 4029         PROVIDERS.role_api.create_implied_role(role1['id'], role2['id'])
 4030 
 4031         # Assign new roles to the user (with role2 implied)
 4032         PROVIDERS.assignment_api.create_grant(
 4033             role_id=role1['id'], user_id=self.user_id,
 4034             project_id=self.project_id
 4035         )
 4036 
 4037         # Create trust
 4038         ref = self.redelegated_trust_ref
 4039         ref['roles'] = [{'id': role1['id']}, {'id': role2['id']}]
 4040         resp = self.post('/OS-TRUST/trusts',
 4041                          body={'trust': ref})
 4042         trust = self.assertValidTrustResponse(resp)
 4043 
 4044         # Trust created with exact set of roles (checked by role id)
 4045         role_ids = [r['id'] for r in ref['roles']]
 4046         trust_role_ids = [r['id'] for r in trust['roles']]
 4047         # Compare requested roles with roles in response
 4048         self.assertEqual(role_ids, trust_role_ids)
 4049 
 4050         # Get a trust-scoped token
 4051         auth_data = self.build_authentication_request(
 4052             user_id=self.trustee_user['id'],
 4053             password=self.trustee_user['password'],
 4054             trust_id=trust['id']
 4055         )
 4056         resp = self.post('/auth/tokens', body=auth_data)
 4057         trust_token_role_ids = [r['id'] for r in resp.json['token']['roles']]
 4058         # Compare requested roles with roles given in token data
 4059         self.assertEqual(sorted(role_ids), sorted(trust_token_role_ids))
 4060 
 4061     def test_redelegate_with_role_by_name(self):
 4062         # For role by name testing
 4063         ref = unit.new_trust_ref(
 4064             trustor_user_id=self.user_id,
 4065             trustee_user_id=self.trustee_user['id'],
 4066             project_id=self.project_id,
 4067             impersonation=True,
 4068             expires=dict(minutes=1),
 4069             role_names=[self.role['name']],
 4070             allow_redelegation=True)
 4071         ref['expires_at'] = datetime.datetime.utcnow().replace(
 4072             year=2032).strftime(unit.TIME_FORMAT)
 4073         r = self.post('/OS-TRUST/trusts',
 4074                       body={'trust': ref})
 4075         trust = self.assertValidTrustResponse(r)
 4076         # Ensure we can get a token with this trust
 4077         trust_token = self._get_trust_token(trust)
 4078         # Chain second trust with roles subset
 4079         ref = unit.new_trust_ref(
 4080             trustor_user_id=self.user_id,
 4081             trustee_user_id=self.trustee_user['id'],
 4082             project_id=self.project_id,
 4083             impersonation=True,
 4084             role_names=[self.role['name']],
 4085             allow_redelegation=True)
 4086         ref['expires_at'] = datetime.datetime.utcnow().replace(
 4087             year=2028).strftime(unit.TIME_FORMAT)
 4088         r = self.post('/OS-TRUST/trusts',
 4089                       body={'trust': ref},
 4090                       token=trust_token)
 4091         trust = self.assertValidTrustResponse(r)
 4092         # Ensure we can get a token with this trust
 4093         self._get_trust_token(trust)
 4094 
 4095     def test_redelegate_new_role_fails(self):
 4096         r = self.post('/OS-TRUST/trusts',
 4097                       body={'trust': self.redelegated_trust_ref})
 4098         trust = self.assertValidTrustResponse(r)
 4099         trust_token = self._get_trust_token(trust)
 4100 
 4101         # Build second trust with a role not in parent's roles
 4102         role = unit.new_role_ref()
 4103         PROVIDERS.role_api.create_role(role['id'], role)
 4104         # assign a new role to the user
 4105         PROVIDERS.assignment_api.create_grant(
 4106             role_id=role['id'], user_id=self.user_id,
 4107             project_id=self.project_id
 4108         )
 4109 
 4110         # Try to chain a trust with the role not from parent trust
 4111         self.chained_trust_ref['roles'] = [{'id': role['id']}]
 4112 
 4113         # Bypass policy enforcement
 4114         with mock.patch.object(policy, 'enforce', return_value=True):
 4115             self.post('/OS-TRUST/trusts',
 4116                       body={'trust': self.chained_trust_ref},
 4117                       token=trust_token,
 4118                       expected_status=http.client.FORBIDDEN)
 4119 
 4120     def test_redelegation_terminator(self):
 4121         self.redelegated_trust_ref['expires_at'] = (
 4122             datetime.datetime.utcnow().replace(year=2032).strftime(
 4123                 unit.TIME_FORMAT))
 4124         r = self.post('/OS-TRUST/trusts',
 4125                       body={'trust': self.redelegated_trust_ref})
 4126         trust = self.assertValidTrustResponse(r)
 4127         trust_token = self._get_trust_token(trust)
 4128 
 4129         # Build second trust - the terminator
 4130         self.chained_trust_ref['expires_at'] = (
 4131             datetime.datetime.utcnow().replace(year=2028).strftime(
 4132                 unit.TIME_FORMAT))
 4133         ref = dict(self.chained_trust_ref,
 4134                    redelegation_count=1,
 4135                    allow_redelegation=False)
 4136 
 4137         r = self.post('/OS-TRUST/trusts',
 4138                       body={'trust': ref},
 4139                       token=trust_token)
 4140 
 4141         trust = self.assertValidTrustResponse(r)
 4142         # Check that allow_redelegation == False caused redelegation_count
 4143         # to be set to 0, while allow_redelegation is removed
 4144         self.assertNotIn('allow_redelegation', trust)
 4145         self.assertEqual(0, trust['redelegation_count'])
 4146         trust_token = self._get_trust_token(trust)
 4147 
 4148         # Build third trust, same as second
 4149         self.post('/OS-TRUST/trusts',
 4150                   body={'trust': ref},
 4151                   token=trust_token,
 4152                   expected_status=http.client.FORBIDDEN)
 4153 
 4154     def test_redelegation_without_impersonation(self):
 4155         # Update trust to not allow impersonation
 4156         self.redelegated_trust_ref['impersonation'] = False
 4157 
 4158         # Create trust
 4159         resp = self.post('/OS-TRUST/trusts',
 4160                          body={'trust': self.redelegated_trust_ref},
 4161                          expected_status=http.client.CREATED)
 4162         trust = self.assertValidTrustResponse(resp)
 4163 
 4164         # Get trusted token without impersonation
 4165         auth_data = self.build_authentication_request(
 4166             user_id=self.trustee_user['id'],
 4167             password=self.trustee_user['password'],
 4168             trust_id=trust['id'])
 4169         trust_token = self.get_requested_token(auth_data)
 4170 
 4171         # Create second user for redelegation
 4172         trustee_user_2 = unit.create_user(PROVIDERS.identity_api,
 4173                                           domain_id=self.domain_id)
 4174 
 4175         # Trust for redelegation
 4176         trust_ref_2 = unit.new_trust_ref(
 4177             trustor_user_id=self.trustee_user['id'],
 4178             trustee_user_id=trustee_user_2['id'],
 4179             project_id=self.project_id,
 4180             impersonation=False,
 4181             expires=dict(minutes=1),
 4182             role_ids=[self.role_id],
 4183             allow_redelegation=False)
 4184 
 4185         # Creating a second trust should not be allowed since trustor does not
 4186         # have the role to delegate thus returning 404 NOT FOUND.
 4187         resp = self.post('/OS-TRUST/trusts',
 4188                          body={'trust': trust_ref_2},
 4189                          token=trust_token,
 4190                          expected_status=http.client.NOT_FOUND)
 4191 
 4192     def test_create_unscoped_trust(self):
 4193         ref = unit.new_trust_ref(
 4194             trustor_user_id=self.user_id,
 4195             trustee_user_id=self.trustee_user['id'])
 4196         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4197         self.assertValidTrustResponse(r, ref)
 4198 
 4199     def test_create_trust_no_roles(self):
 4200         ref = unit.new_trust_ref(
 4201             trustor_user_id=self.user_id,
 4202             trustee_user_id=self.trustee_user['id'],
 4203             project_id=self.project_id)
 4204         self.post('/OS-TRUST/trusts', body={'trust': ref},
 4205                   expected_status=http.client.FORBIDDEN)
 4206 
 4207     def _initialize_test_consume_trust(self, count):
 4208         # Make sure remaining_uses is decremented as we consume the trust
 4209         ref = unit.new_trust_ref(
 4210             trustor_user_id=self.user_id,
 4211             trustee_user_id=self.trustee_user['id'],
 4212             project_id=self.project_id,
 4213             remaining_uses=count,
 4214             role_ids=[self.role_id])
 4215         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4216         # make sure the trust exists
 4217         trust = self.assertValidTrustResponse(r, ref)
 4218         r = self.get(
 4219             '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
 4220         # get a token for the trustee
 4221         auth_data = self.build_authentication_request(
 4222             user_id=self.trustee_user['id'],
 4223             password=self.trustee_user['password'])
 4224         r = self.v3_create_token(auth_data)
 4225         token = r.headers.get('X-Subject-Token')
 4226         # get a trust token, consume one use
 4227         auth_data = self.build_authentication_request(
 4228             token=token,
 4229             trust_id=trust['id'])
 4230         r = self.v3_create_token(auth_data)
 4231         return trust
 4232 
 4233     def test_authenticate_without_trust_dict_returns_bad_request(self):
 4234         # Authenticate for a token to use in the request
 4235         token = self.v3_create_token(
 4236             self.build_authentication_request(
 4237                 user_id=self.trustee_user['id'],
 4238                 password=self.trustee_user['password']
 4239             )
 4240         ).headers.get('X-Subject-Token')
 4241 
 4242         auth_data = {
 4243             'auth': {
 4244                 'identity': {
 4245                     'methods': ['token'],
 4246                     'token': {'id': token}
 4247                 },
 4248                 # We don't need a trust to execute this test, the
 4249                 # OS-TRUST:trust key of the request body just has to be a
 4250                 # string instead of a dictionary in order to throw a 500 when
 4251                 # it should a 400 Bad Request.
 4252                 'scope': {'OS-TRUST:trust': ''}
 4253             }
 4254         }
 4255         self.admin_request(
 4256             method='POST', path='/v3/auth/tokens', body=auth_data,
 4257             expected_status=http.client.BAD_REQUEST
 4258         )
 4259 
 4260     def test_consume_trust_once(self):
 4261         trust = self._initialize_test_consume_trust(2)
 4262         # check decremented value
 4263         r = self.get(
 4264             '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
 4265         trust = r.result.get('trust')
 4266         self.assertIsNotNone(trust)
 4267         self.assertEqual(1, trust['remaining_uses'])
 4268         self.assertEqual(self.role['name'], trust['roles'][0]['name'])
 4269         self.assertEqual(self.role['id'], trust['roles'][0]['id'])
 4270 
 4271     def test_create_one_time_use_trust(self):
 4272         trust = self._initialize_test_consume_trust(1)
 4273         # No more uses, the trust is made unavailable
 4274         self.get(
 4275             '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
 4276             expected_status=http.client.NOT_FOUND)
 4277         # this time we can't get a trust token
 4278         auth_data = self.build_authentication_request(
 4279             user_id=self.trustee_user['id'],
 4280             password=self.trustee_user['password'],
 4281             trust_id=trust['id'])
 4282         self.v3_create_token(auth_data,
 4283                              expected_status=http.client.UNAUTHORIZED)
 4284 
 4285     def test_create_unlimited_use_trust(self):
 4286         # by default trusts are unlimited in terms of tokens that can be
 4287         # generated from them, this test creates such a trust explicitly
 4288         ref = unit.new_trust_ref(
 4289             trustor_user_id=self.user_id,
 4290             trustee_user_id=self.trustee_user['id'],
 4291             project_id=self.project_id,
 4292             remaining_uses=None,
 4293             role_ids=[self.role_id])
 4294         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4295         trust = self.assertValidTrustResponse(r, ref)
 4296 
 4297         r = self.get(
 4298             '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
 4299         auth_data = self.build_authentication_request(
 4300             user_id=self.trustee_user['id'],
 4301             password=self.trustee_user['password'])
 4302         r = self.v3_create_token(auth_data)
 4303         token = r.headers.get('X-Subject-Token')
 4304         auth_data = self.build_authentication_request(
 4305             token=token,
 4306             trust_id=trust['id'])
 4307         r = self.v3_create_token(auth_data)
 4308         r = self.get(
 4309             '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
 4310         trust = r.result.get('trust')
 4311         self.assertIsNone(trust['remaining_uses'])
 4312 
 4313     def test_impersonation_token_cannot_create_new_trust(self):
 4314         ref = unit.new_trust_ref(
 4315             trustor_user_id=self.user_id,
 4316             trustee_user_id=self.trustee_user['id'],
 4317             project_id=self.project_id,
 4318             impersonation=True,
 4319             expires=dict(minutes=1),
 4320             role_ids=[self.role_id])
 4321 
 4322         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4323         trust = self.assertValidTrustResponse(r)
 4324 
 4325         auth_data = self.build_authentication_request(
 4326             user_id=self.trustee_user['id'],
 4327             password=self.trustee_user['password'],
 4328             trust_id=trust['id'])
 4329 
 4330         trust_token = self.get_requested_token(auth_data)
 4331 
 4332         # Build second trust
 4333         ref = unit.new_trust_ref(
 4334             trustor_user_id=self.user_id,
 4335             trustee_user_id=self.trustee_user['id'],
 4336             project_id=self.project_id,
 4337             impersonation=True,
 4338             expires=dict(minutes=1),
 4339             role_ids=[self.role_id])
 4340 
 4341         self.post('/OS-TRUST/trusts',
 4342                   body={'trust': ref},
 4343                   token=trust_token,
 4344                   expected_status=http.client.FORBIDDEN)
 4345 
 4346     def test_trust_deleted_grant(self):
 4347         # create a new role
 4348         role = unit.new_role_ref()
 4349         PROVIDERS.role_api.create_role(role['id'], role)
 4350 
 4351         grant_url = (
 4352             '/projects/%(project_id)s/users/%(user_id)s/'
 4353             'roles/%(role_id)s' % {
 4354                 'project_id': self.project_id,
 4355                 'user_id': self.user_id,
 4356                 'role_id': role['id']})
 4357 
 4358         # assign a new role
 4359         self.put(grant_url)
 4360 
 4361         # create a trust that delegates the new role
 4362         ref = unit.new_trust_ref(
 4363             trustor_user_id=self.user_id,
 4364             trustee_user_id=self.trustee_user['id'],
 4365             project_id=self.project_id,
 4366             impersonation=False,
 4367             expires=dict(minutes=1),
 4368             role_ids=[role['id']])
 4369 
 4370         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4371         trust = self.assertValidTrustResponse(r)
 4372 
 4373         # delete the grant
 4374         self.delete(grant_url)
 4375 
 4376         # attempt to get a trust token with the deleted grant
 4377         # and ensure it's unauthorized
 4378         auth_data = self.build_authentication_request(
 4379             user_id=self.trustee_user['id'],
 4380             password=self.trustee_user['password'],
 4381             trust_id=trust['id'])
 4382         r = self.v3_create_token(auth_data,
 4383                                  expected_status=http.client.FORBIDDEN)
 4384 
 4385     def test_trust_chained(self):
 4386         """Test that a trust token can't be used to execute another trust.
 4387 
 4388         To do this, we create an A->B->C hierarchy of trusts, then attempt to
 4389         execute the trusts in series (C->B->A).
 4390 
 4391         """
 4392         # create a sub-trustee user
 4393         sub_trustee_user = unit.create_user(
 4394             PROVIDERS.identity_api,
 4395             domain_id=test_v3.DEFAULT_DOMAIN_ID)
 4396         sub_trustee_user_id = sub_trustee_user['id']
 4397 
 4398         # create a new role
 4399         role = unit.new_role_ref()
 4400         PROVIDERS.role_api.create_role(role['id'], role)
 4401 
 4402         # assign the new role to trustee
 4403         self.put(
 4404             '/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % {
 4405                 'project_id': self.project_id,
 4406                 'user_id': self.trustee_user['id'],
 4407                 'role_id': role['id']})
 4408 
 4409         # create a trust from trustor -> trustee
 4410         ref = unit.new_trust_ref(
 4411             trustor_user_id=self.user_id,
 4412             trustee_user_id=self.trustee_user['id'],
 4413             project_id=self.project_id,
 4414             impersonation=True,
 4415             expires=dict(minutes=1),
 4416             role_ids=[self.role_id])
 4417         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4418         trust1 = self.assertValidTrustResponse(r)
 4419 
 4420         # authenticate as trustee so we can create a second trust
 4421         auth_data = self.build_authentication_request(
 4422             user_id=self.trustee_user['id'],
 4423             password=self.trustee_user['password'],
 4424             project_id=self.project_id)
 4425         token = self.get_requested_token(auth_data)
 4426 
 4427         # create a trust from trustee -> sub-trustee
 4428         ref = unit.new_trust_ref(
 4429             trustor_user_id=self.trustee_user['id'],
 4430             trustee_user_id=sub_trustee_user_id,
 4431             project_id=self.project_id,
 4432             impersonation=True,
 4433             expires=dict(minutes=1),
 4434             role_ids=[role['id']])
 4435         r = self.post('/OS-TRUST/trusts', token=token, body={'trust': ref})
 4436         trust2 = self.assertValidTrustResponse(r)
 4437 
 4438         # authenticate as sub-trustee and get a trust token
 4439         auth_data = self.build_authentication_request(
 4440             user_id=sub_trustee_user['id'],
 4441             password=sub_trustee_user['password'],
 4442             trust_id=trust2['id'])
 4443         trust_token = self.get_requested_token(auth_data)
 4444 
 4445         # attempt to get the second trust using a trust token
 4446         auth_data = self.build_authentication_request(
 4447             token=trust_token,
 4448             trust_id=trust1['id'])
 4449         r = self.v3_create_token(auth_data,
 4450                                  expected_status=http.client.FORBIDDEN)
 4451 
 4452     def assertTrustTokensRevoked(self, trust_id):
 4453         revocation_response = self.get('/OS-REVOKE/events')
 4454         revocation_events = revocation_response.json_body['events']
 4455         found = False
 4456         for event in revocation_events:
 4457             if event.get('OS-TRUST:trust_id') == trust_id:
 4458                 found = True
 4459         self.assertTrue(found, 'event with trust_id %s not found in list' %
 4460                         trust_id)
 4461 
 4462     def test_delete_trust_revokes_tokens(self):
 4463         ref = unit.new_trust_ref(
 4464             trustor_user_id=self.user_id,
 4465             trustee_user_id=self.trustee_user['id'],
 4466             project_id=self.project_id,
 4467             impersonation=False,
 4468             expires=dict(minutes=1),
 4469             role_ids=[self.role_id])
 4470         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4471         trust = self.assertValidTrustResponse(r)
 4472         trust_id = trust['id']
 4473         auth_data = self.build_authentication_request(
 4474             user_id=self.trustee_user['id'],
 4475             password=self.trustee_user['password'],
 4476             trust_id=trust_id)
 4477         r = self.v3_create_token(auth_data)
 4478         self.assertValidProjectScopedTokenResponse(
 4479             r, self.trustee_user)
 4480         trust_token = r.headers['X-Subject-Token']
 4481         self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
 4482             'trust_id': trust_id})
 4483         headers = {'X-Subject-Token': trust_token}
 4484         self.head('/auth/tokens', headers=headers,
 4485                   expected_status=http.client.NOT_FOUND)
 4486         self.assertTrustTokensRevoked(trust_id)
 4487 
 4488     def disable_user(self, user):
 4489         user['enabled'] = False
 4490         PROVIDERS.identity_api.update_user(user['id'], user)
 4491 
 4492     def test_trust_get_token_fails_if_trustor_disabled(self):
 4493         ref = unit.new_trust_ref(
 4494             trustor_user_id=self.user_id,
 4495             trustee_user_id=self.trustee_user['id'],
 4496             project_id=self.project_id,
 4497             impersonation=False,
 4498             expires=dict(minutes=1),
 4499             role_ids=[self.role_id])
 4500 
 4501         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4502 
 4503         trust = self.assertValidTrustResponse(r, ref)
 4504 
 4505         auth_data = self.build_authentication_request(
 4506             user_id=self.trustee_user['id'],
 4507             password=self.trustee_user['password'],
 4508             trust_id=trust['id'])
 4509         self.v3_create_token(auth_data)
 4510 
 4511         self.disable_user(self.user)
 4512 
 4513         auth_data = self.build_authentication_request(
 4514             user_id=self.trustee_user['id'],
 4515             password=self.trustee_user['password'],
 4516             trust_id=trust['id'])
 4517         self.v3_create_token(auth_data,
 4518                              expected_status=http.client.FORBIDDEN)
 4519 
 4520     def test_trust_get_token_fails_if_trustee_disabled(self):
 4521         ref = unit.new_trust_ref(
 4522             trustor_user_id=self.user_id,
 4523             trustee_user_id=self.trustee_user['id'],
 4524             project_id=self.project_id,
 4525             impersonation=False,
 4526             expires=dict(minutes=1),
 4527             role_ids=[self.role_id])
 4528 
 4529         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4530 
 4531         trust = self.assertValidTrustResponse(r, ref)
 4532 
 4533         auth_data = self.build_authentication_request(
 4534             user_id=self.trustee_user['id'],
 4535             password=self.trustee_user['password'],
 4536             trust_id=trust['id'])
 4537         self.v3_create_token(auth_data)
 4538 
 4539         self.disable_user(self.trustee_user)
 4540 
 4541         auth_data = self.build_authentication_request(
 4542             user_id=self.trustee_user['id'],
 4543             password=self.trustee_user['password'],
 4544             trust_id=trust['id'])
 4545         self.v3_create_token(auth_data,
 4546                              expected_status=http.client.UNAUTHORIZED)
 4547 
 4548     def test_delete_trust(self):
 4549         ref = unit.new_trust_ref(
 4550             trustor_user_id=self.user_id,
 4551             trustee_user_id=self.trustee_user['id'],
 4552             project_id=self.project_id,
 4553             impersonation=False,
 4554             expires=dict(minutes=1),
 4555             role_ids=[self.role_id])
 4556 
 4557         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4558 
 4559         trust = self.assertValidTrustResponse(r, ref)
 4560 
 4561         self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
 4562             'trust_id': trust['id']})
 4563 
 4564         auth_data = self.build_authentication_request(
 4565             user_id=self.trustee_user['id'],
 4566             password=self.trustee_user['password'],
 4567             trust_id=trust['id'])
 4568         self.v3_create_token(auth_data,
 4569                              expected_status=http.client.UNAUTHORIZED)
 4570 
 4571     def test_change_password_invalidates_trust_tokens(self):
 4572         ref = unit.new_trust_ref(
 4573             trustor_user_id=self.user_id,
 4574             trustee_user_id=self.trustee_user['id'],
 4575             project_id=self.project_id,
 4576             impersonation=True,
 4577             expires=dict(minutes=1),
 4578             role_ids=[self.role_id])
 4579 
 4580         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4581         trust = self.assertValidTrustResponse(r)
 4582 
 4583         auth_data = self.build_authentication_request(
 4584             user_id=self.trustee_user['id'],
 4585             password=self.trustee_user['password'],
 4586             trust_id=trust['id'])
 4587         r = self.v3_create_token(auth_data)
 4588 
 4589         self.assertValidProjectScopedTokenResponse(r, self.user)
 4590         trust_token = r.headers.get('X-Subject-Token')
 4591 
 4592         self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
 4593                  self.user_id, token=trust_token)
 4594 
 4595         self.assertValidUserResponse(
 4596             self.patch('/users/%s' % self.trustee_user['id'],
 4597                        body={'user': {'password': uuid.uuid4().hex}}))
 4598 
 4599         self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
 4600                  self.user_id, expected_status=http.client.UNAUTHORIZED,
 4601                  token=trust_token)
 4602 
 4603     def test_trustee_can_do_role_ops(self):
 4604         resp = self.post('/OS-TRUST/trusts',
 4605                          body={'trust': self.redelegated_trust_ref})
 4606         trust = self.assertValidTrustResponse(resp)
 4607         trust_token = self._get_trust_token(trust)
 4608 
 4609         resp = self.get(
 4610             '/OS-TRUST/trusts/%(trust_id)s/roles' % {
 4611                 'trust_id': trust['id']},
 4612             token=trust_token)
 4613         self.assertValidRoleListResponse(resp, self.role)
 4614 
 4615         self.head(
 4616             '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % {
 4617                 'trust_id': trust['id'],
 4618                 'role_id': self.role['id']},
 4619             token=trust_token,
 4620             expected_status=http.client.OK)
 4621 
 4622         resp = self.get(
 4623             '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % {
 4624                 'trust_id': trust['id'],
 4625                 'role_id': self.role['id']},
 4626             token=trust_token)
 4627         self.assertValidRoleResponse(resp, self.role)
 4628 
 4629     def test_do_not_consume_remaining_uses_when_get_token_fails(self):
 4630         ref = unit.new_trust_ref(
 4631             trustor_user_id=self.user_id,
 4632             trustee_user_id=self.trustee_user['id'],
 4633             project_id=self.project_id,
 4634             impersonation=False,
 4635             expires=dict(minutes=1),
 4636             role_ids=[self.role_id],
 4637             remaining_uses=3)
 4638         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4639 
 4640         new_trust = r.result.get('trust')
 4641         trust_id = new_trust.get('id')
 4642         # Pass in another user's ID as the trustee, the result being a failed
 4643         # token authenticate and the remaining_uses of the trust should not be
 4644         # decremented.
 4645         auth_data = self.build_authentication_request(
 4646             user_id=self.default_domain_user['id'],
 4647             password=self.default_domain_user['password'],
 4648             trust_id=trust_id)
 4649         self.v3_create_token(auth_data,
 4650                              expected_status=http.client.FORBIDDEN)
 4651 
 4652         r = self.get('/OS-TRUST/trusts/%s' % trust_id)
 4653         self.assertEqual(3, r.result.get('trust').get('remaining_uses'))
 4654 
 4655 
 4656 class TestTrustChain(test_v3.RestfulTestCase):
 4657 
 4658     def config_overrides(self):
 4659         super(TestTrustChain, self).config_overrides()
 4660         self.config_fixture.config(
 4661             group='trust',
 4662             allow_redelegation=True,
 4663             max_redelegation_count=10
 4664         )
 4665 
 4666     def setUp(self):
 4667         super(TestTrustChain, self).setUp()
 4668         """Create a trust chain using redelegation.
 4669 
 4670         A trust chain is a series of trusts that are redelegated. For example,
 4671         self.user_list consists of userA, userB, and userC. The first trust in
 4672         the trust chain is going to be established between self.user and userA,
 4673         call it trustA. Then, userA is going to obtain a trust scoped token
 4674         using trustA, and with that token create a trust between userA and
 4675         userB called trustB. This pattern will continue with userB creating a
 4676         trust with userC.
 4677         So the trust chain should look something like:
 4678             trustA -> trustB -> trustC
 4679         Where:
 4680             self.user is trusting userA with trustA
 4681             userA is trusting userB with trustB
 4682             userB is trusting userC with trustC
 4683 
 4684         """
 4685         self.user_list = list()
 4686         self.trust_chain = list()
 4687         for _ in range(3):
 4688             user = unit.create_user(PROVIDERS.identity_api,
 4689                                     domain_id=self.domain_id)
 4690             self.user_list.append(user)
 4691 
 4692         # trustor->trustee redelegation with impersonation
 4693         trustee = self.user_list[0]
 4694         trust_ref = unit.new_trust_ref(
 4695             trustor_user_id=self.user_id,
 4696             trustee_user_id=trustee['id'],