"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "keystone/tests/unit/test_v3_credential.py" between
keystone-16.0.0.tar.gz and keystone-16.0.1.tar.gz

About: OpenStack Keystone (Core Service: Identity) provides an authentication and authorization service for other OpenStack services. Provides a catalog of endpoints for all OpenStack services.
The "Train" series (latest release).

test_v3_credential.py  (keystone-16.0.0):test_v3_credential.py  (keystone-16.0.1)
skipping to change at line 22 skipping to change at line 22
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import hashlib import hashlib
import json import json
import uuid import uuid
from keystoneclient.contrib.ec2 import utils as ec2_utils from keystoneclient.contrib.ec2 import utils as ec2_utils
import mock import mock
from oslo_db import exception as oslo_db_exception from oslo_db import exception as oslo_db_exception
from six.moves import http_client from six.moves import http_client, urllib
from testtools import matchers from testtools import matchers
from keystone.api import ec2tokens from keystone.api import ec2tokens
from keystone.common import provider_api from keystone.common import provider_api
from keystone.common import utils from keystone.common import utils
from keystone.credential.providers import fernet as credential_fernet from keystone.credential.providers import fernet as credential_fernet
from keystone import exception from keystone import exception
from keystone import oauth1
from keystone.tests import unit from keystone.tests import unit
from keystone.tests.unit import ksfixtures from keystone.tests.unit import ksfixtures
from keystone.tests.unit import test_v3 from keystone.tests.unit import test_v3
PROVIDERS = provider_api.ProviderAPIs PROVIDERS = provider_api.ProviderAPIs
CRED_TYPE_EC2 = ec2tokens.CRED_TYPE_EC2 CRED_TYPE_EC2 = ec2tokens.CRED_TYPE_EC2
class CredentialBaseTestCase(test_v3.RestfulTestCase): class CredentialBaseTestCase(test_v3.RestfulTestCase):
def setUp(self): def setUp(self):
skipping to change at line 64 skipping to change at line 65
# This means we can test the dict->json workaround, added # This means we can test the dict->json workaround, added
# as part of the bugfix for backwards compatibility works. # as part of the bugfix for backwards compatibility works.
credential['blob'] = blob credential['blob'] = blob
credential_id = credential['id'] credential_id = credential['id']
# Create direct via the DB API to avoid validation failure # Create direct via the DB API to avoid validation failure
PROVIDERS.credential_api.create_credential(credential_id, credential) PROVIDERS.credential_api.create_credential(credential_id, credential)
return json.dumps(blob), credential_id return json.dumps(blob), credential_id
def _test_get_token(self, access, secret):
"""Test signature validation with the access/secret provided."""
signer = ec2_utils.Ec2Signer(secret)
params = {'SignatureMethod': 'HmacSHA256',
'SignatureVersion': '2',
'AWSAccessKeyId': access}
request = {'host': 'foo',
'verb': 'GET',
'path': '/bar',
'params': params}
signature = signer.generate(request)
# Now make a request to validate the signed dummy request via the
# ec2tokens API. This proves the v3 ec2 credentials actually work.
sig_ref = {'access': access,
'signature': signature,
'host': 'foo',
'verb': 'GET',
'path': '/bar',
'params': params}
r = self.post(
'/ec2tokens',
body={'ec2Credentials': sig_ref},
expected_status=http_client.OK)
self.assertValidTokenResponse(r)
return r.result['token']
class CredentialTestCase(CredentialBaseTestCase): class CredentialTestCase(CredentialBaseTestCase):
"""Test credential CRUD.""" """Test credential CRUD."""
def setUp(self): def setUp(self):
super(CredentialTestCase, self).setUp() super(CredentialTestCase, self).setUp()
self.credential = unit.new_credential_ref(user_id=self.user['id'], self.credential = unit.new_credential_ref(user_id=self.user['id'],
project_id=self.project_id) project_id=self.project_id)
skipping to change at line 258 skipping to change at line 286
credential_id = r.result.get('credential')['id'] credential_id = r.result.get('credential')['id']
# Since the created credential above already has a project_id, the # Since the created credential above already has a project_id, the
# update request will not fail # update request will not fail
update_ref = {'type': 'ec2'} update_ref = {'type': 'ec2'}
self.patch( self.patch(
'/credentials/%(credential_id)s' % { '/credentials/%(credential_id)s' % {
'credential_id': credential_id}, 'credential_id': credential_id},
body={'credential': update_ref}) body={'credential': update_ref})
def test_update_credential_non_owner(self):
"""Call ``PATCH /credentials/{credential_id}``."""
alt_user = unit.create_user(
PROVIDERS.identity_api, domain_id=self.domain_id)
alt_user_id = alt_user['id']
alt_project = unit.new_project_ref(domain_id=self.domain_id)
alt_project_id = alt_project['id']
PROVIDERS.resource_api.create_project(
alt_project['id'], alt_project)
alt_role = unit.new_role_ref(name='reader')
alt_role_id = alt_role['id']
PROVIDERS.role_api.create_role(alt_role_id, alt_role)
PROVIDERS.assignment_api.add_role_to_user_and_project(
alt_user_id, alt_project_id, alt_role_id)
auth = self.build_authentication_request(
user_id=alt_user_id,
password=alt_user['password'],
project_id=alt_project_id)
ref = unit.new_credential_ref(user_id=alt_user_id,
project_id=alt_project_id)
r = self.post(
'/credentials',
auth=auth,
body={'credential': ref})
self.assertValidCredentialResponse(r, ref)
credential_id = r.result.get('credential')['id']
# Cannot change the credential to be owned by another user
update_ref = {'user_id': self.user_id, 'project_id': self.project_id}
self.patch(
'/credentials/%(credential_id)s' % {
'credential_id': credential_id},
expected_status=403,
auth=auth,
body={'credential': update_ref})
def test_update_ec2_credential_change_trust_id(self):
"""Call ``PATCH /credentials/{credential_id}``."""
blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
project_id=self.project_id)
blob['trust_id'] = uuid.uuid4().hex
ref['blob'] = json.dumps(blob)
r = self.post(
'/credentials',
body={'credential': ref})
self.assertValidCredentialResponse(r, ref)
credential_id = r.result.get('credential')['id']
# Try changing to a different trust
blob['trust_id'] = uuid.uuid4().hex
update_ref = {'blob': json.dumps(blob)}
self.patch(
'/credentials/%(credential_id)s' % {
'credential_id': credential_id},
body={'credential': update_ref},
expected_status=http_client.BAD_REQUEST)
# Try removing the trust
del blob['trust_id']
update_ref = {'blob': json.dumps(blob)}
self.patch(
'/credentials/%(credential_id)s' % {
'credential_id': credential_id},
body={'credential': update_ref},
expected_status=http_client.BAD_REQUEST)
def test_update_ec2_credential_change_app_cred_id(self):
"""Call ``PATCH /credentials/{credential_id}``."""
blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
project_id=self.project_id)
blob['app_cred_id'] = uuid.uuid4().hex
ref['blob'] = json.dumps(blob)
r = self.post(
'/credentials',
body={'credential': ref})
self.assertValidCredentialResponse(r, ref)
credential_id = r.result.get('credential')['id']
# Try changing to a different app cred
blob['app_cred_id'] = uuid.uuid4().hex
update_ref = {'blob': json.dumps(blob)}
self.patch(
'/credentials/%(credential_id)s' % {
'credential_id': credential_id},
body={'credential': update_ref},
expected_status=http_client.BAD_REQUEST)
# Try removing the app cred
del blob['app_cred_id']
update_ref = {'blob': json.dumps(blob)}
self.patch(
'/credentials/%(credential_id)s' % {
'credential_id': credential_id},
body={'credential': update_ref},
expected_status=http_client.BAD_REQUEST)
def test_update_ec2_credential_change_access_token_id(self):
"""Call ``PATCH /credentials/{credential_id}``."""
blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
project_id=self.project_id)
blob['access_token_id'] = uuid.uuid4().hex
ref['blob'] = json.dumps(blob)
r = self.post(
'/credentials',
body={'credential': ref})
self.assertValidCredentialResponse(r, ref)
credential_id = r.result.get('credential')['id']
# Try changing to a different access token
blob['access_token_id'] = uuid.uuid4().hex
update_ref = {'blob': json.dumps(blob)}
self.patch(
'/credentials/%(credential_id)s' % {
'credential_id': credential_id},
body={'credential': update_ref},
expected_status=http_client.BAD_REQUEST)
# Try removing the access token
del blob['access_token_id']
update_ref = {'blob': json.dumps(blob)}
self.patch(
'/credentials/%(credential_id)s' % {
'credential_id': credential_id},
body={'credential': update_ref},
expected_status=http_client.BAD_REQUEST)
def test_delete_credential(self): def test_delete_credential(self):
"""Call ``DELETE /credentials/{credential_id}``.""" """Call ``DELETE /credentials/{credential_id}``."""
self.delete( self.delete(
'/credentials/%(credential_id)s' % { '/credentials/%(credential_id)s' % {
'credential_id': self.credential['id']}) 'credential_id': self.credential['id']})
def test_delete_credential_retries_on_deadlock(self): def test_delete_credential_retries_on_deadlock(self):
patcher = mock.patch('sqlalchemy.orm.query.Query.delete', patcher = mock.patch('sqlalchemy.orm.query.Query.delete',
autospec=True) autospec=True)
skipping to change at line 392 skipping to change at line 540
def test_create_credential_with_admin_token(self): def test_create_credential_with_admin_token(self):
# Make sure we can create credential with the static admin token # Make sure we can create credential with the static admin token
ref = unit.new_credential_ref(user_id=self.user['id']) ref = unit.new_credential_ref(user_id=self.user['id'])
r = self.post( r = self.post(
'/credentials', '/credentials',
body={'credential': ref}, body={'credential': ref},
token=self.get_admin_token()) token=self.get_admin_token())
self.assertValidCredentialResponse(r, ref) self.assertValidCredentialResponse(r, ref)
class TestCredentialTrustScoped(test_v3.RestfulTestCase): class TestCredentialTrustScoped(CredentialBaseTestCase):
"""Test credential with trust scoped token.""" """Test credential with trust scoped token."""
def setUp(self): def setUp(self):
super(TestCredentialTrustScoped, self).setUp() super(TestCredentialTrustScoped, self).setUp()
self.trustee_user = unit.new_user_ref(domain_id=self.domain_id) self.trustee_user = unit.new_user_ref(domain_id=self.domain_id)
password = self.trustee_user['password'] password = self.trustee_user['password']
self.trustee_user = PROVIDERS.identity_api.create_user( self.trustee_user = PROVIDERS.identity_api.create_user(
self.trustee_user self.trustee_user
) )
skipping to change at line 445 skipping to change at line 593
auth_data = self.build_authentication_request( auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'], user_id=self.trustee_user['id'],
password=self.trustee_user['password'], password=self.trustee_user['password'],
trust_id=trust['id']) trust_id=trust['id'])
r = self.v3_create_token(auth_data) r = self.v3_create_token(auth_data)
self.assertValidProjectScopedTokenResponse(r, self.user) self.assertValidProjectScopedTokenResponse(r, self.user)
trust_id = r.result['token']['OS-TRUST:trust']['id'] trust_id = r.result['token']['OS-TRUST:trust']['id']
token_id = r.headers.get('X-Subject-Token') token_id = r.headers.get('X-Subject-Token')
# Create the credential with the trust scoped token # Create the credential with the trust scoped token
blob, ref = unit.new_ec2_credential(user_id=self.user['id'], blob, ref = unit.new_ec2_credential(user_id=self.user_id,
project_id=self.project_id) project_id=self.project_id)
r = self.post('/credentials', body={'credential': ref}, token=token_id) r = self.post('/credentials', body={'credential': ref}, token=token_id)
# We expect the response blob to contain the trust_id # We expect the response blob to contain the trust_id
ret_ref = ref.copy() ret_ref = ref.copy()
ret_blob = blob.copy() ret_blob = blob.copy()
ret_blob['trust_id'] = trust_id ret_blob['trust_id'] = trust_id
ret_ref['blob'] = json.dumps(ret_blob) ret_ref['blob'] = json.dumps(ret_blob)
self.assertValidCredentialResponse(r, ref=ret_ref) self.assertValidCredentialResponse(r, ref=ret_ref)
# Assert credential id is same as hash of access key id for # Assert credential id is same as hash of access key id for
# ec2 credentials # ec2 credentials
access = blob['access'].encode('utf-8') access = blob['access'].encode('utf-8')
self.assertEqual(hashlib.sha256(access).hexdigest(), self.assertEqual(hashlib.sha256(access).hexdigest(),
r.result['credential']['id']) r.result['credential']['id'])
# Create a role assignment to ensure that it is ignored and only the
# trust-delegated roles are used
role = unit.new_role_ref(name='reader')
role_id = role['id']
PROVIDERS.role_api.create_role(role_id, role)
PROVIDERS.assignment_api.add_role_to_user_and_project(
self.user_id, self.project_id, role_id)
ret_blob = json.loads(r.result['credential']['blob'])
ec2token = self._test_get_token(
access=ret_blob['access'], secret=ret_blob['secret'])
ec2_roles = [role['id'] for role in ec2token['roles']]
self.assertIn(self.role_id, ec2_roles)
self.assertNotIn(role_id, ec2_roles)
# Create second ec2 credential with the same access key id and check # Create second ec2 credential with the same access key id and check
# for conflict. # for conflict.
self.post( self.post(
'/credentials', '/credentials',
body={'credential': ref}, body={'credential': ref},
token=token_id, token=token_id,
expected_status=http_client.CONFLICT) expected_status=http_client.CONFLICT)
class TestCredentialEc2(CredentialBaseTestCase): class TestCredentialAppCreds(CredentialBaseTestCase):
"""Test v3 credential compatibility with ec2tokens.""" """Test credential with application credential token."""
def _validate_signature(self, access, secret): def setUp(self):
"""Test signature validation with the access/secret provided.""" super(TestCredentialAppCreds, self).setUp()
signer = ec2_utils.Ec2Signer(secret) self.useFixture(
params = {'SignatureMethod': 'HmacSHA256', ksfixtures.KeyRepository(
'SignatureVersion': '2', self.config_fixture,
'AWSAccessKeyId': access} 'credential',
request = {'host': 'foo', credential_fernet.MAX_ACTIVE_KEYS
'verb': 'GET', )
'path': '/bar', )
'params': params}
signature = signer.generate(request)
# Now make a request to validate the signed dummy request via the def test_app_cred_ec2_credential(self):
# ec2tokens API. This proves the v3 ec2 credentials actually work. """Test creating ec2 credential from an application credential.
sig_ref = {'access': access,
'signature': signature, Call ``POST /credentials``.
'host': 'foo', """
'verb': 'GET', # Create the app cred
'path': '/bar', ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}])
'params': params} del ref['id']
r = self.post( r = self.post('/users/%s/application_credentials' % self.user_id,
'/ec2tokens', body={'application_credential': ref})
body={'ec2Credentials': sig_ref}, app_cred = r.result['application_credential']
expected_status=http_client.OK)
self.assertValidTokenResponse(r) # Get an application credential token
auth_data = self.build_authentication_request(
app_cred_id=app_cred['id'],
secret=app_cred['secret'])
r = self.v3_create_token(auth_data)
token_id = r.headers.get('X-Subject-Token')
# Create the credential with the app cred token
blob, ref = unit.new_ec2_credential(user_id=self.user_id,
project_id=self.project_id)
r = self.post('/credentials', body={'credential': ref}, token=token_id)
# We expect the response blob to contain the app_cred_id
ret_ref = ref.copy()
ret_blob = blob.copy()
ret_blob['app_cred_id'] = app_cred['id']
ret_ref['blob'] = json.dumps(ret_blob)
self.assertValidCredentialResponse(r, ref=ret_ref)
# Assert credential id is same as hash of access key id for
# ec2 credentials
access = blob['access'].encode('utf-8')
self.assertEqual(hashlib.sha256(access).hexdigest(),
r.result['credential']['id'])
# Create a role assignment to ensure that it is ignored and only the
# roles in the app cred are used
role = unit.new_role_ref(name='reader')
role_id = role['id']
PROVIDERS.role_api.create_role(role_id, role)
PROVIDERS.assignment_api.add_role_to_user_and_project(
self.user_id, self.project_id, role_id)
ret_blob = json.loads(r.result['credential']['blob'])
ec2token = self._test_get_token(
access=ret_blob['access'], secret=ret_blob['secret'])
ec2_roles = [role['id'] for role in ec2token['roles']]
self.assertIn(self.role_id, ec2_roles)
self.assertNotIn(role_id, ec2_roles)
# Create second ec2 credential with the same access key id and check
# for conflict.
self.post(
'/credentials',
body={'credential': ref},
token=token_id,
expected_status=http_client.CONFLICT)
class TestCredentialAccessToken(CredentialBaseTestCase):
"""Test credential with access token."""
def setUp(self):
super(TestCredentialAccessToken, self).setUp()
self.useFixture(
ksfixtures.KeyRepository(
self.config_fixture,
'credential',
credential_fernet.MAX_ACTIVE_KEYS
)
)
self.base_url = 'http://localhost/v3'
def _urllib_parse_qs_text_keys(self, content):
results = urllib.parse.parse_qs(content)
return {key.decode('utf-8'): value for key, value in results.items()}
def _create_single_consumer(self):
endpoint = '/OS-OAUTH1/consumers'
ref = {'description': uuid.uuid4().hex}
resp = self.post(endpoint, body={'consumer': ref})
return resp.result['consumer']
def _create_request_token(self, consumer, project_id, base_url=None):
endpoint = '/OS-OAUTH1/request_token'
client = oauth1.Client(consumer['key'],
client_secret=consumer['secret'],
signature_method=oauth1.SIG_HMAC,
callback_uri="oob")
headers = {'requested_project_id': project_id}
if not base_url:
base_url = self.base_url
url, headers, body = client.sign(base_url + endpoint,
http_method='POST',
headers=headers)
return endpoint, headers
def _create_access_token(self, consumer, token, base_url=None):
endpoint = '/OS-OAUTH1/access_token'
client = oauth1.Client(consumer['key'],
client_secret=consumer['secret'],
resource_owner_key=token.key,
resource_owner_secret=token.secret,
signature_method=oauth1.SIG_HMAC,
verifier=token.verifier)
if not base_url:
base_url = self.base_url
url, headers, body = client.sign(base_url + endpoint,
http_method='POST')
headers.update({'Content-Type': 'application/json'})
return endpoint, headers
def _get_oauth_token(self, consumer, token):
client = oauth1.Client(consumer['key'],
client_secret=consumer['secret'],
resource_owner_key=token.key,
resource_owner_secret=token.secret,
signature_method=oauth1.SIG_HMAC)
endpoint = '/auth/tokens'
url, headers, body = client.sign(self.base_url + endpoint,
http_method='POST')
headers.update({'Content-Type': 'application/json'})
ref = {'auth': {'identity': {'oauth1': {}, 'methods': ['oauth1']}}}
return endpoint, headers, ref
def _authorize_request_token(self, request_id):
if isinstance(request_id, bytes):
request_id = request_id.decode()
return '/OS-OAUTH1/authorize/%s' % (request_id)
def _get_access_token(self):
consumer = self._create_single_consumer()
consumer_id = consumer['id']
consumer_secret = consumer['secret']
consumer = {'key': consumer_id, 'secret': consumer_secret}
url, headers = self._create_request_token(consumer, self.project_id)
content = self.post(
url, headers=headers,
response_content_type='application/x-www-form-urlencoded')
credentials = self._urllib_parse_qs_text_keys(content.result)
request_key = credentials['oauth_token'][0]
request_secret = credentials['oauth_token_secret'][0]
request_token = oauth1.Token(request_key, request_secret)
url = self._authorize_request_token(request_key)
body = {'roles': [{'id': self.role_id}]}
resp = self.put(url, body=body, expected_status=http_client.OK)
verifier = resp.result['token']['oauth_verifier']
request_token.set_verifier(verifier)
url, headers = self._create_access_token(consumer, request_token)
content = self.post(
url, headers=headers,
response_content_type='application/x-www-form-urlencoded')
credentials = self._urllib_parse_qs_text_keys(content.result)
access_key = credentials['oauth_token'][0]
access_secret = credentials['oauth_token_secret'][0]
access_token = oauth1.Token(access_key, access_secret)
url, headers, body = self._get_oauth_token(consumer, access_token)
content = self.post(url, headers=headers, body=body)
return access_key, content.headers['X-Subject-Token']
def test_access_token_ec2_credential(self):
"""Test creating ec2 credential from an oauth access token.
Call ``POST /credentials``.
"""
access_key, token_id = self._get_access_token()
# Create the credential with the access token
blob, ref = unit.new_ec2_credential(user_id=self.user_id,
project_id=self.project_id)
r = self.post('/credentials', body={'credential': ref}, token=token_id)
# We expect the response blob to contain the access_token_id
ret_ref = ref.copy()
ret_blob = blob.copy()
ret_blob['access_token_id'] = access_key.decode('utf-8')
ret_ref['blob'] = json.dumps(ret_blob)
self.assertValidCredentialResponse(r, ref=ret_ref)
# Assert credential id is same as hash of access key id for
# ec2 credentials
access = blob['access'].encode('utf-8')
self.assertEqual(hashlib.sha256(access).hexdigest(),
r.result['credential']['id'])
# Create a role assignment to ensure that it is ignored and only the
# roles in the access token are used
role = unit.new_role_ref(name='reader')
role_id = role['id']
PROVIDERS.role_api.create_role(role_id, role)
PROVIDERS.assignment_api.add_role_to_user_and_project(
self.user_id, self.project_id, role_id)
ret_blob = json.loads(r.result['credential']['blob'])
ec2token = self._test_get_token(
access=ret_blob['access'], secret=ret_blob['secret'])
ec2_roles = [role['id'] for role in ec2token['roles']]
self.assertIn(self.role_id, ec2_roles)
self.assertNotIn(role_id, ec2_roles)
class TestCredentialEc2(CredentialBaseTestCase):
"""Test v3 credential compatibility with ec2tokens."""
def test_ec2_credential_signature_validate(self): def test_ec2_credential_signature_validate(self):
"""Test signature validation with a v3 ec2 credential.""" """Test signature validation with a v3 ec2 credential."""
blob, ref = unit.new_ec2_credential(user_id=self.user['id'], blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
project_id=self.project_id) project_id=self.project_id)
r = self.post('/credentials', body={'credential': ref}) r = self.post('/credentials', body={'credential': ref})
self.assertValidCredentialResponse(r, ref) self.assertValidCredentialResponse(r, ref)
# Assert credential id is same as hash of access key id # Assert credential id is same as hash of access key id
access = blob['access'].encode('utf-8') access = blob['access'].encode('utf-8')
self.assertEqual(hashlib.sha256(access).hexdigest(), self.assertEqual(hashlib.sha256(access).hexdigest(),
r.result['credential']['id']) r.result['credential']['id'])
cred_blob = json.loads(r.result['credential']['blob']) cred_blob = json.loads(r.result['credential']['blob'])
self.assertEqual(blob, cred_blob) self.assertEqual(blob, cred_blob)
self._validate_signature(access=cred_blob['access'], self._test_get_token(access=cred_blob['access'],
secret=cred_blob['secret']) secret=cred_blob['secret'])
def test_ec2_credential_signature_validate_legacy(self): def test_ec2_credential_signature_validate_legacy(self):
"""Test signature validation with a legacy v3 ec2 credential.""" """Test signature validation with a legacy v3 ec2 credential."""
cred_json, _ = self._create_dict_blob_credential() cred_json, _ = self._create_dict_blob_credential()
cred_blob = json.loads(cred_json) cred_blob = json.loads(cred_json)
self._validate_signature(access=cred_blob['access'], self._test_get_token(access=cred_blob['access'],
secret=cred_blob['secret']) secret=cred_blob['secret'])
def _get_ec2_cred_uri(self): def _get_ec2_cred_uri(self):
return '/users/%s/credentials/OS-EC2' % self.user_id return '/users/%s/credentials/OS-EC2' % self.user_id
def _get_ec2_cred(self): def _get_ec2_cred(self):
uri = self._get_ec2_cred_uri() uri = self._get_ec2_cred_uri()
r = self.post(uri, body={'tenant_id': self.project_id}) r = self.post(uri, body={'tenant_id': self.project_id})
return r.result['credential'] return r.result['credential']
def test_ec2_create_credential(self): def test_ec2_create_credential(self):
"""Test ec2 credential creation.""" """Test ec2 credential creation."""
ec2_cred = self._get_ec2_cred() ec2_cred = self._get_ec2_cred()
self.assertEqual(self.user_id, ec2_cred['user_id']) self.assertEqual(self.user_id, ec2_cred['user_id'])
self.assertEqual(self.project_id, ec2_cred['tenant_id']) self.assertEqual(self.project_id, ec2_cred['tenant_id'])
self.assertIsNone(ec2_cred['trust_id']) self.assertIsNone(ec2_cred['trust_id'])
self._validate_signature(access=ec2_cred['access'], self._test_get_token(access=ec2_cred['access'],
secret=ec2_cred['secret']) secret=ec2_cred['secret'])
uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
self.assertThat(ec2_cred['links']['self'], self.assertThat(ec2_cred['links']['self'],
matchers.EndsWith(uri)) matchers.EndsWith(uri))
def test_ec2_get_credential(self): def test_ec2_get_credential(self):
ec2_cred = self._get_ec2_cred() ec2_cred = self._get_ec2_cred()
uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
r = self.get(uri) r = self.get(uri)
self.assertDictEqual(ec2_cred, r.result['credential']) self.assertDictEqual(ec2_cred, r.result['credential'])
self.assertThat(ec2_cred['links']['self'], self.assertThat(ec2_cred['links']['self'],
 End of changes. 13 change blocks. 
35 lines changed or deleted 391 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)