"Fossies" - the Fresh Open Source Software Archive

Member "barbican-12.0.0/barbican/tests/plugin/test_store_crypto.py" (14 Apr 2021, 29372 Bytes) of package /linux/misc/openstack/barbican-12.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_store_crypto.py": 11.0.0_vs_12.0.0.

    1 # Copyright (c) 2014 Red Hat, Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 import base64
   16 from unittest import mock
   17 
   18 import testtools
   19 
   20 from barbican.common import utils
   21 from barbican.model import models
   22 from barbican.plugin.crypto import base
   23 from barbican.plugin.interface import secret_store
   24 from barbican.plugin import store_crypto
   25 from barbican.tests import keys
   26 from barbican.tests import utils as test_utils
   27 
   28 
   29 def get_private_dto():
   30     spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA, 2048)
   31     return secret_store.SecretDTO(secret_store.SecretType.PRIVATE,
   32                                   base64.b64encode(
   33                                       keys.get_private_key_pem()),
   34                                   spec,
   35                                   'application/pkcs8')
   36 
   37 
   38 def get_public_dto():
   39     spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA, 2048)
   40     return secret_store.SecretDTO(secret_store.SecretType.PUBLIC,
   41                                   base64.b64encode(
   42                                       keys.get_public_key_pem()),
   43                                   spec,
   44                                   'application/octet-stream')
   45 
   46 
   47 def get_certificate_dto():
   48     spec = secret_store.KeySpec(secret_store.KeyAlgorithm.RSA, 2048)
   49     return secret_store.SecretDTO(secret_store.SecretType.CERTIFICATE,
   50                                   base64.b64encode(
   51                                       keys.get_certificate_pem()),
   52                                   spec,
   53                                   'application/octet-stream')
   54 
   55 
   56 class TestSecretStoreBase(testtools.TestCase,
   57                           test_utils.MockModelRepositoryMixin):
   58     """Define common configurations for testing store_crypto.py."""
   59     def setUp(self):
   60         super(TestSecretStoreBase, self).setUp()
   61 
   62         self.patchers = []  # List of patchers utilized in this test class.
   63 
   64         self.project_id = '12345'
   65         self.content_type = 'application/octet-stream'
   66         self.content_encoding = 'base64'
   67         self.secret = base64.b64encode(b'secret')
   68         self.decrypted_secret = b'decrypted_secret'
   69         self.cypher_text = b'cypher_text'
   70         self.kek_meta_extended = 'kek-meta-extended'
   71         self.spec_aes = secret_store.KeySpec('AES', 64, 'CBC')
   72         self.spec_rsa = secret_store.KeySpec(
   73             'RSA', 1024, passphrase='changeit')
   74 
   75         self.project_model = mock.MagicMock()
   76         self.project_model.id = 'project-model-id'
   77         self.project_model.external_id = self.project_id
   78         self.secret_dto = secret_store.SecretDTO(
   79             secret_store.SecretType.OPAQUE,
   80             self.secret,
   81             secret_store.KeySpec(),
   82             self.content_type
   83         )
   84         self.response_dto = base.ResponseDTO(
   85             self.cypher_text, kek_meta_extended=self.kek_meta_extended)
   86         self.private_key_dto = base.ResponseDTO(self.cypher_text)
   87         self.public_key_dto = base.ResponseDTO(self.cypher_text)
   88         self.passphrase_dto = base.ResponseDTO(self.cypher_text)
   89 
   90         self.kek_meta_project_model = models.KEKDatum()
   91         self.kek_meta_project_model.plugin_name = 'plugin-name'
   92         self.kek_meta_project_model.kek_label = 'kek-meta-label'
   93         self.kek_meta_project_model.algorithm = 'kek-meta-algo'
   94         self.kek_meta_project_model.bit_length = 1024
   95         self.kek_meta_project_model.mode = 'kek=meta-mode'
   96         self.kek_meta_project_model.plugin_meta = 'kek-meta-plugin-meta'
   97 
   98         self.encrypted_datum_model = models.EncryptedDatum()
   99         self.encrypted_datum_model.kek_meta_project = (
  100             self.kek_meta_project_model)
  101         self.encrypted_datum_model.cypher_text = base64.b64encode(
  102             b'cypher_text')
  103         self.encrypted_datum_model.content_type = 'content_type'
  104         self.encrypted_datum_model.kek_meta_extended = 'extended_meta'
  105 
  106         self.secret_model = models.Secret(
  107             {
  108                 'algorithm': 'myalg',
  109                 'bit_length': 1024,
  110                 'mode': 'mymode'
  111             }
  112         )
  113         self.secret_model.id = 'secret-model-id'
  114         self.secret_model.encrypted_data = [self.encrypted_datum_model]
  115 
  116         self.context = store_crypto.StoreCryptoContext(
  117             secret_model=self.secret_model,
  118             project_model=self.project_model,
  119             content_type=self.content_type)
  120 
  121     def tearDown(self):
  122         super(TestSecretStoreBase, self).tearDown()
  123         for patcher in self.patchers:
  124             patcher.stop()
  125 
  126     def init_patchers(self):
  127         self._config_get_secret_repository()
  128         self._config_get_encrypted_datum_repository()
  129         self._config_get_kek_datum_repository()
  130 
  131     def _start_patcher(self, patcher):
  132         mock = patcher.start()
  133         self.patchers.append(patcher)
  134         return mock
  135 
  136     def _config_get_secret_repository(self):
  137         """Mock the get_secret_repository() factory function."""
  138         self.secret_repo = mock.MagicMock()
  139         self.secret_repo.create_from.return_value = self.secret_model
  140         self.setup_secret_repository_mock(self.secret_repo)
  141 
  142     def _config_get_encrypted_datum_repository(self):
  143         """Mock the get_encrypted_datum_repository() factory function."""
  144         self.datum_repo = mock.MagicMock()
  145         self.datum_repo.create_from.return_value = None
  146         self.setup_encrypted_datum_repository_mock(self.datum_repo)
  147 
  148     def _config_get_kek_datum_repository(self):
  149         """Mock the get_kek_datum_repository() factory function."""
  150         kek_model = self.kek_meta_project_model
  151         self.kek_repo = mock.MagicMock()
  152         self.kek_repo.find_or_create_kek_datum.return_value = kek_model
  153         self.setup_kek_datum_repository_mock(self.kek_repo)
  154 
  155 
  156 @test_utils.parameterized_test_case
  157 class WhenTestingStoreCrypto(TestSecretStoreBase):
  158 
  159     dataset_for_pem = {
  160         'private': [get_private_dto()],
  161         'public': [get_public_dto()],
  162         'certificate': [get_certificate_dto()]
  163     }
  164 
  165     def setUp(self):
  166         super(WhenTestingStoreCrypto, self).setUp()
  167 
  168         self.init_patchers()
  169         self._config_crypto_plugin()
  170         self._config_private_methods()
  171 
  172         self.plugin_to_test = store_crypto.StoreCryptoAdapterPlugin()
  173 
  174     def test_store_secret_with_context_type(self):
  175         """Test storing a secret."""
  176 
  177         response_dict = self.plugin_to_test.store_secret(
  178             self.secret_dto, self.context)
  179 
  180         self.assertIsNone(response_dict)
  181 
  182         # Verify encrypt plugin and method where invoked.
  183         encrypt_mock = self.encrypting_plugin.encrypt
  184         self.assertEqual(1, encrypt_mock.call_count)
  185         args, kwargs = encrypt_mock.call_args
  186         test_encrypt_dto, test_kek_meta_dto, test_project_id = tuple(args)
  187         self.assertIsInstance(test_encrypt_dto, base.EncryptDTO)
  188         self.assertEqual(b'secret', test_encrypt_dto.unencrypted)
  189         self.assertEqual(self.kek_meta_dto, test_kek_meta_dto)
  190         self.assertEqual(self.project_id, test_project_id)
  191 
  192     def test_store_secret_without_context_type(self):
  193         """Test storing a secret."""
  194         self.context.content_type = None
  195 
  196         self.plugin_to_test.store_secret(
  197             self.secret_dto, self.context)
  198 
  199         self.assertEqual(self.content_type, self.context.content_type)
  200 
  201     @test_utils.parameterized_dataset(dataset_for_pem)
  202     def test_store_pem_secret(self, secret_dto):
  203         """Test storing a secret that is PEM encoded."""
  204 
  205         response_dict = self.plugin_to_test.store_secret(
  206             secret_dto, self.context)
  207 
  208         self.assertIsNone(response_dict)
  209 
  210         raw_content = base64.b64decode(secret_dto.secret)
  211 
  212         # Verify encrypt plugin and method where invoked.
  213         encrypt_mock = self.encrypting_plugin.encrypt
  214         self.assertEqual(1, encrypt_mock.call_count)
  215         args, kwargs = encrypt_mock.call_args
  216         test_encrypt_dto, test_kek_meta_dto, test_project_id = tuple(args)
  217         self.assertIsInstance(test_encrypt_dto, base.EncryptDTO)
  218         self.assertEqual(raw_content, test_encrypt_dto.unencrypted)
  219         self.assertEqual(self.kek_meta_dto, test_kek_meta_dto)
  220         self.assertEqual(self.project_id, test_project_id)
  221 
  222     def test_get_secret(self):
  223         """Test getting a secret."""
  224 
  225         secret_dto = self.plugin_to_test.get_secret(
  226             secret_store.SecretType.OPAQUE,
  227             None,  # Secret metadata is not relevant to store_crypto process.
  228             self.context)
  229 
  230         # Verify response.
  231         self.assertIsInstance(secret_dto, secret_store.SecretDTO)
  232         self.assertEqual(secret_store.SecretType.OPAQUE, secret_dto.type)
  233         self.assertEqual(
  234             base64.encodebytes(self.decrypted_secret).rstrip(b'\n'),
  235             secret_dto.secret)
  236         self.assertEqual(
  237             self.encrypted_datum_model.content_type, secret_dto.content_type)
  238         self.assertIsInstance(secret_dto.key_spec, secret_store.KeySpec)
  239         self.assertEqual(
  240             self.secret_model.algorithm, secret_dto.key_spec.alg)
  241         self.assertEqual(
  242             self.secret_model.bit_length, secret_dto.key_spec.bit_length)
  243         self.assertEqual(
  244             self.secret_model.mode, secret_dto.key_spec.mode)
  245 
  246         # Verify decrypt plugin and method where invoked.
  247         decrypt_mock = self.retrieving_plugin.decrypt
  248         self.assertEqual(1, decrypt_mock.call_count)
  249         args, kwargs = decrypt_mock.call_args
  250         (
  251             test_decrypt,
  252             test_kek_meta,
  253             test_kek_meta_extended,
  254             test_project_id
  255         ) = tuple(args)
  256 
  257         self.assertIsInstance(test_decrypt, base.DecryptDTO)
  258         self.assertEqual(
  259             base64.b64decode(self.encrypted_datum_model.cypher_text),
  260             test_decrypt.encrypted)
  261 
  262         self.assertIsInstance(test_kek_meta, base.KEKMetaDTO)
  263         self.assertEqual(
  264             self.kek_meta_project_model.plugin_name, test_kek_meta.plugin_name)
  265 
  266         self.assertEqual(
  267             self.encrypted_datum_model.kek_meta_extended,
  268             test_kek_meta_extended)
  269 
  270         self.assertEqual(self.project_id, test_project_id)
  271 
  272     @test_utils.parameterized_dataset(dataset_for_pem)
  273     def test_get_secret_encoding(self, input_secret_dto):
  274         """Test getting a secret that should be returend in PEM format."""
  275         secret = input_secret_dto.secret
  276         key_spec = input_secret_dto.key_spec
  277         secret_type = input_secret_dto.type
  278 
  279         decrypt_mock = self.retrieving_plugin.decrypt
  280         decrypt_mock.return_value = base64.decodebytes(secret)
  281 
  282         secret_model = self.context.secret_model
  283         secret_model.algorithm = key_spec.alg
  284         secret_model.bit_length = key_spec.bit_length
  285         secret_model.mode = key_spec.mode
  286 
  287         secret_dto = self.plugin_to_test.get_secret(
  288             secret_type,
  289             None,  # Secret metadata is not relevant to store_crypto process.
  290             self.context)
  291 
  292         # Verify response.
  293         self.assertIsInstance(secret_dto, secret_store.SecretDTO)
  294         self.assertEqual(secret, secret_dto.secret)
  295         self.assertEqual(secret_type, secret_dto.type)
  296         self.assertIsInstance(secret_dto.key_spec, secret_store.KeySpec)
  297         self.assertEqual(
  298             secret_model.algorithm, secret_dto.key_spec.alg)
  299         self.assertEqual(
  300             secret_model.bit_length, secret_dto.key_spec.bit_length)
  301         self.assertEqual(
  302             secret_model.mode, secret_dto.key_spec.mode)
  303 
  304     def test_generate_symmetric_key(self):
  305         """test symmetric secret generation."""
  306         generation_type = base.PluginSupportTypes.SYMMETRIC_KEY_GENERATION
  307         self._config_determine_generation_type_private_method(
  308             generation_type)
  309 
  310         response_dict = self.plugin_to_test.generate_symmetric_key(
  311             self.spec_aes, self.context)
  312 
  313         self.assertIsNone(response_dict)
  314 
  315         # Verify KEK objects finder was invoked.
  316         method_target = self.find_or_create_kek_objects_patcher.target
  317         method_mock = method_target._find_or_create_kek_objects
  318         self.assertEqual(1, method_mock.call_count)
  319 
  320         # Verify generating plugin and method where invoked.
  321         self._verify_generating_plugin_args(
  322             self.generating_plugin.generate_symmetric,
  323             self.spec_aes.alg,
  324             self.spec_aes.bit_length)
  325 
  326         # Verify secret save was invoked.
  327         method_target = self.store_secret_and_datum_patcher.target
  328         method_mock = method_target._store_secret_and_datum
  329         self.assertEqual(1, method_mock.call_count)
  330 
  331     def test_generate_asymmetric_key_with_passphrase(self):
  332         """test asymmetric secret generation with passphrase."""
  333         self._test_generate_asymmetric_key(passphrase='passphrase')
  334 
  335     def test_generate_asymmetric_key_without_passphrase(self):
  336         """test asymmetric secret generation with passphrase."""
  337         self._test_generate_asymmetric_key(passphrase=None)
  338 
  339     def test_generate_supports(self):
  340         """test generate_supports."""
  341         # False return if KeySpec == None
  342         self.assertFalse(self.plugin_to_test.generate_supports(None))
  343 
  344         # AES KeySpec should be supported.
  345         key_spec = secret_store.KeySpec(alg='AES', bit_length=64, mode='CBC')
  346         self.assertTrue(self.plugin_to_test.generate_supports(key_spec))
  347         key_spec = secret_store.KeySpec(alg='aes', bit_length=64, mode='CBC')
  348         self.assertTrue(self.plugin_to_test.generate_supports(key_spec))
  349 
  350         # RSA KeySpec should be supported.
  351         key_spec = secret_store.KeySpec(alg='RSA', bit_length=2048)
  352         self.assertTrue(self.plugin_to_test.generate_supports(key_spec))
  353         # Camellia KeySpec should not be supported.
  354         self.key_spec = secret_store.KeySpec('Camellia', 64)
  355         self.assertFalse(self.plugin_to_test.generate_supports(self.key_spec))
  356         # Bogus KeySpec should not be supported.
  357         key_spec = secret_store.KeySpec(alg='bogus', bit_length=2048)
  358         self.assertFalse(self.plugin_to_test.generate_supports(key_spec))
  359 
  360     def test_store_secret_supports(self):
  361         # All spec types are supported for storage.
  362         key_spec = secret_store.KeySpec(
  363             alg='anyalg', bit_length=64, mode='CBC')
  364         self.assertTrue(self.plugin_to_test.store_secret_supports(key_spec))
  365 
  366     def test_delete_secret(self):
  367         """Delete is not implemented, so just verify passes."""
  368         self.plugin_to_test.delete_secret(None)
  369 
  370     def test_should_raise_secret_not_found_get_secret_with_no_model(self):
  371         self.context.secret_model = None
  372 
  373         self.assertRaises(
  374             secret_store.SecretNotFoundException,
  375             self.plugin_to_test.get_secret,
  376             secret_store.SecretType.OPAQUE,
  377             None,  # get_secret() doesn't use the secret metadata argument
  378             self.context
  379         )
  380 
  381     def test_should_raise_secret_not_found_get_secret_no_encrypted_data(self):
  382         self.context.secret_model.encrypted_data = []
  383 
  384         self.assertRaises(
  385             secret_store.SecretNotFoundException,
  386             self.plugin_to_test.get_secret,
  387             secret_store.SecretType.OPAQUE,
  388             None,  # get_secret() doesn't use the secret metadata argument
  389             self.context
  390         )
  391 
  392     def test_should_raise_algorithm_not_supported_generate_symmetric_key(self):
  393         generation_type = base.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION
  394         self._config_determine_generation_type_private_method(
  395             generation_type)
  396 
  397         self.assertRaises(
  398             secret_store.SecretAlgorithmNotSupportedException,
  399             self.plugin_to_test.generate_symmetric_key,
  400             self.spec_aes,
  401             self.context
  402         )
  403 
  404     def test_should_raise_algo_not_supported_generate_asymmetric_key(self):
  405         generation_type = base.PluginSupportTypes.SYMMETRIC_KEY_GENERATION
  406         self._config_determine_generation_type_private_method(
  407             generation_type)
  408 
  409         self.assertRaises(
  410             secret_store.SecretAlgorithmNotSupportedException,
  411             self.plugin_to_test.generate_asymmetric_key,
  412             self.spec_rsa,
  413             self.context
  414         )
  415 
  416     def _test_generate_asymmetric_key(self, passphrase=None):
  417         """test asymmetric secret generation with passphrase parameter."""
  418         self.spec_rsa.passphrase = passphrase
  419 
  420         generation_type = base.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION
  421         self._config_determine_generation_type_private_method(
  422             generation_type)
  423 
  424         response_dto = self.plugin_to_test.generate_asymmetric_key(
  425             self.spec_rsa, self.context)
  426 
  427         # Verify response.
  428         self.assertIsInstance(
  429             response_dto, secret_store.AsymmetricKeyMetadataDTO)
  430         self.assertIsNone(response_dto.private_key_meta)
  431         self.assertIsNone(response_dto.public_key_meta)
  432         self.assertIsNone(response_dto.passphrase_meta)
  433 
  434         # Verify KEK objects finder was invoked.
  435         method_target = self.find_or_create_kek_objects_patcher.target
  436         method_mock = method_target._find_or_create_kek_objects
  437         self.assertEqual(1, method_mock.call_count)
  438 
  439         # Verify generating plugin and method where invoked.
  440         self._verify_generating_plugin_args(
  441             self.generating_plugin.generate_asymmetric,
  442             self.spec_rsa.alg,
  443             self.spec_rsa.bit_length)
  444 
  445         # Assert the secret save was called the proper number of times.
  446         call_count = 2
  447         if passphrase:
  448             call_count = 3
  449         method_target = self.store_secret_and_datum_patcher.target
  450         method_mock = method_target._store_secret_and_datum
  451         self.assertEqual(call_count, method_mock.call_count)
  452 
  453     def _verify_generating_plugin_args(self, generate_mock, alg, bit_length):
  454         """Verify generating plugin and method where invoked."""
  455         self.assertEqual(1, generate_mock.call_count)
  456         args, kwargs = generate_mock.call_args
  457         test_generate_dto, test_kek_meta_dto, test_project_id = tuple(args)
  458         self.assertIsInstance(test_generate_dto, base.GenerateDTO)
  459         self.assertEqual(alg, test_generate_dto.algorithm)
  460         self.assertEqual(bit_length, test_generate_dto.bit_length)
  461         self.assertEqual(self.kek_meta_dto, test_kek_meta_dto)
  462         self.assertEqual(self.project_id, test_project_id)
  463 
  464         return generate_mock
  465 
  466     def _config_crypto_plugin(self):
  467         """Mock the crypto plugin."""
  468 
  469         # Create encrypting and generating plugins (the same plugin does both)
  470         response_dto = self.response_dto
  471         self.generating_plugin = mock.MagicMock()
  472         self.encrypting_plugin = self.generating_plugin
  473         self.generating_plugin.encrypt.return_value = response_dto
  474         self.generating_plugin.generate_symmetric.return_value = response_dto
  475         self.generating_plugin.generate_asymmetric.return_value = (
  476             self.private_key_dto, self.public_key_dto, self.passphrase_dto
  477         )
  478 
  479         # Create secret retrieving plugin
  480         self.retrieving_plugin = mock.MagicMock()
  481         self.retrieving_plugin.decrypt.return_value = self.decrypted_secret
  482 
  483         gen_plugin_config = {
  484             'get_plugin_store_generate.return_value': self.generating_plugin,
  485             'get_plugin_retrieve.return_value': self.retrieving_plugin,
  486         }
  487         self.gen_plugin_patcher = mock.patch(
  488             'barbican.plugin.crypto.manager._PLUGIN_MANAGER',
  489             **gen_plugin_config
  490         )
  491         self._start_patcher(self.gen_plugin_patcher)
  492 
  493     def _config_private_methods(self):
  494         """Mock store_crypto's private methods."""
  495 
  496         # Mock _find_or_create_kek_objects().
  497         self.kek_meta_dto = mock.MagicMock()
  498         find_or_create_kek_objects_config = {
  499             'return_value': (
  500                 self.kek_meta_project_model, self.kek_meta_dto),
  501         }
  502         self.find_or_create_kek_objects_patcher = mock.patch(
  503             'barbican.plugin.store_crypto._find_or_create_kek_objects',
  504             **find_or_create_kek_objects_config
  505         )
  506         self._start_patcher(self.find_or_create_kek_objects_patcher)
  507 
  508         # Mock _store_secret_and_datum().
  509         self.store_secret_and_datum_patcher = mock.patch(
  510             'barbican.plugin.store_crypto._store_secret_and_datum'
  511         )
  512         self._start_patcher(self.store_secret_and_datum_patcher)
  513 
  514     def _config_determine_generation_type_private_method(self, type_to_return):
  515         """Mock _determine_generation_type()."""
  516 
  517         determine_generation_type_config = {
  518             'return_value': type_to_return,
  519         }
  520         self.determine_generation_type_patcher = mock.patch(
  521             'barbican.plugin.store_crypto._determine_generation_type',
  522             **determine_generation_type_config
  523         )
  524         self._start_patcher(self.determine_generation_type_patcher)
  525 
  526 
  527 class WhenTestingStoreCryptoDetermineGenerationType(testtools.TestCase):
  528     """Tests store_crypto.py's _determine_generation_type() function."""
  529 
  530     def test_symmetric_algorithms(self):
  531         for algorithm in base.PluginSupportTypes.SYMMETRIC_ALGORITHMS:
  532             self.assertEqual(
  533                 base.PluginSupportTypes.SYMMETRIC_KEY_GENERATION,
  534                 store_crypto._determine_generation_type(algorithm))
  535 
  536         # Case doesn't matter.
  537         self.assertEqual(
  538             base.PluginSupportTypes.SYMMETRIC_KEY_GENERATION,
  539             store_crypto._determine_generation_type('AeS'))
  540 
  541     def test_asymmetric_algorithms(self):
  542         for algorithm in base.PluginSupportTypes.ASYMMETRIC_ALGORITHMS:
  543             self.assertEqual(
  544                 base.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION,
  545                 store_crypto._determine_generation_type(algorithm))
  546 
  547         # Case doesn't matter.
  548         self.assertEqual(
  549             base.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION,
  550             store_crypto._determine_generation_type('RsA'))
  551 
  552     def test_should_raise_not_supported_no_algorithm(self):
  553         self.assertRaises(
  554             secret_store.SecretAlgorithmNotSupportedException,
  555             store_crypto._determine_generation_type,
  556             None
  557         )
  558 
  559     def test_should_raise_not_supported_bogus_algorithm(self):
  560         self.assertRaises(
  561             secret_store.SecretAlgorithmNotSupportedException,
  562             store_crypto._determine_generation_type,
  563             'bogus'
  564         )
  565 
  566 
  567 class WhenTestingStoreCryptoFindOrCreateKekObjects(TestSecretStoreBase):
  568     """Tests store_crypto.py's _find_or_create_kek_objects() function."""
  569 
  570     def setUp(self):
  571         super(WhenTestingStoreCryptoFindOrCreateKekObjects, self).setUp()
  572         self.init_patchers()
  573         self._config_private_methods()
  574 
  575     def test_kek_bind_completed(self):
  576         self.kek_meta_project_model.bind_completed = True
  577         plugin_inst = self
  578 
  579         kek_model, kek_meta_dto = store_crypto._find_or_create_kek_objects(
  580             plugin_inst, self.project_model)
  581 
  582         # Verify returns.
  583         self.assertEqual(self.kek_meta_project_model, kek_model)
  584         self.assertIsInstance(kek_meta_dto, base.KEKMetaDTO)
  585 
  586         # Verify the KEK repository interactions.
  587         self._verify_kek_repository_interactions(plugin_inst)
  588 
  589     def test_kek_bind_not_completed(self):
  590         self.kek_meta_project_model.bind_completed = False
  591         test_kek_metadata = 'metadata'
  592         plugin_inst = mock.MagicMock()
  593         plugin_inst.bind_kek_metadata.return_value = test_kek_metadata
  594 
  595         kek_model, kek_meta_dto = store_crypto._find_or_create_kek_objects(
  596             plugin_inst, self.project_model)
  597 
  598         # Verify returns.
  599         self.assertEqual(self.kek_meta_project_model, kek_model)
  600         self.assertEqual(test_kek_metadata, kek_meta_dto)
  601 
  602         # Verify the KEK repository interactions.
  603         self._verify_kek_repository_interactions(plugin_inst)
  604 
  605         # Verify bind operations.
  606         self.assertEqual(
  607             1, plugin_inst.bind_kek_metadata.call_count)
  608         self.assertEqual(
  609             1, self.bind_completed_mock.call_count)
  610         self.assertEqual(
  611             1, self.kek_repo.save.call_count)
  612         args, kwargs = self.kek_repo.save.call_args
  613         kek_model = args[0]
  614         self.assertEqual(self.kek_meta_project_model, kek_model)
  615 
  616     def test_kek_raise_no_kek_bind_not_completed(self):
  617         self.kek_meta_project_model.bind_completed = False
  618         plugin_inst = mock.MagicMock()
  619         plugin_inst.bind_kek_metadata.return_value = None
  620 
  621         self.assertRaises(
  622             base.CryptoKEKBindingException,
  623             store_crypto._find_or_create_kek_objects,
  624             plugin_inst,
  625             self.project_model)
  626 
  627     def _verify_kek_repository_interactions(self, plugin_inst):
  628         """Verify the KEK repository interactions."""
  629         self.assertEqual(
  630             1, self.kek_repo.find_or_create_kek_datum.call_count)
  631         args, kwargs = self.kek_repo.find_or_create_kek_datum.call_args
  632         test_project_model = args[0]
  633         test_full_plugin_name = args[1]
  634         self.assertEqual(self.project_model, test_project_model)
  635         plugin_name = utils.generate_fullname_for(plugin_inst)
  636         self.assertEqual(plugin_name, test_full_plugin_name)
  637 
  638     def _config_private_methods(self):
  639         """Mock store_crypto's private methods."""
  640 
  641         # Mock _indicate_bind_completed().
  642         indicate_bind_completed_config = {
  643             'return_value': None
  644         }
  645         self.indicate_bind_completed_patcher = mock.patch(
  646             'barbican.plugin.store_crypto._indicate_bind_completed',
  647             **indicate_bind_completed_config)
  648         self.bind_completed_mock = self._start_patcher(
  649             self.indicate_bind_completed_patcher)
  650 
  651 
  652 class WhenTestingStoreCryptoStoreSecretAndDatum(TestSecretStoreBase):
  653     """Tests store_crypto.py's _store_secret_and_datum() function."""
  654 
  655     def setUp(self):
  656         super(WhenTestingStoreCryptoStoreSecretAndDatum, self).setUp()
  657 
  658         self.init_patchers()
  659 
  660     def test_without_existing_secret(self):
  661 
  662         self.secret_model.id = None
  663 
  664         store_crypto._store_secret_and_datum(
  665             self.context,
  666             self.secret_model,
  667             self.kek_meta_project_model,
  668             self.response_dto)
  669 
  670         # Verify the repository interactions.
  671         self._verify_secret_repository_interactions()
  672         self._verify_encrypted_datum_repository_interactions()
  673 
  674     def test_with_existing_secret(self):
  675         store_crypto._store_secret_and_datum(
  676             self.context,
  677             self.secret_model,
  678             self.kek_meta_project_model,
  679             self.response_dto)
  680 
  681         # Verify the repository interactions.
  682         self._verify_encrypted_datum_repository_interactions()
  683 
  684         # Verify **not** these repository interactions.
  685         self.assertEqual(
  686             0, self.secret_repo.create_from.call_count)
  687 
  688     def _verify_secret_repository_interactions(self):
  689         """Verify the secret repository interactions."""
  690         self.assertEqual(
  691             1, self.secret_repo.create_from.call_count)
  692         args, kwargs = self.secret_repo.create_from.call_args
  693         test_secret_model = args[0]
  694         self.assertEqual(self.secret_model, test_secret_model)
  695 
  696     def _verify_encrypted_datum_repository_interactions(self):
  697         """Verify the encrypted datum repository interactions."""
  698         self.assertEqual(
  699             1, self.datum_repo.create_from.call_count)
  700         args, kwargs = self.datum_repo.create_from.call_args
  701         test_datum_model = args[0]
  702         self.assertIsInstance(test_datum_model, models.EncryptedDatum)
  703         self.assertEqual(
  704             self.content_type, test_datum_model.content_type)
  705         self.assertEqual(
  706             base64.encodebytes(self.cypher_text).rstrip(b'\n'),
  707             test_datum_model.cypher_text)
  708         self.assertEqual(
  709             self.response_dto.kek_meta_extended,
  710             test_datum_model.kek_meta_extended)
  711 
  712 
  713 class WhenTestingStoreCryptoIndicateBindCompleted(TestSecretStoreBase):
  714     """Tests store_crypto.py's _indicate_bind_completed() function."""
  715 
  716     def test_bind_operation(self):
  717         kek_meta_dto = base.KEKMetaDTO(self.kek_meta_project_model)
  718         self.kek_meta_project_model.bind_completed = False
  719 
  720         store_crypto._indicate_bind_completed(
  721             kek_meta_dto, self.kek_meta_project_model)
  722 
  723         self.assertTrue(self.kek_meta_project_model.bind_completed)
  724         self.assertEqual(
  725             self.kek_meta_project_model.algorithm, kek_meta_dto.algorithm)
  726         self.assertEqual(
  727             self.kek_meta_project_model.bit_length, kek_meta_dto.bit_length)
  728         self.assertEqual(
  729             self.kek_meta_project_model.mode, kek_meta_dto.mode)
  730         self.assertEqual(
  731             self.kek_meta_project_model.plugin_meta, kek_meta_dto.plugin_meta)