"Fossies" - the Fresh Open Source Software Archive

Member "salt-3002.2/tests/unit/modules/test_iptables.py" (18 Nov 2020, 23051 Bytes) of package /linux/misc/salt-3002.2.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_iptables.py": 3002.1_vs_3002.2.

    1 """
    2     :codeauthor: Jayesh Kariya <jayeshk@saltstack.com>
    3 """
    4 
    5 
    6 import textwrap
    7 import uuid
    8 
    9 import salt.modules.iptables as iptables
   10 from tests.support.mixins import LoaderModuleMockMixin
   11 from tests.support.mock import MagicMock, mock_open, patch
   12 from tests.support.unit import TestCase
   13 
   14 
   15 class IptablesTestCase(TestCase, LoaderModuleMockMixin):
   16     """
   17     Test cases for salt.modules.iptables
   18     """
   19 
   20     def setup_loader_modules(self):
   21         return {iptables: {}}
   22 
   23     # 'version' function tests: 1
   24 
   25     def test_version(self):
   26         """
   27         Test if it return version from iptables --version
   28         """
   29         mock = MagicMock(return_value="iptables v1.4.21")
   30         with patch.dict(iptables.__salt__, {"cmd.run": mock}):
   31             self.assertEqual(iptables.version(), "v1.4.21")
   32 
   33     # 'build_rule' function tests: 1
   34 
   35     def test_build_rule(self):
   36         """
   37         Test if it build a well-formatted iptables rule based on kwargs.
   38         """
   39         with patch.object(iptables, "_has_option", MagicMock(return_value=True)):
   40             self.assertEqual(iptables.build_rule(), "")
   41 
   42             self.assertEqual(
   43                 iptables.build_rule(name="ignored", state="ignored"),
   44                 "",
   45                 "build_rule should ignore name and state",
   46             )
   47 
   48             # Should properly negate bang-prefixed values
   49             self.assertEqual(iptables.build_rule(**{"if": "!eth0"}), "! -i eth0")
   50 
   51             # Should properly negate "not"-prefixed values
   52             self.assertEqual(iptables.build_rule(**{"if": "not eth0"}), "! -i eth0")
   53 
   54             self.assertEqual(
   55                 iptables.build_rule(**{"protocol": "tcp", "syn": "!"}), "-p tcp ! --syn"
   56             )
   57 
   58             self.assertEqual(
   59                 iptables.build_rule(dports=[80, 443], protocol="tcp"),
   60                 "-p tcp -m multiport --dports 80,443",
   61             )
   62 
   63             self.assertEqual(
   64                 iptables.build_rule(dports="80,443", protocol="tcp"),
   65                 "-p tcp -m multiport --dports 80,443",
   66             )
   67 
   68             # Should it really behave this way?
   69             self.assertEqual(
   70                 iptables.build_rule(dports=["!80", 443], protocol="tcp"),
   71                 "-p tcp -m multiport ! --dports 80,443",
   72             )
   73 
   74             self.assertEqual(
   75                 iptables.build_rule(dports="!80,443", protocol="tcp"),
   76                 "-p tcp -m multiport ! --dports 80,443",
   77             )
   78 
   79             self.assertEqual(
   80                 iptables.build_rule(sports=[80, 443], protocol="tcp"),
   81                 "-p tcp -m multiport --sports 80,443",
   82             )
   83 
   84             self.assertEqual(
   85                 iptables.build_rule(sports="80,443", protocol="tcp"),
   86                 "-p tcp -m multiport --sports 80,443",
   87             )
   88 
   89             self.assertEqual(
   90                 iptables.build_rule(
   91                     "filter",
   92                     "INPUT",
   93                     command="I",
   94                     position="3",
   95                     full=True,
   96                     dports="protocol",
   97                     jump="ACCEPT",
   98                 ),
   99                 "Error: protocol must be specified",
  100             )
  101 
  102             self.assertEqual(
  103                 iptables.build_rule(
  104                     "filter",
  105                     "INPUT",
  106                     command="I",
  107                     position="3",
  108                     full=True,
  109                     sports="protocol",
  110                     jump="ACCEPT",
  111                 ),
  112                 "Error: protocol must be specified",
  113             )
  114 
  115             self.assertEqual(
  116                 iptables.build_rule(
  117                     "",
  118                     "INPUT",
  119                     command="I",
  120                     position="3",
  121                     full="True",
  122                     match="state",
  123                     jump="ACCEPT",
  124                 ),
  125                 "Error: Table needs to be specified",
  126             )
  127 
  128             self.assertEqual(
  129                 iptables.build_rule(
  130                     "filter",
  131                     "",
  132                     command="I",
  133                     position="3",
  134                     full="True",
  135                     match="state",
  136                     jump="ACCEPT",
  137                 ),
  138                 "Error: Chain needs to be specified",
  139             )
  140 
  141             self.assertEqual(
  142                 iptables.build_rule(
  143                     "filter",
  144                     "INPUT",
  145                     command="",
  146                     position="3",
  147                     full="True",
  148                     match="state",
  149                     jump="ACCEPT",
  150                 ),
  151                 "Error: Command needs to be specified",
  152             )
  153 
  154             # Test arguments that should appear after the --jump
  155             self.assertEqual(
  156                 iptables.build_rule(jump="REDIRECT", **{"to-port": 8080}),
  157                 "--jump REDIRECT --to-port 8080",
  158             )
  159 
  160             # Should quote arguments with spaces, like log-prefix often has
  161             self.assertEqual(
  162                 iptables.build_rule(jump="LOG", **{"log-prefix": "long prefix"}),
  163                 '--jump LOG --log-prefix "long prefix"',
  164             )
  165 
  166             # Should quote arguments with leading or trailing spaces
  167             self.assertEqual(
  168                 iptables.build_rule(jump="LOG", **{"log-prefix": "spam: "}),
  169                 '--jump LOG --log-prefix "spam: "',
  170             )
  171 
  172             # Should allow no-arg jump options
  173             self.assertEqual(
  174                 iptables.build_rule(jump="CLUSTERIP", **{"new": ""}),
  175                 "--jump CLUSTERIP --new",
  176             )
  177 
  178             # Should allow no-arg jump options as None
  179             self.assertEqual(
  180                 iptables.build_rule(jump="CT", **{"notrack": None}),
  181                 "--jump CT --notrack",
  182             )
  183 
  184             # should build match-sets with single string
  185             self.assertEqual(
  186                 iptables.build_rule(**{"match-set": "src flag1,flag2"}),
  187                 "-m set --match-set src flag1,flag2",
  188             )
  189 
  190             # should build match-sets as list
  191             match_sets = [
  192                 "src1 flag1",
  193                 "src2 flag2,flag3",
  194             ]
  195             self.assertEqual(
  196                 iptables.build_rule(**{"match-set": match_sets}),
  197                 "-m set --match-set src1 flag1 -m set --match-set src2 flag2,flag3",
  198             )
  199 
  200             # should handle negations for string match-sets
  201             self.assertEqual(
  202                 iptables.build_rule(**{"match-set": "!src flag"}),
  203                 "-m set ! --match-set src flag",
  204             )
  205 
  206             # should handle negations for list match-sets
  207             match_sets = ["src1 flag", "not src2 flag2"]
  208             self.assertEqual(
  209                 iptables.build_rule(**{"match-set": match_sets}),
  210                 "-m set --match-set src1 flag -m set ! --match-set src2 flag2",
  211             )
  212 
  213             # should allow escaped name
  214             self.assertEqual(
  215                 iptables.build_rule(**{"match": "recent", "name_": "SSH"}),
  216                 "-m recent --name SSH",
  217             )
  218 
  219             # should allow empty arguments
  220             self.assertEqual(
  221                 iptables.build_rule(**{"match": "recent", "update": None}),
  222                 "-m recent --update",
  223             )
  224 
  225             # Should allow the --save jump option to CONNSECMARK
  226             # self.assertEqual(iptables.build_rule(jump='CONNSECMARK',
  227             #                                     **{'save': ''}),
  228             #                 '--jump CONNSECMARK --save ')
  229 
  230             ret = "/sbin/iptables --wait -t salt -I INPUT 3 -m state --jump ACCEPT"
  231             with patch.object(
  232                 iptables, "_iptables_cmd", MagicMock(return_value="/sbin/iptables")
  233             ):
  234                 self.assertEqual(
  235                     iptables.build_rule(
  236                         "salt",
  237                         "INPUT",
  238                         command="I",
  239                         position="3",
  240                         full="True",
  241                         match="state",
  242                         jump="ACCEPT",
  243                     ),
  244                     ret,
  245                 )
  246 
  247     # 'get_saved_rules' function tests: 2
  248 
  249     def test_get_saved_rules(self):
  250         """
  251         Test if it return a data structure of the rules in the conf file
  252         """
  253         mock = MagicMock(return_value=False)
  254         with patch.object(iptables, "_parse_conf", mock):
  255             self.assertFalse(iptables.get_saved_rules())
  256             mock.assert_called_with(conf_file=None, family="ipv4")
  257 
  258     def test_get_saved_rules_nilinuxrt(self):
  259         """
  260         Test get rules on NILinuxRT system
  261         """
  262         data = {
  263             "/etc/natinst/share/iptables.conf": textwrap.dedent(
  264                 """\
  265                 # Generated by iptables-save v1.6.2 on Thu Oct  8 12:13:44 2020
  266                 *filter
  267                 :INPUT ACCEPT [2958:584773]
  268                 :FORWARD ACCEPT [0:0]
  269                 :OUTPUT ACCEPT [92:23648]
  270                 -A INPUT -p tcp -m tcp --dport 80 -j ACCEPT
  271                 COMMIT
  272                 # Completed on Thu Oct  8 12:13:44 2020
  273                 """
  274             )
  275         }
  276         expected_input_rules = [
  277             {
  278                 "protocol": ["tcp"],
  279                 "jump": ["ACCEPT"],
  280                 "match": ["tcp"],
  281                 "destination_port": ["80"],
  282             }
  283         ]
  284         file_mock = mock_open(read_data=data)
  285         with patch.dict(
  286             iptables.__grains__, {"os_family": "NILinuxRT", "os": "NILinuxRT"}
  287         ):
  288             with patch.object(iptables.salt.utils.files, "fopen", file_mock):
  289                 rules = iptables.get_saved_rules()
  290                 self.assertEqual(
  291                     expected_input_rules, rules["filter"]["INPUT"]["rules"]
  292                 )
  293 
  294     # 'get_rules' function tests: 1
  295 
  296     def test_get_rules(self):
  297         """
  298         Test if it return a data structure of the current, in-memory rules
  299         """
  300         mock = MagicMock(return_value=False)
  301         with patch.object(iptables, "_parse_conf", mock):
  302             self.assertFalse(iptables.get_rules())
  303             mock.assert_called_with(in_mem=True, family="ipv4")
  304 
  305     # 'get_saved_policy' function tests: 1
  306 
  307     def test_get_saved_policy(self):
  308         """
  309         Test if it return the current policy for the specified table/chain
  310         """
  311         self.assertEqual(
  312             iptables.get_saved_policy(
  313                 table="filter", chain=None, conf_file=None, family="ipv4"
  314             ),
  315             "Error: Chain needs to be specified",
  316         )
  317 
  318         with patch.object(
  319             iptables,
  320             "_parse_conf",
  321             MagicMock(return_value={"filter": {"INPUT": {"policy": True}}}),
  322         ):
  323             self.assertTrue(
  324                 iptables.get_saved_policy(
  325                     table="filter", chain="INPUT", conf_file=None, family="ipv4"
  326                 )
  327             )
  328 
  329         with patch.object(
  330             iptables,
  331             "_parse_conf",
  332             MagicMock(return_value={"filter": {"INPUT": {"policy1": True}}}),
  333         ):
  334             self.assertIsNone(
  335                 iptables.get_saved_policy(
  336                     table="filter", chain="INPUT", conf_file=None, family="ipv4"
  337                 )
  338             )
  339 
  340     # 'get_policy' function tests: 1
  341 
  342     def test_get_policy(self):
  343         """
  344         Test if it return the current policy for the specified table/chain
  345         """
  346         self.assertEqual(
  347             iptables.get_policy(table="filter", chain=None, family="ipv4"),
  348             "Error: Chain needs to be specified",
  349         )
  350 
  351         with patch.object(
  352             iptables,
  353             "_parse_conf",
  354             MagicMock(return_value={"filter": {"INPUT": {"policy": True}}}),
  355         ):
  356             self.assertTrue(
  357                 iptables.get_policy(table="filter", chain="INPUT", family="ipv4")
  358             )
  359 
  360         with patch.object(
  361             iptables,
  362             "_parse_conf",
  363             MagicMock(return_value={"filter": {"INPUT": {"policy1": True}}}),
  364         ):
  365             self.assertIsNone(
  366                 iptables.get_policy(table="filter", chain="INPUT", family="ipv4")
  367             )
  368 
  369     # 'set_policy' function tests: 1
  370 
  371     def test_set_policy(self):
  372         """
  373         Test if it set the current policy for the specified table/chain
  374         """
  375         with patch.object(iptables, "_has_option", MagicMock(return_value=True)):
  376             self.assertEqual(
  377                 iptables.set_policy(
  378                     table="filter", chain=None, policy=None, family="ipv4"
  379                 ),
  380                 "Error: Chain needs to be specified",
  381             )
  382 
  383             self.assertEqual(
  384                 iptables.set_policy(
  385                     table="filter", chain="INPUT", policy=None, family="ipv4"
  386                 ),
  387                 "Error: Policy needs to be specified",
  388             )
  389 
  390             mock = MagicMock(return_value=True)
  391             with patch.dict(iptables.__salt__, {"cmd.run": mock}):
  392                 self.assertTrue(
  393                     iptables.set_policy(
  394                         table="filter", chain="INPUT", policy="ACCEPT", family="ipv4"
  395                     )
  396                 )
  397 
  398     # 'save' function tests: 1
  399 
  400     def test_save(self):
  401         """
  402         Test if it save the current in-memory rules to disk
  403         """
  404         with patch("salt.modules.iptables._conf", MagicMock(return_value=False)), patch(
  405             "os.path.isdir", MagicMock(return_value=True)
  406         ):
  407             mock = MagicMock(return_value=True)
  408             with patch.dict(
  409                 iptables.__salt__,
  410                 {
  411                     "cmd.run": mock,
  412                     "file.write": mock,
  413                     "config.option": MagicMock(return_value=[]),
  414                 },
  415             ):
  416                 self.assertTrue(iptables.save(filename="/xyz", family="ipv4"))
  417 
  418     # 'check' function tests: 1
  419 
  420     def test_check(self):
  421         """
  422         Test if it check for the existence of a rule in the table and chain
  423         """
  424         self.assertEqual(
  425             iptables.check(table="filter", chain=None, rule=None, family="ipv4"),
  426             "Error: Chain needs to be specified",
  427         )
  428 
  429         self.assertEqual(
  430             iptables.check(table="filter", chain="INPUT", rule=None, family="ipv4"),
  431             "Error: Rule needs to be specified",
  432         )
  433 
  434         mock_rule = "m state --state RELATED,ESTABLISHED -j ACCEPT"
  435         mock_chain = "INPUT"
  436         mock_uuid = 31337
  437         mock_cmd = MagicMock(
  438             return_value="-A {}\n-A {}".format(mock_chain, hex(mock_uuid))
  439         )
  440         mock_has = MagicMock(return_value=True)
  441         mock_not = MagicMock(return_value=False)
  442 
  443         with patch.object(iptables, "_has_option", mock_not):
  444             with patch.object(uuid, "getnode", MagicMock(return_value=mock_uuid)):
  445                 with patch.dict(iptables.__salt__, {"cmd.run": mock_cmd}):
  446                     self.assertTrue(
  447                         iptables.check(
  448                             table="filter",
  449                             chain=mock_chain,
  450                             rule=mock_rule,
  451                             family="ipv4",
  452                         )
  453                     )
  454 
  455         mock_cmd = MagicMock(return_value="")
  456 
  457         with patch.object(iptables, "_has_option", mock_not):
  458             with patch.object(uuid, "getnode", MagicMock(return_value=mock_uuid)):
  459                 with patch.dict(
  460                     iptables.__salt__, {"cmd.run": MagicMock(return_value="")}
  461                 ):
  462                     self.assertFalse(
  463                         iptables.check(
  464                             table="filter",
  465                             chain=mock_chain,
  466                             rule=mock_rule,
  467                             family="ipv4",
  468                         )
  469                     )
  470 
  471         with patch.object(iptables, "_has_option", mock_has):
  472             with patch.dict(iptables.__salt__, {"cmd.run": mock_cmd}):
  473                 self.assertTrue(
  474                     iptables.check(
  475                         table="filter", chain="INPUT", rule=mock_rule, family="ipv4"
  476                     )
  477                 )
  478 
  479         mock_cmd = MagicMock(return_value="-A 0x4d2")
  480         mock_uuid = MagicMock(return_value=1234)
  481 
  482         with patch.object(iptables, "_has_option", mock_has):
  483             with patch.object(uuid, "getnode", mock_uuid):
  484                 with patch.dict(iptables.__salt__, {"cmd.run": mock_cmd}):
  485                     self.assertTrue(
  486                         iptables.check(
  487                             table="filter", chain="0x4d2", rule=mock_rule, family="ipv4"
  488                         )
  489                     )
  490 
  491     # 'check_chain' function tests: 1
  492 
  493     def test_check_chain(self):
  494         """
  495         Test if it check for the existence of a chain in the table
  496         """
  497         self.assertEqual(
  498             iptables.check_chain(table="filter", chain=None, family="ipv4"),
  499             "Error: Chain needs to be specified",
  500         )
  501 
  502         mock_cmd = MagicMock(return_value="")
  503         with patch.dict(iptables.__salt__, {"cmd.run": mock_cmd}):
  504             self.assertFalse(
  505                 iptables.check_chain(table="filter", chain="INPUT", family="ipv4")
  506             )
  507 
  508     # 'new_chain' function tests: 1
  509 
  510     def test_new_chain(self):
  511         """
  512         Test if it create new custom chain to the specified table.
  513         """
  514         self.assertEqual(
  515             iptables.new_chain(table="filter", chain=None, family="ipv4"),
  516             "Error: Chain needs to be specified",
  517         )
  518 
  519         mock_cmd = MagicMock(return_value="")
  520         with patch.dict(iptables.__salt__, {"cmd.run": mock_cmd}):
  521             self.assertTrue(
  522                 iptables.new_chain(table="filter", chain="INPUT", family="ipv4")
  523             )
  524 
  525     # 'delete_chain' function tests: 1
  526 
  527     def test_delete_chain(self):
  528         """
  529         Test if it delete custom chain to the specified table.
  530         """
  531         self.assertEqual(
  532             iptables.delete_chain(table="filter", chain=None, family="ipv4"),
  533             "Error: Chain needs to be specified",
  534         )
  535 
  536         mock_cmd = MagicMock(return_value="")
  537         with patch.dict(iptables.__salt__, {"cmd.run": mock_cmd}):
  538             self.assertTrue(
  539                 iptables.delete_chain(table="filter", chain="INPUT", family="ipv4")
  540             )
  541 
  542     # 'append' function tests: 1
  543 
  544     def test_append(self):
  545         """
  546         Test if it append a rule to the specified table/chain.
  547         """
  548         with patch.object(
  549             iptables, "_has_option", MagicMock(return_value=True)
  550         ), patch.object(iptables, "check", MagicMock(return_value=False)):
  551             self.assertEqual(
  552                 iptables.append(table="filter", chain=None, rule=None, family="ipv4"),
  553                 "Error: Chain needs to be specified",
  554             )
  555 
  556             self.assertEqual(
  557                 iptables.append(
  558                     table="filter", chain="INPUT", rule=None, family="ipv4"
  559                 ),
  560                 "Error: Rule needs to be specified",
  561             )
  562 
  563             _rule = "m state --state RELATED,ESTABLISHED -j ACCEPT"
  564             mock = MagicMock(side_effect=["", "SALT"])
  565             with patch.dict(iptables.__salt__, {"cmd.run": mock}):
  566                 self.assertTrue(
  567                     iptables.append(
  568                         table="filter", chain="INPUT", rule=_rule, family="ipv4"
  569                     )
  570                 )
  571 
  572                 self.assertFalse(
  573                     iptables.append(
  574                         table="filter", chain="INPUT", rule=_rule, family="ipv4"
  575                     )
  576                 )
  577 
  578     # 'insert' function tests: 1
  579 
  580     def test_insert(self):
  581         """
  582         Test if it insert a rule into the specified table/chain,
  583         at the specified position.
  584         """
  585         with patch.object(
  586             iptables, "_has_option", MagicMock(return_value=True)
  587         ), patch.object(iptables, "check", MagicMock(return_value=False)):
  588             self.assertEqual(
  589                 iptables.insert(
  590                     table="filter", chain=None, position=None, rule=None, family="ipv4"
  591                 ),
  592                 "Error: Chain needs to be specified",
  593             )
  594 
  595             pos_err = "Error: Position needs to be specified or use append (-A)"
  596             self.assertEqual(
  597                 iptables.insert(
  598                     table="filter",
  599                     chain="INPUT",
  600                     position=None,
  601                     rule=None,
  602                     family="ipv4",
  603                 ),
  604                 pos_err,
  605             )
  606 
  607             self.assertEqual(
  608                 iptables.insert(
  609                     table="filter", chain="INPUT", position=3, rule=None, family="ipv4"
  610                 ),
  611                 "Error: Rule needs to be specified",
  612             )
  613 
  614             _rule = "m state --state RELATED,ESTABLISHED -j ACCEPT"
  615             mock = MagicMock(return_value=True)
  616             with patch.dict(iptables.__salt__, {"cmd.run": mock}):
  617                 self.assertTrue(
  618                     iptables.insert(
  619                         table="filter",
  620                         chain="INPUT",
  621                         position=3,
  622                         rule=_rule,
  623                         family="ipv4",
  624                     )
  625                 )
  626 
  627     # 'delete' function tests: 1
  628 
  629     def test_delete(self):
  630         """
  631         Test if it delete a rule from the specified table/chain
  632         """
  633         with patch.object(iptables, "_has_option", MagicMock(return_value=True)):
  634             _rule = "m state --state RELATED,ESTABLISHED -j ACCEPT"
  635             self.assertEqual(
  636                 iptables.delete(
  637                     table="filter", chain=None, position=3, rule=_rule, family="ipv4"
  638                 ),
  639                 "Error: Only specify a position or a rule, not both",
  640             )
  641 
  642             mock = MagicMock(return_value=True)
  643             with patch.dict(iptables.__salt__, {"cmd.run": mock}):
  644                 self.assertTrue(
  645                     iptables.delete(
  646                         table="filter",
  647                         chain="INPUT",
  648                         position=3,
  649                         rule="",
  650                         family="ipv4",
  651                     )
  652                 )
  653 
  654     # 'flush' function tests: 1
  655 
  656     def test_flush(self):
  657         """
  658         Test if it flush the chain in the specified table,
  659         flush all chains in the specified table if not specified chain.
  660         """
  661         with patch.object(iptables, "_has_option", MagicMock(return_value=True)):
  662             mock_cmd = MagicMock(return_value=True)
  663             with patch.dict(iptables.__salt__, {"cmd.run": mock_cmd}):
  664                 self.assertTrue(
  665                     iptables.flush(table="filter", chain="INPUT", family="ipv4")
  666                 )