"Fossies" - the Fresh Open Source Software Archive

Member "barbican-12.0.0/barbican/tests/plugin/crypto/test_p11_crypto.py" (14 Apr 2021, 15404 Bytes) of package /linux/misc/openstack/barbican-12.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_p11_crypto.py": 11.0.0_vs_12.0.0.

    1 # Copyright (c) 2013-2014 Rackspace, Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 
   16 from unittest import mock
   17 
   18 import six
   19 
   20 from barbican.common import exception as ex
   21 from barbican.model import models
   22 from barbican.plugin.crypto import base as plugin_import
   23 from barbican.plugin.crypto import p11_crypto
   24 from barbican.plugin.crypto import pkcs11
   25 from barbican.tests import utils
   26 
   27 
   28 def generate_random_effect(length, session):
   29     return b'0' * length
   30 
   31 
   32 class WhenTestingP11CryptoPlugin(utils.BaseTestCase):
   33 
   34     def setUp(self):
   35         super(WhenTestingP11CryptoPlugin, self).setUp()
   36 
   37         self.pkcs11 = mock.Mock()
   38         self.pkcs11.get_session.return_value = int(1)
   39         self.pkcs11.return_session.return_value = None
   40         self.pkcs11.generate_random.side_effect = generate_random_effect
   41         self.pkcs11.get_key_handle.return_value = int(2)
   42         self.pkcs11.encrypt.return_value = {'iv': b'0', 'ct': b'0'}
   43         self.pkcs11.decrypt.return_value = b'0'
   44         self.pkcs11.generate_key.return_value = int(3)
   45         self.pkcs11.wrap_key.return_value = {'iv': b'1', 'wrapped_key': b'1'}
   46         self.pkcs11.unwrap_key.return_value = int(4)
   47         self.pkcs11.compute_hmac.return_value = b'1'
   48         self.pkcs11.verify_hmac.return_value = None
   49         self.pkcs11.destroy_object.return_value = None
   50         self.pkcs11.finalize.return_value = None
   51 
   52         self.cfg_mock = mock.MagicMock(name='config mock')
   53         self.cfg_mock.p11_crypto_plugin.mkek_label = 'mkek_label'
   54         self.cfg_mock.p11_crypto_plugin.hmac_label = 'hmac_label'
   55         self.cfg_mock.p11_crypto_plugin.mkek_length = 32
   56         self.cfg_mock.p11_crypto_plugin.slot_id = 1
   57         self.cfg_mock.p11_crypto_plugin.token_serial_number = None
   58         self.cfg_mock.p11_crypto_plugin.token_label = None
   59         self.cfg_mock.p11_crypto_plugin.token_labels = None
   60         self.cfg_mock.p11_crypto_plugin.rw_session = True
   61         self.cfg_mock.p11_crypto_plugin.pkek_length = 32
   62         self.cfg_mock.p11_crypto_plugin.pkek_cache_ttl = 900
   63         self.cfg_mock.p11_crypto_plugin.pkek_cache_limit = 10
   64         self.cfg_mock.p11_crypto_plugin.encryption_mechanism = 'CKM_AES_CBC'
   65         self.cfg_mock.p11_crypto_plugin.seed_file = ''
   66         self.cfg_mock.p11_crypto_plugin.seed_length = 32
   67         self.cfg_mock.p11_crypto_plugin.hmac_keywrap_mechanism = \
   68             'CKM_SHA256_HMAC'
   69 
   70         self.plugin_name = 'Test PKCS11 plugin'
   71         self.cfg_mock.p11_crypto_plugin.plugin_name = self.plugin_name
   72 
   73         self.plugin = p11_crypto.P11CryptoPlugin(
   74             conf=self.cfg_mock,
   75             pkcs11=self.pkcs11
   76         )
   77 
   78     def test_invalid_library_path(self):
   79         cfg = self.cfg_mock.p11_crypto_plugin
   80         cfg.library_path = None
   81         self.assertRaises(ValueError, p11_crypto.P11CryptoPlugin,
   82                           conf=self.cfg_mock, pkcs11=self.pkcs11)
   83 
   84     def test_bind_kek_metadata_without_existing_key(self):
   85         kek_datum = models.KEKDatum()
   86         dto = plugin_import.KEKMetaDTO(kek_datum)
   87         dto = self.plugin.bind_kek_metadata(dto)
   88 
   89         self.assertEqual('AES', dto.algorithm)
   90         self.assertEqual(256, dto.bit_length)
   91         self.assertEqual('CBC', dto.mode)
   92 
   93         self.assertEqual(2, self.pkcs11.get_key_handle.call_count)
   94         self.assertEqual(1, self.pkcs11.generate_key.call_count)
   95         self.assertEqual(1, self.pkcs11.wrap_key.call_count)
   96         self.assertEqual(1, self.pkcs11.compute_hmac.call_count)
   97 
   98     def test_bind_kek_metadata_with_existing_key(self):
   99         kek_datum = models.KEKDatum()
  100         dto = plugin_import.KEKMetaDTO(kek_datum)
  101         dto.plugin_meta = '{}'
  102         dto = self.plugin.bind_kek_metadata(dto)
  103 
  104         self.assertEqual(0, self.pkcs11.generate_key.call_count)
  105         self.assertEqual(0, self.pkcs11.wrap_key.call_count)
  106         self.assertEqual(0, self.pkcs11.compute_hmac.call_count)
  107 
  108     def test_encrypt(self):
  109         payload = b'test payload'
  110         encrypt_dto = plugin_import.EncryptDTO(payload)
  111         kek_meta = mock.MagicMock()
  112         kek_meta.kek_label = 'pkek'
  113         kek_meta.plugin_meta = ('{"iv": "iv==",'
  114                                 '"hmac": "hmac",'
  115                                 '"wrapped_key": "wrappedkey==",'
  116                                 '"mkek_label": "mkek_label",'
  117                                 '"hmac_label": "hmac_label"}')
  118         response_dto = self.plugin.encrypt(encrypt_dto,
  119                                            kek_meta,
  120                                            mock.MagicMock())
  121 
  122         self.assertEqual(b'0', response_dto.cypher_text)
  123         self.assertIn('iv', response_dto.kek_meta_extended)
  124 
  125         self.assertEqual(2, self.pkcs11.get_key_handle.call_count)
  126         self.assertEqual(2, self.pkcs11.get_session.call_count)
  127         self.assertEqual(1, self.pkcs11.verify_hmac.call_count)
  128         self.assertEqual(1, self.pkcs11.unwrap_key.call_count)
  129         self.assertEqual(1, self.pkcs11.encrypt.call_count)
  130         self.assertEqual(1, self.pkcs11.return_session.call_count)
  131 
  132     def test_encrypt_bad_session(self):
  133         self.pkcs11.get_session.return_value = mock.DEFAULT
  134         self.pkcs11.get_session.side_effect = ex.P11CryptoPluginException(
  135             'Testing error handling'
  136         )
  137         payload = b'test payload'
  138         encrypt_dto = plugin_import.EncryptDTO(payload)
  139         kek_meta = mock.MagicMock()
  140         kek_meta.kek_label = 'pkek'
  141         kek_meta.plugin_meta = ('{"iv": "iv==",'
  142                                 '"hmac": "hmac",'
  143                                 '"wrapped_key": "wrappedkey==",'
  144                                 '"mkek_label": "mkek_label",'
  145                                 '"hmac_label": "hmac_label"}')
  146         self.assertRaises(ex.P11CryptoPluginException,
  147                           self.plugin._encrypt,
  148                           encrypt_dto,
  149                           kek_meta,
  150                           mock.MagicMock())
  151 
  152         self.assertEqual(2, self.pkcs11.get_key_handle.call_count)
  153         self.assertEqual(2, self.pkcs11.get_session.call_count)
  154         self.assertEqual(1, self.pkcs11.verify_hmac.call_count)
  155         self.assertEqual(1, self.pkcs11.unwrap_key.call_count)
  156         self.assertEqual(0, self.pkcs11.encrypt.call_count)
  157         self.assertEqual(0, self.pkcs11.return_session.call_count)
  158 
  159     def test_decrypt(self):
  160         ct = b'ctct'
  161         kek_meta_extended = '{"iv":"AAAA","mechanism":"CKM_AES_CBC"}'
  162         decrypt_dto = plugin_import.DecryptDTO(ct)
  163         kek_meta = mock.MagicMock()
  164         kek_meta.kek_label = 'pkek'
  165         kek_meta.plugin_meta = ('{"iv": "iv==",'
  166                                 '"hmac": "hmac",'
  167                                 '"wrapped_key": "c2VjcmV0a2V5BwcHBwcHBw==",'
  168                                 '"mkek_label": "mkek_label",'
  169                                 '"hmac_label": "hmac_label"}')
  170         pt = self.plugin.decrypt(decrypt_dto,
  171                                  kek_meta,
  172                                  kek_meta_extended,
  173                                  mock.MagicMock())
  174 
  175         self.assertEqual(b'0', pt)
  176 
  177         self.assertEqual(2, self.pkcs11.get_key_handle.call_count)
  178         self.assertEqual(2, self.pkcs11.get_session.call_count)
  179         self.assertEqual(1, self.pkcs11.verify_hmac.call_count)
  180         self.assertEqual(1, self.pkcs11.unwrap_key.call_count)
  181         self.assertEqual(1, self.pkcs11.decrypt.call_count)
  182         self.assertEqual(1, self.pkcs11.return_session.call_count)
  183 
  184     def test_decrypt_bad_session(self):
  185         self.pkcs11.get_session.return_value = mock.DEFAULT
  186         self.pkcs11.get_session.side_effect = ex.P11CryptoPluginException(
  187             'Testing error handling'
  188         )
  189         ct = b'ctct'
  190         kek_meta_extended = '{"iv":"AAAA","mechanism":"CKM_AES_CBC"}'
  191         decrypt_dto = plugin_import.DecryptDTO(ct)
  192         kek_meta = mock.MagicMock()
  193         kek_meta.kek_label = 'pkek'
  194         kek_meta.plugin_meta = ('{"iv": "iv==",'
  195                                 '"hmac": "hmac",'
  196                                 '"wrapped_key": "wrappedkey==",'
  197                                 '"mkek_label": "mkek_label",'
  198                                 '"hmac_label": "hmac_label"}')
  199         self.assertRaises(ex.P11CryptoPluginException,
  200                           self.plugin._decrypt,
  201                           decrypt_dto,
  202                           kek_meta,
  203                           kek_meta_extended,
  204                           mock.MagicMock())
  205 
  206         self.assertEqual(2, self.pkcs11.get_key_handle.call_count)
  207         self.assertEqual(2, self.pkcs11.get_session.call_count)
  208         self.assertEqual(1, self.pkcs11.verify_hmac.call_count)
  209         self.assertEqual(1, self.pkcs11.unwrap_key.call_count)
  210         self.assertEqual(0, self.pkcs11.decrypt.call_count)
  211         self.assertEqual(0, self.pkcs11.return_session.call_count)
  212 
  213     def test_generate_symmetric(self):
  214         secret = models.Secret()
  215         secret.bit_length = 128
  216         secret.algorithm = 'AES'
  217         generate_dto = plugin_import.GenerateDTO(
  218             secret.algorithm,
  219             secret.bit_length,
  220             None, None)
  221         kek_meta = mock.MagicMock()
  222         kek_meta.kek_label = 'pkek'
  223         kek_meta.plugin_meta = ('{"iv": "iv==",'
  224                                 '"hmac": "hmac",'
  225                                 '"wrapped_key": "wrappedkey==",'
  226                                 '"mkek_label": "mkek_label",'
  227                                 '"hmac_label": "hmac_label"}')
  228         response_dto = self.plugin.generate_symmetric(generate_dto,
  229                                                       kek_meta,
  230                                                       mock.MagicMock())
  231 
  232         self.assertEqual(b'0', response_dto.cypher_text)
  233         self.assertIn('iv', response_dto.kek_meta_extended)
  234 
  235         self.assertEqual(2, self.pkcs11.get_key_handle.call_count)
  236         self.assertEqual(2, self.pkcs11.get_session.call_count)
  237         self.assertEqual(1, self.pkcs11.generate_random.call_count)
  238         self.assertEqual(1, self.pkcs11.verify_hmac.call_count)
  239         self.assertEqual(1, self.pkcs11.unwrap_key.call_count)
  240         self.assertEqual(1, self.pkcs11.encrypt.call_count)
  241         self.assertEqual(1, self.pkcs11.return_session.call_count)
  242 
  243     def test_generate_asymmetric_raises_error(self):
  244         self.assertRaises(NotImplementedError,
  245                           self.plugin.generate_asymmetric,
  246                           mock.MagicMock(),
  247                           mock.MagicMock(),
  248                           mock.MagicMock())
  249 
  250     def test_supports_encrypt_decrypt(self):
  251         self.assertTrue(
  252             self.plugin.supports(
  253                 plugin_import.PluginSupportTypes.ENCRYPT_DECRYPT
  254             )
  255         )
  256 
  257     def test_supports_symmetric_key_generation(self):
  258         self.assertTrue(
  259             self.plugin.supports(
  260                 plugin_import.PluginSupportTypes.SYMMETRIC_KEY_GENERATION
  261             )
  262         )
  263 
  264     def test_does_not_supports_asymmetric_key_generation(self):
  265         self.assertFalse(
  266             self.plugin.supports(
  267                 plugin_import.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION
  268             )
  269         )
  270 
  271     def test_does_not_support_unknown_type(self):
  272         self.assertFalse(
  273             self.plugin.supports('SOMETHING_RANDOM')
  274         )
  275 
  276     def test_missing_mkek(self):
  277         self.pkcs11.get_key_handle.return_value = None
  278         self.assertRaises(ex.P11CryptoKeyHandleException,
  279                           self.plugin._get_master_key,
  280                           self.plugin.mkek_key_type,
  281                           'bad_key_label')
  282 
  283     def test_cached_kek_expired(self):
  284         self.plugin.pkek_cache['expired_kek'] = p11_crypto.CachedKEK(4, 0)
  285         self.assertIsNone(self.plugin._pkek_cache_get('expired_kek'))
  286 
  287     def test_create_pkcs11(self):
  288         def _generate_random(session, buf, length):
  289             ffi.buffer(buf)[:] = b'0' * length
  290             return pkcs11.CKR_OK
  291         lib = mock.Mock()
  292         lib.C_Initialize.return_value = pkcs11.CKR_OK
  293         lib.C_GetSlotList.return_value = pkcs11.CKR_OK
  294         lib.C_GetTokenInfo.return_value = pkcs11.CKR_OK
  295         lib.C_OpenSession.return_value = pkcs11.CKR_OK
  296         lib.C_CloseSession.return_value = pkcs11.CKR_OK
  297         lib.C_GetSessionInfo.return_value = pkcs11.CKR_OK
  298         lib.C_Login.return_value = pkcs11.CKR_OK
  299         lib.C_GenerateRandom.side_effect = _generate_random
  300         lib.C_SeedRandom.return_value = pkcs11.CKR_OK
  301         ffi = pkcs11.build_ffi()
  302         setattr(ffi, 'dlopen', lambda x: lib)
  303 
  304         p11 = self.plugin._create_pkcs11(ffi)
  305         self.assertIsInstance(p11, pkcs11.PKCS11)
  306 
  307         # test for when plugin_conf.seed_file is not None
  308         self.plugin.seed_file = 'seed_file'
  309         d = '01234567' * 4
  310         mo = mock.mock_open(read_data=d)
  311 
  312         with mock.patch(six.moves.builtins.__name__ + '.open',
  313                         mo,
  314                         create=True):
  315             p11 = self.plugin._create_pkcs11(ffi)
  316 
  317         self.assertIsInstance(p11, pkcs11.PKCS11)
  318         mo.assert_called_once_with('seed_file', 'rb')
  319         calls = [mock.call('seed_file', 'rb'),
  320                  mock.call().__enter__(),
  321                  mock.call().read(32),
  322                  mock.call().__exit__(None, None, None)]
  323         self.assertEqual(mo.mock_calls, calls)
  324         lib.C_SeedRandom.assert_called_once_with(mock.ANY, mock.ANY, 32)
  325         self.cfg_mock.p11_crypto_plugin.seed_file = ''
  326 
  327     def test_call_pkcs11_with_token_error(self):
  328         self.plugin._encrypt = mock.Mock()
  329         self.plugin._encrypt.side_effect = [ex.P11CryptoTokenException(
  330             'Testing error handling'
  331         ),
  332             'test payload']
  333         self.plugin._reinitialize_pkcs11 = mock.Mock()
  334         self.plugin._reinitialize_pkcs11.return_value = mock.DEFAULT
  335 
  336         self.plugin.encrypt(mock.MagicMock(), mock.MagicMock(),
  337                             mock.MagicMock())
  338 
  339         self.assertEqual(2, self.pkcs11.get_key_handle.call_count)
  340         self.assertEqual(1, self.pkcs11.get_session.call_count)
  341         self.assertEqual(0, self.pkcs11.return_session.call_count)
  342         self.assertEqual(2, self.plugin._encrypt.call_count)
  343 
  344     def test_reinitialize_pkcs11(self):
  345         pkcs11 = self.pkcs11
  346         self.plugin._create_pkcs11 = mock.Mock()
  347         self.plugin._create_pkcs11.return_value = pkcs11
  348         self.plugin._configure_object_cache = mock.Mock()
  349         self.plugin._configure_object_cache.return_value = mock.DEFAULT
  350 
  351         self.plugin._reinitialize_pkcs11()
  352 
  353         self.assertEqual(1, self.pkcs11.finalize.call_count)
  354         self.assertEqual(1, self.plugin._create_pkcs11.call_count)
  355         self.assertEqual(1, self.plugin._configure_object_cache.call_count)
  356 
  357     def test_get_plugin_name(self):
  358         self.assertEqual(self.plugin_name, self.plugin.get_plugin_name())