"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "fail2ban/tests/servertestcase.py" between
fail2ban-0.10.5.tar.gz and fail2ban-0.11.1.tar.gz

About:

servertestcase.py  (fail2ban-0.10.5):servertestcase.py  (fail2ban-0.11.1)
skipping to change at line 44 skipping to change at line 44
from ..server.failregex import Regex, FailRegex, RegexException from ..server.failregex import Regex, FailRegex, RegexException
from ..server import actions as _actions from ..server import actions as _actions
from ..server.server import Server from ..server.server import Server
from ..server.ipdns import IPAddr from ..server.ipdns import IPAddr
from ..server.jail import Jail from ..server.jail import Jail
from ..server.jailthread import JailThread from ..server.jailthread import JailThread
from ..server.ticket import BanTicket from ..server.ticket import BanTicket
from ..server.utils import Utils from ..server.utils import Utils
from .dummyjail import DummyJail from .dummyjail import DummyJail
from .utils import LogCaptureTestCase from .utils import LogCaptureTestCase, with_alt_time, MyTime
from ..helpers import getLogger, extractOptions, PREFER_ENC from ..helpers import getLogger, extractOptions, PREFER_ENC
from .. import version from .. import version
try: try:
from ..server import filtersystemd from ..server import filtersystemd
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
filtersystemd = None filtersystemd = None
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files") TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
FAST_BACKEND = "polling" FAST_BACKEND = "polling"
skipping to change at line 382 skipping to change at line 382
for ip in ("192.0.2.1", "192.0.2.2"): for ip in ("192.0.2.1", "192.0.2.2"):
self.assertEqual(attempt(ip, ["test failure %d" % i]), (0, 1)) self.assertEqual(attempt(ip, ["test failure %d" % i]), (0, 1))
self.assertLogged("192.0.2.1:2", "192.0.2.2:2", all=True, wait=Tr ue) self.assertLogged("192.0.2.1:2", "192.0.2.2:2", all=True, wait=Tr ue)
# this 3 attempts at once should cause a ban: # this 3 attempts at once should cause a ban:
self.assertEqual(attempt(ip, ["test failure %d" % i for i in (3,4 ,5)]), (0, 1)) self.assertEqual(attempt(ip, ["test failure %d" % i for i in (3,4 ,5)]), (0, 1))
self.assertLogged("192.0.2.2:5", wait=True) self.assertLogged("192.0.2.2:5", wait=True)
# resulted to ban for "192.0.2.2" but not for "192.0.2.1": # resulted to ban for "192.0.2.2" but not for "192.0.2.1":
self.assertLogged("Ban 192.0.2.2", wait=True) self.assertLogged("Ban 192.0.2.2", wait=True)
self.assertNotLogged("Ban 192.0.2.1") self.assertNotLogged("Ban 192.0.2.1")
@with_alt_time
def testJailBanList(self):
jail = "TestJailBanList"
self.server.addJail(jail, FAST_BACKEND)
self.server.startJail(jail)
# Helper to process set banip/set unbanip commands and compare th
e list of
# banned IP addresses with outList.
def _getBanListTest(jail, banip=None, unbanip=None, args=(), outL
ist=[]):
# Ban IP address
if banip is not None:
self.assertEqual(
self.transm.proceed(["set", jail, "banip"
, banip]),
(0, 1))
self.assertLogged("Ban %s" % banip, wait=True) #
Give chance to ban
# Unban IP address
if unbanip is not None:
self.assertEqual(
self.transm.proceed(["set", jail, "unbani
p", unbanip]),
(0, 1))
self.assertLogged("Unban %s" % unbanip, wait=True
) # Give chance to unban
# Compare the list of banned IP addresses with outList
self.assertSortedEqual(
self.transm.proceed(["get", jail, "banip"]+list(a
rgs)),
(0, outList), nestedOnly=False)
MyTime.setTime(MyTime.time() + 1)
_getBanListTest(jail,
outList=[])
_getBanListTest(jail, banip="127.0.0.1", args=('--with-time',),
outList=["127.0.0.1 \t2005-08-14 12:00:01 + 600 = 2005-08
-14 12:10:01"])
_getBanListTest(jail, banip="192.168.0.1", args=('--with-time',),
outList=[
"127.0.0.1 \t2005-08-14 12:00:01 + 600 = 2005-08-
14 12:10:01",
"192.168.0.1 \t2005-08-14 12:00:02 + 600 = 2005-0
8-14 12:10:02"])
_getBanListTest(jail, banip="192.168.1.10",
outList=["127.0.0.1", "192.168.0.1", "192.168.1.10"])
_getBanListTest(jail, unbanip="127.0.0.1",
outList=["192.168.0.1", "192.168.1.10"])
_getBanListTest(jail, unbanip="192.168.1.10",
outList=["192.168.0.1"])
_getBanListTest(jail, unbanip="192.168.0.1",
outList=[])
def testJailMaxMatches(self): def testJailMaxMatches(self):
self.setGetTest("maxmatches", "5", 5, jail=self.jailName) self.setGetTest("maxmatches", "5", 5, jail=self.jailName)
self.setGetTest("maxmatches", "2", 2, jail=self.jailName) self.setGetTest("maxmatches", "2", 2, jail=self.jailName)
self.setGetTest("maxmatches", "-2", -2, jail=self.jailName) self.setGetTest("maxmatches", "-2", -2, jail=self.jailName)
self.setGetTestNOK("maxmatches", "Duck", jail=self.jailName) self.setGetTestNOK("maxmatches", "Duck", jail=self.jailName)
def testJailMaxRetry(self): def testJailMaxRetry(self):
self.setGetTest("maxretry", "5", 5, jail=self.jailName) self.setGetTest("maxretry", "5", 5, jail=self.jailName)
self.setGetTest("maxretry", "2", 2, jail=self.jailName) self.setGetTest("maxretry", "2", 2, jail=self.jailName)
self.setGetTest("maxretry", "-2", -2, jail=self.jailName) self.setGetTest("maxretry", "-2", -2, jail=self.jailName)
skipping to change at line 989 skipping to change at line 1033
finally: finally:
os.remove(fn2) os.remove(fn2)
finally: finally:
try: try:
os.remove(fn) os.remove(fn)
except OSError: except OSError:
pass pass
self.assertEqual(self.transm.proceed(["set", "logtarget", "STDERR "]), (0, "STDERR")) self.assertEqual(self.transm.proceed(["set", "logtarget", "STDERR "]), (0, "STDERR"))
self.assertEqual(self.transm.proceed(["flushlogs"]), (0, "flushed ")) self.assertEqual(self.transm.proceed(["flushlogs"]), (0, "flushed "))
def testBanTimeIncr(self):
self.setGetTest("bantime.increment", "true", True, jail=self.jail
Name)
self.setGetTest("bantime.rndtime", "30min", 30*60, jail=self.jail
Name)
self.setGetTest("bantime.maxtime", "1000 days", 1000*24*60*60, ja
il=self.jailName)
self.setGetTest("bantime.factor", "2", "2", jail=self.jailName)
self.setGetTest("bantime.formula", "ban.Time * math.exp(float(ban
.Count+1)*banFactor)/math.exp(1*banFactor)", jail=self.jailName)
self.setGetTest("bantime.multipliers", "1 5 30 60 300 720 1440 28
80", "1 5 30 60 300 720 1440 2880", jail=self.jailName)
self.setGetTest("bantime.overalljails", "true", "true", jail=self
.jailName)
class JailTests(unittest.TestCase): class JailTests(unittest.TestCase):
def testLongName(self): def testLongName(self):
# Just a smoke test for now # Just a smoke test for now
longname = "veryveryverylongname" longname = "veryveryverylongname"
jail = Jail(longname) jail = Jail(longname)
self.assertEqual(jail.name, longname) self.assertEqual(jail.name, longname)
class RegexTests(unittest.TestCase): class RegexTests(unittest.TestCase):
skipping to change at line 1151 skipping to change at line 1204
super(ServerConfigReaderTests, self).tearDown() super(ServerConfigReaderTests, self).tearDown()
def _executeCmd(self, realCmd, timeout=60): def _executeCmd(self, realCmd, timeout=60):
for l in realCmd.split('\n'): for l in realCmd.split('\n'):
if not l.startswith('#'): if not l.startswith('#'):
logSys.debug('exec-cmd: `%s`', l) logSys.debug('exec-cmd: `%s`', l)
else: else:
logSys.debug(l) logSys.debug(l)
return True return True
def _testActionInfos(self):
if not hasattr(self, '__aInfos'):
dmyjail = DummyJail()
self.__aInfos = {}
for t, ip in (('ipv4', '192.0.2.1'), ('ipv6', '2001:DB8::
')):
ticket = BanTicket(ip)
ticket.setBanTime(600)
self.__aInfos[t] = _actions.Actions.ActionInfo(ti
cket, dmyjail)
return self.__aInfos
def _testExecActions(self, server): def _testExecActions(self, server):
jails = server._Server__jails jails = server._Server__jails
aInfos = self._testActionInfos()
for jail in jails: for jail in jails:
# print(jail, jails[jail]) # print(jail, jails[jail])
for a in jails[jail].actions: for a in jails[jail].actions:
action = jails[jail].actions[a] action = jails[jail].actions[a]
logSys.debug('# ' + ('=' * 50)) logSys.debug('# ' + ('=' * 50))
logSys.debug('# == %-44s ==', jail + ' - ' + acti on._name) logSys.debug('# == %-44s ==', jail + ' - ' + acti on._name)
logSys.debug('# ' + ('=' * 50)) logSys.debug('# ' + ('=' * 50))
# we can currently test only command actions: # we can currently test only command actions:
if not isinstance(action, _actions.CommandAction) : continue if not isinstance(action, _actions.CommandAction) : continue
# wrap default command processor, just log if (he avy)debug: # wrap default command processor, just log if (he avy)debug:
action.executeCmd = self._executeCmd action.executeCmd = self._executeCmd
# test start : # test start :
logSys.debug('# === start ==='); self.pruneLog() logSys.debug('# === start ==='); self.pruneLog()
action.start() action.start()
# test ban ip4 : # test ban ip4 :
logSys.debug('# === ban-ipv4 ==='); self.pruneLog () logSys.debug('# === ban-ipv4 ==='); self.pruneLog ()
action.ban({'ip': IPAddr('192.0.2.1'), 'family': 'inet4'}) action.ban(aInfos['ipv4'])
# test unban ip4 : # test unban ip4 :
logSys.debug('# === unban ipv4 ==='); self.pruneL og() logSys.debug('# === unban ipv4 ==='); self.pruneL og()
action.unban({'ip': IPAddr('192.0.2.1'), 'family' : 'inet4'}) action.unban(aInfos['ipv4'])
# test ban ip6 : # test ban ip6 :
logSys.debug('# === ban ipv6 ==='); self.pruneLog () logSys.debug('# === ban ipv6 ==='); self.pruneLog ()
action.ban({'ip': IPAddr('2001:DB8::'), 'family': 'inet6'}) action.ban(aInfos['ipv6'])
# test unban ip6 : # test unban ip6 :
logSys.debug('# === unban ipv6 ==='); self.pruneL og() logSys.debug('# === unban ipv6 ==='); self.pruneL og()
action.unban({'ip': IPAddr('2001:DB8::'), 'family ': 'inet6'}) action.unban(aInfos['ipv6'])
# test stop : # test stop :
logSys.debug('# === stop ==='); self.pruneLog() logSys.debug('# === stop ==='); self.pruneLog()
action.stop() action.stop()
def testCheckStockJailActions(self): def testCheckStockJailActions(self):
unittest.F2B.SkipIfCfgMissing(stock=True) unittest.F2B.SkipIfCfgMissing(stock=True)
# we are running tests from root project dir atm # we are running tests from root project dir atm
jails = JailsReader(basedir=CONFIG_DIR, force_enable=True, share_ config=self.__share_cfg) jails = JailsReader(basedir=CONFIG_DIR, force_enable=True, share_ config=self.__share_cfg)
self.assertTrue(jails.read()) # opens fine self.assertTrue(jails.read()) # opens fine
self.assertTrue(jails.getOptions()) # reads fine self.assertTrue(jails.getOptions()) # reads fine
skipping to change at line 1502 skipping to change at line 1567
r"`iptables -w -D f2b-j-w-iptables-ap -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`", r"`iptables -w -D f2b-j-w-iptables-ap -s 192.0.2.1 -j REJECT --reject-with icmp-port-unreachable`",
), ),
'ip6-ban': ( 'ip6-ban': (
r"`ip6tables -w -I f2b-j-w-iptables-ap 1 -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`", r"`ip6tables -w -I f2b-j-w-iptables-ap 1 -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`",
), ),
'ip6-unban': ( 'ip6-unban': (
r"`ip6tables -w -D f2b-j-w-iptables-ap -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`", r"`ip6tables -w -D f2b-j-w-iptables-ap -s 2001:db8:: -j REJECT --reject-with icmp6-port-unreachable`",
), ),
}), }),
# iptables-ipset-proto6 -- # iptables-ipset-proto6 --
('j-w-iptables-ipset', 'iptables-ipset-proto6[name=%(__na me__)s, bantime="10m", port="http", protocol="tcp", chain="<known/chain>"]', { ('j-w-iptables-ipset', 'iptables-ipset-proto6[name=%(__na me__)s, bantime="10m", default-timeout=0, port="http", protocol="tcp", chain="<k nown/chain>"]', {
'ip4': (' f2b-j-w-iptables-ipset ',), 'ip6': (' f 2b-j-w-iptables-ipset6 ',), 'ip4': (' f2b-j-w-iptables-ipset ',), 'ip6': (' f 2b-j-w-iptables-ipset6 ',),
'ip4-start': ( 'ip4-start': (
"`ipset create f2b-j-w-iptables-ipset has h:ip timeout 600`", "`ipset create f2b-j-w-iptables-ipset has h:ip timeout 0`",
"`iptables -w -I INPUT -p tcp -m multipor t --dports http -m set --match-set f2b-j-w-iptables-ipset src -j REJECT --reject -with icmp-port-unreachable`", "`iptables -w -I INPUT -p tcp -m multipor t --dports http -m set --match-set f2b-j-w-iptables-ipset src -j REJECT --reject -with icmp-port-unreachable`",
), ),
'ip6-start': ( 'ip6-start': (
"`ipset create f2b-j-w-iptables-ipset6 ha sh:ip timeout 600 family inet6`", "`ipset create f2b-j-w-iptables-ipset6 ha sh:ip timeout 0 family inet6`",
"`ip6tables -w -I INPUT -p tcp -m multipo rt --dports http -m set --match-set f2b-j-w-iptables-ipset6 src -j REJECT --reje ct-with icmp6-port-unreachable`", "`ip6tables -w -I INPUT -p tcp -m multipo rt --dports http -m set --match-set f2b-j-w-iptables-ipset6 src -j REJECT --reje ct-with icmp6-port-unreachable`",
), ),
'flush': ( 'flush': (
"`ipset flush f2b-j-w-iptables-ipset`", "`ipset flush f2b-j-w-iptables-ipset`",
"`ipset flush f2b-j-w-iptables-ipset6`", "`ipset flush f2b-j-w-iptables-ipset6`",
), ),
'stop': ( 'stop': (
"`iptables -w -D INPUT -p tcp -m multipor t --dports http -m set --match-set f2b-j-w-iptables-ipset src -j REJECT --reject -with icmp-port-unreachable`", "`iptables -w -D INPUT -p tcp -m multipor t --dports http -m set --match-set f2b-j-w-iptables-ipset src -j REJECT --reject -with icmp-port-unreachable`",
"`ipset flush f2b-j-w-iptables-ipset`", "`ipset flush f2b-j-w-iptables-ipset`",
"`ipset destroy f2b-j-w-iptables-ipset`", "`ipset destroy f2b-j-w-iptables-ipset`",
skipping to change at line 1538 skipping to change at line 1603
r"`ipset del f2b-j-w-iptables-ipset 192.0 .2.1 -exist`", r"`ipset del f2b-j-w-iptables-ipset 192.0 .2.1 -exist`",
), ),
'ip6-ban': ( 'ip6-ban': (
r"`ipset add f2b-j-w-iptables-ipset6 2001 :db8:: timeout 600 -exist`", r"`ipset add f2b-j-w-iptables-ipset6 2001 :db8:: timeout 600 -exist`",
), ),
'ip6-unban': ( 'ip6-unban': (
r"`ipset del f2b-j-w-iptables-ipset6 2001 :db8:: -exist`", r"`ipset del f2b-j-w-iptables-ipset6 2001 :db8:: -exist`",
), ),
}), }),
# iptables-ipset-proto6-allports -- # iptables-ipset-proto6-allports --
('j-w-iptables-ipset-ap', 'iptables-ipset-proto6-allports [name=%(__name__)s, bantime="10m", chain="<known/chain>"]', { ('j-w-iptables-ipset-ap', 'iptables-ipset-proto6-allports [name=%(__name__)s, bantime="10m", default-timeout=0, chain="<known/chain>"]', {
'ip4': (' f2b-j-w-iptables-ipset-ap ',), 'ip6': ( ' f2b-j-w-iptables-ipset-ap6 ',), 'ip4': (' f2b-j-w-iptables-ipset-ap ',), 'ip6': ( ' f2b-j-w-iptables-ipset-ap6 ',),
'ip4-start': ( 'ip4-start': (
"`ipset create f2b-j-w-iptables-ipset-ap hash:ip timeout 600`", "`ipset create f2b-j-w-iptables-ipset-ap hash:ip timeout 0`",
"`iptables -w -I INPUT -m set --match-set f2b-j-w-iptables-ipset-ap src -j REJECT --reject-with icmp-port-unreachable`", "`iptables -w -I INPUT -m set --match-set f2b-j-w-iptables-ipset-ap src -j REJECT --reject-with icmp-port-unreachable`",
), ),
'ip6-start': ( 'ip6-start': (
"`ipset create f2b-j-w-iptables-ipset-ap6 hash:ip timeout 600 family inet6`", "`ipset create f2b-j-w-iptables-ipset-ap6 hash:ip timeout 0 family inet6`",
"`ip6tables -w -I INPUT -m set --match-se t f2b-j-w-iptables-ipset-ap6 src -j REJECT --reject-with icmp6-port-unreachable` ", "`ip6tables -w -I INPUT -m set --match-se t f2b-j-w-iptables-ipset-ap6 src -j REJECT --reject-with icmp6-port-unreachable` ",
), ),
'flush': ( 'flush': (
"`ipset flush f2b-j-w-iptables-ipset-ap`" , "`ipset flush f2b-j-w-iptables-ipset-ap`" ,
"`ipset flush f2b-j-w-iptables-ipset-ap6` ", "`ipset flush f2b-j-w-iptables-ipset-ap6` ",
), ),
'stop': ( 'stop': (
"`iptables -w -D INPUT -m set --match-set f2b-j-w-iptables-ipset-ap src -j REJECT --reject-with icmp-port-unreachable`", "`iptables -w -D INPUT -m set --match-set f2b-j-w-iptables-ipset-ap src -j REJECT --reject-with icmp-port-unreachable`",
"`ipset flush f2b-j-w-iptables-ipset-ap`" , "`ipset flush f2b-j-w-iptables-ipset-ap`" ,
"`ipset destroy f2b-j-w-iptables-ipset-ap `", "`ipset destroy f2b-j-w-iptables-ipset-ap `",
skipping to change at line 1845 skipping to change at line 1910
r"`firewall-cmd --direct --remove-rule ip v4 filter f2b-j-w-fwcmd-ap 0 -s 192.0.2.1 -j REJECT --reject-with icmp-port-unre achable`", r"`firewall-cmd --direct --remove-rule ip v4 filter f2b-j-w-fwcmd-ap 0 -s 192.0.2.1 -j REJECT --reject-with icmp-port-unre achable`",
), ),
'ip6-ban': ( 'ip6-ban': (
r"`firewall-cmd --direct --add-rule ipv6 filter f2b-j-w-fwcmd-ap 0 -s 2001:db8:: -j REJECT --reject-with icmp6-port-unrea chable`", r"`firewall-cmd --direct --add-rule ipv6 filter f2b-j-w-fwcmd-ap 0 -s 2001:db8:: -j REJECT --reject-with icmp6-port-unrea chable`",
), ),
'ip6-unban': ( 'ip6-unban': (
r"`firewall-cmd --direct --remove-rule ip v6 filter f2b-j-w-fwcmd-ap 0 -s 2001:db8:: -j REJECT --reject-with icmp6-port-un reachable`", r"`firewall-cmd --direct --remove-rule ip v6 filter f2b-j-w-fwcmd-ap 0 -s 2001:db8:: -j REJECT --reject-with icmp6-port-un reachable`",
), ),
}), }),
# firewallcmd-ipset (multiport) -- # firewallcmd-ipset (multiport) --
('j-w-fwcmd-ipset', 'firewallcmd-ipset[name=%(__name__)s, bantime="10m", port="http", protocol="tcp", chain="<known/chain>"]', { ('j-w-fwcmd-ipset', 'firewallcmd-ipset[name=%(__name__)s, bantime="10m", default-timeout=0, port="http", protocol="tcp", chain="<known/ch ain>"]', {
'ip4': (' f2b-j-w-fwcmd-ipset ',), 'ip6': (' f2b- j-w-fwcmd-ipset6 ',), 'ip4': (' f2b-j-w-fwcmd-ipset ',), 'ip6': (' f2b- j-w-fwcmd-ipset6 ',),
'ip4-start': ( 'ip4-start': (
"`ipset create f2b-j-w-fwcmd-ipset hash:i p timeout 600`", "`ipset create f2b-j-w-fwcmd-ipset hash:i p timeout 0`",
"`firewall-cmd --direct --add-rule ipv4 f ilter INPUT_direct 0 -p tcp -m multiport --dports http -m set --match-set f2b-j- w-fwcmd-ipset src -j REJECT --reject-with icmp-port-unreachable`", "`firewall-cmd --direct --add-rule ipv4 f ilter INPUT_direct 0 -p tcp -m multiport --dports http -m set --match-set f2b-j- w-fwcmd-ipset src -j REJECT --reject-with icmp-port-unreachable`",
), ),
'ip6-start': ( 'ip6-start': (
"`ipset create f2b-j-w-fwcmd-ipset6 hash: ip timeout 600 family inet6`", "`ipset create f2b-j-w-fwcmd-ipset6 hash: ip timeout 0 family inet6`",
"`firewall-cmd --direct --add-rule ipv6 f ilter INPUT_direct 0 -p tcp -m multiport --dports http -m set --match-set f2b-j- w-fwcmd-ipset6 src -j REJECT --reject-with icmp6-port-unreachable`", "`firewall-cmd --direct --add-rule ipv6 f ilter INPUT_direct 0 -p tcp -m multiport --dports http -m set --match-set f2b-j- w-fwcmd-ipset6 src -j REJECT --reject-with icmp6-port-unreachable`",
), ),
'flush': ( 'flush': (
"`ipset flush f2b-j-w-fwcmd-ipset`", "`ipset flush f2b-j-w-fwcmd-ipset`",
"`ipset flush f2b-j-w-fwcmd-ipset6`", "`ipset flush f2b-j-w-fwcmd-ipset6`",
), ),
'stop': ( 'stop': (
"`firewall-cmd --direct --remove-rule ipv 4 filter INPUT_direct 0 -p tcp -m multiport --dports http -m set --match-set f2b -j-w-fwcmd-ipset src -j REJECT --reject-with icmp-port-unreachable`", "`firewall-cmd --direct --remove-rule ipv 4 filter INPUT_direct 0 -p tcp -m multiport --dports http -m set --match-set f2b -j-w-fwcmd-ipset src -j REJECT --reject-with icmp-port-unreachable`",
"`ipset flush f2b-j-w-fwcmd-ipset`", "`ipset flush f2b-j-w-fwcmd-ipset`",
"`ipset destroy f2b-j-w-fwcmd-ipset`", "`ipset destroy f2b-j-w-fwcmd-ipset`",
skipping to change at line 1935 skipping to change at line 2000
# print(cmd) # print(cmd)
# transmit jail to the server: # transmit jail to the server:
for cmd in stream: for cmd in stream:
# command to server: # command to server:
ret, res = transm.proceed(cmd) ret, res = transm.proceed(cmd)
self.assertEqual(ret, 0) self.assertEqual(ret, 0)
jails = server._Server__jails jails = server._Server__jails
tickets = { aInfos = self._testActionInfos()
'ip4': BanTicket('192.0.2.1'),
'ip6': BanTicket('2001:DB8::'),
}
for jail, act, tests in testJailsActions: for jail, act, tests in testJailsActions:
# print(jail, jails[jail]) # print(jail, jails[jail])
for a in jails[jail].actions: for a in jails[jail].actions:
action = jails[jail].actions[a] action = jails[jail].actions[a]
logSys.debug('# ' + ('=' * 50)) logSys.debug('# ' + ('=' * 50))
logSys.debug('# == %-44s ==', jail + ' - ' + acti on._name) logSys.debug('# == %-44s ==', jail + ' - ' + acti on._name)
logSys.debug('# ' + ('=' * 50)) logSys.debug('# ' + ('=' * 50))
self.assertTrue(isinstance(action, _actions.Comma ndAction)) self.assertTrue(isinstance(action, _actions.Comma ndAction))
# wrap default command processor: # wrap default command processor:
action.executeCmd = self._executeCmd action.executeCmd = self._executeCmd
# test start : # test start :
self.pruneLog('# === start ===') self.pruneLog('# === start ===')
action.start() action.start()
if tests.get('start'): if tests.get('start'):
self.assertLogged(*tests['start'], all=Tr ue) self.assertLogged(*tests['start'], all=Tr ue)
elif tests.get('ip4-start') and tests.get('ip6-st art'): elif tests.get('ip4-start') and tests.get('ip6-st art'):
self.assertNotLogged(*tests['ip4-start']+ tests['ip6-start'], all=True) self.assertNotLogged(*tests['ip4-start']+ tests['ip6-start'], all=True)
ainfo = {
'ip4': _actions.Actions.ActionInfo(ticket
s['ip4'], jails[jail]),
'ip6': _actions.Actions.ActionInfo(ticket
s['ip6'], jails[jail]),
}
# test ban ip4 : # test ban ip4 :
self.pruneLog('# === ban-ipv4 ===') self.pruneLog('# === ban-ipv4 ===')
action.ban(ainfo['ip4']) action.ban(aInfos['ipv4'])
if tests.get('ip4-start'): self.assertLogged(*tes ts.get('*-start', ())+tests['ip4-start'], all=True) if tests.get('ip4-start'): self.assertLogged(*tes ts.get('*-start', ())+tests['ip4-start'], all=True)
if tests.get('ip6-start'): self.assertNotLogged(* tests['ip6-start'], all=True) if tests.get('ip6-start'): self.assertNotLogged(* tests['ip6-start'], all=True)
self.assertLogged(*tests.get('ip4-check',())+test s['ip4-ban'], all=True) self.assertLogged(*tests.get('ip4-check',())+test s['ip4-ban'], all=True)
self.assertNotLogged(*tests['ip6'], all=True) self.assertNotLogged(*tests['ip6'], all=True)
# test unban ip4 : # test unban ip4 :
self.pruneLog('# === unban ipv4 ===') self.pruneLog('# === unban ipv4 ===')
action.unban(ainfo['ip4']) action.unban(aInfos['ipv4'])
self.assertLogged(*tests.get('ip4-check',())+test s['ip4-unban'], all=True) self.assertLogged(*tests.get('ip4-check',())+test s['ip4-unban'], all=True)
self.assertNotLogged(*tests['ip6'], all=True) self.assertNotLogged(*tests['ip6'], all=True)
# test ban ip6 : # test ban ip6 :
self.pruneLog('# === ban ipv6 ===') self.pruneLog('# === ban ipv6 ===')
action.ban(ainfo['ip6']) action.ban(aInfos['ipv6'])
if tests.get('ip6-start'): self.assertLogged(*tes ts.get('*-start', ())+tests['ip6-start'], all=True) if tests.get('ip6-start'): self.assertLogged(*tes ts.get('*-start', ())+tests['ip6-start'], all=True)
if tests.get('ip4-start'): self.assertNotLogged(* tests['ip4-start'], all=True) if tests.get('ip4-start'): self.assertNotLogged(* tests['ip4-start'], all=True)
self.assertLogged(*tests.get('ip6-check',())+test s['ip6-ban'], all=True) self.assertLogged(*tests.get('ip6-check',())+test s['ip6-ban'], all=True)
self.assertNotLogged(*tests['ip4'], all=True) self.assertNotLogged(*tests['ip4'], all=True)
# test unban ip6 : # test unban ip6 :
self.pruneLog('# === unban ipv6 ===') self.pruneLog('# === unban ipv6 ===')
action.unban(ainfo['ip6']) action.unban(aInfos['ipv6'])
self.assertLogged(*tests.get('ip6-check',())+test s['ip6-unban'], all=True) self.assertLogged(*tests.get('ip6-check',())+test s['ip6-unban'], all=True)
self.assertNotLogged(*tests['ip4'], all=True) self.assertNotLogged(*tests['ip4'], all=True)
# test flush for actions should supported this: # test flush for actions should supported this:
if tests.get('flush'): if tests.get('flush'):
self.pruneLog('# === flush ===') self.pruneLog('# === flush ===')
action.flush() action.flush()
self.assertLogged(*tests['flush'], all=Tr ue) self.assertLogged(*tests['flush'], all=Tr ue)
# test stop : # test stop :
self.pruneLog('# === stop ===') self.pruneLog('# === stop ===')
action.stop() action.stop()
 End of changes. 24 change blocks. 
28 lines changed or deleted 102 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)