"Fossies" - the Fresh Open Source Software Archive

Member "Tardis-1.2.1/src/Tardis/Messages.py" (9 Jun 2021, 8082 Bytes) of package /linux/privat/Tardis-1.2.1.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "Messages.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 1.1.5_vs_1.2.1.

    1 # vim: set et sw=4 sts=4 fileencoding=utf-8:
    2 #
    3 # Tardis: A Backup System
    4 # Copyright 2013-2020, Eric Koldinger, All Rights Reserved.
    5 # kolding@washington.edu
    6 #
    7 # Redistribution and use in source and binary forms, with or without
    8 # modification, are permitted provided that the following conditions are met:
    9 #
   10 #     * Redistributions of source code must retain the above copyright
   11 #       notice, this list of conditions and the following disclaimer.
   12 #     * Redistributions in binary form must reproduce the above copyright
   13 #       notice, this list of conditions and the following disclaimer in the
   14 #       documentation and/or other materials provided with the distribution.
   15 #     * Neither the name of the copyright holder nor the
   16 #       names of its contributors may be used to endorse or promote products
   17 #       derived from this software without specific prior written permission.
   18 #
   19 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
   20 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   21 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   22 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
   23 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
   24 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
   25 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
   26 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
   27 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
   28 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   29 # POSSIBILITY OF SUCH DAMAGE.
   30 
   31 import socket
   32 import os
   33 import sys
   34 import json
   35 import msgpack
   36 import base64
   37 import struct
   38 import zlib
   39 import snappy
   40 
   41 try:
   42     import bson
   43     _supportBson = True
   44 except:
   45     _supportBson = False
   46     pass
   47 
   48 
   49 class Messages(object):
   50     __socket = None
   51 
   52     def __init__(self, socket, stats=None):
   53         self.__socket = socket
   54         self.__stats = stats
   55 
   56     def receiveBytes(self, n):
   57         msg = bytearray()
   58         while len(msg) < n:
   59             chunk = self.__socket.recv(n-len(msg))
   60             if chunk == b'':
   61                 raise RuntimeError("socket connection broken")
   62             msg.extend(chunk)
   63         if self.__stats != None:
   64             self.__stats['bytesRecvd'] += len(msg)
   65         return msg
   66 
   67     def sendBytes(self, bytes):
   68         if self.__stats != None:
   69             self.__stats['bytesSent'] += len(bytes)
   70         self.__socket.sendall(bytes)
   71 
   72     def closeSocket(self):
   73         if self.__socket:
   74             self.__socket.close()
   75             self.__socket = None
   76 
   77 class zlibCompressor:
   78     def __init__(self):
   79         self.compressor = zlib.compressobj()
   80         self.decompressor = zlib.decompressobj()
   81 
   82     def compress(self, message):
   83         message = self.compressor.compress(message)
   84         message += self.compressor.flush(zlib.Z_SYNC_FLUSH)
   85         return message
   86 
   87     def decompress(self, message):
   88         message = self.decompressor.decompress(message)
   89         return message
   90 
   91 class BinMessages(Messages):
   92     compress = None
   93     decompress = None
   94     def __init__(self, socket, stats=None, compress='none'):
   95         Messages.__init__(self, socket, stats)
   96         if compress == 'zlib-stream':
   97             self.compressor = zlibCompressor()
   98             self.compress = self.compressor.compress
   99             self.decompress = self.compressor.decompress
  100         elif compress == 'zlib':
  101             self.compress = zlib.compress
  102             self.decompress = zlib.decompress
  103         elif compress == 'snappy':
  104             self.compress = snappy.compress
  105             self.decompress = snappy.decompress
  106         elif compress != 'none':
  107             raise Exception("Unrecognized compression method: %s" % str(compress))
  108 
  109     def sendMessage(self, message, compress=True, raw=False):
  110         if compress and self.compress:
  111             message = self.compress(message)
  112         length = len(message)
  113         if compress and self.compress:
  114             length |= 0x80000000
  115         lBytes = struct.pack("!I", length)
  116         self.sendBytes(lBytes)
  117         self.sendBytes(message)
  118 
  119     def recvMessage(self):
  120         comp = False
  121         x = self.receiveBytes(4)
  122         n = struct.unpack("!I", x)[0]
  123         if (n & 0x80000000) != 0:
  124             n &= 0x7fffffff
  125             comp = True
  126         data = self.receiveBytes(n)
  127         if comp:
  128             data = self.decompress(bytes(data))
  129         return data
  130 
  131 class TextMessages(Messages):
  132     def __init__(self, socket, stats=None):
  133         Messages.__init__(self, socket, stats)
  134 
  135     def sendMessage(self, message, compress=True):
  136         length = len(message)
  137         output = "{:06d}".format(length)
  138         self.sendBytes(output)
  139         self.sendBytes(message)
  140 
  141     def recvMessage(self):
  142         n = self.receiveBytes(6)
  143         return self.receiveBytes(int(n))
  144 
  145 class JsonMessages(TextMessages):
  146     def __init__(self, socket, stats=None, compress=False):
  147         TextMessages.__init__(self, socket, stats)
  148     
  149     def sendMessage(self, message, compress=False, raw=False):
  150         if raw:
  151             super(JsonMessages, self).sendMessage(message)
  152         else:
  153             super(JsonMessages, self).sendMessage(json.dumps(message))
  154 
  155     def recvMessage(self, raw=False):
  156         if raw:
  157             message = super(JsonMessages, self).recvMessage()
  158         else:
  159             message = json.loads(super(JsonMessages, self).recvMessage())
  160         return message
  161 
  162     def encode(self, data):
  163         return base64.b64encode(data)
  164 
  165     def decode(self, data):
  166         return base64.b64decode(data)
  167 
  168     def getEncoding(self):
  169         return "base64"
  170 
  171 class MsgPackMessages(BinMessages):
  172     def __init__(self, socket, stats=None, compress=True):
  173         BinMessages.__init__(self, socket, stats, compress=compress)
  174     
  175     def sendMessage(self, message, compress=True, raw=False):
  176         if raw:
  177             super(MsgPackMessages, self).sendMessage(message, compress=compress, raw=True)
  178         else:
  179             super(MsgPackMessages, self).sendMessage(msgpack.packb(message, use_bin_type=True), compress=compress)
  180 
  181     def recvMessage(self, raw=False):
  182         if raw:
  183             message = super(MsgPackMessages, self).recvMessage()
  184         else:
  185             mess = super(MsgPackMessages, self).recvMessage()
  186             message = msgpack.unpackb(mess, encoding='utf-8')
  187         return message
  188 
  189     def encode(self, data):
  190         return data
  191 
  192     def decode(self, data):
  193         return data
  194 
  195     def getEncoding(self):
  196         return "bin"
  197 
  198 class BsonMessages(BinMessages):
  199     def __init__(self, socket, stats=None, compress=True):
  200         BinMessages.__init__(self, socket, stats, compress=compress)
  201     
  202     def sendMessage(self, message, compress=True, raw=False):
  203         if raw:
  204             super(BsonMessages, self).sendMessage(message, compress=compress, raw=True)
  205         else:
  206             super(BsonMessages, self).sendMessage(bson.dumps(message), compress=compress)
  207 
  208     def recvMessage(self, raw=False):
  209         if raw:
  210             message = super(BsonMessages, self).recvMessage()
  211         else:
  212             message = bson.loads(super(BsonMessages, self).recvMessage())
  213         return message
  214 
  215     def encode(self, data):
  216         return data
  217 
  218     def decode(self, data):
  219         return data
  220 
  221     def getEncoding(self):
  222         return "bin"
  223 
  224 class ObjectMessages():
  225     def __init__(self, inQueue, outQueue, stats=None, compress=True, timeout=None):
  226         self.inQueue = inQueue
  227         self.outQueue= outQueue
  228         self.timeout = timeout
  229 
  230     def sendMessage(self, message, compress=False, raw=False):
  231         self.outQueue.put(message)
  232 
  233     def recvMessage(self, raw=False):
  234         ret =  self.inQueue.get(timeout=self.timeout)
  235         if isinstance(ret, BaseException):
  236             raise ret
  237         return ret
  238 
  239     def encode(self, data):
  240         return data
  241 
  242     def decode(self, data):
  243         return data
  244 
  245     def getEncoding(self):
  246         return "bin"
  247 
  248     def closeSocket(self):
  249         pass