"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "fail2ban/tests/databasetestcase.py" between
fail2ban-0.10.3.1.tar.gz and fail2ban-0.10.4.tar.gz

About: fail2ban scans log files and bans (via firewall rules) IP-addresses that makes too many access failures. It updates firewall rules to reject the IP address. Experimental version.

databasetestcase.py  (fail2ban-0.10.3.1):databasetestcase.py  (fail2ban-0.10.4)
skipping to change at line 38 skipping to change at line 38
import tempfile import tempfile
import sqlite3 import sqlite3
import shutil import shutil
from ..server.filter import FileContainer from ..server.filter import FileContainer
from ..server.mytime import MyTime from ..server.mytime import MyTime
from ..server.ticket import FailTicket from ..server.ticket import FailTicket
from ..server.actions import Actions, Utils from ..server.actions import Actions, Utils
from .dummyjail import DummyJail from .dummyjail import DummyJail
try: try:
from ..server.database import Fail2BanDb as Fail2BanDb from ..server import database
Fail2BanDb = database.Fail2BanDb
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
Fail2BanDb = None Fail2BanDb = None
from .utils import LogCaptureTestCase from .utils import LogCaptureTestCase, logSys as DefLogSys
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files") TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files")
# because of tests performance use memory instead of file: # because of tests performance use memory instead of file:
def getFail2BanDb(filename): def getFail2BanDb(filename):
if unittest.F2B.memory_db: # pragma: no cover if unittest.F2B.memory_db: # pragma: no cover
return Fail2BanDb(':memory:') return Fail2BanDb(':memory:')
return Fail2BanDb(filename) return Fail2BanDb(filename)
class DatabaseTest(LogCaptureTestCase): class DatabaseTest(LogCaptureTestCase):
skipping to change at line 239 skipping to change at line 240
tickets = self.db.getBans(jail=self.jail) tickets = self.db.getBans(jail=self.jail)
self.assertEqual(len(tickets), 1) self.assertEqual(len(tickets), 1)
self.assertTrue( self.assertTrue(
isinstance(tickets[0], FailTicket)) isinstance(tickets[0], FailTicket))
def testAddBanInvalidEncoded(self): def testAddBanInvalidEncoded(self):
self.testAddJail() self.testAddJail()
# invalid + valid, invalid + valid unicode, invalid + valid dual converted (like in filter:readline by fallback) ... # invalid + valid, invalid + valid unicode, invalid + valid dual converted (like in filter:readline by fallback) ...
tickets = [ tickets = [
FailTicket("127.0.0.1", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', 'use FailTicket("127.0.0.1", 0, ['user "test"', 'user "\xd1\xe2\xe5\
r "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']), xf2\xe0"', 'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket("127.0.0.2", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', u'us FailTicket("127.0.0.2", 0, ['user "test"', u'user "\xd1\xe2\xe5
er "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']), \xf2\xe0"', u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket("127.0.0.3", 0, ['user "\xd1\xe2\xe5\xf2\xe0"', b'us FailTicket("127.0.0.3", 0, ['user "test"', b'user "\xd1\xe2\xe5
er "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.decode('utf-8', 'replace')]) \xf2\xe0"', b'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']),
FailTicket("127.0.0.4", 0, ['user "test"', 'user "\xd1\xe2\xe5\
xf2\xe0"', u'user "\xe4\xf6\xfc\xdf"']),
FailTicket("127.0.0.5", 0, ['user "test"', 'unterminated \xcf']
),
FailTicket("127.0.0.6", 0, ['user "test"', u'unterminated \xcf'
]),
FailTicket("127.0.0.7", 0, ['user "test"', b'unterminated \xcf'
])
] ]
self.db.addBan(self.jail, tickets[0]) for ticket in tickets:
self.db.addBan(self.jail, tickets[1]) self.db.addBan(self.jail, ticket)
self.db.addBan(self.jail, tickets[2])
self.assertNotLogged("json dumps failed")
readtickets = self.db.getBans(jail=self.jail) readtickets = self.db.getBans(jail=self.jail)
self.assertEqual(len(readtickets), 3)
## python 2 or 3 : self.assertNotLogged("json loads failed")
invstr = u'user "\ufffd\ufffd\ufffd\ufffd\ufffd"'.encode('utf-8',
'replace') ## all tickets available
self.assertTrue( self.assertEqual(len(readtickets), 7)
readtickets[0] == FailTicket("127.0.0.1", 0, [invstr,
'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']) ## too different to cover all possible constellations for python
or readtickets[0] == tickets[0] 2 and 3,
) ## can replace/ignore some non-ascii chars by json dump/load (uni
self.assertTrue( code/str),
readtickets[1] == FailTicket("127.0.0.2", 0, [invstr, ## so check ip and matches count only:
u'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"'.encode('utf-8', 'replace')]) for i, ticket in enumerate(tickets):
or readtickets[1] == tickets[1] DefLogSys.debug('readtickets[%d]: %r', i, readtickets[i].
) getData())
self.assertTrue( DefLogSys.debug(' == tickets[%d]: %r', i, ticket.getData(
readtickets[2] == FailTicket("127.0.0.3", 0, [invstr, ))
'user "\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f"']) self.assertEqual(readtickets[i].getIP(), ticket.getIP())
or readtickets[2] == tickets[2] self.assertEqual(len(readtickets[i].getMatches()), len(ti
) cket.getMatches()))
self.pruneLog('[test-phase 2] simulate errors')
## simulate errors in dumps/loads:
priorEnc = database.PREFER_ENC
try:
database.PREFER_ENC = 'f2b-test::non-existing-encoding'
for ticket in tickets:
self.db.addBan(self.jail, ticket)
self.assertLogged("json dumps failed")
readtickets = self.db.getBans(jail=self.jail)
self.assertLogged("json loads failed")
## despite errors all tickets written and loaded (check a
dapter-handlers are error-safe):
self.assertEqual(len(readtickets), 14)
finally:
database.PREFER_ENC = priorEnc
## check the database is still operable (not locked) after all th
e errors:
self.pruneLog('[test-phase 3] still operable?')
self.db.addBan(self.jail, FailTicket("127.0.0.8"))
readtickets = self.db.getBans(jail=self.jail)
self.assertEqual(len(readtickets), 15)
self.assertNotLogged("json loads failed", "json dumps failed")
def _testAdd3Bans(self): def _testAdd3Bans(self):
self.testAddJail() self.testAddJail()
for i in (1, 2, 3): for i in (1, 2, 3):
ticket = FailTicket(("192.0.2.%d" % i), 0, ["test\n"]) ticket = FailTicket(("192.0.2.%d" % i), 0, ["test\n"])
self.db.addBan(self.jail, ticket) self.db.addBan(self.jail, ticket)
tickets = self.db.getBans(jail=self.jail) tickets = self.db.getBans(jail=self.jail)
self.assertEqual(len(tickets), 3) self.assertEqual(len(tickets), 3)
return tickets return tickets
 End of changes. 5 change blocks. 
30 lines changed or deleted 69 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)