"Fossies" - the Fresh Open Source Software Archive

Member "archivemail-0.9.0/test_archivemail" (9 Jul 2011, 68583 Bytes) of package /linux/privat/old/archivemail-0.9.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here.

    1 #! /usr/bin/env python
    2 ############################################################################
    3 # Copyright (C) 2002  Paul Rodger <paul@paulrodger.com>
    4 #           (C) 2006-2011  Nikolaus Schulz <microschulz@web.de>
    5 #
    6 # This program is free software; you can redistribute it and/or modify
    7 # it under the terms of the GNU General Public License as published by
    8 # the Free Software Foundation; either version 2 of the License, or
    9 # (at your option) any later version.
   10 #
   11 # This program is distributed in the hope that it will be useful,
   12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
   13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   14 # GNU General Public License for more details.
   15 #
   16 # You should have received a copy of the GNU General Public License
   17 # along with this program; if not, write to the Free Software
   18 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
   19 ############################################################################
   20 """
   21 Unit-test archivemail using 'PyUnit'.
   22 
   23 TODO: add tests for:
   24     * dotlock locks already existing
   25     * archiving MH-format mailboxes
   26     * a 3rd party process changing the mbox file being read
   27 
   28 """
   29 
   30 import sys
   31 
   32 def check_python_version(): 
   33     """Abort if we are running on python < v2.3"""
   34     too_old_error = "This test script requires python version 2.3 or later. " + \
   35       "Your version of python is:\n%s" % sys.version
   36     try: 
   37         version = sys.version_info  # we might not even have this function! :)
   38         if (version[0] < 2) or (version[0] == 2 and version[1] < 3):
   39             print too_old_error
   40             sys.exit(1)
   41     except AttributeError:
   42         print too_old_error
   43         sys.exit(1)
   44 
   45 # define & run this early because 'unittest' requires Python >= 2.1
   46 check_python_version()  
   47 
   48 import copy
   49 import fcntl
   50 import filecmp
   51 import os
   52 import re
   53 import shutil
   54 import stat
   55 import tempfile
   56 import time
   57 import unittest
   58 import gzip
   59 import cStringIO
   60 import rfc822
   61 import mailbox
   62 
   63 from types import ModuleType
   64 archivemail = ModuleType("archivemail")
   65 try:
   66     module_fp = open("archivemail", "r")
   67 except IOError:
   68     print "The archivemail script should be in the current directory in order"
   69     print "to be imported and tested. Sorry."
   70     sys.exit(1)
   71 exec module_fp in archivemail.__dict__
   72 
   73 # We want to iterate over messages in a compressed archive mbox and verify
   74 # them.  This involves seeking in the mbox.  The gzip.Gzipfile.seek() in
   75 # Python 2.5 doesn't understand whence; this is Python bug #1355023, triggered
   76 # by mailbox._PartialFile.seek().  The bug is still pending as of Python
   77 # 2.5.2.  To work around it, we subclass gzip.GzipFile.
   78 #
   79 # It should be noted that seeking backwards in a GzipFile is emulated by
   80 # re-reading the entire file from the beginning, which is extremely
   81 # inefficient and won't work with large files; but our test archives are all
   82 # small, so it's okay.
   83 
   84 class FixedGzipFile(gzip.GzipFile):
   85     """GzipFile with seek method accepting whence parameter."""
   86     def seek(self, offset, whence=0):
   87         try:
   88             # Try calling gzip.GzipFile.seek with the whence parameter.
   89             # For Python >= 2.7, it returns the new offset; pass that on.
   90             return gzip.GzipFile.seek(self, offset, whence)
   91         except TypeError:
   92             if whence:
   93                 if whence == 1:
   94                     offset = self.offset + offset
   95                 else:
   96                     raise ValueError('Seek from end not supported')
   97             return gzip.GzipFile.seek(self, offset)
   98 
# precision of os.utime() when restoring mbox timestamps
# NOTE(review): presumably a tolerance in seconds for timestamp comparisons;
# the code that uses it is not visible here -- confirm.
utimes_precision = 5
  101 
  102 class MessageIdFactory:
  103     """Factory to create `uniqe' message-ids."""
  104     def __init__(self):
  105         self.seq = 0
  106     def __call__(self):
  107         self.seq += 1
  108         return "<archivemail%d@localhost>" % self.seq
  109 
  110 make_msgid = MessageIdFactory()
  111 
  112 class IndexedMailboxDir:
  113     """An indexed mailbox directory, providing random message access by
  114     message-id. Intended as a base class for a maildir and an mh subclass."""
  115 
  116     def __init__(self, mdir_name):
  117         assert tempfile.tempdir
  118         self.root = tempfile.mkdtemp(prefix=mdir_name)
  119         self.msg_id_dict = {}
  120         self.deliveries = 0
  121 
  122     def _add_to_index(self, msg_text, fpath):
  123         """Add the given message to the index, for later random access."""
  124         # Extract the message-id as index key
  125         msg_id = None
  126         fp = cStringIO.StringIO(msg_text)
  127         while True:
  128             line = fp.readline()
  129             # line empty means we didn't find a message-id
  130             assert line
  131             if line.lower().startswith("message-id:"):
  132                 msg_id = line.split(":", 1)[-1].strip()
  133                 assert msg_id
  134                 break
  135         assert not self.msg_id_dict.has_key(msg_id)
  136         self.msg_id_dict[msg_id] = fpath
  137 
  138     def get_all_filenames(self):
  139         """Return all relative pathnames of files in this mailbox."""
  140         return self.msg_id_dict.values()
  141 
  142 class SimpleMaildir(IndexedMailboxDir):
  143     """Primitive Maildir class, just good enough for generating short-lived
  144     test maildirs."""
  145 
  146     def __init__(self, mdir_name='maildir'):
  147         IndexedMailboxDir.__init__(self, mdir_name)
  148         for d in "cur", "tmp", "new":
  149             os.mkdir(os.path.join(self.root, d))
  150 
  151     def write(self, msg_str, new=True, flags=[]):
  152         """Store a message with the given flags."""
  153         assert not (new and flags)
  154         if new:
  155             subdir = "new"
  156         else:
  157             subdir = "cur"
  158         fname = self._mkname(new, flags)
  159         relpath = os.path.join(subdir, fname)
  160         path = os.path.join(self.root, relpath)
  161         assert not os.path.exists(path)
  162         f = open(path, "w")
  163         f.write(msg_str)
  164         f.close()
  165         self._add_to_index(msg_str, relpath)
  166 
  167     def _mkname(self, new, flags):
  168         """Generate a unique filename for a new message."""
  169         validflags = 'DFPRST'
  170         for f in flags:
  171             assert f in validflags
  172         # This 'unique' name should be good enough, since nobody else
  173         # will ever write messages to this maildir folder.
  174         uniq = str(self.deliveries)
  175         self.deliveries += 1
  176         if new:
  177             return uniq
  178         if not flags:
  179             return uniq + ':2,'
  180         finfo = "".join(sorted(flags))
  181         return uniq + ':2,' + finfo
  182 
  183     def get_message_and_mbox_status(self, msgid):
  184         """For the Message-Id msgid, return the matching message in text
  185         format and its status, expressed as a set of mbox flags."""
  186         fpath = self.msg_id_dict[msgid] # Barfs if not found
  187         mdir_flags = fpath.rsplit('2,', 1)[-1]
  188         flagmap = {
  189                 'F': 'F',
  190                 'R': 'A',
  191                 'S': 'R'
  192         }
  193         mbox_flags = set([flagmap[x] for x in mdir_flags])
  194         if fpath.startswith("cur/"):
  195             mbox_flags.add('O')
  196         fp = open(os.path.join(self.root, fpath), "r")
  197         msg = fp.read()
  198         fp.close()
  199         return msg, mbox_flags
  200 
  201 
  202 class TestCaseInTempdir(unittest.TestCase):
  203     """Base class for testcases that need to create temporary files. 
  204     All testcases that create temporary files should be derived from this
  205     class, not directly from unittest.TestCase.
  206     TestCaseInTempdir provides these methods:
  207     
  208     setUp()     Creates a safe temporary directory and sets tempfile.tempdir.
  209                 
  210     tearDown()  Recursively removes the temporary directory and unsets
  211                 tempfile.tempdir.
  212 
  213     Overriding methods should call the ones above."""
  214     temproot = None
  215 
  216     def setUp(self):
  217         if not self.temproot:
  218             assert not tempfile.tempdir
  219             self.temproot = tempfile.tempdir = \
  220                 tempfile.mkdtemp(prefix="test-archivemail")
  221      
  222     def tearDown(self):
  223         assert tempfile.tempdir == self.temproot
  224         if self.temproot:
  225             shutil.rmtree(self.temproot)
  226             tempfile.tempdir = self.temproot = None
  227 
  228 
  229 ############ Mbox Class testing ##############
  230 
  231 class TestMboxDotlock(TestCaseInTempdir):
  232     def setUp(self):
  233         super(TestMboxDotlock, self).setUp()
  234         self.mbox_name = make_mbox()
  235         self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
  236         self.mbox = archivemail.Mbox(self.mbox_name)
  237 
  238     def testDotlock(self):
  239         """dotlock_lock/unlock should create/delete a lockfile"""
  240         lock = self.mbox_name + ".lock"
  241         self.mbox._dotlock_lock()
  242         assert os.path.isfile(lock)
  243         self.mbox._dotlock_unlock()
  244         assert not os.path.isfile(lock)
  245 
  246     def testDotlockingSucceedsUponEACCES(self):
  247         """A dotlock should silently be omitted upon EACCES."""
  248         archivemail.options.quiet = True
  249         mbox_dir = os.path.dirname(self.mbox_name)
  250         os.chmod(mbox_dir, 0500)
  251         try:
  252             self.mbox._dotlock_lock()
  253             self.mbox._dotlock_unlock()
  254         finally:
  255             os.chmod(mbox_dir, 0700)
  256             archivemail.options.quiet = False
  257 
  258 class TestMboxPosixLock(TestCaseInTempdir):
  259     def setUp(self):
  260         super(TestMboxPosixLock, self).setUp()
  261         self.mbox_name = make_mbox()
  262         self.mbox = archivemail.Mbox(self.mbox_name)
  263 
  264     def testPosixLock(self):
  265         """posix_lock/unlock should create/delete an advisory lock"""
  266         
  267         # The following code snippet heavily lends from the Python 2.5 mailbox
  268         # unittest.
  269         # BEGIN robbery:
  270 
  271         # Fork off a subprocess that will lock the file for 2 seconds,
  272         # unlock it, and then exit.
  273         pid = os.fork()
  274         if pid == 0:
  275             # In the child, lock the mailbox.
  276             self.mbox._posix_lock()
  277             time.sleep(2)
  278             self.mbox._posix_unlock()
  279             os._exit(0)
  280 
  281         # In the parent, sleep a bit to give the child time to acquire
  282         # the lock.
  283         time.sleep(0.5)
  284         # The parent's file self.mbox.mbox_file shares fcntl locks with the
  285         # duplicated FD in the child; reopen it so we get a different file
  286         # table entry.
  287         file = open(self.mbox_name, "r+")
  288         lock_nb = fcntl.LOCK_EX | fcntl.LOCK_NB
  289         fd = file.fileno()
  290         try:
  291             self.assertRaises(IOError, fcntl.lockf, fd, lock_nb)
  292 
  293         finally:
  294             # Wait for child to exit.  Locking should now succeed.
  295             exited_pid, status = os.waitpid(pid, 0)
  296 
  297         fcntl.lockf(fd, lock_nb)
  298         fcntl.lockf(fd, fcntl.LOCK_UN)
  299         # END robbery
  300 
  301 
  302 class TestMboxNext(TestCaseInTempdir):
  303     def setUp(self):
  304         super(TestMboxNext, self).setUp()
  305         self.not_empty_name = make_mbox(messages=18)
  306         self.empty_name = make_mbox(messages=0)
  307 
  308     def testNextEmpty(self):
  309         """mbox.next() should return None on an empty mailbox"""
  310         mbox = archivemail.Mbox(self.empty_name)
  311         msg = mbox.next()
  312         self.assertEqual(msg, None)
  313 
  314     def testNextNotEmpty(self):
  315         """mbox.next() should a message on a populated mailbox"""
  316         mbox = archivemail.Mbox(self.not_empty_name)
  317         for count in range(18):
  318             msg = mbox.next()
  319             assert msg
  320         msg = mbox.next()
  321         self.assertEqual(msg, None)
  322 
  323 
  324 ############ TempMbox Class testing ##############
  325 
  326 class TestTempMboxWrite(TestCaseInTempdir):
  327     def setUp(self):
  328         super(TestTempMboxWrite, self).setUp()
  329 
  330     def testWrite(self):
  331         """mbox.write() should append messages to a mbox mailbox"""
  332         read_file = make_mbox(messages=3)
  333         mbox_read = archivemail.Mbox(read_file)
  334         mbox_write = archivemail.TempMbox()
  335         write_file = mbox_write.mbox_file_name
  336         for count in range(3):
  337             msg = mbox_read.next()
  338             mbox_write.write(msg)
  339         mbox_read.close()
  340         mbox_write.close()
  341         assert filecmp.cmp(read_file, write_file, shallow=0)
  342 
  343     def testWriteNone(self):
  344         """calling mbox.write() with no message should raise AssertionError"""
  345         write = archivemail.TempMbox()
  346         self.assertRaises(AssertionError, write.write, None)
  347 
  348 class TestTempMboxRemove(TestCaseInTempdir):
  349     def setUp(self):
  350         super(TestTempMboxRemove, self).setUp()
  351         self.mbox = archivemail.TempMbox()
  352         self.mbox_name = self.mbox.mbox_file_name
  353 
  354     def testMboxRemove(self):
  355         """remove() should delete a mbox mailbox"""
  356         assert os.path.exists(self.mbox_name)
  357         self.mbox.remove()
  358         assert not os.path.exists(self.mbox_name)
  359 
  360 
  361 
  362 ########## options class testing #################
  363 
  364 class TestOptionDefaults(unittest.TestCase):
  365     def testVerbose(self):
  366         """verbose should be off by default"""
  367         self.assertEqual(archivemail.options.verbose, False)
  368 
  369     def testDaysOldMax(self):
  370         """default archival time should be 180 days"""
  371         self.assertEqual(archivemail.options.days_old_max, 180)
  372 
  373     def testQuiet(self):
  374         """quiet should be off by default"""
  375         self.assertEqual(archivemail.options.quiet, False)
  376 
  377     def testDeleteOldMail(self):
  378         """we should not delete old mail by default"""
  379         self.assertEqual(archivemail.options.delete_old_mail, False)
  380 
  381     def testNoCompress(self):
  382         """no-compression should be off by default"""
  383         self.assertEqual(archivemail.options.no_compress, False)
  384 
  385     def testIncludeFlagged(self):
  386         """we should not archive flagged messages by default"""
  387         self.assertEqual(archivemail.options.include_flagged, False)
  388 
  389     def testPreserveUnread(self):
  390         """we should not preserve unread messages by default"""
  391         self.assertEqual(archivemail.options.preserve_unread, False)
  392 
  393 class TestOptionParser(unittest.TestCase):
  394     def setUp(self):
  395         self.oldopts = copy.copy(archivemail.options)
  396 
  397     def testOptionDate(self):
  398         """--date and -D options are parsed correctly"""
  399         date_formats = (
  400             "%Y-%m-%d",  # ISO format
  401             "%d %b %Y" , # Internet format
  402             "%d %B %Y" , # Internet format with full month names
  403         )
  404         date = time.strptime("2000-07-29", "%Y-%m-%d")
  405         unixdate = time.mktime(date)
  406         for df in date_formats:
  407             d = time.strftime(df, date)
  408             for opt in '-D', '--date=':
  409                 archivemail.options.date_old_max = None
  410                 archivemail.options.parse_args([opt+d], "")
  411                 self.assertEqual(unixdate, archivemail.options.date_old_max)
  412 
  413     def testOptionPreserveUnread(self):
  414         """--preserve-unread option is parsed correctly"""
  415         archivemail.options.parse_args(["--preserve-unread"], "")
  416         assert archivemail.options.preserve_unread
  417         archivemail.options.preserve_unread = False
  418         archivemail.options.parse_args(["-u"], "")
  419         assert archivemail.options.preserve_unread
  420 
  421     def testOptionSuffix(self):
  422         """--suffix and -s options are parsed correctly"""
  423         for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"):
  424             archivemail.options.parse_args(["--suffix="+suffix], "")
  425             self.assertEqual(archivemail.options.archive_suffix, suffix)
  426             archivemail.options.archive_suffix = None
  427             archivemail.options.parse_args(["-s", suffix], "")
  428             self.assertEqual(archivemail.options.archive_suffix, suffix)
  429 
  430     def testOptionPrefix(self):
  431         """--prefix and -p options are parsed correctly"""
  432         for prefix in ("_static_", "_%B_%Y", "-%Y-%m-%d"):
  433             archivemail.options.parse_args(["--prefix="+prefix], "")
  434             self.assertEqual(archivemail.options.archive_prefix, prefix)
  435             archivemail.options.archive_prefix = None
  436             archivemail.options.parse_args(["-p", prefix], "")
  437             self.assertEqual(archivemail.options.archive_prefix, prefix)
  438 
  439     def testOptionArchivename(self):
  440         """--archive-name and -a options are parsed correctly"""
  441         for name in ("custom", ".withdot", "custom_%Y", "%Y/joe"):
  442             archivemail.options.parse_args(["--archive-name="+name], "")
  443             self.assertEqual(archivemail.options.archive_name, name)
  444             archivemail.options.archive_name = None
  445             archivemail.options.parse_args(["-a", name], "")
  446             self.assertEqual(archivemail.options.archive_name, name)
  447 
  448     def testOptionDryrun(self):
  449         """--dry-run option is parsed correctly"""
  450         archivemail.options.parse_args(["--dry-run"], "")
  451         assert archivemail.options.dry_run
  452         archivemail.options.preserve_unread = False
  453         archivemail.options.parse_args(["-n"], "")
  454         assert archivemail.options.dry_run
  455 
  456     def testOptionDays(self):
  457         """--days and -d options are parsed correctly"""
  458         archivemail.options.parse_args(["--days=11"], "")
  459         self.assertEqual(archivemail.options.days_old_max, 11)
  460         archivemail.options.days_old_max = None
  461         archivemail.options.parse_args(["-d11"], "")
  462         self.assertEqual(archivemail.options.days_old_max, 11)
  463 
  464     def testOptionDelete(self):
  465         """--delete option is parsed correctly"""
  466         archivemail.options.parse_args(["--delete"], "")
  467         assert archivemail.options.delete_old_mail
  468 
  469     def testOptionCopy(self):
  470         """--copy option is parsed correctly"""
  471         archivemail.options.parse_args(["--copy"], "")
  472         assert archivemail.options.copy_old_mail
  473 
  474     def testOptionOutputdir(self):
  475         """--output-dir and -o options are parsed correctly"""
  476         for path in "/just/some/path", "relative/path":
  477             archivemail.options.parse_args(["--output-dir=%s" % path], "")
  478             self.assertEqual(archivemail.options.output_dir, path)
  479             archivemail.options.output_dir = None
  480             archivemail.options.parse_args(["-o%s" % path], "")
  481             self.assertEqual(archivemail.options.output_dir, path)
  482 
  483     def testOptionNocompress(self):
  484         """--no-compress option is parsed correctly"""
  485         archivemail.options.parse_args(["--no-compress"], "")
  486         assert archivemail.options.no_compress
  487 
  488     def testOptionSize(self):
  489         """--size and -S options are parsed correctly"""
  490         size = "666"
  491         archivemail.options.parse_args(["--size=%s" % size ], "")
  492         self.assertEqual(archivemail.options.min_size, int(size))
  493         archivemail.options.parse_args(["-S%s" % size ], "")
  494         self.assertEqual(archivemail.options.min_size, int(size))
  495 
  496     def tearDown(self):
  497         archivemail.options = self.oldopts
  498 
  499 ########## archivemail.is_older_than_days() unit testing #################
  500 
  501 class TestIsTooOld(unittest.TestCase):
  502     def testVeryOld(self):
  503         """with max_days=360, should be true for these dates > 1 year"""
  504         for years in range(1, 10):
  505             time_msg = time.time() - (years * 365 * 24 * 60 * 60)
  506             assert archivemail.is_older_than_days(time_message=time_msg,
  507                 max_days=360)
  508 
  509     def testOld(self):
  510         """with max_days=14, should be true for these dates > 14 days"""
  511         for days in range(14, 360):
  512             time_msg = time.time() - (days * 24 * 60 * 60)
  513             assert archivemail.is_older_than_days(time_message=time_msg,
  514                 max_days=14)
  515 
  516     def testJustOld(self):
  517         """with max_days=1, should be true for these dates >= 1 day"""
  518         for minutes in range(0, 61):
  519             time_msg = time.time() - (25 * 60 * 60) + (minutes * 60)
  520             assert archivemail.is_older_than_days(time_message=time_msg,
  521                 max_days=1)
  522 
  523     def testNotOld(self):
  524         """with max_days=9, should be false for these dates < 9 days"""
  525         for days in range(0, 9):
  526             time_msg = time.time() - (days * 24 * 60 * 60)
  527             assert not archivemail.is_older_than_days(time_message=time_msg,
  528                 max_days=9)
  529 
  530     def testJustNotOld(self):
  531         """with max_days=1, should be false for these hours <= 1 day"""
  532         for minutes in range(0, 60):
  533             time_msg = time.time() - (23 * 60 * 60) - (minutes * 60)
  534             assert not archivemail.is_older_than_days(time_message=time_msg,
  535                 max_days=1)
  536 
  537     def testFuture(self):
  538         """with max_days=1, should be false for times in the future"""
  539         for minutes in range(0, 60):
  540             time_msg = time.time() + (minutes * 60)
  541             assert not archivemail.is_older_than_days(time_message=time_msg,
  542                 max_days=1)
  543 
  544 ########## archivemail.parse_imap_url() unit testing #################
  545 
  546 class TestParseIMAPUrl(unittest.TestCase): 
  547     def setUp(self):
  548         archivemail.options.quiet = True
  549         archivemail.options.verbose = False
  550         archivemail.options.pwfile = None
  551         
  552     urls_withoutpass = [
  553             ('imap://user@example.org@imap.example.org/upperbox/lowerbox',
  554                 ('user', None, 'example.org@imap.example.org', 143,
  555                 'upperbox/lowerbox')), 
  556             ('imap://"user@example.org"@imap.example.org/upperbox/lowerbox',
  557                 ('user@example.org', None, 'imap.example.org', 143,
  558                 'upperbox/lowerbox')), 
  559             ('imap://user@example.org"@imap.example.org/upperbox/lowerbox',
  560                 ('user', None, 'example.org"@imap.example.org', 143,
  561                 'upperbox/lowerbox')), 
  562             ('imaps://"user@example.org@imap.example.org/upperbox/lowerbox',
  563                 ('"user', None, 'example.org@imap.example.org', 993,
  564                 'upperbox/lowerbox')), 
  565             ('imaps://"us\\"er@example.org"@imap.example.org/upperbox/lowerbox',
  566                 ('us"er@example.org', None, 'imap.example.org', 993,
  567                 'upperbox/lowerbox')), 
  568             ('imaps://user\\@example.org@imap.example.org/upperbox/lowerbox',
  569                 ('user\\', None, 'example.org@imap.example.org', 993,
  570                 'upperbox/lowerbox'))
  571     ]
  572     urls_withpass = [
  573             ('imap://user@example.org:passwd@imap.example.org/upperbox/lowerbox',
  574                 ('user@example.org', 'passwd', 'imap.example.org', 143,
  575                 'upperbox/lowerbox')), 
  576             ('imaps://"user@example.org:passwd@imap.example.org/upperbox/lowerbox',
  577                 ('"user@example.org', "passwd", 'imap.example.org', 993,
  578                 'upperbox/lowerbox')), 
  579             ('imaps://u\\ser\\@example.org:"p@sswd"@imap.example.org/upperbox/lowerbox', 
  580                 ('u\\ser\\@example.org', 'p@sswd', 'imap.example.org', 993,
  581                 'upperbox/lowerbox'))
  582     ]
  583     # These are invalid when the password's not stripped. 
  584     urls_onlywithpass = [
  585             ('imaps://"user@example.org":passwd@imap.example.org/upperbox/lowerbox',
  586                 ('user@example.org', "passwd", 'imap.example.org',
  587                 'upperbox/lowerbox'))
  588     ]
  589     def testUrlsWithoutPwfile(self):
  590         """Parse test urls with --pwfile option unset. This parses a password in
  591         the URL, if present."""
  592         archivemail.options.pwfile = None
  593         for mbstr in self.urls_withpass + self.urls_withoutpass:
  594             url = mbstr[0]
  595             result = archivemail.parse_imap_url(url)
  596             self.assertEqual(result, mbstr[1])
  597 
  598     def testUrlsWithPwfile(self):
  599         """Parse test urls with --pwfile set.  In this case the ':' character
  600         loses its meaning as a delimiter."""
  601         archivemail.options.pwfile = "whocares.txt"
  602         for mbstr in self.urls_onlywithpass: 
  603             url = mbstr[0]
  604             self.assertRaises(archivemail.UnexpectedError,
  605                     archivemail.parse_imap_url, url)
  606 
  607     def testUrlsDefaultPorts(self):
  608         """If an IMAP URL does not specify a server port, the standard ports
  609         are used."""
  610         archivemail.options.pwfile = "doesnotexist.txt"
  611         self.assertEqual(143, archivemail.parse_imap_url("imap://user@host/box")[3])
  612         self.assertEqual(993, archivemail.parse_imap_url("imaps://user@host/box")[3])
  613 
  614     def testUrlsWithPassAndPortnumber(self):
  615         """IMAP URLs with an embedded password and a server port number are
  616         correctly parsed."""
  617         self.assertEqual(1234, archivemail.parse_imap_url("imap://user:pass@host:1234/box")[3])
  618         self.assertEqual(1234, archivemail.parse_imap_url("imap://user:pass@host:1234/box")[3])
  619 
  620     def tearDown(self): 
  621         archivemail.options.quiet = False
  622         archivemail.options.verbose = False
  623         archivemail.options.pwfile = None
  624 
  625 ########## quoting and un-quoting of IMAP strings ##########
  626 
  627 class TestIMAPQuoting(unittest.TestCase):
  628     stringlist = (
  629             ('{braces} and space', '"{braces} and space"'),
  630             ('\\backslash', '"\\\\backslash"'),
  631             ('with "quotes" inbetween', '"with \\"quotes\\" inbetween"'),
  632             ('ending with "quotes"', '"ending with \\"quotes\\""'),
  633             ('\\"backslash before quote', '"\\\\\\"backslash before quote"')
  634     )
  635 
  636     def testQuote(self):
  637         for unquoted, quoted in self.stringlist:
  638             self.assertEqual(archivemail.imap_quote(unquoted), quoted)
  639 
  640     def testUnquote(self):
  641         for unquoted, quoted in self.stringlist:
  642             self.assertEqual(unquoted, archivemail.imap_unquote(quoted))
  643 
  644 
  645 ########## Modified UTF-7 support functions ##########
  646 
  647 class TestModUTF7(unittest.TestCase):
  648     goodpairs = (
  649             (u"A\N{NOT IDENTICAL TO}A.", "A&ImI-A."),
  650             (u"Hi Mom -\N{WHITE SMILING FACE}-!", "Hi Mom -&Jjo--!"),
  651             (u"~peter/mail/\u53f0\u5317/\u65e5\u672c\u8a9e",
  652                 "~peter/mail/&U,BTFw-/&ZeVnLIqe-")
  653     )
  654 
  655     def testEncode(self):
  656         """Ensure that encoding text in modified UTF-7 works properly."""
  657         for text, code in self.goodpairs:
  658             self.assertEqual(archivemail.mod_utf7_encode(text), code)
  659 
  660     def testDecode(self):
  661         """Ensure that decoding modified UTF-7 to text works properly."""
  662         for text, code in self.goodpairs:
  663             self.assertEqual(archivemail.mod_utf7_decode(code), text)
  664 
  665 
  666 ########## acceptance testing ###########
  667 
  668 class TestArchive(TestCaseInTempdir):
  669     """Base class defining helper functions for doing test archiving runs."""
  670     mbox = None         # mbox file that will be processed by archivemail
  671     good_archive = None # Uncompressed reference archive file to verify the
  672                         # archive after processing
  673     good_mbox = None    # Reference mbox file to verify the mbox after processing
  674 
  675     def verify(self):
  676         assert os.path.exists(self.mbox)
  677         if self.good_mbox is not None:
  678             assertEqualContent(self.mbox, self.good_mbox)
  679         else:
  680             self.assertEqual(os.path.getsize(self.mbox), 0)
  681         archive_name = self.mbox + "_archive"
  682         if not archivemail.options.no_compress:
  683             archive_name += ".gz"
  684             iszipped = True
  685         else:
  686             assert not os.path.exists(archive_name + ".gz")
  687             iszipped = False
  688         if self.good_archive is not None:
  689             assertEqualContent(archive_name, self.good_archive, iszipped)
  690         else:
  691             assert not os.path.exists(archive_name)
  692 
  693     def make_old_mbox(self, body=None, headers=None, messages=1, make_old_archive=False):
  694         """Prepare for a test run with an old mbox by making an old mbox,
  695         optionally an existing archive, and a reference archive to verify the
  696         archive after archivemail has run."""
  697         self.mbox = make_mbox(body, headers, 181*24, messages)
  698         archive_does_change = not (archivemail.options.dry_run or
  699                 archivemail.options.delete_old_mail)
  700         mbox_does_not_change = archivemail.options.dry_run or \
  701                 archivemail.options.copy_old_mail
  702         if make_old_archive:
  703             archive = archivemail.make_archive_name(self.mbox)
  704             self.good_archive = make_archive_and_plain_copy(archive)
  705             if archive_does_change:
  706                 append_file(self.mbox, self.good_archive)
  707         elif archive_does_change:
  708             self.good_archive = tempfile.mkstemp()[1]
  709             shutil.copyfile(self.mbox, self.good_archive)
  710         if mbox_does_not_change:
  711             if archive_does_change and not make_old_archive:
  712                 self.good_mbox = self.good_archive
  713             else:
  714                 self.good_mbox = tempfile.mkstemp()[1]
  715                 shutil.copyfile(self.mbox, self.good_mbox)
  716 
  717     def make_mixed_mbox(self, body=None, headers=None, messages=1, make_old_archive=False):
  718         """Prepare for a test run with a mixed mbox by making a mixed mbox,
  719         optionally an existing archive, a reference archive to verify the
  720         archive after archivemail has run, and likewise a reference mbox to
  721         verify the mbox."""
  722         self.make_old_mbox(body, headers, messages=messages, make_old_archive=make_old_archive)
  723         new_mbox_name = make_mbox(body, headers, 179*24, messages)
  724         append_file(new_mbox_name, self.mbox)
  725         if self.good_mbox is None:
  726             self.good_mbox = new_mbox_name
  727         else:
  728             if self.good_mbox == self.good_archive:
  729                 self.good_mbox = tempfile.mkstemp()[1]
  730                 shutil.copyfile(self.mbox, self.good_mbox)
  731             else:
  732                 append_file(new_mbox_name, self.good_mbox)
  733 
  734     def make_new_mbox(self, body=None, headers=None, messages=1, make_old_archive=False):
  735         """Prepare for a test run with a new mbox by making a new mbox,
  736         optionally an exiting archive, and a reference mbox to verify the mbox
  737         after archivemail has run."""
  738         self.mbox = make_mbox(body, headers, 179*24, messages)
  739         self.good_mbox = tempfile.mkstemp()[1]
  740         shutil.copyfile(self.mbox, self.good_mbox)
  741         if make_old_archive:
  742             archive = archivemail.make_archive_name(self.mbox)
  743             self.good_archive = make_archive_and_plain_copy(archive)
  744 
  745 
  746 class TestArchiveMbox(TestArchive):
  747     """archiving should work based on the date of messages given"""
  748 
  749     def setUp(self):
  750         self.oldopts = copy.copy(archivemail.options)
  751         archivemail.options.quiet = True
  752         super(TestArchiveMbox, self).setUp()
  753  
  754     def testOld(self):
  755         """archiving an old mailbox"""
  756         self.make_old_mbox(messages=3)
  757         archivemail.archive(self.mbox)
  758         self.verify()
  759 
  760     def testOldFromInBody(self):
  761         """archiving an old mailbox with 'From ' in the body"""
  762         body = """This is a message with ^From at the start of a line
  763 From is on this line
  764 This is after the ^From line"""
  765         self.make_old_mbox(messages=3, body=body)
  766         archivemail.archive(self.mbox)
  767         self.verify()
  768 
  769     def testDateSystem(self):
  770         """test that the --date option works as expected"""
  771         test_headers = (
  772             {
  773                 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
  774                 'Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
  775             },
  776             {
  777                 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
  778                 'Date' : None,
  779             },
  780             {
  781                 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
  782                 'Date' : None,
  783                 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
  784             },
  785             {
  786                 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
  787                 'Date' : None,
  788                 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
  789             },
  790         )
  791         for headers in test_headers:
  792             msg = make_message(default_headers=headers, wantobj=True)
  793             date = time.strptime("2000-07-29", "%Y-%m-%d")
  794             archivemail.options.date_old_max = time.mktime(date)
  795             assert archivemail.should_archive(msg)
  796             date = time.strptime("2000-07-27", "%Y-%m-%d")
  797             archivemail.options.date_old_max = time.mktime(date)
  798             assert not archivemail.should_archive(msg)
  799 
  800     def testMixed(self):
  801         """archiving a mixed mailbox"""
  802         self.make_mixed_mbox(messages=3)
  803         archivemail.archive(self.mbox)
  804         self.verify()
  805 
  806     def testNew(self):
  807         """archiving a new mailbox"""
  808         self.make_new_mbox(messages=3)
  809         archivemail.archive(self.mbox)
  810         self.verify()
  811 
  812     def testOldExisting(self):
  813         """archiving an old mailbox with an existing archive"""
  814         self.make_old_mbox(messages=3, make_old_archive=True)
  815         archivemail.archive(self.mbox)
  816         self.verify()
  817 
  818     def testOldWeirdHeaders(self):
  819         """archiving old mailboxes with weird headers"""
  820         weird_headers = (
  821             {   # we should archive because of the date on the 'From_' line
  822                 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
  823                 'Date'  : 'Friskhdfkjkh, 28 Jul 2002 1line noise6:11:36 +1000',
  824             },
  825             {   # we should archive because of the date on the 'From_' line
  826                 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
  827                 'Date'  : None,
  828             },
  829             {   # we should archive because of the date in 'Delivery-date'
  830                 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
  831                 'Date'  : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000',
  832                 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
  833             },
  834             {   # we should archive because of the date in 'Delivery-date'
  835                 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
  836                 'Date' : None,
  837                 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
  838             },
  839             {   # we should archive because of the date in 'Resent-Date'
  840                 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
  841                 'Date'  : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000',
  842                 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
  843             },
  844             {   # we should archive because of the date in 'Resent-Date'
  845                 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
  846                 'Date' : None,
  847                 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
  848             },
  849             {   # completely blank dates were crashing < version 0.4.7
  850                 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
  851                 'Date'  : '',
  852             },
  853             {   # completely blank dates were crashing < version 0.4.7
  854                 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
  855                 'Date'  : '',
  856                 'Resent-Date'  : '',
  857             },
  858         )
  859         fd, self.mbox = tempfile.mkstemp()
  860         fp = os.fdopen(fd, "w")
  861         for headers in weird_headers:
  862             msg_text = make_message(default_headers=headers)
  863             fp.write(msg_text*2)
  864         fp.close()
  865         self.good_archive = tempfile.mkstemp()[1]
  866         shutil.copyfile(self.mbox, self.good_archive)
  867         archivemail.archive(self.mbox)
  868         self.verify()
  869 
  870     def tearDown(self):
  871         archivemail.options = self.oldopts
  872         super(TestArchiveMbox, self).tearDown()
  873 
  874 
  875 class TestArchiveMboxTimestamp(TestCaseInTempdir):
  876     """original mbox timestamps should always be preserved"""
  877     def setUp(self):
  878         super(TestArchiveMboxTimestamp, self).setUp() 
  879         archivemail.options.quiet = True
  880         self.mbox_name = make_mbox(messages=3, hours_old=(24 * 180))
  881         self.mtime = os.path.getmtime(self.mbox_name) - 66
  882         self.atime = os.path.getatime(self.mbox_name) - 88
  883         os.utime(self.mbox_name, (self.atime, self.mtime))
  884 
  885     def testNew(self):
  886         """mbox timestamps should not change after no archival"""
  887         archivemail.options.days_old_max = 181
  888         archivemail.archive(self.mbox_name)
  889         self.verify()
  890 
  891     def testOld(self):
  892         """mbox timestamps should not change after archival"""
  893         archivemail.options.days_old_max = 179
  894         archivemail.archive(self.mbox_name)
  895         self.verify()
  896 
  897     def verify(self):
  898         assert os.path.exists(self.mbox_name)
  899         new_atime = os.path.getatime(self.mbox_name)
  900         new_mtime = os.path.getmtime(self.mbox_name)
  901         self.assertAlmostEqual(self.mtime, new_mtime, utimes_precision)
  902         self.assertAlmostEqual(self.atime, new_atime, utimes_precision)
  903 
  904     def tearDown(self):
  905         archivemail.options.quiet = False
  906         archivemail.options.days_old_max = 180
  907         os.remove(self.mbox_name)
  908         super(TestArchiveMboxTimestamp, self).tearDown()
  909 
  910 
  911 class TestArchiveMboxAll(unittest.TestCase):
  912     def setUp(self):
  913         archivemail.options.quiet = True
  914         archivemail.options.archive_all = True
  915 
  916     def testNew(self):
  917         """new messages should be archived with --all"""
  918         self.msg = make_message(hours_old=24*179, wantobj=True)
  919         assert archivemail.should_archive(self.msg)
  920 
  921     def testOld(self):
  922         """old messages should be archived with --all"""
  923         self.msg = make_message(hours_old=24*181, wantobj=True)
  924         assert archivemail.should_archive(self.msg)
  925 
  926     def tearDown(self):
  927         archivemail.options.quiet = False
  928         archivemail.options.archive_all = False
  929 
  930 class TestArchiveMboxPreserveUnread(unittest.TestCase):
  931     """make sure the 'preserve_unread' option works"""
  932     def setUp(self):
  933         archivemail.options.quiet = True
  934         archivemail.options.preserve_unread = True
  935         self.msg = make_message(hours_old=24*181, wantobj=True)
  936 
  937     def testOldRead(self):
  938         """old read messages should be archived with --preserve-unread"""
  939         self.msg["Status"] = "RO"
  940         assert archivemail.should_archive(self.msg)
  941 
  942     def testOldUnread(self):
  943         """old unread messages should not be archived with --preserve-unread"""
  944         self.msg["Status"] = "O"
  945         assert not archivemail.should_archive(self.msg)
  946 
  947     def tearDown(self):
  948         archivemail.options.quiet = False
  949         archivemail.options.preserve_unread = False
  950 
  951 
  952 class TestArchiveMboxSuffix(unittest.TestCase):
  953     """make sure the 'suffix' option works"""
  954     def setUp(self):
  955         archivemail.options.quiet = True
  956 
  957     def testSuffix(self):
  958         """archiving with specified --suffix arguments"""
  959         for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"):
  960             mbox_name = "foobar"
  961             archivemail.options.archive_suffix = suffix
  962             days_old_max = 180
  963             parsed_suffix_time = time.time() - days_old_max*24*60*60
  964             parsed_suffix = time.strftime(suffix,
  965                 time.localtime(parsed_suffix_time))
  966             archive_name = mbox_name + parsed_suffix
  967             self.assertEqual(archive_name,
  968                     archivemail.make_archive_name(mbox_name))
  969 
  970     def tearDown(self):
  971         archivemail.options.quiet = False
  972         archivemail.options.archive_suffix = None
  973 
  974 class TestArchiveMboxPrefix(unittest.TestCase):
  975     """make sure the 'prefix' option works"""
  976     def setUp(self):
  977         archivemail.options.quiet = True
  978 
  979     def testPrefix(self):
  980         """archiving with specified --prefix arguments"""
  981         for archive_prefix in ("_static_", "_%B_%Y", "-%Y-%m-%d", "%Y/%m/"):
  982             archivemail.options.archive_prefix = archive_prefix
  983             for mbox_name in "foobar", "/tmp/foobar", "schnorchz/foobar":
  984                 archive_dir, archive_base = os.path.split(mbox_name)
  985                 days = archivemail.options.days_old_max
  986                 tm = time.localtime(time.time() - days*24*60*60)
  987                 prefix = time.strftime(archive_prefix, tm)
  988                 archive_name = os.path.join(archive_dir, prefix + archive_base)
  989                 self.assertEqual(archive_name,
  990                         archivemail.make_archive_name(mbox_name))
  991 
  992     def tearDown(self):
  993         archivemail.options.quiet = False
  994         archivemail.options.archive_prefix = None
  995 
  996 class TestArchiveName(unittest.TestCase):
  997     def setUp(self):
  998         archivemail.options.quiet = True
  999 
 1000     def testArchiveName(self):
 1001         """test the --archive-name option"""
 1002         archive_names = ("custom", ".withdot", "custom_%Y", "%Y/joe")
 1003         mbox = "foobar"
 1004         for name in archive_names:
 1005             archivemail.options.archive_name = name
 1006             days = archivemail.options.days_old_max
 1007             tm = time.localtime(time.time() - days*24*60*60)
 1008             name = time.strftime(name, tm)
 1009             self.assertEqual(archivemail.make_archive_name(mbox), name)
 1010 
 1011     def tearDown(self):
 1012         archivemail.options.quiet = False
 1013         archivemail.options.archive_name = None
 1014 
 1015 class TestArchiveAffixes(unittest.TestCase):
 1016     def setUp(self):
 1017         self.mbox = "harbsch"
 1018         self.archive_prefix = "wurbl+"
 1019         self.archive_suffix = "+schronk&borsz"
 1020         archivemail.options.quiet = True
 1021 
 1022     def testDefaultPrefix(self):
 1023         """if no archive name affix is specified, the default archive suffix is appended"""
 1024         self.assertEqual(archivemail.make_archive_name(self.mbox),
 1025                 self.mbox + archivemail.options.archive_default_suffix)
 1026 
 1027     def testPrefixKillsDefaultSuffix(self):
 1028         """if an archive name prefix is specified, the default archive suffix is not appended"""
 1029         archivemail.options.archive_prefix = self.archive_prefix
 1030         self.assertEqual(archivemail.make_archive_name(self.mbox),
 1031                 self.archive_prefix + self.mbox)
 1032 
 1033     def testPrefixAndSuffix(self):
 1034         """specifying both an archive name prefix and suffix works"""
 1035         archivemail.options.archive_prefix = self.archive_prefix
 1036         archivemail.options.archive_suffix = self.archive_suffix
 1037         self.assertEqual(archivemail.make_archive_name(self.mbox),
 1038                 self.archive_prefix + self.mbox + self.archive_suffix)
 1039 
 1040     def tearDown(self):
 1041         archivemail.options.archive_prefix = None
 1042         archivemail.options.archive_suffix = None
 1043         archivemail.options.quiet = False
 1044 
 1045 class TestArchiveHiddenMbox(unittest.TestCase):
 1046     def setUp(self):
 1047         archivemail.options.quiet = True
 1048         self.mbox = ".upper.lower"
 1049 
 1050     def testHiddenMbox(self):
 1051         """leading dots are stripped from the archive name when no prefix is added"""
 1052         self.assertEqual(archivemail.make_archive_name(self.mbox),
 1053                 self.mbox.lstrip('.') +
 1054                 archivemail.options.archive_default_suffix)
 1055 
 1056     def testHiddenMboxPrefixedArchive(self):
 1057         """no dots are stripped from the archive name when a prefix is added"""
 1058         prefix = ".hidden_"
 1059         archivemail.options.archive_prefix = prefix
 1060         self.assertEqual(archivemail.make_archive_name(self.mbox),
 1061                 prefix + self.mbox)
 1062 
 1063     def tearDown(self):
 1064         archivemail.options.quiet = False
 1065         archivemail.options.archive_prefix = None
 1066 
 1067 class TestArchiveDryRun(TestArchive):
 1068     """make sure the 'dry-run' option works"""
 1069     def setUp(self):
 1070         super(TestArchiveDryRun, self).setUp()
 1071         archivemail.options.quiet = True
 1072         archivemail.options.dry_run = True
 1073 
 1074     def testOld(self):
 1075         """archiving an old mailbox with the 'dry-run' option"""
 1076         self.make_old_mbox(messages=3)
 1077         archivemail.archive(self.mbox)
 1078         self.verify()
 1079 
 1080     def tearDown(self):
 1081         archivemail.options.dry_run = False
 1082         archivemail.options.quiet = False
 1083         super(TestArchiveDryRun, self).tearDown()
 1084 
 1085 
 1086 class TestArchiveDelete(TestArchive):
 1087     """make sure the 'delete' option works"""
 1088     def setUp(self):
 1089         super(TestArchiveDelete, self).setUp()
 1090         archivemail.options.quiet = True
 1091         archivemail.options.delete_old_mail = True
 1092 
 1093     def testNew(self):
 1094         """archiving a new mailbox with the 'delete' option"""
 1095         self.make_new_mbox(messages=3)
 1096         archivemail.archive(self.mbox)
 1097         self.verify()
 1098 
 1099     def testMixed(self):
 1100         """archiving a mixed mailbox with the 'delete' option"""
 1101         self.make_mixed_mbox(messages=3)
 1102         archivemail.archive(self.mbox)
 1103         self.verify()
 1104 
 1105     def testOld(self):
 1106         """archiving an old mailbox with the 'delete' option"""
 1107         self.make_old_mbox(messages=3)
 1108         archivemail.archive(self.mbox)
 1109         self.verify()
 1110 
 1111     def tearDown(self):
 1112         archivemail.options.delete_old_mail = False
 1113         archivemail.options.quiet = False
 1114         super(TestArchiveDelete, self).tearDown()
 1115 
 1116 
 1117 class TestArchiveCopy(TestArchive):
 1118     """make sure the 'copy' option works"""
 1119     def setUp(self):
 1120         super(TestArchiveCopy, self).setUp()
 1121         archivemail.options.quiet = True
 1122         archivemail.options.copy_old_mail = True
 1123 
 1124     def testNew(self):
 1125         """archiving a new mailbox with the 'copy' option"""
 1126         self.make_new_mbox(messages=3)
 1127         archivemail.archive(self.mbox)
 1128         self.verify()
 1129 
 1130     def testMixed(self):
 1131         """archiving a mixed mailbox with the 'copy' option"""
 1132         self.make_mixed_mbox(messages=3)
 1133         archivemail.archive(self.mbox)
 1134         self.verify()
 1135 
 1136     def testOld(self):
 1137         """archiving an old mailbox with the 'copy' option"""
 1138         self.make_old_mbox(messages=3)
 1139         archivemail.archive(self.mbox)
 1140         self.verify()
 1141 
 1142     def tearDown(self):
 1143         archivemail.options.copy_old_mail = False
 1144         archivemail.options.quiet = False
 1145         super(TestArchiveCopy, self).tearDown()
 1146 
 1147 
 1148 class TestArchiveMboxFlagged(unittest.TestCase):
 1149     """make sure the 'include_flagged' option works"""
 1150     def setUp(self):
 1151         archivemail.options.include_flagged = False
 1152         archivemail.options.quiet = True
 1153 
 1154     def testOld(self):
 1155         """by default, old flagged messages should not be archived"""
 1156         msg = make_message(default_headers={"X-Status": "F"},
 1157                 hours_old=24*181, wantobj=True)
 1158         assert not archivemail.should_archive(msg)
 1159 
 1160     def testIncludeFlaggedNew(self):
 1161         """new flagged messages should not be archived with include_flagged"""
 1162         msg = make_message(default_headers={"X-Status": "F"},
 1163                 hours_old=24*179, wantobj=True)
 1164         assert not archivemail.should_archive(msg)
 1165 
 1166     def testIncludeFlaggedOld(self):
 1167         """old flagged messages should be archived with include_flagged"""
 1168         archivemail.options.include_flagged = True
 1169         msg = make_message(default_headers={"X-Status": "F"},
 1170                 hours_old=24*181, wantobj=True)
 1171         assert archivemail.should_archive(msg)
 1172 
 1173     def tearDown(self):
 1174         archivemail.options.include_flagged = False
 1175         archivemail.options.quiet = False
 1176 
 1177 
 1178 class TestArchiveMboxOutputDir(unittest.TestCase):
 1179     """make sure that the 'output-dir' option works"""
 1180     def setUp(self):
 1181         archivemail.options.quiet = True
 1182 
 1183     def testOld(self):
 1184         """archiving an old mailbox with a sepecified output dir"""
 1185         for dir in "/just/a/path", "relative/path":
 1186             archivemail.options.output_dir = dir
 1187             archive_dir = archivemail.make_archive_name("/tmp/mbox")
 1188             self.assertEqual(dir, os.path.dirname(archive_dir))
 1189 
 1190     def tearDown(self):
 1191         archivemail.options.quiet = False
 1192         archivemail.options.output_dir = None
 1193 
 1194 
 1195 class TestArchiveMboxUncompressed(TestArchive):
 1196     """make sure that the 'no_compress' option works"""
 1197     mbox_name = None
 1198     new_mbox = None
 1199     old_mbox = None
 1200     copy_name = None
 1201 
 1202     def setUp(self):
 1203         archivemail.options.quiet = True
 1204         archivemail.options.no_compress = True
 1205         super(TestArchiveMboxUncompressed, self).setUp()
 1206 
 1207     def testOld(self):
 1208         """archiving an old mailbox uncompressed"""
 1209         self.make_old_mbox(messages=3)
 1210         archivemail.archive(self.mbox)
 1211         self.verify()
 1212 
 1213     def testNew(self):
 1214         """archiving a new mailbox uncompressed"""
 1215         self.make_new_mbox(messages=3)
 1216         archivemail.archive(self.mbox)
 1217         self.verify()
 1218 
 1219     def testMixed(self):
 1220         """archiving a mixed mailbox uncompressed"""
 1221         self.make_mixed_mbox(messages=3)
 1222         archivemail.archive(self.mbox)
 1223         self.verify()
 1224 
 1225     def testOldExists(self):
 1226         """archiving an old mailbox uncopressed with an existing archive"""
 1227         self.make_old_mbox(messages=3, make_old_archive=True)
 1228         archivemail.archive(self.mbox)
 1229         self.verify()
 1230 
 1231     def tearDown(self):
 1232         archivemail.options.quiet = False
 1233         archivemail.options.no_compress = False
 1234         super(TestArchiveMboxUncompressed, self).tearDown()
 1235 
 1236 
 1237 class TestArchiveSize(unittest.TestCase):
 1238     """check that the 'size' argument works"""
 1239     def setUp(self):
 1240         archivemail.options.quiet = True
 1241         msg_text = make_message(hours_old=24*181)
 1242         self.msg_size = len(msg_text)
 1243         fp = cStringIO.StringIO(msg_text)
 1244         self.msg = rfc822.Message(fp)
 1245 
 1246     def testSmaller(self):
 1247         """giving a size argument smaller than the message"""
 1248         archivemail.options.min_size = self.msg_size - 1
 1249         assert archivemail.should_archive(self.msg)
 1250 
 1251     def testBigger(self):
 1252         """giving a size argument bigger than the message"""
 1253         archivemail.options.min_size = self.msg_size + 1
 1254         assert not archivemail.should_archive(self.msg)
 1255 
 1256     def tearDown(self):
 1257         archivemail.options.quiet = False
 1258         archivemail.options.min_size = None
 1259 
 1260 
 1261 class TestXIMAPMessage(TestArchive):
 1262     """Test if IMAP pseudo messages in mboxes are properly handled."""
 1263     def setUp(self):
 1264         super(TestXIMAPMessage, self).setUp()
 1265         archivemail.options.quiet = True
 1266 
 1267     def testXIMAPMbox(self):
 1268         """IMAP pseudo messages in an mbox are always preserved."""
 1269         self.good_mbox = make_mbox(hours_old=181*24, headers={'X-IMAP': 'dummytext'},
 1270                 messages=1)
 1271         self.good_archive = make_mbox(hours_old=181*24, messages=3)
 1272         self.mbox = tempfile.mkstemp()[-1]
 1273         shutil.copyfile(self.good_mbox, self.mbox)
 1274         append_file(self.good_archive, self.mbox)
 1275         archivemail.archive(self.mbox)
 1276         self.verify()
 1277 
 1278     def tearDown(self):
 1279         super(TestXIMAPMessage, self).tearDown()
 1280         archivemail.options.quiet = False
 1281 
 1282 
 1283 ############# Test archiving maildirs ###############
 1284 
class TestArchiveMailboxdir(TestCaseInTempdir):
    """Base class defining helper functions for doing test archive runs with
    maildirs.

    make_maildir() fills a backup maildir, copies it to the maildir that
    archivemail will process and records what the run is expected to do;
    verify() then checks the processed maildir and the archive against these
    expectations."""
    maildir = None           # Maildir that will be processed by archivemail
    orig_maildir_obj = None  # A backup copy of the maildir, a SimpleMaildir object
    remaining_msg = set()    # Filenames of maildir messages that should be preserved
    number_archived = 0      # Number of messages that get archived
    orig_archive = None      # An uncompressed copy of a pre-existing archive,
                             # if one exists
    # Note: the attribute orig_archive_size is only set by make_maildir() when
    # it is called with make_old_archive=True; _verify_archive() reads it only
    # if orig_archive is set.

    def setUp(self):
        """Create the backup maildir object that the helpers fill."""
        super(TestArchiveMailboxdir, self).setUp()
        self.orig_maildir_obj = SimpleMaildir()

    def verify(self):
        """Verify both the processed maildir and the archive."""
        self._verify_remaining()
        self._verify_archive()

    def _verify_remaining(self):
        """Verify that the preserved messages weren't altered."""
        assert self.maildir
        # Compare maildir with backup object.
        dcmp = filecmp.dircmp(self.maildir, self.orig_maildir_obj.root)
        # Top-level has only directories cur, new, tmp and must be unchanged.
        self.assertEqual(dcmp.left_list, dcmp.right_list)
        found = set()
        for d in dcmp.common_dirs:
            dcmp2 = dcmp.subdirs[d]
            # We need to verify three things.
            # 1. directory is a subset of the original...
            assert not dcmp2.left_only
            # 2. all common files are identical...
            self.assertEqual(dcmp2.common_files, dcmp2.same_files)
            # Collect the surviving files as paths relative to the maildir.
            found = found.union([os.path.join(d, x) for x in dcmp2.common_files])
        # 3. exactly the `new' messages (recorded in self.remaining_msg)
        #    were preserved.
        self.assertEqual(found, self.remaining_msg)

    def _verify_archive(self):
        """Verify the archive correctness.

        If no message is expected to be archived, the archive must either be
        unchanged or not exist at all.  Otherwise every message found in the
        newly written part of the archive must have a counterpart in the
        backup maildir, and their number must equal number_archived."""
        # TODO: currently make_archive_name does not include the .gz suffix.
        # Is this something that should be fixed?
        archive = archivemail.make_archive_name(self.maildir)
        if archivemail.options.no_compress:
            iszipped = False
        else:
            archive += '.gz'
            iszipped = True
        if self.number_archived == 0:
            if self.orig_archive:
                assertEqualContent(archive, self.orig_archive, iszipped)
            else:
                assert not os.path.exists(archive)
            return
        # Initialize everything the finally clause looks at.
        fp_new = fp_archive = tmp_archive_name = None
        try:
            if self.orig_archive:
                new_size = os.path.getsize(archive)
                # Brute force: split archive in old and new part and verify the
                # parts separately.  (Of course this destroys the archive.)
                fp_archive = open(archive, "r+")
                # Everything beyond the original archive size is the new part;
                # move it into a temporary file...
                fp_archive.seek(self.orig_archive_size)
                fd, tmp_archive_name = tempfile.mkstemp()
                fp_new = os.fdopen(fd, "w")
                shutil.copyfileobj(fp_archive, fp_new)
                fp_new.close()
                # ...and cut the archive back to its original size.
                fp_archive.truncate(self.orig_archive_size)
                fp_archive.close()
                assertEqualContent(archive, self.orig_archive, iszipped)
                new_archive = tmp_archive_name
            else:
                new_archive = archive
            if archivemail.options.no_compress:
                fp_archive = open(new_archive, "r")
            else:
                fp_archive = FixedGzipFile(new_archive, "r")
            mb = mailbox.UnixMailbox(fp_archive)
            found = 0
            for msg in mb:
                self.verify_maildir_has_msg(self.orig_maildir_obj, msg)
                found += 1
            self.assertEqual(found, self.number_archived)
        finally:
            if tmp_archive_name:
                os.remove(tmp_archive_name)
            if fp_new is not None:
                fp_new.close()
            if fp_archive is not None:
                fp_archive.close()

    def verify_maildir_has_msg(self, maildir, msg):
        """Assert that the given maildir has a copy of the rfc822 message.

        The message is looked up in the maildir by its message-id.  Both the
        flags and the message text, with the status headers left out, have to
        match."""
        mid = msg['Message-Id'] # Complains if there is no message-id
        mdir_msg_str, mdir_flags = \
            maildir.get_message_and_mbox_status(mid)
        # The flags of the mbox message are the characters of its Status and
        # X-Status headers.
        mbox_flags = set(msg.get('status', '') + msg.get('x-status', ''))
        self.assertEqual(mdir_flags, mbox_flags)

        # Drop the status headers before comparing the message text.  This
        # relies on isheader() returning the lower-cased header name.
        headers = filter(lambda h: msg.isheader(h) not in ('status', 'x-status'),
                msg.headers)
        headers = "".join(headers)
        msg.rewindbody()
        # Discard last mbox LF which is not part of the message.
        body = msg.fp.read()[:-1]
        msg_str = headers + os.linesep + body
        self.assertEqual(mdir_msg_str, msg_str)

    def add_messages(self, body=None, headers=None, hours_old=0, messages=1):
        """Write `messages' messages of the given age to the backup maildir.

        body and headers are passed on to make_message()."""
        for count in range(messages):
            msg = make_message(body, default_headers=headers, mkfrom=False,
                    hours_old=hours_old)
            self.orig_maildir_obj.write(msg, new=False)

    def make_maildir(self, mkold, mknew, body=None, headers=None, messages=1,
            make_old_archive=False):
        """Prepare for a test run with a maildir.

        Adds `messages' new messages (if mknew) and/or `messages' old messages
        (if mkold) to the backup maildir, copies the backup to self.maildir
        and records the expectations in remaining_msg and number_archived.
        With make_old_archive, a pre-existing archive is created as well."""
        mailbox_does_change = not (archivemail.options.dry_run or
                archivemail.options.copy_old_mail)
        archive_does_change = not (archivemail.options.dry_run or
                archivemail.options.delete_old_mail)
        if mknew:
            self.add_messages(body, headers, 179*24, messages)
            # New messages only end up in the archive with archive_all.
            if archive_does_change and archivemail.options.archive_all:
                self.number_archived += messages
            if mailbox_does_change:
                # Taken before any old messages are added below, so this
                # covers the messages present so far.
                self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames())
        if mkold:
            self.add_messages(body, headers, 181*24, messages)
            if archive_does_change:
                self.number_archived += messages
        if not mailbox_does_change:
            # Nothing gets removed, so every message is expected to remain.
            self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames())
        self.maildir = copy_maildir(self.orig_maildir_obj.root)
        if make_old_archive:
            archive = archivemail.make_archive_name(self.maildir)
            self.orig_archive = make_archive_and_plain_copy(archive)
            # FIXME: .gz extension handling is a mess II
            if not archivemail.options.no_compress:
                archive += '.gz'
            # Remember the size of the archive as it is on disk, so that
            # _verify_archive() can split off what the run has appended.
            self.orig_archive_size = os.path.getsize(archive)
 1424 
 1425 class TestEmptyMaildir(TestCaseInTempdir):
 1426     def setUp(self):
 1427         super(TestEmptyMaildir, self).setUp()
 1428         archivemail.options.quiet = True
 1429 
 1430     def testEmpty(self):
 1431         """Archiving an empty maildir should not result in an archive."""
 1432         self.mdir = SimpleMaildir()
 1433         archivemail.archive(self.mdir.root)
 1434         assert not os.path.exists(self.mdir.root + '_archive.gz')
 1435 
 1436     def tearDown(self):
 1437         super(TestEmptyMaildir, self).tearDown()
 1438         archivemail.options.quiet = False
 1439 
 1440 class TestMaildir(TestArchiveMailboxdir):
 1441     def setUp(self):
 1442         super(TestMaildir, self).setUp()
 1443         archivemail.options.quiet = True
 1444 
 1445     def testOld(self):
 1446         self.make_maildir(True, False, messages=3)
 1447         archivemail.archive(self.maildir)
 1448         self.verify()
 1449 
 1450     def testNew(self):
 1451         self.make_maildir(False, True, messages=3)
 1452         archivemail.archive(self.maildir)
 1453         self.verify()
 1454 
 1455     def testMixed(self):
 1456         self.make_maildir(True, True, messages=3)
 1457         archivemail.archive(self.maildir)
 1458         self.verify()
 1459 
 1460     def testMixedExisting(self):
 1461         self.make_maildir(True, True, messages=3, make_old_archive=True)
 1462         archivemail.archive(self.maildir)
 1463         self.verify()
 1464 
 1465     def tearDown(self):
 1466         archivemail.options.quiet = False
 1467         super(TestMaildir, self).tearDown()
 1468 
 1469 
 1470 class TestMaildirPreserveUnread(TestCaseInTempdir):
 1471     """Test if the preserve_unread option works with maildirs."""
 1472     def setUp(self):
 1473         super(TestMaildirPreserveUnread, self).setUp()
 1474         archivemail.options.quiet = True
 1475         archivemail.options.preserve_unread = True
 1476 
 1477     def testOldRead(self):
 1478         """--preserve-unread archives old read messages in a maildir."""
 1479         smd = SimpleMaildir("orig")
 1480         msg = make_message(hours_old=24*181)
 1481         smd.write(msg, new=False, flags='S')
 1482         md = mailbox.Maildir(smd.root)
 1483         msg_obj = md.next()
 1484         assert archivemail.should_archive(msg_obj)
 1485 
 1486     def testOldUnread(self):
 1487         """--preserve-unread preserves old unread messages in a maildir."""
 1488         smd = SimpleMaildir("orig")
 1489         msg = make_message(hours_old=24*181)
 1490         smd.write(msg, new=False)
 1491         md = mailbox.Maildir(smd.root)
 1492         msg_obj = md.next()
 1493         assert not archivemail.should_archive(msg_obj)
 1494 
 1495     def tearDown(self):
 1496         archivemail.options.quiet = False
 1497         archivemail.options.preserve_unread = False
 1498         super(TestMaildirPreserveUnread, self).tearDown()
 1499 
 1500 class TestMaildirAll(TestArchiveMailboxdir):
 1501     def setUp(self):
 1502         super(TestMaildirAll, self).setUp()
 1503         archivemail.options.quiet = True
 1504         archivemail.options.archive_all = True
 1505 
 1506     def testNew(self):
 1507         """New maildir messages should be archived with --all"""
 1508         self.add_messages(hours_old=24*181)
 1509         md = mailbox.Maildir(self.orig_maildir_obj.root)
 1510         msg_obj = md.next()
 1511         assert archivemail.should_archive(msg_obj)
 1512 
 1513     def testOld(self):
 1514         """Old maildir messages should be archived with --all"""
 1515         self.add_messages(hours_old=24*179)
 1516         md = mailbox.Maildir(self.orig_maildir_obj.root)
 1517         msg_obj = md.next()
 1518         assert archivemail.should_archive(msg_obj)
 1519 
 1520     def tearDown(self):
 1521         super(TestMaildirAll, self).tearDown()
 1522         archivemail.options.quiet = False
 1523         archivemail.options.archive_all = False
 1524 
 1525 class TestMaildirDryRun(TestArchiveMailboxdir):
 1526     def setUp(self):
 1527         super(TestMaildirDryRun, self).setUp()
 1528         archivemail.options.quiet = True
 1529         archivemail.options.dry_run = True
 1530 
 1531     def testOld(self):
 1532         """archiving an old maildir mailbox with the 'dry-run' option"""
 1533         self.make_maildir(True, False)
 1534         archivemail.archive(self.maildir)
 1535         self.verify()
 1536 
 1537     def tearDown(self):
 1538         super(TestMaildirDryRun, self).tearDown()
 1539         archivemail.options.quiet = False
 1540         archivemail.options.dry_run = False
 1541 
 1542 class TestMaildirDelete(TestArchiveMailboxdir):
 1543     def setUp(self):
 1544         super(TestMaildirDelete, self).setUp()
 1545         archivemail.options.quiet = True
 1546         archivemail.options.delete_old_mail = True
 1547 
 1548     def testOld(self):
 1549         """archiving an old maildir mailbox with the 'delete' option"""
 1550         self.make_maildir(True, False)
 1551         archivemail.archive(self.maildir)
 1552         self.verify()
 1553 
 1554     def testNew(self):
 1555         """archiving a new maildir mailbox with the 'delete' option"""
 1556         self.make_maildir(False, True)
 1557         archivemail.archive(self.maildir)
 1558         self.verify()
 1559 
 1560     def tearDown(self):
 1561         super(TestMaildirDelete, self).tearDown()
 1562         archivemail.options.quiet = False
 1563         archivemail.options.delete_old_mail = False
 1564 
 1565 class TestMaildirCopy(TestArchiveMailboxdir):
 1566     def setUp(self):
 1567         super(TestMaildirCopy, self).setUp()
 1568         archivemail.options.quiet = True
 1569         archivemail.options.copy_old_mail = True
 1570 
 1571     def testOld(self):
 1572         """archiving an old maildir mailbox with the 'copy' option"""
 1573         self.make_maildir(True, False)
 1574         archivemail.archive(self.maildir)
 1575         self.verify()
 1576 
 1577     def testNew(self):
 1578         """archiving a new maildir mailbox with the 'copy' option"""
 1579         self.make_maildir(False, True)
 1580         archivemail.archive(self.maildir)
 1581         self.verify()
 1582 
 1583     def tearDown(self):
 1584         super(TestMaildirCopy, self).tearDown()
 1585         archivemail.options.quiet = False
 1586         archivemail.options.copy_old_mail = False
 1587 
 1588 class TestArchiveMaildirFlagged(TestCaseInTempdir):
 1589     """make sure the 'include_flagged' option works with maildir messages"""
 1590     def setUp(self):
 1591         super(TestArchiveMaildirFlagged, self).setUp()
 1592         archivemail.options.include_flagged = False
 1593         archivemail.options.quiet = True
 1594 
 1595     def testOld(self):
 1596         """by default, old flagged maildir messages should not be archived"""
 1597         smd = SimpleMaildir("orig")
 1598         msg = make_message(hours_old=24*181)
 1599         smd.write(msg, new=False, flags='F')
 1600         md = mailbox.Maildir(smd.root)
 1601         msg_obj = md.next()
 1602         assert not archivemail.should_archive(msg_obj)
 1603 
 1604     def testIncludeFlaggedNew(self):
 1605         """new flagged maildir messages should not be archived with include_flagged"""
 1606         smd = SimpleMaildir("orig")
 1607         msg = make_message(hours_old=24*179)
 1608         smd.write(msg, new=False, flags='F')
 1609         md = mailbox.Maildir(smd.root)
 1610         msg_obj = md.next()
 1611         assert not archivemail.should_archive(msg_obj)
 1612 
 1613     def testIncludeFlaggedOld(self):
 1614         """old flagged maildir messages should be archived with include_flagged"""
 1615         archivemail.options.include_flagged = True
 1616         smd = SimpleMaildir("orig")
 1617         msg = make_message(hours_old=24*181)
 1618         smd.write(msg, new=False, flags='F')
 1619         md = mailbox.Maildir(smd.root)
 1620         msg_obj = md.next()
 1621         assert archivemail.should_archive(msg_obj)
 1622 
 1623     def tearDown(self):
 1624         super(TestArchiveMaildirFlagged, self).tearDown()
 1625         archivemail.options.include_flagged = False
 1626         archivemail.options.quiet = False
 1627 
 1628 class TestArchiveMaildirSize(TestCaseInTempdir):
 1629     """check that the 'size' argument works with maildir messages"""
 1630     def setUp(self):
 1631         super(TestArchiveMaildirSize, self).setUp()
 1632         archivemail.options.quiet = True
 1633         msg = make_message(hours_old=24*181)
 1634         self.msg_size = len(msg)
 1635         smd = SimpleMaildir("orig")
 1636         smd.write(msg, new=False)
 1637         md = mailbox.Maildir(smd.root)
 1638         self.msg_obj = md.next()
 1639 
 1640     def testSmaller(self):
 1641         """giving a size argument smaller than the maildir message"""
 1642         archivemail.options.min_size = self.msg_size - 1
 1643         assert archivemail.should_archive(self.msg_obj)
 1644 
 1645     def testBigger(self):
 1646         """giving a size argument bigger than the maildir message"""
 1647         archivemail.options.min_size = self.msg_size + 1
 1648         assert not archivemail.should_archive(self.msg_obj)
 1649 
 1650     def tearDown(self):
 1651         super(TestArchiveMaildirSize, self).tearDown()
 1652         archivemail.options.quiet = False
 1653         archivemail.options.min_size = None
 1654 
 1655 ########## helper routines ############
 1656 
 1657 def make_message(body=None, default_headers={}, hours_old=None, mkfrom=False, wantobj=False):
 1658     headers = copy.copy(default_headers)
 1659     if not headers:
 1660         headers = {}
 1661     headers['Message-Id'] = make_msgid()
 1662     if not headers.has_key('Date'):
 1663         time_message = time.time() - (60 * 60 * hours_old)
 1664         headers['Date'] = time.asctime(time.localtime(time_message))
 1665     if not headers.has_key('From'):
 1666         headers['From'] = "sender@dummy.domain"        
 1667     if not headers.has_key('To'):
 1668         headers['To'] = "receipient@dummy.domain"        
 1669     if not headers.has_key('Subject'):
 1670         headers['Subject'] = "This is the subject"
 1671     if mkfrom and not headers.has_key('From_'):
 1672         headers['From_'] = "%s %s" % (headers['From'], headers['Date'])
 1673     if not body:
 1674         body = "This is the message body"
 1675 
 1676     msg = ""
 1677     if headers.has_key('From_'):
 1678         msg = msg + ("From %s\n" % headers['From_'])
 1679         del headers['From_']
 1680     for key in headers.keys():
 1681         if headers[key] is not None:
 1682             msg = msg + ("%s: %s\n" % (key, headers[key]))
 1683     msg = msg + "\n\n" + body + "\n\n"
 1684     if not wantobj:
 1685         return msg
 1686     fp = cStringIO.StringIO(msg)
 1687     return rfc822.Message(fp)
 1688 
 1689 def append_file(source, dest):
 1690     """appends the file named 'source' to the file named 'dest'"""
 1691     assert os.path.isfile(source)
 1692     assert os.path.isfile(dest)
 1693     read = open(source, "r")
 1694     write = open(dest, "a+")
 1695     shutil.copyfileobj(read,write)
 1696     read.close()
 1697     write.close()
 1698 
 1699 
 1700 def make_mbox(body=None, headers=None, hours_old=0, messages=1):
 1701     assert tempfile.tempdir
 1702     fd, name = tempfile.mkstemp()
 1703     file = os.fdopen(fd, "w")
 1704     for count in range(messages):
 1705         msg = make_message(body=body, default_headers=headers, 
 1706             mkfrom=True, hours_old=hours_old)
 1707         file.write(msg)
 1708     file.close()
 1709     return name
 1710 
 1711 def make_archive_and_plain_copy(archive_name):
 1712     """Make an mbox archive of the given name like archivemail may have
 1713     created it.  Also make an uncompressed copy of this archive and return its
 1714     name."""
 1715     copy_fd, copy_name = tempfile.mkstemp()
 1716     copy_fp = os.fdopen(copy_fd, "w")
 1717     if archivemail.options.no_compress:
 1718         fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT)
 1719         fp = os.fdopen(fd, "w")
 1720     else:
 1721         archive_name += ".gz"
 1722         fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT)
 1723         rawfp = os.fdopen(fd, "w")
 1724         fp = gzip.GzipFile(fileobj=rawfp)
 1725     for count in range(3):
 1726         msg = make_message(hours_old=24*360)
 1727         fp.write(msg)
 1728         copy_fp.write(msg)
 1729     fp.close()
 1730     copy_fp.close()
 1731     if not archivemail.options.no_compress:
 1732         rawfp.close()
 1733     return copy_name
 1734 
 1735 def copy_maildir(maildir, prefix="tmp"):
 1736     """Create a copy of the given maildir and return the absolute path of the
 1737     new direcory."""
 1738     newdir = tempfile.mkdtemp(prefix=prefix)
 1739     for d in "cur", "new", "tmp":
 1740         shutil.copytree(os.path.join(maildir, d), os.path.join(newdir, d))
 1741     return newdir
 1742 
 1743 def assertEqualContent(firstfile, secondfile, zippedfirst=False):
 1744     """Verify that the two files exist and have identical content. If zippedfirst
 1745     is True, assume that firstfile is gzip-compressed."""
 1746     assert os.path.exists(firstfile)
 1747     assert os.path.exists(secondfile)
 1748     if zippedfirst:
 1749         try:
 1750             fp1 = gzip.GzipFile(firstfile, "r")
 1751             fp2 = open(secondfile, "r")
 1752             assert cmp_fileobj(fp1, fp2)
 1753         finally:
 1754             fp1.close()
 1755             fp2.close()
 1756     else:
 1757         assert filecmp.cmp(firstfile, secondfile, shallow=0)
 1758 
 1759 def cmp_fileobj(fp1, fp2):
 1760     """Return if reading the fileobjects yields identical content."""
 1761     bufsize = 8192
 1762     while True:
 1763         b1 = fp1.read(bufsize)
 1764         b2 = fp2.read(bufsize)
 1765         if b1 != b2:
 1766             return False
 1767         if not b1:
 1768             return True
 1769 
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()