pyzor  0.5.0
About: Pyzor is a collaborative, networked system to detect and block spam using identifying digests of messages.
  Fossies Dox: pyzor-0.5.0.tar.gz  ("unofficial" and yet experimental doxygen-generated source code documentation)  

 All Classes Namespaces Files Functions Variables Properties
server.py
Go to the documentation of this file.
1 """networked spam-signature detection server"""
2 
3 from __future__ import division
4 
5 import os
6 import sys
7 import SocketServer
8 import time
9 import gdbm
10 import cStringIO
11 import traceback
12 import threading
13 
14 import pyzor
15 from pyzor import *
16 
17 __author__ = pyzor.__author__
18 __version__ = pyzor.__version__
19 __revision__ = "$Id: server.py,v 1.29 2002-10-09 00:45:45 ftobin Exp $"
20 
21 
23  """signature was valid, but not permitted to
24  do the requested action"""
25  pass
26 
27 
class ACL(object):
    """Ordered list of ACLEntry rules; the first rule with an opinion wins."""

    __slots__ = ['entries']

    # Verdict handed back when no entry allows or denies the request.
    default_allow = False

    def __init__(self):
        self.entries = []

    def add_entry(self, entry):
        """Append `entry` (an ACLEntry) to the end of the rule list."""
        typecheck(entry, ACLEntry)
        self.entries.append(entry)

    def allows(self, user, op):
        """Return whether `user` may perform `op`.

        Entries are consulted in insertion order.  The first one that
        either allows or denies the pair decides the outcome; when none
        does, `default_allow` is returned.
        """
        typecheck(user, Username)
        typecheck(op, Opname)

        for rule in self.entries:
            if rule.allows(user, op):
                granted = True
                break
            if rule.denies(user, op):
                granted = False
                break
        else:
            # Fell off the end of the list without any rule matching.
            granted = self.default_allow
        return granted
49 
50 
class ACLEntry(tuple):
    """A single (user, op, allow) access rule stored as a tuple.

    A user or op equal to `all_keyword` (compared case-insensitively)
    acts as a wildcard.
    """

    all_keyword = 'all'.lower()

    def __init__(self, v):
        # tuple itself stores the data; here we only validate it.
        user, op, allow = v
        typecheck(user, Username)
        typecheck(op, Opname)
        assert bool(allow) == allow

    user = property(lambda self: self[0])
    op = property(lambda self: self[1])
    allow = property(lambda self: self[2])

    def allows(self, user, op):
        """True when this rule explicitly allows `user` to do `op`."""
        return self._says(user, op, True)

    def denies(self, user, op):
        """True when this rule explicitly denies `user` doing `op`."""
        return self._says(user, op, False)

    def _says(self, user, op, allow):
        """Tell whether this rule gives the verdict `allow` for (user, op).

        With allow=True the result is true only if the rule allows the
        pair; with allow=False only if the rule denies it.
        """
        typecheck(user, Username)
        typecheck(op, Opname)
        assert bool(allow) == allow

        # The rule must carry the verdict being asked about.
        if not self.allow == allow:
            return False

        wildcard = self.all_keyword
        user_matches = self.user == user or self.user.lower() == wildcard
        if not user_matches:
            return False

        return self.op == op or self.op.lower() == wildcard
91 
92 
93 
class AccessFile(object):
    """Reader for an access file whose rules are loaded into an ACL.

    Each meaningful line has three colon-separated parts:
    whitespace-separated opnames, whitespace-separated usernames, and
    the keyword `allow` or `deny`.  Blank lines and lines starting
    with '#' are skipped.
    """

    __slots__ = ['file', 'output', 'lineno']
    allow_keyword = 'allow'
    deny_keyword = 'deny'

    def __init__(self, f):
        self.output = Output()
        self.file = f
        self.lineno = 0

    def _convert_tokens(self, text, convert, warning_format):
        """Run `convert` over each whitespace-separated token of `text`.

        Tokens that make `convert` raise ValueError are reported through
        a warning and left out of the returned list.
        """
        accepted = []
        for token in text.split():
            try:
                value = convert(token)
            except ValueError:
                problem = sys.exc_info()[1]
                self.output.warn(warning_format
                                 % (repr(token), self.lineno, problem))
            else:
                accepted.append(value)
        return accepted

    def feed_into(self, acl):
        """Parse the whole file, adding one ACLEntry per (user, op) pair."""
        typecheck(acl, ACL)

        for raw in self.file:
            self.lineno += 1

            text = raw.strip()
            if not text or text.startswith('#'):
                continue

            pieces = text.split(':')
            if len(pieces) != 3:
                self.output.warn("access file: invalid number of parts in line %d"
                                 % self.lineno)
                continue

            ops_str, users_str, allow_str = pieces

            ops = self._convert_tokens(
                ops_str, Opname,
                "access file: invalid opname %s line %d: %s")
            users = self._convert_tokens(
                users_str, Username,
                "access file: invalid username %s line %d: %s")

            allow_str = allow_str.strip()
            keyword = allow_str.lower()
            if keyword == self.allow_keyword:
                allow = True
            elif keyword == self.deny_keyword:
                allow = False
            else:
                self.output.warn("access file: invalid allow/deny keyword %s line %d"
                                 % (repr(allow_str), self.lineno))
                continue

            # Entries are added op-major: every user for the first op, then
            # every user for the second op, and so on.
            for op in ops:
                for user in users:
                    acl.add_entry(ACLEntry((user, op, allow)))
158 
159 
160 
class Passwd(dict):
    """dict subclass that type-checks values on item assignment.

    NOTE(review): upstream source line 163 is absent from this listing;
    it presumably validated the key as well -- confirm against the
    original file.
    """
    def __setitem__(self, k, v):
        # Reject values that are not `long` before storing them.
        typecheck(v, long)
        super(Passwd, self).__setitem__(k, v)
166 
167 
168 
170  """Iteration gives (Username, long) objects
171 
172  Format of file is:
173  user : key
174  """
175  __slots__ = ['file', 'output', 'lineno']
176 
def __init__(self, f):
    """Remember the file object `f` to read entries from.

    `lineno` starts at 0 and is advanced by next() for every line read,
    so warnings can name the offending line.
    """
    self.file = f
    self.output = Output()
    self.lineno = 0
181 
182 
def next(self):
    """Return the next (Username, long) pair parsed from the file.

    Blank lines and '#' comments are skipped.  Lines without exactly
    two colon-separated fields, or whose fields fail conversion, are
    reported with a warning and skipped.  The second field is read as
    a base-16 number.  Raises StopIteration at end of file.
    """
    while True:
        raw = self.file.readline()
        self.lineno += 1

        if not raw:
            raise StopIteration

        text = raw.strip()
        if not text or text.startswith('#'):
            continue

        fields = [part.strip() for part in text.split(':')]
        if len(fields) != 2:
            self.output.warn("passwd line %d is invalid (wrong number of parts)"
                             % self.lineno)
            continue

        name, hex_key = fields
        try:
            return (Username(name), long(hex_key, 16))
        except ValueError:
            # Warn and fall through to the next line of the file.
            self.output.warn("invalid passwd entry line %d: %s"
                             % (self.lineno, sys.exc_info()[1]))
207 
208 
209 
class Log(object):
    """Writes one comma-separated line per request to an optional file."""

    __slots__ = ['fp']

    def __init__(self, fp=None):
        self.fp = fp

    def log(self, address, user=None, command=None, arg=None, code=None):
        """Append one entry describing a handled request.

        Fields, in order: epoch seconds, the same time from
        time.ctime(), user, address[0], command, repr(arg), code.
        Nothing is written when no file object was supplied.
        """
        # Callers pass None explicitly for "unknown", so the blanks are
        # substituted here rather than in the signature.
        if user is None:
            user = ''
        if command is None:
            command = ''
        if arg is None:
            arg = ''
        if code is None:
            code = -1

        # The timestamp appears twice so that a human can skim the file
        # without converting epoch seconds.
        ts = int(time.time())
        if self.fp is None:
            return

        columns = ["%d" % ts,
                   time.ctime(ts),
                   user,
                   address[0],
                   command,
                   repr(arg),
                   "%d" % code]
        self.fp.write(','.join(columns) + "\n")
        self.fp.flush()
238 
239 
240 
class Record(object):
    """Counters and timestamps kept for one digest.

    Prefix conventions used in this class:
    r = report (spam)
    wl = whitelist
    """

    __slots__ = ['r_count', 'r_entered', 'r_updated',
                 'wl_count', 'wl_entered', 'wl_updated',
                 ]
    # Order in which the attributes appear in the serialized form.
    fields = ('r_count', 'r_entered', 'r_updated',
              'wl_count', 'wl_entered', 'wl_updated',
              )
    this_version = '1'

    # Timestamp placeholder (epoch seconds) for "has not happened".
    never = -1

    def __init__(self, r_count=0, wl_count=0):
        self.r_count = r_count
        self.wl_count = wl_count
        self.r_entered = self.r_updated = self.never
        self.wl_entered = self.wl_updated = self.never

    def wl_increment(self):
        """Count one more whitelisting and refresh its timestamps."""
        # The counter stops growing at sys.maxint.
        if self.wl_count < sys.maxint:
            self.wl_count += 1
        if self.wl_entered == self.never:
            self.wl_entered = int(time.time())
        self.wl_update()

    def r_increment(self):
        """Count one more report and refresh its timestamps."""
        # The counter stops growing at sys.maxint.
        if self.r_count < sys.maxint:
            self.r_count += 1
        if self.r_entered == self.never:
            self.r_entered = int(time.time())
        self.r_update()

    def r_update(self):
        """Stamp the report side as updated now."""
        self.r_updated = int(time.time())

    def wl_update(self):
        """Stamp the whitelist side as updated now."""
        self.wl_updated = int(time.time())

    def __str__(self):
        """Serialize as the version tag followed by the six fields."""
        values = [getattr(self, name) for name in self.fields]
        return "%s,%d,%d,%d,%d,%d,%d" % tuple([self.this_version] + values)

    def from_str(cls, s):
        """Build a Record from a stored string of either known layout.

        A value with exactly three comma-separated parts is read as the
        unversioned layout; otherwise the first part must be '1'.
        """
        parts = s.split(',')
        if len(parts) == 3:
            return cls.from_str_0(s)
        if parts[0] == '1':
            return cls.from_str_1(s)
        raise StandardError("don't know how to handle db value %s"
                            % repr(s))

    from_str = classmethod(from_str)

    def from_str_0(cls, s):
        """Parse the unversioned layout: r_count,r_entered,r_updated."""
        rec = Record()
        values = s.split(',')

        names = ('r_count', 'r_entered', 'r_updated')
        assert len(values) == len(names)

        for position, text in enumerate(values):
            setattr(rec, names[position], int(text))

        return rec

    from_str_0 = classmethod(from_str_0)

    def from_str_1(cls, s):
        """Parse the version-1 layout: '1' followed by all six fields."""
        rec = Record()
        # Drop the leading version tag.
        values = s.split(',')[1:]

        assert len(values) == len(cls.fields)

        for position, text in enumerate(values):
            setattr(rec, cls.fields[position], int(text))

        return rec

    from_str_1 = classmethod(from_str_1)
342 
343 
344 
__slots__ = ['output', 'initialized']
# One lock shared at class level; apply_locking_method holds it around
# every call it wraps.
db_lock = threading.Lock()
# Age in seconds past which _really_reorganize deletes a record.
max_age = 3600*24*30*4   # 120 days (about 4 months)
# Assigned by initialize(); stays None until then.
db = None
# Delay in seconds handed to the start_syncing timer.
sync_period = 60
# Delay in seconds handed to the start_reorganizing timer.
reorganize_period = 3600*24   # 1 day
352 
def __init__(self):
    """Refuse to create a handle while the shared `db` is still None.

    `db` is None at class level and is assigned by initialize().
    """
    assert self.db is not None, "database was not initialized"
355 
def initialize(self, fn, mode):
    """Open the gdbm file `fn` with `mode` and start both timers.

    This is wrapped as a classmethod below, so `self` is the class and
    the assignments to `output` and `db` land on the class itself.
    """
    self.output = Output()
    self.db = gdbm.open(fn, mode)
    self.start_reorganizing()
    self.start_syncing()
initialize = classmethod(initialize)
362 
def apply_locking_method(self, method, varargs=(), kwargs=None):
    """Call `method(*varargs, **kwargs)` while holding `db_lock`.

    method  -- callable to run under the lock
    varargs -- positional arguments for `method`
    kwargs  -- keyword arguments for `method`; None means none

    Returns whatever `method` returns.  The lock is released even when
    `method` raises, and the exception then propagates to the caller.
    """
    # A None default replaces the former shared `{}` default, so no
    # mutable object is carried between calls.
    if kwargs is None:
        kwargs = {}
    self.output.debug("acquiring lock")
    self.db_lock.acquire()
    self.output.debug("acquired lock")
    try:
        return method(*varargs, **kwargs)
    finally:
        self.output.debug("releasing lock")
        self.db_lock.release()
        self.output.debug("released lock")
apply_locking_method = classmethod(apply_locking_method)
378 
def __getitem__(self, key):
    """Read `key` from the database via apply_locking_method."""
    return self.apply_locking_method(self._really_getitem, (key,))
381 
def _really_getitem(self, key):
    """Unlocked read of `key` from `db`; __getitem__ wraps this."""
    return self.db[key]
384 
def __setitem__(self, key, value):
    """Store `value` under `key` via apply_locking_method."""
    self.apply_locking_method(self._really_setitem, (key, value))
387 
def _really_setitem(self, key, value):
    """Unlocked write of `value` under `key`; __setitem__ wraps this."""
    self.db[key] = value
390 
def start_syncing(self):
    """Arm a timer that calls start_syncing again after `sync_period`.

    NOTE(review): upstream source line 392 is absent from this listing;
    presumably it performed the sync itself (see _really_sync) --
    confirm against the original file.
    """
    self.sync_timer = threading.Timer(self.sync_period,
                                      self.start_syncing)
    self.sync_timer.start()
start_syncing = classmethod(start_syncing)
397 
def _really_sync(self):
    """Call sync() on the shared database object; takes no lock itself."""
    self.db.sync()
_really_sync = classmethod(_really_sync)
401 
404  self.reorganize_timer = threading.Timer(self.reorganize_period,
405  self.start_reorganizing)
406  self.reorganize_timer.start()
407  start_reorganizing = classmethod(start_reorganizing)
408 
def _really_reorganize(self):
    """Delete stale records, then call reorganize() on the database.

    A record is stale when its `r_updated` stamp is more than `max_age`
    seconds in the past.  Only the report timestamp is compared;
    `wl_updated` is not consulted, so a record that was only ever
    whitelisted (r_updated == Record.never) always counts as stale.
    This method takes no lock itself.
    """
    self.output.debug("reorganizing the database")
    key = self.db.firstkey()
    # Cut-off in epoch seconds; anything last reported before it goes.
    breakpoint = time.time() - self.max_age

    while key is not None:
        rec = Record.from_str(self.db[key])
        delkey = None
        if rec.r_updated < breakpoint:
            self.output.debug("deleting key %s" % key)
            delkey = key
        # Advance first, so nextkey() is asked about a key that is
        # still present; the deletion happens only afterwards.
        key = self.db.nextkey(key)
        if delkey:
            del self.db[delkey]
    self.db.reorganize()
_really_reorganize = classmethod(_really_reorganize)
425 
426 
class Server(SocketServer.ThreadingUDPServer, object):
    """Threaded UDP server whose requests go to RequestHandler."""

    max_packet_size = 8192
    time_diff_allowance = 180

    def __init__(self, address, log):
        """Bind to `address` and publish `log` to the handler class."""
        typecheck(log, Log)

        # The handler reads `output` and `log` as class attributes, so
        # they are installed on RequestHandler itself.
        output = Output()
        self.output = output
        RequestHandler.output = output
        RequestHandler.log = log

        output.debug('listening on %s' % str(address))
        super(Server, self).__init__(address, RequestHandler)

    def serve_forever(self):
        """Record the process id, then enter the base serving loop."""
        self.pid = os.getpid()
        super(Server, self).serve_forever()

    def replace_log(self, newlog):
        """Make the handler class use `newlog` from now on."""
        typecheck(newlog, Log)
        RequestHandler.log = newlog
        self.output.debug("changing logfile")
448 
449 
class RequestHandler(SocketServer.DatagramRequestHandler, object):
    """Handle one request datagram and build the reply.

    `output` and `log` are class attributes installed by Server.
    `self.server` must provide `passwd` and `acl`; both are read in
    _really_handle but assigned outside this class.
    """

    def setup(self):
        """Reset the per-request state before handle() runs."""
        super(RequestHandler, self).setup()

        # This is to work around a bug in current versions
        # of Python.  The bug has been reported, and fixed
        # in Python's CVS.
        self.wfile = cStringIO.StringIO()

        # NOTE(review): upstream source line 459 is absent from the
        # listing this file was recovered from; nothing is reconstructed
        # for it here -- confirm against the original file.

        self.out_msg = Response()
        self.user = None
        self.op = None
        self.op_arg = None
        self.out_code = None
        self.msg_thread = None

    def handle(self):
        """Process the request, log it, and write the response.

        Any Exception raised while processing is converted into an
        error response instead of propagating.
        """
        try:
            self._really_handle()
        except Exception:
            self._answer_failure(sys.exc_info()[1])

        self.out_msg.setdefault('Code', str(self.out_msg.ok_code))
        self.out_msg.setdefault('Diag', 'OK')
        self.out_msg.init_for_sending()

        self.log.log(self.client_address, self.user, self.op, self.op_arg,
                     int(self.out_msg['Code']))

        msg_str = str(self.out_msg)
        self.output.debug("sending: %s" % repr(msg_str))
        self.wfile.write(msg_str)

    def _answer_failure(self, err):
        """Turn the exception `err` into the matching error response.

        Must be called while the exception is being handled, because the
        fallback branch prints the active traceback.
        """
        # Tried in order; the first matching row wins, as with a chain
        # of except clauses.
        table = (
            (UnsupportedVersionError, 505, "Version Not Supported: %s"),
            (NotImplementedError, 501, "Not implemented: %s"),
            # We assume that KeyErrors are due to not
            # finding a key in the RFC822 message
            ((ProtocolError, KeyError), 400, "Bad request: %s"),
            (AuthorizationError, 401, "Unauthorized: %s"),
            (SignatureError, 401, "Unauthorized, Signature Error: %s"),
        )
        for kinds, code, template in table:
            if isinstance(err, kinds):
                self.handle_error(code, template % err)
                return

        self.handle_error(500, "Internal Server Error: %s" % err)
        traceback.print_exc()

    def _really_handle(self):
        """handle() without the exception handling"""

        self.output.debug("received: %s" % repr(self.packet))

        signed_msg = MacEnvelope(self.rfile)

        self.user = Username(signed_msg['User'])

        # Only non-anonymous users have their signature verified.
        if self.user != pyzor.anonymous_user:
            if self.user not in self.server.passwd:
                raise SignatureError("unknown user")
            signed_msg.verify_sig(self.server.passwd[self.user])

        self.in_msg = signed_msg.get_submsg(pyzor.Request)

        self.msg_thread = self.in_msg.get_thread()

        # We take the int() of the proto versions because
        # if the int()'s are the same, then they should be compatible
        if int(self.in_msg.get_protocol_version()) != int(proto_version):
            raise UnsupportedVersionError

        self.out_msg.set_thread(self.msg_thread)

        self.op = Opname(self.in_msg.get_op())
        if not self.server.acl.allows(self.user, self.op):
            raise AuthorizationError(
                "user is unauthorized to request the operation")

        self.output.debug("got a %s command from %s" %
                          (self.op, self.client_address))

        if self.op not in self.dispatches:
            raise NotImplementedError(
                "requested operation is not implemented")

        # A None entry (e.g. 'ping') needs no work beyond the default
        # response.
        dispatch = self.dispatches[self.op]
        if dispatch is not None:
            dispatch(self)

    def handle_error(self, code, s):
        """Replace the pending response with an ErrorResponse."""
        self.out_msg = ErrorResponse(code, s)

        # Thread id 0 is used when the request's own thread id was
        # never read.
        if self.msg_thread is None:
            self.out_msg.set_thread(ThreadId(0))
        else:
            self.out_msg.set_thread(self.msg_thread)

    def _fetch_record(self, db, digest):
        """Return the stored Record for `digest`, or a fresh Record()
        when the database has no such key."""
        try:
            return Record.from_str(db[digest])
        except KeyError:
            return Record()

    def handle_check(self):
        """Reply with the report and whitelist counts for a digest."""
        digest = self.in_msg['Op-Digest']
        self.op_arg = digest
        self.output.debug("request to check digest %s" % digest)

        rec = self._fetch_record(DBHandle(), digest)

        self.out_msg['Count'] = "%d" % rec.r_count
        self.out_msg['WL-Count'] = "%d" % rec.wl_count

    def handle_report(self):
        """Increment the report count of a digest."""
        digest = self.in_msg['Op-Digest']
        self.op_arg = digest
        self.output.debug("request to report digest %s" % digest)

        # NOTE(review): the read and the write below take the database
        # lock separately, so two concurrent reports of the same digest
        # can lose an increment.
        db = DBHandle()
        rec = self._fetch_record(db, digest)
        rec.r_increment()
        db[digest] = str(rec)

    def handle_whitelist(self):
        """Increment the whitelist count of a digest."""
        digest = self.in_msg['Op-Digest']
        self.op_arg = digest
        self.output.debug("request to whitelist digest %s" % digest)

        # NOTE(review): same separate-lock read/write as handle_report.
        db = DBHandle()
        rec = self._fetch_record(db, digest)
        rec.wl_increment()
        db[digest] = str(rec)

    def handle_info(self):
        """Reply with the counts and timestamps stored for a digest."""
        digest = self.in_msg['Op-Digest']
        self.op_arg = digest
        # The debug text used to say "check", copied from handle_check.
        self.output.debug("request for info about digest %s" % digest)

        record = self._fetch_record(DBHandle(), digest)

        self.out_msg['Entered'] = "%d" % record.r_entered
        self.out_msg['Updated'] = "%d" % record.r_updated

        self.out_msg['WL-Entered'] = "%d" % record.wl_entered
        self.out_msg['WL-Updated'] = "%d" % record.wl_updated

        self.out_msg['Count'] = "%d" % record.r_count
        self.out_msg['WL-Count'] = "%d" % record.wl_count

    # Opname -> plain function taking the handler; None means the
    # operation is accepted but needs no handler.
    dispatches = {'check': handle_check,
                  'report': handle_report,
                  'ping': None,
                  'info': handle_info,
                  'whitelist': handle_whitelist,
                  }