diff options
Diffstat (limited to 'test_archivemail')
-rwxr-xr-x | test_archivemail | 1593 |
1 files changed, 1593 insertions, 0 deletions
diff --git a/test_archivemail b/test_archivemail new file mode 100755 index 0000000..4a11734 --- /dev/null +++ b/test_archivemail @@ -0,0 +1,1593 @@ +#! /usr/bin/env python +############################################################################ +# Copyright (C) 2002 Paul Rodger <paul@paulrodger.com> +# (C) 2006-2010 Nikolaus Schulz <microschulz@web.de> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +############################################################################ +""" +Unit-test archivemail using 'PyUnit'. + +TODO: add tests for: + * dotlock locks already existing + * archiving MH-format mailboxes + * a 3rd party process changing the mbox file being read + +""" + +import sys + +def check_python_version(): + """Abort if we are running on python < v2.3""" + too_old_error = "This test script requires python version 2.3 or later. " + \ + "Your version of python is:\n%s" % sys.version + try: + version = sys.version_info # we might not even have this function! :) + if (version[0] < 2) or (version[0] == 2 and version[1] < 3): + print too_old_error + sys.exit(1) + except AttributeError: + print too_old_error + sys.exit(1) + +# define & run this early because 'unittest' requires Python >= 2.1 +check_python_version() + +import copy +import fcntl +import filecmp +import os +import re +import shutil +import stat +import tempfile +import time +import unittest +import gzip +import cStringIO +import rfc822 +import mailbox + +try: + import archivemail +except ImportError: + print "The archivemail script needs to be called 'archivemail.py'" + print "and should be in the current directory in order to be imported" + print "and tested. Sorry." + if os.path.isfile("archivemail"): + print "Try renaming it from 'archivemail' to 'archivemail.py'." + sys.exit(1) + +# We want to iterate over messages in a compressed archive mbox and verify +# them. This involves seeking in the mbox. The gzip.Gzipfile.seek() in +# Python 2.5 doesn't understand whence; this is Python bug #1355023, triggered +# by mailbox._PartialFile.seek(). The bug is still pending as of Python +# 2.5.2. To work around it, we subclass gzip.GzipFile. +# +# It should be noted that seeking backwards in a GzipFile is emulated by +# re-reading the entire file from the beginning, which is extremely +# inefficient and won't work with large files; but our test archives are all +# small, so it's okay. + +class FixedGzipFile(gzip.GzipFile): + """GzipFile with seek method accepting whence parameter.""" + def seek(self, offset, whence=0): + try: + gzip.GzipFile.seek(self, offset, whence) + except TypeError: + if whence: + if whence == 1: + offset = self.offset + offset + else: + raise ValueError('Seek from end not supported') + gzip.GzipFile.seek(self, offset) + +# precision of os.utime() when restoring mbox timestamps +utimes_precision = 5 + +class MessageIdFactory: + """Factory to create `uniqe' message-ids.""" + def __init__(self): + self.seq = 0 + def __call__(self): + self.seq += 1 + return "<archivemail%d@localhost>" % self.seq + +make_msgid = MessageIdFactory() + +class IndexedMailboxDir: + """An indexed mailbox directory, providing random message access by + message-id. Intended as a base class for a maildir and an mh subclass.""" + + def __init__(self, mdir_name): + assert tempfile.tempdir + self.root = tempfile.mkdtemp(prefix=mdir_name) + self.msg_id_dict = {} + self.deliveries = 0 + + def _add_to_index(self, msg_text, fpath): + """Add the given message to the index, for later random access.""" + # Extract the message-id as index key + msg_id = None + fp = cStringIO.StringIO(msg_text) + while True: + line = fp.readline() + # line empty means we didn't find a message-id + assert line + if line.lower().startswith("message-id:"): + msg_id = line.split(":", 1)[-1].strip() + assert msg_id + break + assert not self.msg_id_dict.has_key(msg_id) + self.msg_id_dict[msg_id] = fpath + + def get_all_filenames(self): + """Return all relative pathnames of files in this mailbox.""" + return self.msg_id_dict.values() + +class SimpleMaildir(IndexedMailboxDir): + """Primitive Maildir class, just good enough for generating short-lived + test maildirs.""" + + def __init__(self, mdir_name='maildir'): + IndexedMailboxDir.__init__(self, mdir_name) + for d in "cur", "tmp", "new": + os.mkdir(os.path.join(self.root, d)) + + def write(self, msg_str, new=True, flags=[]): + """Store a message with the given flags.""" + assert not (new and flags) + if new: + subdir = "new" + else: + subdir = "cur" + fname = self._mkname(new, flags) + relpath = os.path.join(subdir, fname) + path = os.path.join(self.root, relpath) + assert not os.path.exists(path) + f = open(path, "w") + f.write(msg_str) + f.close() + self._add_to_index(msg_str, relpath) + + def _mkname(self, new, flags): + """Generate a unique filename for a new message.""" + validflags = 'DFPRST' + for f in flags: + assert f in validflags + # This 'unique' name should be good enough, since nobody else + # will ever write messages to this maildir folder. + uniq = str(self.deliveries) + self.deliveries += 1 + if new: + return uniq + if not flags: + return uniq + ':2,' + finfo = "".join(sorted(flags)) + return uniq + ':2,' + finfo + + def get_message_and_mbox_status(self, msgid): + """For the Message-Id msgid, return the matching message in text + format and its status, expressed as a set of mbox flags.""" + fpath = self.msg_id_dict[msgid] # Barfs if not found + mdir_flags = fpath.rsplit('2,', 1)[-1] + flagmap = { + 'F': 'F', + 'R': 'A', + 'S': 'R' + } + mbox_flags = set([flagmap[x] for x in mdir_flags]) + if fpath.startswith("cur/"): + mbox_flags.add('O') + fp = open(os.path.join(self.root, fpath), "r") + msg = fp.read() + fp.close() + return msg, mbox_flags + + +class TestCaseInTempdir(unittest.TestCase): + """Base class for testcases that need to create temporary files. + All testcases that create temporary files should be derived from this + class, not directly from unittest.TestCase. + TestCaseInTempdir provides these methods: + + setUp() Creates a safe temporary directory and sets tempfile.tempdir. + + tearDown() Recursively removes the temporary directory and unsets + tempfile.tempdir. + + Overriding methods should call the ones above.""" + temproot = None + + def setUp(self): + if not self.temproot: + assert not tempfile.tempdir + self.temproot = tempfile.tempdir = \ + tempfile.mkdtemp(prefix="test-archivemail") + + def tearDown(self): + assert tempfile.tempdir == self.temproot + if self.temproot: + shutil.rmtree(self.temproot) + tempfile.tempdir = self.temproot = None + + +############ Mbox Class testing ############## + +class TestMboxDotlock(TestCaseInTempdir): + def setUp(self): + super(TestMboxDotlock, self).setUp() + self.mbox_name = make_mbox() + self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] + self.mbox = archivemail.Mbox(self.mbox_name) + + def testDotlock(self): + """dotlock_lock/unlock should create/delete a lockfile""" + lock = self.mbox_name + ".lock" + self.mbox._dotlock_lock() + assert os.path.isfile(lock) + self.mbox._dotlock_unlock() + assert not os.path.isfile(lock) + + def testDotlockingSucceedsUponEACCES(self): + """A dotlock should silently be omitted upon EACCES.""" + archivemail.options.quiet = True + mbox_dir = os.path.dirname(self.mbox_name) + os.chmod(mbox_dir, 0500) + try: + self.mbox._dotlock_lock() + finally: + os.chmod(mbox_dir, 0700) + archivemail.options.quiet = False + +class TestMboxPosixLock(TestCaseInTempdir): + def setUp(self): + super(TestMboxPosixLock, self).setUp() + self.mbox_name = make_mbox() + self.mbox = archivemail.Mbox(self.mbox_name) + + def testPosixLock(self): + """posix_lock/unlock should create/delete an advisory lock""" + + # The following code snippet heavily lends from the Python 2.5 mailbox + # unittest. + # BEGIN robbery: + + # Fork off a subprocess that will lock the file for 2 seconds, + # unlock it, and then exit. + pid = os.fork() + if pid == 0: + # In the child, lock the mailbox. + self.mbox._posix_lock() + time.sleep(2) + self.mbox._posix_unlock() + os._exit(0) + + # In the parent, sleep a bit to give the child time to acquire + # the lock. + time.sleep(0.5) + # The parent's file self.mbox.mbox_file shares fcntl locks with the + # duplicated FD in the child; reopen it so we get a different file + # table entry. + file = open(self.mbox_name, "r+") + lock_nb = fcntl.LOCK_EX | fcntl.LOCK_NB + fd = file.fileno() + try: + self.assertRaises(IOError, fcntl.lockf, fd, lock_nb) + + finally: + # Wait for child to exit. Locking should now succeed. + exited_pid, status = os.waitpid(pid, 0) + + fcntl.lockf(fd, lock_nb) + fcntl.lockf(fd, fcntl.LOCK_UN) + # END robbery + + +class TestMboxNext(TestCaseInTempdir): + def setUp(self): + super(TestMboxNext, self).setUp() + self.not_empty_name = make_mbox(messages=18) + self.empty_name = make_mbox(messages=0) + + def testNextEmpty(self): + """mbox.next() should return None on an empty mailbox""" + mbox = archivemail.Mbox(self.empty_name) + msg = mbox.next() + self.assertEqual(msg, None) + + def testNextNotEmpty(self): + """mbox.next() should a message on a populated mailbox""" + mbox = archivemail.Mbox(self.not_empty_name) + for count in range(18): + msg = mbox.next() + assert msg + msg = mbox.next() + self.assertEqual(msg, None) + + +############ TempMbox Class testing ############## + +class TestTempMboxWrite(TestCaseInTempdir): + def setUp(self): + super(TestTempMboxWrite, self).setUp() + + def testWrite(self): + """mbox.write() should append messages to a mbox mailbox""" + read_file = make_mbox(messages=3) + mbox_read = archivemail.Mbox(read_file) + mbox_write = archivemail.TempMbox() + write_file = mbox_write.mbox_file_name + for count in range(3): + msg = mbox_read.next() + mbox_write.write(msg) + mbox_read.close() + mbox_write.close() + assert filecmp.cmp(read_file, write_file, shallow=0) + + def testWriteNone(self): + """calling mbox.write() with no message should raise AssertionError""" + write = archivemail.TempMbox() + self.assertRaises(AssertionError, write.write, None) + +class TestTempMboxRemove(TestCaseInTempdir): + def setUp(self): + super(TestTempMboxRemove, self).setUp() + self.mbox = archivemail.TempMbox() + self.mbox_name = self.mbox.mbox_file_name + + def testMboxRemove(self): + """remove() should delete a mbox mailbox""" + assert os.path.exists(self.mbox_name) + self.mbox.remove() + assert not os.path.exists(self.mbox_name) + + + +########## options class testing ################# + +class TestOptionDefaults(unittest.TestCase): + def testVerbose(self): + """verbose should be off by default""" + self.assertEqual(archivemail.options.verbose, False) + + def testDaysOldMax(self): + """default archival time should be 180 days""" + self.assertEqual(archivemail.options.days_old_max, 180) + + def testQuiet(self): + """quiet should be off by default""" + self.assertEqual(archivemail.options.quiet, False) + + def testDeleteOldMail(self): + """we should not delete old mail by default""" + self.assertEqual(archivemail.options.delete_old_mail, False) + + def testNoCompress(self): + """no-compression should be off by default""" + self.assertEqual(archivemail.options.no_compress, False) + + def testIncludeFlagged(self): + """we should not archive flagged messages by default""" + self.assertEqual(archivemail.options.include_flagged, False) + + def testPreserveUnread(self): + """we should not preserve unread messages by default""" + self.assertEqual(archivemail.options.preserve_unread, False) + +class TestOptionParser(unittest.TestCase): + def setUp(self): + self.oldopts = copy.copy(archivemail.options) + + def testOptionDate(self): + """--date and -D options are parsed correctly""" + date_formats = ( + "%Y-%m-%d", # ISO format + "%d %b %Y" , # Internet format + "%d %B %Y" , # Internet format with full month names + ) + date = time.strptime("2000-07-29", "%Y-%m-%d") + unixdate = time.mktime(date) + for df in date_formats: + d = time.strftime(df, date) + for opt in '-D', '--date=': + archivemail.options.date_old_max = None + archivemail.options.parse_args([opt+d], "") + self.assertEqual(unixdate, archivemail.options.date_old_max) + + def testOptionPreserveUnread(self): + """--preserve-unread option is parsed correctly""" + archivemail.options.parse_args(["--preserve-unread"], "") + assert archivemail.options.preserve_unread + archivemail.options.preserve_unread = False + archivemail.options.parse_args(["-u"], "") + assert archivemail.options.preserve_unread + + def testOptionSuffix(self): + """--suffix and -s options are parsed correctly""" + for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"): + archivemail.options.parse_args(["--suffix="+suffix], "") + assert archivemail.options.archive_suffix == suffix + archivemail.options.suffix = None + archivemail.options.parse_args(["-s", suffix], "") + assert archivemail.options.archive_suffix == suffix + + def testOptionDryrun(self): + """--dry-run option is parsed correctly""" + archivemail.options.parse_args(["--dry-run"], "") + assert archivemail.options.dry_run + archivemail.options.preserve_unread = False + archivemail.options.parse_args(["-n"], "") + assert archivemail.options.dry_run + + def testOptionDays(self): + """--days and -d options are parsed correctly""" + archivemail.options.parse_args(["--days=11"], "") + self.assertEqual(archivemail.options.days_old_max, 11) + archivemail.options.days_old_max = None + archivemail.options.parse_args(["-d11"], "") + self.assertEqual(archivemail.options.days_old_max, 11) + + def testOptionDelete(self): + """--delete option is parsed correctly""" + archivemail.options.parse_args(["--delete"], "") + assert archivemail.options.delete_old_mail + + def testOptionCopy(self): + """--copy option is parsed correctly""" + archivemail.options.parse_args(["--copy"], "") + assert archivemail.options.copy_old_mail + + def testOptionOutputdir(self): + """--output-dir and -o options are parsed correctly""" + for path in "/just/some/path", "relative/path": + archivemail.options.parse_args(["--output-dir=%s" % path], "") + self.assertEqual(archivemail.options.output_dir, path) + archivemail.options.output_dir = None + archivemail.options.parse_args(["-o%s" % path], "") + self.assertEqual(archivemail.options.output_dir, path) + + def testOptionNocompress(self): + """--no-compress option is parsed correctly""" + archivemail.options.parse_args(["--no-compress"], "") + assert archivemail.options.no_compress + + def testOptionSize(self): + """--size and -S options are parsed correctly""" + size = "666" + archivemail.options.parse_args(["--size=%s" % size ], "") + self.assertEqual(archivemail.options.min_size, int(size)) + archivemail.options.parse_args(["-S%s" % size ], "") + self.assertEqual(archivemail.options.min_size, int(size)) + + def tearDown(self): + archivemail.options = self.oldopts + +########## archivemail.is_older_than_days() unit testing ################# + +class TestIsTooOld(unittest.TestCase): + def testVeryOld(self): + """with max_days=360, should be true for these dates > 1 year""" + for years in range(1, 10): + time_msg = time.time() - (years * 365 * 24 * 60 * 60) + assert archivemail.is_older_than_days(time_message=time_msg, + max_days=360) + + def testOld(self): + """with max_days=14, should be true for these dates > 14 days""" + for days in range(14, 360): + time_msg = time.time() - (days * 24 * 60 * 60) + assert archivemail.is_older_than_days(time_message=time_msg, + max_days=14) + + def testJustOld(self): + """with max_days=1, should be true for these dates >= 1 day""" + for minutes in range(0, 61): + time_msg = time.time() - (25 * 60 * 60) + (minutes * 60) + assert archivemail.is_older_than_days(time_message=time_msg, + max_days=1) + + def testNotOld(self): + """with max_days=9, should be false for these dates < 9 days""" + for days in range(0, 9): + time_msg = time.time() - (days * 24 * 60 * 60) + assert not archivemail.is_older_than_days(time_message=time_msg, + max_days=9) + + def testJustNotOld(self): + """with max_days=1, should be false for these hours <= 1 day""" + for minutes in range(0, 60): + time_msg = time.time() - (23 * 60 * 60) - (minutes * 60) + assert not archivemail.is_older_than_days(time_message=time_msg, + max_days=1) + + def testFuture(self): + """with max_days=1, should be false for times in the future""" + for minutes in range(0, 60): + time_msg = time.time() + (minutes * 60) + assert not archivemail.is_older_than_days(time_message=time_msg, + max_days=1) + +########## archivemail.parse_imap_url() unit testing ################# + +class TestParseIMAPUrl(unittest.TestCase): + def setUp(self): + archivemail.options.quiet = True + archivemail.options.verbose = False + archivemail.options.pwfile = None + + urls_withoutpass = [ + ('imaps://user@example.org@imap.example.org/upperbox/lowerbox', + ('user', None, 'example.org@imap.example.org', + 'upperbox/lowerbox')), + ('imaps://"user@example.org"@imap.example.org/upperbox/lowerbox', + ('user@example.org', None, 'imap.example.org', + 'upperbox/lowerbox')), + ('imaps://user@example.org"@imap.example.org/upperbox/lowerbox', + ('user', None, 'example.org"@imap.example.org', + 'upperbox/lowerbox')), + ('imaps://"user@example.org@imap.example.org/upperbox/lowerbox', + ('"user', None, 'example.org@imap.example.org', + 'upperbox/lowerbox')), + ('imaps://"us\\"er@example.org"@imap.example.org/upperbox/lowerbox', + ('us"er@example.org', None, 'imap.example.org', + 'upperbox/lowerbox')), + ('imaps://user\\@example.org@imap.example.org/upperbox/lowerbox', + ('user\\', None, 'example.org@imap.example.org', + 'upperbox/lowerbox')) + ] + urls_withpass = [ + ('imaps://user@example.org:passwd@imap.example.org/upperbox/lowerbox', + ('user@example.org', 'passwd', 'imap.example.org', + 'upperbox/lowerbox'), + ('user', None, 'example.org:passwd@imap.example.org', + 'upperbox/lowerbox')), + ('imaps://"user@example.org:passwd@imap.example.org/upperbox/lowerbox', + ('"user@example.org', "passwd", 'imap.example.org', + 'upperbox/lowerbox'), + ('"user', None, 'example.org:passwd@imap.example.org', + 'upperbox/lowerbox')), + ('imaps://u\\ser\\@example.org:"p@sswd"@imap.example.org/upperbox/lowerbox', + ('u\\ser\\@example.org', 'p@sswd', 'imap.example.org', + 'upperbox/lowerbox'), + ('u\\ser\\', None, 'example.org:"p@sswd"@imap.example.org', + 'upperbox/lowerbox')) + ] + # These are invalid when the password's not stripped. + urls_onlywithpass = [ + ('imaps://"user@example.org":passwd@imap.example.org/upperbox/lowerbox', + ('user@example.org', "passwd", 'imap.example.org', + 'upperbox/lowerbox')) + ] + def testUrlsWithoutPwfile(self): + """Parse test urls with --pwfile option unset. This parses a password in + the URL, if present.""" + archivemail.options.pwfile = None + for mbstr in self.urls_withpass + self.urls_withoutpass: + url = mbstr[0][mbstr[0].find('://')+3:] + result = archivemail.parse_imap_url(url) + self.assertEqual(result, mbstr[1]) + + def testUrlsWithPwfile(self): + """Parse test urls with --pwfile set. In this case the ':' character + loses its meaning as a delimiter.""" + archivemail.options.pwfile = "whocares.txt" + for mbstr in self.urls_withpass: + url = mbstr[0][mbstr[0].find('://')+3:] + result = archivemail.parse_imap_url(url) + self.assertEqual(result, mbstr[2]) + for mbstr in self.urls_onlywithpass: + url = mbstr[0][mbstr[0].find('://')+3:] + self.assertRaises(archivemail.UnexpectedError, + archivemail.parse_imap_url, url) + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.verbose = False + archivemail.options.pwfile = None + +########## acceptance testing ########### + +class TestArchive(TestCaseInTempdir): + """Base class defining helper functions for doing test archiving runs.""" + mbox = None # mbox file that will be processed by archivemail + good_archive = None # Uncompressed reference archive file to verify the + # archive after processing + good_mbox = None # Reference mbox file to verify the mbox after processing + + def verify(self): + assert os.path.exists(self.mbox) + if self.good_mbox is not None: + assertEqualContent(self.mbox, self.good_mbox) + else: + self.assertEqual(os.path.getsize(self.mbox), 0) + archive_name = self.mbox + "_archive" + if not archivemail.options.no_compress: + archive_name += ".gz" + iszipped = True + else: + assert not os.path.exists(archive_name + ".gz") + iszipped = False + if self.good_archive is not None: + assertEqualContent(archive_name, self.good_archive, iszipped) + else: + assert not os.path.exists(archive_name) + + def make_old_mbox(self, body=None, headers=None, messages=1, make_old_archive=False): + """Prepare for a test run with an old mbox by making an old mbox, + optionally an existing archive, and a reference archive to verify the + archive after archivemail has run.""" + self.mbox = make_mbox(body, headers, 181*24, messages) + archive_does_change = not (archivemail.options.dry_run or + archivemail.options.delete_old_mail) + mbox_does_not_change = archivemail.options.dry_run or \ + archivemail.options.copy_old_mail + if make_old_archive: + archive = archivemail.make_archive_name(self.mbox) + self.good_archive = make_archive_and_plain_copy(archive) + if archive_does_change: + append_file(self.mbox, self.good_archive) + elif archive_does_change: + self.good_archive = tempfile.mkstemp()[1] + shutil.copyfile(self.mbox, self.good_archive) + if mbox_does_not_change: + if archive_does_change and not make_old_archive: + self.good_mbox = self.good_archive + else: + self.good_mbox = tempfile.mkstemp()[1] + shutil.copyfile(self.mbox, self.good_mbox) + + def make_mixed_mbox(self, body=None, headers=None, messages=1, make_old_archive=False): + """Prepare for a test run with a mixed mbox by making a mixed mbox, + optionally an existing archive, a reference archive to verify the + archive after archivemail has run, and likewise a reference mbox to + verify the mbox.""" + self.make_old_mbox(body, headers, messages=messages, make_old_archive=make_old_archive) + new_mbox_name = make_mbox(body, headers, 179*24, messages) + append_file(new_mbox_name, self.mbox) + if self.good_mbox is None: + self.good_mbox = new_mbox_name + else: + if self.good_mbox == self.good_archive: + self.good_mbox = tempfile.mkstemp()[1] + shutil.copyfile(self.mbox, self.good_mbox) + else: + append_file(new_mbox_name, self.good_mbox) + + def make_new_mbox(self, body=None, headers=None, messages=1, make_old_archive=False): + """Prepare for a test run with a new mbox by making a new mbox, + optionally an exiting archive, and a reference mbox to verify the mbox + after archivemail has run.""" + self.mbox = make_mbox(body, headers, 179*24, messages) + self.good_mbox = tempfile.mkstemp()[1] + shutil.copyfile(self.mbox, self.good_mbox) + if make_old_archive: + archive = archivemail.make_archive_name(self.mbox) + self.good_archive = make_archive_and_plain_copy(archive) + + +class TestArchiveMbox(TestArchive): + """archiving should work based on the date of messages given""" + + def setUp(self): + self.oldopts = copy.copy(archivemail.options) + archivemail.options.quiet = True + super(TestArchiveMbox, self).setUp() + + def testOld(self): + """archiving an old mailbox""" + self.make_old_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testOldFromInBody(self): + """archiving an old mailbox with 'From ' in the body""" + body = """This is a message with ^From at the start of a line +From is on this line +This is after the ^From line""" + self.make_old_mbox(messages=3, body=body) + archivemail.archive(self.mbox) + self.verify() + + def testDateSystem(self): + """test that the --date option works as expected""" + test_headers = ( + { + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', + 'Date' : 'Fri, 28 Jul 2000 16:11:36 +1000', + }, + { + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', + 'Date' : None, + }, + { + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', + 'Date' : None, + 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000', + }, + { + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', + 'Date' : None, + 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000', + }, + ) + for headers in test_headers: + msg = make_message(default_headers=headers, wantobj=True) + date = time.strptime("2000-07-29", "%Y-%m-%d") + archivemail.options.date_old_max = time.mktime(date) + assert archivemail.should_archive(msg) + date = time.strptime("2000-07-27", "%Y-%m-%d") + archivemail.options.date_old_max = time.mktime(date) + assert not archivemail.should_archive(msg) + + def testMixed(self): + """archiving a mixed mailbox""" + self.make_mixed_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testNew(self): + """archiving a new mailbox""" + self.make_new_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testOldExisting(self): + """archiving an old mailbox with an existing archive""" + self.make_old_mbox(messages=3, make_old_archive=True) + archivemail.archive(self.mbox) + self.verify() + + def testOldWeirdHeaders(self): + """archiving old mailboxes with weird headers""" + weird_headers = ( + { # we should archive because of the date on the 'From_' line + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', + 'Date' : 'Friskhdfkjkh, 28 Jul 2002 1line noise6:11:36 +1000', + }, + { # we should archive because of the date on the 'From_' line + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', + 'Date' : None, + }, + { # we should archive because of the date in 'Delivery-date' + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', + 'Date' : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000', + 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000', + }, + { # we should archive because of the date in 'Delivery-date' + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', + 'Date' : None, + 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000', + }, + { # we should archive because of the date in 'Resent-Date' + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', + 'Date' : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000', + 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000', + }, + { # we should archive because of the date in 'Resent-Date' + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', + 'Date' : None, + 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000', + }, + { # completely blank dates were crashing < version 0.4.7 + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', + 'Date' : '', + }, + { # completely blank dates were crashing < version 0.4.7 + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', + 'Date' : '', + 'Resent-Date' : '', + }, + ) + fd, self.mbox = tempfile.mkstemp() + fp = os.fdopen(fd, "w") + for headers in weird_headers: + msg_text = make_message(default_headers=headers) + fp.write(msg_text*2) + fp.close() + self.good_archive = tempfile.mkstemp()[1] + shutil.copyfile(self.mbox, self.good_archive) + archivemail.archive(self.mbox) + self.verify() + + def tearDown(self): + archivemail.options = self.oldopts + super(TestArchiveMbox, self).tearDown() + + +class TestArchiveMboxTimestamp(TestCaseInTempdir): + """original mbox timestamps should always be preserved""" + def setUp(self): + super(TestArchiveMboxTimestamp, self).setUp() + archivemail.options.quiet = True + self.mbox_name = make_mbox(messages=3, hours_old=(24 * 180)) + self.mtime = os.path.getmtime(self.mbox_name) - 66 + self.atime = os.path.getatime(self.mbox_name) - 88 + os.utime(self.mbox_name, (self.atime, self.mtime)) + + def testNew(self): + """mbox timestamps should not change after no archival""" + archivemail.options.days_old_max = 181 + archivemail.archive(self.mbox_name) + self.verify() + + def testOld(self): + """mbox timestamps should not change after archival""" + archivemail.options.days_old_max = 179 + archivemail.archive(self.mbox_name) + self.verify() + + def verify(self): + assert os.path.exists(self.mbox_name) + new_atime = os.path.getatime(self.mbox_name) + new_mtime = os.path.getmtime(self.mbox_name) + self.assertAlmostEqual(self.mtime, new_mtime, utimes_precision) + self.assertAlmostEqual(self.atime, new_atime, utimes_precision) + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.days_old_max = 180 + os.remove(self.mbox_name) + super(TestArchiveMboxTimestamp, self).tearDown() + + +class TestArchiveMboxAll(unittest.TestCase): + def setUp(self): + archivemail.options.quiet = True + archivemail.options.archive_all = True + + def testNew(self): + """new messages should be archived with --all""" + self.msg = make_message(hours_old=24*179, wantobj=True) + assert archivemail.should_archive(self.msg) + + def testOld(self): + """old messages should be archived with --all""" + self.msg = make_message(hours_old=24*181, wantobj=True) + assert archivemail.should_archive(self.msg) + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.archive_all = False + +class TestArchiveMboxPreserveUnread(unittest.TestCase): + """make sure the 'preserve_unread' option works""" + def setUp(self): + archivemail.options.quiet = True + archivemail.options.preserve_unread = True + self.msg = make_message(hours_old=24*181, wantobj=True) + + def testOldRead(self): + """old read messages should be archived with --preserve-unread""" + self.msg["Status"] = "RO" + assert archivemail.should_archive(self.msg) + + def testOldUnread(self): + """old unread messages should not be archived with --preserve-unread""" + self.msg["Status"] = "O" + assert not archivemail.should_archive(self.msg) + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.preserve_unread = False + + +class TestArchiveMboxSuffix(unittest.TestCase): + """make sure the 'suffix' option works""" + def setUp(self): + self.old_suffix = archivemail.options.archive_suffix + archivemail.options.quiet = True + + def testSuffix(self): + """archiving with specified --suffix arguments""" + for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"): + mbox_name = "foobar" + archivemail.options.archive_suffix = suffix + days_old_max = 180 + parsed_suffix_time = time.time() - days_old_max*24*60*60 + parsed_suffix = time.strftime(suffix, + time.localtime(parsed_suffix_time)) + archive_name = mbox_name + parsed_suffix + self.assertEqual(archive_name, + archivemail.make_archive_name(mbox_name)) + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.archive_suffix = self.old_suffix + + +class TestArchiveDryRun(TestArchive): + """make sure the 'dry-run' option works""" + def setUp(self): + super(TestArchiveDryRun, self).setUp() + archivemail.options.quiet = True + archivemail.options.dry_run = True + + def testOld(self): + """archiving an old mailbox with the 'dry-run' option""" + self.make_old_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def tearDown(self): + archivemail.options.dry_run = False + archivemail.options.quiet = False + super(TestArchiveDryRun, self).tearDown() + + +class TestArchiveDelete(TestArchive): + """make sure the 'delete' option works""" + def setUp(self): + super(TestArchiveDelete, self).setUp() + archivemail.options.quiet = True + archivemail.options.delete_old_mail = True + + def testNew(self): + """archiving a new mailbox with the 'delete' option""" + self.make_new_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testMixed(self): + """archiving a mixed mailbox with the 'delete' option""" + self.make_mixed_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testOld(self): + """archiving an old mailbox with the 'delete' option""" + self.make_old_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def tearDown(self): + archivemail.options.delete_old_mail = False + archivemail.options.quiet = False + super(TestArchiveDelete, self).tearDown() + + +class TestArchiveCopy(TestArchive): + """make sure the 'copy' option works""" + def setUp(self): + super(TestArchiveCopy, self).setUp() + archivemail.options.quiet = True + archivemail.options.copy_old_mail = True + + def testNew(self): + """archiving a new mailbox with the 'copy' option""" + self.make_new_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testMixed(self): + """archiving a mixed mailbox with the 'copy' option""" + self.make_mixed_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testOld(self): + """archiving an old mailbox with the 'copy' option""" + self.make_old_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def tearDown(self): + archivemail.options.copy_old_mail = False + archivemail.options.quiet = False + super(TestArchiveCopy, self).tearDown() + + +class TestArchiveMboxFlagged(unittest.TestCase): + """make sure the 'include_flagged' option works""" + def setUp(self): + archivemail.options.include_flagged = False + archivemail.options.quiet = True + + def testOld(self): + """by default, old flagged messages should not be archived""" + msg = make_message(default_headers={"X-Status": "F"}, + hours_old=24*181, wantobj=True) + assert not archivemail.should_archive(msg) + + def testIncludeFlaggedNew(self): + """new flagged messages should not be archived with include_flagged""" + msg = make_message(default_headers={"X-Status": "F"}, + hours_old=24*179, wantobj=True) + assert not archivemail.should_archive(msg) + + def testIncludeFlaggedOld(self): + """old flagged messages should be archived with include_flagged""" + archivemail.options.include_flagged = True + msg = make_message(default_headers={"X-Status": "F"}, + hours_old=24*181, wantobj=True) + assert archivemail.should_archive(msg) + + def tearDown(self): + archivemail.options.include_flagged = False + archivemail.options.quiet = False + + +class TestArchiveMboxOutputDir(unittest.TestCase): + """make sure that the 'output-dir' option works""" + def setUp(self): + archivemail.options.quiet = True + + def testOld(self): + """archiving an old mailbox with a sepecified output dir""" + for dir in "/just/a/path", "relative/path": + archivemail.options.output_dir = dir + archive_dir = archivemail.make_archive_name("/tmp/mbox") + self.assertEqual(dir, os.path.dirname(archive_dir)) + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.output_dir = None + + +class TestArchiveMboxUncompressed(TestArchive): + """make sure that the 'no_compress' option works""" + mbox_name = None + new_mbox = None + old_mbox = None + copy_name = None + + def setUp(self): + archivemail.options.quiet = True + archivemail.options.no_compress = True + super(TestArchiveMboxUncompressed, self).setUp() + + def testOld(self): + """archiving an old mailbox uncompressed""" + self.make_old_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testNew(self): + """archiving a new mailbox uncompressed""" + self.make_new_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testMixed(self): + """archiving a mixed mailbox uncompressed""" + self.make_mixed_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testOldExists(self): + """archiving an old mailbox uncopressed with an existing archive""" + self.make_old_mbox(messages=3, make_old_archive=True) + archivemail.archive(self.mbox) + self.verify() + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.no_compress = False + super(TestArchiveMboxUncompressed, self).tearDown() + + +class TestArchiveSize(unittest.TestCase): + """check that the 'size' argument works""" + def setUp(self): + archivemail.options.quiet = True + msg_text = make_message(hours_old=24*181) + self.msg_size = len(msg_text) + fp = cStringIO.StringIO(msg_text) + self.msg = rfc822.Message(fp) + + def testSmaller(self): + """giving a size argument smaller than the message""" + archivemail.options.min_size = self.msg_size - 1 + assert archivemail.should_archive(self.msg) + + def testBigger(self): + """giving a size argument bigger than the message""" + archivemail.options.min_size = self.msg_size + 1 + assert not archivemail.should_archive(self.msg) + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.min_size = None + + +############# Test archiving maildirs ############### + +class TestArchiveMailboxdir(TestCaseInTempdir): + """Base class defining helper functions for doing test archive runs with + maildirs.""" + maildir = None # Maildir that will be processed by archivemail + orig_maildir_obj = None # A backup copy of the maildir, a SimpleMaildir object + remaining_msg = set() # Filenames of maildir messages that should be preserved + number_archived = 0 # Number of messages that get archived + orig_archive = None # An uncompressed copy of a pre-existing archive, + # if one exists + + def setUp(self): + super(TestArchiveMailboxdir, self).setUp() + self.orig_maildir_obj = SimpleMaildir() + + def verify(self): + self._verify_remaining() + self._verify_archive() + + def _verify_remaining(self): + """Verify that the preserved messages weren't altered.""" + assert self.maildir + # Compare maildir with backup object. + dcmp = filecmp.dircmp(self.maildir, self.orig_maildir_obj.root) + # Top-level has only directories cur, new, tmp and must be unchanged. + self.assertEqual(dcmp.left_list, dcmp.right_list) + found = set() + for d in dcmp.common_dirs: + dcmp2 = dcmp.subdirs[d] + # We need to verify three things. + # 1. directory is a subset of the original... + assert not dcmp2.left_only + # 2. all common files are identical... + self.assertEqual(dcmp2.common_files, dcmp2.same_files) + found = found.union([os.path.join(d, x) for x in dcmp2.common_files]) + # 3. exactly the `new' messages (recorded in self.remaining_msg) + # were preserved. + self.assertEqual(found, self.remaining_msg) + + def _verify_archive(self): + """Verify the archive correctness.""" + # TODO: currently make_archive_name does not include the .gz suffix. + # Is this something that should be fixed? + archive = archivemail.make_archive_name(self.maildir) + if archivemail.options.no_compress: + iszipped = False + else: + archive += '.gz' + iszipped = True + if self.number_archived == 0: + if self.orig_archive: + assertEqualContent(archive, self.orig_archive, iszipped) + else: + assert not os.path.exists(archive) + return + fp_new = fp_archive = tmp_archive_name = None + try: + if self.orig_archive: + new_size = os.path.getsize(archive) + # Brute force: split archive in old and new part and verify the + # parts separately. (Of course this destroys the archive.) + fp_archive = open(archive, "r+") + fp_archive.seek(self.orig_archive_size) + fd, tmp_archive_name = tempfile.mkstemp() + fp_new = os.fdopen(fd, "w") + shutil.copyfileobj(fp_archive, fp_new) + fp_new.close() + fp_archive.truncate(self.orig_archive_size) + fp_archive.close() + assertEqualContent(archive, self.orig_archive, iszipped) + new_archive = tmp_archive_name + else: + new_archive = archive + if archivemail.options.no_compress: + fp_archive = open(new_archive, "r") + else: + fp_archive = FixedGzipFile(new_archive, "r") + mb = mailbox.UnixMailbox(fp_archive) + found = 0 + for msg in mb: + self.verify_maildir_has_msg(self.orig_maildir_obj, msg) + found += 1 + self.assertEqual(found, self.number_archived) + finally: + if tmp_archive_name: + os.remove(tmp_archive_name) + if fp_new is not None: + fp_new.close() + if fp_archive is not None: + fp_archive.close() + + def verify_maildir_has_msg(self, maildir, msg): + """Assert that the given maildir has a copy of the rfc822 message.""" + mid = msg['Message-Id'] # Complains if there is no message-id + mdir_msg_str, mdir_flags = \ + maildir.get_message_and_mbox_status(mid) + mbox_flags = set(msg.get('status', '') + msg.get('x-status', '')) + self.assertEqual(mdir_flags, mbox_flags) + + headers = filter(lambda h: msg.isheader(h) not in ('status', 'x-status'), + msg.headers) + headers = "".join(headers) + msg.rewindbody() + # Discard last mbox LF which is not part of the message. + body = msg.fp.read()[:-1] + msg_str = headers + os.linesep + body + self.assertEqual(mdir_msg_str, msg_str) + + def add_messages(self, body=None, headers=None, hours_old=0, messages=1): + for count in range(messages): + msg = make_message(body, default_headers=headers, mkfrom=False, + hours_old=hours_old) + self.orig_maildir_obj.write(msg, new=False) + + def make_maildir(self, mkold, mknew, body=None, headers=None, messages=1, + make_old_archive=False): + mailbox_does_change = not (archivemail.options.dry_run or + archivemail.options.copy_old_mail) + archive_does_change = not (archivemail.options.dry_run or + archivemail.options.delete_old_mail) + if mknew: + self.add_messages(body, headers, 179*24, messages) + if archive_does_change and archivemail.options.archive_all: + self.number_archived += messages + if mailbox_does_change: + self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames()) + if mkold: + self.add_messages(body, headers, 181*24, messages) + if archive_does_change: + self.number_archived += messages + if not mailbox_does_change: + self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames()) + self.maildir = copy_maildir(self.orig_maildir_obj.root) + if make_old_archive: + archive = archivemail.make_archive_name(self.maildir) + self.orig_archive = make_archive_and_plain_copy(archive) + # FIXME: .gz extension handling is a mess II + if not archivemail.options.no_compress: + archive += '.gz' + self.orig_archive_size = os.path.getsize(archive) + +class TestEmptyMaildir(TestCaseInTempdir): + def setUp(self): + super(TestEmptyMaildir, self).setUp() + archivemail.options.quiet = True + + def testEmpty(self): + """Archiving an empty maildir should not result in an archive.""" + self.mdir = SimpleMaildir() + archivemail.archive(self.mdir.root) + assert not os.path.exists(self.mdir.root + '_archive.gz') + + def tearDown(self): + super(TestEmptyMaildir, self).tearDown() + archivemail.options.quiet = False + +class TestMaildir(TestArchiveMailboxdir): + def setUp(self): + super(TestMaildir, self).setUp() + archivemail.options.quiet = True + + def testOld(self): + self.make_maildir(True, False, messages=3) + archivemail.archive(self.maildir) + self.verify() + + def testNew(self): + self.make_maildir(False, True, messages=3) + archivemail.archive(self.maildir) + self.verify() + + def testMixed(self): + self.make_maildir(True, True, messages=3) + archivemail.archive(self.maildir) + self.verify() + + def testMixedExisting(self): + self.make_maildir(True, True, messages=3, make_old_archive=True) + archivemail.archive(self.maildir) + self.verify() + + def tearDown(self): + archivemail.options.quiet = False + super(TestMaildir, self).tearDown() + + +class TestMaildirPreserveUnread(TestCaseInTempdir): + """Test if the preserve_unread option works with maildirs.""" + def setUp(self): + super(TestMaildirPreserveUnread, self).setUp() + archivemail.options.quiet = True + archivemail.options.preserve_unread = True + + def testOldRead(self): + """--preserve-unread archives old read messages in a maildir.""" + smd = SimpleMaildir("orig") + msg = make_message(hours_old=24*181) + smd.write(msg, new=False, flags='S') + md = mailbox.Maildir(smd.root) + msg_obj = md.next() + assert archivemail.should_archive(msg_obj) + + def testOldUnread(self): + """--preserve-unread preserves old unread messages in a maildir.""" + smd = SimpleMaildir("orig") + msg = make_message(hours_old=24*181) + smd.write(msg, new=False) + md = mailbox.Maildir(smd.root) + msg_obj = md.next() + assert not archivemail.should_archive(msg_obj) + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.preserve_unread = False + super(TestMaildirPreserveUnread, self).tearDown() + +class TestMaildirAll(TestArchiveMailboxdir): + def setUp(self): + super(TestMaildirAll, self).setUp() + archivemail.options.quiet = True + archivemail.options.archive_all = True + + def testNew(self): + """New maildir messages should be archived with --all""" + self.add_messages(hours_old=24*181) + md = mailbox.Maildir(self.orig_maildir_obj.root) + msg_obj = md.next() + assert archivemail.should_archive(msg_obj) + + def testOld(self): + """Old maildir messages should be archived with --all""" + self.add_messages(hours_old=24*179) + md = mailbox.Maildir(self.orig_maildir_obj.root) + msg_obj = md.next() + assert archivemail.should_archive(msg_obj) + + def tearDown(self): + super(TestMaildirAll, self).tearDown() + archivemail.options.quiet = False + archivemail.options.archive_all = False + +class TestMaildirDryRun(TestArchiveMailboxdir): + def setUp(self): + super(TestMaildirDryRun, self).setUp() + archivemail.options.quiet = True + archivemail.options.dry_run = True + + def testOld(self): + """archiving an old maildir mailbox with the 'dry-run' option""" + self.make_maildir(True, False) + archivemail.archive(self.maildir) + self.verify() + + def tearDown(self): + super(TestMaildirDryRun, self).tearDown() + archivemail.options.quiet = False + archivemail.options.dry_run = False + +class TestMaildirDelete(TestArchiveMailboxdir): + def setUp(self): + super(TestMaildirDelete, self).setUp() + archivemail.options.quiet = True + archivemail.options.delete_old_mail = True + + def testOld(self): + """archiving an old maildir mailbox with the 'delete' option""" + self.make_maildir(True, False) + archivemail.archive(self.maildir) + self.verify() + + def testNew(self): + """archiving a new maildir mailbox with the 'delete' option""" + self.make_maildir(False, True) + archivemail.archive(self.maildir) + self.verify() + + def tearDown(self): + super(TestMaildirDelete, self).tearDown() + archivemail.options.quiet = False + archivemail.options.delete_old_mail = False + +class TestMaildirCopy(TestArchiveMailboxdir): + def setUp(self): + super(TestMaildirCopy, self).setUp() + archivemail.options.quiet = True + archivemail.options.copy_old_mail = True + + def testOld(self): + """archiving an old maildir mailbox with the 'copy' option""" + self.make_maildir(True, False) + archivemail.archive(self.maildir) + self.verify() + + def testNew(self): + """archiving a new maildir mailbox with the 'copy' option""" + self.make_maildir(False, True) + archivemail.archive(self.maildir) + self.verify() + + def tearDown(self): + super(TestMaildirCopy, self).tearDown() + archivemail.options.quiet = False + archivemail.options.copy_old_mail = False + +class TestArchiveMaildirFlagged(TestCaseInTempdir): + """make sure the 'include_flagged' option works with maildir messages""" + def setUp(self): + super(TestArchiveMaildirFlagged, self).setUp() + archivemail.options.include_flagged = False + archivemail.options.quiet = True + + def testOld(self): + """by default, old flagged maildir messages should not be archived""" + smd = SimpleMaildir("orig") + msg = make_message(hours_old=24*181) + smd.write(msg, new=False, flags='F') + md = mailbox.Maildir(smd.root) + msg_obj = md.next() + assert not archivemail.should_archive(msg_obj) + + def testIncludeFlaggedNew(self): + """new flagged maildir messages should not be archived with include_flagged""" + smd = SimpleMaildir("orig") + msg = make_message(hours_old=24*179) + smd.write(msg, new=False, flags='F') + md = mailbox.Maildir(smd.root) + msg_obj = md.next() + assert not archivemail.should_archive(msg_obj) + + def testIncludeFlaggedOld(self): + """old flagged maildir messages should be archived with include_flagged""" + archivemail.options.include_flagged = True + smd = SimpleMaildir("orig") + msg = make_message(hours_old=24*181) + smd.write(msg, new=False, flags='F') + md = mailbox.Maildir(smd.root) + msg_obj = md.next() + assert archivemail.should_archive(msg_obj) + + def tearDown(self): + super(TestArchiveMaildirFlagged, self).tearDown() + archivemail.options.include_flagged = False + archivemail.options.quiet = False + +class TestArchiveMaildirSize(TestCaseInTempdir): + """check that the 'size' argument works with maildir messages""" + def setUp(self): + super(TestArchiveMaildirSize, self).setUp() + archivemail.options.quiet = True + msg = make_message(hours_old=24*181) + self.msg_size = len(msg) + smd = SimpleMaildir("orig") + smd.write(msg, new=False) + md = mailbox.Maildir(smd.root) + self.msg_obj = md.next() + + def testSmaller(self): + """giving a size argument smaller than the maildir message""" + archivemail.options.min_size = self.msg_size - 1 + assert archivemail.should_archive(self.msg_obj) + + def testBigger(self): + """giving a size argument bigger than the maildir message""" + archivemail.options.min_size = self.msg_size + 1 + assert not archivemail.should_archive(self.msg_obj) + + def tearDown(self): + super(TestArchiveMaildirSize, self).tearDown() + archivemail.options.quiet = False + archivemail.options.min_size = None + +########## helper routines ############ + +def make_message(body=None, default_headers={}, hours_old=None, mkfrom=False, wantobj=False): + headers = copy.copy(default_headers) + if not headers: + headers = {} + headers['Message-Id'] = make_msgid() + if not headers.has_key('Date'): + time_message = time.time() - (60 * 60 * hours_old) + headers['Date'] = time.asctime(time.localtime(time_message)) + if not headers.has_key('From'): + headers['From'] = "sender@dummy.domain" + if not headers.has_key('To'): + headers['To'] = "receipient@dummy.domain" + if not headers.has_key('Subject'): + headers['Subject'] = "This is the subject" + if mkfrom and not headers.has_key('From_'): + headers['From_'] = "%s %s" % (headers['From'], headers['Date']) + if not body: + body = "This is the message body" + + msg = "" + if headers.has_key('From_'): + msg = msg + ("From %s\n" % headers['From_']) + del headers['From_'] + for key in headers.keys(): + if headers[key] is not None: + msg = msg + ("%s: %s\n" % (key, headers[key])) + msg = msg + "\n\n" + body + "\n\n" + if not wantobj: + return msg + fp = cStringIO.StringIO(msg) + return rfc822.Message(fp) + +def append_file(source, dest): + """appends the file named 'source' to the file named 'dest'""" + assert os.path.isfile(source) + assert os.path.isfile(dest) + read = open(source, "r") + write = open(dest, "a+") + shutil.copyfileobj(read,write) + read.close() + write.close() + + +def make_mbox(body=None, headers=None, hours_old=0, messages=1): + assert tempfile.tempdir + fd, name = tempfile.mkstemp() + file = os.fdopen(fd, "w") + for count in range(messages): + msg = make_message(body=body, default_headers=headers, + mkfrom=True, hours_old=hours_old) + file.write(msg) + file.close() + return name + +def make_archive_and_plain_copy(archive_name): + """Make an mbox archive of the given name like archivemail may have + created it. Also make an uncompressed copy of this archive and return its + name.""" + copy_fd, copy_name = tempfile.mkstemp() + copy_fp = os.fdopen(copy_fd, "w") + if archivemail.options.no_compress: + fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT) + fp = os.fdopen(fd, "w") + else: + archive_name += ".gz" + fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT) + rawfp = os.fdopen(fd, "w") + fp = gzip.GzipFile(fileobj=rawfp) + for count in range(3): + msg = make_message(hours_old=24*360) + fp.write(msg) + copy_fp.write(msg) + fp.close() + copy_fp.close() + if not archivemail.options.no_compress: + rawfp.close() + return copy_name + +def copy_maildir(maildir, prefix="tmp"): + """Create a copy of the given maildir and return the absolute path of the + new direcory.""" + newdir = tempfile.mkdtemp(prefix=prefix) + for d in "cur", "new", "tmp": + shutil.copytree(os.path.join(maildir, d), os.path.join(newdir, d)) + return newdir + +def assertEqualContent(firstfile, secondfile, zippedfirst=False): + """Verify that the two files exist and have identical content. If zippedfirst + is True, assume that firstfile is gzip-compressed.""" + assert os.path.exists(firstfile) + assert os.path.exists(secondfile) + if zippedfirst: + try: + fp1 = gzip.GzipFile(firstfile, "r") + fp2 = open(secondfile, "r") + assert cmp_fileobj(fp1, fp2) + finally: + fp1.close() + fp2.close() + else: + assert filecmp.cmp(firstfile, secondfile, shallow=0) + +def cmp_fileobj(fp1, fp2): + """Return if reading the fileobjects yields identical content.""" + bufsize = 8192 + while True: + b1 = fp1.read(bufsize) + b2 = fp2.read(bufsize) + if b1 != b2: + return False + if not b1: + return True + +if __name__ == "__main__": + unittest.main() |