"Fossies" - the Fresh Open Source Software Archive

Member "barbican-12.0.0/barbican/plugin/snakeoil_ca.py" (14 Apr 2021, 17178 Bytes) of package /linux/misc/openstack/barbican-12.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "snakeoil_ca.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 11.0.0_vs_12.0.0.

    1 # Copyright 2014 Hewlett-Packard Development Company, L.P.
    2 # All Rights Reserved.
    3 #
    4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    5 #    not use this file except in compliance with the License. You may obtain
    6 #    a copy of the License at
    7 #
    8 #         http://www.apache.org/licenses/LICENSE-2.0
    9 #
   10 #    Unless required by applicable law or agreed to in writing, software
   11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13 #    License for the specific language governing permissions and limitations
   14 #    under the License.
   15 
   16 import base64
   17 import datetime
   18 import os
   19 import re
   20 import subprocess  # nosec
   21 from tempfile import mkstemp
   22 import uuid
   23 
   24 from OpenSSL import crypto
   25 from oslo_config import cfg
   26 from oslo_utils import fnmatch
   27 from oslo_utils import uuidutils
   28 
   29 from barbican.common import config
   30 from barbican.common import utils
   31 from barbican import i18n as u
   32 import barbican.plugin.interface.certificate_manager as cert_manager
   33 
   34 CONF = config.new_config()
   35 LOG = utils.getLogger(__name__)
   36 
   37 
   38 snakeoil_ca_plugin_group = cfg.OptGroup(name='snakeoil_ca_plugin',
   39                                         title="Snakeoil CA Plugin Options")
   40 
   41 snakeoil_ca_plugin_opts = [
   42     cfg.StrOpt('ca_cert_path',
   43                help=u._('Path to CA certificate file')),
   44     cfg.StrOpt('ca_cert_key_path',
   45                help=u._('Path to CA certificate key file')),
   46     cfg.StrOpt('ca_cert_chain_path',
   47                help=u._('Path to CA certificate chain file')),
   48     cfg.StrOpt('ca_cert_pkcs7_path',
   49                help=u._('Path to CA chain pkcs7 file')),
   50     cfg.StrOpt('subca_cert_key_directory',
   51                default='/etc/barbican/snakeoil-cas',
   52                help=u._('Directory in which to store certs/keys for subcas')),
   53 ]
   54 
   55 CONF.register_group(snakeoil_ca_plugin_group)
   56 CONF.register_opts(snakeoil_ca_plugin_opts, group=snakeoil_ca_plugin_group)
   57 config.parse_args(CONF)
   58 
   59 
   60 def list_opts():
   61     yield snakeoil_ca_plugin_group, snakeoil_ca_plugin_opts
   62 
   63 
   64 def set_subject_X509Name(target, dn):
   65     """Set target X509Name object with parsed dn.
   66 
   67     This is very basic and should certainly be replaced by something using
   68     cryptography for instance, but will do for a basic test CA
   69     """
   70 
   71     # TODO(alee) Figure out why C (country) is not working
   72     fields = dn.split(',')
   73     for field in fields:
   74         m = re.search(r"(\w+)\s*=\s*(.+)", field.strip())
   75         name = m.group(1)
   76         value = m.group(2)
   77         if name.lower() == 'ou':
   78             target.OU = value
   79         elif name.lower() == 'st':
   80             target.ST = value
   81         elif name.lower() == 'cn':
   82             target.CN = value
   83         elif name.lower() == 'l':
   84             target.L = value
   85         elif name.lower() == 'o':
   86             target.O = value  # noqa: E741
   87     return target
   88 
   89 
   90 class SnakeoilCA(object):
   91 
   92     def __init__(self, cert_path=None, key_path=None, chain_path=None,
   93                  pkcs7_path=None, name=None, serial=1,
   94                  key_size=2048, expiry_days=10 * 365, x509_version=2,
   95                  subject_dn=None, signing_dn=None, signing_key=None,
   96                  parent_chain_path=None):
   97         self.cert_path = cert_path
   98         self.key_path = key_path
   99         self.chain_path = chain_path
  100         self.pkcs7_path = pkcs7_path
  101         self.name = name
  102         self.serial = serial
  103         self.key_size = key_size
  104         self.expiry_days = expiry_days
  105         self.x509_version = x509_version
  106 
  107         self.subject_dn = subject_dn
  108 
  109         if signing_dn is not None:
  110             self.signing_dn = signing_dn
  111         else:
  112             self.signing_dn = subject_dn    # self-signed
  113 
  114         self.signing_key = signing_key
  115         self.parent_chain_path = parent_chain_path
  116 
  117         self._cert_val = None
  118         self._key_val = None
  119         self._chain_val = None
  120         self._pkcs7_val = None
  121 
  122     @property
  123     def cert(self):
  124         self.ensure_exists()
  125         if self.cert_path:
  126             with open(self.cert_path, 'rb') as cert_fh:
  127                 return crypto.load_certificate(crypto.FILETYPE_PEM,
  128                                                cert_fh.read())
  129         else:
  130             return crypto.load_certificate(crypto.FILETYPE_PEM, self._cert_val)
  131 
  132     @cert.setter
  133     def cert(self, val):
  134         if self.cert_path:
  135             with open(self.cert_path, 'wb') as cert_fh:
  136                 cert_fh.write(crypto.dump_certificate(crypto.FILETYPE_PEM,
  137                                                       val))
  138         else:
  139             self._cert_val = crypto.dump_certificate(crypto.FILETYPE_PEM, val)
  140 
  141     @property
  142     def key(self):
  143         self.ensure_exists()
  144         if self.key_path:
  145             with open(self.key_path, 'rb') as key_fh:
  146                 return crypto.load_privatekey(crypto.FILETYPE_PEM,
  147                                               key_fh.read())
  148         else:
  149             return crypto.load_privatekey(crypto.FILETYPE_PEM, self._key_val)
  150 
  151     @key.setter
  152     def key(self, val):
  153         if self.key_path:
  154             with open(self.key_path, 'wb') as key_fh:
  155                 key_fh.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, val))
  156         else:
  157             self._key_val = crypto.dump_privatekey(crypto.FILETYPE_PEM, val)
  158 
  159     @property
  160     def chain(self):
  161         self.ensure_exists()
  162         if self.chain_path:
  163             with open(self.chain_path, 'rb') as chain_fh:
  164                 return chain_fh.read()
  165         else:
  166             return self._chain_val
  167 
  168     @chain.setter
  169     def chain(self, val):
  170         if self.chain_path:
  171             with open(self.chain_path, 'wb') as chain_fh:
  172                 chain_fh.write(val)
  173         else:
  174             self._chain_val = val
  175 
  176     @property
  177     def pkcs7(self):
  178         self.ensure_exists()
  179         if self.pkcs7_path:
  180             with open(self.pkcs7_path, 'rb') as pkcs7_fh:
  181                 return pkcs7_fh.read()
  182         else:
  183             return self._pkcs7_val
  184 
  185     @pkcs7.setter
  186     def pkcs7(self, val):
  187         if self.pkcs7_path:
  188             with open(self.pkcs7_path, 'wb') as pkcs7_fh:
  189                 pkcs7_fh.write(val)
  190         else:
  191             self._pkcs7_val = val
  192 
  193     @property
  194     def exists(self):
  195         if self.cert_path is not None:
  196             cert_exists = os.path.isfile(self.cert_path)
  197         else:
  198             cert_exists = self._cert_val is not None
  199 
  200         if self.key_path is not None:
  201             key_exists = os.path.isfile(self.key_path)
  202         else:
  203             key_exists = self._key_val is not None
  204 
  205         if self.chain_path is not None:
  206             chain_exists = os.path.isfile(self.chain_path)
  207         else:
  208             chain_exists = self._chain_val is not None
  209 
  210         if self.pkcs7_path is not None:
  211             pkcs7_exists = os.path.isfile(self.pkcs7_path)
  212         else:
  213             pkcs7_exists = self._pkcs7_val is not None
  214 
  215         return (cert_exists and key_exists and
  216                 pkcs7_exists and chain_exists)
  217 
  218     def ensure_exists(self):
  219         if not self.exists:
  220             LOG.debug('Keypair not found, creating new cert/key')
  221             self.cert, self.key, self.chain, self.pkcs7 = (
  222                 self.create_keypair())
  223 
  224     def create_keypair(self):
  225         LOG.debug('Generating Snakeoil CA')
  226         key = crypto.PKey()
  227         key.generate_key(crypto.TYPE_RSA, self.key_size)
  228 
  229         cert = crypto.X509()
  230         cert.set_version(self.x509_version)
  231         cert.set_serial_number(self.serial)
  232         subject = cert.get_subject()
  233         set_subject_X509Name(subject, self.subject_dn)
  234         cert.set_subject(subject)
  235         cert.gmtime_adj_notBefore(0)
  236         cert.gmtime_adj_notAfter(self.expiry_days)
  237         cert.set_issuer(set_subject_X509Name(
  238             cert.get_issuer(), self.signing_dn))
  239         cert.set_pubkey(key)
  240         cert.add_extensions([
  241             crypto.X509Extension(b"basicConstraints", True,
  242                                  b"CA:TRUE, pathlen:5"),
  243         ])
  244         if not self.signing_key:
  245             self.signing_key = key  # self-signed
  246 
  247         cert.sign(self.signing_key, 'sha256')
  248 
  249         LOG.debug('Snakeoil CA cert/key generated')
  250 
  251         chain = b''
  252         if self.parent_chain_path:
  253             with open(self.parent_chain_path, 'rb') as fh:
  254                 chain = fh.read()
  255         chain += crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
  256 
  257         pkcs7 = self._generate_pkcs7(chain)
  258         return cert, key, chain, pkcs7
  259 
  260     def _generate_pkcs7(self, chain):
  261         fin, temp_in = mkstemp()
  262         os.write(fin, chain)
  263         os.close(fin)
  264 
  265         fout, temp_out = mkstemp()
  266         os.close(fout)
  267 
  268         subprocess.call(['/usr/bin/openssl', 'crl2pkcs7', '-nocrl',  # nosec
  269                          '-out', temp_out, '-certfile', temp_in], shell=False)
  270         with open(temp_out, 'rb') as pkcs7_fh:
  271             pkcs7 = pkcs7_fh.read()
  272 
  273         os.remove(temp_in)
  274         os.remove(temp_out)
  275         return pkcs7
  276 
  277 
  278 class CertManager(object):
  279 
  280     def __init__(self, ca):
  281         self.ca = ca
  282 
  283     def get_new_serial(self):
  284         return uuid.uuid4().int
  285 
  286     def make_certificate(self, csr, expires=2 * 365):
  287         cert = crypto.X509()
  288         cert.set_serial_number(self.get_new_serial())
  289         cert.gmtime_adj_notBefore(0)
  290         cert.gmtime_adj_notAfter(expires)
  291         cert.set_issuer(self.ca.cert.get_subject())
  292         cert.set_subject(csr.get_subject())
  293         cert.set_pubkey(csr.get_pubkey())
  294         cert.sign(self.ca.key, 'sha256')
  295         return cert
  296 
  297 
  298 class SnakeoilCACertificatePlugin(cert_manager.CertificatePluginBase):
  299     """Snakeoil CA certificate plugin.
  300 
  301     This is used for easily generating certificates which are not useful in a
  302     production environment.
  303     """
  304 
  305     def __init__(self, conf=CONF):
  306         self.cas = {}
  307         self.ca = SnakeoilCA(
  308             cert_path=conf.snakeoil_ca_plugin.ca_cert_path,
  309             key_path=conf.snakeoil_ca_plugin.ca_cert_key_path,
  310             chain_path=conf.snakeoil_ca_plugin.ca_cert_chain_path,
  311             pkcs7_path=conf.snakeoil_ca_plugin.ca_cert_pkcs7_path,
  312             name=self.get_default_ca_name(),
  313             subject_dn="cn=Snakeoil Certificate,o=example.com"
  314         )
  315 
  316         self.cas[self.get_default_ca_name()] = self.ca
  317 
  318         self.subca_directory = conf.snakeoil_ca_plugin.subca_cert_key_directory
  319         if self.subca_directory:
  320             if not os.path.exists(self.subca_directory):
  321                 os.makedirs(self.subca_directory)    # pragma: no cover
  322             else:
  323                 self._reload_previously_created_subcas()
  324 
  325         self.cert_manager = CertManager(self.ca)
  326 
  327     def _reload_previously_created_subcas(self):
  328         for file in os.listdir(self.subca_directory):
  329             if fnmatch.fnmatch(file, '*.key'):
  330                 ca_id, _ext = os.path.splitext(file)
  331                 self.cas[ca_id] = SnakeoilCA(
  332                     cert_path=os.path.join(self.subca_directory,
  333                                            ca_id + ".cert"),
  334                     key_path=os.path.join(self.subca_directory, file),
  335                     chain_path=os.path.join(self.subca_directory,
  336                                             ca_id + ".chain"),
  337                     pkcs7_path=os.path.join(self.subca_directory,
  338                                             ca_id + ".p7b")
  339                 )
  340 
  341     def get_default_ca_name(self):
  342         return "Snakeoil CA"
  343 
  344     def get_default_signing_cert(self):
  345         return crypto.dump_certificate(crypto.FILETYPE_PEM, self.ca.cert)
  346 
  347     def get_default_intermediates(self):
  348         return None
  349 
  350     def supported_request_types(self):
  351         return [cert_manager.CertificateRequestType.CUSTOM_REQUEST,
  352                 cert_manager.CertificateRequestType.STORED_KEY_REQUEST]
  353 
  354     def issue_certificate_request(self, order_id, order_meta, plugin_meta,
  355                                   barbican_meta_dto):
  356         if barbican_meta_dto.generated_csr is not None:
  357             encoded_csr = barbican_meta_dto.generated_csr
  358         else:
  359             try:
  360                 encoded_csr = base64.b64decode(order_meta['request_data'])
  361             except KeyError:
  362                 return cert_manager.ResultDTO(
  363                     cert_manager.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
  364                     status_message=u._("No request_data specified"))
  365         csr = crypto.load_certificate_request(crypto.FILETYPE_PEM, encoded_csr)
  366 
  367         ca_id = barbican_meta_dto.plugin_ca_id
  368         if ca_id:
  369             ca = self.cas.get(ca_id)
  370             if ca is None:
  371                 raise cert_manager.CertificateGeneralException(
  372                     "Invalid ca_id passed into snake oil plugin:" + ca_id)
  373         else:
  374             ca = self.ca
  375 
  376         cert_mgr = CertManager(ca)
  377         cert = cert_mgr.make_certificate(csr)
  378         cert_enc = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
  379 
  380         return cert_manager.ResultDTO(
  381             cert_manager.CertificateStatus.CERTIFICATE_GENERATED,
  382             certificate=base64.b64encode(cert_enc),
  383             intermediates=base64.b64encode(ca.pkcs7))
  384 
  385     def modify_certificate_request(self, order_id, order_meta, plugin_meta,
  386                                    barbican_meta_dto):
  387         raise NotImplementedError
  388 
  389     def cancel_certificate_request(self, order_id, order_meta, plugin_meta,
  390                                    barbican_meta_dto):
  391         raise NotImplementedError
  392 
  393     def check_certificate_status(self, order_id, order_meta, plugin_meta,
  394                                  barbican_meta_dto):
  395         raise NotImplementedError
  396 
  397     def supports(self, certificate_spec):
  398         request_type = certificate_spec.get(
  399             cert_manager.REQUEST_TYPE,
  400             cert_manager.CertificateRequestType.CUSTOM_REQUEST)
  401         return request_type in self.supported_request_types()
  402 
  403     def supports_create_ca(self):
  404         return True
  405 
  406     def create_ca(self, ca_create_dto):
  407         # get the parent CA from the ca list, return error if not on list
  408         parent_ca_id = ca_create_dto.parent_ca_id
  409         if not parent_ca_id:
  410             raise cert_manager.CertificateGeneralException(
  411                 "No parent id passed to snake oil plugin on create_ca")
  412 
  413         parent_ca = self.cas.get(parent_ca_id)
  414         if not parent_ca:
  415             raise cert_manager.CertificateGeneralException(
  416                 "Invalid parent id passed to snake oil plugin:" + parent_ca_id)
  417 
  418         # create a new ca, passing in key and issuer from the parent
  419         new_ca_id = uuidutils.generate_uuid()
  420         new_cert_path = os.path.join(self.subca_directory, new_ca_id + ".cert")
  421         new_key_path = os.path.join(self.subca_directory, new_ca_id + ".key")
  422         new_chain_path = os.path.join(self.subca_directory,
  423                                       new_ca_id + ".chain")
  424         new_pkcs7_path = os.path.join(self.subca_directory,
  425                                       new_ca_id + ".p7b")
  426         parent_chain_path = parent_ca.chain_path
  427 
  428         new_ca = SnakeoilCA(cert_path=new_cert_path,
  429                             key_path=new_key_path,
  430                             chain_path=new_chain_path,
  431                             pkcs7_path=new_pkcs7_path,
  432                             name=ca_create_dto.name,
  433                             subject_dn=ca_create_dto.subject_dn,
  434                             signing_dn=parent_ca.subject_dn,
  435                             signing_key=parent_ca.key,
  436                             parent_chain_path=parent_chain_path)
  437 
  438         self.cas[new_ca_id] = new_ca
  439 
  440         expiration = (datetime.datetime.utcnow() + datetime.timedelta(
  441             days=cert_manager.CA_INFO_DEFAULT_EXPIRATION_DAYS))
  442 
  443         return {
  444             cert_manager.INFO_NAME: new_ca.name,
  445             cert_manager.INFO_CA_SIGNING_CERT: crypto.dump_certificate(
  446                 crypto.FILETYPE_PEM, new_ca.cert),
  447             cert_manager.INFO_EXPIRATION: expiration.isoformat(),
  448             cert_manager.INFO_INTERMEDIATES: new_ca.pkcs7,
  449             cert_manager.PLUGIN_CA_ID: new_ca_id
  450         }
  451 
  452     def get_ca_info(self):
  453         expiration = (datetime.datetime.utcnow() + datetime.timedelta(
  454             days=cert_manager.CA_INFO_DEFAULT_EXPIRATION_DAYS))
  455 
  456         ret = {}
  457         for ca_id, ca in self.cas.items():
  458             ca_info = {
  459                 cert_manager.INFO_NAME: ca.name,
  460                 cert_manager.INFO_CA_SIGNING_CERT: crypto.dump_certificate(
  461                     crypto.FILETYPE_PEM, ca.cert),
  462                 cert_manager.INFO_INTERMEDIATES: ca.pkcs7,
  463                 cert_manager.INFO_EXPIRATION: expiration.isoformat()
  464             }
  465             ret[ca_id] = ca_info
  466 
  467         return ret
  468 
  469     def delete_ca(self, ca_id):
  470         self.cas.pop(ca_id)
  471 
  472         ca_files = [os.path.join(self.subca_directory, ca_id + ".cert"),
  473                     os.path.join(self.subca_directory, ca_id + ".key"),
  474                     os.path.join(self.subca_directory, ca_id + ".chain"),
  475                     os.path.join(self.subca_directory, ca_id + ".p7b")]
  476 
  477         for ca_file in ca_files:
  478             if os.path.exists(ca_file):
  479                 os.remove(ca_file)