"Fossies" - the Fresh Open Source Software Archive

Member "pdns-auth-4.2.0/regression-tests.dnsdist/test_Spoofing.py" (27 Aug 2019, 15171 Bytes) of package /linux/misc/dns/pdns-auth-4.2.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "test_Spoofing.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 4.1.13_vs_4.2.0.

    1 #!/usr/bin/env python
    2 import dns
    3 from dnsdisttests import DNSDistTest
    4 
    5 class TestSpoofingSpoof(DNSDistTest):
    6 
    7     _config_template = """
    8     addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1"))
    9     addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
   10     addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
   11     newServer{address="127.0.0.1:%s"}
   12     """
   13 
   14     def testSpoofActionA(self):
   15         """
   16         Spoofing: Spoof A via Action
   17 
   18         Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
   19         check that dnsdist sends a spoofed result.
   20         """
   21         name = 'spoofaction.spoofing.tests.powerdns.com.'
   22         query = dns.message.make_query(name, 'A', 'IN')
   23         # dnsdist set RA = RD for spoofed responses
   24         query.flags &= ~dns.flags.RD
   25         expectedResponse = dns.message.make_response(query)
   26         rrset = dns.rrset.from_text(name,
   27                                     60,
   28                                     dns.rdataclass.IN,
   29                                     dns.rdatatype.A,
   30                                     '192.0.2.1')
   31         expectedResponse.answer.append(rrset)
   32 
   33         for method in ("sendUDPQuery", "sendTCPQuery"):
   34             sender = getattr(self, method)
   35             (_, receivedResponse) = sender(query, response=None, useQueue=False)
   36             self.assertTrue(receivedResponse)
   37             self.assertEquals(expectedResponse, receivedResponse)
   38 
   39     def testSpoofActionAAAA(self):
   40         """
   41         Spoofing: Spoof AAAA via Action
   42 
   43         Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
   44         check that dnsdist sends a spoofed result.
   45         """
   46         name = 'spoofaction.spoofing.tests.powerdns.com.'
   47         query = dns.message.make_query(name, 'AAAA', 'IN')
   48         # dnsdist set RA = RD for spoofed responses
   49         query.flags &= ~dns.flags.RD
   50         expectedResponse = dns.message.make_response(query)
   51         rrset = dns.rrset.from_text(name,
   52                                     60,
   53                                     dns.rdataclass.IN,
   54                                     dns.rdatatype.AAAA,
   55                                     '2001:DB8::1')
   56         expectedResponse.answer.append(rrset)
   57 
   58         for method in ("sendUDPQuery", "sendTCPQuery"):
   59             sender = getattr(self, method)
   60             (_, receivedResponse) = sender(query, response=None, useQueue=False)
   61             self.assertTrue(receivedResponse)
   62             self.assertEquals(expectedResponse, receivedResponse)
   63 
   64     def testSpoofActionCNAME(self):
   65         """
   66         Spoofing: Spoof CNAME via Action
   67 
   68         Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
   69         check that dnsdist sends a spoofed result.
   70         """
   71         name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
   72         query = dns.message.make_query(name, 'A', 'IN')
   73         # dnsdist set RA = RD for spoofed responses
   74         query.flags &= ~dns.flags.RD
   75         expectedResponse = dns.message.make_response(query)
   76         rrset = dns.rrset.from_text(name,
   77                                     60,
   78                                     dns.rdataclass.IN,
   79                                     dns.rdatatype.CNAME,
   80                                     'cnameaction.spoofing.tests.powerdns.com.')
   81         expectedResponse.answer.append(rrset)
   82 
   83         for method in ("sendUDPQuery", "sendTCPQuery"):
   84             sender = getattr(self, method)
   85             (_, receivedResponse) = sender(query, response=None, useQueue=False)
   86             self.assertTrue(receivedResponse)
   87             self.assertEquals(expectedResponse, receivedResponse)
   88 
   89     def testSpoofActionMultiA(self):
   90         """
   91         Spoofing: Spoof multiple IPv4 addresses via AddDomainSpoof
   92 
   93         Send an A query for "multispoof.spoofing.tests.powerdns.com.",
   94         check that dnsdist sends a spoofed result.
   95         """
   96         name = 'multispoof.spoofing.tests.powerdns.com.'
   97         query = dns.message.make_query(name, 'A', 'IN')
   98         # dnsdist set RA = RD for spoofed responses
   99         query.flags &= ~dns.flags.RD
  100         expectedResponse = dns.message.make_response(query)
  101         rrset = dns.rrset.from_text(name,
  102                                     60,
  103                                     dns.rdataclass.IN,
  104                                     dns.rdatatype.A,
  105                                     '192.0.2.2', '192.0.2.1')
  106         expectedResponse.answer.append(rrset)
  107 
  108         for method in ("sendUDPQuery", "sendTCPQuery"):
  109             sender = getattr(self, method)
  110             (_, receivedResponse) = sender(query, response=None, useQueue=False)
  111             self.assertTrue(receivedResponse)
  112             self.assertEquals(expectedResponse, receivedResponse)
  113 
  114     def testSpoofActionMultiAAAA(self):
  115         """
  116         Spoofing: Spoof multiple IPv6 addresses via AddDomainSpoof
  117 
  118         Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
  119         check that dnsdist sends a spoofed result.
  120         """
  121         name = 'multispoof.spoofing.tests.powerdns.com.'
  122         query = dns.message.make_query(name, 'AAAA', 'IN')
  123         # dnsdist set RA = RD for spoofed responses
  124         query.flags &= ~dns.flags.RD
  125         expectedResponse = dns.message.make_response(query)
  126         rrset = dns.rrset.from_text(name,
  127                                     60,
  128                                     dns.rdataclass.IN,
  129                                     dns.rdatatype.AAAA,
  130                                     '2001:DB8::1', '2001:DB8::2')
  131         expectedResponse.answer.append(rrset)
  132 
  133         for method in ("sendUDPQuery", "sendTCPQuery"):
  134             sender = getattr(self, method)
  135             (_, receivedResponse) = sender(query, response=None, useQueue=False)
  136             self.assertTrue(receivedResponse)
  137             self.assertEquals(expectedResponse, receivedResponse)
  138 
  139     def testSpoofActionMultiANY(self):
  140         """
  141         Spoofing: Spoof multiple addresses via AddDomainSpoof
  142 
  143         Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
  144         check that dnsdist sends a spoofed result.
  145         """
  146         name = 'multispoof.spoofing.tests.powerdns.com.'
  147         query = dns.message.make_query(name, 'ANY', 'IN')
  148         # dnsdist set RA = RD for spoofed responses
  149         query.flags &= ~dns.flags.RD
  150         expectedResponse = dns.message.make_response(query)
  151 
  152         rrset = dns.rrset.from_text(name,
  153                                     60,
  154                                     dns.rdataclass.IN,
  155                                     dns.rdatatype.A,
  156                                     '192.0.2.2', '192.0.2.1')
  157         expectedResponse.answer.append(rrset)
  158 
  159         rrset = dns.rrset.from_text(name,
  160                                     60,
  161                                     dns.rdataclass.IN,
  162                                     dns.rdatatype.AAAA,
  163                                     '2001:DB8::1', '2001:DB8::2')
  164         expectedResponse.answer.append(rrset)
  165 
  166         for method in ("sendUDPQuery", "sendTCPQuery"):
  167             sender = getattr(self, method)
  168             (_, receivedResponse) = sender(query, response=None, useQueue=False)
  169             self.assertTrue(receivedResponse)
  170             self.assertEquals(expectedResponse, receivedResponse)
  171 
  172 class TestSpoofingLuaSpoof(DNSDistTest):
  173 
  174     _config_template = """
  175     function spoof1rule(dq)
  176         if(dq.qtype==1) -- A
  177         then
  178                 return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
  179         elseif(dq.qtype == 28) -- AAAA
  180         then
  181                 return DNSAction.Spoof, "2001:DB8::1"
  182         else
  183                 return DNSAction.None, ""
  184         end
  185     end
  186     function spoof2rule(dq)
  187         return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
  188     end
  189     addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
  190     addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
  191     newServer{address="127.0.0.1:%s"}
  192     """
  193 
  194     def testLuaSpoofA(self):
  195         """
  196         Spoofing: Spoofing an A via Lua
  197 
  198         Send an A query to "luaspoof1.spoofing.tests.powerdns.com.",
  199         check that dnsdist sends a spoofed result.
  200         """
  201         name = 'luaspoof1.spoofing.tests.powerdns.com.'
  202         query = dns.message.make_query(name, 'A', 'IN')
  203         # dnsdist set RA = RD for spoofed responses
  204         query.flags &= ~dns.flags.RD
  205         expectedResponse = dns.message.make_response(query)
  206         rrset = dns.rrset.from_text(name,
  207                                     60,
  208                                     dns.rdataclass.IN,
  209                                     dns.rdatatype.A,
  210                                     '192.0.2.1', '192.0.2.2')
  211         expectedResponse.answer.append(rrset)
  212 
  213         for method in ("sendUDPQuery", "sendTCPQuery"):
  214             sender = getattr(self, method)
  215             (_, receivedResponse) = sender(query, response=None, useQueue=False)
  216             self.assertTrue(receivedResponse)
  217             self.assertEquals(expectedResponse, receivedResponse)
  218 
  219     def testLuaSpoofAAAA(self):
  220         """
  221         Spoofing: Spoofing an AAAA via Lua
  222 
  223         Send an AAAA query to "luaspoof1.spoofing.tests.powerdns.com.",
  224         check that dnsdist sends a spoofed result.
  225         """
  226         name = 'luaspoof1.spoofing.tests.powerdns.com.'
  227         query = dns.message.make_query(name, 'AAAA', 'IN')
  228         # dnsdist set RA = RD for spoofed responses
  229         query.flags &= ~dns.flags.RD
  230         expectedResponse = dns.message.make_response(query)
  231         rrset = dns.rrset.from_text(name,
  232                                     60,
  233                                     dns.rdataclass.IN,
  234                                     dns.rdatatype.AAAA,
  235                                     '2001:DB8::1')
  236         expectedResponse.answer.append(rrset)
  237 
  238         for method in ("sendUDPQuery", "sendTCPQuery"):
  239             sender = getattr(self, method)
  240             (_, receivedResponse) = sender(query, response=None, useQueue=False)
  241             self.assertTrue(receivedResponse)
  242             self.assertEquals(expectedResponse, receivedResponse)
  243 
  244     def testLuaSpoofAWithCNAME(self):
  245         """
  246         Spoofing: Spoofing an A with a CNAME via Lua
  247 
  248         Send an A query to "luaspoof2.spoofing.tests.powerdns.com.",
  249         check that dnsdist sends a spoofed result.
  250         """
  251         name = 'luaspoof2.spoofing.tests.powerdns.com.'
  252         query = dns.message.make_query(name, 'A', 'IN')
  253         # dnsdist set RA = RD for spoofed responses
  254         query.flags &= ~dns.flags.RD
  255         expectedResponse = dns.message.make_response(query)
  256         rrset = dns.rrset.from_text(name,
  257                                     60,
  258                                     dns.rdataclass.IN,
  259                                     dns.rdatatype.CNAME,
  260                                     'spoofedcname.spoofing.tests.powerdns.com.')
  261         expectedResponse.answer.append(rrset)
  262 
  263         for method in ("sendUDPQuery", "sendTCPQuery"):
  264             sender = getattr(self, method)
  265             (_, receivedResponse) = sender(query, response=None, useQueue=False)
  266             self.assertTrue(receivedResponse)
  267             self.assertEquals(expectedResponse, receivedResponse)
  268 
  269     def testLuaSpoofAAAAWithCNAME(self):
  270         """
  271         Spoofing: Spoofing an AAAA with a CNAME via Lua
  272 
  273         Send an AAAA query to "luaspoof2.spoofing.tests.powerdns.com.",
  274         check that dnsdist sends a spoofed result.
  275         """
  276         name = 'luaspoof2.spoofing.tests.powerdns.com.'
  277         query = dns.message.make_query(name, 'AAAA', 'IN')
  278         # dnsdist set RA = RD for spoofed responses
  279         query.flags &= ~dns.flags.RD
  280         expectedResponse = dns.message.make_response(query)
  281         rrset = dns.rrset.from_text(name,
  282                                     60,
  283                                     dns.rdataclass.IN,
  284                                     dns.rdatatype.CNAME,
  285                                     'spoofedcname.spoofing.tests.powerdns.com.')
  286         expectedResponse.answer.append(rrset)
  287 
  288         for method in ("sendUDPQuery", "sendTCPQuery"):
  289             sender = getattr(self, method)
  290             (_, receivedResponse) = sender(query, response=None, useQueue=False)
  291             self.assertTrue(receivedResponse)
  292             self.assertEquals(expectedResponse, receivedResponse)
  293 
  294 class TestSpoofingLuaWithStatistics(DNSDistTest):
  295 
  296     _config_template = """
  297     function spoof1rule(dq)
  298         queriesCount = getStatisticsCounters()['queries']
  299         if(queriesCount == 1) then
  300                 return DNSAction.Spoof, "192.0.2.1"
  301         elseif(queriesCount == 2) then
  302                 return DNSAction.Spoof, "192.0.2.2"
  303         else
  304                 return DNSAction.Spoof, "192.0.2.0"
  305         end
  306     end
  307     addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
  308     newServer{address="127.0.0.1:%s"}
  309     """
  310 
  311     def testLuaSpoofBasedOnStatistics(self):
  312         """
  313         Spoofing: Spoofing an A via Lua based on statistics counters
  314 
  315         """
  316         name = 'luaspoofwithstats.spoofing.tests.powerdns.com.'
  317         query = dns.message.make_query(name, 'A', 'IN')
  318         # dnsdist set RA = RD for spoofed responses
  319         query.flags &= ~dns.flags.RD
  320         expectedResponse1 = dns.message.make_response(query)
  321         rrset = dns.rrset.from_text(name,
  322                                     60,
  323                                     dns.rdataclass.IN,
  324                                     dns.rdatatype.A,
  325                                     '192.0.2.1')
  326         expectedResponse1.answer.append(rrset)
  327         expectedResponse2 = dns.message.make_response(query)
  328         rrset = dns.rrset.from_text(name,
  329                                     60,
  330                                     dns.rdataclass.IN,
  331                                     dns.rdatatype.A,
  332                                     '192.0.2.2')
  333         expectedResponse2.answer.append(rrset)
  334         expectedResponseAfterwards = dns.message.make_response(query)
  335         rrset = dns.rrset.from_text(name,
  336                                     60,
  337                                     dns.rdataclass.IN,
  338                                     dns.rdatatype.A,
  339                                     '192.0.2.0')
  340         expectedResponseAfterwards.answer.append(rrset)
  341 
  342         (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
  343         self.assertTrue(receivedResponse)
  344         self.assertEquals(expectedResponse1, receivedResponse)
  345 
  346         (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
  347         self.assertTrue(receivedResponse)
  348         self.assertEquals(expectedResponse2, receivedResponse)
  349 
  350         for method in ("sendUDPQuery", "sendTCPQuery"):
  351             sender = getattr(self, method)
  352             (_, receivedResponse) = sender(query, response=None, useQueue=False)
  353             self.assertTrue(receivedResponse)
  354             self.assertEquals(expectedResponseAfterwards, receivedResponse)