"Fossies" - the Fresh Open Source Software Archive 
Member "salt-3002.2/tests/unit/modules/test_iptables.py" (18 Nov 2020, 23051 Bytes) of package /linux/misc/salt-3002.2.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively, you can
view or
download the uninterpreted source code file here.
See also the latest
Fossies "Diffs" side-by-side code changes report for "test_iptables.py":
3002.1_vs_3002.2.
1 """
2 :codeauthor: Jayesh Kariya <jayeshk@saltstack.com>
3 """
4
5
6 import textwrap
7 import uuid
8
9 import salt.modules.iptables as iptables
10 from tests.support.mixins import LoaderModuleMockMixin
11 from tests.support.mock import MagicMock, mock_open, patch
12 from tests.support.unit import TestCase
13
14
15 class IptablesTestCase(TestCase, LoaderModuleMockMixin):
16 """
17 Test cases for salt.modules.iptables
18 """
19
def setup_loader_modules(self):
    """
    Map the iptables module to an empty dict.

    Presumably consumed by LoaderModuleMockMixin as the module's loader
    globals -- confirm against the mixin.
    """
    loader_globals = {}
    return {iptables: loader_globals}
22
23 # 'version' function tests: 1
24
def test_version(self):
    """
    ``version`` should return "v1.4.21" when ``cmd.run`` yields
    "iptables v1.4.21".
    """
    run_stub = MagicMock(return_value="iptables v1.4.21")
    salt_funcs = {"cmd.run": run_stub}
    with patch.dict(iptables.__salt__, salt_funcs):
        reported = iptables.version()
    self.assertEqual(reported, "v1.4.21")
32
33 # 'build_rule' function tests: 1
34
def test_build_rule(self):
    """
    Exercise ``build_rule`` with negation, multiport, validation-error,
    jump-option, match-set and full-command inputs, with ``_has_option``
    stubbed to return True throughout.
    """
    with patch.object(iptables, "_has_option", MagicMock(return_value=True)):
        # No arguments at all yield an empty rule.
        self.assertEqual(iptables.build_rule(), "")

        self.assertEqual(
            iptables.build_rule(name="ignored", state="ignored"),
            "",
            "build_rule should ignore name and state",
        )

        # Keyword-only inputs paired with the rule text they must produce.
        fragment_cases = [
            # Bang-prefixed values are negated.
            ({"if": "!eth0"}, "! -i eth0"),
            # "not"-prefixed values are negated.
            ({"if": "not eth0"}, "! -i eth0"),
            ({"protocol": "tcp", "syn": "!"}, "-p tcp ! --syn"),
            (
                {"dports": [80, 443], "protocol": "tcp"},
                "-p tcp -m multiport --dports 80,443",
            ),
            (
                {"dports": "80,443", "protocol": "tcp"},
                "-p tcp -m multiport --dports 80,443",
            ),
            # Should it really behave this way?
            (
                {"dports": ["!80", 443], "protocol": "tcp"},
                "-p tcp -m multiport ! --dports 80,443",
            ),
            (
                {"dports": "!80,443", "protocol": "tcp"},
                "-p tcp -m multiport ! --dports 80,443",
            ),
            (
                {"sports": [80, 443], "protocol": "tcp"},
                "-p tcp -m multiport --sports 80,443",
            ),
            (
                {"sports": "80,443", "protocol": "tcp"},
                "-p tcp -m multiport --sports 80,443",
            ),
        ]
        for rule_kwargs, rendered in fragment_cases:
            self.assertEqual(iptables.build_rule(**rule_kwargs), rendered)

        # Inputs that must be rejected: (table, chain, keyword args, message).
        rejected_cases = [
            (
                "filter",
                "INPUT",
                {
                    "command": "I",
                    "position": "3",
                    "full": True,
                    "dports": "protocol",
                    "jump": "ACCEPT",
                },
                "Error: protocol must be specified",
            ),
            (
                "filter",
                "INPUT",
                {
                    "command": "I",
                    "position": "3",
                    "full": True,
                    "sports": "protocol",
                    "jump": "ACCEPT",
                },
                "Error: protocol must be specified",
            ),
            (
                "",
                "INPUT",
                {
                    "command": "I",
                    "position": "3",
                    "full": "True",
                    "match": "state",
                    "jump": "ACCEPT",
                },
                "Error: Table needs to be specified",
            ),
            (
                "filter",
                "",
                {
                    "command": "I",
                    "position": "3",
                    "full": "True",
                    "match": "state",
                    "jump": "ACCEPT",
                },
                "Error: Chain needs to be specified",
            ),
            (
                "filter",
                "INPUT",
                {
                    "command": "",
                    "position": "3",
                    "full": "True",
                    "match": "state",
                    "jump": "ACCEPT",
                },
                "Error: Command needs to be specified",
            ),
        ]
        for table, chain, rule_kwargs, message in rejected_cases:
            self.assertEqual(
                iptables.build_rule(table, chain, **rule_kwargs), message
            )

        option_cases = [
            # Arguments that should appear after the --jump.
            (
                {"jump": "REDIRECT", "to-port": 8080},
                "--jump REDIRECT --to-port 8080",
            ),
            # Arguments with spaces are quoted, as log-prefix often needs.
            (
                {"jump": "LOG", "log-prefix": "long prefix"},
                '--jump LOG --log-prefix "long prefix"',
            ),
            # Leading or trailing spaces are quoted as well.
            (
                {"jump": "LOG", "log-prefix": "spam: "},
                '--jump LOG --log-prefix "spam: "',
            ),
            # Jump options without an argument, given as an empty string.
            ({"jump": "CLUSTERIP", "new": ""}, "--jump CLUSTERIP --new"),
            # Jump options without an argument, given as None.
            ({"jump": "CT", "notrack": None}, "--jump CT --notrack"),
            # A match-set given as a single string.
            (
                {"match-set": "src flag1,flag2"},
                "-m set --match-set src flag1,flag2",
            ),
            # Match-sets given as a list.
            (
                {"match-set": ["src1 flag1", "src2 flag2,flag3"]},
                "-m set --match-set src1 flag1 -m set --match-set src2 flag2,flag3",
            ),
            # Negation of a string match-set.
            ({"match-set": "!src flag"}, "-m set ! --match-set src flag"),
            # Negation inside a list of match-sets.
            (
                {"match-set": ["src1 flag", "not src2 flag2"]},
                "-m set --match-set src1 flag -m set ! --match-set src2 flag2",
            ),
            # The escaped spelling "name_" maps to --name.
            ({"match": "recent", "name_": "SSH"}, "-m recent --name SSH"),
            # Empty arguments are allowed.
            ({"match": "recent", "update": None}, "-m recent --update"),
        ]
        for rule_kwargs, rendered in option_cases:
            self.assertEqual(iptables.build_rule(**rule_kwargs), rendered)

        # Should allow the --save jump option to CONNSECMARK
        # self.assertEqual(iptables.build_rule(jump='CONNSECMARK',
        #                                     **{'save': ''}),
        #                 '--jump CONNSECMARK --save ')

        # With full set, the command path from _iptables_cmd is prepended.
        full_command = (
            "/sbin/iptables --wait -t salt -I INPUT 3 -m state --jump ACCEPT"
        )
        binary_stub = MagicMock(return_value="/sbin/iptables")
        with patch.object(iptables, "_iptables_cmd", binary_stub):
            self.assertEqual(
                iptables.build_rule(
                    "salt",
                    "INPUT",
                    command="I",
                    position="3",
                    full="True",
                    match="state",
                    jump="ACCEPT",
                ),
                full_command,
            )
246
247 # 'get_saved_rules' function tests: 2
248
def test_get_saved_rules(self):
    """
    ``get_saved_rules`` should hand back whatever ``_parse_conf`` returns
    and call it with ``conf_file=None`` and ``family="ipv4"``.
    """
    parser_stub = MagicMock(return_value=False)
    with patch.object(iptables, "_parse_conf", parser_stub):
        outcome = iptables.get_saved_rules()
    self.assertFalse(outcome)
    parser_stub.assert_called_with(conf_file=None, family="ipv4")
257
def test_get_saved_rules_nilinuxrt(self):
    """
    With NILinuxRT grains, ``get_saved_rules`` should parse the mocked
    /etc/natinst/share/iptables.conf into the expected INPUT rule.
    """
    conf_text = textwrap.dedent(
        """\
        # Generated by iptables-save v1.6.2 on Thu Oct 8 12:13:44 2020
        *filter
        :INPUT ACCEPT [2958:584773]
        :FORWARD ACCEPT [0:0]
        :OUTPUT ACCEPT [92:23648]
        -A INPUT -p tcp -m tcp --dport 80 -j ACCEPT
        COMMIT
        # Completed on Thu Oct 8 12:13:44 2020
        """
    )
    saved_files = {"/etc/natinst/share/iptables.conf": conf_text}
    accept_http = {
        "protocol": ["tcp"],
        "jump": ["ACCEPT"],
        "match": ["tcp"],
        "destination_port": ["80"],
    }
    fopen_stub = mock_open(read_data=saved_files)
    nilinuxrt_grains = {"os_family": "NILinuxRT", "os": "NILinuxRT"}

    with patch.dict(iptables.__grains__, nilinuxrt_grains), patch.object(
        iptables.salt.utils.files, "fopen", fopen_stub
    ):
        parsed = iptables.get_saved_rules()
        self.assertEqual([accept_http], parsed["filter"]["INPUT"]["rules"])
293
294 # 'get_rules' function tests: 1
295
def test_get_rules(self):
    """
    ``get_rules`` should hand back whatever ``_parse_conf`` returns and
    call it with ``in_mem=True`` and ``family="ipv4"``.
    """
    parser_stub = MagicMock(return_value=False)
    with patch.object(iptables, "_parse_conf", parser_stub):
        outcome = iptables.get_rules()
    self.assertFalse(outcome)
    parser_stub.assert_called_with(in_mem=True, family="ipv4")
304
305 # 'get_saved_policy' function tests: 1
306
def test_get_saved_policy(self):
    """
    ``get_saved_policy`` should report a missing chain, return the parsed
    "policy" entry when present, and return None when it is absent.
    """
    self.assertEqual(
        iptables.get_saved_policy(
            table="filter", chain=None, conf_file=None, family="ipv4"
        ),
        "Error: Chain needs to be specified",
    )

    lookup = {
        "table": "filter",
        "chain": "INPUT",
        "conf_file": None,
        "family": "ipv4",
    }

    with_policy = MagicMock(return_value={"filter": {"INPUT": {"policy": True}}})
    with patch.object(iptables, "_parse_conf", with_policy):
        self.assertTrue(iptables.get_saved_policy(**lookup))

    # The parsed chain carries no "policy" key, only "policy1".
    without_policy = MagicMock(
        return_value={"filter": {"INPUT": {"policy1": True}}}
    )
    with patch.object(iptables, "_parse_conf", without_policy):
        self.assertIsNone(iptables.get_saved_policy(**lookup))
339
340 # 'get_policy' function tests: 1
341
def test_get_policy(self):
    """
    ``get_policy`` should report a missing chain, return the parsed
    "policy" entry when present, and return None when it is absent.
    """
    self.assertEqual(
        iptables.get_policy(table="filter", chain=None, family="ipv4"),
        "Error: Chain needs to be specified",
    )

    lookup = {"table": "filter", "chain": "INPUT", "family": "ipv4"}

    with_policy = MagicMock(return_value={"filter": {"INPUT": {"policy": True}}})
    with patch.object(iptables, "_parse_conf", with_policy):
        self.assertTrue(iptables.get_policy(**lookup))

    # The parsed chain carries no "policy" key, only "policy1".
    without_policy = MagicMock(
        return_value={"filter": {"INPUT": {"policy1": True}}}
    )
    with patch.object(iptables, "_parse_conf", without_policy):
        self.assertIsNone(iptables.get_policy(**lookup))
368
369 # 'set_policy' function tests: 1
370
def test_set_policy(self):
    """
    ``set_policy`` should report a missing chain or policy and otherwise
    return the truthy result of the stubbed ``cmd.run``.
    """
    # (chain, policy, expected error message)
    missing_argument_cases = (
        (None, None, "Error: Chain needs to be specified"),
        ("INPUT", None, "Error: Policy needs to be specified"),
    )
    with patch.object(iptables, "_has_option", MagicMock(return_value=True)):
        for chain, policy, message in missing_argument_cases:
            self.assertEqual(
                iptables.set_policy(
                    table="filter", chain=chain, policy=policy, family="ipv4"
                ),
                message,
            )

        run_stub = MagicMock(return_value=True)
        with patch.dict(iptables.__salt__, {"cmd.run": run_stub}):
            applied = iptables.set_policy(
                table="filter", chain="INPUT", policy="ACCEPT", family="ipv4"
            )
            self.assertTrue(applied)
397
398 # 'save' function tests: 1
399
def test_save(self):
    """
    ``save`` should return a truthy value when ``_conf``, ``os.path.isdir``
    and the salt functions it uses are all stubbed out.
    """
    conf_patch = patch("salt.modules.iptables._conf", MagicMock(return_value=False))
    isdir_patch = patch("os.path.isdir", MagicMock(return_value=True))
    with conf_patch, isdir_patch:
        # One stub serves both cmd.run and file.write.
        shared_stub = MagicMock(return_value=True)
        salt_funcs = {
            "cmd.run": shared_stub,
            "file.write": shared_stub,
            "config.option": MagicMock(return_value=[]),
        }
        with patch.dict(iptables.__salt__, salt_funcs):
            written = iptables.save(filename="/xyz", family="ipv4")
            self.assertTrue(written)
417
418 # 'check' function tests: 1
419
def test_check(self):
    """
    ``check`` should report a missing chain or rule, and return the
    expected truth value for each combination of ``_has_option`` result
    and ``cmd.run`` output used below.
    """
    self.assertEqual(
        iptables.check(table="filter", chain=None, rule=None, family="ipv4"),
        "Error: Chain needs to be specified",
    )
    self.assertEqual(
        iptables.check(table="filter", chain="INPUT", rule=None, family="ipv4"),
        "Error: Rule needs to be specified",
    )

    rule = "m state --state RELATED,ESTABLISHED -j ACCEPT"
    chain = "INPUT"
    node_id = 31337
    listing_stub = MagicMock(
        return_value="-A {}\n-A {}".format(chain, hex(node_id))
    )
    option_present = MagicMock(return_value=True)
    option_absent = MagicMock(return_value=False)

    # _has_option is False; cmd.run output names the chain and hex(node_id).
    with patch.object(iptables, "_has_option", option_absent), patch.object(
        uuid, "getnode", MagicMock(return_value=node_id)
    ), patch.dict(iptables.__salt__, {"cmd.run": listing_stub}):
        self.assertTrue(
            iptables.check(table="filter", chain=chain, rule=rule, family="ipv4")
        )

    empty_stub = MagicMock(return_value="")

    # _has_option is False; cmd.run returns an empty string.
    with patch.object(iptables, "_has_option", option_absent), patch.object(
        uuid, "getnode", MagicMock(return_value=node_id)
    ), patch.dict(iptables.__salt__, {"cmd.run": MagicMock(return_value="")}):
        self.assertFalse(
            iptables.check(table="filter", chain=chain, rule=rule, family="ipv4")
        )

    # _has_option is True; cmd.run returns an empty string.
    with patch.object(iptables, "_has_option", option_present), patch.dict(
        iptables.__salt__, {"cmd.run": empty_stub}
    ):
        self.assertTrue(
            iptables.check(table="filter", chain="INPUT", rule=rule, family="ipv4")
        )

    hex_chain_stub = MagicMock(return_value="-A 0x4d2")
    getnode_stub = MagicMock(return_value=1234)

    # _has_option is True; the chain name is the hex form of the node id.
    with patch.object(iptables, "_has_option", option_present), patch.object(
        uuid, "getnode", getnode_stub
    ), patch.dict(iptables.__salt__, {"cmd.run": hex_chain_stub}):
        self.assertTrue(
            iptables.check(table="filter", chain="0x4d2", rule=rule, family="ipv4")
        )
490
491 # 'check_chain' function tests: 1
492
def test_check_chain(self):
    """
    ``check_chain`` should report a missing chain and return a falsy
    value when ``cmd.run`` yields an empty string.
    """
    self.assertEqual(
        iptables.check_chain(table="filter", chain=None, family="ipv4"),
        "Error: Chain needs to be specified",
    )

    run_stub = MagicMock(return_value="")
    with patch.dict(iptables.__salt__, {"cmd.run": run_stub}):
        found = iptables.check_chain(table="filter", chain="INPUT", family="ipv4")
        self.assertFalse(found)
507
508 # 'new_chain' function tests: 1
509
def test_new_chain(self):
    """
    ``new_chain`` should report a missing chain and return a truthy
    value when ``cmd.run`` yields an empty string.
    """
    self.assertEqual(
        iptables.new_chain(table="filter", chain=None, family="ipv4"),
        "Error: Chain needs to be specified",
    )

    run_stub = MagicMock(return_value="")
    with patch.dict(iptables.__salt__, {"cmd.run": run_stub}):
        created = iptables.new_chain(table="filter", chain="INPUT", family="ipv4")
        self.assertTrue(created)
524
525 # 'delete_chain' function tests: 1
526
def test_delete_chain(self):
    """
    ``delete_chain`` should report a missing chain and return a truthy
    value when ``cmd.run`` yields an empty string.
    """
    self.assertEqual(
        iptables.delete_chain(table="filter", chain=None, family="ipv4"),
        "Error: Chain needs to be specified",
    )

    run_stub = MagicMock(return_value="")
    with patch.dict(iptables.__salt__, {"cmd.run": run_stub}):
        removed = iptables.delete_chain(
            table="filter", chain="INPUT", family="ipv4"
        )
        self.assertTrue(removed)
541
542 # 'append' function tests: 1
543
def test_append(self):
    """
    ``append`` should report a missing chain or rule, return a truthy
    value when ``cmd.run`` yields an empty string, and a falsy value when
    it yields "SALT".
    """
    option_patch = patch.object(
        iptables, "_has_option", MagicMock(return_value=True)
    )
    check_patch = patch.object(iptables, "check", MagicMock(return_value=False))
    with option_patch, check_patch:
        self.assertEqual(
            iptables.append(table="filter", chain=None, rule=None, family="ipv4"),
            "Error: Chain needs to be specified",
        )
        self.assertEqual(
            iptables.append(table="filter", chain="INPUT", rule=None, family="ipv4"),
            "Error: Rule needs to be specified",
        )

        rule = "m state --state RELATED,ESTABLISHED -j ACCEPT"
        request = {"table": "filter", "chain": "INPUT", "rule": rule, "family": "ipv4"}
        # First cmd.run call returns "", the second returns "SALT".
        run_stub = MagicMock(side_effect=["", "SALT"])
        with patch.dict(iptables.__salt__, {"cmd.run": run_stub}):
            self.assertTrue(iptables.append(**request))
            self.assertFalse(iptables.append(**request))
577
578 # 'insert' function tests: 1
579
def test_insert(self):
    """
    ``insert`` should report a missing chain, position or rule, and
    return a truthy value once all are given and ``cmd.run`` is stubbed
    to return True.
    """
    # (chain, position, expected error message); rule is None in each case.
    missing_argument_cases = (
        (None, None, "Error: Chain needs to be specified"),
        (
            "INPUT",
            None,
            "Error: Position needs to be specified or use append (-A)",
        ),
        ("INPUT", 3, "Error: Rule needs to be specified"),
    )
    option_patch = patch.object(
        iptables, "_has_option", MagicMock(return_value=True)
    )
    check_patch = patch.object(iptables, "check", MagicMock(return_value=False))
    with option_patch, check_patch:
        for chain, position, message in missing_argument_cases:
            self.assertEqual(
                iptables.insert(
                    table="filter",
                    chain=chain,
                    position=position,
                    rule=None,
                    family="ipv4",
                ),
                message,
            )

        rule = "m state --state RELATED,ESTABLISHED -j ACCEPT"
        run_stub = MagicMock(return_value=True)
        with patch.dict(iptables.__salt__, {"cmd.run": run_stub}):
            inserted = iptables.insert(
                table="filter",
                chain="INPUT",
                position=3,
                rule=rule,
                family="ipv4",
            )
            self.assertTrue(inserted)
626
627 # 'delete' function tests: 1
628
def test_delete(self):
    """
    ``delete`` should reject a call that gives both a position and a
    rule, and return a truthy value when only a position is given and
    ``cmd.run`` is stubbed to return True.
    """
    with patch.object(iptables, "_has_option", MagicMock(return_value=True)):
        rule = "m state --state RELATED,ESTABLISHED -j ACCEPT"
        both_given = iptables.delete(
            table="filter", chain=None, position=3, rule=rule, family="ipv4"
        )
        self.assertEqual(
            both_given, "Error: Only specify a position or a rule, not both"
        )

        run_stub = MagicMock(return_value=True)
        with patch.dict(iptables.__salt__, {"cmd.run": run_stub}):
            deleted = iptables.delete(
                table="filter",
                chain="INPUT",
                position=3,
                rule="",
                family="ipv4",
            )
            self.assertTrue(deleted)
653
654 # 'flush' function tests: 1
655
def test_flush(self):
    """
    ``flush`` should return a truthy value for a given table and chain
    when ``cmd.run`` is stubbed to return True.
    """
    option_patch = patch.object(
        iptables, "_has_option", MagicMock(return_value=True)
    )
    with option_patch:
        run_stub = MagicMock(return_value=True)
        with patch.dict(iptables.__salt__, {"cmd.run": run_stub}):
            flushed = iptables.flush(table="filter", chain="INPUT", family="ipv4")
            self.assertTrue(flushed)