"Fossies" - the Fresh Open Source Software Archive

Member "fail2ban-0.11.1/fail2ban/tests/sockettestcase.py" (11 Jan 2020, 7817 Bytes) of package /linux/misc/fail2ban-0.11.1.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "sockettestcase.py": 0.10.5_vs_0.11.1.

    1 # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
    2 # vi: set ft=python sts=4 ts=4 sw=4 noet :
    3 
    4 # This file is part of Fail2Ban.
    5 #
    6 # Fail2Ban is free software; you can redistribute it and/or modify
    7 # it under the terms of the GNU General Public License as published by
    8 # the Free Software Foundation; either version 2 of the License, or
    9 # (at your option) any later version.
   10 #
   11 # Fail2Ban is distributed in the hope that it will be useful,
   12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
   13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   14 # GNU General Public License for more details.
   15 #
   16 # You should have received a copy of the GNU General Public License
   17 # along with Fail2Ban; if not, write to the Free Software
   18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
   19 
   20 # Author: Steven Hiscocks
   21 # 
   22 
   23 __author__ = "Steven Hiscocks"
   24 __copyright__ = "Copyright (c) 2013 Steven Hiscocks"
   25 __license__ = "GPL"
   26 
   27 import os
   28 import sys
   29 import tempfile
   30 import threading
   31 import time
   32 import unittest
   33 
   34 from .utils import LogCaptureTestCase
   35 
   36 from .. import protocol
   37 from ..server.asyncserver import asyncore, RequestHandler, loop, AsyncServer, AsyncServerException
   38 from ..server.utils import Utils
   39 from ..client.csocket import CSocket
   40 
   41 from .utils import LogCaptureTestCase
   42 
   43 
   44 def TestMsgError(*args):
   45     raise Exception('test unpickle error')
   46 class TestMsg(object):
   47     def __init__(self, unpickle=(TestMsgError, ())):
   48         self.unpickle = unpickle
   49     def __reduce__(self):
   50         return self.unpickle
   51 
   52 
   53 class Socket(LogCaptureTestCase):
   54 
   55     def setUp(self):
   56         """Call before every test case."""
   57         LogCaptureTestCase.setUp(self)
   58         super(Socket, self).setUp()
   59         self.server = AsyncServer(self)
   60         sock_fd, sock_name = tempfile.mkstemp('fail2ban.sock', 'f2b-socket')
   61         os.close(sock_fd)
   62         os.remove(sock_name)
   63         self.sock_name = sock_name
   64         self.serverThread = None
   65 
   66     def tearDown(self):
   67         """Call after every test case."""
   68         if self.serverThread:
   69             self.server.stop(); # stop if not already stopped
   70             self._stopServerThread()
   71         LogCaptureTestCase.tearDown(self)
   72 
   73     @staticmethod
   74     def proceed(message):
   75         """Test transmitter proceed method which just returns first arg"""
   76         return message
   77 
   78     def _createServerThread(self, force=False):
   79         # start in separate thread :
   80         self.serverThread = serverThread = threading.Thread(
   81             target=self.server.start, args=(self.sock_name, force))
   82         serverThread.daemon = True
   83         serverThread.start()
   84         self.assertTrue(Utils.wait_for(self.server.isActive, unittest.F2B.maxWaitTime(10)))
   85         return serverThread
   86     
   87     def _stopServerThread(self):
   88         serverThread = self.serverThread
   89         # wait for end of thread :
   90         Utils.wait_for(lambda: not serverThread.isAlive() 
   91             or serverThread.join(Utils.DEFAULT_SLEEP_TIME), unittest.F2B.maxWaitTime(10))
   92         self.serverThread = None
   93 
   94     def testStopPerCloseUnexpected(self):
   95         # start in separate thread :
   96         serverThread = self._createServerThread()
   97         # unexpected stop directly after start:
   98         self.server.close()
   99         # wait for end of thread :
  100         self._stopServerThread()
  101         self.assertFalse(serverThread.isAlive())
  102         # clean :
  103         self.server.stop()
  104         self.assertFalse(self.server.isActive())
  105         self.assertFalse(os.path.exists(self.sock_name))
  106 
  107     def _serverSocket(self):
  108         try:
  109             return CSocket(self.sock_name)
  110         except Exception as e:
  111             return None
  112 
  113     def testSocket(self):
  114         # start in separate thread :
  115         serverThread = self._createServerThread()
  116         client = Utils.wait_for(self._serverSocket, 2)
  117 
  118         testMessage = ["A", "test", "message"]
  119         self.assertEqual(client.send(testMessage), testMessage)
  120 
  121         # test wrong message:
  122         self.assertEqual(client.send([[TestMsg()]]), 'ERROR: test unpickle error')
  123         self.assertLogged("PROTO-error: load message failed:", "test unpickle error", all=True)
  124 
  125         # test good message again:
  126         self.assertEqual(client.send(testMessage), testMessage)
  127 
  128         # test close message
  129         client.close()
  130         # 2nd close does nothing
  131         client.close()
  132 
  133         # force shutdown:
  134         self.server.stop_communication()
  135         # test send again (should get in shutdown message):
  136         client = Utils.wait_for(self._serverSocket, 2)
  137         self.assertEqual(client.send(testMessage), ['SHUTDOWN'])
  138 
  139         self.server.stop()
  140         # wait for end of thread :
  141         self._stopServerThread()
  142         self.assertFalse(serverThread.isAlive())
  143         self.assertFalse(self.server.isActive())
  144         self.assertFalse(os.path.exists(self.sock_name))
  145 
  146     def testSocketConnectBroken(self):
  147         # start in separate thread :
  148         serverThread = self._createServerThread()
  149         client = Utils.wait_for(self._serverSocket, 2)
  150         # unexpected stop during message body:
  151         testMessage = ["A", "test", "message", [protocol.CSPROTO.END]]
  152         
  153         org_handler = RequestHandler.found_terminator
  154         try:
  155             RequestHandler.found_terminator = lambda self: self.close()
  156             self.assertRaisesRegexp(RuntimeError, r"socket connection broken", 
  157                 lambda: client.send(testMessage, timeout=unittest.F2B.maxWaitTime(10)))
  158         finally:
  159             RequestHandler.found_terminator = org_handler
  160 
  161     def testStopByCommunicate(self):
  162         # start in separate thread :
  163         serverThread = self._createServerThread()
  164         client = Utils.wait_for(self._serverSocket, 2)
  165 
  166         testMessage = ["A", "test", "message"]
  167         self.assertEqual(client.send(testMessage), testMessage)
  168 
  169         org_handler = RequestHandler.found_terminator
  170         try:
  171             RequestHandler.found_terminator = lambda self: TestMsgError()
  172             #self.assertRaisesRegexp(RuntimeError, r"socket connection broken", client.send, testMessage)
  173             self.assertEqual(client.send(testMessage), 'ERROR: test unpickle error')
  174         finally:
  175             RequestHandler.found_terminator = org_handler
  176         
  177         # check errors were logged:
  178         self.assertLogged("Unexpected communication error", "test unpickle error", all=True)
  179 
  180         self.server.stop()
  181         # wait for end of thread :
  182         self._stopServerThread()
  183         self.assertFalse(serverThread.isAlive())
  184 
  185     def testLoopErrors(self):
  186         # replace poll handler to produce error in loop-cycle:
  187         org_poll = asyncore.poll
  188         err = {'cntr': 0}
  189         def _produce_error(*args):
  190             err['cntr'] += 1
  191             if err['cntr'] < 50:
  192                 raise RuntimeError('test errors in poll')
  193             return org_poll(*args)
  194         
  195         try:
  196             asyncore.poll = _produce_error
  197             serverThread = self._createServerThread()
  198             # wait all-cases processed:
  199             self.assertTrue(Utils.wait_for(lambda: err['cntr'] > 50, unittest.F2B.maxWaitTime(10)))
  200         finally:
  201             # restore:
  202             asyncore.poll = org_poll
  203         # check errors were logged:
  204         self.assertLogged("Server connection was closed: test errors in poll",
  205             "Too many errors - stop logging connection errors", all=True)
  206 
  207     def testSocketForce(self):
  208         open(self.sock_name, 'w').close() # Create sock file
  209         # Try to start without force
  210         self.assertRaises(
  211             AsyncServerException, self.server.start, self.sock_name, False)
  212 
  213         # Try again with force set
  214         serverThread = self._createServerThread(True)
  215 
  216         self.server.stop()
  217         # wait for end of thread :
  218         self._stopServerThread()
  219         self.assertFalse(serverThread.isAlive())
  220         self.assertFalse(self.server.isActive())
  221         self.assertFalse(os.path.exists(self.sock_name))
  222 
  223 
  224 class ClientMisc(LogCaptureTestCase):
  225 
  226     def testErrorsInLoop(self):
  227         phase = {'cntr': 0}
  228         def _active():
  229             return phase['cntr'] < 40
  230         def _poll(*args):
  231             phase['cntr'] += 1
  232             raise Exception('test *%d*' % phase['cntr'])
  233         # test errors "catched" and logged:
  234         loop(_active, use_poll=_poll)
  235         self.assertLogged("test *1*", "test *10*", "test *20*", all=True)
  236         self.assertLogged("Too many errors - stop logging connection errors")
  237         self.assertNotLogged("test *21*", "test *22*", "test *23*", all=True)
  238 
  239     def testPrintFormattedAndWiki(self):
  240         # redirect stdout to devnull
  241         saved_stdout = sys.stdout
  242         sys.stdout = open(os.devnull, 'w')
  243         try:
  244             protocol.printFormatted()
  245             protocol.printWiki()
  246         finally:
  247             # restore stdout
  248             sys.stdout = saved_stdout