"Fossies" - the Fresh Open Source Software Archive

Member "barbican-12.0.0/barbican/plugin/crypto/simple_crypto.py" (14 Apr 2021, 9698 Bytes) of package /linux/misc/openstack/barbican-12.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "simple_crypto.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 11.0.0_vs_12.0.0.

    1 # Licensed under the Apache License, Version 2.0 (the "License");
    2 # you may not use this file except in compliance with the License.
    3 # You may obtain a copy of the License at
    4 #
    5 #    http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 # Unless required by applicable law or agreed to in writing, software
    8 # distributed under the License is distributed on an "AS IS" BASIS,
    9 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   10 # implied.
   11 # See the License for the specific language governing permissions and
   12 # limitations under the License.
   13 import os
   14 
   15 from cryptography import fernet
   16 from cryptography.hazmat.backends import default_backend
   17 from cryptography.hazmat.primitives.asymmetric import dsa
   18 from cryptography.hazmat.primitives.asymmetric import rsa
   19 from cryptography.hazmat.primitives import serialization
   20 from oslo_config import cfg
   21 from oslo_utils import encodeutils
   22 import six
   23 
   24 from barbican.common import config
   25 from barbican.common import utils
   26 from barbican import i18n as u
   27 from barbican.plugin.crypto import base as c
   28 
   29 
   30 CONF = config.new_config()
   31 LOG = utils.getLogger(__name__)
   32 
   33 simple_crypto_plugin_group = cfg.OptGroup(name='simple_crypto_plugin',
   34                                           title="Simple Crypto Plugin Options")
   35 simple_crypto_plugin_opts = [
   36     cfg.StrOpt('kek',
   37                default='dGhpcnR5X3R3b19ieXRlX2tleWJsYWhibGFoYmxhaGg=',
   38                help=u._('Key encryption key to be used by Simple Crypto '
   39                         'Plugin'), secret=True),
   40     cfg.StrOpt('plugin_name',
   41                help=u._('User friendly plugin name'),
   42                default='Software Only Crypto'),
   43 ]
   44 CONF.register_group(simple_crypto_plugin_group)
   45 CONF.register_opts(simple_crypto_plugin_opts, group=simple_crypto_plugin_group)
   46 config.parse_args(CONF)
   47 
   48 
   49 def list_opts():
   50     yield simple_crypto_plugin_group, simple_crypto_plugin_opts
   51 
   52 
   53 class SimpleCryptoPlugin(c.CryptoPluginBase):
   54     """Insecure implementation of the crypto plugin."""
   55 
   56     def __init__(self, conf=CONF):
   57         self.master_kek = conf.simple_crypto_plugin.kek
   58         self.plugin_name = conf.simple_crypto_plugin.plugin_name
   59         LOG.info("{} initialized".format(self.plugin_name))
   60 
   61     def get_plugin_name(self):
   62         return self.plugin_name
   63 
   64     def _get_kek(self, kek_meta_dto):
   65         if not kek_meta_dto.plugin_meta:
   66             raise ValueError(u._('KEK not yet created.'))
   67         # the kek is stored encrypted. Need to decrypt.
   68         encryptor = fernet.Fernet(self.master_kek)
   69         # Note : If plugin_meta type is unicode, encode to byte.
   70         if isinstance(kek_meta_dto.plugin_meta, six.text_type):
   71             kek_meta_dto.plugin_meta = kek_meta_dto.plugin_meta.encode('utf-8')
   72 
   73         return encryptor.decrypt(kek_meta_dto.plugin_meta)
   74 
   75     def encrypt(self, encrypt_dto, kek_meta_dto, project_id):
   76         kek = self._get_kek(kek_meta_dto)
   77         unencrypted = encrypt_dto.unencrypted
   78         if not isinstance(unencrypted, six.binary_type):
   79             raise ValueError(
   80                 u._(
   81                     'Unencrypted data must be a byte type, but was '
   82                     '{unencrypted_type}'
   83                 ).format(
   84                     unencrypted_type=type(unencrypted)
   85                 )
   86             )
   87         encryptor = fernet.Fernet(kek)
   88         cyphertext = encryptor.encrypt(unencrypted)
   89         return c.ResponseDTO(cyphertext, None)
   90 
   91     def decrypt(self, encrypted_dto, kek_meta_dto, kek_meta_extended,
   92                 project_id):
   93         kek = self._get_kek(kek_meta_dto)
   94         encrypted = encrypted_dto.encrypted
   95         decryptor = fernet.Fernet(kek)
   96         return decryptor.decrypt(encrypted)
   97 
   98     def bind_kek_metadata(self, kek_meta_dto):
   99         kek_meta_dto.algorithm = 'aes'
  100         kek_meta_dto.bit_length = 128
  101         kek_meta_dto.mode = 'cbc'
  102         if not kek_meta_dto.plugin_meta:
  103             # the kek is stored encrypted in the plugin_meta field
  104             encryptor = fernet.Fernet(self.master_kek)
  105             key = fernet.Fernet.generate_key()
  106             kek_meta_dto.plugin_meta = encryptor.encrypt(key)
  107         return kek_meta_dto
  108 
  109     def generate_symmetric(self, generate_dto, kek_meta_dto, project_id):
  110         byte_length = int(generate_dto.bit_length) // 8
  111         unencrypted = os.urandom(byte_length)
  112 
  113         return self.encrypt(c.EncryptDTO(unencrypted),
  114                             kek_meta_dto,
  115                             project_id)
  116 
  117     def generate_asymmetric(self, generate_dto, kek_meta_dto, project_id):
  118         """Generate asymmetric keys based on below rules:
  119 
  120         - RSA, with passphrase (supported)
  121         - RSA, without passphrase (supported)
  122         - DSA, without passphrase (supported)
  123         - DSA, with passphrase (supported)
  124         """
  125         if(generate_dto.algorithm is None or generate_dto
  126                 .algorithm.lower() == 'rsa'):
  127             private_key = rsa.generate_private_key(
  128                 public_exponent=65537,
  129                 key_size=generate_dto.bit_length,
  130                 backend=default_backend()
  131             )
  132         elif generate_dto.algorithm.lower() == 'dsa':
  133             private_key = dsa.generate_private_key(
  134                 key_size=generate_dto.bit_length,
  135                 backend=default_backend()
  136             )
  137         else:
  138             raise c.CryptoPrivateKeyFailureException()
  139 
  140         public_key = private_key.public_key()
  141 
  142         if generate_dto.algorithm.lower() == 'rsa':
  143             private_key = private_key.private_bytes(
  144                 encoding=serialization.Encoding.PEM,
  145                 format=serialization.PrivateFormat.PKCS8,
  146                 encryption_algorithm=self._get_encryption_algorithm(
  147                     generate_dto.passphrase)
  148             )
  149 
  150             public_key = public_key.public_bytes(
  151                 encoding=serialization.Encoding.PEM,
  152                 format=serialization.PublicFormat.SubjectPublicKeyInfo
  153             )
  154 
  155         if generate_dto.algorithm.lower() == 'dsa':
  156             private_key = private_key.private_bytes(
  157                 encoding=serialization.Encoding.DER,
  158                 format=serialization.PrivateFormat.PKCS8,
  159                 encryption_algorithm=self._get_encryption_algorithm(
  160                     generate_dto.passphrase)
  161             )
  162             public_key = public_key.public_bytes(
  163                 encoding=serialization.Encoding.DER,
  164                 format=serialization.PublicFormat.SubjectPublicKeyInfo
  165             )
  166 
  167         private_dto = self.encrypt(c.EncryptDTO(private_key),
  168                                    kek_meta_dto,
  169                                    project_id)
  170 
  171         public_dto = self.encrypt(c.EncryptDTO(public_key),
  172                                   kek_meta_dto,
  173                                   project_id)
  174 
  175         passphrase_dto = None
  176         if generate_dto.passphrase:
  177             if isinstance(generate_dto.passphrase, six.text_type):
  178                 generate_dto.passphrase = generate_dto.passphrase.encode(
  179                     'utf-8')
  180 
  181             passphrase_dto = self.encrypt(c.EncryptDTO(generate_dto.
  182                                                        passphrase),
  183                                           kek_meta_dto,
  184                                           project_id)
  185 
  186         return private_dto, public_dto, passphrase_dto
  187 
  188     def supports(self, type_enum, algorithm=None, bit_length=None,
  189                  mode=None):
  190         if type_enum == c.PluginSupportTypes.ENCRYPT_DECRYPT:
  191             return True
  192         elif type_enum == c.PluginSupportTypes.SYMMETRIC_KEY_GENERATION:
  193             return self._is_algorithm_supported(algorithm,
  194                                                 bit_length,
  195                                                 mode)
  196         elif type_enum == c.PluginSupportTypes.ASYMMETRIC_KEY_GENERATION:
  197             return self._is_algorithm_supported(algorithm,
  198                                                 bit_length,
  199                                                 mode)
  200         else:
  201             return False
  202 
  203     def _get_encryption_algorithm(self, passphrase):
  204         """Choose whether to use encryption or not based on passphrase
  205 
  206         serialization.BestAvailableEncryption fails if passphrase is not
  207         given or if less than one byte therefore we need to check if it is
  208         valid or not
  209         """
  210         if passphrase:
  211             # encryption requires password in bytes format
  212             algorithm = serialization.BestAvailableEncryption(
  213                 # default encoding is utf-8
  214                 encodeutils.safe_encode(passphrase)
  215             )
  216         else:
  217             algorithm = serialization.NoEncryption()
  218 
  219         return algorithm
  220 
  221     def _is_algorithm_supported(self, algorithm=None,
  222                                 bit_length=None, mode=None):
  223         """check if algorithm and bit_length combination is supported."""
  224         if algorithm is None or bit_length is None:
  225             return False
  226 
  227         length_factor = 1
  228 
  229         # xts-mode cuts the effective key for the algorithm in half,
  230         # so the bit_length must be the double of the supported length.
  231         # in the future there should be a validation of supported modes too.
  232         if mode is not None and mode.lower() == "xts":
  233             length_factor = 2
  234 
  235         if (algorithm.lower() in c.PluginSupportTypes.SYMMETRIC_ALGORITHMS
  236             and bit_length / length_factor
  237                 in c.PluginSupportTypes.SYMMETRIC_KEY_LENGTHS):
  238             return True
  239         elif (algorithm.lower() in c.PluginSupportTypes.ASYMMETRIC_ALGORITHMS
  240               and bit_length in c.PluginSupportTypes.ASYMMETRIC_KEY_LENGTHS):
  241             return True
  242         else:
  243             return False