"Fossies" - the Fresh Open Source Software Archive 
Member "pyzor-1.0.0/pyzor/server.py" (10 Dec 2014, 15777 Bytes) of package /linux/privat/pyzor-1.0.0.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively you can here
view or
download the uninterpreted source code file.
For more information about "server.py" see the
Fossies "Dox" file reference documentation and the latest
Fossies "Diffs" side-by-side code changes report:
0.9.0_vs_1.0.0.
1 """Networked spam-signature detection server.
2
3 The server receives the request in the form of a RFC5321 message, and
4 responds with another RFC5321 message. Neither of these messages has a
5 body - all of the data is encapsulated in the headers.
6
7 The response headers will always include a "Code" header, which is a
8 HTTP-style response code, and a "Diag" header, which is a human-readable
9 message explaining the response code (typically this will be "OK").
10
11 Both the request and response headers always include a "PV" header, which
12 indicates the protocol version that is being used (in a major.minor format).
13 Both the requestion and response headers also always include a "Thread",
14 which uniquely identifies the request (this is a requirement of using UDP).
15 Responses to requests may arrive in any order, but the "Thread" header of
16 a response will always match the "Thread" header of the appropriate request.
17
18 Authenticated requests must also have "User", "Time" (timestamp), and "Sig"
19 (signature) headers.
20 """
21 import os
22 import sys
23 import time
24 import errno
25 import socket
26 import signal
27 import logging
28 import threading
29 import traceback
30 import email.message
31
32 try:
33 import SocketServer
34 except ImportError:
35 import socketserver as SocketServer
36
37 import pyzor.config
38 import pyzor.account
39 import pyzor.engines.common
40
41 import pyzor.hacks.py26
42
43
44 pyzor.hacks.py26.hack_all()
45
46
47 def _eintr_retry(func, *args):
48 """restart a system call interrupted by EINTR"""
49 while True:
50 try:
51 return func(*args)
52 except OSError as e:
53 if e.args[0] != errno.EINTR:
54 raise
55
56
57 class Server(SocketServer.UDPServer):
58 """The pyzord server. Handles incoming UDP connections in a single
59 thread and single process."""
60 max_packet_size = 8192
61 time_diff_allowance = 180
62
63 def __init__(self, address, database, passwd_fn, access_fn,
64 forwarder=None):
65 if ":" in address[0]:
66 Server.address_family = socket.AF_INET6
67 else:
68 Server.address_family = socket.AF_INET
69 self.log = logging.getLogger("pyzord")
70 self.usage_log = logging.getLogger("pyzord-usage")
71 self.database = database
72 self.one_step = getattr(self.database, "handles_one_step", False)
73
74 # Handle configuration files
75 self.passwd_fn = passwd_fn
76 self.access_fn = access_fn
77 self.accounts = {}
78 self.acl = {}
79 self.load_config()
80
81 self.forwarder = forwarder
82
83 self.log.debug("Listening on %s", address)
84 SocketServer.UDPServer.__init__(self, address, RequestHandler,
85 bind_and_activate=False)
86 try:
87 self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
88 except (AttributeError, socket.error) as e:
89 self.log.debug("Unable to set IPV6_V6ONLY to false %s", e)
90 self.server_bind()
91 self.server_activate()
92
93 # Finally, set signals
94 signal.signal(signal.SIGUSR1, self.reload_handler)
95 signal.signal(signal.SIGTERM, self.shutdown_handler)
96
97 def load_config(self):
98 """Reads the configuration files and loads the accounts and ACLs."""
99 self.accounts = pyzor.config.load_passwd_file(self.passwd_fn)
100 self.acl = pyzor.config.load_access_file(self.access_fn, self.accounts)
101
102 def shutdown_handler(self, *args, **kwargs):
103 """Handler for the SIGTERM signal. This should be used to kill the
104 daemon and ensure proper clean-up.
105 """
106 self.log.info("SIGTERM received. Shutting down.")
107 t = threading.Thread(target=self.shutdown)
108 t.start()
109
110 def reload_handler(self, *args, **kwargs):
111 """Handler for the SIGUSR1 signal. This should be used to reload
112 the configuration files.
113 """
114 self.log.info("SIGUSR1 received. Reloading configuration.")
115 t = threading.Thread(target=self.load_config)
116 t.start()
117
118 def handle_error(self, request, client_address):
119 self.log.error("Error while processing request from: %s",
120 client_address, exc_info=True)
121
122
123 class PreForkServer(Server):
124 """The same as Server, but prefork itself when starting the self, by
125 forking a number of child-processes.
126
127 The parent process will then wait for all his child process to complete.
128 """
129 def __init__(self, address, database, passwd_fn, access_fn, prefork=4):
130 """The same as Server.__init__ but requires a list of databases
131 instead of a single database connection.
132 """
133 self.pids = None
134 Server.__init__(self, address, database, passwd_fn, access_fn)
135 self._prefork = prefork
136
137 def serve_forever(self, poll_interval=0.5):
138 """Fork the current process and wait for all children to finish."""
139 pids = []
140 for dummy in xrange(self._prefork):
141 database = self.database.next()
142 pid = os.fork()
143 if not pid:
144 # Create the database in the child process, to prevent issues
145 self.database = database()
146 Server.serve_forever(self, poll_interval=poll_interval)
147 os._exit(0)
148 else:
149 pids.append(pid)
150 self.pids = pids
151 for pid in self.pids:
152 _eintr_retry(os.waitpid, pid, 0)
153
154 def shutdown(self):
155 """If this is the parent process send the TERM signal to all children,
156 else call the super method.
157 """
158 for pid in self.pids or ():
159 os.kill(pid, signal.SIGTERM)
160 if self.pids is None:
161 Server.shutdown(self)
162
163 def load_config(self):
164 """If this is the parent process send the USR1 signal to all children,
165 else call the super method.
166 """
167 for pid in self.pids or ():
168 os.kill(pid, signal.SIGUSR1)
169 if self.pids is None:
170 Server.load_config(self)
171
172
173 class ThreadingServer(SocketServer.ThreadingMixIn, Server):
174 """A threaded version of the pyzord server. Each connection is served
175 in a new thread. This may not be suitable for all database types."""
176 pass
177
178
179 class BoundedThreadingServer(ThreadingServer):
180 """Same as ThreadingServer but this also accepts a limited number of
181 concurrent threads.
182 """
183
184 def __init__(self, address, database, passwd_fn, access_fn, max_threads,
185 forwarding_server=None):
186 ThreadingServer.__init__(self, address, database, passwd_fn, access_fn,
187 forwarder=forwarding_server)
188 self.semaphore = threading.Semaphore(max_threads)
189
190 def process_request(self, request, client_address):
191 self.semaphore.acquire()
192 ThreadingServer.process_request(self, request, client_address)
193
194 def process_request_thread(self, request, client_address):
195 ThreadingServer.process_request_thread(self, request, client_address)
196 self.semaphore.release()
197
198
199 class ProcessServer(SocketServer.ForkingMixIn, Server):
200 """A multi-processing version of the pyzord server. Each connection is
201 served in a new process. This may not be suitable for all database types.
202 """
203
204 def __init__(self, address, database, passwd_fn, access_fn,
205 max_children=40, forwarding_server=None):
206 ProcessServer.max_children = max_children
207 Server.__init__(self, address, database, passwd_fn, access_fn,
208 forwarder=forwarding_server)
209
210
211 class RequestHandler(SocketServer.DatagramRequestHandler):
212 """Handle a single pyzord request."""
213
214 def __init__(self, *args, **kwargs):
215 self.response = email.message.Message()
216 SocketServer.DatagramRequestHandler.__init__(self, *args, **kwargs)
217
218 def handle(self):
219 """Handle a pyzord operation, cleanly handling any errors."""
220 self.response["Code"] = "200"
221 self.response["Diag"] = "OK"
222 self.response["PV"] = "%s" % pyzor.proto_version
223 try:
224 self._really_handle()
225 except NotImplementedError as e:
226 self.handle_error(501, "Not implemented: %s" % e)
227 except pyzor.UnsupportedVersionError as e:
228 self.handle_error(505, "Version Not Supported: %s" % e)
229 except pyzor.ProtocolError as e:
230 self.handle_error(400, "Bad request: %s" % e)
231 except pyzor.SignatureError as e:
232 self.handle_error(401, "Unauthorized: Signature Error: %s" % e)
233 except pyzor.AuthorizationError as e:
234 self.handle_error(403, "Forbidden: %s" % e)
235 except Exception as e:
236 self.handle_error(500, "Internal Server Error: %s" % e)
237 self.server.log.error(traceback.format_exc())
238 self.server.log.debug("Sending: %r", self.response.as_string())
239 self.wfile.write(self.response.as_string().encode("utf8"))
240
241 def _really_handle(self):
242 """handle() without the exception handling."""
243 self.server.log.debug("Received: %r", self.packet)
244
245 # Read the request.
246 # Old versions of the client sent a double \n after the signature,
247 # which screws up the RFC5321 format. Specifically handle that
248 # here - this could be removed in time.
249 request = email.message_from_bytes(
250 self.rfile.read().replace(b"\n\n", b"\n") + b"\n")
251
252 # Ensure that the response can be paired with the request.
253 self.response["Thread"] = request["Thread"]
254
255 # If this is an authenticated request, then check the authentication
256 # details.
257 user = request["User"] or pyzor.anonymous_user
258 if user != pyzor.anonymous_user:
259 try:
260 pyzor.account.verify_signature(request,
261 self.server.accounts[user])
262 except KeyError:
263 raise pyzor.SignatureError("Unknown user.")
264
265 if "PV" not in request:
266 raise pyzor.ProtocolError("Protocol Version not specified in "
267 "request")
268
269 # The protocol version is compatible if the major number is
270 # identical (changes in the minor number are unimportant).
271 try:
272 if int(float(request["PV"])) != int(pyzor.proto_version):
273 raise pyzor.UnsupportedVersionError()
274 except ValueError:
275 self.server.log.warn("Invalid PV: %s", request["PV"])
276 raise pyzor.ProtocolError("Invalid Protocol Version")
277
278 # Check that the user has permission to execute the requested
279 # operation.
280 opcode = request["Op"]
281 if opcode not in self.server.acl[user]:
282 raise pyzor.AuthorizationError(
283 "User is not authorized to request the operation.")
284 self.server.log.debug("Got a %s command from %s", opcode,
285 self.client_address[0])
286 # Get a handle to the appropriate method to execute this operation.
287 try:
288 dispatch = self.dispatches[opcode]
289 except KeyError:
290 raise NotImplementedError("Requested operation is not "
291 "implemented.")
292 # Get the existing record from the database (or a blank one if
293 # there is no matching record).
294 digests = request.get_all("Op-Digest")
295
296 # Do the requested operation, log what we have done, and return.
297 if dispatch and digests:
298 dispatch(self, digests)
299 self.server.usage_log.info("%s,%s,%s,%r,%s", user,
300 self.client_address[0], opcode, digests,
301 self.response["Code"])
302
303 def handle_error(self, code, message):
304 """Create an appropriate response for an error."""
305 self.server.usage_log.error("%s: %s", code, message)
306 self.response.replace_header("Code", "%d" % code)
307 self.response.replace_header("Diag", message)
308
309 def handle_pong(self, digests):
310 """Handle the 'pong' command.
311
312 This command returns maxint for report counts and 0 whitelist.
313 """
314 self.server.log.debug("Request pong for %s", digests[0])
315 self.response["Count"] = "%d" % sys.maxint
316 self.response["WL-Count"] = "%d" % 0
317
318 def handle_check(self, digests):
319 """Handle the 'check' command.
320
321 This command returns the spam/ham counts for the specified digest.
322 """
323 digest = digests[0]
324 try:
325 record = self.server.database[digest]
326 except KeyError:
327 record = pyzor.engines.common.Record()
328 self.server.log.debug("Request to check digest %s", digest)
329 self.response["Count"] = "%d" % record.r_count
330 self.response["WL-Count"] = "%d" % record.wl_count
331
332 def handle_report(self, digests):
333 """Handle the 'report' command in a single step.
334
335 This command increases the spam count for the specified digests."""
336 self.server.log.debug("Request to report digests %s", digests)
337 if self.server.one_step:
338 self.server.database.report(digests)
339 else:
340 for digest in digests:
341 try:
342 record = self.server.database[digest]
343 except KeyError:
344 record = pyzor.engines.common.Record()
345 record.r_increment()
346 self.server.database[digest] = record
347 if self.server.forwarder:
348 for digest in digests:
349 self.server.forwarder.queue_forward_request(digest)
350
351 def handle_whitelist(self, digests):
352 """Handle the 'whitelist' command in a single step.
353
354 This command increases the ham count for the specified digests."""
355 self.server.log.debug("Request to whitelist digests %s", digests)
356 if self.server.one_step:
357 self.server.database.whitelist(digests)
358 else:
359 for digest in digests:
360 try:
361 record = self.server.database[digest]
362 except KeyError:
363 record = pyzor.engines.common.Record()
364 record.wl_increment()
365 self.server.database[digest] = record
366 if self.server.forwarder:
367 for digest in digests:
368 self.server.forwarder.queue_forward_request(digest, True)
369
370 def handle_info(self, digests):
371 """Handle the 'info' command.
372
373 This command returns diagnostic data about a digest (timestamps for
374 when the digest was first/last seen as spam/ham, and spam/ham
375 counts).
376 """
377 digest = digests[0]
378 try:
379 record = self.server.database[digest]
380 except KeyError:
381 record = pyzor.engines.common.Record()
382 self.server.log.debug("Request for information about digest %s",
383 digest)
384
385 def time_output(time_obj):
386 """Convert a datetime object to a POSIX timestamp.
387
388 If the object is None, then return 0.
389 """
390 if not time_obj:
391 return 0
392 return time.mktime(time_obj.timetuple())
393
394 self.response["Entered"] = "%d" % time_output(record.r_entered)
395 self.response["Updated"] = "%d" % time_output(record.r_updated)
396 self.response["WL-Entered"] = "%d" % time_output(record.wl_entered)
397 self.response["WL-Updated"] = "%d" % time_output(record.wl_updated)
398 self.response["Count"] = "%d" % record.r_count
399 self.response["WL-Count"] = "%d" % record.wl_count
400
401 dispatches = {
402 'ping': None,
403 'pong': handle_pong,
404 'info': handle_info,
405 'check': handle_check,
406 'report': handle_report,
407 'whitelist': handle_whitelist,
408 }