aboutsummaryrefslogtreecommitdiffstats
path: root/test_archivemail
diff options
context:
space:
mode:
Diffstat (limited to 'test_archivemail')
-rwxr-xr-xtest_archivemail1593
1 files changed, 1593 insertions, 0 deletions
diff --git a/test_archivemail b/test_archivemail
new file mode 100755
index 0000000..4a11734
--- /dev/null
+++ b/test_archivemail
@@ -0,0 +1,1593 @@
+#! /usr/bin/env python
+############################################################################
+# Copyright (C) 2002 Paul Rodger <paul@paulrodger.com>
+# (C) 2006-2010 Nikolaus Schulz <microschulz@web.de>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+############################################################################
+"""
+Unit-test archivemail using 'PyUnit'.
+
+TODO: add tests for:
+ * dotlock locks already existing
+ * archiving MH-format mailboxes
+ * a 3rd party process changing the mbox file being read
+
+"""
+
+import sys
+
+def check_python_version():
+ """Abort if we are running on python < v2.3"""
+ too_old_error = "This test script requires python version 2.3 or later. " + \
+ "Your version of python is:\n%s" % sys.version
+ try:
+ version = sys.version_info # we might not even have this function! :)
+ if (version[0] < 2) or (version[0] == 2 and version[1] < 3):
+ print too_old_error
+ sys.exit(1)
+ except AttributeError:
+ print too_old_error
+ sys.exit(1)
+
+# define & run this early because 'unittest' requires Python >= 2.1
+check_python_version()
+
+import copy
+import fcntl
+import filecmp
+import os
+import re
+import shutil
+import stat
+import tempfile
+import time
+import unittest
+import gzip
+import cStringIO
+import rfc822
+import mailbox
+
+try:
+ import archivemail
+except ImportError:
+ print "The archivemail script needs to be called 'archivemail.py'"
+ print "and should be in the current directory in order to be imported"
+ print "and tested. Sorry."
+ if os.path.isfile("archivemail"):
+ print "Try renaming it from 'archivemail' to 'archivemail.py'."
+ sys.exit(1)
+
+# We want to iterate over messages in a compressed archive mbox and verify
+# them. This involves seeking in the mbox. The gzip.Gzipfile.seek() in
+# Python 2.5 doesn't understand whence; this is Python bug #1355023, triggered
+# by mailbox._PartialFile.seek(). The bug is still pending as of Python
+# 2.5.2. To work around it, we subclass gzip.GzipFile.
+#
+# It should be noted that seeking backwards in a GzipFile is emulated by
+# re-reading the entire file from the beginning, which is extremely
+# inefficient and won't work with large files; but our test archives are all
+# small, so it's okay.
+
+class FixedGzipFile(gzip.GzipFile):
+ """GzipFile with seek method accepting whence parameter."""
+ def seek(self, offset, whence=0):
+ try:
+ gzip.GzipFile.seek(self, offset, whence)
+ except TypeError:
+ if whence:
+ if whence == 1:
+ offset = self.offset + offset
+ else:
+ raise ValueError('Seek from end not supported')
+ gzip.GzipFile.seek(self, offset)
+
+# precision of os.utime() when restoring mbox timestamps
+utimes_precision = 5
+
+class MessageIdFactory:
+ """Factory to create `uniqe' message-ids."""
+ def __init__(self):
+ self.seq = 0
+ def __call__(self):
+ self.seq += 1
+ return "<archivemail%d@localhost>" % self.seq
+
+make_msgid = MessageIdFactory()
+
+class IndexedMailboxDir:
+ """An indexed mailbox directory, providing random message access by
+ message-id. Intended as a base class for a maildir and an mh subclass."""
+
+ def __init__(self, mdir_name):
+ assert tempfile.tempdir
+ self.root = tempfile.mkdtemp(prefix=mdir_name)
+ self.msg_id_dict = {}
+ self.deliveries = 0
+
+ def _add_to_index(self, msg_text, fpath):
+ """Add the given message to the index, for later random access."""
+ # Extract the message-id as index key
+ msg_id = None
+ fp = cStringIO.StringIO(msg_text)
+ while True:
+ line = fp.readline()
+ # line empty means we didn't find a message-id
+ assert line
+ if line.lower().startswith("message-id:"):
+ msg_id = line.split(":", 1)[-1].strip()
+ assert msg_id
+ break
+ assert not self.msg_id_dict.has_key(msg_id)
+ self.msg_id_dict[msg_id] = fpath
+
+ def get_all_filenames(self):
+ """Return all relative pathnames of files in this mailbox."""
+ return self.msg_id_dict.values()
+
+class SimpleMaildir(IndexedMailboxDir):
+ """Primitive Maildir class, just good enough for generating short-lived
+ test maildirs."""
+
+ def __init__(self, mdir_name='maildir'):
+ IndexedMailboxDir.__init__(self, mdir_name)
+ for d in "cur", "tmp", "new":
+ os.mkdir(os.path.join(self.root, d))
+
+ def write(self, msg_str, new=True, flags=[]):
+ """Store a message with the given flags."""
+ assert not (new and flags)
+ if new:
+ subdir = "new"
+ else:
+ subdir = "cur"
+ fname = self._mkname(new, flags)
+ relpath = os.path.join(subdir, fname)
+ path = os.path.join(self.root, relpath)
+ assert not os.path.exists(path)
+ f = open(path, "w")
+ f.write(msg_str)
+ f.close()
+ self._add_to_index(msg_str, relpath)
+
+ def _mkname(self, new, flags):
+ """Generate a unique filename for a new message."""
+ validflags = 'DFPRST'
+ for f in flags:
+ assert f in validflags
+ # This 'unique' name should be good enough, since nobody else
+ # will ever write messages to this maildir folder.
+ uniq = str(self.deliveries)
+ self.deliveries += 1
+ if new:
+ return uniq
+ if not flags:
+ return uniq + ':2,'
+ finfo = "".join(sorted(flags))
+ return uniq + ':2,' + finfo
+
+ def get_message_and_mbox_status(self, msgid):
+ """For the Message-Id msgid, return the matching message in text
+ format and its status, expressed as a set of mbox flags."""
+ fpath = self.msg_id_dict[msgid] # Barfs if not found
+ mdir_flags = fpath.rsplit('2,', 1)[-1]
+ flagmap = {
+ 'F': 'F',
+ 'R': 'A',
+ 'S': 'R'
+ }
+ mbox_flags = set([flagmap[x] for x in mdir_flags])
+ if fpath.startswith("cur/"):
+ mbox_flags.add('O')
+ fp = open(os.path.join(self.root, fpath), "r")
+ msg = fp.read()
+ fp.close()
+ return msg, mbox_flags
+
+
+class TestCaseInTempdir(unittest.TestCase):
+ """Base class for testcases that need to create temporary files.
+ All testcases that create temporary files should be derived from this
+ class, not directly from unittest.TestCase.
+ TestCaseInTempdir provides these methods:
+
+ setUp() Creates a safe temporary directory and sets tempfile.tempdir.
+
+ tearDown() Recursively removes the temporary directory and unsets
+ tempfile.tempdir.
+
+ Overriding methods should call the ones above."""
+ temproot = None
+
+ def setUp(self):
+ if not self.temproot:
+ assert not tempfile.tempdir
+ self.temproot = tempfile.tempdir = \
+ tempfile.mkdtemp(prefix="test-archivemail")
+
+ def tearDown(self):
+ assert tempfile.tempdir == self.temproot
+ if self.temproot:
+ shutil.rmtree(self.temproot)
+ tempfile.tempdir = self.temproot = None
+
+
+############ Mbox Class testing ##############
+
+class TestMboxDotlock(TestCaseInTempdir):
+ def setUp(self):
+ super(TestMboxDotlock, self).setUp()
+ self.mbox_name = make_mbox()
+ self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
+ self.mbox = archivemail.Mbox(self.mbox_name)
+
+ def testDotlock(self):
+ """dotlock_lock/unlock should create/delete a lockfile"""
+ lock = self.mbox_name + ".lock"
+ self.mbox._dotlock_lock()
+ assert os.path.isfile(lock)
+ self.mbox._dotlock_unlock()
+ assert not os.path.isfile(lock)
+
+ def testDotlockingSucceedsUponEACCES(self):
+ """A dotlock should silently be omitted upon EACCES."""
+ archivemail.options.quiet = True
+ mbox_dir = os.path.dirname(self.mbox_name)
+ os.chmod(mbox_dir, 0500)
+ try:
+ self.mbox._dotlock_lock()
+ finally:
+ os.chmod(mbox_dir, 0700)
+ archivemail.options.quiet = False
+
+class TestMboxPosixLock(TestCaseInTempdir):
+ def setUp(self):
+ super(TestMboxPosixLock, self).setUp()
+ self.mbox_name = make_mbox()
+ self.mbox = archivemail.Mbox(self.mbox_name)
+
+ def testPosixLock(self):
+ """posix_lock/unlock should create/delete an advisory lock"""
+
+ # The following code snippet heavily lends from the Python 2.5 mailbox
+ # unittest.
+ # BEGIN robbery:
+
+ # Fork off a subprocess that will lock the file for 2 seconds,
+ # unlock it, and then exit.
+ pid = os.fork()
+ if pid == 0:
+ # In the child, lock the mailbox.
+ self.mbox._posix_lock()
+ time.sleep(2)
+ self.mbox._posix_unlock()
+ os._exit(0)
+
+ # In the parent, sleep a bit to give the child time to acquire
+ # the lock.
+ time.sleep(0.5)
+ # The parent's file self.mbox.mbox_file shares fcntl locks with the
+ # duplicated FD in the child; reopen it so we get a different file
+ # table entry.
+ file = open(self.mbox_name, "r+")
+ lock_nb = fcntl.LOCK_EX | fcntl.LOCK_NB
+ fd = file.fileno()
+ try:
+ self.assertRaises(IOError, fcntl.lockf, fd, lock_nb)
+
+ finally:
+ # Wait for child to exit. Locking should now succeed.
+ exited_pid, status = os.waitpid(pid, 0)
+
+ fcntl.lockf(fd, lock_nb)
+ fcntl.lockf(fd, fcntl.LOCK_UN)
+ # END robbery
+
+
+class TestMboxNext(TestCaseInTempdir):
+ def setUp(self):
+ super(TestMboxNext, self).setUp()
+ self.not_empty_name = make_mbox(messages=18)
+ self.empty_name = make_mbox(messages=0)
+
+ def testNextEmpty(self):
+ """mbox.next() should return None on an empty mailbox"""
+ mbox = archivemail.Mbox(self.empty_name)
+ msg = mbox.next()
+ self.assertEqual(msg, None)
+
+ def testNextNotEmpty(self):
+ """mbox.next() should a message on a populated mailbox"""
+ mbox = archivemail.Mbox(self.not_empty_name)
+ for count in range(18):
+ msg = mbox.next()
+ assert msg
+ msg = mbox.next()
+ self.assertEqual(msg, None)
+
+
+############ TempMbox Class testing ##############
+
+class TestTempMboxWrite(TestCaseInTempdir):
+ def setUp(self):
+ super(TestTempMboxWrite, self).setUp()
+
+ def testWrite(self):
+ """mbox.write() should append messages to a mbox mailbox"""
+ read_file = make_mbox(messages=3)
+ mbox_read = archivemail.Mbox(read_file)
+ mbox_write = archivemail.TempMbox()
+ write_file = mbox_write.mbox_file_name
+ for count in range(3):
+ msg = mbox_read.next()
+ mbox_write.write(msg)
+ mbox_read.close()
+ mbox_write.close()
+ assert filecmp.cmp(read_file, write_file, shallow=0)
+
+ def testWriteNone(self):
+ """calling mbox.write() with no message should raise AssertionError"""
+ write = archivemail.TempMbox()
+ self.assertRaises(AssertionError, write.write, None)
+
+class TestTempMboxRemove(TestCaseInTempdir):
+ def setUp(self):
+ super(TestTempMboxRemove, self).setUp()
+ self.mbox = archivemail.TempMbox()
+ self.mbox_name = self.mbox.mbox_file_name
+
+ def testMboxRemove(self):
+ """remove() should delete a mbox mailbox"""
+ assert os.path.exists(self.mbox_name)
+ self.mbox.remove()
+ assert not os.path.exists(self.mbox_name)
+
+
+
+########## options class testing #################
+
+class TestOptionDefaults(unittest.TestCase):
+ def testVerbose(self):
+ """verbose should be off by default"""
+ self.assertEqual(archivemail.options.verbose, False)
+
+ def testDaysOldMax(self):
+ """default archival time should be 180 days"""
+ self.assertEqual(archivemail.options.days_old_max, 180)
+
+ def testQuiet(self):
+ """quiet should be off by default"""
+ self.assertEqual(archivemail.options.quiet, False)
+
+ def testDeleteOldMail(self):
+ """we should not delete old mail by default"""
+ self.assertEqual(archivemail.options.delete_old_mail, False)
+
+ def testNoCompress(self):
+ """no-compression should be off by default"""
+ self.assertEqual(archivemail.options.no_compress, False)
+
+ def testIncludeFlagged(self):
+ """we should not archive flagged messages by default"""
+ self.assertEqual(archivemail.options.include_flagged, False)
+
+ def testPreserveUnread(self):
+ """we should not preserve unread messages by default"""
+ self.assertEqual(archivemail.options.preserve_unread, False)
+
+class TestOptionParser(unittest.TestCase):
+ def setUp(self):
+ self.oldopts = copy.copy(archivemail.options)
+
+ def testOptionDate(self):
+ """--date and -D options are parsed correctly"""
+ date_formats = (
+ "%Y-%m-%d", # ISO format
+ "%d %b %Y" , # Internet format
+ "%d %B %Y" , # Internet format with full month names
+ )
+ date = time.strptime("2000-07-29", "%Y-%m-%d")
+ unixdate = time.mktime(date)
+ for df in date_formats:
+ d = time.strftime(df, date)
+ for opt in '-D', '--date=':
+ archivemail.options.date_old_max = None
+ archivemail.options.parse_args([opt+d], "")
+ self.assertEqual(unixdate, archivemail.options.date_old_max)
+
+ def testOptionPreserveUnread(self):
+ """--preserve-unread option is parsed correctly"""
+ archivemail.options.parse_args(["--preserve-unread"], "")
+ assert archivemail.options.preserve_unread
+ archivemail.options.preserve_unread = False
+ archivemail.options.parse_args(["-u"], "")
+ assert archivemail.options.preserve_unread
+
+ def testOptionSuffix(self):
+ """--suffix and -s options are parsed correctly"""
+ for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"):
+ archivemail.options.parse_args(["--suffix="+suffix], "")
+ assert archivemail.options.archive_suffix == suffix
+ archivemail.options.suffix = None
+ archivemail.options.parse_args(["-s", suffix], "")
+ assert archivemail.options.archive_suffix == suffix
+
+ def testOptionDryrun(self):
+ """--dry-run option is parsed correctly"""
+ archivemail.options.parse_args(["--dry-run"], "")
+ assert archivemail.options.dry_run
+ archivemail.options.preserve_unread = False
+ archivemail.options.parse_args(["-n"], "")
+ assert archivemail.options.dry_run
+
+ def testOptionDays(self):
+ """--days and -d options are parsed correctly"""
+ archivemail.options.parse_args(["--days=11"], "")
+ self.assertEqual(archivemail.options.days_old_max, 11)
+ archivemail.options.days_old_max = None
+ archivemail.options.parse_args(["-d11"], "")
+ self.assertEqual(archivemail.options.days_old_max, 11)
+
+ def testOptionDelete(self):
+ """--delete option is parsed correctly"""
+ archivemail.options.parse_args(["--delete"], "")
+ assert archivemail.options.delete_old_mail
+
+ def testOptionCopy(self):
+ """--copy option is parsed correctly"""
+ archivemail.options.parse_args(["--copy"], "")
+ assert archivemail.options.copy_old_mail
+
+ def testOptionOutputdir(self):
+ """--output-dir and -o options are parsed correctly"""
+ for path in "/just/some/path", "relative/path":
+ archivemail.options.parse_args(["--output-dir=%s" % path], "")
+ self.assertEqual(archivemail.options.output_dir, path)
+ archivemail.options.output_dir = None
+ archivemail.options.parse_args(["-o%s" % path], "")
+ self.assertEqual(archivemail.options.output_dir, path)
+
+ def testOptionNocompress(self):
+ """--no-compress option is parsed correctly"""
+ archivemail.options.parse_args(["--no-compress"], "")
+ assert archivemail.options.no_compress
+
+ def testOptionSize(self):
+ """--size and -S options are parsed correctly"""
+ size = "666"
+ archivemail.options.parse_args(["--size=%s" % size ], "")
+ self.assertEqual(archivemail.options.min_size, int(size))
+ archivemail.options.parse_args(["-S%s" % size ], "")
+ self.assertEqual(archivemail.options.min_size, int(size))
+
+ def tearDown(self):
+ archivemail.options = self.oldopts
+
+########## archivemail.is_older_than_days() unit testing #################
+
+class TestIsTooOld(unittest.TestCase):
+ def testVeryOld(self):
+ """with max_days=360, should be true for these dates > 1 year"""
+ for years in range(1, 10):
+ time_msg = time.time() - (years * 365 * 24 * 60 * 60)
+ assert archivemail.is_older_than_days(time_message=time_msg,
+ max_days=360)
+
+ def testOld(self):
+ """with max_days=14, should be true for these dates > 14 days"""
+ for days in range(14, 360):
+ time_msg = time.time() - (days * 24 * 60 * 60)
+ assert archivemail.is_older_than_days(time_message=time_msg,
+ max_days=14)
+
+ def testJustOld(self):
+ """with max_days=1, should be true for these dates >= 1 day"""
+ for minutes in range(0, 61):
+ time_msg = time.time() - (25 * 60 * 60) + (minutes * 60)
+ assert archivemail.is_older_than_days(time_message=time_msg,
+ max_days=1)
+
+ def testNotOld(self):
+ """with max_days=9, should be false for these dates < 9 days"""
+ for days in range(0, 9):
+ time_msg = time.time() - (days * 24 * 60 * 60)
+ assert not archivemail.is_older_than_days(time_message=time_msg,
+ max_days=9)
+
+ def testJustNotOld(self):
+ """with max_days=1, should be false for these hours <= 1 day"""
+ for minutes in range(0, 60):
+ time_msg = time.time() - (23 * 60 * 60) - (minutes * 60)
+ assert not archivemail.is_older_than_days(time_message=time_msg,
+ max_days=1)
+
+ def testFuture(self):
+ """with max_days=1, should be false for times in the future"""
+ for minutes in range(0, 60):
+ time_msg = time.time() + (minutes * 60)
+ assert not archivemail.is_older_than_days(time_message=time_msg,
+ max_days=1)
+
+########## archivemail.parse_imap_url() unit testing #################
+
+class TestParseIMAPUrl(unittest.TestCase):
+ def setUp(self):
+ archivemail.options.quiet = True
+ archivemail.options.verbose = False
+ archivemail.options.pwfile = None
+
+ urls_withoutpass = [
+ ('imaps://user@example.org@imap.example.org/upperbox/lowerbox',
+ ('user', None, 'example.org@imap.example.org',
+ 'upperbox/lowerbox')),
+ ('imaps://"user@example.org"@imap.example.org/upperbox/lowerbox',
+ ('user@example.org', None, 'imap.example.org',
+ 'upperbox/lowerbox')),
+ ('imaps://user@example.org"@imap.example.org/upperbox/lowerbox',
+ ('user', None, 'example.org"@imap.example.org',
+ 'upperbox/lowerbox')),
+ ('imaps://"user@example.org@imap.example.org/upperbox/lowerbox',
+ ('"user', None, 'example.org@imap.example.org',
+ 'upperbox/lowerbox')),
+ ('imaps://"us\\"er@example.org"@imap.example.org/upperbox/lowerbox',
+ ('us"er@example.org', None, 'imap.example.org',
+ 'upperbox/lowerbox')),
+ ('imaps://user\\@example.org@imap.example.org/upperbox/lowerbox',
+ ('user\\', None, 'example.org@imap.example.org',
+ 'upperbox/lowerbox'))
+ ]
+ urls_withpass = [
+ ('imaps://user@example.org:passwd@imap.example.org/upperbox/lowerbox',
+ ('user@example.org', 'passwd', 'imap.example.org',
+ 'upperbox/lowerbox'),
+ ('user', None, 'example.org:passwd@imap.example.org',
+ 'upperbox/lowerbox')),
+ ('imaps://"user@example.org:passwd@imap.example.org/upperbox/lowerbox',
+ ('"user@example.org', "passwd", 'imap.example.org',
+ 'upperbox/lowerbox'),
+ ('"user', None, 'example.org:passwd@imap.example.org',
+ 'upperbox/lowerbox')),
+ ('imaps://u\\ser\\@example.org:"p@sswd"@imap.example.org/upperbox/lowerbox',
+ ('u\\ser\\@example.org', 'p@sswd', 'imap.example.org',
+ 'upperbox/lowerbox'),
+ ('u\\ser\\', None, 'example.org:"p@sswd"@imap.example.org',
+ 'upperbox/lowerbox'))
+ ]
+ # These are invalid when the password's not stripped.
+ urls_onlywithpass = [
+ ('imaps://"user@example.org":passwd@imap.example.org/upperbox/lowerbox',
+ ('user@example.org', "passwd", 'imap.example.org',
+ 'upperbox/lowerbox'))
+ ]
+ def testUrlsWithoutPwfile(self):
+ """Parse test urls with --pwfile option unset. This parses a password in
+ the URL, if present."""
+ archivemail.options.pwfile = None
+ for mbstr in self.urls_withpass + self.urls_withoutpass:
+ url = mbstr[0][mbstr[0].find('://')+3:]
+ result = archivemail.parse_imap_url(url)
+ self.assertEqual(result, mbstr[1])
+
+ def testUrlsWithPwfile(self):
+ """Parse test urls with --pwfile set. In this case the ':' character
+ loses its meaning as a delimiter."""
+ archivemail.options.pwfile = "whocares.txt"
+ for mbstr in self.urls_withpass:
+ url = mbstr[0][mbstr[0].find('://')+3:]
+ result = archivemail.parse_imap_url(url)
+ self.assertEqual(result, mbstr[2])
+ for mbstr in self.urls_onlywithpass:
+ url = mbstr[0][mbstr[0].find('://')+3:]
+ self.assertRaises(archivemail.UnexpectedError,
+ archivemail.parse_imap_url, url)
+
+ def tearDown(self):
+ archivemail.options.quiet = False
+ archivemail.options.verbose = False
+ archivemail.options.pwfile = None
+
+########## acceptance testing ###########
+
+class TestArchive(TestCaseInTempdir):
+ """Base class defining helper functions for doing test archiving runs."""
+ mbox = None # mbox file that will be processed by archivemail
+ good_archive = None # Uncompressed reference archive file to verify the
+ # archive after processing
+ good_mbox = None # Reference mbox file to verify the mbox after processing
+
+ def verify(self):
+ assert os.path.exists(self.mbox)
+ if self.good_mbox is not None:
+ assertEqualContent(self.mbox, self.good_mbox)
+ else:
+ self.assertEqual(os.path.getsize(self.mbox), 0)
+ archive_name = self.mbox + "_archive"
+ if not archivemail.options.no_compress:
+ archive_name += ".gz"
+ iszipped = True
+ else:
+ assert not os.path.exists(archive_name + ".gz")
+ iszipped = False
+ if self.good_archive is not None:
+ assertEqualContent(archive_name, self.good_archive, iszipped)
+ else:
+ assert not os.path.exists(archive_name)
+
+ def make_old_mbox(self, body=None, headers=None, messages=1, make_old_archive=False):
+ """Prepare for a test run with an old mbox by making an old mbox,
+ optionally an existing archive, and a reference archive to verify the
+ archive after archivemail has run."""
+ self.mbox = make_mbox(body, headers, 181*24, messages)
+ archive_does_change = not (archivemail.options.dry_run or
+ archivemail.options.delete_old_mail)
+ mbox_does_not_change = archivemail.options.dry_run or \
+ archivemail.options.copy_old_mail
+ if make_old_archive:
+ archive = archivemail.make_archive_name(self.mbox)
+ self.good_archive = make_archive_and_plain_copy(archive)
+ if archive_does_change:
+ append_file(self.mbox, self.good_archive)
+ elif archive_does_change:
+ self.good_archive = tempfile.mkstemp()[1]
+ shutil.copyfile(self.mbox, self.good_archive)
+ if mbox_does_not_change:
+ if archive_does_change and not make_old_archive:
+ self.good_mbox = self.good_archive
+ else:
+ self.good_mbox = tempfile.mkstemp()[1]
+ shutil.copyfile(self.mbox, self.good_mbox)
+
+ def make_mixed_mbox(self, body=None, headers=None, messages=1, make_old_archive=False):
+ """Prepare for a test run with a mixed mbox by making a mixed mbox,
+ optionally an existing archive, a reference archive to verify the
+ archive after archivemail has run, and likewise a reference mbox to
+ verify the mbox."""
+ self.make_old_mbox(body, headers, messages=messages, make_old_archive=make_old_archive)
+ new_mbox_name = make_mbox(body, headers, 179*24, messages)
+ append_file(new_mbox_name, self.mbox)
+ if self.good_mbox is None:
+ self.good_mbox = new_mbox_name
+ else:
+ if self.good_mbox == self.good_archive:
+ self.good_mbox = tempfile.mkstemp()[1]
+ shutil.copyfile(self.mbox, self.good_mbox)
+ else:
+ append_file(new_mbox_name, self.good_mbox)
+
+ def make_new_mbox(self, body=None, headers=None, messages=1, make_old_archive=False):
+ """Prepare for a test run with a new mbox by making a new mbox,
+ optionally an exiting archive, and a reference mbox to verify the mbox
+ after archivemail has run."""
+ self.mbox = make_mbox(body, headers, 179*24, messages)
+ self.good_mbox = tempfile.mkstemp()[1]
+ shutil.copyfile(self.mbox, self.good_mbox)
+ if make_old_archive:
+ archive = archivemail.make_archive_name(self.mbox)
+ self.good_archive = make_archive_and_plain_copy(archive)
+
+
+class TestArchiveMbox(TestArchive):
+ """archiving should work based on the date of messages given"""
+
+ def setUp(self):
+ self.oldopts = copy.copy(archivemail.options)
+ archivemail.options.quiet = True
+ super(TestArchiveMbox, self).setUp()
+
+ def testOld(self):
+ """archiving an old mailbox"""
+ self.make_old_mbox(messages=3)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def testOldFromInBody(self):
+ """archiving an old mailbox with 'From ' in the body"""
+ body = """This is a message with ^From at the start of a line
+From is on this line
+This is after the ^From line"""
+ self.make_old_mbox(messages=3, body=body)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def testDateSystem(self):
+ """test that the --date option works as expected"""
+ test_headers = (
+ {
+ 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
+ 'Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
+ },
+ {
+ 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
+ 'Date' : None,
+ },
+ {
+ 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
+ 'Date' : None,
+ 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
+ },
+ {
+ 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
+ 'Date' : None,
+ 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
+ },
+ )
+ for headers in test_headers:
+ msg = make_message(default_headers=headers, wantobj=True)
+ date = time.strptime("2000-07-29", "%Y-%m-%d")
+ archivemail.options.date_old_max = time.mktime(date)
+ assert archivemail.should_archive(msg)
+ date = time.strptime("2000-07-27", "%Y-%m-%d")
+ archivemail.options.date_old_max = time.mktime(date)
+ assert not archivemail.should_archive(msg)
+
+ def testMixed(self):
+ """archiving a mixed mailbox"""
+ self.make_mixed_mbox(messages=3)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def testNew(self):
+ """archiving a new mailbox"""
+ self.make_new_mbox(messages=3)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def testOldExisting(self):
+ """archiving an old mailbox with an existing archive"""
+ self.make_old_mbox(messages=3, make_old_archive=True)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def testOldWeirdHeaders(self):
+ """archiving old mailboxes with weird headers"""
+ weird_headers = (
+ { # we should archive because of the date on the 'From_' line
+ 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
+ 'Date' : 'Friskhdfkjkh, 28 Jul 2002 1line noise6:11:36 +1000',
+ },
+ { # we should archive because of the date on the 'From_' line
+ 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
+ 'Date' : None,
+ },
+ { # we should archive because of the date in 'Delivery-date'
+ 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
+ 'Date' : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000',
+ 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
+ },
+ { # we should archive because of the date in 'Delivery-date'
+ 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
+ 'Date' : None,
+ 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
+ },
+ { # we should archive because of the date in 'Resent-Date'
+ 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
+ 'Date' : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000',
+ 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
+ },
+ { # we should archive because of the date in 'Resent-Date'
+ 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
+ 'Date' : None,
+ 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
+ },
+ { # completely blank dates were crashing < version 0.4.7
+ 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
+ 'Date' : '',
+ },
+ { # completely blank dates were crashing < version 0.4.7
+ 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
+ 'Date' : '',
+ 'Resent-Date' : '',
+ },
+ )
+ fd, self.mbox = tempfile.mkstemp()
+ fp = os.fdopen(fd, "w")
+ for headers in weird_headers:
+ msg_text = make_message(default_headers=headers)
+ fp.write(msg_text*2)
+ fp.close()
+ self.good_archive = tempfile.mkstemp()[1]
+ shutil.copyfile(self.mbox, self.good_archive)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def tearDown(self):
+ archivemail.options = self.oldopts
+ super(TestArchiveMbox, self).tearDown()
+
+
+class TestArchiveMboxTimestamp(TestCaseInTempdir):
+ """original mbox timestamps should always be preserved"""
+ def setUp(self):
+ super(TestArchiveMboxTimestamp, self).setUp()
+ archivemail.options.quiet = True
+ self.mbox_name = make_mbox(messages=3, hours_old=(24 * 180))
+ self.mtime = os.path.getmtime(self.mbox_name) - 66
+ self.atime = os.path.getatime(self.mbox_name) - 88
+ os.utime(self.mbox_name, (self.atime, self.mtime))
+
+ def testNew(self):
+ """mbox timestamps should not change after no archival"""
+ archivemail.options.days_old_max = 181
+ archivemail.archive(self.mbox_name)
+ self.verify()
+
+ def testOld(self):
+ """mbox timestamps should not change after archival"""
+ archivemail.options.days_old_max = 179
+ archivemail.archive(self.mbox_name)
+ self.verify()
+
+ def verify(self):
+ assert os.path.exists(self.mbox_name)
+ new_atime = os.path.getatime(self.mbox_name)
+ new_mtime = os.path.getmtime(self.mbox_name)
+ self.assertAlmostEqual(self.mtime, new_mtime, utimes_precision)
+ self.assertAlmostEqual(self.atime, new_atime, utimes_precision)
+
+ def tearDown(self):
+ archivemail.options.quiet = False
+ archivemail.options.days_old_max = 180
+ os.remove(self.mbox_name)
+ super(TestArchiveMboxTimestamp, self).tearDown()
+
+
+class TestArchiveMboxAll(unittest.TestCase):
+ def setUp(self):
+ archivemail.options.quiet = True
+ archivemail.options.archive_all = True
+
+ def testNew(self):
+ """new messages should be archived with --all"""
+ self.msg = make_message(hours_old=24*179, wantobj=True)
+ assert archivemail.should_archive(self.msg)
+
+ def testOld(self):
+ """old messages should be archived with --all"""
+ self.msg = make_message(hours_old=24*181, wantobj=True)
+ assert archivemail.should_archive(self.msg)
+
+ def tearDown(self):
+ archivemail.options.quiet = False
+ archivemail.options.archive_all = False
+
+class TestArchiveMboxPreserveUnread(unittest.TestCase):
+ """make sure the 'preserve_unread' option works"""
+ def setUp(self):
+ archivemail.options.quiet = True
+ archivemail.options.preserve_unread = True
+ self.msg = make_message(hours_old=24*181, wantobj=True)
+
+ def testOldRead(self):
+ """old read messages should be archived with --preserve-unread"""
+ self.msg["Status"] = "RO"
+ assert archivemail.should_archive(self.msg)
+
+ def testOldUnread(self):
+ """old unread messages should not be archived with --preserve-unread"""
+ self.msg["Status"] = "O"
+ assert not archivemail.should_archive(self.msg)
+
+ def tearDown(self):
+ archivemail.options.quiet = False
+ archivemail.options.preserve_unread = False
+
+
+class TestArchiveMboxSuffix(unittest.TestCase):
+ """make sure the 'suffix' option works"""
+ def setUp(self):
+ self.old_suffix = archivemail.options.archive_suffix
+ archivemail.options.quiet = True
+
+ def testSuffix(self):
+ """archiving with specified --suffix arguments"""
+ for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"):
+ mbox_name = "foobar"
+ archivemail.options.archive_suffix = suffix
+ days_old_max = 180
+ parsed_suffix_time = time.time() - days_old_max*24*60*60
+ parsed_suffix = time.strftime(suffix,
+ time.localtime(parsed_suffix_time))
+ archive_name = mbox_name + parsed_suffix
+ self.assertEqual(archive_name,
+ archivemail.make_archive_name(mbox_name))
+
+ def tearDown(self):
+ archivemail.options.quiet = False
+ archivemail.options.archive_suffix = self.old_suffix
+
+
+class TestArchiveDryRun(TestArchive):
+ """make sure the 'dry-run' option works"""
+ def setUp(self):
+ super(TestArchiveDryRun, self).setUp()
+ archivemail.options.quiet = True
+ archivemail.options.dry_run = True
+
+ def testOld(self):
+ """archiving an old mailbox with the 'dry-run' option"""
+ self.make_old_mbox(messages=3)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def tearDown(self):
+ archivemail.options.dry_run = False
+ archivemail.options.quiet = False
+ super(TestArchiveDryRun, self).tearDown()
+
+
+class TestArchiveDelete(TestArchive):
+ """make sure the 'delete' option works"""
+ def setUp(self):
+ super(TestArchiveDelete, self).setUp()
+ archivemail.options.quiet = True
+ archivemail.options.delete_old_mail = True
+
+ def testNew(self):
+ """archiving a new mailbox with the 'delete' option"""
+ self.make_new_mbox(messages=3)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def testMixed(self):
+ """archiving a mixed mailbox with the 'delete' option"""
+ self.make_mixed_mbox(messages=3)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def testOld(self):
+ """archiving an old mailbox with the 'delete' option"""
+ self.make_old_mbox(messages=3)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def tearDown(self):
+ archivemail.options.delete_old_mail = False
+ archivemail.options.quiet = False
+ super(TestArchiveDelete, self).tearDown()
+
+
+class TestArchiveCopy(TestArchive):
+ """make sure the 'copy' option works"""
+ def setUp(self):
+ super(TestArchiveCopy, self).setUp()
+ archivemail.options.quiet = True
+ archivemail.options.copy_old_mail = True
+
+ def testNew(self):
+ """archiving a new mailbox with the 'copy' option"""
+ self.make_new_mbox(messages=3)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def testMixed(self):
+ """archiving a mixed mailbox with the 'copy' option"""
+ self.make_mixed_mbox(messages=3)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def testOld(self):
+ """archiving an old mailbox with the 'copy' option"""
+ self.make_old_mbox(messages=3)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def tearDown(self):
+ archivemail.options.copy_old_mail = False
+ archivemail.options.quiet = False
+ super(TestArchiveCopy, self).tearDown()
+
+
+class TestArchiveMboxFlagged(unittest.TestCase):
+ """make sure the 'include_flagged' option works"""
+ def setUp(self):
+ archivemail.options.include_flagged = False
+ archivemail.options.quiet = True
+
+ def testOld(self):
+ """by default, old flagged messages should not be archived"""
+ msg = make_message(default_headers={"X-Status": "F"},
+ hours_old=24*181, wantobj=True)
+ assert not archivemail.should_archive(msg)
+
+ def testIncludeFlaggedNew(self):
+ """new flagged messages should not be archived with include_flagged"""
+ msg = make_message(default_headers={"X-Status": "F"},
+ hours_old=24*179, wantobj=True)
+ assert not archivemail.should_archive(msg)
+
+ def testIncludeFlaggedOld(self):
+ """old flagged messages should be archived with include_flagged"""
+ archivemail.options.include_flagged = True
+ msg = make_message(default_headers={"X-Status": "F"},
+ hours_old=24*181, wantobj=True)
+ assert archivemail.should_archive(msg)
+
+ def tearDown(self):
+ archivemail.options.include_flagged = False
+ archivemail.options.quiet = False
+
+
+class TestArchiveMboxOutputDir(unittest.TestCase):
+ """make sure that the 'output-dir' option works"""
+ def setUp(self):
+ archivemail.options.quiet = True
+
+ def testOld(self):
+ """archiving an old mailbox with a sepecified output dir"""
+ for dir in "/just/a/path", "relative/path":
+ archivemail.options.output_dir = dir
+ archive_dir = archivemail.make_archive_name("/tmp/mbox")
+ self.assertEqual(dir, os.path.dirname(archive_dir))
+
+ def tearDown(self):
+ archivemail.options.quiet = False
+ archivemail.options.output_dir = None
+
+
+class TestArchiveMboxUncompressed(TestArchive):
+ """make sure that the 'no_compress' option works"""
+ mbox_name = None
+ new_mbox = None
+ old_mbox = None
+ copy_name = None
+
+ def setUp(self):
+ archivemail.options.quiet = True
+ archivemail.options.no_compress = True
+ super(TestArchiveMboxUncompressed, self).setUp()
+
+ def testOld(self):
+ """archiving an old mailbox uncompressed"""
+ self.make_old_mbox(messages=3)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def testNew(self):
+ """archiving a new mailbox uncompressed"""
+ self.make_new_mbox(messages=3)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def testMixed(self):
+ """archiving a mixed mailbox uncompressed"""
+ self.make_mixed_mbox(messages=3)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def testOldExists(self):
+ """archiving an old mailbox uncopressed with an existing archive"""
+ self.make_old_mbox(messages=3, make_old_archive=True)
+ archivemail.archive(self.mbox)
+ self.verify()
+
+ def tearDown(self):
+ archivemail.options.quiet = False
+ archivemail.options.no_compress = False
+ super(TestArchiveMboxUncompressed, self).tearDown()
+
+
+class TestArchiveSize(unittest.TestCase):
+ """check that the 'size' argument works"""
+ def setUp(self):
+ archivemail.options.quiet = True
+ msg_text = make_message(hours_old=24*181)
+ self.msg_size = len(msg_text)
+ fp = cStringIO.StringIO(msg_text)
+ self.msg = rfc822.Message(fp)
+
+ def testSmaller(self):
+ """giving a size argument smaller than the message"""
+ archivemail.options.min_size = self.msg_size - 1
+ assert archivemail.should_archive(self.msg)
+
+ def testBigger(self):
+ """giving a size argument bigger than the message"""
+ archivemail.options.min_size = self.msg_size + 1
+ assert not archivemail.should_archive(self.msg)
+
+ def tearDown(self):
+ archivemail.options.quiet = False
+ archivemail.options.min_size = None
+
+
+############# Test archiving maildirs ###############
+
+class TestArchiveMailboxdir(TestCaseInTempdir):
+ """Base class defining helper functions for doing test archive runs with
+ maildirs."""
+ maildir = None # Maildir that will be processed by archivemail
+ orig_maildir_obj = None # A backup copy of the maildir, a SimpleMaildir object
+ remaining_msg = set() # Filenames of maildir messages that should be preserved
+ number_archived = 0 # Number of messages that get archived
+ orig_archive = None # An uncompressed copy of a pre-existing archive,
+ # if one exists
+
+ def setUp(self):
+ super(TestArchiveMailboxdir, self).setUp()
+ self.orig_maildir_obj = SimpleMaildir()
+
+ def verify(self):
+ self._verify_remaining()
+ self._verify_archive()
+
+ def _verify_remaining(self):
+ """Verify that the preserved messages weren't altered."""
+ assert self.maildir
+ # Compare maildir with backup object.
+ dcmp = filecmp.dircmp(self.maildir, self.orig_maildir_obj.root)
+ # Top-level has only directories cur, new, tmp and must be unchanged.
+ self.assertEqual(dcmp.left_list, dcmp.right_list)
+ found = set()
+ for d in dcmp.common_dirs:
+ dcmp2 = dcmp.subdirs[d]
+ # We need to verify three things.
+ # 1. directory is a subset of the original...
+ assert not dcmp2.left_only
+ # 2. all common files are identical...
+ self.assertEqual(dcmp2.common_files, dcmp2.same_files)
+ found = found.union([os.path.join(d, x) for x in dcmp2.common_files])
+ # 3. exactly the `new' messages (recorded in self.remaining_msg)
+ # were preserved.
+ self.assertEqual(found, self.remaining_msg)
+
+ def _verify_archive(self):
+ """Verify the archive correctness."""
+ # TODO: currently make_archive_name does not include the .gz suffix.
+ # Is this something that should be fixed?
+ archive = archivemail.make_archive_name(self.maildir)
+ if archivemail.options.no_compress:
+ iszipped = False
+ else:
+ archive += '.gz'
+ iszipped = True
+ if self.number_archived == 0:
+ if self.orig_archive:
+ assertEqualContent(archive, self.orig_archive, iszipped)
+ else:
+ assert not os.path.exists(archive)
+ return
+ fp_new = fp_archive = tmp_archive_name = None
+ try:
+ if self.orig_archive:
+ new_size = os.path.getsize(archive)
+ # Brute force: split archive in old and new part and verify the
+ # parts separately. (Of course this destroys the archive.)
+ fp_archive = open(archive, "r+")
+ fp_archive.seek(self.orig_archive_size)
+ fd, tmp_archive_name = tempfile.mkstemp()
+ fp_new = os.fdopen(fd, "w")
+ shutil.copyfileobj(fp_archive, fp_new)
+ fp_new.close()
+ fp_archive.truncate(self.orig_archive_size)
+ fp_archive.close()
+ assertEqualContent(archive, self.orig_archive, iszipped)
+ new_archive = tmp_archive_name
+ else:
+ new_archive = archive
+ if archivemail.options.no_compress:
+ fp_archive = open(new_archive, "r")
+ else:
+ fp_archive = FixedGzipFile(new_archive, "r")
+ mb = mailbox.UnixMailbox(fp_archive)
+ found = 0
+ for msg in mb:
+ self.verify_maildir_has_msg(self.orig_maildir_obj, msg)
+ found += 1
+ self.assertEqual(found, self.number_archived)
+ finally:
+ if tmp_archive_name:
+ os.remove(tmp_archive_name)
+ if fp_new is not None:
+ fp_new.close()
+ if fp_archive is not None:
+ fp_archive.close()
+
+ def verify_maildir_has_msg(self, maildir, msg):
+ """Assert that the given maildir has a copy of the rfc822 message."""
+ mid = msg['Message-Id'] # Complains if there is no message-id
+ mdir_msg_str, mdir_flags = \
+ maildir.get_message_and_mbox_status(mid)
+ mbox_flags = set(msg.get('status', '') + msg.get('x-status', ''))
+ self.assertEqual(mdir_flags, mbox_flags)
+
+ headers = filter(lambda h: msg.isheader(h) not in ('status', 'x-status'),
+ msg.headers)
+ headers = "".join(headers)
+ msg.rewindbody()
+ # Discard last mbox LF which is not part of the message.
+ body = msg.fp.read()[:-1]
+ msg_str = headers + os.linesep + body
+ self.assertEqual(mdir_msg_str, msg_str)
+
+ def add_messages(self, body=None, headers=None, hours_old=0, messages=1):
+ for count in range(messages):
+ msg = make_message(body, default_headers=headers, mkfrom=False,
+ hours_old=hours_old)
+ self.orig_maildir_obj.write(msg, new=False)
+
+ def make_maildir(self, mkold, mknew, body=None, headers=None, messages=1,
+ make_old_archive=False):
+ mailbox_does_change = not (archivemail.options.dry_run or
+ archivemail.options.copy_old_mail)
+ archive_does_change = not (archivemail.options.dry_run or
+ archivemail.options.delete_old_mail)
+ if mknew:
+ self.add_messages(body, headers, 179*24, messages)
+ if archive_does_change and archivemail.options.archive_all:
+ self.number_archived += messages
+ if mailbox_does_change:
+ self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames())
+ if mkold:
+ self.add_messages(body, headers, 181*24, messages)
+ if archive_does_change:
+ self.number_archived += messages
+ if not mailbox_does_change:
+ self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames())
+ self.maildir = copy_maildir(self.orig_maildir_obj.root)
+ if make_old_archive:
+ archive = archivemail.make_archive_name(self.maildir)
+ self.orig_archive = make_archive_and_plain_copy(archive)
+ # FIXME: .gz extension handling is a mess II
+ if not archivemail.options.no_compress:
+ archive += '.gz'
+ self.orig_archive_size = os.path.getsize(archive)
+
+class TestEmptyMaildir(TestCaseInTempdir):
+ def setUp(self):
+ super(TestEmptyMaildir, self).setUp()
+ archivemail.options.quiet = True
+
+ def testEmpty(self):
+ """Archiving an empty maildir should not result in an archive."""
+ self.mdir = SimpleMaildir()
+ archivemail.archive(self.mdir.root)
+ assert not os.path.exists(self.mdir.root + '_archive.gz')
+
+ def tearDown(self):
+ super(TestEmptyMaildir, self).tearDown()
+ archivemail.options.quiet = False
+
+class TestMaildir(TestArchiveMailboxdir):
+ def setUp(self):
+ super(TestMaildir, self).setUp()
+ archivemail.options.quiet = True
+
+ def testOld(self):
+ self.make_maildir(True, False, messages=3)
+ archivemail.archive(self.maildir)
+ self.verify()
+
+ def testNew(self):
+ self.make_maildir(False, True, messages=3)
+ archivemail.archive(self.maildir)
+ self.verify()
+
+ def testMixed(self):
+ self.make_maildir(True, True, messages=3)
+ archivemail.archive(self.maildir)
+ self.verify()
+
+ def testMixedExisting(self):
+ self.make_maildir(True, True, messages=3, make_old_archive=True)
+ archivemail.archive(self.maildir)
+ self.verify()
+
+ def tearDown(self):
+ archivemail.options.quiet = False
+ super(TestMaildir, self).tearDown()
+
+
+class TestMaildirPreserveUnread(TestCaseInTempdir):
+ """Test if the preserve_unread option works with maildirs."""
+ def setUp(self):
+ super(TestMaildirPreserveUnread, self).setUp()
+ archivemail.options.quiet = True
+ archivemail.options.preserve_unread = True
+
+ def testOldRead(self):
+ """--preserve-unread archives old read messages in a maildir."""
+ smd = SimpleMaildir("orig")
+ msg = make_message(hours_old=24*181)
+ smd.write(msg, new=False, flags='S')
+ md = mailbox.Maildir(smd.root)
+ msg_obj = md.next()
+ assert archivemail.should_archive(msg_obj)
+
+ def testOldUnread(self):
+ """--preserve-unread preserves old unread messages in a maildir."""
+ smd = SimpleMaildir("orig")
+ msg = make_message(hours_old=24*181)
+ smd.write(msg, new=False)
+ md = mailbox.Maildir(smd.root)
+ msg_obj = md.next()
+ assert not archivemail.should_archive(msg_obj)
+
+ def tearDown(self):
+ archivemail.options.quiet = False
+ archivemail.options.preserve_unread = False
+ super(TestMaildirPreserveUnread, self).tearDown()
+
+class TestMaildirAll(TestArchiveMailboxdir):
+ def setUp(self):
+ super(TestMaildirAll, self).setUp()
+ archivemail.options.quiet = True
+ archivemail.options.archive_all = True
+
+ def testNew(self):
+ """New maildir messages should be archived with --all"""
+ self.add_messages(hours_old=24*181)
+ md = mailbox.Maildir(self.orig_maildir_obj.root)
+ msg_obj = md.next()
+ assert archivemail.should_archive(msg_obj)
+
+ def testOld(self):
+ """Old maildir messages should be archived with --all"""
+ self.add_messages(hours_old=24*179)
+ md = mailbox.Maildir(self.orig_maildir_obj.root)
+ msg_obj = md.next()
+ assert archivemail.should_archive(msg_obj)
+
+ def tearDown(self):
+ super(TestMaildirAll, self).tearDown()
+ archivemail.options.quiet = False
+ archivemail.options.archive_all = False
+
+class TestMaildirDryRun(TestArchiveMailboxdir):
+ def setUp(self):
+ super(TestMaildirDryRun, self).setUp()
+ archivemail.options.quiet = True
+ archivemail.options.dry_run = True
+
+ def testOld(self):
+ """archiving an old maildir mailbox with the 'dry-run' option"""
+ self.make_maildir(True, False)
+ archivemail.archive(self.maildir)
+ self.verify()
+
+ def tearDown(self):
+ super(TestMaildirDryRun, self).tearDown()
+ archivemail.options.quiet = False
+ archivemail.options.dry_run = False
+
+class TestMaildirDelete(TestArchiveMailboxdir):
+ def setUp(self):
+ super(TestMaildirDelete, self).setUp()
+ archivemail.options.quiet = True
+ archivemail.options.delete_old_mail = True
+
+ def testOld(self):
+ """archiving an old maildir mailbox with the 'delete' option"""
+ self.make_maildir(True, False)
+ archivemail.archive(self.maildir)
+ self.verify()
+
+ def testNew(self):
+ """archiving a new maildir mailbox with the 'delete' option"""
+ self.make_maildir(False, True)
+ archivemail.archive(self.maildir)
+ self.verify()
+
+ def tearDown(self):
+ super(TestMaildirDelete, self).tearDown()
+ archivemail.options.quiet = False
+ archivemail.options.delete_old_mail = False
+
+class TestMaildirCopy(TestArchiveMailboxdir):
+ def setUp(self):
+ super(TestMaildirCopy, self).setUp()
+ archivemail.options.quiet = True
+ archivemail.options.copy_old_mail = True
+
+ def testOld(self):
+ """archiving an old maildir mailbox with the 'copy' option"""
+ self.make_maildir(True, False)
+ archivemail.archive(self.maildir)
+ self.verify()
+
+ def testNew(self):
+ """archiving a new maildir mailbox with the 'copy' option"""
+ self.make_maildir(False, True)
+ archivemail.archive(self.maildir)
+ self.verify()
+
+ def tearDown(self):
+ super(TestMaildirCopy, self).tearDown()
+ archivemail.options.quiet = False
+ archivemail.options.copy_old_mail = False
+
+class TestArchiveMaildirFlagged(TestCaseInTempdir):
+ """make sure the 'include_flagged' option works with maildir messages"""
+ def setUp(self):
+ super(TestArchiveMaildirFlagged, self).setUp()
+ archivemail.options.include_flagged = False
+ archivemail.options.quiet = True
+
+ def testOld(self):
+ """by default, old flagged maildir messages should not be archived"""
+ smd = SimpleMaildir("orig")
+ msg = make_message(hours_old=24*181)
+ smd.write(msg, new=False, flags='F')
+ md = mailbox.Maildir(smd.root)
+ msg_obj = md.next()
+ assert not archivemail.should_archive(msg_obj)
+
+ def testIncludeFlaggedNew(self):
+ """new flagged maildir messages should not be archived with include_flagged"""
+ smd = SimpleMaildir("orig")
+ msg = make_message(hours_old=24*179)
+ smd.write(msg, new=False, flags='F')
+ md = mailbox.Maildir(smd.root)
+ msg_obj = md.next()
+ assert not archivemail.should_archive(msg_obj)
+
+ def testIncludeFlaggedOld(self):
+ """old flagged maildir messages should be archived with include_flagged"""
+ archivemail.options.include_flagged = True
+ smd = SimpleMaildir("orig")
+ msg = make_message(hours_old=24*181)
+ smd.write(msg, new=False, flags='F')
+ md = mailbox.Maildir(smd.root)
+ msg_obj = md.next()
+ assert archivemail.should_archive(msg_obj)
+
+ def tearDown(self):
+ super(TestArchiveMaildirFlagged, self).tearDown()
+ archivemail.options.include_flagged = False
+ archivemail.options.quiet = False
+
+class TestArchiveMaildirSize(TestCaseInTempdir):
+ """check that the 'size' argument works with maildir messages"""
+ def setUp(self):
+ super(TestArchiveMaildirSize, self).setUp()
+ archivemail.options.quiet = True
+ msg = make_message(hours_old=24*181)
+ self.msg_size = len(msg)
+ smd = SimpleMaildir("orig")
+ smd.write(msg, new=False)
+ md = mailbox.Maildir(smd.root)
+ self.msg_obj = md.next()
+
+ def testSmaller(self):
+ """giving a size argument smaller than the maildir message"""
+ archivemail.options.min_size = self.msg_size - 1
+ assert archivemail.should_archive(self.msg_obj)
+
+ def testBigger(self):
+ """giving a size argument bigger than the maildir message"""
+ archivemail.options.min_size = self.msg_size + 1
+ assert not archivemail.should_archive(self.msg_obj)
+
+ def tearDown(self):
+ super(TestArchiveMaildirSize, self).tearDown()
+ archivemail.options.quiet = False
+ archivemail.options.min_size = None
+
+########## helper routines ############
+
+def make_message(body=None, default_headers={}, hours_old=None, mkfrom=False, wantobj=False):
+ headers = copy.copy(default_headers)
+ if not headers:
+ headers = {}
+ headers['Message-Id'] = make_msgid()
+ if not headers.has_key('Date'):
+ time_message = time.time() - (60 * 60 * hours_old)
+ headers['Date'] = time.asctime(time.localtime(time_message))
+ if not headers.has_key('From'):
+ headers['From'] = "sender@dummy.domain"
+ if not headers.has_key('To'):
+ headers['To'] = "receipient@dummy.domain"
+ if not headers.has_key('Subject'):
+ headers['Subject'] = "This is the subject"
+ if mkfrom and not headers.has_key('From_'):
+ headers['From_'] = "%s %s" % (headers['From'], headers['Date'])
+ if not body:
+ body = "This is the message body"
+
+ msg = ""
+ if headers.has_key('From_'):
+ msg = msg + ("From %s\n" % headers['From_'])
+ del headers['From_']
+ for key in headers.keys():
+ if headers[key] is not None:
+ msg = msg + ("%s: %s\n" % (key, headers[key]))
+ msg = msg + "\n\n" + body + "\n\n"
+ if not wantobj:
+ return msg
+ fp = cStringIO.StringIO(msg)
+ return rfc822.Message(fp)
+
+def append_file(source, dest):
+ """appends the file named 'source' to the file named 'dest'"""
+ assert os.path.isfile(source)
+ assert os.path.isfile(dest)
+ read = open(source, "r")
+ write = open(dest, "a+")
+ shutil.copyfileobj(read,write)
+ read.close()
+ write.close()
+
+
+def make_mbox(body=None, headers=None, hours_old=0, messages=1):
+ assert tempfile.tempdir
+ fd, name = tempfile.mkstemp()
+ file = os.fdopen(fd, "w")
+ for count in range(messages):
+ msg = make_message(body=body, default_headers=headers,
+ mkfrom=True, hours_old=hours_old)
+ file.write(msg)
+ file.close()
+ return name
+
+def make_archive_and_plain_copy(archive_name):
+ """Make an mbox archive of the given name like archivemail may have
+ created it. Also make an uncompressed copy of this archive and return its
+ name."""
+ copy_fd, copy_name = tempfile.mkstemp()
+ copy_fp = os.fdopen(copy_fd, "w")
+ if archivemail.options.no_compress:
+ fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT)
+ fp = os.fdopen(fd, "w")
+ else:
+ archive_name += ".gz"
+ fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT)
+ rawfp = os.fdopen(fd, "w")
+ fp = gzip.GzipFile(fileobj=rawfp)
+ for count in range(3):
+ msg = make_message(hours_old=24*360)
+ fp.write(msg)
+ copy_fp.write(msg)
+ fp.close()
+ copy_fp.close()
+ if not archivemail.options.no_compress:
+ rawfp.close()
+ return copy_name
+
+def copy_maildir(maildir, prefix="tmp"):
+ """Create a copy of the given maildir and return the absolute path of the
+ new direcory."""
+ newdir = tempfile.mkdtemp(prefix=prefix)
+ for d in "cur", "new", "tmp":
+ shutil.copytree(os.path.join(maildir, d), os.path.join(newdir, d))
+ return newdir
+
+def assertEqualContent(firstfile, secondfile, zippedfirst=False):
+ """Verify that the two files exist and have identical content. If zippedfirst
+ is True, assume that firstfile is gzip-compressed."""
+ assert os.path.exists(firstfile)
+ assert os.path.exists(secondfile)
+ if zippedfirst:
+ try:
+ fp1 = gzip.GzipFile(firstfile, "r")
+ fp2 = open(secondfile, "r")
+ assert cmp_fileobj(fp1, fp2)
+ finally:
+ fp1.close()
+ fp2.close()
+ else:
+ assert filecmp.cmp(firstfile, secondfile, shallow=0)
+
+def cmp_fileobj(fp1, fp2):
+ """Return if reading the fileobjects yields identical content."""
+ bufsize = 8192
+ while True:
+ b1 = fp1.read(bufsize)
+ b2 = fp2.read(bufsize)
+ if b1 != b2:
+ return False
+ if not b1:
+ return True
+
+if __name__ == "__main__":
+ unittest.main()