"Fossies" - the Fresh Open Source Software Archive

Member "fail2ban-0.11.1/fail2ban/tests/filtertestcase.py" (11 Jan 2020, 78369 Bytes) of package /linux/misc/fail2ban-0.11.1.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code-changes report for "filtertestcase.py": 0.10.5_vs_0.11.1.

    1 # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
    2 # vi: set ft=python sts=4 ts=4 sw=4 noet :
    3 
    4 # This file is part of Fail2Ban.
    5 #
    6 # Fail2Ban is free software; you can redistribute it and/or modify
    7 # it under the terms of the GNU General Public License as published by
    8 # the Free Software Foundation; either version 2 of the License, or
    9 # (at your option) any later version.
   10 #
   11 # Fail2Ban is distributed in the hope that it will be useful,
   12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
   13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   14 # GNU General Public License for more details.
   15 #
   16 # You should have received a copy of the GNU General Public License
   17 # along with Fail2Ban; if not, write to the Free Software
   18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
   19 
   20 # Fail2Ban developers
   21 
   22 __copyright__ = "Copyright (c) 2004 Cyril Jaquier; 2012 Yaroslav Halchenko"
   23 __license__ = "GPL"
   24 
   25 from __builtin__ import open as fopen
   26 import unittest
   27 import os
   28 import re
   29 import sys
   30 import time, datetime
   31 import tempfile
   32 import uuid
   33 
   34 try:
   35     from systemd import journal
   36 except ImportError:
   37     journal = None
   38 
   39 from ..server.jail import Jail
   40 from ..server.filterpoll import FilterPoll
   41 from ..server.filter import FailTicket, Filter, FileFilter, FileContainer
   42 from ..server.failmanager import FailManagerEmpty
   43 from ..server.ipdns import asip, getfqdn, DNSUtils, IPAddr
   44 from ..server.mytime import MyTime
   45 from ..server.utils import Utils, uni_decode
   46 from .utils import setUpMyTime, tearDownMyTime, mtimesleep, with_tmpdir, LogCaptureTestCase, \
   47     logSys as DefLogSys, CONFIG_DIR as STOCK_CONF_DIR
   48 from .dummyjail import DummyJail
   49 
   50 TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
   51 
   52 
   53 # yoh: per Steven Hiscocks's insight while troubleshooting
   54 # https://github.com/fail2ban/fail2ban/issues/103#issuecomment-15542836
   55 # adding a sufficiently large buffer might help to guarantee that
   56 # writes happen atomically.
   57 def open(*args):
   58     """Overload built in open so we could assure sufficiently large buffer
   59 
   60     Explicit .flush would be needed to assure that changes leave the buffer
   61     """
   62     if len(args) == 2:
   63         # ~50kB buffer should be sufficient for all tests here.
   64         args = args + (50000,)
   65     if sys.version_info >= (3,):
   66         return fopen(*args, **{'encoding': 'utf-8', 'errors': 'ignore'})
   67     else:
   68         return fopen(*args)
   69 
   70 
   71 def _killfile(f, name):
   72     try:
   73         f.close()
   74     except:
   75         pass
   76     try:
   77         os.unlink(name)
   78     except:
   79         pass
   80 
   81     # there might as well be the .bak file
   82     if os.path.exists(name + '.bak'):
   83         _killfile(None, name + '.bak')
   84 
   85 
   86 _maxWaitTime = unittest.F2B.maxWaitTime
   87 
   88 
   89 class _tmSerial():
   90     _last_s = -0x7fffffff
   91     _last_m = -0x7fffffff
   92     _str_s = ""
   93     _str_m = ""
   94     @staticmethod
   95     def _tm(time):
   96         # ## strftime it too slow for large time serializer :
   97         # return MyTime.time2str(time)
   98         c = _tmSerial
   99         sec = (time % 60)
  100         if c._last_s == time - sec:
  101             return "%s%02u" % (c._str_s, sec)
  102         mt = (time % 3600)
  103         if c._last_m == time - mt:
  104             c._last_s = time - sec
  105             c._str_s = "%s%02u:" % (c._str_m, mt // 60)
  106             return "%s%02u" % (c._str_s, sec)
  107         c._last_m = time - mt
  108         c._str_m = datetime.datetime.fromtimestamp(time).strftime("%Y-%m-%d %H:")
  109         c._last_s = time - sec
  110         c._str_s = "%s%02u:" % (c._str_m, mt // 60)
  111         return "%s%02u" % (c._str_s, sec)
  112 
  113 _tm = _tmSerial._tm
  114 
  115 
  116 def _assert_equal_entries(utest, found, output, count=None):
  117     """Little helper to unify comparisons with the target entries
  118 
  119     and report helpful failure reports instead of millions of seconds ;)
  120     """
  121     utest.assertEqual(found[0], output[0])            # IP
  122     utest.assertEqual(found[1], count or output[1])   # count
  123     found_time, output_time = \
  124                 MyTime.localtime(found[2]),\
  125                 MyTime.localtime(output[2])
  126     try:
  127         utest.assertEqual(found_time, output_time)
  128     except AssertionError as e:
  129         # assert more structured:
  130         utest.assertEqual((float(found[2]), found_time), (float(output[2]), output_time))
  131     if len(output) > 3 and count is None: # match matches
  132         # do not check if custom count (e.g. going through them twice)
  133         if os.linesep != '\n' or sys.platform.startswith('cygwin'):
  134             # on those where text file lines end with '\r\n', remove '\r'
  135             srepr = lambda x: repr(x).replace(r'\r', '')
  136         else:
  137             srepr = repr
  138         utest.assertEqual(srepr(found[3]), srepr(output[3]))
  139 
  140 
  141 def _ticket_tuple(ticket):
  142     """Create a tuple for easy comparison from fail ticket
  143     """
  144     attempts = ticket.getAttempt()
  145     date = ticket.getTime()
  146     ip = ticket.getIP()
  147     matches = ticket.getMatches()
  148     return (ip, attempts, date, matches)
  149 
  150 
  151 def _assert_correct_last_attempt(utest, filter_, output, count=None):
  152     """Additional helper to wrap most common test case
  153 
  154     Test filter to contain target ticket
  155     """
  156     # one or multiple tickets:
  157     if not isinstance(output[0], (tuple,list)):
  158         tickcount = 1
  159         failcount = (count if count else output[1])
  160     else:
  161         tickcount = len(output)
  162         failcount = (count if count else sum((o[1] for o in output)))
  163 
  164     found = []
  165     if isinstance(filter_, DummyJail):
  166         # get fail ticket from jail
  167         found.append(_ticket_tuple(filter_.getFailTicket()))
  168     else:
  169         # when we are testing without jails
  170         # wait for failures (up to max time)
  171         Utils.wait_for(
  172             lambda: filter_.failManager.getFailCount() >= (tickcount, failcount),
  173             _maxWaitTime(10))
  174         # get fail ticket(s) from filter
  175         while tickcount:
  176             try:
  177                 found.append(_ticket_tuple(filter_.failManager.toBan()))
  178             except FailManagerEmpty:
  179                 break
  180             tickcount -= 1
  181 
  182     if not isinstance(output[0], (tuple,list)):
  183         utest.assertEqual(len(found), 1)
  184         _assert_equal_entries(utest, found[0], output, count)
  185     else:
  186         # sort by string representation of ip (multiple failures with different ips):
  187         found = sorted(found, key=lambda x: str(x))
  188         output = sorted(output, key=lambda x: str(x))
  189         for f, o in zip(found, output):
  190             _assert_equal_entries(utest, f, o)
  191 
  192 
  193 def _copy_lines_between_files(in_, fout, n=None, skip=0, mode='a', terminal_line=""):
  194     """Copy lines from one file to another (which might be already open)
  195 
  196     Returns open fout
  197     """
  198     # on old Python st_mtime is int, so we should give at least 1 sec so
  199     # polling filter could detect the change
  200     mtimesleep()
  201     if isinstance(in_, str): # pragma: no branch - only used with str in test cases
  202         fin = open(in_, 'r')
  203     else:
  204         fin = in_
  205     # Skip
  206     for i in xrange(skip):
  207         fin.readline()
  208     # Read
  209     i = 0
  210     lines = []
  211     while n is None or i < n:
  212         l = fin.readline()
  213         if terminal_line is not None and l == terminal_line:
  214             break
  215         lines.append(l)
  216         i += 1
  217     # Write: all at once and flush
  218     if isinstance(fout, str):
  219         fout = open(fout, mode)
  220     fout.write('\n'.join(lines))
  221     fout.flush()
  222     if isinstance(in_, str): # pragma: no branch - only used with str in test cases
  223         # Opened earlier, therefore must close it
  224         fin.close()
  225     # to give other threads possibly some time to crunch
  226     time.sleep(Utils.DEFAULT_SHORT_INTERVAL)
  227     return fout
  228 
  229 
  230 TEST_JOURNAL_FIELDS = {
  231   "SYSLOG_IDENTIFIER": "fail2ban-testcases",
  232     "PRIORITY": "7",
  233 }
  234 def _copy_lines_to_journal(in_, fields={},n=None, skip=0, terminal_line=""): # pragma: systemd no cover
  235     """Copy lines from one file to systemd journal
  236 
  237     Returns None
  238     """
  239     if isinstance(in_, str): # pragma: no branch - only used with str in test cases
  240         fin = open(in_, 'r')
  241     else:
  242         fin = in_
  243     # Required for filtering
  244     fields.update(TEST_JOURNAL_FIELDS)
  245     # Skip
  246     for i in xrange(skip):
  247         fin.readline()
  248     # Read/Write
  249     i = 0
  250     while n is None or i < n:
  251         l = fin.readline()
  252         if terminal_line is not None and l == terminal_line:
  253             break
  254         journal.send(MESSAGE=l.strip(), **fields)
  255         i += 1
  256     if isinstance(in_, str): # pragma: no branch - only used with str in test cases
  257         # Opened earlier, therefore must close it
  258         fin.close()
  259 
  260 
  261 #
  262 #  Actual tests
  263 #
  264 
class BasicFilter(unittest.TestCase):
    """Unit tests for basic (jail-less) Filter getters/setters and module helpers."""

    def setUp(self):
        super(BasicFilter, self).setUp()
        # a filter without a jail is sufficient for these get/set tests
        self.filter = Filter(None)

    def testGetSetUseDNS(self):
        # default is warn
        self.assertEqual(self.filter.getUseDns(), 'warn')
        # boolean values are normalized to 'yes'/'no':
        self.filter.setUseDns(True)
        self.assertEqual(self.filter.getUseDns(), 'yes')
        self.filter.setUseDns(False)
        self.assertEqual(self.filter.getUseDns(), 'no')

    def testGetSetDatePattern(self):
        # without an explicit pattern the default detectors are active:
        self.assertEqual(self.filter.getDatePattern(),
            (None, "Default Detectors"))
        self.filter.setDatePattern(r"^%Y-%m-%d-%H%M%S\.%f %z **")
        # the getter returns the pattern together with its readable description:
        self.assertEqual(self.filter.getDatePattern(),
            (r"^%Y-%m-%d-%H%M%S\.%f %z **",
            r"^Year-Month-Day-24hourMinuteSecond\.Microseconds Zone offset **"))

    def testGetSetLogTimeZone(self):
        # no zone set by default; fixed-offset zones are accepted:
        self.assertEqual(self.filter.getLogTimeZone(), None)
        self.filter.setLogTimeZone('UTC')
        self.assertEqual(self.filter.getLogTimeZone(), 'UTC')
        self.filter.setLogTimeZone('UTC-0400')
        self.assertEqual(self.filter.getLogTimeZone(), 'UTC-0400')
        self.filter.setLogTimeZone('UTC+0200')
        self.assertEqual(self.filter.getLogTimeZone(), 'UTC+0200')
        # invalid zone specifications must be rejected:
        self.assertRaises(ValueError, self.filter.setLogTimeZone, 'not-a-time-zone')

    def testAssertWrongTime(self):
        # the helper _assert_equal_entries must fail on differing times:
        self.assertRaises(AssertionError,
            lambda: _assert_equal_entries(self,
                ('1.1.1.1', 1, 1421262060.0),
                ('1.1.1.1', 1, 1421262059.0),
            1)
        )

    def testTest_tm(self):
        unittest.F2B.SkipIfFast()
        ## test function "_tm" works correct (returns the same as slow strftime):
        for i in xrange(1417512352, (1417512352 // 3600 + 3) * 3600):
            tm = MyTime.time2str(i)
            if _tm(i) != tm: # pragma: no cover - never reachable
                self.assertEqual((_tm(i), i), (tm, i))

    def testWrongCharInTupleLine(self):
        ## line tuple has different types (ascii after ascii / unicode):
        for a1 in ('', u'', b''):
            for a2 in ('2016-09-05T20:18:56', u'2016-09-05T20:18:56', b'2016-09-05T20:18:56'):
                for a3 in (
                    'Fail for "g\xc3\xb6ran" from 192.0.2.1',
                    u'Fail for "g\xc3\xb6ran" from 192.0.2.1',
                    b'Fail for "g\xc3\xb6ran" from 192.0.2.1'
                ):
                    # join should work if all arguments have the same type:
                    "".join([uni_decode(v) for v in (a1, a2, a3)])
  324 
  325 
class IgnoreIP(LogCaptureTestCase):
    """Tests of ignore-IP handling: ignoreself, ignoreip (plain/CIDR/mask),
    ignorecommand and ignorecache."""

    def setUp(self):
        """Call before every test case."""
        LogCaptureTestCase.setUp(self)
        self.jail = DummyJail()
        self.filter = FileFilter(self.jail)
        # tests check the "ignoreself" rule explicitly, so disable it by default:
        self.filter.ignoreSelf = False

    def testIgnoreSelfIP(self):
        ipList = ("127.0.0.1",)
        # test ignoreSelf is false:
        for ip in ipList:
            self.assertFalse(self.filter.inIgnoreIPList(ip))
            self.assertNotLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, "ignoreself rule"))
        # test ignoreSelf with true:
        self.filter.ignoreSelf = True
        self.pruneLog()
        for ip in ipList:
            self.assertTrue(self.filter.inIgnoreIPList(ip))
            self.assertLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, "ignoreself rule"))

    def testIgnoreIPOK(self):
        # valid addresses get ignored and the ignore cause is logged:
        ipList = "127.0.0.1", "192.168.0.1", "255.255.255.255", "99.99.99.99"
        for ip in ipList:
            self.filter.addIgnoreIP(ip)
            self.assertTrue(self.filter.inIgnoreIPList(ip))
            self.assertLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, "ip"))

    def testIgnoreIPNOK(self):
        # invalid addresses must not match the ignore list:
        ipList = "", "999.999.999.999", "abcdef.abcdef", "192.168.0."
        for ip in ipList:
            self.filter.addIgnoreIP(ip)
            self.assertFalse(self.filter.inIgnoreIPList(ip))
        # DNS resolution of the bogus names is attempted only with network:
        if not unittest.F2B.no_network: # pragma: no cover
            self.assertLogged(
                'Unable to find a corresponding IP address for 999.999.999.999',
                'Unable to find a corresponding IP address for abcdef.abcdef',
                'Unable to find a corresponding IP address for 192.168.0.', all=True)

    def testIgnoreIPCIDR(self):
        # CIDR notation: /25 covers 192.168.1.0 - 192.168.1.127 only
        self.filter.addIgnoreIP('192.168.1.0/25')
        self.assertTrue(self.filter.inIgnoreIPList('192.168.1.0'))
        self.assertTrue(self.filter.inIgnoreIPList('192.168.1.1'))
        self.assertTrue(self.filter.inIgnoreIPList('192.168.1.127'))
        self.assertFalse(self.filter.inIgnoreIPList('192.168.1.128'))
        self.assertFalse(self.filter.inIgnoreIPList('192.168.1.255'))
        self.assertFalse(self.filter.inIgnoreIPList('192.168.0.255'))

    def testIgnoreIPMask(self):
        # dotted netmask notation, equivalent to the /25 case above:
        self.filter.addIgnoreIP('192.168.1.0/255.255.255.128')
        self.assertTrue(self.filter.inIgnoreIPList('192.168.1.0'))
        self.assertTrue(self.filter.inIgnoreIPList('192.168.1.1'))
        self.assertTrue(self.filter.inIgnoreIPList('192.168.1.127'))
        self.assertFalse(self.filter.inIgnoreIPList('192.168.1.128'))
        self.assertFalse(self.filter.inIgnoreIPList('192.168.1.255'))
        self.assertFalse(self.filter.inIgnoreIPList('192.168.0.255'))

    def testWrongIPMask(self):
        # valid mask accepted, non-contiguous mask rejected:
        self.filter.addIgnoreIP('192.168.1.0/255.255.0.0')
        self.assertRaises(ValueError, self.filter.addIgnoreIP, '192.168.1.0/255.255.0.128')

    def testIgnoreInProcessLine(self):
        # a failure from an ignored subnet must be skipped while processing:
        setUpMyTime()
        try:
            self.filter.addIgnoreIP('192.168.1.0/25')
            self.filter.addFailRegex('<HOST>')
            self.filter.setDatePattern(r'{^LN-BEG}EPOCH')
            self.filter.processLineAndAdd('1387203300.222 192.168.1.32')
            self.assertLogged('Ignore 192.168.1.32')
        finally:
            tearDownMyTime()

    def testTimeJump(self):
        # failures around local/UTC time jumps must still be counted:
        try:
            self.filter.addFailRegex('^<HOST>')
            self.filter.setDatePattern(r'{^LN-BEG}%Y-%m-%d %H:%M:%S(?:\s*%Z)?\s')
            self.filter.setFindTime(10); # max 10 seconds back
            #
            self.pruneLog('[phase 1] DST time jump')
            # check local time jump (DST hole):
            MyTime.setTime(1572137999)
            self.filter.processLineAndAdd('2019-10-27 02:59:59 192.0.2.5'); # +1 = 1
            MyTime.setTime(1572138000)
            self.filter.processLineAndAdd('2019-10-27 02:00:00 192.0.2.5'); # +1 = 2
            MyTime.setTime(1572138001)
            self.filter.processLineAndAdd('2019-10-27 02:00:01 192.0.2.5'); # +1 = 3
            self.assertLogged(
                'Current failures from 1 IPs (IP:count): 192.0.2.5:1',
                'Current failures from 1 IPs (IP:count): 192.0.2.5:2',
                'Current failures from 1 IPs (IP:count): 192.0.2.5:3',
                "Total # of detected failures: 3.", all=True, wait=True)
            self.assertNotLogged('Ignore line')
            #
            self.pruneLog('[phase 2] UTC time jump (NTP correction)')
            # check time drifting backwards (NTP correction):
            MyTime.setTime(1572210000)
            self.filter.processLineAndAdd('2019-10-27 22:00:00 CET 192.0.2.6'); # +1 = 1
            MyTime.setTime(1572200000)
            self.filter.processLineAndAdd('2019-10-27 22:00:01 CET 192.0.2.6'); # +1 = 2 (logged before correction)
            self.filter.processLineAndAdd('2019-10-27 19:13:20 CET 192.0.2.6'); # +1 = 3 (logged after correction)
            self.filter.processLineAndAdd('2019-10-27 19:13:21 CET 192.0.2.6'); # +1 = 4
            self.assertLogged(
                '192.0.2.6:1', '192.0.2.6:2', '192.0.2.6:3', '192.0.2.6:4',
                "Total # of detected failures: 7.", all=True, wait=True)
            self.assertNotLogged('Ignore line')
        finally:
            tearDownMyTime()

    def testAddAttempt(self):
        # manually added attempts accumulate up to maxRetry and cause a ban:
        self.filter.setMaxRetry(3)
        for i in xrange(1, 1+3):
            self.filter.addAttempt('192.0.2.1')
            self.assertLogged('Attempt 192.0.2.1', '192.0.2.1:%d' % i, all=True, wait=True)
        self.jail.actions._Actions__checkBan()
        self.assertLogged('Ban 192.0.2.1', wait=True)

    def testIgnoreCommand(self):
        # external ignorecommand decides per IP (exit code 0 = ignore):
        self.filter.ignoreCommand = sys.executable + ' ' + os.path.join(TEST_FILES_DIR, "ignorecommand.py <ip>")
        self.assertTrue(self.filter.inIgnoreIPList("10.0.0.1"))
        self.assertFalse(self.filter.inIgnoreIPList("10.0.0.0"))
        self.assertLogged("returned successfully 0", "returned successfully 1", all=True)
        self.pruneLog()
        # missing argument produces usage error (exit code 10):
        self.assertFalse(self.filter.inIgnoreIPList(""))
        self.assertLogged("usage: ignorecommand IP", "returned 10", all=True)

    def testIgnoreCommandForTicket(self):
        # by host of IP (2001:db8::1 and 2001:db8::ffff map to "test-host" and "test-other" in the test-suite):
        self.filter.ignoreCommand = 'if [ "<ip-host>" = "test-host" ]; then exit 0; fi; exit 1'
        self.pruneLog()
        self.assertTrue(self.filter.inIgnoreIPList(FailTicket("2001:db8::1")))
        self.assertLogged("returned successfully 0")
        self.pruneLog()
        self.assertFalse(self.filter.inIgnoreIPList(FailTicket("2001:db8::ffff")))
        self.assertLogged("returned successfully 1")
        # by user-name (ignore tester):
        self.filter.ignoreCommand = 'if [ "<F-USER>" = "tester" ]; then exit 0; fi; exit 1'
        self.pruneLog()
        self.assertTrue(self.filter.inIgnoreIPList(FailTicket("tester", data={'user': 'tester'})))
        self.assertLogged("returned successfully 0")
        self.pruneLog()
        self.assertFalse(self.filter.inIgnoreIPList(FailTicket("root", data={'user': 'root'})))
        self.assertLogged("returned successfully 1", all=True)

    def testIgnoreCache(self):
        # like both test-cases above, just cached (so once per key)...
        self.filter.ignoreCache = {"key":"<ip>"}
        self.filter.ignoreCommand = 'if [ "<ip>" = "10.0.0.1" ]; then exit 0; fi; exit 1'
        for i in xrange(5):
            self.pruneLog()
            self.assertTrue(self.filter.inIgnoreIPList("10.0.0.1"))
            self.assertFalse(self.filter.inIgnoreIPList("10.0.0.0"))
            # the command must be executed only on the first iteration:
            if not i:
                self.assertLogged("returned successfully 0", "returned successfully 1", all=True)
            else:
                self.assertNotLogged("returned successfully 0", "returned successfully 1", all=True)
        # by host of IP:
        self.filter.ignoreCache = {"key":"<ip-host>"}
        self.filter.ignoreCommand = 'if [ "<ip-host>" = "test-host" ]; then exit 0; fi; exit 1'
        for i in xrange(5):
            self.pruneLog()
            self.assertTrue(self.filter.inIgnoreIPList(FailTicket("2001:db8::1")))
            self.assertFalse(self.filter.inIgnoreIPList(FailTicket("2001:db8::ffff")))
            if not i:
                self.assertLogged("returned successfully")
            else:
                self.assertNotLogged("returned successfully")
        # by user-name:
        self.filter.ignoreCache = {"key":"<F-USER>", "max-count":"10", "max-time":"1h"}
        # cache options are normalized to [key, int count, seconds]:
        self.assertEqual(self.filter.ignoreCache, ["<F-USER>", 10, 60*60])
        self.filter.ignoreCommand = 'if [ "<F-USER>" = "tester" ]; then exit 0; fi; exit 1'
        for i in xrange(5):
            self.pruneLog()
            self.assertTrue(self.filter.inIgnoreIPList(FailTicket("tester", data={'user': 'tester'})))
            self.assertFalse(self.filter.inIgnoreIPList(FailTicket("root", data={'user': 'root'})))
            if not i:
                self.assertLogged("returned successfully")
            else:
                self.assertNotLogged("returned successfully")

    def testIgnoreCauseOK(self):
        # every known ignore source gets logged with its cause:
        ip = "93.184.216.34"
        for ignore_source in ["dns", "ip", "command"]:
            self.filter.logIgnoreIp(ip, True, ignore_source=ignore_source)
            self.assertLogged("[%s] Ignore %s by %s" % (self.jail.name, ip, ignore_source))

    def testIgnoreCauseNOK(self):
        # nothing is logged when the address was not ignored:
        self.filter.logIgnoreIp("example.com", False, ignore_source="NOT_LOGGED")
        self.assertNotLogged("[%s] Ignore %s by %s" % (self.jail.name, "example.com", "NOT_LOGGED"))
  515 
  516 
class IgnoreIPDNS(LogCaptureTestCase):
    """Ignore-list tests that require DNS resolution (skipped without network)."""

    def setUp(self):
        """Call before every test case."""
        unittest.F2B.SkipIfNoNetwork()
        LogCaptureTestCase.setUp(self)
        self.jail = DummyJail()
        self.filter = FileFilter(self.jail)

    def testIgnoreIPDNS(self):
        for dns in ("www.epfl.ch", "example.com"):
            self.filter.addIgnoreIP(dns)
            ips = DNSUtils.dnsToIp(dns)
            self.assertTrue(len(ips) > 0)
            # for each ip from dns check ip ignored:
            for ip in ips:
                ip = str(ip)
                DefLogSys.debug('  ++ positive case for %s', ip)
                self.assertTrue(self.filter.inIgnoreIPList(ip))
                # check another ips (with increment/decrement of first/last part) not ignored:
                iparr = []
                # split the address into (first part, sep, middle, sep, last part):
                ip2 = re.search(r'^([^.:]+)([.:])(.*?)([.:])([^.:]+)$', ip)
                if ip2:
                    ip2 = ip2.groups()
                    # vary first (o=0) and last (o=4) part by +/- 1:
                    for o in (0, 4):
                        for i in (1, -1):
                            ipo = list(ip2)
                            if ipo[1] == '.':
                                # IPv4 - decimal parts:
                                ipo[o] = str(int(ipo[o])+i)
                            else:
                                # IPv6 - hexadecimal parts:
                                ipo[o] = '%x' % (int(ipo[o], 16)+i)
                            ipo = ''.join(ipo)
                            if ipo not in ips:
                                iparr.append(ipo)
                self.assertTrue(len(iparr) > 0)
                for ip in iparr:
                    DefLogSys.debug('  -- negative case for %s', ip)
                    self.assertFalse(self.filter.inIgnoreIPList(str(ip)))

    def testIgnoreCmdApacheFakegooglebot(self):
        unittest.F2B.SkipIfCfgMissing(stock=True)
        cmd = os.path.join(STOCK_CONF_DIR, "filter.d/ignorecommands/apache-fakegooglebot")
        ## below test direct as python module:
        mod = Utils.load_python_module(cmd)
        self.assertFalse(mod.is_googlebot(mod.process_args([cmd, "128.178.222.69"])))
        self.assertFalse(mod.is_googlebot(mod.process_args([cmd, "192.0.2.1"])))
        bot_ips = ['66.249.66.1']
        for ip in bot_ips:
            self.assertTrue(mod.is_googlebot(mod.process_args([cmd, str(ip)])), "test of googlebot ip %s failed" % ip)
        # missing/invalid IP arguments must be rejected:
        self.assertRaises(ValueError, lambda: mod.is_googlebot(mod.process_args([cmd])))
        self.assertRaises(ValueError, lambda: mod.is_googlebot(mod.process_args([cmd, "192.0"])))
        ## via command:
        self.filter.ignoreCommand = cmd + " <ip>"
        for ip in bot_ips:
            self.assertTrue(self.filter.inIgnoreIPList(str(ip)), "test of googlebot ip %s failed" % ip)
            self.assertLogged('-- returned successfully')
            self.pruneLog()
        self.assertFalse(self.filter.inIgnoreIPList("192.0"))
        self.assertLogged('Argument must be a single valid IP.')
        self.pruneLog()
        # superfluous arguments also produce an error:
        self.filter.ignoreCommand = cmd + " bad arguments <ip>"
        self.assertFalse(self.filter.inIgnoreIPList("192.0"))
        self.assertLogged('Please provide a single IP as an argument.')
  580 
  581 
  582 
  583 class LogFile(LogCaptureTestCase):
  584 
  585     MISSING = 'testcases/missingLogFile'
  586 
  587     def setUp(self):
  588         LogCaptureTestCase.setUp(self)
  589 
  590     def tearDown(self):
  591         LogCaptureTestCase.tearDown(self)
  592 
  593     def testMissingLogFiles(self):
  594         self.filter = FilterPoll(None)
  595         self.assertRaises(IOError, self.filter.addLogPath, LogFile.MISSING)
  596 
  597 
  598 class LogFileFilterPoll(unittest.TestCase):
  599 
  600     FILENAME = os.path.join(TEST_FILES_DIR, "testcase01.log")
  601 
    def setUp(self):
        """Call before every test case."""
        super(LogFileFilterPoll, self).setUp()
        # polling-based filter watching the static test log file:
        self.filter = FilterPoll(DummyJail())
        self.filter.addLogPath(LogFileFilterPoll.FILENAME)
  607 
    def tearDown(self):
        """Call after every test case."""
        # no extra cleanup needed beyond the base class:
        super(LogFileFilterPoll, self).tearDown()
  611 
  612     #def testOpen(self):
  613     #   self.filter.openLogFile(LogFile.FILENAME)
  614 
    def testIsModified(self):
        # the first check detects the (freshly added) file as modified ...
        self.assertTrue(self.filter.isModified(LogFileFilterPoll.FILENAME))
        # ... a second check without changes must not:
        self.assertFalse(self.filter.isModified(LogFileFilterPoll.FILENAME))
  618 
    def testSeekToTimeSmallFile(self):
        """Check seekToTime positions the file cursor at the first entry
        with time >= the searched time, for a small growing log file.

        NOTE(review): the asserted byte offsets (53, 110, 157) correspond to
        the cumulative lengths of the lines written below (19-char timestamp
        plus message text plus newline) - confirm if the messages change.
        """
        # speedup search using exact date pattern:
        self.filter.setDatePattern(r'^%ExY-%Exm-%Exd %ExH:%ExM:%ExS')
        fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='.log')
        # local `time` (epoch int) deliberately shadows the time module here
        time = 1417512352
        f = open(fname, 'w')
        fc = None
        try:
            fc = FileContainer(fname, self.filter.getLogEncoding())
            fc.open()
            fc.setPos(0); self.filter.seekToTime(fc, time)
            f.flush()
            # empty :
            fc.setPos(0); self.filter.seekToTime(fc, time)
            self.assertEqual(fc.getPos(), 0)
            # one entry with exact time:
            f.write("%s [sshd] error: PAM: failure len 1\n" % _tm(time))
            f.flush()
            fc.setPos(0); self.filter.seekToTime(fc, time)

            # rewrite :
            f.seek(0)
            f.truncate()
            fc.close()
            fc = FileContainer(fname, self.filter.getLogEncoding())
            fc.open()
            # no time - nothing should be found :
            for i in xrange(10):
                f.write("[sshd] error: PAM: failure len 1\n")
                f.flush()
                fc.setPos(0); self.filter.seekToTime(fc, time)

            # rewrite
            f.seek(0)
            f.truncate()
            fc.close()
            fc = FileContainer(fname, self.filter.getLogEncoding())
            fc.open()
            # one entry with smaller time:
            f.write("%s [sshd] error: PAM: failure len 2\n" % _tm(time - 10))
            f.flush()
            fc.setPos(0); self.filter.seekToTime(fc, time)
            # seek lands after the single older entry (its full line length):
            self.assertEqual(fc.getPos(), 53)
            # two entries with smaller time:
            f.write("%s [sshd] error: PAM: failure len 3 2 1\n" % _tm(time - 9))
            f.flush()
            fc.setPos(0); self.filter.seekToTime(fc, time)
            self.assertEqual(fc.getPos(), 110)
            # check move after end (all of time smaller):
            f.write("%s [sshd] error: PAM: failure\n" % _tm(time - 1))
            f.flush()
            self.assertEqual(fc.getFileSize(), 157)
            fc.setPos(0); self.filter.seekToTime(fc, time)
            self.assertEqual(fc.getPos(), 157)

            # stil one exact line:
            f.write("%s [sshd] error: PAM: Authentication failure\n" % _tm(time))
            f.write("%s [sshd] error: PAM: failure len 1\n" % _tm(time))
            f.flush()
            # position stays at the first entry with the exact search time:
            fc.setPos(0); self.filter.seekToTime(fc, time)
            self.assertEqual(fc.getPos(), 157)

            # add something hereafter:
            f.write("%s [sshd] error: PAM: failure len 3 2 1\n" % _tm(time + 2))
            f.write("%s [sshd] error: PAM: Authentication failure\n" % _tm(time + 3))
            f.flush()
            fc.setPos(0); self.filter.seekToTime(fc, time)
            self.assertEqual(fc.getPos(), 157)
            # add something hereafter:
            f.write("%s [sshd] error: PAM: failure\n" % _tm(time + 9))
            f.write("%s [sshd] error: PAM: failure len 4 3 2\n" % _tm(time + 9))
            f.flush()
            fc.setPos(0); self.filter.seekToTime(fc, time)
            self.assertEqual(fc.getPos(), 157)
            # start search from current pos :
            fc.setPos(157); self.filter.seekToTime(fc, time)
            self.assertEqual(fc.getPos(), 157)
            # start search from current pos :
            fc.setPos(110); self.filter.seekToTime(fc, time)
            self.assertEqual(fc.getPos(), 157)

        finally:
            # always release the container and remove the temporary file
            if fc:
                fc.close()
            _killfile(f, fname)
  704 
  705     def testSeekToTimeLargeFile(self):
  706         # speedup search using exact date pattern:
  707         self.filter.setDatePattern(r'^%ExY-%Exm-%Exd %ExH:%ExM:%ExS')
  708         fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='.log')
  709         time = 1417512352
  710         f = open(fname, 'w')
  711         fc = None
  712         count = 1000 if unittest.F2B.fast else 10000
  713         try:
  714             fc = FileContainer(fname, self.filter.getLogEncoding())
  715             fc.open()
  716             f.seek(0)
  717             # variable length of file (ca 45K or 450K before and hereafter):
  718             # write lines with smaller as search time:
  719             t = time - count - 1
  720             for i in xrange(count):
  721                 f.write("%s [sshd] error: PAM: failure\n" % _tm(t))
  722                 t += 1
  723             f.flush()
  724             fc.setPos(0); self.filter.seekToTime(fc, time)
  725             self.assertEqual(fc.getPos(), 47*count)
  726             # write lines with exact search time:
  727             for i in xrange(10):
  728                 f.write("%s [sshd] error: PAM: failure\n" % _tm(time))
  729             f.flush()
  730             fc.setPos(0); self.filter.seekToTime(fc, time)
  731             self.assertEqual(fc.getPos(), 47*count)
  732             fc.setPos(4*count); self.filter.seekToTime(fc, time)
  733             self.assertEqual(fc.getPos(), 47*count)
  734             # write lines with greater as search time:
  735             t = time+1
  736             for i in xrange(count//500):
  737                 for j in xrange(500):
  738                     f.write("%s [sshd] error: PAM: failure\n" % _tm(t))
  739                     t += 1
  740                 f.flush()
  741                 fc.setPos(0); self.filter.seekToTime(fc, time)
  742                 self.assertEqual(fc.getPos(), 47*count)
  743                 fc.setPos(53); self.filter.seekToTime(fc, time)
  744                 self.assertEqual(fc.getPos(), 47*count)
  745         
  746         finally:
  747             if fc:
  748                 fc.close()
  749             _killfile(f, fname)
  750 
class LogFileMonitor(LogCaptureTestCase):
    """Few more tests for FilterPoll API

    Exercises modification detection (isModified), error handling of
    getFailures, regex add/remove, and log-rotation scenarios on a
    temporary file monitored by a FilterPoll instance.
    """
    def setUp(self):
        """Call before every test case."""
        setUpMyTime()
        LogCaptureTestCase.setUp(self)
        # placeholders so tearDown can run even if the rest of setUp fails:
        self.filter = self.name = 'NA'
        _, self.name = tempfile.mkstemp('fail2ban', 'monitorfailures')
        self.file = open(self.name, 'a')
        self.filter = FilterPoll(DummyJail())
        # autoSeek=False: start reading from the beginning of the file
        self.filter.addLogPath(self.name, autoSeek=False)
        self.filter.active = True
        self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")

    def tearDown(self):
        """Close and remove the temporary log file after each test."""
        tearDownMyTime()
        LogCaptureTestCase.tearDown(self)
        _killfile(self.file, self.name)
        pass

    def isModified(self, delay=2):
        """Wait up to `delay` sec to assure that it was modified or not
        """
        return Utils.wait_for(lambda: self.filter.isModified(self.name), _maxWaitTime(delay))

    def notModified(self, delay=2):
        """Wait up to `delay` sec as long as it was not modified
        """
        return Utils.wait_for(lambda: not self.filter.isModified(self.name), _maxWaitTime(delay))

    def testUnaccessibleLogFile(self):
        # drop all permissions so getFailures cannot open the file
        os.chmod(self.name, 0)
        self.filter.getFailures(self.name)
        failure_was_logged = self._is_logged('Unable to open %s' % self.name)
        # verify that we cannot access the file. Checking by name of user is not
        # sufficient since could be a fakeroot or some other super-user
        is_root = True
        try:
            with open(self.name) as f: # pragma: no cover - normally no root
                f.read()
        except IOError:
            is_root = False

        # If ran as root, those restrictive permissions would not
        # forbid log to be read.
        self.assertTrue(failure_was_logged != is_root)

    def testNoLogFile(self):
        # removing the file must be reported as "unable to open"
        _killfile(self.file, self.name)
        self.filter.getFailures(self.name)
        self.assertLogged('Unable to open %s' % self.name)

    def testErrorProcessLine(self):
        # speedup search using exact date pattern:
        self.filter.setDatePattern(r'^%ExY-%Exm-%Exd %ExH:%ExM:%ExS')
        self.filter.sleeptime /= 1000.0
        ## produce error with not callable processLine:
        _org_processLine = self.filter.processLine
        self.filter.processLine = None
        for i in range(100):
            self.file.write("line%d\n" % 1)
        self.file.flush()
        for i in range(100):
            self.filter.getFailures(self.name)
        # repeated failures must be logged and eventually switch filter to idle:
        self.assertLogged('Failed to process line:')
        self.assertLogged('Too many errors at once')
        self.pruneLog()
        self.assertTrue(self.filter.idle)
        self.filter.idle = False
        self.filter.getFailures(self.name)
        # restore the original processLine — processing must succeed again:
        self.filter.processLine = _org_processLine
        self.file.write("line%d\n" % 1)
        self.file.flush()
        self.filter.getFailures(self.name)
        self.assertNotLogged('Failed to process line:')

    def testRemovingFailRegex(self):
        # first removal succeeds (regex was added in setUp) ...
        self.filter.delFailRegex(0)
        self.assertNotLogged('Cannot remove regular expression. Index 0 is not valid')
        # ... second removal of the same index must fail:
        self.filter.delFailRegex(0)
        self.assertLogged('Cannot remove regular expression. Index 0 is not valid')

    def testRemovingIgnoreRegex(self):
        # no ignore regex was ever added, so index 0 is invalid:
        self.filter.delIgnoreRegex(0)
        self.assertLogged('Cannot remove regular expression. Index 0 is not valid')

    def testNewChangeViaIsModified(self):
        # it is a brand new one -- so first we think it is modified
        self.assertTrue(self.isModified())
        # but not any longer
        self.assertTrue(self.notModified())
        self.assertTrue(self.notModified())
        mtimesleep()                # to guarantee freshier mtime
        for i in range(4):            # few changes
            # unless we write into it
            self.file.write("line%d\n" % i)
            self.file.flush()
            self.assertTrue(self.isModified())
            self.assertTrue(self.notModified())
            mtimesleep()                # to guarantee freshier mtime
        os.rename(self.name, self.name + '.old')
        # we are not signaling as modified whenever
        # it gets away
        self.assertTrue(self.notModified(1))
        f = open(self.name, 'a')
        self.assertTrue(self.isModified())
        self.assertTrue(self.notModified())
        mtimesleep()
        # i is still 3 here (left over from the loop above)
        f.write("line%d\n" % i)
        f.flush()
        self.assertTrue(self.isModified())
        self.assertTrue(self.notModified())
        _killfile(f, self.name)
        # NOTE(review): first argument is a file *name*, not a handle, unlike
        # the call above — presumably _killfile tolerates that; verify.
        _killfile(self.name, self.name + '.old')
        pass

    def testNewChangeViaGetFailures_simple(self):
        # speedup search using exact date pattern:
        self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
        # suck in lines from this sample log file
        self.filter.getFailures(self.name)
        self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)

        # Now let's feed it with entries from the file
        _copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=5)
        self.filter.getFailures(self.name)
        self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
        # and it should have not been enough

        _copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=5)
        self.filter.getFailures(self.name)
        _assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)

    def testNewChangeViaGetFailures_rewrite(self):
        # speedup search using exact date pattern:
        self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
        #
        # if we rewrite the file at once
        self.file.close()
        _copy_lines_between_files(GetFailures.FILENAME_01, self.name).close()
        self.filter.getFailures(self.name)
        _assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)

        # What if file gets overridden
        # yoh: skip so we skip those 2 identical lines which our
        # filter "marked" as the known beginning, otherwise it
        # would not detect "rotation"
        self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
                                              skip=3, mode='w')
        self.filter.getFailures(self.name)
        #self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
        _assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)

    def testNewChangeViaGetFailures_move(self):
        # speedup search using exact date pattern:
        self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
        #
        # if we move file into a new location while it has been open already
        self.file.close()
        self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
                                              n=14, mode='w')
        self.filter.getFailures(self.name)
        self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
        self.assertEqual(self.filter.failManager.getFailTotal(), 2)

        # move aside, but leaving the handle still open...
        os.rename(self.name, self.name + '.bak')
        _copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14).close()
        self.filter.getFailures(self.name)
        _assert_correct_last_attempt(self, self.filter, GetFailures.FAILURES_01)
        self.assertEqual(self.filter.failManager.getFailTotal(), 3)
  924 
  925 class CommonMonitorTestCase(unittest.TestCase):
  926 
  927     def setUp(self):
  928         """Call before every test case."""
  929         super(CommonMonitorTestCase, self).setUp()
  930         self._failTotal = 0
  931 
  932     def waitFailTotal(self, count, delay=1):
  933         """Wait up to `delay` sec to assure that expected failure `count` reached
  934         """
  935         ret = Utils.wait_for(
  936             lambda: self.filter.failManager.getFailTotal() >= (self._failTotal + count) and self.jail.isFilled(),
  937             _maxWaitTime(delay))
  938         self._failTotal += count
  939         return ret
  940 
  941     def isFilled(self, delay=1):
  942         """Wait up to `delay` sec to assure that it was modified or not
  943         """
  944         return Utils.wait_for(self.jail.isFilled, _maxWaitTime(delay))
  945 
  946     def isEmpty(self, delay=5):
  947         """Wait up to `delay` sec to assure that it empty again
  948         """
  949         return Utils.wait_for(self.jail.isEmpty, _maxWaitTime(delay))
  950 
  951     def waitForTicks(self, ticks, delay=2):
  952         """Wait up to `delay` sec to assure that it was modified or not
  953         """
  954         last_ticks = self.filter.ticks
  955         return Utils.wait_for(lambda: self.filter.ticks >= last_ticks + ticks, _maxWaitTime(delay))
  956 
  957 
def get_monitor_failures_testcase(Filter_):
    """Generator of TestCase's for different filters/backends

    Returns a MonitorFailures TestCase subclass bound to the given
    file-based filter backend (e.g. poll/gamin/pyinotify), named so that
    failures are easy to attribute to the backend.
    """

    # add Filter_'s name so we could easily identify bad cows
    testclass_name = tempfile.mktemp(
        'fail2ban', 'monitorfailures_%s_' % (Filter_.__name__,))

    class MonitorFailures(CommonMonitorTestCase):
        # class-level counter to make per-test file names unique
        count = 0

        def setUp(self):
            """Call before every test case."""
            super(MonitorFailures, self).setUp()
            setUpMyTime()
            # placeholders so tearDown can run even if setUp fails midway:
            self.filter = self.name = 'NA'
            self.name = '%s-%d' % (testclass_name, self.count)
            MonitorFailures.count += 1 # so we have unique filenames across tests
            self.file = open(self.name, 'a')
            self.jail = DummyJail()
            self.filter = Filter_(self.jail)
            self.filter.addLogPath(self.name, autoSeek=False)
            # speedup search using exact date pattern:
            self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
            self.filter.active = True
            self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")
            self.filter.start()
            # If filter is polling it would sleep a bit to guarantee that
            # we have initial time-stamp difference to trigger "actions"
            self._sleep_4_poll()
            #print "D: started filter %s" % self.filter

        def tearDown(self):
            """Stop the filter thread and remove the temporary log file."""
            tearDownMyTime()
            #print "D: SLEEPING A BIT"
            #import time; time.sleep(5)
            #print "D: TEARING DOWN"
            self.filter.stop()
            #print "D: WAITING FOR FILTER TO STOP"
            self.filter.join()        # wait for the thread to terminate
            #print "D: KILLING THE FILE"
            _killfile(self.file, self.name)
            #time.sleep(0.2)              # Give FS time to ack the removal
            super(MonitorFailures, self).tearDown()

        def _sleep_4_poll(self):
            # Since FilterPoll relies on time stamps and some
            # actions might be happening too fast in the tests,
            # sleep a bit to guarantee reliable time stamps
            if isinstance(self.filter, FilterPoll):
                Utils.wait_for(self.filter.isAlive, _maxWaitTime(5))

        def assert_correct_last_attempt(self, failures, count=None):
            # wait until the expected number of failures arrived in the jail,
            # then verify the last ban ticket matches `failures`:
            self.assertTrue(self.waitFailTotal(count if count else failures[1], 10))
            _assert_correct_last_attempt(self, self.jail, failures, count=count)

        def test_grow_file(self):
            self._test_grow_file()

        def test_grow_file_in_idle(self):
            # same as test_grow_file but with the filter set to idle
            self._test_grow_file(True)

        def _test_grow_file(self, idle=False):
            if idle:
                self.filter.sleeptime /= 100.0
                self.filter.idle = True
                self.waitForTicks(1)
            # suck in lines from this sample log file
            self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)

            # Now let's feed it with entries from the file
            _copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=5)
            self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
            # and our dummy jail is empty as well
            self.assertFalse(len(self.jail))
            # since it should have not been enough

            _copy_lines_between_files(GetFailures.FILENAME_01, self.file, skip=5)
            if idle:
                # an idle filter must not pick anything up:
                self.waitForTicks(1)
                self.assertTrue(self.isEmpty(1))
                return
            self.assertTrue(self.isFilled(10))
            # so we sleep a bit for it not to become empty,
            # and meanwhile pass to other thread(s) and filter should
            # have gathered new failures and passed them into the
            # DummyJail
            self.assertEqual(len(self.jail), 1)
            # and there should be no "stuck" ticket in failManager
            self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
            self.assert_correct_last_attempt(GetFailures.FAILURES_01)
            self.assertEqual(len(self.jail), 0)

            #return
            # just for fun let's copy all of them again and see if that results
            # in a new ban
            _copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
            self.assert_correct_last_attempt(GetFailures.FAILURES_01)

        def test_rewrite_file(self):
            # if we rewrite the file at once
            self.file.close()
            _copy_lines_between_files(GetFailures.FILENAME_01, self.name).close()
            self.assert_correct_last_attempt(GetFailures.FAILURES_01)

            # What if file gets overridden
            # yoh: skip so we skip those 2 identical lines which our
            # filter "marked" as the known beginning, otherwise it
            # would not detect "rotation"
            self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
                                                  skip=3, mode='w')
            self.assert_correct_last_attempt(GetFailures.FAILURES_01)

        def _wait4failures(self, count=2):
            # Poll might need more time
            self.assertTrue(self.isEmpty(_maxWaitTime(5)),
                            "Queue must be empty but it is not: %s."
                            % (', '.join([str(x) for x in self.jail.queue])))
            self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
            Utils.wait_for(lambda: self.filter.failManager.getFailTotal() >= count, _maxWaitTime(10))
            self.assertEqual(self.filter.failManager.getFailTotal(), count)

        def test_move_file(self):
            # if we move file into a new location while it has been open already
            self.file.close()
            self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
                                                  n=14, mode='w')
            self._wait4failures()

            # move aside, but leaving the handle still open...
            os.rename(self.name, self.name + '.bak')
            _copy_lines_between_files(GetFailures.FILENAME_01, self.name, skip=14).close()
            self.assert_correct_last_attempt(GetFailures.FAILURES_01)
            self.assertEqual(self.filter.failManager.getFailTotal(), 3)

            # now remove the moved file
            _killfile(None, self.name + '.bak')
            _copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100).close()
            self.assert_correct_last_attempt(GetFailures.FAILURES_01)
            self.assertEqual(self.filter.failManager.getFailTotal(), 6)

        def test_pyinotify_delWatch(self):
            if hasattr(self.filter, '_delWatch'): # pyinotify only
                # reach into the name-mangled pyinotify WatchManager:
                m = self.filter._FilterPyinotify__monitor
                # remove existing watch:
                self.assertTrue(self.filter._delWatch(m.get_wd(self.name)))
                # mockup get_path to allow once find path for invalid wd-value:
                _org_get_path = m.get_path
                def _get_path(wd):
                    #m.get_path = _org_get_path
                    return 'test'
                m.get_path = _get_path
                # try remove watch using definitely not existing handle:
                self.assertFalse(self.filter._delWatch(0x7fffffff))
                m.get_path = _org_get_path

        def test_del_file(self):
            # test filter reaction by delete watching file:
            self.file.close()
            self.waitForTicks(1)
            # remove file (cause detection of log-rotation)...
            os.unlink(self.name)
            # check it was detected (in pending files):
            self.waitForTicks(2)
            if hasattr(self.filter, "getPendingPaths"):
                self.assertTrue(Utils.wait_for(lambda: self.name in self.filter.getPendingPaths(), _maxWaitTime(10)))
                self.assertEqual(len(self.filter.getPendingPaths()), 1)

        @with_tmpdir
        def test_move_dir(self, tmp):
            self.file.close()
            self.filter.setMaxRetry(10)
            self.filter.delLogPath(self.name)
            _killfile(None, self.name)
            # if we rename parent dir into a new location (simulate directory-base log rotation)
            tmpsub1 = os.path.join(tmp, "1")
            tmpsub2 = os.path.join(tmp, "2")
            os.mkdir(tmpsub1)
            self.name = os.path.join(tmpsub1, os.path.basename(self.name))
            os.close(os.open(self.name, os.O_CREAT|os.O_APPEND)); # create empty file
            self.filter.addLogPath(self.name, autoSeek=False)
            
            self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
                                                  skip=12, n=1, mode='w')
            self.file.close()
            self._wait4failures(1)

            # rotate whole directory: rename directory 1 as 2a:
            os.rename(tmpsub1, tmpsub2 + 'a')
            os.mkdir(tmpsub1)
            self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
                                                  skip=12, n=1, mode='w')
            self.file.close()
            self._wait4failures(2)

            # rotate whole directory: rename directory 1 as 2b:
            os.rename(tmpsub1, tmpsub2 + 'b')
            # wait a bit in-between (try to increase coverage, should find pending file for pending dir):
            self.waitForTicks(2)
            os.mkdir(tmpsub1)
            self.waitForTicks(2)
            self.file = _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
                                                  skip=12, n=1, mode='w')
            self.file.close()
            self._wait4failures(3)

            # stop before tmpdir deleted (just prevents many monitor events)
            self.filter.stop()
            self.filter.join()


        def _test_move_into_file(self, interim_kill=False):
            # if we move a new file into the location of an old (monitored) file
            _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
                                      n=100).close()
            # make sure that it is monitored first
            self.assert_correct_last_attempt(GetFailures.FAILURES_01)
            self.assertEqual(self.filter.failManager.getFailTotal(), 3)

            if interim_kill:
                _killfile(None, self.name)
                time.sleep(Utils.DEFAULT_SHORT_INTERVAL)                  # let them know

            # now create a new one to override old one
            _copy_lines_between_files(GetFailures.FILENAME_01, self.name + '.new',
                                      n=100).close()
            os.rename(self.name + '.new', self.name)
            self.assert_correct_last_attempt(GetFailures.FAILURES_01)
            self.assertEqual(self.filter.failManager.getFailTotal(), 6)

            # and to make sure that it now monitored for changes
            _copy_lines_between_files(GetFailures.FILENAME_01, self.name,
                                      n=100).close()
            self.assert_correct_last_attempt(GetFailures.FAILURES_01)
            self.assertEqual(self.filter.failManager.getFailTotal(), 9)

        def test_move_into_file(self):
            self._test_move_into_file(interim_kill=False)

        def test_move_into_file_after_removed(self):
            # exactly as above test + remove file explicitly
            # to test against possible drop-out of the file from monitoring
            self._test_move_into_file(interim_kill=True)

        def test_new_bogus_file(self):
            # to make sure that watching whole directory does not effect
            _copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100).close()
            self.assert_correct_last_attempt(GetFailures.FAILURES_01)

            # create a bogus file in the same directory and see if that doesn't affect
            open(self.name + '.bak2', 'w').close()
            _copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100).close()
            self.assert_correct_last_attempt(GetFailures.FAILURES_01)
            self.assertEqual(self.filter.failManager.getFailTotal(), 6)
            _killfile(None, self.name + '.bak2')

        def test_delLogPath(self):
            # Smoke test for removing of the path from being watched

            # basic full test
            _copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
            self.assert_correct_last_attempt(GetFailures.FAILURES_01)

            # and now remove the LogPath
            self.filter.delLogPath(self.name)
            # wait a bit for filter (backend-threads):
            self.waitForTicks(2)

            _copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
            # so we should get no more failures detected
            self.assertTrue(self.isEmpty(10))

            # but then if we add it back again (no seek to time in FileFilter's, because in file used the same time)
            self.filter.addLogPath(self.name, autoSeek=False)
            # wait a bit for filter (backend-threads):
            self.waitForTicks(2)
            # Tricky catch here is that it should get them from the
            # tail written before, so let's not copy anything yet
            #_copy_lines_between_files(GetFailures.FILENAME_01, self.name, n=100)
            # we should detect the failures
            self.assert_correct_last_attempt(GetFailures.FAILURES_01, count=6) # was needed if we write twice above
            
            # now copy and get even more
            _copy_lines_between_files(GetFailures.FILENAME_01, self.file, n=100)
            # check for 3 failures (not 9), because 6 already get above...
            self.assert_correct_last_attempt(GetFailures.FAILURES_01)
            # total count in this test:
            self.assertEqual(self.filter.failManager.getFailTotal(), 12)

    cls = MonitorFailures
    cls.__qualname__ = cls.__name__ = "MonitorFailures<%s>(%s)" \
              % (Filter_.__name__, testclass_name) # 'tempfile')
    return cls
 1251 
 1252 
 1253 def get_monitor_failures_journal_testcase(Filter_): # pragma: systemd no cover
 1254     """Generator of TestCase's for journal based filters/backends
 1255     """
 1256     
 1257     testclass_name = "monitorjournalfailures_%s" % (Filter_.__name__,)
 1258 
    class MonitorJournalFailures(CommonMonitorTestCase):
        """Monitor tests for the systemd-journal backend of the filter.

        Entries are injected into the journal tagged with a per-run UUID so
        that the filter under test only picks up messages produced by this
        very test process (journal matches on TEST_UUID below).
        """
        def setUp(self):
            """Call before every test case."""
            super(MonitorJournalFailures, self).setUp()
            self._runtimeJournal = None
            self.test_file = os.path.join(TEST_FILES_DIR, "testcase-journal.log")
            self.jail = DummyJail()
            self.filter = None
            # UUID used to ensure that only messages generated
            # as part of this test are picked up by the filter
            self.test_uuid = str(uuid.uuid4())
            self.name = "%s-%s" % (testclass_name, self.test_uuid)
            self.journal_fields = {
                'TEST_FIELD': "1", 'TEST_UUID': self.test_uuid}

        def _initFilter(self, **kwargs):
            # Build the journal filter under test; restricted by two journal
            # matches (TEST_FIELD=1 / TEST_FIELD=2) both carrying our UUID.
            self._getRuntimeJournal() # check journal available
            self.filter = Filter_(self.jail, **kwargs)
            self.filter.addJournalMatch([
                "SYSLOG_IDENTIFIER=fail2ban-testcases",
                "TEST_FIELD=1",
                "TEST_UUID=%s" % self.test_uuid])
            self.filter.addJournalMatch([
                "SYSLOG_IDENTIFIER=fail2ban-testcases",
                "TEST_FIELD=2",
                "TEST_UUID=%s" % self.test_uuid])
            self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>")

        def tearDown(self):
            # stop the monitor thread if a test started one:
            if self.filter and self.filter.active:
                self.filter.stop()
                self.filter.join()        # wait for the thread to terminate
            super(MonitorJournalFailures, self).tearDown()

        def _getRuntimeJournal(self):
            """Retrieve current system journal path

            If not found, SkipTest exception will be raised.
            """
            # we can cache it:
            if self._runtimeJournal is None:
                # Depending on the system, it could be found under /run or /var/log (e.g. Debian)
                # which are pointed by different systemd-path variables.  We will
                # check one at a time until the first hit
                for systemd_var in 'system-runtime-logs', 'system-state-logs':
                    tmp = Utils.executeCmd(
                        'find "$(systemd-path %s)" -name system.journal' % systemd_var,
                        timeout=10, shell=True, output=True
                    )
                    self.assertTrue(tmp)
                    out = str(tmp[1].decode('utf-8')).split('\n')[0]
                    if out: break
                self._runtimeJournal = out
            if self._runtimeJournal:
                return self._runtimeJournal
            raise unittest.SkipTest('systemd journal seems to be not available (e. g. no rights to read)')

        def testJournalFilesArg(self):
            # retrieve current system journal path
            jrnlfile = self._getRuntimeJournal()
            self._initFilter(journalfiles=jrnlfile)

        def testJournalFilesAndFlagsArgs(self):
            # retrieve current system journal path
            jrnlfile = self._getRuntimeJournal()
            self._initFilter(journalfiles=jrnlfile, journalflags=0)

        def testJournalPathArg(self):
            # retrieve current system journal path
            jrnlpath = self._getRuntimeJournal()
            jrnlpath = os.path.dirname(jrnlpath)
            self._initFilter(journalpath=jrnlpath)
            self.filter.seekToTime(
                datetime.datetime.now() - datetime.timedelta(days=1)
            )
            self.filter.start()
            self.waitForTicks(2)
            # nothing with our UUID was logged yet, so no failures expected:
            self.assertTrue(self.isEmpty(1))
            self.assertEqual(len(self.jail), 0)
            self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)

        def testJournalFlagsArg(self):
            self._initFilter(journalflags=0) # e. g. 2 - journal.RUNTIME_ONLY

        def assert_correct_ban(self, test_ip, test_attempts):
            """Wait for and verify a single ticket with the expected IP/attempts."""
            self.assertTrue(self.waitFailTotal(test_attempts, 10)) # give Filter a chance to react
            ticket = self.jail.getFailTicket()
            self.assertTrue(ticket)

            attempts = ticket.getAttempt()
            ip = ticket.getIP()
            # result intentionally unused - call only exercises getMatches():
            ticket.getMatches()

            self.assertEqual(ip, test_ip)
            self.assertEqual(attempts, test_attempts)

        def test_grow_file(self):
            self._test_grow_file()

        def test_grow_file_in_idle(self):
            self._test_grow_file(True)

        def _test_grow_file(self, idle=False):
            """Feed journal entries in two batches; in idle mode no failures
            may be processed, otherwise a ban for 193.168.0.128 must follow."""
            self._initFilter()
            self.filter.start()
            if idle:
                self.filter.sleeptime /= 100.0
                self.filter.idle = True
                self.waitForTicks(1)
            self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)

            # Now let's feed it with entries from the file
            _copy_lines_to_journal(
                self.test_file, self.journal_fields, n=2)
            self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
            # and our dummy jail is empty as well
            self.assertFalse(len(self.jail))
            # since it should have not been enough

            _copy_lines_to_journal(
                self.test_file, self.journal_fields, skip=2, n=3)
            if idle:
                self.waitForTicks(1)
                self.assertTrue(self.isEmpty(1))
                return
            self.assertTrue(self.isFilled(10))
            # so we sleep for up to 6 sec for it not to become empty,
            # and meanwhile pass to other thread(s) and filter should
            # have gathered new failures and passed them into the
            # DummyJail
            self.assertEqual(len(self.jail), 1)
            # and there should be no "stuck" ticket in failManager
            self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)
            self.assert_correct_ban("193.168.0.128", 3)
            self.assertEqual(len(self.jail), 0)

            # Lets read some more to check it bans again
            _copy_lines_to_journal(
                self.test_file, self.journal_fields, skip=5, n=4)
            self.assert_correct_ban("193.168.0.128", 3)

        def test_delJournalMatch(self):
            self._initFilter()
            self.filter.start()
            # Smoke test for removing of match

            # basic full test
            _copy_lines_to_journal(
                self.test_file, self.journal_fields, n=5)
            self.assert_correct_ban("193.168.0.128", 3)

            # and now remove the JournalMatch
            self.filter.delJournalMatch([
                "SYSLOG_IDENTIFIER=fail2ban-testcases",
                "TEST_FIELD=1",
                "TEST_UUID=%s" % self.test_uuid])

            _copy_lines_to_journal(
                self.test_file, self.journal_fields, n=5, skip=5)
            # so we should get no more failures detected
            self.assertTrue(self.isEmpty(10))

            # but then if we add it back again
            self.filter.addJournalMatch([
                "SYSLOG_IDENTIFIER=fail2ban-testcases",
                "TEST_FIELD=1",
                "TEST_UUID=%s" % self.test_uuid])
            self.assert_correct_ban("193.168.0.128", 4)
            _copy_lines_to_journal(
                self.test_file, self.journal_fields, n=6, skip=10)
            # we should detect the failures
            self.assertTrue(self.isFilled(10))

        def test_WrongChar(self):
            """Entries with non-UTF-8 / unicode / blob payloads must still match."""
            self._initFilter()
            self.filter.start()
            # Now let's feed it with entries from the file
            _copy_lines_to_journal(
                self.test_file, self.journal_fields, skip=15, n=4)
            self.waitForTicks(1)
            self.assertTrue(self.isFilled(10))
            self.assert_correct_ban("87.142.124.10", 4)
            # Add direct utf, unicode, blob:
            for l in (
            "error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1",
           u"error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1",
           b"error: PAM: Authentication failure for \xe4\xf6\xfc\xdf from 192.0.2.1".decode('utf-8', 'replace'),
            "error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2",
           u"error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2",
           b"error: PAM: Authentication failure for \xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f from 192.0.2.2".decode('utf-8', 'replace')
            ):
                # NOTE(review): 'fields' aliases self.journal_fields, so the
                # update() mutates the shared dict (same keys each pass, so
                # harmless here) - a copy would be cleaner; verify intent.
                fields = self.journal_fields
                fields.update(TEST_JOURNAL_FIELDS)
                journal.send(MESSAGE=l, **fields)
            self.waitForTicks(1)
            self.waitFailTotal(6, 10)
            self.assertTrue(Utils.wait_for(lambda: len(self.jail) == 2, 10))
            self.assertSortedEqual([self.jail.getFailTicket().getIP(), self.jail.getFailTicket().getIP()], 
                ["192.0.2.1", "192.0.2.2"])
 1458 
 1459     cls = MonitorJournalFailures
 1460     cls.__qualname__ = cls.__name__ = "MonitorJournalFailures<%s>(%s)" \
 1461               % (Filter_.__name__, testclass_name)
 1462     return cls
 1463 
 1464 
class GetFailures(LogCaptureTestCase):
    """Tests of FileFilter.getFailures() against the bundled example logs.

    setUp installs a mocked time (setUpMyTime) so that the timestamps in
    the static test logs are considered "current" by the fail manager.
    """

    FILENAME_01 = os.path.join(TEST_FILES_DIR, "testcase01.log")
    FILENAME_02 = os.path.join(TEST_FILES_DIR, "testcase02.log")
    FILENAME_03 = os.path.join(TEST_FILES_DIR, "testcase03.log")
    FILENAME_04 = os.path.join(TEST_FILES_DIR, "testcase04.log")
    FILENAME_USEDNS = os.path.join(TEST_FILES_DIR, "testcase-usedns.log")
    FILENAME_MULTILINE = os.path.join(TEST_FILES_DIR, "testcase-multiline.log")

    # so that they could be reused by other tests
    FAILURES_01 = ('193.168.0.128', 3, 1124013599.0,
                  [u'Aug 14 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128']*3)

    def setUp(self):
        """Call before every test case."""
        LogCaptureTestCase.setUp(self)
        setUpMyTime()
        self.jail = DummyJail()
        self.filter = FileFilter(self.jail)
        self.filter.active = True
        # speedup search using exact date pattern:
        self.filter.setDatePattern(r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?')
        # TODO Test this
        #self.filter.setTimeRegex("\S{3}\s{1,2}\d{1,2} \d{2}:\d{2}:\d{2}")
        #self.filter.setTimePattern("%b %d %H:%M:%S")

    def tearDown(self):
        """Call after every test case."""
        tearDownMyTime()
        LogCaptureTestCase.tearDown(self)

    def testFilterAPI(self):
        # add/list log paths and check the container book-keeping:
        self.assertEqual(self.filter.getLogs(), [])
        self.assertEqual(self.filter.getLogCount(), 0)
        self.filter.addLogPath(GetFailures.FILENAME_01, tail=True)
        self.assertEqual(self.filter.getLogCount(), 1)
        self.assertEqual(self.filter.getLogPaths(), [GetFailures.FILENAME_01])
        self.filter.addLogPath(GetFailures.FILENAME_02, tail=True)
        self.assertEqual(self.filter.getLogCount(), 2)
        self.assertSortedEqual(self.filter.getLogPaths(), [GetFailures.FILENAME_01, GetFailures.FILENAME_02])

    def testTail(self):
        # There must be no containters registered, otherwise [-1] indexing would be wrong
        self.assertEqual(self.filter.getLogs(), [])
        self.filter.addLogPath(GetFailures.FILENAME_01, tail=True)
        # tail=True seeks to end of file (position 1653 for testcase01.log):
        self.assertEqual(self.filter.getLogs()[-1].getPos(), 1653)
        self.filter.getLogs()[-1].close()
        self.assertEqual(self.filter.getLogs()[-1].readline(), "")
        self.filter.delLogPath(GetFailures.FILENAME_01)
        self.assertEqual(self.filter.getLogs(), [])

    def testNoLogAdded(self):
        self.filter.addLogPath(GetFailures.FILENAME_01, tail=True)
        self.assertTrue(self.filter.containsLogPath(GetFailures.FILENAME_01))
        self.filter.delLogPath(GetFailures.FILENAME_01)
        self.assertFalse(self.filter.containsLogPath(GetFailures.FILENAME_01))
        # and unknown (safety and cover)
        self.assertFalse(self.filter.containsLogPath('unknown.log'))
        self.filter.delLogPath('unknown.log')


    def testGetFailures01(self, filename=None, failures=None):
        """Detect FAILURES_01 in testcase01.log (also reused with other
        filename/failures by testCRLFFailures01)."""
        filename = filename or GetFailures.FILENAME_01
        failures = failures or GetFailures.FAILURES_01

        self.filter.addLogPath(filename, autoSeek=0)
        self.filter.addFailRegex(r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>$")
        self.filter.getFailures(filename)
        _assert_correct_last_attempt(self, self.filter,  failures)

    def testCRLFFailures01(self):
        # We first adjust logfile/failures to end with CR+LF
        fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='crlf')
        # poor man unix2dos:
        fin, fout = open(GetFailures.FILENAME_01), open(fname, 'w')
        for l in fin.readlines():
            fout.write('%s\r\n' % l.rstrip('\n'))
        fin.close()
        fout.close()

        # now see if we should be getting the "same" failures
        self.testGetFailures01(filename=fname)
        _killfile(fout, fname)

    def testGetFailures02(self):
        # expected: 4 attempts at minutes 53, 54, 57, 58 (py2 tuple-in-for syntax)
        output = ('141.3.81.106', 4, 1124013539.0,
                  [u'Aug 14 11:%d:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:141.3.81.106 port 51332 ssh2'
                   % m for m in 53, 54, 57, 58])

        self.filter.addLogPath(GetFailures.FILENAME_02, autoSeek=0)
        self.filter.addFailRegex(r"Failed .* from <HOST>")
        self.filter.getFailures(GetFailures.FILENAME_02)
        _assert_correct_last_attempt(self, self.filter, output)

    def testGetFailures03(self):
        output = ('203.162.223.135', 7, 1124013544.0)

        self.filter.addLogPath(GetFailures.FILENAME_03, autoSeek=0)
        self.filter.addFailRegex(r"error,relay=<HOST>,.*550 User unknown")
        self.filter.getFailures(GetFailures.FILENAME_03)
        _assert_correct_last_attempt(self, self.filter, output)

    def testGetFailures03_Seek1(self):
        # same test as above but with seek to 'Aug 14 11:55:04' - so other output ...
        output = ('203.162.223.135', 5, 1124013544.0)

        self.filter.addLogPath(GetFailures.FILENAME_03, autoSeek=output[2] - 4*60)
        self.filter.addFailRegex(r"error,relay=<HOST>,.*550 User unknown")
        self.filter.getFailures(GetFailures.FILENAME_03)
        _assert_correct_last_attempt(self, self.filter, output)

    def testGetFailures03_Seek2(self):
        # same test as above but with seek to 'Aug 14 11:59:04' - so other output ...
        output = ('203.162.223.135', 1, 1124013544.0)
        self.filter.setMaxRetry(1)

        self.filter.addLogPath(GetFailures.FILENAME_03, autoSeek=output[2])
        self.filter.addFailRegex(r"error,relay=<HOST>,.*550 User unknown")
        self.filter.getFailures(GetFailures.FILENAME_03)
        _assert_correct_last_attempt(self, self.filter, output)

    def testGetFailures04(self):
        # because of not exact time in testcase04.log (no year), we should always use our test time:
        self.assertEqual(MyTime.time(), 1124013600)
        # should find exact 4 failures for *.186 and 2 failures for *.185
        output = (('212.41.96.186', 4, 1124013600.0),
                  ('212.41.96.185', 2, 1124013598.0))

        # speedup search using exact date pattern:
        self.filter.setDatePattern((r'^%ExY(?P<_sep>[-/.])%m(?P=_sep)%d[T ]%H:%M:%S(?:[.,]%f)?(?:\s*%z)?',
            r'^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?',
            r'^EPOCH'
        ))
        self.filter.setMaxRetry(2)
        self.filter.addLogPath(GetFailures.FILENAME_04, autoSeek=0)
        self.filter.addFailRegex(r"Invalid user .* <HOST>")
        self.filter.getFailures(GetFailures.FILENAME_04)

        _assert_correct_last_attempt(self, self.filter, output)

    def testGetFailuresWrongChar(self):
        """Lines with invalid utf-8 bytes must be decoded leniently and
        still produce the expected failures (for every tested encoding)."""
        # write wrong utf-8 char:
        fname = tempfile.mktemp(prefix='tmp_fail2ban', suffix='crlf')
        fout = fopen(fname, 'wb')
        try:
            # write:
            for l in (
                b'2015-01-14 20:00:58 user \"test\xf1ing\" from \"192.0.2.0\"\n',          # wrong utf-8 char
                b'2015-01-14 20:00:59 user \"\xd1\xe2\xe5\xf2\xe0\" from \"192.0.2.0\"\n', # wrong utf-8 chars
                b'2015-01-14 20:01:00 user \"testing\" from \"192.0.2.0\"\n'               # correct utf-8 chars
            ):
                fout.write(l)
            fout.close()
            #
            output = ('192.0.2.0', 3, 1421262060.0)
            failregex = r"^\s*user \"[^\"]*\" from \"<HOST>\"\s*$"

            # test encoding auto or direct set of encoding:
            for enc in (None, 'utf-8', 'ascii'):
                if enc is not None:
                    # fresh filter/log-capture for each explicit encoding:
                    self.tearDown();self.setUp();
                    self.filter.setLogEncoding(enc);
                # speedup search using exact date pattern:
                self.filter.setDatePattern(r'^%ExY-%Exm-%Exd %ExH:%ExM:%ExS')
                self.assertNotLogged('Error decoding line');
                self.filter.addLogPath(fname)
                self.filter.addFailRegex(failregex)
                self.filter.getFailures(fname)
                _assert_correct_last_attempt(self, self.filter, output)

                self.assertLogged('Error decoding line');
                self.assertLogged('Continuing to process line ignoring invalid characters:', '2015-01-14 20:00:58 user ');
                self.assertLogged('Continuing to process line ignoring invalid characters:', '2015-01-14 20:00:59 user ');

        finally:
            _killfile(fout, fname)

    def testGetFailuresUseDNS(self):
        unittest.F2B.SkipIfNoNetwork()
        # We should still catch failures with usedns = no ;-)
        output_yes = (
            ('93.184.216.34', 2, 1124013539.0,
              [u'Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2',
               u'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.34 port 51332 ssh2']
            ),
            ('2606:2800:220:1:248:1893:25c8:1946', 1, 1124013299.0,
              [u'Aug 14 11:54:59 i60p295 sshd[12365]: Failed publickey for roehl from example.com port 51332 ssh2']
            ),
        )

        # NOTE: single output-tuple (no trailing comma) - _assert_correct_last_attempt
        # accepts a single output as well as a tuple of outputs.
        output_no = (
            ('93.184.216.34', 1, 1124013539.0,
              [u'Aug 14 11:58:59 i60p295 sshd[12365]: Failed publickey for roehl from ::ffff:93.184.216.34 port 51332 ssh2']
            )
        )

        # Actually no exception would be raised -- it will be just set to 'no'
        #self.assertRaises(ValueError,
        #                 FileFilter, None, useDns='wrong_value_for_useDns')

        for useDns, output in (
            ('yes',  output_yes),
            ('no',   output_no),
            ('warn', output_yes)
        ):
            self.pruneLog("[test-phase useDns=%s]" % useDns)
            jail = DummyJail()
            filter_ = FileFilter(jail, useDns=useDns)
            filter_.active = True
            filter_.failManager.setMaxRetry(1)  # we might have just few failures

            filter_.addLogPath(GetFailures.FILENAME_USEDNS, autoSeek=False)
            filter_.addFailRegex(r"Failed .* from <HOST>")
            filter_.getFailures(GetFailures.FILENAME_USEDNS)
            _assert_correct_last_attempt(self, filter_, output)

    def testGetFailuresMultiRegex(self):
        output = ('141.3.81.106', 8, 1124013541.0)

        self.filter.addLogPath(GetFailures.FILENAME_02, autoSeek=False)
        self.filter.addFailRegex(r"Failed .* from <HOST>")
        self.filter.addFailRegex(r"Accepted .* from <HOST>")
        self.filter.getFailures(GetFailures.FILENAME_02)
        _assert_correct_last_attempt(self, self.filter, output)

    def testGetFailuresIgnoreRegex(self):
        # the ignoreregex suppresses every line of testcase02.log:
        self.filter.addLogPath(GetFailures.FILENAME_02, autoSeek=False)
        self.filter.addFailRegex(r"Failed .* from <HOST>")
        self.filter.addFailRegex(r"Accepted .* from <HOST>")
        self.filter.addIgnoreRegex("for roehl")

        self.filter.getFailures(GetFailures.FILENAME_02)

        self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)

    def testGetFailuresMultiLine(self):
        output = [("192.0.43.10", 2, 1124013599.0),
            ("192.0.43.11", 1, 1124013598.0)]
        self.filter.addLogPath(GetFailures.FILENAME_MULTILINE, autoSeek=False)
        self.filter.setMaxLines(100)
        self.filter.addFailRegex(r"^.*rsyncd\[(?P<pid>\d+)\]: connect from .+ \(<HOST>\)$<SKIPLINES>^.+ rsyncd\[(?P=pid)\]: rsync error: .*$")
        self.filter.setMaxRetry(1)

        self.filter.getFailures(GetFailures.FILENAME_MULTILINE)

        # drain the fail manager completely and compare the collected tickets:
        foundList = []
        while True:
            try:
                foundList.append(
                    _ticket_tuple(self.filter.failManager.toBan())[0:3])
            except FailManagerEmpty:
                break
        self.assertSortedEqual(foundList, output)

    def testGetFailuresMultiLineIgnoreRegex(self):
        output = [("192.0.43.10", 2, 1124013599.0)]
        self.filter.addLogPath(GetFailures.FILENAME_MULTILINE, autoSeek=False)
        self.filter.setMaxLines(100)
        self.filter.addFailRegex(r"^.*rsyncd\[(?P<pid>\d+)\]: connect from .+ \(<HOST>\)$<SKIPLINES>^.+ rsyncd\[(?P=pid)\]: rsync error: .*$")
        self.filter.addIgnoreRegex("rsync error: Received SIGINT")
        self.filter.setMaxRetry(1)

        self.filter.getFailures(GetFailures.FILENAME_MULTILINE)

        _assert_correct_last_attempt(self, self.filter, output.pop())

        self.assertRaises(FailManagerEmpty, self.filter.failManager.toBan)

    def testGetFailuresMultiLineMultiRegex(self):
        output = [("192.0.43.10", 2, 1124013599.0),
            ("192.0.43.11", 1, 1124013598.0),
            ("192.0.43.15", 1, 1124013598.0)]
        self.filter.addLogPath(GetFailures.FILENAME_MULTILINE, autoSeek=False)
        self.filter.setMaxLines(100)
        self.filter.addFailRegex(r"^.*rsyncd\[(?P<pid>\d+)\]: connect from .+ \(<HOST>\)$<SKIPLINES>^.+ rsyncd\[(?P=pid)\]: rsync error: .*$")
        self.filter.addFailRegex(r"^.* sendmail\[.*, msgid=<(?P<msgid>[^>]+).*relay=\[<HOST>\].*$<SKIPLINES>^.+ spamd: result: Y \d+ .*,mid=<(?P=msgid)>(,bayes=[.\d]+)?(,autolearn=\S+)?\s*$")
        self.filter.setMaxRetry(1)

        self.filter.getFailures(GetFailures.FILENAME_MULTILINE)

        # drain the fail manager completely and compare the collected tickets:
        foundList = []
        while True:
            try:
                foundList.append(
                    _ticket_tuple(self.filter.failManager.toBan())[0:3])
            except FailManagerEmpty:
                break
        self.assertSortedEqual(foundList, output)
 1753 
 1754 
class DNSUtilsTests(unittest.TestCase):
    """Tests of the generic Utils.Cache (count/time bounded) used by DNSUtils."""

    def testCache(self):
        c = Utils.Cache(maxCount=5, maxTime=60)
        # not available :
        self.assertTrue(c.get('a') is None)
        self.assertEqual(c.get('a', 'test'), 'test')
        # exact 5 elements :
        for i in xrange(5):
            c.set(i, i)
        for i in xrange(5):
            self.assertEqual(c.get(i), i)
        # remove unavailable key:
        c.unset('a'); c.unset('a')

    def testCacheMaxSize(self):
        c = Utils.Cache(maxCount=5, maxTime=60)
        # exact 5 elements :
        for i in xrange(5):
            c.set(i, i)
        self.assertEqual([c.get(i) for i in xrange(5)], [i for i in xrange(5)])
        self.assertNotIn(-1, (c.get(i, -1) for i in xrange(5)))
        # add one - too many (note: i is still 4 here, leaked from the loop above):
        c.set(10, i)
        # one element should be removed :
        self.assertIn(-1, (c.get(i, -1) for i in xrange(5)))
        # test max size (not expired):
        for i in xrange(10):
            c.set(i, 1)
        self.assertEqual(len(c), 5)

    def testCacheMaxTime(self):
        # test max time (expired, timeout reached) :
        c = Utils.Cache(maxCount=5, maxTime=0.0005)
        for i in xrange(10):
            c.set(i, 1)
        st = time.time()
        self.assertTrue(Utils.wait_for(lambda: time.time() >= st + 0.0005, 1))
        # we have still 5 elements (or fewer if too slow test machine):
        self.assertTrue(len(c) <= 5)
        # but all that are expired also:
        for i in xrange(10):
            self.assertTrue(c.get(i) is None)
        # here the whole cache should be empty:
        self.assertEqual(len(c), 0)

    def testOverflowedIPCache(self):
        # test overflow of IP-cache multi-threaded (2 "parasite" threads flooding cache):
        from threading import Thread
        from random import shuffle
        # save original cache and use smaller cache during the test here:
        _org_cache = IPAddr.CACHE_OBJ
        cache = IPAddr.CACHE_OBJ = Utils.Cache(maxCount=5, maxTime=60)
        result = list()
        count = 1 if unittest.F2B.fast else 50
        try:
            # tester procedure of worker:
            def _TestCacheStr2IP(forw=True, result=[], random=False):
                # appends None on success, the exception on failure
                try:
                    c = count
                    while c:
                        c -= 1
                        s = xrange(0, 256, 1) if forw else xrange(255, -1, -1)
                        if random: shuffle([i for i in s])
                        for i in s:
                            IPAddr('192.0.2.'+str(i), IPAddr.FAM_IPv4)
                            IPAddr('2001:db8::'+str(i), IPAddr.FAM_IPv6)
                    result.append(None)
                except Exception as e:
                    DefLogSys.debug(e, exc_info=True)
                    result.append(e)

            # 2 workers flooding it forwards and backwards:
            th1 = Thread(target=_TestCacheStr2IP, args=(True,  result)); th1.start()
            th2 = Thread(target=_TestCacheStr2IP, args=(False, result)); th2.start()
            # and here we flooding it with random IPs too:
            _TestCacheStr2IP(True, result, True)
        finally:
            # wait for end of threads and restore cache:
            th1.join()
            th2.join()
            IPAddr.CACHE_OBJ = _org_cache
        self.assertEqual(result, [None]*3) # no errors
        self.assertTrue(len(cache) <= cache.maxCount)
 1839 
 1840 
 1841 class DNSUtilsNetworkTests(unittest.TestCase):
 1842 
    def setUp(self):
        """Call before every test case."""
        super(DNSUtilsNetworkTests, self).setUp()
        # network availability is checked per-test instead of here:
        #unittest.F2B.SkipIfNoNetwork()
 1847 
 1848     def test_IPAddr(self):
 1849         ip4 = IPAddr('192.0.2.1')
 1850         ip6 = IPAddr('2001:DB8::')
 1851         self.assertTrue(ip4.isIPv4)
 1852         self.assertTrue(ip6.isIPv6)
 1853         self.assertTrue(asip('192.0.2.1').isIPv4)
 1854         self.assertTrue(id(asip(ip4)) == id(ip4))
 1855 
 1856     def test_IPAddr_Raw(self):
 1857         # raw string:
 1858         r = IPAddr('xxx', IPAddr.CIDR_RAW)
 1859         self.assertFalse(r.isIPv4)
 1860         self.assertFalse(r.isIPv6)
 1861         self.assertTrue(r.isValid)
 1862         self.assertEqual(r, 'xxx')
 1863         self.assertEqual('xxx', str(r))
 1864         self.assertNotEqual(r, IPAddr('xxx'))
 1865         # raw (not IP, for example host:port as string):
 1866         r = IPAddr('1:2', IPAddr.CIDR_RAW)
 1867         self.assertFalse(r.isIPv4)
 1868         self.assertFalse(r.isIPv6)
 1869         self.assertTrue(r.isValid)
 1870         self.assertEqual(r, '1:2')
 1871         self.assertEqual('1:2', str(r))
 1872         self.assertNotEqual(r, IPAddr('1:2'))
 1873         # raw vs ip4 (raw is not an ip):
 1874         r = IPAddr('93.184.0.1', IPAddr.CIDR_RAW)
 1875         ip4 = IPAddr('93.184.0.1')
 1876         self.assertNotEqual(ip4, r)
 1877         self.assertNotEqual(r, ip4)
 1878         self.assertTrue(r < ip4)
 1879         self.assertTrue(r < ip4)
 1880         # raw vs ip6 (raw is not an ip):
 1881         r = IPAddr('1::2', IPAddr.CIDR_RAW)
 1882         ip6 = IPAddr('1::2')
 1883         self.assertNotEqual(ip6, r)
 1884         self.assertNotEqual(r, ip6)
 1885         self.assertTrue(r < ip6)
 1886         self.assertTrue(r < ip6)
 1887 
 1888     def testUseDns(self):
 1889         res = DNSUtils.textToIp('www.example.com', 'no')
 1890         self.assertSortedEqual(res, [])
 1891         unittest.F2B.SkipIfNoNetwork()
 1892         res = DNSUtils.textToIp('www.example.com', 'warn')
 1893         # sort ipaddr, IPv4 is always smaller as IPv6
 1894         self.assertSortedEqual(res, ['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946'])
 1895         res = DNSUtils.textToIp('www.example.com', 'yes')
 1896         # sort ipaddr, IPv4 is always smaller as IPv6
 1897         self.assertSortedEqual(res, ['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946'])
 1898 
 1899     def testTextToIp(self):
 1900         unittest.F2B.SkipIfNoNetwork()
 1901         # Test hostnames
 1902         hostnames = [
 1903             'www.example.com',
 1904             'doh1.2.3.4.buga.xxxxx.yyy.invalid',
 1905             '1.2.3.4.buga.xxxxx.yyy.invalid',
 1906             ]
 1907         for s in hostnames:
 1908             res = DNSUtils.textToIp(s, 'yes')
 1909             if s == 'www.example.com':
 1910                 # sort ipaddr, IPv4 is always smaller as IPv6
 1911                 self.assertSortedEqual(res, ['93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946'])
 1912             else:
 1913                 self.assertSortedEqual(res, [])
 1914 
 1915     def testIpToIp(self):
 1916         # pure ips:
 1917         for s in ('93.184.216.34', '2606:2800:220:1:248:1893:25c8:1946'):
 1918             ips = DNSUtils.textToIp(s, 'yes')
 1919             self.assertSortedEqual(ips, [s])
 1920             for ip in ips:
 1921                 self.assertTrue(isinstance(ip, IPAddr))
 1922 
 1923     def testIpToName(self):
 1924         unittest.F2B.SkipIfNoNetwork()
 1925         res = DNSUtils.ipToName('8.8.4.4')
 1926         self.assertTrue(res.endswith(('.google', '.google.com')))
 1927         # same as above, but with IPAddr:
 1928         res = DNSUtils.ipToName(IPAddr('8.8.4.4'))
 1929         self.assertTrue(res.endswith(('.google', '.google.com')))
 1930         # invalid ip (TEST-NET-1 according to RFC 5737)
 1931         res = DNSUtils.ipToName('192.0.2.0')
 1932         self.assertEqual(res, None)
 1933         # invalid ip:
 1934         res = DNSUtils.ipToName('192.0.2.888')
 1935         self.assertEqual(res, None)
 1936 
 1937     def testAddr2bin(self):
 1938         res = IPAddr('10.0.0.0')
 1939         self.assertEqual(res.addr, 167772160L)
 1940         res = IPAddr('10.0.0.0', cidr=None)
 1941         self.assertEqual(res.addr, 167772160L)
 1942         res = IPAddr('10.0.0.0', cidr=32L)
 1943         self.assertEqual(res.addr, 167772160L)
 1944         res = IPAddr('10.0.0.1', cidr=32L)
 1945         self.assertEqual(res.addr, 167772161L)
 1946         res = IPAddr('10.0.0.1', cidr=31L)
 1947         self.assertEqual(res.addr, 167772160L)
 1948 
 1949         self.assertEqual(IPAddr('10.0.0.0').hexdump, '0a000000')
 1950         self.assertEqual(IPAddr('1::2').hexdump, '00010000000000000000000000000002')
 1951         self.assertEqual(IPAddr('xxx').hexdump, '')
 1952 
 1953         self.assertEqual(IPAddr('192.0.2.0').getPTR(), '0.2.0.192.in-addr.arpa.')
 1954         self.assertEqual(IPAddr('192.0.2.1').getPTR(), '1.2.0.192.in-addr.arpa.')
 1955         self.assertEqual(IPAddr('2606:2800:220:1:248:1893:25c8:1946').getPTR(), 
 1956             '6.4.9.1.8.c.5.2.3.9.8.1.8.4.2.0.1.0.0.0.0.2.2.0.0.0.8.2.6.0.6.2.ip6.arpa.')
 1957 
 1958     def testIPAddr_Equal6(self):
 1959         self.assertEqual(
 1960             IPAddr('2606:2800:220:1:248:1893::'),
 1961             IPAddr('2606:2800:220:1:248:1893:0:0')
 1962         )
 1963         # special case IPv6 in brackets:
 1964         self.assertEqual(
 1965             IPAddr('[2606:2800:220:1:248:1893::]'),
 1966             IPAddr('2606:2800:220:1:248:1893:0:0')
 1967         )
 1968 
 1969     def testIPAddr_InInet(self):
 1970         ip4net = IPAddr('93.184.0.1/24')
 1971         ip6net = IPAddr('2606:2800:220:1:248:1893:25c8:0/120')
 1972         # ip4:
 1973         self.assertTrue(IPAddr('93.184.0.1').isInNet(ip4net))
 1974         self.assertTrue(IPAddr('93.184.0.255').isInNet(ip4net))
 1975         self.assertFalse(IPAddr('93.184.1.0').isInNet(ip4net))
 1976         self.assertFalse(IPAddr('93.184.0.1').isInNet(ip6net))
 1977         # ip6:
 1978         self.assertTrue(IPAddr('2606:2800:220:1:248:1893:25c8:1').isInNet(ip6net))
 1979         self.assertTrue(IPAddr('2606:2800:220:1:248:1893:25c8:ff').isInNet(ip6net))
 1980         self.assertFalse(IPAddr('2606:2800:220:1:248:1893:25c8:100').isInNet(ip6net))
 1981         self.assertFalse(IPAddr('2606:2800:220:1:248:1893:25c8:100').isInNet(ip4net))
 1982         # raw not in net:
 1983         self.assertFalse(IPAddr('93.184.0.1', IPAddr.CIDR_RAW).isInNet(ip4net))
 1984         self.assertFalse(IPAddr('2606:2800:220:1:248:1893:25c8:1', IPAddr.CIDR_RAW).isInNet(ip6net))
 1985         # invalid not in net:
 1986         self.assertFalse(IPAddr('xxx').isInNet(ip4net))
 1987 
 1988     def testIPAddr_Compare(self):
 1989         ip4 = [
 1990             IPAddr('93.184.0.1'),
 1991             IPAddr('93.184.216.1'),
 1992             IPAddr('93.184.216.34')
 1993         ]
 1994         ip6 = [
 1995             IPAddr('2606:2800:220:1:248:1893::'),
 1996             IPAddr('2606:2800:220:1:248:1893:25c8:0'),
 1997             IPAddr('2606:2800:220:1:248:1893:25c8:1946')
 1998         ]
 1999         # ip4
 2000         self.assertNotEqual(ip4[0], None)
 2001         self.assertTrue(ip4[0] is not None)
 2002         self.assertFalse(ip4[0] is None)
 2003         self.assertTrue(ip4[0] < ip4[1])
 2004         self.assertTrue(ip4[1] < ip4[2])
 2005         self.assertEqual(sorted(reversed(ip4)), ip4)
 2006         # ip6
 2007         self.assertNotEqual(ip6[0], None)
 2008         self.assertTrue(ip6[0] is not None)
 2009         self.assertFalse(ip6[0] is None)
 2010         self.assertTrue(ip6[0] < ip6[1])
 2011         self.assertTrue(ip6[1] < ip6[2])
 2012         self.assertEqual(sorted(reversed(ip6)), ip6)
 2013         # ip4 vs ip6
 2014         self.assertNotEqual(ip4[0], ip6[0])
 2015         self.assertTrue(ip4[0] < ip6[0])
 2016         self.assertTrue(ip4[2] < ip6[2])
 2017         self.assertEqual(sorted(reversed(ip4+ip6)), ip4+ip6)
 2018         # hashing (with string as key):
 2019         d={
 2020             '93.184.216.34': 'ip4-test', 
 2021             '2606:2800:220:1:248:1893:25c8:1946': 'ip6-test'
 2022         }
 2023         d2 = dict([(IPAddr(k), v) for k, v in d.iteritems()])
 2024         self.assertTrue(isinstance(d.keys()[0], basestring))
 2025         self.assertTrue(isinstance(d2.keys()[0], IPAddr))
 2026         self.assertEqual(d.get(ip4[2], ''), 'ip4-test')
 2027         self.assertEqual(d.get(ip6[2], ''), 'ip6-test')
 2028         self.assertEqual(d2.get(str(ip4[2]), ''), 'ip4-test')
 2029         self.assertEqual(d2.get(str(ip6[2]), ''), 'ip6-test')
 2030         # compare with string direct:
 2031         self.assertEqual(d, d2)
 2032 
 2033     def testIPAddr_CIDR(self):
 2034         self.assertEqual(str(IPAddr('93.184.0.1', 24)), '93.184.0.0/24')
 2035         self.assertEqual(str(IPAddr('192.168.1.0/255.255.255.128')), '192.168.1.0/25')
 2036         self.assertEqual(IPAddr('93.184.0.1', 24).ntoa, '93.184.0.0/24')
 2037         self.assertEqual(IPAddr('192.168.1.0/255.255.255.128').ntoa, '192.168.1.0/25')
 2038 
 2039         self.assertEqual(IPAddr('93.184.0.1/32').ntoa, '93.184.0.1')
 2040         self.assertEqual(IPAddr('93.184.0.1/255.255.255.255').ntoa, '93.184.0.1')
 2041 
 2042         self.assertEqual(str(IPAddr('2606:2800:220:1:248:1893:25c8::', 120)), '2606:2800:220:1:248:1893:25c8:0/120')
 2043         self.assertEqual(IPAddr('2606:2800:220:1:248:1893:25c8::', 120).ntoa, '2606:2800:220:1:248:1893:25c8:0/120')
 2044         self.assertEqual(str(IPAddr('2606:2800:220:1:248:1893:25c8:0/120')), '2606:2800:220:1:248:1893:25c8:0/120')
 2045         self.assertEqual(IPAddr('2606:2800:220:1:248:1893:25c8:0/120').ntoa, '2606:2800:220:1:248:1893:25c8:0/120')
 2046 
 2047         self.assertEqual(str(IPAddr('2606:28ff:220:1:248:1893:25c8::', 25)), '2606:2880::/25')
 2048         self.assertEqual(str(IPAddr('2606:28ff:220:1:248:1893:25c8::/ffff:ff80::')), '2606:2880::/25')
 2049         self.assertEqual(str(IPAddr('2606:28ff:220:1:248:1893:25c8::/ffff:ffff:ffff:ffff:ffff:ffff:ffff::')), 
 2050             '2606:28ff:220:1:248:1893:25c8:0/112')
 2051 
 2052         self.assertEqual(str(IPAddr('2606:28ff:220:1:248:1893:25c8::/128')), 
 2053             '2606:28ff:220:1:248:1893:25c8:0')
 2054         self.assertEqual(str(IPAddr('2606:28ff:220:1:248:1893:25c8::/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff')), 
 2055             '2606:28ff:220:1:248:1893:25c8:0')
 2056 
    def testIPAddr_CIDR_Wrong(self):
        # Malformed CIDR input must raise ValueError:
        # too many plen representations (two '/' separators):
        self.assertRaises(ValueError, IPAddr, '2606:28ff:220:1:248:1893:25c8::/ffff::/::1')
 2061     def testIPAddr_CIDR_Repr(self):
 2062         self.assertEqual(["127.0.0.0/8", "::/32", "2001:db8::/32"],
 2063             [IPAddr("127.0.0.0", 8), IPAddr("::1", 32), IPAddr("2001:db8::", 32)]
 2064         )
 2065 
 2066     def testIPAddr_CompareDNS(self):
 2067         ips = IPAddr('example.com')
 2068         self.assertTrue(IPAddr("93.184.216.34").isInNet(ips))
 2069         self.assertTrue(IPAddr("2606:2800:220:1:248:1893:25c8:1946").isInNet(ips))
 2070 
    def testIPAddr_wrongDNS_IP(self):
        # Smoke test: resolving a syntactically bogus hostname and
        # reverse-resolving an invalid address must not raise — both calls
        # are expected to fail gracefully (no assertions on the results).
        unittest.F2B.SkipIfNoNetwork()
        DNSUtils.dnsToIp('`this`.dns-is-wrong.`wrong-nic`-dummy')
        DNSUtils.ipToName('*')
 2076     def testIPAddr_Cached(self):
 2077         ips = [DNSUtils.dnsToIp('example.com'), DNSUtils.dnsToIp('example.com')]
 2078         for ip1, ip2 in zip(ips, ips):
 2079             self.assertEqual(id(ip1), id(ip2))
 2080         ip1 = IPAddr('93.184.216.34'); ip2 = IPAddr('93.184.216.34'); self.assertEqual(id(ip1), id(ip2))
 2081         ip1 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); ip2 = IPAddr('2606:2800:220:1:248:1893:25c8:1946'); self.assertEqual(id(ip1), id(ip2))
 2082 
 2083     def testFQDN(self):
 2084         sname = DNSUtils.getHostname(fqdn=False)
 2085         lname = DNSUtils.getHostname(fqdn=True)
 2086         # FQDN is not localhost if short hostname is not localhost too (or vice versa):
 2087         self.assertEqual(lname != 'localhost',
 2088                          sname != 'localhost')
 2089         # FQDN from short name should be long name:
 2090         self.assertEqual(getfqdn(sname), lname)
 2091         # FQDN from FQDN is the same:
 2092         self.assertEqual(getfqdn(lname), lname)
 2093         # coverage (targeting all branches): FQDN from loopback and DNS blackhole is always the same:
 2094         self.assertIn(getfqdn('localhost.'), ('localhost', 'localhost.'))
 2095     
    def testFQDN_DNS(self):
        # getfqdn of a real DNS name: depending on the resolver the trailing
        # dot may be kept or stripped — accept both forms.
        unittest.F2B.SkipIfNoNetwork()
        self.assertIn(getfqdn('as112.arpa.'), ('as112.arpa.', 'as112.arpa'))
 2100 
 2101 class JailTests(unittest.TestCase):
 2102 
 2103     def testSetBackend_gh83(self):
 2104         # smoke test
 2105         # Must not fail to initiate
 2106         Jail('test', backend='polling')
 2107