"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "keystone/tests/unit/test_v3_credential.py" between
keystone-16.0.1.tar.gz and keystone-17.0.0.tar.gz

About: OpenStack Keystone (Core Service: Identity) provides an authentication and authorization service for other OpenStack services. Provides a catalog of endpoints for all OpenStack services.
The "Ussuri" series (latest release).

test_v3_credential.py  (keystone-16.0.1):test_v3_credential.py  (keystone-17.0.0)
skipping to change at line 17 skipping to change at line 17
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import hashlib import hashlib
import json import json
from unittest import mock
import uuid import uuid
import http.client
from keystoneclient.contrib.ec2 import utils as ec2_utils from keystoneclient.contrib.ec2 import utils as ec2_utils
import mock
from oslo_db import exception as oslo_db_exception from oslo_db import exception as oslo_db_exception
from six.moves import http_client, urllib
from testtools import matchers from testtools import matchers
import urllib
from keystone.api import ec2tokens from keystone.api import ec2tokens
from keystone.common import provider_api from keystone.common import provider_api
from keystone.common import utils from keystone.common import utils
from keystone.credential.providers import fernet as credential_fernet from keystone.credential.providers import fernet as credential_fernet
from keystone import exception from keystone import exception
from keystone import oauth1 from keystone import oauth1
from keystone.tests import unit from keystone.tests import unit
from keystone.tests.unit import ksfixtures from keystone.tests.unit import ksfixtures
from keystone.tests.unit import test_v3 from keystone.tests.unit import test_v3
skipping to change at line 88 skipping to change at line 89
# ec2tokens API. This proves the v3 ec2 credentials actually work. # ec2tokens API. This proves the v3 ec2 credentials actually work.
sig_ref = {'access': access, sig_ref = {'access': access,
'signature': signature, 'signature': signature,
'host': 'foo', 'host': 'foo',
'verb': 'GET', 'verb': 'GET',
'path': '/bar', 'path': '/bar',
'params': params} 'params': params}
r = self.post( r = self.post(
'/ec2tokens', '/ec2tokens',
body={'ec2Credentials': sig_ref}, body={'ec2Credentials': sig_ref},
expected_status=http_client.OK) expected_status=http.client.OK)
self.assertValidTokenResponse(r) self.assertValidTokenResponse(r)
return r.result['token'] return r.result['token']
class CredentialTestCase(CredentialBaseTestCase): class CredentialTestCase(CredentialBaseTestCase):
"""Test credential CRUD.""" """Test credential CRUD."""
def setUp(self): def setUp(self):
super(CredentialTestCase, self).setUp() super(CredentialTestCase, self).setUp()
skipping to change at line 265 skipping to change at line 266
self.assertValidCredentialResponse(r, ref) self.assertValidCredentialResponse(r, ref)
credential_id = r.result.get('credential')['id'] credential_id = r.result.get('credential')['id']
# Updating such credential to ec2 type without providing a project_id # Updating such credential to ec2 type without providing a project_id
# will fail # will fail
update_ref = {'type': 'ec2'} update_ref = {'type': 'ec2'}
self.patch( self.patch(
'/credentials/%(credential_id)s' % { '/credentials/%(credential_id)s' % {
'credential_id': credential_id}, 'credential_id': credential_id},
body={'credential': update_ref}, body={'credential': update_ref},
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
def test_update_credential_to_ec2_with_previously_set_project_id(self): def test_update_credential_to_ec2_with_previously_set_project_id(self):
"""Call ``PATCH /credentials/{credential_id}``.""" """Call ``PATCH /credentials/{credential_id}``."""
# Create a credential providing a project_id # Create a credential providing a project_id
ref = unit.new_credential_ref(user_id=self.user['id'], ref = unit.new_credential_ref(user_id=self.user['id'],
project_id=self.project_id) project_id=self.project_id)
r = self.post( r = self.post(
'/credentials', '/credentials',
body={'credential': ref}) body={'credential': ref})
self.assertValidCredentialResponse(r, ref) self.assertValidCredentialResponse(r, ref)
skipping to change at line 340 skipping to change at line 341
body={'credential': ref}) body={'credential': ref})
self.assertValidCredentialResponse(r, ref) self.assertValidCredentialResponse(r, ref)
credential_id = r.result.get('credential')['id'] credential_id = r.result.get('credential')['id']
# Try changing to a different trust # Try changing to a different trust
blob['trust_id'] = uuid.uuid4().hex blob['trust_id'] = uuid.uuid4().hex
update_ref = {'blob': json.dumps(blob)} update_ref = {'blob': json.dumps(blob)}
self.patch( self.patch(
'/credentials/%(credential_id)s' % { '/credentials/%(credential_id)s' % {
'credential_id': credential_id}, 'credential_id': credential_id},
body={'credential': update_ref}, body={'credential': update_ref},
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
# Try removing the trust # Try removing the trust
del blob['trust_id'] del blob['trust_id']
update_ref = {'blob': json.dumps(blob)} update_ref = {'blob': json.dumps(blob)}
self.patch( self.patch(
'/credentials/%(credential_id)s' % { '/credentials/%(credential_id)s' % {
'credential_id': credential_id}, 'credential_id': credential_id},
body={'credential': update_ref}, body={'credential': update_ref},
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
def test_update_ec2_credential_change_app_cred_id(self): def test_update_ec2_credential_change_app_cred_id(self):
"""Call ``PATCH /credentials/{credential_id}``.""" """Call ``PATCH /credentials/{credential_id}``."""
blob, ref = unit.new_ec2_credential(user_id=self.user['id'], blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
project_id=self.project_id) project_id=self.project_id)
blob['app_cred_id'] = uuid.uuid4().hex blob['app_cred_id'] = uuid.uuid4().hex
ref['blob'] = json.dumps(blob) ref['blob'] = json.dumps(blob)
r = self.post( r = self.post(
'/credentials', '/credentials',
body={'credential': ref}) body={'credential': ref})
self.assertValidCredentialResponse(r, ref) self.assertValidCredentialResponse(r, ref)
credential_id = r.result.get('credential')['id'] credential_id = r.result.get('credential')['id']
# Try changing to a different app cred # Try changing to a different app cred
blob['app_cred_id'] = uuid.uuid4().hex blob['app_cred_id'] = uuid.uuid4().hex
update_ref = {'blob': json.dumps(blob)} update_ref = {'blob': json.dumps(blob)}
self.patch( self.patch(
'/credentials/%(credential_id)s' % { '/credentials/%(credential_id)s' % {
'credential_id': credential_id}, 'credential_id': credential_id},
body={'credential': update_ref}, body={'credential': update_ref},
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
# Try removing the app cred # Try removing the app cred
del blob['app_cred_id'] del blob['app_cred_id']
update_ref = {'blob': json.dumps(blob)} update_ref = {'blob': json.dumps(blob)}
self.patch( self.patch(
'/credentials/%(credential_id)s' % { '/credentials/%(credential_id)s' % {
'credential_id': credential_id}, 'credential_id': credential_id},
body={'credential': update_ref}, body={'credential': update_ref},
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
def test_update_ec2_credential_change_access_token_id(self): def test_update_ec2_credential_change_access_token_id(self):
"""Call ``PATCH /credentials/{credential_id}``.""" """Call ``PATCH /credentials/{credential_id}``."""
blob, ref = unit.new_ec2_credential(user_id=self.user['id'], blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
project_id=self.project_id) project_id=self.project_id)
blob['access_token_id'] = uuid.uuid4().hex blob['access_token_id'] = uuid.uuid4().hex
ref['blob'] = json.dumps(blob) ref['blob'] = json.dumps(blob)
r = self.post( r = self.post(
'/credentials', '/credentials',
body={'credential': ref}) body={'credential': ref})
self.assertValidCredentialResponse(r, ref) self.assertValidCredentialResponse(r, ref)
credential_id = r.result.get('credential')['id'] credential_id = r.result.get('credential')['id']
# Try changing to a different access token # Try changing to a different access token
blob['access_token_id'] = uuid.uuid4().hex blob['access_token_id'] = uuid.uuid4().hex
update_ref = {'blob': json.dumps(blob)} update_ref = {'blob': json.dumps(blob)}
self.patch( self.patch(
'/credentials/%(credential_id)s' % { '/credentials/%(credential_id)s' % {
'credential_id': credential_id}, 'credential_id': credential_id},
body={'credential': update_ref}, body={'credential': update_ref},
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
# Try removing the access token # Try removing the access token
del blob['access_token_id'] del blob['access_token_id']
update_ref = {'blob': json.dumps(blob)} update_ref = {'blob': json.dumps(blob)}
self.patch( self.patch(
'/credentials/%(credential_id)s' % { '/credentials/%(credential_id)s' % {
'credential_id': credential_id}, 'credential_id': credential_id},
body={'credential': update_ref}, body={'credential': update_ref},
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
def test_delete_credential(self): def test_delete_credential(self):
"""Call ``DELETE /credentials/{credential_id}``.""" """Call ``DELETE /credentials/{credential_id}``."""
self.delete( self.delete(
'/credentials/%(credential_id)s' % { '/credentials/%(credential_id)s' % {
'credential_id': self.credential['id']}) 'credential_id': self.credential['id']})
def test_delete_credential_retries_on_deadlock(self): def test_delete_credential_retries_on_deadlock(self):
patcher = mock.patch('sqlalchemy.orm.query.Query.delete', patcher = mock.patch('sqlalchemy.orm.query.Query.delete',
autospec=True) autospec=True)
skipping to change at line 459 skipping to change at line 460
self.assertValidCredentialResponse(r, ref) self.assertValidCredentialResponse(r, ref)
# Assert credential id is same as hash of access key id for # Assert credential id is same as hash of access key id for
# ec2 credentials # ec2 credentials
access = blob['access'].encode('utf-8') access = blob['access'].encode('utf-8')
self.assertEqual(hashlib.sha256(access).hexdigest(), self.assertEqual(hashlib.sha256(access).hexdigest(),
r.result['credential']['id']) r.result['credential']['id'])
# Create second ec2 credential with the same access key id and check # Create second ec2 credential with the same access key id and check
# for conflict. # for conflict.
self.post( self.post(
'/credentials', '/credentials',
body={'credential': ref}, expected_status=http_client.CONFLICT) body={'credential': ref}, expected_status=http.client.CONFLICT)
def test_get_ec2_dict_blob(self): def test_get_ec2_dict_blob(self):
"""Ensure non-JSON blob data is correctly converted.""" """Ensure non-JSON blob data is correctly converted."""
expected_blob, credential_id = self._create_dict_blob_credential() expected_blob, credential_id = self._create_dict_blob_credential()
r = self.get( r = self.get(
'/credentials/%(credential_id)s' % { '/credentials/%(credential_id)s' % {
'credential_id': credential_id}) 'credential_id': credential_id})
# use json.loads to transform the blobs back into Python dictionaries # use json.loads to transform the blobs back into Python dictionaries
skipping to change at line 514 skipping to change at line 515
def test_create_ec2_credential_with_missing_project_id(self): def test_create_ec2_credential_with_missing_project_id(self):
"""Test Creating ec2 credential with missing project_id. """Test Creating ec2 credential with missing project_id.
Call ``POST /credentials``. Call ``POST /credentials``.
""" """
_, ref = unit.new_ec2_credential(user_id=self.user['id'], _, ref = unit.new_ec2_credential(user_id=self.user['id'],
project_id=None) project_id=None)
# Assert bad request status when missing project_id # Assert bad request status when missing project_id
self.post( self.post(
'/credentials', '/credentials',
body={'credential': ref}, expected_status=http_client.BAD_REQUEST) body={'credential': ref}, expected_status=http.client.BAD_REQUEST)
def test_create_ec2_credential_with_invalid_blob(self): def test_create_ec2_credential_with_invalid_blob(self):
"""Test creating ec2 credential with invalid blob. """Test creating ec2 credential with invalid blob.
Call ``POST /credentials``. Call ``POST /credentials``.
""" """
ref = unit.new_credential_ref(user_id=self.user['id'], ref = unit.new_credential_ref(user_id=self.user['id'],
project_id=self.project_id, project_id=self.project_id,
blob='{"abc":"def"d}', blob='{"abc":"def"d}',
type=CRED_TYPE_EC2) type=CRED_TYPE_EC2)
# Assert bad request status when request contains invalid blob # Assert bad request status when request contains invalid blob
response = self.post( response = self.post(
'/credentials', '/credentials',
body={'credential': ref}, expected_status=http_client.BAD_REQUEST) body={'credential': ref}, expected_status=http.client.BAD_REQUEST)
self.assertValidErrorResponse(response) self.assertValidErrorResponse(response)
def test_create_credential_with_admin_token(self): def test_create_credential_with_admin_token(self):
# Make sure we can create credential with the static admin token # Make sure we can create credential with the static admin token
ref = unit.new_credential_ref(user_id=self.user['id']) ref = unit.new_credential_ref(user_id=self.user['id'])
r = self.post( r = self.post(
'/credentials', '/credentials',
body={'credential': ref}, body={'credential': ref},
token=self.get_admin_token()) token=self.get_admin_token())
self.assertValidCredentialResponse(r, ref) self.assertValidCredentialResponse(r, ref)
skipping to change at line 631 skipping to change at line 632
ec2_roles = [role['id'] for role in ec2token['roles']] ec2_roles = [role['id'] for role in ec2token['roles']]
self.assertIn(self.role_id, ec2_roles) self.assertIn(self.role_id, ec2_roles)
self.assertNotIn(role_id, ec2_roles) self.assertNotIn(role_id, ec2_roles)
# Create second ec2 credential with the same access key id and check # Create second ec2 credential with the same access key id and check
# for conflict. # for conflict.
self.post( self.post(
'/credentials', '/credentials',
body={'credential': ref}, body={'credential': ref},
token=token_id, token=token_id,
expected_status=http_client.CONFLICT) expected_status=http.client.CONFLICT)
class TestCredentialAppCreds(CredentialBaseTestCase): class TestCredentialAppCreds(CredentialBaseTestCase):
"""Test credential with application credential token.""" """Test credential with application credential token."""
def setUp(self): def setUp(self):
super(TestCredentialAppCreds, self).setUp() super(TestCredentialAppCreds, self).setUp()
self.useFixture( self.useFixture(
ksfixtures.KeyRepository( ksfixtures.KeyRepository(
self.config_fixture, self.config_fixture,
'credential', 'credential',
skipping to change at line 704 skipping to change at line 705
ec2_roles = [role['id'] for role in ec2token['roles']] ec2_roles = [role['id'] for role in ec2token['roles']]
self.assertIn(self.role_id, ec2_roles) self.assertIn(self.role_id, ec2_roles)
self.assertNotIn(role_id, ec2_roles) self.assertNotIn(role_id, ec2_roles)
# Create second ec2 credential with the same access key id and check # Create second ec2 credential with the same access key id and check
# for conflict. # for conflict.
self.post( self.post(
'/credentials', '/credentials',
body={'credential': ref}, body={'credential': ref},
token=token_id, token=token_id,
expected_status=http_client.CONFLICT) expected_status=http.client.CONFLICT)
class TestCredentialAccessToken(CredentialBaseTestCase): class TestCredentialAccessToken(CredentialBaseTestCase):
"""Test credential with access token.""" """Test credential with access token."""
def setUp(self): def setUp(self):
super(TestCredentialAccessToken, self).setUp() super(TestCredentialAccessToken, self).setUp()
self.useFixture( self.useFixture(
ksfixtures.KeyRepository( ksfixtures.KeyRepository(
self.config_fixture, self.config_fixture,
'credential', 'credential',
skipping to change at line 795 skipping to change at line 796
content = self.post( content = self.post(
url, headers=headers, url, headers=headers,
response_content_type='application/x-www-form-urlencoded') response_content_type='application/x-www-form-urlencoded')
credentials = self._urllib_parse_qs_text_keys(content.result) credentials = self._urllib_parse_qs_text_keys(content.result)
request_key = credentials['oauth_token'][0] request_key = credentials['oauth_token'][0]
request_secret = credentials['oauth_token_secret'][0] request_secret = credentials['oauth_token_secret'][0]
request_token = oauth1.Token(request_key, request_secret) request_token = oauth1.Token(request_key, request_secret)
url = self._authorize_request_token(request_key) url = self._authorize_request_token(request_key)
body = {'roles': [{'id': self.role_id}]} body = {'roles': [{'id': self.role_id}]}
resp = self.put(url, body=body, expected_status=http_client.OK) resp = self.put(url, body=body, expected_status=http.client.OK)
verifier = resp.result['token']['oauth_verifier'] verifier = resp.result['token']['oauth_verifier']
request_token.set_verifier(verifier) request_token.set_verifier(verifier)
url, headers = self._create_access_token(consumer, request_token) url, headers = self._create_access_token(consumer, request_token)
content = self.post( content = self.post(
url, headers=headers, url, headers=headers,
response_content_type='application/x-www-form-urlencoded') response_content_type='application/x-www-form-urlencoded')
credentials = self._urllib_parse_qs_text_keys(content.result) credentials = self._urllib_parse_qs_text_keys(content.result)
access_key = credentials['oauth_token'][0] access_key = credentials['oauth_token'][0]
access_secret = credentials['oauth_token_secret'][0] access_secret = credentials['oauth_token_secret'][0]
skipping to change at line 917 skipping to change at line 918
access_key = uuid.uuid4().hex access_key = uuid.uuid4().hex
cred_id = utils.hash_access_key(access_key) cred_id = utils.hash_access_key(access_key)
non_ec2_cred = unit.new_credential_ref( non_ec2_cred = unit.new_credential_ref(
user_id=self.user_id, user_id=self.user_id,
project_id=self.project_id) project_id=self.project_id)
non_ec2_cred['id'] = cred_id non_ec2_cred['id'] = cred_id
PROVIDERS.credential_api.create_credential(cred_id, non_ec2_cred) PROVIDERS.credential_api.create_credential(cred_id, non_ec2_cred)
uri = '/'.join([self._get_ec2_cred_uri(), access_key]) uri = '/'.join([self._get_ec2_cred_uri(), access_key])
# if access_key is not found, ec2 controller raises Unauthorized # if access_key is not found, ec2 controller raises Unauthorized
# exception # exception
self.get(uri, expected_status=http_client.UNAUTHORIZED) self.get(uri, expected_status=http.client.UNAUTHORIZED)
def test_ec2_list_credentials(self): def test_ec2_list_credentials(self):
"""Test ec2 credential listing.""" """Test ec2 credential listing."""
self._get_ec2_cred() self._get_ec2_cred()
uri = self._get_ec2_cred_uri() uri = self._get_ec2_cred_uri()
r = self.get(uri) r = self.get(uri)
cred_list = r.result['credentials'] cred_list = r.result['credentials']
self.assertEqual(1, len(cred_list)) self.assertEqual(1, len(cred_list))
self.assertThat(r.result['links']['self'], self.assertThat(r.result['links']['self'],
matchers.EndsWith(uri)) matchers.EndsWith(uri))
 End of changes. 20 change blocks. 
17 lines changed or deleted 18 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)