"Fossies" - the Fresh Open Source Software Archive

Member "fail2ban-0.10.4/fail2ban/tests/actiontestcase.py" (4 Oct 2018, 22975 Bytes) of package /linux/misc/fail2ban-0.10.4.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "actiontestcase.py": 0.10.3.1_vs_0.10.4.

    1 # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
    2 # vi: set ft=python sts=4 ts=4 sw=4 noet :
    3 
    4 # This file is part of Fail2Ban.
    5 #
    6 # Fail2Ban is free software; you can redistribute it and/or modify
    7 # it under the terms of the GNU General Public License as published by
    8 # the Free Software Foundation; either version 2 of the License, or
    9 # (at your option) any later version.
   10 #
   11 # Fail2Ban is distributed in the hope that it will be useful,
   12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
   13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   14 # GNU General Public License for more details.
   15 #
   16 # You should have received a copy of the GNU General Public License
   17 # along with Fail2Ban; if not, write to the Free Software
   18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
   19 
   20 # Author: Cyril Jaquier
   21 # 
   22 
   23 __author__ = "Cyril Jaquier"
   24 __copyright__ = "Copyright (c) 2004 Cyril Jaquier"
   25 __license__ = "GPL"
   26 
   27 import os
   28 import tempfile
   29 import time
   30 import unittest
   31 
   32 from ..server.action import CommandAction, CallingMap, substituteRecursiveTags
   33 from ..server.actions import OrderedDict, Actions
   34 from ..server.utils import Utils
   35 
   36 from .dummyjail import DummyJail
   37 from .utils import LogCaptureTestCase
   38 from .utils import pid_exists
   39 
   40 class CommandActionTest(LogCaptureTestCase):
   41 
   42     def setUp(self):
   43         """Call before every test case."""
   44         LogCaptureTestCase.setUp(self)
   45         self.__action = CommandAction(None, "Test")
   46         # prevent execute stop if start fails (or event not started at all):
   47         self.__action_started = False
   48         orgstart = self.__action.start
   49         def _action_start():
   50             self.__action_started = True
   51             return orgstart()
   52         self.__action.start = _action_start
   53 
   54     def tearDown(self):
   55         """Call after every test case."""
   56         if self.__action_started:
   57             self.__action.stop()
   58         LogCaptureTestCase.tearDown(self)
   59 
   60     def testSubstituteRecursiveTags(self):
   61         aInfo = {
   62             'HOST': "192.0.2.0",
   63             'ABC': "123 <HOST>",
   64             'xyz': "890 <ABC>",
   65         }
   66         # Recursion is bad
   67         self.assertRaises(ValueError,
   68             lambda: substituteRecursiveTags({'A': '<A>'}))
   69         self.assertRaises(ValueError,
   70             lambda: substituteRecursiveTags({'A': '<B>', 'B': '<A>'}))
   71         self.assertRaises(ValueError,
   72             lambda: substituteRecursiveTags({'A': '<B>', 'B': '<C>', 'C': '<A>'}))
   73         # Unresolveable substition
   74         self.assertRaises(ValueError,
   75             lambda: substituteRecursiveTags({'A': 'to=<B> fromip=<IP>', 'C': '<B>', 'B': '<C>', 'D': ''}))
   76         self.assertRaises(ValueError,
   77             lambda: substituteRecursiveTags({'failregex': 'to=<honeypot> fromip=<IP>', 'sweet': '<honeypot>', 'honeypot': '<sweet>', 'ignoreregex': ''}))
   78         # We need here an ordered, because the sequence of iteration is very important for this test
   79         if OrderedDict:
   80             # No cyclic recursion, just multiple replacement of tag <T>, should be successful:
   81             self.assertEqual(substituteRecursiveTags( OrderedDict(
   82                     (('X', 'x=x<T>'), ('T', '1'), ('Z', '<X> <T> <Y>'), ('Y', 'y=y<T>')))
   83                 ), {'X': 'x=x1', 'T': '1', 'Y': 'y=y1', 'Z': 'x=x1 1 y=y1'}
   84             )
   85             # No cyclic recursion, just multiple replacement of tag <T> in composite tags, should be successful:
   86             self.assertEqual(substituteRecursiveTags( OrderedDict(
   87                   (('X', 'x=x<T> <Z> <<R1>> <<R2>>'), ('R1', 'Z'), ('R2', 'Y'), ('T', '1'), ('Z', '<T> <Y>'), ('Y', 'y=y<T>')))
   88                 ), {'X': 'x=x1 1 y=y1 1 y=y1 y=y1', 'R1': 'Z', 'R2': 'Y', 'T': '1', 'Z': '1 y=y1', 'Y': 'y=y1'}
   89             )
   90             # No cyclic recursion, just multiple replacement of same tags, should be successful:
   91             self.assertEqual(substituteRecursiveTags( OrderedDict((
   92                     ('actionstart', 'ipset create <ipmset> hash:ip timeout <bantime> family <ipsetfamily>\n<iptables> -I <chain> <actiontype>'),
   93                     ('ipmset', 'f2b-<name>'),
   94                     ('name', 'any'),
   95                     ('bantime', '600'),
   96                     ('ipsetfamily', 'inet'),
   97                     ('iptables', 'iptables <lockingopt>'),
   98                     ('lockingopt', '-w'),
   99                     ('chain', 'INPUT'),
  100                     ('actiontype', '<multiport>'),
  101                     ('multiport', '-p <protocol> -m multiport --dports <port> -m set --match-set <ipmset> src -j <blocktype>'),
  102                     ('protocol', 'tcp'),
  103                     ('port', 'ssh'),
  104                     ('blocktype', 'REJECT',),
  105                 ))
  106                 ), OrderedDict((
  107                     ('actionstart', 'ipset create f2b-any hash:ip timeout 600 family inet\niptables -w -I INPUT -p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
  108                     ('ipmset', 'f2b-any'),
  109                     ('name', 'any'),
  110                     ('bantime', '600'),
  111                     ('ipsetfamily', 'inet'),
  112                     ('iptables', 'iptables -w'),
  113                     ('lockingopt', '-w'),
  114                     ('chain', 'INPUT'),
  115                     ('actiontype', '-p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
  116                     ('multiport', '-p tcp -m multiport --dports ssh -m set --match-set f2b-any src -j REJECT'),
  117                     ('protocol', 'tcp'),
  118                     ('port', 'ssh'),
  119                     ('blocktype', 'REJECT')
  120                 ))
  121             )
  122             # Cyclic recursion by composite tag creation, tags "create" another tag, that closes cycle:
  123             self.assertRaises(ValueError, lambda: substituteRecursiveTags( OrderedDict((
  124                     ('A', '<<B><C>>'),
  125                     ('B', 'D'), ('C', 'E'),
  126                     ('DE', 'cycle <A>'),
  127             )) ))
  128             self.assertRaises(ValueError, lambda: substituteRecursiveTags( OrderedDict((
  129                     ('DE', 'cycle <A>'),
  130                     ('A', '<<B><C>>'),
  131                     ('B', 'D'), ('C', 'E'),
  132             )) ))
  133             
  134         # missing tags are ok
  135         self.assertEqual(substituteRecursiveTags({'A': '<C>'}), {'A': '<C>'})
  136         self.assertEqual(substituteRecursiveTags({'A': '<C> <D> <X>','X':'fun'}), {'A': '<C> <D> fun', 'X':'fun'})
  137         self.assertEqual(substituteRecursiveTags({'A': '<C> <B>', 'B': 'cool'}), {'A': '<C> cool', 'B': 'cool'})
  138         # Escaped tags should be ignored
  139         self.assertEqual(substituteRecursiveTags({'A': '<matches> <B>', 'B': 'cool'}), {'A': '<matches> cool', 'B': 'cool'})
  140         # Multiple stuff on same line is ok
  141         self.assertEqual(substituteRecursiveTags({'failregex': 'to=<honeypot> fromip=<IP> evilperson=<honeypot>', 'honeypot': 'pokie', 'ignoreregex': ''}),
  142                                 { 'failregex': "to=pokie fromip=<IP> evilperson=pokie",
  143                                     'honeypot': 'pokie',
  144                                     'ignoreregex': '',
  145                                 })
  146         # rest is just cool
  147         self.assertEqual(substituteRecursiveTags(aInfo),
  148                                 { 'HOST': "192.0.2.0",
  149                                     'ABC': '123 192.0.2.0',
  150                                     'xyz': '890 123 192.0.2.0',
  151                                 })
  152         # obscure embedded case
  153         self.assertEqual(substituteRecursiveTags({'A': '<<PREF>HOST>', 'PREF': 'IPV4'}),
  154                          {'A': '<IPV4HOST>', 'PREF': 'IPV4'})
  155         self.assertEqual(substituteRecursiveTags({'A': '<<PREF>HOST>', 'PREF': 'IPV4', 'IPV4HOST': '1.2.3.4'}),
  156                          {'A': '1.2.3.4', 'PREF': 'IPV4', 'IPV4HOST': '1.2.3.4'})
  157         # more embedded within a string and two interpolations
  158         self.assertEqual(substituteRecursiveTags({'A': 'A <IP<PREF>HOST> B IP<PREF> C', 'PREF': 'V4', 'IPV4HOST': '1.2.3.4'}),
  159                          {'A': 'A 1.2.3.4 B IPV4 C', 'PREF': 'V4', 'IPV4HOST': '1.2.3.4'})
  160 
  161     def testSubstRec_DontTouchUnusedCallable(self):
  162         cm = CallingMap({
  163             'A':0,
  164             'B':lambda self: '<A><A>',
  165             'C':'',
  166             'D':''
  167         })
  168         #
  169         # should raise no exceptions:
  170         substituteRecursiveTags(cm)
  171         # add exception tag:
  172         cm['C'] = lambda self,i=0: 5 // int(self['A']) # raise error by access
  173         # test direct get of callable (should raise an error):
  174         self.assertRaises(ZeroDivisionError, lambda: cm['C'])
  175         # should raise no exceptions (tag "C" still unused):
  176         substituteRecursiveTags(cm)
  177         # add reference to "broken" tag:
  178         cm['D'] = 'test=<C>'
  179         # should raise an exception (BOOM by replacement of tag "D" recursive):
  180         self.assertRaises(ZeroDivisionError, lambda: substituteRecursiveTags(cm))
  181         #
  182         # should raise no exceptions:
  183         self.assertEqual(self.__action.replaceTag('test=<A>', cm), "test=0")
  184         # **Important**: recursive replacement of dynamic data from calling map should be prohibited,
  185         # otherwise may be vulnerable on foreign user-input:
  186         self.assertEqual(self.__action.replaceTag('test=<A>--<B>--<A>', cm), "test=0--<A><A>--0")
  187         # should raise an exception (BOOM by replacement of tag "C"):
  188         self.assertRaises(ZeroDivisionError, lambda: self.__action.replaceTag('test=<C>', cm))
  189         # should raise no exceptions (replaces tag "D" only):
  190         self.assertEqual(self.__action.replaceTag('<D>', cm), "test=<C>")
  191 
  192     def testReplaceTag(self):
  193         aInfo = {
  194             'HOST': "192.0.2.0",
  195             'ABC': "123",
  196             'xyz': "890",
  197         }
  198         self.assertEqual(
  199             self.__action.replaceTag("Text<br>text", aInfo),
  200             "Text\ntext")
  201         self.assertEqual(
  202             self.__action.replaceTag("Text <HOST> text", aInfo),
  203             "Text 192.0.2.0 text")
  204         self.assertEqual(
  205             self.__action.replaceTag("Text <xyz> text <ABC> ABC", aInfo),
  206             "Text 890 text 123 ABC")
  207         self.assertEqual(
  208             self.__action.replaceTag("<matches>",
  209                 {'matches': "some >char< should \< be[ escap}ed&\n"}),
  210             "some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\n")
  211         self.assertEqual(
  212             self.__action.replaceTag("<ipmatches>",
  213                 {'ipmatches': "some >char< should \< be[ escap}ed&\n"}),
  214             "some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\n")
  215         self.assertEqual(
  216             self.__action.replaceTag("<ipjailmatches>",
  217                 {'ipjailmatches': "some >char< should \< be[ escap}ed&\n"}),
  218             "some \\>char\\< should \\\\\\< be\\[ escap\\}ed\\&\n")
  219 
  220         # Recursive
  221         aInfo["ABC"] = "<xyz>"
  222         self.assertEqual(
  223             self.__action.replaceTag("Text <xyz> text <ABC> ABC", aInfo),
  224             "Text 890 text 890 ABC")
  225 
  226         # Callable
  227         self.assertEqual(
  228             self.__action.replaceTag("09 <matches> 11",
  229                 CallingMap(matches=lambda self: str(10))),
  230             "09 10 11")
  231 
  232     def testReplaceNoTag(self):
  233         # As tag not present, therefore callable should not be called
  234         # Will raise ValueError if it is
  235         self.assertEqual(
  236             self.__action.replaceTag("abc",
  237                 CallingMap(matches=lambda self: int("a"))), "abc")
  238 
  239     def testReplaceTagSelfRecursion(self):
  240         setattr(self.__action, 'a', "<a")
  241         setattr(self.__action, 'b', "c>")
  242         setattr(self.__action, 'b?family=inet6', "b>")
  243         setattr(self.__action, 'ac', "<a><b>")
  244         setattr(self.__action, 'ab', "<ac>")
  245         setattr(self.__action, 'x?family=inet6', "")
  246         # produce self-referencing properties except:
  247         self.assertRaisesRegexp(ValueError, r"properties contain self referencing definitions",
  248             lambda: self.__action.replaceTag("<a><b>", 
  249                 self.__action._properties, conditional="family=inet4")
  250         )
  251         # remore self-referencing in props:
  252         delattr(self.__action, 'ac')
  253         # produce self-referencing query except:
  254         self.assertRaisesRegexp(ValueError, r"possible self referencing definitions in query",
  255             lambda: self.__action.replaceTag("<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x<x>>>>>>>>>>>>>>>>>>>>>", 
  256                 self.__action._properties, conditional="family=inet6")
  257         )
  258 
  259     def testReplaceTagConditionalCached(self):
  260         setattr(self.__action, 'abc', "123")
  261         setattr(self.__action, 'abc?family=inet4', "345")
  262         setattr(self.__action, 'abc?family=inet6', "567")
  263         setattr(self.__action, 'xyz', "890-<abc>")
  264         setattr(self.__action, 'banaction', "Text <xyz> text <abc>")
  265         # test replacement in sub tags and direct, conditional, cached:
  266         cache = self.__action._substCache
  267         for i in range(2):
  268             self.assertEqual(
  269                 self.__action.replaceTag("<banaction> '<abc>'", self.__action._properties, 
  270                     conditional="", cache=cache),
  271                 "Text 890-123 text 123 '123'")
  272             self.assertEqual(
  273                 self.__action.replaceTag("<banaction> '<abc>'", self.__action._properties, 
  274                     conditional="family=inet4", cache=cache),
  275                 "Text 890-345 text 345 '345'")
  276             self.assertEqual(
  277                 self.__action.replaceTag("<banaction> '<abc>'", self.__action._properties, 
  278                     conditional="family=inet6", cache=cache),
  279                 "Text 890-567 text 567 '567'")
  280         self.assertTrue(len(cache) >= 3)
  281         # set one parameter - internal properties and cache should be reseted:
  282         setattr(self.__action, 'xyz', "000-<abc>")
  283         self.assertEqual(len(cache), 0)
  284         # test againg, should have 000 instead of 890:
  285         for i in range(2):
  286             self.assertEqual(
  287                 self.__action.replaceTag("<banaction> '<abc>'", self.__action._properties, 
  288                     conditional="", cache=cache),
  289                 "Text 000-123 text 123 '123'")
  290             self.assertEqual(
  291                 self.__action.replaceTag("<banaction> '<abc>'", self.__action._properties, 
  292                     conditional="family=inet4", cache=cache),
  293                 "Text 000-345 text 345 '345'")
  294             self.assertEqual(
  295                 self.__action.replaceTag("<banaction> '<abc>'", self.__action._properties, 
  296                     conditional="family=inet6", cache=cache),
  297                 "Text 000-567 text 567 '567'")
  298         self.assertTrue(len(cache) >= 3)
  299 
  300 
  301     def testExecuteActionBan(self):
  302         self.__action.actionstart = "touch /tmp/fail2ban.test"
  303         self.assertEqual(self.__action.actionstart, "touch /tmp/fail2ban.test")
  304         self.__action.actionstop = "rm -f /tmp/fail2ban.test"
  305         self.assertEqual(self.__action.actionstop, 'rm -f /tmp/fail2ban.test')
  306         self.__action.actionban = "echo -n"
  307         self.assertEqual(self.__action.actionban, 'echo -n')
  308         self.__action.actioncheck = "[ -e /tmp/fail2ban.test ]"
  309         self.assertEqual(self.__action.actioncheck, '[ -e /tmp/fail2ban.test ]')
  310         self.__action.actionunban = "true"
  311         self.assertEqual(self.__action.actionunban, 'true')
  312 
  313         self.assertNotLogged('returned')
  314         # no action was actually executed yet
  315 
  316         self.__action.ban({'ip': None})
  317         self.assertLogged('Invariant check failed')
  318         self.assertLogged('returned successfully')
  319 
  320     def testExecuteActionEmptyUnban(self):
  321         self.__action.actionunban = ""
  322         self.__action.unban({})
  323         self.assertLogged('Nothing to do')
  324 
  325     def testExecuteActionStartCtags(self):
  326         self.__action.HOST = "192.0.2.0"
  327         self.__action.actionstart = "touch /tmp/fail2ban.test.<HOST>"
  328         self.__action.actionstop = "rm -f /tmp/fail2ban.test.<HOST>"
  329         self.__action.actioncheck = "[ -e /tmp/fail2ban.test.192.0.2.0 ]"
  330         self.__action.start()
  331 
  332     def testExecuteActionCheckRestoreEnvironment(self):
  333         self.__action.actionstart = ""
  334         self.__action.actionstop = "rm -f /tmp/fail2ban.test"
  335         self.__action.actionban = "rm /tmp/fail2ban.test"
  336         self.__action.actioncheck = "[ -e /tmp/fail2ban.test ]"
  337         self.assertRaises(RuntimeError, self.__action.ban, {'ip': None})
  338         self.assertLogged('Invariant check failed', 'Unable to restore environment', all=True)
  339         # 2nd time, try to restore with producing error in stop, but succeeded start hereafter:
  340         self.pruneLog('[phase 2]')
  341         self.__action.actionstart = "touch /tmp/fail2ban.test"
  342         self.__action.actionstop = "rm /tmp/fail2ban.test"
  343         self.__action.actionban = 'printf "%%b\n" <ip> >> /tmp/fail2ban.test'
  344         self.__action.actioncheck = "[ -e /tmp/fail2ban.test ]"
  345         self.__action.ban({'ip': None})
  346         self.assertLogged('Invariant check failed')
  347         self.assertNotLogged('Unable to restore environment')
  348 
  349     def testExecuteActionCheckRepairEnvironment(self):
  350         self.__action.actionstart = ""
  351         self.__action.actionstop = ""
  352         self.__action.actionban = "rm /tmp/fail2ban.test"
  353         self.__action.actioncheck = "[ -e /tmp/fail2ban.test ]"
  354         self.__action.actionrepair = "echo 'repair ...'; touch /tmp/fail2ban.test"
  355         # 1st time with success repair:
  356         self.__action.ban({'ip': None})
  357         self.assertLogged("Invariant check failed. Trying", "echo 'repair ...'", all=True)
  358         self.pruneLog()
  359         # 2nd time failed (not really repaired):
  360         self.__action.actionrepair = "echo 'repair ...'"
  361         self.assertRaises(RuntimeError, self.__action.ban, {'ip': None})
  362         self.assertLogged(
  363             "Invariant check failed. Trying", 
  364             "echo 'repair ...'", 
  365             "Unable to restore environment", all=True)
  366 
  367     def testExecuteActionChangeCtags(self):
  368         self.assertRaises(AttributeError, getattr, self.__action, "ROST")
  369         self.__action.ROST = "192.0.2.0"
  370         self.assertEqual(self.__action.ROST,"192.0.2.0")
  371 
  372     def testExecuteActionUnbanAinfo(self):
  373         aInfo = CallingMap({
  374             'ABC': "123",
  375             'ip': '192.0.2.1',
  376             'F-*': lambda self: {
  377             'fid': 111,
  378             'fport': 222,
  379                 'user': "tester"
  380             }
  381         })
  382         self.__action.actionban = "touch /tmp/fail2ban.test.123; echo 'failure <F-ID> of <F-USER> -<F-TEST>- from <ip>:<F-PORT>'"
  383         self.__action.actionunban = "rm /tmp/fail2ban.test.<ABC>; echo 'user <F-USER> unbanned'"
  384         self.__action.ban(aInfo)
  385         self.__action.unban(aInfo)
  386         self.assertLogged(
  387             " -- stdout: 'failure 111 of tester -- from 192.0.2.1:222'",
  388             " -- stdout: 'user tester unbanned'",
  389             all=True
  390         )
  391 
  392     def testExecuteActionStartEmpty(self):
  393         self.__action.actionstart = ""
  394         self.__action.start()
  395         self.assertTrue(self.__action.executeCmd(""))
  396         self.assertLogged('Nothing to do')
  397         self.pruneLog()
  398         self.assertTrue(self.__action._processCmd(""))
  399         self.assertLogged('Nothing to do')
  400         self.pruneLog()
  401 
  402     def testExecuteWithVars(self):
  403         self.assertTrue(self.__action.executeCmd(
  404             r'''printf %b "foreign input:\n'''
  405             r''' -- $f2bV_A --\n'''
  406             r''' -- $f2bV_B --\n'''
  407             r''' -- $(echo -n $f2bV_C) --''' # echo just replaces \n to test it as single line
  408             r'''"''', 
  409             varsDict={
  410             'f2bV_A': 'I\'m a hacker; && $(echo $f2bV_B)', 
  411             'f2bV_B': 'I"m very bad hacker', 
  412             'f2bV_C': '`Very | very\n$(bad & worst hacker)`'
  413         }))
  414         self.assertLogged(r"""foreign input:""",
  415             ' -- I\'m a hacker; && $(echo $f2bV_B) --',
  416             ' -- I"m very bad hacker --',
  417             ' -- `Very | very $(bad & worst hacker)` --', all=True)
  418 
  419     def testExecuteReplaceEscapeWithVars(self):
  420         self.__action.actionban = 'echo "** ban <ip>, reason: <reason> ...\\n<matches>"'
  421         self.__action.actionunban = 'echo "** unban <ip>"'
  422         self.__action.actionstop = 'echo "** stop monitoring"'
  423         matches = [
  424             '<actionunban>',
  425             '" Hooray! #',
  426             '`I\'m cool script kiddy',
  427             '`I`m very cool > /here-is-the-path/to/bin/.x-attempt.sh',
  428             '<actionstop>',
  429         ]
  430         aInfo = {
  431             'ip': '192.0.2.1',
  432             'reason': 'hacking attempt ( he thought he knows how f2b internally works ;)',
  433             'matches': '\n'.join(matches)
  434         }
  435         self.pruneLog()
  436         self.__action.ban(aInfo)
  437         self.assertLogged(
  438             '** ban %s' % aInfo['ip'], aInfo['reason'], *matches, all=True)
  439         self.assertNotLogged(
  440             '** unban %s' % aInfo['ip'], '** stop monitoring', all=True)
  441         self.pruneLog()
  442         self.__action.unban(aInfo)
  443         self.__action.stop()
  444         self.assertLogged(
  445             '** unban %s' % aInfo['ip'], '** stop monitoring', all=True)
  446 
  447     def testExecuteIncorrectCmd(self):
  448         CommandAction.executeCmd('/bin/ls >/dev/null\nbogusXXX now 2>/dev/null')
  449         self.assertLogged('HINT on 127: "Command not found"')
  450 
  451     def testExecuteTimeout(self):
  452         stime = time.time()
  453         timeout = 1 if not unittest.F2B.fast else 0.01
  454         # Should take a 30 seconds (so timeout will occur)
  455         self.assertFalse(CommandAction.executeCmd('sleep 30', timeout=timeout))
  456         # give a test still 1 second, because system could be too busy
  457         self.assertTrue(time.time() >= stime + timeout and time.time() <= stime + timeout + 1)
  458         self.assertLogged('sleep 30', ' -- timed out after', all=True)
  459         self.assertLogged(' -- killed with SIGTERM', 
  460                           ' -- killed with SIGKILL')
  461 
  462     def testExecuteTimeoutWithNastyChildren(self):
  463         # temporary file for a nasty kid shell script
  464         tmpFilename = tempfile.mktemp(".sh", "fail2ban_")
  465         # Create a nasty script which would hang there for a while
  466         with open(tmpFilename, 'w') as f:
  467             f.write("""#!/bin/bash
  468         trap : HUP EXIT TERM
  469 
  470         echo "$$" > %s.pid
  471         echo "my pid $$ . sleeping lo-o-o-ong"
  472         sleep 30
  473         """ % tmpFilename)
  474         stime = 0
  475 
  476         # timeout as long as pid-file was not created, but max 5 seconds
  477         def getnasty_tout():
  478             return (
  479                 getnastypid() is not None
  480                 or time.time() - stime > 5
  481             )
  482 
  483         def getnastypid():
  484             cpid = None
  485             if os.path.isfile(tmpFilename + '.pid'):
  486                 with open(tmpFilename + '.pid') as f:
  487                     try:
  488                         cpid = int(f.read())
  489                     except ValueError:
  490                         pass
  491             return cpid
  492 
  493         # First test if can kill the bastard
  494         stime = time.time()
  495         self.assertFalse(CommandAction.executeCmd(
  496             'bash %s' % tmpFilename, timeout=getnasty_tout))
  497         # Wait up to 3 seconds, the child got killed
  498         cpid = getnastypid()
  499         # Verify that the process itself got killed
  500         self.assertTrue(Utils.wait_for(lambda: not pid_exists(cpid), 3))  # process should have been killed
  501         self.assertLogged('my pid ', 'Resource temporarily unavailable')
  502         self.assertLogged('timed out')
  503         self.assertLogged('killed with SIGTERM', 
  504                           'killed with SIGKILL')
  505         os.unlink(tmpFilename + '.pid')
  506 
  507         # A bit evolved case even though, previous test already tests killing children processes
  508         stime = time.time()
  509         self.assertFalse(CommandAction.executeCmd(
  510             'out=`bash %s`; echo ALRIGHT' % tmpFilename, timeout=getnasty_tout))
  511         # Wait up to 3 seconds, the child got killed
  512         cpid = getnastypid()
  513         # Verify that the process itself got killed
  514         self.assertTrue(Utils.wait_for(lambda: not pid_exists(cpid), 3))
  515         self.assertLogged('my pid ', 'Resource temporarily unavailable')
  516         self.assertLogged(' -- timed out')
  517         self.assertLogged(' -- killed with SIGTERM', 
  518                           ' -- killed with SIGKILL')
  519         os.unlink(tmpFilename)
  520         os.unlink(tmpFilename + '.pid')
  521 
  522 
  523     def testCaptureStdOutErr(self):
  524         CommandAction.executeCmd('echo "How now brown cow"')
  525         self.assertLogged("stdout: 'How now brown cow'\n")
  526         CommandAction.executeCmd(
  527             'echo "The rain in Spain stays mainly in the plain" 1>&2')
  528         self.assertLogged(
  529             "stderr: 'The rain in Spain stays mainly in the plain'\n")
  530 
  531     def testCallingMap(self):
  532         mymap = CallingMap(callme=lambda self: str(10), error=lambda self: int('a'),
  533             dontcallme= "string", number=17)
  534 
  535         # Should work fine
  536         self.assertEqual(
  537             "%(callme)s okay %(dontcallme)s %(number)i" % mymap,
  538             "10 okay string 17")
  539         # Error will now trip, demonstrating delayed call
  540         self.assertRaises(ValueError, lambda x: "%(error)i" % x, mymap)
  541 
  542     def testCallingMapModify(self):
  543         m = CallingMap({
  544             'a': lambda self: 2 + 3,
  545             'b': lambda self: self['a'] + 6,
  546             'c': 'test',
  547         })
  548         # test reset (without modifications):
  549         m.reset()
  550         # do modifications:
  551         m['a'] = 4
  552         del m['c']
  553         # test set and delete:
  554         self.assertEqual(len(m), 2)
  555         self.assertNotIn('c', m)
  556         self.assertEqual((m['a'], m['b']), (4, 10))
  557         # reset to original and test again:
  558         m.reset()
  559         s = repr(m)
  560         self.assertEqual(len(m), 3)
  561         self.assertIn('c', m)
  562         self.assertEqual((m['a'], m['b'], m['c']), (5, 11, 'test'))
  563 
  564     def testCallingMapRep(self):
  565         m = CallingMap({
  566             'a': lambda self: 2 + 3,
  567             'b': lambda self: self['a'] + 6,
  568             'c': ''
  569         })
  570         s = repr(m); # only stored values (no calculated)
  571         self.assertNotIn("'a': ", s)
  572         self.assertNotIn("'b': ", s)
  573         self.assertIn("'c': ''", s)
  574 
  575         s = m._asrepr(True) # all values (including calculated)
  576         self.assertIn("'a': 5", s)
  577         self.assertIn("'b': 11", s)
  578         self.assertIn("'c': ''", s)
  579         
  580         m['c'] = lambda self: self['xxx'] + 7; # unresolvable
  581         s = m._asrepr(True)
  582         self.assertIn("'a': 5", s)
  583         self.assertIn("'b': 11", s)
  584         self.assertIn("'c': ", s) # presents as callable
  585         self.assertNotIn("'c': ''", s) # but not empty
  586 
  587     def testActionsIdleMode(self):
  588         a = Actions(DummyJail())
  589         a.sleeptime = 0.0001;   # don't need to wait long
  590         # enter idle mode right now (start idle):
  591         a.idle = True;
  592         # start:
  593         a.start()
  594         # wait for enter/leave of idle mode:
  595         self.assertLogged("Actions: enter idle mode", wait=10)
  596         # leave idle mode:
  597         a.idle = False
  598         self.assertLogged("Actions: leave idle mode", wait=10)
  599         # stop it:
  600         a.active = False
  601         a.join()