"Fossies" - the Fresh Open Source Software Archive

Member "fail2ban-0.10.4/fail2ban/tests/fail2banregextestcase.py" (4 Oct 2018, 14474 Bytes) of package /linux/misc/fail2ban-0.10.4.tar.gz:


As a special service, "Fossies" has tried to render the requested source page as HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "fail2banregextestcase.py": 0.10.3.1_vs_0.10.4.

    1 # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
    2 # vi: set ft=python sts=4 ts=4 sw=4 noet :
    3 
    4 # This file is part of Fail2Ban.
    5 #
    6 # Fail2Ban is free software; you can redistribute it and/or modify
    7 # it under the terms of the GNU General Public License as published by
    8 # the Free Software Foundation; either version 2 of the License, or
    9 # (at your option) any later version.
   10 #
   11 # Fail2Ban is distributed in the hope that it will be useful,
   12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
   13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   14 # GNU General Public License for more details.
   15 #
   16 # You should have received a copy of the GNU General Public License
   17 # along with Fail2Ban; if not, write to the Free Software
   18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
   19 
   20 # Fail2Ban developers
   21 
   22 __author__ = "Serg Brester"
   23 __copyright__ = "Copyright (c) 2015 Serg G. Brester (sebres), 2008- Fail2Ban Contributors"
   24 __license__ = "GPL"
   25 
   26 import os
   27 import sys
   28 
   29 from ..client import fail2banregex
   30 from ..client.fail2banregex import Fail2banRegex, get_opt_parser, exec_command_line, output, str2LogLevel
   31 from .utils import setUpMyTime, tearDownMyTime, LogCaptureTestCase, logSys
   32 from .utils import CONFIG_DIR
   33 
   34 
   35 fail2banregex.logSys = logSys
   36 def _test_output(*args):
   37     logSys.notice(args[0])
   38 
   39 fail2banregex.output = _test_output
   40 
   41 TEST_CONFIG_DIR = os.path.join(os.path.dirname(__file__), "config")
   42 TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
   43 
   44 DEV_NULL = None
   45 
   46 def _Fail2banRegex(*args):
   47     parser = get_opt_parser()
   48     (opts, args) = parser.parse_args(list(args))
   49     # put down log-level if expected, because of too many debug-messages:
   50     if opts.log_level in ("notice", "warning"):
   51         logSys.setLevel(str2LogLevel(opts.log_level))
   52     return (opts, args, Fail2banRegex(opts))
   53 
   54 class ExitException(Exception):
   55     def __init__(self, code):
   56         self.code = code
   57         self.msg = 'Exit with code: %s' % code
   58 
   59 def _test_exec_command_line(*args):
   60     def _exit(code=0):
   61         raise ExitException(code)
   62     global DEV_NULL
   63     _org = {'exit': sys.exit, 'stdout': sys.stdout, 'stderr': sys.stderr}
   64     _exit_code = 0
   65     sys.exit = _exit
   66     if not DEV_NULL: DEV_NULL = open(os.devnull, "w")
   67     sys.stderr = sys.stdout = DEV_NULL
   68     try:
   69         exec_command_line(list(args))
   70     except ExitException as e:
   71         _exit_code = e.code
   72     finally:
   73         sys.exit = _org['exit']
   74         sys.stdout = _org['stdout']
   75         sys.stderr = _org['stderr']
   76     return _exit_code
   77 
   78 
   79 class Fail2banRegexTest(LogCaptureTestCase):
   80 
   81     RE_00 = r"(?:(?:Authentication failure|Failed [-/\w+]+) for(?: [iI](?:llegal|nvalid) user)?|[Ii](?:llegal|nvalid) user|ROOT LOGIN REFUSED) .*(?: from|FROM) <HOST>"
   82 
   83     FILENAME_01 = os.path.join(TEST_FILES_DIR, "testcase01.log")
   84     FILENAME_02 = os.path.join(TEST_FILES_DIR, "testcase02.log")
   85     FILENAME_WRONGCHAR = os.path.join(TEST_FILES_DIR, "testcase-wrong-char.log")
   86 
   87     FILENAME_SSHD = os.path.join(TEST_FILES_DIR, "logs", "sshd")
   88     FILTER_SSHD = os.path.join(CONFIG_DIR, 'filter.d', 'sshd.conf')
   89     FILENAME_ZZZ_SSHD = os.path.join(TEST_FILES_DIR, 'zzz-sshd-obsolete-multiline.log')
   90     FILTER_ZZZ_SSHD = os.path.join(TEST_CONFIG_DIR, 'filter.d', 'zzz-sshd-obsolete-multiline.conf')
   91 
   92     FILENAME_ZZZ_GEN = os.path.join(TEST_FILES_DIR, "logs", "zzz-generic-example")
   93     FILTER_ZZZ_GEN = os.path.join(TEST_CONFIG_DIR, 'filter.d', 'zzz-generic-example.conf')
   94 
   95     def setUp(self):
   96         """Call before every test case."""
   97         LogCaptureTestCase.setUp(self)
   98         setUpMyTime()
   99 
  100     def tearDown(self):
  101         """Call after every test case."""
  102         LogCaptureTestCase.tearDown(self)
  103         tearDownMyTime()
  104 
  105     def testWrongRE(self):
  106         (opts, args, fail2banRegex) = _Fail2banRegex(
  107             "test", r".** from <HOST>$"
  108         )
  109         self.assertFalse(fail2banRegex.start(args))
  110         self.assertLogged("Unable to compile regular expression")
  111 
  112     def testWrongIngnoreRE(self):
  113         (opts, args, fail2banRegex) = _Fail2banRegex(
  114             "--datepattern", "{^LN-BEG}EPOCH",
  115             "test", r".*? from <HOST>$", r".**"
  116         )
  117         self.assertFalse(fail2banRegex.start(args))
  118         self.assertLogged("Unable to compile regular expression")
  119 
  120     def testDirectFound(self):
  121         (opts, args, fail2banRegex) = _Fail2banRegex(
  122             "--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
  123             "--print-all-matched", "--print-no-missed",
  124             "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
  125             r"Authentication failure for .*? from <HOST>$"
  126         )
  127         self.assertTrue(fail2banRegex.start(args))
  128         self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed')
  129 
  130     def testDirectNotFound(self):
  131         (opts, args, fail2banRegex) = _Fail2banRegex(
  132             "--print-all-missed",
  133             "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
  134             r"XYZ from <HOST>$"
  135         )
  136         self.assertTrue(fail2banRegex.start(args))
  137         self.assertLogged('Lines: 1 lines, 0 ignored, 0 matched, 1 missed')
  138 
  139     def testDirectIgnored(self):
  140         (opts, args, fail2banRegex) = _Fail2banRegex(
  141             "--print-all-ignored",
  142             "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
  143             r"Authentication failure for .*? from <HOST>$",
  144             r"kevin from 192.0.2.0$"
  145         )
  146         self.assertTrue(fail2banRegex.start(args))
  147         self.assertLogged('Lines: 1 lines, 1 ignored, 0 matched, 0 missed')
  148 
  149     def testDirectRE_1(self):
  150         (opts, args, fail2banRegex) = _Fail2banRegex(
  151             "--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
  152             "--print-all-matched",
  153             Fail2banRegexTest.FILENAME_01, 
  154             Fail2banRegexTest.RE_00
  155         )
  156         self.assertTrue(fail2banRegex.start(args))
  157         self.assertLogged('Lines: 19 lines, 0 ignored, 13 matched, 6 missed')
  158 
  159         self.assertLogged('Error decoding line');
  160         self.assertLogged('Continuing to process line ignoring invalid characters')
  161 
  162         self.assertLogged('Dez 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 193.168.0.128')
  163         self.assertLogged('Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 87.142.124.10')
  164 
  165     def testDirectRE_1raw(self):
  166         (opts, args, fail2banRegex) = _Fail2banRegex(
  167             "--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
  168             "--print-all-matched", "--raw",
  169             Fail2banRegexTest.FILENAME_01, 
  170             Fail2banRegexTest.RE_00
  171         )
  172         self.assertTrue(fail2banRegex.start(args))
  173         self.assertLogged('Lines: 19 lines, 0 ignored, 16 matched, 3 missed')
  174 
  175     def testDirectRE_1raw_noDns(self):
  176         (opts, args, fail2banRegex) = _Fail2banRegex(
  177             "--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
  178             "--print-all-matched", "--raw", "--usedns=no",
  179             Fail2banRegexTest.FILENAME_01, 
  180             Fail2banRegexTest.RE_00
  181         )
  182         self.assertTrue(fail2banRegex.start(args))
  183         self.assertLogged('Lines: 19 lines, 0 ignored, 13 matched, 6 missed')
  184 
  185     def testDirectRE_2(self):
  186         (opts, args, fail2banRegex) = _Fail2banRegex(
  187             "--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
  188             "--print-all-matched",
  189             Fail2banRegexTest.FILENAME_02, 
  190             Fail2banRegexTest.RE_00
  191         )
  192         self.assertTrue(fail2banRegex.start(args))
  193         self.assertLogged('Lines: 13 lines, 0 ignored, 5 matched, 8 missed')
  194 
  195     def testVerbose(self):
  196         (opts, args, fail2banRegex) = _Fail2banRegex(
  197             "--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
  198             "--timezone", "UTC+0200",
  199             "--verbose", "--verbose-date", "--print-no-missed",
  200             Fail2banRegexTest.FILENAME_02, 
  201             Fail2banRegexTest.RE_00
  202         )
  203         self.assertTrue(fail2banRegex.start(args))
  204         self.assertLogged('Lines: 13 lines, 0 ignored, 5 matched, 8 missed')
  205 
  206         self.assertLogged('141.3.81.106  Sun Aug 14 11:53:59 2005')
  207         self.assertLogged('141.3.81.106  Sun Aug 14 11:54:59 2005')
  208 
  209     def testVerboseFullSshd(self):
  210         (opts, args, fail2banRegex) = _Fail2banRegex(
  211             "-l", "notice", # put down log-level, because of too many debug-messages
  212             "-v", "--verbose-date", "--print-all-matched", "--print-all-ignored",
  213             "-c", CONFIG_DIR,
  214             Fail2banRegexTest.FILENAME_SSHD, "sshd"
  215         )
  216         self.assertTrue(fail2banRegex.start(args))
  217         # test failure line and not-failure lines both presents:
  218         self.assertLogged("[29116]: User root not allowed because account is locked",
  219             "[29116]: Received disconnect from 1.2.3.4", all=True)
  220 
  221     def testFastSshd(self):
  222         (opts, args, fail2banRegex) = _Fail2banRegex(
  223             "-l", "notice", # put down log-level, because of too many debug-messages
  224             "--print-all-matched",
  225             "-c", CONFIG_DIR,
  226             Fail2banRegexTest.FILENAME_ZZZ_SSHD, "sshd.conf[mode=normal]"
  227         )
  228         self.assertTrue(fail2banRegex.start(args))
  229         # test failure line and all not-failure lines presents:
  230         self.assertLogged(
  231             "[29116]: Connection from 192.0.2.4",
  232             "[29116]: User root not allowed because account is locked",
  233             "[29116]: Received disconnect from 192.0.2.4", all=True)
  234 
  235     def testMultilineSshd(self):
  236         # by the way test of missing lines by multiline in `for bufLine in orgLineBuffer[int(fullBuffer):]`
  237         (opts, args, fail2banRegex) = _Fail2banRegex(
  238             "-l", "notice", # put down log-level, because of too many debug-messages
  239             "--print-all-matched", "--print-all-missed",
  240             "-c", os.path.dirname(Fail2banRegexTest.FILTER_ZZZ_SSHD),
  241             Fail2banRegexTest.FILENAME_ZZZ_SSHD, os.path.basename(Fail2banRegexTest.FILTER_ZZZ_SSHD)
  242         )
  243         self.assertTrue(fail2banRegex.start(args))
  244         # test "failure" line presents (2nd part only, because multiline fewer precise):
  245         self.assertLogged(
  246             "[29116]: Received disconnect from 192.0.2.4", all=True)
  247 
  248     def testFullGeneric(self):
  249         # by the way test of ignoreregex (specified in filter file)...
  250         (opts, args, fail2banRegex) = _Fail2banRegex(
  251             "-l", "notice", # put down log-level, because of too many debug-messages
  252             Fail2banRegexTest.FILENAME_ZZZ_GEN, Fail2banRegexTest.FILTER_ZZZ_GEN+"[mode=test]"
  253         )
  254         self.assertTrue(fail2banRegex.start(args))
  255 
  256     def testDirectMultilineBuf(self):
  257         # test it with some pre-lines also to cover correct buffer scrolling (all multi-lines printed):
  258         for preLines in (0, 20):
  259             self.pruneLog("[test-phase %s]" % preLines)
  260             (opts, args, fail2banRegex) = _Fail2banRegex(
  261                 "--usedns", "no", "-d", "^Epoch", "--print-all-matched", "--maxlines", "5", 
  262                 ("1490349000 TEST-NL\n"*preLines) + 
  263                 "1490349000 FAIL\n1490349000 TEST1\n1490349001 TEST2\n1490349001 HOST 192.0.2.34",
  264                 r"^\s*FAIL\s*$<SKIPLINES>^\s*HOST <HOST>\s*$"
  265             )
  266             self.assertTrue(fail2banRegex.start(args))
  267             self.assertLogged('Lines: %s lines, 0 ignored, 2 matched, %s missed' % (preLines+4, preLines+2))
  268             # both matched lines were printed:
  269             self.assertLogged("|  1490349000 FAIL", "|  1490349001 HOST 192.0.2.34", all=True)
  270 
  271 
  272     def testDirectMultilineBufDebuggex(self):
  273         (opts, args, fail2banRegex) = _Fail2banRegex(
  274             "--usedns", "no", "-d", "^Epoch", "--debuggex", "--print-all-matched", "--maxlines", "5",
  275             "1490349000 FAIL\n1490349000 TEST1\n1490349001 TEST2\n1490349001 HOST 192.0.2.34",
  276             r"^\s*FAIL\s*$<SKIPLINES>^\s*HOST <HOST>\s*$"
  277         )
  278         self.assertTrue(fail2banRegex.start(args))
  279         self.assertLogged('Lines: 4 lines, 0 ignored, 2 matched, 2 missed')
  280         # the sequence in args-dict is currently undefined (so can be 1st argument)
  281         self.assertLogged("&flags=m", "?flags=m")
  282 
  283     def testSinglelineWithNLinContent(self):
  284         # 
  285         (opts, args, fail2banRegex) = _Fail2banRegex(
  286             "--usedns", "no", "-d", "^Epoch", "--print-all-matched",
  287             "1490349000 FAIL: failure\nhost: 192.0.2.35",
  288             r"^\s*FAIL:\s*.*\nhost:\s+<HOST>$"
  289         )
  290         self.assertTrue(fail2banRegex.start(args))
  291         self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed')
  292 
  293     def testRegexEpochPatterns(self):
  294         (opts, args, fail2banRegex) = _Fail2banRegex(
  295             "-r", "-d", r"^\[{LEPOCH}\]\s+", "--maxlines", "5",
  296             "[1516469849] 192.0.2.1 FAIL: failure\n"
  297             "[1516469849551] 192.0.2.2 FAIL: failure\n"
  298             "[1516469849551000] 192.0.2.3 FAIL: failure\n"
  299             "[1516469849551.000] 192.0.2.4 FAIL: failure",
  300             r"^<HOST> FAIL\b"
  301         )
  302         self.assertTrue(fail2banRegex.start(args))
  303         self.assertLogged('Lines: 4 lines, 0 ignored, 4 matched, 0 missed')
  304 
  305     def testWrongFilterFile(self):
  306         # use test log as filter file to cover eror cases...
  307         (opts, args, fail2banRegex) = _Fail2banRegex(
  308             Fail2banRegexTest.FILENAME_ZZZ_GEN, Fail2banRegexTest.FILENAME_ZZZ_GEN
  309         )
  310         self.assertFalse(fail2banRegex.start(args))
  311 
  312     def _reset(self):
  313         # reset global warn-counter:
  314         from ..server.filter import _decode_line_warn
  315         _decode_line_warn.clear()
  316 
  317     def testWronChar(self):
  318         self._reset()
  319         (opts, args, fail2banRegex) = _Fail2banRegex(
  320             "-l", "notice", # put down log-level, because of too many debug-messages
  321             "--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
  322             Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD
  323         )
  324         self.assertTrue(fail2banRegex.start(args))
  325         self.assertLogged('Lines: 4 lines, 0 ignored, 2 matched, 2 missed')
  326 
  327         self.assertLogged('Error decoding line')
  328         self.assertLogged('Continuing to process line ignoring invalid characters:')
  329 
  330         self.assertLogged('Nov  8 00:16:12 main sshd[32548]: input_userauth_request: invalid user llinco')
  331         self.assertLogged('Nov  8 00:16:12 main sshd[32547]: pam_succeed_if(sshd:auth): error retrieving information about user llinco')
  332 
  333     def testWronCharDebuggex(self):
  334         self._reset()
  335         (opts, args, fail2banRegex) = _Fail2banRegex(
  336             "-l", "notice", # put down log-level, because of too many debug-messages
  337             "--datepattern", "^(?:%a )?%b %d %H:%M:%S(?:\.%f)?(?: %ExY)?",
  338             "--debuggex", "--print-all-matched",
  339             Fail2banRegexTest.FILENAME_WRONGCHAR, Fail2banRegexTest.FILTER_SSHD,
  340             r"llinco[^\\]"
  341         )
  342         self.assertTrue(fail2banRegex.start(args))
  343         self.assertLogged('Error decoding line')
  344         self.assertLogged('Lines: 4 lines, 1 ignored, 2 matched, 1 missed')
  345 
  346         self.assertLogged('https://')
  347 
  348     def testExecCmdLine_Usage(self):
  349         self.assertNotEqual(_test_exec_command_line(), 0)
  350         self.pruneLog()
  351         self.assertEqual(_test_exec_command_line('-V'), 0)
  352         self.assertLogged(fail2banregex.normVersion())
  353         self.pruneLog()
  354         self.assertEqual(_test_exec_command_line('--version'), 0)
  355 
  356     def testExecCmdLine_Direct(self):
  357         self.assertEqual(_test_exec_command_line(
  358             '-l', 'info',
  359             "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
  360             r"Authentication failure for .*? from <HOST>$"
  361         ), 0)
  362         self.assertLogged('Lines: 1 lines, 0 ignored, 1 matched, 0 missed')
  363         
  364     def testExecCmdLine_MissFailID(self):
  365         self.assertNotEqual(_test_exec_command_line(
  366             '-l', 'info',
  367             "Dec 31 11:59:59 [sshd] error: PAM: Authentication failure for kevin from 192.0.2.0",
  368             r"Authentication failure"
  369         ), 0)
  370         self.assertLogged('No failure-id group in ')