"Fossies" - the Fresh Open Source Software Archive 
Member "archivemail-0.9.0/test_archivemail" (9 Jul 2011, 68583 Bytes) of package /linux/privat/old/archivemail-0.9.0.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively you can here
view or
download the uninterpreted source code file.
1 #! /usr/bin/env python
2 ############################################################################
3 # Copyright (C) 2002 Paul Rodger <paul@paulrodger.com>
4 # (C) 2006-2011 Nikolaus Schulz <microschulz@web.de>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 ############################################################################
20 """
21 Unit-test archivemail using 'PyUnit'.
22
23 TODO: add tests for:
24 * dotlock locks already existing
25 * archiving MH-format mailboxes
26 * a 3rd party process changing the mbox file being read
27
28 """
29
30 import sys
31
32 def check_python_version():
33 """Abort if we are running on python < v2.3"""
34 too_old_error = "This test script requires python version 2.3 or later. " + \
35 "Your version of python is:\n%s" % sys.version
36 try:
37 version = sys.version_info # we might not even have this function! :)
38 if (version[0] < 2) or (version[0] == 2 and version[1] < 3):
39 print too_old_error
40 sys.exit(1)
41 except AttributeError:
42 print too_old_error
43 sys.exit(1)
44
45 # define & run this early because 'unittest' requires Python >= 2.1
46 check_python_version()
47
48 import copy
49 import fcntl
50 import filecmp
51 import os
52 import re
53 import shutil
54 import stat
55 import tempfile
56 import time
57 import unittest
58 import gzip
59 import cStringIO
60 import rfc822
61 import mailbox
62
63 from types import ModuleType
64 archivemail = ModuleType("archivemail")
65 try:
66 module_fp = open("archivemail", "r")
67 except IOError:
68 print "The archivemail script should be in the current directory in order"
69 print "to be imported and tested. Sorry."
70 sys.exit(1)
71 exec module_fp in archivemail.__dict__
72
73 # We want to iterate over messages in a compressed archive mbox and verify
74 # them. This involves seeking in the mbox. The gzip.Gzipfile.seek() in
75 # Python 2.5 doesn't understand whence; this is Python bug #1355023, triggered
76 # by mailbox._PartialFile.seek(). The bug is still pending as of Python
77 # 2.5.2. To work around it, we subclass gzip.GzipFile.
78 #
79 # It should be noted that seeking backwards in a GzipFile is emulated by
80 # re-reading the entire file from the beginning, which is extremely
81 # inefficient and won't work with large files; but our test archives are all
82 # small, so it's okay.
83
84 class FixedGzipFile(gzip.GzipFile):
85 """GzipFile with seek method accepting whence parameter."""
86 def seek(self, offset, whence=0):
87 try:
88 # Try calling gzip.GzipFile.seek with the whence parameter.
89 # For Python >= 2.7, it returns the new offset; pass that on.
90 return gzip.GzipFile.seek(self, offset, whence)
91 except TypeError:
92 if whence:
93 if whence == 1:
94 offset = self.offset + offset
95 else:
96 raise ValueError('Seek from end not supported')
97 return gzip.GzipFile.seek(self, offset)
98
99 # precision of os.utime() when restoring mbox timestamps
100 utimes_precision = 5
101
102 class MessageIdFactory:
103 """Factory to create `uniqe' message-ids."""
104 def __init__(self):
105 self.seq = 0
106 def __call__(self):
107 self.seq += 1
108 return "<archivemail%d@localhost>" % self.seq
109
110 make_msgid = MessageIdFactory()
111
112 class IndexedMailboxDir:
113 """An indexed mailbox directory, providing random message access by
114 message-id. Intended as a base class for a maildir and an mh subclass."""
115
116 def __init__(self, mdir_name):
117 assert tempfile.tempdir
118 self.root = tempfile.mkdtemp(prefix=mdir_name)
119 self.msg_id_dict = {}
120 self.deliveries = 0
121
122 def _add_to_index(self, msg_text, fpath):
123 """Add the given message to the index, for later random access."""
124 # Extract the message-id as index key
125 msg_id = None
126 fp = cStringIO.StringIO(msg_text)
127 while True:
128 line = fp.readline()
129 # line empty means we didn't find a message-id
130 assert line
131 if line.lower().startswith("message-id:"):
132 msg_id = line.split(":", 1)[-1].strip()
133 assert msg_id
134 break
135 assert not self.msg_id_dict.has_key(msg_id)
136 self.msg_id_dict[msg_id] = fpath
137
138 def get_all_filenames(self):
139 """Return all relative pathnames of files in this mailbox."""
140 return self.msg_id_dict.values()
141
142 class SimpleMaildir(IndexedMailboxDir):
143 """Primitive Maildir class, just good enough for generating short-lived
144 test maildirs."""
145
146 def __init__(self, mdir_name='maildir'):
147 IndexedMailboxDir.__init__(self, mdir_name)
148 for d in "cur", "tmp", "new":
149 os.mkdir(os.path.join(self.root, d))
150
151 def write(self, msg_str, new=True, flags=[]):
152 """Store a message with the given flags."""
153 assert not (new and flags)
154 if new:
155 subdir = "new"
156 else:
157 subdir = "cur"
158 fname = self._mkname(new, flags)
159 relpath = os.path.join(subdir, fname)
160 path = os.path.join(self.root, relpath)
161 assert not os.path.exists(path)
162 f = open(path, "w")
163 f.write(msg_str)
164 f.close()
165 self._add_to_index(msg_str, relpath)
166
167 def _mkname(self, new, flags):
168 """Generate a unique filename for a new message."""
169 validflags = 'DFPRST'
170 for f in flags:
171 assert f in validflags
172 # This 'unique' name should be good enough, since nobody else
173 # will ever write messages to this maildir folder.
174 uniq = str(self.deliveries)
175 self.deliveries += 1
176 if new:
177 return uniq
178 if not flags:
179 return uniq + ':2,'
180 finfo = "".join(sorted(flags))
181 return uniq + ':2,' + finfo
182
183 def get_message_and_mbox_status(self, msgid):
184 """For the Message-Id msgid, return the matching message in text
185 format and its status, expressed as a set of mbox flags."""
186 fpath = self.msg_id_dict[msgid] # Barfs if not found
187 mdir_flags = fpath.rsplit('2,', 1)[-1]
188 flagmap = {
189 'F': 'F',
190 'R': 'A',
191 'S': 'R'
192 }
193 mbox_flags = set([flagmap[x] for x in mdir_flags])
194 if fpath.startswith("cur/"):
195 mbox_flags.add('O')
196 fp = open(os.path.join(self.root, fpath), "r")
197 msg = fp.read()
198 fp.close()
199 return msg, mbox_flags
200
201
202 class TestCaseInTempdir(unittest.TestCase):
203 """Base class for testcases that need to create temporary files.
204 All testcases that create temporary files should be derived from this
205 class, not directly from unittest.TestCase.
206 TestCaseInTempdir provides these methods:
207
208 setUp() Creates a safe temporary directory and sets tempfile.tempdir.
209
210 tearDown() Recursively removes the temporary directory and unsets
211 tempfile.tempdir.
212
213 Overriding methods should call the ones above."""
214 temproot = None
215
216 def setUp(self):
217 if not self.temproot:
218 assert not tempfile.tempdir
219 self.temproot = tempfile.tempdir = \
220 tempfile.mkdtemp(prefix="test-archivemail")
221
222 def tearDown(self):
223 assert tempfile.tempdir == self.temproot
224 if self.temproot:
225 shutil.rmtree(self.temproot)
226 tempfile.tempdir = self.temproot = None
227
228
229 ############ Mbox Class testing ##############
230
231 class TestMboxDotlock(TestCaseInTempdir):
232 def setUp(self):
233 super(TestMboxDotlock, self).setUp()
234 self.mbox_name = make_mbox()
235 self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
236 self.mbox = archivemail.Mbox(self.mbox_name)
237
238 def testDotlock(self):
239 """dotlock_lock/unlock should create/delete a lockfile"""
240 lock = self.mbox_name + ".lock"
241 self.mbox._dotlock_lock()
242 assert os.path.isfile(lock)
243 self.mbox._dotlock_unlock()
244 assert not os.path.isfile(lock)
245
246 def testDotlockingSucceedsUponEACCES(self):
247 """A dotlock should silently be omitted upon EACCES."""
248 archivemail.options.quiet = True
249 mbox_dir = os.path.dirname(self.mbox_name)
250 os.chmod(mbox_dir, 0500)
251 try:
252 self.mbox._dotlock_lock()
253 self.mbox._dotlock_unlock()
254 finally:
255 os.chmod(mbox_dir, 0700)
256 archivemail.options.quiet = False
257
258 class TestMboxPosixLock(TestCaseInTempdir):
259 def setUp(self):
260 super(TestMboxPosixLock, self).setUp()
261 self.mbox_name = make_mbox()
262 self.mbox = archivemail.Mbox(self.mbox_name)
263
264 def testPosixLock(self):
265 """posix_lock/unlock should create/delete an advisory lock"""
266
267 # The following code snippet heavily lends from the Python 2.5 mailbox
268 # unittest.
269 # BEGIN robbery:
270
271 # Fork off a subprocess that will lock the file for 2 seconds,
272 # unlock it, and then exit.
273 pid = os.fork()
274 if pid == 0:
275 # In the child, lock the mailbox.
276 self.mbox._posix_lock()
277 time.sleep(2)
278 self.mbox._posix_unlock()
279 os._exit(0)
280
281 # In the parent, sleep a bit to give the child time to acquire
282 # the lock.
283 time.sleep(0.5)
284 # The parent's file self.mbox.mbox_file shares fcntl locks with the
285 # duplicated FD in the child; reopen it so we get a different file
286 # table entry.
287 file = open(self.mbox_name, "r+")
288 lock_nb = fcntl.LOCK_EX | fcntl.LOCK_NB
289 fd = file.fileno()
290 try:
291 self.assertRaises(IOError, fcntl.lockf, fd, lock_nb)
292
293 finally:
294 # Wait for child to exit. Locking should now succeed.
295 exited_pid, status = os.waitpid(pid, 0)
296
297 fcntl.lockf(fd, lock_nb)
298 fcntl.lockf(fd, fcntl.LOCK_UN)
299 # END robbery
300
301
302 class TestMboxNext(TestCaseInTempdir):
303 def setUp(self):
304 super(TestMboxNext, self).setUp()
305 self.not_empty_name = make_mbox(messages=18)
306 self.empty_name = make_mbox(messages=0)
307
308 def testNextEmpty(self):
309 """mbox.next() should return None on an empty mailbox"""
310 mbox = archivemail.Mbox(self.empty_name)
311 msg = mbox.next()
312 self.assertEqual(msg, None)
313
314 def testNextNotEmpty(self):
315 """mbox.next() should a message on a populated mailbox"""
316 mbox = archivemail.Mbox(self.not_empty_name)
317 for count in range(18):
318 msg = mbox.next()
319 assert msg
320 msg = mbox.next()
321 self.assertEqual(msg, None)
322
323
324 ############ TempMbox Class testing ##############
325
326 class TestTempMboxWrite(TestCaseInTempdir):
327 def setUp(self):
328 super(TestTempMboxWrite, self).setUp()
329
330 def testWrite(self):
331 """mbox.write() should append messages to a mbox mailbox"""
332 read_file = make_mbox(messages=3)
333 mbox_read = archivemail.Mbox(read_file)
334 mbox_write = archivemail.TempMbox()
335 write_file = mbox_write.mbox_file_name
336 for count in range(3):
337 msg = mbox_read.next()
338 mbox_write.write(msg)
339 mbox_read.close()
340 mbox_write.close()
341 assert filecmp.cmp(read_file, write_file, shallow=0)
342
343 def testWriteNone(self):
344 """calling mbox.write() with no message should raise AssertionError"""
345 write = archivemail.TempMbox()
346 self.assertRaises(AssertionError, write.write, None)
347
348 class TestTempMboxRemove(TestCaseInTempdir):
349 def setUp(self):
350 super(TestTempMboxRemove, self).setUp()
351 self.mbox = archivemail.TempMbox()
352 self.mbox_name = self.mbox.mbox_file_name
353
354 def testMboxRemove(self):
355 """remove() should delete a mbox mailbox"""
356 assert os.path.exists(self.mbox_name)
357 self.mbox.remove()
358 assert not os.path.exists(self.mbox_name)
359
360
361
362 ########## options class testing #################
363
364 class TestOptionDefaults(unittest.TestCase):
365 def testVerbose(self):
366 """verbose should be off by default"""
367 self.assertEqual(archivemail.options.verbose, False)
368
369 def testDaysOldMax(self):
370 """default archival time should be 180 days"""
371 self.assertEqual(archivemail.options.days_old_max, 180)
372
373 def testQuiet(self):
374 """quiet should be off by default"""
375 self.assertEqual(archivemail.options.quiet, False)
376
377 def testDeleteOldMail(self):
378 """we should not delete old mail by default"""
379 self.assertEqual(archivemail.options.delete_old_mail, False)
380
381 def testNoCompress(self):
382 """no-compression should be off by default"""
383 self.assertEqual(archivemail.options.no_compress, False)
384
385 def testIncludeFlagged(self):
386 """we should not archive flagged messages by default"""
387 self.assertEqual(archivemail.options.include_flagged, False)
388
389 def testPreserveUnread(self):
390 """we should not preserve unread messages by default"""
391 self.assertEqual(archivemail.options.preserve_unread, False)
392
393 class TestOptionParser(unittest.TestCase):
394 def setUp(self):
395 self.oldopts = copy.copy(archivemail.options)
396
397 def testOptionDate(self):
398 """--date and -D options are parsed correctly"""
399 date_formats = (
400 "%Y-%m-%d", # ISO format
401 "%d %b %Y" , # Internet format
402 "%d %B %Y" , # Internet format with full month names
403 )
404 date = time.strptime("2000-07-29", "%Y-%m-%d")
405 unixdate = time.mktime(date)
406 for df in date_formats:
407 d = time.strftime(df, date)
408 for opt in '-D', '--date=':
409 archivemail.options.date_old_max = None
410 archivemail.options.parse_args([opt+d], "")
411 self.assertEqual(unixdate, archivemail.options.date_old_max)
412
413 def testOptionPreserveUnread(self):
414 """--preserve-unread option is parsed correctly"""
415 archivemail.options.parse_args(["--preserve-unread"], "")
416 assert archivemail.options.preserve_unread
417 archivemail.options.preserve_unread = False
418 archivemail.options.parse_args(["-u"], "")
419 assert archivemail.options.preserve_unread
420
421 def testOptionSuffix(self):
422 """--suffix and -s options are parsed correctly"""
423 for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"):
424 archivemail.options.parse_args(["--suffix="+suffix], "")
425 self.assertEqual(archivemail.options.archive_suffix, suffix)
426 archivemail.options.archive_suffix = None
427 archivemail.options.parse_args(["-s", suffix], "")
428 self.assertEqual(archivemail.options.archive_suffix, suffix)
429
430 def testOptionPrefix(self):
431 """--prefix and -p options are parsed correctly"""
432 for prefix in ("_static_", "_%B_%Y", "-%Y-%m-%d"):
433 archivemail.options.parse_args(["--prefix="+prefix], "")
434 self.assertEqual(archivemail.options.archive_prefix, prefix)
435 archivemail.options.archive_prefix = None
436 archivemail.options.parse_args(["-p", prefix], "")
437 self.assertEqual(archivemail.options.archive_prefix, prefix)
438
439 def testOptionArchivename(self):
440 """--archive-name and -a options are parsed correctly"""
441 for name in ("custom", ".withdot", "custom_%Y", "%Y/joe"):
442 archivemail.options.parse_args(["--archive-name="+name], "")
443 self.assertEqual(archivemail.options.archive_name, name)
444 archivemail.options.archive_name = None
445 archivemail.options.parse_args(["-a", name], "")
446 self.assertEqual(archivemail.options.archive_name, name)
447
448 def testOptionDryrun(self):
449 """--dry-run option is parsed correctly"""
450 archivemail.options.parse_args(["--dry-run"], "")
451 assert archivemail.options.dry_run
452 archivemail.options.preserve_unread = False
453 archivemail.options.parse_args(["-n"], "")
454 assert archivemail.options.dry_run
455
456 def testOptionDays(self):
457 """--days and -d options are parsed correctly"""
458 archivemail.options.parse_args(["--days=11"], "")
459 self.assertEqual(archivemail.options.days_old_max, 11)
460 archivemail.options.days_old_max = None
461 archivemail.options.parse_args(["-d11"], "")
462 self.assertEqual(archivemail.options.days_old_max, 11)
463
464 def testOptionDelete(self):
465 """--delete option is parsed correctly"""
466 archivemail.options.parse_args(["--delete"], "")
467 assert archivemail.options.delete_old_mail
468
469 def testOptionCopy(self):
470 """--copy option is parsed correctly"""
471 archivemail.options.parse_args(["--copy"], "")
472 assert archivemail.options.copy_old_mail
473
474 def testOptionOutputdir(self):
475 """--output-dir and -o options are parsed correctly"""
476 for path in "/just/some/path", "relative/path":
477 archivemail.options.parse_args(["--output-dir=%s" % path], "")
478 self.assertEqual(archivemail.options.output_dir, path)
479 archivemail.options.output_dir = None
480 archivemail.options.parse_args(["-o%s" % path], "")
481 self.assertEqual(archivemail.options.output_dir, path)
482
483 def testOptionNocompress(self):
484 """--no-compress option is parsed correctly"""
485 archivemail.options.parse_args(["--no-compress"], "")
486 assert archivemail.options.no_compress
487
488 def testOptionSize(self):
489 """--size and -S options are parsed correctly"""
490 size = "666"
491 archivemail.options.parse_args(["--size=%s" % size ], "")
492 self.assertEqual(archivemail.options.min_size, int(size))
493 archivemail.options.parse_args(["-S%s" % size ], "")
494 self.assertEqual(archivemail.options.min_size, int(size))
495
496 def tearDown(self):
497 archivemail.options = self.oldopts
498
499 ########## archivemail.is_older_than_days() unit testing #################
500
501 class TestIsTooOld(unittest.TestCase):
502 def testVeryOld(self):
503 """with max_days=360, should be true for these dates > 1 year"""
504 for years in range(1, 10):
505 time_msg = time.time() - (years * 365 * 24 * 60 * 60)
506 assert archivemail.is_older_than_days(time_message=time_msg,
507 max_days=360)
508
509 def testOld(self):
510 """with max_days=14, should be true for these dates > 14 days"""
511 for days in range(14, 360):
512 time_msg = time.time() - (days * 24 * 60 * 60)
513 assert archivemail.is_older_than_days(time_message=time_msg,
514 max_days=14)
515
516 def testJustOld(self):
517 """with max_days=1, should be true for these dates >= 1 day"""
518 for minutes in range(0, 61):
519 time_msg = time.time() - (25 * 60 * 60) + (minutes * 60)
520 assert archivemail.is_older_than_days(time_message=time_msg,
521 max_days=1)
522
523 def testNotOld(self):
524 """with max_days=9, should be false for these dates < 9 days"""
525 for days in range(0, 9):
526 time_msg = time.time() - (days * 24 * 60 * 60)
527 assert not archivemail.is_older_than_days(time_message=time_msg,
528 max_days=9)
529
530 def testJustNotOld(self):
531 """with max_days=1, should be false for these hours <= 1 day"""
532 for minutes in range(0, 60):
533 time_msg = time.time() - (23 * 60 * 60) - (minutes * 60)
534 assert not archivemail.is_older_than_days(time_message=time_msg,
535 max_days=1)
536
537 def testFuture(self):
538 """with max_days=1, should be false for times in the future"""
539 for minutes in range(0, 60):
540 time_msg = time.time() + (minutes * 60)
541 assert not archivemail.is_older_than_days(time_message=time_msg,
542 max_days=1)
543
544 ########## archivemail.parse_imap_url() unit testing #################
545
546 class TestParseIMAPUrl(unittest.TestCase):
547 def setUp(self):
548 archivemail.options.quiet = True
549 archivemail.options.verbose = False
550 archivemail.options.pwfile = None
551
552 urls_withoutpass = [
553 ('imap://user@example.org@imap.example.org/upperbox/lowerbox',
554 ('user', None, 'example.org@imap.example.org', 143,
555 'upperbox/lowerbox')),
556 ('imap://"user@example.org"@imap.example.org/upperbox/lowerbox',
557 ('user@example.org', None, 'imap.example.org', 143,
558 'upperbox/lowerbox')),
559 ('imap://user@example.org"@imap.example.org/upperbox/lowerbox',
560 ('user', None, 'example.org"@imap.example.org', 143,
561 'upperbox/lowerbox')),
562 ('imaps://"user@example.org@imap.example.org/upperbox/lowerbox',
563 ('"user', None, 'example.org@imap.example.org', 993,
564 'upperbox/lowerbox')),
565 ('imaps://"us\\"er@example.org"@imap.example.org/upperbox/lowerbox',
566 ('us"er@example.org', None, 'imap.example.org', 993,
567 'upperbox/lowerbox')),
568 ('imaps://user\\@example.org@imap.example.org/upperbox/lowerbox',
569 ('user\\', None, 'example.org@imap.example.org', 993,
570 'upperbox/lowerbox'))
571 ]
572 urls_withpass = [
573 ('imap://user@example.org:passwd@imap.example.org/upperbox/lowerbox',
574 ('user@example.org', 'passwd', 'imap.example.org', 143,
575 'upperbox/lowerbox')),
576 ('imaps://"user@example.org:passwd@imap.example.org/upperbox/lowerbox',
577 ('"user@example.org', "passwd", 'imap.example.org', 993,
578 'upperbox/lowerbox')),
579 ('imaps://u\\ser\\@example.org:"p@sswd"@imap.example.org/upperbox/lowerbox',
580 ('u\\ser\\@example.org', 'p@sswd', 'imap.example.org', 993,
581 'upperbox/lowerbox'))
582 ]
583 # These are invalid when the password's not stripped.
584 urls_onlywithpass = [
585 ('imaps://"user@example.org":passwd@imap.example.org/upperbox/lowerbox',
586 ('user@example.org', "passwd", 'imap.example.org',
587 'upperbox/lowerbox'))
588 ]
589 def testUrlsWithoutPwfile(self):
590 """Parse test urls with --pwfile option unset. This parses a password in
591 the URL, if present."""
592 archivemail.options.pwfile = None
593 for mbstr in self.urls_withpass + self.urls_withoutpass:
594 url = mbstr[0]
595 result = archivemail.parse_imap_url(url)
596 self.assertEqual(result, mbstr[1])
597
598 def testUrlsWithPwfile(self):
599 """Parse test urls with --pwfile set. In this case the ':' character
600 loses its meaning as a delimiter."""
601 archivemail.options.pwfile = "whocares.txt"
602 for mbstr in self.urls_onlywithpass:
603 url = mbstr[0]
604 self.assertRaises(archivemail.UnexpectedError,
605 archivemail.parse_imap_url, url)
606
607 def testUrlsDefaultPorts(self):
608 """If an IMAP URL does not specify a server port, the standard ports
609 are used."""
610 archivemail.options.pwfile = "doesnotexist.txt"
611 self.assertEqual(143, archivemail.parse_imap_url("imap://user@host/box")[3])
612 self.assertEqual(993, archivemail.parse_imap_url("imaps://user@host/box")[3])
613
614 def testUrlsWithPassAndPortnumber(self):
615 """IMAP URLs with an embedded password and a server port number are
616 correctly parsed."""
617 self.assertEqual(1234, archivemail.parse_imap_url("imap://user:pass@host:1234/box")[3])
618 self.assertEqual(1234, archivemail.parse_imap_url("imap://user:pass@host:1234/box")[3])
619
620 def tearDown(self):
621 archivemail.options.quiet = False
622 archivemail.options.verbose = False
623 archivemail.options.pwfile = None
624
625 ########## quoting and un-quoting of IMAP strings ##########
626
627 class TestIMAPQuoting(unittest.TestCase):
628 stringlist = (
629 ('{braces} and space', '"{braces} and space"'),
630 ('\\backslash', '"\\\\backslash"'),
631 ('with "quotes" inbetween', '"with \\"quotes\\" inbetween"'),
632 ('ending with "quotes"', '"ending with \\"quotes\\""'),
633 ('\\"backslash before quote', '"\\\\\\"backslash before quote"')
634 )
635
636 def testQuote(self):
637 for unquoted, quoted in self.stringlist:
638 self.assertEqual(archivemail.imap_quote(unquoted), quoted)
639
640 def testUnquote(self):
641 for unquoted, quoted in self.stringlist:
642 self.assertEqual(unquoted, archivemail.imap_unquote(quoted))
643
644
645 ########## Modified UTF-7 support functions ##########
646
647 class TestModUTF7(unittest.TestCase):
648 goodpairs = (
649 (u"A\N{NOT IDENTICAL TO}A.", "A&ImI-A."),
650 (u"Hi Mom -\N{WHITE SMILING FACE}-!", "Hi Mom -&Jjo--!"),
651 (u"~peter/mail/\u53f0\u5317/\u65e5\u672c\u8a9e",
652 "~peter/mail/&U,BTFw-/&ZeVnLIqe-")
653 )
654
655 def testEncode(self):
656 """Ensure that encoding text in modified UTF-7 works properly."""
657 for text, code in self.goodpairs:
658 self.assertEqual(archivemail.mod_utf7_encode(text), code)
659
660 def testDecode(self):
661 """Ensure that decoding modified UTF-7 to text works properly."""
662 for text, code in self.goodpairs:
663 self.assertEqual(archivemail.mod_utf7_decode(code), text)
664
665
666 ########## acceptance testing ###########
667
668 class TestArchive(TestCaseInTempdir):
669 """Base class defining helper functions for doing test archiving runs."""
670 mbox = None # mbox file that will be processed by archivemail
671 good_archive = None # Uncompressed reference archive file to verify the
672 # archive after processing
673 good_mbox = None # Reference mbox file to verify the mbox after processing
674
675 def verify(self):
676 assert os.path.exists(self.mbox)
677 if self.good_mbox is not None:
678 assertEqualContent(self.mbox, self.good_mbox)
679 else:
680 self.assertEqual(os.path.getsize(self.mbox), 0)
681 archive_name = self.mbox + "_archive"
682 if not archivemail.options.no_compress:
683 archive_name += ".gz"
684 iszipped = True
685 else:
686 assert not os.path.exists(archive_name + ".gz")
687 iszipped = False
688 if self.good_archive is not None:
689 assertEqualContent(archive_name, self.good_archive, iszipped)
690 else:
691 assert not os.path.exists(archive_name)
692
693 def make_old_mbox(self, body=None, headers=None, messages=1, make_old_archive=False):
694 """Prepare for a test run with an old mbox by making an old mbox,
695 optionally an existing archive, and a reference archive to verify the
696 archive after archivemail has run."""
697 self.mbox = make_mbox(body, headers, 181*24, messages)
698 archive_does_change = not (archivemail.options.dry_run or
699 archivemail.options.delete_old_mail)
700 mbox_does_not_change = archivemail.options.dry_run or \
701 archivemail.options.copy_old_mail
702 if make_old_archive:
703 archive = archivemail.make_archive_name(self.mbox)
704 self.good_archive = make_archive_and_plain_copy(archive)
705 if archive_does_change:
706 append_file(self.mbox, self.good_archive)
707 elif archive_does_change:
708 self.good_archive = tempfile.mkstemp()[1]
709 shutil.copyfile(self.mbox, self.good_archive)
710 if mbox_does_not_change:
711 if archive_does_change and not make_old_archive:
712 self.good_mbox = self.good_archive
713 else:
714 self.good_mbox = tempfile.mkstemp()[1]
715 shutil.copyfile(self.mbox, self.good_mbox)
716
717 def make_mixed_mbox(self, body=None, headers=None, messages=1, make_old_archive=False):
718 """Prepare for a test run with a mixed mbox by making a mixed mbox,
719 optionally an existing archive, a reference archive to verify the
720 archive after archivemail has run, and likewise a reference mbox to
721 verify the mbox."""
722 self.make_old_mbox(body, headers, messages=messages, make_old_archive=make_old_archive)
723 new_mbox_name = make_mbox(body, headers, 179*24, messages)
724 append_file(new_mbox_name, self.mbox)
725 if self.good_mbox is None:
726 self.good_mbox = new_mbox_name
727 else:
728 if self.good_mbox == self.good_archive:
729 self.good_mbox = tempfile.mkstemp()[1]
730 shutil.copyfile(self.mbox, self.good_mbox)
731 else:
732 append_file(new_mbox_name, self.good_mbox)
733
734 def make_new_mbox(self, body=None, headers=None, messages=1, make_old_archive=False):
735 """Prepare for a test run with a new mbox by making a new mbox,
736 optionally an exiting archive, and a reference mbox to verify the mbox
737 after archivemail has run."""
738 self.mbox = make_mbox(body, headers, 179*24, messages)
739 self.good_mbox = tempfile.mkstemp()[1]
740 shutil.copyfile(self.mbox, self.good_mbox)
741 if make_old_archive:
742 archive = archivemail.make_archive_name(self.mbox)
743 self.good_archive = make_archive_and_plain_copy(archive)
744
745
746 class TestArchiveMbox(TestArchive):
747 """archiving should work based on the date of messages given"""
748
749 def setUp(self):
750 self.oldopts = copy.copy(archivemail.options)
751 archivemail.options.quiet = True
752 super(TestArchiveMbox, self).setUp()
753
754 def testOld(self):
755 """archiving an old mailbox"""
756 self.make_old_mbox(messages=3)
757 archivemail.archive(self.mbox)
758 self.verify()
759
760 def testOldFromInBody(self):
761 """archiving an old mailbox with 'From ' in the body"""
762 body = """This is a message with ^From at the start of a line
763 From is on this line
764 This is after the ^From line"""
765 self.make_old_mbox(messages=3, body=body)
766 archivemail.archive(self.mbox)
767 self.verify()
768
769 def testDateSystem(self):
770 """test that the --date option works as expected"""
771 test_headers = (
772 {
773 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
774 'Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
775 },
776 {
777 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
778 'Date' : None,
779 },
780 {
781 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
782 'Date' : None,
783 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
784 },
785 {
786 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
787 'Date' : None,
788 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
789 },
790 )
791 for headers in test_headers:
792 msg = make_message(default_headers=headers, wantobj=True)
793 date = time.strptime("2000-07-29", "%Y-%m-%d")
794 archivemail.options.date_old_max = time.mktime(date)
795 assert archivemail.should_archive(msg)
796 date = time.strptime("2000-07-27", "%Y-%m-%d")
797 archivemail.options.date_old_max = time.mktime(date)
798 assert not archivemail.should_archive(msg)
799
800 def testMixed(self):
801 """archiving a mixed mailbox"""
802 self.make_mixed_mbox(messages=3)
803 archivemail.archive(self.mbox)
804 self.verify()
805
806 def testNew(self):
807 """archiving a new mailbox"""
808 self.make_new_mbox(messages=3)
809 archivemail.archive(self.mbox)
810 self.verify()
811
812 def testOldExisting(self):
813 """archiving an old mailbox with an existing archive"""
814 self.make_old_mbox(messages=3, make_old_archive=True)
815 archivemail.archive(self.mbox)
816 self.verify()
817
818 def testOldWeirdHeaders(self):
819 """archiving old mailboxes with weird headers"""
820 weird_headers = (
821 { # we should archive because of the date on the 'From_' line
822 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
823 'Date' : 'Friskhdfkjkh, 28 Jul 2002 1line noise6:11:36 +1000',
824 },
825 { # we should archive because of the date on the 'From_' line
826 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
827 'Date' : None,
828 },
829 { # we should archive because of the date in 'Delivery-date'
830 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
831 'Date' : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000',
832 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
833 },
834 { # we should archive because of the date in 'Delivery-date'
835 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
836 'Date' : None,
837 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
838 },
839 { # we should archive because of the date in 'Resent-Date'
840 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
841 'Date' : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000',
842 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
843 },
844 { # we should archive because of the date in 'Resent-Date'
845 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
846 'Date' : None,
847 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
848 },
849 { # completely blank dates were crashing < version 0.4.7
850 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
851 'Date' : '',
852 },
853 { # completely blank dates were crashing < version 0.4.7
854 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
855 'Date' : '',
856 'Resent-Date' : '',
857 },
858 )
859 fd, self.mbox = tempfile.mkstemp()
860 fp = os.fdopen(fd, "w")
861 for headers in weird_headers:
862 msg_text = make_message(default_headers=headers)
863 fp.write(msg_text*2)
864 fp.close()
865 self.good_archive = tempfile.mkstemp()[1]
866 shutil.copyfile(self.mbox, self.good_archive)
867 archivemail.archive(self.mbox)
868 self.verify()
869
870 def tearDown(self):
871 archivemail.options = self.oldopts
872 super(TestArchiveMbox, self).tearDown()
873
874
875 class TestArchiveMboxTimestamp(TestCaseInTempdir):
876 """original mbox timestamps should always be preserved"""
877 def setUp(self):
878 super(TestArchiveMboxTimestamp, self).setUp()
879 archivemail.options.quiet = True
880 self.mbox_name = make_mbox(messages=3, hours_old=(24 * 180))
881 self.mtime = os.path.getmtime(self.mbox_name) - 66
882 self.atime = os.path.getatime(self.mbox_name) - 88
883 os.utime(self.mbox_name, (self.atime, self.mtime))
884
885 def testNew(self):
886 """mbox timestamps should not change after no archival"""
887 archivemail.options.days_old_max = 181
888 archivemail.archive(self.mbox_name)
889 self.verify()
890
891 def testOld(self):
892 """mbox timestamps should not change after archival"""
893 archivemail.options.days_old_max = 179
894 archivemail.archive(self.mbox_name)
895 self.verify()
896
897 def verify(self):
898 assert os.path.exists(self.mbox_name)
899 new_atime = os.path.getatime(self.mbox_name)
900 new_mtime = os.path.getmtime(self.mbox_name)
901 self.assertAlmostEqual(self.mtime, new_mtime, utimes_precision)
902 self.assertAlmostEqual(self.atime, new_atime, utimes_precision)
903
904 def tearDown(self):
905 archivemail.options.quiet = False
906 archivemail.options.days_old_max = 180
907 os.remove(self.mbox_name)
908 super(TestArchiveMboxTimestamp, self).tearDown()
909
910
911 class TestArchiveMboxAll(unittest.TestCase):
912 def setUp(self):
913 archivemail.options.quiet = True
914 archivemail.options.archive_all = True
915
916 def testNew(self):
917 """new messages should be archived with --all"""
918 self.msg = make_message(hours_old=24*179, wantobj=True)
919 assert archivemail.should_archive(self.msg)
920
921 def testOld(self):
922 """old messages should be archived with --all"""
923 self.msg = make_message(hours_old=24*181, wantobj=True)
924 assert archivemail.should_archive(self.msg)
925
926 def tearDown(self):
927 archivemail.options.quiet = False
928 archivemail.options.archive_all = False
929
930 class TestArchiveMboxPreserveUnread(unittest.TestCase):
931 """make sure the 'preserve_unread' option works"""
932 def setUp(self):
933 archivemail.options.quiet = True
934 archivemail.options.preserve_unread = True
935 self.msg = make_message(hours_old=24*181, wantobj=True)
936
937 def testOldRead(self):
938 """old read messages should be archived with --preserve-unread"""
939 self.msg["Status"] = "RO"
940 assert archivemail.should_archive(self.msg)
941
942 def testOldUnread(self):
943 """old unread messages should not be archived with --preserve-unread"""
944 self.msg["Status"] = "O"
945 assert not archivemail.should_archive(self.msg)
946
947 def tearDown(self):
948 archivemail.options.quiet = False
949 archivemail.options.preserve_unread = False
950
951
952 class TestArchiveMboxSuffix(unittest.TestCase):
953 """make sure the 'suffix' option works"""
954 def setUp(self):
955 archivemail.options.quiet = True
956
957 def testSuffix(self):
958 """archiving with specified --suffix arguments"""
959 for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"):
960 mbox_name = "foobar"
961 archivemail.options.archive_suffix = suffix
962 days_old_max = 180
963 parsed_suffix_time = time.time() - days_old_max*24*60*60
964 parsed_suffix = time.strftime(suffix,
965 time.localtime(parsed_suffix_time))
966 archive_name = mbox_name + parsed_suffix
967 self.assertEqual(archive_name,
968 archivemail.make_archive_name(mbox_name))
969
970 def tearDown(self):
971 archivemail.options.quiet = False
972 archivemail.options.archive_suffix = None
973
974 class TestArchiveMboxPrefix(unittest.TestCase):
975 """make sure the 'prefix' option works"""
976 def setUp(self):
977 archivemail.options.quiet = True
978
979 def testPrefix(self):
980 """archiving with specified --prefix arguments"""
981 for archive_prefix in ("_static_", "_%B_%Y", "-%Y-%m-%d", "%Y/%m/"):
982 archivemail.options.archive_prefix = archive_prefix
983 for mbox_name in "foobar", "/tmp/foobar", "schnorchz/foobar":
984 archive_dir, archive_base = os.path.split(mbox_name)
985 days = archivemail.options.days_old_max
986 tm = time.localtime(time.time() - days*24*60*60)
987 prefix = time.strftime(archive_prefix, tm)
988 archive_name = os.path.join(archive_dir, prefix + archive_base)
989 self.assertEqual(archive_name,
990 archivemail.make_archive_name(mbox_name))
991
992 def tearDown(self):
993 archivemail.options.quiet = False
994 archivemail.options.archive_prefix = None
995
996 class TestArchiveName(unittest.TestCase):
997 def setUp(self):
998 archivemail.options.quiet = True
999
1000 def testArchiveName(self):
1001 """test the --archive-name option"""
1002 archive_names = ("custom", ".withdot", "custom_%Y", "%Y/joe")
1003 mbox = "foobar"
1004 for name in archive_names:
1005 archivemail.options.archive_name = name
1006 days = archivemail.options.days_old_max
1007 tm = time.localtime(time.time() - days*24*60*60)
1008 name = time.strftime(name, tm)
1009 self.assertEqual(archivemail.make_archive_name(mbox), name)
1010
1011 def tearDown(self):
1012 archivemail.options.quiet = False
1013 archivemail.options.archive_name = None
1014
1015 class TestArchiveAffixes(unittest.TestCase):
1016 def setUp(self):
1017 self.mbox = "harbsch"
1018 self.archive_prefix = "wurbl+"
1019 self.archive_suffix = "+schronk&borsz"
1020 archivemail.options.quiet = True
1021
1022 def testDefaultPrefix(self):
1023 """if no archive name affix is specified, the default archive suffix is appended"""
1024 self.assertEqual(archivemail.make_archive_name(self.mbox),
1025 self.mbox + archivemail.options.archive_default_suffix)
1026
1027 def testPrefixKillsDefaultSuffix(self):
1028 """if an archive name prefix is specified, the default archive suffix is not appended"""
1029 archivemail.options.archive_prefix = self.archive_prefix
1030 self.assertEqual(archivemail.make_archive_name(self.mbox),
1031 self.archive_prefix + self.mbox)
1032
1033 def testPrefixAndSuffix(self):
1034 """specifying both an archive name prefix and suffix works"""
1035 archivemail.options.archive_prefix = self.archive_prefix
1036 archivemail.options.archive_suffix = self.archive_suffix
1037 self.assertEqual(archivemail.make_archive_name(self.mbox),
1038 self.archive_prefix + self.mbox + self.archive_suffix)
1039
1040 def tearDown(self):
1041 archivemail.options.archive_prefix = None
1042 archivemail.options.archive_suffix = None
1043 archivemail.options.quiet = False
1044
1045 class TestArchiveHiddenMbox(unittest.TestCase):
1046 def setUp(self):
1047 archivemail.options.quiet = True
1048 self.mbox = ".upper.lower"
1049
1050 def testHiddenMbox(self):
1051 """leading dots are stripped from the archive name when no prefix is added"""
1052 self.assertEqual(archivemail.make_archive_name(self.mbox),
1053 self.mbox.lstrip('.') +
1054 archivemail.options.archive_default_suffix)
1055
1056 def testHiddenMboxPrefixedArchive(self):
1057 """no dots are stripped from the archive name when a prefix is added"""
1058 prefix = ".hidden_"
1059 archivemail.options.archive_prefix = prefix
1060 self.assertEqual(archivemail.make_archive_name(self.mbox),
1061 prefix + self.mbox)
1062
1063 def tearDown(self):
1064 archivemail.options.quiet = False
1065 archivemail.options.archive_prefix = None
1066
1067 class TestArchiveDryRun(TestArchive):
1068 """make sure the 'dry-run' option works"""
1069 def setUp(self):
1070 super(TestArchiveDryRun, self).setUp()
1071 archivemail.options.quiet = True
1072 archivemail.options.dry_run = True
1073
1074 def testOld(self):
1075 """archiving an old mailbox with the 'dry-run' option"""
1076 self.make_old_mbox(messages=3)
1077 archivemail.archive(self.mbox)
1078 self.verify()
1079
1080 def tearDown(self):
1081 archivemail.options.dry_run = False
1082 archivemail.options.quiet = False
1083 super(TestArchiveDryRun, self).tearDown()
1084
1085
1086 class TestArchiveDelete(TestArchive):
1087 """make sure the 'delete' option works"""
1088 def setUp(self):
1089 super(TestArchiveDelete, self).setUp()
1090 archivemail.options.quiet = True
1091 archivemail.options.delete_old_mail = True
1092
1093 def testNew(self):
1094 """archiving a new mailbox with the 'delete' option"""
1095 self.make_new_mbox(messages=3)
1096 archivemail.archive(self.mbox)
1097 self.verify()
1098
1099 def testMixed(self):
1100 """archiving a mixed mailbox with the 'delete' option"""
1101 self.make_mixed_mbox(messages=3)
1102 archivemail.archive(self.mbox)
1103 self.verify()
1104
1105 def testOld(self):
1106 """archiving an old mailbox with the 'delete' option"""
1107 self.make_old_mbox(messages=3)
1108 archivemail.archive(self.mbox)
1109 self.verify()
1110
1111 def tearDown(self):
1112 archivemail.options.delete_old_mail = False
1113 archivemail.options.quiet = False
1114 super(TestArchiveDelete, self).tearDown()
1115
1116
1117 class TestArchiveCopy(TestArchive):
1118 """make sure the 'copy' option works"""
1119 def setUp(self):
1120 super(TestArchiveCopy, self).setUp()
1121 archivemail.options.quiet = True
1122 archivemail.options.copy_old_mail = True
1123
1124 def testNew(self):
1125 """archiving a new mailbox with the 'copy' option"""
1126 self.make_new_mbox(messages=3)
1127 archivemail.archive(self.mbox)
1128 self.verify()
1129
1130 def testMixed(self):
1131 """archiving a mixed mailbox with the 'copy' option"""
1132 self.make_mixed_mbox(messages=3)
1133 archivemail.archive(self.mbox)
1134 self.verify()
1135
1136 def testOld(self):
1137 """archiving an old mailbox with the 'copy' option"""
1138 self.make_old_mbox(messages=3)
1139 archivemail.archive(self.mbox)
1140 self.verify()
1141
1142 def tearDown(self):
1143 archivemail.options.copy_old_mail = False
1144 archivemail.options.quiet = False
1145 super(TestArchiveCopy, self).tearDown()
1146
1147
1148 class TestArchiveMboxFlagged(unittest.TestCase):
1149 """make sure the 'include_flagged' option works"""
1150 def setUp(self):
1151 archivemail.options.include_flagged = False
1152 archivemail.options.quiet = True
1153
1154 def testOld(self):
1155 """by default, old flagged messages should not be archived"""
1156 msg = make_message(default_headers={"X-Status": "F"},
1157 hours_old=24*181, wantobj=True)
1158 assert not archivemail.should_archive(msg)
1159
1160 def testIncludeFlaggedNew(self):
1161 """new flagged messages should not be archived with include_flagged"""
1162 msg = make_message(default_headers={"X-Status": "F"},
1163 hours_old=24*179, wantobj=True)
1164 assert not archivemail.should_archive(msg)
1165
1166 def testIncludeFlaggedOld(self):
1167 """old flagged messages should be archived with include_flagged"""
1168 archivemail.options.include_flagged = True
1169 msg = make_message(default_headers={"X-Status": "F"},
1170 hours_old=24*181, wantobj=True)
1171 assert archivemail.should_archive(msg)
1172
1173 def tearDown(self):
1174 archivemail.options.include_flagged = False
1175 archivemail.options.quiet = False
1176
1177
1178 class TestArchiveMboxOutputDir(unittest.TestCase):
1179 """make sure that the 'output-dir' option works"""
1180 def setUp(self):
1181 archivemail.options.quiet = True
1182
1183 def testOld(self):
1184 """archiving an old mailbox with a sepecified output dir"""
1185 for dir in "/just/a/path", "relative/path":
1186 archivemail.options.output_dir = dir
1187 archive_dir = archivemail.make_archive_name("/tmp/mbox")
1188 self.assertEqual(dir, os.path.dirname(archive_dir))
1189
1190 def tearDown(self):
1191 archivemail.options.quiet = False
1192 archivemail.options.output_dir = None
1193
1194
1195 class TestArchiveMboxUncompressed(TestArchive):
1196 """make sure that the 'no_compress' option works"""
1197 mbox_name = None
1198 new_mbox = None
1199 old_mbox = None
1200 copy_name = None
1201
1202 def setUp(self):
1203 archivemail.options.quiet = True
1204 archivemail.options.no_compress = True
1205 super(TestArchiveMboxUncompressed, self).setUp()
1206
1207 def testOld(self):
1208 """archiving an old mailbox uncompressed"""
1209 self.make_old_mbox(messages=3)
1210 archivemail.archive(self.mbox)
1211 self.verify()
1212
1213 def testNew(self):
1214 """archiving a new mailbox uncompressed"""
1215 self.make_new_mbox(messages=3)
1216 archivemail.archive(self.mbox)
1217 self.verify()
1218
1219 def testMixed(self):
1220 """archiving a mixed mailbox uncompressed"""
1221 self.make_mixed_mbox(messages=3)
1222 archivemail.archive(self.mbox)
1223 self.verify()
1224
1225 def testOldExists(self):
1226 """archiving an old mailbox uncopressed with an existing archive"""
1227 self.make_old_mbox(messages=3, make_old_archive=True)
1228 archivemail.archive(self.mbox)
1229 self.verify()
1230
1231 def tearDown(self):
1232 archivemail.options.quiet = False
1233 archivemail.options.no_compress = False
1234 super(TestArchiveMboxUncompressed, self).tearDown()
1235
1236
1237 class TestArchiveSize(unittest.TestCase):
1238 """check that the 'size' argument works"""
1239 def setUp(self):
1240 archivemail.options.quiet = True
1241 msg_text = make_message(hours_old=24*181)
1242 self.msg_size = len(msg_text)
1243 fp = cStringIO.StringIO(msg_text)
1244 self.msg = rfc822.Message(fp)
1245
1246 def testSmaller(self):
1247 """giving a size argument smaller than the message"""
1248 archivemail.options.min_size = self.msg_size - 1
1249 assert archivemail.should_archive(self.msg)
1250
1251 def testBigger(self):
1252 """giving a size argument bigger than the message"""
1253 archivemail.options.min_size = self.msg_size + 1
1254 assert not archivemail.should_archive(self.msg)
1255
1256 def tearDown(self):
1257 archivemail.options.quiet = False
1258 archivemail.options.min_size = None
1259
1260
1261 class TestXIMAPMessage(TestArchive):
1262 """Test if IMAP pseudo messages in mboxes are properly handled."""
1263 def setUp(self):
1264 super(TestXIMAPMessage, self).setUp()
1265 archivemail.options.quiet = True
1266
1267 def testXIMAPMbox(self):
1268 """IMAP pseudo messages in an mbox are always preserved."""
1269 self.good_mbox = make_mbox(hours_old=181*24, headers={'X-IMAP': 'dummytext'},
1270 messages=1)
1271 self.good_archive = make_mbox(hours_old=181*24, messages=3)
1272 self.mbox = tempfile.mkstemp()[-1]
1273 shutil.copyfile(self.good_mbox, self.mbox)
1274 append_file(self.good_archive, self.mbox)
1275 archivemail.archive(self.mbox)
1276 self.verify()
1277
1278 def tearDown(self):
1279 super(TestXIMAPMessage, self).tearDown()
1280 archivemail.options.quiet = False
1281
1282
1283 ############# Test archiving maildirs ###############
1284
1285 class TestArchiveMailboxdir(TestCaseInTempdir):
1286 """Base class defining helper functions for doing test archive runs with
1287 maildirs."""
1288 maildir = None # Maildir that will be processed by archivemail
1289 orig_maildir_obj = None # A backup copy of the maildir, a SimpleMaildir object
1290 remaining_msg = set() # Filenames of maildir messages that should be preserved
1291 number_archived = 0 # Number of messages that get archived
1292 orig_archive = None # An uncompressed copy of a pre-existing archive,
1293 # if one exists
1294
1295 def setUp(self):
1296 super(TestArchiveMailboxdir, self).setUp()
1297 self.orig_maildir_obj = SimpleMaildir()
1298
1299 def verify(self):
1300 self._verify_remaining()
1301 self._verify_archive()
1302
1303 def _verify_remaining(self):
1304 """Verify that the preserved messages weren't altered."""
1305 assert self.maildir
1306 # Compare maildir with backup object.
1307 dcmp = filecmp.dircmp(self.maildir, self.orig_maildir_obj.root)
1308 # Top-level has only directories cur, new, tmp and must be unchanged.
1309 self.assertEqual(dcmp.left_list, dcmp.right_list)
1310 found = set()
1311 for d in dcmp.common_dirs:
1312 dcmp2 = dcmp.subdirs[d]
1313 # We need to verify three things.
1314 # 1. directory is a subset of the original...
1315 assert not dcmp2.left_only
1316 # 2. all common files are identical...
1317 self.assertEqual(dcmp2.common_files, dcmp2.same_files)
1318 found = found.union([os.path.join(d, x) for x in dcmp2.common_files])
1319 # 3. exactly the `new' messages (recorded in self.remaining_msg)
1320 # were preserved.
1321 self.assertEqual(found, self.remaining_msg)
1322
1323 def _verify_archive(self):
1324 """Verify the archive correctness."""
1325 # TODO: currently make_archive_name does not include the .gz suffix.
1326 # Is this something that should be fixed?
1327 archive = archivemail.make_archive_name(self.maildir)
1328 if archivemail.options.no_compress:
1329 iszipped = False
1330 else:
1331 archive += '.gz'
1332 iszipped = True
1333 if self.number_archived == 0:
1334 if self.orig_archive:
1335 assertEqualContent(archive, self.orig_archive, iszipped)
1336 else:
1337 assert not os.path.exists(archive)
1338 return
1339 fp_new = fp_archive = tmp_archive_name = None
1340 try:
1341 if self.orig_archive:
1342 new_size = os.path.getsize(archive)
1343 # Brute force: split archive in old and new part and verify the
1344 # parts separately. (Of course this destroys the archive.)
1345 fp_archive = open(archive, "r+")
1346 fp_archive.seek(self.orig_archive_size)
1347 fd, tmp_archive_name = tempfile.mkstemp()
1348 fp_new = os.fdopen(fd, "w")
1349 shutil.copyfileobj(fp_archive, fp_new)
1350 fp_new.close()
1351 fp_archive.truncate(self.orig_archive_size)
1352 fp_archive.close()
1353 assertEqualContent(archive, self.orig_archive, iszipped)
1354 new_archive = tmp_archive_name
1355 else:
1356 new_archive = archive
1357 if archivemail.options.no_compress:
1358 fp_archive = open(new_archive, "r")
1359 else:
1360 fp_archive = FixedGzipFile(new_archive, "r")
1361 mb = mailbox.UnixMailbox(fp_archive)
1362 found = 0
1363 for msg in mb:
1364 self.verify_maildir_has_msg(self.orig_maildir_obj, msg)
1365 found += 1
1366 self.assertEqual(found, self.number_archived)
1367 finally:
1368 if tmp_archive_name:
1369 os.remove(tmp_archive_name)
1370 if fp_new is not None:
1371 fp_new.close()
1372 if fp_archive is not None:
1373 fp_archive.close()
1374
1375 def verify_maildir_has_msg(self, maildir, msg):
1376 """Assert that the given maildir has a copy of the rfc822 message."""
1377 mid = msg['Message-Id'] # Complains if there is no message-id
1378 mdir_msg_str, mdir_flags = \
1379 maildir.get_message_and_mbox_status(mid)
1380 mbox_flags = set(msg.get('status', '') + msg.get('x-status', ''))
1381 self.assertEqual(mdir_flags, mbox_flags)
1382
1383 headers = filter(lambda h: msg.isheader(h) not in ('status', 'x-status'),
1384 msg.headers)
1385 headers = "".join(headers)
1386 msg.rewindbody()
1387 # Discard last mbox LF which is not part of the message.
1388 body = msg.fp.read()[:-1]
1389 msg_str = headers + os.linesep + body
1390 self.assertEqual(mdir_msg_str, msg_str)
1391
1392 def add_messages(self, body=None, headers=None, hours_old=0, messages=1):
1393 for count in range(messages):
1394 msg = make_message(body, default_headers=headers, mkfrom=False,
1395 hours_old=hours_old)
1396 self.orig_maildir_obj.write(msg, new=False)
1397
1398 def make_maildir(self, mkold, mknew, body=None, headers=None, messages=1,
1399 make_old_archive=False):
1400 mailbox_does_change = not (archivemail.options.dry_run or
1401 archivemail.options.copy_old_mail)
1402 archive_does_change = not (archivemail.options.dry_run or
1403 archivemail.options.delete_old_mail)
1404 if mknew:
1405 self.add_messages(body, headers, 179*24, messages)
1406 if archive_does_change and archivemail.options.archive_all:
1407 self.number_archived += messages
1408 if mailbox_does_change:
1409 self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames())
1410 if mkold:
1411 self.add_messages(body, headers, 181*24, messages)
1412 if archive_does_change:
1413 self.number_archived += messages
1414 if not mailbox_does_change:
1415 self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames())
1416 self.maildir = copy_maildir(self.orig_maildir_obj.root)
1417 if make_old_archive:
1418 archive = archivemail.make_archive_name(self.maildir)
1419 self.orig_archive = make_archive_and_plain_copy(archive)
1420 # FIXME: .gz extension handling is a mess II
1421 if not archivemail.options.no_compress:
1422 archive += '.gz'
1423 self.orig_archive_size = os.path.getsize(archive)
1424
1425 class TestEmptyMaildir(TestCaseInTempdir):
1426 def setUp(self):
1427 super(TestEmptyMaildir, self).setUp()
1428 archivemail.options.quiet = True
1429
1430 def testEmpty(self):
1431 """Archiving an empty maildir should not result in an archive."""
1432 self.mdir = SimpleMaildir()
1433 archivemail.archive(self.mdir.root)
1434 assert not os.path.exists(self.mdir.root + '_archive.gz')
1435
1436 def tearDown(self):
1437 super(TestEmptyMaildir, self).tearDown()
1438 archivemail.options.quiet = False
1439
1440 class TestMaildir(TestArchiveMailboxdir):
1441 def setUp(self):
1442 super(TestMaildir, self).setUp()
1443 archivemail.options.quiet = True
1444
1445 def testOld(self):
1446 self.make_maildir(True, False, messages=3)
1447 archivemail.archive(self.maildir)
1448 self.verify()
1449
1450 def testNew(self):
1451 self.make_maildir(False, True, messages=3)
1452 archivemail.archive(self.maildir)
1453 self.verify()
1454
1455 def testMixed(self):
1456 self.make_maildir(True, True, messages=3)
1457 archivemail.archive(self.maildir)
1458 self.verify()
1459
1460 def testMixedExisting(self):
1461 self.make_maildir(True, True, messages=3, make_old_archive=True)
1462 archivemail.archive(self.maildir)
1463 self.verify()
1464
1465 def tearDown(self):
1466 archivemail.options.quiet = False
1467 super(TestMaildir, self).tearDown()
1468
1469
1470 class TestMaildirPreserveUnread(TestCaseInTempdir):
1471 """Test if the preserve_unread option works with maildirs."""
1472 def setUp(self):
1473 super(TestMaildirPreserveUnread, self).setUp()
1474 archivemail.options.quiet = True
1475 archivemail.options.preserve_unread = True
1476
1477 def testOldRead(self):
1478 """--preserve-unread archives old read messages in a maildir."""
1479 smd = SimpleMaildir("orig")
1480 msg = make_message(hours_old=24*181)
1481 smd.write(msg, new=False, flags='S')
1482 md = mailbox.Maildir(smd.root)
1483 msg_obj = md.next()
1484 assert archivemail.should_archive(msg_obj)
1485
1486 def testOldUnread(self):
1487 """--preserve-unread preserves old unread messages in a maildir."""
1488 smd = SimpleMaildir("orig")
1489 msg = make_message(hours_old=24*181)
1490 smd.write(msg, new=False)
1491 md = mailbox.Maildir(smd.root)
1492 msg_obj = md.next()
1493 assert not archivemail.should_archive(msg_obj)
1494
1495 def tearDown(self):
1496 archivemail.options.quiet = False
1497 archivemail.options.preserve_unread = False
1498 super(TestMaildirPreserveUnread, self).tearDown()
1499
1500 class TestMaildirAll(TestArchiveMailboxdir):
1501 def setUp(self):
1502 super(TestMaildirAll, self).setUp()
1503 archivemail.options.quiet = True
1504 archivemail.options.archive_all = True
1505
1506 def testNew(self):
1507 """New maildir messages should be archived with --all"""
1508 self.add_messages(hours_old=24*181)
1509 md = mailbox.Maildir(self.orig_maildir_obj.root)
1510 msg_obj = md.next()
1511 assert archivemail.should_archive(msg_obj)
1512
1513 def testOld(self):
1514 """Old maildir messages should be archived with --all"""
1515 self.add_messages(hours_old=24*179)
1516 md = mailbox.Maildir(self.orig_maildir_obj.root)
1517 msg_obj = md.next()
1518 assert archivemail.should_archive(msg_obj)
1519
1520 def tearDown(self):
1521 super(TestMaildirAll, self).tearDown()
1522 archivemail.options.quiet = False
1523 archivemail.options.archive_all = False
1524
1525 class TestMaildirDryRun(TestArchiveMailboxdir):
1526 def setUp(self):
1527 super(TestMaildirDryRun, self).setUp()
1528 archivemail.options.quiet = True
1529 archivemail.options.dry_run = True
1530
1531 def testOld(self):
1532 """archiving an old maildir mailbox with the 'dry-run' option"""
1533 self.make_maildir(True, False)
1534 archivemail.archive(self.maildir)
1535 self.verify()
1536
1537 def tearDown(self):
1538 super(TestMaildirDryRun, self).tearDown()
1539 archivemail.options.quiet = False
1540 archivemail.options.dry_run = False
1541
1542 class TestMaildirDelete(TestArchiveMailboxdir):
1543 def setUp(self):
1544 super(TestMaildirDelete, self).setUp()
1545 archivemail.options.quiet = True
1546 archivemail.options.delete_old_mail = True
1547
1548 def testOld(self):
1549 """archiving an old maildir mailbox with the 'delete' option"""
1550 self.make_maildir(True, False)
1551 archivemail.archive(self.maildir)
1552 self.verify()
1553
1554 def testNew(self):
1555 """archiving a new maildir mailbox with the 'delete' option"""
1556 self.make_maildir(False, True)
1557 archivemail.archive(self.maildir)
1558 self.verify()
1559
1560 def tearDown(self):
1561 super(TestMaildirDelete, self).tearDown()
1562 archivemail.options.quiet = False
1563 archivemail.options.delete_old_mail = False
1564
1565 class TestMaildirCopy(TestArchiveMailboxdir):
1566 def setUp(self):
1567 super(TestMaildirCopy, self).setUp()
1568 archivemail.options.quiet = True
1569 archivemail.options.copy_old_mail = True
1570
1571 def testOld(self):
1572 """archiving an old maildir mailbox with the 'copy' option"""
1573 self.make_maildir(True, False)
1574 archivemail.archive(self.maildir)
1575 self.verify()
1576
1577 def testNew(self):
1578 """archiving a new maildir mailbox with the 'copy' option"""
1579 self.make_maildir(False, True)
1580 archivemail.archive(self.maildir)
1581 self.verify()
1582
1583 def tearDown(self):
1584 super(TestMaildirCopy, self).tearDown()
1585 archivemail.options.quiet = False
1586 archivemail.options.copy_old_mail = False
1587
1588 class TestArchiveMaildirFlagged(TestCaseInTempdir):
1589 """make sure the 'include_flagged' option works with maildir messages"""
1590 def setUp(self):
1591 super(TestArchiveMaildirFlagged, self).setUp()
1592 archivemail.options.include_flagged = False
1593 archivemail.options.quiet = True
1594
1595 def testOld(self):
1596 """by default, old flagged maildir messages should not be archived"""
1597 smd = SimpleMaildir("orig")
1598 msg = make_message(hours_old=24*181)
1599 smd.write(msg, new=False, flags='F')
1600 md = mailbox.Maildir(smd.root)
1601 msg_obj = md.next()
1602 assert not archivemail.should_archive(msg_obj)
1603
1604 def testIncludeFlaggedNew(self):
1605 """new flagged maildir messages should not be archived with include_flagged"""
1606 smd = SimpleMaildir("orig")
1607 msg = make_message(hours_old=24*179)
1608 smd.write(msg, new=False, flags='F')
1609 md = mailbox.Maildir(smd.root)
1610 msg_obj = md.next()
1611 assert not archivemail.should_archive(msg_obj)
1612
1613 def testIncludeFlaggedOld(self):
1614 """old flagged maildir messages should be archived with include_flagged"""
1615 archivemail.options.include_flagged = True
1616 smd = SimpleMaildir("orig")
1617 msg = make_message(hours_old=24*181)
1618 smd.write(msg, new=False, flags='F')
1619 md = mailbox.Maildir(smd.root)
1620 msg_obj = md.next()
1621 assert archivemail.should_archive(msg_obj)
1622
1623 def tearDown(self):
1624 super(TestArchiveMaildirFlagged, self).tearDown()
1625 archivemail.options.include_flagged = False
1626 archivemail.options.quiet = False
1627
1628 class TestArchiveMaildirSize(TestCaseInTempdir):
1629 """check that the 'size' argument works with maildir messages"""
1630 def setUp(self):
1631 super(TestArchiveMaildirSize, self).setUp()
1632 archivemail.options.quiet = True
1633 msg = make_message(hours_old=24*181)
1634 self.msg_size = len(msg)
1635 smd = SimpleMaildir("orig")
1636 smd.write(msg, new=False)
1637 md = mailbox.Maildir(smd.root)
1638 self.msg_obj = md.next()
1639
1640 def testSmaller(self):
1641 """giving a size argument smaller than the maildir message"""
1642 archivemail.options.min_size = self.msg_size - 1
1643 assert archivemail.should_archive(self.msg_obj)
1644
1645 def testBigger(self):
1646 """giving a size argument bigger than the maildir message"""
1647 archivemail.options.min_size = self.msg_size + 1
1648 assert not archivemail.should_archive(self.msg_obj)
1649
1650 def tearDown(self):
1651 super(TestArchiveMaildirSize, self).tearDown()
1652 archivemail.options.quiet = False
1653 archivemail.options.min_size = None
1654
1655 ########## helper routines ############
1656
1657 def make_message(body=None, default_headers={}, hours_old=None, mkfrom=False, wantobj=False):
1658 headers = copy.copy(default_headers)
1659 if not headers:
1660 headers = {}
1661 headers['Message-Id'] = make_msgid()
1662 if not headers.has_key('Date'):
1663 time_message = time.time() - (60 * 60 * hours_old)
1664 headers['Date'] = time.asctime(time.localtime(time_message))
1665 if not headers.has_key('From'):
1666 headers['From'] = "sender@dummy.domain"
1667 if not headers.has_key('To'):
1668 headers['To'] = "receipient@dummy.domain"
1669 if not headers.has_key('Subject'):
1670 headers['Subject'] = "This is the subject"
1671 if mkfrom and not headers.has_key('From_'):
1672 headers['From_'] = "%s %s" % (headers['From'], headers['Date'])
1673 if not body:
1674 body = "This is the message body"
1675
1676 msg = ""
1677 if headers.has_key('From_'):
1678 msg = msg + ("From %s\n" % headers['From_'])
1679 del headers['From_']
1680 for key in headers.keys():
1681 if headers[key] is not None:
1682 msg = msg + ("%s: %s\n" % (key, headers[key]))
1683 msg = msg + "\n\n" + body + "\n\n"
1684 if not wantobj:
1685 return msg
1686 fp = cStringIO.StringIO(msg)
1687 return rfc822.Message(fp)
1688
1689 def append_file(source, dest):
1690 """appends the file named 'source' to the file named 'dest'"""
1691 assert os.path.isfile(source)
1692 assert os.path.isfile(dest)
1693 read = open(source, "r")
1694 write = open(dest, "a+")
1695 shutil.copyfileobj(read,write)
1696 read.close()
1697 write.close()
1698
1699
1700 def make_mbox(body=None, headers=None, hours_old=0, messages=1):
1701 assert tempfile.tempdir
1702 fd, name = tempfile.mkstemp()
1703 file = os.fdopen(fd, "w")
1704 for count in range(messages):
1705 msg = make_message(body=body, default_headers=headers,
1706 mkfrom=True, hours_old=hours_old)
1707 file.write(msg)
1708 file.close()
1709 return name
1710
1711 def make_archive_and_plain_copy(archive_name):
1712 """Make an mbox archive of the given name like archivemail may have
1713 created it. Also make an uncompressed copy of this archive and return its
1714 name."""
1715 copy_fd, copy_name = tempfile.mkstemp()
1716 copy_fp = os.fdopen(copy_fd, "w")
1717 if archivemail.options.no_compress:
1718 fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT)
1719 fp = os.fdopen(fd, "w")
1720 else:
1721 archive_name += ".gz"
1722 fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT)
1723 rawfp = os.fdopen(fd, "w")
1724 fp = gzip.GzipFile(fileobj=rawfp)
1725 for count in range(3):
1726 msg = make_message(hours_old=24*360)
1727 fp.write(msg)
1728 copy_fp.write(msg)
1729 fp.close()
1730 copy_fp.close()
1731 if not archivemail.options.no_compress:
1732 rawfp.close()
1733 return copy_name
1734
1735 def copy_maildir(maildir, prefix="tmp"):
1736 """Create a copy of the given maildir and return the absolute path of the
1737 new direcory."""
1738 newdir = tempfile.mkdtemp(prefix=prefix)
1739 for d in "cur", "new", "tmp":
1740 shutil.copytree(os.path.join(maildir, d), os.path.join(newdir, d))
1741 return newdir
1742
1743 def assertEqualContent(firstfile, secondfile, zippedfirst=False):
1744 """Verify that the two files exist and have identical content. If zippedfirst
1745 is True, assume that firstfile is gzip-compressed."""
1746 assert os.path.exists(firstfile)
1747 assert os.path.exists(secondfile)
1748 if zippedfirst:
1749 try:
1750 fp1 = gzip.GzipFile(firstfile, "r")
1751 fp2 = open(secondfile, "r")
1752 assert cmp_fileobj(fp1, fp2)
1753 finally:
1754 fp1.close()
1755 fp2.close()
1756 else:
1757 assert filecmp.cmp(firstfile, secondfile, shallow=0)
1758
1759 def cmp_fileobj(fp1, fp2):
1760 """Return if reading the fileobjects yields identical content."""
1761 bufsize = 8192
1762 while True:
1763 b1 = fp1.read(bufsize)
1764 b2 = fp2.read(bufsize)
1765 if b1 != b2:
1766 return False
1767 if not b1:
1768 return True
1769
1770 if __name__ == "__main__":
1771 unittest.main()