"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "src/fuglu/plugins/uriextract.py" between
fuglu-0.10.8.tar.gz and fuglu-1.0.0.tar.gz

About: FuGlu is a mail scanning daemon for Postfix written in Python. It acts as a glue application between the MTA and spam checkers and antivirus software.

uriextract.py (fuglu-0.10.8) vs. uriextract.py (fuglu-1.0.0)
skipping to change at line 25 (0.10.8) / line 25 (1.0.0)
 #
 #
 """
 A collection of plugins to
 - extract URIs/email addresses from mail body and text attachments
 - lookup URIs/email addresses on RBLs
 These plugins require beautifulsoup and domainmagic
 """
-from fuglu.shared import ScannerPlugin, DUNNO, string_to_actioncode, apply_template, FileList, AppenderPlugin, HAVE_BEAUTIFULSOUP, Suspect
+from fuglu.shared import ScannerPlugin, DUNNO, string_to_actioncode, apply_template, FileList, AppenderPlugin, HAVE_BEAUTIFULSOUP, Suspect, get_default_cache
 from fuglu.stringencode import force_uString
+from fuglu.extensions.sql import SQL_EXTENSION_ENABLED, get_session
+from .sa import UserPref, GLOBALSCOPE
 import os
 import html
 import re
 import random
 import typing as tp
+import io
 try:
     from domainmagic.extractor import URIExtractor, fqdn_from_uri, redirect_from_url
     from domainmagic.rbl import RBLLookup
     from domainmagic.tld import TLDMagic
     from domainmagic.mailaddr import domain_from_mail
     DOMAINMAGIC_AVAILABLE = True
 except ImportError:
     DOMAINMAGIC_AVAILABLE=False
 if HAVE_BEAUTIFULSOUP:
     import bs4 as BeautifulSoup
 else:
     BeautifulSoup = None
+try:
+    import vobject
+    from contextlib import redirect_stdout
+except ImportError:
+    vobject = None
+    redirect_stdout = None
 # remove invisible characters
 ichars = r"[\0-\x1F\x7F-\x9F\xAD\u0378\u0379\u037F-\u0383\u038B\u038D\u03A2\u0528-\u0530\u0557\u0558\u0560\u0588\u058B-\u058E\u0590\u05C8-\u05CF\u05EB-\u05EF\u05F5-\u0605\u061C\u061D\u06DD\u070E\u070F\u074B\u074C\u07B2-\u07BF\u07FB-\u07FF\u082E\u082F\u083F\u085C\u085D\u085F-\u089F\u08A1\u08AD-\u08E3\u08FF\u0978\u0980\u0984\u098D\u098E\u0991\u0992\u09A9\u09B1\u09B3-\u09B5\u09BA\u09BB\u09C5\u09C6\u09C9\u09CA\u09CF-\u09D6\u09D8-\u09DB\u09DE\u09E4\u09E5\u09FC-\u0A00\u0A04\u0A0B-\u0A0E\u0A11\u0A12\u0A29\u0A31\u0A34\u0A37\u0A3A\u0A3B\u0A3D\u0A43-\u0A46\u0A49\u0A4A\u0A4E-\u0A50\u0A52-\u0A58\u0A5D\u0A5F-\u0A65\u0A76-\u0A80\u0A84\u0A8E\u0A92\u0AA9\u0AB1\u0AB4\u0ABA\u0ABB\u0AC6\u0ACA\u0ACE\u0ACF\u0AD1-\u0ADF\u0AE4\u0AE5\u0AF2-\u0B00\u0B04\u0B0D\u0B0E\u0B11\u0B12\u0B29\u0B31\u0B34\u0B3A\u0B3B\u0B45\u0B46\u0B49\u0B4A\u0B4E-\u0B55\u0B58-\u0B5B\u0B5E\u0B64\u0B65\u0B78-\u0B81\u0B84\u0B8B-\u0B8D\u0B91\u0B96-\u0B98\u0B9B\u0B9D\u0BA0-\u0BA2\u0BA5-\u0BA7\u0BAB-\u0BAD\u0BBA-\u0BBD\u0BC3-\u0BC5\u0BC9\u0BCE\u0BCF\u0BD1-\u0BD6\u0BD8-\u0BE5\u0BFB-\u0C00\u0C04\u0C0D\u0C11\u0C29\u0C34\u0C3A-\u0C3C\u0C45\u0C49\u0C4E-\u0C54\u0C57\u0C5A-\u0C5F\u0C64\u0C65\u0C70-\u0C77\u0C80\u0C81\u0C84\u0C8D\u0C91\u0CA9\u0CB4\u0CBA\u0CBB\u0CC5\u0CC9\u0CCE-\u0CD4\u0CD7-\u0CDD\u0CDF\u0CE4\u0CE5\u0CF0\u0CF3-\u0D01\u0D04\u0D0D\u0D11\u0D3B\u0D3C\u0D45\u0D49\u0D4F-\u0D56\u0D58-\u0D5F\u0D64\u0D65\u0D76-\u0D78\u0D80\u0D81\u0D84\u0D97-\u0D99\u0DB2\u0DBC\u0DBE\u0DBF\u0DC7-\u0DC9\u0DCB-\u0DCE\u0DD5\u0DD7\u0DE0-\u0DF1\u0DF5-\u0E00\u0E3B-\u0E3E\u0E5C-\u0E80\u0E83\u0E85\u0E86\u0E89\u0E8B\u0E8C\u0E8E-\u0E93\u0E98\u0EA0\u0EA4\u0EA6\u0EA8\u0EA9\u0EAC\u0EBA\u0EBE\u0EBF\u0EC5\u0EC7\u0ECE\u0ECF\u0EDA\u0EDB\u0EE0-\u0EFF\u0F48\u0F6D-\u0F70\u0F98\u0FBD\u0FCD\u0FDB-\u0FFF\u10C6\u10C8-\u10CC\u10CE\u10CF\u1249\u124E\u124F\u1257\u1259\u125E\u125F\u1289\u128E\u128F\u12B1\u12B6\u12B7\u12BF\u12C1\u12C6\u12C7\u12D7\u1311\u1316\u1317\u135B\u135C\u137D-\u137F\u139A-\u139F\u13F5-\u13FF\u169D-\u169F\u16F1-\u16FF\u170D\u1715-\u171F\u1737-\u173F\u1754-\u175F\u176D\u1771\u1774-\u177F\u17DE\u17DF\u17EA-\u17EF\u17FA-\u17FF\u180F\u181A-\u181F\u1878-\u187F\u18AB-\u18AF\u18F6-\u18FF\u191D-\u191F\u192C-\u192F\u193C-\u193F\u1941-\u1943\u196E\u196F\u1975-\u197F\u19AC-\u19AF\u19CA-\u19CF\u19DB-\u19DD\u1A1C\u1A1D\u1A5F\u1A7D\u1A7E\u1A8A-\u1A8F\u1A9A-\u1A9F\u1AAE-\u1AFF\u1B4C-\u1B4F\u1B7D-\u1B7F\u1BF4-\u1BFB\u1C38-\u1C3A\u1C4A-\u1C4C\u1C80-\u1CBF\u1CC8-\u1CCF\u1CF7-\u1CFF\u1DE7-\u1DFB\u1F16\u1F17\u1F1E\u1F1F\u1F46\u1F47\u1F4E\u1F4F\u1F58\u1F5A\u1F5C\u1F5E\u1F7E\u1F7F\u1FB5\u1FC5\u1FD4\u1FD5\u1FDC\u1FF0\u1FF1\u1FF5\u1FFF\u200B-\u200F\u202A-\u202E\u2060-\u206F\u2072\u2073\u208F\u209D-\u209F\u20BB-\u20CF\u20F1-\u20FF\u218A-\u218F\u23F4-\u23FF\u2427-\u243F\u244B-\u245F\u2700\u2B4D-\u2B4F\u2B5A-\u2BFF\u2C2F\u2C5F\u2CF4-\u2CF8\u2D26\u2D28-\u2D2C\u2D2E\u2D2F\u2D68-\u2D6E\u2D71-\u2D7E\u2D97-\u2D9F\u2DA7\u2DAF\u2DB7\u2DBF\u2DC7\u2DCF\u2DD7\u2DDF\u2E3C-\u2E7F\u2E9A\u2EF4-\u2EFF\u2FD6-\u2FEF\u2FFC-\u2FFF\u3040\u3097\u3098\u3100-\u3104\u312E-\u3130\u318F\u31BB-\u31BF\u31E4-\u31EF\u321F\u32FF\u4DB6-\u4DBF\u9FCD-\u9FFF\uA48D-\uA48F\uA4C7-\uA4CF\uA62C-\uA63F\uA698-\uA69E\uA6F8-\uA6FF\uA78F\uA794-\uA79F\uA7AB-\uA7F7\uA82C-\uA82F\uA83A-\uA83F\uA878-\uA87F\uA8C5-\uA8CD\uA8DA-\uA8DF\uA8FC-\uA8FF\uA954-\uA95E\uA97D-\uA97F\uA9CE\uA9DA-\uA9DD\uA9E0-\uA9FF\uAA37-\uAA3F\uAA4E\uAA4F\uAA5A\uAA5B\uAA7C-\uAA7F\uAAC3-\uAADA\uAAF7-\uAB00\uAB07\uAB08\uAB0F\uAB10\uAB17-\uAB1F\uAB27\uAB2F-\uABBF\uABEE\uABEF\uABFA-\uABFF\uD7A4-\uD7AF\uD7C7-\uD7CA\uD7FC-\uF8FF\uFA6E\uFA6F\uFADA-\uFAFF\uFB07-\uFB12\uFB18-\uFB1C\uFB37\uFB3D\uFB3F\uFB42\uFB45\uFBC2-\uFBD2\uFD40-\uFD4F\uFD90\uFD91\uFDC8-\uFDEF\uFDFE\uFDFF\uFE1A-\uFE1F\uFE27-\uFE2F\uFE53\uFE67\uFE6C-\uFE6F\uFE75\uFEFD-\uFF00\uFFBF-\uFFC1\uFFC8\uFFC9\uFFD0\uFFD1\uFFD8\uFFD9\uFFDD-\uFFDF\uFFE7\uFFEF-\uFFFB\uFFFE\uFFFF]"
 invisible = re.compile(ichars)
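
A note on the ichars class above (identical in both releases): it matches ASCII controls, zero-width and bidirectional control characters, and unassigned code points, and stripping them defeats a common obfuscation trick where URLs are broken up with invisible characters. A minimal sketch of the same technique, using a deliberately shortened character class instead of the full one above:

    import re

    # shortened stand-in for fuglu's ichars: zero-width/bidi controls only
    invisible = re.compile(r"[\u200B-\u200F\u202A-\u202E\u2060-\u206F\uFEFF]")

    obfuscated = "https://exa\u200bmple.invalid/lo\u200dgin"
    print(invisible.sub("", obfuscated))  # -> https://example.invalid/login
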
 class URIExtract(ScannerPlugin):
     """Extract URIs from message bodies and store them as list in tag body.uris"""
     def __init__(self,config,section=None):
         ScannerPlugin.__init__(self,config,section)
         self.logger = self._logger()
         self.extractor=None
         self.requiredvars = {
             'domainskiplist':{
                 'default':'/etc/fuglu/extract-skip-domains.txt',
                 'description':'Domain skip list',
             },
+            'timeout': {
+                'default': '-1',
+                'description': 'Max. time after which extraction will be stopped (approximate only, <0.0:infinite)',
+            },
             'maxsize': {
                 'default': '10485000',
                 'description': 'Maximum size of processed mail parts/attachments.',
             },
             'maxsize_analyse': {
                 'default': '2000000',
                 'description': 'Maximum size of string to analyze in bytes.',
             },
             'loguris':{
                 'default':'no',
                 'description':'print extracted uris in fuglu log',
             },
             'usehacks': {
-                'default': 'false',
-                'description': 'Use extra hacks trying to parse uris',
+                'default': '0',
+                'description': 'Use extra hacks (int level) trying to parse uris (0: no hacks)',
             },
             'uricheckheaders': {
                 'default': '',
                 'description': 'List with headers to check for uris',
             },
         }
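
A note on the reworked usehacks option: its type changed from a boolean ('false') to an integer level ('0'), but existing configs may still contain true/false, which is why the new _run (next hunk) tries getint first and falls back to getboolean. A self-contained sketch of that fallback, using plain configparser as a stand-in for fuglu's config wrapper (section name and values are illustrative; stock configparser wants the fallback as a keyword argument, while fuglu's wrapper in the hunk below also accepts it positionally):

    import configparser

    config = configparser.ConfigParser()
    # legacy config: a boolean value survives the type change
    config.read_string("[URIExtract]\nusehacks = true\ntimeout = 2.5\n")
    section = 'URIExtract'

    try:
        usehacks = config.getint(section, 'usehacks')      # raises on 'true'
    except Exception:
        usehacks = config.getboolean(section, 'usehacks')  # accepts true/false/0/1
        usehacks = 1 if usehacks else 0
    timeout = config.getfloat(section, 'timeout', fallback=0.0)
    print(usehacks, timeout)  # -> 1 2.5
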
     def _prepare(self):
         if self.extractor is None:
             self.extractor = URIExtractor()
             skiplist=self.config.get(self.section,'domainskiplist')
             if skiplist!='':
                 self.extractor.load_skiplist(skiplist)
-    def _run(self,suspect):
+    def _run(self, suspect: Suspect):
         if not DOMAINMAGIC_AVAILABLE:
             self.logger.info('Not scanning - Domainmagic not available')
             return DUNNO
         maxsize = self.config.getint(self.section, 'maxsize')
         maxsize_analyse = self.config.getint(self.section, 'maxsize_analyse')
-        usehacks = self.config.getboolean(self.section, 'usehacks')
+        try:
+            usehacks = self.config.getint(self.section, 'usehacks')
+        except Exception:
+            usehacks = self.config.getboolean(self.section, 'usehacks')
+            usehacks = 1 if usehacks else 0
+        timeout = self.config.getfloat(self.section, 'timeout', 0.0)
+        if timeout and timeout > 0:
+            # use section name as timeout tag
+            suspect.stimeout_set_timer(self.section, timeout)
         self._prepare()
         uris = []
         hrefs = []
-        for content in self.get_decoded_textparts(suspect, ignore_words_without='.', maxsize=maxsize, maxsize_analyse=maxsize_analyse, hrefs=hrefs, use_hacks=usehacks):
+        textparts = self.get_decoded_textparts(suspect, ignore_words_without='.', maxsize=maxsize, maxsize_analyse=maxsize_analyse, hrefs=hrefs, use_hacks=usehacks)
+        for content in textparts:
+            # check for timeout
+            if not suspect.stimeout_continue(self.section):
+                self.logger.warning(f"{suspect.id} Timeout in content loop: {suspect.stimeout_string(self.section)}")
+                # save whatever is available atm
+                suspect.set_tag('body.uris', uris)
+                return DUNNO
             try:
-                parturis = self.extractor.extracturis(content, use_hacks=usehacks)
-                uris.extend(parturis)
+                for uri in content.split(' '):
+                    parturis = self.extractor.extracturis(uri, use_hacks=usehacks)
+                    uris.extend(parturis)
+                    if not suspect.stimeout_continue(self.section):
+                        self.logger.warning(f"{suspect.id} Timeout in uri content loop: {suspect.stimeout_string(self.section)}")
+                        # save whatever is available atm
+                        suspect.set_tag('body.uris', uris)
+                        return DUNNO
             except Exception as e:
                 self.logger.error('%s failed to extract URIs from msg part: %s' % (suspect.id, str(e)))
         # add hrefs from html a-tags directly to list
         # - ignore mail addresses (mailto:)
         # - ignore internal references, phone numbers, javascript and html tags (#...)
         # - ignore incomplete template replacements typically starting with square brackets
         hrefs = [h for h in hrefs if not h.lower().startswith(
             ("mailto:", "cid:", "tel:", "fax:", "javascript:", '#', "file:", "[",
              "x-apple-data-detectors:", "applewebdata:",))
         ]
         uris.extend(hrefs)
skipping to change at line 149 (0.10.8) / line 190 (1.0.0)
             self.logger.debug(f"{suspect.id} Extracted uris \"{headeruris}\" from headers")
         if self.config.getboolean(self.section,'loguris'):
             self.logger.info('%s Extracted URIs: %s' % (suspect.id, uris))
         suspect.set_tag('body.uris', uris)
         return DUNNO
     def examine(self, suspect):
         return self._run(suspect)
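
The rewritten _run repeatedly asks suspect.stimeout_continue(self.section) and bails out early, tagging whatever was extracted so far. The stimeout_* helpers are provided by fuglu's Suspect; as a rough mental model only (an approximation, not fuglu's actual implementation), they behave like named monotonic deadlines:

    import time

    class TimeoutTimers:
        # approximation of the Suspect.stimeout_* helpers used in the hunk above
        def __init__(self):
            self.deadlines = {}

        def stimeout_set_timer(self, tag: str, timeout: float) -> None:
            # remember a deadline under a tag (the plugin uses its section name)
            self.deadlines[tag] = time.monotonic() + timeout

        def stimeout_continue(self, tag: str) -> bool:
            # True while no timer is set or the deadline has not passed
            deadline = self.deadlines.get(tag)
            return deadline is None or time.monotonic() < deadline

    timers = TimeoutTimers()
    timers.stimeout_set_timer('URIExtract', 0.5)
    while timers.stimeout_continue('URIExtract'):
        pass  # one bounded unit of work per iteration
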
-    def get_decoded_textparts(self, suspect,bcompatible=True, ignore_words_without=(),
-                              maxsize=None, maxsize_analyse=None, hrefs=None, use_hacks=None):
-        """bcompatible True will work with FUGLU version before implementation of attachment manager in Suspect """
+    def get_decoded_textparts(self, suspect,ignore_words_without=(),maxsize=None, maxsize_analyse=None, hrefs=None, use_hacks=None):
         textparts = []
-        try:
-            att_mgr = suspect.att_mgr
-        except AttributeError:
-            message = 'This version of URIextract is supposed to use a FUGLU version with Attachment Manager. \n' \
-                      'Please update your FUGLU version'
-            if bcompatible:
-                self.logger.warning(message)
-            else:
-                raise AttributeError(message)
-            return self.get_decoded_textparts_deprecated(suspect)
         size_string_analyse = 0
+        att_mgr = suspect.att_mgr
         for attObj in att_mgr.get_objectlist():
+            # check for timeout
+            if not suspect.stimeout_continue(self.section):
+                self.logger.warning(f"{suspect.id} Timeout in loop extracting text parts: {suspect.stimeout_string(self.section)}")
+                # save whatever is available atm
+                return textparts
             decoded_payload = None
             if attObj.content_fname_check(contenttype_start="text/") \
                     or attObj.content_fname_check(name_end=(".txt", ".html", ".htm")) \
                     or (attObj.defects and attObj.content_fname_check(ctype_start="text/")):
                 if maxsize and attObj.filesize and attObj.filesize > maxsize:
                     # ignore parts larger than given limit
-                    self.logger.info("%s, ignore part %s with size %s"
-                                     % (suspect.id, attObj.filename, attObj.filesize))
+                    self.logger.info("%s, ignore part %s with size %s" % (suspect.id, attObj.filename, attObj.filesize))
                     continue
                 decoded_payload = attObj.decoded_buffer_text
                 if attObj.content_fname_check(contenttype_contains="html") \
                         or attObj.content_fname_check(name_contains=".htm") \
                         or (attObj.defects and attObj.content_fname_check(ctype_contains="html")):
                     # remove invisible characters (including \r\n) but also check original source
                     decoded_payload_orig = decoded_payload
                     decoded_payload = invisible.sub("", decoded_payload_orig)
                     decoded_payload_replacedchars = ""
                     if use_hacks:
                         # same as above, but handle newlines differently to catch a link starting at a
                         # new line which would otherwise be concatenated and then not recognised by domainmagic
-                        decoded_payload_replacedchars = \
-                            invisible.sub("", decoded_payload_orig.replace('\r', ' ').replace('\n', ' '))
+                        decoded_payload_replacedchars = invisible.sub("", decoded_payload_orig.replace('\r', ' ').replace('\n', ' '))
                     try:
                         decoded_payload = html.unescape(decoded_payload)
                         decoded_payload_replacedchars = html.unescape(decoded_payload_replacedchars)
                     except Exception:
                         self.logger.debug('%s failed to unescape html entities' % suspect.id)
-                    if BeautifulSoup:
+                    if HAVE_BEAUTIFULSOUP:
                         saferedir = []
                         atags = []
                         imgtags = []
                         atagshtml = None
                         if isinstance(hrefs, list):
                             atagshtml = BeautifulSoup.BeautifulSoup(decoded_payload, "lxml").find_all('a')
                             if atagshtml:
                                 atags = list(set([atag.get("href") for atag in atagshtml if atag.get("href")]))
                                 # some gmail-fu
                                 saferedir = list(set([atag.get("data-saferedirecturl") for atag in atagshtml if atag.get("data-saferedirecturl")]))
skipping to change at line 279 (0.10.8) / line 311 (1.0.0)
                     # ignore parts larger than given limit
                     self.logger.info("%s, ignore part with contenttype 'multipart/alternative' and size %u"
                                      % (suspect.id, len(attObj.decoded_buffer_text)))
                     continue
                 decoded_payload = attObj.decoded_buffer_text
             # Calendar items are special, line continuation starts
             # with a whitespace -> join correctly to detect links correctly
             if attObj.content_fname_check(contenttype="text/calendar"):
-                buffer = decoded_payload.replace('\r\n', '\n').split('\n')
-                joinedlines = []
-                for line in buffer:
-                    if line.startswith(' '):
-                        if joinedlines:
-                            joinedlines[-1] = joinedlines[-1].rstrip() + line.lstrip()
-                        else:
-                            joinedlines.append(line)
-                    else:
-                        joinedlines.append(line)
+                joinedlines = None
+                if vobject and redirect_stdout:
+                    try:
+                        parsed = vobject.readOne(decoded_payload)
+                        f = io.StringIO()
+                        with redirect_stdout(f):
+                            parsed.prettyPrint()
+                        joinedlines = f.getvalue().splitlines()
+                        self.logger.info(f"{suspect.id} decoded calendar item using vobject to {len(joinedlines)} lines")
+                    except Exception as e:
+                        self.logger.warning(f"{suspect.id} problem decoding calendar item using vobject: {str(e)}")
+                if joinedlines is None:
+                    buffer = decoded_payload.replace('\r\n', '\n').split('\n')
+                    joinedlines = []
+                    for line in buffer:
+                        if line.startswith(' '):
+                            if joinedlines:
+                                joinedlines[-1] = joinedlines[-1].rstrip() + line.lstrip()
+                            else:
+                                joinedlines.append(line)
+                        else:
+                            joinedlines.append(line)
+                    self.logger.info(f"{suspect.id} decoded calendar item to {len(joinedlines)} lines")
                 decoded_payload = " ".join(joinedlines)
             if decoded_payload:
                 # Some spam mails create very long lines that will dramatically slow down the regex later on.
                 for ignore_element in ignore_words_without:
                     decoded_payload = " ".join([part for part in decoded_payload.split(' ') if ignore_element in part])
                 if maxsize_analyse and size_string_analyse + len(decoded_payload) > maxsize_analyse:
                     # ignore parts larger than given limit
                     self.logger.info("%s, ignore part %s due to processed size %u and current size of analyse string %u"
skipping to change at line 347 (0.10.8) / line 394 (1.0.0)
             for ignore_element in ignore_words_without:
                 decoded_payload = " ".join([part for part in hstring.split(' ') if ignore_element in part])
             if decoded_payload.strip():
                 stringlist2analyse.append(decoded_payload)
         string2analyse = " ".join(stringlist2analyse)
         if not stringlist2analyse:
             return []
-        usehacks = self.config.getboolean(self.section, 'usehacks')
+        try:
+            usehacks = self.config.getint(self.section, 'usehacks')
+        except Exception:
+            usehacks = self.config.getboolean(self.section, 'usehacks')
+            usehacks = 1 if usehacks else 0
         headeruris = self.extractor.extracturis(string2analyse, use_hacks=usehacks)
         suspect.set_tag('headers.uris', headeruris)
         return headeruris
-    def get_decoded_textparts_deprecated(self, suspect):
-        """Returns a list of all text contents"""
-        messagerep = suspect.get_message_rep()
-        textparts=[]
-        for part in messagerep.walk():
-            if part.is_multipart():
-                continue
-            fname=part.get_filename(None)
-            if fname is None:
-                fname=""
-            fname=fname.lower()
-            contenttype=part.get_content_type()
-            if contenttype.startswith('text/') or fname.endswith(".txt") or fname.endswith(".html") or fname.endswith(".htm"):
-                payload=part.get_payload(None,True)
-                if payload is not None:
-                    # Try to decode using the given char set (or utf-8 by default)
-                    charset = part.get_content_charset("utf-8")
-                    payload = force_uString(payload,encodingGuess=charset)
-                if 'html' in contenttype or '.htm' in fname: #remove newlines from html so we get uris spanning multiple lines
-                    payload=payload.replace('\n', '').replace('\r', '')
-                try:
-                    payload = html.unescape(payload)
-                except Exception:
-                    self.logger.debug('%s failed to unescape html entities' % suspect.id)
-                textparts.append(payload)
-            if contenttype=='multipart/alternative':
-                try:
-                    payload = part.get_payload(None,True)
-                    if payload is not None:
-                        # Try to decode using the given char set
-                        charset = part.get_content_charset("utf-8")
-                        text = force_uString(payload,encodingGuess=charset)
-                        textparts.append(text)
-                except (UnicodeEncodeError, UnicodeDecodeError):
-                    self.logger.debug('%s failed to convert alternative part to string' % suspect.id)
-        return textparts
     def lint(self):
         allok = True
         if not DOMAINMAGIC_AVAILABLE:
             print("ERROR: domainmagic lib or one of its dependencies (dnspython/pygeoip) is not installed!")
             allok = False
         if allok:
             allok = self.check_config()
         return allok
skipping to change at line 436 (0.10.8) / line 445 (1.0.0)
             }
         })
     def _run(self,suspect):
         if not DOMAINMAGIC_AVAILABLE:
             self.logger.info('Not scanning - Domainmagic not available')
             return DUNNO
         maxsize = self.config.getint(self.section, 'maxsize')
         maxsize_analyse = self.config.getint(self.section, 'maxsize_analyse')
+        timeout = self.config.getfloat(self.section, 'timeout', 0.0)
+        if timeout and timeout > 0:
+            # use section name as timeout tag
+            suspect.stimeout_set_timer(self.section, timeout)
         self._prepare()
+        body_emails = []
         hrefs = []
-        textparts=" ".join(self.get_decoded_textparts(suspect, ignore_words_without="@", maxsize=maxsize, maxsize_analyse=maxsize_analyse, hrefs=hrefs))
-        body_emails = self.extractor.extractemails(textparts)
+        for content in self.get_decoded_textparts(suspect, ignore_words_without="@", maxsize=maxsize, maxsize_analyse=maxsize_analyse, hrefs=hrefs):
+            # check for timeout
+            if not suspect.stimeout_continue(self.section):
+                self.logger.warning(f"{suspect.id} Timeout in content loop: {suspect.stimeout_string(self.section)}")
+                # save whatever is available atm
+                suspect.set_tag('body.emails', body_emails)
+                suspect.set_tag('body.emails.domains', [])
+                suspect.set_tag('header.emails', [])
+                suspect.set_tag('header.emails.domains', [])
+                suspect.set_tag('emails', body_emails)
+                suspect.set_tag('emails.domains', [])
+                return DUNNO
+            try:
+                for email in content.split(' '):
+                    part_emails = self.extractor.extractemails(email)
+                    body_emails.extend(part_emails)
+                    if not suspect.stimeout_continue(self.section):
+                        self.logger.warning(f"{suspect.id} Timeout in email content loop: {suspect.stimeout_string(self.section)}")
+                        # save whatever is available atm
+                        suspect.set_tag('body.emails', body_emails)
+                        suspect.set_tag('body.emails.domains', [])
+                        suspect.set_tag('header.emails', [])
+                        suspect.set_tag('header.emails.domains', [])
+                        suspect.set_tag('emails', body_emails)
+                        suspect.set_tag('emails.domains', [])
+                        return DUNNO
+            except Exception as e:
+                self.logger.error('%s failed to extract URIs from msg part: %s' % (suspect.id, str(e)))
         # directly use mail addresses from html hrefs in atags
         hrefs = [h[len("mailto:"):] for h in hrefs if h.lower().startswith("mailto:")]
         body_emails.extend(hrefs)
         hdrs = ''
-        for hdr in self.config.get(self.section, 'headers').split(','):
+        for hdr in self.config.getlist(self.section, 'headers'):
             hdrs += " " + " ".join(force_uString(suspect.get_message_rep().get_all(hdr, "")))
         hdr_emails = self.extractor.extractemails(hdrs)
         if self.config.getboolean(self.section, 'with_envelope_sender') and suspect.from_address:
             hdr_emails.append(suspect.from_address)
         ignoreemailtext=""
-        for hdr in self.config.get(self.section,'skipheaders').split(','):
+        for hdr in self.config.getlist(self.section,'skipheaders'):
             ignoreemailtext += " " + " ".join(force_uString(suspect.get_message_rep().get_all(hdr,"")))
         ignoreemails=[x.lower() for x in self.extractor.extractemails(ignoreemailtext)]
         ignoreemails.extend(suspect.recipients)
         body_emails_final = []
         for e in body_emails:
             if e.lower() not in ignoreemails:
                 body_emails_final.append(e)
         hdr_emails_final = []
         for e in hdr_emails:
             if e.lower() not in ignoreemails:
                 hdr_emails_final.append(e)
         # make lists unique
         body_emails_final = list(set(body_emails_final))
         hdr_emails_final = list(set(hdr_emails_final))
         all_emails = list(set(body_emails_final + hdr_emails_final))
+        # collect domains
+        body_emaildomains_final = []
+        for mail in body_emails_final:
+            if mail and "@" in mail:
+                try:
+                    loc, dom = mail.rsplit("@", 1)
+                    if dom:
+                        body_emaildomains_final.append(dom)
+                except Exception as e:
+                    self.logger.error(f"(bodydomain) Couldn't split {mail} in localpart & domain: {str(e)}")
+        hdr_emaildomains_final = []
+        for mail in hdr_emails_final:
+            if mail and "@" in mail:
+                try:
+                    loc, dom = mail.rsplit("@", 1)
+                    if dom:
+                        hdr_emaildomains_final.append(dom)
+                except Exception as e:
+                    self.logger.error(f"(headerdomain) Couldn't split {mail} in localpart & domain: {str(e)}")
+        body_emaildomains_final = list(set(body_emaildomains_final))
+        hdr_emaildomains_final = list(set(hdr_emaildomains_final))
+        all_emaildomains = list(set(body_emaildomains_final + hdr_emaildomains_final))
+        # set tags
         suspect.set_tag('body.emails', body_emails_final)
+        suspect.set_tag('body.emails.domains', body_emaildomains_final)
         suspect.set_tag('header.emails', hdr_emails_final)
+        suspect.set_tag('header.emails.domains', hdr_emaildomains_final)
         suspect.set_tag('emails', all_emails)
+        suspect.set_tag('emails.domains', all_emaildomains)
         if self.config.getboolean(self.section,'loguris'):
             self.logger.info("Extracted emails: %s" % all_emails)
+            self.logger.info("Extracted emaildomains: %s" % all_emaildomains)
         return DUNNO
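
The new domain-collection loops rely on rsplit("@", 1), which splits on the last @ only, so even an address with an @ inside a quoted local part yields the right domain. For example:

    def mail_domain(mail: str) -> str:
        # split on the last '@' only
        loc, dom = mail.rsplit("@", 1)
        return dom

    print(mail_domain('alice@example.com'))       # -> example.com
    print(mail_domain('"user@odd"@example.com'))  # -> example.com
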
 class DomainAction(ScannerPlugin):
     """Perform Action based on Domains in message body"""
     def __init__(self,config,section=None):
         ScannerPlugin.__init__(self,config,section)
         self.logger = self._logger()
skipping to change at line 534 (0.10.8) / line 613 (1.0.0)
                 'description': 'test record that should be included in at least one checked rbl (only used in lint)'
             },
             'exceptions_file': {
                 'default':'',
                 'description': 'path to file containing domains that should not be checked (one per line)'
             },
             'suspect_tags': {
                 'default':'body.uris',
                 'description': 'evaluate URIs listed in given tags (list tags white space separated)'
             },
+            'userpref_types': {
+                'default': 'uridnsbl_skip_domain',
+                'description': 'comma separated list of spamassassin userpref types containing skip domain entries'
+            },
+            'userpref_dbconnection': {
+                'default': '',
+                'description': "sqlalchemy db connect string, e.g. mysql:///localhost/spamassassin",
+            },
+            'userpref_usecache':{
+                'default':"True",
+                'description':'Use Mem Cache. This is recommended. However, if enabled it will take up to userpref_cache_ttl seconds until listing changes are effective.',
+            },
         }
+        self.cache = get_default_cache()
         self.rbllookup = None
         self.tldmagic = None
         self.extratlds = None
         self.lasttlds = None
         self.exceptions = None
     def _init_tldmagic(self):
         init_tldmagic = False
         extratlds = []
skipping to change at line 563 (0.10.8) / line 655 (1.0.0)
             extratlds = self.extratlds.get_list()
             if self.lasttlds != extratlds:  # extra tld file changed
                 self.lasttlds = extratlds
                 init_tldmagic = True
         if self.tldmagic is None or init_tldmagic:
             self.tldmagic = TLDMagic()
             for tld in extratlds:  # add extra tlds to tldmagic
                 self.tldmagic.add_tld(tld)
-    def _check_skiplist(self, value):
+    def _get_sa_userpref(self):
+        key = '%s-userpref' % self.__class__.__name__
+        usecache = self.config.getboolean(self.section, 'userpref_usecache')
+        skipuri = {}
+        if usecache:
+            skipuri = self.cache.get_cache(key) or {}
+        if not skipuri:
+            dbconn = self.config.get(self.section, 'userpref_dbconnection')
+            userpref_types = self.config.getlist(self.section, 'userpref_types')
+            if not dbconn or not userpref_types:
+                self.logger.debug('userpref_dbconnection or userpref_types not set')
+                return skipuri
+            dbsession = get_session(dbconn)
+            query = dbsession.query(UserPref)
+            query = query.filter(UserPref.preference.in_(userpref_types))
+            result = query.all()
+            for r in result:
+                if r.preference == "emailbl_acl_freemail" and not r.value.startswith('!'):
+                    continue
+                value = r.value.strip('!')
+                try:
+                    skipuri[r.username].append(value)
+                except KeyError:
+                    skipuri[r.username] = [value]
+            for username in skipuri.keys():
+                skipuri[username] = list(set(skipuri[username]))
+            if skipuri:
+                cachettl = self.config.getint(self.section, 'userpref_cache_ttl')
+                self.cache.put_cache(key, skipuri, ttl=cachettl)
+        return skipuri
+    def _gen_userskiplist(self, recipient):
+        skiplist = []
         if self.exceptions is None:
             exceptionsfile = self.config.get(self.section, 'exceptions_file')
             if exceptionsfile and os.path.exists(exceptionsfile):
                 self.exceptions = FileList(exceptionsfile, lowercase=True)
         if self.exceptions is not None:
-            exceptionlist = self.exceptions.get_list()
-            if value in exceptionlist:
-                return True
-        return False
+            skiplist.extend(self.exceptions.get_list())
+        if SQL_EXTENSION_ENABLED:
+            recipient = recipient.lower()
+            userprefs = self._get_sa_userpref()
+            skiplist.extend(userprefs.get(GLOBALSCOPE, []))
+            skiplist.extend(userprefs.get(recipient.rsplit('@', 1)[-1], []))
+            skiplist.extend(userprefs.get(recipient, []))
+        return list(set(skiplist))
+    def _check_skiplist(self, skiplist, domain):
+        for item in skiplist:
+            if domain == item or domain.endswith(f'.{item}'):
+                return True
+        return False
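
Note the widened matching rule of the new _check_skiplist: a skiplist entry now covers the domain itself and any subdomain via the endswith test, where the old code required an exact match against the exceptions file. A standalone illustration of the 1.0.0 logic:

    def check_skiplist(skiplist, domain):
        # same matching rule as DomainAction._check_skiplist in 1.0.0
        for item in skiplist:
            if domain == item or domain.endswith(f'.{item}'):
                return True
        return False

    print(check_skiplist(['example.com'], 'example.com'))       # True (exact match)
    print(check_skiplist(['example.com'], 'mail.example.com'))  # True (subdomain, new in 1.0.0)
    print(check_skiplist(['example.com'], 'notexample.com'))    # False (dot boundary respected)
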
     def _init_rbllookup(self):
         if self.rbllookup is None:
             blacklistconfig = self.config.get(self.section,'blacklistconfig')
             if os.path.exists(blacklistconfig):
                 self.rbllookup = RBLLookup()
                 self.rbllookup.from_config(blacklistconfig)
skipping to change at line 617 (0.10.8) / line 753 (1.0.0)
         domains = list(domains)
         if self.config.getboolean(self.section, 'randomise'):
             domains = random.shuffle(domains)
         action = DUNNO
         message = None
         hits = {}
         counter=0
         self.logger.debug('%s checking domains %s' % (suspect.id, ', '.join(domains)))
+        skiplist = self._gen_userskiplist(suspect.to_address)
         for domain in domains:
-            if self._check_skiplist(domain):
+            if self._check_skiplist(skiplist, domain):
                 self.logger.debug('%s skipping lookup of %s (skiplisted)' % (suspect.id, domain))
                 continue
             counter+=1
             if counter>self.config.getint(self.section,'maxlookups'):
                 self.logger.info("%s maximum number of domains reached" % suspect.id)
                 break
             tldcount=self.tldmagic.get_tld_count(domain)
             parts=domain.split('.')
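
An editorial caveat on the (unchanged) randomise handling above: random.shuffle shuffles in place and returns None, so the assignment domains = random.shuffle(domains) appears to rebind domains to None whenever randomise is enabled. Demonstration of the in-place behavior:

    import random

    domains = ['a.example', 'b.example', 'c.example']
    print(random.shuffle(domains))  # -> None: shuffle works in place, returns nothing
    print(domains)                  # -> the same list, reordered
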
skipping to change at line 663 (0.10.8) / line 800 (1.0.0)
     def lint(self):
         allok = True
         if not DOMAINMAGIC_AVAILABLE:
             print("ERROR: domainmagic lib or one of its dependencies (dnspython/pygeoip) is not installed!")
             allok = False
         if allok:
             allok = self.check_config()
+        if allok and not SQL_EXTENSION_ENABLED and self.config.get(self.section, 'userpref_dbconnection'):
+            print('WARNING: sql extension not active but spamassassin userpref query enabled')
+            allok = False
         if allok:
             blconf = self.config.get(self.section,'blacklistconfig')
             if not blconf:
                 allok = False
                 print('ERROR: blacklistconfig not defined')
             elif not os.path.exists(blconf):
                 allok = False
                 print('ERROR: blacklistconfig %s not found' % blconf)
         if allok and self.config.has_option(self.section, 'extra_tld_file'):
skipping to change at line 717 (0.10.8) / line 858 (1.0.0)
         DomainAction.__init__(self,config,section)
         self.domainlist = None
         del self.requiredvars['extra_tld_file']
         del self.requiredvars['checksubdomains']
         self.requiredvars['message']['default'] = '5.7.1 black listed email address ${address} by ${blacklist}'
         self.requiredvars['domainlist_file'] = {
             'default':'',
             'description':'path to file containing a list of domains. if specified, only query email addresses in these domains.'
         }
         self.requiredvars['exceptions_file']['description'] = 'path to file containing email addresses that should not be checked (one per line)'
+        self.requiredvars['userpref_types']['default'] = 'emailbl_acl_freemail, uridnsbl_skip_domain'
     def _in_domainlist(self, email_address):
         domainlist_file = self.config.get(self.section,'domainlist_file').strip()
         if domainlist_file == '':
             return True
         if self.domainlist is None:
             self.domainlist = FileList(domainlist_file, lowercase=True)
         in_domainlist = False
         domain = domain_from_mail(email_address)
         if domain in self.domainlist.get_list():
             in_domainlist = True
         return in_domainlist
+    def _check_skiplist(self, skiplist, emailaddr):
+        maildomain = emailaddr.rsplit('@', 1)[-1]
+        for item in skiplist:
+            comp_mail = '@' in item
+            if comp_mail and item==emailaddr:
+                return True
+            elif not comp_mail and maildomain==item or maildomain.endswith(f'.{item}'):
+                return True
+        return False
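
One more note on the new EmailAction._check_skiplist: in the elif, `and` binds tighter than `or`, so the condition parses as (not comp_mail and maildomain == item) or maildomain.endswith(f'.{item}'); the endswith branch is therefore evaluated even for full-address entries, although an item containing @ can practically never match as a domain suffix. The parse, demonstrated:

    comp_mail = True
    maildomain = 'example.com'
    item = 'user@example.com'

    # 'and' binds tighter than 'or'
    cond = not comp_mail and maildomain == item or maildomain.endswith(f'.{item}')
    same = (not comp_mail and maildomain == item) or maildomain.endswith(f'.{item}')
    print(cond == same)  # -> True
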
     def examine(self, suspect):
         if not DOMAINMAGIC_AVAILABLE:
             self.logger.info('Not scanning - Domainmagic not available')
             return DUNNO
         self._init_rbllookup()
         if self.rbllookup is None:
             self.logger.error('Not scanning - blacklistconfig could not be loaded')
         action = DUNNO
         message = None
         hits = {}
         checked = {}
+        skiplist = self._gen_userskiplist(suspect.to_address)
         for addrtype in ['header.emails', 'body.emails']:
             addrs = suspect.get_tag(addrtype, [])
             if self.config.getboolean(self.section, 'randomise'):
                 addrs = random.shuffle(addrs)
             for addr in addrs:
-                if self._check_skiplist(addr):
+                if self._check_skiplist(skiplist, addr):
                     self.logger.debug('%s skipping lookup of %s (skiplisted)' % (suspect.id, addr))
                     continue
                 if not self._in_domainlist(addr):
                     self.logger.debug('%s skipping lookup of %s (not in domain list)' % (suspect.id, addr))
                     continue
                 if len(checked) > self.config.getint(self.section, 'maxlookups'):
                     self.logger.info("%s maximum number of %s addresses reached" % (suspect.id, addrtype))
                     break
End of changes. 46 change blocks. 108 lines changed or deleted, 277 lines changed or added.
