"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "keystone/tests/unit/token/test_fernet_provider.py" between
keystone-16.0.1.tar.gz and keystone-17.0.0.tar.gz

About: OpenStack Keystone (Core Service: Identity) provides an authentication and authorization service for other OpenStack services. Provides a catalog of endpoints for all OpenStack services.
The "Ussuri" series (latest release).

test_fernet_provider.py  (keystone-16.0.1):test_fernet_provider.py  (keystone-17.0.0)
skipping to change at line 16 skipping to change at line 16
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import base64 import base64
import datetime import datetime
import hashlib import hashlib
import mock
import os import os
from unittest import mock
import uuid import uuid
from oslo_utils import timeutils from oslo_utils import timeutils
import six
from keystone import auth from keystone import auth
from keystone.common import fernet_utils from keystone.common import fernet_utils
from keystone.common import provider_api from keystone.common import provider_api
from keystone.common import utils from keystone.common import utils
import keystone.conf import keystone.conf
from keystone import exception from keystone import exception
from keystone.federation import constants as federation_constants from keystone.federation import constants as federation_constants
from keystone.tests import unit from keystone.tests import unit
from keystone.tests.unit import default_fixtures from keystone.tests.unit import default_fixtures
skipping to change at line 225 skipping to change at line 224
self.config_fixture.config(group='token', caching=False) self.config_fixture.config(group='token', caching=False)
self.config_fixture.config(group='token', cache_on_issue=False) self.config_fixture.config(group='token', cache_on_issue=False)
class TestTokenFormatter(unit.TestCase): class TestTokenFormatter(unit.TestCase):
def test_restore_padding(self): def test_restore_padding(self):
# 'a' will result in '==' padding, 'aa' will result in '=' padding, and # 'a' will result in '==' padding, 'aa' will result in '=' padding, and
# 'aaa' will result in no padding. # 'aaa' will result in no padding.
binary_to_test = [b'a', b'aa', b'aaa'] binary_to_test = [b'a', b'aa', b'aaa']
for binary in binary_to_test: for binary in binary_to_test:
# base64.urlsafe_b64encode takes six.binary_type and returns # base64.urlsafe_b64encode takes bytes and returns
# six.binary_type. # bytes.
encoded_string = base64.urlsafe_b64encode(binary) encoded_string = base64.urlsafe_b64encode(binary)
encoded_string = encoded_string.decode('utf-8') encoded_string = encoded_string.decode('utf-8')
# encoded_string is now six.text_type. # encoded_string is now str.
encoded_str_without_padding = encoded_string.rstrip('=') encoded_str_without_padding = encoded_string.rstrip('=')
self.assertFalse(encoded_str_without_padding.endswith('=')) self.assertFalse(encoded_str_without_padding.endswith('='))
encoded_str_with_padding_restored = ( encoded_str_with_padding_restored = (
token_formatters.TokenFormatter.restore_padding( token_formatters.TokenFormatter.restore_padding(
encoded_str_without_padding) encoded_str_without_padding)
) )
self.assertEqual(encoded_string, encoded_str_with_padding_restored) self.assertEqual(encoded_string, encoded_str_with_padding_restored)
def test_create_validate_federated_unscoped_token_non_uuid_user_id(self): def test_create_validate_federated_unscoped_token_non_uuid_user_id(self):
exp_user_id = hashlib.sha256().hexdigest() exp_user_id = hashlib.sha256().hexdigest()
exp_methods = ['password'] exp_methods = ['password']
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()] exp_audit_ids = [provider.random_urlsafe_str()]
exp_federated_group_ids = [{'id': uuid.uuid4().hex}] exp_federated_group_ids = [{'id': uuid.uuid4().hex}]
exp_idp_id = uuid.uuid4().hex exp_idp_id = uuid.uuid4().hex
exp_protocol_id = uuid.uuid4().hex exp_protocol_id = uuid.uuid4().hex
token_formatter = token_formatters.TokenFormatter() token_formatter = token_formatters.TokenFormatter()
token = token_formatter.create_token(user_id=exp_user_id, token = token_formatter.create_token(
expires_at=exp_expires_at, user_id=exp_user_id,
audit_ids=exp_audit_ids, expires_at=exp_expires_at,
payload_class=token_formatters.Fede audit_ids=exp_audit_ids,
ratedUnscopedPayload, payload_class=token_formatters.FederatedUnscopedPayload,
methods=exp_methods, methods=exp_methods,
federated_group_ids=exp_federated_g federated_group_ids=exp_federated_group_ids,
roup_ids, identity_provider_id=exp_idp_id,
identity_provider_id=exp_idp_id, protocol_id=exp_protocol_id)
protocol_id=exp_protocol_id)
(user_id, methods, audit_ids, system, domain_id, project_id, trust_id, (user_id, methods, audit_ids, system, domain_id, project_id, trust_id,
federated_group_ids, identity_provider_id, protocol_id, federated_group_ids, identity_provider_id, protocol_id,
access_token_id, app_cred_id, issued_at, expires_at) = token_formatter. access_token_id, app_cred_id, issued_at,
validate_token(token) expires_at) = token_formatter.validate_token(token)
self.assertEqual(exp_user_id, user_id) self.assertEqual(exp_user_id, user_id)
self.assertTrue(isinstance(user_id, six.string_types)) self.assertTrue(isinstance(user_id, str))
self.assertEqual(exp_methods, methods) self.assertEqual(exp_methods, methods)
self.assertEqual(exp_audit_ids, audit_ids) self.assertEqual(exp_audit_ids, audit_ids)
self.assertEqual(exp_federated_group_ids, federated_group_ids) self.assertEqual(exp_federated_group_ids, federated_group_ids)
self.assertEqual(exp_idp_id, identity_provider_id) self.assertEqual(exp_idp_id, identity_provider_id)
self.assertEqual(exp_protocol_id, protocol_id) self.assertEqual(exp_protocol_id, protocol_id)
def test_create_validate_federated_scoped_token_non_uuid_user_id(self): def test_create_validate_federated_scoped_token_non_uuid_user_id(self):
exp_user_id = hashlib.sha256().hexdigest() exp_user_id = hashlib.sha256().hexdigest()
exp_methods = ['password'] exp_methods = ['password']
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()] exp_audit_ids = [provider.random_urlsafe_str()]
exp_federated_group_ids = [{'id': uuid.uuid4().hex}] exp_federated_group_ids = [{'id': uuid.uuid4().hex}]
exp_idp_id = uuid.uuid4().hex exp_idp_id = uuid.uuid4().hex
exp_protocol_id = uuid.uuid4().hex exp_protocol_id = uuid.uuid4().hex
exp_project_id = uuid.uuid4().hex exp_project_id = uuid.uuid4().hex
token_formatter = token_formatters.TokenFormatter() token_formatter = token_formatters.TokenFormatter()
token = token_formatter.create_token(user_id=exp_user_id, token = token_formatter.create_token(
expires_at=exp_expires_at, user_id=exp_user_id,
audit_ids=exp_audit_ids, expires_at=exp_expires_at,
payload_class=token_formatters.Fede audit_ids=exp_audit_ids,
ratedProjectScopedPayload, payload_class=token_formatters.FederatedProjectScopedPayload,
methods=exp_methods, methods=exp_methods,
federated_group_ids=exp_federated_g federated_group_ids=exp_federated_group_ids,
roup_ids, identity_provider_id=exp_idp_id,
identity_provider_id=exp_idp_id, protocol_id=exp_protocol_id,
protocol_id=exp_protocol_id, project_id=exp_project_id)
project_id=exp_project_id)
(user_id, methods, audit_ids, system, domain_id, project_id, trust_id, (user_id, methods, audit_ids, system, domain_id, project_id, trust_id,
federated_group_ids, identity_provider_id, protocol_id, federated_group_ids, identity_provider_id, protocol_id,
access_token_id, app_cred_id, issued_at, expires_at) = token_formatter. access_token_id, app_cred_id, issued_at,
validate_token(token) expires_at) = token_formatter.validate_token(token)
self.assertEqual(exp_user_id, user_id) self.assertEqual(exp_user_id, user_id)
self.assertTrue(isinstance(user_id, six.string_types)) self.assertTrue(isinstance(user_id, str))
self.assertEqual(exp_methods, methods) self.assertEqual(exp_methods, methods)
self.assertEqual(exp_audit_ids, audit_ids) self.assertEqual(exp_audit_ids, audit_ids)
self.assertEqual(exp_project_id, project_id) self.assertEqual(exp_project_id, project_id)
self.assertEqual(exp_federated_group_ids, federated_group_ids) self.assertEqual(exp_federated_group_ids, federated_group_ids)
self.assertEqual(exp_idp_id, identity_provider_id) self.assertEqual(exp_idp_id, identity_provider_id)
self.assertEqual(exp_protocol_id, protocol_id) self.assertEqual(exp_protocol_id, protocol_id)
class TestPayloads(unit.TestCase): class TestPayloads(unit.TestCase):
def assertTimestampsEqual(self, expected, actual): def assertTimestampsEqual(self, expected, actual):
# The timestamp that we get back when parsing the payload may not # The timestamp that we get back when parsing the payload may not
skipping to change at line 320 skipping to change at line 323
actual_time = timeutils.parse_isotime(actual) actual_time = timeutils.parse_isotime(actual)
# the granularity of timestamp string is microseconds and it's only the # the granularity of timestamp string is microseconds and it's only the
# last digit in the representation that's different, so use a delta # last digit in the representation that's different, so use a delta
# just above nanoseconds. # just above nanoseconds.
return self.assertCloseEnoughForGovernmentWork(exp_time, actual_time, return self.assertCloseEnoughForGovernmentWork(exp_time, actual_time,
delta=1e-05) delta=1e-05)
def test_strings_can_be_converted_to_bytes(self): def test_strings_can_be_converted_to_bytes(self):
s = provider.random_urlsafe_str() s = provider.random_urlsafe_str()
self.assertIsInstance(s, six.text_type) self.assertIsInstance(s, str)
b = token_formatters.BasePayload.random_urlsafe_str_to_bytes(s) b = token_formatters.BasePayload.random_urlsafe_str_to_bytes(s)
self.assertIsInstance(b, six.binary_type) self.assertIsInstance(b, bytes)
def test_uuid_hex_to_byte_conversions(self): def test_uuid_hex_to_byte_conversions(self):
payload_cls = token_formatters.BasePayload payload_cls = token_formatters.BasePayload
expected_hex_uuid = uuid.uuid4().hex expected_hex_uuid = uuid.uuid4().hex
uuid_obj = uuid.UUID(expected_hex_uuid) uuid_obj = uuid.UUID(expected_hex_uuid)
expected_uuid_in_bytes = uuid_obj.bytes expected_uuid_in_bytes = uuid_obj.bytes
actual_uuid_in_bytes = payload_cls.convert_uuid_hex_to_bytes( actual_uuid_in_bytes = payload_cls.convert_uuid_hex_to_bytes(
expected_hex_uuid) expected_hex_uuid)
self.assertEqual(expected_uuid_in_bytes, actual_uuid_in_bytes) self.assertEqual(expected_uuid_in_bytes, actual_uuid_in_bytes)
skipping to change at line 413 skipping to change at line 416
self.assertEqual(expected_hex_uuid, actual_hex_uuid) self.assertEqual(expected_hex_uuid, actual_hex_uuid)
def _test_payload(self, payload_class, exp_user_id=None, exp_methods=None, def _test_payload(self, payload_class, exp_user_id=None, exp_methods=None,
exp_system=None, exp_project_id=None, exp_domain_id=None, exp_system=None, exp_project_id=None, exp_domain_id=None,
exp_trust_id=None, exp_federated_group_ids=None, exp_trust_id=None, exp_federated_group_ids=None,
exp_identity_provider_id=None, exp_protocol_id=None, exp_identity_provider_id=None, exp_protocol_id=None,
exp_access_token_id=None, exp_app_cred_id=None, exp_access_token_id=None, exp_app_cred_id=None,
encode_ids=False): encode_ids=False):
def _encode_id(value): def _encode_id(value):
if value is not None and six.text_type(value) and encode_ids: if value is not None and str(value) and encode_ids:
return value.encode('utf-8') return value.encode('utf-8')
return value return value
exp_user_id = exp_user_id or uuid.uuid4().hex exp_user_id = exp_user_id or uuid.uuid4().hex
exp_methods = exp_methods or ['password'] exp_methods = exp_methods or ['password']
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()] exp_audit_ids = [provider.random_urlsafe_str()]
payload = payload_class.assemble( payload = payload_class.assemble(
_encode_id(exp_user_id), _encode_id(exp_user_id),
exp_methods, exp_methods,
skipping to change at line 618 skipping to change at line 621
"""Create a "thumbprint" of the current key repository. """Create a "thumbprint" of the current key repository.
Because key files are renamed, this produces a hash of the contents of Because key files are renamed, this produces a hash of the contents of
the key files, ignoring their filenames. the key files, ignoring their filenames.
The resulting signature can be used, for example, to ensure that you The resulting signature can be used, for example, to ensure that you
have a unique set of keys after you perform a key rotation (taking a have a unique set of keys after you perform a key rotation (taking a
static set of keys, and simply shuffling them, would fail such a test). static set of keys, and simply shuffling them, would fail such a test).
""" """
# Load the keys into a list, keys is list of six.text_type. # Load the keys into a list, keys is list of str.
key_utils = fernet_utils.FernetUtils( key_utils = fernet_utils.FernetUtils(
CONF.fernet_tokens.key_repository, CONF.fernet_tokens.key_repository,
CONF.fernet_tokens.max_active_keys, CONF.fernet_tokens.max_active_keys,
'fernet_tokens' 'fernet_tokens'
) )
keys = key_utils.load_keys() keys = key_utils.load_keys()
# Sort the list of keys by the keys themselves (they were previously # Sort the list of keys by the keys themselves (they were previously
# sorted by filename). # sorted by filename).
keys.sort() keys.sort()
# Create the thumbprint using all keys in the repository. # Create the thumbprint using all keys in the repository.
signature = hashlib.sha1() signature = hashlib.sha1()
for key in keys: for key in keys:
# Need to convert key to six.binary_type for update. # Need to convert key to bytes for update.
signature.update(key.encode('utf-8')) signature.update(key.encode('utf-8'))
return signature.hexdigest() return signature.hexdigest()
def assertRepositoryState(self, expected_size): def assertRepositoryState(self, expected_size):
"""Validate the state of the key repository.""" """Validate the state of the key repository."""
self.assertEqual(expected_size, self.key_repository_size) self.assertEqual(expected_size, self.key_repository_size)
self.assertUniqueRepositoryState() self.assertUniqueRepositoryState()
def assertUniqueRepositoryState(self): def assertUniqueRepositoryState(self):
"""Ensure that the current key repo state has not been seen before.""" """Ensure that the current key repo state has not been seen before."""
 End of changes. 16 change blocks. 
37 lines changed or deleted 34 lines changed or added
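Review bot: In `_test_payload`, the nested `_encode_id` helper still guards on a string conversion, `if value is not None and str(value) and encode_ids:`, which tests whether the converted text is non-empty rather than whether `value` is text. On Python 3 `str()` of a bytes or integer value is a non-empty string, so such a value would pass the guard and then raise `AttributeError` at `value.encode('utf-8')`; this looks like a mechanical carry-over of `six.text_type(value)`. If a type check is what was meant, `if encode_ids and isinstance(value, str):` says so directly, though it would also encode an empty string, which the current guard returns unchanged. Separately, the two `self.assertTrue(isinstance(user_id, str))` lines in `TestTokenFormatter` could be `self.assertIsInstance(user_id, str)`, matching `test_strings_can_be_converted_to_bytes` and giving a clearer failure message. `_test_payload` continues past the end of its hunk, so how callers use `encode_ids` is not visible here.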
