"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "src/fuglu/mailattach.py" between
fuglu-0.10.8.tar.gz and fuglu-1.0.0.tar.gz

About: FuGlu is a mail scanning daemon for Postfix written in Python. It acts as a glue application between the MTA and spam checkers and antivirus software.

mailattach.py  (fuglu-0.10.8):mailattach.py  (fuglu-1.0.0)
skipping to change at line 25 skipping to change at line 25
# #
# #
# #
import mimetypes import mimetypes
import email import email
import logging import logging
import weakref import weakref
import hashlib import hashlib
import base64 import base64
import re
import string
import random
import typing as tp import typing as tp
from typing import Optional from typing import Optional
from fuglu.extensions.filearchives import Archivehandle from fuglu.extensions.filearchives import Archivehandle
from fuglu.extensions.filetype import filetype_handler from fuglu.extensions.filetype import filetype_handler
from fuglu.caching import smart_cached_property, smart_cached_memberfunc, Cachel imits from fuglu.caching import smart_cached_property, smart_cached_memberfunc, Cachel imits
from fuglu.stringencode import force_uString, force_bString from fuglu.stringencode import force_uString, force_bString
from fuglu.lib.patchedemail import PatchedMessage from fuglu.lib.patchedemail import PatchedMessage
from io import BytesIO from io import BytesIO
# workarounds for mimetypes # workarounds for mimetypes
skipping to change at line 57 skipping to change at line 60
# Mailattachment.parsed becomes false # Mailattachment.parsed becomes false
DEFECT_ATTPARSEERROR = "<<ATTPARSEERROR>>" DEFECT_ATTPARSEERROR = "<<ATTPARSEERROR>>"
# tuple containing errors preventing correct decoding of buffer # tuple containing errors preventing correct decoding of buffer
BAD_DEFECTS = (email.errors.InvalidBase64LengthDefect,) BAD_DEFECTS = (email.errors.InvalidBase64LengthDefect,)
# The contenttype returned for an attachment with parsing error # The contenttype returned for an attachment with parsing error
ATTPARSEERROR_CONTENTTYPE= "application/unknown" ATTPARSEERROR_CONTENTTYPE= "application/unknown"
class NoExtractInfo(object): class NoExtractInfo(object):
"""Store info about files """ """Store info about files """
valid_causes = [u"level", u"archivehandle", u"toolarge", u"extractsize", u'n umfiles'] valid_causes = ["level", "archivehandle", "toolarge", "extractsize", 'numfil es']
def __init__(self): def __init__(self):
"""Constructor""" """Constructor"""
self.infolists = {} self.infolists = {}
for cause in NoExtractInfo.valid_causes: for cause in NoExtractInfo.valid_causes:
self.infolists[cause] = [] self.infolists[cause] = []
def append(self, filename, cause, message): def append(self, filename, cause, message):
""" """
skipping to change at line 295 skipping to change at line 298
- buffer (bytes): The buffer with the raw attachment content - buffer (bytes): The buffer with the raw attachment content
- in_obj (Mailattachment): Reference to parent if this object was ex tracted from an archive - in_obj (Mailattachment): Reference to parent if this object was ex tracted from an archive
- content_charset_mime (string): charset if available or None - content_charset_mime (string): charset if available or None
Returns: Returns:
unicode : the unicode string representing the buffer or an empty str ing on any error unicode : the unicode string representing the buffer or an empty str ing on any error
""" """
# only for first level attachments # only for first level attachments
decoded_buffer = u"" decoded_buffer = ""
if self.in_obj is None: if self.in_obj is None:
charset = self.content_charset_mime if self.content_charset_mime els e "utf-8" charset = self.content_charset_mime if self.content_charset_mime els e "utf-8"
decoded_buffer = force_uString(self.buffer, encodingGuess=charset) decoded_buffer = force_uString(self.buffer, encodingGuess=charset, e rrors='strict+ignore')
return decoded_buffer return decoded_buffer
def _mkchecksum(self, value, method, encoding=HASHENC_HEX, strip=False): def _mkchecksum(self, value, method, encoding=HASHENC_HEX, strip=False):
""" """
generate checksum of value using given method and encoding generate checksum of value using given method and encoding
:param value: the value to be encoded :param value: the value to be encoded
:param method: any hash algorithm supported by hashlib (see hashlib.algo rithms_guaranteed for a list) :param method: any hash algorithm supported by hashlib (see hashlib.algo rithms_guaranteed for a list)
:param encoding: returned encoding: hex (default), b64, b32 :param encoding: returned encoding: hex (default), b64, b32
:param strip: boolean: strip trailing '=' from b64, b32 hash :param strip: boolean: strip trailing '=' from b64, b32 hash
skipping to change at line 673 skipping to change at line 676
if not self.is_archive: if not self.is_archive:
return None return None
else: else:
try: try:
obj = self._buffer_archobj[fname] obj = self._buffer_archobj[fname]
except KeyError: except KeyError:
try: try:
filesize = self.archive_handle.filesize(fname) filesize = self.archive_handle.filesize(fname)
except Exception as e: except Exception as e:
if noextractinfo is not None: if noextractinfo is not None:
noextractinfo.append(fname, u"archivehandle", u"exceptio n: %s" % force_uString(e)) noextractinfo.append(fname, "archivehandle", "exception: %s" % force_uString(e))
return None return None
try: try:
buffer = self.archive_handle.extract(fname,maxsize_extract) buffer = self.archive_handle.extract(fname,maxsize_extract)
except Exception as e: except Exception as e:
if noextractinfo is not None: if noextractinfo is not None:
noextractinfo.append(fname, u"archivehandle", u"exceptio n: %s" % force_uString(e)) noextractinfo.append(fname, "archivehandle", "exception: %s" % force_uString(e))
return None return None
if buffer is None: if buffer is None:
if noextractinfo is not None: if noextractinfo is not None:
if maxsize_extract is not None \ if maxsize_extract is not None \
and filesize is not None \ and filesize is not None \
and filesize > maxsize_extract: and filesize > maxsize_extract:
noextractinfo.append(fname, u"extractsize", u"not ex noextractinfo.append(fname, "extractsize", "not extr
tracted: %u > %u" acted: %u > %u" % (filesize, maxsize_extract))
% (filesize, maxsize_extract))
else: else:
noextractinfo.append(fname, u"archivehandle", u"(no info)") noextractinfo.append(fname, "archivehandle", "(no in fo)")
return None return None
obj = Mailattachment(buffer, fname, self._mgr() if self._mgr els e None, self.fugluid, filesize=filesize, in_obj=self, obj = Mailattachment(buffer, fname, self._mgr() if self._mgr els e None, self.fugluid, filesize=filesize, in_obj=self,
is_attachment=self.is_attachment, is_inline =self.is_inline) is_attachment=self.is_attachment, is_inline =self.is_inline)
# This object caching is outside the caching decorator used in o ther parts of this # This object caching is outside the caching decorator used in o ther parts of this
# file (not for this function anyway...). # file (not for this function anyway...).
if self._mgr and self._mgr().use_caching(filesize): if self._mgr and self._mgr().use_caching(filesize):
self._buffer_archobj[fname] = obj self._buffer_archobj[fname] = obj
return obj return obj
skipping to change at line 762 skipping to change at line 764
""" """
# make sure there's no buffered archive object when # make sure there's no buffered archive object when
# the archive handle is created (or overwritten) # the archive handle is created (or overwritten)
self._buffer_archobj = {} self._buffer_archobj = {}
handle = None handle = None
if self.buffer is not None: if self.buffer is not None:
try: try:
handle = Archivehandle(self.archive_type, BytesIO(self.buffer),a rchivename=self.filename) handle = Archivehandle(self.archive_type, BytesIO(self.buffer),a rchivename=self.filename)
except Exception as e: except Exception as e:
self.logger.error("%s, Problem creating Archivehandle for file: self.logger.warning(f"{self.fugluid}, Problem creating Archiveha
" ndle for file: "
"%s using archive handler %s (message: %s) -> f"{self.filename} using archive handler {str
ignore" (self.archive_type)} "
% (self.fugluid, self.filename, str(self.archi f"(message: {force_uString(e)}) -> ignore")
ve_type), force_uString(e)))
if force_uString(e).strip() == "": if force_uString(e).strip() == "":
# store class name if no string # store class name if no string
self._archive_handle_exc = e.__class__.__name__ self._archive_handle_exc = e.__class__.__name__
else: else:
# store string # store string
self._archive_handle_exc = force_uString(e) self._archive_handle_exc = force_uString(e)
return handle return handle
@smart_cached_property(inputs=['archive_handle']) @smart_cached_property(inputs=['archive_handle'])
skipping to change at line 818 skipping to change at line 820
""" """
parentsList = [] parentsList = []
upstream_obj = weakref.ref(self) upstream_obj = weakref.ref(self)
while upstream_obj() is not None and upstream_obj().in_obj is not None: while upstream_obj() is not None and upstream_obj().in_obj is not None:
parentsList.append(upstream_obj().in_obj) parentsList.append(upstream_obj().in_obj)
upstream_obj = upstream_obj().in_obj upstream_obj = upstream_obj().in_obj
return parentsList return parentsList
def location(self): def location(self):
"""Print the location of the the file in the archive tree""" """Print the location of the the file in the archive tree"""
element_of = u" \u2208 " element_of = " \u2208 "
location = self.filename location = self.filename
if self.parent_archives: if self.parent_archives:
location += element_of + element_of.join([u"{" + obj().filename + u" }" for obj in self.parent_archives]) location += element_of + element_of.join(["{" + obj().filename + "}" for obj in self.parent_archives])
return location return location
def get_mangled_filename(self, prefix='', maxchars=15):
first=self.filename.split('.')[0]
#replace spaces with underscore
first = re.sub('\s', '_', first)
#remove all non alnum chars
first=''.join([x for x in first if x in string.ascii_letters or x in str
ing.digits])
if first.strip()=='' or first.strip()=='filename': #renattach renames to
'filename'
first='upn-%s' % random.randint(1000,9999)
elif len(first) < 4:
first = 'sh-%s-%s' % (random.randint(1000,9999), first)
#strip to a machimum of maxchars characters
first=first[:maxchars]
newfilename = '%s.%s' % (prefix, first)
return newfilename
def __str__(self): def __str__(self):
""" """
String conversion function for object. Creates String conversion function for object. Creates
a string with some basic information a string with some basic information
Returns: Returns:
(str): string with object information (str): string with object information
""" """
element_of = u" \u2208 " element_of = " \u2208 "
return u""" return """
Filename : %s Filename : %s
Size (bytes) : %s Size (bytes) : %s
Location : %s Location : %s
Archive type : %s Archive type : %s
Content type : %s""" % (self.filename,u'(unknown)' if self.filesize is None else str(self.filesize), Content type : %s""" % (self.filename,'(unknown)' if self.filesize is None else str(self.filesize),
self.filename + element_of + self.filename + element_of +
element_of.join([u"{" + obj().filename +u"}" for obj in self.parent_archives]), element_of.join(["{" + obj().filename +"}" for obj in se lf.parent_archives]),
self.archive_type, self.archive_type,
self.contenttype) self.contenttype)
class Mailattachment_mgr(object): class Mailattachment_mgr(object):
"""Mail attachment manager""" """Mail attachment manager"""
def __init__(self, def __init__(self,
msgrep: PatchedMessage, msgrep: PatchedMessage,
fugluid: str, section: Optional[str] = None, fugluid: str, section: Optional[str] = None,
cachelimit: Optional[int] = None, cachelimit: Optional[int] = None,
 End of changes. 15 change blocks. 
21 lines changed or deleted 39 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)