"Fossies" - the Fresh Open Source Software Archive

Member "fail2ban-0.11.1/fail2ban/tests/failmanagertestcase.py" (11 Jan 2020, 8978 Bytes) of package /linux/misc/fail2ban-0.11.1.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "failmanagertestcase.py": 0.10.5_vs_0.11.1.

    1 # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
    2 # vi: set ft=python sts=4 ts=4 sw=4 noet :
    3 
    4 # This file is part of Fail2Ban.
    5 #
    6 # Fail2Ban is free software; you can redistribute it and/or modify
    7 # it under the terms of the GNU General Public License as published by
    8 # the Free Software Foundation; either version 2 of the License, or
    9 # (at your option) any later version.
   10 #
   11 # Fail2Ban is distributed in the hope that it will be useful,
   12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
   13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   14 # GNU General Public License for more details.
   15 #
   16 # You should have received a copy of the GNU General Public License
   17 # along with Fail2Ban; if not, write to the Free Software
   18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
   19 
   20 # Author: Cyril Jaquier
   21 # 
   22 
   23 __author__ = "Cyril Jaquier"
   24 __copyright__ = "Copyright (c) 2004 Cyril Jaquier"
   25 __license__ = "GPL"
   26 
   27 import unittest
   28 
   29 from ..server import failmanager
   30 from ..server.failmanager import FailManager, FailManagerEmpty
   31 from ..server.ipdns import IPAddr
   32 from ..server.ticket import FailTicket
   33 
   34 
   35 class AddFailure(unittest.TestCase):
   36 
   37     def setUp(self):
   38         """Call before every test case."""
   39         super(AddFailure, self).setUp()
   40         self.__items = None
   41         self.__failManager = FailManager()
   42 
   43     def tearDown(self):
   44         """Call after every test case."""
   45         super(AddFailure, self).tearDown()
   46         
   47     def _addDefItems(self):
   48         self.__items = [[u'193.168.0.128', 1167605999.0],
   49                         [u'193.168.0.128', 1167605999.0],
   50                         [u'193.168.0.128', 1167605999.0],
   51                         [u'193.168.0.128', 1167605999.0],
   52                         [u'193.168.0.128', 1167605999.0],
   53                         ['87.142.124.10', 1167605999.0],
   54                         ['87.142.124.10', 1167605999.0],
   55                         ['87.142.124.10', 1167605999.0],
   56                         ['100.100.10.10', 1000000000.0],
   57                         ['100.100.10.10', 1000000500.0],
   58                         ['100.100.10.10', 1000001000.0],
   59                         ['100.100.10.10', 1000001500.0],
   60                         ['100.100.10.10', 1000002000.0]]
   61         for i in self.__items:
   62             self.__failManager.addFailure(FailTicket(i[0], i[1]))
   63 
   64     def testFailManagerAdd(self):
   65         self._addDefItems()
   66         self.assertEqual(self.__failManager.size(), 3)
   67         self.assertEqual(self.__failManager.getFailTotal(), 13)
   68         self.__failManager.setFailTotal(0)
   69         self.assertEqual(self.__failManager.getFailTotal(), 0)
   70         self.__failManager.setFailTotal(13)
   71     
   72     def testFailManagerAdd_MaxMatches(self):
   73         maxMatches = 2
   74         self.__failManager.maxMatches = maxMatches
   75         failures = ["abc\n", "123\n", "ABC\n", "1234\n"]
   76         # add failures sequential:
   77         i = 80
   78         for f in failures:
   79             i -= 10
   80             ticket = FailTicket("127.0.0.1", 1000002000 - i, [f])
   81             ticket.setAttempt(1)
   82             self.__failManager.addFailure(ticket)
   83         #
   84         manFailList = self.__failManager._FailManager__failList
   85         self.assertEqual(len(manFailList), 1)
   86         ticket = manFailList["127.0.0.1"]
   87         # should retrieve 2 matches only, but count of all attempts (4):
   88         self.assertEqual(ticket.getAttempt(), len(failures))
   89         self.assertEqual(len(ticket.getMatches()), maxMatches)
   90         self.assertEqual(ticket.getMatches(), failures[len(failures) - maxMatches:])
   91     # add more failures at once:
   92         ticket = FailTicket("127.0.0.1", 1000002000 - 10, failures)
   93         ticket.setAttempt(len(failures))
   94         self.__failManager.addFailure(ticket)
   95         #
   96         manFailList = self.__failManager._FailManager__failList
   97         self.assertEqual(len(manFailList), 1)
   98         ticket = manFailList["127.0.0.1"]
   99         # should retrieve 2 matches only, but count of all attempts (8):
  100         self.assertEqual(ticket.getAttempt(), 2 * len(failures))
  101         self.assertEqual(len(ticket.getMatches()), maxMatches)
  102         self.assertEqual(ticket.getMatches(), failures[len(failures) - maxMatches:])
  103         # add self ticket again:
  104         self.__failManager.addFailure(ticket)
  105         #
  106         manFailList = self.__failManager._FailManager__failList
  107         self.assertEqual(len(manFailList), 1)
  108         ticket = manFailList["127.0.0.1"]
  109         # same matches, but +1 attempt (9)
  110         self.assertEqual(ticket.getAttempt(), 2 * len(failures) + 1)
  111         self.assertEqual(len(ticket.getMatches()), maxMatches)
  112         self.assertEqual(ticket.getMatches(), failures[len(failures) - maxMatches:])
  113         # no matches by maxMatches == 0 :
  114         self.__failManager.maxMatches = 0
  115         self.__failManager.addFailure(ticket)
  116         manFailList = self.__failManager._FailManager__failList
  117         ticket = manFailList["127.0.0.1"]
  118         self.assertEqual(len(ticket.getMatches()), 0)
  119         # test set matches None to None:
  120         ticket.setMatches(None)
  121     
  122     def testFailManagerMaxTime(self):
  123         self._addDefItems()
  124         self.assertEqual(self.__failManager.getMaxTime(), 600)
  125         self.__failManager.setMaxTime(13)
  126         self.assertEqual(self.__failManager.getMaxTime(), 13)
  127         self.__failManager.setMaxTime(600)
  128 
  129     def testDel(self):
  130         self._addDefItems()
  131         self.__failManager.delFailure('193.168.0.128')
  132         self.__failManager.delFailure('111.111.1.111')
  133         
  134         self.assertEqual(self.__failManager.size(), 2)
  135         
  136     def testCleanupOK(self):
  137         self._addDefItems()
  138         timestamp = 1167606999.0
  139         self.__failManager.cleanup(timestamp)
  140         self.assertEqual(self.__failManager.size(), 0)
  141         
  142     def testCleanupNOK(self):
  143         self._addDefItems()
  144         timestamp = 1167605990.0
  145         self.__failManager.cleanup(timestamp)
  146         self.assertEqual(self.__failManager.size(), 2)
  147     
  148     def testbanOK(self):
  149         self._addDefItems()
  150         self.__failManager.setMaxRetry(5)
  151         #ticket = FailTicket('193.168.0.128', None)
  152         ticket = self.__failManager.toBan()
  153         self.assertEqual(ticket.getIP(), "193.168.0.128")
  154         self.assertTrue(isinstance(ticket.getIP(), (str, IPAddr)))
  155 
  156         # finish with rudimentary tests of the ticket
  157         # verify consistent str
  158         ticket_str = str(ticket)
  159         ticket_repr = repr(ticket)
  160         self.assertEqual(
  161             ticket_str,
  162             'FailTicket: ip=193.168.0.128 time=1167605999.0 bantime=None bancount=0 #attempts=5 matches=[]')
  163         self.assertEqual(
  164             ticket_repr,
  165             'FailTicket: ip=193.168.0.128 time=1167605999.0 bantime=None bancount=0 #attempts=5 matches=[]')
  166         self.assertFalse(not ticket)
  167         # and some get/set-ers otherwise not tested
  168         ticket.setTime(1000002000.0)
  169         self.assertEqual(ticket.getTime(), 1000002000.0)
  170         # and str() adjusted correspondingly
  171         self.assertEqual(
  172             str(ticket),
  173             'FailTicket: ip=193.168.0.128 time=1000002000.0 bantime=None bancount=0 #attempts=5 matches=[]')
  174     
  175     def testbanNOK(self):
  176         self._addDefItems()
  177         self.__failManager.setMaxRetry(10)
  178         self.assertRaises(FailManagerEmpty, self.__failManager.toBan)
  179 
  180     def testWindow(self):
  181         self._addDefItems()
  182         ticket = self.__failManager.toBan()
  183         self.assertNotEqual(ticket.getIP(), "100.100.10.10")
  184         ticket = self.__failManager.toBan()
  185         self.assertNotEqual(ticket.getIP(), "100.100.10.10")
  186         self.assertRaises(FailManagerEmpty, self.__failManager.toBan)
  187 
  188     def testBgService(self):
  189         bgSvc = self.__failManager._FailManager__bgSvc
  190         failManager2nd = FailManager()
  191         # test singleton (same object):
  192         bgSvc2 = failManager2nd._FailManager__bgSvc
  193         self.assertTrue(id(bgSvc) == id(bgSvc2))
  194         bgSvc2 = None
  195         # test service :
  196         self.assertTrue(bgSvc.service(True, True))
  197         self.assertFalse(bgSvc.service())
  198         # bypass threshold and time:
  199         for i in range(1, bgSvc._BgService__threshold):
  200             self.assertFalse(bgSvc.service())
  201         # bypass time check:
  202         bgSvc._BgService__serviceTime = -0x7fffffff
  203         self.assertTrue(bgSvc.service())
  204         # bypass threshold and time:
  205         bgSvc._BgService__serviceTime = -0x7fffffff
  206         for i in range(1, bgSvc._BgService__threshold):
  207             self.assertFalse(bgSvc.service())
  208         self.assertTrue(bgSvc.service(False, True))
  209         self.assertFalse(bgSvc.service(False, True))
  210 
  211 
  212 class FailmanagerComplex(unittest.TestCase):
  213 
  214     def setUp(self):
  215         """Call before every test case."""
  216         super(FailmanagerComplex, self).setUp()
  217         self.__failManager = FailManager()
  218         # down logging level for all this tests, because of extremely large failure count (several GB on heavydebug)
  219         self.__saved_ll = failmanager.logLevel
  220         failmanager.logLevel = 3
  221 
  222     def tearDown(self):
  223         super(FailmanagerComplex, self).tearDown()
  224         # restore level
  225         failmanager.logLevel = self.__saved_ll
  226 
  227     @staticmethod
  228     def _ip_range(maxips):
  229         class _ip(list):
  230             def __str__(self):
  231                 return '.'.join(map(str, self))
  232             def __repr__(self):
  233                 return str(self)
  234             def __key__(self):
  235                 return str(self)
  236             def __hash__(self):
  237                 #return (int)(struct.unpack('I', struct.pack("BBBB",*self))[0])
  238                 return (int)(self[0] << 24 | self[1] << 16 | self[2] << 8 | self[3])
  239         i = 0
  240         c = [127,0,0,0]
  241         while i < maxips:
  242             for n in range(3,0,-1):
  243                 if c[n] < 255:
  244                     c[n] += 1
  245                     break
  246                 c[n] = 0
  247             yield (i, _ip(c))
  248             i += 1
  249 
  250     def testCheckIPGenerator(self):
  251         for i, ip in self._ip_range(65536 if not unittest.F2B.fast else 1000):
  252             if i == 254:
  253                 self.assertEqual(str(ip), '127.0.0.255')
  254             elif i == 255:
  255                 self.assertEqual(str(ip), '127.0.1.0')
  256             elif i == 1000:
  257                 self.assertEqual(str(ip), '127.0.3.233')
  258             elif i == 65534:
  259                 self.assertEqual(str(ip), '127.0.255.255')
  260             elif i == 65535:
  261                 self.assertEqual(str(ip), '127.1.0.0')
  262