"Fossies" - the Fresh Open Source Software Archive

Member "swift-2.21.0/test/unit/common/middleware/crypto/test_crypto_utils.py" (25 Mar 2019, 24729 Bytes) of package /linux/misc/openstack/swift-2.21.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_crypto_utils.py": 2.19.1_vs_2.21.0.

    1 # Copyright (c) 2015-2016 OpenStack Foundation
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 import os
   16 import unittest
   17 
   18 import mock
   19 from cryptography.hazmat.backends import default_backend
   20 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
   21 
   22 from swift.common.exceptions import EncryptionException
   23 from swift.common.middleware.crypto import crypto_utils
   24 from swift.common.middleware.crypto.crypto_utils import (
   25     CRYPTO_KEY_CALLBACK, Crypto, CryptoWSGIContext)
   26 from swift.common.swob import HTTPException
   27 from test.unit import FakeLogger
   28 from test.unit.common.middleware.crypto.crypto_helpers import fetch_crypto_keys
   29 
   30 
   31 class TestCryptoWsgiContext(unittest.TestCase):
   32     def setUp(self):
   33         class FakeFilter(object):
   34             app = None
   35             crypto = Crypto({})
   36 
   37         self.fake_logger = FakeLogger()
   38         self.crypto_context = CryptoWSGIContext(
   39             FakeFilter(), 'object', self.fake_logger)
   40 
   41     def test_get_keys(self):
   42         # ok
   43         env = {CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
   44         keys = self.crypto_context.get_keys(env)
   45         self.assertDictEqual(fetch_crypto_keys(), keys)
   46 
   47         # only default required keys are checked
   48         subset_keys = {'object': fetch_crypto_keys()['object']}
   49         env = {CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: subset_keys}
   50         keys = self.crypto_context.get_keys(env)
   51         self.assertDictEqual(subset_keys, keys)
   52 
   53         # only specified required keys are checked
   54         subset_keys = {'container': fetch_crypto_keys()['container']}
   55         env = {CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: subset_keys}
   56         keys = self.crypto_context.get_keys(env, required=['container'])
   57         self.assertDictEqual(subset_keys, keys)
   58 
   59         subset_keys = {'object': fetch_crypto_keys()['object'],
   60                        'container': fetch_crypto_keys()['container']}
   61         env = {CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: subset_keys}
   62         keys = self.crypto_context.get_keys(
   63             env, required=['object', 'container'])
   64         self.assertDictEqual(subset_keys, keys)
   65 
   66     def test_get_keys_with_crypto_meta(self):
   67         # verify that key_id from crypto_meta is passed to fetch_crypto_keys
   68         keys = fetch_crypto_keys()
   69         mock_fetch_crypto_keys = mock.MagicMock(return_value=keys)
   70         env = {CRYPTO_KEY_CALLBACK: mock_fetch_crypto_keys}
   71         key_id = {'secret_id': '123'}
   72         keys = self.crypto_context.get_keys(env, key_id=key_id)
   73         self.assertDictEqual(fetch_crypto_keys(), keys)
   74         mock_fetch_crypto_keys.assert_called_with(key_id={'secret_id': '123'})
   75 
   76         # but it's ok for there to be no crypto_meta
   77         keys = self.crypto_context.get_keys(env, key_id={})
   78         self.assertDictEqual(fetch_crypto_keys(), keys)
   79         mock_fetch_crypto_keys.assert_called_with(key_id={})
   80         keys = self.crypto_context.get_keys(env)
   81         self.assertDictEqual(fetch_crypto_keys(), keys)
   82         mock_fetch_crypto_keys.assert_called_with(key_id=None)
   83 
   84     def test_get_keys_missing_callback(self):
   85         with self.assertRaises(HTTPException) as cm:
   86             self.crypto_context.get_keys({})
   87         self.assertIn('500 Internal Error', cm.exception.status)
   88         self.assertIn('missing callback',
   89                       self.fake_logger.get_lines_for_level('error')[0])
   90         self.assertIn(b'Unable to retrieve encryption keys.',
   91                       cm.exception.body)
   92 
   93     def test_get_keys_callback_exception(self):
   94         def callback(*args, **kwargs):
   95             raise Exception('boom')
   96         with self.assertRaises(HTTPException) as cm:
   97             self.crypto_context.get_keys({CRYPTO_KEY_CALLBACK: callback})
   98         self.assertIn('500 Internal Error', cm.exception.status)
   99         self.assertIn('from callback: boom',
  100                       self.fake_logger.get_lines_for_level('error')[0])
  101         self.assertIn(b'Unable to retrieve encryption keys.',
  102                       cm.exception.body)
  103 
  104     def test_get_keys_missing_key_for_default_required_list(self):
  105         bad_keys = dict(fetch_crypto_keys())
  106         bad_keys.pop('object')
  107         with self.assertRaises(HTTPException) as cm:
  108             self.crypto_context.get_keys(
  109                 {CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: bad_keys})
  110         self.assertIn('500 Internal Error', cm.exception.status)
  111         self.assertIn("Missing key for 'object'",
  112                       self.fake_logger.get_lines_for_level('error')[0])
  113         self.assertIn(b'Unable to retrieve encryption keys.',
  114                       cm.exception.body)
  115 
  116     def test_get_keys_missing_object_key_for_specified_required_list(self):
  117         bad_keys = dict(fetch_crypto_keys())
  118         bad_keys.pop('object')
  119         with self.assertRaises(HTTPException) as cm:
  120             self.crypto_context.get_keys(
  121                 {CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: bad_keys},
  122                 required=['object', 'container'])
  123         self.assertIn('500 Internal Error', cm.exception.status)
  124         self.assertIn("Missing key for 'object'",
  125                       self.fake_logger.get_lines_for_level('error')[0])
  126         self.assertIn(b'Unable to retrieve encryption keys.',
  127                       cm.exception.body)
  128 
  129     def test_get_keys_missing_container_key_for_specified_required_list(self):
  130         bad_keys = dict(fetch_crypto_keys())
  131         bad_keys.pop('container')
  132         with self.assertRaises(HTTPException) as cm:
  133             self.crypto_context.get_keys(
  134                 {CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: bad_keys},
  135                 required=['object', 'container'])
  136         self.assertIn('500 Internal Error', cm.exception.status)
  137         self.assertIn("Missing key for 'container'",
  138                       self.fake_logger.get_lines_for_level('error')[0])
  139         self.assertIn(b'Unable to retrieve encryption keys.',
  140                       cm.exception.body)
  141 
  142     def test_bad_object_key_for_default_required_list(self):
  143         bad_keys = dict(fetch_crypto_keys())
  144         bad_keys['object'] = b'the minor key'
  145         with self.assertRaises(HTTPException) as cm:
  146             self.crypto_context.get_keys(
  147                 {CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: bad_keys})
  148         self.assertIn('500 Internal Error', cm.exception.status)
  149         self.assertIn("Bad key for 'object'",
  150                       self.fake_logger.get_lines_for_level('error')[0])
  151         self.assertIn(b'Unable to retrieve encryption keys.',
  152                       cm.exception.body)
  153 
  154     def test_bad_container_key_for_default_required_list(self):
  155         bad_keys = dict(fetch_crypto_keys())
  156         bad_keys['container'] = b'the major key'
  157         with self.assertRaises(HTTPException) as cm:
  158             self.crypto_context.get_keys(
  159                 {CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: bad_keys},
  160                 required=['object', 'container'])
  161         self.assertIn('500 Internal Error', cm.exception.status)
  162         self.assertIn("Bad key for 'container'",
  163                       self.fake_logger.get_lines_for_level('error')[0])
  164         self.assertIn(b'Unable to retrieve encryption keys.',
  165                       cm.exception.body)
  166 
  167     def test_get_keys_not_a_dict(self):
  168         with self.assertRaises(HTTPException) as cm:
  169             self.crypto_context.get_keys(
  170                 {CRYPTO_KEY_CALLBACK:
  171                     lambda *args, **kwargs: ['key', 'quay', 'qui']})
  172         self.assertEqual('500 Internal Error', cm.exception.status)
  173         self.assertIn("Did not get a keys dict",
  174                       self.fake_logger.get_lines_for_level('error')[0])
  175         self.assertIn(b'Unable to retrieve encryption keys.',
  176                       cm.exception.body)
  177 
  178     def test_get_multiple_keys(self):
  179         env = {CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
  180         mutliple_keys = self.crypto_context.get_multiple_keys(env)
  181         self.assertEqual(
  182             [fetch_crypto_keys(),
  183              fetch_crypto_keys(key_id={'secret_id': 'myid'})],
  184             mutliple_keys)
  185 
  186 
  187 class TestModuleMethods(unittest.TestCase):
  188     meta = {'iv': b'0123456789abcdef', 'cipher': 'AES_CTR_256'}
  189     serialized_meta = '%7B%22cipher%22%3A+%22AES_CTR_256%22%2C+%22' \
  190                       'iv%22%3A+%22MDEyMzQ1Njc4OWFiY2RlZg%3D%3D%22%7D'
  191 
  192     meta_with_key = {'iv': b'0123456789abcdef', 'cipher': 'AES_CTR_256',
  193                      'body_key': {'key': b'fedcba9876543210fedcba9876543210',
  194                                   'iv': b'fedcba9876543210'}}
  195     serialized_meta_with_key = '%7B%22body_key%22%3A+%7B%22iv%22%3A+%22ZmVkY' \
  196                                '2JhOTg3NjU0MzIxMA%3D%3D%22%2C+%22key%22%3A+%' \
  197                                '22ZmVkY2JhOTg3NjU0MzIxMGZlZGNiYTk4NzY1NDMyMT' \
  198                                'A%3D%22%7D%2C+%22cipher%22%3A+%22AES_CTR_256' \
  199                                '%22%2C+%22iv%22%3A+%22MDEyMzQ1Njc4OWFiY2RlZg' \
  200                                '%3D%3D%22%7D'
  201 
  202     def test_dump_crypto_meta(self):
  203         actual = crypto_utils.dump_crypto_meta(self.meta)
  204         self.assertEqual(self.serialized_meta, actual)
  205 
  206         actual = crypto_utils.dump_crypto_meta(self.meta_with_key)
  207         self.assertEqual(self.serialized_meta_with_key, actual)
  208 
  209     def test_load_crypto_meta(self):
  210         actual = crypto_utils.load_crypto_meta(self.serialized_meta)
  211         self.assertEqual(self.meta, actual)
  212 
  213         actual = crypto_utils.load_crypto_meta(self.serialized_meta_with_key)
  214         self.assertEqual(self.meta_with_key, actual)
  215 
  216         def assert_raises(value, message):
  217             with self.assertRaises(EncryptionException) as cm:
  218                 crypto_utils.load_crypto_meta(value)
  219             self.assertIn('Bad crypto meta %r' % value, cm.exception.args[0])
  220             if isinstance(message, (tuple, list)):
  221                 for opt in message:
  222                     if opt in cm.exception.args[0]:
  223                         break
  224                 else:
  225                     self.fail('Expected to find one of %r in %r' % (
  226                         message, cm.exception.args[0]))
  227             else:
  228                 self.assertIn(message, cm.exception.args[0])
  229 
  230         assert_raises(None, 'crypto meta not a string')
  231         assert_raises(99, 'crypto meta not a string')
  232         assert_raises('', ('No JSON object could be decoded',
  233                            'Expecting value: line 1 column 1'))
  234         assert_raises('abc', ('No JSON object could be decoded',
  235                               'Expecting value: line 1 column 1'))
  236         assert_raises('[]', 'crypto meta not a Mapping')
  237         bad_type_messages = [
  238             'must be string or buffer',
  239             'argument should be a bytes-like object or ASCII string',
  240         ]
  241         assert_raises('{"iv": "abcdef"}', 'Incorrect padding')
  242         assert_raises('{"iv": []}', bad_type_messages)
  243         assert_raises('{"iv": {}}', bad_type_messages)
  244         assert_raises('{"iv": 99}', bad_type_messages)
  245         assert_raises('{"key": "abcdef"}', 'Incorrect padding')
  246         assert_raises('{"key": []}', bad_type_messages)
  247         assert_raises('{"key": {}}', bad_type_messages)
  248         assert_raises('{"key": 99}', bad_type_messages)
  249         assert_raises('{"body_key": {"iv": "abcdef"}}', 'Incorrect padding')
  250         assert_raises('{"body_key": {"iv": []}}', bad_type_messages)
  251         assert_raises('{"body_key": {"iv": {}}}', bad_type_messages)
  252         assert_raises('{"body_key": {"iv": 99}}', bad_type_messages)
  253         assert_raises('{"body_key": {"key": "abcdef"}}', 'Incorrect padding')
  254         assert_raises('{"body_key": {"key": []}}', bad_type_messages)
  255         assert_raises('{"body_key": {"key": {}}}', bad_type_messages)
  256         assert_raises('{"body_key": {"key": 99}}', bad_type_messages)
  257 
  258     def test_dump_then_load_crypto_meta(self):
  259         actual = crypto_utils.load_crypto_meta(
  260             crypto_utils.dump_crypto_meta(self.meta))
  261         self.assertEqual(self.meta, actual)
  262 
  263         actual = crypto_utils.load_crypto_meta(
  264             crypto_utils.dump_crypto_meta(self.meta_with_key))
  265         self.assertEqual(self.meta_with_key, actual)
  266 
  267     def test_append_crypto_meta(self):
  268         actual = crypto_utils.append_crypto_meta('abc', self.meta)
  269         expected = 'abc; swift_meta=%s' % self.serialized_meta
  270         self.assertEqual(actual, expected)
  271 
  272         actual = crypto_utils.append_crypto_meta('abc', self.meta_with_key)
  273         expected = 'abc; swift_meta=%s' % self.serialized_meta_with_key
  274         self.assertEqual(actual, expected)
  275 
  276     def test_extract_crypto_meta(self):
  277         val, meta = crypto_utils.extract_crypto_meta(
  278             'abc; swift_meta=%s' % self.serialized_meta)
  279         self.assertEqual('abc', val)
  280         self.assertDictEqual(self.meta, meta)
  281 
  282         val, meta = crypto_utils.extract_crypto_meta(
  283             'abc; swift_meta=%s' % self.serialized_meta_with_key)
  284         self.assertEqual('abc', val)
  285         self.assertDictEqual(self.meta_with_key, meta)
  286 
  287         val, meta = crypto_utils.extract_crypto_meta('abc')
  288         self.assertEqual('abc', val)
  289         self.assertIsNone(meta)
  290 
  291         # other param names will be ignored
  292         val, meta = crypto_utils.extract_crypto_meta('abc; foo=bar')
  293         self.assertEqual('abc', val)
  294         self.assertIsNone(meta)
  295 
  296         val, meta = crypto_utils.extract_crypto_meta(
  297             'abc; swift_meta=%s; foo=bar' % self.serialized_meta_with_key)
  298         self.assertEqual('abc', val)
  299         self.assertDictEqual(self.meta_with_key, meta)
  300 
  301     def test_append_then_extract_crypto_meta(self):
  302         val = 'abc'
  303         actual = crypto_utils.extract_crypto_meta(
  304             crypto_utils.append_crypto_meta(val, self.meta))
  305         self.assertEqual((val, self.meta), actual)
  306 
  307 
  308 class TestCrypto(unittest.TestCase):
  309 
  310     def setUp(self):
  311         self.crypto = Crypto({})
  312 
  313     def test_create_encryption_context(self):
  314         value = b'encrypt me' * 100  # more than one cipher block
  315         key = os.urandom(32)
  316         iv = os.urandom(16)
  317         ctxt = self.crypto.create_encryption_ctxt(key, iv)
  318         expected = Cipher(
  319             algorithms.AES(key), modes.CTR(iv),
  320             backend=default_backend()).encryptor().update(value)
  321         self.assertEqual(expected, ctxt.update(value))
  322 
  323         for bad_iv in (b'a little too long', b'too short'):
  324             self.assertRaises(
  325                 ValueError, self.crypto.create_encryption_ctxt, key, bad_iv)
  326 
  327         for bad_key in (b'objKey', b'a' * 31, b'a' * 33, b'a' * 16, b'a' * 24):
  328             self.assertRaises(
  329                 ValueError, self.crypto.create_encryption_ctxt, bad_key, iv)
  330 
  331     def test_create_decryption_context(self):
  332         value = b'decrypt me' * 100  # more than one cipher block
  333         key = os.urandom(32)
  334         iv = os.urandom(16)
  335         ctxt = self.crypto.create_decryption_ctxt(key, iv, 0)
  336         expected = Cipher(
  337             algorithms.AES(key), modes.CTR(iv),
  338             backend=default_backend()).decryptor().update(value)
  339         self.assertEqual(expected, ctxt.update(value))
  340 
  341         for bad_iv in (b'a little too long', b'too short'):
  342             self.assertRaises(
  343                 ValueError, self.crypto.create_decryption_ctxt, key, bad_iv, 0)
  344 
  345         for bad_key in (b'objKey', b'a' * 31, b'a' * 33, b'a' * 16, b'a' * 24):
  346             self.assertRaises(
  347                 ValueError, self.crypto.create_decryption_ctxt, bad_key, iv, 0)
  348 
  349         with self.assertRaises(ValueError) as cm:
  350             self.crypto.create_decryption_ctxt(key, iv, -1)
  351         self.assertEqual("Offset must not be negative", cm.exception.args[0])
  352 
  353     def test_enc_dec_small_chunks(self):
  354         self.enc_dec_chunks([b'encrypt me', b'because I', b'am sensitive'])
  355 
  356     def test_enc_dec_large_chunks(self):
  357         self.enc_dec_chunks([os.urandom(65536), os.urandom(65536)])
  358 
  359     def enc_dec_chunks(self, chunks):
  360         key = b'objL7wjV6L79Sfs4y7dy41273l0k6Wki'
  361         iv = self.crypto.create_iv()
  362         enc_ctxt = self.crypto.create_encryption_ctxt(key, iv)
  363         enc_val = [enc_ctxt.update(chunk) for chunk in chunks]
  364         self.assertTrue(b''.join(enc_val) != chunks)
  365         dec_ctxt = self.crypto.create_decryption_ctxt(key, iv, 0)
  366         dec_val = [dec_ctxt.update(chunk) for chunk in enc_val]
  367         self.assertEqual(b''.join(chunks), b''.join(dec_val),
  368                          'Expected value {%s} but got {%s}' %
  369                          (b''.join(chunks), b''.join(dec_val)))
  370 
  371     def test_decrypt_range(self):
  372         chunks = [b'0123456789abcdef', b'ghijklmnopqrstuv']
  373         key = b'objL7wjV6L79Sfs4y7dy41273l0k6Wki'
  374         iv = self.crypto.create_iv()
  375         enc_ctxt = self.crypto.create_encryption_ctxt(key, iv)
  376         enc_val = [enc_ctxt.update(chunk) for chunk in chunks]
  377 
  378         # Simulate a ranged GET from byte 19 to 32 : 'jklmnopqrstuv'
  379         dec_ctxt = self.crypto.create_decryption_ctxt(key, iv, 19)
  380         ranged_chunks = [enc_val[1][3:]]
  381         dec_val = [dec_ctxt.update(chunk) for chunk in ranged_chunks]
  382         self.assertEqual(b'jklmnopqrstuv', b''.join(dec_val),
  383                          'Expected value {%s} but got {%s}' %
  384                          (b'jklmnopqrstuv', b''.join(dec_val)))
  385 
  386     def test_create_decryption_context_non_zero_offset(self):
  387         # Verify that iv increments for each 16 bytes of offset.
  388         # For a ranged GET we pass a non-zero offset so that the decrypter
  389         # counter is incremented to the correct value to start decrypting at
  390         # that offset into the object body. The counter should increment by one
  391         # from the starting IV value for every 16 bytes offset into the object
  392         # body, until it reaches 2^128 -1 when it should wrap to zero. We check
  393         # that is happening by verifying a decrypted value using various
  394         # offsets.
  395         key = b'objL7wjV6L79Sfs4y7dy41273l0k6Wki'
  396 
  397         def do_test():
  398             for offset, exp_iv in mappings.items():
  399                 dec_ctxt = self.crypto.create_decryption_ctxt(key, iv, offset)
  400                 offset_in_block = offset % 16
  401                 cipher = Cipher(algorithms.AES(key),
  402                                 modes.CTR(exp_iv),
  403                                 backend=default_backend())
  404                 expected = cipher.decryptor().update(
  405                     b'p' * offset_in_block + b'ciphertext')
  406                 actual = dec_ctxt.update(b'ciphertext')
  407                 expected = expected[offset % 16:]
  408                 self.assertEqual(expected, actual,
  409                                  'Expected %r but got %r, iv=%s and offset=%s'
  410                                  % (expected, actual, iv, offset))
  411 
  412         iv = b'0000000010000000'
  413         mappings = {
  414             2: b'0000000010000000',
  415             16: b'0000000010000001',
  416             19: b'0000000010000001',
  417             48: b'0000000010000003',
  418             1024: b'000000001000000p',
  419             5119: b'000000001000001o'
  420         }
  421         do_test()
  422 
  423         # choose max iv value and test that it wraps to zero
  424         iv = b'\xff' * 16
  425         mappings = {
  426             2: iv,
  427             16: bytes(bytearray.fromhex('00' * 16)),  # iv wraps to 0
  428             19: bytes(bytearray.fromhex('00' * 16)),
  429             48: bytes(bytearray.fromhex('00' * 15 + '02')),
  430             1024: bytes(bytearray.fromhex('00' * 15 + '3f')),
  431             5119: bytes(bytearray.fromhex('00' * 14 + '013E'))
  432         }
  433         do_test()
  434 
  435         iv = b'\x00' * 16
  436         mappings = {
  437             2: iv,
  438             16: bytes(bytearray.fromhex('00' * 15 + '01')),
  439             19: bytes(bytearray.fromhex('00' * 15 + '01')),
  440             48: bytes(bytearray.fromhex('00' * 15 + '03')),
  441             1024: bytes(bytearray.fromhex('00' * 15 + '40')),
  442             5119: bytes(bytearray.fromhex('00' * 14 + '013F'))
  443         }
  444         do_test()
  445 
  446         iv = b'\x00' * 8 + b'\xff' * 8
  447         mappings = {
  448             2: iv,
  449             16: bytes(bytearray.fromhex('00' * 7 + '01' + '00' * 8)),
  450             19: bytes(bytearray.fromhex('00' * 7 + '01' + '00' * 8)),
  451             48: bytes(bytearray.fromhex('00' * 7 + '01' + '00' * 7 + '02')),
  452             1024: bytes(bytearray.fromhex('00' * 7 + '01' + '00' * 7 + '3F')),
  453             5119: bytes(bytearray.fromhex('00' * 7 + '01' + '00' * 6 + '013E'))
  454         }
  455         do_test()
  456 
  457     def test_check_key(self):
  458         for key in ('objKey', 'a' * 31, 'a' * 33, 'a' * 16, 'a' * 24):
  459             with self.assertRaises(ValueError) as cm:
  460                 self.crypto.check_key(key)
  461             self.assertEqual("Key must be length 32 bytes",
  462                              cm.exception.args[0])
  463 
  464     def test_check_crypto_meta(self):
  465         meta = {'cipher': 'AES_CTR_256'}
  466         with self.assertRaises(EncryptionException) as cm:
  467             self.crypto.check_crypto_meta(meta)
  468         self.assertEqual("Bad crypto meta: Missing 'iv'",
  469                          cm.exception.args[0])
  470 
  471         for bad_iv in ('a little too long', 'too short'):
  472             meta['iv'] = bad_iv
  473             with self.assertRaises(EncryptionException) as cm:
  474                 self.crypto.check_crypto_meta(meta)
  475             self.assertEqual("Bad crypto meta: IV must be length 16 bytes",
  476                              cm.exception.args[0])
  477 
  478         meta = {'iv': os.urandom(16)}
  479         with self.assertRaises(EncryptionException) as cm:
  480             self.crypto.check_crypto_meta(meta)
  481         self.assertEqual("Bad crypto meta: Missing 'cipher'",
  482                          cm.exception.args[0])
  483 
  484         meta['cipher'] = 'Mystery cipher'
  485         with self.assertRaises(EncryptionException) as cm:
  486             self.crypto.check_crypto_meta(meta)
  487         self.assertEqual("Bad crypto meta: Cipher must be AES_CTR_256",
  488                          cm.exception.args[0])
  489 
  490     def test_create_iv(self):
  491         self.assertEqual(16, len(self.crypto.create_iv()))
  492         # crude check that we get back different values on each call
  493         self.assertNotEqual(self.crypto.create_iv(), self.crypto.create_iv())
  494 
  495     def test_get_crypto_meta(self):
  496         meta = self.crypto.create_crypto_meta()
  497         self.assertIsInstance(meta, dict)
  498         # this is deliberately brittle so that if new items are added then the
  499         # test will need to be updated
  500         self.assertEqual(2, len(meta))
  501         self.assertIn('iv', meta)
  502         self.assertEqual(16, len(meta['iv']))
  503         self.assertIn('cipher', meta)
  504         self.assertEqual('AES_CTR_256', meta['cipher'])
  505         self.crypto.check_crypto_meta(meta)  # sanity check
  506         meta2 = self.crypto.create_crypto_meta()
  507         self.assertNotEqual(meta['iv'], meta2['iv'])  # crude sanity check
  508 
  509     def test_create_random_key(self):
  510         # crude check that we get unique keys on each call
  511         keys = set()
  512         for i in range(10):
  513             key = self.crypto.create_random_key()
  514             self.assertEqual(32, len(key))
  515             keys.add(key)
  516         self.assertEqual(10, len(keys))
  517 
  518     def test_wrap_unwrap_key(self):
  519         wrapping_key = os.urandom(32)
  520         key_to_wrap = os.urandom(32)
  521         iv = os.urandom(16)
  522         with mock.patch(
  523                 'swift.common.middleware.crypto.crypto_utils.Crypto.create_iv',
  524                 return_value=iv):
  525             wrapped = self.crypto.wrap_key(wrapping_key, key_to_wrap)
  526         cipher = Cipher(algorithms.AES(wrapping_key), modes.CTR(iv),
  527                         backend=default_backend())
  528         expected = {'key': cipher.encryptor().update(key_to_wrap),
  529                     'iv': iv}
  530         self.assertEqual(expected, wrapped)
  531 
  532         unwrapped = self.crypto.unwrap_key(wrapping_key, wrapped)
  533         self.assertEqual(key_to_wrap, unwrapped)
  534 
  535     def test_unwrap_bad_key(self):
  536         # verify that ValueError is raised if unwrapped key is invalid
  537         wrapping_key = os.urandom(32)
  538         for length in (0, 16, 24, 31, 33):
  539             key_to_wrap = os.urandom(length)
  540             wrapped = self.crypto.wrap_key(wrapping_key, key_to_wrap)
  541             with self.assertRaises(ValueError) as cm:
  542                 self.crypto.unwrap_key(wrapping_key, wrapped)
  543             self.assertEqual(
  544                 cm.exception.args[0], 'Key must be length 32 bytes')
  545 
  546 
# Run the tests with unittest's runner when this file is executed directly.
if __name__ == '__main__':
    unittest.main()