"Fossies" - the Fresh Open Source Software Archive

Member "fail2ban-0.11.1/fail2ban/tests/banmanagertestcase.py" (11 Jan 2020, 9344 Bytes) of package /linux/misc/fail2ban-0.11.1.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "banmanagertestcase.py": 0.10.5_vs_0.11.1.

    1 # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
    2 # vi: set ft=python sts=4 ts=4 sw=4 noet :
    3 
    4 # This file is part of Fail2Ban.
    5 #
    6 # Fail2Ban is free software; you can redistribute it and/or modify
    7 # it under the terms of the GNU General Public License as published by
    8 # the Free Software Foundation; either version 2 of the License, or
    9 # (at your option) any later version.
   10 #
   11 # Fail2Ban is distributed in the hope that it will be useful,
   12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
   13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   14 # GNU General Public License for more details.
   15 #
   16 # You should have received a copy of the GNU General Public License
   17 # along with Fail2Ban; if not, write to the Free Software
   18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
   19 
   20 # Author: Cyril Jaquier
   21 # 
   22 
   23 __author__ = "Cyril Jaquier"
   24 __copyright__ = "Copyright (c) 2004 Cyril Jaquier"
   25 __license__ = "GPL"
   26 
   27 import unittest
   28 
   29 from .utils import setUpMyTime, tearDownMyTime
   30 
   31 from ..server.banmanager import BanManager
   32 from ..server.ticket import BanTicket
   33 
   34 class AddFailure(unittest.TestCase):
   35     def setUp(self):
   36         """Call before every test case."""
   37         super(AddFailure, self).setUp()
   38         setUpMyTime()
   39         self.__ticket = BanTicket('193.168.0.128', 1167605999.0)
   40         self.__banManager = BanManager()
   41 
   42     def tearDown(self):
   43         """Call after every test case."""
   44         super(AddFailure, self).tearDown()
   45         tearDownMyTime()
   46 
   47     def testAdd(self):
   48         self.assertTrue(self.__banManager.addBanTicket(self.__ticket))
   49         self.assertEqual(self.__banManager.size(), 1)
   50         self.assertEqual(self.__banManager.getBanTotal(), 1)
   51         self.__banManager.setBanTotal(0)
   52         self.assertEqual(self.__banManager.getBanTotal(), 0)
   53     
   54     def testAddDuplicate(self):
   55         self.assertTrue(self.__banManager.addBanTicket(self.__ticket))
   56         self.assertFalse(self.__banManager.addBanTicket(self.__ticket))
   57         self.assertEqual(self.__banManager.size(), 1)
   58 
   59     def testAddDuplicateWithTime(self):
   60         defBanTime = self.__banManager.getBanTime()
   61         prevEndOfBanTime = 0
   62         # add again a duplicate :
   63         #   0) with same start time and the same (default) ban time
   64         #   1) with newer start time and the same (default) ban time
   65         #   2) with same start time and longer ban time
   66     #   3) with permanent ban time (-1)
   67         for tnew, btnew in (
   68             (1167605999.0,       None),
   69             (1167605999.0 + 100, None),
   70             (1167605999.0,       24*60*60),
   71             (1167605999.0,       -1),
   72         ):
   73             ticket1 = BanTicket('193.168.0.128', 1167605999.0)
   74             ticket2 = BanTicket('193.168.0.128', tnew)
   75             if btnew is not None:
   76                 ticket2.setBanTime(btnew)
   77             self.assertTrue(self.__banManager.addBanTicket(ticket1))
   78             self.assertFalse(self.__banManager.addBanTicket(ticket2))
   79             self.assertEqual(self.__banManager.size(), 1)
   80             # pop ticket and check it was prolonged :
   81             banticket = self.__banManager.getTicketByID(ticket2.getID())
   82             self.assertEqual(banticket.getEndOfBanTime(defBanTime), ticket2.getEndOfBanTime(defBanTime))
   83             self.assertTrue(banticket.getEndOfBanTime(defBanTime) > prevEndOfBanTime)
   84             prevEndOfBanTime = ticket1.getEndOfBanTime(defBanTime)
   85             # but the start time should not be changed (+ 100 is ignored):
   86             self.assertEqual(banticket.getTime(), 1167605999.0)
   87             # if prolong to permanent, it should also have permanent ban time:
   88             if btnew == -1:
   89                 self.assertEqual(banticket.getBanTime(defBanTime), -1)
   90 
   91     def testInListOK(self):
   92         self.assertTrue(self.__banManager.addBanTicket(self.__ticket))
   93         ticket = BanTicket('193.168.0.128', 1167605999.0)
   94         self.assertTrue(self.__banManager._inBanList(ticket))
   95 
   96     def testInListNOK(self):
   97         self.assertTrue(self.__banManager.addBanTicket(self.__ticket))
   98         ticket = BanTicket('111.111.1.111', 1167605999.0)
   99         self.assertFalse(self.__banManager._inBanList(ticket))
  100         
  101     def testBanTimeIncr(self):
  102         ticket = BanTicket(self.__ticket.getIP(), self.__ticket.getTime())
  103         ## increase twice and at end permanent, check time/count increase:
  104         c = 0
  105         for i in (1000, 2000, -1):
  106             self.__banManager.addBanTicket(self.__ticket); c += 1
  107             ticket.setBanTime(i)
  108             self.assertFalse(self.__banManager.addBanTicket(ticket)); # no incr of c (already banned)
  109             self.assertEqual(str(self.__banManager.getTicketByID(ticket.getIP())), 
  110                 "BanTicket: ip=%s time=%s bantime=%s bancount=%s #attempts=0 matches=[]" % (ticket.getIP(), ticket.getTime(), i, c))
  111         ## after permanent, it should remain permanent ban time (-1):
  112         self.__banManager.addBanTicket(self.__ticket); c += 1
  113         ticket.setBanTime(-1)
  114         self.assertFalse(self.__banManager.addBanTicket(ticket)); # no incr of c (already banned)
  115         ticket.setBanTime(1000)
  116         self.assertFalse(self.__banManager.addBanTicket(ticket)); # no incr of c (already banned)
  117         self.assertEqual(str(self.__banManager.getTicketByID(ticket.getIP())), 
  118             "BanTicket: ip=%s time=%s bantime=%s bancount=%s #attempts=0 matches=[]" % (ticket.getIP(), ticket.getTime(), -1, c))
  119 
  120     def testUnban(self):
  121         btime = self.__banManager.getBanTime()
  122         stime = self.__ticket.getTime()
  123         self.assertTrue(self.__banManager.addBanTicket(self.__ticket))
  124         self.assertTrue(self.__banManager._inBanList(self.__ticket))
  125         self.assertEqual(self.__banManager.unBanList(stime), [])
  126         self.assertEqual(self.__banManager.unBanList(stime + btime + 1), [self.__ticket])
  127         self.assertEqual(self.__banManager.size(), 0)
  128         ## again, but now we will prolong ban-time and then try to unban again (1st too early):
  129         self.assertTrue(self.__banManager.addBanTicket(self.__ticket))
  130         # prolong ban:
  131         ticket = BanTicket(self.__ticket.getID(), stime + 600)
  132         self.assertFalse(self.__banManager.addBanTicket(ticket))
  133         # try unban too early:
  134         self.assertEqual(len(self.__banManager.unBanList(stime + btime + 1)), 0)
  135         # try unban using correct time:
  136         self.assertEqual(len(self.__banManager.unBanList(stime + btime + 600 + 1)), 1)
  137         ## again, but now we test removing tickets particular (to test < 2/3-rule):
  138         for i in range(5):
  139             ticket = BanTicket('193.168.0.%s' % i, stime)
  140             ticket.setBanTime(ticket.getBanTime(btime) + i*10)
  141             self.assertTrue(self.__banManager.addBanTicket(ticket))
  142         self.assertEqual(len(self.__banManager.unBanList(stime + btime + 1*10 + 1)), 2)
  143         self.assertEqual(len(self.__banManager.unBanList(stime + btime + 5*10 + 1)), 3)
  144         self.assertEqual(self.__banManager.size(), 0)
  145 
  146     def testUnbanPermanent(self):
  147         btime = self.__banManager.getBanTime()
  148         self.__banManager.setBanTime(-1)
  149         try:
  150             self.assertTrue(self.__banManager.addBanTicket(self.__ticket))
  151             self.assertTrue(self.__banManager._inBanList(self.__ticket))
  152             self.assertEqual(self.__banManager.unBanList(self.__ticket.getTime() + btime + 1), [])
  153             self.assertEqual(self.__banManager.size(), 1)
  154         finally:
  155             self.__banManager.setBanTime(btime)
  156 
  157 
  158 class StatusExtendedCymruInfo(unittest.TestCase):
  159     def setUp(self):
  160         """Call before every test case."""
  161         super(StatusExtendedCymruInfo, self).setUp()
  162         unittest.F2B.SkipIfNoNetwork()
  163         setUpMyTime()
  164         self.__ban_ip = "93.184.216.34"
  165         self.__asn = "15133"
  166         self.__country = "EU"
  167         self.__rir = "ripencc"
  168         ticket = BanTicket(self.__ban_ip, 1167605999.0)
  169         self.__banManager = BanManager()
  170         self.assertTrue(self.__banManager.addBanTicket(ticket))
  171 
  172     def tearDown(self):
  173         """Call after every test case."""
  174         super(StatusExtendedCymruInfo, self).tearDown()
  175         tearDownMyTime()
  176 
  177     available = True, None
  178 
  179     def _getBanListExtendedCymruInfo(self):
  180         tc = StatusExtendedCymruInfo
  181         if tc.available[0]:
  182             cymru_info = self.__banManager.getBanListExtendedCymruInfo(
  183                 timeout=(2 if unittest.F2B.fast else 20))
  184         else: # pragma: no cover - availability (once after error case only)
  185             cymru_info = tc.available[1]
  186         if cymru_info.get("error"): # pragma: no cover - availability
  187             tc.available = False, cymru_info
  188             raise unittest.SkipTest('Skip test because service is not available: %s' % cymru_info["error"])
  189         return cymru_info
  190 
  191 
  192     def testCymruInfo(self):
  193         cymru_info = self._getBanListExtendedCymruInfo()
  194         self.assertDictEqual(cymru_info,
  195                           {"asn": [self.__asn],
  196                            "country": [self.__country],
  197                            "rir": [self.__rir]})
  198 
  199     def testCymruInfoASN(self):
  200         self.assertEqual(
  201             self.__banManager.geBanListExtendedASN(self._getBanListExtendedCymruInfo()),
  202             [self.__asn])
  203 
  204     def testCymruInfoCountry(self):
  205         self.assertEqual(
  206             self.__banManager.geBanListExtendedCountry(self._getBanListExtendedCymruInfo()),
  207             [self.__country])
  208 
  209     def testCymruInfoRIR(self):
  210         self.assertEqual(
  211             self.__banManager.geBanListExtendedRIR(self._getBanListExtendedCymruInfo()),
  212             [self.__rir])
  213 
  214     def testCymruInfoNxdomain(self):
  215         self.__banManager = BanManager()
  216 
  217         # non-existing IP
  218         ticket = BanTicket("0.0.0.0", 1167605999.0)
  219         self.assertTrue(self.__banManager.addBanTicket(ticket))
  220         cymru_info = self._getBanListExtendedCymruInfo()
  221         self.assertDictEqual(cymru_info,
  222                           {"asn": ["nxdomain"],
  223                            "country": ["nxdomain"],
  224                            "rir": ["nxdomain"]})
  225 
  226         # Since it outputs for all active tickets we would get previous results
  227         # and new ones
  228         ticket = BanTicket("8.0.0.0", 1167606000.0)
  229         self.assertTrue(self.__banManager.addBanTicket(ticket))
  230         cymru_info = self._getBanListExtendedCymruInfo()
  231         self.assertSortedEqual(cymru_info,
  232                           {"asn": ["nxdomain", "3356",],
  233                            "country": ["nxdomain", "US"],
  234                            "rir": ["nxdomain", "arin"]}, level=-1, key=str)