"Fossies" - the Fresh Open Source Software Archive

Member "magnum-8.1.0/magnum/common/x509/operations.py" (1 Oct 2019, 10172 Bytes) of package /linux/misc/openstack/magnum-8.1.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "operations.py" see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 7.1.0_vs_8.0.0.

    1 # Copyright 2015 NEC Corporation.  All rights reserved.
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 import datetime
   16 import six
   17 import uuid
   18 
   19 from cryptography.hazmat.backends import default_backend
   20 from cryptography.hazmat.primitives.asymmetric import rsa
   21 from cryptography.hazmat.primitives import hashes
   22 from cryptography.hazmat.primitives import serialization
   23 from cryptography import x509
   24 from oslo_log import log as logging
   25 
   26 from magnum.common import exception
   27 from magnum.common.x509 import validator
   28 import magnum.conf
   29 
   30 LOG = logging.getLogger(__name__)
   31 
   32 CONF = magnum.conf.CONF
   33 
   34 
   35 def generate_ca_certificate(subject_name, encryption_password=None):
   36     """Generate CA Certificate
   37 
   38     :param subject_name: subject name of CA
   39     :param encryption_password: encryption passsword for private key
   40     :returns: generated private key and certificate pair
   41     """
   42     return _generate_self_signed_certificate(
   43         subject_name,
   44         _build_ca_extentions(),
   45         encryption_password=encryption_password
   46     )
   47 
   48 
   49 def generate_client_certificate(issuer_name, subject_name,
   50                                 organization_name, ca_key,
   51                                 encryption_password=None,
   52                                 ca_key_password=None):
   53     """Generate Client Certificate
   54 
   55     :param issuer_name: issuer name
   56     :param subject_name: subject name of client
   57     :param organization_name: Organization name of client
   58     :param ca_key: private key of CA
   59     :param encryption_password: encryption passsword for private key
   60     :param ca_key_password: private key password for given ca key
   61     :returns: generated private key and certificate pair
   62     """
   63     return _generate_certificate(issuer_name, subject_name,
   64                                  _build_client_extentions(),
   65                                  organization_name, ca_key=ca_key,
   66                                  encryption_password=encryption_password,
   67                                  ca_key_password=ca_key_password)
   68 
   69 
   70 def _build_client_extentions():
   71     # Digital Signature and Key Encipherment are enabled
   72     key_usage = x509.KeyUsage(True, False, True, False, False, False, False,
   73                               False, False)
   74     key_usage = x509.Extension(key_usage.oid, True, key_usage)
   75     extended_key_usage = x509.ExtendedKeyUsage([x509.OID_CLIENT_AUTH])
   76     extended_key_usage = x509.Extension(extended_key_usage.oid, False,
   77                                         extended_key_usage)
   78     basic_constraints = x509.BasicConstraints(ca=False, path_length=None)
   79     basic_constraints = x509.Extension(basic_constraints.oid, True,
   80                                        basic_constraints)
   81 
   82     return [key_usage, extended_key_usage, basic_constraints]
   83 
   84 
   85 def _build_ca_extentions():
   86     # Certificate Sign is enabled
   87     key_usage = x509.KeyUsage(False, False, False, False, False, True, False,
   88                               False, False)
   89     key_usage = x509.Extension(key_usage.oid, True, key_usage)
   90 
   91     basic_constraints = x509.BasicConstraints(ca=True, path_length=0)
   92     basic_constraints = x509.Extension(basic_constraints.oid, True,
   93                                        basic_constraints)
   94 
   95     return [basic_constraints, key_usage]
   96 
   97 
   98 def _generate_self_signed_certificate(subject_name, extensions,
   99                                       encryption_password=None):
  100     return _generate_certificate(subject_name, subject_name, extensions,
  101                                  encryption_password=encryption_password)
  102 
  103 
  104 def _generate_certificate(issuer_name, subject_name, extensions,
  105                           organization_name=None, ca_key=None,
  106                           encryption_password=None, ca_key_password=None):
  107 
  108     if not isinstance(subject_name, six.text_type):
  109         subject_name = six.text_type(subject_name.decode('utf-8'))
  110     if organization_name and not isinstance(organization_name, six.text_type):
  111         organization_name = six.text_type(organization_name.decode('utf-8'))
  112 
  113     private_key = rsa.generate_private_key(
  114         public_exponent=65537,
  115         key_size=CONF.x509.rsa_key_size,
  116         backend=default_backend()
  117     )
  118 
  119     # subject name is set as common name
  120     csr = x509.CertificateSigningRequestBuilder()
  121     name_attributes = [x509.NameAttribute(x509.OID_COMMON_NAME, subject_name)]
  122     if organization_name:
  123         name_attributes.append(x509.NameAttribute(x509.OID_ORGANIZATION_NAME,
  124                                                   organization_name))
  125     csr = csr.subject_name(x509.Name(name_attributes))
  126 
  127     for extention in extensions:
  128         csr = csr.add_extension(extention.value, critical=extention.critical)
  129 
  130     # if ca_key is not provided, it means self signed
  131     if not ca_key:
  132         ca_key = private_key
  133         ca_key_password = encryption_password
  134 
  135     csr = csr.sign(private_key, hashes.SHA256(), default_backend())
  136 
  137     if six.PY3 and isinstance(encryption_password, six.text_type):
  138         encryption_password = encryption_password.encode()
  139 
  140     if encryption_password:
  141         encryption_algorithm = serialization.BestAvailableEncryption(
  142             encryption_password)
  143     else:
  144         encryption_algorithm = serialization.NoEncryption()
  145 
  146     private_key = private_key.private_bytes(
  147         encoding=serialization.Encoding.PEM,
  148         format=serialization.PrivateFormat.PKCS8,
  149         encryption_algorithm=encryption_algorithm
  150     )
  151 
  152     keypairs = {
  153         'private_key': private_key,
  154         'certificate': sign(
  155             csr,
  156             issuer_name,
  157             ca_key,
  158             ca_key_password=ca_key_password,
  159             skip_validation=True),
  160     }
  161     return keypairs
  162 
  163 
  164 def _load_pem_private_key(ca_key, ca_key_password=None):
  165     if not isinstance(ca_key, rsa.RSAPrivateKey):
  166         if isinstance(ca_key, six.text_type):
  167             ca_key = six.b(str(ca_key))
  168         if isinstance(ca_key_password, six.text_type):
  169             ca_key_password = six.b(str(ca_key_password))
  170 
  171         ca_key = serialization.load_pem_private_key(
  172             ca_key,
  173             password=ca_key_password,
  174             backend=default_backend()
  175         )
  176 
  177     return ca_key
  178 
  179 
  180 def sign(csr, issuer_name, ca_key, ca_key_password=None,
  181          skip_validation=False):
  182     """Sign a given csr
  183 
  184     :param csr: certificate signing request object or pem encoded csr
  185     :param issuer_name: issuer name
  186     :param ca_key: private key of CA
  187     :param ca_key_password: private key password for given ca key
  188     :param skip_validation: skip csr validation if true
  189     :returns: generated certificate
  190     """
  191 
  192     ca_key = _load_pem_private_key(ca_key, ca_key_password)
  193 
  194     if not isinstance(issuer_name, six.text_type):
  195         issuer_name = six.text_type(issuer_name.decode('utf-8'))
  196 
  197     if isinstance(csr, six.text_type):
  198         csr = six.b(str(csr))
  199     if not isinstance(csr, x509.CertificateSigningRequest):
  200         try:
  201             csr = x509.load_pem_x509_csr(csr, backend=default_backend())
  202         except ValueError:
  203             LOG.exception("Received invalid csr %s.", csr)
  204             raise exception.InvalidCsr(csr=csr)
  205 
  206     term_of_validity = CONF.x509.term_of_validity
  207     one_day = datetime.timedelta(1, 0, 0)
  208     expire_after = datetime.timedelta(term_of_validity, 0, 0)
  209 
  210     builder = x509.CertificateBuilder()
  211     builder = builder.subject_name(csr.subject)
  212     # issuer_name is set as common name
  213     builder = builder.issuer_name(x509.Name([
  214         x509.NameAttribute(x509.OID_COMMON_NAME, issuer_name),
  215     ]))
  216     builder = builder.not_valid_before(datetime.datetime.today() - one_day)
  217     builder = builder.not_valid_after(datetime.datetime.today() + expire_after)
  218     builder = builder.serial_number(int(uuid.uuid4()))
  219     builder = builder.public_key(csr.public_key())
  220 
  221     if skip_validation:
  222         extensions = csr.extensions
  223     else:
  224         extensions = validator.filter_extensions(csr.extensions)
  225 
  226     for extention in extensions:
  227         builder = builder.add_extension(extention.value,
  228                                         critical=extention.critical)
  229 
  230     certificate = builder.sign(
  231         private_key=ca_key, algorithm=hashes.SHA256(),
  232         backend=default_backend()
  233     ).public_bytes(serialization.Encoding.PEM).strip()
  234 
  235     return certificate
  236 
  237 
  238 def generate_csr_and_key(common_name):
  239     """Return a dict with a new csr, public key and private key."""
  240     private_key = rsa.generate_private_key(
  241         public_exponent=65537,
  242         key_size=2048,
  243         backend=default_backend())
  244 
  245     public_key = private_key.public_key()
  246 
  247     csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([
  248         x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, common_name),
  249     ])).sign(private_key, hashes.SHA256(), default_backend())
  250 
  251     result = {
  252         'csr': csr.public_bytes(
  253             encoding=serialization.Encoding.PEM).decode("utf-8"),
  254         'private_key': private_key.private_bytes(
  255             encoding=serialization.Encoding.PEM,
  256             format=serialization.PrivateFormat.TraditionalOpenSSL,
  257             encryption_algorithm=serialization.NoEncryption()).decode("utf-8"),
  258         'public_key': public_key.public_bytes(
  259             encoding=serialization.Encoding.PEM,
  260             format=serialization.PublicFormat.SubjectPublicKeyInfo).decode(
  261                 "utf-8"),
  262     }
  263 
  264     return result
  265 
  266 
  267 def decrypt_key(encrypted_key, password):
  268     private_key = _load_pem_private_key(encrypted_key, password)
  269 
  270     decrypted_pem = private_key.private_bytes(
  271         encoding=serialization.Encoding.PEM,
  272         format=serialization.PrivateFormat.PKCS8,
  273         encryption_algorithm=serialization.NoEncryption()
  274     )
  275     return decrypted_pem