barbican  12.0.0
About: OpenStack Barbican is the OpenStack Key Manager service. It provides secure storage, provisioning and management of secret data.
The "Wallaby" series (latest release).
  Fossies Dox: barbican-12.0.0.tar.gz  ("unofficial" and yet experimental doxygen-generated source code documentation)  

dogtag.py
Go to the documentation of this file.
1 # Copyright (c) 2014 Red Hat, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 
16 import base64
17 import copy
18 from cryptography.hazmat.backends import default_backend
19 from cryptography.hazmat.primitives import serialization
20 import datetime
21 import os
22 from oslo_utils import uuidutils
23 import time
24 
25 import pki
26 
27 subcas_available = True
28 try:
29  import pki.authority as authority
30  import pki.feature as feature
31 except ImportError:
32  subcas_available = False
33 
34 import pki.cert
35 import pki.client
36 import pki.crypto as cryptoutil
37 import pki.key as key
38 import pki.kra
39 import pki.profile
40 from requests import exceptions as request_exceptions
41 import six
42 
43 from barbican.common import exception
44 from barbican.common import utils
45 from barbican import i18n as u
46 
47 # we want to keep the dogtag config options separated. That way we
48 # do not need to import every dogtag requirement to generate the
49 # sample config
53 
54 # reuse the conf object to not call config.new_config() twice
55 CONF = barbican.plugin.dogtag_config_opts.CONF
56 LOG = utils.getLogger(__name__)
57 
58 CERT_HEADER = "-----BEGIN CERTIFICATE-----"
59 CERT_FOOTER = "-----END CERTIFICATE-----"
60 
61 KRA_TRANSPORT_NICK = "KRA transport cert"
62 
63 
64 def _create_nss_db_if_needed(nss_db_path, nss_password):
65  """Creates NSS DB if it's not setup already
66 
67  :returns: True or False whether the database was created or not.
68  """
69  if not os.path.exists(nss_db_path):
70  cryptoutil.NSSCryptoProvider.setup_database(
71  nss_db_path, nss_password, over_write=True)
72  return True
73  else:
74  LOG.info("The nss_db_path provided already exists, so the "
75  "database is assumed to be already set up.")
76  return False
77 
78 
80  """Sets up NSS Crypto functions
81 
82  This sets up the NSSCryptoProvider and the database it needs for it to
83  store certificates. If the path specified in the configuration is already
84  existent, it will assume that the database is already setup.
85 
86  This will also import the transport cert needed by the KRA if the NSS DB
87  was created.
88  """
89  nss_db_path, nss_password = (conf.dogtag_plugin.nss_db_path,
90  conf.dogtag_plugin.nss_password)
91  if nss_db_path is None:
92  LOG.warning("nss_db_path was not provided so the crypto "
93  "provider functions were not initialized.")
94  return None
95  if nss_password is None:
96  raise ValueError(u._("nss_password is required"))
97  if type(nss_password) is not six.binary_type:
98  # Password needs to be a bytes object in Python 3
99  nss_password = nss_password.encode('UTF-8')
100 
101  nss_db_created = _create_nss_db_if_needed(nss_db_path, nss_password)
102  crypto = cryptoutil.NSSCryptoProvider(nss_db_path, nss_password)
103  if nss_db_created:
105 
106  return crypto
107 
108 
110  try:
111  connection = create_connection(conf, 'kra')
112  kraclient = pki.kra.KRAClient(connection, crypto)
113  systemcert_client = kraclient.system_certs
114 
115  transport_cert = systemcert_client.get_transport_cert()
116  crypto.import_cert(KRA_TRANSPORT_NICK, transport_cert, ",,")
117  except Exception as e:
118  LOG.debug("Error importing KRA transport cert.", exc_info=True)
119  LOG.error("Error in importing transport cert."
120  " KRA may not be enabled: %s", e)
121 
122 
123 def create_connection(conf, subsystem_path):
124  pem_path = conf.dogtag_plugin.pem_path
125  if pem_path is None:
126  raise ValueError(u._("pem_path is required"))
127  # port is string type in PKIConnection
128  connection = pki.client.PKIConnection(
129  'https',
130  conf.dogtag_plugin.dogtag_host,
131  str(conf.dogtag_plugin.dogtag_port),
132  subsystem_path)
133  connection.set_authentication_cert(pem_path)
134  return connection
135 
136 
138 if crypto:
139  crypto.initialize()
140 
141 
142 class DogtagPluginAlgorithmException(exception.BarbicanException):
143  message = u._("Invalid algorithm passed in")
144 
145 
146 class DogtagPluginNotSupportedException(exception.NotSupported):
147  message = u._("Operation not supported by Dogtag Plugin")
148 
149  def __init__(self, msg=None):
150  if not msg:
151  message = self.messagemessage
152  else:
153  message = msg
154 
155  super(DogtagPluginNotSupportedException, self).__init__(message)
156 
157 
158 class DogtagPluginArchivalException(exception.BarbicanException):
159  message = u._("Key archival failed. Error returned from KRA.")
160 
161 
162 class DogtagPluginGenerationException(exception.BarbicanException):
163  message = u._("Key generation failed. Error returned from KRA.")
164 
165 
166 class DogtagKRAPlugin(sstore.SecretStoreBase):
167  """Implementation of the secret store plugin with KRA as the backend."""
168 
169  # metadata constants
170  ALG = "alg"
171  BIT_LENGTH = "bit_length"
172  GENERATED = "generated"
173  KEY_ID = "key_id"
174  SECRET_MODE = "secret_mode" # nosec
175  PASSPHRASE_KEY_ID = "passphrase_key_id" # nosec
176  CONVERT_TO_PEM = "convert_to_pem"
177 
178  # string constants
179  DSA_PRIVATE_KEY_HEADER = '-----BEGIN DSA PRIVATE KEY-----'
180  DSA_PRIVATE_KEY_FOOTER = '-----END DSA PRIVATE KEY-----'
181  DSA_PUBLIC_KEY_HEADER = '-----BEGIN DSA PUBLIC KEY-----'
182  DSA_PUBLIC_KEY_FOOTER = '-----END DSA PUBLIC KEY-----'
183 
184  def __init__(self, conf=CONF):
185  """Constructor - create the keyclient."""
186  LOG.debug("starting DogtagKRAPlugin init")
187  connection = create_connection(conf, 'kra')
188 
189  # create kraclient
190  kraclient = pki.kra.KRAClient(connection, crypto)
191  self.keyclientkeyclient = kraclient.keys
192 
193  self.keyclientkeyclient.set_transport_cert(KRA_TRANSPORT_NICK)
194  self.plugin_nameplugin_name = conf.dogtag_plugin.plugin_name
195  self.retriesretries = conf.dogtag_plugin.retries
196 
197  LOG.debug("completed DogtagKRAPlugin init")
198 
199  def get_plugin_name(self):
200  return self.plugin_nameplugin_name
201 
202  def store_secret(self, secret_dto):
203  """Store a secret in the KRA
204 
205  If secret_dto.transport_key is not None, then we expect
206  secret_dto.secret to include a base64 encoded PKIArchiveOptions
207  structure as defined in section 6.4 of RFC 2511. This package contains
208  a transport key wrapped session key, the session key wrapped secret
209  and parameters to specify the symmetric key wrapping.
210 
211  Otherwise, the data is unencrypted and we use a call to archive_key()
212  to have the Dogtag KRA client generate the relevant session keys.
213 
214  The secret_dto contains additional information on the type of secret
215  that is being stored. We will use that shortly. For, now, lets just
216  assume that its all PASS_PHRASE_TYPE
217 
218  Returns a dict with the relevant metadata (which in this case is just
219  the key_id
220  """
221  data_type = key.KeyClient.PASS_PHRASE_TYPE
222  key_id = None
223 
224  attempts = 0
225  offset_time = 1
226  while attempts <= self.retriesretries and key_id is None:
227  client_key_id = uuidutils.generate_uuid(dashed=False)
228  if secret_dto.transport_key is not None:
229  # TODO(alee-3) send the transport key with the archival request
230  # once the Dogtag Client API changes.
231  response = self.keyclientkeyclient.archive_pki_options(
232  client_key_id,
233  data_type,
234  secret_dto.secret,
235  key_algorithm=None,
236  key_size=None)
237  else:
238  response = self.keyclientkeyclient.archive_key(
239  client_key_id,
240  data_type,
241  secret_dto.secret,
242  key_algorithm=None,
243  key_size=None)
244 
245  key_id = response.get_key_id()
246 
247  if key_id is None:
248  LOG.warning("key_id is None. attempts: {}".format(attempts))
249  attempts += 1
250  time.sleep(offset_time)
251  offset_time += 1
252 
253  if key_id is None:
254  raise DogtagPluginArchivalException
255 
256  meta_dict = {DogtagKRAPlugin.KEY_ID: key_id}
257 
258  self._store_secret_attributes_store_secret_attributes(meta_dict, secret_dto)
259  return meta_dict
260 
261  def get_secret(self, secret_type, secret_metadata):
262  """Retrieve a secret from the KRA
263 
264  The secret_metadata is simply the dict returned by a store_secret() or
265  get_secret() call. We will extract the key_id from this dict.
266 
267  Note: There are two ways to retrieve secrets from the KRA.
268 
269  The first method calls retrieve_key without a wrapping key. This
270  relies on the KRA client to generate a wrapping key (and wrap it with
271  the KRA transport cert), and is completely transparent to the
272  Barbican server. What is returned to the caller is the
273  unencrypted secret.
274 
275  The second way is to provide a wrapping key that would be generated
276  on the barbican client. That way only the client will be
277  able to unwrap the secret. This wrapping key is provided in the
278  secret_metadata by Barbican core.
279 
280  Format/Type of the secret returned in the SecretDTO object.
281  -----------------------------------------------------------
282  The type of the secret returned is always dependent on the way it is
283  stored using the store_secret method.
284 
285  In case of strings - like passphrase/PEM strings, the return will be a
286  string.
287 
288  In case of binary data - the return will be the actual binary data.
289 
290  In case of retrieving an asymmetric key that is generated using the
291  dogtag plugin, then the binary representation of, the asymmetric key in
292  PEM format, is returned
293  """
294  key_id = secret_metadata[DogtagKRAPlugin.KEY_ID]
295 
296  key_spec = sstore.KeySpec(
297  alg=secret_metadata.get(DogtagKRAPlugin.ALG, None),
298  bit_length=secret_metadata.get(DogtagKRAPlugin.BIT_LENGTH, None),
299  mode=secret_metadata.get(DogtagKRAPlugin.SECRET_MODE, None),
300  passphrase=None
301  )
302 
303  generated = secret_metadata.get(DogtagKRAPlugin.GENERATED, False)
304 
305  passphrase = self._get_passphrase_for_a_private_key_get_passphrase_for_a_private_key(
306  secret_type, secret_metadata, key_spec)
307 
308  recovered_key = None
309  twsk = DogtagKRAPlugin._get_trans_wrapped_session_key(secret_type,
310  secret_metadata)
311 
312  if DogtagKRAPlugin.CONVERT_TO_PEM in secret_metadata:
313  # Case for returning the asymmetric keys generated in KRA.
314  # Asymmetric keys generated in KRA are not generated in PEM format.
315  # This marker DogtagKRAPlugin.CONVERT_TO_PEM is set in the
316  # secret_metadata for asymmetric keys generated in KRA to
317  # help convert the returned private/public keys to PEM format and
318  # eventually return the binary data of the keys in PEM format.
319 
320  if secret_type == sstore.SecretType.PUBLIC:
321  # Public key should be retrieved using the get_key_info method
322  # as it is treated as an attribute of the asymmetric key pair
323  # stored in the KRA database.
324 
325  key_info = self.keyclientkeyclient.get_key_info(key_id)
326  recovered_key = serialization.load_der_public_key(
327  key_info.public_key,
328  backend=default_backend()
329  ).public_bytes(
330  serialization.Encoding.PEM,
331  serialization.PublicFormat.PKCS1)
332 
333  elif secret_type == sstore.SecretType.PRIVATE:
334  key_data = self.keyclientkeyclient.retrieve_key(key_id)
335  private_key = serialization.load_der_private_key(
336  key_data.data,
337  password=None,
338  backend=default_backend()
339  )
340 
341  if passphrase is not None:
342  e_alg = serialization.BestAvailableEncryption(passphrase)
343  else:
344  e_alg = serialization.NoEncryption()
345 
346  recovered_key = private_key.private_bytes(
347  encoding=serialization.Encoding.PEM,
348  format=serialization.PrivateFormat.PKCS8,
349  encryption_algorithm=e_alg
350  )
351  else:
352  # TODO(alee-3) send transport key as well when dogtag client API
353  # changes in case the transport key has changed.
354  key_data = self.keyclientkeyclient.retrieve_key(key_id, twsk)
355  if twsk:
356  # The data returned is a byte array.
357  recovered_key = key_data.encrypted_data
358  else:
359  recovered_key = key_data.data
360 
361  # TODO(alee) remove final field when content_type is removed
362  # from secret_dto
363 
364  if generated:
365  recovered_key = base64.b64encode(recovered_key)
366 
367  ret = sstore.SecretDTO(
368  type=secret_type,
369  secret=recovered_key,
370  key_spec=key_spec,
371  content_type=None,
372  transport_key=None)
373 
374  return ret
375 
376  def delete_secret(self, secret_metadata):
377  """Delete a secret from the KRA
378 
379  There is currently no way to delete a secret in Dogtag.
380  We will be implementing such a method shortly.
381  """
382  pass
383 
384  def generate_symmetric_key(self, key_spec):
385  """Generate a symmetric key
386 
387  This calls generate_symmetric_key() on the KRA passing in the
388  algorithm, bit_length and id (used as the client_key_id) from
389  the secret. The remaining parameters are not used.
390 
391  Returns a metadata object that can be used for retrieving the secret.
392  """
393 
394  usages = [key.SymKeyGenerationRequest.DECRYPT_USAGE,
395  key.SymKeyGenerationRequest.ENCRYPT_USAGE]
396 
397  algorithm = self._map_algorithm(key_spec.alg.lower())
398 
399  if algorithm is None:
400  raise DogtagPluginAlgorithmException
401  passphrase = key_spec.passphrase
402  if passphrase:
403  raise DogtagPluginNotSupportedException(
404  u._("Passphrase encryption is not supported for symmetric"
405  " key generating algorithms."))
406 
407  key_id = None
408  attempts = 0
409  offset_time = 1
410  while attempts <= self.retries and key_id is None:
411  client_key_id = uuidutils.generate_uuid()
412  response = self.keyclient.generate_symmetric_key(
413  client_key_id,
414  algorithm,
415  key_spec.bit_length,
416  usages)
417  key_id = response.get_key_id()
418 
419  if key_id is None:
420  LOG.warning("generate_symkey: key_id is None. attempts: {}"
421  .format(attempts))
422  attempts += 1
423  time.sleep(offset_time)
424  offset_time += 1
425 
426  if key_id is None:
427  raise DogtagPluginGenerationException
428 
429  # Barbican expects stored keys to be base 64 encoded. We need to
430  # add flag to the keyclient.generate_symmetric_key() call above
431  # to ensure that the key that is stored is base64 encoded.
432  #
433  # As a workaround until that update is available, we will store a
434  # parameter "generated" to indicate that the response must be base64
435  # encoded on retrieval. Note that this will not work for transport
436  # key encoded data.
437  return {DogtagKRAPlugin.ALG: key_spec.alg,
438  DogtagKRAPlugin.BIT_LENGTH: key_spec.bit_length,
439  DogtagKRAPlugin.KEY_ID: response.get_key_id(),
440  DogtagKRAPlugin.GENERATED: True}
441 
442  def generate_asymmetric_key(self, key_spec):
443  """Generate an asymmetric key.
444 
445  Note that barbican expects all secrets to be base64 encoded.
446  """
447 
448  usages = [key.AsymKeyGenerationRequest.DECRYPT_USAGE,
449  key.AsymKeyGenerationRequest.ENCRYPT_USAGE]
450 
451  client_key_id = uuidutils.generate_uuid()
452  algorithm = self._map_algorithm_map_algorithm(key_spec.alg.lower())
453  passphrase = key_spec.passphrase
454 
455  if algorithm is None:
456  raise DogtagPluginAlgorithmException
457 
458  passphrase_key_id = None
459  passphrase_metadata = None
460  if passphrase:
461  if algorithm == key.KeyClient.DSA_ALGORITHM:
463  u._("Passphrase encryption is not "
464  "supported for DSA algorithm")
465  )
466 
467  stored_passphrase_info = self.keyclientkeyclient.archive_key(
468  uuidutils.generate_uuid(),
469  self.keyclientkeyclient.PASS_PHRASE_TYPE,
470  base64.b64encode(passphrase))
471 
472  passphrase_key_id = stored_passphrase_info.get_key_id()
473  passphrase_metadata = {
474  DogtagKRAPlugin.KEY_ID: passphrase_key_id
475  }
476 
477  # Barbican expects stored keys to be base 64 encoded. We need to
478  # add flag to the keyclient.generate_asymmetric_key() call above
479  # to ensure that the key that is stored is base64 encoded.
480  #
481  # As a workaround until that update is available, we will store a
482  # parameter "generated" to indicate that the response must be base64
483  # encoded on retrieval. Note that this will not work for transport
484  # key encoded data.
485 
486  response = self.keyclientkeyclient.generate_asymmetric_key(
487  client_key_id,
488  algorithm,
489  key_spec.bit_length,
490  usages)
491 
492  public_key_metadata = {
493  DogtagKRAPlugin.ALG: key_spec.alg,
494  DogtagKRAPlugin.BIT_LENGTH: key_spec.bit_length,
495  DogtagKRAPlugin.KEY_ID: response.get_key_id(),
496  DogtagKRAPlugin.CONVERT_TO_PEM: "true",
497  DogtagKRAPlugin.GENERATED: True
498  }
499 
500  private_key_metadata = {
501  DogtagKRAPlugin.ALG: key_spec.alg,
502  DogtagKRAPlugin.BIT_LENGTH: key_spec.bit_length,
503  DogtagKRAPlugin.KEY_ID: response.get_key_id(),
504  DogtagKRAPlugin.CONVERT_TO_PEM: "true",
505  DogtagKRAPlugin.GENERATED: True
506  }
507 
508  if passphrase_key_id:
509  private_key_metadata[DogtagKRAPlugin.PASSPHRASE_KEY_ID] = (
510  passphrase_key_id
511  )
512 
513  return sstore.AsymmetricKeyMetadataDTO(private_key_metadata,
514  public_key_metadata,
515  passphrase_metadata)
516 
517  def generate_supports(self, key_spec):
518  """Key generation supported?
519 
520  Specifies whether the plugin supports key generation with the
521  given key_spec.
522 
523  For now, we will just check the algorithm. When dogtag adds a
524  call to check the bit length as well, we will use that call to
525  take advantage of the bit_length information
526  """
527  return self._map_algorithm_map_algorithm(key_spec.alg) is not None
528 
529  def store_secret_supports(self, key_spec):
530  """Key storage supported?
531 
532  Specifies whether the plugin supports storage of the secret given
533  the attributes included in the KeySpec
534  """
535  return True
536 
537  @staticmethod
538  def _map_algorithm(algorithm):
539  """Map Barbican algorithms to Dogtag plugin algorithms.
540 
541  Note that only algorithms supported by Dogtag will be mapped.
542  """
543  if algorithm is None:
544  return None
545 
546  if algorithm.lower() == sstore.KeyAlgorithm.AES.lower():
547  return key.KeyClient.AES_ALGORITHM
548  elif algorithm.lower() == sstore.KeyAlgorithm.DES.lower():
549  return key.KeyClient.DES_ALGORITHM
550  elif algorithm.lower() == sstore.KeyAlgorithm.DESEDE.lower():
551  return key.KeyClient.DES3_ALGORITHM
552  elif algorithm.lower() == sstore.KeyAlgorithm.DSA.lower():
553  return key.KeyClient.DSA_ALGORITHM
554  elif algorithm.lower() == sstore.KeyAlgorithm.RSA.lower():
555  return key.KeyClient.RSA_ALGORITHM
556  elif algorithm.lower() == sstore.KeyAlgorithm.DIFFIE_HELLMAN.lower():
557  # may be supported, needs to be tested
558  return None
559  elif algorithm.lower() == sstore.KeyAlgorithm.EC.lower():
560  # asymmetric keys not yet supported
561  return None
562  else:
563  return None
564 
565  @staticmethod
566  def _store_secret_attributes(meta_dict, secret_dto):
567  # store the following attributes for retrieval
568  key_spec = secret_dto.key_spec
569  if key_spec is not None:
570  if key_spec.alg is not None:
571  meta_dict[DogtagKRAPlugin.ALG] = key_spec.alg
572  if key_spec.bit_length is not None:
573  meta_dict[DogtagKRAPlugin.BIT_LENGTH] = key_spec.bit_length
574  if key_spec.mode is not None:
575  meta_dict[DogtagKRAPlugin.SECRET_MODE] = key_spec.mode
576 
577  def _get_passphrase_for_a_private_key(self, secret_type, secret_metadata,
578  key_spec):
579  """Retrieve the passphrase for the private key stored in the KRA."""
580  if secret_type is None:
581  return None
582  if key_spec.alg is None:
583  return None
584 
585  passphrase = None
586  if DogtagKRAPlugin.PASSPHRASE_KEY_ID in secret_metadata:
587  if key_spec.alg.upper() == key.KeyClient.RSA_ALGORITHM:
588  passphrase = self.keyclientkeyclient.retrieve_key(
589  secret_metadata.get(DogtagKRAPlugin.PASSPHRASE_KEY_ID)
590  ).data
591  else:
592  if key_spec.alg.upper() == key.KeyClient.DSA_ALGORITHM:
593  raise sstore.SecretGeneralException(
594  u._("DSA keys should not have a passphrase in the"
595  " database, for being used during retrieval.")
596  )
597  raise sstore.SecretGeneralException(
598  u._("Secrets of type {secret_type} should not have a "
599  "passphrase in the database, for being used during "
600  "retrieval.").format(secret_type=secret_type)
601  )
602 
603  # note that Barbican expects the passphrase to be base64 encoded when
604  # stored, so we need to decode it.
605  if passphrase:
606  passphrase = base64.b64decode(passphrase)
607  return passphrase
608 
609  @staticmethod
610  def _get_trans_wrapped_session_key(secret_type, secret_metadata):
611  twsk = secret_metadata.get('trans_wrapped_session_key', None)
612  if secret_type in [sstore.SecretType.PUBLIC,
613  sstore.SecretType.PRIVATE]:
614  if twsk:
616  u._("Encryption using session key is not supported when "
617  "retrieving a {secret_type} "
618  "key.").format(secret_type=secret_type)
619  )
620 
621  return twsk
622 
623 
624 def _catch_request_exception(ca_related_function):
625  def _catch_ca_unavailable(self, *args, **kwargs):
626  try:
627  return ca_related_function(self, *args, **kwargs)
628  except request_exceptions.RequestException:
629  return cm.ResultDTO(
630  cm.CertificateStatus.CA_UNAVAILABLE_FOR_REQUEST)
631 
632  return _catch_ca_unavailable
633 
634 
635 def _catch_enrollment_exceptions(ca_related_function):
636  def _catch_enrollment_exception(self, *args, **kwargs):
637  try:
638  return ca_related_function(self, *args, **kwargs)
639  except pki.BadRequestException as e:
640  return cm.ResultDTO(
641  cm.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
642  status_message=e.message)
643  except pki.PKIException as e:
644  raise cm.CertificateGeneralException(
645  u._("Exception thrown by enroll_cert: {message}").format(
646  message=e.message))
647 
648  return _catch_enrollment_exception
649 
650 
651 def _catch_subca_creation_exceptions(ca_related_function):
652  def _catch_subca_exception(self, *args, **kwargs):
653  try:
654  return ca_related_function(self, *args, **kwargs)
655  except pki.BadRequestException as e:
656  raise exception.BadSubCACreationRequest(reason=e.message)
657  except pki.PKIException as e:
658  raise exception.SubCACreationErrors(reason=e.message)
659  except request_exceptions.RequestException:
660  raise exception.SubCACreationErrors(
661  reason="Unable to connect to CA")
662 
663  return _catch_subca_exception
664 
665 
666 def _catch_subca_deletion_exceptions(ca_related_function):
667  def _catch_subca_exception(self, *args, **kwargs):
668  try:
669  return ca_related_function(self, *args, **kwargs)
670  except pki.ResourceNotFoundException:
671  LOG.warning("Sub-CA already deleted")
672  pass
673  except pki.PKIException as e:
674  raise exception.SubCADeletionErrors(reason=e.message)
675  except request_exceptions.RequestException:
676  raise exception.SubCACreationErrors(
677  reason="Unable to connect to CA")
678 
679  return _catch_subca_exception
680 
681 
682 class DogtagCAPlugin(cm.CertificatePluginBase):
683  """Implementation of the cert plugin with Dogtag CA as the backend."""
684 
685  # order_metadata fields
686  PROFILE_ID = "profile_id"
687 
688  # plugin_metadata fields
689  REQUEST_ID = "request_id"
690 
691  def __init__(self, conf=CONF):
692  """Constructor - create the cert clients."""
693  connection = create_connection(conf, 'ca')
694  self.certclientcertclient = pki.cert.CertClient(connection)
695  self.simple_cmc_profilesimple_cmc_profile = conf.dogtag_plugin.simple_cmc_profile
696  self.auto_approved_profilesauto_approved_profiles = conf.dogtag_plugin.auto_approved_profiles
697 
698  self.working_dirworking_dir = conf.dogtag_plugin.plugin_working_dir
699  if not os.path.isdir(self.working_dirworking_dir):
700  os.mkdir(self.working_dirworking_dir)
701 
702  self._expiration_expiration = None
703  self._expiration_delta_expiration_delta = conf.dogtag_plugin.ca_expiration_time
704  self._expiration_data_path_expiration_data_path = os.path.join(self.working_dirworking_dir,
705  "expiration_data.txt")
706 
707  self._host_aid_path_host_aid_path = os.path.join(self.working_dirworking_dir, "host_aid.txt")
708  self._host_aid_host_aid = None
709 
710  if not os.path.isfile(self._expiration_data_path_expiration_data_path):
711  self.expirationexpirationexpirationexpiration = datetime.datetime.utcnow()
712 
713  global subcas_available
714  subcas_available = self._are_subcas_enabled_on_backend_are_subcas_enabled_on_backend(connection)
715  if subcas_available:
716  self.authority_clientauthority_client = authority.AuthorityClient(connection)
717  if not os.path.isfile(self._host_aid_path_host_aid_path):
718  self.host_aidhost_aidhost_aidhost_aid = self.get_host_aidget_host_aid()
719 
720  @property
721  def expiration(self):
722  if self._expiration_expiration is None:
723  try:
724  with open(self._expiration_data_path_expiration_data_path) as expiration_fh:
725  self._expiration_expiration = datetime.datetime.strptime(
726  expiration_fh.read(),
727  "%Y-%m-%d %H:%M:%S.%f"
728  )
729  except (ValueError, TypeError):
730  LOG.warning("Invalid data read from expiration file")
731  self.expirationexpirationexpirationexpiration = datetime.utcnow()
732  return self._expiration_expiration
733 
734  @expiration.setter
735  def expiration(self, val):
736  with open(self._expiration_data_path_expiration_data_path, 'w') as expiration_fh:
737  expiration_fh.write(val.strftime("%Y-%m-%d %H:%M:%S.%f"))
738  self._expiration_expiration = val
739 
740  @property
741  def host_aid(self):
742  if self._host_aid_host_aid is None:
743  with open(self._host_aid_path_host_aid_path) as host_aid_fh:
744  self._host_aid_host_aid = host_aid_fh.read()
745  return self._host_aid_host_aid
746 
747  @host_aid.setter
748  def host_aid(self, val):
749  if val is not None:
750  with open(self._host_aid_path_host_aid_path, 'w') as host_aid_fh:
751  host_aid_fh.write(val)
752  self._host_aid_host_aid = val
753 
754  def _are_subcas_enabled_on_backend(self, connection):
755  """Check if subca feature is available
756 
757  SubCA creation must be supported in both the Dogtag client as well
758  as on the back-end server. Moreover, it must be enabled on the
759  backend server. This method sets the subcas_available global variable.
760  :return: True/False
761  """
762  global subcas_available
763  if subcas_available:
764  # subcas are supported in the Dogtag client
765  try:
766  feature_client = feature.FeatureClient(connection)
767  authority_feature = feature_client.get_feature("authority")
768  if authority_feature.enabled:
769  LOG.info("Sub-CAs are enabled by Dogtag server")
770  return True
771  else:
772  LOG.info("Sub-CAs are not enabled by Dogtag server")
773  except (request_exceptions.HTTPError,
774  pki.ResourceNotFoundException):
775  LOG.info("Sub-CAs are not supported by Dogtag server")
776  else:
777  LOG.info("Sub-CAs are not supported by Dogtag client")
778  return False
779 
780  def _get_request_id(self, order_id, plugin_meta, operation):
781  request_id = plugin_meta.get(self.REQUEST_IDREQUEST_ID, None)
782  if not request_id:
783  raise cm.CertificateGeneralException(
784  u._(
785  "{request} not found for {operation} for "
786  "order_id {order_id}"
787  ).format(
788  request=self.REQUEST_IDREQUEST_ID,
789  operation=operation,
790  order_id=order_id
791  )
792  )
793  return request_id
794 
795  @_catch_request_exception
796  def _get_request(self, request_id):
797  try:
798  return self.certclientcertclient.get_request(request_id)
799  except pki.RequestNotFoundException:
800  return None
801 
802  @_catch_request_exception
803  def _get_cert(self, cert_id):
804  try:
805  return self.certclientcertclient.get_cert(cert_id)
806  except pki.CertNotFoundException:
807  return None
808 
810  return "Dogtag CA"
811 
813  # TODO(alee) Add code to get the signing cert
814  return None
815 
817  # TODO(alee) Add code to get the cert chain
818  return None
819 
820  def check_certificate_status(self, order_id, order_meta, plugin_meta,
821  barbican_meta_dto):
822  """Check the status of a certificate request.
823 
824  :param order_id: ID of the order associated with this request
825  :param order_meta: order_metadata associated with this order
826  :param plugin_meta: data populated by previous calls for this order,
827  in particular the request_id
828  :param barbican_meta_dto: additional data needed to process order.
829  :return: cm.ResultDTO
830  """
831  request_id = self._get_request_id_get_request_id(order_id, plugin_meta, "checking")
832 
833  request = self._get_request_get_request(request_id)
834  if not request:
835  raise cm.CertificateGeneralException(
836  u._(
837  "No request found for request_id {request_id} for "
838  "order {order_id}"
839  ).format(
840  request_id=request_id,
841  order_id=order_id
842  )
843  )
844 
845  request_status = request.request_status
846 
847  if request_status == pki.cert.CertRequestStatus.REJECTED:
848  return cm.ResultDTO(
849  cm.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
850  status_message=request.error_message)
851  elif request_status == pki.cert.CertRequestStatus.CANCELED:
852  return cm.ResultDTO(
853  cm.CertificateStatus.REQUEST_CANCELED)
854  elif request_status == pki.cert.CertRequestStatus.PENDING:
855  return cm.ResultDTO(
856  cm.CertificateStatus.WAITING_FOR_CA)
857  elif request_status == pki.cert.CertRequestStatus.COMPLETE:
858  # get the cert
859  cert_id = request.cert_id
860  if not cert_id:
861  raise cm.CertificateGeneralException(
862  u._(
863  "Request {request_id} reports status_complete, but no "
864  "cert_id has been returned"
865  ).format(
866  request_id=request_id
867  )
868  )
869 
870  cert = self._get_cert_get_cert(cert_id)
871  if not cert:
872  raise cm.CertificateGeneralException(
873  u._("Certificate not found for cert_id: {cert_id}").format(
874  cert_id=cert_id
875  )
876  )
877  return cm.ResultDTO(
878  cm.CertificateStatus.CERTIFICATE_GENERATED,
879  certificate=cert.encoded,
880  intermediates=cert.pkcs7_cert_chain)
881  else:
882  raise cm.CertificateGeneralException(
883  u._("Invalid request_status returned by CA"))
884 
885  @_catch_request_exception
886  def issue_certificate_request(self, order_id, order_meta, plugin_meta,
887  barbican_meta_dto):
888  """Issue a certificate request to the Dogtag CA
889 
890  Call the relevant certificate issuance function depending on the
891  Barbican defined request type in the order_meta.
892 
893  :param order_id: ID of the order associated with this request
894  :param order_meta: dict containing all the inputs for this request.
895  This includes the request_type.
896  :param plugin_meta: Used to store data for status check
897  :param barbican_meta_dto: additional data needed to process order.
898  :return: cm.ResultDTO
899  """
900  request_type = order_meta.get(
901  cm.REQUEST_TYPE,
902  cm.CertificateRequestType.CUSTOM_REQUEST)
903 
904  jump_table = {
905  cm.CertificateRequestType.SIMPLE_CMC_REQUEST:
906  self._issue_simple_cmc_request_issue_simple_cmc_request,
907  cm.CertificateRequestType.FULL_CMC_REQUEST:
908  self._issue_full_cmc_request_issue_full_cmc_request,
909  cm.CertificateRequestType.STORED_KEY_REQUEST:
910  self._issue_stored_key_request_issue_stored_key_request,
911  cm.CertificateRequestType.CUSTOM_REQUEST:
912  self._issue_custom_certificate_request_issue_custom_certificate_request
913  }
914 
915  if request_type not in jump_table:
917  "Dogtag plugin does not support %s request type").format(
918  request_type))
919 
920  return jump_table[request_type](order_id, order_meta, plugin_meta,
921  barbican_meta_dto)
922 
923  @_catch_enrollment_exceptions
924  def _issue_simple_cmc_request(self, order_id, order_meta, plugin_meta,
925  barbican_meta_dto):
926  """Issue a simple CMC request to the Dogtag CA.
927 
928  :param order_id:
929  :param order_meta:
930  :param plugin_meta:
931  :param barbican_meta_dto:
932  :return: cm.ResultDTO
933  """
934  if barbican_meta_dto.generated_csr is not None:
935  csr = barbican_meta_dto.generated_csr
936  else:
937  # we expect the CSR to be base64 encoded PEM
938  # Dogtag CA needs it to be unencoded
939  csr = base64.b64decode(order_meta.get('request_data'))
940 
941  profile_id = order_meta.get('profile', self.simple_cmc_profilesimple_cmc_profile)
942  inputs = {
943  'cert_request_type': 'pkcs10',
944  'cert_request': csr
945  }
946 
947  return self._issue_certificate_request_issue_certificate_request(
948  profile_id, inputs, plugin_meta, barbican_meta_dto)
949 
950  @_catch_enrollment_exceptions
951  def _issue_full_cmc_request(self, order_id, order_meta, plugin_meta,
952  barbican_meta_dto):
953  """Issue a full CMC request to the Dogtag CA.
954 
955  :param order_id:
956  :param order_meta:
957  :param plugin_meta:
958  :param barbican_meta_dto:
959  :return: cm.ResultDTO
960  """
962  "Dogtag plugin does not support %s request type").format(
963  cm.CertificateRequestType.FULL_CMC_REQUEST))
964 
965  @_catch_enrollment_exceptions
966  def _issue_stored_key_request(self, order_id, order_meta, plugin_meta,
967  barbican_meta_dto):
968  """Issue a simple CMC request to the Dogtag CA.
969 
970  :param order_id:
971  :param order_meta:
972  :param plugin_meta:
973  :param barbican_meta_dto:
974  :return: cm.ResultDTO
975  """
976  return self._issue_simple_cmc_request_issue_simple_cmc_request(
977  order_id,
978  order_meta,
979  plugin_meta,
980  barbican_meta_dto)
981 
982  @_catch_enrollment_exceptions
983  def _issue_custom_certificate_request(self, order_id, order_meta,
984  plugin_meta, barbican_meta_dto):
985  """Issue a custom certificate request to Dogtag CA
986 
987  :param order_id: ID of the order associated with this request
988  :param order_meta: dict containing all the inputs required for a
989  particular profile. One of these must be the profile_id.
990  The exact fields (both optional and mandatory) depend on the
991  profile, but they will be exposed to the user in a method to
992  expose syntax. Depending on the profile, only the relevant fields
993  will be populated in the request. All others will be ignored.
994  :param plugin_meta: Used to store data for status check.
995  :param barbican_meta_dto: Extra data to aid in processing.
996  :return: cm.ResultDTO
997  """
998  profile_id = order_meta.get(self.PROFILE_IDPROFILE_ID, None)
999  if not profile_id:
1000  return cm.ResultDTO(
1001  cm.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
1002  status_message=u._("No profile_id specified"))
1003 
1004  # we expect the csr to be base64 encoded PEM data. Dogtag CA expects
1005  # PEM data though so we need to decode it.
1006  updated_meta = copy.deepcopy(order_meta)
1007  if 'cert_request' in updated_meta:
1008  updated_meta['cert_request'] = base64.b64decode(
1009  updated_meta['cert_request'])
1010 
1011  return self._issue_certificate_request_issue_certificate_request(
1012  profile_id, updated_meta, plugin_meta, barbican_meta_dto)
1013 
1014  def _issue_certificate_request(self, profile_id, inputs, plugin_meta,
1015  barbican_meta_dto):
1016  """Actually send the cert request to the Dogtag CA
1017 
1018  If the profile_id is one of the auto-approved profiles, then use
1019  the convenience enroll_cert() method to create and approve the request
1020  using the Barbican agent cert credentials. If not, then submit the
1021  request and wait for approval by a CA agent on the Dogtag CA.
1022 
1023  :param profile_id: enrollment profile
1024  :param inputs: dict of request inputs
1025  :param plugin_meta: Used to store data for status check.
1026  :param barbican_meta_dto: Extra data to aid in processing.
1027  :return: cm.ResultDTO
1028  """
1029  ca_id = barbican_meta_dto.plugin_ca_id or self.get_default_ca_nameget_default_ca_name()
1030 
1031  if profile_id in self.auto_approved_profilesauto_approved_profiles:
1032  if ca_id == self.get_default_ca_nameget_default_ca_name():
1033  results = self.certclientcertclient.enroll_cert(profile_id, inputs)
1034  else:
1035  results = self.certclientcertclient.enroll_cert(
1036  profile_id, inputs, ca_id)
1037  return self._process_auto_enrollment_results_process_auto_enrollment_results(
1038  results, plugin_meta, barbican_meta_dto)
1039  else:
1040  request = self.certclientcertclient.create_enrollment_request(
1041  profile_id, inputs)
1042  if ca_id == self.get_default_ca_nameget_default_ca_name():
1043  results = self.certclientcertclient.submit_enrollment_request(request)
1044  else:
1045  results = self.certclientcertclient.submit_enrollment_request(
1046  request, ca_id)
1047  return self._process_pending_enrollment_results_process_pending_enrollment_results(
1048  results, plugin_meta, barbican_meta_dto)
1049 
1050  def _process_auto_enrollment_results(self, enrollment_results,
1051  plugin_meta, barbican_meta_dto):
1052  """Process results received from Dogtag CA for auto-enrollment
1053 
1054  This processes data from enroll_cert, which submits, approves and
1055  gets the cert issued and returns as a list of CertEnrollment objects.
1056 
1057  :param enrollment_results: list of CertEnrollmentResult objects
1058  :param plugin_meta: metadata dict for storing plugin specific data
1059  :param barbican_meta_dto: object containing extra data to help process
1060  the request
1061  :return: cm.ResultDTO
1062  """
1063 
1064  # Although it is possible to create multiple certs in an invocation
1065  # of enroll_cert, Barbican cannot handle this case. Assume
1066  # only once cert and request generated for now.
1067  enrollment_result = enrollment_results[0]
1068  request = enrollment_result.request
1069  if not request:
1070  raise cm.CertificateGeneralException(
1071  u._("No request returned in enrollment_results"))
1072 
1073  # store the request_id in the plugin metadata
1074  plugin_meta[self.REQUEST_IDREQUEST_ID] = request.request_id
1075 
1076  cert = enrollment_result.cert
1077 
1078  return self._create_dto_create_dto(request.request_status,
1079  request.request_id,
1080  request.error_message,
1081  cert)
1082 
1083  def _process_pending_enrollment_results(self, results, plugin_meta,
1084  barbican_meta_dto):
1085  """Process results received from Dogtag CA for pending enrollment
1086 
1087  This method processes data returned by submit_enrollment_request(),
1088  which creates requests that still need to be approved by an agent.
1089 
1090  :param results: CertRequestInfoCollection object
1091  :param plugin_meta: metadata dict for storing plugin specific data
1092  :param barbican_meta_dto: object containing extra data to help process
1093  the request
1094  :return: cm.ResultDTO
1095  """
1096 
1097  # Although it is possible to create multiple certs in an invocation
1098  # of enroll_cert, Barbican cannot handle this case. Assume
1099  # only once cert and request generated for now
1100 
1101  cert_request_info = results.cert_request_info_list[0]
1102  status = cert_request_info.request_status
1103  request_id = getattr(cert_request_info, 'request_id', None)
1104  error_message = getattr(cert_request_info, 'error_message', None)
1105 
1106  # store the request_id in the plugin metadata
1107  if request_id:
1108  plugin_meta[self.REQUEST_IDREQUEST_ID] = request_id
1109 
1110  return self._create_dto_create_dto(status, request_id, error_message, None)
1111 
1112  def _create_dto(self, request_status, request_id, error_message, cert):
1113  dto = None
1114  if request_status == pki.cert.CertRequestStatus.COMPLETE:
1115  if cert is not None:
1116  # Barbican is expecting base 64 encoded PEM, so we base64
1117  # encode below.
1118  #
1119  # Currently there is an inconsistency in what Dogtag returns
1120  # for certificates and intermediates. For certs, we return
1121  # PEM, whereas for intermediates, we return headerless PEM.
1122  # This is being addressed in Dogtag ticket:
1123  # https://fedorahosted.org/pki/ticket/1374
1124  #
1125  # Until this is addressed, simply add the missing headers
1126  cert_chain = (CERT_HEADER + "\r\n" + cert.pkcs7_cert_chain +
1127  CERT_FOOTER)
1128 
1129  dto = cm.ResultDTO(cm.CertificateStatus.CERTIFICATE_GENERATED,
1130  certificate=base64.b64encode(cert.encoded),
1131  intermediates=base64.b64encode(cert_chain))
1132  else:
1133  raise cm.CertificateGeneralException(
1134  u._("request_id {req_id} returns COMPLETE but no cert "
1135  "returned").format(req_id=request_id))
1136 
1137  elif request_status == pki.cert.CertRequestStatus.REJECTED:
1138  dto = cm.ResultDTO(cm.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
1139  status_message=error_message)
1140  elif request_status == pki.cert.CertRequestStatus.CANCELED:
1141  dto = cm.ResultDTO(cm.CertificateStatus.REQUEST_CANCELED)
1142  elif request_status == pki.cert.CertRequestStatus.PENDING:
1143  dto = cm.ResultDTO(cm.CertificateStatus.WAITING_FOR_CA)
1144  else:
1145  raise cm.CertificateGeneralException(
1146  u._("Invalid request_status {status} for "
1147  "request_id {request_id}").format(
1148  status=request_status,
1149  request_id=request_id)
1150  )
1151 
1152  return dto
1153 
1154  def modify_certificate_request(self, order_id, order_meta, plugin_meta,
1155  barbican_meta_dto):
1156  """Modify a certificate request.
1157 
1158  Once a certificate request is generated, it cannot be modified.
1159  The only alternative is to cancel the request (if it has not already
1160  completed) and attempt a fresh enrolment. That is what will be
1161  attempted here.
1162  :param order_id: ID for this order
1163  :param order_meta: order metadata. It is assumed that the newly
1164  modified request data will be present here.
1165  :param plugin_meta: data stored on behalf of the plugin for further
1166  operations
1167  :param barbican_meta_dto: additional data needed to process order.
1168  :return: ResultDTO:
1169  """
1170  result_dto = self.cancel_certificate_requestcancel_certificate_request(
1171  order_id, order_meta, plugin_meta, barbican_meta_dto)
1172 
1173  if result_dto.status == cm.CertificateStatus.REQUEST_CANCELED:
1174  return self.issue_certificate_requestissue_certificate_request(
1175  order_id, order_meta, plugin_meta, barbican_meta_dto)
1176  elif result_dto.status == cm.CertificateStatus.INVALID_OPERATION:
1177  return cm.ResultDTO(
1178  cm.CertificateStatus.INVALID_OPERATION,
1179  status_message=u._(
1180  "Modify request: unable to cancel: "
1181  "{message}").format(message=result_dto.status_message)
1182  )
1183  else:
1184  # other status (ca_unavailable, client_data_issue)
1185  # return result from cancel operation
1186  return result_dto
1187 
1188  @_catch_request_exception
1189  def cancel_certificate_request(self, order_id, order_meta, plugin_meta,
1190  barbican_meta_dto):
1191  """Cancel a certificate request.
1192 
1193  :param order_id: ID for the order associated with this request
1194  :param order_meta: order metadata fdr this request
1195  :param plugin_meta: data stored by plugin for further processing.
1196  In particular, the request_id
1197  :param barbican_meta_dto: additional data needed to process order.
1198  :return: cm.ResultDTO:
1199  """
1200  request_id = self._get_request_id_get_request_id(order_id, plugin_meta, "cancelling")
1201 
1202  try:
1203  review_response = self.certclientcertclient.review_request(request_id)
1204  self.certclientcertclient.cancel_request(request_id, review_response)
1205 
1206  return cm.ResultDTO(cm.CertificateStatus.REQUEST_CANCELED)
1207  except pki.RequestNotFoundException:
1208  return cm.ResultDTO(
1209  cm.CertificateStatus.CLIENT_DATA_ISSUE_SEEN,
1210  status_message=u._("no request found for this order"))
1211  except pki.ConflictingOperationException as e:
1212  return cm.ResultDTO(
1213  cm.CertificateStatus.INVALID_OPERATION,
1214  status_message=e.message)
1215 
1216  def supports(self, certificate_spec):
1217  if cm.CA_TYPE in certificate_spec:
1218  return certificate_spec[cm.CA_TYPE] == cm.CA_PLUGIN_TYPE_DOGTAG
1219 
1220  if cm.CA_PLUGIN_TYPE_SYMANTEC in certificate_spec:
1221  # TODO(alee-3) Handle case where SKI is provided
1222  pass
1223 
1224  return True
1225 
1227  """Returns the request_types supported by this plugin.
1228 
1229  :returns: a list of the Barbican-core defined request_types
1230  supported by this plugin.
1231  """
1232  return [cm.CertificateRequestType.SIMPLE_CMC_REQUEST,
1233  cm.CertificateRequestType.STORED_KEY_REQUEST,
1234  cm.CertificateRequestType.CUSTOM_REQUEST]
1235 
1237  """Returns if this plugin and the backend CA supports subCAs
1238 
1239  :return: True/False
1240  """
1241  return subcas_available
1242 
1243  @_catch_subca_creation_exceptions
1244  def create_ca(self, ca_create_dto):
1245  """Creates a subordinate CA upon request
1246 
1247  :param ca_create_dto:
1248  Data transfer object :class:`CACreateDTO` containing data
1249  required to generate a subordinate CA. This data includes
1250  the subject DN of the new CA signing certificate, a name for
1251  the new CA and a reference to the CA that will issue the new
1252  subordinate CA's signing certificate,
1253 
1254  :return: ca_info:
1255  Dictionary containing the data needed to create a
1256  models.CertificateAuthority object
1257  """
1258  if not subcas_available:
1259  raise exception.SubCAsNotSupported(
1260  "Subordinate CAs are not supported by this Dogtag CA")
1261 
1262  parent_ca_id = self._get_correct_ca_id_get_correct_ca_id(ca_create_dto.parent_ca_id)
1263  ca_data = authority.AuthorityData(
1264  dn=ca_create_dto.subject_dn,
1265  parent_aid=parent_ca_id,
1266  description=ca_create_dto.name)
1267 
1268  new_ca_data = self.authority_clientauthority_client.create_ca(ca_data)
1269 
1270  cert = self.authority_clientauthority_client.get_cert(new_ca_data.aid, "PEM")
1271  chain = self.authority_clientauthority_client.get_chain(new_ca_data.aid, "PEM")
1272 
1273  return {
1274  cm.INFO_NAME: new_ca_data.description,
1275  cm.INFO_CA_SIGNING_CERT: cert,
1276  cm.INFO_EXPIRATION: self.expirationexpirationexpirationexpiration.isoformat(),
1277  cm.INFO_INTERMEDIATES: chain,
1278  cm.PLUGIN_CA_ID: new_ca_data.aid
1279  }
1280 
1281  def _get_correct_ca_id(self, plugin_ca_id):
1282  """Returns the correct authority id
1283 
1284  When the Dogtag plugin updates its CA list, any subcas will
1285  have a plugin_ca_id that matches the authority_id (aid) as
1286  returned from the backend CA.
1287 
1288  For migration purposes, though, ie. migrating from a non-subca
1289  environment to a subca one, we want the host CA to keep the same
1290  plugin_ca_id (which is the default_ca_name) so that no disruption
1291  occurs. Therefore, we need to store the host CA's authority ID
1292  (in get_ca_info) and return it here instead.
1293  """
1294  if plugin_ca_id == self.get_default_ca_nameget_default_ca_name():
1295  return self.host_aidhost_aidhost_aidhost_aid
1296  else:
1297  return plugin_ca_id
1298 
1299  @_catch_subca_deletion_exceptions
1300  def delete_ca(self, ca_id):
1301  """Deletes a subordinate CA
1302 
1303  :param ca_id: id for the CA as specified by the plugin
1304  :return: None
1305  """
1306  if not subcas_available:
1307  raise exception.SubCAsNotSupported(
1308  "Subordinate CAs are not supported by this Dogtag CA")
1309 
1310  # ca must be disabled first
1311  self.authority_clientauthority_client.disable_ca(ca_id)
1312  self.authority_clientauthority_client.delete_ca(ca_id)
1313 
1314  def get_ca_info(self):
1315  if not subcas_available:
1316  return super(DogtagCAPlugin, self).get_ca_info()
1317 
1318  self.expirationexpirationexpirationexpiration = (datetime.datetime.utcnow() + datetime.timedelta(
1319  days=int(self._expiration_delta_expiration_delta)))
1320 
1321  ret = {}
1322  cas = self.authority_clientauthority_client.list_cas()
1323  for ca_data in cas.ca_list:
1324  if not ca_data.enabled:
1325  continue
1326 
1327  cert = self.authority_clientauthority_client.get_cert(ca_data.aid, "PEM")
1328  chain = self.authority_clientauthority_client.get_chain(ca_data.aid, "PEM")
1329  ca_info = {
1330  cm.INFO_NAME: ca_data.description,
1331  cm.INFO_CA_SIGNING_CERT: cert,
1332  cm.INFO_INTERMEDIATES: chain,
1333  cm.INFO_EXPIRATION: self.expirationexpirationexpirationexpiration.isoformat()
1334  }
1335 
1336  # handle the migration case. The top level CA should continue
1337  # to work as before
1338 
1339  if ca_data.is_host_authority:
1340  ret[self.get_default_ca_nameget_default_ca_name()] = ca_info
1341  self.host_aidhost_aidhost_aidhost_aid = ca_data.aid
1342  else:
1343  ret[ca_data.aid] = ca_info
1344 
1345  return ret
1346 
1347  def get_host_aid(self):
1348  cas = self.authority_clientauthority_client.list_cas()
1349  for ca_data in cas.ca_list:
1350  if ca_data.is_host_authority:
1351  return ca_data.aid
1352  return None
def _issue_full_cmc_request(self, order_id, order_meta, plugin_meta, barbican_meta_dto)
Definition: dogtag.py:952
def _process_auto_enrollment_results(self, enrollment_results, plugin_meta, barbican_meta_dto)
Definition: dogtag.py:1051
def issue_certificate_request(self, order_id, order_meta, plugin_meta, barbican_meta_dto)
Definition: dogtag.py:887
def _issue_stored_key_request(self, order_id, order_meta, plugin_meta, barbican_meta_dto)
Definition: dogtag.py:967
def cancel_certificate_request(self, order_id, order_meta, plugin_meta, barbican_meta_dto)
Definition: dogtag.py:1190
def _get_correct_ca_id(self, plugin_ca_id)
Definition: dogtag.py:1281
def _issue_certificate_request(self, profile_id, inputs, plugin_meta, barbican_meta_dto)
Definition: dogtag.py:1015
def __init__(self, conf=CONF)
Definition: dogtag.py:691
def _issue_custom_certificate_request(self, order_id, order_meta, plugin_meta, barbican_meta_dto)
Definition: dogtag.py:984
def _get_request(self, request_id)
Definition: dogtag.py:796
def supports(self, certificate_spec)
Definition: dogtag.py:1216
def _issue_simple_cmc_request(self, order_id, order_meta, plugin_meta, barbican_meta_dto)
Definition: dogtag.py:925
def check_certificate_status(self, order_id, order_meta, plugin_meta, barbican_meta_dto)
Definition: dogtag.py:821
def _create_dto(self, request_status, request_id, error_message, cert)
Definition: dogtag.py:1112
def _are_subcas_enabled_on_backend(self, connection)
Definition: dogtag.py:754
def _get_request_id(self, order_id, plugin_meta, operation)
Definition: dogtag.py:780
def create_ca(self, ca_create_dto)
Definition: dogtag.py:1244
def modify_certificate_request(self, order_id, order_meta, plugin_meta, barbican_meta_dto)
Definition: dogtag.py:1155
def _get_cert(self, cert_id)
Definition: dogtag.py:803
def _process_pending_enrollment_results(self, results, plugin_meta, barbican_meta_dto)
Definition: dogtag.py:1084
def __init__(self, conf=CONF)
Definition: dogtag.py:184
def generate_supports(self, key_spec)
Definition: dogtag.py:517
def _store_secret_attributes(meta_dict, secret_dto)
Definition: dogtag.py:566
def generate_asymmetric_key(self, key_spec)
Definition: dogtag.py:442
def store_secret(self, secret_dto)
Definition: dogtag.py:202
def generate_symmetric_key(self, key_spec)
Definition: dogtag.py:384
def delete_secret(self, secret_metadata)
Definition: dogtag.py:376
def _get_passphrase_for_a_private_key(self, secret_type, secret_metadata, key_spec)
Definition: dogtag.py:578
def store_secret_supports(self, key_spec)
Definition: dogtag.py:529
def _get_trans_wrapped_session_key(secret_type, secret_metadata)
Definition: dogtag.py:610
def get_secret(self, secret_type, secret_metadata)
Definition: dogtag.py:261
def _catch_subca_creation_exceptions(ca_related_function)
Definition: dogtag.py:651
def _import_kra_transport_cert_to_nss_db(conf, crypto)
Definition: dogtag.py:109
def _catch_subca_deletion_exceptions(ca_related_function)
Definition: dogtag.py:666
def create_connection(conf, subsystem_path)
Definition: dogtag.py:123
def _setup_nss_db_services(conf)
Definition: dogtag.py:79
def _catch_enrollment_exceptions(ca_related_function)
Definition: dogtag.py:635
def _create_nss_db_if_needed(nss_db_path, nss_password)
Definition: dogtag.py:64
def _catch_request_exception(ca_related_function)
Definition: dogtag.py:624