"Fossies" - the Fresh Open Source Software Archive

Member "keystone-16.0.2/keystone/tests/unit/test_v3_auth.py" (7 Jun 2021, 242677 Bytes) of package /linux/misc/openstack/keystone-16.0.2.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_v3_auth.py": 16.0.1_vs_16.0.2.

    1 # Copyright 2012 OpenStack Foundation
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 import copy
   16 import datetime
   17 import fixtures
   18 import itertools
   19 import operator
   20 import re
   21 import uuid
   22 
   23 import freezegun
   24 import mock
   25 from oslo_serialization import jsonutils as json
   26 from oslo_utils import fixture
   27 from oslo_utils import timeutils
   28 import six
   29 from six.moves import http_client
   30 from six.moves import range
   31 from testtools import matchers
   32 from testtools import testcase
   33 
   34 from keystone import auth
   35 from keystone.auth.plugins import totp
   36 from keystone.common import authorization
   37 from keystone.common import provider_api
   38 from keystone.common.rbac_enforcer import policy
   39 from keystone.common import utils
   40 import keystone.conf
   41 from keystone.credential.providers import fernet as credential_fernet
   42 from keystone import exception
   43 from keystone.identity.backends import resource_options as ro
   44 from keystone.tests.common import auth as common_auth
   45 from keystone.tests import unit
   46 from keystone.tests.unit import ksfixtures
   47 from keystone.tests.unit import test_v3
   48 
   49 
   50 CONF = keystone.conf.CONF
   51 PROVIDERS = provider_api.ProviderAPIs
   52 
   53 
   54 class TestMFARules(test_v3.RestfulTestCase):
   55     def config_overrides(self):
   56         super(TestMFARules, self).config_overrides()
   57 
   58         self.useFixture(
   59             ksfixtures.KeyRepository(
   60                 self.config_fixture,
   61                 'fernet_tokens',
   62                 CONF.fernet_tokens.max_active_keys
   63             )
   64         )
   65 
   66         self.useFixture(
   67             ksfixtures.KeyRepository(
   68                 self.config_fixture,
   69                 'credential',
   70                 credential_fernet.MAX_ACTIVE_KEYS
   71             )
   72         )
   73 
   74     def assertValidErrorResponse(self, r):
   75         resp = r.result
   76         if r.headers.get(authorization.AUTH_RECEIPT_HEADER):
   77             self.assertIsNotNone(resp.get('receipt'))
   78             self.assertIsNotNone(resp.get('receipt').get('methods'))
   79         else:
   80             self.assertIsNotNone(resp.get('error'))
   81             self.assertIsNotNone(resp['error'].get('code'))
   82             self.assertIsNotNone(resp['error'].get('title'))
   83             self.assertIsNotNone(resp['error'].get('message'))
   84             self.assertEqual(int(resp['error']['code']), r.status_code)
   85 
   86     def _create_totp_cred(self):
   87         totp_cred = unit.new_totp_credential(self.user_id, self.project_id)
   88         PROVIDERS.credential_api.create_credential(uuid.uuid4().hex, totp_cred)
   89 
   90         def cleanup(testcase):
   91             totp_creds = testcase.credential_api.list_credentials_for_user(
   92                 testcase.user['id'], type='totp')
   93 
   94             for cred in totp_creds:
   95                 testcase.credential_api.delete_credential(cred['id'])
   96 
   97         self.addCleanup(cleanup, testcase=self)
   98         return totp_cred
   99 
  100     def auth_plugin_config_override(self, methods=None, **method_classes):
  101         methods = ['totp', 'token', 'password']
  102         super(TestMFARules, self).auth_plugin_config_override(methods)
  103 
  104     def _update_user_with_MFA_rules(self, rule_list, rules_enabled=True):
  105         user = self.user.copy()
  106         # Do not update password
  107         user.pop('password')
  108         user['options'][ro.MFA_RULES_OPT.option_name] = rule_list
  109         user['options'][ro.MFA_ENABLED_OPT.option_name] = rules_enabled
  110         PROVIDERS.identity_api.update_user(user['id'], user)
  111 
  112     def test_MFA_single_method_rules_requirements_met_succeeds(self):
  113         # ensure that a simple password works if a password-only rules exists
  114         rule_list = [['password'], ['password', 'totp']]
  115         self._update_user_with_MFA_rules(rule_list=rule_list)
  116         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  117         # issues with revocation events that occur at the same time as the
  118         # token issuance. This is a bug with the limited resolution that
  119         # tokens and revocation events have.
  120         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  121         with freezegun.freeze_time(time):
  122             self.v3_create_token(
  123                 self.build_authentication_request(
  124                     user_id=self.user_id,
  125                     password=self.user['password'],
  126                     user_domain_id=self.domain_id,
  127                     project_id=self.project_id))
  128 
  129     def test_MFA_multi_method_rules_requirements_met_succeeds(self):
  130         # validate that multiple auth-methods function if all are specified
  131         # and the rules requires it
  132         rule_list = [['password', 'totp']]
  133         totp_cred = self._create_totp_cred()
  134         self._update_user_with_MFA_rules(rule_list=rule_list)
  135         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  136         # issues with revocation events that occur at the same time as the
  137         # token issuance. This is a bug with the limited resolution that
  138         # tokens and revocation events have.
  139         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  140         with freezegun.freeze_time(time):
  141             auth_req = self.build_authentication_request(
  142                 user_id=self.user_id,
  143                 password=self.user['password'],
  144                 user_domain_id=self.domain_id,
  145                 passcode=totp._generate_totp_passcodes(totp_cred['blob'])[0])
  146             self.v3_create_token(auth_req)
  147 
  148     def test_MFA_single_method_rules_requirements_not_met_fails(self):
  149         # if a rule matching a single auth type is specified and is not matched
  150         # the result should be unauthorized
  151         rule_list = [['totp']]
  152         self._update_user_with_MFA_rules(rule_list=rule_list)
  153         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  154         # issues with revocation events that occur at the same time as the
  155         # token issuance. This is a bug with the limited resolution that
  156         # tokens and revocation events have.
  157         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  158         with freezegun.freeze_time(time):
  159             self.v3_create_token(
  160                 self.build_authentication_request(
  161                     user_id=self.user_id,
  162                     password=self.user['password'],
  163                     user_domain_id=self.domain_id,
  164                     project_id=self.project_id),
  165                 expected_status=http_client.UNAUTHORIZED)
  166 
  167     def test_MFA_multi_method_rules_requirements_not_met_fails(self):
  168         # if multiple rules are specified and only one is passed,
  169         # unauthorized is expected
  170         rule_list = [['password', 'totp']]
  171         self._update_user_with_MFA_rules(rule_list=rule_list)
  172         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  173         # issues with revocation events that occur at the same time as the
  174         # token issuance. This is a bug with the limited resolution that
  175         # tokens and revocation events have.
  176         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  177         with freezegun.freeze_time(time):
  178             self.v3_create_token(
  179                 self.build_authentication_request(
  180                     user_id=self.user_id,
  181                     password=self.user['password'],
  182                     user_domain_id=self.domain_id,
  183                     project_id=self.project_id),
  184                 expected_status=http_client.UNAUTHORIZED)
  185 
  186     def test_MFA_rules_bogus_non_existing_auth_method_succeeds(self):
  187         # Bogus auth methods are thrown out from rules.
  188         rule_list = [['password'], ['BoGusAuThMeTh0dHandl3r']]
  189         self._update_user_with_MFA_rules(rule_list=rule_list)
  190         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  191         # issues with revocation events that occur at the same time as the
  192         # token issuance. This is a bug with the limited resolution that
  193         # tokens and revocation events have.
  194         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  195         with freezegun.freeze_time(time):
  196             self.v3_create_token(
  197                 self.build_authentication_request(
  198                     user_id=self.user_id,
  199                     password=self.user['password'],
  200                     user_domain_id=self.domain_id,
  201                     project_id=self.project_id))
  202 
  203     def test_MFA_rules_disabled_MFA_succeeeds(self):
  204         # ensure that if MFA is "disableD" authentication succeeds, even if
  205         # not enough auth methods are specified
  206         rule_list = [['password', 'totp']]
  207         self._update_user_with_MFA_rules(rule_list=rule_list,
  208                                          rules_enabled=False)
  209         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  210         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  211         # issues with revocation events that occur at the same time as the
  212         # token issuance. This is a bug with the limited resolution that
  213         # tokens and revocation events have.
  214         with freezegun.freeze_time(time):
  215             self.v3_create_token(
  216                 self.build_authentication_request(
  217                     user_id=self.user_id,
  218                     password=self.user['password'],
  219                     user_domain_id=self.domain_id,
  220                     project_id=self.project_id))
  221 
  222     def test_MFA_rules_all_bogus_rules_results_in_default_behavior(self):
  223         # if all the rules are bogus, the result is the same as the default
  224         # behavior, any single password method is sufficient
  225         rule_list = [[uuid.uuid4().hex, uuid.uuid4().hex],
  226                      ['BoGus'],
  227                      ['NonExistantMethod']]
  228         self._update_user_with_MFA_rules(rule_list=rule_list)
  229         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  230         # issues with revocation events that occur at the same time as the
  231         # token issuance. This is a bug with the limited resolution that
  232         # tokens and revocation events have.
  233         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  234         with freezegun.freeze_time(time):
  235             self.v3_create_token(
  236                 self.build_authentication_request(
  237                     user_id=self.user_id,
  238                     password=self.user['password'],
  239                     user_domain_id=self.domain_id,
  240                     project_id=self.project_id))
  241 
  242     def test_MFA_rules_rescope_works_without_token_method_in_rules(self):
  243         rule_list = [['password', 'totp']]
  244         totp_cred = self._create_totp_cred()
  245         self._update_user_with_MFA_rules(rule_list=rule_list)
  246         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  247         # issues with revocation events that occur at the same time as the
  248         # token issuance. This is a bug with the limited resolution that
  249         # tokens and revocation events have.
  250         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  251         with freezegun.freeze_time(time):
  252             auth_data = self.build_authentication_request(
  253                 user_id=self.user_id,
  254                 password=self.user['password'],
  255                 user_domain_id=self.domain_id,
  256                 passcode=totp._generate_totp_passcodes(totp_cred['blob'])[0])
  257             r = self.v3_create_token(auth_data)
  258             auth_data = self.build_authentication_request(
  259                 token=r.headers.get('X-Subject-Token'),
  260                 project_id=self.project_id)
  261             self.v3_create_token(auth_data)
  262 
  263     def test_MFA_requirements_makes_correct_receipt_for_password(self):
  264         # if multiple rules are specified and only one is passed,
  265         # unauthorized is expected
  266         rule_list = [['password', 'totp']]
  267         self._update_user_with_MFA_rules(rule_list=rule_list)
  268         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  269         # issues with revocation events that occur at the same time as the
  270         # token issuance. This is a bug with the limited resolution that
  271         # tokens and revocation events have.
  272         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  273         with freezegun.freeze_time(time):
  274             response = self.admin_request(
  275                 method='POST',
  276                 path='/v3/auth/tokens',
  277                 body=self.build_authentication_request(
  278                     user_id=self.user_id,
  279                     password=self.user['password'],
  280                     user_domain_id=self.domain_id,
  281                     project_id=self.project_id),
  282                 expected_status=http_client.UNAUTHORIZED)
  283 
  284         self.assertIsNotNone(
  285             response.headers.get(authorization.AUTH_RECEIPT_HEADER))
  286         resp_data = response.result
  287         # NOTE(adriant): We convert to sets to avoid any potential sorting
  288         # related failures since order isn't important, just content.
  289         self.assertEqual(
  290             {'password'}, set(resp_data.get('receipt').get('methods')))
  291         self.assertEqual(
  292             set(frozenset(r) for r in rule_list),
  293             set(frozenset(r) for r in resp_data.get('required_auth_methods')))
  294 
  295     def test_MFA_requirements_makes_correct_receipt_for_totp(self):
  296         # if multiple rules are specified and only one is passed,
  297         # unauthorized is expected
  298         totp_cred = self._create_totp_cred()
  299         rule_list = [['password', 'totp']]
  300         self._update_user_with_MFA_rules(rule_list=rule_list)
  301         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  302         # issues with revocation events that occur at the same time as the
  303         # token issuance. This is a bug with the limited resolution that
  304         # tokens and revocation events have.
  305         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  306         with freezegun.freeze_time(time):
  307             response = self.admin_request(
  308                 method='POST',
  309                 path='/v3/auth/tokens',
  310                 body=self.build_authentication_request(
  311                     user_id=self.user_id,
  312                     user_domain_id=self.domain_id,
  313                     project_id=self.project_id,
  314                     passcode=totp._generate_totp_passcodes(
  315                         totp_cred['blob'])[0]),
  316                 expected_status=http_client.UNAUTHORIZED)
  317 
  318         self.assertIsNotNone(
  319             response.headers.get(authorization.AUTH_RECEIPT_HEADER))
  320         resp_data = response.result
  321         # NOTE(adriant): We convert to sets to avoid any potential sorting
  322         # related failures since order isn't important, just content.
  323         self.assertEqual(
  324             {'totp'}, set(resp_data.get('receipt').get('methods')))
  325         self.assertEqual(
  326             set(frozenset(r) for r in rule_list),
  327             set(frozenset(r) for r in resp_data.get('required_auth_methods')))
  328 
  329     def test_MFA_requirements_makes_correct_receipt_for_pass_and_totp(self):
  330         # if multiple rules are specified and only one is passed,
  331         # unauthorized is expected
  332         totp_cred = self._create_totp_cred()
  333         rule_list = [['password', 'totp', 'token']]
  334         self._update_user_with_MFA_rules(rule_list=rule_list)
  335         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  336         # issues with revocation events that occur at the same time as the
  337         # token issuance. This is a bug with the limited resolution that
  338         # tokens and revocation events have.
  339         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  340         with freezegun.freeze_time(time):
  341             response = self.admin_request(
  342                 method='POST',
  343                 path='/v3/auth/tokens',
  344                 body=self.build_authentication_request(
  345                     user_id=self.user_id,
  346                     password=self.user['password'],
  347                     user_domain_id=self.domain_id,
  348                     project_id=self.project_id,
  349                     passcode=totp._generate_totp_passcodes(
  350                         totp_cred['blob'])[0]),
  351                 expected_status=http_client.UNAUTHORIZED)
  352 
  353         self.assertIsNotNone(
  354             response.headers.get(authorization.AUTH_RECEIPT_HEADER))
  355         resp_data = response.result
  356         # NOTE(adriant): We convert to sets to avoid any potential sorting
  357         # related failures since order isn't important, just content.
  358         self.assertEqual(
  359             {'password', 'totp'}, set(resp_data.get('receipt').get('methods')))
  360         self.assertEqual(
  361             set(frozenset(r) for r in rule_list),
  362             set(frozenset(r) for r in resp_data.get('required_auth_methods')))
  363 
  364     def test_MFA_requirements_returns_correct_required_auth_methods(self):
  365         # if multiple rules are specified and only one is passed,
  366         # unauthorized is expected
  367         rule_list = [
  368             ['password', 'totp', 'token'],
  369             ['password', 'totp'],
  370             ['token', 'totp'],
  371             ['BoGusAuThMeTh0dHandl3r']
  372         ]
  373         expect_rule_list = rule_list = [
  374             ['password', 'totp', 'token'],
  375             ['password', 'totp'],
  376         ]
  377         self._update_user_with_MFA_rules(rule_list=rule_list)
  378         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  379         # issues with revocation events that occur at the same time as the
  380         # token issuance. This is a bug with the limited resolution that
  381         # tokens and revocation events have.
  382         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  383         with freezegun.freeze_time(time):
  384             response = self.admin_request(
  385                 method='POST',
  386                 path='/v3/auth/tokens',
  387                 body=self.build_authentication_request(
  388                     user_id=self.user_id,
  389                     password=self.user['password'],
  390                     user_domain_id=self.domain_id,
  391                     project_id=self.project_id),
  392                 expected_status=http_client.UNAUTHORIZED)
  393 
  394         self.assertIsNotNone(
  395             response.headers.get(authorization.AUTH_RECEIPT_HEADER))
  396         resp_data = response.result
  397         # NOTE(adriant): We convert to sets to avoid any potential sorting
  398         # related failures since order isn't important, just content.
  399         self.assertEqual(
  400             {'password'}, set(resp_data.get('receipt').get('methods')))
  401         self.assertEqual(
  402             set(frozenset(r) for r in expect_rule_list),
  403             set(frozenset(r) for r in resp_data.get('required_auth_methods')))
  404 
  405     def test_MFA_consuming_receipt_with_totp(self):
  406         # if multiple rules are specified and only one is passed,
  407         # unauthorized is expected
  408         totp_cred = self._create_totp_cred()
  409         rule_list = [['password', 'totp']]
  410         self._update_user_with_MFA_rules(rule_list=rule_list)
  411         # NOTE(notmorgan): Step forward in time to ensure we're not causing
  412         # issues with revocation events that occur at the same time as the
  413         # token issuance. This is a bug with the limited resolution that
  414         # tokens and revocation events have.
  415         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  416         with freezegun.freeze_time(time):
  417             response = self.admin_request(
  418                 method='POST',
  419                 path='/v3/auth/tokens',
  420                 body=self.build_authentication_request(
  421                     user_id=self.user_id,
  422                     password=self.user['password'],
  423                     user_domain_id=self.domain_id,
  424                     project_id=self.project_id),
  425                 expected_status=http_client.UNAUTHORIZED)
  426 
  427         self.assertIsNotNone(
  428             response.headers.get(authorization.AUTH_RECEIPT_HEADER))
  429         receipt = response.headers.get(authorization.AUTH_RECEIPT_HEADER)
  430         resp_data = response.result
  431         # NOTE(adriant): We convert to sets to avoid any potential sorting
  432         # related failures since order isn't important, just content.
  433         self.assertEqual(
  434             {'password'}, set(resp_data.get('receipt').get('methods')))
  435         self.assertEqual(
  436             set(frozenset(r) for r in rule_list),
  437             set(frozenset(r) for r in resp_data.get('required_auth_methods')))
  438 
  439         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  440         with freezegun.freeze_time(time):
  441             response = self.admin_request(
  442                 method='POST',
  443                 path='/v3/auth/tokens',
  444                 headers={authorization.AUTH_RECEIPT_HEADER: receipt},
  445                 body=self.build_authentication_request(
  446                     user_id=self.user_id,
  447                     user_domain_id=self.domain_id,
  448                     project_id=self.project_id,
  449                     passcode=totp._generate_totp_passcodes(
  450                         totp_cred['blob'])[0]))
  451 
  452     def test_MFA_consuming_receipt_not_found(self):
  453         time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
  454         with freezegun.freeze_time(time):
  455             response = self.admin_request(
  456                 method='POST',
  457                 path='/v3/auth/tokens',
  458                 headers={authorization.AUTH_RECEIPT_HEADER: "bogus-receipt"},
  459                 body=self.build_authentication_request(
  460                     user_id=self.user_id,
  461                     user_domain_id=self.domain_id,
  462                     project_id=self.project_id),
  463                 expected_status=http_client.UNAUTHORIZED)
  464         self.assertEqual(401, response.result['error']['code'])
  465 
  466 
  467 class TestAuthInfo(common_auth.AuthTestMixin, testcase.TestCase):
  468     def setUp(self):
  469         super(TestAuthInfo, self).setUp()
  470         auth.core.load_auth_methods()
  471 
  472     def test_unsupported_auth_method(self):
  473         auth_data = {'methods': ['abc']}
  474         auth_data['abc'] = {'test': 'test'}
  475         auth_data = {'identity': auth_data}
  476         self.assertRaises(exception.AuthMethodNotSupported,
  477                           auth.core.AuthInfo.create,
  478                           auth_data)
  479 
  480     def test_missing_auth_method_data(self):
  481         auth_data = {'methods': ['password']}
  482         auth_data = {'identity': auth_data}
  483         self.assertRaises(exception.ValidationError,
  484                           auth.core.AuthInfo.create,
  485                           auth_data)
  486 
  487     def test_project_name_no_domain(self):
  488         auth_data = self.build_authentication_request(
  489             username='test',
  490             password='test',
  491             project_name='abc')['auth']
  492         self.assertRaises(exception.ValidationError,
  493                           auth.core.AuthInfo.create,
  494                           auth_data)
  495 
  496     def test_both_project_and_domain_in_scope(self):
  497         auth_data = self.build_authentication_request(
  498             user_id='test',
  499             password='test',
  500             project_name='test',
  501             domain_name='test')['auth']
  502         self.assertRaises(exception.ValidationError,
  503                           auth.core.AuthInfo.create,
  504                           auth_data)
  505 
  506     def test_get_method_names_duplicates(self):
  507         auth_data = self.build_authentication_request(
  508             token='test',
  509             user_id='test',
  510             password='test')['auth']
  511         auth_data['identity']['methods'] = ['password', 'token',
  512                                             'password', 'password']
  513         auth_info = auth.core.AuthInfo.create(auth_data)
  514         self.assertEqual(['password', 'token'],
  515                          auth_info.get_method_names())
  516 
  517     def test_get_method_data_invalid_method(self):
  518         auth_data = self.build_authentication_request(
  519             user_id='test',
  520             password='test')['auth']
  521         auth_info = auth.core.AuthInfo.create(auth_data)
  522 
  523         method_name = uuid.uuid4().hex
  524         self.assertRaises(exception.ValidationError,
  525                           auth_info.get_method_data,
  526                           method_name)
  527 
  528 
  529 class TokenAPITests(object):
  530     # Why is this not just setUp? Because TokenAPITests is not a test class
  531     # itself. If TokenAPITests became a subclass of the testcase, it would get
  532     # called by the enumerate-tests-in-file code. The way the functions get
  533     # resolved in Python for multiple inheritance means that a setUp in this
  534     # would get skipped by the testrunner.
  535     def doSetUp(self):
  536         r = self.v3_create_token(self.build_authentication_request(
  537             username=self.user['name'],
  538             user_domain_id=self.domain_id,
  539             password=self.user['password']))
  540         self.v3_token_data = r.result
  541         self.v3_token = r.headers.get('X-Subject-Token')
  542         self.headers = {'X-Subject-Token': r.headers.get('X-Subject-Token')}
  543 
  544     def _get_unscoped_token(self):
  545         auth_data = self.build_authentication_request(
  546             user_id=self.user['id'],
  547             password=self.user['password'])
  548         r = self.post('/auth/tokens', body=auth_data)
  549         self.assertValidUnscopedTokenResponse(r)
  550         return r.headers.get('X-Subject-Token')
  551 
  552     def _get_domain_scoped_token(self):
  553         auth_data = self.build_authentication_request(
  554             user_id=self.user['id'],
  555             password=self.user['password'],
  556             domain_id=self.domain_id)
  557         r = self.post('/auth/tokens', body=auth_data)
  558         self.assertValidDomainScopedTokenResponse(r)
  559         return r.headers.get('X-Subject-Token')
  560 
  561     def _get_project_scoped_token(self):
  562         auth_data = self.build_authentication_request(
  563             user_id=self.user['id'],
  564             password=self.user['password'],
  565             project_id=self.project_id)
  566         r = self.post('/auth/tokens', body=auth_data)
  567         self.assertValidProjectScopedTokenResponse(r)
  568         return r.headers.get('X-Subject-Token')
  569 
  570     def _get_trust_scoped_token(self, trustee_user, trust):
  571         auth_data = self.build_authentication_request(
  572             user_id=trustee_user['id'],
  573             password=trustee_user['password'],
  574             trust_id=trust['id'])
  575         r = self.post('/auth/tokens', body=auth_data)
  576         self.assertValidProjectScopedTokenResponse(r)
  577         return r.headers.get('X-Subject-Token')
  578 
  579     def _create_trust(self, impersonation=False):
  580         # Create a trustee user
  581         trustee_user = unit.create_user(PROVIDERS.identity_api,
  582                                         domain_id=self.domain_id)
  583         ref = unit.new_trust_ref(
  584             trustor_user_id=self.user_id,
  585             trustee_user_id=trustee_user['id'],
  586             project_id=self.project_id,
  587             impersonation=impersonation,
  588             role_ids=[self.role_id])
  589 
  590         # Create a trust
  591         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
  592         trust = self.assertValidTrustResponse(r)
  593         return (trustee_user, trust)
  594 
  595     def _validate_token(self, token,
  596                         expected_status=http_client.OK, allow_expired=False):
  597         path = '/v3/auth/tokens'
  598 
  599         if allow_expired:
  600             path += '?allow_expired=1'
  601 
  602         return self.admin_request(
  603             path=path,
  604             headers={'X-Auth-Token': self.get_admin_token(),
  605                      'X-Subject-Token': token},
  606             method='GET',
  607             expected_status=expected_status
  608         )
  609 
  610     def _revoke_token(self, token, expected_status=http_client.NO_CONTENT):
  611         return self.delete(
  612             '/auth/tokens',
  613             headers={'x-subject-token': token},
  614             expected_status=expected_status)
  615 
  616     def _set_user_enabled(self, user, enabled=True):
  617         user['enabled'] = enabled
  618         PROVIDERS.identity_api.update_user(user['id'], user)
  619 
  620     def _create_project_and_set_as_default_project(self):
  621         # create a new project
  622         ref = unit.new_project_ref(domain_id=self.domain_id)
  623         r = self.post('/projects', body={'project': ref})
  624         project = self.assertValidProjectResponse(r, ref)
  625 
  626         # grant the user a role on the project
  627         self.put(
  628             '/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % {
  629                 'user_id': self.user['id'],
  630                 'project_id': project['id'],
  631                 'role_id': self.role['id']})
  632 
  633         # make the new project the user's default project
  634         body = {'user': {'default_project_id': project['id']}}
  635         r = self.patch('/users/%(user_id)s' % {
  636             'user_id': self.user['id']},
  637             body=body)
  638         self.assertValidUserResponse(r)
  639 
  640         return project
  641 
  642     def test_auth_with_token_as_different_user_fails(self):
  643         # get the token for a user. This is self.user which is different from
  644         # self.default_domain_user.
  645         token = self.get_scoped_token()
  646         # try both password and token methods with different identities and it
  647         # should fail
  648         auth_data = self.build_authentication_request(
  649             token=token,
  650             user_id=self.default_domain_user['id'],
  651             password=self.default_domain_user['password'])
  652         self.v3_create_token(auth_data,
  653                              expected_status=http_client.UNAUTHORIZED)
  654 
  655     def test_create_token_for_user_without_password_fails(self):
  656         user = unit.new_user_ref(domain_id=self.domain['id'])
  657         del user['password']  # can't have a password for this test
  658         user = PROVIDERS.identity_api.create_user(user)
  659 
  660         auth_data = self.build_authentication_request(
  661             user_id=user['id'],
  662             password='password')
  663 
  664         self.v3_create_token(auth_data,
  665                              expected_status=http_client.UNAUTHORIZED)
  666 
  667     def test_create_unscoped_token_by_authenticating_with_unscoped_token(self):
  668         auth_data = self.build_authentication_request(
  669             user_id=self.user['id'],
  670             password=self.user['password'])
  671         r = self.v3_create_token(auth_data)
  672         self.assertValidUnscopedTokenResponse(r)
  673         token_id = r.headers.get('X-Subject-Token')
  674 
  675         auth_data = self.build_authentication_request(token=token_id)
  676         r = self.v3_create_token(auth_data)
  677         self.assertValidUnscopedTokenResponse(r)
  678 
  679     def test_create_unscoped_token_with_user_id(self):
  680         auth_data = self.build_authentication_request(
  681             user_id=self.user['id'],
  682             password=self.user['password'])
  683         r = self.v3_create_token(auth_data)
  684         self.assertValidUnscopedTokenResponse(r)
  685 
  686     def test_create_unscoped_token_with_user_domain_id(self):
  687         auth_data = self.build_authentication_request(
  688             username=self.user['name'],
  689             user_domain_id=self.domain['id'],
  690             password=self.user['password'])
  691         r = self.v3_create_token(auth_data)
  692         self.assertValidUnscopedTokenResponse(r)
  693 
  694     def test_create_unscoped_token_with_user_domain_name(self):
  695         auth_data = self.build_authentication_request(
  696             username=self.user['name'],
  697             user_domain_name=self.domain['name'],
  698             password=self.user['password'])
  699         r = self.v3_create_token(auth_data)
  700         self.assertValidUnscopedTokenResponse(r)
  701 
  702     def test_validate_unscoped_token(self):
  703         unscoped_token = self._get_unscoped_token()
  704         r = self._validate_token(unscoped_token)
  705         self.assertValidUnscopedTokenResponse(r)
  706 
  707     def test_validate_expired_unscoped_token_returns_not_found(self):
  708         # NOTE(lbragstad): We set token expiration to 10 seconds so that we can
  709         # use the context manager of freezegun without sqlite issues.
  710         self.config_fixture.config(group='token',
  711                                    expiration=10)
  712         time = datetime.datetime.utcnow()
  713         with freezegun.freeze_time(time) as frozen_datetime:
  714             unscoped_token = self._get_unscoped_token()
  715             frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
  716             self._validate_token(
  717                 unscoped_token,
  718                 expected_status=http_client.NOT_FOUND
  719             )
  720 
  721     def test_revoke_unscoped_token(self):
  722         unscoped_token = self._get_unscoped_token()
  723         r = self._validate_token(unscoped_token)
  724         self.assertValidUnscopedTokenResponse(r)
  725         self._revoke_token(unscoped_token)
  726         self._validate_token(unscoped_token,
  727                              expected_status=http_client.NOT_FOUND)
  728 
  729     def test_create_explicit_unscoped_token(self):
  730         self._create_project_and_set_as_default_project()
  731 
  732         # explicitly ask for an unscoped token
  733         auth_data = self.build_authentication_request(
  734             user_id=self.user['id'],
  735             password=self.user['password'],
  736             unscoped="unscoped")
  737         r = self.post('/auth/tokens', body=auth_data, noauth=True)
  738         self.assertValidUnscopedTokenResponse(r)
  739 
  740     def test_disabled_users_default_project_result_in_unscoped_token(self):
  741         # create a disabled project to work with
  742         project = self.create_new_default_project_for_user(
  743             self.user['id'], self.domain_id, enable_project=False)
  744 
  745         # assign a role to user for the new project
  746         PROVIDERS.assignment_api.add_role_to_user_and_project(
  747             self.user['id'], project['id'], self.role_id
  748         )
  749 
  750         # attempt to authenticate without requesting a project
  751         auth_data = self.build_authentication_request(
  752             user_id=self.user['id'],
  753             password=self.user['password'])
  754         r = self.v3_create_token(auth_data)
  755         self.assertValidUnscopedTokenResponse(r)
  756 
  757     def test_disabled_default_project_domain_result_in_unscoped_token(self):
  758         domain_ref = unit.new_domain_ref()
  759         r = self.post('/domains', body={'domain': domain_ref})
  760         domain = self.assertValidDomainResponse(r, domain_ref)
  761 
  762         project = self.create_new_default_project_for_user(
  763             self.user['id'], domain['id'])
  764 
  765         # assign a role to user for the new project
  766         PROVIDERS.assignment_api.add_role_to_user_and_project(
  767             self.user['id'], project['id'], self.role_id
  768         )
  769 
  770         # now disable the project domain
  771         body = {'domain': {'enabled': False}}
  772         r = self.patch('/domains/%(domain_id)s' % {'domain_id': domain['id']},
  773                        body=body)
  774         self.assertValidDomainResponse(r)
  775 
  776         # attempt to authenticate without requesting a project
  777         auth_data = self.build_authentication_request(
  778             user_id=self.user['id'],
  779             password=self.user['password'])
  780         r = self.v3_create_token(auth_data)
  781         self.assertValidUnscopedTokenResponse(r)
  782 
  783     def test_unscoped_token_is_invalid_after_disabling_user(self):
  784         unscoped_token = self._get_unscoped_token()
  785         # Make sure the token is valid
  786         r = self._validate_token(unscoped_token)
  787         self.assertValidUnscopedTokenResponse(r)
  788         # Disable the user
  789         self._set_user_enabled(self.user, enabled=False)
  790         # Ensure validating a token for a disabled user fails
  791         self._validate_token(
  792             unscoped_token,
  793             expected_status=http_client.NOT_FOUND
  794         )
  795 
  796     def test_unscoped_token_is_invalid_after_enabling_disabled_user(self):
  797         unscoped_token = self._get_unscoped_token()
  798         # Make sure the token is valid
  799         r = self._validate_token(unscoped_token)
  800         self.assertValidUnscopedTokenResponse(r)
  801         # Disable the user
  802         self._set_user_enabled(self.user, enabled=False)
  803         # Ensure validating a token for a disabled user fails
  804         self._validate_token(
  805             unscoped_token,
  806             expected_status=http_client.NOT_FOUND
  807         )
  808         # Enable the user
  809         self._set_user_enabled(self.user)
  810         # Ensure validating a token for a re-enabled user fails
  811         self._validate_token(
  812             unscoped_token,
  813             expected_status=http_client.NOT_FOUND
  814         )
  815 
  816     def test_unscoped_token_is_invalid_after_disabling_user_domain(self):
  817         unscoped_token = self._get_unscoped_token()
  818         # Make sure the token is valid
  819         r = self._validate_token(unscoped_token)
  820         self.assertValidUnscopedTokenResponse(r)
  821         # Disable the user's domain
  822         self.domain['enabled'] = False
  823         PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain)
  824         # Ensure validating a token for a disabled user fails
  825         self._validate_token(
  826             unscoped_token,
  827             expected_status=http_client.NOT_FOUND
  828         )
  829 
  830     def test_unscoped_token_is_invalid_after_changing_user_password(self):
  831         unscoped_token = self._get_unscoped_token()
  832         # Make sure the token is valid
  833         r = self._validate_token(unscoped_token)
  834         self.assertValidUnscopedTokenResponse(r)
  835         # Change user's password
  836         self.user['password'] = 'Password1'
  837         PROVIDERS.identity_api.update_user(self.user['id'], self.user)
  838         # Ensure updating user's password revokes existing user's tokens
  839         self._validate_token(
  840             unscoped_token,
  841             expected_status=http_client.NOT_FOUND
  842         )
  843 
  844     def test_create_system_token_with_user_id(self):
  845         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
  846             'user_id': self.user['id'],
  847             'role_id': self.role_id
  848         }
  849         self.put(path=path)
  850 
  851         auth_request_body = self.build_authentication_request(
  852             user_id=self.user['id'],
  853             password=self.user['password'],
  854             system=True
  855         )
  856 
  857         response = self.v3_create_token(auth_request_body)
  858         self.assertValidSystemScopedTokenResponse(response)
  859 
  860     def test_create_system_token_with_username(self):
  861         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
  862             'user_id': self.user['id'],
  863             'role_id': self.role_id
  864         }
  865         self.put(path=path)
  866 
  867         auth_request_body = self.build_authentication_request(
  868             username=self.user['name'],
  869             password=self.user['password'],
  870             user_domain_id=self.domain['id'],
  871             system=True
  872         )
  873 
  874         response = self.v3_create_token(auth_request_body)
  875         self.assertValidSystemScopedTokenResponse(response)
  876 
  877     def test_create_system_token_fails_without_system_assignment(self):
  878         auth_request_body = self.build_authentication_request(
  879             user_id=self.user['id'],
  880             password=self.user['password'],
  881             system=True
  882         )
  883         self.v3_create_token(
  884             auth_request_body,
  885             expected_status=http_client.UNAUTHORIZED
  886         )
  887 
  888     def test_system_token_is_invalid_after_disabling_user(self):
  889         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
  890             'user_id': self.user['id'],
  891             'role_id': self.role_id
  892         }
  893         self.put(path=path)
  894 
  895         auth_request_body = self.build_authentication_request(
  896             username=self.user['name'],
  897             password=self.user['password'],
  898             user_domain_id=self.domain['id'],
  899             system=True
  900         )
  901 
  902         response = self.v3_create_token(auth_request_body)
  903         self.assertValidSystemScopedTokenResponse(response)
  904         token = response.headers.get('X-Subject-Token')
  905         self._validate_token(token)
  906 
  907         # NOTE(lbragstad): This would make a good test for groups, but
  908         # apparently it's not possible to disable a group.
  909         user_ref = {
  910             'user': {
  911                 'enabled': False
  912             }
  913         }
  914         self.patch(
  915             '/users/%(user_id)s' % {'user_id': self.user['id']},
  916             body=user_ref
  917         )
  918 
  919         self.admin_request(
  920             path='/v3/auth/tokens',
  921             headers={'X-Auth-Token': token,
  922                      'X-Subject-Token': token},
  923             method='GET',
  924             expected_status=http_client.UNAUTHORIZED
  925         )
  926         self.admin_request(
  927             path='/v3/auth/tokens',
  928             headers={'X-Auth-Token': token,
  929                      'X-Subject-Token': token},
  930             method='HEAD',
  931             expected_status=http_client.UNAUTHORIZED
  932         )
  933 
  934     def test_create_system_token_via_system_group_assignment(self):
  935         ref = {
  936             'group': unit.new_group_ref(
  937                 domain_id=CONF.identity.default_domain_id
  938             )
  939         }
  940 
  941         group = self.post('/groups', body=ref).json_body['group']
  942         path = '/system/groups/%(group_id)s/roles/%(role_id)s' % {
  943             'group_id': group['id'],
  944             'role_id': self.role_id
  945         }
  946         self.put(path=path)
  947 
  948         path = '/groups/%(group_id)s/users/%(user_id)s' % {
  949             'group_id': group['id'],
  950             'user_id': self.user['id']
  951         }
  952         self.put(path=path)
  953 
  954         auth_request_body = self.build_authentication_request(
  955             user_id=self.user['id'],
  956             password=self.user['password'],
  957             system=True
  958         )
  959         response = self.v3_create_token(auth_request_body)
  960         self.assertValidSystemScopedTokenResponse(response)
  961         token = response.headers.get('X-Subject-Token')
  962         self._validate_token(token)
  963 
  964     def test_revoke_system_token(self):
  965         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
  966             'user_id': self.user['id'],
  967             'role_id': self.role_id
  968         }
  969         self.put(path=path)
  970 
  971         auth_request_body = self.build_authentication_request(
  972             username=self.user['name'],
  973             password=self.user['password'],
  974             user_domain_id=self.domain['id'],
  975             system=True
  976         )
  977 
  978         response = self.v3_create_token(auth_request_body)
  979         self.assertValidSystemScopedTokenResponse(response)
  980         token = response.headers.get('X-Subject-Token')
  981         self._validate_token(token)
  982         self._revoke_token(token)
  983         self._validate_token(token, expected_status=http_client.NOT_FOUND)
  984 
  985     def test_system_token_is_invalid_after_deleting_system_role(self):
  986         ref = {'role': unit.new_role_ref()}
  987         system_role = self.post('/roles', body=ref).json_body['role']
  988 
  989         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
  990             'user_id': self.user['id'],
  991             'role_id': system_role['id']
  992         }
  993         self.put(path=path)
  994 
  995         auth_request_body = self.build_authentication_request(
  996             username=self.user['name'],
  997             password=self.user['password'],
  998             user_domain_id=self.domain['id'],
  999             system=True
 1000         )
 1001 
 1002         response = self.v3_create_token(auth_request_body)
 1003         self.assertValidSystemScopedTokenResponse(response)
 1004         token = response.headers.get('X-Subject-Token')
 1005         self._validate_token(token)
 1006 
 1007         self.delete('/roles/%(role_id)s' % {'role_id': system_role['id']})
 1008         self._validate_token(token, expected_status=http_client.NOT_FOUND)
 1009 
 1010     def test_rescoping_a_system_token_for_a_project_token_fails(self):
 1011         ref = {'role': unit.new_role_ref()}
 1012         system_role = self.post('/roles', body=ref).json_body['role']
 1013 
 1014         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
 1015             'user_id': self.user['id'],
 1016             'role_id': system_role['id']
 1017         }
 1018         self.put(path=path)
 1019 
 1020         auth_request_body = self.build_authentication_request(
 1021             username=self.user['name'],
 1022             password=self.user['password'],
 1023             user_domain_id=self.domain['id'],
 1024             system=True
 1025         )
 1026         response = self.v3_create_token(auth_request_body)
 1027         self.assertValidSystemScopedTokenResponse(response)
 1028         system_token = response.headers.get('X-Subject-Token')
 1029 
 1030         auth_request_body = self.build_authentication_request(
 1031             token=system_token, project_id=self.project_id
 1032         )
 1033         self.v3_create_token(
 1034             auth_request_body, expected_status=http_client.FORBIDDEN
 1035         )
 1036 
 1037     def test_rescoping_a_system_token_for_a_domain_token_fails(self):
 1038         ref = {'role': unit.new_role_ref()}
 1039         system_role = self.post('/roles', body=ref).json_body['role']
 1040 
 1041         path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
 1042             'user_id': self.user['id'],
 1043             'role_id': system_role['id']
 1044         }
 1045         self.put(path=path)
 1046 
 1047         auth_request_body = self.build_authentication_request(
 1048             username=self.user['name'],
 1049             password=self.user['password'],
 1050             user_domain_id=self.domain['id'],
 1051             system=True
 1052         )
 1053         response = self.v3_create_token(auth_request_body)
 1054         self.assertValidSystemScopedTokenResponse(response)
 1055         system_token = response.headers.get('X-Subject-Token')
 1056 
 1057         auth_request_body = self.build_authentication_request(
 1058             token=system_token, domain_id=CONF.identity.default_domain_id
 1059         )
 1060         self.v3_create_token(
 1061             auth_request_body, expected_status=http_client.FORBIDDEN
 1062         )
 1063 
 1064     def test_create_domain_token_scoped_with_domain_id_and_user_id(self):
 1065         # grant the user a role on the domain
 1066         path = '/domains/%s/users/%s/roles/%s' % (
 1067             self.domain['id'], self.user['id'], self.role['id'])
 1068         self.put(path=path)
 1069 
 1070         auth_data = self.build_authentication_request(
 1071             user_id=self.user['id'],
 1072             password=self.user['password'],
 1073             domain_id=self.domain['id'])
 1074         r = self.v3_create_token(auth_data)
 1075         self.assertValidDomainScopedTokenResponse(r)
 1076 
 1077     def test_create_domain_token_scoped_with_domain_id_and_username(self):
 1078         # grant the user a role on the domain
 1079         path = '/domains/%s/users/%s/roles/%s' % (
 1080             self.domain['id'], self.user['id'], self.role['id'])
 1081         self.put(path=path)
 1082 
 1083         auth_data = self.build_authentication_request(
 1084             username=self.user['name'],
 1085             user_domain_id=self.domain['id'],
 1086             password=self.user['password'],
 1087             domain_id=self.domain['id'])
 1088         r = self.v3_create_token(auth_data)
 1089         self.assertValidDomainScopedTokenResponse(r)
 1090 
 1091     def test_create_domain_token_scoped_with_domain_id(self):
 1092         # grant the user a role on the domain
 1093         path = '/domains/%s/users/%s/roles/%s' % (
 1094             self.domain['id'], self.user['id'], self.role['id'])
 1095         self.put(path=path)
 1096 
 1097         auth_data = self.build_authentication_request(
 1098             username=self.user['name'],
 1099             user_domain_name=self.domain['name'],
 1100             password=self.user['password'],
 1101             domain_id=self.domain['id'])
 1102         r = self.v3_create_token(auth_data)
 1103         self.assertValidDomainScopedTokenResponse(r)
 1104 
 1105     def test_create_domain_token_scoped_with_domain_name(self):
 1106         # grant the user a role on the domain
 1107         path = '/domains/%s/users/%s/roles/%s' % (
 1108             self.domain['id'], self.user['id'], self.role['id'])
 1109         self.put(path=path)
 1110 
 1111         auth_data = self.build_authentication_request(
 1112             user_id=self.user['id'],
 1113             password=self.user['password'],
 1114             domain_name=self.domain['name'])
 1115         r = self.v3_create_token(auth_data)
 1116         self.assertValidDomainScopedTokenResponse(r)
 1117 
 1118     def test_create_domain_token_scoped_with_domain_name_and_username(self):
 1119         # grant the user a role on the domain
 1120         path = '/domains/%s/users/%s/roles/%s' % (
 1121             self.domain['id'], self.user['id'], self.role['id'])
 1122         self.put(path=path)
 1123 
 1124         auth_data = self.build_authentication_request(
 1125             username=self.user['name'],
 1126             user_domain_id=self.domain['id'],
 1127             password=self.user['password'],
 1128             domain_name=self.domain['name'])
 1129         r = self.v3_create_token(auth_data)
 1130         self.assertValidDomainScopedTokenResponse(r)
 1131 
 1132     def test_create_domain_token_with_only_domain_name_and_username(self):
 1133         # grant the user a role on the domain
 1134         path = '/domains/%s/users/%s/roles/%s' % (
 1135             self.domain['id'], self.user['id'], self.role['id'])
 1136         self.put(path=path)
 1137 
 1138         auth_data = self.build_authentication_request(
 1139             username=self.user['name'],
 1140             user_domain_name=self.domain['name'],
 1141             password=self.user['password'],
 1142             domain_name=self.domain['name'])
 1143         r = self.v3_create_token(auth_data)
 1144         self.assertValidDomainScopedTokenResponse(r)
 1145 
 1146     def test_create_domain_token_with_group_role(self):
 1147         group = unit.new_group_ref(domain_id=self.domain_id)
 1148         group = PROVIDERS.identity_api.create_group(group)
 1149 
 1150         # add user to group
 1151         PROVIDERS.identity_api.add_user_to_group(self.user['id'], group['id'])
 1152 
 1153         # grant the domain role to group
 1154         path = '/domains/%s/groups/%s/roles/%s' % (
 1155             self.domain['id'], group['id'], self.role['id'])
 1156         self.put(path=path)
 1157 
 1158         # now get a domain-scoped token
 1159         auth_data = self.build_authentication_request(
 1160             user_id=self.user['id'],
 1161             password=self.user['password'],
 1162             domain_id=self.domain['id'])
 1163         r = self.v3_create_token(auth_data)
 1164         self.assertValidDomainScopedTokenResponse(r)
 1165 
 1166     def test_create_domain_token_fails_if_domain_name_unsafe(self):
 1167         """Verify authenticate to a domain with unsafe name fails."""
 1168         # Start with url name restrictions off, so we can create the unsafe
 1169         # named domain
 1170         self.config_fixture.config(group='resource',
 1171                                    domain_name_url_safe='off')
 1172         unsafe_name = 'i am not / safe'
 1173         domain = unit.new_domain_ref(name=unsafe_name)
 1174         PROVIDERS.resource_api.create_domain(domain['id'], domain)
 1175         role_member = unit.new_role_ref()
 1176         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 1177         PROVIDERS.assignment_api.create_grant(
 1178             role_member['id'],
 1179             user_id=self.user['id'],
 1180             domain_id=domain['id'])
 1181 
 1182         auth_data = self.build_authentication_request(
 1183             user_id=self.user['id'],
 1184             password=self.user['password'],
 1185             domain_name=domain['name'])
 1186 
 1187         # Since name url restriction is off, we should be able to authenticate
 1188         self.v3_create_token(auth_data)
 1189 
 1190         # Set the name url restriction to new, which should still allow us to
 1191         # authenticate
 1192         self.config_fixture.config(group='resource',
 1193                                    project_name_url_safe='new')
 1194         self.v3_create_token(auth_data)
 1195 
 1196         # Set the name url restriction to strict and we should fail to
 1197         # authenticate
 1198         self.config_fixture.config(group='resource',
 1199                                    domain_name_url_safe='strict')
 1200         self.v3_create_token(auth_data,
 1201                              expected_status=http_client.UNAUTHORIZED)
 1202 
 1203     def test_create_domain_token_without_grant_returns_unauthorized(self):
 1204         auth_data = self.build_authentication_request(
 1205             user_id=self.user['id'],
 1206             password=self.user['password'],
 1207             domain_id=self.domain['id'])
 1208         # this fails because the user does not have a role on self.domain
 1209         self.v3_create_token(auth_data,
 1210                              expected_status=http_client.UNAUTHORIZED)
 1211 
 1212     def test_validate_domain_scoped_token(self):
 1213         # Grant user access to domain
 1214         PROVIDERS.assignment_api.create_grant(
 1215             self.role['id'], user_id=self.user['id'],
 1216             domain_id=self.domain['id']
 1217         )
 1218         domain_scoped_token = self._get_domain_scoped_token()
 1219         r = self._validate_token(domain_scoped_token)
 1220         self.assertValidDomainScopedTokenResponse(r)
 1221         resp_json = json.loads(r.body)
 1222         self.assertIsNotNone(resp_json['token']['catalog'])
 1223         self.assertIsNotNone(resp_json['token']['roles'])
 1224         self.assertIsNotNone(resp_json['token']['domain'])
 1225 
 1226     def test_validate_expired_domain_scoped_token_returns_not_found(self):
 1227         # Grant user access to domain
 1228         PROVIDERS.assignment_api.create_grant(
 1229             self.role['id'], user_id=self.user['id'],
 1230             domain_id=self.domain['id']
 1231         )
 1232         # NOTE(lbragstad): We set token expiration to 10 seconds so that we can
 1233         # use the context manager of freezegun without sqlite issues.
 1234         self.config_fixture.config(group='token',
 1235                                    expiration=10)
 1236         time = datetime.datetime.utcnow()
 1237         with freezegun.freeze_time(time) as frozen_datetime:
 1238             domain_scoped_token = self._get_domain_scoped_token()
 1239             frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
 1240             self._validate_token(
 1241                 domain_scoped_token,
 1242                 expected_status=http_client.NOT_FOUND
 1243             )
 1244 
 1245     def test_domain_scoped_token_is_invalid_after_disabling_user(self):
 1246         # Grant user access to domain
 1247         PROVIDERS.assignment_api.create_grant(
 1248             self.role['id'], user_id=self.user['id'],
 1249             domain_id=self.domain['id']
 1250         )
 1251         domain_scoped_token = self._get_domain_scoped_token()
 1252         # Make sure the token is valid
 1253         r = self._validate_token(domain_scoped_token)
 1254         self.assertValidDomainScopedTokenResponse(r)
 1255         # Disable user
 1256         self._set_user_enabled(self.user, enabled=False)
 1257         # Ensure validating a token for a disabled user fails
 1258         self._validate_token(
 1259             domain_scoped_token,
 1260             expected_status=http_client.NOT_FOUND
 1261         )
 1262 
 1263     def test_domain_scoped_token_is_invalid_after_deleting_grant(self):
 1264         # Grant user access to domain
 1265         PROVIDERS.assignment_api.create_grant(
 1266             self.role['id'], user_id=self.user['id'],
 1267             domain_id=self.domain['id']
 1268         )
 1269         domain_scoped_token = self._get_domain_scoped_token()
 1270         # Make sure the token is valid
 1271         r = self._validate_token(domain_scoped_token)
 1272         self.assertValidDomainScopedTokenResponse(r)
 1273         # Delete access to domain
 1274         PROVIDERS.assignment_api.delete_grant(
 1275             self.role['id'], user_id=self.user['id'],
 1276             domain_id=self.domain['id']
 1277         )
 1278         # Ensure validating a token for a disabled user fails
 1279         self._validate_token(
 1280             domain_scoped_token,
 1281             expected_status=http_client.NOT_FOUND
 1282         )
 1283 
 1284     def test_domain_scoped_token_invalid_after_disabling_domain(self):
 1285         # Grant user access to domain
 1286         PROVIDERS.assignment_api.create_grant(
 1287             self.role['id'], user_id=self.user['id'],
 1288             domain_id=self.domain['id']
 1289         )
 1290         domain_scoped_token = self._get_domain_scoped_token()
 1291         # Make sure the token is valid
 1292         r = self._validate_token(domain_scoped_token)
 1293         self.assertValidDomainScopedTokenResponse(r)
 1294         # Disable domain
 1295         self.domain['enabled'] = False
 1296         PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain)
 1297         # Ensure validating a token for a disabled domain fails
 1298         self._validate_token(
 1299             domain_scoped_token,
 1300             expected_status=http_client.NOT_FOUND
 1301         )
 1302 
 1303     def test_create_project_scoped_token_with_project_id_and_user_id(self):
 1304         auth_data = self.build_authentication_request(
 1305             user_id=self.user['id'],
 1306             password=self.user['password'],
 1307             project_id=self.project['id'])
 1308         r = self.v3_create_token(auth_data)
 1309         self.assertValidProjectScopedTokenResponse(r)
 1310 
 1311     def test_validate_project_scoped_token(self):
 1312         project_scoped_token = self._get_project_scoped_token()
 1313         r = self._validate_token(project_scoped_token)
 1314         self.assertValidProjectScopedTokenResponse(r)
 1315 
 1316     def test_validate_expired_project_scoped_token_returns_not_found(self):
 1317         # NOTE(lbragstad): We set token expiration to 10 seconds so that we can
 1318         # use the context manager of freezegun without sqlite issues.
 1319         self.config_fixture.config(group='token',
 1320                                    expiration=10)
 1321         time = datetime.datetime.utcnow()
 1322         with freezegun.freeze_time(time) as frozen_datetime:
 1323             project_scoped_token = self._get_project_scoped_token()
 1324             frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
 1325             self._validate_token(
 1326                 project_scoped_token,
 1327                 expected_status=http_client.NOT_FOUND
 1328             )
 1329 
 1330     def test_revoke_project_scoped_token(self):
 1331         project_scoped_token = self._get_project_scoped_token()
 1332         r = self._validate_token(project_scoped_token)
 1333         self.assertValidProjectScopedTokenResponse(r)
 1334         self._revoke_token(project_scoped_token)
 1335         self._validate_token(project_scoped_token,
 1336                              expected_status=http_client.NOT_FOUND)
 1337 
 1338     def test_project_scoped_token_is_scoped_to_default_project(self):
 1339         project = self._create_project_and_set_as_default_project()
 1340 
 1341         # attempt to authenticate without requesting a project
 1342         auth_data = self.build_authentication_request(
 1343             user_id=self.user['id'],
 1344             password=self.user['password'])
 1345         r = self.v3_create_token(auth_data)
 1346 
 1347         # ensure the project id in the token matches the default project id
 1348         self.assertValidProjectScopedTokenResponse(r)
 1349         self.assertEqual(project['id'], r.result['token']['project']['id'])
 1350 
 1351     def test_project_scoped_token_no_catalog_is_scoped_to_default_project(
 1352             self):
 1353         project = self._create_project_and_set_as_default_project()
 1354 
 1355         # attempt to authenticate without requesting a project or catalog
 1356         auth_data = self.build_authentication_request(
 1357             user_id=self.user['id'],
 1358             password=self.user['password'])
 1359         r = self.post('/auth/tokens?nocatalog', body=auth_data, noauth=True)
 1360 
 1361         # ensure the project id in the token matches the default project id
 1362         self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
 1363         self.assertEqual(project['id'], r.result['token']['project']['id'])
 1364 
 1365     def test_implicit_project_id_scoped_token_with_user_id_no_catalog(self):
 1366         self._create_project_and_set_as_default_project()
 1367 
 1368         # create a project scoped token that isn't scoped to the default
 1369         # project
 1370         auth_data = self.build_authentication_request(
 1371             user_id=self.user['id'],
 1372             password=self.user['password'],
 1373             project_id=self.project['id'])
 1374         r = self.post('/auth/tokens?nocatalog', body=auth_data, noauth=True)
 1375 
 1376         # ensure the project id in the token matches the one we as for
 1377         self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
 1378         self.assertEqual(self.project['id'],
 1379                          r.result['token']['project']['id'])
 1380 
 1381     def test_project_scoped_token_catalog_attributes(self):
 1382         auth_data = self.build_authentication_request(
 1383             user_id=self.user['id'],
 1384             password=self.user['password'],
 1385             project_id=self.project['id'])
 1386         r = self.v3_create_token(auth_data)
 1387 
 1388         catalog = r.result['token']['catalog']
 1389         self.assertEqual(1, len(catalog))
 1390         catalog = catalog[0]
 1391 
 1392         self.assertEqual(self.service['id'], catalog['id'])
 1393         self.assertEqual(self.service['name'], catalog['name'])
 1394         self.assertEqual(self.service['type'], catalog['type'])
 1395 
 1396         endpoint = catalog['endpoints']
 1397         self.assertEqual(1, len(endpoint))
 1398         endpoint = endpoint[0]
 1399 
 1400         self.assertEqual(self.endpoint['id'], endpoint['id'])
 1401         self.assertEqual(self.endpoint['interface'], endpoint['interface'])
 1402         self.assertEqual(self.endpoint['region_id'], endpoint['region_id'])
 1403         self.assertEqual(self.endpoint['url'], endpoint['url'])
 1404 
 1405     def test_project_scoped_token_catalog_excludes_disabled_endpoint(self):
 1406         # Create a disabled endpoint
 1407         disabled_endpoint_ref = copy.copy(self.endpoint)
 1408         disabled_endpoint_id = uuid.uuid4().hex
 1409         disabled_endpoint_ref.update({
 1410             'id': disabled_endpoint_id,
 1411             'enabled': False,
 1412             'interface': 'internal'
 1413         })
 1414         PROVIDERS.catalog_api.create_endpoint(
 1415             disabled_endpoint_id, disabled_endpoint_ref
 1416         )
 1417 
 1418         auth_data = self.build_authentication_request(
 1419             user_id=self.user['id'],
 1420             password=self.user['password'],
 1421             project_id=self.project['id'])
 1422         resp = self.v3_create_token(auth_data)
 1423 
 1424         # make sure the disabled endpoint id isn't in the list of endpoints
 1425         endpoints = resp.result['token']['catalog'][0]['endpoints']
 1426         endpoint_ids = [endpoint['id'] for endpoint in endpoints]
 1427         self.assertNotIn(disabled_endpoint_id, endpoint_ids)
 1428 
 1429     def test_project_scoped_token_catalog_excludes_disabled_service(self):
 1430         """On authenticate, get a catalog that excludes disabled services."""
 1431         # although the endpoint associated with the service is enabled, the
 1432         # service is disabled
 1433         self.assertTrue(self.endpoint['enabled'])
 1434         PROVIDERS.catalog_api.update_service(
 1435             self.endpoint['service_id'], {'enabled': False})
 1436         service = PROVIDERS.catalog_api.get_service(
 1437             self.endpoint['service_id']
 1438         )
 1439         self.assertFalse(service['enabled'])
 1440 
 1441         auth_data = self.build_authentication_request(
 1442             user_id=self.user['id'],
 1443             password=self.user['password'],
 1444             project_id=self.project['id'])
 1445         r = self.v3_create_token(auth_data)
 1446 
 1447         self.assertEqual([], r.result['token']['catalog'])
 1448 
 1449     def test_scope_to_project_without_grant_returns_unauthorized(self):
 1450         project = unit.new_project_ref(domain_id=self.domain_id)
 1451         PROVIDERS.resource_api.create_project(project['id'], project)
 1452 
 1453         auth_data = self.build_authentication_request(
 1454             user_id=self.user['id'],
 1455             password=self.user['password'],
 1456             project_id=project['id'])
 1457         self.v3_create_token(auth_data,
 1458                              expected_status=http_client.UNAUTHORIZED)
 1459 
 1460     def test_create_project_scoped_token_with_username_and_domain_id(self):
 1461         auth_data = self.build_authentication_request(
 1462             username=self.user['name'],
 1463             user_domain_id=self.domain['id'],
 1464             password=self.user['password'],
 1465             project_id=self.project['id'])
 1466         r = self.v3_create_token(auth_data)
 1467         self.assertValidProjectScopedTokenResponse(r)
 1468 
 1469     def test_create_project_scoped_token_with_username_and_domain_name(self):
 1470         auth_data = self.build_authentication_request(
 1471             username=self.user['name'],
 1472             user_domain_name=self.domain['name'],
 1473             password=self.user['password'],
 1474             project_id=self.project['id'])
 1475         r = self.v3_create_token(auth_data)
 1476         self.assertValidProjectScopedTokenResponse(r)
 1477 
 1478     def test_create_project_scoped_token_fails_if_project_name_unsafe(self):
 1479         """Verify authenticate to a project with unsafe name fails."""
 1480         # Start with url name restrictions off, so we can create the unsafe
 1481         # named project
 1482         self.config_fixture.config(group='resource',
 1483                                    project_name_url_safe='off')
 1484         unsafe_name = 'i am not / safe'
 1485         project = unit.new_project_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID,
 1486                                        name=unsafe_name)
 1487         PROVIDERS.resource_api.create_project(project['id'], project)
 1488         role_member = unit.new_role_ref()
 1489         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 1490         PROVIDERS.assignment_api.add_role_to_user_and_project(
 1491             self.user['id'], project['id'], role_member['id'])
 1492 
 1493         auth_data = self.build_authentication_request(
 1494             user_id=self.user['id'],
 1495             password=self.user['password'],
 1496             project_name=project['name'],
 1497             project_domain_id=test_v3.DEFAULT_DOMAIN_ID)
 1498 
 1499         # Since name url restriction is off, we should be able to authenticate
 1500         self.v3_create_token(auth_data)
 1501 
 1502         # Set the name url restriction to new, which should still allow us to
 1503         # authenticate
 1504         self.config_fixture.config(group='resource',
 1505                                    project_name_url_safe='new')
 1506         self.v3_create_token(auth_data)
 1507 
 1508         # Set the name url restriction to strict and we should fail to
 1509         # authenticate
 1510         self.config_fixture.config(group='resource',
 1511                                    project_name_url_safe='strict')
 1512         self.v3_create_token(auth_data,
 1513                              expected_status=http_client.UNAUTHORIZED)
 1514 
 1515     def test_create_project_scoped_token_fails_if_domain_name_unsafe(self):
 1516         """Verify authenticate to a project using unsafe domain name fails."""
 1517         # Start with url name restrictions off, so we can create the unsafe
 1518         # named domain
 1519         self.config_fixture.config(group='resource',
 1520                                    domain_name_url_safe='off')
 1521         unsafe_name = 'i am not / safe'
 1522         domain = unit.new_domain_ref(name=unsafe_name)
 1523         PROVIDERS.resource_api.create_domain(domain['id'], domain)
 1524         # Add a (safely named) project to that domain
 1525         project = unit.new_project_ref(domain_id=domain['id'])
 1526         PROVIDERS.resource_api.create_project(project['id'], project)
 1527         role_member = unit.new_role_ref()
 1528         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 1529         PROVIDERS.assignment_api.create_grant(
 1530             role_member['id'],
 1531             user_id=self.user['id'],
 1532             project_id=project['id'])
 1533 
 1534         # An auth request via project ID, but specifying domain by name
 1535         auth_data = self.build_authentication_request(
 1536             user_id=self.user['id'],
 1537             password=self.user['password'],
 1538             project_name=project['name'],
 1539             project_domain_name=domain['name'])
 1540 
 1541         # Since name url restriction is off, we should be able to authenticate
 1542         self.v3_create_token(auth_data)
 1543 
 1544         # Set the name url restriction to new, which should still allow us to
 1545         # authenticate
 1546         self.config_fixture.config(group='resource',
 1547                                    project_name_url_safe='new')
 1548         self.v3_create_token(auth_data)
 1549 
 1550         # Set the name url restriction to strict and we should fail to
 1551         # authenticate
 1552         self.config_fixture.config(group='resource',
 1553                                    domain_name_url_safe='strict')
 1554         self.v3_create_token(auth_data,
 1555                              expected_status=http_client.UNAUTHORIZED)
 1556 
 1557     def test_create_project_token_with_same_domain_and_project_name(self):
 1558         """Authenticate to a project with the same name as its domain."""
 1559         domain = unit.new_project_ref(is_domain=True)
 1560         domain = PROVIDERS.resource_api.create_project(domain['id'], domain)
 1561         project = unit.new_project_ref(domain_id=domain['id'],
 1562                                        name=domain['name'])
 1563         PROVIDERS.resource_api.create_project(project['id'], project)
 1564         role_member = unit.new_role_ref()
 1565         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 1566         PROVIDERS.assignment_api.add_role_to_user_and_project(
 1567             self.user['id'], project['id'], role_member['id'])
 1568 
 1569         auth_data = self.build_authentication_request(
 1570             user_id=self.user['id'],
 1571             password=self.user['password'],
 1572             project_name=project['name'],
 1573             project_domain_name=domain['name'])
 1574 
 1575         r = self.v3_create_token(auth_data)
 1576         self.assertEqual(project['id'], r.result['token']['project']['id'])
 1577 
 1578     def test_create_project_token_fails_with_project_acting_as_domain(self):
 1579         domain = unit.new_project_ref(is_domain=True)
 1580         domain = PROVIDERS.resource_api.create_project(domain['id'], domain)
 1581         role_member = unit.new_role_ref()
 1582         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 1583         PROVIDERS.assignment_api.create_grant(
 1584             role_member['id'],
 1585             user_id=self.user['id'],
 1586             domain_id=domain['id'])
 1587 
 1588         # authentication will fail because the project name is incorrect
 1589         auth_data = self.build_authentication_request(
 1590             user_id=self.user['id'],
 1591             password=self.user['password'],
 1592             project_name=domain['name'],
 1593             project_domain_name=domain['name'])
 1594         self.v3_create_token(auth_data,
 1595                              expected_status=http_client.UNAUTHORIZED)
 1596 
 1597     def test_create_project_token_with_disabled_project_domain_fails(self):
 1598         # create a disabled domain
 1599         domain = unit.new_domain_ref()
 1600         domain = PROVIDERS.resource_api.create_domain(domain['id'], domain)
 1601 
 1602         # create a project in the domain
 1603         project = unit.new_project_ref(domain_id=domain['id'])
 1604         PROVIDERS.resource_api.create_project(project['id'], project)
 1605 
 1606         # assign some role to self.user for the project in the domain
 1607         PROVIDERS.assignment_api.add_role_to_user_and_project(
 1608             self.user['id'],
 1609             project['id'],
 1610             self.role_id)
 1611 
 1612         # Disable the domain
 1613         domain['enabled'] = False
 1614         PROVIDERS.resource_api.update_domain(domain['id'], domain)
 1615 
 1616         # user should not be able to auth with project_id
 1617         auth_data = self.build_authentication_request(
 1618             user_id=self.user['id'],
 1619             password=self.user['password'],
 1620             project_id=project['id'])
 1621         self.v3_create_token(auth_data,
 1622                              expected_status=http_client.UNAUTHORIZED)
 1623 
 1624         # user should not be able to auth with project_name & domain
 1625         auth_data = self.build_authentication_request(
 1626             user_id=self.user['id'],
 1627             password=self.user['password'],
 1628             project_name=project['name'],
 1629             project_domain_id=domain['id'])
 1630         self.v3_create_token(auth_data,
 1631                              expected_status=http_client.UNAUTHORIZED)
 1632 
 1633     def test_create_project_token_with_default_domain_as_project(self):
 1634         # Authenticate to a project with the default domain as project
 1635         auth_data = self.build_authentication_request(
 1636             user_id=self.user['id'],
 1637             password=self.user['password'],
 1638             project_id=test_v3.DEFAULT_DOMAIN_ID)
 1639         self.v3_create_token(auth_data,
 1640                              expected_status=http_client.UNAUTHORIZED)
 1641 
 1642     def test_project_scoped_token_is_invalid_after_disabling_user(self):
 1643         project_scoped_token = self._get_project_scoped_token()
 1644         # Make sure the token is valid
 1645         r = self._validate_token(project_scoped_token)
 1646         self.assertValidProjectScopedTokenResponse(r)
 1647         # Disable the user
 1648         self._set_user_enabled(self.user, enabled=False)
 1649         # Ensure validating a token for a disabled user fails
 1650         self._validate_token(
 1651             project_scoped_token,
 1652             expected_status=http_client.NOT_FOUND
 1653         )
 1654 
 1655     def test_project_scoped_token_invalid_after_changing_user_password(self):
 1656         project_scoped_token = self._get_project_scoped_token()
 1657         # Make sure the token is valid
 1658         r = self._validate_token(project_scoped_token)
 1659         self.assertValidProjectScopedTokenResponse(r)
 1660         # Update user's password
 1661         self.user['password'] = 'Password1'
 1662         PROVIDERS.identity_api.update_user(self.user['id'], self.user)
 1663         # Ensure updating user's password revokes existing tokens
 1664         self._validate_token(
 1665             project_scoped_token,
 1666             expected_status=http_client.NOT_FOUND
 1667         )
 1668 
 1669     def test_project_scoped_token_invalid_after_disabling_project(self):
 1670         project_scoped_token = self._get_project_scoped_token()
 1671         # Make sure the token is valid
 1672         r = self._validate_token(project_scoped_token)
 1673         self.assertValidProjectScopedTokenResponse(r)
 1674         # Disable project
 1675         self.project['enabled'] = False
 1676         PROVIDERS.resource_api.update_project(self.project['id'], self.project)
 1677         # Ensure validating a token for a disabled project fails
 1678         self._validate_token(
 1679             project_scoped_token,
 1680             expected_status=http_client.NOT_FOUND
 1681         )
 1682 
 1683     def test_project_scoped_token_is_invalid_after_deleting_grant(self):
 1684         # disable caching so that user grant deletion is not hidden
 1685         # by token caching
 1686         self.config_fixture.config(
 1687             group='cache',
 1688             enabled=False)
 1689         # Grant user access to project
 1690         PROVIDERS.assignment_api.create_grant(
 1691             self.role['id'], user_id=self.user['id'],
 1692             project_id=self.project['id']
 1693         )
 1694         project_scoped_token = self._get_project_scoped_token()
 1695         # Make sure the token is valid
 1696         r = self._validate_token(project_scoped_token)
 1697         self.assertValidProjectScopedTokenResponse(r)
 1698         # Delete access to project
 1699         PROVIDERS.assignment_api.delete_grant(
 1700             self.role['id'], user_id=self.user['id'],
 1701             project_id=self.project['id']
 1702         )
 1703         # Ensure the token has been revoked
 1704         self._validate_token(
 1705             project_scoped_token,
 1706             expected_status=http_client.NOT_FOUND
 1707         )
 1708 
 1709     def test_no_access_to_default_project_result_in_unscoped_token(self):
 1710         # create a disabled project to work with
 1711         self.create_new_default_project_for_user(self.user['id'],
 1712                                                  self.domain_id)
 1713 
 1714         # attempt to authenticate without requesting a project
 1715         auth_data = self.build_authentication_request(
 1716             user_id=self.user['id'],
 1717             password=self.user['password'])
 1718         r = self.v3_create_token(auth_data)
 1719         self.assertValidUnscopedTokenResponse(r)
 1720 
 1721     def test_rescope_unscoped_token_with_trust(self):
 1722         trustee_user, trust = self._create_trust()
 1723         self._get_trust_scoped_token(trustee_user, trust)
 1724 
 1725     def test_validate_a_trust_scoped_token(self):
 1726         trustee_user, trust = self._create_trust()
 1727         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1728         # Validate a trust scoped token
 1729         r = self._validate_token(trust_scoped_token)
 1730         self.assertValidProjectScopedTokenResponse(r)
 1731 
 1732     def test_validate_expired_trust_scoped_token_returns_not_found(self):
 1733         # NOTE(lbragstad): We set token expiration to 10 seconds so that we can
 1734         # use the context manager of freezegun without sqlite issues.
 1735         self.config_fixture.config(group='token',
 1736                                    expiration=10)
 1737         time = datetime.datetime.utcnow()
 1738         with freezegun.freeze_time(time) as frozen_datetime:
 1739             trustee_user, trust = self._create_trust()
 1740             trust_scoped_token = self._get_trust_scoped_token(
 1741                 trustee_user, trust
 1742             )
 1743             frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
 1744             self._validate_token(
 1745                 trust_scoped_token,
 1746                 expected_status=http_client.NOT_FOUND
 1747             )
 1748 
 1749     def test_validate_a_trust_scoped_token_impersonated(self):
 1750         trustee_user, trust = self._create_trust(impersonation=True)
 1751         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1752         # Validate a trust scoped token
 1753         r = self._validate_token(trust_scoped_token)
 1754         self.assertValidProjectScopedTokenResponse(r)
 1755 
 1756     def test_revoke_trust_scoped_token(self):
 1757         trustee_user, trust = self._create_trust()
 1758         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1759         # Validate a trust scoped token
 1760         r = self._validate_token(trust_scoped_token)
 1761         self.assertValidProjectScopedTokenResponse(r)
 1762         self._revoke_token(trust_scoped_token)
 1763         self._validate_token(trust_scoped_token,
 1764                              expected_status=http_client.NOT_FOUND)
 1765 
 1766     def test_trust_scoped_token_is_invalid_after_disabling_trustee(self):
 1767         trustee_user, trust = self._create_trust()
 1768         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1769         # Validate a trust scoped token
 1770         r = self._validate_token(trust_scoped_token)
 1771         self.assertValidProjectScopedTokenResponse(r)
 1772 
 1773         # Disable trustee
 1774         trustee_update_ref = dict(enabled=False)
 1775         PROVIDERS.identity_api.update_user(
 1776             trustee_user['id'], trustee_update_ref
 1777         )
 1778         # Ensure validating a token for a disabled user fails
 1779         self._validate_token(
 1780             trust_scoped_token,
 1781             expected_status=http_client.NOT_FOUND
 1782         )
 1783 
 1784     def test_trust_token_is_invalid_when_trustee_domain_disabled(self):
 1785         # create a new domain with new user in that domain
 1786         new_domain_ref = unit.new_domain_ref()
 1787         PROVIDERS.resource_api.create_domain(
 1788             new_domain_ref['id'], new_domain_ref
 1789         )
 1790 
 1791         trustee_ref = unit.create_user(PROVIDERS.identity_api,
 1792                                        domain_id=new_domain_ref['id'])
 1793 
 1794         new_project_ref = unit.new_project_ref(domain_id=self.domain_id)
 1795         PROVIDERS.resource_api.create_project(
 1796             new_project_ref['id'], new_project_ref
 1797         )
 1798 
 1799         # grant the trustor access to the new project
 1800         PROVIDERS.assignment_api.create_grant(
 1801             self.role['id'],
 1802             user_id=self.user_id,
 1803             project_id=new_project_ref['id'])
 1804 
 1805         trust_ref = unit.new_trust_ref(trustor_user_id=self.user_id,
 1806                                        trustee_user_id=trustee_ref['id'],
 1807                                        expires=dict(minutes=1),
 1808                                        project_id=new_project_ref['id'],
 1809                                        impersonation=True,
 1810                                        role_ids=[self.role['id']])
 1811 
 1812         resp = self.post('/OS-TRUST/trusts', body={'trust': trust_ref})
 1813         self.assertValidTrustResponse(resp, trust_ref)
 1814         trust_id = resp.json_body['trust']['id']
 1815 
 1816         # get a project-scoped token using the trust
 1817         trust_auth_data = self.build_authentication_request(
 1818             user_id=trustee_ref['id'],
 1819             password=trustee_ref['password'],
 1820             trust_id=trust_id)
 1821         trust_scoped_token = self.get_requested_token(trust_auth_data)
 1822 
 1823         # ensure the project-scoped token from the trust is valid
 1824         self._validate_token(trust_scoped_token)
 1825 
 1826         disable_body = {'domain': {'enabled': False}}
 1827         self.patch(
 1828             '/domains/%(domain_id)s' % {'domain_id': new_domain_ref['id']},
 1829             body=disable_body)
 1830 
 1831         # ensure the project-scoped token from the trust is invalid
 1832         self._validate_token(trust_scoped_token,
 1833                              expected_status=http_client.NOT_FOUND)
 1834 
 1835     def test_trust_scoped_token_invalid_after_changing_trustee_password(self):
 1836         trustee_user, trust = self._create_trust()
 1837         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1838         # Validate a trust scoped token
 1839         r = self._validate_token(trust_scoped_token)
 1840         self.assertValidProjectScopedTokenResponse(r)
 1841         # Change trustee's password
 1842         trustee_update_ref = dict(password='Password1')
 1843         PROVIDERS.identity_api.update_user(
 1844             trustee_user['id'], trustee_update_ref
 1845         )
 1846         # Ensure updating trustee's password revokes existing tokens
 1847         self._validate_token(
 1848             trust_scoped_token,
 1849             expected_status=http_client.NOT_FOUND
 1850         )
 1851 
 1852     def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
 1853         trustee_user, trust = self._create_trust()
 1854         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1855         # Validate a trust scoped token
 1856         r = self._validate_token(trust_scoped_token)
 1857         self.assertValidProjectScopedTokenResponse(r)
 1858 
 1859         # Disable the trustor
 1860         trustor_update_ref = dict(enabled=False)
 1861         PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
 1862         # Ensure validating a token for a disabled user fails
 1863         self._validate_token(
 1864             trust_scoped_token,
 1865             expected_status=http_client.NOT_FOUND
 1866         )
 1867 
 1868     def test_trust_scoped_token_invalid_after_changing_trustor_password(self):
 1869         trustee_user, trust = self._create_trust()
 1870         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1871         # Validate a trust scoped token
 1872         r = self._validate_token(trust_scoped_token)
 1873         self.assertValidProjectScopedTokenResponse(r)
 1874 
 1875         # Change trustor's password
 1876         trustor_update_ref = dict(password='Password1')
 1877         PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
 1878         # Ensure updating trustor's password revokes existing user's tokens
 1879         self._validate_token(
 1880             trust_scoped_token,
 1881             expected_status=http_client.NOT_FOUND
 1882         )
 1883 
 1884     def test_trust_scoped_token_invalid_after_disabled_trustor_domain(self):
 1885         trustee_user, trust = self._create_trust()
 1886         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 1887         # Validate a trust scoped token
 1888         r = self._validate_token(trust_scoped_token)
 1889         self.assertValidProjectScopedTokenResponse(r)
 1890 
 1891         # Disable trustor's domain
 1892         self.domain['enabled'] = False
 1893         PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain)
 1894 
 1895         trustor_update_ref = dict(password='Password1')
 1896         PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
 1897         # Ensure updating trustor's password revokes existing user's tokens
 1898         self._validate_token(
 1899             trust_scoped_token,
 1900             expected_status=http_client.NOT_FOUND
 1901         )
 1902 
 1903     def test_default_fixture_scope_token(self):
 1904         self.assertIsNotNone(self.get_scoped_token())
 1905 
 1906     def test_rescoping_token(self):
 1907         expires = self.v3_token_data['token']['expires_at']
 1908 
 1909         # rescope the token
 1910         r = self.v3_create_token(self.build_authentication_request(
 1911             token=self.v3_token,
 1912             project_id=self.project_id))
 1913         self.assertValidProjectScopedTokenResponse(r)
 1914 
 1915         # ensure token expiration stayed the same
 1916         self.assertTimestampEqual(expires, r.result['token']['expires_at'])
 1917 
 1918     def test_check_token(self):
 1919         self.head('/auth/tokens', headers=self.headers,
 1920                   expected_status=http_client.OK)
 1921 
 1922     def test_validate_token(self):
 1923         r = self.get('/auth/tokens', headers=self.headers)
 1924         self.assertValidUnscopedTokenResponse(r)
 1925 
 1926     def test_validate_missing_subject_token(self):
 1927         self.get('/auth/tokens',
 1928                  expected_status=http_client.NOT_FOUND)
 1929 
 1930     def test_validate_missing_auth_token(self):
 1931         self.admin_request(
 1932             method='GET',
 1933             path='/v3/projects',
 1934             token=None,
 1935             expected_status=http_client.UNAUTHORIZED)
 1936 
 1937     def test_validate_token_nocatalog(self):
 1938         v3_token = self.get_requested_token(self.build_authentication_request(
 1939             user_id=self.user['id'],
 1940             password=self.user['password'],
 1941             project_id=self.project['id']))
 1942         r = self.get(
 1943             '/auth/tokens?nocatalog',
 1944             headers={'X-Subject-Token': v3_token})
 1945         self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
 1946 
 1947     def test_is_admin_token_by_ids(self):
 1948         self.config_fixture.config(
 1949             group='resource',
 1950             admin_project_domain_name=self.domain['name'],
 1951             admin_project_name=self.project['name'])
 1952         r = self.v3_create_token(self.build_authentication_request(
 1953             user_id=self.user['id'],
 1954             password=self.user['password'],
 1955             project_id=self.project['id']))
 1956         self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
 1957         v3_token = r.headers.get('X-Subject-Token')
 1958         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 1959         self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
 1960 
 1961     def test_is_admin_token_by_names(self):
 1962         self.config_fixture.config(
 1963             group='resource',
 1964             admin_project_domain_name=self.domain['name'],
 1965             admin_project_name=self.project['name'])
 1966         r = self.v3_create_token(self.build_authentication_request(
 1967             user_id=self.user['id'],
 1968             password=self.user['password'],
 1969             project_domain_name=self.domain['name'],
 1970             project_name=self.project['name']))
 1971         self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
 1972         v3_token = r.headers.get('X-Subject-Token')
 1973         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 1974         self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
 1975 
 1976     def test_token_for_non_admin_project_is_not_admin(self):
 1977         self.config_fixture.config(
 1978             group='resource',
 1979             admin_project_domain_name=self.domain['name'],
 1980             admin_project_name=uuid.uuid4().hex)
 1981         r = self.v3_create_token(self.build_authentication_request(
 1982             user_id=self.user['id'],
 1983             password=self.user['password'],
 1984             project_id=self.project['id']))
 1985         self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
 1986         v3_token = r.headers.get('X-Subject-Token')
 1987         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 1988         self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
 1989 
 1990     def test_token_for_non_admin_domain_same_project_name_is_not_admin(self):
 1991         self.config_fixture.config(
 1992             group='resource',
 1993             admin_project_domain_name=uuid.uuid4().hex,
 1994             admin_project_name=self.project['name'])
 1995         r = self.v3_create_token(self.build_authentication_request(
 1996             user_id=self.user['id'],
 1997             password=self.user['password'],
 1998             project_id=self.project['id']))
 1999         self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
 2000         v3_token = r.headers.get('X-Subject-Token')
 2001         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 2002         self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
 2003 
 2004     def test_only_admin_project_set_acts_as_non_admin(self):
 2005         self.config_fixture.config(
 2006             group='resource',
 2007             admin_project_name=self.project['name'])
 2008         r = self.v3_create_token(self.build_authentication_request(
 2009             user_id=self.user['id'],
 2010             password=self.user['password'],
 2011             project_id=self.project['id']))
 2012         self.assertValidProjectScopedTokenResponse(r, is_admin_project=None)
 2013         v3_token = r.headers.get('X-Subject-Token')
 2014         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 2015         self.assertValidProjectScopedTokenResponse(r, is_admin_project=None)
 2016 
 2017     def _create_role(self, domain_id=None):
 2018         """Call ``POST /roles``."""
 2019         ref = unit.new_role_ref(domain_id=domain_id)
 2020         r = self.post('/roles', body={'role': ref})
 2021         return self.assertValidRoleResponse(r, ref)
 2022 
 2023     def _create_implied_role(self, prior_id):
 2024         implied = self._create_role()
 2025         url = '/roles/%s/implies/%s' % (prior_id, implied['id'])
 2026         self.put(url, expected_status=http_client.CREATED)
 2027         return implied
 2028 
 2029     def _delete_implied_role(self, prior_role_id, implied_role_id):
 2030         url = '/roles/%s/implies/%s' % (prior_role_id, implied_role_id)
 2031         self.delete(url)
 2032 
 2033     def _get_scoped_token_roles(self, is_domain=False):
 2034         if is_domain:
 2035             v3_token = self.get_domain_scoped_token()
 2036         else:
 2037             v3_token = self.get_scoped_token()
 2038 
 2039         r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
 2040         v3_token_data = r.result
 2041         token_roles = v3_token_data['token']['roles']
 2042         return token_roles
 2043 
 2044     def _create_implied_role_shows_in_v3_token(self, is_domain):
 2045         token_roles = self._get_scoped_token_roles(is_domain)
 2046         self.assertEqual(1, len(token_roles))
 2047 
 2048         prior = token_roles[0]['id']
 2049         implied1 = self._create_implied_role(prior)
 2050 
 2051         token_roles = self._get_scoped_token_roles(is_domain)
 2052         self.assertEqual(2, len(token_roles))
 2053 
 2054         implied2 = self._create_implied_role(prior)
 2055         token_roles = self._get_scoped_token_roles(is_domain)
 2056         self.assertEqual(3, len(token_roles))
 2057 
 2058         token_role_ids = [role['id'] for role in token_roles]
 2059         self.assertIn(prior, token_role_ids)
 2060         self.assertIn(implied1['id'], token_role_ids)
 2061         self.assertIn(implied2['id'], token_role_ids)
 2062 
 2063     def test_create_implied_role_shows_in_v3_project_token(self):
 2064         # regardless of the default chosen, this should always
 2065         # test with the option set.
 2066         self.config_fixture.config(group='token')
 2067         self._create_implied_role_shows_in_v3_token(False)
 2068 
 2069     def test_create_implied_role_shows_in_v3_domain_token(self):
 2070         self.config_fixture.config(group='token')
 2071         PROVIDERS.assignment_api.create_grant(
 2072             self.role['id'], user_id=self.user['id'],
 2073             domain_id=self.domain['id']
 2074         )
 2075 
 2076         self._create_implied_role_shows_in_v3_token(True)
 2077 
 2078     def test_create_implied_role_shows_in_v3_system_token(self):
 2079         self.config_fixture.config(group='token')
 2080         PROVIDERS.assignment_api.create_system_grant_for_user(
 2081             self.user['id'], self.role['id']
 2082         )
 2083 
 2084         token_id = self.get_system_scoped_token()
 2085         r = self.get('/auth/tokens', headers={'X-Subject-Token': token_id})
 2086         token_roles = r.result['token']['roles']
 2087 
 2088         prior = token_roles[0]['id']
 2089         self._create_implied_role(prior)
 2090 
 2091         r = self.get('/auth/tokens', headers={'X-Subject-Token': token_id})
 2092         token_roles = r.result['token']['roles']
 2093         self.assertEqual(2, len(token_roles))
 2094 
 2095     def test_group_assigned_implied_role_shows_in_v3_token(self):
 2096         self.config_fixture.config(group='token')
 2097         is_domain = False
 2098         token_roles = self._get_scoped_token_roles(is_domain)
 2099         self.assertEqual(1, len(token_roles))
 2100 
 2101         new_role = self._create_role()
 2102         prior = new_role['id']
 2103 
 2104         new_group_ref = unit.new_group_ref(domain_id=self.domain['id'])
 2105         new_group = PROVIDERS.identity_api.create_group(new_group_ref)
 2106         PROVIDERS.assignment_api.create_grant(
 2107             prior, group_id=new_group['id'], project_id=self.project['id']
 2108         )
 2109 
 2110         token_roles = self._get_scoped_token_roles(is_domain)
 2111         self.assertEqual(1, len(token_roles))
 2112 
 2113         PROVIDERS.identity_api.add_user_to_group(
 2114             self.user['id'], new_group['id']
 2115         )
 2116 
 2117         token_roles = self._get_scoped_token_roles(is_domain)
 2118         self.assertEqual(2, len(token_roles))
 2119 
 2120         implied1 = self._create_implied_role(prior)
 2121 
 2122         token_roles = self._get_scoped_token_roles(is_domain)
 2123         self.assertEqual(3, len(token_roles))
 2124 
 2125         implied2 = self._create_implied_role(prior)
 2126         token_roles = self._get_scoped_token_roles(is_domain)
 2127         self.assertEqual(4, len(token_roles))
 2128 
 2129         token_role_ids = [role['id'] for role in token_roles]
 2130         self.assertIn(prior, token_role_ids)
 2131         self.assertIn(implied1['id'], token_role_ids)
 2132         self.assertIn(implied2['id'], token_role_ids)
 2133 
 2134     def test_multiple_implied_roles_show_in_v3_token(self):
 2135         self.config_fixture.config(group='token')
 2136         token_roles = self._get_scoped_token_roles()
 2137         self.assertEqual(1, len(token_roles))
 2138 
 2139         prior = token_roles[0]['id']
 2140         implied1 = self._create_implied_role(prior)
 2141         implied2 = self._create_implied_role(prior)
 2142         implied3 = self._create_implied_role(prior)
 2143 
 2144         token_roles = self._get_scoped_token_roles()
 2145         self.assertEqual(4, len(token_roles))
 2146 
 2147         token_role_ids = [role['id'] for role in token_roles]
 2148         self.assertIn(prior, token_role_ids)
 2149         self.assertIn(implied1['id'], token_role_ids)
 2150         self.assertIn(implied2['id'], token_role_ids)
 2151         self.assertIn(implied3['id'], token_role_ids)
 2152 
 2153     def test_chained_implied_role_shows_in_v3_token(self):
 2154         self.config_fixture.config(group='token')
 2155         token_roles = self._get_scoped_token_roles()
 2156         self.assertEqual(1, len(token_roles))
 2157 
 2158         prior = token_roles[0]['id']
 2159         implied1 = self._create_implied_role(prior)
 2160         implied2 = self._create_implied_role(implied1['id'])
 2161         implied3 = self._create_implied_role(implied2['id'])
 2162 
 2163         token_roles = self._get_scoped_token_roles()
 2164         self.assertEqual(4, len(token_roles))
 2165 
 2166         token_role_ids = [role['id'] for role in token_roles]
 2167 
 2168         self.assertIn(prior, token_role_ids)
 2169         self.assertIn(implied1['id'], token_role_ids)
 2170         self.assertIn(implied2['id'], token_role_ids)
 2171         self.assertIn(implied3['id'], token_role_ids)
 2172 
 2173     def test_implied_role_disabled_by_config(self):
 2174         self.config_fixture.config(group='token')
 2175         token_roles = self._get_scoped_token_roles()
 2176         self.assertEqual(1, len(token_roles))
 2177 
 2178         prior = token_roles[0]['id']
 2179         implied1 = self._create_implied_role(prior)
 2180         implied2 = self._create_implied_role(implied1['id'])
 2181         self._create_implied_role(implied2['id'])
 2182 
 2183         token_roles = self._get_scoped_token_roles()
 2184         self.assertEqual(4, len(token_roles))
 2185         token_role_ids = [role['id'] for role in token_roles]
 2186         self.assertIn(prior, token_role_ids)
 2187 
 2188     def test_delete_implied_role_do_not_show_in_v3_token(self):
 2189         self.config_fixture.config(group='token')
 2190         token_roles = self._get_scoped_token_roles()
 2191         prior = token_roles[0]['id']
 2192         implied = self._create_implied_role(prior)
 2193 
 2194         token_roles = self._get_scoped_token_roles()
 2195         self.assertEqual(2, len(token_roles))
 2196         self._delete_implied_role(prior, implied['id'])
 2197 
 2198         token_roles = self._get_scoped_token_roles()
 2199         self.assertEqual(1, len(token_roles))
 2200 
 2201     def test_unrelated_implied_roles_do_not_change_v3_token(self):
 2202         self.config_fixture.config(group='token')
 2203         token_roles = self._get_scoped_token_roles()
 2204         prior = token_roles[0]['id']
 2205         implied = self._create_implied_role(prior)
 2206 
 2207         token_roles = self._get_scoped_token_roles()
 2208         self.assertEqual(2, len(token_roles))
 2209 
 2210         unrelated = self._create_role()
 2211         url = '/roles/%s/implies/%s' % (unrelated['id'], implied['id'])
 2212         self.put(url, expected_status=http_client.CREATED)
 2213 
 2214         token_roles = self._get_scoped_token_roles()
 2215         self.assertEqual(2, len(token_roles))
 2216 
 2217         self._delete_implied_role(unrelated['id'], implied['id'])
 2218         token_roles = self._get_scoped_token_roles()
 2219         self.assertEqual(2, len(token_roles))
 2220 
 2221     def test_domain_specific_roles_do_not_show_v3_token(self):
 2222         self.config_fixture.config(group='token')
 2223         initial_token_roles = self._get_scoped_token_roles()
 2224 
 2225         new_role = self._create_role(domain_id=self.domain_id)
 2226         PROVIDERS.assignment_api.create_grant(
 2227             new_role['id'], user_id=self.user['id'],
 2228             project_id=self.project['id']
 2229         )
 2230         implied = self._create_implied_role(new_role['id'])
 2231 
 2232         token_roles = self._get_scoped_token_roles()
 2233         self.assertEqual(len(initial_token_roles) + 1, len(token_roles))
 2234 
 2235         # The implied role from the domain specific role should be in the
 2236         # token, but not the domain specific role itself.
 2237         token_role_ids = [role['id'] for role in token_roles]
 2238         self.assertIn(implied['id'], token_role_ids)
 2239         self.assertNotIn(new_role['id'], token_role_ids)
 2240 
 2241     def test_remove_all_roles_from_scope_result_in_404(self):
 2242         # create a new user
 2243         new_user = unit.create_user(PROVIDERS.identity_api,
 2244                                     domain_id=self.domain['id'])
 2245 
 2246         # give the new user a role on a project
 2247         path = '/projects/%s/users/%s/roles/%s' % (
 2248             self.project['id'], new_user['id'], self.role['id'])
 2249         self.put(path=path)
 2250 
 2251         # authenticate as the new user and get a project-scoped token
 2252         auth_data = self.build_authentication_request(
 2253             user_id=new_user['id'],
 2254             password=new_user['password'],
 2255             project_id=self.project['id'])
 2256         subject_token_id = self.v3_create_token(auth_data).headers.get(
 2257             'X-Subject-Token')
 2258 
 2259         # make sure the project-scoped token is valid
 2260         headers = {'X-Subject-Token': subject_token_id}
 2261         r = self.get('/auth/tokens', headers=headers)
 2262         self.assertValidProjectScopedTokenResponse(r)
 2263 
 2264         # remove the roles from the user for the given scope
 2265         path = '/projects/%s/users/%s/roles/%s' % (
 2266             self.project['id'], new_user['id'], self.role['id'])
 2267         self.delete(path=path)
 2268 
 2269         # token validation should now result in 404
 2270         self.get('/auth/tokens', headers=headers,
 2271                  expected_status=http_client.NOT_FOUND)
 2272 
 2273     def test_create_token_with_nonexistant_user_id_fails(self):
 2274         auth_data = self.build_authentication_request(
 2275             user_id=uuid.uuid4().hex,
 2276             password=self.user['password'])
 2277         self.v3_create_token(auth_data,
 2278                              expected_status=http_client.UNAUTHORIZED)
 2279 
 2280     def test_create_token_with_nonexistant_username_fails(self):
 2281         auth_data = self.build_authentication_request(
 2282             username=uuid.uuid4().hex,
 2283             user_domain_id=self.domain['id'],
 2284             password=self.user['password'])
 2285         self.v3_create_token(auth_data,
 2286                              expected_status=http_client.UNAUTHORIZED)
 2287 
 2288     def test_create_token_with_nonexistant_domain_id_fails(self):
 2289         auth_data = self.build_authentication_request(
 2290             username=self.user['name'],
 2291             user_domain_id=uuid.uuid4().hex,
 2292             password=self.user['password'])
 2293         self.v3_create_token(auth_data,
 2294                              expected_status=http_client.UNAUTHORIZED)
 2295 
 2296     def test_create_token_with_nonexistant_domain_name_fails(self):
 2297         auth_data = self.build_authentication_request(
 2298             username=self.user['name'],
 2299             user_domain_name=uuid.uuid4().hex,
 2300             password=self.user['password'])
 2301         self.v3_create_token(auth_data,
 2302                              expected_status=http_client.UNAUTHORIZED)
 2303 
 2304     def test_create_token_with_wrong_password_fails(self):
 2305         auth_data = self.build_authentication_request(
 2306             user_id=self.user['id'],
 2307             password=uuid.uuid4().hex)
 2308         self.v3_create_token(auth_data,
 2309                              expected_status=http_client.UNAUTHORIZED)
 2310 
 2311     def test_user_and_group_roles_scoped_token(self):
 2312         """Test correct roles are returned in scoped token.
 2313 
 2314         Test Plan:
 2315 
 2316         - Create a domain, with 1 project, 2 users (user1 and user2)
 2317           and 2 groups (group1 and group2)
 2318         - Make user1 a member of group1, user2 a member of group2
 2319         - Create 8 roles, assigning them to each of the 8 combinations
 2320           of users/groups on domain/project
 2321         - Get a project scoped token for user1, checking that the right
 2322           two roles are returned (one directly assigned, one by virtue
 2323           of group membership)
 2324         - Repeat this for a domain scoped token
 2325         - Make user1 also a member of group2
 2326         - Get another scoped token making sure the additional role
 2327           shows up
 2328         - User2 is just here as a spoiler, to make sure we don't get
 2329           any roles uniquely assigned to it returned in any of our
 2330           tokens
 2331 
 2332         """
 2333         domainA = unit.new_domain_ref()
 2334         PROVIDERS.resource_api.create_domain(domainA['id'], domainA)
 2335         projectA = unit.new_project_ref(domain_id=domainA['id'])
 2336         PROVIDERS.resource_api.create_project(projectA['id'], projectA)
 2337 
 2338         user1 = unit.create_user(
 2339             PROVIDERS.identity_api, domain_id=domainA['id']
 2340         )
 2341 
 2342         user2 = unit.create_user(
 2343             PROVIDERS.identity_api, domain_id=domainA['id']
 2344         )
 2345 
 2346         group1 = unit.new_group_ref(domain_id=domainA['id'])
 2347         group1 = PROVIDERS.identity_api.create_group(group1)
 2348 
 2349         group2 = unit.new_group_ref(domain_id=domainA['id'])
 2350         group2 = PROVIDERS.identity_api.create_group(group2)
 2351 
 2352         PROVIDERS.identity_api.add_user_to_group(
 2353             user1['id'], group1['id']
 2354         )
 2355         PROVIDERS.identity_api.add_user_to_group(
 2356             user2['id'], group2['id']
 2357         )
 2358 
 2359         # Now create all the roles and assign them
 2360         role_list = []
 2361         for _ in range(8):
 2362             role = unit.new_role_ref()
 2363             PROVIDERS.role_api.create_role(role['id'], role)
 2364             role_list.append(role)
 2365 
 2366         PROVIDERS.assignment_api.create_grant(
 2367             role_list[0]['id'], user_id=user1['id'], domain_id=domainA['id']
 2368         )
 2369         PROVIDERS.assignment_api.create_grant(
 2370             role_list[1]['id'], user_id=user1['id'], project_id=projectA['id']
 2371         )
 2372         PROVIDERS.assignment_api.create_grant(
 2373             role_list[2]['id'], user_id=user2['id'], domain_id=domainA['id']
 2374         )
 2375         PROVIDERS.assignment_api.create_grant(
 2376             role_list[3]['id'], user_id=user2['id'], project_id=projectA['id']
 2377         )
 2378         PROVIDERS.assignment_api.create_grant(
 2379             role_list[4]['id'], group_id=group1['id'], domain_id=domainA['id']
 2380         )
 2381         PROVIDERS.assignment_api.create_grant(
 2382             role_list[5]['id'], group_id=group1['id'],
 2383             project_id=projectA['id']
 2384         )
 2385         PROVIDERS.assignment_api.create_grant(
 2386             role_list[6]['id'], group_id=group2['id'], domain_id=domainA['id']
 2387         )
 2388         PROVIDERS.assignment_api.create_grant(
 2389             role_list[7]['id'], group_id=group2['id'],
 2390             project_id=projectA['id']
 2391         )
 2392 
 2393         # First, get a project scoped token - which should
 2394         # contain the direct user role and the one by virtue
 2395         # of group membership
 2396         auth_data = self.build_authentication_request(
 2397             user_id=user1['id'],
 2398             password=user1['password'],
 2399             project_id=projectA['id'])
 2400         r = self.v3_create_token(auth_data)
 2401         token = self.assertValidScopedTokenResponse(r)
 2402         roles_ids = []
 2403         for ref in token['roles']:
 2404             roles_ids.append(ref['id'])
 2405         self.assertEqual(2, len(token['roles']))
 2406         self.assertIn(role_list[1]['id'], roles_ids)
 2407         self.assertIn(role_list[5]['id'], roles_ids)
 2408 
 2409         # Now the same thing for a domain scoped token
 2410         auth_data = self.build_authentication_request(
 2411             user_id=user1['id'],
 2412             password=user1['password'],
 2413             domain_id=domainA['id'])
 2414         r = self.v3_create_token(auth_data)
 2415         token = self.assertValidScopedTokenResponse(r)
 2416         roles_ids = []
 2417         for ref in token['roles']:
 2418             roles_ids.append(ref['id'])
 2419         self.assertEqual(2, len(token['roles']))
 2420         self.assertIn(role_list[0]['id'], roles_ids)
 2421         self.assertIn(role_list[4]['id'], roles_ids)
 2422 
 2423         # Finally, add user1 to the 2nd group, and get a new
 2424         # scoped token - the extra role should now be included
 2425         # by virtue of the 2nd group
 2426         PROVIDERS.identity_api.add_user_to_group(
 2427             user1['id'], group2['id']
 2428         )
 2429         auth_data = self.build_authentication_request(
 2430             user_id=user1['id'],
 2431             password=user1['password'],
 2432             project_id=projectA['id'])
 2433         r = self.v3_create_token(auth_data)
 2434         token = self.assertValidScopedTokenResponse(r)
 2435         roles_ids = []
 2436         for ref in token['roles']:
 2437             roles_ids.append(ref['id'])
 2438         self.assertEqual(3, len(token['roles']))
 2439         self.assertIn(role_list[1]['id'], roles_ids)
 2440         self.assertIn(role_list[5]['id'], roles_ids)
 2441         self.assertIn(role_list[7]['id'], roles_ids)
 2442 
 2443     def test_auth_token_cross_domain_group_and_project(self):
 2444         """Verify getting a token in cross domain group/project roles."""
 2445         # create domain, project and group and grant roles to user
 2446         domain1 = unit.new_domain_ref()
 2447         PROVIDERS.resource_api.create_domain(domain1['id'], domain1)
 2448         project1 = unit.new_project_ref(domain_id=domain1['id'])
 2449         PROVIDERS.resource_api.create_project(project1['id'], project1)
 2450         user_foo = unit.create_user(PROVIDERS.identity_api,
 2451                                     domain_id=test_v3.DEFAULT_DOMAIN_ID)
 2452         role_member = unit.new_role_ref()
 2453         PROVIDERS.role_api.create_role(role_member['id'], role_member)
 2454         role_admin = unit.new_role_ref()
 2455         PROVIDERS.role_api.create_role(role_admin['id'], role_admin)
 2456         role_foo_domain1 = unit.new_role_ref()
 2457         PROVIDERS.role_api.create_role(
 2458             role_foo_domain1['id'], role_foo_domain1
 2459         )
 2460         role_group_domain1 = unit.new_role_ref()
 2461         PROVIDERS.role_api.create_role(
 2462             role_group_domain1['id'], role_group_domain1
 2463         )
 2464         new_group = unit.new_group_ref(domain_id=domain1['id'])
 2465         new_group = PROVIDERS.identity_api.create_group(new_group)
 2466         PROVIDERS.identity_api.add_user_to_group(
 2467             user_foo['id'], new_group['id']
 2468         )
 2469         PROVIDERS.assignment_api.create_grant(
 2470             user_id=user_foo['id'],
 2471             project_id=project1['id'],
 2472             role_id=role_member['id'])
 2473         PROVIDERS.assignment_api.create_grant(
 2474             group_id=new_group['id'],
 2475             project_id=project1['id'],
 2476             role_id=role_admin['id'])
 2477         PROVIDERS.assignment_api.create_grant(
 2478             user_id=user_foo['id'],
 2479             domain_id=domain1['id'],
 2480             role_id=role_foo_domain1['id'])
 2481         PROVIDERS.assignment_api.create_grant(
 2482             group_id=new_group['id'],
 2483             domain_id=domain1['id'],
 2484             role_id=role_group_domain1['id'])
 2485 
 2486         # Get a scoped token for the project
 2487         auth_data = self.build_authentication_request(
 2488             username=user_foo['name'],
 2489             user_domain_id=test_v3.DEFAULT_DOMAIN_ID,
 2490             password=user_foo['password'],
 2491             project_name=project1['name'],
 2492             project_domain_id=domain1['id'])
 2493 
 2494         r = self.v3_create_token(auth_data)
 2495         scoped_token = self.assertValidScopedTokenResponse(r)
 2496         project = scoped_token["project"]
 2497         roles_ids = []
 2498         for ref in scoped_token['roles']:
 2499             roles_ids.append(ref['id'])
 2500         self.assertEqual(project1['id'], project["id"])
 2501         self.assertIn(role_member['id'], roles_ids)
 2502         self.assertIn(role_admin['id'], roles_ids)
 2503         self.assertNotIn(role_foo_domain1['id'], roles_ids)
 2504         self.assertNotIn(role_group_domain1['id'], roles_ids)
 2505 
 2506     def test_remote_user_no_realm(self):
 2507         app = self.loadapp()
 2508 
 2509         auth_contexts = []
 2510 
 2511         # NOTE(morgan): This __init__ is used to inject the auth context into
 2512         # the auth_contexts list so that we can perform introspection. This way
 2513         # we do not need to try and mock out anything deep within keystone's
 2514         # auth pipeline. Note that we are using MockPatch to ensure we undo
 2515         # the mock after the fact.
 2516         def new_init(self, *args, **kwargs):
 2517             super(auth.core.AuthContext, self).__init__(*args, **kwargs)
 2518             auth_contexts.append(self)
 2519 
 2520         self.useFixture(fixtures.MockPatch(
 2521             'keystone.auth.core.AuthContext.__init__', new_init))
 2522         with app.test_client() as c:
 2523             c.environ_base.update(self.build_external_auth_environ(
 2524                 self.default_domain_user['name']))
 2525             auth_req = self.build_authentication_request()
 2526             c.post('/v3/auth/tokens', json=auth_req)
 2527             self.assertEqual(self.default_domain_user['id'],
 2528                              auth_contexts[-1]['user_id'])
 2529 
 2530         # Now test to make sure the user name can, itself, contain the
 2531         # '@' character.
 2532         user = {'name': 'myname@mydivision'}
 2533         PROVIDERS.identity_api.update_user(
 2534             self.default_domain_user['id'], user
 2535         )
 2536         with app.test_client() as c:
 2537             c.environ_base.update(self.build_external_auth_environ(
 2538                 user['name']))
 2539             auth_req = self.build_authentication_request()
 2540             c.post('/v3/auth/tokens', json=auth_req)
 2541             self.assertEqual(self.default_domain_user['id'],
 2542                              auth_contexts[-1]['user_id'])
 2543         self.assertEqual(self.default_domain_user['id'],
 2544                          auth_contexts[-1]['user_id'])
 2545 
 2546     def test_remote_user_no_domain(self):
 2547         app = self.loadapp()
 2548         with app.test_client() as c:
 2549             c.environ_base.update(self.build_external_auth_environ(
 2550                 self.user['name']))
 2551             auth_request = self.build_authentication_request()
 2552             c.post('/v3/auth/tokens', json=auth_request,
 2553                    expected_status_code=http_client.UNAUTHORIZED)
 2554 
 2555     def test_remote_user_and_password(self):
 2556         # both REMOTE_USER and password methods must pass.
 2557         # note that they do not have to match
 2558         app = self.loadapp()
 2559         with app.test_client() as c:
 2560             auth_data = self.build_authentication_request(
 2561                 user_domain_id=self.default_domain_user['domain_id'],
 2562                 username=self.default_domain_user['name'],
 2563                 password=self.default_domain_user['password'])
 2564             c.post('/v3/auth/tokens', json=auth_data)
 2565 
 2566     def test_remote_user_and_explicit_external(self):
 2567         # both REMOTE_USER and password methods must pass.
 2568         # note that they do not have to match
 2569         auth_data = self.build_authentication_request(
 2570             user_domain_id=self.domain['id'],
 2571             username=self.user['name'],
 2572             password=self.user['password'])
 2573         auth_data['auth']['identity']['methods'] = ["password", "external"]
 2574         auth_data['auth']['identity']['external'] = {}
 2575         app = self.loadapp()
 2576         with app.test_client() as c:
 2577             c.post('/v3/auth/tokens', json=auth_data,
 2578                    expected_status_code=http_client.UNAUTHORIZED)
 2579 
 2580     def test_remote_user_bad_password(self):
 2581         # both REMOTE_USER and password methods must pass.
 2582         app = self.loadapp()
 2583         auth_data = self.build_authentication_request(
 2584             user_domain_id=self.domain['id'],
 2585             username=self.user['name'],
 2586             password='badpassword')
 2587         with app.test_client() as c:
 2588             c.post('/v3/auth/tokens', json=auth_data,
 2589                    expected_status_code=http_client.UNAUTHORIZED)
 2590 
 2591     def test_fetch_expired_allow_expired(self):
 2592         self.config_fixture.config(group='token',
 2593                                    expiration=10,
 2594                                    allow_expired_window=20)
 2595         time = datetime.datetime.utcnow()
 2596         with freezegun.freeze_time(time) as frozen_datetime:
 2597             token = self._get_project_scoped_token()
 2598 
 2599             # initially it validates because it's within time
 2600             frozen_datetime.tick(delta=datetime.timedelta(seconds=2))
 2601             self._validate_token(token)
 2602 
 2603             # after passing expiry time validation fails
 2604             frozen_datetime.tick(delta=datetime.timedelta(seconds=12))
 2605             self._validate_token(token, expected_status=http_client.NOT_FOUND)
 2606 
 2607             # but if we pass allow_expired it validates
 2608             self._validate_token(token, allow_expired=True)
 2609 
 2610             # and then if we're passed the allow_expired_window it will fail
 2611             # anyway raises expired when now > expiration + window
 2612             frozen_datetime.tick(delta=datetime.timedelta(seconds=22))
 2613             self._validate_token(token,
 2614                                  allow_expired=True,
 2615                                  expected_status=http_client.NOT_FOUND)
 2616 
 2617     def test_system_scoped_token_works_with_domain_specific_drivers(self):
 2618         self.config_fixture.config(
 2619             group='identity', domain_specific_drivers_enabled=True
 2620         )
 2621 
 2622         PROVIDERS.assignment_api.create_system_grant_for_user(
 2623             self.user['id'], self.role['id']
 2624         )
 2625 
 2626         token_id = self.get_system_scoped_token()
 2627         headers = {'X-Auth-Token': token_id}
 2628 
 2629         app = self.loadapp()
 2630         with app.test_client() as c:
 2631             c.get('/v3/users', headers=headers)
 2632 
 2633 
 2634 class TokenDataTests(object):
 2635     """Test the data in specific token types."""
 2636 
 2637     def test_unscoped_token_format(self):
 2638         # ensure the unscoped token response contains the appropriate data
 2639         r = self.get('/auth/tokens', headers=self.headers)
 2640         self.assertValidUnscopedTokenResponse(r)
 2641 
 2642     def test_domain_scoped_token_format(self):
 2643         # ensure the domain scoped token response contains the appropriate data
 2644         PROVIDERS.assignment_api.create_grant(
 2645             self.role['id'],
 2646             user_id=self.default_domain_user['id'],
 2647             domain_id=self.domain['id'])
 2648 
 2649         domain_scoped_token = self.get_requested_token(
 2650             self.build_authentication_request(
 2651                 user_id=self.default_domain_user['id'],
 2652                 password=self.default_domain_user['password'],
 2653                 domain_id=self.domain['id'])
 2654         )
 2655         self.headers['X-Subject-Token'] = domain_scoped_token
 2656         r = self.get('/auth/tokens', headers=self.headers)
 2657         self.assertValidDomainScopedTokenResponse(r)
 2658 
 2659     def test_project_scoped_token_format(self):
 2660         # ensure project scoped token responses contains the appropriate data
 2661         project_scoped_token = self.get_requested_token(
 2662             self.build_authentication_request(
 2663                 user_id=self.default_domain_user['id'],
 2664                 password=self.default_domain_user['password'],
 2665                 project_id=self.default_domain_project['id'])
 2666         )
 2667         self.headers['X-Subject-Token'] = project_scoped_token
 2668         r = self.get('/auth/tokens', headers=self.headers)
 2669         self.assertValidProjectScopedTokenResponse(r)
 2670 
 2671     def test_extra_data_in_unscoped_token_fails_validation(self):
 2672         # ensure unscoped token response contains the appropriate data
 2673         r = self.get('/auth/tokens', headers=self.headers)
 2674 
 2675         # populate the response result with some extra data
 2676         r.result['token'][u'extra'] = six.text_type(uuid.uuid4().hex)
 2677         self.assertRaises(exception.SchemaValidationError,
 2678                           self.assertValidUnscopedTokenResponse,
 2679                           r)
 2680 
 2681     def test_extra_data_in_domain_scoped_token_fails_validation(self):
 2682         # ensure domain scoped token response contains the appropriate data
 2683         PROVIDERS.assignment_api.create_grant(
 2684             self.role['id'],
 2685             user_id=self.default_domain_user['id'],
 2686             domain_id=self.domain['id'])
 2687 
 2688         domain_scoped_token = self.get_requested_token(
 2689             self.build_authentication_request(
 2690                 user_id=self.default_domain_user['id'],
 2691                 password=self.default_domain_user['password'],
 2692                 domain_id=self.domain['id'])
 2693         )
 2694         self.headers['X-Subject-Token'] = domain_scoped_token
 2695         r = self.get('/auth/tokens', headers=self.headers)
 2696 
 2697         # populate the response result with some extra data
 2698         r.result['token'][u'extra'] = six.text_type(uuid.uuid4().hex)
 2699         self.assertRaises(exception.SchemaValidationError,
 2700                           self.assertValidDomainScopedTokenResponse,
 2701                           r)
 2702 
 2703     def test_extra_data_in_project_scoped_token_fails_validation(self):
 2704         # ensure project scoped token responses contains the appropriate data
 2705         project_scoped_token = self.get_requested_token(
 2706             self.build_authentication_request(
 2707                 user_id=self.default_domain_user['id'],
 2708                 password=self.default_domain_user['password'],
 2709                 project_id=self.default_domain_project['id'])
 2710         )
 2711         self.headers['X-Subject-Token'] = project_scoped_token
 2712         resp = self.get('/auth/tokens', headers=self.headers)
 2713 
 2714         # populate the response result with some extra data
 2715         resp.result['token'][u'extra'] = six.text_type(uuid.uuid4().hex)
 2716         self.assertRaises(exception.SchemaValidationError,
 2717                           self.assertValidProjectScopedTokenResponse,
 2718                           resp)
 2719 
 2720 
 2721 class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase):
 2722     def config_overrides(self):
 2723         super(AllowRescopeScopedTokenDisabledTests, self).config_overrides()
 2724         self.config_fixture.config(
 2725             group='token',
 2726             allow_rescope_scoped_token=False)
 2727 
 2728     def test_rescoping_v3_to_v3_disabled(self):
 2729         self.v3_create_token(
 2730             self.build_authentication_request(
 2731                 token=self.get_scoped_token(),
 2732                 project_id=self.project_id),
 2733             expected_status=http_client.FORBIDDEN)
 2734 
 2735     def test_rescoped_domain_token_disabled(self):
 2736 
 2737         self.domainA = unit.new_domain_ref()
 2738         PROVIDERS.resource_api.create_domain(self.domainA['id'], self.domainA)
 2739         PROVIDERS.assignment_api.create_grant(
 2740             self.role['id'], user_id=self.user['id'],
 2741             domain_id=self.domainA['id']
 2742         )
 2743         unscoped_token = self.get_requested_token(
 2744             self.build_authentication_request(
 2745                 user_id=self.user['id'],
 2746                 password=self.user['password']))
 2747         # Get a domain-scoped token from the unscoped token
 2748         domain_scoped_token = self.get_requested_token(
 2749             self.build_authentication_request(
 2750                 token=unscoped_token,
 2751                 domain_id=self.domainA['id']))
 2752         self.v3_create_token(
 2753             self.build_authentication_request(
 2754                 token=domain_scoped_token,
 2755                 project_id=self.project_id),
 2756             expected_status=http_client.FORBIDDEN)
 2757 
 2758 
 2759 class TestFernetTokenAPIs(test_v3.RestfulTestCase, TokenAPITests,
 2760                           TokenDataTests):
 2761     def config_overrides(self):
 2762         super(TestFernetTokenAPIs, self).config_overrides()
 2763         self.config_fixture.config(group='token', provider='fernet',
 2764                                    cache_on_issue=True)
 2765         self.useFixture(
 2766             ksfixtures.KeyRepository(
 2767                 self.config_fixture,
 2768                 'fernet_tokens',
 2769                 CONF.fernet_tokens.max_active_keys
 2770             )
 2771         )
 2772 
 2773     def setUp(self):
 2774         super(TestFernetTokenAPIs, self).setUp()
 2775         self.doSetUp()
 2776 
 2777     def _make_auth_request(self, auth_data):
 2778         token = super(TestFernetTokenAPIs, self)._make_auth_request(auth_data)
 2779         self.assertLess(len(token), 255)
 2780         return token
 2781 
 2782     def test_validate_tampered_unscoped_token_fails(self):
 2783         unscoped_token = self._get_unscoped_token()
 2784         tampered_token = (unscoped_token[:50] + uuid.uuid4().hex +
 2785                           unscoped_token[50 + 32:])
 2786         self._validate_token(tampered_token,
 2787                              expected_status=http_client.NOT_FOUND)
 2788 
 2789     def test_validate_tampered_project_scoped_token_fails(self):
 2790         project_scoped_token = self._get_project_scoped_token()
 2791         tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex +
 2792                           project_scoped_token[50 + 32:])
 2793         self._validate_token(tampered_token,
 2794                              expected_status=http_client.NOT_FOUND)
 2795 
 2796     def test_validate_tampered_trust_scoped_token_fails(self):
 2797         trustee_user, trust = self._create_trust()
 2798         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 2799         # Get a trust scoped token
 2800         tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex +
 2801                           trust_scoped_token[50 + 32:])
 2802         self._validate_token(tampered_token,
 2803                              expected_status=http_client.NOT_FOUND)
 2804 
 2805     def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
 2806         # NOTE(amakarov): have to override this test for non-persistent tokens
 2807         # as TokenNotFound exception makes no sense for those.
 2808         trustee_user, trust = self._create_trust()
 2809         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 2810         # Validate a trust scoped token
 2811         r = self._validate_token(trust_scoped_token)
 2812         self.assertValidProjectScopedTokenResponse(r)
 2813 
 2814         # Disable the trustor
 2815         trustor_update_ref = dict(enabled=False)
 2816         PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
 2817         # Ensure validating a token for a disabled user fails
 2818         self._validate_token(
 2819             trust_scoped_token,
 2820             expected_status=http_client.FORBIDDEN
 2821         )
 2822 
 2823 
 2824 class TestJWSTokenAPIs(test_v3.RestfulTestCase, TokenAPITests, TokenDataTests):
 2825     def config_overrides(self):
 2826         super(TestJWSTokenAPIs, self).config_overrides()
 2827         self.config_fixture.config(group='token', provider='jws',
 2828                                    cache_on_issue=True)
 2829         self.useFixture(ksfixtures.JWSKeyRepository(self.config_fixture))
 2830 
 2831     def setUp(self):
 2832         super(TestJWSTokenAPIs, self).setUp()
 2833         self.doSetUp()
 2834 
 2835     def _make_auth_request(self, auth_data):
 2836         token = super(TestJWSTokenAPIs, self)._make_auth_request(auth_data)
 2837         self.assertLess(len(token), 350)
 2838         return token
 2839 
 2840     def test_validate_tampered_unscoped_token_fails(self):
 2841         unscoped_token = self._get_unscoped_token()
 2842         tampered_token = (unscoped_token[:50] + uuid.uuid4().hex +
 2843                           unscoped_token[50 + 32:])
 2844         self._validate_token(tampered_token,
 2845                              expected_status=http_client.NOT_FOUND)
 2846 
 2847     def test_validate_tampered_project_scoped_token_fails(self):
 2848         project_scoped_token = self._get_project_scoped_token()
 2849         tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex +
 2850                           project_scoped_token[50 + 32:])
 2851         self._validate_token(tampered_token,
 2852                              expected_status=http_client.NOT_FOUND)
 2853 
 2854     def test_validate_tampered_trust_scoped_token_fails(self):
 2855         trustee_user, trust = self._create_trust()
 2856         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 2857         # Get a trust scoped token
 2858         tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex +
 2859                           trust_scoped_token[50 + 32:])
 2860         self._validate_token(tampered_token,
 2861                              expected_status=http_client.NOT_FOUND)
 2862 
 2863     def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
 2864         # NOTE(amakarov): have to override this test for non-persistent tokens
 2865         # as TokenNotFound exception makes no sense for those.
 2866         trustee_user, trust = self._create_trust()
 2867         trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
 2868         # Validate a trust scoped token
 2869         r = self._validate_token(trust_scoped_token)
 2870         self.assertValidProjectScopedTokenResponse(r)
 2871 
 2872         # Disable the trustor
 2873         trustor_update_ref = dict(enabled=False)
 2874         PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
 2875         # Ensure validating a token for a disabled user fails
 2876         self._validate_token(
 2877             trust_scoped_token,
 2878             expected_status=http_client.FORBIDDEN
 2879         )
 2880 
 2881 
 2882 class TestTokenRevokeById(test_v3.RestfulTestCase):
 2883     """Test token revocation on the v3 Identity API."""
 2884 
 2885     def config_overrides(self):
 2886         super(TestTokenRevokeById, self).config_overrides()
 2887         self.config_fixture.config(
 2888             group='token',
 2889             provider='fernet',
 2890             revoke_by_id=False)
 2891         self.useFixture(
 2892             ksfixtures.KeyRepository(
 2893                 self.config_fixture,
 2894                 'fernet_tokens',
 2895                 CONF.fernet_tokens.max_active_keys
 2896             )
 2897         )
 2898 
 2899     def setUp(self):
 2900         """Setup for Token Revoking Test Cases.
 2901 
 2902         As well as the usual housekeeping, create a set of domains,
 2903         users, groups, roles and projects for the subsequent tests:
 2904 
 2905         - Two domains: A & B
 2906         - Three users (1, 2 and 3)
 2907         - Three groups (1, 2 and 3)
 2908         - Two roles (1 and 2)
 2909         - DomainA owns user1, domainB owns user2 and user3
 2910         - DomainA owns group1 and group2, domainB owns group3
 2911         - User1 and user2 are members of group1
 2912         - User3 is a member of group2
 2913         - Two projects: A & B, both in domainA
 2914         - Group1 has role1 on Project A and B, meaning that user1 and user2
 2915           will get these roles by virtue of membership
 2916         - User1, 2 and 3 have role1 assigned to projectA
 2917         - Group1 has role1 on Project A and B, meaning that user1 and user2
 2918           will get role1 (duplicated) by virtue of membership
 2919         - User1 has role2 assigned to domainA
 2920 
 2921         """
 2922         super(TestTokenRevokeById, self).setUp()
 2923 
 2924         # Start by creating a couple of domains and projects
 2925         self.domainA = unit.new_domain_ref()
 2926         PROVIDERS.resource_api.create_domain(self.domainA['id'], self.domainA)
 2927         self.domainB = unit.new_domain_ref()
 2928         PROVIDERS.resource_api.create_domain(self.domainB['id'], self.domainB)
 2929         self.projectA = unit.new_project_ref(domain_id=self.domainA['id'])
 2930         PROVIDERS.resource_api.create_project(
 2931             self.projectA['id'], self.projectA
 2932         )
 2933         self.projectB = unit.new_project_ref(domain_id=self.domainA['id'])
 2934         PROVIDERS.resource_api.create_project(
 2935             self.projectB['id'], self.projectB
 2936         )
 2937 
 2938         # Now create some users
 2939         self.user1 = unit.create_user(PROVIDERS.identity_api,
 2940                                       domain_id=self.domainA['id'])
 2941 
 2942         self.user2 = unit.create_user(PROVIDERS.identity_api,
 2943                                       domain_id=self.domainB['id'])
 2944 
 2945         self.user3 = unit.create_user(PROVIDERS.identity_api,
 2946                                       domain_id=self.domainB['id'])
 2947 
 2948         self.group1 = unit.new_group_ref(domain_id=self.domainA['id'])
 2949         self.group1 = PROVIDERS.identity_api.create_group(self.group1)
 2950 
 2951         self.group2 = unit.new_group_ref(domain_id=self.domainA['id'])
 2952         self.group2 = PROVIDERS.identity_api.create_group(self.group2)
 2953 
 2954         self.group3 = unit.new_group_ref(domain_id=self.domainB['id'])
 2955         self.group3 = PROVIDERS.identity_api.create_group(self.group3)
 2956 
 2957         PROVIDERS.identity_api.add_user_to_group(
 2958             self.user1['id'], self.group1['id']
 2959         )
 2960         PROVIDERS.identity_api.add_user_to_group(
 2961             self.user2['id'], self.group1['id']
 2962         )
 2963         PROVIDERS.identity_api.add_user_to_group(
 2964             self.user3['id'], self.group2['id']
 2965         )
 2966 
 2967         self.role1 = unit.new_role_ref()
 2968         PROVIDERS.role_api.create_role(self.role1['id'], self.role1)
 2969         self.role2 = unit.new_role_ref()
 2970         PROVIDERS.role_api.create_role(self.role2['id'], self.role2)
 2971 
 2972         PROVIDERS.assignment_api.create_grant(
 2973             self.role2['id'], user_id=self.user1['id'],
 2974             domain_id=self.domainA['id']
 2975         )
 2976         PROVIDERS.assignment_api.create_grant(
 2977             self.role1['id'], user_id=self.user1['id'],
 2978             project_id=self.projectA['id']
 2979         )
 2980         PROVIDERS.assignment_api.create_grant(
 2981             self.role1['id'], user_id=self.user2['id'],
 2982             project_id=self.projectA['id']
 2983         )
 2984         PROVIDERS.assignment_api.create_grant(
 2985             self.role1['id'], user_id=self.user3['id'],
 2986             project_id=self.projectA['id']
 2987         )
 2988         PROVIDERS.assignment_api.create_grant(
 2989             self.role1['id'], group_id=self.group1['id'],
 2990             project_id=self.projectA['id']
 2991         )
 2992 
 2993     def test_unscoped_token_remains_valid_after_role_assignment(self):
 2994         unscoped_token = self.get_requested_token(
 2995             self.build_authentication_request(
 2996                 user_id=self.user1['id'],
 2997                 password=self.user1['password']))
 2998 
 2999         scoped_token = self.get_requested_token(
 3000             self.build_authentication_request(
 3001                 token=unscoped_token,
 3002                 project_id=self.projectA['id']))
 3003 
 3004         # confirm both tokens are valid
 3005         self.head('/auth/tokens',
 3006                   headers={'X-Subject-Token': unscoped_token},
 3007                   expected_status=http_client.OK)
 3008         self.head('/auth/tokens',
 3009                   headers={'X-Subject-Token': scoped_token},
 3010                   expected_status=http_client.OK)
 3011 
 3012         # create a new role
 3013         role = unit.new_role_ref()
 3014         PROVIDERS.role_api.create_role(role['id'], role)
 3015 
 3016         # assign a new role
 3017         self.put(
 3018             '/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % {
 3019                 'project_id': self.projectA['id'],
 3020                 'user_id': self.user1['id'],
 3021                 'role_id': role['id']})
 3022 
 3023         # both tokens should remain valid
 3024         self.head('/auth/tokens',
 3025                   headers={'X-Subject-Token': unscoped_token},
 3026                   expected_status=http_client.OK)
 3027         self.head('/auth/tokens',
 3028                   headers={'X-Subject-Token': scoped_token},
 3029                   expected_status=http_client.OK)
 3030 
 3031     def test_deleting_user_grant_revokes_token(self):
 3032         """Test deleting a user grant revokes token.
 3033 
 3034         Test Plan:
 3035 
 3036         - Get a token for user, scoped to Project
 3037         - Delete the grant user has on Project
 3038         - Check token is no longer valid
 3039 
 3040         """
 3041         auth_data = self.build_authentication_request(
 3042             user_id=self.user['id'],
 3043             password=self.user['password'],
 3044             project_id=self.project['id'])
 3045         token = self.get_requested_token(auth_data)
 3046         # Confirm token is valid
 3047         self.head('/auth/tokens',
 3048                   headers={'X-Subject-Token': token},
 3049                   expected_status=http_client.OK)
 3050         # Delete the grant, which should invalidate the token
 3051         grant_url = (
 3052             '/projects/%(project_id)s/users/%(user_id)s/'
 3053             'roles/%(role_id)s' % {
 3054                 'project_id': self.project['id'],
 3055                 'user_id': self.user['id'],
 3056                 'role_id': self.role['id']})
 3057         self.delete(grant_url)
 3058         self.head('/auth/tokens', token=token,
 3059                   expected_status=http_client.UNAUTHORIZED)
 3060 
 3061     def role_data_fixtures(self):
 3062         self.projectC = unit.new_project_ref(domain_id=self.domainA['id'])
 3063         PROVIDERS.resource_api.create_project(
 3064             self.projectC['id'], self.projectC
 3065         )
 3066         self.user4 = unit.create_user(PROVIDERS.identity_api,
 3067                                       domain_id=self.domainB['id'])
 3068         self.user5 = unit.create_user(PROVIDERS.identity_api,
 3069                                       domain_id=self.domainA['id'])
 3070         self.user6 = unit.create_user(PROVIDERS.identity_api,
 3071                                       domain_id=self.domainA['id'])
 3072         PROVIDERS.identity_api.add_user_to_group(
 3073             self.user5['id'], self.group1['id']
 3074         )
 3075         PROVIDERS.assignment_api.create_grant(
 3076             self.role1['id'], group_id=self.group1['id'],
 3077             project_id=self.projectB['id']
 3078         )
 3079         PROVIDERS.assignment_api.create_grant(
 3080             self.role2['id'], user_id=self.user4['id'],
 3081             project_id=self.projectC['id']
 3082         )
 3083         PROVIDERS.assignment_api.create_grant(
 3084             self.role1['id'], user_id=self.user6['id'],
 3085             project_id=self.projectA['id']
 3086         )
 3087         PROVIDERS.assignment_api.create_grant(
 3088             self.role1['id'], user_id=self.user6['id'],
 3089             domain_id=self.domainA['id']
 3090         )
 3091 
 3092     def test_deleting_role_revokes_token(self):
 3093         """Test deleting a role revokes token.
 3094 
 3095         Add some additional test data, namely:
 3096 
 3097         - A third project (project C)
 3098         - Three additional users - user4 owned by domainB and user5 and 6 owned
 3099           by domainA (different domain ownership should not affect the test
 3100           results, just provided to broaden test coverage)
 3101         - User5 is a member of group1
 3102         - Group1 gets an additional assignment - role1 on projectB as well as
 3103           its existing role1 on projectA
 3104         - User4 has role2 on Project C
 3105         - User6 has role1 on projectA and domainA
 3106         - This allows us to create 5 tokens by virtue of different types of
 3107           role assignment:
 3108           - user1, scoped to ProjectA by virtue of user role1 assignment
 3109           - user5, scoped to ProjectB by virtue of group role1 assignment
 3110           - user4, scoped to ProjectC by virtue of user role2 assignment
 3111           - user6, scoped to ProjectA by virtue of user role1 assignment
 3112           - user6, scoped to DomainA by virtue of user role1 assignment
 3113         - role1 is then deleted
 3114         - Check the tokens on Project A and B, and DomainA are revoked, but not
 3115           the one for Project C
 3116 
 3117         """
 3118         self.role_data_fixtures()
 3119 
 3120         # Now we are ready to start issuing requests
 3121         auth_data = self.build_authentication_request(
 3122             user_id=self.user1['id'],
 3123             password=self.user1['password'],
 3124             project_id=self.projectA['id'])
 3125         tokenA = self.get_requested_token(auth_data)
 3126         auth_data = self.build_authentication_request(
 3127             user_id=self.user5['id'],
 3128             password=self.user5['password'],
 3129             project_id=self.projectB['id'])
 3130         tokenB = self.get_requested_token(auth_data)
 3131         auth_data = self.build_authentication_request(
 3132             user_id=self.user4['id'],
 3133             password=self.user4['password'],
 3134             project_id=self.projectC['id'])
 3135         tokenC = self.get_requested_token(auth_data)
 3136         auth_data = self.build_authentication_request(
 3137             user_id=self.user6['id'],
 3138             password=self.user6['password'],
 3139             project_id=self.projectA['id'])
 3140         tokenD = self.get_requested_token(auth_data)
 3141         auth_data = self.build_authentication_request(
 3142             user_id=self.user6['id'],
 3143             password=self.user6['password'],
 3144             domain_id=self.domainA['id'])
 3145         tokenE = self.get_requested_token(auth_data)
 3146         # Confirm tokens are valid
 3147         self.head('/auth/tokens',
 3148                   headers={'X-Subject-Token': tokenA},
 3149                   expected_status=http_client.OK)
 3150         self.head('/auth/tokens',
 3151                   headers={'X-Subject-Token': tokenB},
 3152                   expected_status=http_client.OK)
 3153         self.head('/auth/tokens',
 3154                   headers={'X-Subject-Token': tokenC},
 3155                   expected_status=http_client.OK)
 3156         self.head('/auth/tokens',
 3157                   headers={'X-Subject-Token': tokenD},
 3158                   expected_status=http_client.OK)
 3159         self.head('/auth/tokens',
 3160                   headers={'X-Subject-Token': tokenE},
 3161                   expected_status=http_client.OK)
 3162 
 3163         # Delete the role, which should invalidate the tokens
 3164         role_url = '/roles/%s' % self.role1['id']
 3165         self.delete(role_url)
 3166 
 3167         # Check the tokens that used role1 is invalid
 3168         self.head('/auth/tokens',
 3169                   headers={'X-Subject-Token': tokenA},
 3170                   expected_status=http_client.NOT_FOUND)
 3171         self.head('/auth/tokens',
 3172                   headers={'X-Subject-Token': tokenB},
 3173                   expected_status=http_client.NOT_FOUND)
 3174         self.head('/auth/tokens',
 3175                   headers={'X-Subject-Token': tokenD},
 3176                   expected_status=http_client.NOT_FOUND)
 3177         self.head('/auth/tokens',
 3178                   headers={'X-Subject-Token': tokenE},
 3179                   expected_status=http_client.NOT_FOUND)
 3180 
 3181         # ...but the one using role2 is still valid
 3182         self.head('/auth/tokens',
 3183                   headers={'X-Subject-Token': tokenC},
 3184                   expected_status=http_client.OK)
 3185 
 3186     def test_domain_user_role_assignment_maintains_token(self):
 3187         """Test user-domain role assignment maintains existing token.
 3188 
 3189         Test Plan:
 3190 
 3191         - Get a token for user1, scoped to ProjectA
 3192         - Create a grant for user1 on DomainB
 3193         - Check token is still valid
 3194 
 3195         """
 3196         auth_data = self.build_authentication_request(
 3197             user_id=self.user1['id'],
 3198             password=self.user1['password'],
 3199             project_id=self.projectA['id'])
 3200         token = self.get_requested_token(auth_data)
 3201         # Confirm token is valid
 3202         self.head('/auth/tokens',
 3203                   headers={'X-Subject-Token': token},
 3204                   expected_status=http_client.OK)
 3205         # Assign a role, which should not affect the token
 3206         grant_url = (
 3207             '/domains/%(domain_id)s/users/%(user_id)s/'
 3208             'roles/%(role_id)s' % {
 3209                 'domain_id': self.domainB['id'],
 3210                 'user_id': self.user1['id'],
 3211                 'role_id': self.role1['id']})
 3212         self.put(grant_url)
 3213         self.head('/auth/tokens',
 3214                   headers={'X-Subject-Token': token},
 3215                   expected_status=http_client.OK)
 3216 
 3217     def test_disabling_project_revokes_token(self):
 3218         token = self.get_requested_token(
 3219             self.build_authentication_request(
 3220                 user_id=self.user3['id'],
 3221                 password=self.user3['password'],
 3222                 project_id=self.projectA['id']))
 3223 
 3224         # confirm token is valid
 3225         self.head('/auth/tokens',
 3226                   headers={'X-Subject-Token': token},
 3227                   expected_status=http_client.OK)
 3228 
 3229         # disable the project, which should invalidate the token
 3230         self.patch(
 3231             '/projects/%(project_id)s' % {'project_id': self.projectA['id']},
 3232             body={'project': {'enabled': False}})
 3233 
 3234         # user should no longer have access to the project
 3235         self.head('/auth/tokens',
 3236                   headers={'X-Subject-Token': token},
 3237                   expected_status=http_client.NOT_FOUND)
 3238         self.v3_create_token(
 3239             self.build_authentication_request(
 3240                 user_id=self.user3['id'],
 3241                 password=self.user3['password'],
 3242                 project_id=self.projectA['id']),
 3243             expected_status=http_client.UNAUTHORIZED)
 3244 
 3245     def test_deleting_project_revokes_token(self):
 3246         token = self.get_requested_token(
 3247             self.build_authentication_request(
 3248                 user_id=self.user3['id'],
 3249                 password=self.user3['password'],
 3250                 project_id=self.projectA['id']))
 3251 
 3252         # confirm token is valid
 3253         self.head('/auth/tokens',
 3254                   headers={'X-Subject-Token': token},
 3255                   expected_status=http_client.OK)
 3256 
 3257         # delete the project, which should invalidate the token
 3258         self.delete(
 3259             '/projects/%(project_id)s' % {'project_id': self.projectA['id']})
 3260 
 3261         # user should no longer have access to the project
 3262         self.head('/auth/tokens',
 3263                   headers={'X-Subject-Token': token},
 3264                   expected_status=http_client.NOT_FOUND)
 3265         self.v3_create_token(
 3266             self.build_authentication_request(
 3267                 user_id=self.user3['id'],
 3268                 password=self.user3['password'],
 3269                 project_id=self.projectA['id']),
 3270             expected_status=http_client.UNAUTHORIZED)
 3271 
 3272     def test_deleting_group_grant_revokes_tokens(self):
 3273         """Test deleting a group grant revokes tokens.
 3274 
 3275         Test Plan:
 3276 
 3277         - Get a token for user1, scoped to ProjectA
 3278         - Get a token for user2, scoped to ProjectA
 3279         - Get a token for user3, scoped to ProjectA
 3280         - Delete the grant group1 has on ProjectA
 3281         - Check tokens for user1 & user2 are no longer valid,
 3282           since user1 and user2 are members of group1
 3283         - Check token for user3 is invalid too
 3284 
 3285         """
 3286         auth_data = self.build_authentication_request(
 3287             user_id=self.user1['id'],
 3288             password=self.user1['password'],
 3289             project_id=self.projectA['id'])
 3290         token1 = self.get_requested_token(auth_data)
 3291         auth_data = self.build_authentication_request(
 3292             user_id=self.user2['id'],
 3293             password=self.user2['password'],
 3294             project_id=self.projectA['id'])
 3295         token2 = self.get_requested_token(auth_data)
 3296         auth_data = self.build_authentication_request(
 3297             user_id=self.user3['id'],
 3298             password=self.user3['password'],
 3299             project_id=self.projectA['id'])
 3300         token3 = self.get_requested_token(auth_data)
 3301         # Confirm tokens are valid
 3302         self.head('/auth/tokens',
 3303                   headers={'X-Subject-Token': token1},
 3304                   expected_status=http_client.OK)
 3305         self.head('/auth/tokens',
 3306                   headers={'X-Subject-Token': token2},
 3307                   expected_status=http_client.OK)
 3308         self.head('/auth/tokens',
 3309                   headers={'X-Subject-Token': token3},
 3310                   expected_status=http_client.OK)
 3311         # Delete the group grant, which should invalidate the
 3312         # tokens for user1 and user2
 3313         grant_url = (
 3314             '/projects/%(project_id)s/groups/%(group_id)s/'
 3315             'roles/%(role_id)s' % {
 3316                 'project_id': self.projectA['id'],
 3317                 'group_id': self.group1['id'],
 3318                 'role_id': self.role1['id']})
 3319         self.delete(grant_url)
 3320         PROVIDERS.assignment_api.delete_grant(
 3321             role_id=self.role1['id'], project_id=self.projectA['id'],
 3322             user_id=self.user1['id']
 3323         )
 3324         PROVIDERS.assignment_api.delete_grant(
 3325             role_id=self.role1['id'], project_id=self.projectA['id'],
 3326             user_id=self.user2['id']
 3327         )
 3328         self.head('/auth/tokens', token=token1,
 3329                   expected_status=http_client.UNAUTHORIZED)
 3330         self.head('/auth/tokens', token=token2,
 3331                   expected_status=http_client.UNAUTHORIZED)
 3332         # But user3's token should be invalid too as revocation is done for
 3333         # scope role & project
 3334         self.head('/auth/tokens',
 3335                   headers={'X-Subject-Token': token3},
 3336                   expected_status=http_client.OK)
 3337 
 3338     def test_domain_group_role_assignment_maintains_token(self):
 3339         """Test domain-group role assignment maintains existing token.
 3340 
 3341         Test Plan:
 3342 
 3343         - Get a token for user1, scoped to ProjectA
 3344         - Create a grant for group1 on DomainB
 3345         - Check token is still longer valid
 3346 
 3347         """
 3348         auth_data = self.build_authentication_request(
 3349             user_id=self.user1['id'],
 3350             password=self.user1['password'],
 3351             project_id=self.projectA['id'])
 3352         token = self.get_requested_token(auth_data)
 3353         # Confirm token is valid
 3354         self.head('/auth/tokens',
 3355                   headers={'X-Subject-Token': token},
 3356                   expected_status=http_client.OK)
 3357         # Delete the grant, which should invalidate the token
 3358         grant_url = (
 3359             '/domains/%(domain_id)s/groups/%(group_id)s/'
 3360             'roles/%(role_id)s' % {
 3361                 'domain_id': self.domainB['id'],
 3362                 'group_id': self.group1['id'],
 3363                 'role_id': self.role1['id']})
 3364         self.put(grant_url)
 3365         self.head('/auth/tokens',
 3366                   headers={'X-Subject-Token': token},
 3367                   expected_status=http_client.OK)
 3368 
 3369     def test_group_membership_changes_revokes_token(self):
 3370         """Test add/removal to/from group revokes token.
 3371 
 3372         Test Plan:
 3373 
 3374         - Get a token for user1, scoped to ProjectA
 3375         - Get a token for user2, scoped to ProjectA
 3376         - Remove user1 from group1
 3377         - Check token for user1 is no longer valid
 3378         - Check token for user2 is still valid, even though
 3379           user2 is also part of group1
 3380         - Add user2 to group2
 3381         - Check token for user2 is now no longer valid
 3382 
 3383         """
 3384         auth_data = self.build_authentication_request(
 3385             user_id=self.user1['id'],
 3386             password=self.user1['password'],
 3387             project_id=self.projectA['id'])
 3388         token1 = self.get_requested_token(auth_data)
 3389         auth_data = self.build_authentication_request(
 3390             user_id=self.user2['id'],
 3391             password=self.user2['password'],
 3392             project_id=self.projectA['id'])
 3393         token2 = self.get_requested_token(auth_data)
 3394         # Confirm tokens are valid
 3395         self.head('/auth/tokens',
 3396                   headers={'X-Subject-Token': token1},
 3397                   expected_status=http_client.OK)
 3398         self.head('/auth/tokens',
 3399                   headers={'X-Subject-Token': token2},
 3400                   expected_status=http_client.OK)
 3401         # Remove user1 from group1, which should invalidate
 3402         # the token
 3403         self.delete('/groups/%(group_id)s/users/%(user_id)s' % {
 3404             'group_id': self.group1['id'],
 3405             'user_id': self.user1['id']})
 3406         self.head('/auth/tokens',
 3407                   headers={'X-Subject-Token': token1},
 3408                   expected_status=http_client.NOT_FOUND)
 3409         # But user2's token should still be valid
 3410         self.head('/auth/tokens',
 3411                   headers={'X-Subject-Token': token2},
 3412                   expected_status=http_client.OK)
 3413         # Adding user2 to a group should not invalidate token
 3414         self.put('/groups/%(group_id)s/users/%(user_id)s' % {
 3415             'group_id': self.group2['id'],
 3416             'user_id': self.user2['id']})
 3417         self.head('/auth/tokens',
 3418                   headers={'X-Subject-Token': token2},
 3419                   expected_status=http_client.OK)
 3420 
 3421     def test_removing_role_assignment_does_not_affect_other_users(self):
 3422         """Revoking a role from one user should not affect other users."""
 3423         time = datetime.datetime.utcnow()
 3424         with freezegun.freeze_time(time) as frozen_datetime:
 3425             # This group grant is not needed for the test
 3426             self.delete(
 3427                 '/projects/%(p_id)s/groups/%(g_id)s/roles/%(r_id)s' %
 3428                 {'p_id': self.projectA['id'],
 3429                  'g_id': self.group1['id'],
 3430                  'r_id': self.role1['id']})
 3431 
 3432             # NOTE(lbragstad): Here we advance the clock one second to pass
 3433             # into the threshold of a new second because we just persisted a
 3434             # revocation event for removing a role from a group on a project.
 3435             # One thing to note about that revocation event is that it has no
 3436             # context about the group, so even though user3 might not be in
 3437             # group1, they could have their token revoked because the
 3438             # revocation event is very general.
 3439             frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
 3440 
 3441             user1_token = self.get_requested_token(
 3442                 self.build_authentication_request(
 3443                     user_id=self.user1['id'],
 3444                     password=self.user1['password'],
 3445                     project_id=self.projectA['id']))
 3446 
 3447             user3_token = self.get_requested_token(
 3448                 self.build_authentication_request(
 3449                     user_id=self.user3['id'],
 3450                     password=self.user3['password'],
 3451                     project_id=self.projectA['id']))
 3452 
 3453             # delete relationships between user1 and projectA from setUp
 3454             self.delete(
 3455                 '/projects/%(p_id)s/users/%(u_id)s/roles/%(r_id)s' % {
 3456                     'p_id': self.projectA['id'],
 3457                     'u_id': self.user1['id'],
 3458                     'r_id': self.role1['id']})
 3459             # authorization for the first user should now fail
 3460             self.head('/auth/tokens',
 3461                       headers={'X-Subject-Token': user1_token},
 3462                       expected_status=http_client.NOT_FOUND)
 3463             self.v3_create_token(
 3464                 self.build_authentication_request(
 3465                     user_id=self.user1['id'],
 3466                     password=self.user1['password'],
 3467                     project_id=self.projectA['id']),
 3468                 expected_status=http_client.UNAUTHORIZED)
 3469 
 3470             # authorization for the second user should still succeed
 3471             self.head('/auth/tokens',
 3472                       headers={'X-Subject-Token': user3_token},
 3473                       expected_status=http_client.OK)
 3474             self.v3_create_token(
 3475                 self.build_authentication_request(
 3476                     user_id=self.user3['id'],
 3477                     password=self.user3['password'],
 3478                     project_id=self.projectA['id']))
 3479 
 3480     def test_deleting_project_deletes_grants(self):
 3481         # This is to make it a little bit more pretty with PEP8
 3482         role_path = ('/projects/%(project_id)s/users/%(user_id)s/'
 3483                      'roles/%(role_id)s')
 3484         role_path = role_path % {'user_id': self.user['id'],
 3485                                  'project_id': self.projectA['id'],
 3486                                  'role_id': self.role['id']}
 3487 
 3488         # grant the user a role on the project
 3489         self.put(role_path)
 3490 
 3491         # delete the project, which should remove the roles
 3492         self.delete(
 3493             '/projects/%(project_id)s' % {'project_id': self.projectA['id']})
 3494 
 3495         # Make sure that we get a 404 Not Found when heading that role.
 3496         self.head(role_path, expected_status=http_client.NOT_FOUND)
 3497 
 3498     def test_revoke_token_from_token(self):
 3499         # Test that a scoped token can be requested from an unscoped token,
 3500         # the scoped token can be revoked, and the unscoped token remains
 3501         # valid.
 3502 
 3503         unscoped_token = self.get_requested_token(
 3504             self.build_authentication_request(
 3505                 user_id=self.user1['id'],
 3506                 password=self.user1['password']))
 3507 
 3508         # Get a project-scoped token from the unscoped token
 3509         project_scoped_token = self.get_requested_token(
 3510             self.build_authentication_request(
 3511                 token=unscoped_token,
 3512                 project_id=self.projectA['id']))
 3513 
 3514         # Get a domain-scoped token from the unscoped token
 3515         domain_scoped_token = self.get_requested_token(
 3516             self.build_authentication_request(
 3517                 token=unscoped_token,
 3518                 domain_id=self.domainA['id']))
 3519 
 3520         # revoke the project-scoped token.
 3521         self.delete('/auth/tokens',
 3522                     headers={'X-Subject-Token': project_scoped_token})
 3523 
 3524         # The project-scoped token is invalidated.
 3525         self.head('/auth/tokens',
 3526                   headers={'X-Subject-Token': project_scoped_token},
 3527                   expected_status=http_client.NOT_FOUND)
 3528 
 3529         # The unscoped token should still be valid.
 3530         self.head('/auth/tokens',
 3531                   headers={'X-Subject-Token': unscoped_token},
 3532                   expected_status=http_client.OK)
 3533 
 3534         # The domain-scoped token should still be valid.
 3535         self.head('/auth/tokens',
 3536                   headers={'X-Subject-Token': domain_scoped_token},
 3537                   expected_status=http_client.OK)
 3538 
 3539         # revoke the domain-scoped token.
 3540         self.delete('/auth/tokens',
 3541                     headers={'X-Subject-Token': domain_scoped_token})
 3542 
 3543         # The domain-scoped token is invalid.
 3544         self.head('/auth/tokens',
 3545                   headers={'X-Subject-Token': domain_scoped_token},
 3546                   expected_status=http_client.NOT_FOUND)
 3547 
 3548         # The unscoped token should still be valid.
 3549         self.head('/auth/tokens',
 3550                   headers={'X-Subject-Token': unscoped_token},
 3551                   expected_status=http_client.OK)
 3552 
 3553 
 3554 class TestTokenRevokeApi(TestTokenRevokeById):
 3555     """Test token revocation on the v3 Identity API."""
 3556 
 3557     def config_overrides(self):
 3558         super(TestTokenRevokeApi, self).config_overrides()
 3559         self.config_fixture.config(
 3560             group='token',
 3561             provider='fernet',
 3562             revoke_by_id=False)
 3563         self.useFixture(
 3564             ksfixtures.KeyRepository(
 3565                 self.config_fixture,
 3566                 'fernet_tokens',
 3567                 CONF.fernet_tokens.max_active_keys
 3568             )
 3569         )
 3570 
 3571     def assertValidDeletedProjectResponse(self, events_response, project_id):
 3572         events = events_response['events']
 3573         self.assertEqual(1, len(events))
 3574         self.assertEqual(project_id, events[0]['project_id'])
 3575         self.assertIsNotNone(events[0]['issued_before'])
 3576         self.assertIsNotNone(events_response['links'])
 3577         del (events_response['events'][0]['issued_before'])
 3578         del (events_response['events'][0]['revoked_at'])
 3579         del (events_response['links'])
 3580         expected_response = {'events': [{'project_id': project_id}]}
 3581         self.assertEqual(expected_response, events_response)
 3582 
 3583     def assertValidRevokedTokenResponse(self, events_response, **kwargs):
 3584         events = events_response['events']
 3585         self.assertEqual(1, len(events))
 3586         for k, v in kwargs.items():
 3587             self.assertEqual(v, events[0].get(k))
 3588         self.assertIsNotNone(events[0]['issued_before'])
 3589         self.assertIsNotNone(events_response['links'])
 3590         del (events_response['events'][0]['issued_before'])
 3591         del (events_response['events'][0]['revoked_at'])
 3592         del (events_response['links'])
 3593 
 3594         expected_response = {'events': [kwargs]}
 3595         self.assertEqual(expected_response, events_response)
 3596 
 3597     def test_revoke_token(self):
 3598         scoped_token = self.get_scoped_token()
 3599         headers = {'X-Subject-Token': scoped_token}
 3600         response = self.get('/auth/tokens', headers=headers).json_body['token']
 3601 
 3602         self.delete('/auth/tokens', headers=headers)
 3603         self.head('/auth/tokens', headers=headers,
 3604                   expected_status=http_client.NOT_FOUND)
 3605         events_response = self.get('/OS-REVOKE/events').json_body
 3606         self.assertValidRevokedTokenResponse(events_response,
 3607                                              audit_id=response['audit_ids'][0])
 3608 
 3609     def test_get_revoke_by_id_false_returns_gone(self):
 3610         self.get('/auth/tokens/OS-PKI/revoked',
 3611                  expected_status=http_client.GONE)
 3612 
 3613     def test_head_revoke_by_id_false_returns_gone(self):
 3614         self.head('/auth/tokens/OS-PKI/revoked',
 3615                   expected_status=http_client.GONE)
 3616 
 3617     def test_revoke_by_id_true_returns_forbidden(self):
 3618         self.config_fixture.config(
 3619             group='token',
 3620             revoke_by_id=True)
 3621         self.get(
 3622             '/auth/tokens/OS-PKI/revoked',
 3623             expected_status=http_client.FORBIDDEN
 3624         )
 3625         self.head(
 3626             '/auth/tokens/OS-PKI/revoked',
 3627             expected_status=http_client.FORBIDDEN
 3628         )
 3629 
 3630     def test_list_delete_project_shows_in_event_list(self):
 3631         self.role_data_fixtures()
 3632         events = self.get('/OS-REVOKE/events').json_body['events']
 3633         self.assertEqual([], events)
 3634         self.delete(
 3635             '/projects/%(project_id)s' % {'project_id': self.projectA['id']})
 3636         events_response = self.get('/OS-REVOKE/events').json_body
 3637 
 3638         self.assertValidDeletedProjectResponse(events_response,
 3639                                                self.projectA['id'])
 3640 
 3641     def assertEventDataInList(self, events, **kwargs):
 3642         found = False
 3643         for e in events:
 3644             for key, value in kwargs.items():
 3645                 try:
 3646                     if e[key] != value:
 3647                         break
 3648                 except KeyError:
 3649                     # Break the loop and present a nice error instead of
 3650                     # KeyError
 3651                     break
 3652             else:
 3653                 # If the value of the event[key] matches the value of the kwarg
 3654                 # for each item in kwargs, the event was fully matched and
 3655                 # the assertTrue below should succeed.
 3656                 found = True
 3657         self.assertTrue(found,
 3658                         'event with correct values not in list, expected to '
 3659                         'find event with key-value pairs. Expected: '
 3660                         '"%(expected)s" Events: "%(events)s"' %
 3661                         {'expected': ','.join(
 3662                             ["'%s=%s'" % (k, v) for k, v in kwargs.items()]),
 3663                          'events': events})
 3664 
 3665     def test_list_delete_token_shows_in_event_list(self):
 3666         self.role_data_fixtures()
 3667         events = self.get('/OS-REVOKE/events').json_body['events']
 3668         self.assertEqual([], events)
 3669 
 3670         scoped_token = self.get_scoped_token()
 3671         headers = {'X-Subject-Token': scoped_token}
 3672         auth_req = self.build_authentication_request(token=scoped_token)
 3673         response = self.v3_create_token(auth_req)
 3674         token2 = response.json_body['token']
 3675         headers2 = {'X-Subject-Token': response.headers['X-Subject-Token']}
 3676 
 3677         response = self.v3_create_token(auth_req)
 3678         response.json_body['token']
 3679         headers3 = {'X-Subject-Token': response.headers['X-Subject-Token']}
 3680 
 3681         self.head('/auth/tokens', headers=headers,
 3682                   expected_status=http_client.OK)
 3683         self.head('/auth/tokens', headers=headers2,
 3684                   expected_status=http_client.OK)
 3685         self.head('/auth/tokens', headers=headers3,
 3686                   expected_status=http_client.OK)
 3687 
 3688         self.delete('/auth/tokens', headers=headers)
 3689         # NOTE(ayoung): not deleting token3, as it should be deleted
 3690         # by previous
 3691         events_response = self.get('/OS-REVOKE/events').json_body
 3692         events = events_response['events']
 3693         self.assertEqual(1, len(events))
 3694         self.assertEventDataInList(
 3695             events,
 3696             audit_id=token2['audit_ids'][1])
 3697         self.head('/auth/tokens', headers=headers,
 3698                   expected_status=http_client.NOT_FOUND)
 3699         self.head('/auth/tokens', headers=headers2,
 3700                   expected_status=http_client.OK)
 3701         self.head('/auth/tokens', headers=headers3,
 3702                   expected_status=http_client.OK)
 3703 
 3704     def test_list_with_filter(self):
 3705 
 3706         self.role_data_fixtures()
 3707         events = self.get('/OS-REVOKE/events').json_body['events']
 3708         self.assertEqual(0, len(events))
 3709 
 3710         scoped_token = self.get_scoped_token()
 3711         headers = {'X-Subject-Token': scoped_token}
 3712         auth = self.build_authentication_request(token=scoped_token)
 3713         headers2 = {'X-Subject-Token': self.get_requested_token(auth)}
 3714         self.delete('/auth/tokens', headers=headers)
 3715         self.delete('/auth/tokens', headers=headers2)
 3716 
 3717         events = self.get('/OS-REVOKE/events').json_body['events']
 3718 
 3719         self.assertEqual(2, len(events))
 3720         future = utils.isotime(timeutils.utcnow() +
 3721                                datetime.timedelta(seconds=1000))
 3722 
 3723         events = self.get('/OS-REVOKE/events?since=%s' % (future)
 3724                           ).json_body['events']
 3725         self.assertEqual(0, len(events))
 3726 
 3727 
 3728 class TestAuthExternalDisabled(test_v3.RestfulTestCase):
 3729     def config_overrides(self):
 3730         super(TestAuthExternalDisabled, self).config_overrides()
 3731         self.config_fixture.config(
 3732             group='auth',
 3733             methods=['password', 'token'])
 3734 
 3735     def test_remote_user_disabled(self):
 3736         app = self.loadapp()
 3737         remote_user = '%s@%s' % (self.user['name'], self.domain['name'])
 3738         with app.test_client() as c:
 3739             c.environ_base.update(self.build_external_auth_environ(
 3740                 remote_user))
 3741             auth_data = self.build_authentication_request()
 3742             c.post('/v3/auth/tokens', json=auth_data,
 3743                    expected_status_code=http_client.UNAUTHORIZED)
 3744 
 3745 # FIXME(morgan): This test case must be re-worked to function under flask. It
 3746 # has been commented out until it is re-worked ensuring no issues when webob
 3747 # classes are removed.
 3748 # https://bugs.launchpad.net/keystone/+bug/1793756
 3749 # class AuthExternalDomainBehavior(object):
 3750 #     content_type = 'json'
 3751 #
 3752 #     def test_remote_user_with_realm(self):
 3753 #         api = auth.controllers.Auth()
 3754 #         remote_user = self.user['name']
 3755 #         remote_domain = self.domain['name']
 3756 #         request, auth_info, auth_context = self.build_external_auth_request(
 3757 #             remote_user, remote_domain=remote_domain, kerberos=self.kerberos)
 3758 #
 3759 #         api.authenticate(request, auth_info, auth_context)
 3760 #         self.assertEqual(self.user['id'], auth_context['user_id'])
 3761 #
 3762 #         # Now test to make sure the user name can, itself, contain the
 3763 #         # '@' character.
 3764 #         user = {'name': 'myname@mydivision'}
 3765 #         PROVIDERS.identity_api.update_user(self.user['id'], user)
 3766 #         remote_user = user['name']
 3767 #         request, auth_info, auth_context = self.build_external_auth_request(
 3768 #             remote_user, remote_domain=remote_domain, kerberos=self.kerberos)
 3769 #
 3770 #         api.authenticate(request, auth_info, auth_context)
 3771 #         self.assertEqual(self.user['id'], auth_context['user_id'])
 3772 #
 3773 #
 3774 # FIXME(morgan): This test case must be re-worked to function under flask. It
 3775 # has been commented out until it is re-worked ensuring no issues when webob
 3776 # classes are removed.
 3777 # https://bugs.launchpad.net/keystone/+bug/1793756
 3778 # class TestAuthExternalDefaultDomain(object):
 3779 #     content_type = 'json'
 3780 #
 3781 #     def config_overrides(self):
 3782 #         super(TestAuthExternalDefaultDomain, self).config_overrides()
 3783 #         self.kerberos = False
 3784 #         self.auth_plugin_config_override(external='DefaultDomain')
 3785 #
 3786 #     def test_remote_user_with_default_domain(self):
 3787 #         api = auth.controllers.Auth()
 3788 #         remote_user = self.default_domain_user['name']
 3789 #         request, auth_info, auth_context = self.build_external_auth_request(
 3790 #             remote_user, kerberos=self.kerberos)
 3791 #
 3792 #         api.authenticate(request, auth_info, auth_context)
 3793 #         self.assertEqual(self.default_domain_user['id'],
 3794 #                          auth_context['user_id'])
 3795 #
 3796 #         # Now test to make sure the user name can, itself, contain the
 3797 #         # '@' character.
 3798 #         user = {'name': 'myname@mydivision'}
 3799 #         PROVIDERS.identity_api.update_user(
 3800 #             self.default_domain_user['id'], user
 3801 #         )
 3802 #         remote_user = user['name']
 3803 #         request, auth_info, auth_context = self.build_external_auth_request(
 3804 #             remote_user, kerberos=self.kerberos)
 3805 #
 3806 #         api.authenticate(request, auth_info, auth_context)
 3807 #         self.assertEqual(self.default_domain_user['id'],
 3808 #                          auth_context['user_id'])
 3809 #
 3810 
 3811 
 3812 class TestAuthJSONExternal(test_v3.RestfulTestCase):
 3813     content_type = 'json'
 3814 
 3815     def auth_plugin_config_override(self, methods=None, **method_classes):
 3816         self.config_fixture.config(group='auth', methods=[])
 3817 
 3818     def test_remote_user_no_method(self):
 3819         app = self.loadapp()
 3820         with app.test_client() as c:
 3821             c.environ_base.update(self.build_external_auth_environ(
 3822                 self.default_domain_user['name']))
 3823             auth_data = self.build_authentication_request()
 3824             c.post('/v3/auth/tokens', json=auth_data,
 3825                    expected_status_code=http_client.UNAUTHORIZED)
 3826 
 3827 
 3828 class TrustAPIBehavior(test_v3.RestfulTestCase):
 3829     """Redelegation valid and secure.
 3830 
 3831     Redelegation is a hierarchical structure of trusts between initial trustor
 3832     and a group of users allowed to impersonate trustor and act in his name.
 3833     Hierarchy is created in a process of trusting already trusted permissions
 3834     and organized as an adjacency list using 'redelegated_trust_id' field.
 3835     Redelegation is valid if each subsequent trust in a chain passes 'not more'
 3836     permissions than being redelegated.
 3837 
 3838     Trust constraints are:
 3839      * roles - set of roles trusted by trustor
 3840      * expiration_time
 3841      * allow_redelegation - a flag
 3842      * redelegation_count - decreasing value restricting length of trust chain
 3843      * remaining_uses - DISALLOWED when allow_redelegation == True
 3844 
 3845     Trust becomes invalid in case:
 3846      * trust roles were revoked from trustor
 3847      * one of the users in the delegation chain was disabled or deleted
 3848      * expiration time passed
 3849      * one of the parent trusts has become invalid
 3850      * one of the parent trusts was deleted
 3851 
 3852     """
 3853 
 3854     def config_overrides(self):
 3855         super(TrustAPIBehavior, self).config_overrides()
 3856         self.config_fixture.config(
 3857             group='trust',
 3858             allow_redelegation=True,
 3859             max_redelegation_count=10
 3860         )
 3861 
 3862     def setUp(self):
 3863         super(TrustAPIBehavior, self).setUp()
 3864         # Create a trustee to delegate stuff to
 3865         self.trustee_user = unit.create_user(PROVIDERS.identity_api,
 3866                                              domain_id=self.domain_id)
 3867 
 3868         # trustor->trustee
 3869         self.redelegated_trust_ref = unit.new_trust_ref(
 3870             trustor_user_id=self.user_id,
 3871             trustee_user_id=self.trustee_user['id'],
 3872             project_id=self.project_id,
 3873             impersonation=True,
 3874             expires=dict(minutes=1),
 3875             role_ids=[self.role_id],
 3876             allow_redelegation=True)
 3877 
 3878         # trustor->trustee (no redelegation)
 3879         self.chained_trust_ref = unit.new_trust_ref(
 3880             trustor_user_id=self.user_id,
 3881             trustee_user_id=self.trustee_user['id'],
 3882             project_id=self.project_id,
 3883             impersonation=True,
 3884             role_ids=[self.role_id],
 3885             allow_redelegation=True)
 3886 
 3887     def _get_trust_token(self, trust):
 3888         trust_id = trust['id']
 3889         auth_data = self.build_authentication_request(
 3890             user_id=self.trustee_user['id'],
 3891             password=self.trustee_user['password'],
 3892             trust_id=trust_id)
 3893         trust_token = self.get_requested_token(auth_data)
 3894         return trust_token
 3895 
 3896     def test_depleted_redelegation_count_error(self):
 3897         self.redelegated_trust_ref['redelegation_count'] = 0
 3898         r = self.post('/OS-TRUST/trusts',
 3899                       body={'trust': self.redelegated_trust_ref})
 3900         trust = self.assertValidTrustResponse(r)
 3901         trust_token = self._get_trust_token(trust)
 3902 
 3903         # Attempt to create a redelegated trust.
 3904         self.post('/OS-TRUST/trusts',
 3905                   body={'trust': self.chained_trust_ref},
 3906                   token=trust_token,
 3907                   expected_status=http_client.FORBIDDEN)
 3908 
 3909     def test_modified_redelegation_count_error(self):
 3910         r = self.post('/OS-TRUST/trusts',
 3911                       body={'trust': self.redelegated_trust_ref})
 3912         trust = self.assertValidTrustResponse(r)
 3913         trust_token = self._get_trust_token(trust)
 3914 
 3915         # Attempt to create a redelegated trust with incorrect
 3916         # redelegation_count.
 3917         correct = trust['redelegation_count'] - 1
 3918         incorrect = correct - 1
 3919         self.chained_trust_ref['redelegation_count'] = incorrect
 3920         self.post('/OS-TRUST/trusts',
 3921                   body={'trust': self.chained_trust_ref},
 3922                   token=trust_token,
 3923                   expected_status=http_client.FORBIDDEN)
 3924 
 3925     def test_max_redelegation_count_constraint(self):
 3926         incorrect = CONF.trust.max_redelegation_count + 1
 3927         self.redelegated_trust_ref['redelegation_count'] = incorrect
 3928         self.post('/OS-TRUST/trusts',
 3929                   body={'trust': self.redelegated_trust_ref},
 3930                   expected_status=http_client.FORBIDDEN)
 3931 
 3932     def test_redelegation_expiry(self):
 3933         r = self.post('/OS-TRUST/trusts',
 3934                       body={'trust': self.redelegated_trust_ref})
 3935         trust = self.assertValidTrustResponse(r)
 3936         trust_token = self._get_trust_token(trust)
 3937 
 3938         # Attempt to create a redelegated trust supposed to last longer
 3939         # than the parent trust: let's give it 10 minutes (>1 minute).
 3940         too_long_live_chained_trust_ref = unit.new_trust_ref(
 3941             trustor_user_id=self.user_id,
 3942             trustee_user_id=self.trustee_user['id'],
 3943             project_id=self.project_id,
 3944             impersonation=True,
 3945             expires=dict(minutes=10),
 3946             role_ids=[self.role_id])
 3947         self.post('/OS-TRUST/trusts',
 3948                   body={'trust': too_long_live_chained_trust_ref},
 3949                   token=trust_token,
 3950                   expected_status=http_client.FORBIDDEN)
 3951 
 3952     def test_redelegation_remaining_uses(self):
 3953         r = self.post('/OS-TRUST/trusts',
 3954                       body={'trust': self.redelegated_trust_ref})
 3955         trust = self.assertValidTrustResponse(r)
 3956         trust_token = self._get_trust_token(trust)
 3957 
 3958         # Attempt to create a redelegated trust with remaining_uses defined.
 3959         # It must fail according to specification: remaining_uses must be
 3960         # omitted for trust redelegation. Any number here.
 3961         self.chained_trust_ref['remaining_uses'] = 5
 3962         self.post('/OS-TRUST/trusts',
 3963                   body={'trust': self.chained_trust_ref},
 3964                   token=trust_token,
 3965                   expected_status=http_client.BAD_REQUEST)
 3966 
 3967     def test_roles_subset(self):
 3968         # Build second role
 3969         role = unit.new_role_ref()
 3970         PROVIDERS.role_api.create_role(role['id'], role)
 3971         # assign a new role to the user
 3972         PROVIDERS.assignment_api.create_grant(
 3973             role_id=role['id'], user_id=self.user_id,
 3974             project_id=self.project_id
 3975         )
 3976 
 3977         # Create first trust with extended set of roles
 3978         ref = self.redelegated_trust_ref
 3979         ref['expires_at'] = datetime.datetime.utcnow().replace(
 3980             year=2032).strftime(unit.TIME_FORMAT)
 3981         ref['roles'].append({'id': role['id']})
 3982         r = self.post('/OS-TRUST/trusts',
 3983                       body={'trust': ref})
 3984         trust = self.assertValidTrustResponse(r)
 3985         # Trust created with exact set of roles (checked by role id)
 3986         role_id_set = set(r['id'] for r in ref['roles'])
 3987         trust_role_id_set = set(r['id'] for r in trust['roles'])
 3988         self.assertEqual(role_id_set, trust_role_id_set)
 3989 
 3990         trust_token = self._get_trust_token(trust)
 3991 
 3992         # Chain second trust with roles subset
 3993         self.chained_trust_ref['expires_at'] = (
 3994             datetime.datetime.utcnow().replace(year=2028).strftime(
 3995                 unit.TIME_FORMAT))
 3996         r = self.post('/OS-TRUST/trusts',
 3997                       body={'trust': self.chained_trust_ref},
 3998                       token=trust_token)
 3999         trust2 = self.assertValidTrustResponse(r)
 4000         # First trust contains roles superset
 4001         # Second trust contains roles subset
 4002         role_id_set1 = set(r['id'] for r in trust['roles'])
 4003         role_id_set2 = set(r['id'] for r in trust2['roles'])
 4004         self.assertThat(role_id_set1, matchers.GreaterThan(role_id_set2))
 4005 
 4006     def test_trust_with_implied_roles(self):
 4007         # Create some roles
 4008         role1 = unit.new_role_ref()
 4009         PROVIDERS.role_api.create_role(role1['id'], role1)
 4010         role2 = unit.new_role_ref()
 4011         PROVIDERS.role_api.create_role(role2['id'], role2)
 4012 
 4013         # Implication
 4014         PROVIDERS.role_api.create_implied_role(role1['id'], role2['id'])
 4015 
 4016         # Assign new roles to the user (with role2 implied)
 4017         PROVIDERS.assignment_api.create_grant(
 4018             role_id=role1['id'], user_id=self.user_id,
 4019             project_id=self.project_id
 4020         )
 4021 
 4022         # Create trust
 4023         ref = self.redelegated_trust_ref
 4024         ref['roles'] = [{'id': role1['id']}, {'id': role2['id']}]
 4025         resp = self.post('/OS-TRUST/trusts',
 4026                          body={'trust': ref})
 4027         trust = self.assertValidTrustResponse(resp)
 4028 
 4029         # Trust created with exact set of roles (checked by role id)
 4030         role_ids = [r['id'] for r in ref['roles']]
 4031         trust_role_ids = [r['id'] for r in trust['roles']]
 4032         # Compare requested roles with roles in response
 4033         self.assertEqual(role_ids, trust_role_ids)
 4034 
 4035         # Get a trust-scoped token
 4036         auth_data = self.build_authentication_request(
 4037             user_id=self.trustee_user['id'],
 4038             password=self.trustee_user['password'],
 4039             trust_id=trust['id']
 4040         )
 4041         resp = self.post('/auth/tokens', body=auth_data)
 4042         trust_token_role_ids = [r['id'] for r in resp.json['token']['roles']]
 4043         # Compare requested roles with roles given in token data
 4044         self.assertEqual(sorted(role_ids), sorted(trust_token_role_ids))
 4045 
 4046     def test_redelegate_with_role_by_name(self):
 4047         # For role by name testing
 4048         ref = unit.new_trust_ref(
 4049             trustor_user_id=self.user_id,
 4050             trustee_user_id=self.trustee_user['id'],
 4051             project_id=self.project_id,
 4052             impersonation=True,
 4053             expires=dict(minutes=1),
 4054             role_names=[self.role['name']],
 4055             allow_redelegation=True)
 4056         ref['expires_at'] = datetime.datetime.utcnow().replace(
 4057             year=2032).strftime(unit.TIME_FORMAT)
 4058         r = self.post('/OS-TRUST/trusts',
 4059                       body={'trust': ref})
 4060         trust = self.assertValidTrustResponse(r)
 4061         # Ensure we can get a token with this trust
 4062         trust_token = self._get_trust_token(trust)
 4063         # Chain second trust with roles subset
 4064         ref = unit.new_trust_ref(
 4065             trustor_user_id=self.user_id,
 4066             trustee_user_id=self.trustee_user['id'],
 4067             project_id=self.project_id,
 4068             impersonation=True,
 4069             role_names=[self.role['name']],
 4070             allow_redelegation=True)
 4071         ref['expires_at'] = datetime.datetime.utcnow().replace(
 4072             year=2028).strftime(unit.TIME_FORMAT)
 4073         r = self.post('/OS-TRUST/trusts',
 4074                       body={'trust': ref},
 4075                       token=trust_token)
 4076         trust = self.assertValidTrustResponse(r)
 4077         # Ensure we can get a token with this trust
 4078         self._get_trust_token(trust)
 4079 
 4080     def test_redelegate_new_role_fails(self):
 4081         r = self.post('/OS-TRUST/trusts',
 4082                       body={'trust': self.redelegated_trust_ref})
 4083         trust = self.assertValidTrustResponse(r)
 4084         trust_token = self._get_trust_token(trust)
 4085 
 4086         # Build second trust with a role not in parent's roles
 4087         role = unit.new_role_ref()
 4088         PROVIDERS.role_api.create_role(role['id'], role)
 4089         # assign a new role to the user
 4090         PROVIDERS.assignment_api.create_grant(
 4091             role_id=role['id'], user_id=self.user_id,
 4092             project_id=self.project_id
 4093         )
 4094 
 4095         # Try to chain a trust with the role not from parent trust
 4096         self.chained_trust_ref['roles'] = [{'id': role['id']}]
 4097 
 4098         # Bypass policy enforcement
 4099         with mock.patch.object(policy, 'enforce', return_value=True):
 4100             self.post('/OS-TRUST/trusts',
 4101                       body={'trust': self.chained_trust_ref},
 4102                       token=trust_token,
 4103                       expected_status=http_client.FORBIDDEN)
 4104 
 4105     def test_redelegation_terminator(self):
 4106         self.redelegated_trust_ref['expires_at'] = (
 4107             datetime.datetime.utcnow().replace(year=2032).strftime(
 4108                 unit.TIME_FORMAT))
 4109         r = self.post('/OS-TRUST/trusts',
 4110                       body={'trust': self.redelegated_trust_ref})
 4111         trust = self.assertValidTrustResponse(r)
 4112         trust_token = self._get_trust_token(trust)
 4113 
 4114         # Build second trust - the terminator
 4115         self.chained_trust_ref['expires_at'] = (
 4116             datetime.datetime.utcnow().replace(year=2028).strftime(
 4117                 unit.TIME_FORMAT))
 4118         ref = dict(self.chained_trust_ref,
 4119                    redelegation_count=1,
 4120                    allow_redelegation=False)
 4121 
 4122         r = self.post('/OS-TRUST/trusts',
 4123                       body={'trust': ref},
 4124                       token=trust_token)
 4125 
 4126         trust = self.assertValidTrustResponse(r)
 4127         # Check that allow_redelegation == False caused redelegation_count
 4128         # to be set to 0, while allow_redelegation is removed
 4129         self.assertNotIn('allow_redelegation', trust)
 4130         self.assertEqual(0, trust['redelegation_count'])
 4131         trust_token = self._get_trust_token(trust)
 4132 
 4133         # Build third trust, same as second
 4134         self.post('/OS-TRUST/trusts',
 4135                   body={'trust': ref},
 4136                   token=trust_token,
 4137                   expected_status=http_client.FORBIDDEN)
 4138 
 4139     def test_redelegation_without_impersonation(self):
 4140         # Update trust to not allow impersonation
 4141         self.redelegated_trust_ref['impersonation'] = False
 4142 
 4143         # Create trust
 4144         resp = self.post('/OS-TRUST/trusts',
 4145                          body={'trust': self.redelegated_trust_ref},
 4146                          expected_status=http_client.CREATED)
 4147         trust = self.assertValidTrustResponse(resp)
 4148 
 4149         # Get trusted token without impersonation
 4150         auth_data = self.build_authentication_request(
 4151             user_id=self.trustee_user['id'],
 4152             password=self.trustee_user['password'],
 4153             trust_id=trust['id'])
 4154         trust_token = self.get_requested_token(auth_data)
 4155 
 4156         # Create second user for redelegation
 4157         trustee_user_2 = unit.create_user(PROVIDERS.identity_api,
 4158                                           domain_id=self.domain_id)
 4159 
 4160         # Trust for redelegation
 4161         trust_ref_2 = unit.new_trust_ref(
 4162             trustor_user_id=self.trustee_user['id'],
 4163             trustee_user_id=trustee_user_2['id'],
 4164             project_id=self.project_id,
 4165             impersonation=False,
 4166             expires=dict(minutes=1),
 4167             role_ids=[self.role_id],
 4168             allow_redelegation=False)
 4169 
 4170         # Creating a second trust should not be allowed since trustor does not
 4171         # have the role to delegate thus returning 404 NOT FOUND.
 4172         resp = self.post('/OS-TRUST/trusts',
 4173                          body={'trust': trust_ref_2},
 4174                          token=trust_token,
 4175                          expected_status=http_client.NOT_FOUND)
 4176 
 4177     def test_create_unscoped_trust(self):
 4178         ref = unit.new_trust_ref(
 4179             trustor_user_id=self.user_id,
 4180             trustee_user_id=self.trustee_user['id'])
 4181         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4182         self.assertValidTrustResponse(r, ref)
 4183 
 4184     def test_create_trust_no_roles(self):
 4185         ref = unit.new_trust_ref(
 4186             trustor_user_id=self.user_id,
 4187             trustee_user_id=self.trustee_user['id'],
 4188             project_id=self.project_id)
 4189         self.post('/OS-TRUST/trusts', body={'trust': ref},
 4190                   expected_status=http_client.FORBIDDEN)
 4191 
 4192     def _initialize_test_consume_trust(self, count):
 4193         # Make sure remaining_uses is decremented as we consume the trust
 4194         ref = unit.new_trust_ref(
 4195             trustor_user_id=self.user_id,
 4196             trustee_user_id=self.trustee_user['id'],
 4197             project_id=self.project_id,
 4198             remaining_uses=count,
 4199             role_ids=[self.role_id])
 4200         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4201         # make sure the trust exists
 4202         trust = self.assertValidTrustResponse(r, ref)
 4203         r = self.get(
 4204             '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
 4205         # get a token for the trustee
 4206         auth_data = self.build_authentication_request(
 4207             user_id=self.trustee_user['id'],
 4208             password=self.trustee_user['password'])
 4209         r = self.v3_create_token(auth_data)
 4210         token = r.headers.get('X-Subject-Token')
 4211         # get a trust token, consume one use
 4212         auth_data = self.build_authentication_request(
 4213             token=token,
 4214             trust_id=trust['id'])
 4215         r = self.v3_create_token(auth_data)
 4216         return trust
 4217 
 4218     def test_authenticate_without_trust_dict_returns_bad_request(self):
 4219         # Authenticate for a token to use in the request
 4220         token = self.v3_create_token(
 4221             self.build_authentication_request(
 4222                 user_id=self.trustee_user['id'],
 4223                 password=self.trustee_user['password']
 4224             )
 4225         ).headers.get('X-Subject-Token')
 4226 
 4227         auth_data = {
 4228             'auth': {
 4229                 'identity': {
 4230                     'methods': ['token'],
 4231                     'token': {'id': token}
 4232                 },
 4233                 # We don't need a trust to execute this test, the
 4234                 # OS-TRUST:trust key of the request body just has to be a
 4235                 # string instead of a dictionary in order to throw a 500 when
 4236                 # it should a 400 Bad Request.
 4237                 'scope': {'OS-TRUST:trust': ''}
 4238             }
 4239         }
 4240         self.admin_request(
 4241             method='POST', path='/v3/auth/tokens', body=auth_data,
 4242             expected_status=http_client.BAD_REQUEST
 4243         )
 4244 
 4245     def test_consume_trust_once(self):
 4246         trust = self._initialize_test_consume_trust(2)
 4247         # check decremented value
 4248         r = self.get(
 4249             '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
 4250         trust = r.result.get('trust')
 4251         self.assertIsNotNone(trust)
 4252         self.assertEqual(1, trust['remaining_uses'])
 4253         self.assertEqual(self.role['name'], trust['roles'][0]['name'])
 4254         self.assertEqual(self.role['id'], trust['roles'][0]['id'])
 4255 
 4256     def test_create_one_time_use_trust(self):
 4257         trust = self._initialize_test_consume_trust(1)
 4258         # No more uses, the trust is made unavailable
 4259         self.get(
 4260             '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
 4261             expected_status=http_client.NOT_FOUND)
 4262         # this time we can't get a trust token
 4263         auth_data = self.build_authentication_request(
 4264             user_id=self.trustee_user['id'],
 4265             password=self.trustee_user['password'],
 4266             trust_id=trust['id'])
 4267         self.v3_create_token(auth_data,
 4268                              expected_status=http_client.UNAUTHORIZED)
 4269 
 4270     def test_create_unlimited_use_trust(self):
 4271         # by default trusts are unlimited in terms of tokens that can be
 4272         # generated from them, this test creates such a trust explicitly
 4273         ref = unit.new_trust_ref(
 4274             trustor_user_id=self.user_id,
 4275             trustee_user_id=self.trustee_user['id'],
 4276             project_id=self.project_id,
 4277             remaining_uses=None,
 4278             role_ids=[self.role_id])
 4279         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4280         trust = self.assertValidTrustResponse(r, ref)
 4281 
 4282         r = self.get(
 4283             '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
 4284         auth_data = self.build_authentication_request(
 4285             user_id=self.trustee_user['id'],
 4286             password=self.trustee_user['password'])
 4287         r = self.v3_create_token(auth_data)
 4288         token = r.headers.get('X-Subject-Token')
 4289         auth_data = self.build_authentication_request(
 4290             token=token,
 4291             trust_id=trust['id'])
 4292         r = self.v3_create_token(auth_data)
 4293         r = self.get(
 4294             '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
 4295         trust = r.result.get('trust')
 4296         self.assertIsNone(trust['remaining_uses'])
 4297 
 4298     def test_impersonation_token_cannot_create_new_trust(self):
 4299         ref = unit.new_trust_ref(
 4300             trustor_user_id=self.user_id,
 4301             trustee_user_id=self.trustee_user['id'],
 4302             project_id=self.project_id,
 4303             impersonation=True,
 4304             expires=dict(minutes=1),
 4305             role_ids=[self.role_id])
 4306 
 4307         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4308         trust = self.assertValidTrustResponse(r)
 4309 
 4310         auth_data = self.build_authentication_request(
 4311             user_id=self.trustee_user['id'],
 4312             password=self.trustee_user['password'],
 4313             trust_id=trust['id'])
 4314 
 4315         trust_token = self.get_requested_token(auth_data)
 4316 
 4317         # Build second trust
 4318         ref = unit.new_trust_ref(
 4319             trustor_user_id=self.user_id,
 4320             trustee_user_id=self.trustee_user['id'],
 4321             project_id=self.project_id,
 4322             impersonation=True,
 4323             expires=dict(minutes=1),
 4324             role_ids=[self.role_id])
 4325 
 4326         self.post('/OS-TRUST/trusts',
 4327                   body={'trust': ref},
 4328                   token=trust_token,
 4329                   expected_status=http_client.FORBIDDEN)
 4330 
 4331     def test_trust_deleted_grant(self):
 4332         # create a new role
 4333         role = unit.new_role_ref()
 4334         PROVIDERS.role_api.create_role(role['id'], role)
 4335 
 4336         grant_url = (
 4337             '/projects/%(project_id)s/users/%(user_id)s/'
 4338             'roles/%(role_id)s' % {
 4339                 'project_id': self.project_id,
 4340                 'user_id': self.user_id,
 4341                 'role_id': role['id']})
 4342 
 4343         # assign a new role
 4344         self.put(grant_url)
 4345 
 4346         # create a trust that delegates the new role
 4347         ref = unit.new_trust_ref(
 4348             trustor_user_id=self.user_id,
 4349             trustee_user_id=self.trustee_user['id'],
 4350             project_id=self.project_id,
 4351             impersonation=False,
 4352             expires=dict(minutes=1),
 4353             role_ids=[role['id']])
 4354 
 4355         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4356         trust = self.assertValidTrustResponse(r)
 4357 
 4358         # delete the grant
 4359         self.delete(grant_url)
 4360 
 4361         # attempt to get a trust token with the deleted grant
 4362         # and ensure it's unauthorized
 4363         auth_data = self.build_authentication_request(
 4364             user_id=self.trustee_user['id'],
 4365             password=self.trustee_user['password'],
 4366             trust_id=trust['id'])
 4367         r = self.v3_create_token(auth_data,
 4368                                  expected_status=http_client.FORBIDDEN)
 4369 
 4370     def test_trust_chained(self):
 4371         """Test that a trust token can't be used to execute another trust.
 4372 
 4373         To do this, we create an A->B->C hierarchy of trusts, then attempt to
 4374         execute the trusts in series (C->B->A).
 4375 
 4376         """
 4377         # create a sub-trustee user
 4378         sub_trustee_user = unit.create_user(
 4379             PROVIDERS.identity_api,
 4380             domain_id=test_v3.DEFAULT_DOMAIN_ID)
 4381         sub_trustee_user_id = sub_trustee_user['id']
 4382 
 4383         # create a new role
 4384         role = unit.new_role_ref()
 4385         PROVIDERS.role_api.create_role(role['id'], role)
 4386 
 4387         # assign the new role to trustee
 4388         self.put(
 4389             '/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % {
 4390                 'project_id': self.project_id,
 4391                 'user_id': self.trustee_user['id'],
 4392                 'role_id': role['id']})
 4393 
 4394         # create a trust from trustor -> trustee
 4395         ref = unit.new_trust_ref(
 4396             trustor_user_id=self.user_id,
 4397             trustee_user_id=self.trustee_user['id'],
 4398             project_id=self.project_id,
 4399             impersonation=True,
 4400             expires=dict(minutes=1),
 4401             role_ids=[self.role_id])
 4402         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4403         trust1 = self.assertValidTrustResponse(r)
 4404 
 4405         # authenticate as trustee so we can create a second trust
 4406         auth_data = self.build_authentication_request(
 4407             user_id=self.trustee_user['id'],
 4408             password=self.trustee_user['password'],
 4409             project_id=self.project_id)
 4410         token = self.get_requested_token(auth_data)
 4411 
 4412         # create a trust from trustee -> sub-trustee
 4413         ref = unit.new_trust_ref(
 4414             trustor_user_id=self.trustee_user['id'],
 4415             trustee_user_id=sub_trustee_user_id,
 4416             project_id=self.project_id,
 4417             impersonation=True,
 4418             expires=dict(minutes=1),
 4419             role_ids=[role['id']])
 4420         r = self.post('/OS-TRUST/trusts', token=token, body={'trust': ref})
 4421         trust2 = self.assertValidTrustResponse(r)
 4422 
 4423         # authenticate as sub-trustee and get a trust token
 4424         auth_data = self.build_authentication_request(
 4425             user_id=sub_trustee_user['id'],
 4426             password=sub_trustee_user['password'],
 4427             trust_id=trust2['id'])
 4428         trust_token = self.get_requested_token(auth_data)
 4429 
 4430         # attempt to get the second trust using a trust token
 4431         auth_data = self.build_authentication_request(
 4432             token=trust_token,
 4433             trust_id=trust1['id'])
 4434         r = self.v3_create_token(auth_data,
 4435                                  expected_status=http_client.FORBIDDEN)
 4436 
 4437     def assertTrustTokensRevoked(self, trust_id):
 4438         revocation_response = self.get('/OS-REVOKE/events')
 4439         revocation_events = revocation_response.json_body['events']
 4440         found = False
 4441         for event in revocation_events:
 4442             if event.get('OS-TRUST:trust_id') == trust_id:
 4443                 found = True
 4444         self.assertTrue(found, 'event with trust_id %s not found in list' %
 4445                         trust_id)
 4446 
 4447     def test_delete_trust_revokes_tokens(self):
 4448         ref = unit.new_trust_ref(
 4449             trustor_user_id=self.user_id,
 4450             trustee_user_id=self.trustee_user['id'],
 4451             project_id=self.project_id,
 4452             impersonation=False,
 4453             expires=dict(minutes=1),
 4454             role_ids=[self.role_id])
 4455         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4456         trust = self.assertValidTrustResponse(r)
 4457         trust_id = trust['id']
 4458         auth_data = self.build_authentication_request(
 4459             user_id=self.trustee_user['id'],
 4460             password=self.trustee_user['password'],
 4461             trust_id=trust_id)
 4462         r = self.v3_create_token(auth_data)
 4463         self.assertValidProjectScopedTokenResponse(
 4464             r, self.trustee_user)
 4465         trust_token = r.headers['X-Subject-Token']
 4466         self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
 4467             'trust_id': trust_id})
 4468         headers = {'X-Subject-Token': trust_token}
 4469         self.head('/auth/tokens', headers=headers,
 4470                   expected_status=http_client.NOT_FOUND)
 4471         self.assertTrustTokensRevoked(trust_id)
 4472 
 4473     def disable_user(self, user):
 4474         user['enabled'] = False
 4475         PROVIDERS.identity_api.update_user(user['id'], user)
 4476 
 4477     def test_trust_get_token_fails_if_trustor_disabled(self):
 4478         ref = unit.new_trust_ref(
 4479             trustor_user_id=self.user_id,
 4480             trustee_user_id=self.trustee_user['id'],
 4481             project_id=self.project_id,
 4482             impersonation=False,
 4483             expires=dict(minutes=1),
 4484             role_ids=[self.role_id])
 4485 
 4486         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4487 
 4488         trust = self.assertValidTrustResponse(r, ref)
 4489 
 4490         auth_data = self.build_authentication_request(
 4491             user_id=self.trustee_user['id'],
 4492             password=self.trustee_user['password'],
 4493             trust_id=trust['id'])
 4494         self.v3_create_token(auth_data)
 4495 
 4496         self.disable_user(self.user)
 4497 
 4498         auth_data = self.build_authentication_request(
 4499             user_id=self.trustee_user['id'],
 4500             password=self.trustee_user['password'],
 4501             trust_id=trust['id'])
 4502         self.v3_create_token(auth_data,
 4503                              expected_status=http_client.FORBIDDEN)
 4504 
 4505     def test_trust_get_token_fails_if_trustee_disabled(self):
 4506         ref = unit.new_trust_ref(
 4507             trustor_user_id=self.user_id,
 4508             trustee_user_id=self.trustee_user['id'],
 4509             project_id=self.project_id,
 4510             impersonation=False,
 4511             expires=dict(minutes=1),
 4512             role_ids=[self.role_id])
 4513 
 4514         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4515 
 4516         trust = self.assertValidTrustResponse(r, ref)
 4517 
 4518         auth_data = self.build_authentication_request(
 4519             user_id=self.trustee_user['id'],
 4520             password=self.trustee_user['password'],
 4521             trust_id=trust['id'])
 4522         self.v3_create_token(auth_data)
 4523 
 4524         self.disable_user(self.trustee_user)
 4525 
 4526         auth_data = self.build_authentication_request(
 4527             user_id=self.trustee_user['id'],
 4528             password=self.trustee_user['password'],
 4529             trust_id=trust['id'])
 4530         self.v3_create_token(auth_data,
 4531                              expected_status=http_client.UNAUTHORIZED)
 4532 
 4533     def test_delete_trust(self):
 4534         ref = unit.new_trust_ref(
 4535             trustor_user_id=self.user_id,
 4536             trustee_user_id=self.trustee_user['id'],
 4537             project_id=self.project_id,
 4538             impersonation=False,
 4539             expires=dict(minutes=1),
 4540             role_ids=[self.role_id])
 4541 
 4542         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4543 
 4544         trust = self.assertValidTrustResponse(r, ref)
 4545 
 4546         self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
 4547             'trust_id': trust['id']})
 4548 
 4549         auth_data = self.build_authentication_request(
 4550             user_id=self.trustee_user['id'],
 4551             password=self.trustee_user['password'],
 4552             trust_id=trust['id'])
 4553         self.v3_create_token(auth_data,
 4554                              expected_status=http_client.UNAUTHORIZED)
 4555 
 4556     def test_change_password_invalidates_trust_tokens(self):
 4557         ref = unit.new_trust_ref(
 4558             trustor_user_id=self.user_id,
 4559             trustee_user_id=self.trustee_user['id'],
 4560             project_id=self.project_id,
 4561             impersonation=True,
 4562             expires=dict(minutes=1),
 4563             role_ids=[self.role_id])
 4564 
 4565         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4566         trust = self.assertValidTrustResponse(r)
 4567 
 4568         auth_data = self.build_authentication_request(
 4569             user_id=self.trustee_user['id'],
 4570             password=self.trustee_user['password'],
 4571             trust_id=trust['id'])
 4572         r = self.v3_create_token(auth_data)
 4573 
 4574         self.assertValidProjectScopedTokenResponse(r, self.user)
 4575         trust_token = r.headers.get('X-Subject-Token')
 4576 
 4577         self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
 4578                  self.user_id, token=trust_token)
 4579 
 4580         self.assertValidUserResponse(
 4581             self.patch('/users/%s' % self.trustee_user['id'],
 4582                        body={'user': {'password': uuid.uuid4().hex}}))
 4583 
 4584         self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
 4585                  self.user_id, expected_status=http_client.UNAUTHORIZED,
 4586                  token=trust_token)
 4587 
 4588     def test_trustee_can_do_role_ops(self):
 4589         resp = self.post('/OS-TRUST/trusts',
 4590                          body={'trust': self.redelegated_trust_ref})
 4591         trust = self.assertValidTrustResponse(resp)
 4592         trust_token = self._get_trust_token(trust)
 4593 
 4594         resp = self.get(
 4595             '/OS-TRUST/trusts/%(trust_id)s/roles' % {
 4596                 'trust_id': trust['id']},
 4597             token=trust_token)
 4598         self.assertValidRoleListResponse(resp, self.role)
 4599 
 4600         self.head(
 4601             '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % {
 4602                 'trust_id': trust['id'],
 4603                 'role_id': self.role['id']},
 4604             token=trust_token,
 4605             expected_status=http_client.OK)
 4606 
 4607         resp = self.get(
 4608             '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % {
 4609                 'trust_id': trust['id'],
 4610                 'role_id': self.role['id']},
 4611             token=trust_token)
 4612         self.assertValidRoleResponse(resp, self.role)
 4613 
 4614     def test_do_not_consume_remaining_uses_when_get_token_fails(self):
 4615         ref = unit.new_trust_ref(
 4616             trustor_user_id=self.user_id,
 4617             trustee_user_id=self.trustee_user['id'],
 4618             project_id=self.project_id,
 4619             impersonation=False,
 4620             expires=dict(minutes=1),
 4621             role_ids=[self.role_id],
 4622             remaining_uses=3)
 4623         r = self.post('/OS-TRUST/trusts', body={'trust': ref})
 4624 
 4625         new_trust = r.result.get('trust')
 4626         trust_id = new_trust.get('id')
 4627         # Pass in another user's ID as the trustee, the result being a failed
 4628         # token authenticate and the remaining_uses of the trust should not be
 4629         # decremented.
 4630         auth_data = self.build_authentication_request(
 4631             user_id=self.default_domain_user['id'],
 4632             password=self.default_domain_user['password'],
 4633             trust_id=trust_id)
 4634         self.v3_create_token(auth_data,
 4635                              expected_status=http_client.FORBIDDEN)
 4636 
 4637         r = self.get('/OS-TRUST/trusts/%s' % trust_id)
 4638         self.assertEqual(3, r.result.get('trust').get('remaining_uses'))
 4639 
 4640 
 4641 class TestTrustChain(test_v3.RestfulTestCase):
 4642 
 4643     def config_overrides(self):
 4644         super(TestTrustChain, self).config_overrides()
 4645         self.config_fixture.config(
 4646             group='trust',
 4647             allow_redelegation=True,
 4648             max_redelegation_count=10
 4649         )
 4650 
 4651     def setUp(self):
 4652         super(TestTrustChain, self).setUp()
 4653         """Create a trust chain using redelegation.
 4654 
 4655         A trust chain is a series of trusts that are redelegated. For example,
 4656         self.user_list consists of userA, userB, and userC. The first trust in
 4657         the trust chain is going to be established between self.user and userA,
 4658         call it trustA. Then, userA is going to obtain a trust scoped token
 4659         using trustA, and with that token create a trust between userA and
 4660         userB called trustB. This pattern will continue with userB creating a
 4661         trust with userC.
 4662         So the trust chain should look something like:
 4663             trustA -> trustB -> trustC
 4664         Where:
 4665             self.user is trusting userA with trustA
 4666             userA is trusting userB with trustB
 4667             userB is trusting userC with trustC
 4668 
 4669         """
 4670         self.user_list = list()
 4671         self.trust_chain = list()
 4672         for _ in range(3):
 4673             user = unit.create_user(PROVIDERS.identity_api,
 4674                                     domain_id=self.domain_id)
 4675             self.user_list.append(user)
 4676 
 4677         # trustor->trustee redelegation with impersonation
 4678         trustee = self.user_list[0]
 4679         trust_ref = unit.new_trust_ref(
 4680             trustor_user_id=self.user_id,
 4681             trustee_user_id=trustee['id'],
 4682             project_id=self.project_id,
 4683             impersonation=True,
 4684             expires=dict(minutes=1),
 4685             role_ids=[self.role_id],
 4686             allow_redelegation=True,
 4687             redelegation_count=3)
 4688 
 4689         # Create a trust between self.user and the first user in the list
 4690         r = self.post('/OS-TRUST/trusts',
 4691                       body={'trust': trust_ref})
 4692 
 4693         trust = self.assertValidTrustResponse(r)
 4694         auth_data = self.build_authentication_request(
 4695             user_id=trustee['id'],
 4696             password=trustee['password'],
 4697             trust_id=trust['id'])
 4698 
 4699         # Generate a trusted token for the first user
 4700         trust_token = self.get_requested_token(auth_data)
 4701         self.trust_chain.append(trust)
 4702 
 4703         # Loop through the user to create a chain of redelegated trust.
 4704         for next_trustee in self.user_list[1:]:
 4705             trust_ref = unit.new_trust_ref(
 4706                 trustor_user_id=self.user_id,
 4707                 trustee_user_id=next_trustee['id'],
 4708                 project_id=self.project_id,
 4709                 impersonation=True,
 4710                 role_ids=[self.role_id],
 4711                 allow_redelegation=True)
 4712             r = self.post('/OS-TRUST/trusts',
 4713                           body={'trust': trust_ref},
 4714                           token=trust_token)
 4715             trust = self.assertValidTrustResponse(r)
 4716             auth_data = self.build_authentication_request(
 4717                 user_id=next_trustee['id'],
 4718                 password=next_trustee['password'],
 4719                 trust_id=trust['id'])