"Fossies" - the Fresh Open Source Software Archive

Member "barbican-12.0.0/barbican/tests/plugin/crypto/test_pkcs11.py" (14 Apr 2021, 18873 Bytes) of package /linux/misc/openstack/barbican-12.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_pkcs11.py": 11.0.0_vs_12.0.0.

    1 # Licensed under the Apache License, Version 2.0 (the "License");
    2 # you may not use this file except in compliance with the License.
    3 # You may obtain a copy of the License at
    4 #
    5 #    http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 # Unless required by applicable law or agreed to in writing, software
    8 # distributed under the License is distributed on an "AS IS" BASIS,
    9 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   10 # implied.
   11 # See the License for the specific language governing permissions and
   12 # limitations under the License.
   13 
   14 from unittest import mock
   15 
   16 
   17 from barbican.common import exception
   18 from barbican.plugin.crypto import pkcs11
   19 from barbican.tests import utils
   20 
   21 
   22 class WhenTestingPKCS11(utils.BaseTestCase):
   23 
   24     def setUp(self):
   25         super(WhenTestingPKCS11, self).setUp()
   26 
   27         self.lib = mock.Mock()
   28         self.lib.C_Initialize.return_value = pkcs11.CKR_OK
   29         self.lib.C_Finalize.return_value = pkcs11.CKR_OK
   30         self.lib.C_GetSlotList.side_effect = self._get_slot_list
   31         self.lib.C_GetTokenInfo.side_effect = self._get_token_info
   32         self.lib.C_OpenSession.side_effect = self._open_session
   33         self.lib.C_CloseSession.return_value = pkcs11.CKR_OK
   34         self.lib.C_GetSessionInfo.side_effect = self._get_session_user
   35         self.lib.C_Login.return_value = pkcs11.CKR_OK
   36         self.lib.C_FindObjectsInit.return_value = pkcs11.CKR_OK
   37         self.lib.C_FindObjects.side_effect = self._find_objects_one
   38         self.lib.C_FindObjectsFinal.return_value = pkcs11.CKR_OK
   39         self.lib.C_GenerateKey.side_effect = self._generate_key
   40         self.lib.C_GenerateRandom.side_effect = self._generate_random
   41         self.lib.C_SeedRandom.return_value = pkcs11.CKR_OK
   42         self.lib.C_EncryptInit.return_value = pkcs11.CKR_OK
   43         self.lib.C_Encrypt.side_effect = self._encrypt
   44         self.lib.C_DecryptInit.return_value = pkcs11.CKR_OK
   45         self.lib.C_Decrypt.side_effect = self._decrypt
   46         self.lib.C_WrapKey.side_effect = self._wrap_key
   47         self.lib.C_UnwrapKey.side_effect = self._unwrap_key
   48         self.lib.C_SignInit.return_value = pkcs11.CKR_OK
   49         self.lib.C_Sign.side_effect = self._sign
   50         self.lib.C_VerifyInit.return_value = pkcs11.CKR_OK
   51         self.lib.C_Verify.side_effect = self._verify
   52         self.lib.C_DestroyObject.return_value = pkcs11.CKR_OK
   53         self.ffi = pkcs11.build_ffi()
   54         setattr(self.ffi, 'dlopen', lambda x: self.lib)
   55 
   56         self.cfg_mock = mock.MagicMock(name='config mock')
   57         self.cfg_mock.library_path = '/dev/null'
   58         self.cfg_mock.login_passphrase = 'foobar'
   59         self.cfg_mock.rw_session = False
   60         self.cfg_mock.slot_id = 1
   61         self.cfg_mock.encryption_mechanism = 'CKM_AES_CBC'
   62         self.cfg_mock.hmac_keywrap_mechanism = 'CKM_SHA256_HMAC'
   63 
   64         self.token_mock = mock.MagicMock()
   65         self.token_mock.label = b'myLabel'
   66         self.token_mock.serial_number = b'111111'
   67 
   68         self.pkcs11 = pkcs11.PKCS11(
   69             self.cfg_mock.library_path, self.cfg_mock.login_passphrase,
   70             self.cfg_mock.rw_session, self.cfg_mock.slot_id,
   71             self.cfg_mock.encryption_mechanism,
   72             ffi=self.ffi,
   73             hmac_keywrap_mechanism=self.cfg_mock.hmac_keywrap_mechanism
   74         )
   75 
   76     def _generate_random(self, session, buf, length):
   77         self.ffi.buffer(buf)[:] = b'0' * length
   78         return pkcs11.CKR_OK
   79 
   80     def _get_slot_list(self, token_present, slot_ids_ptr, slots_ptr):
   81         # default to mocking only one slot (ID: 1)
   82         if slot_ids_ptr is not self.ffi.NULL:
   83             slot_ids_ptr[0] = 1
   84         slots_ptr[0] = 1
   85         return pkcs11.CKR_OK
   86 
   87     def _get_token_info(self, id, token_info_ptr):
   88         token_info_ptr.serialNumber = self.token_mock.serial_number
   89         token_info_ptr.label = self.token_mock.label
   90         return pkcs11.CKR_OK
   91 
   92     def _get_two_slot_list(self, token_present, slot_ids_ptr, slots_ptr):
   93         # mock two slots (IDs: 1, 2)
   94         if slot_ids_ptr is not self.ffi.NULL:
   95             slot_ids_ptr[0] = 1
   96             slot_ids_ptr[1] = 2
   97         slots_ptr[0] = 2
   98         return pkcs11.CKR_OK
   99 
  100     def _get_two_token_info_same_label(self, id, token_info_ptr):
  101         token_info_ptr.serialNumber = (str(id) * 6).encode('UTF-8')
  102         token_info_ptr.label = self.token_mock.label
  103         return pkcs11.CKR_OK
  104 
  105     def _get_session_public(self, session, session_info_ptr):
  106         if self.cfg_mock.rw_session:
  107             session_info_ptr[0].state = pkcs11.CKS_RW_PUBLIC_SESSION
  108         else:
  109             session_info_ptr[0].state = pkcs11.CKS_RO_PUBLIC_SESSION
  110         return pkcs11.CKR_OK
  111 
  112     def _get_session_user(self, session, session_info_ptr):
  113         if self.cfg_mock.rw_session:
  114             session_info_ptr[0].state = pkcs11.CKS_RW_USER_FUNCTIONS
  115         else:
  116             session_info_ptr[0].state = pkcs11.CKS_RO_USER_FUNCTIONS
  117         return pkcs11.CKR_OK
  118 
  119     def _open_session(self, *args, **kwargs):
  120         args[4][0] = int(1)
  121         return pkcs11.CKR_OK
  122 
  123     def _find_objects_one(self, session, obj_handle_ptr, max_count, count):
  124         obj_handle_ptr[0] = int(2)
  125         count[0] = 1
  126         return pkcs11.CKR_OK
  127 
  128     def _find_objects_two(self, session, obj_handle_ptr, max_count, count):
  129         obj_handle_ptr[0] = int(2)
  130         count[0] = 2
  131         return pkcs11.CKR_OK
  132 
  133     def _find_objects_zero(self, session, obj_handle_ptr, max_count, count):
  134         count[0] = 0
  135         return pkcs11.CKR_OK
  136 
  137     def _generate_key(self, session, mech, attributes, attributes_len,
  138                       obj_handle_ptr):
  139         obj_handle_ptr[0] = int(3)
  140         return pkcs11.CKR_OK
  141 
  142     def _encrypt(self, session, pt, pt_len, ct, ct_len):
  143         if self.pkcs11.generate_iv:
  144             self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * self.pkcs11.gcmtagsize
  145         else:
  146             self.ffi.buffer(ct)[:] = pt[::-1] + b'0' * (self.pkcs11.gcmtagsize
  147                                                         * 2)
  148         return pkcs11.CKR_OK
  149 
  150     def _decrypt(self, session, ct, ct_len, pt, pt_len):
  151         tmp = ct[:-self.pkcs11.gcmtagsize][::-1]
  152         self.ffi.buffer(pt)[:len(tmp)] = tmp
  153         return pkcs11.CKR_OK
  154 
  155     def _wrap_key(self, *args, **kwargs):
  156         wrapped_key = args[4]
  157         wrapped_key_len = args[5]
  158         wrapped_key_len[0] = int(16)
  159         if wrapped_key != self.ffi.NULL:
  160             self.ffi.buffer(wrapped_key)[:] = b'0' * 16
  161         return pkcs11.CKR_OK
  162 
  163     def _unwrap_key(self, *args, **kwargs):
  164         unwrapped_key = args[7]
  165         unwrapped_key[0] = int(1)
  166         return pkcs11.CKR_OK
  167 
  168     def _sign(self, *args, **kwargs):
  169         buf = args[3]
  170         buf_len = args[4]
  171         self.ffi.buffer(buf)[:] = b'0' * buf_len[0]
  172         return pkcs11.CKR_OK
  173 
  174     def _verify(self, *args, **kwargs):
  175         return pkcs11.CKR_OK
  176 
  177     def test_get_slot_id_from_serial_number(self):
  178         slot_id = self.pkcs11._get_slot_id('111111', None, 2)
  179         self.assertEqual(1, slot_id)
  180 
  181     def test_get_slot_id_from_label(self):
  182         slot_id = self.pkcs11._get_slot_id(None, ['myLabel'], 2)
  183         self.assertEqual(1, slot_id)
  184 
  185     def test_get_slot_id_backwards_compatibility(self):
  186         slot_id = self.pkcs11._get_slot_id(None, None, 5)
  187         self.assertEqual(5, slot_id)
  188 
  189     def test_get_slot_id_from_serial_ignores_label(self):
  190         slot_id = self.pkcs11._get_slot_id('111111', ['badLabel'], 2)
  191         self.assertEqual(1, slot_id)
  192 
  193     def test_get_slot_id_from_serial_ignores_given_slot(self):
  194         slot_id = self.pkcs11._get_slot_id('111111', None, 3)
  195         self.assertEqual(1, slot_id)
  196 
  197     def test_get_slot_id_from_label_ignores_given_slot(self):
  198         slot_id = self.pkcs11._get_slot_id(None, ['myLabel'], 3)
  199         self.assertEqual(1, slot_id)
  200 
  201     def test_get_slot_id_serial_not_found(self):
  202         self.assertRaises(ValueError,
  203                           self.pkcs11._get_slot_id, '222222', None, 1)
  204 
  205     def test_get_slot_id_label_not_found(self):
  206         self.assertRaises(ValueError,
  207                           self.pkcs11._get_slot_id, None, ['myLabelbad'], 1)
  208 
  209     def test_get_slot_id_two_tokens_same_label(self):
  210         self.lib.C_GetSlotList.side_effect = self._get_two_slot_list
  211         self.lib.C_GetTokenInfo.side_effect = \
  212             self._get_two_token_info_same_label
  213         slot_id = self.pkcs11._get_slot_id(None, ['myLabel'], 3)
  214         self.assertEqual(1, slot_id)
  215 
  216     def test_public_get_session(self):
  217         self.lib.C_GetSessionInfo.side_effect = self._get_session_public
  218         sess = self.pkcs11.get_session()
  219 
  220         self.assertEqual(1, sess)
  221 
  222         self.assertEqual(2, self.lib.C_OpenSession.call_count)
  223         self.assertEqual(2, self.lib.C_GetSessionInfo.call_count)
  224         self.assertEqual(1, self.lib.C_Login.call_count)
  225         self.assertEqual(1, self.lib.C_CloseSession.call_count)
  226 
  227     def test_user_get_session(self):
  228         self.pkcs11.get_session()
  229 
  230         self.assertEqual(2, self.lib.C_OpenSession.call_count)
  231         self.assertEqual(2, self.lib.C_GetSessionInfo.call_count)
  232         self.assertEqual(0, self.lib.C_Login.call_count)
  233 
  234     def test_seed_random(self):
  235         rd = "random-data"
  236         session = 'session'
  237         self.pkcs11._seed_random(session, rd)
  238         self.lib.C_SeedRandom.assert_called_once_with(
  239             session, mock.ANY, len(rd))
  240 
  241     def test_generate_random(self):
  242         r = self.pkcs11.generate_random(32, mock.MagicMock())
  243 
  244         self.assertEqual(b'0' * 32, r)
  245 
  246         self.assertEqual(2, self.lib.C_GenerateRandom.call_count)
  247 
  248     def test_rng_self_test_fail(self):
  249         def _bad_generate_random(session, buf, length):
  250             self.ffi.buffer(buf)[:] = b'\x00' * length
  251             return pkcs11.CKR_OK
  252         self.lib.C_GenerateRandom.side_effect = _bad_generate_random
  253         self.assertRaises(exception.P11CryptoPluginException,
  254                           self.pkcs11._rng_self_test, mock.MagicMock())
  255 
  256     def test_get_key_handle_one_key(self):
  257         key = self.pkcs11.get_key_handle('CKK_AES', 'foo', mock.MagicMock())
  258 
  259         self.assertEqual(2, key)
  260 
  261         self.assertEqual(1, self.lib.C_FindObjectsInit.call_count)
  262         self.assertEqual(1, self.lib.C_FindObjects.call_count)
  263         self.assertEqual(1, self.lib.C_FindObjectsFinal.call_count)
  264 
  265     def test_get_key_handle_no_keys(self):
  266         self.lib.C_FindObjects.side_effect = self._find_objects_zero
  267         key = self.pkcs11.get_key_handle('CKK_AES', 'foo', mock.MagicMock())
  268 
  269         self.assertIsNone(key)
  270 
  271         self.assertEqual(1, self.lib.C_FindObjectsInit.call_count)
  272         self.assertEqual(1, self.lib.C_FindObjects.call_count)
  273         self.assertEqual(1, self.lib.C_FindObjectsFinal.call_count)
  274 
  275     def test_get_key_handle_multiple_keys(self):
  276         self.lib.C_FindObjects.side_effect = self._find_objects_two
  277 
  278         self.assertRaises(exception.P11CryptoPluginKeyException,
  279                           self.pkcs11.get_key_handle, 'CKK_AES', 'foo',
  280                           mock.MagicMock())
  281 
  282         self.assertEqual(1, self.lib.C_FindObjectsInit.call_count)
  283         self.assertEqual(1, self.lib.C_FindObjects.call_count)
  284         self.assertEqual(1, self.lib.C_FindObjectsFinal.call_count)
  285 
  286     def test_generate_session_key(self):
  287         key = self.pkcs11.generate_key('CKK_AES', 16, 'CKM_AES_KEY_GEN',
  288                                        mock.MagicMock(), encrypt=True)
  289 
  290         self.assertEqual(3, key)
  291 
  292         self.assertEqual(1, self.lib.C_GenerateKey.call_count)
  293 
  294     def test_generate_master_key(self):
  295         key = self.pkcs11.generate_key('CKK_AES', 16, 'CKM_AES_KEY_GEN',
  296                                        mock.MagicMock(), key_label='key',
  297                                        encrypt=True, master_key=True)
  298 
  299         self.assertEqual(3, key)
  300 
  301         self.assertEqual(1, self.lib.C_GenerateKey.call_count)
  302 
  303     def test_generate_key_no_flags(self):
  304         self.assertRaises(exception.P11CryptoPluginException,
  305                           self.pkcs11.generate_key, 'CKK_AES', 16,
  306                           mock.MagicMock(), mock.MagicMock())
  307 
  308     def test_generate_master_key_no_label(self):
  309         self.assertRaises(ValueError, self.pkcs11.generate_key,
  310                           'CKK_AES', 16,
  311                           mock.MagicMock(), mock.MagicMock(),
  312                           encrypt=True, master_key=True)
  313 
  314     def test_encrypt_with_no_iv_generation(self):
  315         pt = b'0123456789ABCDEF'
  316         self.pkcs11.generate_iv = False
  317         ct = self.pkcs11._VENDOR_SAFENET_CKM_AES_GCM_encrypt(
  318             mock.MagicMock(),
  319             pt, mock.MagicMock()
  320         )
  321 
  322         self.assertEqual(ct['ct'][:len(pt)], pt[::-1])
  323         self.assertGreater(len(ct['iv']), 0)
  324 
  325         self.assertEqual(1, self.lib.C_GenerateRandom.call_count)
  326         self.assertEqual(1, self.lib.C_EncryptInit.call_count)
  327         self.assertEqual(1, self.lib.C_Encrypt.call_count)
  328 
  329     def test_encrypt_with_iv_generation(self):
  330         pt = b'0123456789ABCDEF'
  331         self.pkcs11.generate_iv = True
  332         ct = self.pkcs11._VENDOR_SAFENET_CKM_AES_GCM_encrypt(
  333             mock.MagicMock(), pt, mock.MagicMock()
  334         )
  335 
  336         self.assertEqual(ct['ct'][:len(pt)], pt[::-1])
  337         self.assertGreater(len(ct['iv']), 0)
  338 
  339         self.assertEqual(2, self.lib.C_GenerateRandom.call_count)
  340         self.assertEqual(1, self.lib.C_EncryptInit.call_count)
  341         self.assertEqual(1, self.lib.C_Encrypt.call_count)
  342 
  343     def test_decrypt(self):
  344         ct = b'c2VjcmV0a2V5BwcHBwcHBw=='
  345         iv = b'0' * self.pkcs11.noncesize
  346         pt = self.pkcs11.decrypt('VENDOR_SAFENET_CKM_AES_GCM',
  347                                  mock.MagicMock(), iv, ct, mock.MagicMock())
  348 
  349         pt_len = len(ct) - self.pkcs11.gcmtagsize
  350         self.assertEqual(pt[:pt_len], ct[:-self.pkcs11.gcmtagsize][::-1])
  351 
  352         self.assertEqual(1, self.lib.C_DecryptInit.call_count)
  353         self.assertEqual(1, self.lib.C_Decrypt.call_count)
  354 
  355     def test_decrypt_with_pad(self):
  356         ct = b'c2VjcmV0a2V5BwcHBwcHBw=='
  357         iv = b'0' * self.pkcs11.blocksize
  358         pt = self.pkcs11.decrypt('VENDOR_SAFENET_CKM_AES_GCM',
  359                                  mock.MagicMock(), iv, ct, mock.MagicMock())
  360 
  361         pt_len = len(ct) - self.pkcs11.gcmtagsize - 3
  362         self.assertEqual(pt[:pt_len], ct[3:-self.pkcs11.gcmtagsize][::-1])
  363 
  364         self.assertEqual(1, self.lib.C_DecryptInit.call_count)
  365         self.assertEqual(1, self.lib.C_Decrypt.call_count)
  366 
  367     def test_decrypt_with_pad_new_iv(self):
  368         ct = b'c2VjcmV0a2V5BwcHBwcHBw=='
  369         iv = b'0' * self.pkcs11.gcmtagsize
  370         pt = self.pkcs11.decrypt('VENDOR_SAFENET_CKM_AES_GCM',
  371                                  mock.MagicMock(), iv, ct, mock.MagicMock())
  372 
  373         pt_len = len(ct) - self.pkcs11.gcmtagsize
  374         self.assertEqual(pt[:pt_len], ct[:-self.pkcs11.gcmtagsize][::-1])
  375 
  376         self.assertEqual(1, self.lib.C_DecryptInit.call_count)
  377         self.assertEqual(1, self.lib.C_Decrypt.call_count)
  378 
  379     def test_decrypt_with_pad_wrong_size(self):
  380         ct = b'c2VjcmV0a2V5BwcHBwcHBw=='
  381         iv = b'0' * self.pkcs11.blocksize
  382         pt = self.pkcs11.decrypt('VENDOR_SAFENET_CKM_AES_GCM',
  383                                  mock.MagicMock(), iv, ct, mock.MagicMock())
  384 
  385         pt_len = len(ct) - self.pkcs11.gcmtagsize
  386         self.assertEqual(pt[:pt_len], ct[:-self.pkcs11.gcmtagsize][::-1])
  387 
  388         self.assertEqual(1, self.lib.C_DecryptInit.call_count)
  389         self.assertEqual(1, self.lib.C_Decrypt.call_count)
  390 
  391     def test_decrypt_with_pad_wrong_length(self):
  392         ct = b'c2VjcmV0a2V5BwcHBwcHBw=='
  393         iv = b'0' * self.pkcs11.blocksize
  394         pt = self.pkcs11.decrypt('VENDOR_SAFENET_CKM_AES_GCM',
  395                                  mock.MagicMock(), iv, ct, mock.MagicMock())
  396 
  397         pt_len = len(ct) - self.pkcs11.gcmtagsize
  398         self.assertEqual(pt[:pt_len], ct[:-self.pkcs11.gcmtagsize][::-1])
  399 
  400         self.assertEqual(1, self.lib.C_DecryptInit.call_count)
  401         self.assertEqual(1, self.lib.C_Decrypt.call_count)
  402 
  403     def test_decrypt_with_too_large_pad(self):
  404         ct = b'c2VjcmV0a2V5BwcHBwcHBw=='
  405         iv = b'0' * self.pkcs11.blocksize
  406         pt = self.pkcs11.decrypt('VENDOR_SAFENET_CKM_AES_GCM',
  407                                  mock.MagicMock(), iv, ct, mock.MagicMock())
  408 
  409         pt_len = len(ct) - self.pkcs11.gcmtagsize
  410         self.assertEqual(pt[:pt_len], ct[:-self.pkcs11.gcmtagsize][::-1])
  411 
  412         self.assertEqual(1, self.lib.C_DecryptInit.call_count)
  413         self.assertEqual(1, self.lib.C_Decrypt.call_count)
  414 
  415     def test_wrap_key(self):
  416         wkek = self.pkcs11.wrap_key(mock.Mock(), mock.Mock(), mock.Mock())
  417         self.assertGreater(len(wkek['iv']), 0)
  418         self.assertEqual(b'0' * 16, wkek['wrapped_key'])
  419 
  420         self.assertEqual(2, self.lib.C_GenerateRandom.call_count)
  421         self.assertEqual(2, self.lib.C_WrapKey.call_count)
  422 
  423     def test_unwrap_key(self):
  424         kek = self.pkcs11.unwrap_key(mock.Mock(), b'0' * 16,
  425                                      b'0' * 16, mock.Mock())
  426         self.assertEqual(1, kek)
  427 
  428         self.assertEqual(self.lib.C_UnwrapKey.call_count, 1)
  429 
  430     def test_compute_hmac(self):
  431         buf = self.pkcs11.compute_hmac(mock.MagicMock(), mock.MagicMock(),
  432                                        mock.MagicMock())
  433         self.assertEqual(32, len(buf))
  434 
  435         self.assertEqual(1, self.lib.C_SignInit.call_count)
  436         self.assertEqual(1, self.lib.C_Sign.call_count)
  437 
  438     def test_verify_hmac(self):
  439         self.pkcs11.verify_hmac(mock.MagicMock(), mock.MagicMock(),
  440                                 mock.MagicMock(), mock.MagicMock())
  441 
  442         self.assertEqual(1, self.lib.C_VerifyInit.call_count)
  443         self.assertEqual(1, self.lib.C_Verify.call_count)
  444 
  445     def test_destroy_object(self):
  446         self.pkcs11.destroy_object(mock.MagicMock(), mock.MagicMock())
  447 
  448         self.assertEqual(1, self.lib.C_DestroyObject.call_count)
  449 
  450     def test_invalid_build_attributes(self):
  451         self.assertRaises(TypeError, self.pkcs11._build_attributes,
  452                           [pkcs11.Attribute(pkcs11.CKA_CLASS, {})])
  453 
  454     def test_finalize(self):
  455         self.pkcs11.finalize()
  456 
  457         self.assertEqual(1, self.lib.C_Finalize.call_count)
  458 
  459     def test_check_error(self):
  460         self.assertIsNone(self.pkcs11._check_error(pkcs11.CKR_OK))
  461 
  462     def test_check_error_with_without_specific_handling(self):
  463         self.assertRaises(exception.P11CryptoPluginException,
  464                           self.pkcs11._check_error, 5)
  465 
  466     def test_check_error_with_token_error(self):
  467         self.assertRaises(exception.P11CryptoTokenException,
  468                           self.pkcs11._check_error, 0xe0)
  469 
  470     def test_converting_unicode_to_bytes(self):
  471         self.assertEqual(b'foo', pkcs11._to_bytes(u'foo'))
  472 
  473     def test_converting_default_str_type_to_bytes(self):
  474         self.assertEqual(b'foo', pkcs11._to_bytes('foo'))