"Fossies" - the Fresh Open Source Software Archive 
Member "salt-3002.2/tests/integration/states/test_x509.py" (18 Nov 2020, 33950 Bytes) of package /linux/misc/salt-3002.2.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively, you can view or download the uninterpreted source code file here.
See also the latest
Fossies "Diffs" side-by-side code changes report for "test_x509.py":
3002.1_vs_3002.2.
1 import datetime
2 import hashlib
3 import logging
4 import os
5 import pprint
6 import textwrap
7
8 import pytest
9 import salt.utils.files
10 from tests.support.case import ModuleCase
11 from tests.support.helpers import slowTest, with_tempfile
12 from tests.support.mixins import SaltReturnAssertsMixin
13 from tests.support.runtests import RUNTIME_VARS
14 from tests.support.unit import skipIf
15
# M2Crypto is an optional dependency: record whether it can be imported so
# the test class below can be skipped when it is missing.
try:
    import M2Crypto  # pylint: disable=W0611
except ImportError:
    HAS_M2CRYPTO = False
else:
    HAS_M2CRYPTO = True

# Module-level logger used for the debug output of the state runs.
log = logging.getLogger(__name__)
24
25
@pytest.mark.usefixtures("salt_sub_minion")
@skipIf(not HAS_M2CRYPTO, "Skip when no M2Crypto found")
class x509Test(ModuleCase, SaltReturnAssertsMixin):
    """
    Integration tests for the ``x509`` states.

    ``setUp`` writes a pillar with two signing policies (``ca_policy`` and
    ``compound_match``) that point at CA material under
    ``<RUNTIME_VARS.TMP>/pki`` and sets the ``x509_test_grain`` grain on
    ``minion`` and ``sub_minion``. ``tearDown`` removes the pillar files, the
    ``pki`` directory and the grain again.
    """

    @classmethod
    def setUpClass(cls):
        """Load the reference certificate text used by ``test_issue_49027``."""
        cert_path = os.path.join(RUNTIME_VARS.BASE_FILES, "x509_test.crt")
        with salt.utils.files.fopen(cert_path) as fp:
            cls.x509_cert_text = fp.read()

    def setUp(self):
        """Write the signing-policy pillar, refresh it and set the test grains."""
        signing_policies = textwrap.dedent(
            """\
            x509_signing_policies:
              ca_policy:
                - minions: '*'
                - signing_private_key: {0}/pki/ca.key
                - signing_cert: {0}/pki/ca.crt
                - O: Test Company
                - basicConstraints: "CA:false"
                - keyUsage: "critical digitalSignature, keyEncipherment"
                - extendedKeyUsage: "critical serverAuth, clientAuth"
                - subjectKeyIdentifier: hash
                - authorityKeyIdentifier: keyid
                - days_valid: 730
                - copypath: {0}/pki
              compound_match:
                - minions: 'G@x509_test_grain:correct_value'
                - signing_private_key: {0}/pki/ca.key
                - signing_cert: {0}/pki/ca.crt
                - O: Test Company
                - basicConstraints: "CA:false"
                - keyUsage: "critical digitalSignature, keyEncipherment"
                - extendedKeyUsage: "critical serverAuth, clientAuth"
                - subjectKeyIdentifier: hash
                - authorityKeyIdentifier: keyid
                - days_valid: 730
                - copypath: {0}/pki
            """
        ).format(RUNTIME_VARS.TMP)
        with salt.utils.files.fopen(
            os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "signing_policies.sls"), "w"
        ) as fp:
            fp.write(signing_policies)

        top_file = textwrap.dedent(
            """\
            base:
              '*':
                - signing_policies
            """
        )
        with salt.utils.files.fopen(
            os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "top.sls"), "w"
        ) as fp:
            fp.write(top_file)

        self.run_function("saltutil.refresh_pillar")
        # By default only sub_minion carries the grain value that the
        # ``compound_match`` policy targets.
        self.run_function(
            "grains.set", ["x509_test_grain", "correct_value"], minion_tgt="sub_minion"
        )
        self.run_function(
            "grains.set", ["x509_test_grain", "not_correct_value"], minion_tgt="minion"
        )

    def tearDown(self):
        """Remove the pillar files, the ``pki`` directory and the test grain."""
        for pillar_file in ("signing_policies.sls", "top.sls"):
            os.remove(os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, pillar_file))
        certs_path = os.path.join(RUNTIME_VARS.TMP, "pki")
        if os.path.exists(certs_path):
            salt.utils.files.rm_rf(certs_path)
        self.run_function("saltutil.refresh_pillar")
        for minion_id in ("sub_minion", "minion"):
            self.run_function(
                "grains.delkey", ["x509_test_grain"], minion_tgt=minion_id
            )

    def run_function(self, *args, **kwargs):  # pylint: disable=arguments-differ
        """Delegate unchanged to the parent class' ``run_function``."""
        return super().run_function(*args, **kwargs)

    @staticmethod
    def file_checksum(path):
        """Return the hexadecimal SHA-1 digest of the file at ``path``."""
        digest = hashlib.sha1()
        with salt.utils.files.fopen(path, "rb") as f:
            # Read in 4 KiB blocks until ``read`` returns the empty sentinel.
            for block in iter(lambda: f.read(4096), b""):
                digest.update(block)
        return digest.hexdigest()

    # ----- private helpers ------------------------------------------------

    @staticmethod
    def _read_text(path):
        """Return the full text content of ``path``."""
        with salt.utils.files.fopen(path, "r") as fp:
            return fp.read()

    @staticmethod
    def _cert_state_key(state_id, cert_path):
        """Return the state-return key of a ``certificate_managed`` state."""
        return "x509_|-{}_|-{}_|-certificate_managed".format(state_id, cert_path)

    @classmethod
    def _test_crt_key(cls, state_id="test_crt"):
        """Return the return key for the cert at ``<TMP>/pki/test.crt``."""
        return cls._cert_state_key(
            state_id, "{}/pki/test.crt".format(RUNTIME_VARS.TMP)
        )

    def _apply_with_tmp_dir(self, sls):
        """Apply ``sls`` with ``tmp_dir`` set to ``RUNTIME_VARS.TMP`` in pillar."""
        return self.run_function(
            "state.apply", [sls], pillar={"tmp_dir": RUNTIME_VARS.TMP}
        )

    @staticmethod
    def _assert_new_certificate(ret, key):
        """
        Assert that state ``key`` in ``ret`` reports a new certificate.

        Returns the ``changes["Certificate"]["New"]`` mapping so callers can
        make further assertions on it.
        """
        assert key in ret
        assert "changes" in ret[key]
        assert "Certificate" in ret[key]["changes"]
        assert "New" in ret[key]["changes"]["Certificate"]
        return ret[key]["changes"]["Certificate"]["New"]

    def _assert_new_crl(self, ret, expected_old):
        """
        Assert that the ``crl_managed`` state succeeded and created a CRL.

        ``expected_old`` is the exact text expected under ``changes["Old"]``.
        """
        key = "x509_|-{0}/pki/ca.crl_|-{0}/pki/ca.crl_|-crl_managed".format(
            RUNTIME_VARS.TMP
        )

        # hints for easier debugging
        # import json
        # print(json.dumps(ret[key], indent=4, sort_keys=True))
        # print(ret[key]['comment'])

        assert key in ret
        assert "changes" in ret[key]
        self.assertEqual(ret[key]["result"], True)
        assert "New" in ret[key]["changes"]
        assert "Revoked Certificates" in ret[key]["changes"]["New"]
        self.assertEqual(ret[key]["changes"]["Old"], expected_old)

    @staticmethod
    def _assert_cert_sls_unchanged(ret):
        """Assert that the key and cert states succeeded without changes."""
        for state_run_id, state_run_details in ret.items():
            if state_run_id.endswith("private_key_managed"):
                assert state_run_details["result"]
                assert state_run_details["changes"] == {}
            if state_run_id.endswith("certificate_managed"):
                assert state_run_details["result"]
                assert state_run_details["changes"] == {}

    # ----- tests ----------------------------------------------------------

    @with_tempfile(suffix=".pem", create=False)
    @slowTest
    def test_issue_49027(self, pemfile):
        """``x509.pem_managed`` writes the given PEM text to disk unchanged."""
        ret = self.run_state("x509.pem_managed", name=pemfile, text=self.x509_cert_text)
        assert isinstance(ret, dict), ret
        ret = ret[next(iter(ret))]
        assert ret.get("result") is True, ret
        with salt.utils.files.fopen(pemfile) as fp:
            result = fp.readlines()
        self.assertEqual(self.x509_cert_text.splitlines(True), result)

    @with_tempfile(suffix=".crt", create=False)
    @with_tempfile(suffix=".key", create=False)
    @slowTest
    def test_issue_49008(self, keyfile, crtfile):
        """The ``issue-49008`` SLS succeeds and creates both key and cert."""
        ret = self.run_function(
            "state.apply",
            ["issue-49008"],
            pillar={"keyfile": keyfile, "crtfile": crtfile},
        )
        assert isinstance(ret, dict), ret
        for state_result in ret.values():
            assert state_result["result"] is True, state_result
        assert os.path.exists(keyfile)
        assert os.path.exists(crtfile)

    @slowTest
    def test_cert_signing(self):
        """``x509.cert_signing`` reports a newly created certificate."""
        ret = self._apply_with_tmp_dir("x509.cert_signing")
        self._assert_new_certificate(ret, self._test_crt_key())

    @slowTest
    def test_cert_signing_based_on_csr(self):
        """``x509.cert_signing_based_on_csr`` reports a new certificate."""
        ret = self._apply_with_tmp_dir("x509.cert_signing_based_on_csr")
        self._assert_new_certificate(ret, self._test_crt_key())

    @slowTest
    def test_proper_cert_comparison(self):
        """
        Two states that define certs with identical content: the first one
        is created, the second one is recognized as already present.
        """
        ret = self._apply_with_tmp_dir("x509.proper_cert_comparison")
        # check the first generated cert
        self._assert_new_certificate(ret, self._test_crt_key())
        # check whether the second defined cert is considered to match the first one
        second_key = self._test_crt_key("second_test_crt")
        assert second_key in ret
        assert "changes" in ret[second_key]
        assert ret[second_key]["changes"] == {}

    @slowTest
    def test_crl_managed(self):
        """A CRL is created when none exists at the target path."""
        ret = self._apply_with_tmp_dir("x509.crl_managed")
        self._assert_new_crl(
            ret, "{}/pki/ca.crl does not exist.".format(RUNTIME_VARS.TMP)
        )

    @slowTest
    def test_crl_managed_replacing_existing_crl(self):
        """A file at the CRL path that is not a CRL is replaced by a new CRL."""
        os.mkdir(os.path.join(RUNTIME_VARS.TMP, "pki"))
        # Deliberately place a private key, not a CRL, at the CRL path.
        with salt.utils.files.fopen(
            os.path.join(RUNTIME_VARS.TMP, "pki/ca.crl"), "wb"
        ) as crl_file:
            crl_file.write(
                b"""-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQCjdjbgL4kQ8Lu73xeRRM1q3C3K3ptfCLpyfw38LRnymxaoJ6ls
pNSx2dU1uJ89YKFlYLo1QcEk4rJ2fdIjarV0kuNCY3rC8jYUp9BpAU5Z6p9HKeT1
2rTPH81JyjbQDR5PyfCyzYOQtpwpB4zIUUK/Go7tTm409xGKbbUFugJNgQIDAQAB
AoGAF24we34U1ZrMLifSRv5nu3OIFNZHyx2DLDpOFOGaII5edwgIXwxZeIzS5Ppr
yO568/8jcdLVDqZ4EkgCwRTgoXRq3a1GLHGFmBdDNvWjSTTMLoozuM0t2zjRmIsH
hUd7tnai9Lf1Bp5HlBEhBU2gZWk+SXqLvxXe74/+BDAj7gECQQDRw1OPsrgTvs3R
3MNwX6W8+iBYMTGjn6f/6rvEzUs/k6rwJluV7n8ISNUIAxoPy5g5vEYK6Ln/Ttc7
u0K1KNlRAkEAx34qcxjuswavL3biNGE+8LpDJnJx1jaNWoH+ObuzYCCVMusdT2gy
kKuq9ytTDgXd2qwZpIDNmscvReFy10glMQJAXebMz3U4Bk7SIHJtYy7OKQzn0dMj
35WnRV81c2Jbnzhhu2PQeAvt/i1sgEuzLQL9QEtSJ6wLJ4mJvImV0TdaIQJAAYyk
TcKK0A8kOy0kMp3yvDHmJZ1L7wr7bBGIZPBlQ0Ddh8i1sJExm1gJ+uN2QKyg/XrK
tDFf52zWnCdVGgDwcQJALW/WcbSEK+JVV6KDJYpwCzWpKIKpBI0F6fdCr1G7Xcwj
c9bcgp7D7xD+TxWWNj4CSXEccJgGr91StV+gFg4ARQ==
-----END RSA PRIVATE KEY-----
"""
            )

        ret = self._apply_with_tmp_dir("x509.crl_managed")
        self._assert_new_crl(
            ret, "{}/pki/ca.crl is not a valid CRL.".format(RUNTIME_VARS.TMP)
        )

    def test_cert_issue_not_before_not_after(self):
        """Both validity bounds of the new cert match the expected timestamps."""
        ret = self._apply_with_tmp_dir("test_cert_not_before_not_after")
        new_cert = self._assert_new_certificate(ret, self._test_crt_key())
        assert "Not Before" in new_cert
        assert "Not After" in new_cert
        assert new_cert["Not Before"] == "2019-05-05 00:00:00"
        assert new_cert["Not After"] == "2020-05-05 14:30:00"

    def test_cert_issue_not_before(self):
        """The "Not Before" bound of the new cert matches the expected value."""
        ret = self._apply_with_tmp_dir("test_cert_not_before")
        new_cert = self._assert_new_certificate(ret, self._test_crt_key())
        assert "Not Before" in new_cert
        assert "Not After" in new_cert
        assert new_cert["Not Before"] == "2019-05-05 00:00:00"

    def test_cert_issue_not_after(self):
        """The "Not After" bound of the new cert matches the expected value."""
        ret = self._apply_with_tmp_dir("test_cert_not_after")
        new_cert = self._assert_new_certificate(ret, self._test_crt_key())
        assert "Not Before" in new_cert
        assert "Not After" in new_cert
        assert new_cert["Not After"] == "2020-05-05 14:30:00"

    @with_tempfile(suffix=".crt", create=False)
    @with_tempfile(suffix=".key", create=False)
    def test_issue_41858(self, keyfile, crtfile):
        """
        A failing re-run that names an unknown signing policy must leave the
        previously generated certificate file untouched.
        """
        ret_key = self._cert_state_key("test_crt", crtfile)
        signing_policy = "no_such_policy"
        ret = self.run_function(
            "state.apply",
            ["issue-41858.gen_cert"],
            pillar={
                "keyfile": keyfile,
                "crtfile": crtfile,
                "tmp_dir": RUNTIME_VARS.TMP,
            },
        )
        self.assertTrue(ret[ret_key]["result"])
        cert_sum = self.file_checksum(crtfile)

        ret = self.run_function(
            "state.apply",
            ["issue-41858.check"],
            pillar={
                "keyfile": keyfile,
                "crtfile": crtfile,
                "signing_policy": signing_policy,
            },
        )
        self.assertFalse(ret[ret_key]["result"])
        # self.assertSaltCommentRegexpMatches(ret[ret_key], "Signing policy {0} does not exist".format(signing_policy))
        self.assertEqual(self.file_checksum(crtfile), cert_sum)

    @with_tempfile(suffix=".crt", create=False)
    @with_tempfile(suffix=".key", create=False)
    def test_compound_match_minion_have_correct_grain_value(self, keyfile, crtfile):
        """
        Signing succeeds when the requesting minion carries the grain value
        targeted by the ``compound_match`` policy.
        """
        ret_key = self._cert_state_key("test_crt", crtfile)
        signing_policy = "compound_match"
        self._apply_with_tmp_dir("x509_compound_match.gen_ca")

        # sub_minion has the grain set and the CA is on the other minion;
        # the CA minion has the same grain with an incorrect value.
        ret = self.run_function(
            "state.apply",
            ["x509_compound_match.check"],
            minion_tgt="sub_minion",
            pillar={
                "keyfile": keyfile,
                "crtfile": crtfile,
                "signing_policy": signing_policy,
            },
        )
        self.assertTrue(ret[ret_key]["result"])

    @with_tempfile(suffix=".crt", create=False)
    @with_tempfile(suffix=".key", create=False)
    def test_compound_match_ca_have_correct_grain_value(self, keyfile, crtfile):
        """
        Signing fails when only the CA minion, not the requesting minion,
        carries the grain value targeted by the ``compound_match`` policy.
        """
        # Swap the grain values that ``setUp`` assigned.
        self.run_function(
            "grains.set", ["x509_test_grain", "correct_value"], minion_tgt="minion"
        )
        self.run_function(
            "grains.set",
            ["x509_test_grain", "not_correct_value"],
            minion_tgt="sub_minion",
        )

        ret_key = self._cert_state_key("test_crt", crtfile)
        signing_policy = "compound_match"
        self._apply_with_tmp_dir("x509_compound_match.gen_ca")

        ret = self.run_function(
            "state.apply",
            ["x509_compound_match.check"],
            minion_tgt="sub_minion",
            pillar={
                "keyfile": keyfile,
                "crtfile": crtfile,
                "signing_policy": signing_policy,
            },
        )
        self.assertFalse(ret[ret_key]["result"])

    @with_tempfile(suffix=".crt", create=False)
    @with_tempfile(suffix=".key", create=False)
    def test_self_signed_cert(self, keyfile, crtfile):
        """
        Self-signed certificate, no CA.
        Run the state twice to confirm the cert is only created once
        and its contents don't change.
        """
        pillar = {"keyfile": keyfile, "crtfile": crtfile}
        first_run = self.run_function(
            "state.apply", ["x509.self_signed"], pillar=pillar
        )
        key = self._cert_state_key("self_signed_cert", crtfile)
        self.assertIn("New", first_run[key]["changes"]["Certificate"])
        self.assertEqual(
            "Certificate is valid and up to date",
            first_run[key]["changes"]["Status"]["New"],
        )
        self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")

        cert_contents = self._read_text(crtfile)

        second_run = self.run_function(
            "state.apply", ["x509.self_signed"], pillar=pillar
        )
        self.assertEqual({}, second_run[key]["changes"])
        self.assertEqual(
            cert_contents,
            self._read_text(crtfile),
            "Certificate contents should not have changed.",
        )

    @with_tempfile(suffix=".crt", create=False)
    @with_tempfile(suffix=".key", create=False)
    def test_old_self_signed_cert_is_recreated(self, keyfile, crtfile):
        """
        Self-signed certificate, no CA.
        First create a cert that expires in 30 days, then recreate
        the cert because the second state run requires days_remaining
        to be at least 90.
        """
        first_run = self.run_function(
            "state.apply",
            ["x509.self_signed_expiry"],
            pillar={
                "keyfile": keyfile,
                "crtfile": crtfile,
                "days_valid": 30,
                "days_remaining": 10,
            },
        )
        key = self._cert_state_key("self_signed_cert", crtfile)
        self.assertEqual(
            "Certificate is valid and up to date",
            first_run[key]["changes"]["Status"]["New"],
        )
        expiry = datetime.datetime.strptime(
            first_run[key]["changes"]["Certificate"]["New"]["Not After"],
            "%Y-%m-%d %H:%M:%S",
        )
        # ``timedelta.days`` truncates, so a little less than 30 days is 29.
        # NOTE(review): this compares against naive local time; presumably
        # "Not After" is rendered on a compatible clock -- confirm.
        self.assertEqual(29, (expiry - datetime.datetime.now()).days)
        self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")

        cert_contents = self._read_text(crtfile)

        second_run = self.run_function(
            "state.apply",
            ["x509.self_signed_expiry"],
            pillar={
                "keyfile": keyfile,
                "crtfile": crtfile,
                "days_valid": 180,
                "days_remaining": 90,
            },
        )
        self.assertEqual(
            "Certificate needs renewal: 29 days remaining but it needs to be at least 90",
            second_run[key]["changes"]["Status"]["Old"],
        )
        expiry = datetime.datetime.strptime(
            second_run[key]["changes"]["Certificate"]["New"]["Not After"],
            "%Y-%m-%d %H:%M:%S",
        )
        self.assertEqual(179, (expiry - datetime.datetime.now()).days)
        self.assertNotEqual(
            cert_contents,
            self._read_text(crtfile),
            "Certificate contents should have changed.",
        )

    @with_tempfile(suffix=".crt", create=False)
    @with_tempfile(suffix=".key", create=False)
    def test_mismatched_self_signed_cert_is_recreated(self, keyfile, crtfile):
        """
        Self-signed certificate, no CA.
        First create a cert, then run the state again with a different
        subjectAltName. The cert should be recreated.
        Finally, run once more with the same subjectAltName as the
        second run. Nothing should change.
        """
        first_run = self.run_function(
            "state.apply",
            ["x509.self_signed_different_properties"],
            pillar={
                "keyfile": keyfile,
                "crtfile": crtfile,
                "subjectAltName": "DNS:alt.service.local",
            },
        )
        key = self._cert_state_key("self_signed_cert", crtfile)
        self.assertEqual(
            "Certificate is valid and up to date",
            first_run[key]["changes"]["Status"]["New"],
        )
        sans = first_run[key]["changes"]["Certificate"]["New"]["X509v3 Extensions"][
            "subjectAltName"
        ]
        self.assertEqual("DNS:alt.service.local", sans)
        self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")

        first_cert_contents = self._read_text(crtfile)

        second_run_pillar = {
            "keyfile": keyfile,
            "crtfile": crtfile,
            "subjectAltName": "DNS:alt1.service.local, DNS:alt2.service.local",
        }
        second_run = self.run_function(
            "state.apply",
            ["x509.self_signed_different_properties"],
            pillar=second_run_pillar,
        )
        self.assertEqual(
            "Certificate properties are different: X509v3 Extensions",
            second_run[key]["changes"]["Status"]["Old"],
        )
        sans = second_run[key]["changes"]["Certificate"]["New"]["X509v3 Extensions"][
            "subjectAltName"
        ]
        self.assertEqual("DNS:alt1.service.local, DNS:alt2.service.local", sans)
        second_cert_contents = self._read_text(crtfile)
        self.assertNotEqual(
            first_cert_contents,
            second_cert_contents,
            "Certificate contents should have changed.",
        )

        third_run = self.run_function(
            "state.apply",
            ["x509.self_signed_different_properties"],
            pillar=second_run_pillar,
        )
        self.assertEqual({}, third_run[key]["changes"])
        self.assertEqual(
            second_cert_contents,
            self._read_text(crtfile),
            "Certificate contents should not have changed.",
        )

    @with_tempfile(suffix=".crt", create=False)
    @with_tempfile(suffix=".key", create=False)
    def test_certificate_managed_with_managed_private_key_does_not_error(
        self, keyfile, crtfile
    ):
        """
        Test using the deprecated managed_private_key arg in certificate_managed does not throw an error.

        TODO: Remove this test in Aluminium when the arg is removed.
        """
        self.run_state("x509.private_key_managed", name=keyfile, bits=4096)
        ret = self.run_state(
            "x509.certificate_managed",
            name=crtfile,
            CN="localhost",
            signing_private_key=keyfile,
            managed_private_key={"name": keyfile, "bits": 4096},
        )
        key = "x509_|-{0}_|-{0}_|-certificate_managed".format(crtfile)
        self.assertEqual(True, ret[key]["result"])

    @with_tempfile(suffix=".crt", create=False)
    @with_tempfile(suffix=".key", create=False)
    def test_file_properties_are_updated(self, keyfile, crtfile):
        """
        Self-signed certificate, no CA.
        First create a cert, then run the state again with different
        file mode. The cert should not be recreated, but the file
        should be updated.
        Finally, run once more with the same file mode as the second
        run. Nothing should change.
        """
        first_run = self.run_function(
            "state.apply",
            ["x509.self_signed_different_properties"],
            pillar={"keyfile": keyfile, "crtfile": crtfile, "fileMode": "0755"},
        )
        key = self._cert_state_key("self_signed_cert", crtfile)
        self.assertEqual(
            "Certificate is valid and up to date",
            first_run[key]["changes"]["Status"]["New"],
        )
        self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")
        # ``oct()`` yields e.g. "0o100755"; the last four characters are the
        # permission bits with their leading zero.
        self.assertEqual("0755", oct(os.stat(crtfile).st_mode)[-4:])

        # NOTE(review): the first run passes ``fileMode`` while this pillar
        # uses ``mode``; confirm which key the SLS actually reads.
        second_run_pillar = {
            "keyfile": keyfile,
            "crtfile": crtfile,
            "mode": "0600",
        }
        self.run_function(
            "state.apply",
            ["x509.self_signed_different_properties"],
            pillar=second_run_pillar,
        )
        self.assertEqual("0600", oct(os.stat(crtfile).st_mode)[-4:])

        third_run = self.run_function(
            "state.apply",
            ["x509.self_signed_different_properties"],
            pillar=second_run_pillar,
        )
        self.assertEqual({}, third_run[key]["changes"])
        self.assertEqual("0600", oct(os.stat(crtfile).st_mode)[-4:])

    @with_tempfile(suffix=".crt", create=False)
    @with_tempfile(suffix=".key", create=False)
    def test_file_managed_failure(self, keyfile, crtfile):
        """
        Test that a failure in the file.managed call marks the state
        call as failed.
        """
        crtfile_pieces = os.path.split(crtfile)
        # Point the state at a path whose parent directories do not exist.
        bad_crtfile = os.path.join(
            crtfile_pieces[0], "deeply/nested", crtfile_pieces[1]
        )
        ret = self.run_function(
            "state.apply",
            ["x509.self_signed_file_error"],
            pillar={"keyfile": keyfile, "crtfile": bad_crtfile},
        )

        key = self._cert_state_key("self_signed_cert", bad_crtfile)
        self.assertFalse(ret[key]["result"], "State should have failed.")
        self.assertEqual({}, ret[key]["changes"])
        # The state was given ``bad_crtfile``, so that is the path that must
        # not exist; the plain ``crtfile`` check is kept as an extra guard.
        self.assertFalse(
            os.path.exists(bad_crtfile), "Certificate should not have been created."
        )
        self.assertFalse(
            os.path.exists(crtfile), "Certificate should not have been created."
        )

    @with_tempfile(suffix=".crt", create=False)
    @with_tempfile(suffix=".key", create=False)
    def test_py2_generated_cert_is_not_recreated(self, keyfile, crtfile):
        """
        A key and certificate generated by the same SLS under Python 2 must
        be accepted as up to date and not be regenerated.
        """
        keyfile_contents = textwrap.dedent(
            """\
            -----BEGIN RSA PRIVATE KEY-----
            MIIEpAIBAAKCAQEAp5PQyx5NlYrfzd7vU/Xb2YR5qbWWtpWWoKmJC1gML5v5DBI7
            +p/kAHNNmK8uqHXTaI4N/zgarfjrg4zceq2Du7pP0xiCAYolhFqF78ibxNrN4OkT
            UPm2kM88iJ8Z14Yph8ueSxLIlujCGaEFhr6wRzTj4T9b+0Bb/PZHI2t5YwtIooVM
            EFCBFkt4bb004tO0D9q0CPPVT2AsGmxnY43Aj3Epy++kqmaWj1hIucSprkDrAXFS
            WacBQPFQ8XctnL2Z1Q6CJ5WUNrW8ohAJ9RJkwjiqbZTwYIPSSrl+FO3XqDY70SxU
            3xDeqhU4zvyjxJ8w9SPqTUu/C3BZtRBT9dCBEQIDAQABAoIBAQCZvS23u1RYVrEe
            sWGF+LA67aOkg9kCJ1iqiv8UrjF32DNy1KO8OcY2d5H/+u/mUzqh2HmU5QbtBsoi
            xS9dSSTrLHGhbAGRogjrVRU9uCDYSBjLN2mmR4IrdkTF3pkZtpcRY0gU/eWTNXUl
            iCmGxhj5KtfJxZQAfLon6FW5dBdIOgxSCJhvRq0zFpWJZFGWWkBExDfeNg//0fCU
            UbjRjGacP/+R6FSJa6tevzgR7tIIapm1dY/ofPXIXsZGo1R87fRgLI1D+e84Jdds
            /U0bKzPOgAjcC1b262lJ8058pjG/nqWC0YUfpIJUVv2ciJpH3Ha+90526InLAUXA
            RWe1Z2YxAoGBANqACEKvUbxENu+XxQj0SI1co4SRTOvgbrSQGL61rDY6PvY/bOqC
            JeR0KC3MN6e7fx52tsl/eqP9iyExUpO9b0BCnGg967MivJXWUxhUdOL/r2ceQBqD
            DiPVZCFsjeNdSNihnNctAig9Po3GEUWE0ikHr3NcD+wXTnhnIEjJ/fltAoGBAMRW
            dIcOiuDLm/oDLNCpwEO4m63ymbUgeOj2cZhKMTqFmspnKnuCU1U/A8cuQcs1gydL
            7MzxVP7MZDIEqT5gGj3eyuVMAmKbvLFR2NctDIDjaUs6oz0J9NGByPNjXaYr4uMd
            EZrxD8gLZ/G+/7eKsCgBA9ksSydDo00Vf/qAsmO1AoGBANWqc+l59eyrrCj5egU6
            lKQf3gsp51WV/8v0SS5dC41vwdgdx80+/fz8FbpLRHVypWlN34sFbRFmQ6Juz/iH
            O35UZQyO2KkxI8dGcbWOCUtditHExBzo4W/rIWKJ++pFc5Hb4DqO2dgto7kR4hvg
            OX9D869UbIGLfQHCntBvLju1AoGAHpcl0sEmTD4NEFgcTGqWZTbHMsQAxOLJU+rJ
            6iNtJiQY6P5H9TRqDXci/I6te57bz2yZ+ZiEWKq51b06LVjF3evviuhb2sdPEAWj
            lmsTbqWAC1OYiXMarOXezGUn+zMNR7uIua5jehSk3lqW9x7psWHvGpA3KWf1cpYt
            +XbB1J0CgYBCSjALTv4dcn+CtS3kqb806z8H9MSZznUwSmcgvwCR5sqwLAUk1xRn
            hEqXbC1RGee3Xqv9mXPDK2LirpdRYi9Jr9ApZkrSkeaXSd2d4cy2ujUT0c7P8JrD
            i6QXb+HaFeBuS5ulYDmo4mIbCysuTsgrLzplViUy3xUQv23M/Eh1gw==
            -----END RSA PRIVATE KEY-----
            """
        )
        crtfile_contents = textwrap.dedent(
            """\
            -----BEGIN CERTIFICATE-----
            MIIEhTCCA22gAwIBAgIIUijHgif6VJUwDQYJKoZIhvcNAQELBQAwgYIxCzAJBgNV
            BAYTAkJFMRgwFgYDVQQDDA9FeGFtcGxlIFJvb3QgQ0ExETAPBgNVBAcMCEthcGVs
            bGVuMRAwDgYDVQQIDAdBbnR3ZXJwMRAwDgYDVQQKDAdFeGFtcGxlMSIwIAYJKoZI
            hvcNAQkBFhNjZXJ0YWRtQGV4YW1wbGUub3JnMB4XDTIwMDYxNjA3Mzk1OVoXDTMw
            MDYxNDA3Mzk1OVowgYIxCzAJBgNVBAYTAkJFMRgwFgYDVQQDDA9FeGFtcGxlIFJv
            b3QgQ0ExETAPBgNVBAcMCEthcGVsbGVuMRAwDgYDVQQIDAdBbnR3ZXJwMRAwDgYD
            VQQKDAdFeGFtcGxlMSIwIAYJKoZIhvcNAQkBFhNjZXJ0YWRtQGV4YW1wbGUub3Jn
            MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAp5PQyx5NlYrfzd7vU/Xb
            2YR5qbWWtpWWoKmJC1gML5v5DBI7+p/kAHNNmK8uqHXTaI4N/zgarfjrg4zceq2D
            u7pP0xiCAYolhFqF78ibxNrN4OkTUPm2kM88iJ8Z14Yph8ueSxLIlujCGaEFhr6w
            RzTj4T9b+0Bb/PZHI2t5YwtIooVMEFCBFkt4bb004tO0D9q0CPPVT2AsGmxnY43A
            j3Epy++kqmaWj1hIucSprkDrAXFSWacBQPFQ8XctnL2Z1Q6CJ5WUNrW8ohAJ9RJk
            wjiqbZTwYIPSSrl+FO3XqDY70SxU3xDeqhU4zvyjxJ8w9SPqTUu/C3BZtRBT9dCB
            EQIDAQABo4H8MIH5MA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgEGMB0G
            A1UdDgQWBBTmNsYLuQTxpANgTuw7LRn1qHJsjzCBtgYDVR0jBIGuMIGrgBTmNsYL
            uQTxpANgTuw7LRn1qHJsj6GBiKSBhTCBgjELMAkGA1UEBhMCQkUxGDAWBgNVBAMM
            D0V4YW1wbGUgUm9vdCBDQTERMA8GA1UEBwwIS2FwZWxsZW4xEDAOBgNVBAgMB0Fu
            dHdlcnAxEDAOBgNVBAoMB0V4YW1wbGUxIjAgBgkqhkiG9w0BCQEWE2NlcnRhZG1A
            ZXhhbXBsZS5vcmeCCFIox4In+lSVMA0GCSqGSIb3DQEBCwUAA4IBAQBnC1/kK+xr
            Vjr5Y2YRjyjm4e8I/nTU+RX2p5K+Yth3CqWO3JuDiV/31UMtPl832n2GWSgXG2pP
            B52oeuCP4Re76jqhOmJWY3CKPji+Rs16wj199i9AAcwhSF0rpi5+Fi84HtP3q6pH
            cuzZfIPW44aJ5l4k+QvTLoWzr0XujMFcYzI45i3SJqTMs8xdIP5YLN8JXtQSPw9Z
            8/nBKbPj7WTUC9cj9Cw2bz+wTpdRF4XCsUF3Vpl9fP7SK8yvv0I85LZnWQx1eQlv
            COAM5HWxUT9bWgv18zXdYkc6VLw6ufQSxxuhLMjJxuK27Ny/F18/xYLRTVnse36d
            tPJrseUPmvIK
            -----END CERTIFICATE-----
            """
        )
        # The paths are spliced in by concatenation because the Jinja braces
        # in the SLS would clash with ``str.format`` placeholders.
        slsfile = textwrap.dedent(
            """\
            {%- set ca_key_path = '"""
            + keyfile
            + """' %}
            {%- set ca_crt_path = '"""
            + crtfile
            + """' %}

            certificate.authority::private-key:
              x509.private_key_managed:
                - name: {{ ca_key_path }}
                - backup: True

            certificate.authority::certificate:
              x509.certificate_managed:
                - name: {{ ca_crt_path }}
                - signing_private_key: {{ ca_key_path }}
                - CN: Example Root CA
                - O: Example
                - C: BE
                - ST: Antwerp
                - L: Kapellen
                - Email: certadm@example.org
                - basicConstraints: "critical CA:true"
                - keyUsage: "critical cRLSign, keyCertSign"
                - subjectKeyIdentifier: hash
                - authorityKeyIdentifier: keyid,issuer:always
                - days_valid: 3650
                - days_remaining: 0
                - backup: True
                - require:
                  - x509: certificate.authority::private-key
            """
        )
        sls_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "cert.sls")
        with salt.utils.files.fopen(sls_path, "w") as wfh:
            wfh.write(slsfile)
        # Do not leave the generated SLS behind in the shared state tree.
        self.addCleanup(os.remove, sls_path)

        # Generate the certificate twice.
        # On the first run, no key nor cert exist.
        ret = self.run_function("state.sls", ["cert"])
        log.debug(
            "First state run ret dictionary:\n%s", pprint.pformat(list(ret.values()))
        )
        for state_run_id, state_run_details in ret.items():
            if state_run_id.endswith("private_key_managed"):
                assert state_run_details["result"]
                assert "new" in state_run_details["changes"]
            if state_run_id.endswith("certificate_managed"):
                assert state_run_details["result"]
                assert "Certificate" in state_run_details["changes"]
                assert "New" in state_run_details["changes"]["Certificate"]
                assert "Status" in state_run_details["changes"]
                assert "New" in state_run_details["changes"]["Status"]

        # On the second run, they exist and should not trigger any modification
        ret = self.run_function("state.sls", ["cert"])
        log.debug(
            "Second state run ret dictionary:\n%s", pprint.pformat(list(ret.values()))
        )
        self._assert_cert_sls_unchanged(ret)

        # Now we replace the key and cert contents with the contents of the
        # above call, but generated under Py2
        with salt.utils.files.fopen(keyfile, "w") as wfh:
            wfh.write(keyfile_contents)
        with salt.utils.files.fopen(keyfile) as rfh:
            log.debug("Written keyfile, %r, contents:\n%s", keyfile, rfh.read())
        with salt.utils.files.fopen(crtfile, "w") as wfh:
            wfh.write(crtfile_contents)
        with salt.utils.files.fopen(crtfile) as rfh:
            log.debug("Written crtfile, %r, contents:\n%s", crtfile, rfh.read())

        # We should not trigger any modification
        ret = self.run_function("state.sls", ["cert"])
        log.debug(
            "Third state run ret dictionary:\n%s", pprint.pformat(list(ret.values()))
        )
        self._assert_cert_sls_unchanged(ret)