"Fossies" - the Fresh Open Source Software Archive

Member "fail2ban-0.11.1/fail2ban/tests/actiontestcase.py" (11 Jan 2020, 24077 Bytes) of package /linux/misc/fail2ban-0.11.1.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "actiontestcase.py": 0.10.5_vs_0.11.1.

    1 # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
    2 # vi: set ft=python sts=4 ts=4 sw=4 noet :
    3 
    4 # This file is part of Fail2Ban.
    5 #
    6 # Fail2Ban is free software; you can redistribute it and/or modify
    7 # it under the terms of the GNU General Public License as published by
    8 # the Free Software Foundation; either version 2 of the License, or
    9 # (at your option) any later version.
   10 #
   11 # Fail2Ban is distributed in the hope that it will be useful,
   12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
   13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   14 # GNU General Public License for more details.
   15 #
   16 # You should have received a copy of the GNU General Public License
   17 # along with Fail2Ban; if not, write to the Free Software
   18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
   19 
   20 # Author: Cyril Jaquier
   21 # 
   22 
   23 __author__ = "Cyril Jaquier"
   24 __copyright__ = "Copyright (c) 2004 Cyril Jaquier"
   25 __license__ = "GPL"
   26 
   27 import os
   28 import tempfile
   29 import time
   30 import unittest
   31 
   32 from ..server.action import CommandAction, CallingMap, substituteRecursiveTags
   33 from ..server.actions import OrderedDict, Actions
   34 from ..server.utils import Utils
   35 
   36 from .dummyjail import DummyJail
   37 from .utils import pid_exists, with_tmpdir, LogCaptureTestCase
   38 
   39 
   40 class CommandActionTest(LogCaptureTestCase):
   41 
   42     def setUp(self):
   43         """Call before every test case."""
   44         LogCaptureTestCase.setUp(self)
   45         self.__action = CommandAction(None, "Test")
   46         # prevent execute stop if start fails (or event not started at all):
   47         self.__action_started = False
   48         orgstart = self.__action.start
   49         def _action_start():
   50             self.__action_started = True
   51             return orgstart()
   52         self.__action.start = _action_start
   53 
   54     def tearDown(self):
   55         """Call after every test case."""
   56         if self.__action_started:
   57             self.__action.stop()
   58         LogCaptureTestCase.tearDown(self)
   59 
   60     def testSubstituteRecursiveTags(self):
   61         aInfo = {
   62             'HOST': "192.0.2.0",
   63             'ABC': "123 <HOST>",
   64             'xyz': "890 <ABC>",
   65         }
   66         # Recursion is bad
   67         self.assertRaises(ValueError,
   68             lambda: substituteRecursiveTags({'A': '<A>'}))
   69         self.assertRaises(ValueError,
   70             lambda: substituteRecursiveTags({'A': '<B>', 'B': '<A>'}))
   71         self.assertRaises(ValueError,
   72             lambda: substituteRecursiveTags({'A': '<B>', 'B': '<C>', 'C': '<A>'}))
   73         # Unresolveable substition
   74         self.assertRaises(ValueError,
   75             lambda: substituteRecursiveTags({'A': 'to=<B> fromip=<IP>', 'C': '<B>', 'B': '<C>', 'D': ''}))
   76         self.assertRaises(ValueError,
   77             lambda: substituteRecursiveTags({'failregex': 'to=<honeypot> fromip=<IP>', 'sweet': '<honeypot>', 'honeypot': '<sweet>', 'ignoreregex': ''}))
   78         # We need here an ordered, because the sequence of iteration is very important for this test
   79         if OrderedDict:
   80             # No cyclic recursion, just multiple replacement of tag <T>, should be successful:
   81             self.assertEqual(substituteRecursiveTags( OrderedDict(
   82                     (('X', 'x=x<T>'), ('T', '1'), ('Z', '<X> <T> <Y>'), ('Y', 'y=y<T>')))
   83                 ), {'X': 'x=x1', 'T': '1', 'Y': 'y=y1', 'Z': 'x=x1 1 y=y1'}
   84             )
   85             # No cyclic recursion, just multiple replacement of tag <T> in composite tags, should be successful:
   86             self.assertEqual(substituteRecursiveTags( OrderedDict(
   87                   (('X', 'x=x<T> <Z> <<R1>> <<R2>>'), ('R1', 'Z'), ('R2', 'Y'), ('T', '1'), ('Z', '<T> <Y>'), ('Y', 'y=y<T>')))
   88                 ), {'X': 'x=x1 1 y=y1 1 y=y1 y=y1', 'R1': 'Z', 'R2': 'Y', 'T': '1', 'Z': '1 y=y1', 'Y': 'y=y1'}
   89             )
   90             # No cyclic recursion, just multiple replacement of same tags, should be successful:
   91             self.assertEqual(substituteRecursiveTags( OrderedDict((
   92                     ('actionstart', 'ipset create <ipmset> hash:ip timeout <bantime> family <ipsetfamily>\n<iptables> -I <chain> <actiontype>'),
   93                     ('ipmset', 'f2b-<name>'),
   94                     ('name', 'any'),
   95                     ('bantime', '600'),
   96                     ('ipsetfamily', 'inet'),
   97                     ('iptables', 'iptables <lockingopt>'),
   98                     ('lockingopt', '-w'),
   99                     ('chain', 'INPUT'),
  100                     ('actiontype', '<multiport>'),
  101                     ('multiport', '-p <protocol> -m multiport --dports <port> -m set --match-set <ipmset> src -j <blocktype>'),
  102                     ('protocol', 'tcp'),
  103                     ('port', 'ssh'),
  104                     ('blocktype', 'REJECT',),
  105                 ))
  106                 ), OrderedDict((
  107                     ('actionstart', 'ipset create f2b-any hash:ip timeout 600 family inet\niptables -w -I INPUT -p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
  108                     ('ipmset', 'f2b-any'),
  109                     ('name', 'any'),
  110                     ('bantime', '600'),
  111                     ('ipsetfamily', 'inet'),
  112                     ('iptables', 'iptables -w'),
  113                     ('lockingopt', '-w'),
  114                     ('chain', 'INPUT'),
  115                     ('actiontype', '-p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
  116                     ('multiport', '-p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
  117                     ('protocol', 'tcp'),
  118                     ('port', 'ssh'),
  119                     ('blocktype', 'REJECT')
  120                 ))
  121             )
  122             # Cyclic recursion by composite tag creation, tags "create" another tag, that closes cycle:
  123             self.assertRaises(ValueError, lambda: substituteRecursiveTags( OrderedDict((
  124                     ('A', '<<B><C>>'),
  125                     ('B', 'D'), ('C', 'E'),
  126                     ('DE', 'cycle <A>'),
  127             )) ))
  128             self.assertRaises(ValueError, lambda: substituteRecursiveTags( OrderedDict((
  129                     ('DE', 'cycle <A>'),
  130                     ('A', '<<B><C>>'),
  131                     ('B', 'D'), ('C', 'E'),
  132             )) ))
  133             
  134         # missing tags are ok
  135         self.assertEqual(substituteRecursiveTags({'A': '<C>'}), {'A': '<C>'})
  136         self.assertEqual(substituteRecursiveTags({'A': '<C> <D> <X>','X':'fun'}), {'A': '<C> <D> fun', 'X':'fun'})
  137         self.assertEqual(substituteRecursiveTags({'A': '<C> <B>', 'B': 'cool'}), {'A': '<C> cool', 'B': 'cool'})
  138         # Escaped tags should be ignored
  139         self.assertEqual(substituteRecursiveTags({'A': '<matches> <B>', 'B': 'cool'}), {'A': '<matches> cool', 'B': 'cool'})
  140         # Multiple stuff on same line is ok
  141         self.assertEqual(substituteRecursiveTags({'failregex': 'to=<honeypot> fromip=<IP> evilperson=<honeypot>', 'honeypot': 'pokie', 'ignoreregex': ''}),
  142                                 { 'failregex': "to=pokie fromip=<IP> evilperson=pokie",
  143                                     'honeypot': 'pokie',
  144                                     'ignoreregex': '',
  145                                 })
  146         # rest is just cool
  147         self.assertEqual(substituteRecursiveTags(aInfo),
  148                                 { 'HOST': "192.0.2.0",
  149                                     'ABC': '123 192.0.2.0',
  150                                     'xyz': '890 123 192.0.2.0',
  151                                 })
  152         # obscure embedded case
  153         self.assertEqual(substituteRecursiveTags({'A': '<<PREF>HOST>', 'PREF': 'IPV4'}),
  154                          {'A': '<IPV4HOST>', 'PREF': 'IPV4'})
  155         self.assertEqual(substituteRecursiveTags({'A': '<<PREF>HOST>', 'PREF': 'IPV4', 'IPV4HOST': '1.2.3.4'}),
  156                          {'A': '1.2.3.4', 'PREF': 'IPV4', 'IPV4HOST': '1.2.3.4'})
  157         # more embedded within a string and two interpolations
  158         self.assertEqual(substituteRecursiveTags({'A': 'A <IP<PREF>HOST> B IP<PREF> C', 'PREF': 'V4', 'IPV4HOST': '1.2.3.4'}),
  159                          {'A': 'A 1.2.3.4 B IPV4 C', 'PREF': 'V4', 'IPV4HOST': '1.2.3.4'})
  160 
  161     def testSubstRec_DontTouchUnusedCallable(self):
  162         cm = CallingMap({
  163             'A':0,
  164             'B':lambda self: '<A><A>',
  165             'C':'',
  166             'D':''
  167         })
  168         #
  169         # should raise no exceptions:
  170         substituteRecursiveTags(cm)
  171         # add exception tag:
  172         cm['C'] = lambda self,i=0: 5 // int(self['A']) # raise error by access
  173         # test direct get of callable (should raise an error):
  174         self.assertRaises(ZeroDivisionError, lambda: cm['C'])
  175         # should raise no exceptions (tag "C" still unused):
  176         substituteRecursiveTags(cm)
  177         # add reference to "broken" tag:
  178         cm['D'] = 'test=<C>'
  179         # should raise an exception (BOOM by replacement of tag "D" recursive):
  180         self.assertRaises(ZeroDivisionError, lambda: substituteRecursiveTags(cm))
  181         #
  182         # should raise no exceptions:
  183         self.assertEqual(self.__action.replaceTag('test=<A>', cm), "test=0")
  184         # **Important**: recursive replacement of dynamic data from calling map should be prohibited,
  185         # otherwise may be vulnerable on foreign user-input:
  186         self.assertEqual(self.__action.replaceTag('test=<A>--<B>--<A>', cm), "test=0--<A><A>--0")
  187         # should raise an exception (BOOM by replacement of tag "C"):
  188         self.assertRaises(ZeroDivisionError, lambda: self.__action.replaceTag('test=<C>', cm))
  189         # should raise no exceptions (replaces tag "D" only):
  190         self.assertEqual(self.__action.replaceTag('<D>', cm), "test=<C>")
  191 
  192     def testReplaceTag(self):
  193         aInfo = {
  194             'HOST': "192.0.2.0",
  195             'ABC': "123",
  196             'xyz': "890",
  197         }
  198         self.assertEqual(
  199             self.__action.replaceTag("Text<br>text", aInfo),
  200             "Text\ntext")
  201         self.assertEqual(
  202             self.__action.replaceTag("Text <HOST> text", aInfo),
  203             "Text 192.0.2.0 text")
  204         self.assertEqual(
  205             self.__action.replaceTag("Text <xyz> text <ABC> ABC", aInfo),
  206             "Text 890 text 123 ABC")
  207         self.assertEqual(
  208             self.__action.replaceTag("<matches>",
  209                 {'matches': "some >char< should \\< be[ escap}ed&\n"}),
  210             "some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\\n")
  211         self.assertEqual(
  212             self.__action.replaceTag("<ipmatches>",
  213                 {'ipmatches': "some >char< should \\< be[ escap}ed&\n"}),
  214             "some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\\n")
  215         self.assertEqual(
  216             self.__action.replaceTag("<ipjailmatches>",
  217                 {'ipjailmatches': "some >char< should \\< be[ escap}ed&\r\n"}),
  218             "some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\\r\\n")
  219 
  220         # Recursive
  221         aInfo["ABC"] = "<xyz>"
  222         self.assertEqual(
  223             self.__action.replaceTag("Text <xyz> text <ABC> ABC", aInfo),
  224             "Text 890 text 890 ABC")
  225 
  226         # Callable
  227         self.assertEqual(
  228             self.__action.replaceTag("09 <matches> 11",
  229                 CallingMap(matches=lambda self: str(10))),
  230             "09 10 11")
  231 
  232     def testReplaceNoTag(self):
  233         # As tag not present, therefore callable should not be called
  234         # Will raise ValueError if it is
  235         self.assertEqual(
  236             self.__action.replaceTag("abc",
  237                 CallingMap(matches=lambda self: int("a"))), "abc")
  238 
  239     def testReplaceTagSelfRecursion(self):
  240         setattr(self.__action, 'a', "<a")
  241         setattr(self.__action, 'b', "c>")
  242         setattr(self.__action, 'b?family=inet6', "b>")
  243         setattr(self.__action, 'ac', "<a><b>")
  244         setattr(self.__action, 'ab', "<ac>")
  245         setattr(self.__action, 'x?family=inet6', "")
  246         # produce self-referencing properties except:
  247         self.assertRaisesRegexp(ValueError, r"properties contain self referencing definitions",
  248             lambda: self.__action.replaceTag("<a><b>", 
  249                 self.__action._properties, conditional="family=inet4")
  250         )
  251         # remore self-referencing in props:
  252         delattr(self.__action, 'ac')
  253         # produce self-referencing query except:
  254         self.assertRaisesRegexp(ValueError, r"possible self referencing definitions in query",
  255             lambda: self.__action.replaceTag("<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x>>>>>>>>>>>>>>>>>>>>>", 
  256                 self.__action._properties, conditional="family=inet6")
  257         )
  258 
  259     def testReplaceTagConditionalCached(self):
  260         setattr(self.__action, 'abc', "123")
  261         setattr(self.__action, 'abc?family=inet4', "345")
  262         setattr(self.__action, 'abc?family=inet6', "567")
  263         setattr(self.__action, 'xyz', "890-<abc>")
  264         setattr(self.__action, 'banaction', "Text <xyz> text <abc>")
  265         # test replacement in sub tags and direct, conditional, cached:
  266         cache = self.__action._substCache
  267         for i in range(2):
  268             self.assertEqual(
  269                 self.__action.replaceTag("<banaction> '<abc>'", self.__action._properties, 
  270                     conditional="", cache=cache),
  271                 "Text 890-123 text 123 '123'")
  272             self.assertEqual(
  273                 self.__action.replaceTag("<banaction> '<abc>'", self.__action._properties, 
  274                     conditional="family=inet4", cache=cache),
  275                 "Text 890-345 text 345 '345'")
  276             self.assertEqual(
  277                 self.__action.replaceTag("<banaction> '<abc>'", self.__action._properties, 
  278                     conditional="family=inet6", cache=cache),
  279                 "Text 890-567 text 567 '567'")
  280         self.assertTrue(len(cache) >= 3)
  281         # set one parameter - internal properties and cache should be reseted:
  282         setattr(self.__action, 'xyz', "000-<abc>")
  283         self.assertEqual(len(cache), 0)
  284         # test againg, should have 000 instead of 890:
  285         for i in range(2):
  286             self.assertEqual(
  287                 self.__action.replaceTag("<banaction> '<abc>'", self.__action._properties, 
  288                     conditional="", cache=cache),
  289                 "Text 000-123 text 123 '123'")
  290             self.assertEqual(
  291                 self.__action.replaceTag("<banaction> '<abc>'", self.__action._properties, 
  292                     conditional="family=inet4", cache=cache),
  293                 "Text 000-345 text 345 '345'")
  294             self.assertEqual(
  295                 self.__action.replaceTag("<banaction> '<abc>'", self.__action._properties, 
  296                     conditional="family=inet6", cache=cache),
  297                 "Text 000-567 text 567 '567'")
  298         self.assertTrue(len(cache) >= 3)
  299 
  300     @with_tmpdir
  301     def testExecuteActionBan(self, tmp):
  302         tmp += "/fail2ban.test"
  303         self.__action.actionstart = "touch '%s'" % tmp
  304         self.__action.actionrepair = self.__action.actionstart
  305         self.assertEqual(self.__action.actionstart, "touch '%s'" % tmp)
  306         self.__action.actionstop = "rm -f '%s'" % tmp
  307         self.assertEqual(self.__action.actionstop, "rm -f '%s'" % tmp)
  308         self.__action.actionban = "echo -n"
  309         self.assertEqual(self.__action.actionban, 'echo -n')
  310         self.__action.actioncheck = "[ -e '%s' ]" % tmp
  311         self.assertEqual(self.__action.actioncheck, "[ -e '%s' ]" % tmp)
  312         self.__action.actionunban = "true"
  313         self.assertEqual(self.__action.actionunban, 'true')
  314         self.pruneLog()
  315 
  316         self.assertNotLogged('returned')
  317         # no action was actually executed yet
  318 
  319         self.__action.ban({'ip': None})
  320         self.assertLogged('Invariant check failed')
  321         self.assertLogged('returned successfully')
  322         self.__action.stop()
  323         self.assertLogged(self.__action.actionstop)
  324 
  325     def testExecuteActionEmptyUnban(self):
  326         # unban will be executed for actions with banned items only:
  327         self.__action.actionban = ""
  328         self.__action.actionunban = ""
  329         self.__action.actionflush = "echo -n 'flush'"
  330         self.__action.actionstop = "echo -n 'stop'"
  331         self.__action.start();
  332         self.__action.ban({});
  333         self.pruneLog()
  334         self.__action.unban({})
  335         self.assertLogged('Nothing to do', wait=True)
  336         # same as above but with interim flush, so no unban anymore:
  337         self.__action.ban({});
  338         self.pruneLog('[phase 2]')
  339         self.__action.flush()
  340         self.__action.unban({})
  341         self.__action.stop()
  342         self.assertLogged('stop', wait=True)
  343         self.assertNotLogged('Nothing to do')
  344 
  345     @with_tmpdir
  346     def testExecuteActionStartCtags(self, tmp):
  347         tmp += '/fail2ban.test'
  348         self.__action.HOST = "192.0.2.0"
  349         self.__action.actionstart = "touch '%s.<HOST>'" % tmp
  350         self.__action.actionstop = "rm -f '%s.<HOST>'" % tmp
  351         self.__action.actioncheck = "[ -e '%s.192.0.2.0' ]" % tmp
  352         self.__action.start()
  353         self.__action.consistencyCheck()
  354 
  355     @with_tmpdir
  356     def testExecuteActionCheckRestoreEnvironment(self, tmp):
  357         tmp += '/fail2ban.test'
  358         self.__action.actionstart = ""
  359         self.__action.actionstop = "rm -f '%s'" % tmp
  360         self.__action.actionban = "rm '%s'" % tmp
  361         self.__action.actioncheck = "[ -e '%s' ]" % tmp
  362         self.assertRaises(RuntimeError, self.__action.ban, {'ip': None})
  363         self.assertLogged('Invariant check failed', 'Unable to restore environment', all=True)
  364         # 2nd time, try to restore with producing error in stop, but succeeded start hereafter:
  365         self.pruneLog('[phase 2]')
  366         self.__action.actionstart = "touch '%s'" % tmp
  367         self.__action.actionstop = "rm '%s'" % tmp
  368         self.__action.actionban = """printf "%%%%b\n" <ip> >> '%s'""" % tmp
  369         self.__action.actioncheck = "[ -e '%s' ]" % tmp
  370         self.__action.ban({'ip': None})
  371         self.assertLogged('Invariant check failed')
  372         self.assertNotLogged('Unable to restore environment')
  373 
  374     @with_tmpdir
  375     def testExecuteActionCheckRepairEnvironment(self, tmp):
  376         tmp += '/fail2ban.test'
  377         self.__action.actionstart = ""
  378         self.__action.actionstop = ""
  379         self.__action.actionban = "rm '%s'" % tmp
  380         self.__action.actioncheck = "[ -e '%s' ]" % tmp
  381         self.__action.actionrepair = "echo 'repair ...'; touch '%s'" % tmp
  382         # 1st time with success repair:
  383         self.__action.ban({'ip': None})
  384         self.assertLogged("Invariant check failed. Trying", "echo 'repair ...'", all=True)
  385         self.pruneLog()
  386         # 2nd time failed (not really repaired):
  387         self.__action.actionrepair = "echo 'repair ...'"
  388         self.assertRaises(RuntimeError, self.__action.ban, {'ip': None})
  389         self.assertLogged(
  390             "Invariant check failed. Trying", 
  391             "echo 'repair ...'", 
  392             "Unable to restore environment", all=True)
  393 
  394     def testExecuteActionChangeCtags(self):
  395         self.assertRaises(AttributeError, getattr, self.__action, "ROST")
  396         self.__action.ROST = "192.0.2.0"
  397         self.assertEqual(self.__action.ROST,"192.0.2.0")
  398 
  399     def testExecuteActionUnbanAinfo(self):
  400         aInfo = CallingMap({
  401             'ABC': "123",
  402             'ip': '192.0.2.1',
  403             'F-*': lambda self: {
  404             'fid': 111,
  405             'fport': 222,
  406                 'user': "tester"
  407             }
  408         })
  409         self.__action.actionban = "echo '<ABC>, failure <F-ID> of <F-USER> -<F-TEST>- from <ip>:<F-PORT>'"
  410         self.__action.actionunban = "echo '<ABC>, user <F-USER> unbanned'"
  411         self.__action.ban(aInfo)
  412         self.__action.unban(aInfo)
  413         self.assertLogged(
  414             " -- stdout: '123, failure 111 of tester -- from 192.0.2.1:222'",
  415             " -- stdout: '123, user tester unbanned'",
  416             all=True
  417         )
  418 
  419     def testExecuteActionStartEmpty(self):
  420         self.__action.actionstart = ""
  421         self.__action.start()
  422         self.assertTrue(self.__action.executeCmd(""))
  423         self.assertLogged('Nothing to do')
  424         self.pruneLog()
  425         self.assertTrue(self.__action._processCmd(""))
  426         self.assertLogged('Nothing to do')
  427         self.pruneLog()
  428 
  429     def testExecuteWithVars(self):
  430         self.assertTrue(self.__action.executeCmd(
  431             r'''printf %b "foreign input:\n'''
  432             r''' -- $f2bV_A --\n'''
  433             r''' -- $f2bV_B --\n'''
  434             r''' -- $(echo -n $f2bV_C) --''' # echo just replaces \n to test it as single line
  435             r'''"''', 
  436             varsDict={
  437             'f2bV_A': 'I\'m a hacker; && $(echo $f2bV_B)', 
  438             'f2bV_B': 'I"m very bad hacker', 
  439             'f2bV_C': '`Very | very\n$(bad & worst hacker)`'
  440         }))
  441         self.assertLogged(r"""foreign input:""",
  442             ' -- I\'m a hacker; && $(echo $f2bV_B) --',
  443             ' -- I"m very bad hacker --',
  444             ' -- `Very | very $(bad & worst hacker)` --', all=True)
  445 
  446     def testExecuteReplaceEscapeWithVars(self):
  447         self.__action.actionban = 'echo "** ban <ip>, reason: <reason> ...\\n<matches>"'
  448         self.__action.actionunban = 'echo "** unban <ip>"'
  449         self.__action.actionstop = 'echo "** stop monitoring"'
  450         matches = [
  451             '<actionunban>',
  452             '" Hooray! #',
  453             '`I\'m cool script kiddy',
  454             '`I`m very cool > /here-is-the-path/to/bin/.x-attempt.sh',
  455             '<actionstop>',
  456         ]
  457         aInfo = {
  458             'ip': '192.0.2.1',
  459             'reason': 'hacking attempt ( he thought he knows how f2b internally works ;)',
  460             'matches': '\n'.join(matches)
  461         }
  462         self.pruneLog()
  463         self.__action.ban(aInfo)
  464         self.assertLogged(
  465             '** ban %s' % aInfo['ip'], aInfo['reason'], *matches, all=True)
  466         self.assertNotLogged(
  467             '** unban %s' % aInfo['ip'], '** stop monitoring', all=True)
  468         self.pruneLog()
  469         self.__action.unban(aInfo)
  470         self.__action.stop()
  471         self.assertLogged(
  472             '** unban %s' % aInfo['ip'], '** stop monitoring', all=True)
  473 
  474     def testExecuteIncorrectCmd(self):
  475         CommandAction.executeCmd('/bin/ls >/dev/null\nbogusXXX now 2>/dev/null')
  476         self.assertLogged('HINT on 127: "Command not found"')
  477 
  478     def testExecuteTimeout(self):
  479         stime = time.time()
  480         timeout = 1 if not unittest.F2B.fast else 0.01
  481         # Should take a 30 seconds (so timeout will occur)
  482         self.assertFalse(CommandAction.executeCmd('sleep 30', timeout=timeout))
  483         # give a test still 1 second, because system could be too busy
  484         self.assertTrue(time.time() >= stime + timeout and time.time() <= stime + timeout + 1)
  485         self.assertLogged('sleep 30', ' -- timed out after', all=True)
  486         self.assertLogged(' -- killed with SIGTERM', 
  487                           ' -- killed with SIGKILL')
  488 
  489     def testExecuteTimeoutWithNastyChildren(self):
  490         # temporary file for a nasty kid shell script
  491         tmpFilename = tempfile.mktemp(".sh", "fail2ban_")
  492         # Create a nasty script which would hang there for a while
  493         with open(tmpFilename, 'w') as f:
  494             f.write("""#!/bin/bash
  495         trap : HUP EXIT TERM
  496 
  497         echo "$$" > %s.pid
  498         echo "my pid $$ . sleeping lo-o-o-ong"
  499         sleep 30
  500         """ % tmpFilename)
  501         stime = 0
  502 
  503         # timeout as long as pid-file was not created, but max 5 seconds
  504         def getnasty_tout():
  505             return (
  506                 getnastypid() is not None
  507                 or time.time() - stime > 5
  508             )
  509 
  510         def getnastypid():
  511             cpid = None
  512             if os.path.isfile(tmpFilename + '.pid'):
  513                 with open(tmpFilename + '.pid') as f:
  514                     try:
  515                         cpid = int(f.read())
  516                     except ValueError:
  517                         pass
  518             return cpid
  519 
  520         # First test if can kill the bastard
  521         stime = time.time()
  522         self.assertFalse(CommandAction.executeCmd(
  523             'bash %s' % tmpFilename, timeout=getnasty_tout))
  524         # Wait up to 3 seconds, the child got killed
  525         cpid = getnastypid()
  526         # Verify that the process itself got killed
  527         self.assertTrue(Utils.wait_for(lambda: not pid_exists(cpid), 3))  # process should have been killed
  528         self.assertLogged('my pid ', 'Resource temporarily unavailable')
  529         self.assertLogged('timed out')
  530         self.assertLogged('killed with SIGTERM', 
  531                           'killed with SIGKILL')
  532         os.unlink(tmpFilename + '.pid')
  533 
  534         # A bit evolved case even though, previous test already tests killing children processes
  535         stime = time.time()
  536         self.assertFalse(CommandAction.executeCmd(
  537             'out=`bash %s`; echo ALRIGHT' % tmpFilename, timeout=getnasty_tout))
  538         # Wait up to 3 seconds, the child got killed
  539         cpid = getnastypid()
  540         # Verify that the process itself got killed
  541         self.assertTrue(Utils.wait_for(lambda: not pid_exists(cpid), 3))
  542         self.assertLogged('my pid ', 'Resource temporarily unavailable')
  543         self.assertLogged(' -- timed out')
  544         self.assertLogged(' -- killed with SIGTERM', 
  545                           ' -- killed with SIGKILL')
  546         os.unlink(tmpFilename)
  547         os.unlink(tmpFilename + '.pid')
  548 
  549 
  550     def testCaptureStdOutErr(self):
  551         CommandAction.executeCmd('echo "How now brown cow"')
  552         self.assertLogged("stdout: 'How now brown cow'\n")
  553         CommandAction.executeCmd(
  554             'echo "The rain in Spain stays mainly in the plain" 1>&2')
  555         self.assertLogged(
  556             "stderr: 'The rain in Spain stays mainly in the plain'\n")
  557 
  558     def testCallingMap(self):
  559         mymap = CallingMap(callme=lambda self: str(10), error=lambda self: int('a'),
  560             dontcallme= "string", number=17)
  561 
  562         # Should work fine
  563         self.assertEqual(
  564             "%(callme)s okay %(dontcallme)s %(number)i" % mymap,
  565             "10 okay string 17")
  566         # Error will now trip, demonstrating delayed call
  567         self.assertRaises(ValueError, lambda x: "%(error)i" % x, mymap)
  568 
  569     def testCallingMapModify(self):
  570         m = CallingMap({
  571             'a': lambda self: 2 + 3,
  572             'b': lambda self: self['a'] + 6,
  573             'c': 'test',
  574         })
  575         # test reset (without modifications):
  576         m.reset()
  577         # do modifications:
  578         m['a'] = 4
  579         del m['c']
  580         # test set and delete:
  581         self.assertEqual(len(m), 2)
  582         self.assertNotIn('c', m)
  583         self.assertEqual((m['a'], m['b']), (4, 10))
  584         # reset to original and test again:
  585         m.reset()
  586         s = repr(m)
  587         self.assertEqual(len(m), 3)
  588         self.assertIn('c', m)
  589         self.assertEqual((m['a'], m['b'], m['c']), (5, 11, 'test'))
  590         # immutability of copy:
  591         m['d'] = 'dddd'
  592         m2 = m.copy()
  593         m2['c'] = lambda self: self['a'] + 7
  594         m2['a'] = 1
  595         del m2['b']
  596         del m2['d']
  597         self.assertTrue('b' in m)
  598         self.assertTrue('d' in m)
  599         self.assertFalse('b' in m2)
  600         self.assertFalse('d' in m2)
  601         self.assertEqual((m['a'], m['b'], m['c'], m['d']), (5, 11, 'test', 'dddd'))
  602         self.assertEqual((m2['a'], m2['c']), (1, 8))
  603 
  604     def testCallingMapRep(self):
  605         m = CallingMap({
  606             'a': lambda self: 2 + 3,
  607             'b': lambda self: self['a'] + 6,
  608             'c': ''
  609         })
  610         s = repr(m); # only stored values (no calculated)
  611         self.assertNotIn("'a': ", s)
  612         self.assertNotIn("'b': ", s)
  613         self.assertIn("'c': ''", s)
  614 
  615         s = m._asrepr(True) # all values (including calculated)
  616         self.assertIn("'a': 5", s)
  617         self.assertIn("'b': 11", s)
  618         self.assertIn("'c': ''", s)
  619         
  620         m['c'] = lambda self: self['xxx'] + 7; # unresolvable
  621         s = m._asrepr(True)
  622         self.assertIn("'a': 5", s)
  623         self.assertIn("'b': 11", s)
  624         self.assertIn("'c': ", s) # presents as callable
  625         self.assertNotIn("'c': ''", s) # but not empty
  626 
  627     def testActionsIdleMode(self):
  628         a = Actions(DummyJail())
  629         a.sleeptime = 0.0001;   # don't need to wait long
  630         # enter idle mode right now (start idle):
  631         a.idle = True;
  632         # start:
  633         a.start()
  634         # wait for enter/leave of idle mode:
  635         self.assertLogged("Actions: enter idle mode", wait=10)
  636         # leave idle mode:
  637         a.idle = False
  638         self.assertLogged("Actions: leave idle mode", wait=10)
  639         # stop it:
  640         a.active = False
  641         a.join()