"Fossies" - the Fresh Open Source Software Archive

Member "Tardis-1.2.1/src/Tardis/Connection.py" (9 Jun 2021, 8038 Bytes) of package /linux/privat/Tardis-1.2.1.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "Connection.py", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 1.1.5_vs_1.2.1.

    1 # vim: set et sw=4 sts=4 fileencoding=utf-8:
    2 #
    3 # Tardis: A Backup System
    4 # Copyright 2013-2020, Eric Koldinger, All Rights Reserved.
    5 # kolding@washington.edu
    6 #
    7 # Redistribution and use in source and binary forms, with or without
    8 # modification, are permitted provided that the following conditions are met:
    9 #
   10 #     * Redistributions of source code must retain the above copyright
   11 #       notice, this list of conditions and the following disclaimer.
   12 #     * Redistributions in binary form must reproduce the above copyright
   13 #       notice, this list of conditions and the following disclaimer in the
   14 #       documentation and/or other materials provided with the distribution.
   15 #     * Neither the name of the copyright holder nor the
   16 #       names of its contributors may be used to endorse or promote products
   17 #       derived from this software without specific prior written permission.
   18 #
   19 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
   20 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   21 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   22 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
   23 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
   24 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
   25 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
   26 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
   27 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
   28 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   29 # POSSIBILITY OF SUCH DAMAGE.
   30 
   31 import socket
   32 import json
   33 import time
   34 import ssl
   35 import queue
   36 
   37 import Tardis
   38 import Tardis.Messages as Messages
   39 
   40 protocolVersion = "1.4"
   41 headerString    = "TARDIS " + protocolVersion
   42 sslHeaderString = headerString + "/SSL"
   43 
   44 class ConnectionException(Exception):
   45     pass
   46 
   47 class Connection(object):
   48     """ Root class for handling connections to the tardis server """
   49     def __init__(self, host, port, encoding, compress, timeout=None, validate=False):
   50         self.stats = { 'messagesRecvd': 0, 'messagesSent' : 0, 'bytesRecvd': 0, 'bytesSent': 0 }
   51 
   52         # Create and open the socket
   53         if host:
   54             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   55             if timeout:
   56                 sock.settimeout(timeout)
   57             sock.connect((host, int(port)))
   58             self.sock = sock
   59         else:
   60             self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
   61             if timeout:
   62                 self.sock.settimeout(timeout)
   63             self.sock.connect(port)
   64 
   65         try:
   66             # Receive a string.  TARDIS proto=1.0
   67             message = str(self.sock.recv(32).strip(), 'utf8')
   68             if message == sslHeaderString:
   69                 # Overwrite self.sock
   70                 self.sock = ssl.wrap_socket(self.sock, server_side=False) #, cert_reqs=ssl.CERT_REQUIRED, ca_certs="/etc/ssl/certs/ca-bundle.crt")
   71                 if validate:
   72                     pass        # TODO Check the certificate hostname.  Requires python 2.7.9 or higher.
   73             elif not message:
   74                 raise Exception("No header string.")
   75             elif message != headerString:
   76                 raise Exception("Unknown protocol: {}".format(message))
   77             resp = { 'encoding': encoding, 'compress': compress }
   78             self.put(bytes(json.dumps(resp), 'utf8'))
   79 
   80             message = self.sock.recv(256).strip()
   81             fields = json.loads(message)
   82             if fields['status'] != 'OK':
   83                 raise ConnectionException("Unable to connect")
   84         except Exception:
   85             self.sock.close()
   86             raise
   87 
   88     def put(self, message):
   89         self.sock.sendall(message)
   90         self.stats['messagesSent'] += 1
   91         return
   92 
   93     def recv(self, n):
   94         msg = ''
   95         while len(msg) < n:
   96             chunk = self.sock.recv(n-len(msg))
   97             if chunk == '':
   98                 raise RuntimeError("socket connection broken")
   99             msg = msg + chunk
  100         return msg
  101 
  102     def get(self, size):
  103         message = self.sock.recv(size).strip()
  104         self.stats['messagesRecvd'] += 1
  105         return message
  106 
  107     def close(self):
  108         self.sock.close()
  109 
  110     def getStats(self):
  111         return self.stats
  112 
  113 class ProtocolConnection(Connection):
  114     sender = None
  115     def __init__(self, host, port, protocol, compress, timeout):
  116         Connection.__init__(self, host, port, protocol, compress)
  117 
  118     def send(self, message, compress=True):
  119         self.sender.sendMessage(message, compress)
  120         self.stats['messagesSent'] += 1
  121 
  122     def receive(self):
  123         message = self.sender.recvMessage()
  124         self.stats['messagesRecvd'] += 1
  125         return message
  126 
  127     def close(self, error=None):
  128         message = {"message": "BYE" }
  129         if error:
  130             message["error"] = error
  131         self.send(message)
  132         super(ProtocolConnection, self).close()
  133 
  134     def encode(self, string):
  135         return self.sender.encode(string)
  136 
  137     def decode(self, string):
  138         return self.sender.decode(string)
  139 
  140 _defaultVersion = Tardis.__buildversion__  or Tardis.__version__
  141 
  142 class JsonConnection(ProtocolConnection):
  143     """ Class to communicate with the Tardis server using a JSON based protocol """
  144     def __init__(self, host, port, compress, timeout):
  145         ProtocolConnection.__init__(self, host, port, 'JSON', False, timeout)
  146         # Really, cons this up in the connection, but it needs access to the sock parameter, so.....
  147         self.sender = Messages.JsonMessages(self.sock, stats=self.stats)
  148 
  149 class BsonConnection(ProtocolConnection):
  150     def __init__(self, host, port, compress, timeout):
  151         ProtocolConnection.__init__(self, host, port, 'BSON', compress, timeout)
  152         # Really, cons this up in the connection, but it needs access to the sock parameter, so.....
  153         self.sender = Messages.BsonMessages(self.sock, stats=self.stats, compress=compress)
  154 
  155 class MsgPackConnection(ProtocolConnection):
  156     def __init__(self, host, port, compress, timeout):
  157         ProtocolConnection.__init__(self, host, port, 'MSGP', compress, timeout)
  158         # Really, cons this up in the connection, but it needs access to the sock parameter, so.....
  159         self.sender = Messages.MsgPackMessages(self.sock, stats=self.stats, compress=compress)
  160 
  161 class DirectConnection:
  162     stats = { 'messagesRecvd': 0, 'messagesSent' : 0, 'bytesRecvd': 0, 'bytesSent': 0 }
  163     serverStats = { 'messagesRecvd': 0, 'messagesSent' : 0, 'bytesRecvd': 0, 'bytesSent': 0 }
  164 
  165     def __init__(self, timeout):
  166         self.timeout = timeout
  167         self.toClientQueue = queue.SimpleQueue()
  168         self.toServerQueue = queue.Queue(1024)
  169         self.clientMessages = Messages.ObjectMessages(self.toClientQueue, self.toServerQueue, self.stats, timeout)
  170         self.serverMessages = Messages.ObjectMessages(self.toServerQueue, self.toClientQueue)
  171         self.sender = self.clientMessages
  172 
  173     def send(self, message, compress=True):
  174         self.sender.sendMessage(message, compress)
  175         self.stats['messagesSent'] += 1
  176 
  177     def receive(self):
  178         message = self.sender.recvMessage()
  179         self.stats['messagesRecvd'] += 1
  180         return message
  181 
  182     def close(self):
  183         self.send({"message" : "BYE" })
  184 
  185     def encode(self, string):
  186         return self.sender.encode(string)
  187 
  188     def decode(self, string):
  189         return self.sender.decode(string)
  190 
  191     def getStats(self):
  192         return self.stats
  193 
  194 
  195 if __name__ == "__main__":
  196     """
  197     Test Code
  198     conn = JsonConnection("localhost", 9999, "HiMom")
  199     print(conn.getSessionId())
  200     conn.send({ 'x' : 1 })
  201     print(conn.receive())
  202     conn.send({ 'y' : 2 })
  203     print(conn.receive())
  204     conn.close()
  205     """
  206 
  207     conn = DirectConnection(None)
  208     server = conn.serverMessages
  209 
  210     conn.send({"a" : 1})
  211     print(server.recvMessage())
  212     server.sendMessage({"b": 2, "c": ['a', 'b', 'c']})
  213     print(conn.receive())
  214 
  215