"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "barbican/tests/plugin/crypto/test_pkcs11.py" between
barbican-11.0.0.tar.gz and barbican-12.0.0.tar.gz

About: OpenStack Barbican is the OpenStack Key Manager service. It provides secure storage, provisioning and management of secret data.
The "Wallaby" series (latest release).

test_pkcs11.py  (barbican-11.0.0):test_pkcs11.py  (barbican-12.0.0)
skipping to change at line 28 skipping to change at line 28
from barbican.tests import utils from barbican.tests import utils
class WhenTestingPKCS11(utils.BaseTestCase): class WhenTestingPKCS11(utils.BaseTestCase):
def setUp(self): def setUp(self):
super(WhenTestingPKCS11, self).setUp() super(WhenTestingPKCS11, self).setUp()
self.lib = mock.Mock() self.lib = mock.Mock()
self.lib.C_Initialize.return_value = pkcs11.CKR_OK self.lib.C_Initialize.return_value = pkcs11.CKR_OK
self.lib.C_Finalize.return_value = pkcs11.CKR_OK self.lib.C_Finalize.return_value = pkcs11.CKR_OK
self.lib.C_GetSlotList.side_effect = self._get_slot_list
self.lib.C_GetTokenInfo.side_effect = self._get_token_info
self.lib.C_OpenSession.side_effect = self._open_session self.lib.C_OpenSession.side_effect = self._open_session
self.lib.C_CloseSession.return_value = pkcs11.CKR_OK self.lib.C_CloseSession.return_value = pkcs11.CKR_OK
self.lib.C_GetSessionInfo.side_effect = self._get_session_user self.lib.C_GetSessionInfo.side_effect = self._get_session_user
self.lib.C_Login.return_value = pkcs11.CKR_OK self.lib.C_Login.return_value = pkcs11.CKR_OK
self.lib.C_FindObjectsInit.return_value = pkcs11.CKR_OK self.lib.C_FindObjectsInit.return_value = pkcs11.CKR_OK
self.lib.C_FindObjects.side_effect = self._find_objects_one self.lib.C_FindObjects.side_effect = self._find_objects_one
self.lib.C_FindObjectsFinal.return_value = pkcs11.CKR_OK self.lib.C_FindObjectsFinal.return_value = pkcs11.CKR_OK
self.lib.C_GenerateKey.side_effect = self._generate_key self.lib.C_GenerateKey.side_effect = self._generate_key
self.lib.C_GenerateRandom.side_effect = self._generate_random self.lib.C_GenerateRandom.side_effect = self._generate_random
self.lib.C_SeedRandom.return_value = pkcs11.CKR_OK self.lib.C_SeedRandom.return_value = pkcs11.CKR_OK
skipping to change at line 60 skipping to change at line 62
setattr(self.ffi, 'dlopen', lambda x: self.lib) setattr(self.ffi, 'dlopen', lambda x: self.lib)
self.cfg_mock = mock.MagicMock(name='config mock') self.cfg_mock = mock.MagicMock(name='config mock')
self.cfg_mock.library_path = '/dev/null' self.cfg_mock.library_path = '/dev/null'
self.cfg_mock.login_passphrase = 'foobar' self.cfg_mock.login_passphrase = 'foobar'
self.cfg_mock.rw_session = False self.cfg_mock.rw_session = False
self.cfg_mock.slot_id = 1 self.cfg_mock.slot_id = 1
self.cfg_mock.encryption_mechanism = 'CKM_AES_CBC' self.cfg_mock.encryption_mechanism = 'CKM_AES_CBC'
self.cfg_mock.hmac_keywrap_mechanism = 'CKM_SHA256_HMAC' self.cfg_mock.hmac_keywrap_mechanism = 'CKM_SHA256_HMAC'
self.token_mock = mock.MagicMock()
self.token_mock.label = b'myLabel'
self.token_mock.serial_number = b'111111'
self.pkcs11 = pkcs11.PKCS11( self.pkcs11 = pkcs11.PKCS11(
self.cfg_mock.library_path, self.cfg_mock.login_passphrase, self.cfg_mock.library_path, self.cfg_mock.login_passphrase,
self.cfg_mock.rw_session, self.cfg_mock.slot_id, self.cfg_mock.rw_session, self.cfg_mock.slot_id,
self.cfg_mock.encryption_mechanism, self.cfg_mock.encryption_mechanism,
ffi=self.ffi, ffi=self.ffi,
hmac_keywrap_mechanism=self.cfg_mock.hmac_keywrap_mechanism hmac_keywrap_mechanism=self.cfg_mock.hmac_keywrap_mechanism
) )
def _generate_random(self, session, buf, length): def _generate_random(self, session, buf, length):
self.ffi.buffer(buf)[:] = b'0' * length self.ffi.buffer(buf)[:] = b'0' * length
return pkcs11.CKR_OK return pkcs11.CKR_OK
def _get_slot_list(self, token_present, slot_ids_ptr, slots_ptr):
# default to mocking only one slot (ID: 1)
if slot_ids_ptr is not self.ffi.NULL:
slot_ids_ptr[0] = 1
slots_ptr[0] = 1
return pkcs11.CKR_OK
def _get_token_info(self, id, token_info_ptr):
token_info_ptr.serialNumber = self.token_mock.serial_number
token_info_ptr.label = self.token_mock.label
return pkcs11.CKR_OK
def _get_two_slot_list(self, token_present, slot_ids_ptr, slots_ptr):
# mock two slots (IDs: 1, 2)
if slot_ids_ptr is not self.ffi.NULL:
slot_ids_ptr[0] = 1
slot_ids_ptr[1] = 2
slots_ptr[0] = 2
return pkcs11.CKR_OK
def _get_two_token_info_same_label(self, id, token_info_ptr):
token_info_ptr.serialNumber = (str(id) * 6).encode('UTF-8')
token_info_ptr.label = self.token_mock.label
return pkcs11.CKR_OK
def _get_session_public(self, session, session_info_ptr): def _get_session_public(self, session, session_info_ptr):
if self.cfg_mock.rw_session: if self.cfg_mock.rw_session:
session_info_ptr[0].state = pkcs11.CKS_RW_PUBLIC_SESSION session_info_ptr[0].state = pkcs11.CKS_RW_PUBLIC_SESSION
else: else:
session_info_ptr[0].state = pkcs11.CKS_RO_PUBLIC_SESSION session_info_ptr[0].state = pkcs11.CKS_RO_PUBLIC_SESSION
return pkcs11.CKR_OK return pkcs11.CKR_OK
def _get_session_user(self, session, session_info_ptr): def _get_session_user(self, session, session_info_ptr):
if self.cfg_mock.rw_session: if self.cfg_mock.rw_session:
session_info_ptr[0].state = pkcs11.CKS_RW_USER_FUNCTIONS session_info_ptr[0].state = pkcs11.CKS_RW_USER_FUNCTIONS
skipping to change at line 144 skipping to change at line 175
def _sign(self, *args, **kwargs): def _sign(self, *args, **kwargs):
buf = args[3] buf = args[3]
buf_len = args[4] buf_len = args[4]
self.ffi.buffer(buf)[:] = b'0' * buf_len[0] self.ffi.buffer(buf)[:] = b'0' * buf_len[0]
return pkcs11.CKR_OK return pkcs11.CKR_OK
def _verify(self, *args, **kwargs): def _verify(self, *args, **kwargs):
return pkcs11.CKR_OK return pkcs11.CKR_OK
def test_get_slot_id_from_serial_number(self):
slot_id = self.pkcs11._get_slot_id('111111', None, 2)
self.assertEqual(1, slot_id)
def test_get_slot_id_from_label(self):
slot_id = self.pkcs11._get_slot_id(None, ['myLabel'], 2)
self.assertEqual(1, slot_id)
def test_get_slot_id_backwards_compatibility(self):
slot_id = self.pkcs11._get_slot_id(None, None, 5)
self.assertEqual(5, slot_id)
def test_get_slot_id_from_serial_ignores_label(self):
slot_id = self.pkcs11._get_slot_id('111111', ['badLabel'], 2)
self.assertEqual(1, slot_id)
def test_get_slot_id_from_serial_ignores_given_slot(self):
slot_id = self.pkcs11._get_slot_id('111111', None, 3)
self.assertEqual(1, slot_id)
def test_get_slot_id_from_label_ignores_given_slot(self):
slot_id = self.pkcs11._get_slot_id(None, ['myLabel'], 3)
self.assertEqual(1, slot_id)
def test_get_slot_id_serial_not_found(self):
self.assertRaises(ValueError,
self.pkcs11._get_slot_id, '222222', None, 1)
def test_get_slot_id_label_not_found(self):
self.assertRaises(ValueError,
self.pkcs11._get_slot_id, None, ['myLabelbad'], 1)
def test_get_slot_id_two_tokens_same_label(self):
self.lib.C_GetSlotList.side_effect = self._get_two_slot_list
self.lib.C_GetTokenInfo.side_effect = \
self._get_two_token_info_same_label
slot_id = self.pkcs11._get_slot_id(None, ['myLabel'], 3)
self.assertEqual(1, slot_id)
def test_public_get_session(self): def test_public_get_session(self):
self.lib.C_GetSessionInfo.side_effect = self._get_session_public self.lib.C_GetSessionInfo.side_effect = self._get_session_public
sess = self.pkcs11.get_session() sess = self.pkcs11.get_session()
self.assertEqual(1, sess) self.assertEqual(1, sess)
self.assertEqual(2, self.lib.C_OpenSession.call_count) self.assertEqual(2, self.lib.C_OpenSession.call_count)
self.assertEqual(2, self.lib.C_GetSessionInfo.call_count) self.assertEqual(2, self.lib.C_GetSessionInfo.call_count)
self.assertEqual(1, self.lib.C_Login.call_count) self.assertEqual(1, self.lib.C_Login.call_count)
self.assertEqual(1, self.lib.C_CloseSession.call_count) self.assertEqual(1, self.lib.C_CloseSession.call_count)
 End of changes. 4 change blocks. 
0 lines changed or deleted 70 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)