"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "fail2ban/tests/databasetestcase.py" between
fail2ban-0.10.5.tar.gz and fail2ban-0.11.1.tar.gz

About:

databasetestcase.py  (fail2ban-0.10.5):databasetestcase.py  (fail2ban-0.11.1)
skipping to change at line 165 skipping to change at line 165
shutil.copyfile( shutil.copyfile(
os.path.join(TEST_FILES_DIR, 'database_v1.db'), s elf.dbFilename) os.path.join(TEST_FILES_DIR, 'database_v1.db'), s elf.dbFilename)
self.db = Fail2BanDb(self.dbFilename) self.db = Fail2BanDb(self.dbFilename)
self.assertEqual(self.db.getJailNames(), set(['DummyJail #29162448 with 0 tickets'])) self.assertEqual(self.db.getJailNames(), set(['DummyJail #29162448 with 0 tickets']))
self.assertEqual(self.db.getLogPaths(), set(['/tmp/Fail2B anDb_pUlZJh.log'])) self.assertEqual(self.db.getLogPaths(), set(['/tmp/Fail2B anDb_pUlZJh.log']))
ticket = FailTicket("127.0.0.1", 1388009242.26, [u"abc\n" ]) ticket = FailTicket("127.0.0.1", 1388009242.26, [u"abc\n" ])
self.assertEqual(self.db.getBans()[0], ticket) self.assertEqual(self.db.getBans()[0], ticket)
self.assertEqual(self.db.updateDb(Fail2BanDb.__version__) , Fail2BanDb.__version__) self.assertEqual(self.db.updateDb(Fail2BanDb.__version__) , Fail2BanDb.__version__)
self.assertRaises(NotImplementedError, self.db.updateDb, Fail2BanDb.__version__ + 1) self.assertRaises(NotImplementedError, self.db.updateDb, Fail2BanDb.__version__ + 1)
# check current bans (should find exactly 1 ticket after
upgrade):
tickets = self.db.getCurrentBans(fromtime=1388009242, cor
rectBanTime=123456)
self.assertEqual(len(tickets), 1)
self.assertEqual(tickets[0].getBanTime(), 123456); # ban-
time was unknown (normally updated from jail)
finally: finally:
if self.db and self.db._dbFilename != ":memory:": if self.db and self.db._dbFilename != ":memory:":
os.remove(self.db._dbBackupFilename) os.remove(self.db._dbBackupFilename)
def testUpdateDb2(self):
self.db = None
if self.dbFilename is None: # pragma: no cover
_, self.dbFilename = tempfile.mkstemp(".db", "fail2ban_")
shutil.copyfile(
os.path.join(TEST_FILES_DIR, 'database_v2.db'), self.dbFi
lename)
self.db = Fail2BanDb(self.dbFilename)
self.assertEqual(self.db.getJailNames(), set(['pam-generic']))
self.assertEqual(self.db.getLogPaths(), set(['/var/log/auth.log']
))
bans = self.db.getBans()
self.assertEqual(len(bans), 2)
# compare first ticket completely:
ticket = FailTicket("1.2.3.7", 1417595494, [
u'Dec 3 09:31:08 f2btest test:auth[27658]: pam_unix(test
:auth): authentication failure; logname= uid=0 euid=0 tty=test ruser= rhost=1.2.
3.7',
u'Dec 3 09:31:32 f2btest test:auth[27671]: pam_unix(test
:auth): authentication failure; logname= uid=0 euid=0 tty=test ruser= rhost=1.2.
3.7',
u'Dec 3 09:31:34 f2btest test:auth[27673]: pam_unix(test
:auth): authentication failure; logname= uid=0 euid=0 tty=test ruser= rhost=1.2.
3.7'
])
ticket.setAttempt(3)
self.assertEqual(bans[0], ticket)
# second ban found also:
self.assertEqual(bans[1].getIP(), "1.2.3.8")
# updated ?
self.assertEqual(self.db.updateDb(Fail2BanDb.__version__), Fail2B
anDb.__version__)
# check current bans (should find 2 tickets after upgrade):
self.jail = DummyJail(name='pam-generic')
tickets = self.db.getCurrentBans(jail=self.jail, fromtime=1417595
494)
self.assertEqual(len(tickets), 2)
self.assertEqual(tickets[0].getBanTime(), 600)
# further update should fail:
self.assertRaises(NotImplementedError, self.db.updateDb, Fail2Ban
Db.__version__ + 1)
# clean:
os.remove(self.db._dbBackupFilename)
def testAddJail(self): def testAddJail(self):
self.jail = DummyJail() self.jail = DummyJail()
self.db.addJail(self.jail) self.db.addJail(self.jail)
self.assertTrue( self.assertTrue(
self.jail.name in self.db.getJailNames(True), self.jail.name in self.db.getJailNames(True),
"Jail not added to database") "Jail not added to database")
def testAddLog(self): def testAddLog(self):
self.testAddJail() # Jail required self.testAddJail() # Jail required
skipping to change at line 392 skipping to change at line 429
self.db.maxMatches = 0; self.db.maxMatches = 0;
self.db.addBan(self.jail, ticket) self.db.addBan(self.jail, ticket)
ticket = self.db.getCurrentBans(self.jail, "127.0.0.1", fromtime= MyTime.time()-100) ticket = self.db.getCurrentBans(self.jail, "127.0.0.1", fromtime= MyTime.time()-100)
self.assertTrue(ticket is not None) self.assertTrue(ticket is not None)
self.assertEqual(ticket.getAttempt(), len(failures)) self.assertEqual(ticket.getAttempt(), len(failures))
self.assertEqual(len(ticket.getMatches()), 0) self.assertEqual(len(ticket.getMatches()), 0)
def testGetBansMerged(self): def testGetBansMerged(self):
self.testAddJail() self.testAddJail()
jail2 = DummyJail() jail2 = DummyJail(name='DummyJail-2')
self.db.addJail(jail2) self.db.addJail(jail2)
ticket = FailTicket("127.0.0.1", MyTime.time() - 40, ["abc\n"]) ticket = FailTicket("127.0.0.1", MyTime.time() - 40, ["abc\n"])
ticket.setAttempt(10) ticket.setAttempt(10)
self.db.addBan(self.jail, ticket) self.db.addBan(self.jail, ticket)
ticket = FailTicket("127.0.0.1", MyTime.time() - 30, ["123\n"]) ticket = FailTicket("127.0.0.1", MyTime.time() - 30, ["123\n"])
ticket.setAttempt(20) ticket.setAttempt(20)
self.db.addBan(self.jail, ticket) self.db.addBan(self.jail, ticket)
ticket = FailTicket("127.0.0.2", MyTime.time() - 20, ["ABC\n"]) ticket = FailTicket("127.0.0.2", MyTime.time() - 20, ["ABC\n"])
ticket.setAttempt(30) ticket.setAttempt(30)
skipping to change at line 474 skipping to change at line 511
self.assertEqual(ticket.getIP(), "127.0.0.1") self.assertEqual(ticket.getIP(), "127.0.0.1")
# positive case (1 ticket not yet expired): # positive case (1 ticket not yet expired):
tickets = self.db.getCurrentBans(jail=self.jail, forbantime=15, tickets = self.db.getCurrentBans(jail=self.jail, forbantime=15,
fromtime=MyTime.time()) fromtime=MyTime.time())
self.assertEqual(len(tickets), 1) self.assertEqual(len(tickets), 1)
# negative case (all are expired in 1year): # negative case (all are expired in 1year):
tickets = self.db.getCurrentBans(jail=self.jail, forbantime=15, tickets = self.db.getCurrentBans(jail=self.jail, forbantime=15,
fromtime=MyTime.time() + MyTime.str2seconds("1year")) fromtime=MyTime.time() + MyTime.str2seconds("1year"))
self.assertEqual(len(tickets), 0) self.assertEqual(len(tickets), 0)
# persistent bantime (-1), so never expired: # persistent bantime (-1), so never expired (but no persistent ti ckets):
tickets = self.db.getCurrentBans(jail=self.jail, forbantime=-1, tickets = self.db.getCurrentBans(jail=self.jail, forbantime=-1,
fromtime=MyTime.time() + MyTime.str2seconds("1year")) fromtime=MyTime.time() + MyTime.str2seconds("1year"))
self.assertEqual(len(tickets), 2) self.assertEqual(len(tickets), 0)
# add persistent one:
ticket.setBanTime(-1)
self.db.addBan(self.jail, ticket)
# persistent bantime (-1), so never expired (but jail has other m
ax bantime now):
tickets = self.db.getCurrentBans(jail=self.jail, forbantime=-1,
fromtime=MyTime.time() + MyTime.str2seconds("1year"))
# no tickets should be found (max ban time = 600):
self.assertEqual(len(tickets), 0)
self.assertLogged("ignore ticket (with new max ban-time %r)" % se
lf.jail.getMaxBanTime())
# change jail to persistent ban and try again (1 persistent ticke
t):
self.jail.actions.setBanTime(-1)
tickets = self.db.getCurrentBans(jail=self.jail, forbantime=-1,
fromtime=MyTime.time() + MyTime.str2seconds("1year"))
self.assertEqual(len(tickets), 1)
self.assertEqual(tickets[0].getBanTime(), -1); # current jail ban
time.
def testActionWithDB(self): def testActionWithDB(self):
# test action together with database functionality # test action together with database functionality
self.testAddJail() # Jail required self.testAddJail() # Jail required
self.jail.database = self.db self.jail.database = self.db
actions = Actions(self.jail) actions = Actions(self.jail)
actions.add( actions.add(
"action_checkainfo", "action_checkainfo",
os.path.join(TEST_FILES_DIR, "action.d/action_checkainfo. py"), os.path.join(TEST_FILES_DIR, "action.d/action_checkainfo. py"),
{}) {})
ticket = FailTicket("1.2.3.4", MyTime.time(), ['test', 'test']) ticket = FailTicket("1.2.3.4")
ticket.setAttempt(5) ticket.setAttempt(5)
ticket.setMatches(['test', 'test'])
self.jail.putFailTicket(ticket) self.jail.putFailTicket(ticket)
actions._Actions__checkBan() actions._Actions__checkBan()
self.assertLogged("ban ainfo %s, %s, %s, %s" % (True, True, True, True)) self.assertLogged("ban ainfo %s, %s, %s, %s" % (True, True, True, True))
def testDelAndAddJail(self): def testDelAndAddJail(self):
self.testAddJail() # Add jail self.testAddJail() # Add jail
# Delete jail (just disabled it): # Delete jail (just disabled it):
self.db.delJail(self.jail) self.db.delJail(self.jail)
jails = self.db.getJailNames() jails = self.db.getJailNames()
self.assertIn(len(jails) == 1 and self.jail.name, jails) self.assertIn(len(jails) == 1 and self.jail.name, jails)
 End of changes. 7 change blocks. 
4 lines changed or deleted 75 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)