"Fossies" - the Fresh Open Source Software Archive

Member "LinOTP-release-2.11/linotpd/src/linotp/tests/integration/linotp_selenium_helper/smtp_server.py" (12 Nov 2019, 6378 Bytes) of package /linux/misc/LinOTP-release-2.11.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here.

    1 # -*- coding: utf-8 -*-
    2 #
    3 #    LinOTP - the open source solution for two factor authentication
    4 #    Copyright (C) 2010 - 2019 KeyIdentity GmbH
    5 #
    6 #    This file is part of LinOTP server.
    7 #
    8 #    This program is free software: you can redistribute it and/or
    9 #    modify it under the terms of the GNU Affero General Public
   10 #    License, version 3, as published by the Free Software Foundation.
   11 #
   12 #    This program is distributed in the hope that it will be useful,
   13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
   14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   15 #    GNU Affero General Public License for more details.
   16 #
   17 #    You should have received a copy of the
   18 #               GNU Affero General Public License
   19 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
   20 #
   21 #
   22 #    E-mail: linotp@keyidentity.com
   23 #    Contact: www.linotp.org
   24 #    Support: www.keyidentity.com
   25 #
   26 from contextlib import closing
   27 from smtpd import SMTPServer
   28 
   29 import asyncore
   30 import email
   31 import logging
   32 import socket
   33 
   34 from .set_config import SetConfig
   35 from multiprocessing.process import Process
   36 from multiprocessing.queues import Queue
   37 
   38 """
   39 This file contains functionality to set up an SMTP
   40 process to receive messages from LinOTP and pass
   41 them back to the test case. 
   42 """
   43 
   44 logger = logging.getLogger(__name__)
   45 
   46 
   47 class SmtpListener(SMTPServer):
   48     """
   49     SMTPServer class to handle incoming SMTP messages
   50     """
   51 
   52     def __init__(self, localaddr, queue):
   53         SMTPServer.__init__(self, localaddr, None)
   54         self.queue = queue
   55 
   56     def process_message(self, peer, mailfrom, rcpttos, data):
   57         logger.debug("Mail from:%s to:%s data:<%s>", mailfrom, rcpttos, data)
   58         self.queue.put(data)
   59 
   60     def get_port(self):
   61         assert self.socket
   62         addr = self.socket.getsockname()
   63         logger.debug("SMTP server listening on %s:%s", addr[0], addr[1])
   64         return addr[1]
   65 
   66 
   67 def get_otp_mail(queue, timeout):
   68     """
   69     Background process runner
   70 
   71     * Start an SMTP server
   72     * Send the port number through the queue
   73     * Receive message via SMTP
   74     * Send message payload through the queue
   75     """
   76     smtpserver = SmtpListener(('0.0.0.0', 0), queue)
   77     port = smtpserver.get_port()
   78     queue.put(port)
   79     logger.debug("Waiting on port %s for OTP email", port)
   80     asyncore.loop(5, False, None, timeout)
   81     smtpserver.close()
   82 
   83 
   84 class SmtpMessageServer(object):
   85     """
   86     This class can start an SMTP debugging server,
   87     configure LinOTP to talk to it and read the
   88     results back to the parent tester.
   89 
   90     On open, an SMTP server is set up to listen locally.
   91     Derived classes can define a hook to set the LinOTP
   92     configuration to point to this server.
   93 
   94     Example usage:
   95 
   96     with SmtpMessageServer(testcase) as smtp:
   97         get_otp()
   98     """
   99 
  100     def __init__(self, testcase, message_timeout):
  101         self.testcase = testcase
  102 
  103         # We need a minimum version of 2.9.2 to set the SMTP port number, so
  104         # skip if testing an earlier version
  105         self.testcase.need_linotp_version('2.9.2')
  106 
  107         self.timeout = message_timeout
  108 
  109         self.set_config = SetConfig(testcase.http_protocol,
  110                                     testcase.http_host,
  111                                     testcase.http_port,
  112                                     testcase.http_username,
  113                                     testcase.http_password)
  114 
  115         # We advertise the local SMTP server hostname
  116         # using the IP address that connects to LinOTP
  117         self.addr = self._get_local_ip()
  118         self.msg_payload = None
  119 
  120     def __enter__(self):
  121         self.smtp_process_queue = Queue()
  122         self.smtp_process = Process(
  123             target=get_otp_mail, args=(self.smtp_process_queue, self.timeout))
  124         self.smtp_process.start()
  125         self.port = self.smtp_process_queue.get(True, 5)
  126         self._do_lintop_config()
  127 
  128         return self
  129 
  130     def _do_lintop_config(self):
  131         parameters = self.get_config_parameters()
  132 
  133         logger.debug("Configuration parameters: %s", parameters)
  134         result = self.set_config.setConfig(parameters)
  135 
  136         assert result, "It was not possible to set the config. Result:%s" % result
  137 
  138     def get_config_parameters(self):
  139         # This function can be overridden to provide configuration parameters to configure
  140         # specific parts of LinOTP
  141         assert False, "This function should be overridden"
  142 
  143     def get_otp(self):
  144         messagestr = self.smtp_process_queue.get(True, 10)
  145         msg = email.message_from_string(messagestr)
  146         otp = msg.get_payload()
  147 
  148         logger.debug("Received email message payload:%s", otp)
  149 
  150         return otp
  151 
  152     def __exit__(self, *args):
  153         self.smtp_process_queue.close()
  154         self.smtp_process.terminate()
  155         self.smtp_process.join(5)
  156 
  157     def _get_local_ip(self):
  158         """
  159         Get the IP address of the interface that connects to
  160         LinOTP
  161         """
  162 
  163         with closing(socket.create_connection((self.testcase.http_host,
  164                                                int(self.testcase.http_port)),
  165                                               10)) as s:
  166             addr = s.getsockname()[0]
  167 
  168         return addr
  169 
  170 
  171 class EmailProviderServer(SmtpMessageServer):
  172     """
  173     Implementation of SmtpMessageServer that configures LinOTP's email provider
  174     """
  175 
  176     def get_config_parameters(self):
  177 
  178         # SMTP e-mail configuration
  179         config = '''{
  180             "SMTP_SERVER": "%s",
  181             "SMTP_PORT": %s
  182         }''' % (self.addr, self.port)
  183 
  184         parameters = {
  185             'EmailProviderConfig': config
  186         }
  187         return parameters
  188 
  189 
  190 class SMSProviderServer(SmtpMessageServer):
  191     """
  192     Implementation of SmtpMessageServer that configures LinOTP's SMS provider
  193     to send SMS challenges via email
  194     """
  195 
  196     def get_config_parameters(self):
  197 
  198         sms_provider_config = '''{
  199             "mailserver" : "%s",
  200             "mailserver_port": %s,
  201             "mailsender" : "linotp-sms@localhost",
  202             "mailto": "seleniumtest@localhost"
  203         }''' % (self.addr, self.port)
  204         print sms_provider_config
  205 
  206         # Set SMTP sms config
  207         parameters = {
  208             'SMSProvider': 'smsprovider.SmtpSMSProvider.SmtpSMSProvider',
  209             'SMSProviderConfig': sms_provider_config,
  210         }
  211         return parameters