"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "keystone/tests/unit/test_v3_auth.py" between
keystone-16.0.1.tar.gz and keystone-17.0.0.tar.gz

About: OpenStack Keystone (Core Service: Identity) provides an authentication and authorization service for other OpenStack services. Provides a catalog of endpoints for all OpenStack services.
The "Ussuri" series (latest release).

test_v3_auth.py  (keystone-16.0.1):test_v3_auth.py  (keystone-17.0.0)
skipping to change at line 21 skipping to change at line 21
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import copy import copy
import datetime import datetime
import fixtures import fixtures
import itertools import itertools
import operator import operator
import re import re
from unittest import mock
import uuid import uuid
import freezegun import freezegun
import mock import http.client
from oslo_serialization import jsonutils as json from oslo_serialization import jsonutils as json
from oslo_utils import fixture from oslo_utils import fixture
from oslo_utils import timeutils from oslo_utils import timeutils
import six
from six.moves import http_client
from six.moves import range
from testtools import matchers from testtools import matchers
from testtools import testcase from testtools import testcase
from keystone import auth from keystone import auth
from keystone.auth.plugins import totp from keystone.auth.plugins import totp
from keystone.common import authorization from keystone.common import authorization
from keystone.common import provider_api from keystone.common import provider_api
from keystone.common.rbac_enforcer import policy from keystone.common.rbac_enforcer import policy
from keystone.common import utils from keystone.common import utils
import keystone.conf import keystone.conf
skipping to change at line 163 skipping to change at line 161
# token issuance. This is a bug with the limited resolution that # token issuance. This is a bug with the limited resolution that
# tokens and revocation events have. # tokens and revocation events have.
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5) time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time): with freezegun.freeze_time(time):
self.v3_create_token( self.v3_create_token(
self.build_authentication_request( self.build_authentication_request(
user_id=self.user_id, user_id=self.user_id,
password=self.user['password'], password=self.user['password'],
user_domain_id=self.domain_id, user_domain_id=self.domain_id,
project_id=self.project_id), project_id=self.project_id),
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_MFA_multi_method_rules_requirements_not_met_fails(self): def test_MFA_multi_method_rules_requirements_not_met_fails(self):
# if multiple rules are specified and only one is passed, # if multiple rules are specified and only one is passed,
# unauthorized is expected # unauthorized is expected
rule_list = [['password', 'totp']] rule_list = [['password', 'totp']]
self._update_user_with_MFA_rules(rule_list=rule_list) self._update_user_with_MFA_rules(rule_list=rule_list)
# NOTE(notmorgan): Step forward in time to ensure we're not causing # NOTE(notmorgan): Step forward in time to ensure we're not causing
# issues with revocation events that occur at the same time as the # issues with revocation events that occur at the same time as the
# token issuance. This is a bug with the limited resolution that # token issuance. This is a bug with the limited resolution that
# tokens and revocation events have. # tokens and revocation events have.
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5) time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time): with freezegun.freeze_time(time):
self.v3_create_token( self.v3_create_token(
self.build_authentication_request( self.build_authentication_request(
user_id=self.user_id, user_id=self.user_id,
password=self.user['password'], password=self.user['password'],
user_domain_id=self.domain_id, user_domain_id=self.domain_id,
project_id=self.project_id), project_id=self.project_id),
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_MFA_rules_bogus_non_existing_auth_method_succeeds(self): def test_MFA_rules_bogus_non_existing_auth_method_succeeds(self):
# Bogus auth methods are thrown out from rules. # Bogus auth methods are thrown out from rules.
rule_list = [['password'], ['BoGusAuThMeTh0dHandl3r']] rule_list = [['password'], ['BoGusAuThMeTh0dHandl3r']]
self._update_user_with_MFA_rules(rule_list=rule_list) self._update_user_with_MFA_rules(rule_list=rule_list)
# NOTE(notmorgan): Step forward in time to ensure we're not causing # NOTE(notmorgan): Step forward in time to ensure we're not causing
# issues with revocation events that occur at the same time as the # issues with revocation events that occur at the same time as the
# token issuance. This is a bug with the limited resolution that # token issuance. This is a bug with the limited resolution that
# tokens and revocation events have. # tokens and revocation events have.
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5) time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
skipping to change at line 280 skipping to change at line 278
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5) time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time): with freezegun.freeze_time(time):
response = self.admin_request( response = self.admin_request(
method='POST', method='POST',
path='/v3/auth/tokens', path='/v3/auth/tokens',
body=self.build_authentication_request( body=self.build_authentication_request(
user_id=self.user_id, user_id=self.user_id,
password=self.user['password'], password=self.user['password'],
user_domain_id=self.domain_id, user_domain_id=self.domain_id,
project_id=self.project_id), project_id=self.project_id),
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
self.assertIsNotNone( self.assertIsNotNone(
response.headers.get(authorization.AUTH_RECEIPT_HEADER)) response.headers.get(authorization.AUTH_RECEIPT_HEADER))
resp_data = response.result resp_data = response.result
# NOTE(adriant): We convert to sets to avoid any potential sorting # NOTE(adriant): We convert to sets to avoid any potential sorting
# related failures since order isn't important, just content. # related failures since order isn't important, just content.
self.assertEqual( self.assertEqual(
{'password'}, set(resp_data.get('receipt').get('methods'))) {'password'}, set(resp_data.get('receipt').get('methods')))
self.assertEqual( self.assertEqual(
set(frozenset(r) for r in rule_list), set(frozenset(r) for r in rule_list),
skipping to change at line 314 skipping to change at line 312
with freezegun.freeze_time(time): with freezegun.freeze_time(time):
response = self.admin_request( response = self.admin_request(
method='POST', method='POST',
path='/v3/auth/tokens', path='/v3/auth/tokens',
body=self.build_authentication_request( body=self.build_authentication_request(
user_id=self.user_id, user_id=self.user_id,
user_domain_id=self.domain_id, user_domain_id=self.domain_id,
project_id=self.project_id, project_id=self.project_id,
passcode=totp._generate_totp_passcodes( passcode=totp._generate_totp_passcodes(
totp_cred['blob'])[0]), totp_cred['blob'])[0]),
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
self.assertIsNotNone( self.assertIsNotNone(
response.headers.get(authorization.AUTH_RECEIPT_HEADER)) response.headers.get(authorization.AUTH_RECEIPT_HEADER))
resp_data = response.result resp_data = response.result
# NOTE(adriant): We convert to sets to avoid any potential sorting # NOTE(adriant): We convert to sets to avoid any potential sorting
# related failures since order isn't important, just content. # related failures since order isn't important, just content.
self.assertEqual( self.assertEqual(
{'totp'}, set(resp_data.get('receipt').get('methods'))) {'totp'}, set(resp_data.get('receipt').get('methods')))
self.assertEqual( self.assertEqual(
set(frozenset(r) for r in rule_list), set(frozenset(r) for r in rule_list),
skipping to change at line 349 skipping to change at line 347
response = self.admin_request( response = self.admin_request(
method='POST', method='POST',
path='/v3/auth/tokens', path='/v3/auth/tokens',
body=self.build_authentication_request( body=self.build_authentication_request(
user_id=self.user_id, user_id=self.user_id,
password=self.user['password'], password=self.user['password'],
user_domain_id=self.domain_id, user_domain_id=self.domain_id,
project_id=self.project_id, project_id=self.project_id,
passcode=totp._generate_totp_passcodes( passcode=totp._generate_totp_passcodes(
totp_cred['blob'])[0]), totp_cred['blob'])[0]),
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
self.assertIsNotNone( self.assertIsNotNone(
response.headers.get(authorization.AUTH_RECEIPT_HEADER)) response.headers.get(authorization.AUTH_RECEIPT_HEADER))
resp_data = response.result resp_data = response.result
# NOTE(adriant): We convert to sets to avoid any potential sorting # NOTE(adriant): We convert to sets to avoid any potential sorting
# related failures since order isn't important, just content. # related failures since order isn't important, just content.
self.assertEqual( self.assertEqual(
{'password', 'totp'}, set(resp_data.get('receipt').get('methods'))) {'password', 'totp'}, set(resp_data.get('receipt').get('methods')))
self.assertEqual( self.assertEqual(
set(frozenset(r) for r in rule_list), set(frozenset(r) for r in rule_list),
skipping to change at line 390 skipping to change at line 388
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5) time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time): with freezegun.freeze_time(time):
response = self.admin_request( response = self.admin_request(
method='POST', method='POST',
path='/v3/auth/tokens', path='/v3/auth/tokens',
body=self.build_authentication_request( body=self.build_authentication_request(
user_id=self.user_id, user_id=self.user_id,
password=self.user['password'], password=self.user['password'],
user_domain_id=self.domain_id, user_domain_id=self.domain_id,
project_id=self.project_id), project_id=self.project_id),
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
self.assertIsNotNone( self.assertIsNotNone(
response.headers.get(authorization.AUTH_RECEIPT_HEADER)) response.headers.get(authorization.AUTH_RECEIPT_HEADER))
resp_data = response.result resp_data = response.result
# NOTE(adriant): We convert to sets to avoid any potential sorting # NOTE(adriant): We convert to sets to avoid any potential sorting
# related failures since order isn't important, just content. # related failures since order isn't important, just content.
self.assertEqual( self.assertEqual(
{'password'}, set(resp_data.get('receipt').get('methods'))) {'password'}, set(resp_data.get('receipt').get('methods')))
self.assertEqual( self.assertEqual(
set(frozenset(r) for r in expect_rule_list), set(frozenset(r) for r in expect_rule_list),
skipping to change at line 423 skipping to change at line 421
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5) time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time): with freezegun.freeze_time(time):
response = self.admin_request( response = self.admin_request(
method='POST', method='POST',
path='/v3/auth/tokens', path='/v3/auth/tokens',
body=self.build_authentication_request( body=self.build_authentication_request(
user_id=self.user_id, user_id=self.user_id,
password=self.user['password'], password=self.user['password'],
user_domain_id=self.domain_id, user_domain_id=self.domain_id,
project_id=self.project_id), project_id=self.project_id),
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
self.assertIsNotNone( self.assertIsNotNone(
response.headers.get(authorization.AUTH_RECEIPT_HEADER)) response.headers.get(authorization.AUTH_RECEIPT_HEADER))
receipt = response.headers.get(authorization.AUTH_RECEIPT_HEADER) receipt = response.headers.get(authorization.AUTH_RECEIPT_HEADER)
resp_data = response.result resp_data = response.result
# NOTE(adriant): We convert to sets to avoid any potential sorting # NOTE(adriant): We convert to sets to avoid any potential sorting
# related failures since order isn't important, just content. # related failures since order isn't important, just content.
self.assertEqual( self.assertEqual(
{'password'}, set(resp_data.get('receipt').get('methods'))) {'password'}, set(resp_data.get('receipt').get('methods')))
self.assertEqual( self.assertEqual(
skipping to change at line 461 skipping to change at line 459
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5) time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
with freezegun.freeze_time(time): with freezegun.freeze_time(time):
response = self.admin_request( response = self.admin_request(
method='POST', method='POST',
path='/v3/auth/tokens', path='/v3/auth/tokens',
headers={authorization.AUTH_RECEIPT_HEADER: "bogus-receipt"}, headers={authorization.AUTH_RECEIPT_HEADER: "bogus-receipt"},
body=self.build_authentication_request( body=self.build_authentication_request(
user_id=self.user_id, user_id=self.user_id,
user_domain_id=self.domain_id, user_domain_id=self.domain_id,
project_id=self.project_id), project_id=self.project_id),
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
self.assertEqual(401, response.result['error']['code']) self.assertEqual(401, response.result['error']['code'])
class TestAuthInfo(common_auth.AuthTestMixin, testcase.TestCase): class TestAuthInfo(common_auth.AuthTestMixin, testcase.TestCase):
def setUp(self): def setUp(self):
super(TestAuthInfo, self).setUp() super(TestAuthInfo, self).setUp()
auth.core.load_auth_methods() auth.core.load_auth_methods()
def test_unsupported_auth_method(self): def test_unsupported_auth_method(self):
auth_data = {'methods': ['abc']} auth_data = {'methods': ['abc']}
auth_data['abc'] = {'test': 'test'} auth_data['abc'] = {'test': 'test'}
skipping to change at line 592 skipping to change at line 590
project_id=self.project_id, project_id=self.project_id,
impersonation=impersonation, impersonation=impersonation,
role_ids=[self.role_id]) role_ids=[self.role_id])
# Create a trust # Create a trust
r = self.post('/OS-TRUST/trusts', body={'trust': ref}) r = self.post('/OS-TRUST/trusts', body={'trust': ref})
trust = self.assertValidTrustResponse(r) trust = self.assertValidTrustResponse(r)
return (trustee_user, trust) return (trustee_user, trust)
def _validate_token(self, token, def _validate_token(self, token,
expected_status=http_client.OK, allow_expired=False): expected_status=http.client.OK, allow_expired=False):
path = '/v3/auth/tokens' path = '/v3/auth/tokens'
if allow_expired: if allow_expired:
path += '?allow_expired=1' path += '?allow_expired=1'
return self.admin_request( return self.admin_request(
path=path, path=path,
headers={'X-Auth-Token': self.get_admin_token(), headers={'X-Auth-Token': self.get_admin_token(),
'X-Subject-Token': token}, 'X-Subject-Token': token},
method='GET', method='GET',
expected_status=expected_status expected_status=expected_status
) )
def _revoke_token(self, token, expected_status=http_client.NO_CONTENT): def _revoke_token(self, token, expected_status=http.client.NO_CONTENT):
return self.delete( return self.delete(
'/auth/tokens', '/auth/tokens',
headers={'x-subject-token': token}, headers={'x-subject-token': token},
expected_status=expected_status) expected_status=expected_status)
def _set_user_enabled(self, user, enabled=True): def _set_user_enabled(self, user, enabled=True):
user['enabled'] = enabled user['enabled'] = enabled
PROVIDERS.identity_api.update_user(user['id'], user) PROVIDERS.identity_api.update_user(user['id'], user)
def _create_project_and_set_as_default_project(self): def _create_project_and_set_as_default_project(self):
skipping to change at line 649 skipping to change at line 647
# get the token for a user. This is self.user which is different from # get the token for a user. This is self.user which is different from
# self.default_domain_user. # self.default_domain_user.
token = self.get_scoped_token() token = self.get_scoped_token()
# try both password and token methods with different identities and it # try both password and token methods with different identities and it
# should fail # should fail
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
token=token, token=token,
user_id=self.default_domain_user['id'], user_id=self.default_domain_user['id'],
password=self.default_domain_user['password']) password=self.default_domain_user['password'])
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_create_token_for_user_without_password_fails(self): def test_create_token_for_user_without_password_fails(self):
user = unit.new_user_ref(domain_id=self.domain['id']) user = unit.new_user_ref(domain_id=self.domain['id'])
del user['password'] # can't have a password for this test del user['password'] # can't have a password for this test
user = PROVIDERS.identity_api.create_user(user) user = PROVIDERS.identity_api.create_user(user)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=user['id'], user_id=user['id'],
password='password') password='password')
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_create_unscoped_token_by_authenticating_with_unscoped_token(self): def test_create_unscoped_token_by_authenticating_with_unscoped_token(self):
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=self.user['password']) password=self.user['password'])
r = self.v3_create_token(auth_data) r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r) self.assertValidUnscopedTokenResponse(r)
token_id = r.headers.get('X-Subject-Token') token_id = r.headers.get('X-Subject-Token')
auth_data = self.build_authentication_request(token=token_id) auth_data = self.build_authentication_request(token=token_id)
skipping to change at line 714 skipping to change at line 712
# NOTE(lbragstad): We set token expiration to 10 seconds so that we can # NOTE(lbragstad): We set token expiration to 10 seconds so that we can
# use the context manager of freezegun without sqlite issues. # use the context manager of freezegun without sqlite issues.
self.config_fixture.config(group='token', self.config_fixture.config(group='token',
expiration=10) expiration=10)
time = datetime.datetime.utcnow() time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime: with freezegun.freeze_time(time) as frozen_datetime:
unscoped_token = self._get_unscoped_token() unscoped_token = self._get_unscoped_token()
frozen_datetime.tick(delta=datetime.timedelta(seconds=15)) frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
self._validate_token( self._validate_token(
unscoped_token, unscoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_revoke_unscoped_token(self): def test_revoke_unscoped_token(self):
unscoped_token = self._get_unscoped_token() unscoped_token = self._get_unscoped_token()
r = self._validate_token(unscoped_token) r = self._validate_token(unscoped_token)
self.assertValidUnscopedTokenResponse(r) self.assertValidUnscopedTokenResponse(r)
self._revoke_token(unscoped_token) self._revoke_token(unscoped_token)
self._validate_token(unscoped_token, self._validate_token(unscoped_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_create_explicit_unscoped_token(self): def test_create_explicit_unscoped_token(self):
self._create_project_and_set_as_default_project() self._create_project_and_set_as_default_project()
# explicitly ask for an unscoped token # explicitly ask for an unscoped token
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=self.user['password'], password=self.user['password'],
unscoped="unscoped") unscoped="unscoped")
r = self.post('/auth/tokens', body=auth_data, noauth=True) r = self.post('/auth/tokens', body=auth_data, noauth=True)
skipping to change at line 789 skipping to change at line 787
def test_unscoped_token_is_invalid_after_disabling_user(self): def test_unscoped_token_is_invalid_after_disabling_user(self):
unscoped_token = self._get_unscoped_token() unscoped_token = self._get_unscoped_token()
# Make sure the token is valid # Make sure the token is valid
r = self._validate_token(unscoped_token) r = self._validate_token(unscoped_token)
self.assertValidUnscopedTokenResponse(r) self.assertValidUnscopedTokenResponse(r)
# Disable the user # Disable the user
self._set_user_enabled(self.user, enabled=False) self._set_user_enabled(self.user, enabled=False)
# Ensure validating a token for a disabled user fails # Ensure validating a token for a disabled user fails
self._validate_token( self._validate_token(
unscoped_token, unscoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_unscoped_token_is_invalid_after_enabling_disabled_user(self): def test_unscoped_token_is_invalid_after_enabling_disabled_user(self):
unscoped_token = self._get_unscoped_token() unscoped_token = self._get_unscoped_token()
# Make sure the token is valid # Make sure the token is valid
r = self._validate_token(unscoped_token) r = self._validate_token(unscoped_token)
self.assertValidUnscopedTokenResponse(r) self.assertValidUnscopedTokenResponse(r)
# Disable the user # Disable the user
self._set_user_enabled(self.user, enabled=False) self._set_user_enabled(self.user, enabled=False)
# Ensure validating a token for a disabled user fails # Ensure validating a token for a disabled user fails
self._validate_token( self._validate_token(
unscoped_token, unscoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
# Enable the user # Enable the user
self._set_user_enabled(self.user) self._set_user_enabled(self.user)
# Ensure validating a token for a re-enabled user fails # Ensure validating a token for a re-enabled user fails
self._validate_token( self._validate_token(
unscoped_token, unscoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_unscoped_token_is_invalid_after_disabling_user_domain(self): def test_unscoped_token_is_invalid_after_disabling_user_domain(self):
unscoped_token = self._get_unscoped_token() unscoped_token = self._get_unscoped_token()
# Make sure the token is valid # Make sure the token is valid
r = self._validate_token(unscoped_token) r = self._validate_token(unscoped_token)
self.assertValidUnscopedTokenResponse(r) self.assertValidUnscopedTokenResponse(r)
# Disable the user's domain # Disable the user's domain
self.domain['enabled'] = False self.domain['enabled'] = False
PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain) PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain)
# Ensure validating a token for a disabled user fails # Ensure validating a token for a disabled user fails
self._validate_token( self._validate_token(
unscoped_token, unscoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_unscoped_token_is_invalid_after_changing_user_password(self): def test_unscoped_token_is_invalid_after_changing_user_password(self):
unscoped_token = self._get_unscoped_token() unscoped_token = self._get_unscoped_token()
# Make sure the token is valid # Make sure the token is valid
r = self._validate_token(unscoped_token) r = self._validate_token(unscoped_token)
self.assertValidUnscopedTokenResponse(r) self.assertValidUnscopedTokenResponse(r)
# Change user's password # Change user's password
self.user['password'] = 'Password1' self.user['password'] = 'Password1'
PROVIDERS.identity_api.update_user(self.user['id'], self.user) PROVIDERS.identity_api.update_user(self.user['id'], self.user)
# Ensure updating user's password revokes existing user's tokens # Ensure updating user's password revokes existing user's tokens
self._validate_token( self._validate_token(
unscoped_token, unscoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_create_system_token_with_user_id(self): def test_create_system_token_with_user_id(self):
path = '/system/users/%(user_id)s/roles/%(role_id)s' % { path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'], 'user_id': self.user['id'],
'role_id': self.role_id 'role_id': self.role_id
} }
self.put(path=path) self.put(path=path)
auth_request_body = self.build_authentication_request( auth_request_body = self.build_authentication_request(
skipping to change at line 881 skipping to change at line 879
self.assertValidSystemScopedTokenResponse(response) self.assertValidSystemScopedTokenResponse(response)
def test_create_system_token_fails_without_system_assignment(self): def test_create_system_token_fails_without_system_assignment(self):
auth_request_body = self.build_authentication_request( auth_request_body = self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=self.user['password'], password=self.user['password'],
system=True system=True
) )
self.v3_create_token( self.v3_create_token(
auth_request_body, auth_request_body,
expected_status=http_client.UNAUTHORIZED expected_status=http.client.UNAUTHORIZED
) )
def test_system_token_is_invalid_after_disabling_user(self): def test_system_token_is_invalid_after_disabling_user(self):
path = '/system/users/%(user_id)s/roles/%(role_id)s' % { path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'], 'user_id': self.user['id'],
'role_id': self.role_id 'role_id': self.role_id
} }
self.put(path=path) self.put(path=path)
auth_request_body = self.build_authentication_request( auth_request_body = self.build_authentication_request(
skipping to change at line 920 skipping to change at line 918
self.patch( self.patch(
'/users/%(user_id)s' % {'user_id': self.user['id']}, '/users/%(user_id)s' % {'user_id': self.user['id']},
body=user_ref body=user_ref
) )
self.admin_request( self.admin_request(
path='/v3/auth/tokens', path='/v3/auth/tokens',
headers={'X-Auth-Token': token, headers={'X-Auth-Token': token,
'X-Subject-Token': token}, 'X-Subject-Token': token},
method='GET', method='GET',
expected_status=http_client.UNAUTHORIZED expected_status=http.client.UNAUTHORIZED
) )
self.admin_request( self.admin_request(
path='/v3/auth/tokens', path='/v3/auth/tokens',
headers={'X-Auth-Token': token, headers={'X-Auth-Token': token,
'X-Subject-Token': token}, 'X-Subject-Token': token},
method='HEAD', method='HEAD',
expected_status=http_client.UNAUTHORIZED expected_status=http.client.UNAUTHORIZED
) )
def test_create_system_token_via_system_group_assignment(self): def test_create_system_token_via_system_group_assignment(self):
ref = { ref = {
'group': unit.new_group_ref( 'group': unit.new_group_ref(
domain_id=CONF.identity.default_domain_id domain_id=CONF.identity.default_domain_id
) )
} }
group = self.post('/groups', body=ref).json_body['group'] group = self.post('/groups', body=ref).json_body['group']
skipping to change at line 979 skipping to change at line 977
password=self.user['password'], password=self.user['password'],
user_domain_id=self.domain['id'], user_domain_id=self.domain['id'],
system=True system=True
) )
response = self.v3_create_token(auth_request_body) response = self.v3_create_token(auth_request_body)
self.assertValidSystemScopedTokenResponse(response) self.assertValidSystemScopedTokenResponse(response)
token = response.headers.get('X-Subject-Token') token = response.headers.get('X-Subject-Token')
self._validate_token(token) self._validate_token(token)
self._revoke_token(token) self._revoke_token(token)
self._validate_token(token, expected_status=http_client.NOT_FOUND) self._validate_token(token, expected_status=http.client.NOT_FOUND)
def test_system_token_is_invalid_after_deleting_system_role(self): def test_system_token_is_invalid_after_deleting_system_role(self):
ref = {'role': unit.new_role_ref()} ref = {'role': unit.new_role_ref()}
system_role = self.post('/roles', body=ref).json_body['role'] system_role = self.post('/roles', body=ref).json_body['role']
path = '/system/users/%(user_id)s/roles/%(role_id)s' % { path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'], 'user_id': self.user['id'],
'role_id': system_role['id'] 'role_id': system_role['id']
} }
self.put(path=path) self.put(path=path)
skipping to change at line 1004 skipping to change at line 1002
user_domain_id=self.domain['id'], user_domain_id=self.domain['id'],
system=True system=True
) )
response = self.v3_create_token(auth_request_body) response = self.v3_create_token(auth_request_body)
self.assertValidSystemScopedTokenResponse(response) self.assertValidSystemScopedTokenResponse(response)
token = response.headers.get('X-Subject-Token') token = response.headers.get('X-Subject-Token')
self._validate_token(token) self._validate_token(token)
self.delete('/roles/%(role_id)s' % {'role_id': system_role['id']}) self.delete('/roles/%(role_id)s' % {'role_id': system_role['id']})
self._validate_token(token, expected_status=http_client.NOT_FOUND) self._validate_token(token, expected_status=http.client.NOT_FOUND)
def test_rescoping_a_system_token_for_a_project_token_fails(self): def test_rescoping_a_system_token_for_a_project_token_fails(self):
ref = {'role': unit.new_role_ref()} ref = {'role': unit.new_role_ref()}
system_role = self.post('/roles', body=ref).json_body['role'] system_role = self.post('/roles', body=ref).json_body['role']
path = '/system/users/%(user_id)s/roles/%(role_id)s' % { path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'], 'user_id': self.user['id'],
'role_id': system_role['id'] 'role_id': system_role['id']
} }
self.put(path=path) self.put(path=path)
skipping to change at line 1030 skipping to change at line 1028
system=True system=True
) )
response = self.v3_create_token(auth_request_body) response = self.v3_create_token(auth_request_body)
self.assertValidSystemScopedTokenResponse(response) self.assertValidSystemScopedTokenResponse(response)
system_token = response.headers.get('X-Subject-Token') system_token = response.headers.get('X-Subject-Token')
auth_request_body = self.build_authentication_request( auth_request_body = self.build_authentication_request(
token=system_token, project_id=self.project_id token=system_token, project_id=self.project_id
) )
self.v3_create_token( self.v3_create_token(
auth_request_body, expected_status=http_client.FORBIDDEN auth_request_body, expected_status=http.client.FORBIDDEN
) )
def test_rescoping_a_system_token_for_a_domain_token_fails(self): def test_rescoping_a_system_token_for_a_domain_token_fails(self):
ref = {'role': unit.new_role_ref()} ref = {'role': unit.new_role_ref()}
system_role = self.post('/roles', body=ref).json_body['role'] system_role = self.post('/roles', body=ref).json_body['role']
path = '/system/users/%(user_id)s/roles/%(role_id)s' % { path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'], 'user_id': self.user['id'],
'role_id': system_role['id'] 'role_id': system_role['id']
} }
skipping to change at line 1057 skipping to change at line 1055
system=True system=True
) )
response = self.v3_create_token(auth_request_body) response = self.v3_create_token(auth_request_body)
self.assertValidSystemScopedTokenResponse(response) self.assertValidSystemScopedTokenResponse(response)
system_token = response.headers.get('X-Subject-Token') system_token = response.headers.get('X-Subject-Token')
auth_request_body = self.build_authentication_request( auth_request_body = self.build_authentication_request(
token=system_token, domain_id=CONF.identity.default_domain_id token=system_token, domain_id=CONF.identity.default_domain_id
) )
self.v3_create_token( self.v3_create_token(
auth_request_body, expected_status=http_client.FORBIDDEN auth_request_body, expected_status=http.client.FORBIDDEN
) )
def test_create_domain_token_scoped_with_domain_id_and_user_id(self): def test_create_domain_token_scoped_with_domain_id_and_user_id(self):
# grant the user a role on the domain # grant the user a role on the domain
path = '/domains/%s/users/%s/roles/%s' % ( path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id']) self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path) self.put(path=path)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
skipping to change at line 1197 skipping to change at line 1195
# authenticate # authenticate
self.config_fixture.config(group='resource', self.config_fixture.config(group='resource',
project_name_url_safe='new') project_name_url_safe='new')
self.v3_create_token(auth_data) self.v3_create_token(auth_data)
# Set the name url restriction to strict and we should fail to # Set the name url restriction to strict and we should fail to
# authenticate # authenticate
self.config_fixture.config(group='resource', self.config_fixture.config(group='resource',
domain_name_url_safe='strict') domain_name_url_safe='strict')
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_create_domain_token_without_grant_returns_unauthorized(self): def test_create_domain_token_without_grant_returns_unauthorized(self):
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=self.user['password'], password=self.user['password'],
domain_id=self.domain['id']) domain_id=self.domain['id'])
# this fails because the user does not have a role on self.domain # this fails because the user does not have a role on self.domain
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_validate_domain_scoped_token(self): def test_validate_domain_scoped_token(self):
# Grant user access to domain # Grant user access to domain
PROVIDERS.assignment_api.create_grant( PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user['id'], self.role['id'], user_id=self.user['id'],
domain_id=self.domain['id'] domain_id=self.domain['id']
) )
domain_scoped_token = self._get_domain_scoped_token() domain_scoped_token = self._get_domain_scoped_token()
r = self._validate_token(domain_scoped_token) r = self._validate_token(domain_scoped_token)
self.assertValidDomainScopedTokenResponse(r) self.assertValidDomainScopedTokenResponse(r)
skipping to change at line 1238 skipping to change at line 1236
# NOTE(lbragstad): We set token expiration to 10 seconds so that we can # NOTE(lbragstad): We set token expiration to 10 seconds so that we can
# use the context manager of freezegun without sqlite issues. # use the context manager of freezegun without sqlite issues.
self.config_fixture.config(group='token', self.config_fixture.config(group='token',
expiration=10) expiration=10)
time = datetime.datetime.utcnow() time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime: with freezegun.freeze_time(time) as frozen_datetime:
domain_scoped_token = self._get_domain_scoped_token() domain_scoped_token = self._get_domain_scoped_token()
frozen_datetime.tick(delta=datetime.timedelta(seconds=15)) frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
self._validate_token( self._validate_token(
domain_scoped_token, domain_scoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_domain_scoped_token_is_invalid_after_disabling_user(self): def test_domain_scoped_token_is_invalid_after_disabling_user(self):
# Grant user access to domain # Grant user access to domain
PROVIDERS.assignment_api.create_grant( PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user['id'], self.role['id'], user_id=self.user['id'],
domain_id=self.domain['id'] domain_id=self.domain['id']
) )
domain_scoped_token = self._get_domain_scoped_token() domain_scoped_token = self._get_domain_scoped_token()
# Make sure the token is valid # Make sure the token is valid
r = self._validate_token(domain_scoped_token) r = self._validate_token(domain_scoped_token)
self.assertValidDomainScopedTokenResponse(r) self.assertValidDomainScopedTokenResponse(r)
# Disable user # Disable user
self._set_user_enabled(self.user, enabled=False) self._set_user_enabled(self.user, enabled=False)
# Ensure validating a token for a disabled user fails # Ensure validating a token for a disabled user fails
self._validate_token( self._validate_token(
domain_scoped_token, domain_scoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_domain_scoped_token_is_invalid_after_deleting_grant(self): def test_domain_scoped_token_is_invalid_after_deleting_grant(self):
# Grant user access to domain # Grant user access to domain
PROVIDERS.assignment_api.create_grant( PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user['id'], self.role['id'], user_id=self.user['id'],
domain_id=self.domain['id'] domain_id=self.domain['id']
) )
domain_scoped_token = self._get_domain_scoped_token() domain_scoped_token = self._get_domain_scoped_token()
# Make sure the token is valid # Make sure the token is valid
r = self._validate_token(domain_scoped_token) r = self._validate_token(domain_scoped_token)
self.assertValidDomainScopedTokenResponse(r) self.assertValidDomainScopedTokenResponse(r)
# Delete access to domain # Delete access to domain
PROVIDERS.assignment_api.delete_grant( PROVIDERS.assignment_api.delete_grant(
self.role['id'], user_id=self.user['id'], self.role['id'], user_id=self.user['id'],
domain_id=self.domain['id'] domain_id=self.domain['id']
) )
# Ensure validating a token for a disabled user fails # Ensure validating a token for a disabled user fails
self._validate_token( self._validate_token(
domain_scoped_token, domain_scoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_domain_scoped_token_invalid_after_disabling_domain(self): def test_domain_scoped_token_invalid_after_disabling_domain(self):
# Grant user access to domain # Grant user access to domain
PROVIDERS.assignment_api.create_grant( PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user['id'], self.role['id'], user_id=self.user['id'],
domain_id=self.domain['id'] domain_id=self.domain['id']
) )
domain_scoped_token = self._get_domain_scoped_token() domain_scoped_token = self._get_domain_scoped_token()
# Make sure the token is valid # Make sure the token is valid
r = self._validate_token(domain_scoped_token) r = self._validate_token(domain_scoped_token)
self.assertValidDomainScopedTokenResponse(r) self.assertValidDomainScopedTokenResponse(r)
# Disable domain # Disable domain
self.domain['enabled'] = False self.domain['enabled'] = False
PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain) PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain)
# Ensure validating a token for a disabled domain fails # Ensure validating a token for a disabled domain fails
self._validate_token( self._validate_token(
domain_scoped_token, domain_scoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_create_project_scoped_token_with_project_id_and_user_id(self): def test_create_project_scoped_token_with_project_id_and_user_id(self):
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=self.user['password'], password=self.user['password'],
project_id=self.project['id']) project_id=self.project['id'])
r = self.v3_create_token(auth_data) r = self.v3_create_token(auth_data)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
skipping to change at line 1323 skipping to change at line 1321
# NOTE(lbragstad): We set token expiration to 10 seconds so that we can # NOTE(lbragstad): We set token expiration to 10 seconds so that we can
# use the context manager of freezegun without sqlite issues. # use the context manager of freezegun without sqlite issues.
self.config_fixture.config(group='token', self.config_fixture.config(group='token',
expiration=10) expiration=10)
time = datetime.datetime.utcnow() time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime: with freezegun.freeze_time(time) as frozen_datetime:
project_scoped_token = self._get_project_scoped_token() project_scoped_token = self._get_project_scoped_token()
frozen_datetime.tick(delta=datetime.timedelta(seconds=15)) frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
self._validate_token( self._validate_token(
project_scoped_token, project_scoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_revoke_project_scoped_token(self): def test_revoke_project_scoped_token(self):
project_scoped_token = self._get_project_scoped_token() project_scoped_token = self._get_project_scoped_token()
r = self._validate_token(project_scoped_token) r = self._validate_token(project_scoped_token)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
self._revoke_token(project_scoped_token) self._revoke_token(project_scoped_token)
self._validate_token(project_scoped_token, self._validate_token(project_scoped_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_project_scoped_token_is_scoped_to_default_project(self): def test_project_scoped_token_is_scoped_to_default_project(self):
project = self._create_project_and_set_as_default_project() project = self._create_project_and_set_as_default_project()
# attempt to authenticate without requesting a project # attempt to authenticate without requesting a project
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=self.user['password']) password=self.user['password'])
r = self.v3_create_token(auth_data) r = self.v3_create_token(auth_data)
skipping to change at line 1454 skipping to change at line 1452
def test_scope_to_project_without_grant_returns_unauthorized(self): def test_scope_to_project_without_grant_returns_unauthorized(self):
project = unit.new_project_ref(domain_id=self.domain_id) project = unit.new_project_ref(domain_id=self.domain_id)
PROVIDERS.resource_api.create_project(project['id'], project) PROVIDERS.resource_api.create_project(project['id'], project)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=self.user['password'], password=self.user['password'],
project_id=project['id']) project_id=project['id'])
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_create_project_scoped_token_with_username_and_domain_id(self): def test_create_project_scoped_token_with_username_and_domain_id(self):
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
username=self.user['name'], username=self.user['name'],
user_domain_id=self.domain['id'], user_domain_id=self.domain['id'],
password=self.user['password'], password=self.user['password'],
project_id=self.project['id']) project_id=self.project['id'])
r = self.v3_create_token(auth_data) r = self.v3_create_token(auth_data)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
skipping to change at line 1509 skipping to change at line 1507
# authenticate # authenticate
self.config_fixture.config(group='resource', self.config_fixture.config(group='resource',
project_name_url_safe='new') project_name_url_safe='new')
self.v3_create_token(auth_data) self.v3_create_token(auth_data)
# Set the name url restriction to strict and we should fail to # Set the name url restriction to strict and we should fail to
# authenticate # authenticate
self.config_fixture.config(group='resource', self.config_fixture.config(group='resource',
project_name_url_safe='strict') project_name_url_safe='strict')
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_create_project_scoped_token_fails_if_domain_name_unsafe(self): def test_create_project_scoped_token_fails_if_domain_name_unsafe(self):
"""Verify authenticate to a project using unsafe domain name fails.""" """Verify authenticate to a project using unsafe domain name fails."""
# Start with url name restrictions off, so we can create the unsafe # Start with url name restrictions off, so we can create the unsafe
# named domain # named domain
self.config_fixture.config(group='resource', self.config_fixture.config(group='resource',
domain_name_url_safe='off') domain_name_url_safe='off')
unsafe_name = 'i am not / safe' unsafe_name = 'i am not / safe'
domain = unit.new_domain_ref(name=unsafe_name) domain = unit.new_domain_ref(name=unsafe_name)
PROVIDERS.resource_api.create_domain(domain['id'], domain) PROVIDERS.resource_api.create_domain(domain['id'], domain)
skipping to change at line 1551 skipping to change at line 1549
# authenticate # authenticate
self.config_fixture.config(group='resource', self.config_fixture.config(group='resource',
project_name_url_safe='new') project_name_url_safe='new')
self.v3_create_token(auth_data) self.v3_create_token(auth_data)
# Set the name url restriction to strict and we should fail to # Set the name url restriction to strict and we should fail to
# authenticate # authenticate
self.config_fixture.config(group='resource', self.config_fixture.config(group='resource',
domain_name_url_safe='strict') domain_name_url_safe='strict')
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_create_project_token_with_same_domain_and_project_name(self): def test_create_project_token_with_same_domain_and_project_name(self):
"""Authenticate to a project with the same name as its domain.""" """Authenticate to a project with the same name as its domain."""
domain = unit.new_project_ref(is_domain=True) domain = unit.new_project_ref(is_domain=True)
domain = PROVIDERS.resource_api.create_project(domain['id'], domain) domain = PROVIDERS.resource_api.create_project(domain['id'], domain)
project = unit.new_project_ref(domain_id=domain['id'], project = unit.new_project_ref(domain_id=domain['id'],
name=domain['name']) name=domain['name'])
PROVIDERS.resource_api.create_project(project['id'], project) PROVIDERS.resource_api.create_project(project['id'], project)
role_member = unit.new_role_ref() role_member = unit.new_role_ref()
PROVIDERS.role_api.create_role(role_member['id'], role_member) PROVIDERS.role_api.create_role(role_member['id'], role_member)
skipping to change at line 1591 skipping to change at line 1589
user_id=self.user['id'], user_id=self.user['id'],
domain_id=domain['id']) domain_id=domain['id'])
# authentication will fail because the project name is incorrect # authentication will fail because the project name is incorrect
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=self.user['password'], password=self.user['password'],
project_name=domain['name'], project_name=domain['name'],
project_domain_name=domain['name']) project_domain_name=domain['name'])
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_create_project_token_with_disabled_project_domain_fails(self): def test_create_project_token_with_disabled_project_domain_fails(self):
# create a disabled domain # create a disabled domain
domain = unit.new_domain_ref() domain = unit.new_domain_ref()
domain = PROVIDERS.resource_api.create_domain(domain['id'], domain) domain = PROVIDERS.resource_api.create_domain(domain['id'], domain)
# create a project in the domain # create a project in the domain
project = unit.new_project_ref(domain_id=domain['id']) project = unit.new_project_ref(domain_id=domain['id'])
PROVIDERS.resource_api.create_project(project['id'], project) PROVIDERS.resource_api.create_project(project['id'], project)
skipping to change at line 1618 skipping to change at line 1616
# Disable the domain # Disable the domain
domain['enabled'] = False domain['enabled'] = False
PROVIDERS.resource_api.update_domain(domain['id'], domain) PROVIDERS.resource_api.update_domain(domain['id'], domain)
# user should not be able to auth with project_id # user should not be able to auth with project_id
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=self.user['password'], password=self.user['password'],
project_id=project['id']) project_id=project['id'])
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
# user should not be able to auth with project_name & domain # user should not be able to auth with project_name & domain
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=self.user['password'], password=self.user['password'],
project_name=project['name'], project_name=project['name'],
project_domain_id=domain['id']) project_domain_id=domain['id'])
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_create_project_token_with_default_domain_as_project(self): def test_create_project_token_with_default_domain_as_project(self):
# Authenticate to a project with the default domain as project # Authenticate to a project with the default domain as project
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=self.user['password'], password=self.user['password'],
project_id=test_v3.DEFAULT_DOMAIN_ID) project_id=test_v3.DEFAULT_DOMAIN_ID)
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_project_scoped_token_is_invalid_after_disabling_user(self): def test_project_scoped_token_is_invalid_after_disabling_user(self):
project_scoped_token = self._get_project_scoped_token() project_scoped_token = self._get_project_scoped_token()
# Make sure the token is valid # Make sure the token is valid
r = self._validate_token(project_scoped_token) r = self._validate_token(project_scoped_token)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
# Disable the user # Disable the user
self._set_user_enabled(self.user, enabled=False) self._set_user_enabled(self.user, enabled=False)
# Ensure validating a token for a disabled user fails # Ensure validating a token for a disabled user fails
self._validate_token( self._validate_token(
project_scoped_token, project_scoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_project_scoped_token_invalid_after_changing_user_password(self): def test_project_scoped_token_invalid_after_changing_user_password(self):
project_scoped_token = self._get_project_scoped_token() project_scoped_token = self._get_project_scoped_token()
# Make sure the token is valid # Make sure the token is valid
r = self._validate_token(project_scoped_token) r = self._validate_token(project_scoped_token)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
# Update user's password # Update user's password
self.user['password'] = 'Password1' self.user['password'] = 'Password1'
PROVIDERS.identity_api.update_user(self.user['id'], self.user) PROVIDERS.identity_api.update_user(self.user['id'], self.user)
# Ensure updating user's password revokes existing tokens # Ensure updating user's password revokes existing tokens
self._validate_token( self._validate_token(
project_scoped_token, project_scoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_project_scoped_token_invalid_after_disabling_project(self): def test_project_scoped_token_invalid_after_disabling_project(self):
project_scoped_token = self._get_project_scoped_token() project_scoped_token = self._get_project_scoped_token()
# Make sure the token is valid # Make sure the token is valid
r = self._validate_token(project_scoped_token) r = self._validate_token(project_scoped_token)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
# Disable project # Disable project
self.project['enabled'] = False self.project['enabled'] = False
PROVIDERS.resource_api.update_project(self.project['id'], self.project) PROVIDERS.resource_api.update_project(self.project['id'], self.project)
# Ensure validating a token for a disabled project fails # Ensure validating a token for a disabled project fails
self._validate_token( self._validate_token(
project_scoped_token, project_scoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_project_scoped_token_is_invalid_after_deleting_grant(self): def test_project_scoped_token_is_invalid_after_deleting_grant(self):
# disable caching so that user grant deletion is not hidden # disable caching so that user grant deletion is not hidden
# by token caching # by token caching
self.config_fixture.config( self.config_fixture.config(
group='cache', group='cache',
enabled=False) enabled=False)
# Grant user access to project # Grant user access to project
PROVIDERS.assignment_api.create_grant( PROVIDERS.assignment_api.create_grant(
skipping to change at line 1702 skipping to change at line 1700
r = self._validate_token(project_scoped_token) r = self._validate_token(project_scoped_token)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
# Delete access to project # Delete access to project
PROVIDERS.assignment_api.delete_grant( PROVIDERS.assignment_api.delete_grant(
self.role['id'], user_id=self.user['id'], self.role['id'], user_id=self.user['id'],
project_id=self.project['id'] project_id=self.project['id']
) )
# Ensure the token has been revoked # Ensure the token has been revoked
self._validate_token( self._validate_token(
project_scoped_token, project_scoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_no_access_to_default_project_result_in_unscoped_token(self): def test_no_access_to_default_project_result_in_unscoped_token(self):
# create a disabled project to work with # create a disabled project to work with
self.create_new_default_project_for_user(self.user['id'], self.create_new_default_project_for_user(self.user['id'],
self.domain_id) self.domain_id)
# attempt to authenticate without requesting a project # attempt to authenticate without requesting a project
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
skipping to change at line 1742 skipping to change at line 1740
expiration=10) expiration=10)
time = datetime.datetime.utcnow() time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime: with freezegun.freeze_time(time) as frozen_datetime:
trustee_user, trust = self._create_trust() trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token( trust_scoped_token = self._get_trust_scoped_token(
trustee_user, trust trustee_user, trust
) )
frozen_datetime.tick(delta=datetime.timedelta(seconds=15)) frozen_datetime.tick(delta=datetime.timedelta(seconds=15))
self._validate_token( self._validate_token(
trust_scoped_token, trust_scoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_validate_a_trust_scoped_token_impersonated(self): def test_validate_a_trust_scoped_token_impersonated(self):
trustee_user, trust = self._create_trust(impersonation=True) trustee_user, trust = self._create_trust(impersonation=True)
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token # Validate a trust scoped token
r = self._validate_token(trust_scoped_token) r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
def test_revoke_trust_scoped_token(self): def test_revoke_trust_scoped_token(self):
trustee_user, trust = self._create_trust() trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token # Validate a trust scoped token
r = self._validate_token(trust_scoped_token) r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
self._revoke_token(trust_scoped_token) self._revoke_token(trust_scoped_token)
self._validate_token(trust_scoped_token, self._validate_token(trust_scoped_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_trust_scoped_token_is_invalid_after_disabling_trustee(self): def test_trust_scoped_token_is_invalid_after_disabling_trustee(self):
trustee_user, trust = self._create_trust() trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token # Validate a trust scoped token
r = self._validate_token(trust_scoped_token) r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
# Disable trustee # Disable trustee
trustee_update_ref = dict(enabled=False) trustee_update_ref = dict(enabled=False)
PROVIDERS.identity_api.update_user( PROVIDERS.identity_api.update_user(
trustee_user['id'], trustee_update_ref trustee_user['id'], trustee_update_ref
) )
# Ensure validating a token for a disabled user fails # Ensure validating a token for a disabled user fails
self._validate_token( self._validate_token(
trust_scoped_token, trust_scoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_trust_token_is_invalid_when_trustee_domain_disabled(self): def test_trust_token_is_invalid_when_trustee_domain_disabled(self):
# create a new domain with new user in that domain # create a new domain with new user in that domain
new_domain_ref = unit.new_domain_ref() new_domain_ref = unit.new_domain_ref()
PROVIDERS.resource_api.create_domain( PROVIDERS.resource_api.create_domain(
new_domain_ref['id'], new_domain_ref new_domain_ref['id'], new_domain_ref
) )
trustee_ref = unit.create_user(PROVIDERS.identity_api, trustee_ref = unit.create_user(PROVIDERS.identity_api,
skipping to change at line 1829 skipping to change at line 1827
# ensure the project-scoped token from the trust is valid # ensure the project-scoped token from the trust is valid
self._validate_token(trust_scoped_token) self._validate_token(trust_scoped_token)
disable_body = {'domain': {'enabled': False}} disable_body = {'domain': {'enabled': False}}
self.patch( self.patch(
'/domains/%(domain_id)s' % {'domain_id': new_domain_ref['id']}, '/domains/%(domain_id)s' % {'domain_id': new_domain_ref['id']},
body=disable_body) body=disable_body)
# ensure the project-scoped token from the trust is invalid # ensure the project-scoped token from the trust is invalid
self._validate_token(trust_scoped_token, self._validate_token(trust_scoped_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_trust_scoped_token_invalid_after_changing_trustee_password(self): def test_trust_scoped_token_invalid_after_changing_trustee_password(self):
trustee_user, trust = self._create_trust() trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token # Validate a trust scoped token
r = self._validate_token(trust_scoped_token) r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
# Change trustee's password # Change trustee's password
trustee_update_ref = dict(password='Password1') trustee_update_ref = dict(password='Password1')
PROVIDERS.identity_api.update_user( PROVIDERS.identity_api.update_user(
trustee_user['id'], trustee_update_ref trustee_user['id'], trustee_update_ref
) )
# Ensure updating trustee's password revokes existing tokens # Ensure updating trustee's password revokes existing tokens
self._validate_token( self._validate_token(
trust_scoped_token, trust_scoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_trust_scoped_token_is_invalid_after_disabling_trustor(self): def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
trustee_user, trust = self._create_trust() trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token # Validate a trust scoped token
r = self._validate_token(trust_scoped_token) r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
# Disable the trustor # Disable the trustor
trustor_update_ref = dict(enabled=False) trustor_update_ref = dict(enabled=False)
PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref) PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
# Ensure validating a token for a disabled user fails # Ensure validating a token for a disabled user fails
self._validate_token( self._validate_token(
trust_scoped_token, trust_scoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_trust_scoped_token_invalid_after_changing_trustor_password(self): def test_trust_scoped_token_invalid_after_changing_trustor_password(self):
trustee_user, trust = self._create_trust() trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token # Validate a trust scoped token
r = self._validate_token(trust_scoped_token) r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
# Change trustor's password # Change trustor's password
trustor_update_ref = dict(password='Password1') trustor_update_ref = dict(password='Password1')
PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref) PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
# Ensure updating trustor's password revokes existing user's tokens # Ensure updating trustor's password revokes existing user's tokens
self._validate_token( self._validate_token(
trust_scoped_token, trust_scoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_trust_scoped_token_invalid_after_disabled_trustor_domain(self): def test_trust_scoped_token_invalid_after_disabled_trustor_domain(self):
trustee_user, trust = self._create_trust() trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token # Validate a trust scoped token
r = self._validate_token(trust_scoped_token) r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
# Disable trustor's domain # Disable trustor's domain
self.domain['enabled'] = False self.domain['enabled'] = False
PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain) PROVIDERS.resource_api.update_domain(self.domain['id'], self.domain)
trustor_update_ref = dict(password='Password1') trustor_update_ref = dict(password='Password1')
PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref) PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
# Ensure updating trustor's password revokes existing user's tokens # Ensure updating trustor's password revokes existing user's tokens
self._validate_token( self._validate_token(
trust_scoped_token, trust_scoped_token,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_default_fixture_scope_token(self): def test_default_fixture_scope_token(self):
self.assertIsNotNone(self.get_scoped_token()) self.assertIsNotNone(self.get_scoped_token())
def test_rescoping_token(self): def test_rescoping_token(self):
expires = self.v3_token_data['token']['expires_at'] expires = self.v3_token_data['token']['expires_at']
# rescope the token # rescope the token
r = self.v3_create_token(self.build_authentication_request( r = self.v3_create_token(self.build_authentication_request(
token=self.v3_token, token=self.v3_token,
project_id=self.project_id)) project_id=self.project_id))
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
# ensure token expiration stayed the same # ensure token expiration stayed the same
self.assertTimestampEqual(expires, r.result['token']['expires_at']) self.assertTimestampEqual(expires, r.result['token']['expires_at'])
def test_check_token(self): def test_check_token(self):
self.head('/auth/tokens', headers=self.headers, self.head('/auth/tokens', headers=self.headers,
expected_status=http_client.OK) expected_status=http.client.OK)
def test_validate_token(self): def test_validate_token(self):
r = self.get('/auth/tokens', headers=self.headers) r = self.get('/auth/tokens', headers=self.headers)
self.assertValidUnscopedTokenResponse(r) self.assertValidUnscopedTokenResponse(r)
def test_validate_missing_subject_token(self): def test_validate_missing_subject_token(self):
self.get('/auth/tokens', self.get('/auth/tokens',
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_validate_missing_auth_token(self): def test_validate_missing_auth_token(self):
self.admin_request( self.admin_request(
method='GET', method='GET',
path='/v3/projects', path='/v3/projects',
token=None, token=None,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_validate_token_nocatalog(self): def test_validate_token_nocatalog(self):
v3_token = self.get_requested_token(self.build_authentication_request( v3_token = self.get_requested_token(self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=self.user['password'], password=self.user['password'],
project_id=self.project['id'])) project_id=self.project['id']))
r = self.get( r = self.get(
'/auth/tokens?nocatalog', '/auth/tokens?nocatalog',
headers={'X-Subject-Token': v3_token}) headers={'X-Subject-Token': v3_token})
self.assertValidProjectScopedTokenResponse(r, require_catalog=False) self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
skipping to change at line 2022 skipping to change at line 2020
def _create_role(self, domain_id=None): def _create_role(self, domain_id=None):
"""Call ``POST /roles``.""" """Call ``POST /roles``."""
ref = unit.new_role_ref(domain_id=domain_id) ref = unit.new_role_ref(domain_id=domain_id)
r = self.post('/roles', body={'role': ref}) r = self.post('/roles', body={'role': ref})
return self.assertValidRoleResponse(r, ref) return self.assertValidRoleResponse(r, ref)
def _create_implied_role(self, prior_id): def _create_implied_role(self, prior_id):
implied = self._create_role() implied = self._create_role()
url = '/roles/%s/implies/%s' % (prior_id, implied['id']) url = '/roles/%s/implies/%s' % (prior_id, implied['id'])
self.put(url, expected_status=http_client.CREATED) self.put(url, expected_status=http.client.CREATED)
return implied return implied
def _delete_implied_role(self, prior_role_id, implied_role_id): def _delete_implied_role(self, prior_role_id, implied_role_id):
url = '/roles/%s/implies/%s' % (prior_role_id, implied_role_id) url = '/roles/%s/implies/%s' % (prior_role_id, implied_role_id)
self.delete(url) self.delete(url)
def _get_scoped_token_roles(self, is_domain=False): def _get_scoped_token_roles(self, is_domain=False):
if is_domain: if is_domain:
v3_token = self.get_domain_scoped_token() v3_token = self.get_domain_scoped_token()
else: else:
skipping to change at line 2208 skipping to change at line 2206
self.config_fixture.config(group='token') self.config_fixture.config(group='token')
token_roles = self._get_scoped_token_roles() token_roles = self._get_scoped_token_roles()
prior = token_roles[0]['id'] prior = token_roles[0]['id']
implied = self._create_implied_role(prior) implied = self._create_implied_role(prior)
token_roles = self._get_scoped_token_roles() token_roles = self._get_scoped_token_roles()
self.assertEqual(2, len(token_roles)) self.assertEqual(2, len(token_roles))
unrelated = self._create_role() unrelated = self._create_role()
url = '/roles/%s/implies/%s' % (unrelated['id'], implied['id']) url = '/roles/%s/implies/%s' % (unrelated['id'], implied['id'])
self.put(url, expected_status=http_client.CREATED) self.put(url, expected_status=http.client.CREATED)
token_roles = self._get_scoped_token_roles() token_roles = self._get_scoped_token_roles()
self.assertEqual(2, len(token_roles)) self.assertEqual(2, len(token_roles))
self._delete_implied_role(unrelated['id'], implied['id']) self._delete_implied_role(unrelated['id'], implied['id'])
token_roles = self._get_scoped_token_roles() token_roles = self._get_scoped_token_roles()
self.assertEqual(2, len(token_roles)) self.assertEqual(2, len(token_roles))
def test_domain_specific_roles_do_not_show_v3_token(self): def test_domain_specific_roles_do_not_show_v3_token(self):
self.config_fixture.config(group='token') self.config_fixture.config(group='token')
skipping to change at line 2267 skipping to change at line 2265
r = self.get('/auth/tokens', headers=headers) r = self.get('/auth/tokens', headers=headers)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
# remove the roles from the user for the given scope # remove the roles from the user for the given scope
path = '/projects/%s/users/%s/roles/%s' % ( path = '/projects/%s/users/%s/roles/%s' % (
self.project['id'], new_user['id'], self.role['id']) self.project['id'], new_user['id'], self.role['id'])
self.delete(path=path) self.delete(path=path)
# token validation should now result in 404 # token validation should now result in 404
self.get('/auth/tokens', headers=headers, self.get('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_create_token_with_nonexistant_user_id_fails(self): def test_create_token_with_nonexistant_user_id_fails(self):
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=uuid.uuid4().hex, user_id=uuid.uuid4().hex,
password=self.user['password']) password=self.user['password'])
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_create_token_with_nonexistant_username_fails(self): def test_create_token_with_nonexistant_username_fails(self):
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
username=uuid.uuid4().hex, username=uuid.uuid4().hex,
user_domain_id=self.domain['id'], user_domain_id=self.domain['id'],
password=self.user['password']) password=self.user['password'])
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_create_token_with_nonexistant_domain_id_fails(self): def test_create_token_with_nonexistant_domain_id_fails(self):
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
username=self.user['name'], username=self.user['name'],
user_domain_id=uuid.uuid4().hex, user_domain_id=uuid.uuid4().hex,
password=self.user['password']) password=self.user['password'])
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_create_token_with_nonexistant_domain_name_fails(self): def test_create_token_with_nonexistant_domain_name_fails(self):
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
username=self.user['name'], username=self.user['name'],
user_domain_name=uuid.uuid4().hex, user_domain_name=uuid.uuid4().hex,
password=self.user['password']) password=self.user['password'])
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_create_token_with_wrong_password_fails(self): def test_create_token_with_wrong_password_fails(self):
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=uuid.uuid4().hex) password=uuid.uuid4().hex)
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_user_and_group_roles_scoped_token(self): def test_user_and_group_roles_scoped_token(self):
"""Test correct roles are returned in scoped token. """Test correct roles are returned in scoped token.
Test Plan: Test Plan:
- Create a domain, with 1 project, 2 users (user1 and user2) - Create a domain, with 1 project, 2 users (user1 and user2)
and 2 groups (group1 and group2) and 2 groups (group1 and group2)
- Make user1 a member of group1, user2 a member of group2 - Make user1 a member of group1, user2 a member of group2
- Create 8 roles, assigning them to each of the 8 combinations - Create 8 roles, assigning them to each of the 8 combinations
skipping to change at line 2549 skipping to change at line 2547
self.assertEqual(self.default_domain_user['id'], self.assertEqual(self.default_domain_user['id'],
auth_contexts[-1]['user_id']) auth_contexts[-1]['user_id'])
def test_remote_user_no_domain(self): def test_remote_user_no_domain(self):
app = self.loadapp() app = self.loadapp()
with app.test_client() as c: with app.test_client() as c:
c.environ_base.update(self.build_external_auth_environ( c.environ_base.update(self.build_external_auth_environ(
self.user['name'])) self.user['name']))
auth_request = self.build_authentication_request() auth_request = self.build_authentication_request()
c.post('/v3/auth/tokens', json=auth_request, c.post('/v3/auth/tokens', json=auth_request,
expected_status_code=http_client.UNAUTHORIZED) expected_status_code=http.client.UNAUTHORIZED)
def test_remote_user_and_password(self): def test_remote_user_and_password(self):
# both REMOTE_USER and password methods must pass. # both REMOTE_USER and password methods must pass.
# note that they do not have to match # note that they do not have to match
app = self.loadapp() app = self.loadapp()
with app.test_client() as c: with app.test_client() as c:
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_domain_id=self.default_domain_user['domain_id'], user_domain_id=self.default_domain_user['domain_id'],
username=self.default_domain_user['name'], username=self.default_domain_user['name'],
password=self.default_domain_user['password']) password=self.default_domain_user['password'])
skipping to change at line 2574 skipping to change at line 2572
# note that they do not have to match # note that they do not have to match
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_domain_id=self.domain['id'], user_domain_id=self.domain['id'],
username=self.user['name'], username=self.user['name'],
password=self.user['password']) password=self.user['password'])
auth_data['auth']['identity']['methods'] = ["password", "external"] auth_data['auth']['identity']['methods'] = ["password", "external"]
auth_data['auth']['identity']['external'] = {} auth_data['auth']['identity']['external'] = {}
app = self.loadapp() app = self.loadapp()
with app.test_client() as c: with app.test_client() as c:
c.post('/v3/auth/tokens', json=auth_data, c.post('/v3/auth/tokens', json=auth_data,
expected_status_code=http_client.UNAUTHORIZED) expected_status_code=http.client.UNAUTHORIZED)
def test_remote_user_bad_password(self): def test_remote_user_bad_password(self):
# both REMOTE_USER and password methods must pass. # both REMOTE_USER and password methods must pass.
app = self.loadapp() app = self.loadapp()
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_domain_id=self.domain['id'], user_domain_id=self.domain['id'],
username=self.user['name'], username=self.user['name'],
password='badpassword') password='badpassword')
with app.test_client() as c: with app.test_client() as c:
c.post('/v3/auth/tokens', json=auth_data, c.post('/v3/auth/tokens', json=auth_data,
expected_status_code=http_client.UNAUTHORIZED) expected_status_code=http.client.UNAUTHORIZED)
def test_fetch_expired_allow_expired(self): def test_fetch_expired_allow_expired(self):
self.config_fixture.config(group='token', self.config_fixture.config(group='token',
expiration=10, expiration=10,
allow_expired_window=20) allow_expired_window=20)
time = datetime.datetime.utcnow() time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime: with freezegun.freeze_time(time) as frozen_datetime:
token = self._get_project_scoped_token() token = self._get_project_scoped_token()
# initially it validates because it's within time # initially it validates because it's within time
frozen_datetime.tick(delta=datetime.timedelta(seconds=2)) frozen_datetime.tick(delta=datetime.timedelta(seconds=2))
self._validate_token(token) self._validate_token(token)
# after passing expiry time validation fails # after passing expiry time validation fails
frozen_datetime.tick(delta=datetime.timedelta(seconds=12)) frozen_datetime.tick(delta=datetime.timedelta(seconds=12))
self._validate_token(token, expected_status=http_client.NOT_FOUND) self._validate_token(token, expected_status=http.client.NOT_FOUND)
# but if we pass allow_expired it validates # but if we pass allow_expired it validates
self._validate_token(token, allow_expired=True) self._validate_token(token, allow_expired=True)
# and then if we're passed the allow_expired_window it will fail # and then if we're passed the allow_expired_window it will fail
# anyway raises expired when now > expiration + window # anyway raises expired when now > expiration + window
frozen_datetime.tick(delta=datetime.timedelta(seconds=22)) frozen_datetime.tick(delta=datetime.timedelta(seconds=22))
self._validate_token(token, self._validate_token(token,
allow_expired=True, allow_expired=True,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_system_scoped_token_works_with_domain_specific_drivers(self): def test_system_scoped_token_works_with_domain_specific_drivers(self):
self.config_fixture.config( self.config_fixture.config(
group='identity', domain_specific_drivers_enabled=True group='identity', domain_specific_drivers_enabled=True
) )
PROVIDERS.assignment_api.create_system_grant_for_user( PROVIDERS.assignment_api.create_system_grant_for_user(
self.user['id'], self.role['id'] self.user['id'], self.role['id']
) )
skipping to change at line 2671 skipping to change at line 2669
) )
self.headers['X-Subject-Token'] = project_scoped_token self.headers['X-Subject-Token'] = project_scoped_token
r = self.get('/auth/tokens', headers=self.headers) r = self.get('/auth/tokens', headers=self.headers)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
def test_extra_data_in_unscoped_token_fails_validation(self): def test_extra_data_in_unscoped_token_fails_validation(self):
# ensure unscoped token response contains the appropriate data # ensure unscoped token response contains the appropriate data
r = self.get('/auth/tokens', headers=self.headers) r = self.get('/auth/tokens', headers=self.headers)
# populate the response result with some extra data # populate the response result with some extra data
r.result['token'][u'extra'] = six.text_type(uuid.uuid4().hex) r.result['token'][u'extra'] = str(uuid.uuid4().hex)
self.assertRaises(exception.SchemaValidationError, self.assertRaises(exception.SchemaValidationError,
self.assertValidUnscopedTokenResponse, self.assertValidUnscopedTokenResponse,
r) r)
def test_extra_data_in_domain_scoped_token_fails_validation(self): def test_extra_data_in_domain_scoped_token_fails_validation(self):
# ensure domain scoped token response contains the appropriate data # ensure domain scoped token response contains the appropriate data
PROVIDERS.assignment_api.create_grant( PROVIDERS.assignment_api.create_grant(
self.role['id'], self.role['id'],
user_id=self.default_domain_user['id'], user_id=self.default_domain_user['id'],
domain_id=self.domain['id']) domain_id=self.domain['id'])
skipping to change at line 2693 skipping to change at line 2691
domain_scoped_token = self.get_requested_token( domain_scoped_token = self.get_requested_token(
self.build_authentication_request( self.build_authentication_request(
user_id=self.default_domain_user['id'], user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'], password=self.default_domain_user['password'],
domain_id=self.domain['id']) domain_id=self.domain['id'])
) )
self.headers['X-Subject-Token'] = domain_scoped_token self.headers['X-Subject-Token'] = domain_scoped_token
r = self.get('/auth/tokens', headers=self.headers) r = self.get('/auth/tokens', headers=self.headers)
# populate the response result with some extra data # populate the response result with some extra data
r.result['token'][u'extra'] = six.text_type(uuid.uuid4().hex) r.result['token'][u'extra'] = str(uuid.uuid4().hex)
self.assertRaises(exception.SchemaValidationError, self.assertRaises(exception.SchemaValidationError,
self.assertValidDomainScopedTokenResponse, self.assertValidDomainScopedTokenResponse,
r) r)
def test_extra_data_in_project_scoped_token_fails_validation(self): def test_extra_data_in_project_scoped_token_fails_validation(self):
# ensure project scoped token responses contains the appropriate data # ensure project scoped token responses contains the appropriate data
project_scoped_token = self.get_requested_token( project_scoped_token = self.get_requested_token(
self.build_authentication_request( self.build_authentication_request(
user_id=self.default_domain_user['id'], user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'], password=self.default_domain_user['password'],
project_id=self.default_domain_project['id']) project_id=self.default_domain_project['id'])
) )
self.headers['X-Subject-Token'] = project_scoped_token self.headers['X-Subject-Token'] = project_scoped_token
resp = self.get('/auth/tokens', headers=self.headers) resp = self.get('/auth/tokens', headers=self.headers)
# populate the response result with some extra data # populate the response result with some extra data
resp.result['token'][u'extra'] = six.text_type(uuid.uuid4().hex) resp.result['token'][u'extra'] = str(uuid.uuid4().hex)
self.assertRaises(exception.SchemaValidationError, self.assertRaises(exception.SchemaValidationError,
self.assertValidProjectScopedTokenResponse, self.assertValidProjectScopedTokenResponse,
resp) resp)
class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase): class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase):
def config_overrides(self): def config_overrides(self):
super(AllowRescopeScopedTokenDisabledTests, self).config_overrides() super(AllowRescopeScopedTokenDisabledTests, self).config_overrides()
self.config_fixture.config( self.config_fixture.config(
group='token', group='token',
allow_rescope_scoped_token=False) allow_rescope_scoped_token=False)
def test_rescoping_v3_to_v3_disabled(self): def test_rescoping_v3_to_v3_disabled(self):
self.v3_create_token( self.v3_create_token(
self.build_authentication_request( self.build_authentication_request(
token=self.get_scoped_token(), token=self.get_scoped_token(),
project_id=self.project_id), project_id=self.project_id),
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_rescoped_domain_token_disabled(self): def test_rescoped_domain_token_disabled(self):
self.domainA = unit.new_domain_ref() self.domainA = unit.new_domain_ref()
PROVIDERS.resource_api.create_domain(self.domainA['id'], self.domainA) PROVIDERS.resource_api.create_domain(self.domainA['id'], self.domainA)
PROVIDERS.assignment_api.create_grant( PROVIDERS.assignment_api.create_grant(
self.role['id'], user_id=self.user['id'], self.role['id'], user_id=self.user['id'],
domain_id=self.domainA['id'] domain_id=self.domainA['id']
) )
unscoped_token = self.get_requested_token( unscoped_token = self.get_requested_token(
skipping to change at line 2750 skipping to change at line 2748
password=self.user['password'])) password=self.user['password']))
# Get a domain-scoped token from the unscoped token # Get a domain-scoped token from the unscoped token
domain_scoped_token = self.get_requested_token( domain_scoped_token = self.get_requested_token(
self.build_authentication_request( self.build_authentication_request(
token=unscoped_token, token=unscoped_token,
domain_id=self.domainA['id'])) domain_id=self.domainA['id']))
self.v3_create_token( self.v3_create_token(
self.build_authentication_request( self.build_authentication_request(
token=domain_scoped_token, token=domain_scoped_token,
project_id=self.project_id), project_id=self.project_id),
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
class TestFernetTokenAPIs(test_v3.RestfulTestCase, TokenAPITests, class TestFernetTokenAPIs(test_v3.RestfulTestCase, TokenAPITests,
TokenDataTests): TokenDataTests):
def config_overrides(self): def config_overrides(self):
super(TestFernetTokenAPIs, self).config_overrides() super(TestFernetTokenAPIs, self).config_overrides()
self.config_fixture.config(group='token', provider='fernet', self.config_fixture.config(group='token', provider='fernet',
cache_on_issue=True) cache_on_issue=True)
self.useFixture( self.useFixture(
ksfixtures.KeyRepository( ksfixtures.KeyRepository(
self.config_fixture, self.config_fixture,
skipping to change at line 2780 skipping to change at line 2778
def _make_auth_request(self, auth_data): def _make_auth_request(self, auth_data):
token = super(TestFernetTokenAPIs, self)._make_auth_request(auth_data) token = super(TestFernetTokenAPIs, self)._make_auth_request(auth_data)
self.assertLess(len(token), 255) self.assertLess(len(token), 255)
return token return token
def test_validate_tampered_unscoped_token_fails(self): def test_validate_tampered_unscoped_token_fails(self):
unscoped_token = self._get_unscoped_token() unscoped_token = self._get_unscoped_token()
tampered_token = (unscoped_token[:50] + uuid.uuid4().hex + tampered_token = (unscoped_token[:50] + uuid.uuid4().hex +
unscoped_token[50 + 32:]) unscoped_token[50 + 32:])
self._validate_token(tampered_token, self._validate_token(tampered_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_validate_tampered_project_scoped_token_fails(self): def test_validate_tampered_project_scoped_token_fails(self):
project_scoped_token = self._get_project_scoped_token() project_scoped_token = self._get_project_scoped_token()
tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex + tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex +
project_scoped_token[50 + 32:]) project_scoped_token[50 + 32:])
self._validate_token(tampered_token, self._validate_token(tampered_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_validate_tampered_trust_scoped_token_fails(self): def test_validate_tampered_trust_scoped_token_fails(self):
trustee_user, trust = self._create_trust() trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Get a trust scoped token # Get a trust scoped token
tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex + tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex +
trust_scoped_token[50 + 32:]) trust_scoped_token[50 + 32:])
self._validate_token(tampered_token, self._validate_token(tampered_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_trust_scoped_token_is_invalid_after_disabling_trustor(self): def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
# NOTE(amakarov): have to override this test for non-persistent tokens # NOTE(amakarov): have to override this test for non-persistent tokens
# as TokenNotFound exception makes no sense for those. # as TokenNotFound exception makes no sense for those.
trustee_user, trust = self._create_trust() trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token # Validate a trust scoped token
r = self._validate_token(trust_scoped_token) r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
# Disable the trustor # Disable the trustor
trustor_update_ref = dict(enabled=False) trustor_update_ref = dict(enabled=False)
PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref) PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
# Ensure validating a token for a disabled user fails # Ensure validating a token for a disabled user fails
self._validate_token( self._validate_token(
trust_scoped_token, trust_scoped_token,
expected_status=http_client.FORBIDDEN expected_status=http.client.FORBIDDEN
) )
class TestJWSTokenAPIs(test_v3.RestfulTestCase, TokenAPITests, TokenDataTests): class TestJWSTokenAPIs(test_v3.RestfulTestCase, TokenAPITests, TokenDataTests):
def config_overrides(self): def config_overrides(self):
super(TestJWSTokenAPIs, self).config_overrides() super(TestJWSTokenAPIs, self).config_overrides()
self.config_fixture.config(group='token', provider='jws', self.config_fixture.config(group='token', provider='jws',
cache_on_issue=True) cache_on_issue=True)
self.useFixture(ksfixtures.JWSKeyRepository(self.config_fixture)) self.useFixture(ksfixtures.JWSKeyRepository(self.config_fixture))
def setUp(self): def setUp(self):
skipping to change at line 2837 skipping to change at line 2835
def _make_auth_request(self, auth_data): def _make_auth_request(self, auth_data):
token = super(TestJWSTokenAPIs, self)._make_auth_request(auth_data) token = super(TestJWSTokenAPIs, self)._make_auth_request(auth_data)
self.assertLess(len(token), 350) self.assertLess(len(token), 350)
return token return token
def test_validate_tampered_unscoped_token_fails(self): def test_validate_tampered_unscoped_token_fails(self):
unscoped_token = self._get_unscoped_token() unscoped_token = self._get_unscoped_token()
tampered_token = (unscoped_token[:50] + uuid.uuid4().hex + tampered_token = (unscoped_token[:50] + uuid.uuid4().hex +
unscoped_token[50 + 32:]) unscoped_token[50 + 32:])
self._validate_token(tampered_token, self._validate_token(tampered_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_validate_tampered_project_scoped_token_fails(self): def test_validate_tampered_project_scoped_token_fails(self):
project_scoped_token = self._get_project_scoped_token() project_scoped_token = self._get_project_scoped_token()
tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex + tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex +
project_scoped_token[50 + 32:]) project_scoped_token[50 + 32:])
self._validate_token(tampered_token, self._validate_token(tampered_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_validate_tampered_trust_scoped_token_fails(self): def test_validate_tampered_trust_scoped_token_fails(self):
trustee_user, trust = self._create_trust() trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Get a trust scoped token # Get a trust scoped token
tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex + tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex +
trust_scoped_token[50 + 32:]) trust_scoped_token[50 + 32:])
self._validate_token(tampered_token, self._validate_token(tampered_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_trust_scoped_token_is_invalid_after_disabling_trustor(self): def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
# NOTE(amakarov): have to override this test for non-persistent tokens # NOTE(amakarov): have to override this test for non-persistent tokens
# as TokenNotFound exception makes no sense for those. # as TokenNotFound exception makes no sense for those.
trustee_user, trust = self._create_trust() trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Validate a trust scoped token # Validate a trust scoped token
r = self._validate_token(trust_scoped_token) r = self._validate_token(trust_scoped_token)
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
# Disable the trustor # Disable the trustor
trustor_update_ref = dict(enabled=False) trustor_update_ref = dict(enabled=False)
PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref) PROVIDERS.identity_api.update_user(self.user['id'], trustor_update_ref)
# Ensure validating a token for a disabled user fails # Ensure validating a token for a disabled user fails
self._validate_token( self._validate_token(
trust_scoped_token, trust_scoped_token,
expected_status=http_client.FORBIDDEN expected_status=http.client.FORBIDDEN
) )
class TestTokenRevokeById(test_v3.RestfulTestCase): class TestTokenRevokeById(test_v3.RestfulTestCase):
"""Test token revocation on the v3 Identity API.""" """Test token revocation on the v3 Identity API."""
def config_overrides(self): def config_overrides(self):
super(TestTokenRevokeById, self).config_overrides() super(TestTokenRevokeById, self).config_overrides()
self.config_fixture.config( self.config_fixture.config(
group='token', group='token',
provider='fernet', provider='fernet',
skipping to change at line 2998 skipping to change at line 2996
password=self.user1['password'])) password=self.user1['password']))
scoped_token = self.get_requested_token( scoped_token = self.get_requested_token(
self.build_authentication_request( self.build_authentication_request(
token=unscoped_token, token=unscoped_token,
project_id=self.projectA['id'])) project_id=self.projectA['id']))
# confirm both tokens are valid # confirm both tokens are valid
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': unscoped_token}, headers={'X-Subject-Token': unscoped_token},
expected_status=http_client.OK) expected_status=http.client.OK)
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': scoped_token}, headers={'X-Subject-Token': scoped_token},
expected_status=http_client.OK) expected_status=http.client.OK)
# create a new role # create a new role
role = unit.new_role_ref() role = unit.new_role_ref()
PROVIDERS.role_api.create_role(role['id'], role) PROVIDERS.role_api.create_role(role['id'], role)
# assign a new role # assign a new role
self.put( self.put(
'/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % { '/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % {
'project_id': self.projectA['id'], 'project_id': self.projectA['id'],
'user_id': self.user1['id'], 'user_id': self.user1['id'],
'role_id': role['id']}) 'role_id': role['id']})
# both tokens should remain valid # both tokens should remain valid
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': unscoped_token}, headers={'X-Subject-Token': unscoped_token},
expected_status=http_client.OK) expected_status=http.client.OK)
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': scoped_token}, headers={'X-Subject-Token': scoped_token},
expected_status=http_client.OK) expected_status=http.client.OK)
def test_deleting_user_grant_revokes_token(self): def test_deleting_user_grant_revokes_token(self):
"""Test deleting a user grant revokes token. """Test deleting a user grant revokes token.
Test Plan: Test Plan:
- Get a token for user, scoped to Project - Get a token for user, scoped to Project
- Delete the grant user has on Project - Delete the grant user has on Project
- Check token is no longer valid - Check token is no longer valid
""" """
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=self.user['password'], password=self.user['password'],
project_id=self.project['id']) project_id=self.project['id'])
token = self.get_requested_token(auth_data) token = self.get_requested_token(auth_data)
# Confirm token is valid # Confirm token is valid
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token}, headers={'X-Subject-Token': token},
expected_status=http_client.OK) expected_status=http.client.OK)
# Delete the grant, which should invalidate the token # Delete the grant, which should invalidate the token
grant_url = ( grant_url = (
'/projects/%(project_id)s/users/%(user_id)s/' '/projects/%(project_id)s/users/%(user_id)s/'
'roles/%(role_id)s' % { 'roles/%(role_id)s' % {
'project_id': self.project['id'], 'project_id': self.project['id'],
'user_id': self.user['id'], 'user_id': self.user['id'],
'role_id': self.role['id']}) 'role_id': self.role['id']})
self.delete(grant_url) self.delete(grant_url)
self.head('/auth/tokens', token=token, self.head('/auth/tokens', token=token,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def role_data_fixtures(self): def role_data_fixtures(self):
self.projectC = unit.new_project_ref(domain_id=self.domainA['id']) self.projectC = unit.new_project_ref(domain_id=self.domainA['id'])
PROVIDERS.resource_api.create_project( PROVIDERS.resource_api.create_project(
self.projectC['id'], self.projectC self.projectC['id'], self.projectC
) )
self.user4 = unit.create_user(PROVIDERS.identity_api, self.user4 = unit.create_user(PROVIDERS.identity_api,
domain_id=self.domainB['id']) domain_id=self.domainB['id'])
self.user5 = unit.create_user(PROVIDERS.identity_api, self.user5 = unit.create_user(PROVIDERS.identity_api,
domain_id=self.domainA['id']) domain_id=self.domainA['id'])
skipping to change at line 3140 skipping to change at line 3138
project_id=self.projectA['id']) project_id=self.projectA['id'])
tokenD = self.get_requested_token(auth_data) tokenD = self.get_requested_token(auth_data)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user6['id'], user_id=self.user6['id'],
password=self.user6['password'], password=self.user6['password'],
domain_id=self.domainA['id']) domain_id=self.domainA['id'])
tokenE = self.get_requested_token(auth_data) tokenE = self.get_requested_token(auth_data)
# Confirm tokens are valid # Confirm tokens are valid
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': tokenA}, headers={'X-Subject-Token': tokenA},
expected_status=http_client.OK) expected_status=http.client.OK)
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': tokenB}, headers={'X-Subject-Token': tokenB},
expected_status=http_client.OK) expected_status=http.client.OK)
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': tokenC}, headers={'X-Subject-Token': tokenC},
expected_status=http_client.OK) expected_status=http.client.OK)
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': tokenD}, headers={'X-Subject-Token': tokenD},
expected_status=http_client.OK) expected_status=http.client.OK)
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': tokenE}, headers={'X-Subject-Token': tokenE},
expected_status=http_client.OK) expected_status=http.client.OK)
# Delete the role, which should invalidate the tokens # Delete the role, which should invalidate the tokens
role_url = '/roles/%s' % self.role1['id'] role_url = '/roles/%s' % self.role1['id']
self.delete(role_url) self.delete(role_url)
# Check the tokens that used role1 is invalid # Check the tokens that used role1 is invalid
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': tokenA}, headers={'X-Subject-Token': tokenA},
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': tokenB}, headers={'X-Subject-Token': tokenB},
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': tokenD}, headers={'X-Subject-Token': tokenD},
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': tokenE}, headers={'X-Subject-Token': tokenE},
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
# ...but the one using role2 is still valid # ...but the one using role2 is still valid
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': tokenC}, headers={'X-Subject-Token': tokenC},
expected_status=http_client.OK) expected_status=http.client.OK)
def test_domain_user_role_assignment_maintains_token(self): def test_domain_user_role_assignment_maintains_token(self):
"""Test user-domain role assignment maintains existing token. """Test user-domain role assignment maintains existing token.
Test Plan: Test Plan:
- Get a token for user1, scoped to ProjectA - Get a token for user1, scoped to ProjectA
- Create a grant for user1 on DomainB - Create a grant for user1 on DomainB
- Check token is still valid - Check token is still valid
""" """
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user1['id'], user_id=self.user1['id'],
password=self.user1['password'], password=self.user1['password'],
project_id=self.projectA['id']) project_id=self.projectA['id'])
token = self.get_requested_token(auth_data) token = self.get_requested_token(auth_data)
# Confirm token is valid # Confirm token is valid
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token}, headers={'X-Subject-Token': token},
expected_status=http_client.OK) expected_status=http.client.OK)
# Assign a role, which should not affect the token # Assign a role, which should not affect the token
grant_url = ( grant_url = (
'/domains/%(domain_id)s/users/%(user_id)s/' '/domains/%(domain_id)s/users/%(user_id)s/'
'roles/%(role_id)s' % { 'roles/%(role_id)s' % {
'domain_id': self.domainB['id'], 'domain_id': self.domainB['id'],
'user_id': self.user1['id'], 'user_id': self.user1['id'],
'role_id': self.role1['id']}) 'role_id': self.role1['id']})
self.put(grant_url) self.put(grant_url)
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token}, headers={'X-Subject-Token': token},
expected_status=http_client.OK) expected_status=http.client.OK)
def test_disabling_project_revokes_token(self): def test_disabling_project_revokes_token(self):
token = self.get_requested_token( token = self.get_requested_token(
self.build_authentication_request( self.build_authentication_request(
user_id=self.user3['id'], user_id=self.user3['id'],
password=self.user3['password'], password=self.user3['password'],
project_id=self.projectA['id'])) project_id=self.projectA['id']))
# confirm token is valid # confirm token is valid
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token}, headers={'X-Subject-Token': token},
expected_status=http_client.OK) expected_status=http.client.OK)
# disable the project, which should invalidate the token # disable the project, which should invalidate the token
self.patch( self.patch(
'/projects/%(project_id)s' % {'project_id': self.projectA['id']}, '/projects/%(project_id)s' % {'project_id': self.projectA['id']},
body={'project': {'enabled': False}}) body={'project': {'enabled': False}})
# user should no longer have access to the project # user should no longer have access to the project
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token}, headers={'X-Subject-Token': token},
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
self.v3_create_token( self.v3_create_token(
self.build_authentication_request( self.build_authentication_request(
user_id=self.user3['id'], user_id=self.user3['id'],
password=self.user3['password'], password=self.user3['password'],
project_id=self.projectA['id']), project_id=self.projectA['id']),
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_deleting_project_revokes_token(self): def test_deleting_project_revokes_token(self):
token = self.get_requested_token( token = self.get_requested_token(
self.build_authentication_request( self.build_authentication_request(
user_id=self.user3['id'], user_id=self.user3['id'],
password=self.user3['password'], password=self.user3['password'],
project_id=self.projectA['id'])) project_id=self.projectA['id']))
# confirm token is valid # confirm token is valid
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token}, headers={'X-Subject-Token': token},
expected_status=http_client.OK) expected_status=http.client.OK)
# delete the project, which should invalidate the token # delete the project, which should invalidate the token
self.delete( self.delete(
'/projects/%(project_id)s' % {'project_id': self.projectA['id']}) '/projects/%(project_id)s' % {'project_id': self.projectA['id']})
# user should no longer have access to the project # user should no longer have access to the project
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token}, headers={'X-Subject-Token': token},
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
self.v3_create_token( self.v3_create_token(
self.build_authentication_request( self.build_authentication_request(
user_id=self.user3['id'], user_id=self.user3['id'],
password=self.user3['password'], password=self.user3['password'],
project_id=self.projectA['id']), project_id=self.projectA['id']),
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_deleting_group_grant_revokes_tokens(self): def test_deleting_group_grant_revokes_tokens(self):
"""Test deleting a group grant revokes tokens. """Test deleting a group grant revokes tokens.
Test Plan: Test Plan:
- Get a token for user1, scoped to ProjectA - Get a token for user1, scoped to ProjectA
- Get a token for user2, scoped to ProjectA - Get a token for user2, scoped to ProjectA
- Get a token for user3, scoped to ProjectA - Get a token for user3, scoped to ProjectA
- Delete the grant group1 has on ProjectA - Delete the grant group1 has on ProjectA
skipping to change at line 3295 skipping to change at line 3293
project_id=self.projectA['id']) project_id=self.projectA['id'])
token2 = self.get_requested_token(auth_data) token2 = self.get_requested_token(auth_data)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user3['id'], user_id=self.user3['id'],
password=self.user3['password'], password=self.user3['password'],
project_id=self.projectA['id']) project_id=self.projectA['id'])
token3 = self.get_requested_token(auth_data) token3 = self.get_requested_token(auth_data)
# Confirm tokens are valid # Confirm tokens are valid
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token1}, headers={'X-Subject-Token': token1},
expected_status=http_client.OK) expected_status=http.client.OK)
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token2}, headers={'X-Subject-Token': token2},
expected_status=http_client.OK) expected_status=http.client.OK)
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token3}, headers={'X-Subject-Token': token3},
expected_status=http_client.OK) expected_status=http.client.OK)
# Delete the group grant, which should invalidate the # Delete the group grant, which should invalidate the
# tokens for user1 and user2 # tokens for user1 and user2
grant_url = ( grant_url = (
'/projects/%(project_id)s/groups/%(group_id)s/' '/projects/%(project_id)s/groups/%(group_id)s/'
'roles/%(role_id)s' % { 'roles/%(role_id)s' % {
'project_id': self.projectA['id'], 'project_id': self.projectA['id'],
'group_id': self.group1['id'], 'group_id': self.group1['id'],
'role_id': self.role1['id']}) 'role_id': self.role1['id']})
self.delete(grant_url) self.delete(grant_url)
PROVIDERS.assignment_api.delete_grant( PROVIDERS.assignment_api.delete_grant(
role_id=self.role1['id'], project_id=self.projectA['id'], role_id=self.role1['id'], project_id=self.projectA['id'],
user_id=self.user1['id'] user_id=self.user1['id']
) )
PROVIDERS.assignment_api.delete_grant( PROVIDERS.assignment_api.delete_grant(
role_id=self.role1['id'], project_id=self.projectA['id'], role_id=self.role1['id'], project_id=self.projectA['id'],
user_id=self.user2['id'] user_id=self.user2['id']
) )
self.head('/auth/tokens', token=token1, self.head('/auth/tokens', token=token1,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
self.head('/auth/tokens', token=token2, self.head('/auth/tokens', token=token2,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
# But user3's token should be invalid too as revocation is done for # But user3's token should be invalid too as revocation is done for
# scope role & project # scope role & project
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token3}, headers={'X-Subject-Token': token3},
expected_status=http_client.OK) expected_status=http.client.OK)
def test_domain_group_role_assignment_maintains_token(self): def test_domain_group_role_assignment_maintains_token(self):
"""Test domain-group role assignment maintains existing token. """Test domain-group role assignment maintains existing token.
Test Plan: Test Plan:
- Get a token for user1, scoped to ProjectA - Get a token for user1, scoped to ProjectA
- Create a grant for group1 on DomainB - Create a grant for group1 on DomainB
- Check token is still longer valid - Check token is still longer valid
""" """
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user1['id'], user_id=self.user1['id'],
password=self.user1['password'], password=self.user1['password'],
project_id=self.projectA['id']) project_id=self.projectA['id'])
token = self.get_requested_token(auth_data) token = self.get_requested_token(auth_data)
# Confirm token is valid # Confirm token is valid
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token}, headers={'X-Subject-Token': token},
expected_status=http_client.OK) expected_status=http.client.OK)
# Delete the grant, which should invalidate the token # Delete the grant, which should invalidate the token
grant_url = ( grant_url = (
'/domains/%(domain_id)s/groups/%(group_id)s/' '/domains/%(domain_id)s/groups/%(group_id)s/'
'roles/%(role_id)s' % { 'roles/%(role_id)s' % {
'domain_id': self.domainB['id'], 'domain_id': self.domainB['id'],
'group_id': self.group1['id'], 'group_id': self.group1['id'],
'role_id': self.role1['id']}) 'role_id': self.role1['id']})
self.put(grant_url) self.put(grant_url)
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token}, headers={'X-Subject-Token': token},
expected_status=http_client.OK) expected_status=http.client.OK)
def test_group_membership_changes_revokes_token(self): def test_group_membership_changes_revokes_token(self):
"""Test add/removal to/from group revokes token. """Test add/removal to/from group revokes token.
Test Plan: Test Plan:
- Get a token for user1, scoped to ProjectA - Get a token for user1, scoped to ProjectA
- Get a token for user2, scoped to ProjectA - Get a token for user2, scoped to ProjectA
- Remove user1 from group1 - Remove user1 from group1
- Check token for user1 is no longer valid - Check token for user1 is no longer valid
skipping to change at line 3388 skipping to change at line 3386
project_id=self.projectA['id']) project_id=self.projectA['id'])
token1 = self.get_requested_token(auth_data) token1 = self.get_requested_token(auth_data)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user2['id'], user_id=self.user2['id'],
password=self.user2['password'], password=self.user2['password'],
project_id=self.projectA['id']) project_id=self.projectA['id'])
token2 = self.get_requested_token(auth_data) token2 = self.get_requested_token(auth_data)
# Confirm tokens are valid # Confirm tokens are valid
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token1}, headers={'X-Subject-Token': token1},
expected_status=http_client.OK) expected_status=http.client.OK)
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token2}, headers={'X-Subject-Token': token2},
expected_status=http_client.OK) expected_status=http.client.OK)
# Remove user1 from group1, which should invalidate # Remove user1 from group1, which should invalidate
# the token # the token
self.delete('/groups/%(group_id)s/users/%(user_id)s' % { self.delete('/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': self.group1['id'], 'group_id': self.group1['id'],
'user_id': self.user1['id']}) 'user_id': self.user1['id']})
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token1}, headers={'X-Subject-Token': token1},
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
# But user2's token should still be valid # But user2's token should still be valid
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token2}, headers={'X-Subject-Token': token2},
expected_status=http_client.OK) expected_status=http.client.OK)
# Adding user2 to a group should not invalidate token # Adding user2 to a group should not invalidate token
self.put('/groups/%(group_id)s/users/%(user_id)s' % { self.put('/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': self.group2['id'], 'group_id': self.group2['id'],
'user_id': self.user2['id']}) 'user_id': self.user2['id']})
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': token2}, headers={'X-Subject-Token': token2},
expected_status=http_client.OK) expected_status=http.client.OK)
def test_removing_role_assignment_does_not_affect_other_users(self): def test_removing_role_assignment_does_not_affect_other_users(self):
"""Revoking a role from one user should not affect other users.""" """Revoking a role from one user should not affect other users."""
time = datetime.datetime.utcnow() time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime: with freezegun.freeze_time(time) as frozen_datetime:
# This group grant is not needed for the test # This group grant is not needed for the test
self.delete( self.delete(
'/projects/%(p_id)s/groups/%(g_id)s/roles/%(r_id)s' % '/projects/%(p_id)s/groups/%(g_id)s/roles/%(r_id)s' %
{'p_id': self.projectA['id'], {'p_id': self.projectA['id'],
'g_id': self.group1['id'], 'g_id': self.group1['id'],
skipping to change at line 3453 skipping to change at line 3451
# delete relationships between user1 and projectA from setUp # delete relationships between user1 and projectA from setUp
self.delete( self.delete(
'/projects/%(p_id)s/users/%(u_id)s/roles/%(r_id)s' % { '/projects/%(p_id)s/users/%(u_id)s/roles/%(r_id)s' % {
'p_id': self.projectA['id'], 'p_id': self.projectA['id'],
'u_id': self.user1['id'], 'u_id': self.user1['id'],
'r_id': self.role1['id']}) 'r_id': self.role1['id']})
# authorization for the first user should now fail # authorization for the first user should now fail
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': user1_token}, headers={'X-Subject-Token': user1_token},
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
self.v3_create_token( self.v3_create_token(
self.build_authentication_request( self.build_authentication_request(
user_id=self.user1['id'], user_id=self.user1['id'],
password=self.user1['password'], password=self.user1['password'],
project_id=self.projectA['id']), project_id=self.projectA['id']),
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
# authorization for the second user should still succeed # authorization for the second user should still succeed
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': user3_token}, headers={'X-Subject-Token': user3_token},
expected_status=http_client.OK) expected_status=http.client.OK)
self.v3_create_token( self.v3_create_token(
self.build_authentication_request( self.build_authentication_request(
user_id=self.user3['id'], user_id=self.user3['id'],
password=self.user3['password'], password=self.user3['password'],
project_id=self.projectA['id'])) project_id=self.projectA['id']))
def test_deleting_project_deletes_grants(self): def test_deleting_project_deletes_grants(self):
# This is to make it a little bit more pretty with PEP8 # This is to make it a little bit more pretty with PEP8
role_path = ('/projects/%(project_id)s/users/%(user_id)s/' role_path = ('/projects/%(project_id)s/users/%(user_id)s/'
'roles/%(role_id)s') 'roles/%(role_id)s')
skipping to change at line 3487 skipping to change at line 3485
'role_id': self.role['id']} 'role_id': self.role['id']}
# grant the user a role on the project # grant the user a role on the project
self.put(role_path) self.put(role_path)
# delete the project, which should remove the roles # delete the project, which should remove the roles
self.delete( self.delete(
'/projects/%(project_id)s' % {'project_id': self.projectA['id']}) '/projects/%(project_id)s' % {'project_id': self.projectA['id']})
# Make sure that we get a 404 Not Found when heading that role. # Make sure that we get a 404 Not Found when heading that role.
self.head(role_path, expected_status=http_client.NOT_FOUND) self.head(role_path, expected_status=http.client.NOT_FOUND)
def test_revoke_token_from_token(self): def test_revoke_token_from_token(self):
# Test that a scoped token can be requested from an unscoped token, # Test that a scoped token can be requested from an unscoped token,
# the scoped token can be revoked, and the unscoped token remains # the scoped token can be revoked, and the unscoped token remains
# valid. # valid.
unscoped_token = self.get_requested_token( unscoped_token = self.get_requested_token(
self.build_authentication_request( self.build_authentication_request(
user_id=self.user1['id'], user_id=self.user1['id'],
password=self.user1['password'])) password=self.user1['password']))
skipping to change at line 3518 skipping to change at line 3516
token=unscoped_token, token=unscoped_token,
domain_id=self.domainA['id'])) domain_id=self.domainA['id']))
# revoke the project-scoped token. # revoke the project-scoped token.
self.delete('/auth/tokens', self.delete('/auth/tokens',
headers={'X-Subject-Token': project_scoped_token}) headers={'X-Subject-Token': project_scoped_token})
# The project-scoped token is invalidated. # The project-scoped token is invalidated.
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': project_scoped_token}, headers={'X-Subject-Token': project_scoped_token},
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
# The unscoped token should still be valid. # The unscoped token should still be valid.
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': unscoped_token}, headers={'X-Subject-Token': unscoped_token},
expected_status=http_client.OK) expected_status=http.client.OK)
# The domain-scoped token should still be valid. # The domain-scoped token should still be valid.
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': domain_scoped_token}, headers={'X-Subject-Token': domain_scoped_token},
expected_status=http_client.OK) expected_status=http.client.OK)
# revoke the domain-scoped token. # revoke the domain-scoped token.
self.delete('/auth/tokens', self.delete('/auth/tokens',
headers={'X-Subject-Token': domain_scoped_token}) headers={'X-Subject-Token': domain_scoped_token})
# The domain-scoped token is invalid. # The domain-scoped token is invalid.
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': domain_scoped_token}, headers={'X-Subject-Token': domain_scoped_token},
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
# The unscoped token should still be valid. # The unscoped token should still be valid.
self.head('/auth/tokens', self.head('/auth/tokens',
headers={'X-Subject-Token': unscoped_token}, headers={'X-Subject-Token': unscoped_token},
expected_status=http_client.OK) expected_status=http.client.OK)
class TestTokenRevokeApi(TestTokenRevokeById): class TestTokenRevokeApi(TestTokenRevokeById):
"""Test token revocation on the v3 Identity API.""" """Test token revocation on the v3 Identity API."""
def config_overrides(self): def config_overrides(self):
super(TestTokenRevokeApi, self).config_overrides() super(TestTokenRevokeApi, self).config_overrides()
self.config_fixture.config( self.config_fixture.config(
group='token', group='token',
provider='fernet', provider='fernet',
revoke_by_id=False) revoke_by_id=False)
skipping to change at line 3594 skipping to change at line 3592
expected_response = {'events': [kwargs]} expected_response = {'events': [kwargs]}
self.assertEqual(expected_response, events_response) self.assertEqual(expected_response, events_response)
def test_revoke_token(self): def test_revoke_token(self):
scoped_token = self.get_scoped_token() scoped_token = self.get_scoped_token()
headers = {'X-Subject-Token': scoped_token} headers = {'X-Subject-Token': scoped_token}
response = self.get('/auth/tokens', headers=headers).json_body['token'] response = self.get('/auth/tokens', headers=headers).json_body['token']
self.delete('/auth/tokens', headers=headers) self.delete('/auth/tokens', headers=headers)
self.head('/auth/tokens', headers=headers, self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
events_response = self.get('/OS-REVOKE/events').json_body events_response = self.get('/OS-REVOKE/events').json_body
self.assertValidRevokedTokenResponse(events_response, self.assertValidRevokedTokenResponse(events_response,
audit_id=response['audit_ids'][0]) audit_id=response['audit_ids'][0])
def test_get_revoke_by_id_false_returns_gone(self): def test_get_revoke_by_id_false_returns_gone(self):
self.get('/auth/tokens/OS-PKI/revoked', self.get('/auth/tokens/OS-PKI/revoked',
expected_status=http_client.GONE) expected_status=http.client.GONE)
def test_head_revoke_by_id_false_returns_gone(self): def test_head_revoke_by_id_false_returns_gone(self):
self.head('/auth/tokens/OS-PKI/revoked', self.head('/auth/tokens/OS-PKI/revoked',
expected_status=http_client.GONE) expected_status=http.client.GONE)
def test_revoke_by_id_true_returns_forbidden(self): def test_revoke_by_id_true_returns_forbidden(self):
self.config_fixture.config( self.config_fixture.config(
group='token', group='token',
revoke_by_id=True) revoke_by_id=True)
self.get( self.get(
'/auth/tokens/OS-PKI/revoked', '/auth/tokens/OS-PKI/revoked',
expected_status=http_client.FORBIDDEN expected_status=http.client.FORBIDDEN
) )
self.head( self.head(
'/auth/tokens/OS-PKI/revoked', '/auth/tokens/OS-PKI/revoked',
expected_status=http_client.FORBIDDEN expected_status=http.client.FORBIDDEN
) )
def test_list_delete_project_shows_in_event_list(self): def test_list_delete_project_shows_in_event_list(self):
self.role_data_fixtures() self.role_data_fixtures()
events = self.get('/OS-REVOKE/events').json_body['events'] events = self.get('/OS-REVOKE/events').json_body['events']
self.assertEqual([], events) self.assertEqual([], events)
self.delete( self.delete(
'/projects/%(project_id)s' % {'project_id': self.projectA['id']}) '/projects/%(project_id)s' % {'project_id': self.projectA['id']})
events_response = self.get('/OS-REVOKE/events').json_body events_response = self.get('/OS-REVOKE/events').json_body
skipping to change at line 3672 skipping to change at line 3670
auth_req = self.build_authentication_request(token=scoped_token) auth_req = self.build_authentication_request(token=scoped_token)
response = self.v3_create_token(auth_req) response = self.v3_create_token(auth_req)
token2 = response.json_body['token'] token2 = response.json_body['token']
headers2 = {'X-Subject-Token': response.headers['X-Subject-Token']} headers2 = {'X-Subject-Token': response.headers['X-Subject-Token']}
response = self.v3_create_token(auth_req) response = self.v3_create_token(auth_req)
response.json_body['token'] response.json_body['token']
headers3 = {'X-Subject-Token': response.headers['X-Subject-Token']} headers3 = {'X-Subject-Token': response.headers['X-Subject-Token']}
self.head('/auth/tokens', headers=headers, self.head('/auth/tokens', headers=headers,
expected_status=http_client.OK) expected_status=http.client.OK)
self.head('/auth/tokens', headers=headers2, self.head('/auth/tokens', headers=headers2,
expected_status=http_client.OK) expected_status=http.client.OK)
self.head('/auth/tokens', headers=headers3, self.head('/auth/tokens', headers=headers3,
expected_status=http_client.OK) expected_status=http.client.OK)
self.delete('/auth/tokens', headers=headers) self.delete('/auth/tokens', headers=headers)
# NOTE(ayoung): not deleting token3, as it should be deleted # NOTE(ayoung): not deleting token3, as it should be deleted
# by previous # by previous
events_response = self.get('/OS-REVOKE/events').json_body events_response = self.get('/OS-REVOKE/events').json_body
events = events_response['events'] events = events_response['events']
self.assertEqual(1, len(events)) self.assertEqual(1, len(events))
self.assertEventDataInList( self.assertEventDataInList(
events, events,
audit_id=token2['audit_ids'][1]) audit_id=token2['audit_ids'][1])
self.head('/auth/tokens', headers=headers, self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
self.head('/auth/tokens', headers=headers2, self.head('/auth/tokens', headers=headers2,
expected_status=http_client.OK) expected_status=http.client.OK)
self.head('/auth/tokens', headers=headers3, self.head('/auth/tokens', headers=headers3,
expected_status=http_client.OK) expected_status=http.client.OK)
def test_list_with_filter(self): def test_list_with_filter(self):
self.role_data_fixtures() self.role_data_fixtures()
events = self.get('/OS-REVOKE/events').json_body['events'] events = self.get('/OS-REVOKE/events').json_body['events']
self.assertEqual(0, len(events)) self.assertEqual(0, len(events))
scoped_token = self.get_scoped_token() scoped_token = self.get_scoped_token()
headers = {'X-Subject-Token': scoped_token} headers = {'X-Subject-Token': scoped_token}
auth = self.build_authentication_request(token=scoped_token) auth = self.build_authentication_request(token=scoped_token)
skipping to change at line 3732 skipping to change at line 3730
methods=['password', 'token']) methods=['password', 'token'])
def test_remote_user_disabled(self): def test_remote_user_disabled(self):
app = self.loadapp() app = self.loadapp()
remote_user = '%s@%s' % (self.user['name'], self.domain['name']) remote_user = '%s@%s' % (self.user['name'], self.domain['name'])
with app.test_client() as c: with app.test_client() as c:
c.environ_base.update(self.build_external_auth_environ( c.environ_base.update(self.build_external_auth_environ(
remote_user)) remote_user))
auth_data = self.build_authentication_request() auth_data = self.build_authentication_request()
c.post('/v3/auth/tokens', json=auth_data, c.post('/v3/auth/tokens', json=auth_data,
expected_status_code=http_client.UNAUTHORIZED) expected_status_code=http.client.UNAUTHORIZED)
# FIXME(morgan): This test case must be re-worked to function under flask. It # FIXME(morgan): This test case must be re-worked to function under flask. It
# has been commented out until it is re-worked ensuring no issues when webob # has been commented out until it is re-worked ensuring no issues when webob
# classes are removed. # classes are removed.
# https://bugs.launchpad.net/keystone/+bug/1793756 # https://bugs.launchpad.net/keystone/+bug/1793756
# class AuthExternalDomainBehavior(object): # class AuthExternalDomainBehavior(object):
# content_type = 'json' # content_type = 'json'
# #
# def test_remote_user_with_realm(self): # def test_remote_user_with_realm(self):
# api = auth.controllers.Auth() # api = auth.controllers.Auth()
skipping to change at line 3813 skipping to change at line 3811
def auth_plugin_config_override(self, methods=None, **method_classes): def auth_plugin_config_override(self, methods=None, **method_classes):
self.config_fixture.config(group='auth', methods=[]) self.config_fixture.config(group='auth', methods=[])
def test_remote_user_no_method(self): def test_remote_user_no_method(self):
app = self.loadapp() app = self.loadapp()
with app.test_client() as c: with app.test_client() as c:
c.environ_base.update(self.build_external_auth_environ( c.environ_base.update(self.build_external_auth_environ(
self.default_domain_user['name'])) self.default_domain_user['name']))
auth_data = self.build_authentication_request() auth_data = self.build_authentication_request()
c.post('/v3/auth/tokens', json=auth_data, c.post('/v3/auth/tokens', json=auth_data,
expected_status_code=http_client.UNAUTHORIZED) expected_status_code=http.client.UNAUTHORIZED)
class TrustAPIBehavior(test_v3.RestfulTestCase): class TrustAPIBehavior(test_v3.RestfulTestCase):
"""Redelegation valid and secure. """Redelegation valid and secure.
Redelegation is a hierarchical structure of trusts between initial trustor Redelegation is a hierarchical structure of trusts between initial trustor
and a group of users allowed to impersonate trustor and act in his name. and a group of users allowed to impersonate trustor and act in his name.
Hierarchy is created in a process of trusting already trusted permissions Hierarchy is created in a process of trusting already trusted permissions
and organized as an adjacency list using 'redelegated_trust_id' field. and organized as an adjacency list using 'redelegated_trust_id' field.
Redelegation is valid if each subsequent trust in a chain passes 'not more' Redelegation is valid if each subsequent trust in a chain passes 'not more'
permissions than being redelegated. permissions than being redelegated.
skipping to change at line 3894 skipping to change at line 3892
self.redelegated_trust_ref['redelegation_count'] = 0 self.redelegated_trust_ref['redelegation_count'] = 0
r = self.post('/OS-TRUST/trusts', r = self.post('/OS-TRUST/trusts',
body={'trust': self.redelegated_trust_ref}) body={'trust': self.redelegated_trust_ref})
trust = self.assertValidTrustResponse(r) trust = self.assertValidTrustResponse(r)
trust_token = self._get_trust_token(trust) trust_token = self._get_trust_token(trust)
# Attempt to create a redelegated trust. # Attempt to create a redelegated trust.
self.post('/OS-TRUST/trusts', self.post('/OS-TRUST/trusts',
body={'trust': self.chained_trust_ref}, body={'trust': self.chained_trust_ref},
token=trust_token, token=trust_token,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_modified_redelegation_count_error(self): def test_modified_redelegation_count_error(self):
r = self.post('/OS-TRUST/trusts', r = self.post('/OS-TRUST/trusts',
body={'trust': self.redelegated_trust_ref}) body={'trust': self.redelegated_trust_ref})
trust = self.assertValidTrustResponse(r) trust = self.assertValidTrustResponse(r)
trust_token = self._get_trust_token(trust) trust_token = self._get_trust_token(trust)
# Attempt to create a redelegated trust with incorrect # Attempt to create a redelegated trust with incorrect
# redelegation_count. # redelegation_count.
correct = trust['redelegation_count'] - 1 correct = trust['redelegation_count'] - 1
incorrect = correct - 1 incorrect = correct - 1
self.chained_trust_ref['redelegation_count'] = incorrect self.chained_trust_ref['redelegation_count'] = incorrect
self.post('/OS-TRUST/trusts', self.post('/OS-TRUST/trusts',
body={'trust': self.chained_trust_ref}, body={'trust': self.chained_trust_ref},
token=trust_token, token=trust_token,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_max_redelegation_count_constraint(self): def test_max_redelegation_count_constraint(self):
incorrect = CONF.trust.max_redelegation_count + 1 incorrect = CONF.trust.max_redelegation_count + 1
self.redelegated_trust_ref['redelegation_count'] = incorrect self.redelegated_trust_ref['redelegation_count'] = incorrect
self.post('/OS-TRUST/trusts', self.post('/OS-TRUST/trusts',
body={'trust': self.redelegated_trust_ref}, body={'trust': self.redelegated_trust_ref},
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_redelegation_expiry(self): def test_redelegation_expiry(self):
r = self.post('/OS-TRUST/trusts', r = self.post('/OS-TRUST/trusts',
body={'trust': self.redelegated_trust_ref}) body={'trust': self.redelegated_trust_ref})
trust = self.assertValidTrustResponse(r) trust = self.assertValidTrustResponse(r)
trust_token = self._get_trust_token(trust) trust_token = self._get_trust_token(trust)
# Attempt to create a redelegated trust supposed to last longer # Attempt to create a redelegated trust supposed to last longer
# than the parent trust: let's give it 10 minutes (>1 minute). # than the parent trust: let's give it 10 minutes (>1 minute).
too_long_live_chained_trust_ref = unit.new_trust_ref( too_long_live_chained_trust_ref = unit.new_trust_ref(
trustor_user_id=self.user_id, trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user['id'], trustee_user_id=self.trustee_user['id'],
project_id=self.project_id, project_id=self.project_id,
impersonation=True, impersonation=True,
expires=dict(minutes=10), expires=dict(minutes=10),
role_ids=[self.role_id]) role_ids=[self.role_id])
self.post('/OS-TRUST/trusts', self.post('/OS-TRUST/trusts',
body={'trust': too_long_live_chained_trust_ref}, body={'trust': too_long_live_chained_trust_ref},
token=trust_token, token=trust_token,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_redelegation_remaining_uses(self): def test_redelegation_remaining_uses(self):
r = self.post('/OS-TRUST/trusts', r = self.post('/OS-TRUST/trusts',
body={'trust': self.redelegated_trust_ref}) body={'trust': self.redelegated_trust_ref})
trust = self.assertValidTrustResponse(r) trust = self.assertValidTrustResponse(r)
trust_token = self._get_trust_token(trust) trust_token = self._get_trust_token(trust)
# Attempt to create a redelegated trust with remaining_uses defined. # Attempt to create a redelegated trust with remaining_uses defined.
# It must fail according to specification: remaining_uses must be # It must fail according to specification: remaining_uses must be
# omitted for trust redelegation. Any number here. # omitted for trust redelegation. Any number here.
self.chained_trust_ref['remaining_uses'] = 5 self.chained_trust_ref['remaining_uses'] = 5
self.post('/OS-TRUST/trusts', self.post('/OS-TRUST/trusts',
body={'trust': self.chained_trust_ref}, body={'trust': self.chained_trust_ref},
token=trust_token, token=trust_token,
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
def test_roles_subset(self): def test_roles_subset(self):
# Build second role # Build second role
role = unit.new_role_ref() role = unit.new_role_ref()
PROVIDERS.role_api.create_role(role['id'], role) PROVIDERS.role_api.create_role(role['id'], role)
# assign a new role to the user # assign a new role to the user
PROVIDERS.assignment_api.create_grant( PROVIDERS.assignment_api.create_grant(
role_id=role['id'], user_id=self.user_id, role_id=role['id'], user_id=self.user_id,
project_id=self.project_id project_id=self.project_id
) )
skipping to change at line 4090 skipping to change at line 4088
) )
# Try to chain a trust with the role not from parent trust # Try to chain a trust with the role not from parent trust
self.chained_trust_ref['roles'] = [{'id': role['id']}] self.chained_trust_ref['roles'] = [{'id': role['id']}]
# Bypass policy enforcement # Bypass policy enforcement
with mock.patch.object(policy, 'enforce', return_value=True): with mock.patch.object(policy, 'enforce', return_value=True):
self.post('/OS-TRUST/trusts', self.post('/OS-TRUST/trusts',
body={'trust': self.chained_trust_ref}, body={'trust': self.chained_trust_ref},
token=trust_token, token=trust_token,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_redelegation_terminator(self): def test_redelegation_terminator(self):
self.redelegated_trust_ref['expires_at'] = ( self.redelegated_trust_ref['expires_at'] = (
datetime.datetime.utcnow().replace(year=2032).strftime( datetime.datetime.utcnow().replace(year=2032).strftime(
unit.TIME_FORMAT)) unit.TIME_FORMAT))
r = self.post('/OS-TRUST/trusts', r = self.post('/OS-TRUST/trusts',
body={'trust': self.redelegated_trust_ref}) body={'trust': self.redelegated_trust_ref})
trust = self.assertValidTrustResponse(r) trust = self.assertValidTrustResponse(r)
trust_token = self._get_trust_token(trust) trust_token = self._get_trust_token(trust)
skipping to change at line 4124 skipping to change at line 4122
# Check that allow_redelegation == False caused redelegation_count # Check that allow_redelegation == False caused redelegation_count
# to be set to 0, while allow_redelegation is removed # to be set to 0, while allow_redelegation is removed
self.assertNotIn('allow_redelegation', trust) self.assertNotIn('allow_redelegation', trust)
self.assertEqual(0, trust['redelegation_count']) self.assertEqual(0, trust['redelegation_count'])
trust_token = self._get_trust_token(trust) trust_token = self._get_trust_token(trust)
# Build third trust, same as second # Build third trust, same as second
self.post('/OS-TRUST/trusts', self.post('/OS-TRUST/trusts',
body={'trust': ref}, body={'trust': ref},
token=trust_token, token=trust_token,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_redelegation_without_impersonation(self): def test_redelegation_without_impersonation(self):
# Update trust to not allow impersonation # Update trust to not allow impersonation
self.redelegated_trust_ref['impersonation'] = False self.redelegated_trust_ref['impersonation'] = False
# Create trust # Create trust
resp = self.post('/OS-TRUST/trusts', resp = self.post('/OS-TRUST/trusts',
body={'trust': self.redelegated_trust_ref}, body={'trust': self.redelegated_trust_ref},
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
trust = self.assertValidTrustResponse(resp) trust = self.assertValidTrustResponse(resp)
# Get trusted token without impersonation # Get trusted token without impersonation
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'], user_id=self.trustee_user['id'],
password=self.trustee_user['password'], password=self.trustee_user['password'],
trust_id=trust['id']) trust_id=trust['id'])
trust_token = self.get_requested_token(auth_data) trust_token = self.get_requested_token(auth_data)
# Create second user for redelegation # Create second user for redelegation
skipping to change at line 4162 skipping to change at line 4160
impersonation=False, impersonation=False,
expires=dict(minutes=1), expires=dict(minutes=1),
role_ids=[self.role_id], role_ids=[self.role_id],
allow_redelegation=False) allow_redelegation=False)
# Creating a second trust should not be allowed since trustor does not # Creating a second trust should not be allowed since trustor does not
# have the role to delegate thus returning 404 NOT FOUND. # have the role to delegate thus returning 404 NOT FOUND.
resp = self.post('/OS-TRUST/trusts', resp = self.post('/OS-TRUST/trusts',
body={'trust': trust_ref_2}, body={'trust': trust_ref_2},
token=trust_token, token=trust_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_create_unscoped_trust(self): def test_create_unscoped_trust(self):
ref = unit.new_trust_ref( ref = unit.new_trust_ref(
trustor_user_id=self.user_id, trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user['id']) trustee_user_id=self.trustee_user['id'])
r = self.post('/OS-TRUST/trusts', body={'trust': ref}) r = self.post('/OS-TRUST/trusts', body={'trust': ref})
self.assertValidTrustResponse(r, ref) self.assertValidTrustResponse(r, ref)
def test_create_trust_no_roles(self): def test_create_trust_no_roles(self):
ref = unit.new_trust_ref( ref = unit.new_trust_ref(
trustor_user_id=self.user_id, trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user['id'], trustee_user_id=self.trustee_user['id'],
project_id=self.project_id) project_id=self.project_id)
self.post('/OS-TRUST/trusts', body={'trust': ref}, self.post('/OS-TRUST/trusts', body={'trust': ref},
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def _initialize_test_consume_trust(self, count): def _initialize_test_consume_trust(self, count):
# Make sure remaining_uses is decremented as we consume the trust # Make sure remaining_uses is decremented as we consume the trust
ref = unit.new_trust_ref( ref = unit.new_trust_ref(
trustor_user_id=self.user_id, trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user['id'], trustee_user_id=self.trustee_user['id'],
project_id=self.project_id, project_id=self.project_id,
remaining_uses=count, remaining_uses=count,
role_ids=[self.role_id]) role_ids=[self.role_id])
r = self.post('/OS-TRUST/trusts', body={'trust': ref}) r = self.post('/OS-TRUST/trusts', body={'trust': ref})
skipping to change at line 4229 skipping to change at line 4227
}, },
# We don't need a trust to execute this test, the # We don't need a trust to execute this test, the
# OS-TRUST:trust key of the request body just has to be a # OS-TRUST:trust key of the request body just has to be a
# string instead of a dictionary in order to throw a 500 when # string instead of a dictionary in order to throw a 500 when
# it should a 400 Bad Request. # it should a 400 Bad Request.
'scope': {'OS-TRUST:trust': ''} 'scope': {'OS-TRUST:trust': ''}
} }
} }
self.admin_request( self.admin_request(
method='POST', path='/v3/auth/tokens', body=auth_data, method='POST', path='/v3/auth/tokens', body=auth_data,
expected_status=http_client.BAD_REQUEST expected_status=http.client.BAD_REQUEST
) )
def test_consume_trust_once(self): def test_consume_trust_once(self):
trust = self._initialize_test_consume_trust(2) trust = self._initialize_test_consume_trust(2)
# check decremented value # check decremented value
r = self.get( r = self.get(
'/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}) '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
trust = r.result.get('trust') trust = r.result.get('trust')
self.assertIsNotNone(trust) self.assertIsNotNone(trust)
self.assertEqual(1, trust['remaining_uses']) self.assertEqual(1, trust['remaining_uses'])
self.assertEqual(self.role['name'], trust['roles'][0]['name']) self.assertEqual(self.role['name'], trust['roles'][0]['name'])
self.assertEqual(self.role['id'], trust['roles'][0]['id']) self.assertEqual(self.role['id'], trust['roles'][0]['id'])
def test_create_one_time_use_trust(self): def test_create_one_time_use_trust(self):
trust = self._initialize_test_consume_trust(1) trust = self._initialize_test_consume_trust(1)
# No more uses, the trust is made unavailable # No more uses, the trust is made unavailable
self.get( self.get(
'/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
# this time we can't get a trust token # this time we can't get a trust token
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'], user_id=self.trustee_user['id'],
password=self.trustee_user['password'], password=self.trustee_user['password'],
trust_id=trust['id']) trust_id=trust['id'])
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_create_unlimited_use_trust(self): def test_create_unlimited_use_trust(self):
# by default trusts are unlimited in terms of tokens that can be # by default trusts are unlimited in terms of tokens that can be
# generated from them, this test creates such a trust explicitly # generated from them, this test creates such a trust explicitly
ref = unit.new_trust_ref( ref = unit.new_trust_ref(
trustor_user_id=self.user_id, trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user['id'], trustee_user_id=self.trustee_user['id'],
project_id=self.project_id, project_id=self.project_id,
remaining_uses=None, remaining_uses=None,
role_ids=[self.role_id]) role_ids=[self.role_id])
skipping to change at line 4316 skipping to change at line 4314
trustor_user_id=self.user_id, trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user['id'], trustee_user_id=self.trustee_user['id'],
project_id=self.project_id, project_id=self.project_id,
impersonation=True, impersonation=True,
expires=dict(minutes=1), expires=dict(minutes=1),
role_ids=[self.role_id]) role_ids=[self.role_id])
self.post('/OS-TRUST/trusts', self.post('/OS-TRUST/trusts',
body={'trust': ref}, body={'trust': ref},
token=trust_token, token=trust_token,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_trust_deleted_grant(self): def test_trust_deleted_grant(self):
# create a new role # create a new role
role = unit.new_role_ref() role = unit.new_role_ref()
PROVIDERS.role_api.create_role(role['id'], role) PROVIDERS.role_api.create_role(role['id'], role)
grant_url = ( grant_url = (
'/projects/%(project_id)s/users/%(user_id)s/' '/projects/%(project_id)s/users/%(user_id)s/'
'roles/%(role_id)s' % { 'roles/%(role_id)s' % {
'project_id': self.project_id, 'project_id': self.project_id,
skipping to change at line 4355 skipping to change at line 4353
# delete the grant # delete the grant
self.delete(grant_url) self.delete(grant_url)
# attempt to get a trust token with the deleted grant # attempt to get a trust token with the deleted grant
# and ensure it's unauthorized # and ensure it's unauthorized
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'], user_id=self.trustee_user['id'],
password=self.trustee_user['password'], password=self.trustee_user['password'],
trust_id=trust['id']) trust_id=trust['id'])
r = self.v3_create_token(auth_data, r = self.v3_create_token(auth_data,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_trust_chained(self): def test_trust_chained(self):
"""Test that a trust token can't be used to execute another trust. """Test that a trust token can't be used to execute another trust.
To do this, we create an A->B->C hierarchy of trusts, then attempt to To do this, we create an A->B->C hierarchy of trusts, then attempt to
execute the trusts in series (C->B->A). execute the trusts in series (C->B->A).
""" """
# create a sub-trustee user # create a sub-trustee user
sub_trustee_user = unit.create_user( sub_trustee_user = unit.create_user(
skipping to change at line 4422 skipping to change at line 4420
user_id=sub_trustee_user['id'], user_id=sub_trustee_user['id'],
password=sub_trustee_user['password'], password=sub_trustee_user['password'],
trust_id=trust2['id']) trust_id=trust2['id'])
trust_token = self.get_requested_token(auth_data) trust_token = self.get_requested_token(auth_data)
# attempt to get the second trust using a trust token # attempt to get the second trust using a trust token
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
token=trust_token, token=trust_token,
trust_id=trust1['id']) trust_id=trust1['id'])
r = self.v3_create_token(auth_data, r = self.v3_create_token(auth_data,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def assertTrustTokensRevoked(self, trust_id): def assertTrustTokensRevoked(self, trust_id):
revocation_response = self.get('/OS-REVOKE/events') revocation_response = self.get('/OS-REVOKE/events')
revocation_events = revocation_response.json_body['events'] revocation_events = revocation_response.json_body['events']
found = False found = False
for event in revocation_events: for event in revocation_events:
if event.get('OS-TRUST:trust_id') == trust_id: if event.get('OS-TRUST:trust_id') == trust_id:
found = True found = True
self.assertTrue(found, 'event with trust_id %s not found in list' % self.assertTrue(found, 'event with trust_id %s not found in list' %
trust_id) trust_id)
skipping to change at line 4457 skipping to change at line 4455
password=self.trustee_user['password'], password=self.trustee_user['password'],
trust_id=trust_id) trust_id=trust_id)
r = self.v3_create_token(auth_data) r = self.v3_create_token(auth_data)
self.assertValidProjectScopedTokenResponse( self.assertValidProjectScopedTokenResponse(
r, self.trustee_user) r, self.trustee_user)
trust_token = r.headers['X-Subject-Token'] trust_token = r.headers['X-Subject-Token']
self.delete('/OS-TRUST/trusts/%(trust_id)s' % { self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
'trust_id': trust_id}) 'trust_id': trust_id})
headers = {'X-Subject-Token': trust_token} headers = {'X-Subject-Token': trust_token}
self.head('/auth/tokens', headers=headers, self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
self.assertTrustTokensRevoked(trust_id) self.assertTrustTokensRevoked(trust_id)
def disable_user(self, user): def disable_user(self, user):
user['enabled'] = False user['enabled'] = False
PROVIDERS.identity_api.update_user(user['id'], user) PROVIDERS.identity_api.update_user(user['id'], user)
def test_trust_get_token_fails_if_trustor_disabled(self): def test_trust_get_token_fails_if_trustor_disabled(self):
ref = unit.new_trust_ref( ref = unit.new_trust_ref(
trustor_user_id=self.user_id, trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user['id'], trustee_user_id=self.trustee_user['id'],
skipping to change at line 4490 skipping to change at line 4488
trust_id=trust['id']) trust_id=trust['id'])
self.v3_create_token(auth_data) self.v3_create_token(auth_data)
self.disable_user(self.user) self.disable_user(self.user)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'], user_id=self.trustee_user['id'],
password=self.trustee_user['password'], password=self.trustee_user['password'],
trust_id=trust['id']) trust_id=trust['id'])
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_trust_get_token_fails_if_trustee_disabled(self): def test_trust_get_token_fails_if_trustee_disabled(self):
ref = unit.new_trust_ref( ref = unit.new_trust_ref(
trustor_user_id=self.user_id, trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user['id'], trustee_user_id=self.trustee_user['id'],
project_id=self.project_id, project_id=self.project_id,
impersonation=False, impersonation=False,
expires=dict(minutes=1), expires=dict(minutes=1),
role_ids=[self.role_id]) role_ids=[self.role_id])
skipping to change at line 4518 skipping to change at line 4516
trust_id=trust['id']) trust_id=trust['id'])
self.v3_create_token(auth_data) self.v3_create_token(auth_data)
self.disable_user(self.trustee_user) self.disable_user(self.trustee_user)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'], user_id=self.trustee_user['id'],
password=self.trustee_user['password'], password=self.trustee_user['password'],
trust_id=trust['id']) trust_id=trust['id'])
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_delete_trust(self): def test_delete_trust(self):
ref = unit.new_trust_ref( ref = unit.new_trust_ref(
trustor_user_id=self.user_id, trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user['id'], trustee_user_id=self.trustee_user['id'],
project_id=self.project_id, project_id=self.project_id,
impersonation=False, impersonation=False,
expires=dict(minutes=1), expires=dict(minutes=1),
role_ids=[self.role_id]) role_ids=[self.role_id])
skipping to change at line 4541 skipping to change at line 4539
trust = self.assertValidTrustResponse(r, ref) trust = self.assertValidTrustResponse(r, ref)
self.delete('/OS-TRUST/trusts/%(trust_id)s' % { self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
'trust_id': trust['id']}) 'trust_id': trust['id']})
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'], user_id=self.trustee_user['id'],
password=self.trustee_user['password'], password=self.trustee_user['password'],
trust_id=trust['id']) trust_id=trust['id'])
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_change_password_invalidates_trust_tokens(self): def test_change_password_invalidates_trust_tokens(self):
ref = unit.new_trust_ref( ref = unit.new_trust_ref(
trustor_user_id=self.user_id, trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user['id'], trustee_user_id=self.trustee_user['id'],
project_id=self.project_id, project_id=self.project_id,
impersonation=True, impersonation=True,
expires=dict(minutes=1), expires=dict(minutes=1),
role_ids=[self.role_id]) role_ids=[self.role_id])
skipping to change at line 4572 skipping to change at line 4570
trust_token = r.headers.get('X-Subject-Token') trust_token = r.headers.get('X-Subject-Token')
self.get('/OS-TRUST/trusts?trustor_user_id=%s' % self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
self.user_id, token=trust_token) self.user_id, token=trust_token)
self.assertValidUserResponse( self.assertValidUserResponse(
self.patch('/users/%s' % self.trustee_user['id'], self.patch('/users/%s' % self.trustee_user['id'],
body={'user': {'password': uuid.uuid4().hex}})) body={'user': {'password': uuid.uuid4().hex}}))
self.get('/OS-TRUST/trusts?trustor_user_id=%s' % self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
self.user_id, expected_status=http_client.UNAUTHORIZED, self.user_id, expected_status=http.client.UNAUTHORIZED,
token=trust_token) token=trust_token)
def test_trustee_can_do_role_ops(self): def test_trustee_can_do_role_ops(self):
resp = self.post('/OS-TRUST/trusts', resp = self.post('/OS-TRUST/trusts',
body={'trust': self.redelegated_trust_ref}) body={'trust': self.redelegated_trust_ref})
trust = self.assertValidTrustResponse(resp) trust = self.assertValidTrustResponse(resp)
trust_token = self._get_trust_token(trust) trust_token = self._get_trust_token(trust)
resp = self.get( resp = self.get(
'/OS-TRUST/trusts/%(trust_id)s/roles' % { '/OS-TRUST/trusts/%(trust_id)s/roles' % {
'trust_id': trust['id']}, 'trust_id': trust['id']},
token=trust_token) token=trust_token)
self.assertValidRoleListResponse(resp, self.role) self.assertValidRoleListResponse(resp, self.role)
self.head( self.head(
'/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % { '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % {
'trust_id': trust['id'], 'trust_id': trust['id'],
'role_id': self.role['id']}, 'role_id': self.role['id']},
token=trust_token, token=trust_token,
expected_status=http_client.OK) expected_status=http.client.OK)
resp = self.get( resp = self.get(
'/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % { '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % {
'trust_id': trust['id'], 'trust_id': trust['id'],
'role_id': self.role['id']}, 'role_id': self.role['id']},
token=trust_token) token=trust_token)
self.assertValidRoleResponse(resp, self.role) self.assertValidRoleResponse(resp, self.role)
def test_do_not_consume_remaining_uses_when_get_token_fails(self): def test_do_not_consume_remaining_uses_when_get_token_fails(self):
ref = unit.new_trust_ref( ref = unit.new_trust_ref(
skipping to change at line 4622 skipping to change at line 4620
new_trust = r.result.get('trust') new_trust = r.result.get('trust')
trust_id = new_trust.get('id') trust_id = new_trust.get('id')
# Pass in another user's ID as the trustee, the result being a failed # Pass in another user's ID as the trustee, the result being a failed
# token authenticate and the remaining_uses of the trust should not be # token authenticate and the remaining_uses of the trust should not be
# decremented. # decremented.
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.default_domain_user['id'], user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'], password=self.default_domain_user['password'],
trust_id=trust_id) trust_id=trust_id)
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
r = self.get('/OS-TRUST/trusts/%s' % trust_id) r = self.get('/OS-TRUST/trusts/%s' % trust_id)
self.assertEqual(3, r.result.get('trust').get('remaining_uses')) self.assertEqual(3, r.result.get('trust').get('remaining_uses'))
class TestTrustChain(test_v3.RestfulTestCase): class TestTrustChain(test_v3.RestfulTestCase):
def config_overrides(self): def config_overrides(self):
super(TestTrustChain, self).config_overrides() super(TestTrustChain, self).config_overrides()
self.config_fixture.config( self.config_fixture.config(
group='trust', group='trust',
skipping to change at line 4751 skipping to change at line 4749
self.assertTrue(found, 'event with trust_id %s not found in list' % self.assertTrue(found, 'event with trust_id %s not found in list' %
trust_id) trust_id)
def test_delete_trust_cascade(self): def test_delete_trust_cascade(self):
self.assert_user_authenticate(self.user_list[0]) self.assert_user_authenticate(self.user_list[0])
self.delete('/OS-TRUST/trusts/%(trust_id)s' % { self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
'trust_id': self.trust_chain[0]['id']}) 'trust_id': self.trust_chain[0]['id']})
headers = {'X-Subject-Token': self.last_token} headers = {'X-Subject-Token': self.last_token}
self.head('/auth/tokens', headers=headers, self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
self.assert_trust_tokens_revoked(self.trust_chain[0]['id']) self.assert_trust_tokens_revoked(self.trust_chain[0]['id'])
def test_delete_broken_chain(self): def test_delete_broken_chain(self):
self.assert_user_authenticate(self.user_list[0]) self.assert_user_authenticate(self.user_list[0])
self.delete('/OS-TRUST/trusts/%(trust_id)s' % { self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
'trust_id': self.trust_chain[0]['id']}) 'trust_id': self.trust_chain[0]['id']})
# Verify the two remaining trust have been deleted # Verify the two remaining trust have been deleted
for i in range(len(self.user_list) - 1): for i in range(len(self.user_list) - 1):
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.user_list[i]['id'], user_id=self.user_list[i]['id'],
password=self.user_list[i]['password']) password=self.user_list[i]['password'])
auth_token = self.get_requested_token(auth_data) auth_token = self.get_requested_token(auth_data)
# Assert chained trust have been deleted # Assert chained trust have been deleted
self.get('/OS-TRUST/trusts/%(trust_id)s' % { self.get('/OS-TRUST/trusts/%(trust_id)s' % {
'trust_id': self.trust_chain[i + 1]['id']}, 'trust_id': self.trust_chain[i + 1]['id']},
token=auth_token, token=auth_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_trustor_roles_revoked(self): def test_trustor_roles_revoked(self):
self.assert_user_authenticate(self.user_list[0]) self.assert_user_authenticate(self.user_list[0])
PROVIDERS.assignment_api.remove_role_from_user_and_project( PROVIDERS.assignment_api.remove_role_from_user_and_project(
self.user_id, self.project_id, self.role_id self.user_id, self.project_id, self.role_id
) )
# Verify that users are not allowed to authenticate with trust # Verify that users are not allowed to authenticate with trust
for i in range(len(self.user_list[1:])): for i in range(len(self.user_list[1:])):
skipping to change at line 4795 skipping to change at line 4793
password=trustee['password']) password=trustee['password'])
# Attempt to authenticate with trust # Attempt to authenticate with trust
token = self.get_requested_token(auth_data) token = self.get_requested_token(auth_data)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
token=token, token=token,
trust_id=self.trust_chain[i - 1]['id']) trust_id=self.trust_chain[i - 1]['id'])
# Trustee has no delegated roles # Trustee has no delegated roles
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_intermediate_user_disabled(self): def test_intermediate_user_disabled(self):
self.assert_user_authenticate(self.user_list[0]) self.assert_user_authenticate(self.user_list[0])
disabled = self.user_list[0] disabled = self.user_list[0]
disabled['enabled'] = False disabled['enabled'] = False
PROVIDERS.identity_api.update_user(disabled['id'], disabled) PROVIDERS.identity_api.update_user(disabled['id'], disabled)
# Bypass policy enforcement # Bypass policy enforcement
with mock.patch.object(policy, 'enforce', return_value=True): with mock.patch.object(policy, 'enforce', return_value=True):
headers = {'X-Subject-Token': self.last_token} headers = {'X-Subject-Token': self.last_token}
self.head('/auth/tokens', headers=headers, self.head('/auth/tokens', headers=headers,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_intermediate_user_deleted(self): def test_intermediate_user_deleted(self):
self.assert_user_authenticate(self.user_list[0]) self.assert_user_authenticate(self.user_list[0])
PROVIDERS.identity_api.delete_user(self.user_list[0]['id']) PROVIDERS.identity_api.delete_user(self.user_list[0]['id'])
# Bypass policy enforcement # Bypass policy enforcement
# Delete trustee will invalidate the trust. # Delete trustee will invalidate the trust.
with mock.patch.object(policy, 'enforce', return_value=True): with mock.patch.object(policy, 'enforce', return_value=True):
headers = {'X-Subject-Token': self.last_token} headers = {'X-Subject-Token': self.last_token}
self.head('/auth/tokens', headers=headers, self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
class TestAuthContext(unit.TestCase): class TestAuthContext(unit.TestCase):
def setUp(self): def setUp(self):
super(TestAuthContext, self).setUp() super(TestAuthContext, self).setUp()
self.auth_context = auth.core.AuthContext() self.auth_context = auth.core.AuthContext()
def test_pick_lowest_expires_at(self): def test_pick_lowest_expires_at(self):
expires_at_1 = utils.isotime(timeutils.utcnow()) expires_at_1 = utils.isotime(timeutils.utcnow())
expires_at_2 = utils.isotime(timeutils.utcnow() + expires_at_2 = utils.isotime(timeutils.utcnow() +
datetime.timedelta(seconds=10)) datetime.timedelta(seconds=10))
skipping to change at line 4878 skipping to change at line 4876
attr_val_1 = uuid.uuid4().hex attr_val_1 = uuid.uuid4().hex
attr_val_2 = uuid.uuid4().hex attr_val_2 = uuid.uuid4().hex
self.auth_context[attr_name] = attr_val_1 self.auth_context[attr_name] = attr_val_1
self.auth_context[attr_name] = attr_val_2 self.auth_context[attr_name] = attr_val_2
self.assertEqual(attr_val_2, self.auth_context[attr_name]) self.assertEqual(attr_val_2, self.auth_context[attr_name])
class TestAuthSpecificData(test_v3.RestfulTestCase): class TestAuthSpecificData(test_v3.RestfulTestCase):
def test_get_catalog_with_project_scoped_token(self): def test_get_catalog_with_project_scoped_token(self):
"""Call ``GET /auth/catalog`` with a project-scoped token.""" """Call ``GET /auth/catalog`` with a project-scoped token."""
r = self.get('/auth/catalog', expected_status=http_client.OK) r = self.get('/auth/catalog', expected_status=http.client.OK)
self.assertValidCatalogResponse(r) self.assertValidCatalogResponse(r)
def test_head_catalog_with_project_scoped_token(self): def test_head_catalog_with_project_scoped_token(self):
"""Call ``HEAD /auth/catalog`` with a project-scoped token.""" """Call ``HEAD /auth/catalog`` with a project-scoped token."""
self.head('/auth/catalog', expected_status=http_client.OK) self.head('/auth/catalog', expected_status=http.client.OK)
def test_get_catalog_with_domain_scoped_token(self): def test_get_catalog_with_domain_scoped_token(self):
"""Call ``GET /auth/catalog`` with a domain-scoped token.""" """Call ``GET /auth/catalog`` with a domain-scoped token."""
# grant a domain role to a user # grant a domain role to a user
self.put(path='/domains/%s/users/%s/roles/%s' % ( self.put(path='/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])) self.domain['id'], self.user['id'], self.role['id']))
self.get( self.get(
'/auth/catalog', '/auth/catalog',
auth=self.build_authentication_request( auth=self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=self.user['password'], password=self.user['password'],
domain_id=self.domain['id']), domain_id=self.domain['id']),
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_head_catalog_with_domain_scoped_token(self): def test_head_catalog_with_domain_scoped_token(self):
"""Call ``HEAD /auth/catalog`` with a domain-scoped token.""" """Call ``HEAD /auth/catalog`` with a domain-scoped token."""
# grant a domain role to a user # grant a domain role to a user
self.put(path='/domains/%s/users/%s/roles/%s' % ( self.put(path='/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])) self.domain['id'], self.user['id'], self.role['id']))
self.head( self.head(
'/auth/catalog', '/auth/catalog',
auth=self.build_authentication_request( auth=self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=self.user['password'], password=self.user['password'],
domain_id=self.domain['id']), domain_id=self.domain['id']),
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_get_catalog_with_unscoped_token(self): def test_get_catalog_with_unscoped_token(self):
"""Call ``GET /auth/catalog`` with an unscoped token.""" """Call ``GET /auth/catalog`` with an unscoped token."""
self.get( self.get(
'/auth/catalog', '/auth/catalog',
auth=self.build_authentication_request( auth=self.build_authentication_request(
user_id=self.default_domain_user['id'], user_id=self.default_domain_user['id'],
password=self.default_domain_user['password']), password=self.default_domain_user['password']),
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_head_catalog_with_unscoped_token(self): def test_head_catalog_with_unscoped_token(self):
"""Call ``HEAD /auth/catalog`` with an unscoped token.""" """Call ``HEAD /auth/catalog`` with an unscoped token."""
self.head( self.head(
'/auth/catalog', '/auth/catalog',
auth=self.build_authentication_request( auth=self.build_authentication_request(
user_id=self.default_domain_user['id'], user_id=self.default_domain_user['id'],
password=self.default_domain_user['password']), password=self.default_domain_user['password']),
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_get_catalog_no_token(self): def test_get_catalog_no_token(self):
"""Call ``GET /auth/catalog`` without a token.""" """Call ``GET /auth/catalog`` without a token."""
self.get( self.get(
'/auth/catalog', '/auth/catalog',
noauth=True, noauth=True,
expected_status=http_client.UNAUTHORIZED expected_status=http.client.UNAUTHORIZED
) )
def test_head_catalog_no_token(self): def test_head_catalog_no_token(self):
"""Call ``HEAD /auth/catalog`` without a token.""" """Call ``HEAD /auth/catalog`` without a token."""
self.head( self.head(
'/auth/catalog', '/auth/catalog',
noauth=True, noauth=True,
expected_status=http_client.UNAUTHORIZED expected_status=http.client.UNAUTHORIZED
) )
def test_get_projects_with_project_scoped_token(self): def test_get_projects_with_project_scoped_token(self):
r = self.get('/auth/projects', expected_status=http_client.OK) r = self.get('/auth/projects', expected_status=http.client.OK)
self.assertThat(r.json['projects'], matchers.HasLength(1)) self.assertThat(r.json['projects'], matchers.HasLength(1))
self.assertValidProjectListResponse(r) self.assertValidProjectListResponse(r)
def test_head_projects_with_project_scoped_token(self): def test_head_projects_with_project_scoped_token(self):
self.head('/auth/projects', expected_status=http_client.OK) self.head('/auth/projects', expected_status=http.client.OK)
def test_get_projects_matches_federated_get_projects(self): def test_get_projects_matches_federated_get_projects(self):
# create at least one addition project to make sure it doesn't end up # create at least one addition project to make sure it doesn't end up
# in the response, since the user doesn't have any authorization on it # in the response, since the user doesn't have any authorization on it
ref = unit.new_project_ref(domain_id=CONF.identity.default_domain_id) ref = unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
r = self.post('/projects', body={'project': ref}) r = self.post('/projects', body={'project': ref})
unauthorized_project_id = r.json['project']['id'] unauthorized_project_id = r.json['project']['id']
r = self.get('/auth/projects', expected_status=http_client.OK) r = self.get('/auth/projects', expected_status=http.client.OK)
self.assertThat(r.json['projects'], matchers.HasLength(1)) self.assertThat(r.json['projects'], matchers.HasLength(1))
for project in r.json['projects']: for project in r.json['projects']:
self.assertNotEqual(unauthorized_project_id, project['id']) self.assertNotEqual(unauthorized_project_id, project['id'])
expected_project_id = r.json['projects'][0]['id'] expected_project_id = r.json['projects'][0]['id']
# call GET /v3/OS-FEDERATION/projects # call GET /v3/OS-FEDERATION/projects
r = self.get('/OS-FEDERATION/projects', expected_status=http_client.OK) r = self.get('/OS-FEDERATION/projects', expected_status=http.client.OK)
# make sure the response is the same # make sure the response is the same
self.assertThat(r.json['projects'], matchers.HasLength(1)) self.assertThat(r.json['projects'], matchers.HasLength(1))
for project in r.json['projects']: for project in r.json['projects']:
self.assertEqual(expected_project_id, project['id']) self.assertEqual(expected_project_id, project['id'])
def test_get_domains_matches_federated_get_domains(self): def test_get_domains_matches_federated_get_domains(self):
# create at least one addition domain to make sure it doesn't end up # create at least one addition domain to make sure it doesn't end up
# in the response, since the user doesn't have any authorization on it # in the response, since the user doesn't have any authorization on it
ref = unit.new_domain_ref() ref = unit.new_domain_ref()
skipping to change at line 4993 skipping to change at line 4991
ref = unit.new_domain_ref() ref = unit.new_domain_ref()
r = self.post('/domains', body={'domain': ref}) r = self.post('/domains', body={'domain': ref})
authorized_domain_id = r.json['domain']['id'] authorized_domain_id = r.json['domain']['id']
path = '/domains/%(domain_id)s/users/%(user_id)s/roles/%(role_id)s' % { path = '/domains/%(domain_id)s/users/%(user_id)s/roles/%(role_id)s' % {
'domain_id': authorized_domain_id, 'domain_id': authorized_domain_id,
'user_id': self.user_id, 'user_id': self.user_id,
'role_id': self.role_id 'role_id': self.role_id
} }
self.put(path, expected_status=http_client.NO_CONTENT) self.put(path, expected_status=http.client.NO_CONTENT)
r = self.get('/auth/domains', expected_status=http_client.OK) r = self.get('/auth/domains', expected_status=http.client.OK)
self.assertThat(r.json['domains'], matchers.HasLength(1)) self.assertThat(r.json['domains'], matchers.HasLength(1))
self.assertEqual(authorized_domain_id, r.json['domains'][0]['id']) self.assertEqual(authorized_domain_id, r.json['domains'][0]['id'])
self.assertNotEqual(unauthorized_domain_id, r.json['domains'][0]['id']) self.assertNotEqual(unauthorized_domain_id, r.json['domains'][0]['id'])
# call GET /v3/OS-FEDERATION/domains # call GET /v3/OS-FEDERATION/domains
r = self.get('/OS-FEDERATION/domains', expected_status=http_client.OK) r = self.get('/OS-FEDERATION/domains', expected_status=http.client.OK)
# make sure the response is the same # make sure the response is the same
self.assertThat(r.json['domains'], matchers.HasLength(1)) self.assertThat(r.json['domains'], matchers.HasLength(1))
self.assertEqual(authorized_domain_id, r.json['domains'][0]['id']) self.assertEqual(authorized_domain_id, r.json['domains'][0]['id'])
self.assertNotEqual(unauthorized_domain_id, r.json['domains'][0]['id']) self.assertNotEqual(unauthorized_domain_id, r.json['domains'][0]['id'])
def test_get_domains_with_project_scoped_token(self): def test_get_domains_with_project_scoped_token(self):
self.put(path='/domains/%s/users/%s/roles/%s' % ( self.put(path='/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])) self.domain['id'], self.user['id'], self.role['id']))
r = self.get('/auth/domains', expected_status=http_client.OK) r = self.get('/auth/domains', expected_status=http.client.OK)
self.assertThat(r.json['domains'], matchers.HasLength(1)) self.assertThat(r.json['domains'], matchers.HasLength(1))
self.assertValidDomainListResponse(r) self.assertValidDomainListResponse(r)
def test_head_domains_with_project_scoped_token(self): def test_head_domains_with_project_scoped_token(self):
self.put(path='/domains/%s/users/%s/roles/%s' % ( self.put(path='/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])) self.domain['id'], self.user['id'], self.role['id']))
self.head('/auth/domains', expected_status=http_client.OK) self.head('/auth/domains', expected_status=http.client.OK)
def test_get_system_roles_with_unscoped_token(self): def test_get_system_roles_with_unscoped_token(self):
path = '/system/users/%(user_id)s/roles/%(role_id)s' % { path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'], 'user_id': self.user['id'],
'role_id': self.role_id 'role_id': self.role_id
} }
self.put(path=path) self.put(path=path)
unscoped_request = self.build_authentication_request( unscoped_request = self.build_authentication_request(
user_id=self.user['id'], password=self.user['password'] user_id=self.user['id'], password=self.user['password']
) )
r = self.post('/auth/tokens', body=unscoped_request) r = self.post('/auth/tokens', body=unscoped_request)
unscoped_token = r.headers.get('X-Subject-Token') unscoped_token = r.headers.get('X-Subject-Token')
self.assertValidUnscopedTokenResponse(r) self.assertValidUnscopedTokenResponse(r)
response = self.get('/auth/system', token=unscoped_token) response = self.get('/auth/system', token=unscoped_token)
self.assertTrue(response.json_body['system'][0]['all']) self.assertTrue(response.json_body['system'][0]['all'])
self.head( self.head(
'/auth/system', token=unscoped_token, '/auth/system', token=unscoped_token,
expected_status=http_client.OK expected_status=http.client.OK
) )
def test_get_system_roles_returns_empty_list_without_system_roles(self): def test_get_system_roles_returns_empty_list_without_system_roles(self):
# A user without a system role assignment shouldn't expect an empty # A user without a system role assignment shouldn't expect an empty
# list when calling /v3/auth/system regardless of calling the API with # list when calling /v3/auth/system regardless of calling the API with
# an unscoped token or a project-scoped token. # an unscoped token or a project-scoped token.
unscoped_request = self.build_authentication_request( unscoped_request = self.build_authentication_request(
user_id=self.user['id'], password=self.user['password'] user_id=self.user['id'], password=self.user['password']
) )
r = self.post('/auth/tokens', body=unscoped_request) r = self.post('/auth/tokens', body=unscoped_request)
unscoped_token = r.headers.get('X-Subject-Token') unscoped_token = r.headers.get('X-Subject-Token')
self.assertValidUnscopedTokenResponse(r) self.assertValidUnscopedTokenResponse(r)
response = self.get('/auth/system', token=unscoped_token) response = self.get('/auth/system', token=unscoped_token)
self.assertEqual(response.json_body['system'], []) self.assertEqual(response.json_body['system'], [])
self.head( self.head(
'/auth/system', token=unscoped_token, '/auth/system', token=unscoped_token,
expected_status=http_client.OK expected_status=http.client.OK
) )
project_scoped_request = self.build_authentication_request( project_scoped_request = self.build_authentication_request(
user_id=self.user['id'], password=self.user['password'], user_id=self.user['id'], password=self.user['password'],
project_id=self.project_id project_id=self.project_id
) )
r = self.post('/auth/tokens', body=project_scoped_request) r = self.post('/auth/tokens', body=project_scoped_request)
project_scoped_token = r.headers.get('X-Subject-Token') project_scoped_token = r.headers.get('X-Subject-Token')
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
response = self.get('/auth/system', token=project_scoped_token) response = self.get('/auth/system', token=project_scoped_token)
self.assertEqual(response.json_body['system'], []) self.assertEqual(response.json_body['system'], [])
self.head( self.head(
'/auth/system', token=project_scoped_token, '/auth/system', token=project_scoped_token,
expected_status=http_client.OK expected_status=http.client.OK
) )
def test_get_system_roles_with_project_scoped_token(self): def test_get_system_roles_with_project_scoped_token(self):
path = '/system/users/%(user_id)s/roles/%(role_id)s' % { path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'], 'user_id': self.user['id'],
'role_id': self.role_id 'role_id': self.role_id
} }
self.put(path=path) self.put(path=path)
self.put(path='/domains/%s/users/%s/roles/%s' % ( self.put(path='/domains/%s/users/%s/roles/%s' % (
skipping to change at line 5094 skipping to change at line 5092
user_id=self.user['id'], password=self.user['password'], user_id=self.user['id'], password=self.user['password'],
domain_id=self.domain['id'] domain_id=self.domain['id']
) )
r = self.post('/auth/tokens', body=domain_scoped_request) r = self.post('/auth/tokens', body=domain_scoped_request)
domain_scoped_token = r.headers.get('X-Subject-Token') domain_scoped_token = r.headers.get('X-Subject-Token')
self.assertValidDomainScopedTokenResponse(r) self.assertValidDomainScopedTokenResponse(r)
response = self.get('/auth/system', token=domain_scoped_token) response = self.get('/auth/system', token=domain_scoped_token)
self.assertTrue(response.json_body['system'][0]['all']) self.assertTrue(response.json_body['system'][0]['all'])
self.head( self.head(
'/auth/system', token=domain_scoped_token, '/auth/system', token=domain_scoped_token,
expected_status=http_client.OK expected_status=http.client.OK
) )
def test_get_system_roles_with_domain_scoped_token(self): def test_get_system_roles_with_domain_scoped_token(self):
path = '/system/users/%(user_id)s/roles/%(role_id)s' % { path = '/system/users/%(user_id)s/roles/%(role_id)s' % {
'user_id': self.user['id'], 'user_id': self.user['id'],
'role_id': self.role_id 'role_id': self.role_id
} }
self.put(path=path) self.put(path=path)
project_scoped_request = self.build_authentication_request( project_scoped_request = self.build_authentication_request(
user_id=self.user['id'], password=self.user['password'], user_id=self.user['id'], password=self.user['password'],
project_id=self.project_id project_id=self.project_id
) )
r = self.post('/auth/tokens', body=project_scoped_request) r = self.post('/auth/tokens', body=project_scoped_request)
project_scoped_token = r.headers.get('X-Subject-Token') project_scoped_token = r.headers.get('X-Subject-Token')
self.assertValidProjectScopedTokenResponse(r) self.assertValidProjectScopedTokenResponse(r)
response = self.get('/auth/system', token=project_scoped_token) response = self.get('/auth/system', token=project_scoped_token)
self.assertTrue(response.json_body['system'][0]['all']) self.assertTrue(response.json_body['system'][0]['all'])
self.head( self.head(
'/auth/system', token=project_scoped_token, '/auth/system', token=project_scoped_token,
expected_status=http_client.OK expected_status=http.client.OK
) )
class TestTrustAuthFernetTokenProvider(TrustAPIBehavior, TestTrustChain): class TestTrustAuthFernetTokenProvider(TrustAPIBehavior, TestTrustChain):
def config_overrides(self): def config_overrides(self):
super(TestTrustAuthFernetTokenProvider, self).config_overrides() super(TestTrustAuthFernetTokenProvider, self).config_overrides()
self.config_fixture.config(group='token', self.config_fixture.config(group='token',
provider='fernet', provider='fernet',
revoke_by_id=False) revoke_by_id=False)
self.config_fixture.config(group='trust') self.config_fixture.config(group='trust')
self.useFixture( self.useFixture(
skipping to change at line 5199 skipping to change at line 5197
def cleanup(self): def cleanup(self):
totp_creds = PROVIDERS.credential_api.list_credentials_for_user( totp_creds = PROVIDERS.credential_api.list_credentials_for_user(
self.default_domain_user['id'], type='totp') self.default_domain_user['id'], type='totp')
other_creds = PROVIDERS.credential_api.list_credentials_for_user( other_creds = PROVIDERS.credential_api.list_credentials_for_user(
self.default_domain_user['id'], type='other') self.default_domain_user['id'], type='other')
for cred in itertools.chain(other_creds, totp_creds): for cred in itertools.chain(other_creds, totp_creds):
self.delete('/credentials/%s' % cred['id'], self.delete('/credentials/%s' % cred['id'],
expected_status=http_client.NO_CONTENT) expected_status=http.client.NO_CONTENT)
def test_with_a_valid_passcode(self): def test_with_a_valid_passcode(self):
creds = self._make_credentials('totp') creds = self._make_credentials('totp')
secret = creds[-1]['blob'] secret = creds[-1]['blob']
# Stop the clock otherwise there is a chance of auth failure due to # Stop the clock otherwise there is a chance of auth failure due to
# getting a different TOTP between the call here and the call in the # getting a different TOTP between the call here and the call in the
# auth plugin. # auth plugin.
self.useFixture(fixture.TimeFixture()) self.useFixture(fixture.TimeFixture())
auth_data = self._make_auth_data_by_id( auth_data = self._make_auth_data_by_id(
totp._generate_totp_passcodes(secret)[0]) totp._generate_totp_passcodes(secret)[0])
self.v3_create_token(auth_data, expected_status=http_client.CREATED) self.v3_create_token(auth_data, expected_status=http.client.CREATED)
def test_with_an_expired_passcode(self): def test_with_an_expired_passcode(self):
creds = self._make_credentials('totp') creds = self._make_credentials('totp')
secret = creds[-1]['blob'] secret = creds[-1]['blob']
past = datetime.datetime.utcnow() - datetime.timedelta(minutes=2) past = datetime.datetime.utcnow() - datetime.timedelta(minutes=2)
with freezegun.freeze_time(past): with freezegun.freeze_time(past):
auth_data = self._make_auth_data_by_id( auth_data = self._make_auth_data_by_id(
totp._generate_totp_passcodes(secret)[0]) totp._generate_totp_passcodes(secret)[0])
# Stop the clock otherwise there is a chance of accidental success due # Stop the clock otherwise there is a chance of accidental success due
# to getting a different TOTP between the call here and the call in the # to getting a different TOTP between the call here and the call in the
# auth plugin. # auth plugin.
self.useFixture(fixture.TimeFixture()) self.useFixture(fixture.TimeFixture())
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_with_an_expired_passcode_no_previous_windows(self): def test_with_an_expired_passcode_no_previous_windows(self):
self.config_fixture.config(group='totp', self.config_fixture.config(group='totp',
included_previous_windows=0) included_previous_windows=0)
creds = self._make_credentials('totp') creds = self._make_credentials('totp')
secret = creds[-1]['blob'] secret = creds[-1]['blob']
past = datetime.datetime.utcnow() - datetime.timedelta(seconds=30) past = datetime.datetime.utcnow() - datetime.timedelta(seconds=30)
with freezegun.freeze_time(past): with freezegun.freeze_time(past):
auth_data = self._make_auth_data_by_id( auth_data = self._make_auth_data_by_id(
totp._generate_totp_passcodes(secret)[0]) totp._generate_totp_passcodes(secret)[0])
# Stop the clock otherwise there is a chance of accidental success due # Stop the clock otherwise there is a chance of accidental success due
# to getting a different TOTP between the call here and the call in the # to getting a different TOTP between the call here and the call in the
# auth plugin. # auth plugin.
self.useFixture(fixture.TimeFixture()) self.useFixture(fixture.TimeFixture())
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_with_passcode_no_previous_windows(self): def test_with_passcode_no_previous_windows(self):
self.config_fixture.config(group='totp', self.config_fixture.config(group='totp',
included_previous_windows=0) included_previous_windows=0)
creds = self._make_credentials('totp') creds = self._make_credentials('totp')
secret = creds[-1]['blob'] secret = creds[-1]['blob']
auth_data = self._make_auth_data_by_id( auth_data = self._make_auth_data_by_id(
totp._generate_totp_passcodes(secret)[0]) totp._generate_totp_passcodes(secret)[0])
# Stop the clock otherwise there is a chance of auth failure due to # Stop the clock otherwise there is a chance of auth failure due to
# getting a different TOTP between the call here and the call in the # getting a different TOTP between the call here and the call in the
# auth plugin. # auth plugin.
self.useFixture(fixture.TimeFixture()) self.useFixture(fixture.TimeFixture())
self.v3_create_token(auth_data, expected_status=http_client.CREATED) self.v3_create_token(auth_data, expected_status=http.client.CREATED)
def test_with_passcode_in_previous_windows_default(self): def test_with_passcode_in_previous_windows_default(self):
"""Confirm previous window default of 1 works.""" """Confirm previous window default of 1 works."""
creds = self._make_credentials('totp') creds = self._make_credentials('totp')
secret = creds[-1]['blob'] secret = creds[-1]['blob']
past = datetime.datetime.utcnow() - datetime.timedelta(seconds=30) past = datetime.datetime.utcnow() - datetime.timedelta(seconds=30)
with freezegun.freeze_time(past): with freezegun.freeze_time(past):
auth_data = self._make_auth_data_by_id( auth_data = self._make_auth_data_by_id(
totp._generate_totp_passcodes(secret)[0]) totp._generate_totp_passcodes(secret)[0])
# Stop the clock otherwise there is a chance of auth failure due to # Stop the clock otherwise there is a chance of auth failure due to
# getting a different TOTP between the call here and the call in the # getting a different TOTP between the call here and the call in the
# auth plugin. # auth plugin.
self.useFixture(fixture.TimeFixture()) self.useFixture(fixture.TimeFixture())
self.v3_create_token(auth_data, expected_status=http_client.CREATED) self.v3_create_token(auth_data, expected_status=http.client.CREATED)
def test_with_passcode_in_previous_windows_extended(self): def test_with_passcode_in_previous_windows_extended(self):
self.config_fixture.config(group='totp', self.config_fixture.config(group='totp',
included_previous_windows=4) included_previous_windows=4)
creds = self._make_credentials('totp') creds = self._make_credentials('totp')
secret = creds[-1]['blob'] secret = creds[-1]['blob']
past = datetime.datetime.utcnow() - datetime.timedelta(minutes=2) past = datetime.datetime.utcnow() - datetime.timedelta(minutes=2)
with freezegun.freeze_time(past): self.useFixture(fixture.TimeFixture(past))
auth_data = self._make_auth_data_by_id( auth_data = self._make_auth_data_by_id(
totp._generate_totp_passcodes(secret)[0]) totp._generate_totp_passcodes(secret)[0])
# Stop the clock otherwise there is a chance of auth failure due to # Stop the clock otherwise there is a chance of auth failure due to
# getting a different TOTP between the call here and the call in the # getting a different TOTP between the call here and the call in the
# auth plugin. # auth plugin.
self.useFixture(fixture.TimeFixture()) self.useFixture(fixture.TimeFixture())
self.v3_create_token(auth_data, expected_status=http_client.CREATED) self.v3_create_token(auth_data, expected_status=http.client.CREATED)
def test_with_an_invalid_passcode_and_user_credentials(self): def test_with_an_invalid_passcode_and_user_credentials(self):
self._make_credentials('totp') self._make_credentials('totp')
auth_data = self._make_auth_data_by_id('000000') auth_data = self._make_auth_data_by_id('000000')
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_with_an_invalid_passcode_with_no_user_credentials(self): def test_with_an_invalid_passcode_with_no_user_credentials(self):
auth_data = self._make_auth_data_by_id('000000') auth_data = self._make_auth_data_by_id('000000')
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_with_a_corrupt_totp_credential(self): def test_with_a_corrupt_totp_credential(self):
self._make_credentials('totp', count=1, blob='0') self._make_credentials('totp', count=1, blob='0')
auth_data = self._make_auth_data_by_id('000000') auth_data = self._make_auth_data_by_id('000000')
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_with_multiple_credentials(self): def test_with_multiple_credentials(self):
self._make_credentials('other', 3) self._make_credentials('other', 3)
creds = self._make_credentials('totp', count=3) creds = self._make_credentials('totp', count=3)
secret = creds[-1]['blob'] secret = creds[-1]['blob']
# Stop the clock otherwise there is a chance of auth failure due to # Stop the clock otherwise there is a chance of auth failure due to
# getting a different TOTP between the call here and the call in the # getting a different TOTP between the call here and the call in the
# auth plugin. # auth plugin.
self.useFixture(fixture.TimeFixture()) self.useFixture(fixture.TimeFixture())
auth_data = self._make_auth_data_by_id( auth_data = self._make_auth_data_by_id(
totp._generate_totp_passcodes(secret)[0]) totp._generate_totp_passcodes(secret)[0])
self.v3_create_token(auth_data, expected_status=http_client.CREATED) self.v3_create_token(auth_data, expected_status=http.client.CREATED)
def test_with_multiple_users(self): def test_with_multiple_users(self):
# make some credentials for the existing user # make some credentials for the existing user
self._make_credentials('totp', count=3) self._make_credentials('totp', count=3)
# create a new user and their credentials # create a new user and their credentials
user = unit.create_user( user = unit.create_user(
PROVIDERS.identity_api, domain_id=self.domain_id PROVIDERS.identity_api, domain_id=self.domain_id
) )
PROVIDERS.assignment_api.create_grant( PROVIDERS.assignment_api.create_grant(
skipping to change at line 5354 skipping to change at line 5352
creds = self._make_credentials('totp', count=1, user_id=user['id']) creds = self._make_credentials('totp', count=1, user_id=user['id'])
secret = creds[-1]['blob'] secret = creds[-1]['blob']
# Stop the clock otherwise there is a chance of auth failure due to # Stop the clock otherwise there is a chance of auth failure due to
# getting a different TOTP between the call here and the call in the # getting a different TOTP between the call here and the call in the
# auth plugin. # auth plugin.
self.useFixture(fixture.TimeFixture()) self.useFixture(fixture.TimeFixture())
auth_data = self._make_auth_data_by_id( auth_data = self._make_auth_data_by_id(
totp._generate_totp_passcodes(secret)[0], user_id=user['id']) totp._generate_totp_passcodes(secret)[0], user_id=user['id'])
self.v3_create_token(auth_data, expected_status=http_client.CREATED) self.v3_create_token(auth_data, expected_status=http.client.CREATED)
def test_with_multiple_users_and_invalid_credentials(self): def test_with_multiple_users_and_invalid_credentials(self):
"""Prevent logging in with someone else's credentials. """Prevent logging in with someone else's credentials.
It's very easy to forget to limit the credentials query by user. It's very easy to forget to limit the credentials query by user.
Let's just test it for a sanity check. Let's just test it for a sanity check.
""" """
# make some credentials for the existing user # make some credentials for the existing user
self._make_credentials('totp', count=3) self._make_credentials('totp', count=3)
skipping to change at line 5381 skipping to change at line 5379
) )
user2_creds = self._make_credentials( user2_creds = self._make_credentials(
'totp', count=1, user_id=new_user['id']) 'totp', count=1, user_id=new_user['id'])
user_id = self.default_domain_user['id'] # user1 user_id = self.default_domain_user['id'] # user1
secret = user2_creds[-1]['blob'] secret = user2_creds[-1]['blob']
auth_data = self._make_auth_data_by_id( auth_data = self._make_auth_data_by_id(
totp._generate_totp_passcodes(secret)[0], user_id=user_id) totp._generate_totp_passcodes(secret)[0], user_id=user_id)
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_with_username_and_domain_id(self): def test_with_username_and_domain_id(self):
creds = self._make_credentials('totp') creds = self._make_credentials('totp')
secret = creds[-1]['blob'] secret = creds[-1]['blob']
# Stop the clock otherwise there is a chance of auth failure due to # Stop the clock otherwise there is a chance of auth failure due to
# getting a different TOTP between the call here and the call in the # getting a different TOTP between the call here and the call in the
# auth plugin. # auth plugin.
self.useFixture(fixture.TimeFixture()) self.useFixture(fixture.TimeFixture())
auth_data = self._make_auth_data_by_name( auth_data = self._make_auth_data_by_name(
totp._generate_totp_passcodes(secret)[0], totp._generate_totp_passcodes(secret)[0],
username=self.default_domain_user['name'], username=self.default_domain_user['name'],
user_domain_id=self.default_domain_user['domain_id']) user_domain_id=self.default_domain_user['domain_id'])
self.v3_create_token(auth_data, expected_status=http_client.CREATED) self.v3_create_token(auth_data, expected_status=http.client.CREATED)
def test_generated_passcode_is_correct_format(self): def test_generated_passcode_is_correct_format(self):
secret = self._make_credentials('totp')[-1]['blob'] secret = self._make_credentials('totp')[-1]['blob']
passcode = totp._generate_totp_passcodes(secret)[0] passcode = totp._generate_totp_passcodes(secret)[0]
reg = re.compile(r'^-?[0-9]+$') reg = re.compile(r'^-?[0-9]+$')
self.assertTrue(reg.match(passcode)) self.assertTrue(reg.match(passcode))
class TestFetchRevocationList(test_v3.RestfulTestCase): class TestFetchRevocationList(test_v3.RestfulTestCase):
"""Test fetch token revocation list on the v3 Identity API.""" """Test fetch token revocation list on the v3 Identity API."""
def config_overrides(self): def config_overrides(self):
super(TestFetchRevocationList, self).config_overrides() super(TestFetchRevocationList, self).config_overrides()
self.config_fixture.config(group='token', revoke_by_id=True) self.config_fixture.config(group='token', revoke_by_id=True)
def test_get_ids_no_tokens_returns_forbidden(self): def test_get_ids_no_tokens_returns_forbidden(self):
# NOTE(vishakha): Since this API is deprecated and isn't supported. # NOTE(vishakha): Since this API is deprecated and isn't supported.
# Returning a 403 till API is removed. If API is removed a 410 # Returning a 403 till API is removed. If API is removed a 410
# can be returned. # can be returned.
self.get( self.get(
'/auth/tokens/OS-PKI/revoked', '/auth/tokens/OS-PKI/revoked',
expected_status=http_client.FORBIDDEN expected_status=http.client.FORBIDDEN
) )
def test_head_ids_no_tokens_returns_forbidden(self): def test_head_ids_no_tokens_returns_forbidden(self):
# NOTE(vishakha): Since this API is deprecated and isn't supported. # NOTE(vishakha): Since this API is deprecated and isn't supported.
# Returning a 403 till API is removed. If API is removed a 410 # Returning a 403 till API is removed. If API is removed a 410
# can be returned. # can be returned.
self.head( self.head(
'/auth/tokens/OS-PKI/revoked', '/auth/tokens/OS-PKI/revoked',
expected_status=http_client.FORBIDDEN expected_status=http.client.FORBIDDEN
) )
class ApplicationCredentialAuth(test_v3.RestfulTestCase): class ApplicationCredentialAuth(test_v3.RestfulTestCase):
def setUp(self): def setUp(self):
super(ApplicationCredentialAuth, self).setUp() super(ApplicationCredentialAuth, self).setUp()
self.app_cred_api = PROVIDERS.application_credential_api self.app_cred_api = PROVIDERS.application_credential_api
def config_overrides(self): def config_overrides(self):
super(ApplicationCredentialAuth, self).config_overrides() super(ApplicationCredentialAuth, self).config_overrides()
skipping to change at line 5458 skipping to change at line 5456
'project_id': self.project['id'], 'project_id': self.project['id'],
'description': uuid.uuid4().hex, 'description': uuid.uuid4().hex,
'roles': roles 'roles': roles
} }
if expires: if expires:
data['expires_at'] = expires data['expires_at'] = expires
if access_rules: if access_rules:
data['access_rules'] = access_rules data['access_rules'] = access_rules
return data return data
def _validate_token(self, token, headers=None, expected_status=http_client.O def _validate_token(self, token, headers=None,
K): expected_status=http.client.OK):
path = '/v3/auth/tokens' path = '/v3/auth/tokens'
headers = headers or {} headers = headers or {}
headers.update({'X-Auth-Token': token, 'X-Subject-Token': token}) headers.update({'X-Auth-Token': token, 'X-Subject-Token': token})
with self.test_client() as c: with self.test_client() as c:
resp = c.get(path, headers=headers, resp = c.get(path, headers=headers,
expected_status_code=expected_status) expected_status_code=expected_status)
return resp return resp
def test_valid_application_credential_succeeds(self): def test_valid_application_credential_succeeds(self):
app_cred = self._make_app_cred() app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential( app_cred_ref = self.app_cred_api.create_application_credential(
app_cred) app_cred)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret']) app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
self.v3_create_token(auth_data, expected_status=http_client.CREATED) self.v3_create_token(auth_data, expected_status=http.client.CREATED)
def test_validate_application_credential_token_populates_restricted(self): def test_validate_application_credential_token_populates_restricted(self):
self.config_fixture.config(group='token', cache_on_issue=False) self.config_fixture.config(group='token', cache_on_issue=False)
app_cred = self._make_app_cred() app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential( app_cred_ref = self.app_cred_api.create_application_credential(
app_cred) app_cred)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret']) app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
auth_response = self.v3_create_token( auth_response = self.v3_create_token(
auth_data, expected_status=http_client.CREATED) auth_data, expected_status=http.client.CREATED)
self.assertTrue( self.assertTrue(
auth_response.json['token']['application_credential']['restricted'] auth_response.json['token']['application_credential']['restricted']
) )
token_id = auth_response.headers.get('X-Subject-Token') token_id = auth_response.headers.get('X-Subject-Token')
headers = {'X-Auth-Token': token_id, 'X-Subject-Token': token_id} headers = {'X-Auth-Token': token_id, 'X-Subject-Token': token_id}
validate_response = self.get( validate_response = self.get(
'/auth/tokens', headers=headers '/auth/tokens', headers=headers
).json_body ).json_body
self.assertTrue( self.assertTrue(
validate_response['token']['application_credential']['restricted'] validate_response['token']['application_credential']['restricted']
) )
def test_valid_application_credential_with_name_succeeds(self): def test_valid_application_credential_with_name_succeeds(self):
app_cred = self._make_app_cred() app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential( app_cred_ref = self.app_cred_api.create_application_credential(
app_cred) app_cred)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
app_cred_name=app_cred_ref['name'], secret=app_cred_ref['secret'], app_cred_name=app_cred_ref['name'], secret=app_cred_ref['secret'],
user_id=self.user['id']) user_id=self.user['id'])
self.v3_create_token(auth_data, expected_status=http_client.CREATED) self.v3_create_token(auth_data, expected_status=http.client.CREATED)
def test_valid_application_credential_name_and_username_succeeds(self): def test_valid_application_credential_name_and_username_succeeds(self):
app_cred = self._make_app_cred() app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential( app_cred_ref = self.app_cred_api.create_application_credential(
app_cred) app_cred)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
app_cred_name=app_cred_ref['name'], secret=app_cred_ref['secret'], app_cred_name=app_cred_ref['name'], secret=app_cred_ref['secret'],
username=self.user['name'], user_domain_id=self.user['domain_id']) username=self.user['name'], user_domain_id=self.user['domain_id'])
self.v3_create_token(auth_data, expected_status=http_client.CREATED) self.v3_create_token(auth_data, expected_status=http.client.CREATED)
def test_application_credential_with_invalid_secret_fails(self): def test_application_credential_with_invalid_secret_fails(self):
app_cred = self._make_app_cred() app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential( app_cred_ref = self.app_cred_api.create_application_credential(
app_cred) app_cred)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret='badsecret') app_cred_id=app_cred_ref['id'], secret='badsecret')
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_unexpired_application_credential_succeeds(self): def test_unexpired_application_credential_succeeds(self):
expires_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=1) expires_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
app_cred = self._make_app_cred(expires=expires_at) app_cred = self._make_app_cred(expires=expires_at)
app_cred_ref = self.app_cred_api.create_application_credential( app_cred_ref = self.app_cred_api.create_application_credential(
app_cred) app_cred)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret']) app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
self.v3_create_token(auth_data, expected_status=http_client.CREATED) self.v3_create_token(auth_data, expected_status=http.client.CREATED)
def test_expired_application_credential_fails(self): def test_expired_application_credential_fails(self):
expires_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=1) expires_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
app_cred = self._make_app_cred(expires=expires_at) app_cred = self._make_app_cred(expires=expires_at)
app_cred_ref = self.app_cred_api.create_application_credential( app_cred_ref = self.app_cred_api.create_application_credential(
app_cred) app_cred)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret']) app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
future = datetime.datetime.utcnow() + datetime.timedelta(minutes=2) future = datetime.datetime.utcnow() + datetime.timedelta(minutes=2)
with freezegun.freeze_time(future): with freezegun.freeze_time(future):
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_application_credential_fails_when_user_deleted(self): def test_application_credential_fails_when_user_deleted(self):
app_cred = self._make_app_cred() app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential( app_cred_ref = self.app_cred_api.create_application_credential(
app_cred) app_cred)
PROVIDERS.identity_api.delete_user(self.user['id']) PROVIDERS.identity_api.delete_user(self.user['id'])
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret']) app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
self.v3_create_token(auth_data, expected_status=http_client.NOT_FOUND) self.v3_create_token(auth_data, expected_status=http.client.NOT_FOUND)
def test_application_credential_fails_when_user_disabled(self): def test_application_credential_fails_when_user_disabled(self):
app_cred = self._make_app_cred() app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential( app_cred_ref = self.app_cred_api.create_application_credential(
app_cred) app_cred)
PROVIDERS.identity_api.update_user(self.user['id'], PROVIDERS.identity_api.update_user(self.user['id'],
{'enabled': False}) {'enabled': False})
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret']) app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
self.v3_create_token(auth_data, self.v3_create_token(auth_data,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_application_credential_fails_when_project_deleted(self): def test_application_credential_fails_when_project_deleted(self):
app_cred = self._make_app_cred() app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential( app_cred_ref = self.app_cred_api.create_application_credential(
app_cred) app_cred)
PROVIDERS.resource_api.delete_project(self.project['id']) PROVIDERS.resource_api.delete_project(self.project['id'])
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret']) app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
self.v3_create_token(auth_data, expected_status=http_client.NOT_FOUND) self.v3_create_token(auth_data, expected_status=http.client.NOT_FOUND)
def test_application_credential_fails_when_role_deleted(self): def test_application_credential_fails_when_role_deleted(self):
app_cred = self._make_app_cred() app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential( app_cred_ref = self.app_cred_api.create_application_credential(
app_cred) app_cred)
PROVIDERS.role_api.delete_role(self.role_id) PROVIDERS.role_api.delete_role(self.role_id)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret']) app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
self.v3_create_token(auth_data, expected_status=http_client.NOT_FOUND) self.v3_create_token(auth_data, expected_status=http.client.NOT_FOUND)
def test_application_credential_fails_when_role_unassigned(self): def test_application_credential_fails_when_role_unassigned(self):
app_cred = self._make_app_cred() app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential( app_cred_ref = self.app_cred_api.create_application_credential(
app_cred) app_cred)
PROVIDERS.assignment_api.remove_role_from_user_and_project( PROVIDERS.assignment_api.remove_role_from_user_and_project(
self.user['id'], self.project['id'], self.user['id'], self.project['id'],
self.role_id) self.role_id)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret']) app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
self.v3_create_token(auth_data, expected_status=http_client.NOT_FOUND) self.v3_create_token(auth_data, expected_status=http.client.NOT_FOUND)
def test_application_credential_through_group_membership(self): def test_application_credential_through_group_membership(self):
user1 = unit.create_user( user1 = unit.create_user(
PROVIDERS.identity_api, domain_id=self.domain_id PROVIDERS.identity_api, domain_id=self.domain_id
) )
group1 = unit.new_group_ref(domain_id=self.domain_id) group1 = unit.new_group_ref(domain_id=self.domain_id)
group1 = PROVIDERS.identity_api.create_group(group1) group1 = PROVIDERS.identity_api.create_group(group1)
PROVIDERS.identity_api.add_user_to_group( PROVIDERS.identity_api.add_user_to_group(
skipping to change at line 5623 skipping to change at line 5622
'project_id': self.project_id, 'project_id': self.project_id,
'description': uuid.uuid4().hex, 'description': uuid.uuid4().hex,
'roles': [{'id': self.role_id}] 'roles': [{'id': self.role_id}]
} }
app_cred_ref = self.app_cred_api.create_application_credential( app_cred_ref = self.app_cred_api.create_application_credential(
app_cred) app_cred)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret']) app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
self.v3_create_token(auth_data, expected_status=http_client.CREATED) self.v3_create_token(auth_data, expected_status=http.client.CREATED)
def test_application_credential_cannot_scope(self): def test_application_credential_cannot_scope(self):
app_cred = self._make_app_cred() app_cred = self._make_app_cred()
app_cred_ref = self.app_cred_api.create_application_credential( app_cred_ref = self.app_cred_api.create_application_credential(
app_cred) app_cred)
new_project_ref = unit.new_project_ref(domain_id=self.domain_id) new_project_ref = unit.new_project_ref(domain_id=self.domain_id)
# Create a new project and assign the user a valid role on it # Create a new project and assign the user a valid role on it
new_project = PROVIDERS.resource_api.create_project( new_project = PROVIDERS.resource_api.create_project(
new_project_ref['id'], new_project_ref) new_project_ref['id'], new_project_ref)
PROVIDERS.assignment_api.add_role_to_user_and_project( PROVIDERS.assignment_api.add_role_to_user_and_project(
skipping to change at line 5648 skipping to change at line 5647
password=self.user['password'], password=self.user['password'],
project_id=new_project['id']) project_id=new_project['id'])
password_response = self.v3_create_token(password_auth) password_response = self.v3_create_token(password_auth)
self.assertValidProjectScopedTokenResponse(password_response) self.assertValidProjectScopedTokenResponse(password_response)
# Should not be able to use that scope with an application credential # Should not be able to use that scope with an application credential
# even though the user has a valid assignment on it # even though the user has a valid assignment on it
app_cred_auth = self.build_authentication_request( app_cred_auth = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'], app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'],
project_id=new_project['id']) project_id=new_project['id'])
self.v3_create_token(app_cred_auth, self.v3_create_token(app_cred_auth,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_application_credential_with_access_rules(self): def test_application_credential_with_access_rules(self):
access_rules = [ access_rules = [
{ {
'id': uuid.uuid4().hex, 'id': uuid.uuid4().hex,
'path': '/v2.1/servers', 'path': '/v2.1/servers',
'method': 'POST', 'method': 'POST',
'service': uuid.uuid4().hex, 'service': uuid.uuid4().hex,
} }
] ]
app_cred = self._make_app_cred(access_rules=access_rules) app_cred = self._make_app_cred(access_rules=access_rules)
app_cred_ref = self.app_cred_api.create_application_credential( app_cred_ref = self.app_cred_api.create_application_credential(
app_cred) app_cred)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret']) app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
resp = self.v3_create_token(auth_data, resp = self.v3_create_token(auth_data,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
token = resp.headers.get('X-Subject-Token') token = resp.headers.get('X-Subject-Token')
headers = {'OpenStack-Identity-Access-Rules': '1.0'} headers = {'OpenStack-Identity-Access-Rules': '1.0'}
self._validate_token(token, headers=headers) self._validate_token(token, headers=headers)
def test_application_credential_access_rules_without_header_fails(self): def test_application_credential_access_rules_without_header_fails(self):
access_rules = [ access_rules = [
{ {
'id': uuid.uuid4().hex, 'id': uuid.uuid4().hex,
'path': '/v2.1/servers', 'path': '/v2.1/servers',
'method': 'POST', 'method': 'POST',
'service': uuid.uuid4().hex, 'service': uuid.uuid4().hex,
} }
] ]
app_cred = self._make_app_cred(access_rules=access_rules) app_cred = self._make_app_cred(access_rules=access_rules)
app_cred_ref = self.app_cred_api.create_application_credential( app_cred_ref = self.app_cred_api.create_application_credential(
app_cred) app_cred)
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret']) app_cred_id=app_cred_ref['id'], secret=app_cred_ref['secret'])
resp = self.v3_create_token(auth_data, resp = self.v3_create_token(auth_data,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
token = resp.headers.get('X-Subject-Token') token = resp.headers.get('X-Subject-Token')
self._validate_token(token, expected_status=http_client.NOT_FOUND) self._validate_token(token, expected_status=http.client.NOT_FOUND)
 End of changes. 229 change blocks. 
232 lines changed or deleted 230 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)