aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolaus Schulz <microschulz@web.de>2008-11-21 10:48:41 +0100
committerNikolaus Schulz <microschulz@web.de>2010-07-29 15:46:10 +0200
commit0dfae37e04477a1ddd88f7da149171400c90fa57 (patch)
tree85fb3f4a3f6e9d217a7be65f0fea27b4f484dc6a
parent78c4c6e3da88b4cb68db4fa34e45469fdb8fb4d5 (diff)
downloadarchivemail-0dfae37e04477a1ddd88f7da149171400c90fa57.tar.gz
archivemail-0dfae37e04477a1ddd88f7da149171400c90fa57.tar.bz2
archivemail-0dfae37e04477a1ddd88f7da149171400c90fa57.zip
test suite: first shot at implementing maildir test cases
-rwxr-xr-xtest_archivemail.py390
1 files changed, 387 insertions, 3 deletions
diff --git a/test_archivemail.py b/test_archivemail.py
index c83d1bf..5186de0 100755
--- a/test_archivemail.py
+++ b/test_archivemail.py
@@ -60,6 +60,8 @@ import unittest
import gzip
import cStringIO
import rfc822
+import errno
+import mailbox
try:
import archivemail
@@ -71,9 +73,183 @@ except ImportError:
print "Try renaming it from 'archivemail' to 'archivemail.py'."
sys.exit(1)
+# We want to iterate over messages in a compressed archive mbox and verify
+# them. This involves seeking in the mbox. The gzip.GzipFile.seek() in
+# Python 2.5 doesn't understand whence; this is Python bug #1355023, triggered
+# by mailbox._PartialFile.seek(). The bug is still pending as of Python
+# 2.5.2. To work around it, we subclass gzip.GzipFile.
+#
+# It should be noted that seeking backwards in a GzipFile is emulated by
+# re-reading the entire file from the beginning, which is extremely
+# inefficient and won't work with large files; but our test archives are all
+# small, so it's okay.
+
+class FixedGzipFile(gzip.GzipFile):
+    """GzipFile whose seek() accepts the optional whence argument.
+
+    Works around gzip.GzipFile.seek() implementations that take only an
+    offset and raise TypeError when whence is passed as well.
+    """
+
+    def seek(self, offset, whence=0):
+        """Seek to offset, interpreted according to whence.
+
+        whence 0 means an absolute offset, 1 an offset relative to the
+        current position.  When the base class has to be emulated, any other
+        whence raises ValueError.  Whatever the base class seek() returns is
+        passed on to the caller.
+        """
+        try:
+            return gzip.GzipFile.seek(self, offset, whence)
+        except TypeError:
+            # The base class does not accept whence; emulate it.
+            if whence:
+                if whence == 1:
+                    # Turn the relative offset into an absolute one.
+                    offset = self.offset + offset
+                else:
+                    raise ValueError('Seek from end not supported')
+            return gzip.GzipFile.seek(self, offset)
+
# precision of os.utime() when restoring mbox timestamps
utimes_precision = 5
+class MessageIdFactory:
+    """Callable that hands out a fresh message-id on every call.
+
+    The ids are only unique per factory instance: they embed a counter that
+    starts at 1 and grows by one with each call.
+    """
+
+    def __init__(self):
+        # Number of message-ids handed out so far.
+        self.seq = 0
+
+    def __call__(self):
+        """Return the next message-id, e.g. "<archivemail1@localhost>"."""
+        next_seq = self.seq + 1
+        self.seq = next_seq
+        return "<archivemail%d@localhost>" % next_seq
+
+# Module-wide factory instance; make_message() calls it for every message.
+make_msgid = MessageIdFactory()
+
+class IndexedMailboxDir:
+    """An indexed mailbox directory, providing random message access by
+    message-id.  Base class for a maildir and an mh subclass.
+
+    Attributes:
+      root         path of the mailbox directory (a fresh temporary directory)
+      msg_id_dict  maps each message-id to a pathname relative to root
+      deliveries   counter the subclasses use to derive file names
+    """
+
+    def __init__(self, mdir_name):
+        """Create the mailbox directory; its name starts with mdir_name."""
+        assert tempfile.tempdir
+        self.root = tempfile.mkdtemp(prefix=mdir_name)
+        self.msg_id_dict = {}
+        self.deliveries = 0
+
+    def _add_to_index(self, msg_text, fpath):
+        """Add the given message to the index, for later random access.
+
+        msg_text is the complete message text, fpath the pathname to record
+        for it.  Fails with AssertionError if the header has no (or an empty)
+        Message-Id, or if that message-id is already in the index.
+        """
+        # Extract the message-id as index key.  Only the header is searched:
+        # the scan stops at the first blank line or at the end of the text,
+        # so it always terminates and never picks up a line from the body.
+        msg_id = None
+        for line in msg_text.splitlines():
+            if not line.strip():
+                break
+            if line.lower().startswith("message-id:"):
+                msg_id = line.split(":", 1)[-1].strip()
+                break
+        assert msg_id, "message has no usable Message-Id header"
+        assert msg_id not in self.msg_id_dict, "duplicate Message-Id"
+        self.msg_id_dict[msg_id] = fpath
+
+    def __len__(self):
+        """Return the number of messages in this folder."""
+        return len(self.msg_id_dict)
+
+    def get_all_filenames(self):
+        """Return all relative pathnames of files in this mailbox."""
+        return self.msg_id_dict.values()
+
+    def clear(self):
+        """Remove all messages in this mailbox."""
+        for relpath in self.msg_id_dict.values():
+            try:
+                os.remove(os.path.join(self.root, relpath))
+            except OSError, e:
+                # A message file that is already gone is not an error.
+                if e.errno != errno.ENOENT:
+                    raise
+        self.msg_id_dict.clear()
+
+class SimpleMaildir(IndexedMailboxDir):
+    """Primitive Maildir class, just good enough for generating short-lived
+    test maildirs."""
+
+    def __init__(self, mdir_name='maildir'):
+        """Create the mailbox directory with its cur, tmp and new subdirs."""
+        IndexedMailboxDir.__init__(self, mdir_name)
+        for d in "cur", "tmp", "new":
+            os.mkdir(os.path.join(self.root, d))
+
+    def write(self, msg_str, new=True, flags=()):
+        """Store a message with the given flags.
+
+        New messages go to the "new" subdirectory and must not carry flags;
+        all others go to "cur".  flags is any iterable of single-letter
+        maildir flags (a plain string works, too).
+        """
+        assert not (new and flags)
+        if new:
+            subdir = "new"
+        else:
+            subdir = "cur"
+        fname = self._mkname(new, flags)
+        relpath = os.path.join(subdir, fname)
+        path = os.path.join(self.root, relpath)
+        assert not os.path.exists(path)
+        f = open(path, "w")
+        try:
+            f.write(msg_str)
+        finally:
+            # Do not leak the file handle if writing fails.
+            f.close()
+        self._add_to_index(msg_str, relpath)
+
+    def remove(self):
+        """Remove all files and directories that comprise this mailbox."""
+        self.clear()
+        for d in "cur", "new", "tmp":
+            os.rmdir(os.path.join(self.root, d))
+        os.rmdir(self.root)
+        self.root = None
+
+    def _mkname(self, new, flags):
+        """Generate a unique filename for a new message.
+
+        Names for "new" messages are the bare delivery counter; all other
+        names get the info suffix ":2," followed by the sorted flags.
+        """
+        validflags = 'DFPRST'
+        for f in flags:
+            assert f in validflags
+        # This 'unique' name should be good enough, since nobody else
+        # will ever write messages to this maildir folder.
+        uniq = str(self.deliveries)
+        self.deliveries += 1
+        if new:
+            return uniq
+        if not flags:
+            return uniq + ':2,'
+        finfo = "".join(sorted(flags))
+        return uniq + ':2,' + finfo
+
+    def get_message_and_mbox_status(self, msgid):
+        """For the Message-Id msgid, return the matching message in text
+        format and its status, expressed as a set of mbox flags.
+
+        Raises KeyError if msgid is unknown, or if the message carries a
+        maildir flag that has no entry in the flag map below.
+        """
+        fpath = self.msg_id_dict[msgid]     # Barfs if not found
+        subdir, fname = os.path.split(fpath)
+        # Only names with an info part ("<uniq>:2,<flags>") carry flags;
+        # the names that _mkname() generates for new messages have none.
+        if ':2,' in fname:
+            mdir_flags = fname.rsplit(':2,', 1)[-1]
+        else:
+            mdir_flags = ''
+        # maildir flag -> mbox status flag
+        flagmap = {
+            'F': 'F',
+            'R': 'A',
+            'S': 'R'
+            }
+        mbox_flags = set([flagmap[x] for x in mdir_flags])
+        # Compare the directory component instead of a hard-coded "cur/"
+        # prefix; the path was built with os.path.join().
+        if subdir == "cur":
+            mbox_flags.add('O')
+        fp = open(os.path.join(self.root, fpath), "r")
+        try:
+            msg = fp.read()
+        finally:
+            fp.close()
+        return msg, mbox_flags
+
+
+class SimpleMHMailbox(IndexedMailboxDir):
+    """Primitive MH mailbox class, just good enough for generating short-lived
+    test mh mailboxes."""
+
+    def __init__(self, mdir_name='mh'):
+        """Create the mailbox directory; its name starts with mdir_name."""
+        IndexedMailboxDir.__init__(self, mdir_name)
+
+    def write(self, msg_str):
+        """Store a message in a file named after the next delivery number."""
+        self.deliveries += 1
+        fname = str(self.deliveries)
+        path = os.path.join(self.root, fname)
+        assert not os.path.exists(path)
+        f = open(path, "w")
+        try:
+            f.write(msg_str)
+        finally:
+            # Do not leak the file handle if writing fails.
+            f.close()
+        self._add_to_index(msg_str, fname)
+
+    def remove(self):
+        """Remove all messages and the mailbox directory itself."""
+        self.clear()
+        os.rmdir(self.root)
+        self.root = None
+
+    def get_message(self, msgid):
+        """For the Message-Id msgid, return the matching message in text
+        format.  Raises KeyError if msgid is unknown."""
+        fpath = self.msg_id_dict[msgid]     # Barfs if not found
+        fp = open(os.path.join(self.root, fpath), "r")
+        try:
+            msg_str = fp.read()
+        finally:
+            fp.close()
+        return msg_str
class TestCaseInTempdir(unittest.TestCase):
"""Base class for testcases that need to create temporary files.
@@ -980,12 +1156,212 @@ class TestArchiveSize(unittest.TestCase):
archivemail.options.min_size = None
+############# Test archiving maildirs ###############
+
+class TestArchiveMailboxdir(TestCaseInTempdir):
+ """Base class defining helper functions for doing test archive runs with
+ maildirs."""
+ maildir = None # Maildir that will be processed by archivemail
+ orig_maildir_obj = None # A backup copy of the maildir, a SimpleMaildir object
+ remaining_msg = set() # Filenames of maildir messages that should be preserved
+ orig_archive = None # An uncompressed copy of a pre-existing archive,
+ # if one exists
+ # The instance attribute orig_archive_size (size of the pre-existing archive
+ # file) is only set by make_maildir() when make_old_archive is true.
+
+ def setUp(self):
+ """Start every test with a fresh, empty backup maildir."""
+ super(TestArchiveMailboxdir, self).setUp()
+ self.orig_maildir_obj = SimpleMaildir()
+
+ def verify(self):
+ """Check both the messages left in the maildir and the archive."""
+ self._verify_remaining()
+ self._verify_archive()
+
+ def _verify_remaining(self):
+ """Verify that the preserved messages weren't altered."""
+ assert self.maildir
+ # Compare maildir with backup object.
+ dcmp = filecmp.dircmp(self.maildir, self.orig_maildir_obj.root)
+ # Top-level has only directories cur, new, tmp and must be unchanged.
+ self.assertEqual(dcmp.left_list, dcmp.right_list)
+ found = set()
+ for d in dcmp.common_dirs:
+ dcmp2 = dcmp.subdirs[d]
+ # We need to verify three things.
+ # 1. directory is a subset of the original...
+ assert not dcmp2.left_only
+ # 2. all common files are identical...
+ self.assertEqual(dcmp2.common_files, dcmp2.same_files)
+ found = found.union([os.path.join(d, x) for x in dcmp2.common_files])
+ # 3. exactly the `new' messages (recorded in self.remaining_msg)
+ # were preserved.
+ self.assertEqual(found, self.remaining_msg)
+
+ def _verify_archive(self):
+ """Verify the archive correctness."""
+ # Every message that is not expected to remain must have been archived.
+ number_archived = len(self.orig_maildir_obj) - len(self.remaining_msg)
+ # TODO: currently make_archive_name does not include the .gz suffix.
+ # Is this something that should be fixed?
+ archive = archivemail.make_archive_name(self.maildir)
+ if archivemail.options.no_compress:
+ iszipped = False
+ else:
+ archive += '.gz'
+ iszipped = True
+ # Nothing archived: a pre-existing archive must be unchanged, and
+ # without one no archive may have been created.
+ if number_archived == 0:
+ if self.orig_archive:
+ assertEqualContent(archive, self.orig_archive, iszipped)
+ else:
+ assert not os.path.exists(archive)
+ return
+ fp_new = fp_archive = tmp_archive_name = None
+ try:
+ if self.orig_archive:
+ # NOTE(review): new_size is not used below.
+ new_size = os.path.getsize(archive)
+ # Brute force: split archive in old and new part and verify the
+ # parts separately. (Of course this destroys the archive.)
+ fp_archive = open(archive, "r+")
+ fp_archive.seek(self.orig_archive_size)
+ # Move everything beyond the original archive size to a temp file,
+ # then cut the archive back to its original size.
+ fd, tmp_archive_name = tempfile.mkstemp()
+ fp_new = os.fdopen(fd, "w")
+ shutil.copyfileobj(fp_archive, fp_new)
+ fp_new.close()
+ fp_archive.truncate(self.orig_archive_size)
+ fp_archive.close()
+ assertEqualContent(archive, self.orig_archive, iszipped)
+ new_archive = tmp_archive_name
+ else:
+ new_archive = archive
+ if archivemail.options.no_compress:
+ fp_archive = open(new_archive, "r")
+ else:
+ fp_archive = FixedGzipFile(new_archive, "r")
+ # Each newly archived message must match a message in the backup
+ # maildir, and their number must be the expected one.
+ mb = mailbox.UnixMailbox(fp_archive)
+ found = 0
+ for msg in mb:
+ self.verify_maildir_has_msg(self.orig_maildir_obj, msg)
+ found += 1
+ self.assertEqual(found, number_archived)
+ finally:
+ if tmp_archive_name:
+ os.remove(tmp_archive_name)
+ # fp_new may already have been closed above; closing it again is
+ # harmless.
+ if fp_new is not None:
+ fp_new.close()
+ if fp_archive is not None:
+ fp_archive.close()
+
+ def verify_maildir_has_msg(self, maildir, msg):
+ """Assert that the given maildir has a copy of the rfc822 message."""
+ mid = msg['Message-Id'] # Complains if there is no message-id
+ mdir_msg_str, mdir_flags = \
+ maildir.get_message_and_mbox_status(mid)
+ mbox_flags = set(msg.get('status', '') + msg.get('x-status', ''))
+ self.assertEqual(mdir_flags, mbox_flags)
+
+ # Status and X-Status were compared as flags above; leave them out of
+ # the text comparison.
+ headers = filter(lambda h: msg.isheader(h) not in ('status', 'x-status'),
+ msg.headers)
+ headers = "".join(headers)
+ msg.rewindbody()
+ # Discard last mbox LF which is not part of the message.
+ body = msg.fp.read()[:-1]
+ msg_str = headers + os.linesep + body
+ self.assertEqual(mdir_msg_str, msg_str)
+
+ def add_messages(self, body=None, headers=None, hours_old=0, messages=1):
+ """Write `messages' generated messages to the backup maildir; they are
+ stored as non-new messages (new=False)."""
+ for count in range(messages):
+ msg = make_message(body, default_headers=headers, mkfrom=False,
+ hours_old=hours_old)
+ self.orig_maildir_obj.write(msg, new=False)
+
+ def make_maildir(self, mkold, mknew, body=None, headers=None, messages=1,
+ make_old_archive=False):
+ """Fill the backup maildir and copy it to self.maildir.
+
+ If mknew is true, add messages that are 179 days old and record them in
+ self.remaining_msg as the ones expected to be preserved. If mkold is
+ true, add messages that are 181 days old. If make_old_archive is true,
+ also call make_archive_and_plain_copy() for the archive name, store its
+ result in self.orig_archive, and remember the archive file size."""
+ if mknew:
+ self.add_messages(body, headers, 179*24, messages)
+ self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames())
+ if mkold:
+ self.add_messages(body, headers, 181*24, messages)
+ self.maildir = copy_maildir(self.orig_maildir_obj.root)
+ if make_old_archive:
+ archive = archivemail.make_archive_name(self.maildir)
+ self.orig_archive = make_archive_and_plain_copy(archive)
+ # FIXME: .gz extension handling is a mess II
+ if not archivemail.options.no_compress:
+ archive += '.gz'
+ self.orig_archive_size = os.path.getsize(archive)
+
+class TestEmptyMaildir(TestCaseInTempdir):
+    """Run archivemail on a maildir that holds no messages at all."""
+
+    def setUp(self):
+        super(TestEmptyMaildir, self).setUp()
+        archivemail.options.quiet = True
+
+    def testEmpty(self):
+        """Archiving an empty maildir should not result in an archive."""
+        self.mdir = SimpleMaildir()
+        maildir_path = self.mdir.root
+        archivemail.archive(maildir_path)
+        unexpected_archive = maildir_path + '_archive.gz'
+        assert not os.path.exists(unexpected_archive)
+
+    def tearDown(self):
+        super(TestEmptyMaildir, self).tearDown()
+        # Restore the option that setUp() changed.
+        archivemail.options.quiet = False
+
+class TestMaildir(TestArchiveMailboxdir):
+    """Archive maildirs holding old and/or new messages and verify the
+    outcome."""
+
+    def setUp(self):
+        super(TestMaildir, self).setUp()
+        archivemail.options.quiet = True
+
+    def _archive_and_verify(self, mkold, mknew, **kwargs):
+        # Build a maildir with three messages per requested age group, run
+        # archivemail on it and check the result.
+        self.make_maildir(mkold, mknew, messages=3, **kwargs)
+        archivemail.archive(self.maildir)
+        self.verify()
+
+    def testOld(self):
+        self._archive_and_verify(True, False)
+
+    def testNew(self):
+        self._archive_and_verify(False, True)
+
+    def testMixed(self):
+        self._archive_and_verify(True, True)
+
+    def testMixedExisting(self):
+        self._archive_and_verify(True, True, make_old_archive=True)
+
+    def tearDown(self):
+        # Restore the option that setUp() changed.
+        archivemail.options.quiet = False
+        super(TestMaildir, self).tearDown()
+
+
+class TestMaildirPreserveUnread(TestArchiveMailboxdir):
+    """Archive runs with the preserve_unread option switched on."""
+
+    def setUp(self):
+        super(TestMaildirPreserveUnread, self).setUp()
+        archivemail.options.quiet = True
+        archivemail.options.preserve_unread = True
+
+    def testOldRead(self):
+        """--preserve-unread archives all old read messages in a maildir."""
+        # XXX Replaces the backup maildir that the inherited setUp() created.
+        backup = SimpleMaildir("orig")
+        self.orig_maildir_obj = backup
+        for _ in range(3):
+            text = make_message(hours_old=24*181)
+            backup.write(text, new=False, flags='S')
+        self.maildir = copy_maildir(backup.root)
+        archivemail.archive(self.maildir)
+        self.verify()
+
+    def tearDown(self):
+        # Restore the options that setUp() changed.
+        archivemail.options.quiet = False
+        archivemail.options.preserve_unread = False
+        super(TestMaildirPreserveUnread, self).tearDown()
+
+
########## helper routines ############
-def make_message(body=None, default_headers={}, hours_old=None, wantobj=False):
+def make_message(body=None, default_headers={}, hours_old=None, mkfrom=False, wantobj=False):
headers = copy.copy(default_headers)
if not headers:
headers = {}
+ headers['Message-Id'] = make_msgid()
if not headers.has_key('Date'):
time_message = time.time() - (60 * 60 * hours_old)
headers['Date'] = time.asctime(time.localtime(time_message))
@@ -995,7 +1371,7 @@ def make_message(body=None, default_headers={}, hours_old=None, wantobj=False):
headers['To'] = "receipient@dummy.domain"
if not headers.has_key('Subject'):
headers['Subject'] = "This is the subject"
- if not headers.has_key('From_'):
+ if mkfrom and not headers.has_key('From_'):
headers['From_'] = "%s %s" % (headers['From'], headers['Date'])
if not body:
body = "This is the message body"
@@ -1030,7 +1406,7 @@ def make_mbox(body=None, headers=None, hours_old=0, messages=1):
file = os.fdopen(fd, "w")
for count in range(messages):
msg = make_message(body=body, default_headers=headers,
- hours_old=hours_old)
+ mkfrom=True, hours_old=hours_old)
file.write(msg)
file.close()
return name
@@ -1059,6 +1435,14 @@ def make_archive_and_plain_copy(archive_name):
rawfp.close()
return copy_name
+def copy_maildir(maildir, prefix="tmp"):
+    """Duplicate the cur, new and tmp subdirectories of the given maildir
+    into a freshly created temporary directory, whose name starts with
+    prefix, and return the path of that new directory."""
+    newdir = tempfile.mkdtemp(prefix=prefix)
+    for subdir in ("cur", "new", "tmp"):
+        source = os.path.join(maildir, subdir)
+        target = os.path.join(newdir, subdir)
+        shutil.copytree(source, target)
+    return newdir
+
def assertEqualContent(firstfile, secondfile, zippedfirst=False):
"""Verify that the two files exist and have identical content. If zippedfirst
is True, assume that firstfile is gzip-compressed."""