"Fossies" - the Fresh Open Source Software Archive

Member "salt-3002.2/tests/integration/states/test_x509.py" (18 Nov 2020, 33950 Bytes) of package /linux/misc/salt-3002.2.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_x509.py": 3002.1_vs_3002.2.

    1 import datetime
    2 import hashlib
    3 import logging
    4 import os
    5 import pprint
    6 import textwrap
    7 
    8 import pytest
    9 import salt.utils.files
   10 from tests.support.case import ModuleCase
   11 from tests.support.helpers import slowTest, with_tempfile
   12 from tests.support.mixins import SaltReturnAssertsMixin
   13 from tests.support.runtests import RUNTIME_VARS
   14 from tests.support.unit import skipIf
   15 
   16 try:
   17     import M2Crypto  # pylint: disable=W0611
   18 
   19     HAS_M2CRYPTO = True
   20 except ImportError:
   21     HAS_M2CRYPTO = False
   22 
   23 log = logging.getLogger(__name__)
   24 
   25 
   26 @pytest.mark.usefixtures("salt_sub_minion")
   27 @skipIf(not HAS_M2CRYPTO, "Skip when no M2Crypto found")
   28 class x509Test(ModuleCase, SaltReturnAssertsMixin):
   29     @classmethod
   30     def setUpClass(cls):
   31         cert_path = os.path.join(RUNTIME_VARS.BASE_FILES, "x509_test.crt")
   32         with salt.utils.files.fopen(cert_path) as fp:
   33             cls.x509_cert_text = fp.read()
   34 
    def setUp(self):
        """
        Prepare pillar data and grains for each test.

        Writes ``signing_policies.sls`` (two signing policies, ``ca_policy``
        and ``compound_match``) and a ``top.sls`` that assigns it to all
        minions into the temporary pillar tree, refreshes the pillar, and
        sets the ``x509_test_grain`` grain to ``correct_value`` on
        ``sub_minion`` and to ``not_correct_value`` on ``minion``.
        """
        # The ``{0}`` placeholders are filled with RUNTIME_VARS.TMP before the
        # text is dedented, so all policy paths point below the test tmp dir.
        with salt.utils.files.fopen(
            os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "signing_policies.sls"), "w"
        ) as fp:
            fp.write(
                textwrap.dedent(
                    """\
                x509_signing_policies:
                  ca_policy:
                    - minions: '*'
                    - signing_private_key: {0}/pki/ca.key
                    - signing_cert: {0}/pki/ca.crt
                    - O: Test Company
                    - basicConstraints: "CA:false"
                    - keyUsage: "critical digitalSignature, keyEncipherment"
                    - extendedKeyUsage: "critical serverAuth, clientAuth"
                    - subjectKeyIdentifier: hash
                    - authorityKeyIdentifier: keyid
                    - days_valid: 730
                    - copypath: {0}/pki
                  compound_match:
                    - minions: 'G@x509_test_grain:correct_value'
                    - signing_private_key: {0}/pki/ca.key
                    - signing_cert: {0}/pki/ca.crt
                    - O: Test Company
                    - basicConstraints: "CA:false"
                    - keyUsage: "critical digitalSignature, keyEncipherment"
                    - extendedKeyUsage: "critical serverAuth, clientAuth"
                    - subjectKeyIdentifier: hash
                    - authorityKeyIdentifier: keyid
                    - days_valid: 730
                    - copypath: {0}/pki
                     """.format(
                        RUNTIME_VARS.TMP
                    )
                )
            )
        # Pillar top file: every minion ('*') gets the signing policies.
        with salt.utils.files.fopen(
            os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "top.sls"), "w"
        ) as fp:
            fp.write(
                textwrap.dedent(
                    """\
                     base:
                       '*':
                         - signing_policies
                     """
                )
            )
        # Make the freshly written pillar files visible to the minion.
        self.run_function("saltutil.refresh_pillar")
        # Default grain layout: only sub_minion matches the compound_match
        # policy target 'G@x509_test_grain:correct_value'.
        self.run_function(
            "grains.set", ["x509_test_grain", "correct_value"], minion_tgt="sub_minion"
        )
        self.run_function(
            "grains.set", ["x509_test_grain", "not_correct_value"], minion_tgt="minion"
        )
   91 
   92     def tearDown(self):
   93         os.remove(os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "signing_policies.sls"))
   94         os.remove(os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "top.sls"))
   95         certs_path = os.path.join(RUNTIME_VARS.TMP, "pki")
   96         if os.path.exists(certs_path):
   97             salt.utils.files.rm_rf(certs_path)
   98         self.run_function("saltutil.refresh_pillar")
   99         self.run_function("grains.delkey", ["x509_test_grain"], minion_tgt="sub_minion")
  100         self.run_function("grains.delkey", ["x509_test_grain"], minion_tgt="minion")
  101 
  102     def run_function(self, *args, **kwargs):  # pylint: disable=arguments-differ
  103         ret = super().run_function(*args, **kwargs)
  104         return ret
  105 
  106     @staticmethod
  107     def file_checksum(path):
  108         hash = hashlib.sha1()
  109         with salt.utils.files.fopen(path, "rb") as f:
  110             for block in iter(lambda: f.read(4096), b""):
  111                 hash.update(block)
  112         return hash.hexdigest()
  113 
  114     @with_tempfile(suffix=".pem", create=False)
  115     @slowTest
  116     def test_issue_49027(self, pemfile):
  117         ret = self.run_state("x509.pem_managed", name=pemfile, text=self.x509_cert_text)
  118         assert isinstance(ret, dict), ret
  119         ret = ret[next(iter(ret))]
  120         assert ret.get("result") is True, ret
  121         with salt.utils.files.fopen(pemfile) as fp:
  122             result = fp.readlines()
  123         self.assertEqual(self.x509_cert_text.splitlines(True), result)
  124 
  125     @with_tempfile(suffix=".crt", create=False)
  126     @with_tempfile(suffix=".key", create=False)
  127     @slowTest
  128     def test_issue_49008(self, keyfile, crtfile):
  129         ret = self.run_function(
  130             "state.apply",
  131             ["issue-49008"],
  132             pillar={"keyfile": keyfile, "crtfile": crtfile},
  133         )
  134         assert isinstance(ret, dict), ret
  135         for state_result in ret.values():
  136             assert state_result["result"] is True, state_result
  137         assert os.path.exists(keyfile)
  138         assert os.path.exists(crtfile)
  139 
  140     @slowTest
  141     def test_cert_signing(self):
  142         ret = self.run_function(
  143             "state.apply", ["x509.cert_signing"], pillar={"tmp_dir": RUNTIME_VARS.TMP}
  144         )
  145         key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
  146             RUNTIME_VARS.TMP
  147         )
  148         assert key in ret
  149         assert "changes" in ret[key]
  150         assert "Certificate" in ret[key]["changes"]
  151         assert "New" in ret[key]["changes"]["Certificate"]
  152 
  153     @slowTest
  154     def test_cert_signing_based_on_csr(self):
  155         ret = self.run_function(
  156             "state.apply",
  157             ["x509.cert_signing_based_on_csr"],
  158             pillar={"tmp_dir": RUNTIME_VARS.TMP},
  159         )
  160         key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
  161             RUNTIME_VARS.TMP
  162         )
  163         assert key in ret
  164         assert "changes" in ret[key]
  165         assert "Certificate" in ret[key]["changes"]
  166         assert "New" in ret[key]["changes"]["Certificate"]
  167 
  168     @slowTest
  169     def test_proper_cert_comparison(self):
  170         # In this SLS we define two certs which have identical content.
  171         # The first one is expected to be created.
  172         # The second one is expected to be recognized as already present.
  173         ret = self.run_function(
  174             "state.apply",
  175             ["x509.proper_cert_comparison"],
  176             pillar={"tmp_dir": RUNTIME_VARS.TMP},
  177         )
  178         # check the first generated cert
  179         first_key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
  180             RUNTIME_VARS.TMP
  181         )
  182         assert first_key in ret
  183         assert "changes" in ret[first_key]
  184         assert "Certificate" in ret[first_key]["changes"]
  185         assert "New" in ret[first_key]["changes"]["Certificate"]
  186         # check whether the second defined cert is considered to match the first one
  187         second_key = "x509_|-second_test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
  188             RUNTIME_VARS.TMP
  189         )
  190         assert second_key in ret
  191         assert "changes" in ret[second_key]
  192         assert ret[second_key]["changes"] == {}
  193 
  194     @slowTest
  195     def test_crl_managed(self):
  196         ret = self.run_function(
  197             "state.apply", ["x509.crl_managed"], pillar={"tmp_dir": RUNTIME_VARS.TMP}
  198         )
  199         key = "x509_|-{}/pki/ca.crl_|-{}/pki/ca.crl_|-crl_managed".format(
  200             RUNTIME_VARS.TMP, RUNTIME_VARS.TMP
  201         )
  202 
  203         # hints for easier debugging
  204         # import json
  205         # print(json.dumps(ret[key], indent=4, sort_keys=True))
  206         # print(ret[key]['comment'])
  207 
  208         assert key in ret
  209         assert "changes" in ret[key]
  210         self.assertEqual(ret[key]["result"], True)
  211         assert "New" in ret[key]["changes"]
  212         assert "Revoked Certificates" in ret[key]["changes"]["New"]
  213         self.assertEqual(
  214             ret[key]["changes"]["Old"],
  215             "{}/pki/ca.crl does not exist.".format(RUNTIME_VARS.TMP),
  216         )
  217 
  218     @slowTest
  219     def test_crl_managed_replacing_existing_crl(self):
  220         os.mkdir(os.path.join(RUNTIME_VARS.TMP, "pki"))
  221         with salt.utils.files.fopen(
  222             os.path.join(RUNTIME_VARS.TMP, "pki/ca.crl"), "wb"
  223         ) as crl_file:
  224             crl_file.write(
  225                 b"""-----BEGIN RSA PRIVATE KEY-----
  226 MIICWwIBAAKBgQCjdjbgL4kQ8Lu73xeRRM1q3C3K3ptfCLpyfw38LRnymxaoJ6ls
  227 pNSx2dU1uJ89YKFlYLo1QcEk4rJ2fdIjarV0kuNCY3rC8jYUp9BpAU5Z6p9HKeT1
  228 2rTPH81JyjbQDR5PyfCyzYOQtpwpB4zIUUK/Go7tTm409xGKbbUFugJNgQIDAQAB
  229 AoGAF24we34U1ZrMLifSRv5nu3OIFNZHyx2DLDpOFOGaII5edwgIXwxZeIzS5Ppr
  230 yO568/8jcdLVDqZ4EkgCwRTgoXRq3a1GLHGFmBdDNvWjSTTMLoozuM0t2zjRmIsH
  231 hUd7tnai9Lf1Bp5HlBEhBU2gZWk+SXqLvxXe74/+BDAj7gECQQDRw1OPsrgTvs3R
  232 3MNwX6W8+iBYMTGjn6f/6rvEzUs/k6rwJluV7n8ISNUIAxoPy5g5vEYK6Ln/Ttc7
  233 u0K1KNlRAkEAx34qcxjuswavL3biNGE+8LpDJnJx1jaNWoH+ObuzYCCVMusdT2gy
  234 kKuq9ytTDgXd2qwZpIDNmscvReFy10glMQJAXebMz3U4Bk7SIHJtYy7OKQzn0dMj
  235 35WnRV81c2Jbnzhhu2PQeAvt/i1sgEuzLQL9QEtSJ6wLJ4mJvImV0TdaIQJAAYyk
  236 TcKK0A8kOy0kMp3yvDHmJZ1L7wr7bBGIZPBlQ0Ddh8i1sJExm1gJ+uN2QKyg/XrK
  237 tDFf52zWnCdVGgDwcQJALW/WcbSEK+JVV6KDJYpwCzWpKIKpBI0F6fdCr1G7Xcwj
  238 c9bcgp7D7xD+TxWWNj4CSXEccJgGr91StV+gFg4ARQ==
  239 -----END RSA PRIVATE KEY-----
  240 """
  241             )
  242 
  243         ret = self.run_function(
  244             "state.apply", ["x509.crl_managed"], pillar={"tmp_dir": RUNTIME_VARS.TMP}
  245         )
  246         key = "x509_|-{}/pki/ca.crl_|-{}/pki/ca.crl_|-crl_managed".format(
  247             RUNTIME_VARS.TMP, RUNTIME_VARS.TMP
  248         )
  249 
  250         # hints for easier debugging
  251         # import json
  252         # print(json.dumps(ret[key], indent=4, sort_keys=True))
  253         # print(ret[key]['comment'])
  254 
  255         assert key in ret
  256         assert "changes" in ret[key]
  257         self.assertEqual(ret[key]["result"], True)
  258         assert "New" in ret[key]["changes"]
  259         assert "Revoked Certificates" in ret[key]["changes"]["New"]
  260         self.assertEqual(
  261             ret[key]["changes"]["Old"],
  262             "{}/pki/ca.crl is not a valid CRL.".format(RUNTIME_VARS.TMP),
  263         )
  264 
  265     def test_cert_issue_not_before_not_after(self):
  266         ret = self.run_function(
  267             "state.apply",
  268             ["test_cert_not_before_not_after"],
  269             pillar={"tmp_dir": RUNTIME_VARS.TMP},
  270         )
  271         key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
  272             RUNTIME_VARS.TMP
  273         )
  274         assert key in ret
  275         assert "changes" in ret[key]
  276         assert "Certificate" in ret[key]["changes"]
  277         assert "New" in ret[key]["changes"]["Certificate"]
  278         assert "Not Before" in ret[key]["changes"]["Certificate"]["New"]
  279         assert "Not After" in ret[key]["changes"]["Certificate"]["New"]
  280         not_before = ret[key]["changes"]["Certificate"]["New"]["Not Before"]
  281         not_after = ret[key]["changes"]["Certificate"]["New"]["Not After"]
  282         assert not_before == "2019-05-05 00:00:00"
  283         assert not_after == "2020-05-05 14:30:00"
  284 
  285     def test_cert_issue_not_before(self):
  286         ret = self.run_function(
  287             "state.apply",
  288             ["test_cert_not_before"],
  289             pillar={"tmp_dir": RUNTIME_VARS.TMP},
  290         )
  291         key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
  292             RUNTIME_VARS.TMP
  293         )
  294         assert key in ret
  295         assert "changes" in ret[key]
  296         assert "Certificate" in ret[key]["changes"]
  297         assert "New" in ret[key]["changes"]["Certificate"]
  298         assert "Not Before" in ret[key]["changes"]["Certificate"]["New"]
  299         assert "Not After" in ret[key]["changes"]["Certificate"]["New"]
  300         not_before = ret[key]["changes"]["Certificate"]["New"]["Not Before"]
  301         assert not_before == "2019-05-05 00:00:00"
  302 
  303     def test_cert_issue_not_after(self):
  304         ret = self.run_function(
  305             "state.apply", ["test_cert_not_after"], pillar={"tmp_dir": RUNTIME_VARS.TMP}
  306         )
  307         key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
  308             RUNTIME_VARS.TMP
  309         )
  310         assert key in ret
  311         assert "changes" in ret[key]
  312         assert "Certificate" in ret[key]["changes"]
  313         assert "New" in ret[key]["changes"]["Certificate"]
  314         assert "Not Before" in ret[key]["changes"]["Certificate"]["New"]
  315         assert "Not After" in ret[key]["changes"]["Certificate"]["New"]
  316         not_after = ret[key]["changes"]["Certificate"]["New"]["Not After"]
  317         assert not_after == "2020-05-05 14:30:00"
  318 
  319     @with_tempfile(suffix=".crt", create=False)
  320     @with_tempfile(suffix=".key", create=False)
  321     def test_issue_41858(self, keyfile, crtfile):
  322         ret_key = "x509_|-test_crt_|-{}_|-certificate_managed".format(crtfile)
  323         signing_policy = "no_such_policy"
  324         ret = self.run_function(
  325             "state.apply",
  326             ["issue-41858.gen_cert"],
  327             pillar={
  328                 "keyfile": keyfile,
  329                 "crtfile": crtfile,
  330                 "tmp_dir": RUNTIME_VARS.TMP,
  331             },
  332         )
  333         self.assertTrue(ret[ret_key]["result"])
  334         cert_sum = self.file_checksum(crtfile)
  335 
  336         ret = self.run_function(
  337             "state.apply",
  338             ["issue-41858.check"],
  339             pillar={
  340                 "keyfile": keyfile,
  341                 "crtfile": crtfile,
  342                 "signing_policy": signing_policy,
  343             },
  344         )
  345         self.assertFalse(ret[ret_key]["result"])
  346         # self.assertSaltCommentRegexpMatches(ret[ret_key], "Signing policy {0} does not exist".format(signing_policy))
  347         self.assertEqual(self.file_checksum(crtfile), cert_sum)
  348 
  349     @with_tempfile(suffix=".crt", create=False)
  350     @with_tempfile(suffix=".key", create=False)
  351     def test_compound_match_minion_have_correct_grain_value(self, keyfile, crtfile):
  352         ret_key = "x509_|-test_crt_|-{}_|-certificate_managed".format(crtfile)
  353         signing_policy = "compound_match"
  354         ret = self.run_function(
  355             "state.apply",
  356             ["x509_compound_match.gen_ca"],
  357             pillar={"tmp_dir": RUNTIME_VARS.TMP},
  358         )
  359 
  360         # sub_minion have grain set and CA is on other minion
  361         # CA minion have same grain with incorrect value
  362         ret = self.run_function(
  363             "state.apply",
  364             ["x509_compound_match.check"],
  365             minion_tgt="sub_minion",
  366             pillar={
  367                 "keyfile": keyfile,
  368                 "crtfile": crtfile,
  369                 "signing_policy": signing_policy,
  370             },
  371         )
  372         self.assertTrue(ret[ret_key]["result"])
  373 
  374     @with_tempfile(suffix=".crt", create=False)
  375     @with_tempfile(suffix=".key", create=False)
  376     def test_compound_match_ca_have_correct_grain_value(self, keyfile, crtfile):
  377         self.run_function(
  378             "grains.set", ["x509_test_grain", "correct_value"], minion_tgt="minion"
  379         )
  380         self.run_function(
  381             "grains.set",
  382             ["x509_test_grain", "not_correct_value"],
  383             minion_tgt="sub_minion",
  384         )
  385 
  386         ret_key = "x509_|-test_crt_|-{}_|-certificate_managed".format(crtfile)
  387         signing_policy = "compound_match"
  388         self.run_function(
  389             "state.apply",
  390             ["x509_compound_match.gen_ca"],
  391             pillar={"tmp_dir": RUNTIME_VARS.TMP},
  392         )
  393 
  394         ret = self.run_function(
  395             "state.apply",
  396             ["x509_compound_match.check"],
  397             minion_tgt="sub_minion",
  398             pillar={
  399                 "keyfile": keyfile,
  400                 "crtfile": crtfile,
  401                 "signing_policy": signing_policy,
  402             },
  403         )
  404         self.assertFalse(ret[ret_key]["result"])
  405 
  406     @with_tempfile(suffix=".crt", create=False)
  407     @with_tempfile(suffix=".key", create=False)
  408     def test_self_signed_cert(self, keyfile, crtfile):
  409         """
  410         Self-signed certificate, no CA.
  411         Run the state twice to confirm the cert is only created once
  412         and its contents don't change.
  413         """
  414         first_run = self.run_function(
  415             "state.apply",
  416             ["x509.self_signed"],
  417             pillar={"keyfile": keyfile, "crtfile": crtfile},
  418         )
  419         key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile)
  420         self.assertIn("New", first_run[key]["changes"]["Certificate"])
  421         self.assertEqual(
  422             "Certificate is valid and up to date",
  423             first_run[key]["changes"]["Status"]["New"],
  424         )
  425         self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")
  426 
  427         with salt.utils.files.fopen(crtfile, "r") as first_cert:
  428             cert_contents = first_cert.read()
  429 
  430         second_run = self.run_function(
  431             "state.apply",
  432             ["x509.self_signed"],
  433             pillar={"keyfile": keyfile, "crtfile": crtfile},
  434         )
  435         self.assertEqual({}, second_run[key]["changes"])
  436         with salt.utils.files.fopen(crtfile, "r") as second_cert:
  437             self.assertEqual(
  438                 cert_contents,
  439                 second_cert.read(),
  440                 "Certificate contents should not have changed.",
  441             )
  442 
  443     @with_tempfile(suffix=".crt", create=False)
  444     @with_tempfile(suffix=".key", create=False)
  445     def test_old_self_signed_cert_is_recreated(self, keyfile, crtfile):
  446         """
  447         Self-signed certificate, no CA.
  448         First create a cert that expires in 30 days, then recreate
  449         the cert because the second state run requires days_remaining
  450         to be at least 90.
  451         """
  452         first_run = self.run_function(
  453             "state.apply",
  454             ["x509.self_signed_expiry"],
  455             pillar={
  456                 "keyfile": keyfile,
  457                 "crtfile": crtfile,
  458                 "days_valid": 30,
  459                 "days_remaining": 10,
  460             },
  461         )
  462         key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile)
  463         self.assertEqual(
  464             "Certificate is valid and up to date",
  465             first_run[key]["changes"]["Status"]["New"],
  466         )
  467         expiry = datetime.datetime.strptime(
  468             first_run[key]["changes"]["Certificate"]["New"]["Not After"],
  469             "%Y-%m-%d %H:%M:%S",
  470         )
  471         self.assertEqual(29, (expiry - datetime.datetime.now()).days)
  472         self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")
  473 
  474         with salt.utils.files.fopen(crtfile, "r") as first_cert:
  475             cert_contents = first_cert.read()
  476 
  477         second_run = self.run_function(
  478             "state.apply",
  479             ["x509.self_signed_expiry"],
  480             pillar={
  481                 "keyfile": keyfile,
  482                 "crtfile": crtfile,
  483                 "days_valid": 180,
  484                 "days_remaining": 90,
  485             },
  486         )
  487         self.assertEqual(
  488             "Certificate needs renewal: 29 days remaining but it needs to be at least 90",
  489             second_run[key]["changes"]["Status"]["Old"],
  490         )
  491         expiry = datetime.datetime.strptime(
  492             second_run[key]["changes"]["Certificate"]["New"]["Not After"],
  493             "%Y-%m-%d %H:%M:%S",
  494         )
  495         self.assertEqual(179, (expiry - datetime.datetime.now()).days)
  496         with salt.utils.files.fopen(crtfile, "r") as second_cert:
  497             self.assertNotEqual(
  498                 cert_contents,
  499                 second_cert.read(),
  500                 "Certificate contents should have changed.",
  501             )
  502 
  503     @with_tempfile(suffix=".crt", create=False)
  504     @with_tempfile(suffix=".key", create=False)
  505     def test_mismatched_self_signed_cert_is_recreated(self, keyfile, crtfile):
  506         """
  507         Self-signed certificate, no CA.
  508         First create a cert, then run the state again with a different
  509         subjectAltName. The cert should be recreated.
  510         Finally, run once more with the same subjectAltName as the
  511         second run. Nothing should change.
  512         """
  513         first_run = self.run_function(
  514             "state.apply",
  515             ["x509.self_signed_different_properties"],
  516             pillar={
  517                 "keyfile": keyfile,
  518                 "crtfile": crtfile,
  519                 "subjectAltName": "DNS:alt.service.local",
  520             },
  521         )
  522         key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile)
  523         self.assertEqual(
  524             "Certificate is valid and up to date",
  525             first_run[key]["changes"]["Status"]["New"],
  526         )
  527         sans = first_run[key]["changes"]["Certificate"]["New"]["X509v3 Extensions"][
  528             "subjectAltName"
  529         ]
  530         self.assertEqual("DNS:alt.service.local", sans)
  531         self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")
  532 
  533         with salt.utils.files.fopen(crtfile, "r") as first_cert:
  534             first_cert_contents = first_cert.read()
  535 
  536         second_run_pillar = {
  537             "keyfile": keyfile,
  538             "crtfile": crtfile,
  539             "subjectAltName": "DNS:alt1.service.local, DNS:alt2.service.local",
  540         }
  541         second_run = self.run_function(
  542             "state.apply",
  543             ["x509.self_signed_different_properties"],
  544             pillar=second_run_pillar,
  545         )
  546         self.assertEqual(
  547             "Certificate properties are different: X509v3 Extensions",
  548             second_run[key]["changes"]["Status"]["Old"],
  549         )
  550         sans = second_run[key]["changes"]["Certificate"]["New"]["X509v3 Extensions"][
  551             "subjectAltName"
  552         ]
  553         self.assertEqual("DNS:alt1.service.local, DNS:alt2.service.local", sans)
  554         with salt.utils.files.fopen(crtfile, "r") as second_cert:
  555             second_cert_contents = second_cert.read()
  556             self.assertNotEqual(
  557                 first_cert_contents,
  558                 second_cert_contents,
  559                 "Certificate contents should have changed.",
  560             )
  561 
  562         third_run = self.run_function(
  563             "state.apply",
  564             ["x509.self_signed_different_properties"],
  565             pillar=second_run_pillar,
  566         )
  567         self.assertEqual({}, third_run[key]["changes"])
  568         with salt.utils.files.fopen(crtfile, "r") as third_cert:
  569             self.assertEqual(
  570                 second_cert_contents,
  571                 third_cert.read(),
  572                 "Certificate contents should not have changed.",
  573             )
  574 
  575     @with_tempfile(suffix=".crt", create=False)
  576     @with_tempfile(suffix=".key", create=False)
  577     def test_certificate_managed_with_managed_private_key_does_not_error(
  578         self, keyfile, crtfile
  579     ):
  580         """
  581         Test using the deprecated managed_private_key arg in certificate_managed does not throw an error.
  582 
  583         TODO: Remove this test in Aluminium when the arg is removed.
  584         """
  585         self.run_state("x509.private_key_managed", name=keyfile, bits=4096)
  586         ret = self.run_state(
  587             "x509.certificate_managed",
  588             name=crtfile,
  589             CN="localhost",
  590             signing_private_key=keyfile,
  591             managed_private_key={"name": keyfile, "bits": 4096},
  592         )
  593         key = "x509_|-{0}_|-{0}_|-certificate_managed".format(crtfile)
  594         self.assertEqual(True, ret[key]["result"])
  595 
  596     @with_tempfile(suffix=".crt", create=False)
  597     @with_tempfile(suffix=".key", create=False)
  598     def test_file_properties_are_updated(self, keyfile, crtfile):
  599         """
  600         Self-signed certificate, no CA.
  601         First create a cert, then run the state again with different
  602         file mode. The cert should not be recreated, but the file
  603         should be updated.
  604         Finally, run once more with the same file mode as the second
  605         run. Nothing should change.
  606         """
  607         first_run = self.run_function(
  608             "state.apply",
  609             ["x509.self_signed_different_properties"],
  610             pillar={"keyfile": keyfile, "crtfile": crtfile, "fileMode": "0755"},
  611         )
  612         key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile)
  613         self.assertEqual(
  614             "Certificate is valid and up to date",
  615             first_run[key]["changes"]["Status"]["New"],
  616         )
  617         self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")
  618         self.assertEqual("0755", oct(os.stat(crtfile).st_mode)[-4:])
  619 
  620         second_run_pillar = {
  621             "keyfile": keyfile,
  622             "crtfile": crtfile,
  623             "mode": "0600",
  624         }
  625         second_run = self.run_function(
  626             "state.apply",
  627             ["x509.self_signed_different_properties"],
  628             pillar=second_run_pillar,
  629         )
  630         self.assertEqual("0600", oct(os.stat(crtfile).st_mode)[-4:])
  631 
  632         third_run = self.run_function(
  633             "state.apply",
  634             ["x509.self_signed_different_properties"],
  635             pillar=second_run_pillar,
  636         )
  637         self.assertEqual({}, third_run[key]["changes"])
  638         self.assertEqual("0600", oct(os.stat(crtfile).st_mode)[-4:])
  639 
  640     @with_tempfile(suffix=".crt", create=False)
  641     @with_tempfile(suffix=".key", create=False)
  642     def test_file_managed_failure(self, keyfile, crtfile):
  643         """
  644         Test that a failure in the file.managed call marks the state
  645         call as failed.
  646         """
  647         crtfile_pieces = os.path.split(crtfile)
  648         bad_crtfile = os.path.join(
  649             crtfile_pieces[0], "deeply/nested", crtfile_pieces[1]
  650         )
  651         ret = self.run_function(
  652             "state.apply",
  653             ["x509.self_signed_file_error"],
  654             pillar={"keyfile": keyfile, "crtfile": bad_crtfile},
  655         )
  656 
  657         key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(bad_crtfile)
  658         self.assertFalse(ret[key]["result"], "State should have failed.")
  659         self.assertEqual({}, ret[key]["changes"])
  660         self.assertFalse(
  661             os.path.exists(crtfile), "Certificate should not have been created."
  662         )
  663 
  664     @with_tempfile(suffix=".crt", create=False)
  665     @with_tempfile(suffix=".key", create=False)
  666     def test_py2_generated_cert_is_not_recreated(self, keyfile, crtfile):
  667         keyfile_contents = textwrap.dedent(
  668             """\
  669         -----BEGIN RSA PRIVATE KEY-----
  670         MIIEpAIBAAKCAQEAp5PQyx5NlYrfzd7vU/Xb2YR5qbWWtpWWoKmJC1gML5v5DBI7
  671         +p/kAHNNmK8uqHXTaI4N/zgarfjrg4zceq2Du7pP0xiCAYolhFqF78ibxNrN4OkT
  672         UPm2kM88iJ8Z14Yph8ueSxLIlujCGaEFhr6wRzTj4T9b+0Bb/PZHI2t5YwtIooVM
  673         EFCBFkt4bb004tO0D9q0CPPVT2AsGmxnY43Aj3Epy++kqmaWj1hIucSprkDrAXFS
  674         WacBQPFQ8XctnL2Z1Q6CJ5WUNrW8ohAJ9RJkwjiqbZTwYIPSSrl+FO3XqDY70SxU
  675         3xDeqhU4zvyjxJ8w9SPqTUu/C3BZtRBT9dCBEQIDAQABAoIBAQCZvS23u1RYVrEe
  676         sWGF+LA67aOkg9kCJ1iqiv8UrjF32DNy1KO8OcY2d5H/+u/mUzqh2HmU5QbtBsoi
  677         xS9dSSTrLHGhbAGRogjrVRU9uCDYSBjLN2mmR4IrdkTF3pkZtpcRY0gU/eWTNXUl
  678         iCmGxhj5KtfJxZQAfLon6FW5dBdIOgxSCJhvRq0zFpWJZFGWWkBExDfeNg//0fCU
  679         UbjRjGacP/+R6FSJa6tevzgR7tIIapm1dY/ofPXIXsZGo1R87fRgLI1D+e84Jdds
  680         /U0bKzPOgAjcC1b262lJ8058pjG/nqWC0YUfpIJUVv2ciJpH3Ha+90526InLAUXA
  681         RWe1Z2YxAoGBANqACEKvUbxENu+XxQj0SI1co4SRTOvgbrSQGL61rDY6PvY/bOqC
  682         JeR0KC3MN6e7fx52tsl/eqP9iyExUpO9b0BCnGg967MivJXWUxhUdOL/r2ceQBqD
  683         DiPVZCFsjeNdSNihnNctAig9Po3GEUWE0ikHr3NcD+wXTnhnIEjJ/fltAoGBAMRW
  684         dIcOiuDLm/oDLNCpwEO4m63ymbUgeOj2cZhKMTqFmspnKnuCU1U/A8cuQcs1gydL
  685         7MzxVP7MZDIEqT5gGj3eyuVMAmKbvLFR2NctDIDjaUs6oz0J9NGByPNjXaYr4uMd
  686         EZrxD8gLZ/G+/7eKsCgBA9ksSydDo00Vf/qAsmO1AoGBANWqc+l59eyrrCj5egU6
  687         lKQf3gsp51WV/8v0SS5dC41vwdgdx80+/fz8FbpLRHVypWlN34sFbRFmQ6Juz/iH
  688         O35UZQyO2KkxI8dGcbWOCUtditHExBzo4W/rIWKJ++pFc5Hb4DqO2dgto7kR4hvg
  689         OX9D869UbIGLfQHCntBvLju1AoGAHpcl0sEmTD4NEFgcTGqWZTbHMsQAxOLJU+rJ
  690         6iNtJiQY6P5H9TRqDXci/I6te57bz2yZ+ZiEWKq51b06LVjF3evviuhb2sdPEAWj
  691         lmsTbqWAC1OYiXMarOXezGUn+zMNR7uIua5jehSk3lqW9x7psWHvGpA3KWf1cpYt
  692         +XbB1J0CgYBCSjALTv4dcn+CtS3kqb806z8H9MSZznUwSmcgvwCR5sqwLAUk1xRn
  693         hEqXbC1RGee3Xqv9mXPDK2LirpdRYi9Jr9ApZkrSkeaXSd2d4cy2ujUT0c7P8JrD
  694         i6QXb+HaFeBuS5ulYDmo4mIbCysuTsgrLzplViUy3xUQv23M/Eh1gw==
  695         -----END RSA PRIVATE KEY-----
  696         """
  697         )
  698         crtfile_contents = textwrap.dedent(
  699             """\
  700         -----BEGIN CERTIFICATE-----
  701         MIIEhTCCA22gAwIBAgIIUijHgif6VJUwDQYJKoZIhvcNAQELBQAwgYIxCzAJBgNV
  702         BAYTAkJFMRgwFgYDVQQDDA9FeGFtcGxlIFJvb3QgQ0ExETAPBgNVBAcMCEthcGVs
  703         bGVuMRAwDgYDVQQIDAdBbnR3ZXJwMRAwDgYDVQQKDAdFeGFtcGxlMSIwIAYJKoZI
  704         hvcNAQkBFhNjZXJ0YWRtQGV4YW1wbGUub3JnMB4XDTIwMDYxNjA3Mzk1OVoXDTMw
  705         MDYxNDA3Mzk1OVowgYIxCzAJBgNVBAYTAkJFMRgwFgYDVQQDDA9FeGFtcGxlIFJv
  706         b3QgQ0ExETAPBgNVBAcMCEthcGVsbGVuMRAwDgYDVQQIDAdBbnR3ZXJwMRAwDgYD
  707         VQQKDAdFeGFtcGxlMSIwIAYJKoZIhvcNAQkBFhNjZXJ0YWRtQGV4YW1wbGUub3Jn
  708         MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAp5PQyx5NlYrfzd7vU/Xb
  709         2YR5qbWWtpWWoKmJC1gML5v5DBI7+p/kAHNNmK8uqHXTaI4N/zgarfjrg4zceq2D
  710         u7pP0xiCAYolhFqF78ibxNrN4OkTUPm2kM88iJ8Z14Yph8ueSxLIlujCGaEFhr6w
  711         RzTj4T9b+0Bb/PZHI2t5YwtIooVMEFCBFkt4bb004tO0D9q0CPPVT2AsGmxnY43A
  712         j3Epy++kqmaWj1hIucSprkDrAXFSWacBQPFQ8XctnL2Z1Q6CJ5WUNrW8ohAJ9RJk
  713         wjiqbZTwYIPSSrl+FO3XqDY70SxU3xDeqhU4zvyjxJ8w9SPqTUu/C3BZtRBT9dCB
  714         EQIDAQABo4H8MIH5MA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgEGMB0G
  715         A1UdDgQWBBTmNsYLuQTxpANgTuw7LRn1qHJsjzCBtgYDVR0jBIGuMIGrgBTmNsYL
  716         uQTxpANgTuw7LRn1qHJsj6GBiKSBhTCBgjELMAkGA1UEBhMCQkUxGDAWBgNVBAMM
  717         D0V4YW1wbGUgUm9vdCBDQTERMA8GA1UEBwwIS2FwZWxsZW4xEDAOBgNVBAgMB0Fu
  718         dHdlcnAxEDAOBgNVBAoMB0V4YW1wbGUxIjAgBgkqhkiG9w0BCQEWE2NlcnRhZG1A
  719         ZXhhbXBsZS5vcmeCCFIox4In+lSVMA0GCSqGSIb3DQEBCwUAA4IBAQBnC1/kK+xr
  720         Vjr5Y2YRjyjm4e8I/nTU+RX2p5K+Yth3CqWO3JuDiV/31UMtPl832n2GWSgXG2pP
  721         B52oeuCP4Re76jqhOmJWY3CKPji+Rs16wj199i9AAcwhSF0rpi5+Fi84HtP3q6pH
  722         cuzZfIPW44aJ5l4k+QvTLoWzr0XujMFcYzI45i3SJqTMs8xdIP5YLN8JXtQSPw9Z
  723         8/nBKbPj7WTUC9cj9Cw2bz+wTpdRF4XCsUF3Vpl9fP7SK8yvv0I85LZnWQx1eQlv
  724         COAM5HWxUT9bWgv18zXdYkc6VLw6ufQSxxuhLMjJxuK27Ny/F18/xYLRTVnse36d
  725         tPJrseUPmvIK
  726         -----END CERTIFICATE-----
  727         """
  728         )
  729         slsfile = textwrap.dedent(
  730             """\
  731         {%- set ca_key_path = '"""
  732             + keyfile
  733             + """' %}
  734         {%- set ca_crt_path = '"""
  735             + crtfile
  736             + """' %}
  737 
  738         certificate.authority::private-key:
  739           x509.private_key_managed:
  740             - name: {{ ca_key_path }}
  741             - backup: True
  742 
  743         certificate.authority::certificate:
  744           x509.certificate_managed:
  745             - name: {{ ca_crt_path }}
  746             - signing_private_key: {{ ca_key_path }}
  747             - CN: Example Root CA
  748             - O: Example
  749             - C: BE
  750             - ST: Antwerp
  751             - L: Kapellen
  752             - Email: certadm@example.org
  753             - basicConstraints: "critical CA:true"
  754             - keyUsage: "critical cRLSign, keyCertSign"
  755             - subjectKeyIdentifier: hash
  756             - authorityKeyIdentifier: keyid,issuer:always
  757             - days_valid: 3650
  758             - days_remaining: 0
  759             - backup: True
  760             - require:
  761               - x509: certificate.authority::private-key
  762         """
  763         )
  764         with salt.utils.files.fopen(
  765             os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "cert.sls"), "w"
  766         ) as wfh:
  767             wfh.write(slsfile)
  768 
  769         # Generate the certificate twice.
  770         # On the first run, no key nor cert exist.
  771         ret = self.run_function("state.sls", ["cert"])
  772         log.debug(
  773             "First state run ret dictionary:\n%s", pprint.pformat(list(ret.values()))
  774         )
  775         for state_run_id, state_run_details in ret.items():
  776             if state_run_id.endswith("private_key_managed"):
  777                 assert state_run_details["result"]
  778                 assert "new" in state_run_details["changes"]
  779             if state_run_id.endswith("certificate_managed"):
  780                 assert state_run_details["result"]
  781                 assert "Certificate" in state_run_details["changes"]
  782                 assert "New" in state_run_details["changes"]["Certificate"]
  783                 assert "Status" in state_run_details["changes"]
  784                 assert "New" in state_run_details["changes"]["Status"]
  785         # On the second run, they exist and should not trigger any modification
  786         ret = self.run_function("state.sls", ["cert"])
  787         log.debug(
  788             "Second state run ret dictionary:\n%s", pprint.pformat(list(ret.values()))
  789         )
  790         for state_run_id, state_run_details in ret.items():
  791             if state_run_id.endswith("private_key_managed"):
  792                 assert state_run_details["result"]
  793                 assert state_run_details["changes"] == {}
  794             if state_run_id.endswith("certificate_managed"):
  795                 assert state_run_details["result"]
  796                 assert state_run_details["changes"] == {}
  797         # Now we repleace they key and cert contents with the contents of the above
  798         # call, but under Py2
  799         with salt.utils.files.fopen(keyfile, "w") as wfh:
  800             wfh.write(keyfile_contents)
  801         with salt.utils.files.fopen(keyfile) as rfh:
  802             log.debug("Written keyfile, %r, contents:\n%s", keyfile, rfh.read())
  803         with salt.utils.files.fopen(crtfile, "w") as wfh:
  804             wfh.write(crtfile_contents)
  805         with salt.utils.files.fopen(crtfile) as rfh:
  806             log.debug("Written crtfile, %r, contents:\n%s", crtfile, rfh.read())
  807         # We should not trigger any modification
  808         ret = self.run_function("state.sls", ["cert"])
  809         log.debug(
  810             "Third state run ret dictionary:\n%s", pprint.pformat(list(ret.values()))
  811         )
  812         for state_run_id, state_run_details in ret.items():
  813             if state_run_id.endswith("private_key_managed"):
  814                 assert state_run_details["result"]
  815                 assert state_run_details["changes"] == {}
  816             if state_run_id.endswith("certificate_managed"):
  817                 assert state_run_details["result"]
  818                 assert state_run_details["changes"] == {}