"Fossies" - the Fresh Open Source Software Archive

Member "barbican-12.0.0/barbican/plugin/castellan_secret_store.py" (14 Apr 2021, 5790 Bytes) of package /linux/misc/openstack/barbican-12.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "castellan_secret_store.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 11.0.0_vs_12.0.0.

    1 # Copyright (c) 2018 Red Hat Inc.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #    http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
   12 # implied.
   13 # See the License for the specific language governing permissions and
   14 # limitations under the License.
   15 
   16 import abc
   17 import six
   18 
   19 from castellan.common.objects import opaque_data
   20 from castellan import key_manager
   21 from oslo_context import context
   22 from oslo_log import log
   23 
   24 from barbican.plugin.interface import secret_store as ss
   25 
   26 LOG = log.getLogger(__name__)
   27 
   28 
   29 class CastellanSecretStore(ss.SecretStoreBase, metaclass=abc.ABCMeta):
   30 
   31     KEY_ID = "key_id"
   32     ALG = "alg"
   33     BIT_LENGTH = "bit_length"
   34 
   35     def _set_params(self, conf):
   36         self.key_manager = key_manager.API(conf)
   37         self.context = context.get_current()
   38 
   39     @abc.abstractmethod
   40     def get_conf(self, conf):
   41         """Get plugin configuration
   42 
   43         This method is supposed to be implemented by the relevant
   44         subclass.  This method reads in the config for the plugin
   45         in barbican.conf -- which should look like the way other
   46         barbican plugins are configured, and convert them to the
   47         proper oslo.config object to be passed to the keymanager
   48         API. (keymanager.API(conf)
   49 
   50         @returns oslo.config object
   51         """
   52         raise NotImplementedError  # pragma: no cover
   53 
   54     @abc.abstractmethod
   55     def get_plugin_name(self):
   56         """Get plugin name
   57 
   58         This method is implemented by the subclass.
   59         Note that this name must be unique across the deployment.
   60         """
   61         raise NotImplementedError  # pragma: no cover
   62 
   63     def get_secret(self, secret_type, secret_metadata):
   64         secret_ref = secret_metadata[CastellanSecretStore.KEY_ID]
   65         try:
   66             secret = self.key_manager.get(
   67                 self.context,
   68                 secret_ref)
   69 
   70             return ss.SecretDTO(secret_type, secret.get_encoded(),
   71                                 ss.KeySpec(), secret_metadata['content_type'])
   72         except Exception as e:
   73             LOG.exception("Error retrieving secret {}: {}".format(
   74                 secret_ref, six.text_type(e)))
   75             raise ss.SecretGeneralException(e)
   76 
   77     def store_secret(self, secret_dto):
   78         if not self.store_secret_supports(secret_dto.key_spec):
   79             raise ss.SecretAlgorithmNotSupportedException(
   80                 secret_dto.key_spec.alg)
   81 
   82         try:
   83             secret_ref = self.key_manager.store(
   84                 self.context,
   85                 opaque_data.OpaqueData(secret_dto.secret)
   86             )
   87             return {CastellanSecretStore.KEY_ID: secret_ref}
   88         except Exception as e:
   89             LOG.exception("Error storing secret: {}".format(
   90                 six.text_type(e)))
   91             raise ss.SecretGeneralException(e)
   92 
   93     def delete_secret(self, secret_metadata):
   94         secret_ref = secret_metadata[CastellanSecretStore.KEY_ID]
   95         try:
   96             self.key_manager.delete(
   97                 self.context,
   98                 secret_ref)
   99         except KeyError:
  100             LOG.warning("Attempting to delete a non-existent secret {}".format(
  101                 secret_ref))
  102         except Exception as e:
  103             LOG.exception("Error deleting secret: {}".format(
  104                 six.text_type(e)))
  105             raise ss.SecretGeneralException(e)
  106 
  107     def generate_symmetric_key(self, key_spec):
  108         if not self.generate_supports(key_spec):
  109             raise ss.SecretAlgorithmNotSupportedException(
  110                 key_spec.alg)
  111         try:
  112             secret_ref = self.key_manager.create_key(
  113                 self.context,
  114                 key_spec.alg,
  115                 key_spec.bit_length
  116             )
  117             return {CastellanSecretStore.KEY_ID: secret_ref}
  118         except Exception as e:
  119             LOG.exception("Error generating symmetric key: {}".format(
  120                 six.text_type(e)))
  121             raise ss.SecretGeneralException(e)
  122 
  123     def generate_asymmetric_key(self, key_spec):
  124         if not self.generate_supports(key_spec):
  125             raise ss.SecretAlgorithmNotSupportedException(
  126                 key_spec.alg)
  127 
  128         if key_spec.passphrase:
  129             raise ss.GeneratePassphraseNotSupportedException()
  130 
  131         try:
  132             private_ref, public_ref = self.key_manager.create_key_pair(
  133                 self.context,
  134                 key_spec.alg,
  135                 key_spec.bit_length
  136             )
  137 
  138             private_key_metadata = {
  139                 CastellanSecretStore.ALG: key_spec.alg,
  140                 CastellanSecretStore.BIT_LENGTH: key_spec.bit_length,
  141                 CastellanSecretStore.KEY_ID: private_ref
  142             }
  143 
  144             public_key_metadata = {
  145                 CastellanSecretStore.ALG: key_spec.alg,
  146                 CastellanSecretStore.BIT_LENGTH: key_spec.bit_length,
  147                 CastellanSecretStore.KEY_ID: public_ref
  148             }
  149 
  150             return ss.AsymmetricKeyMetadataDTO(
  151                 private_key_metadata,
  152                 public_key_metadata,
  153                 None
  154             )
  155         except Exception as e:
  156             LOG.exception("Error generating asymmetric key: {}".format(
  157                 six.text_type(e)))
  158             raise ss.SecretGeneralException(e)
  159 
  160     @abc.abstractmethod
  161     def store_secret_supports(self, key_spec):
  162         raise NotImplementedError  # pragma: no cover
  163 
  164     @abc.abstractmethod
  165     def generate_supports(self, key_spec):
  166         raise NotImplementedError  # pragma: no cover