"Fossies" - the Fresh Open Source Software Archive

Member "fail2ban-0.11.1/fail2ban/tests/utils.py" (11 Jan 2020, 29863 Bytes) of package /linux/misc/fail2ban-0.11.1.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "utils.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 0.10.5_vs_0.11.1.

    1 # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
    2 # vi: set ft=python sts=4 ts=4 sw=4 noet :
    3 
    4 # This file is part of Fail2Ban.
    5 #
    6 # Fail2Ban is free software; you can redistribute it and/or modify
    7 # it under the terms of the GNU General Public License as published by
    8 # the Free Software Foundation; either version 2 of the License, or
    9 # (at your option) any later version.
   10 #
   11 # Fail2Ban is distributed in the hope that it will be useful,
   12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
   13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   14 # GNU General Public License for more details.
   15 #
   16 # You should have received a copy of the GNU General Public License
   17 # along with Fail2Ban; if not, write to the Free Software
   18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
   19 
   20 
   21 __author__ = "Yaroslav Halchenko"
   22 __copyright__ = "Copyright (c) 2013 Yaroslav Halchenko"
   23 __license__ = "GPL"
   24 
   25 import fileinput
   26 import itertools
   27 import logging
   28 import optparse
   29 import os
   30 import re
   31 import tempfile
   32 import shutil
   33 import sys
   34 import time
   35 import threading
   36 import unittest
   37 
   38 from cStringIO import StringIO
   39 from functools import wraps
   40 
   41 from ..helpers import getLogger, str2LogLevel, getVerbosityFormat, uni_decode
   42 from ..server.ipdns import DNSUtils
   43 from ..server.mytime import MyTime
   44 from ..server.utils import Utils
   45 # for action_d.test_smtp :
   46 from ..server import asyncserver
   47 from ..version import version
   48 
   49 
   50 logSys = getLogger(__name__)
   51 
   52 TEST_NOW = 1124013600
   53 
   54 CONFIG_DIR = os.environ.get('FAIL2BAN_CONFIG_DIR', None)
   55 
   56 if not CONFIG_DIR:
   57 # Use heuristic to figure out where configuration files are
   58     if os.path.exists(os.path.join('config','fail2ban.conf')):
   59         CONFIG_DIR = 'config'
   60     else: # pragma: no cover - normally unreachable
   61         CONFIG_DIR = '/etc/fail2ban'
   62 
   63 # Indicates that we've stock config:
   64 STOCK = os.path.exists(os.path.join(CONFIG_DIR, 'fail2ban.conf'))
   65 
   66 # During the test cases (or setup) use fail2ban modules from main directory:
   67 os.putenv('PYTHONPATH', os.path.dirname(os.path.dirname(os.path.dirname(
   68     os.path.abspath(__file__)))))
   69 
   70 # Default options, if running from installer (setup.py):
   71 class DefaultTestOptions(optparse.Values):
   72     def __init__(self):
   73         self.__dict__ = {
   74             'log_level': None, 'verbosity': None, 'log_lazy': True, 
   75             'log_traceback': None, 'full_traceback': None,
   76             'fast': False, 'memory_db': False, 'no_gamin': False,
   77             'no_network': False, 'negate_re': False
   78         }
   79 
   80 #
   81 # Initialization
   82 #
   83 def getOptParser(doc=""):
   84     Option = optparse.Option
   85     # use module docstring for help output
   86     p = optparse.OptionParser(
   87                 usage="%s [OPTIONS] [regexps]\n" % sys.argv[0] + doc,
   88                 version="%prog " + version)
   89 
   90     p.add_options([
   91         Option('-l', "--log-level",
   92                dest="log_level",
   93                default=None,
   94                help="Log level for the logger to use during running tests"),
   95         Option('-v', action="count", dest="verbosity",
   96                default=None,
   97                help="Increase verbosity"),
   98         Option("--verbosity", action="store", dest="verbosity", type=int,
   99                default=None,
  100                help="Set numerical level of verbosity (0..4)"),
  101         Option("--log-direct", action="store_false",
  102                dest="log_lazy",
  103                default=True,
  104                help="Prevent lazy logging inside tests"),
  105         Option('-n', "--no-network", action="store_true",
  106                dest="no_network",
  107                help="Do not run tests that require the network"),
  108         Option('-g', "--no-gamin", action="store_true",
  109                dest="no_gamin",
  110                help="Do not run tests that require the gamin"),
  111         Option('-m', "--memory-db", action="store_true",
  112                dest="memory_db",
  113                help="Run database tests using memory instead of file"),
  114         Option('-f', "--fast", action="store_true",
  115                dest="fast",
  116                help="Try to increase speed of the tests, decreasing of wait intervals, memory database"),
  117         Option('-i', "--ignore", action="store_true",
  118                dest="negate_re",
  119                help="negate [regexps] filter to ignore tests matched specified regexps"),
  120         Option("-t", "--log-traceback", action='store_true',
  121                help="Enrich log-messages with compressed tracebacks"),
  122         Option("--full-traceback", action='store_true',
  123                help="Either to make the tracebacks full, not compressed (as by default)"),
  124         ])
  125     return p
  126 
  127 def initProcess(opts):
  128     # Logger:
  129     global logSys
  130     logSys = getLogger("fail2ban")
  131 
  132     llev = None
  133     if opts.log_level is not None: # pragma: no cover
  134         # so we had explicit settings
  135         llev = str2LogLevel(opts.log_level)
  136         logSys.setLevel(llev)
  137     else: # pragma: no cover
  138         # suppress the logging but it would leave unittests' progress dots
  139         # ticking, unless like with '-l critical' which would be silent
  140         # unless error occurs
  141         logSys.setLevel(logging.CRITICAL)
  142     opts.log_level = logSys.level
  143 
  144     # Numerical level of verbosity corresponding to a given log "level"
  145     verbosity = opts.verbosity
  146     if verbosity is None:
  147         verbosity = (
  148             1 if llev is None else \
  149             4 if llev <= logging.HEAVYDEBUG else \
  150             3 if llev <= logging.DEBUG else \
  151             2 if llev <= min(logging.INFO, logging.NOTICE) else \
  152             1 if llev <= min(logging.WARNING, logging.ERROR) else \
  153             0 # if llev <= logging.CRITICAL
  154         )
  155         opts.verbosity = verbosity
  156 
  157     # Add the default logging handler
  158     stdout = logging.StreamHandler(sys.stdout)
  159 
  160     fmt = ' %(message)s'
  161 
  162     if opts.log_traceback: # pragma: no cover
  163         from ..helpers import FormatterWithTraceBack as Formatter
  164         fmt = (opts.full_traceback and ' %(tb)s' or ' %(tbc)s') + fmt
  165     else:
  166         Formatter = logging.Formatter
  167 
  168     # Custom log format for the verbose tests runs
  169     fmt = getVerbosityFormat(verbosity, fmt)
  170 
  171     #
  172     stdout.setFormatter(Formatter(fmt))
  173     logSys.addHandler(stdout)
  174 
  175     # Let know the version
  176     if opts.verbosity != 0:
  177         print("Fail2ban %s test suite. Python %s. Please wait..." \
  178                 % (version, str(sys.version).replace('\n', '')))
  179 
  180     return opts;
  181 
  182 
  183 class F2B(DefaultTestOptions):
  184 
  185     MAX_WAITTIME = 60
  186     MID_WAITTIME = 30
  187 
  188     def __init__(self, opts):
  189         self.__dict__ = opts.__dict__
  190         if self.fast: # pragma: no cover - normal mode in travis
  191             self.memory_db = True
  192             self.no_gamin = True
  193         self.__dict__['share_config'] = {}
  194     def SkipIfFast(self):
  195         pass
  196     def SkipIfNoNetwork(self):
  197         pass
  198 
  199     def SkipIfCfgMissing(self, **kwargs):
  200         """Helper to check action/filter config is available
  201         """
  202         if not STOCK: # pragma: no cover
  203             if kwargs.get('stock'):
  204                 raise unittest.SkipTest('Skip test because of missing stock-config files')
  205             for t in ('action', 'filter'):
  206                 v = kwargs.get(t)
  207                 if v is None: continue
  208                 if os.path.splitext(v)[1] == '': v += '.conf'
  209                 if not os.path.exists(os.path.join(CONFIG_DIR, t+'.d', v)):
  210                     raise unittest.SkipTest('Skip test because of missing %s-config for %r' % (t, v))
  211 
  212     def skip_if_cfg_missing(self, **decargs):
  213         """Helper decorator to check action/filter config is available
  214         """
  215         def _deco_wrapper(f):
  216             @wraps(f)
  217             def wrapper(self, *args, **kwargs):
  218                 unittest.F2B.SkipIfCfgMissing(**decargs)
  219                 return f(self, *args, **kwargs)
  220             return wrapper
  221         return _deco_wrapper
  222 
  223     def maxWaitTime(self, wtime=True):
  224         if isinstance(wtime, bool) and wtime:
  225             wtime = self.MAX_WAITTIME
  226         # short only integer interval (avoid by conditional wait with callable, and dual 
  227         # wrapping in some routines, if it will be called twice):
  228         if self.fast and isinstance(wtime, int):
  229             wtime = float(wtime) / 10
  230         return wtime
  231 
  232 
  233 def with_tmpdir(f):
  234     """Helper decorator to create a temporary directory
  235 
  236     Directory gets removed after function returns, regardless
  237     if exception was thrown of not
  238     """
  239     @wraps(f)
  240     def wrapper(self, *args, **kwargs):
  241         tmp = tempfile.mkdtemp(prefix="f2b-temp")
  242         try:
  243             return f(self, tmp, *args, **kwargs)
  244         finally:
  245             # clean up
  246             shutil.rmtree(tmp)
  247     return wrapper
  248 
  249 def with_alt_time(f):
  250     """Helper decorator to execute test in alternate (fixed) test time."""
  251     @wraps(f)
  252     def wrapper(self, *args, **kwargs):
  253         setUpMyTime()
  254         try:
  255             return f(self, *args, **kwargs)
  256         finally:
  257             tearDownMyTime()
  258     return wrapper
  259 
  260 
  261 # backwards compatibility to python 2.6:
  262 if not hasattr(unittest, 'SkipTest'): # pragma: no cover
  263     class SkipTest(Exception):
  264         pass
  265     unittest.SkipTest = SkipTest
  266     _org_AddError = unittest._TextTestResult.addError
  267     def addError(self, test, err):
  268         if err[0] is SkipTest: 
  269             if self.showAll:
  270                 self.stream.writeln(str(err[1]))
  271             elif self.dots:
  272                 self.stream.write('s')
  273                 self.stream.flush()
  274             return
  275         _org_AddError(self, test, err)
  276     unittest._TextTestResult.addError = addError
  277 
  278 def initTests(opts):
  279     ## if running from installer (setup.py):
  280     if not opts:
  281         opts = initProcess(DefaultTestOptions())
  282     unittest.F2B = F2B(opts)
  283     # --fast :
  284     if unittest.F2B.fast: # pragma: no cover
  285         # racy decrease default sleep intervals to test it faster 
  286         # (prevent long sleeping during test cases ... less time goes to sleep):
  287         Utils.DEFAULT_SLEEP_TIME = 0.0025
  288         Utils.DEFAULT_SLEEP_INTERVAL = 0.0005
  289         Utils.DEFAULT_SHORT_INTERVAL = 0.0001
  290         def F2B_SkipIfFast():
  291             raise unittest.SkipTest('Skip test because of "--fast"')
  292         unittest.F2B.SkipIfFast = F2B_SkipIfFast
  293     else:
  294         # smaller inertance inside test-cases (litle speedup):
  295         Utils.DEFAULT_SLEEP_TIME = 0.25
  296         Utils.DEFAULT_SLEEP_INTERVAL = 0.025
  297         Utils.DEFAULT_SHORT_INTERVAL = 0.0005
  298         # sleep intervals are large - use replacement for sleep to check time to sleep:
  299         _org_sleep = time.sleep
  300         def _new_sleep(v):
  301             if v > max(1, Utils.DEFAULT_SLEEP_TIME): # pragma: no cover
  302                 raise ValueError('[BAD-CODE] To long sleep interval: %s, try to use conditional Utils.wait_for instead' % v)
  303             _org_sleep(min(v, Utils.DEFAULT_SLEEP_TIME))
  304         time.sleep = _new_sleep
  305     # --no-network :
  306     if unittest.F2B.no_network: # pragma: no cover
  307         def F2B_SkipIfNoNetwork():
  308             raise unittest.SkipTest('Skip test because of "--no-network"')
  309         unittest.F2B.SkipIfNoNetwork = F2B_SkipIfNoNetwork
  310 
  311     # persistently set time zone to CET (used in zone-related test-cases),
  312     # yoh: we need to adjust TZ to match the one used by Cyril so all the timestamps match
  313     # This offset corresponds to Europe/Zurich timezone.  Specifying it
  314     # explicitly allows to avoid requiring tzdata package to be installed during
  315     # testing.   See https://bugs.debian.org/855920 for more information
  316     os.environ['TZ'] = 'CET-01CEST-02,M3.5.0,M10.5.0'
  317     time.tzset()
  318     # set alternate now for time related test cases:
  319     MyTime.setAlternateNow(TEST_NOW)
  320 
  321     # precache all invalid ip's (TEST-NET-1, ..., TEST-NET-3 according to RFC 5737):
  322     c = DNSUtils.CACHE_ipToName
  323     # increase max count and max time (too many entries, long time testing):
  324     c.setOptions(maxCount=10000, maxTime=5*60)
  325     for i in xrange(256):
  326         c.set('192.0.2.%s' % i, None)
  327         c.set('198.51.100.%s' % i, None)
  328         c.set('203.0.113.%s' % i, None)
  329         c.set('2001:db8::%s' %i, 'test-host')
  330     # some legal ips used in our test cases (prevent slow dns-resolving and failures if will be changed later):
  331     c.set('2001:db8::ffff', 'test-other')
  332     c.set('87.142.124.10', 'test-host')
  333     if unittest.F2B.no_network: # pragma: no cover
  334         # precache all wrong dns to ip's used in test cases:
  335         c = DNSUtils.CACHE_nameToIp
  336         for i in (
  337             ('999.999.999.999', set()),
  338             ('abcdef.abcdef', set()),
  339             ('192.168.0.', set()),
  340             ('failed.dns.ch', set()),
  341         ):
  342             c.set(*i)
  343         # if fast - precache all host names as localhost addresses (speed-up getSelfIPs/ignoreself):
  344         if unittest.F2B.fast: # pragma: no cover
  345             for i in DNSUtils.getSelfNames():
  346                 c.set(i, DNSUtils.dnsToIp('localhost'))
  347 
  348 
  349 def mtimesleep():
  350     # no sleep now should be necessary since polling tracks now not only
  351     # mtime but also ino and size
  352     pass
  353 
  354 old_TZ = os.environ.get('TZ', None)
  355 
  356 
  357 def setUpMyTime():
  358     # Set the time to a fixed, known value
  359     # Sun Aug 14 12:00:00 CEST 2005
  360     MyTime.setTime(TEST_NOW)
  361 
  362 
  363 def tearDownMyTime():
  364     MyTime.myTime = None
  365 
  366 
  367 def gatherTests(regexps=None, opts=None):
  368     initTests(opts)
  369     # Import all the test cases here instead of a module level to
  370     # avoid circular imports
  371     from . import banmanagertestcase
  372     from . import clientbeautifiertestcase
  373     from . import clientreadertestcase
  374     from . import tickettestcase
  375     from . import failmanagertestcase
  376     from . import filtertestcase
  377     from . import servertestcase
  378     from . import datedetectortestcase
  379     from . import actiontestcase
  380     from . import actionstestcase
  381     from . import sockettestcase
  382     from . import misctestcase
  383     from . import databasetestcase
  384     from . import observertestcase
  385     from . import samplestestcase
  386     from . import fail2banclienttestcase
  387     from . import fail2banregextestcase
  388 
  389     if not regexps: # pragma: no cover
  390         tests = unittest.TestSuite()
  391     else: # pragma: no cover
  392         class FilteredTestSuite(unittest.TestSuite):
  393             _regexps = [re.compile(r) for r in regexps]
  394 
  395             def addTest(self, suite):
  396                 matched = []
  397                 for test in suite:
  398                     # test of suite loaded with loadTestsFromName may be a suite self:
  399                     if isinstance(test, unittest.TestSuite): # pragma: no cover
  400                         self.addTest(test)
  401                         continue
  402                     # filter by regexp:
  403                     s = str(test)
  404                     for r in self._regexps:
  405                         m = r.search(s)
  406                         if (m if not opts.negate_re else not m):
  407                             matched.append(test)
  408                             break
  409                 for test in matched:
  410                     super(FilteredTestSuite, self).addTest(test)
  411 
  412         tests = FilteredTestSuite()
  413 
  414     # Server
  415     tests.addTest(unittest.makeSuite(servertestcase.Transmitter))
  416     tests.addTest(unittest.makeSuite(servertestcase.JailTests))
  417     tests.addTest(unittest.makeSuite(servertestcase.RegexTests))
  418     tests.addTest(unittest.makeSuite(servertestcase.LoggingTests))
  419     tests.addTest(unittest.makeSuite(servertestcase.ServerConfigReaderTests))
  420     tests.addTest(unittest.makeSuite(actiontestcase.CommandActionTest))
  421     tests.addTest(unittest.makeSuite(actionstestcase.ExecuteActions))
  422     # Ticket, BanTicket, FailTicket
  423     tests.addTest(unittest.makeSuite(tickettestcase.TicketTests))
  424     # FailManager
  425     tests.addTest(unittest.makeSuite(failmanagertestcase.AddFailure))
  426     tests.addTest(unittest.makeSuite(failmanagertestcase.FailmanagerComplex))
  427     # BanManager
  428     tests.addTest(unittest.makeSuite(banmanagertestcase.AddFailure))
  429     try:
  430         import dns
  431         tests.addTest(unittest.makeSuite(banmanagertestcase.StatusExtendedCymruInfo))
  432     except ImportError: # pragma: no cover
  433         pass
  434     
  435     # ClientBeautifier
  436     tests.addTest(unittest.makeSuite(clientbeautifiertestcase.BeautifierTest))
  437 
  438     # ClientReaders
  439     tests.addTest(unittest.makeSuite(clientreadertestcase.ConfigReaderTest))
  440     tests.addTest(unittest.makeSuite(clientreadertestcase.JailReaderTest))
  441     tests.addTest(unittest.makeSuite(clientreadertestcase.FilterReaderTest))
  442     tests.addTest(unittest.makeSuite(clientreadertestcase.JailsReaderTest))
  443     tests.addTest(unittest.makeSuite(clientreadertestcase.JailsReaderTestCache))
  444     # CSocket and AsyncServer
  445     tests.addTest(unittest.makeSuite(sockettestcase.Socket))
  446     tests.addTest(unittest.makeSuite(sockettestcase.ClientMisc))
  447     # Misc helpers
  448     tests.addTest(unittest.makeSuite(misctestcase.HelpersTest))
  449     tests.addTest(unittest.makeSuite(misctestcase.SetupTest))
  450     tests.addTest(unittest.makeSuite(misctestcase.TestsUtilsTest))
  451     tests.addTest(unittest.makeSuite(misctestcase.MyTimeTest))
  452     # Database
  453     tests.addTest(unittest.makeSuite(databasetestcase.DatabaseTest))
  454     # Observer
  455     tests.addTest(unittest.makeSuite(observertestcase.ObserverTest))
  456     tests.addTest(unittest.makeSuite(observertestcase.BanTimeIncr))
  457     tests.addTest(unittest.makeSuite(observertestcase.BanTimeIncrDB))
  458 
  459     # Filter
  460     tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIP))
  461     tests.addTest(unittest.makeSuite(filtertestcase.BasicFilter))
  462     tests.addTest(unittest.makeSuite(filtertestcase.LogFile))
  463     tests.addTest(unittest.makeSuite(filtertestcase.LogFileMonitor))
  464     tests.addTest(unittest.makeSuite(filtertestcase.LogFileFilterPoll))
  465     # each test case class self will check no network, and skip it (we see it in log)
  466     tests.addTest(unittest.makeSuite(filtertestcase.IgnoreIPDNS))
  467     tests.addTest(unittest.makeSuite(filtertestcase.GetFailures))
  468     tests.addTest(unittest.makeSuite(filtertestcase.DNSUtilsTests))
  469     tests.addTest(unittest.makeSuite(filtertestcase.DNSUtilsNetworkTests))
  470     tests.addTest(unittest.makeSuite(filtertestcase.JailTests))
  471 
  472     # DateDetector
  473     tests.addTest(unittest.makeSuite(datedetectortestcase.DateDetectorTest))
  474     tests.addTest(unittest.makeSuite(datedetectortestcase.CustomDateFormatsTest))
  475     # Filter Regex tests with sample logs
  476     tests.addTest(unittest.makeSuite(samplestestcase.FilterSamplesRegex))
  477 
  478     # bin/fail2ban-client, bin/fail2ban-server
  479     tests.addTest(unittest.makeSuite(fail2banclienttestcase.Fail2banClientTest))
  480     tests.addTest(unittest.makeSuite(fail2banclienttestcase.Fail2banServerTest))
  481     # bin/fail2ban-regex
  482     tests.addTest(unittest.makeSuite(fail2banregextestcase.Fail2banRegexTest))
  483 
  484     #
  485     # Python action testcases
  486     #
  487     testloader = unittest.TestLoader()
  488     from . import action_d
  489     for file_ in os.listdir(
  490         os.path.abspath(os.path.dirname(action_d.__file__))):
  491         if file_.startswith("test_") and file_.endswith(".py"):
  492             tests.addTest(testloader.loadTestsFromName(
  493                 "%s.%s" % (action_d.__name__, os.path.splitext(file_)[0])))
  494 
  495     #
  496     # Extensive use-tests of different available filters backends
  497     #
  498 
  499     from ..server.filterpoll import FilterPoll
  500     filters = [FilterPoll]                    # always available
  501 
  502     # Additional filters available only if external modules are available
  503     # yoh: Since I do not know better way for parametric tests
  504     #      with good old unittest
  505     try:
  506         # because gamin can be very slow on some platforms (and can produce many failures 
  507         # with fast sleep interval) - skip it by fast run:
  508         if unittest.F2B.fast or unittest.F2B.no_gamin: # pragma: no cover
  509             raise ImportError('Skip, fast: %s, no_gamin: %s' % (unittest.F2B.fast, unittest.F2B.no_gamin))
  510         from ..server.filtergamin import FilterGamin
  511         filters.append(FilterGamin)
  512     except ImportError as e: # pragma: no cover
  513         logSys.warning("Skipping gamin backend testing. Got exception '%s'" % e)
  514 
  515     try:
  516         from ..server.filterpyinotify import FilterPyinotify
  517         filters.append(FilterPyinotify)
  518     except ImportError as e: # pragma: no cover
  519         logSys.warning("I: Skipping pyinotify backend testing. Got exception '%s'" % e)
  520 
  521     for Filter_ in filters:
  522         tests.addTest(unittest.makeSuite(
  523             filtertestcase.get_monitor_failures_testcase(Filter_)))
  524     try: # pragma: systemd no cover
  525         from ..server.filtersystemd import FilterSystemd
  526         tests.addTest(unittest.makeSuite(filtertestcase.get_monitor_failures_journal_testcase(FilterSystemd)))
  527     except ImportError as e: # pragma: no cover
  528         logSys.warning("I: Skipping systemd backend testing. Got exception '%s'" % e)
  529 
  530     # Server test for logging elements which break logging used to support
  531     # testcases analysis
  532     tests.addTest(unittest.makeSuite(servertestcase.TransmitterLogging))
  533 
  534     return tests
  535 
  536 
  537 #
  538 # Forwards compatibility of unittest.TestCase for some early python versions
  539 #
  540 
  541 import difflib, pprint
  542 if not hasattr(unittest.TestCase, 'assertDictEqual'):
  543     def assertDictEqual(self, d1, d2, msg=None):
  544         self.assert_(isinstance(d1, dict), 'First argument is not a dictionary')
  545         self.assert_(isinstance(d2, dict), 'Second argument is not a dictionary')
  546         if d1 != d2:
  547             standardMsg = '%r != %r' % (d1, d2)
  548             diff = ('\n' + '\n'.join(difflib.ndiff(
  549                 pprint.pformat(d1).splitlines(),
  550                 pprint.pformat(d2).splitlines())))
  551             msg = msg or (standardMsg + diff)
  552             self.fail(msg)
  553     unittest.TestCase.assertDictEqual = assertDictEqual
  554 
  555 def assertSortedEqual(self, a, b, level=1, nestedOnly=True, key=repr, msg=None):
  556     """Compare complex elements (like dict, list or tuple) in sorted order until
  557     level 0 not reached (initial level = -1 meant all levels),
  558     or if nestedOnly set to True and some of the objects still contains nested lists or dicts.
  559     """
  560     # used to recognize having element as nested dict, list or tuple:
  561     def _is_nested(v):
  562         if isinstance(v, dict):
  563             return any(isinstance(v, (dict, list, tuple)) for v in v.itervalues())
  564         return any(isinstance(v, (dict, list, tuple)) for v in v)
  565     # level comparison routine:
  566     def _assertSortedEqual(a, b, level, nestedOnly, key):
  567         # first the lengths:
  568         if len(a) != len(b):
  569             raise ValueError('%r != %r' % (a, b))
  570         # if not allow sorting of nested - just compare directly:
  571         if not level and (nestedOnly and (not _is_nested(a) and not _is_nested(b))):
  572             if a == b:
  573                 return
  574             raise ValueError('%r != %r' % (a, b))
  575         if isinstance(a, dict) and isinstance(b, dict): # compare dict's:
  576             for k, v1 in a.iteritems():
  577                 v2 = b[k]
  578                 if isinstance(v1, (dict, list, tuple)) and isinstance(v2, (dict, list, tuple)):
  579                     _assertSortedEqual(v1, v2, level-1 if level != 0 else 0, nestedOnly, key)
  580                 elif v1 != v2:
  581                     raise ValueError('%r != %r' % (a, b))
  582         else: # list, tuple, something iterable:
  583             a = sorted(a, key=key)
  584             b = sorted(b, key=key)
  585             for v1, v2 in zip(a, b):
  586                 if isinstance(v1, (dict, list, tuple)) and isinstance(v2, (dict, list, tuple)):
  587                     _assertSortedEqual(v1, v2, level-1 if level != 0 else 0, nestedOnly, key)
  588                 elif v1 != v2:
  589                     raise ValueError('%r != %r' % (a, b))
  590     # compare and produce assertion-error by exception:
  591     try:
  592         _assertSortedEqual(a, b, level, nestedOnly, key)
  593     except Exception as e:
  594         standardMsg = e.args[0] if isinstance(e, ValueError) else (str(e) + "\nwithin:")
  595         diff = ('\n' + '\n'.join(difflib.ndiff(
  596             pprint.pformat(a).splitlines(),
  597             pprint.pformat(b).splitlines())))
  598         msg = msg or (standardMsg + diff)
  599         self.fail(msg)
  600 unittest.TestCase.assertSortedEqual = assertSortedEqual
  601 
  602 if not hasattr(unittest.TestCase, 'assertRaisesRegexp'):
  603     def assertRaisesRegexp(self, exccls, regexp, fun, *args, **kwargs):
  604         try:
  605             fun(*args, **kwargs)
  606         except exccls as e:
  607             if re.search(regexp, str(e)) is None:
  608                 self.fail('\"%s\" does not match \"%s\"' % (regexp, e))
  609         else:
  610             self.fail('%s not raised' % getattr(exccls, '__name__'))
  611     unittest.TestCase.assertRaisesRegexp = assertRaisesRegexp
  612 
  613 # always custom following methods, because we use atm better version of both (support generators)
  614 if True: ## if not hasattr(unittest.TestCase, 'assertIn'):
  615     def assertIn(self, a, b, msg=None):
  616         bb = b
  617         wrap = False
  618         if msg is None and hasattr(b, '__iter__') and not isinstance(b, basestring):
  619             b, bb = itertools.tee(b)
  620             wrap = True
  621         if a not in b:
  622             if wrap: bb = list(bb)
  623             msg = msg or "%r was not found in %r" % (a, bb)
  624             self.fail(msg)
  625     unittest.TestCase.assertIn = assertIn
  626     def assertNotIn(self, a, b, msg=None):
  627         bb = b
  628         wrap = False
  629         if msg is None and hasattr(b, '__iter__') and not isinstance(b, basestring):
  630             b, bb = itertools.tee(b)
  631             wrap = True
  632         if a in b:
  633             if wrap: bb = list(bb)
  634             msg = msg or "%r unexpectedly found in %r" % (a, bb)
  635             self.fail(msg)
  636     unittest.TestCase.assertNotIn = assertNotIn
  637 
  638 _org_setUp = unittest.TestCase.setUp
  639 def _customSetUp(self):
  640     # print('=='*10, self)
  641     # so if DEBUG etc -- show them (and log it in travis)!
  642     if unittest.F2B.log_level <= logging.DEBUG: # pragma: no cover
  643         sys.stderr.write("\n")
  644         logSys.debug('='*10 + ' %s ' + '='*20, self.id())
  645     _org_setUp(self)
  646     if unittest.F2B.verbosity > 2: # pragma: no cover
  647         self.__startTime = time.time()
  648 
  649 _org_tearDown = unittest.TestCase.tearDown
  650 def _customTearDown(self):
  651     if unittest.F2B.verbosity > 2: # pragma: no cover
  652         sys.stderr.write(" %.3fs -- " % (time.time() - self.__startTime,))
  653 
  654 unittest.TestCase.setUp = _customSetUp
  655 unittest.TestCase.tearDown = _customTearDown
  656 
  657 
  658 class LogCaptureTestCase(unittest.TestCase):
  659 
  660     class _MemHandler(logging.Handler):
  661         """Logging handler helper
  662         
  663         Affords not to delegate logging to StreamHandler at all,
  664         format lazily on demand in getvalue.
  665         Increases performance inside the LogCaptureTestCase tests, because there
  666         the log level set to DEBUG.
  667         """
  668 
  669         def __init__(self, lazy=True):
  670             self._lock = threading.Lock()
  671             self._val = ''
  672             self._dirty = 0
  673             self._recs = list()
  674             self._nolckCntr = 0
  675             self._strm = StringIO()
  676             logging.Handler.__init__(self)
  677             if lazy:
  678                 self.handle = self._handle_lazy
  679             
  680         def truncate(self, size=None):
  681             """Truncate the internal buffer and records."""
  682             if size: # pragma: no cover - not implemented now
  683                 raise Exception('invalid size argument: %r, should be None or 0' % size)
  684             self._val = ''
  685             with self._lock:
  686                 self._dirty = 0
  687                 self._recs = list()
  688                 self._strm.truncate(0)
  689 
  690         def __write(self, record):
  691             try:
  692                 msg = record.getMessage() + '\n'
  693                 try:
  694                     self._strm.write(msg)
  695                 except UnicodeEncodeError: # pragma: no cover - normally unreachable now
  696                     self._strm.write(msg.encode('UTF-8', 'replace'))
  697             except Exception as e: # pragma: no cover - normally unreachable
  698                 self._strm.write('Error by logging handler: %r' % e)
  699 
  700         def getvalue(self):
  701             """Return current buffer as whole string."""
  702             # if cached (still unchanged/no write operation), we don't need to enter lock:
  703             if not self._dirty:
  704                 return self._val
  705             # try to lock, if not possible - return cached/empty (max 5 times):
  706             lck = self._lock.acquire(False)
  707             # if records changed:
  708             if self._dirty & 2:
  709                 if not lck: # pragma: no cover (may be too sporadic on slow systems)
  710                     self._nolckCntr += 1
  711                     if self._nolckCntr <= 5:
  712                         return self._val
  713                     self._nolckCntr = 0
  714                     self._lock.acquire()
  715                 # minimize time of lock, avoid dead-locking during cross lock within self._strm ...
  716                 try:
  717                     self._dirty &= ~3 # reset dirty records/buffer flag before cache value built
  718                     recs = self._recs
  719                     self._recs = list()
  720                 finally:
  721                     self._lock.release()
  722                 # submit already emitted (delivered to handle) records:
  723                 for record in recs:
  724                     self.__write(record)
  725             elif lck: # pragma: no cover - too sporadic for coverage
  726                 # reset dirty buffer flag (if we can lock, otherwise just next time):
  727                 self._dirty &= ~1 # reset dirty buffer flag
  728                 self._lock.release()
  729             # cache (outside of log to avoid dead-locking during cross lock within self._strm):
  730             self._val = self._strm.getvalue()
  731             # return current string value:
  732             return self._val
  733              
  734         def handle(self, record): # pragma: no cover
  735             """Handle the specified record direct (not lazy)"""
  736             self.__write(record)
  737             # string buffer changed:
  738             with self._lock:
  739                 self._dirty |= 1 # buffer changed
  740 
  741         def _handle_lazy(self, record):
  742             """Lazy handle the specified record on demand"""
  743             with self._lock:
  744                 self._recs.append(record)
  745                 # logged - causes changed string buffer (signal by set _dirty):
  746                 self._dirty |= 2 # records changed
  747 
  748     def setUp(self):
  749         # For extended testing of what gets output into logging
  750         # system, we will redirect it to a string
  751         # Keep old settings
  752         self._old_level = logSys.level
  753         self._old_handlers = logSys.handlers
  754         # Let's log everything into a string
  755         self._log = LogCaptureTestCase._MemHandler(unittest.F2B.log_lazy)
  756         logSys.handlers = [self._log]
  757         # lowest log level to capture messages (expected in tests) is Lev.9
  758         if self._old_level <= logging.DEBUG: # pragma: no cover
  759             logSys.handlers += self._old_handlers
  760         if self._old_level > logging.DEBUG-1:
  761             logSys.setLevel(logging.DEBUG-1)
  762         super(LogCaptureTestCase, self).setUp()
  763 
  764     def tearDown(self):
  765         """Call after every test case."""
  766         # print "O: >>%s<<" % self._log.getvalue()
  767         self.pruneLog()
  768         logSys.handlers = self._old_handlers
  769         logSys.level = self._old_level
  770         super(LogCaptureTestCase, self).tearDown()
  771 
  772     def _is_logged(self, *s, **kwargs):
  773         logged = self._log.getvalue()
  774         if not kwargs.get('all', False):
  775             # at least one entry should be found:
  776             for s_ in s:
  777                 if s_ in logged:
  778                     return True
  779             if True: # pragma: no cover
  780                 return False
  781         else:
  782             # each entry should be found:
  783             for s_ in s:
  784                 if s_ not in logged: # pragma: no cover
  785                     return False
  786             return True
  787 
  788     def assertLogged(self, *s, **kwargs):
  789         """Assert that one of the strings was logged
  790 
  791         Preferable to assertTrue(self._is_logged(..)))
  792         since provides message with the actual log.
  793 
  794         Parameters
  795         ----------
  796         s : string or list/set/tuple of strings
  797           Test should succeed if string (or any of the listed) is present in the log
  798         all : boolean (default False) if True should fail if any of s not logged
  799         """
  800         wait = kwargs.get('wait', None)
  801         if wait:
  802             wait = unittest.F2B.maxWaitTime(wait)
  803             res = Utils.wait_for(lambda: self._is_logged(*s, **kwargs), wait)
  804         else:
  805             res = self._is_logged(*s, **kwargs)
  806         if not kwargs.get('all', False):
  807             # at least one entry should be found:
  808             if not res:
  809                 logged = self._log.getvalue()
  810                 self.fail("None among %r was found in the log%s: ===\n%s===" % (s, 
  811                     ((', waited %s' % wait) if wait else ''), logged))
  812         else:
  813             # each entry should be found:
  814             if not res:
  815                 logged = self._log.getvalue()
  816                 for s_ in s:
  817                     if s_ not in logged:
  818                         self.fail("%r was not found in the log%s: ===\n%s===" % (s_, 
  819                             ((', waited %s' % wait) if wait else ''), logged))
  820 
  821     def assertNotLogged(self, *s, **kwargs):
  822         """Assert that strings were not logged
  823 
  824         Parameters
  825         ----------
  826         s : string or list/set/tuple of strings
  827           Test should succeed if the string (or at least one of the listed) is not
  828           present in the log
  829         all : boolean (default False) if True should fail if any of s logged
  830         """
  831         logged = self._log.getvalue()
  832         if len(s) > 1 and not kwargs.get('all', False):
  833             for s_ in s:
  834                 if s_ not in logged:
  835                     return
  836             self.fail("All of the %r were found present in the log: ===\n%s===" % (s, logged))
  837         else:
  838             for s_ in s:
  839                 if s_ in logged:
  840                     self.fail("%r was found in the log: ===\n%s===" % (s_, logged))
  841 
  842     def pruneLog(self, logphase=None):
  843         self._log.truncate(0)
  844         if logphase:
  845             logSys.debug('='*5 + ' %s ' + '='*5, logphase)
  846 
  847     def getLog(self):
  848         return self._log.getvalue()
  849 
  850     @staticmethod
  851     def dumpFile(fn, handle=logSys.debug):
  852         """Helper which outputs content of the file at HEAVYDEBUG loglevels"""
  853         if (handle != logSys.debug or logSys.getEffectiveLevel() <= logging.DEBUG):
  854             handle('---- ' + fn + ' ----')
  855             for line in fileinput.input(fn):
  856                 line = line.rstrip('\n')
  857                 handle(line)
  858             handle('-'*30)
  859 
  860 
  861 pid_exists = Utils.pid_exists