From febd030e1461d4f412dfb98e58d6219eeb01ebd2 Mon Sep 17 00:00:00 2001 From: Nikolaus Schulz Date: Thu, 29 Jul 2010 20:49:39 +0200 Subject: Drop .py extension from the unittest script --- MANIFEST | 2 +- Makefile | 2 +- README | 2 +- TODO | 2 +- test_archivemail | 1593 +++++++++++++++++++++++++++++++++++++++++++++++++++ test_archivemail.py | 1593 --------------------------------------------------- 6 files changed, 1597 insertions(+), 1597 deletions(-) create mode 100755 test_archivemail delete mode 100755 test_archivemail.py diff --git a/MANIFEST b/MANIFEST index 4cdcd6f..ba349c4 100644 --- a/MANIFEST +++ b/MANIFEST @@ -9,4 +9,4 @@ archivemail.1 archivemail.sgml examples/archivemail_all setup.py -test_archivemail.py +test_archivemail diff --git a/Makefile b/Makefile index 512f9e3..a43097c 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,7 @@ clean: rm -rf $(HTDOCS) test: - python test_archivemail.py + python test_archivemail clobber: clean rm -rf build dist diff --git a/README b/README index 6389006..b55006b 100644 --- a/README +++ b/README @@ -36,7 +36,7 @@ INSTALLATION: If you want to test archivemail: cp archivemail archivemail.py - python test_archivemail.py + python test_archivemail (NOTE: This could take over 90 seconds on slower systems) To install archivemail, run: diff --git a/TODO b/TODO index 97ca572..aafd4a5 100644 --- a/TODO +++ b/TODO @@ -110,7 +110,7 @@ run. Better to fail gracefully and keep going. Think about the best way to specify the names of archives created with possibly an --archive-name option. -Add more tests (see top of test_archivemail.py) +Add more tests (see top of test_archivemail) We need some better checking to see if we are really looking at a valid mbox-format mailbox. diff --git a/test_archivemail b/test_archivemail new file mode 100755 index 0000000..4a11734 --- /dev/null +++ b/test_archivemail @@ -0,0 +1,1593 @@ +#! /usr/bin/env python +############################################################################ +# Copyright (C) 2002 Paul Rodger +# (C) 2006-2010 Nikolaus Schulz +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +############################################################################ +""" +Unit-test archivemail using 'PyUnit'. + +TODO: add tests for: + * dotlock locks already existing + * archiving MH-format mailboxes + * a 3rd party process changing the mbox file being read + +""" + +import sys + +def check_python_version(): + """Abort if we are running on python < v2.3""" + too_old_error = "This test script requires python version 2.3 or later. " + \ + "Your version of python is:\n%s" % sys.version + try: + version = sys.version_info # we might not even have this function! :) + if (version[0] < 2) or (version[0] == 2 and version[1] < 3): + print too_old_error + sys.exit(1) + except AttributeError: + print too_old_error + sys.exit(1) + +# define & run this early because 'unittest' requires Python >= 2.1 +check_python_version() + +import copy +import fcntl +import filecmp +import os +import re +import shutil +import stat +import tempfile +import time +import unittest +import gzip +import cStringIO +import rfc822 +import mailbox + +try: + import archivemail +except ImportError: + print "The archivemail script needs to be called 'archivemail.py'" + print "and should be in the current directory in order to be imported" + print "and tested. Sorry." + if os.path.isfile("archivemail"): + print "Try renaming it from 'archivemail' to 'archivemail.py'." + sys.exit(1) + +# We want to iterate over messages in a compressed archive mbox and verify +# them. This involves seeking in the mbox. The gzip.Gzipfile.seek() in +# Python 2.5 doesn't understand whence; this is Python bug #1355023, triggered +# by mailbox._PartialFile.seek(). The bug is still pending as of Python +# 2.5.2. To work around it, we subclass gzip.GzipFile. +# +# It should be noted that seeking backwards in a GzipFile is emulated by +# re-reading the entire file from the beginning, which is extremely +# inefficient and won't work with large files; but our test archives are all +# small, so it's okay. + +class FixedGzipFile(gzip.GzipFile): + """GzipFile with seek method accepting whence parameter.""" + def seek(self, offset, whence=0): + try: + gzip.GzipFile.seek(self, offset, whence) + except TypeError: + if whence: + if whence == 1: + offset = self.offset + offset + else: + raise ValueError('Seek from end not supported') + gzip.GzipFile.seek(self, offset) + +# precision of os.utime() when restoring mbox timestamps +utimes_precision = 5 + +class MessageIdFactory: + """Factory to create `uniqe' message-ids.""" + def __init__(self): + self.seq = 0 + def __call__(self): + self.seq += 1 + return "" % self.seq + +make_msgid = MessageIdFactory() + +class IndexedMailboxDir: + """An indexed mailbox directory, providing random message access by + message-id. Intended as a base class for a maildir and an mh subclass.""" + + def __init__(self, mdir_name): + assert tempfile.tempdir + self.root = tempfile.mkdtemp(prefix=mdir_name) + self.msg_id_dict = {} + self.deliveries = 0 + + def _add_to_index(self, msg_text, fpath): + """Add the given message to the index, for later random access.""" + # Extract the message-id as index key + msg_id = None + fp = cStringIO.StringIO(msg_text) + while True: + line = fp.readline() + # line empty means we didn't find a message-id + assert line + if line.lower().startswith("message-id:"): + msg_id = line.split(":", 1)[-1].strip() + assert msg_id + break + assert not self.msg_id_dict.has_key(msg_id) + self.msg_id_dict[msg_id] = fpath + + def get_all_filenames(self): + """Return all relative pathnames of files in this mailbox.""" + return self.msg_id_dict.values() + +class SimpleMaildir(IndexedMailboxDir): + """Primitive Maildir class, just good enough for generating short-lived + test maildirs.""" + + def __init__(self, mdir_name='maildir'): + IndexedMailboxDir.__init__(self, mdir_name) + for d in "cur", "tmp", "new": + os.mkdir(os.path.join(self.root, d)) + + def write(self, msg_str, new=True, flags=[]): + """Store a message with the given flags.""" + assert not (new and flags) + if new: + subdir = "new" + else: + subdir = "cur" + fname = self._mkname(new, flags) + relpath = os.path.join(subdir, fname) + path = os.path.join(self.root, relpath) + assert not os.path.exists(path) + f = open(path, "w") + f.write(msg_str) + f.close() + self._add_to_index(msg_str, relpath) + + def _mkname(self, new, flags): + """Generate a unique filename for a new message.""" + validflags = 'DFPRST' + for f in flags: + assert f in validflags + # This 'unique' name should be good enough, since nobody else + # will ever write messages to this maildir folder. + uniq = str(self.deliveries) + self.deliveries += 1 + if new: + return uniq + if not flags: + return uniq + ':2,' + finfo = "".join(sorted(flags)) + return uniq + ':2,' + finfo + + def get_message_and_mbox_status(self, msgid): + """For the Message-Id msgid, return the matching message in text + format and its status, expressed as a set of mbox flags.""" + fpath = self.msg_id_dict[msgid] # Barfs if not found + mdir_flags = fpath.rsplit('2,', 1)[-1] + flagmap = { + 'F': 'F', + 'R': 'A', + 'S': 'R' + } + mbox_flags = set([flagmap[x] for x in mdir_flags]) + if fpath.startswith("cur/"): + mbox_flags.add('O') + fp = open(os.path.join(self.root, fpath), "r") + msg = fp.read() + fp.close() + return msg, mbox_flags + + +class TestCaseInTempdir(unittest.TestCase): + """Base class for testcases that need to create temporary files. + All testcases that create temporary files should be derived from this + class, not directly from unittest.TestCase. + TestCaseInTempdir provides these methods: + + setUp() Creates a safe temporary directory and sets tempfile.tempdir. + + tearDown() Recursively removes the temporary directory and unsets + tempfile.tempdir. + + Overriding methods should call the ones above.""" + temproot = None + + def setUp(self): + if not self.temproot: + assert not tempfile.tempdir + self.temproot = tempfile.tempdir = \ + tempfile.mkdtemp(prefix="test-archivemail") + + def tearDown(self): + assert tempfile.tempdir == self.temproot + if self.temproot: + shutil.rmtree(self.temproot) + tempfile.tempdir = self.temproot = None + + +############ Mbox Class testing ############## + +class TestMboxDotlock(TestCaseInTempdir): + def setUp(self): + super(TestMboxDotlock, self).setUp() + self.mbox_name = make_mbox() + self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] + self.mbox = archivemail.Mbox(self.mbox_name) + + def testDotlock(self): + """dotlock_lock/unlock should create/delete a lockfile""" + lock = self.mbox_name + ".lock" + self.mbox._dotlock_lock() + assert os.path.isfile(lock) + self.mbox._dotlock_unlock() + assert not os.path.isfile(lock) + + def testDotlockingSucceedsUponEACCES(self): + """A dotlock should silently be omitted upon EACCES.""" + archivemail.options.quiet = True + mbox_dir = os.path.dirname(self.mbox_name) + os.chmod(mbox_dir, 0500) + try: + self.mbox._dotlock_lock() + finally: + os.chmod(mbox_dir, 0700) + archivemail.options.quiet = False + +class TestMboxPosixLock(TestCaseInTempdir): + def setUp(self): + super(TestMboxPosixLock, self).setUp() + self.mbox_name = make_mbox() + self.mbox = archivemail.Mbox(self.mbox_name) + + def testPosixLock(self): + """posix_lock/unlock should create/delete an advisory lock""" + + # The following code snippet heavily lends from the Python 2.5 mailbox + # unittest. + # BEGIN robbery: + + # Fork off a subprocess that will lock the file for 2 seconds, + # unlock it, and then exit. + pid = os.fork() + if pid == 0: + # In the child, lock the mailbox. + self.mbox._posix_lock() + time.sleep(2) + self.mbox._posix_unlock() + os._exit(0) + + # In the parent, sleep a bit to give the child time to acquire + # the lock. + time.sleep(0.5) + # The parent's file self.mbox.mbox_file shares fcntl locks with the + # duplicated FD in the child; reopen it so we get a different file + # table entry. + file = open(self.mbox_name, "r+") + lock_nb = fcntl.LOCK_EX | fcntl.LOCK_NB + fd = file.fileno() + try: + self.assertRaises(IOError, fcntl.lockf, fd, lock_nb) + + finally: + # Wait for child to exit. Locking should now succeed. + exited_pid, status = os.waitpid(pid, 0) + + fcntl.lockf(fd, lock_nb) + fcntl.lockf(fd, fcntl.LOCK_UN) + # END robbery + + +class TestMboxNext(TestCaseInTempdir): + def setUp(self): + super(TestMboxNext, self).setUp() + self.not_empty_name = make_mbox(messages=18) + self.empty_name = make_mbox(messages=0) + + def testNextEmpty(self): + """mbox.next() should return None on an empty mailbox""" + mbox = archivemail.Mbox(self.empty_name) + msg = mbox.next() + self.assertEqual(msg, None) + + def testNextNotEmpty(self): + """mbox.next() should a message on a populated mailbox""" + mbox = archivemail.Mbox(self.not_empty_name) + for count in range(18): + msg = mbox.next() + assert msg + msg = mbox.next() + self.assertEqual(msg, None) + + +############ TempMbox Class testing ############## + +class TestTempMboxWrite(TestCaseInTempdir): + def setUp(self): + super(TestTempMboxWrite, self).setUp() + + def testWrite(self): + """mbox.write() should append messages to a mbox mailbox""" + read_file = make_mbox(messages=3) + mbox_read = archivemail.Mbox(read_file) + mbox_write = archivemail.TempMbox() + write_file = mbox_write.mbox_file_name + for count in range(3): + msg = mbox_read.next() + mbox_write.write(msg) + mbox_read.close() + mbox_write.close() + assert filecmp.cmp(read_file, write_file, shallow=0) + + def testWriteNone(self): + """calling mbox.write() with no message should raise AssertionError""" + write = archivemail.TempMbox() + self.assertRaises(AssertionError, write.write, None) + +class TestTempMboxRemove(TestCaseInTempdir): + def setUp(self): + super(TestTempMboxRemove, self).setUp() + self.mbox = archivemail.TempMbox() + self.mbox_name = self.mbox.mbox_file_name + + def testMboxRemove(self): + """remove() should delete a mbox mailbox""" + assert os.path.exists(self.mbox_name) + self.mbox.remove() + assert not os.path.exists(self.mbox_name) + + + +########## options class testing ################# + +class TestOptionDefaults(unittest.TestCase): + def testVerbose(self): + """verbose should be off by default""" + self.assertEqual(archivemail.options.verbose, False) + + def testDaysOldMax(self): + """default archival time should be 180 days""" + self.assertEqual(archivemail.options.days_old_max, 180) + + def testQuiet(self): + """quiet should be off by default""" + self.assertEqual(archivemail.options.quiet, False) + + def testDeleteOldMail(self): + """we should not delete old mail by default""" + self.assertEqual(archivemail.options.delete_old_mail, False) + + def testNoCompress(self): + """no-compression should be off by default""" + self.assertEqual(archivemail.options.no_compress, False) + + def testIncludeFlagged(self): + """we should not archive flagged messages by default""" + self.assertEqual(archivemail.options.include_flagged, False) + + def testPreserveUnread(self): + """we should not preserve unread messages by default""" + self.assertEqual(archivemail.options.preserve_unread, False) + +class TestOptionParser(unittest.TestCase): + def setUp(self): + self.oldopts = copy.copy(archivemail.options) + + def testOptionDate(self): + """--date and -D options are parsed correctly""" + date_formats = ( + "%Y-%m-%d", # ISO format + "%d %b %Y" , # Internet format + "%d %B %Y" , # Internet format with full month names + ) + date = time.strptime("2000-07-29", "%Y-%m-%d") + unixdate = time.mktime(date) + for df in date_formats: + d = time.strftime(df, date) + for opt in '-D', '--date=': + archivemail.options.date_old_max = None + archivemail.options.parse_args([opt+d], "") + self.assertEqual(unixdate, archivemail.options.date_old_max) + + def testOptionPreserveUnread(self): + """--preserve-unread option is parsed correctly""" + archivemail.options.parse_args(["--preserve-unread"], "") + assert archivemail.options.preserve_unread + archivemail.options.preserve_unread = False + archivemail.options.parse_args(["-u"], "") + assert archivemail.options.preserve_unread + + def testOptionSuffix(self): + """--suffix and -s options are parsed correctly""" + for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"): + archivemail.options.parse_args(["--suffix="+suffix], "") + assert archivemail.options.archive_suffix == suffix + archivemail.options.suffix = None + archivemail.options.parse_args(["-s", suffix], "") + assert archivemail.options.archive_suffix == suffix + + def testOptionDryrun(self): + """--dry-run option is parsed correctly""" + archivemail.options.parse_args(["--dry-run"], "") + assert archivemail.options.dry_run + archivemail.options.preserve_unread = False + archivemail.options.parse_args(["-n"], "") + assert archivemail.options.dry_run + + def testOptionDays(self): + """--days and -d options are parsed correctly""" + archivemail.options.parse_args(["--days=11"], "") + self.assertEqual(archivemail.options.days_old_max, 11) + archivemail.options.days_old_max = None + archivemail.options.parse_args(["-d11"], "") + self.assertEqual(archivemail.options.days_old_max, 11) + + def testOptionDelete(self): + """--delete option is parsed correctly""" + archivemail.options.parse_args(["--delete"], "") + assert archivemail.options.delete_old_mail + + def testOptionCopy(self): + """--copy option is parsed correctly""" + archivemail.options.parse_args(["--copy"], "") + assert archivemail.options.copy_old_mail + + def testOptionOutputdir(self): + """--output-dir and -o options are parsed correctly""" + for path in "/just/some/path", "relative/path": + archivemail.options.parse_args(["--output-dir=%s" % path], "") + self.assertEqual(archivemail.options.output_dir, path) + archivemail.options.output_dir = None + archivemail.options.parse_args(["-o%s" % path], "") + self.assertEqual(archivemail.options.output_dir, path) + + def testOptionNocompress(self): + """--no-compress option is parsed correctly""" + archivemail.options.parse_args(["--no-compress"], "") + assert archivemail.options.no_compress + + def testOptionSize(self): + """--size and -S options are parsed correctly""" + size = "666" + archivemail.options.parse_args(["--size=%s" % size ], "") + self.assertEqual(archivemail.options.min_size, int(size)) + archivemail.options.parse_args(["-S%s" % size ], "") + self.assertEqual(archivemail.options.min_size, int(size)) + + def tearDown(self): + archivemail.options = self.oldopts + +########## archivemail.is_older_than_days() unit testing ################# + +class TestIsTooOld(unittest.TestCase): + def testVeryOld(self): + """with max_days=360, should be true for these dates > 1 year""" + for years in range(1, 10): + time_msg = time.time() - (years * 365 * 24 * 60 * 60) + assert archivemail.is_older_than_days(time_message=time_msg, + max_days=360) + + def testOld(self): + """with max_days=14, should be true for these dates > 14 days""" + for days in range(14, 360): + time_msg = time.time() - (days * 24 * 60 * 60) + assert archivemail.is_older_than_days(time_message=time_msg, + max_days=14) + + def testJustOld(self): + """with max_days=1, should be true for these dates >= 1 day""" + for minutes in range(0, 61): + time_msg = time.time() - (25 * 60 * 60) + (minutes * 60) + assert archivemail.is_older_than_days(time_message=time_msg, + max_days=1) + + def testNotOld(self): + """with max_days=9, should be false for these dates < 9 days""" + for days in range(0, 9): + time_msg = time.time() - (days * 24 * 60 * 60) + assert not archivemail.is_older_than_days(time_message=time_msg, + max_days=9) + + def testJustNotOld(self): + """with max_days=1, should be false for these hours <= 1 day""" + for minutes in range(0, 60): + time_msg = time.time() - (23 * 60 * 60) - (minutes * 60) + assert not archivemail.is_older_than_days(time_message=time_msg, + max_days=1) + + def testFuture(self): + """with max_days=1, should be false for times in the future""" + for minutes in range(0, 60): + time_msg = time.time() + (minutes * 60) + assert not archivemail.is_older_than_days(time_message=time_msg, + max_days=1) + +########## archivemail.parse_imap_url() unit testing ################# + +class TestParseIMAPUrl(unittest.TestCase): + def setUp(self): + archivemail.options.quiet = True + archivemail.options.verbose = False + archivemail.options.pwfile = None + + urls_withoutpass = [ + ('imaps://user@example.org@imap.example.org/upperbox/lowerbox', + ('user', None, 'example.org@imap.example.org', + 'upperbox/lowerbox')), + ('imaps://"user@example.org"@imap.example.org/upperbox/lowerbox', + ('user@example.org', None, 'imap.example.org', + 'upperbox/lowerbox')), + ('imaps://user@example.org"@imap.example.org/upperbox/lowerbox', + ('user', None, 'example.org"@imap.example.org', + 'upperbox/lowerbox')), + ('imaps://"user@example.org@imap.example.org/upperbox/lowerbox', + ('"user', None, 'example.org@imap.example.org', + 'upperbox/lowerbox')), + ('imaps://"us\\"er@example.org"@imap.example.org/upperbox/lowerbox', + ('us"er@example.org', None, 'imap.example.org', + 'upperbox/lowerbox')), + ('imaps://user\\@example.org@imap.example.org/upperbox/lowerbox', + ('user\\', None, 'example.org@imap.example.org', + 'upperbox/lowerbox')) + ] + urls_withpass = [ + ('imaps://user@example.org:passwd@imap.example.org/upperbox/lowerbox', + ('user@example.org', 'passwd', 'imap.example.org', + 'upperbox/lowerbox'), + ('user', None, 'example.org:passwd@imap.example.org', + 'upperbox/lowerbox')), + ('imaps://"user@example.org:passwd@imap.example.org/upperbox/lowerbox', + ('"user@example.org', "passwd", 'imap.example.org', + 'upperbox/lowerbox'), + ('"user', None, 'example.org:passwd@imap.example.org', + 'upperbox/lowerbox')), + ('imaps://u\\ser\\@example.org:"p@sswd"@imap.example.org/upperbox/lowerbox', + ('u\\ser\\@example.org', 'p@sswd', 'imap.example.org', + 'upperbox/lowerbox'), + ('u\\ser\\', None, 'example.org:"p@sswd"@imap.example.org', + 'upperbox/lowerbox')) + ] + # These are invalid when the password's not stripped. + urls_onlywithpass = [ + ('imaps://"user@example.org":passwd@imap.example.org/upperbox/lowerbox', + ('user@example.org', "passwd", 'imap.example.org', + 'upperbox/lowerbox')) + ] + def testUrlsWithoutPwfile(self): + """Parse test urls with --pwfile option unset. This parses a password in + the URL, if present.""" + archivemail.options.pwfile = None + for mbstr in self.urls_withpass + self.urls_withoutpass: + url = mbstr[0][mbstr[0].find('://')+3:] + result = archivemail.parse_imap_url(url) + self.assertEqual(result, mbstr[1]) + + def testUrlsWithPwfile(self): + """Parse test urls with --pwfile set. In this case the ':' character + loses its meaning as a delimiter.""" + archivemail.options.pwfile = "whocares.txt" + for mbstr in self.urls_withpass: + url = mbstr[0][mbstr[0].find('://')+3:] + result = archivemail.parse_imap_url(url) + self.assertEqual(result, mbstr[2]) + for mbstr in self.urls_onlywithpass: + url = mbstr[0][mbstr[0].find('://')+3:] + self.assertRaises(archivemail.UnexpectedError, + archivemail.parse_imap_url, url) + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.verbose = False + archivemail.options.pwfile = None + +########## acceptance testing ########### + +class TestArchive(TestCaseInTempdir): + """Base class defining helper functions for doing test archiving runs.""" + mbox = None # mbox file that will be processed by archivemail + good_archive = None # Uncompressed reference archive file to verify the + # archive after processing + good_mbox = None # Reference mbox file to verify the mbox after processing + + def verify(self): + assert os.path.exists(self.mbox) + if self.good_mbox is not None: + assertEqualContent(self.mbox, self.good_mbox) + else: + self.assertEqual(os.path.getsize(self.mbox), 0) + archive_name = self.mbox + "_archive" + if not archivemail.options.no_compress: + archive_name += ".gz" + iszipped = True + else: + assert not os.path.exists(archive_name + ".gz") + iszipped = False + if self.good_archive is not None: + assertEqualContent(archive_name, self.good_archive, iszipped) + else: + assert not os.path.exists(archive_name) + + def make_old_mbox(self, body=None, headers=None, messages=1, make_old_archive=False): + """Prepare for a test run with an old mbox by making an old mbox, + optionally an existing archive, and a reference archive to verify the + archive after archivemail has run.""" + self.mbox = make_mbox(body, headers, 181*24, messages) + archive_does_change = not (archivemail.options.dry_run or + archivemail.options.delete_old_mail) + mbox_does_not_change = archivemail.options.dry_run or \ + archivemail.options.copy_old_mail + if make_old_archive: + archive = archivemail.make_archive_name(self.mbox) + self.good_archive = make_archive_and_plain_copy(archive) + if archive_does_change: + append_file(self.mbox, self.good_archive) + elif archive_does_change: + self.good_archive = tempfile.mkstemp()[1] + shutil.copyfile(self.mbox, self.good_archive) + if mbox_does_not_change: + if archive_does_change and not make_old_archive: + self.good_mbox = self.good_archive + else: + self.good_mbox = tempfile.mkstemp()[1] + shutil.copyfile(self.mbox, self.good_mbox) + + def make_mixed_mbox(self, body=None, headers=None, messages=1, make_old_archive=False): + """Prepare for a test run with a mixed mbox by making a mixed mbox, + optionally an existing archive, a reference archive to verify the + archive after archivemail has run, and likewise a reference mbox to + verify the mbox.""" + self.make_old_mbox(body, headers, messages=messages, make_old_archive=make_old_archive) + new_mbox_name = make_mbox(body, headers, 179*24, messages) + append_file(new_mbox_name, self.mbox) + if self.good_mbox is None: + self.good_mbox = new_mbox_name + else: + if self.good_mbox == self.good_archive: + self.good_mbox = tempfile.mkstemp()[1] + shutil.copyfile(self.mbox, self.good_mbox) + else: + append_file(new_mbox_name, self.good_mbox) + + def make_new_mbox(self, body=None, headers=None, messages=1, make_old_archive=False): + """Prepare for a test run with a new mbox by making a new mbox, + optionally an exiting archive, and a reference mbox to verify the mbox + after archivemail has run.""" + self.mbox = make_mbox(body, headers, 179*24, messages) + self.good_mbox = tempfile.mkstemp()[1] + shutil.copyfile(self.mbox, self.good_mbox) + if make_old_archive: + archive = archivemail.make_archive_name(self.mbox) + self.good_archive = make_archive_and_plain_copy(archive) + + +class TestArchiveMbox(TestArchive): + """archiving should work based on the date of messages given""" + + def setUp(self): + self.oldopts = copy.copy(archivemail.options) + archivemail.options.quiet = True + super(TestArchiveMbox, self).setUp() + + def testOld(self): + """archiving an old mailbox""" + self.make_old_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testOldFromInBody(self): + """archiving an old mailbox with 'From ' in the body""" + body = """This is a message with ^From at the start of a line +From is on this line +This is after the ^From line""" + self.make_old_mbox(messages=3, body=body) + archivemail.archive(self.mbox) + self.verify() + + def testDateSystem(self): + """test that the --date option works as expected""" + test_headers = ( + { + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', + 'Date' : 'Fri, 28 Jul 2000 16:11:36 +1000', + }, + { + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', + 'Date' : None, + }, + { + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', + 'Date' : None, + 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000', + }, + { + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', + 'Date' : None, + 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000', + }, + ) + for headers in test_headers: + msg = make_message(default_headers=headers, wantobj=True) + date = time.strptime("2000-07-29", "%Y-%m-%d") + archivemail.options.date_old_max = time.mktime(date) + assert archivemail.should_archive(msg) + date = time.strptime("2000-07-27", "%Y-%m-%d") + archivemail.options.date_old_max = time.mktime(date) + assert not archivemail.should_archive(msg) + + def testMixed(self): + """archiving a mixed mailbox""" + self.make_mixed_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testNew(self): + """archiving a new mailbox""" + self.make_new_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testOldExisting(self): + """archiving an old mailbox with an existing archive""" + self.make_old_mbox(messages=3, make_old_archive=True) + archivemail.archive(self.mbox) + self.verify() + + def testOldWeirdHeaders(self): + """archiving old mailboxes with weird headers""" + weird_headers = ( + { # we should archive because of the date on the 'From_' line + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', + 'Date' : 'Friskhdfkjkh, 28 Jul 2002 1line noise6:11:36 +1000', + }, + { # we should archive because of the date on the 'From_' line + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', + 'Date' : None, + }, + { # we should archive because of the date in 'Delivery-date' + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', + 'Date' : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000', + 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000', + }, + { # we should archive because of the date in 'Delivery-date' + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', + 'Date' : None, + 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000', + }, + { # we should archive because of the date in 'Resent-Date' + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', + 'Date' : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000', + 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000', + }, + { # we should archive because of the date in 'Resent-Date' + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', + 'Date' : None, + 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000', + }, + { # completely blank dates were crashing < version 0.4.7 + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', + 'Date' : '', + }, + { # completely blank dates were crashing < version 0.4.7 + 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', + 'Date' : '', + 'Resent-Date' : '', + }, + ) + fd, self.mbox = tempfile.mkstemp() + fp = os.fdopen(fd, "w") + for headers in weird_headers: + msg_text = make_message(default_headers=headers) + fp.write(msg_text*2) + fp.close() + self.good_archive = tempfile.mkstemp()[1] + shutil.copyfile(self.mbox, self.good_archive) + archivemail.archive(self.mbox) + self.verify() + + def tearDown(self): + archivemail.options = self.oldopts + super(TestArchiveMbox, self).tearDown() + + +class TestArchiveMboxTimestamp(TestCaseInTempdir): + """original mbox timestamps should always be preserved""" + def setUp(self): + super(TestArchiveMboxTimestamp, self).setUp() + archivemail.options.quiet = True + self.mbox_name = make_mbox(messages=3, hours_old=(24 * 180)) + self.mtime = os.path.getmtime(self.mbox_name) - 66 + self.atime = os.path.getatime(self.mbox_name) - 88 + os.utime(self.mbox_name, (self.atime, self.mtime)) + + def testNew(self): + """mbox timestamps should not change after no archival""" + archivemail.options.days_old_max = 181 + archivemail.archive(self.mbox_name) + self.verify() + + def testOld(self): + """mbox timestamps should not change after archival""" + archivemail.options.days_old_max = 179 + archivemail.archive(self.mbox_name) + self.verify() + + def verify(self): + assert os.path.exists(self.mbox_name) + new_atime = os.path.getatime(self.mbox_name) + new_mtime = os.path.getmtime(self.mbox_name) + self.assertAlmostEqual(self.mtime, new_mtime, utimes_precision) + self.assertAlmostEqual(self.atime, new_atime, utimes_precision) + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.days_old_max = 180 + os.remove(self.mbox_name) + super(TestArchiveMboxTimestamp, self).tearDown() + + +class TestArchiveMboxAll(unittest.TestCase): + def setUp(self): + archivemail.options.quiet = True + archivemail.options.archive_all = True + + def testNew(self): + """new messages should be archived with --all""" + self.msg = make_message(hours_old=24*179, wantobj=True) + assert archivemail.should_archive(self.msg) + + def testOld(self): + """old messages should be archived with --all""" + self.msg = make_message(hours_old=24*181, wantobj=True) + assert archivemail.should_archive(self.msg) + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.archive_all = False + +class TestArchiveMboxPreserveUnread(unittest.TestCase): + """make sure the 'preserve_unread' option works""" + def setUp(self): + archivemail.options.quiet = True + archivemail.options.preserve_unread = True + self.msg = make_message(hours_old=24*181, wantobj=True) + + def testOldRead(self): + """old read messages should be archived with --preserve-unread""" + self.msg["Status"] = "RO" + assert archivemail.should_archive(self.msg) + + def testOldUnread(self): + """old unread messages should not be archived with --preserve-unread""" + self.msg["Status"] = "O" + assert not archivemail.should_archive(self.msg) + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.preserve_unread = False + + +class TestArchiveMboxSuffix(unittest.TestCase): + """make sure the 'suffix' option works""" + def setUp(self): + self.old_suffix = archivemail.options.archive_suffix + archivemail.options.quiet = True + + def testSuffix(self): + """archiving with specified --suffix arguments""" + for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"): + mbox_name = "foobar" + archivemail.options.archive_suffix = suffix + days_old_max = 180 + parsed_suffix_time = time.time() - days_old_max*24*60*60 + parsed_suffix = time.strftime(suffix, + time.localtime(parsed_suffix_time)) + archive_name = mbox_name + parsed_suffix + self.assertEqual(archive_name, + archivemail.make_archive_name(mbox_name)) + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.archive_suffix = self.old_suffix + + +class TestArchiveDryRun(TestArchive): + """make sure the 'dry-run' option works""" + def setUp(self): + super(TestArchiveDryRun, self).setUp() + archivemail.options.quiet = True + archivemail.options.dry_run = True + + def testOld(self): + """archiving an old mailbox with the 'dry-run' option""" + self.make_old_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def tearDown(self): + archivemail.options.dry_run = False + archivemail.options.quiet = False + super(TestArchiveDryRun, self).tearDown() + + +class TestArchiveDelete(TestArchive): + """make sure the 'delete' option works""" + def setUp(self): + super(TestArchiveDelete, self).setUp() + archivemail.options.quiet = True + archivemail.options.delete_old_mail = True + + def testNew(self): + """archiving a new mailbox with the 'delete' option""" + self.make_new_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testMixed(self): + """archiving a mixed mailbox with the 'delete' option""" + self.make_mixed_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testOld(self): + """archiving an old mailbox with the 'delete' option""" + self.make_old_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def tearDown(self): + archivemail.options.delete_old_mail = False + archivemail.options.quiet = False + super(TestArchiveDelete, self).tearDown() + + +class TestArchiveCopy(TestArchive): + """make sure the 'copy' option works""" + def setUp(self): + super(TestArchiveCopy, self).setUp() + archivemail.options.quiet = True + archivemail.options.copy_old_mail = True + + def testNew(self): + """archiving a new mailbox with the 'copy' option""" + self.make_new_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testMixed(self): + """archiving a mixed mailbox with the 'copy' option""" + self.make_mixed_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testOld(self): + """archiving an old mailbox with the 'copy' option""" + self.make_old_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def tearDown(self): + archivemail.options.copy_old_mail = False + archivemail.options.quiet = False + super(TestArchiveCopy, self).tearDown() + + +class TestArchiveMboxFlagged(unittest.TestCase): + """make sure the 'include_flagged' option works""" + def setUp(self): + archivemail.options.include_flagged = False + archivemail.options.quiet = True + + def testOld(self): + """by default, old flagged messages should not be archived""" + msg = make_message(default_headers={"X-Status": "F"}, + hours_old=24*181, wantobj=True) + assert not archivemail.should_archive(msg) + + def testIncludeFlaggedNew(self): + """new flagged messages should not be archived with include_flagged""" + msg = make_message(default_headers={"X-Status": "F"}, + hours_old=24*179, wantobj=True) + assert not archivemail.should_archive(msg) + + def testIncludeFlaggedOld(self): + """old flagged messages should be archived with include_flagged""" + archivemail.options.include_flagged = True + msg = make_message(default_headers={"X-Status": "F"}, + hours_old=24*181, wantobj=True) + assert archivemail.should_archive(msg) + + def tearDown(self): + archivemail.options.include_flagged = False + archivemail.options.quiet = False + + +class TestArchiveMboxOutputDir(unittest.TestCase): + """make sure that the 'output-dir' option works""" + def setUp(self): + archivemail.options.quiet = True + + def testOld(self): + """archiving an old mailbox with a sepecified output dir""" + for dir in "/just/a/path", "relative/path": + archivemail.options.output_dir = dir + archive_dir = archivemail.make_archive_name("/tmp/mbox") + self.assertEqual(dir, os.path.dirname(archive_dir)) + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.output_dir = None + + +class TestArchiveMboxUncompressed(TestArchive): + """make sure that the 'no_compress' option works""" + mbox_name = None + new_mbox = None + old_mbox = None + copy_name = None + + def setUp(self): + archivemail.options.quiet = True + archivemail.options.no_compress = True + super(TestArchiveMboxUncompressed, self).setUp() + + def testOld(self): + """archiving an old mailbox uncompressed""" + self.make_old_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testNew(self): + """archiving a new mailbox uncompressed""" + self.make_new_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testMixed(self): + """archiving a mixed mailbox uncompressed""" + self.make_mixed_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() + + def testOldExists(self): + """archiving an old mailbox uncopressed with an existing archive""" + self.make_old_mbox(messages=3, make_old_archive=True) + archivemail.archive(self.mbox) + self.verify() + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.no_compress = False + super(TestArchiveMboxUncompressed, self).tearDown() + + +class TestArchiveSize(unittest.TestCase): + """check that the 'size' argument works""" + def setUp(self): + archivemail.options.quiet = True + msg_text = make_message(hours_old=24*181) + self.msg_size = len(msg_text) + fp = cStringIO.StringIO(msg_text) + self.msg = rfc822.Message(fp) + + def testSmaller(self): + """giving a size argument smaller than the message""" + archivemail.options.min_size = self.msg_size - 1 + assert archivemail.should_archive(self.msg) + + def testBigger(self): + """giving a size argument bigger than the message""" + archivemail.options.min_size = self.msg_size + 1 + assert not archivemail.should_archive(self.msg) + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.min_size = None + + +############# Test archiving maildirs ############### + +class TestArchiveMailboxdir(TestCaseInTempdir): + """Base class defining helper functions for doing test archive runs with + maildirs.""" + maildir = None # Maildir that will be processed by archivemail + orig_maildir_obj = None # A backup copy of the maildir, a SimpleMaildir object + remaining_msg = set() # Filenames of maildir messages that should be preserved + number_archived = 0 # Number of messages that get archived + orig_archive = None # An uncompressed copy of a pre-existing archive, + # if one exists + + def setUp(self): + super(TestArchiveMailboxdir, self).setUp() + self.orig_maildir_obj = SimpleMaildir() + + def verify(self): + self._verify_remaining() + self._verify_archive() + + def _verify_remaining(self): + """Verify that the preserved messages weren't altered.""" + assert self.maildir + # Compare maildir with backup object. + dcmp = filecmp.dircmp(self.maildir, self.orig_maildir_obj.root) + # Top-level has only directories cur, new, tmp and must be unchanged. + self.assertEqual(dcmp.left_list, dcmp.right_list) + found = set() + for d in dcmp.common_dirs: + dcmp2 = dcmp.subdirs[d] + # We need to verify three things. + # 1. directory is a subset of the original... + assert not dcmp2.left_only + # 2. all common files are identical... + self.assertEqual(dcmp2.common_files, dcmp2.same_files) + found = found.union([os.path.join(d, x) for x in dcmp2.common_files]) + # 3. exactly the `new' messages (recorded in self.remaining_msg) + # were preserved. + self.assertEqual(found, self.remaining_msg) + + def _verify_archive(self): + """Verify the archive correctness.""" + # TODO: currently make_archive_name does not include the .gz suffix. + # Is this something that should be fixed? + archive = archivemail.make_archive_name(self.maildir) + if archivemail.options.no_compress: + iszipped = False + else: + archive += '.gz' + iszipped = True + if self.number_archived == 0: + if self.orig_archive: + assertEqualContent(archive, self.orig_archive, iszipped) + else: + assert not os.path.exists(archive) + return + fp_new = fp_archive = tmp_archive_name = None + try: + if self.orig_archive: + new_size = os.path.getsize(archive) + # Brute force: split archive in old and new part and verify the + # parts separately. (Of course this destroys the archive.) + fp_archive = open(archive, "r+") + fp_archive.seek(self.orig_archive_size) + fd, tmp_archive_name = tempfile.mkstemp() + fp_new = os.fdopen(fd, "w") + shutil.copyfileobj(fp_archive, fp_new) + fp_new.close() + fp_archive.truncate(self.orig_archive_size) + fp_archive.close() + assertEqualContent(archive, self.orig_archive, iszipped) + new_archive = tmp_archive_name + else: + new_archive = archive + if archivemail.options.no_compress: + fp_archive = open(new_archive, "r") + else: + fp_archive = FixedGzipFile(new_archive, "r") + mb = mailbox.UnixMailbox(fp_archive) + found = 0 + for msg in mb: + self.verify_maildir_has_msg(self.orig_maildir_obj, msg) + found += 1 + self.assertEqual(found, self.number_archived) + finally: + if tmp_archive_name: + os.remove(tmp_archive_name) + if fp_new is not None: + fp_new.close() + if fp_archive is not None: + fp_archive.close() + + def verify_maildir_has_msg(self, maildir, msg): + """Assert that the given maildir has a copy of the rfc822 message.""" + mid = msg['Message-Id'] # Complains if there is no message-id + mdir_msg_str, mdir_flags = \ + maildir.get_message_and_mbox_status(mid) + mbox_flags = set(msg.get('status', '') + msg.get('x-status', '')) + self.assertEqual(mdir_flags, mbox_flags) + + headers = filter(lambda h: msg.isheader(h) not in ('status', 'x-status'), + msg.headers) + headers = "".join(headers) + msg.rewindbody() + # Discard last mbox LF which is not part of the message. + body = msg.fp.read()[:-1] + msg_str = headers + os.linesep + body + self.assertEqual(mdir_msg_str, msg_str) + + def add_messages(self, body=None, headers=None, hours_old=0, messages=1): + for count in range(messages): + msg = make_message(body, default_headers=headers, mkfrom=False, + hours_old=hours_old) + self.orig_maildir_obj.write(msg, new=False) + + def make_maildir(self, mkold, mknew, body=None, headers=None, messages=1, + make_old_archive=False): + mailbox_does_change = not (archivemail.options.dry_run or + archivemail.options.copy_old_mail) + archive_does_change = not (archivemail.options.dry_run or + archivemail.options.delete_old_mail) + if mknew: + self.add_messages(body, headers, 179*24, messages) + if archive_does_change and archivemail.options.archive_all: + self.number_archived += messages + if mailbox_does_change: + self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames()) + if mkold: + self.add_messages(body, headers, 181*24, messages) + if archive_does_change: + self.number_archived += messages + if not mailbox_does_change: + self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames()) + self.maildir = copy_maildir(self.orig_maildir_obj.root) + if make_old_archive: + archive = archivemail.make_archive_name(self.maildir) + self.orig_archive = make_archive_and_plain_copy(archive) + # FIXME: .gz extension handling is a mess II + if not archivemail.options.no_compress: + archive += '.gz' + self.orig_archive_size = os.path.getsize(archive) + +class TestEmptyMaildir(TestCaseInTempdir): + def setUp(self): + super(TestEmptyMaildir, self).setUp() + archivemail.options.quiet = True + + def testEmpty(self): + """Archiving an empty maildir should not result in an archive.""" + self.mdir = SimpleMaildir() + archivemail.archive(self.mdir.root) + assert not os.path.exists(self.mdir.root + '_archive.gz') + + def tearDown(self): + super(TestEmptyMaildir, self).tearDown() + archivemail.options.quiet = False + +class TestMaildir(TestArchiveMailboxdir): + def setUp(self): + super(TestMaildir, self).setUp() + archivemail.options.quiet = True + + def testOld(self): + self.make_maildir(True, False, messages=3) + archivemail.archive(self.maildir) + self.verify() + + def testNew(self): + self.make_maildir(False, True, messages=3) + archivemail.archive(self.maildir) + self.verify() + + def testMixed(self): + self.make_maildir(True, True, messages=3) + archivemail.archive(self.maildir) + self.verify() + + def testMixedExisting(self): + self.make_maildir(True, True, messages=3, make_old_archive=True) + archivemail.archive(self.maildir) + self.verify() + + def tearDown(self): + archivemail.options.quiet = False + super(TestMaildir, self).tearDown() + + +class TestMaildirPreserveUnread(TestCaseInTempdir): + """Test if the preserve_unread option works with maildirs.""" + def setUp(self): + super(TestMaildirPreserveUnread, self).setUp() + archivemail.options.quiet = True + archivemail.options.preserve_unread = True + + def testOldRead(self): + """--preserve-unread archives old read messages in a maildir.""" + smd = SimpleMaildir("orig") + msg = make_message(hours_old=24*181) + smd.write(msg, new=False, flags='S') + md = mailbox.Maildir(smd.root) + msg_obj = md.next() + assert archivemail.should_archive(msg_obj) + + def testOldUnread(self): + """--preserve-unread preserves old unread messages in a maildir.""" + smd = SimpleMaildir("orig") + msg = make_message(hours_old=24*181) + smd.write(msg, new=False) + md = mailbox.Maildir(smd.root) + msg_obj = md.next() + assert not archivemail.should_archive(msg_obj) + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.preserve_unread = False + super(TestMaildirPreserveUnread, self).tearDown() + +class TestMaildirAll(TestArchiveMailboxdir): + def setUp(self): + super(TestMaildirAll, self).setUp() + archivemail.options.quiet = True + archivemail.options.archive_all = True + + def testNew(self): + """New maildir messages should be archived with --all""" + self.add_messages(hours_old=24*181) + md = mailbox.Maildir(self.orig_maildir_obj.root) + msg_obj = md.next() + assert archivemail.should_archive(msg_obj) + + def testOld(self): + """Old maildir messages should be archived with --all""" + self.add_messages(hours_old=24*179) + md = mailbox.Maildir(self.orig_maildir_obj.root) + msg_obj = md.next() + assert archivemail.should_archive(msg_obj) + + def tearDown(self): + super(TestMaildirAll, self).tearDown() + archivemail.options.quiet = False + archivemail.options.archive_all = False + +class TestMaildirDryRun(TestArchiveMailboxdir): + def setUp(self): + super(TestMaildirDryRun, self).setUp() + archivemail.options.quiet = True + archivemail.options.dry_run = True + + def testOld(self): + """archiving an old maildir mailbox with the 'dry-run' option""" + self.make_maildir(True, False) + archivemail.archive(self.maildir) + self.verify() + + def tearDown(self): + super(TestMaildirDryRun, self).tearDown() + archivemail.options.quiet = False + archivemail.options.dry_run = False + +class TestMaildirDelete(TestArchiveMailboxdir): + def setUp(self): + super(TestMaildirDelete, self).setUp() + archivemail.options.quiet = True + archivemail.options.delete_old_mail = True + + def testOld(self): + """archiving an old maildir mailbox with the 'delete' option""" + self.make_maildir(True, False) + archivemail.archive(self.maildir) + self.verify() + + def testNew(self): + """archiving a new maildir mailbox with the 'delete' option""" + self.make_maildir(False, True) + archivemail.archive(self.maildir) + self.verify() + + def tearDown(self): + super(TestMaildirDelete, self).tearDown() + archivemail.options.quiet = False + archivemail.options.delete_old_mail = False + +class TestMaildirCopy(TestArchiveMailboxdir): + def setUp(self): + super(TestMaildirCopy, self).setUp() + archivemail.options.quiet = True + archivemail.options.copy_old_mail = True + + def testOld(self): + """archiving an old maildir mailbox with the 'copy' option""" + self.make_maildir(True, False) + archivemail.archive(self.maildir) + self.verify() + + def testNew(self): + """archiving a new maildir mailbox with the 'copy' option""" + self.make_maildir(False, True) + archivemail.archive(self.maildir) + self.verify() + + def tearDown(self): + super(TestMaildirCopy, self).tearDown() + archivemail.options.quiet = False + archivemail.options.copy_old_mail = False + +class TestArchiveMaildirFlagged(TestCaseInTempdir): + """make sure the 'include_flagged' option works with maildir messages""" + def setUp(self): + super(TestArchiveMaildirFlagged, self).setUp() + archivemail.options.include_flagged = False + archivemail.options.quiet = True + + def testOld(self): + """by default, old flagged maildir messages should not be archived""" + smd = SimpleMaildir("orig") + msg = make_message(hours_old=24*181) + smd.write(msg, new=False, flags='F') + md = mailbox.Maildir(smd.root) + msg_obj = md.next() + assert not archivemail.should_archive(msg_obj) + + def testIncludeFlaggedNew(self): + """new flagged maildir messages should not be archived with include_flagged""" + smd = SimpleMaildir("orig") + msg = make_message(hours_old=24*179) + smd.write(msg, new=False, flags='F') + md = mailbox.Maildir(smd.root) + msg_obj = md.next() + assert not archivemail.should_archive(msg_obj) + + def testIncludeFlaggedOld(self): + """old flagged maildir messages should be archived with include_flagged""" + archivemail.options.include_flagged = True + smd = SimpleMaildir("orig") + msg = make_message(hours_old=24*181) + smd.write(msg, new=False, flags='F') + md = mailbox.Maildir(smd.root) + msg_obj = md.next() + assert archivemail.should_archive(msg_obj) + + def tearDown(self): + super(TestArchiveMaildirFlagged, self).tearDown() + archivemail.options.include_flagged = False + archivemail.options.quiet = False + +class TestArchiveMaildirSize(TestCaseInTempdir): + """check that the 'size' argument works with maildir messages""" + def setUp(self): + super(TestArchiveMaildirSize, self).setUp() + archivemail.options.quiet = True + msg = make_message(hours_old=24*181) + self.msg_size = len(msg) + smd = SimpleMaildir("orig") + smd.write(msg, new=False) + md = mailbox.Maildir(smd.root) + self.msg_obj = md.next() + + def testSmaller(self): + """giving a size argument smaller than the maildir message""" + archivemail.options.min_size = self.msg_size - 1 + assert archivemail.should_archive(self.msg_obj) + + def testBigger(self): + """giving a size argument bigger than the maildir message""" + archivemail.options.min_size = self.msg_size + 1 + assert not archivemail.should_archive(self.msg_obj) + + def tearDown(self): + super(TestArchiveMaildirSize, self).tearDown() + archivemail.options.quiet = False + archivemail.options.min_size = None + +########## helper routines ############ + +def make_message(body=None, default_headers={}, hours_old=None, mkfrom=False, wantobj=False): + headers = copy.copy(default_headers) + if not headers: + headers = {} + headers['Message-Id'] = make_msgid() + if not headers.has_key('Date'): + time_message = time.time() - (60 * 60 * hours_old) + headers['Date'] = time.asctime(time.localtime(time_message)) + if not headers.has_key('From'): + headers['From'] = "sender@dummy.domain" + if not headers.has_key('To'): + headers['To'] = "receipient@dummy.domain" + if not headers.has_key('Subject'): + headers['Subject'] = "This is the subject" + if mkfrom and not headers.has_key('From_'): + headers['From_'] = "%s %s" % (headers['From'], headers['Date']) + if not body: + body = "This is the message body" + + msg = "" + if headers.has_key('From_'): + msg = msg + ("From %s\n" % headers['From_']) + del headers['From_'] + for key in headers.keys(): + if headers[key] is not None: + msg = msg + ("%s: %s\n" % (key, headers[key])) + msg = msg + "\n\n" + body + "\n\n" + if not wantobj: + return msg + fp = cStringIO.StringIO(msg) + return rfc822.Message(fp) + +def append_file(source, dest): + """appends the file named 'source' to the file named 'dest'""" + assert os.path.isfile(source) + assert os.path.isfile(dest) + read = open(source, "r") + write = open(dest, "a+") + shutil.copyfileobj(read,write) + read.close() + write.close() + + +def make_mbox(body=None, headers=None, hours_old=0, messages=1): + assert tempfile.tempdir + fd, name = tempfile.mkstemp() + file = os.fdopen(fd, "w") + for count in range(messages): + msg = make_message(body=body, default_headers=headers, + mkfrom=True, hours_old=hours_old) + file.write(msg) + file.close() + return name + +def make_archive_and_plain_copy(archive_name): + """Make an mbox archive of the given name like archivemail may have + created it. Also make an uncompressed copy of this archive and return its + name.""" + copy_fd, copy_name = tempfile.mkstemp() + copy_fp = os.fdopen(copy_fd, "w") + if archivemail.options.no_compress: + fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT) + fp = os.fdopen(fd, "w") + else: + archive_name += ".gz" + fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT) + rawfp = os.fdopen(fd, "w") + fp = gzip.GzipFile(fileobj=rawfp) + for count in range(3): + msg = make_message(hours_old=24*360) + fp.write(msg) + copy_fp.write(msg) + fp.close() + copy_fp.close() + if not archivemail.options.no_compress: + rawfp.close() + return copy_name + +def copy_maildir(maildir, prefix="tmp"): + """Create a copy of the given maildir and return the absolute path of the + new direcory.""" + newdir = tempfile.mkdtemp(prefix=prefix) + for d in "cur", "new", "tmp": + shutil.copytree(os.path.join(maildir, d), os.path.join(newdir, d)) + return newdir + +def assertEqualContent(firstfile, secondfile, zippedfirst=False): + """Verify that the two files exist and have identical content. If zippedfirst + is True, assume that firstfile is gzip-compressed.""" + assert os.path.exists(firstfile) + assert os.path.exists(secondfile) + if zippedfirst: + try: + fp1 = gzip.GzipFile(firstfile, "r") + fp2 = open(secondfile, "r") + assert cmp_fileobj(fp1, fp2) + finally: + fp1.close() + fp2.close() + else: + assert filecmp.cmp(firstfile, secondfile, shallow=0) + +def cmp_fileobj(fp1, fp2): + """Return if reading the fileobjects yields identical content.""" + bufsize = 8192 + while True: + b1 = fp1.read(bufsize) + b2 = fp2.read(bufsize) + if b1 != b2: + return False + if not b1: + return True + +if __name__ == "__main__": + unittest.main() diff --git a/test_archivemail.py b/test_archivemail.py deleted file mode 100755 index 4a11734..0000000 --- a/test_archivemail.py +++ /dev/null @@ -1,1593 +0,0 @@ -#! /usr/bin/env python -############################################################################ -# Copyright (C) 2002 Paul Rodger -# (C) 2006-2010 Nikolaus Schulz -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -############################################################################ -""" -Unit-test archivemail using 'PyUnit'. - -TODO: add tests for: - * dotlock locks already existing - * archiving MH-format mailboxes - * a 3rd party process changing the mbox file being read - -""" - -import sys - -def check_python_version(): - """Abort if we are running on python < v2.3""" - too_old_error = "This test script requires python version 2.3 or later. " + \ - "Your version of python is:\n%s" % sys.version - try: - version = sys.version_info # we might not even have this function! :) - if (version[0] < 2) or (version[0] == 2 and version[1] < 3): - print too_old_error - sys.exit(1) - except AttributeError: - print too_old_error - sys.exit(1) - -# define & run this early because 'unittest' requires Python >= 2.1 -check_python_version() - -import copy -import fcntl -import filecmp -import os -import re -import shutil -import stat -import tempfile -import time -import unittest -import gzip -import cStringIO -import rfc822 -import mailbox - -try: - import archivemail -except ImportError: - print "The archivemail script needs to be called 'archivemail.py'" - print "and should be in the current directory in order to be imported" - print "and tested. Sorry." - if os.path.isfile("archivemail"): - print "Try renaming it from 'archivemail' to 'archivemail.py'." - sys.exit(1) - -# We want to iterate over messages in a compressed archive mbox and verify -# them. This involves seeking in the mbox. The gzip.Gzipfile.seek() in -# Python 2.5 doesn't understand whence; this is Python bug #1355023, triggered -# by mailbox._PartialFile.seek(). The bug is still pending as of Python -# 2.5.2. To work around it, we subclass gzip.GzipFile. -# -# It should be noted that seeking backwards in a GzipFile is emulated by -# re-reading the entire file from the beginning, which is extremely -# inefficient and won't work with large files; but our test archives are all -# small, so it's okay. - -class FixedGzipFile(gzip.GzipFile): - """GzipFile with seek method accepting whence parameter.""" - def seek(self, offset, whence=0): - try: - gzip.GzipFile.seek(self, offset, whence) - except TypeError: - if whence: - if whence == 1: - offset = self.offset + offset - else: - raise ValueError('Seek from end not supported') - gzip.GzipFile.seek(self, offset) - -# precision of os.utime() when restoring mbox timestamps -utimes_precision = 5 - -class MessageIdFactory: - """Factory to create `uniqe' message-ids.""" - def __init__(self): - self.seq = 0 - def __call__(self): - self.seq += 1 - return "" % self.seq - -make_msgid = MessageIdFactory() - -class IndexedMailboxDir: - """An indexed mailbox directory, providing random message access by - message-id. Intended as a base class for a maildir and an mh subclass.""" - - def __init__(self, mdir_name): - assert tempfile.tempdir - self.root = tempfile.mkdtemp(prefix=mdir_name) - self.msg_id_dict = {} - self.deliveries = 0 - - def _add_to_index(self, msg_text, fpath): - """Add the given message to the index, for later random access.""" - # Extract the message-id as index key - msg_id = None - fp = cStringIO.StringIO(msg_text) - while True: - line = fp.readline() - # line empty means we didn't find a message-id - assert line - if line.lower().startswith("message-id:"): - msg_id = line.split(":", 1)[-1].strip() - assert msg_id - break - assert not self.msg_id_dict.has_key(msg_id) - self.msg_id_dict[msg_id] = fpath - - def get_all_filenames(self): - """Return all relative pathnames of files in this mailbox.""" - return self.msg_id_dict.values() - -class SimpleMaildir(IndexedMailboxDir): - """Primitive Maildir class, just good enough for generating short-lived - test maildirs.""" - - def __init__(self, mdir_name='maildir'): - IndexedMailboxDir.__init__(self, mdir_name) - for d in "cur", "tmp", "new": - os.mkdir(os.path.join(self.root, d)) - - def write(self, msg_str, new=True, flags=[]): - """Store a message with the given flags.""" - assert not (new and flags) - if new: - subdir = "new" - else: - subdir = "cur" - fname = self._mkname(new, flags) - relpath = os.path.join(subdir, fname) - path = os.path.join(self.root, relpath) - assert not os.path.exists(path) - f = open(path, "w") - f.write(msg_str) - f.close() - self._add_to_index(msg_str, relpath) - - def _mkname(self, new, flags): - """Generate a unique filename for a new message.""" - validflags = 'DFPRST' - for f in flags: - assert f in validflags - # This 'unique' name should be good enough, since nobody else - # will ever write messages to this maildir folder. - uniq = str(self.deliveries) - self.deliveries += 1 - if new: - return uniq - if not flags: - return uniq + ':2,' - finfo = "".join(sorted(flags)) - return uniq + ':2,' + finfo - - def get_message_and_mbox_status(self, msgid): - """For the Message-Id msgid, return the matching message in text - format and its status, expressed as a set of mbox flags.""" - fpath = self.msg_id_dict[msgid] # Barfs if not found - mdir_flags = fpath.rsplit('2,', 1)[-1] - flagmap = { - 'F': 'F', - 'R': 'A', - 'S': 'R' - } - mbox_flags = set([flagmap[x] for x in mdir_flags]) - if fpath.startswith("cur/"): - mbox_flags.add('O') - fp = open(os.path.join(self.root, fpath), "r") - msg = fp.read() - fp.close() - return msg, mbox_flags - - -class TestCaseInTempdir(unittest.TestCase): - """Base class for testcases that need to create temporary files. - All testcases that create temporary files should be derived from this - class, not directly from unittest.TestCase. - TestCaseInTempdir provides these methods: - - setUp() Creates a safe temporary directory and sets tempfile.tempdir. - - tearDown() Recursively removes the temporary directory and unsets - tempfile.tempdir. - - Overriding methods should call the ones above.""" - temproot = None - - def setUp(self): - if not self.temproot: - assert not tempfile.tempdir - self.temproot = tempfile.tempdir = \ - tempfile.mkdtemp(prefix="test-archivemail") - - def tearDown(self): - assert tempfile.tempdir == self.temproot - if self.temproot: - shutil.rmtree(self.temproot) - tempfile.tempdir = self.temproot = None - - -############ Mbox Class testing ############## - -class TestMboxDotlock(TestCaseInTempdir): - def setUp(self): - super(TestMboxDotlock, self).setUp() - self.mbox_name = make_mbox() - self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] - self.mbox = archivemail.Mbox(self.mbox_name) - - def testDotlock(self): - """dotlock_lock/unlock should create/delete a lockfile""" - lock = self.mbox_name + ".lock" - self.mbox._dotlock_lock() - assert os.path.isfile(lock) - self.mbox._dotlock_unlock() - assert not os.path.isfile(lock) - - def testDotlockingSucceedsUponEACCES(self): - """A dotlock should silently be omitted upon EACCES.""" - archivemail.options.quiet = True - mbox_dir = os.path.dirname(self.mbox_name) - os.chmod(mbox_dir, 0500) - try: - self.mbox._dotlock_lock() - finally: - os.chmod(mbox_dir, 0700) - archivemail.options.quiet = False - -class TestMboxPosixLock(TestCaseInTempdir): - def setUp(self): - super(TestMboxPosixLock, self).setUp() - self.mbox_name = make_mbox() - self.mbox = archivemail.Mbox(self.mbox_name) - - def testPosixLock(self): - """posix_lock/unlock should create/delete an advisory lock""" - - # The following code snippet heavily lends from the Python 2.5 mailbox - # unittest. - # BEGIN robbery: - - # Fork off a subprocess that will lock the file for 2 seconds, - # unlock it, and then exit. - pid = os.fork() - if pid == 0: - # In the child, lock the mailbox. - self.mbox._posix_lock() - time.sleep(2) - self.mbox._posix_unlock() - os._exit(0) - - # In the parent, sleep a bit to give the child time to acquire - # the lock. - time.sleep(0.5) - # The parent's file self.mbox.mbox_file shares fcntl locks with the - # duplicated FD in the child; reopen it so we get a different file - # table entry. - file = open(self.mbox_name, "r+") - lock_nb = fcntl.LOCK_EX | fcntl.LOCK_NB - fd = file.fileno() - try: - self.assertRaises(IOError, fcntl.lockf, fd, lock_nb) - - finally: - # Wait for child to exit. Locking should now succeed. - exited_pid, status = os.waitpid(pid, 0) - - fcntl.lockf(fd, lock_nb) - fcntl.lockf(fd, fcntl.LOCK_UN) - # END robbery - - -class TestMboxNext(TestCaseInTempdir): - def setUp(self): - super(TestMboxNext, self).setUp() - self.not_empty_name = make_mbox(messages=18) - self.empty_name = make_mbox(messages=0) - - def testNextEmpty(self): - """mbox.next() should return None on an empty mailbox""" - mbox = archivemail.Mbox(self.empty_name) - msg = mbox.next() - self.assertEqual(msg, None) - - def testNextNotEmpty(self): - """mbox.next() should a message on a populated mailbox""" - mbox = archivemail.Mbox(self.not_empty_name) - for count in range(18): - msg = mbox.next() - assert msg - msg = mbox.next() - self.assertEqual(msg, None) - - -############ TempMbox Class testing ############## - -class TestTempMboxWrite(TestCaseInTempdir): - def setUp(self): - super(TestTempMboxWrite, self).setUp() - - def testWrite(self): - """mbox.write() should append messages to a mbox mailbox""" - read_file = make_mbox(messages=3) - mbox_read = archivemail.Mbox(read_file) - mbox_write = archivemail.TempMbox() - write_file = mbox_write.mbox_file_name - for count in range(3): - msg = mbox_read.next() - mbox_write.write(msg) - mbox_read.close() - mbox_write.close() - assert filecmp.cmp(read_file, write_file, shallow=0) - - def testWriteNone(self): - """calling mbox.write() with no message should raise AssertionError""" - write = archivemail.TempMbox() - self.assertRaises(AssertionError, write.write, None) - -class TestTempMboxRemove(TestCaseInTempdir): - def setUp(self): - super(TestTempMboxRemove, self).setUp() - self.mbox = archivemail.TempMbox() - self.mbox_name = self.mbox.mbox_file_name - - def testMboxRemove(self): - """remove() should delete a mbox mailbox""" - assert os.path.exists(self.mbox_name) - self.mbox.remove() - assert not os.path.exists(self.mbox_name) - - - -########## options class testing ################# - -class TestOptionDefaults(unittest.TestCase): - def testVerbose(self): - """verbose should be off by default""" - self.assertEqual(archivemail.options.verbose, False) - - def testDaysOldMax(self): - """default archival time should be 180 days""" - self.assertEqual(archivemail.options.days_old_max, 180) - - def testQuiet(self): - """quiet should be off by default""" - self.assertEqual(archivemail.options.quiet, False) - - def testDeleteOldMail(self): - """we should not delete old mail by default""" - self.assertEqual(archivemail.options.delete_old_mail, False) - - def testNoCompress(self): - """no-compression should be off by default""" - self.assertEqual(archivemail.options.no_compress, False) - - def testIncludeFlagged(self): - """we should not archive flagged messages by default""" - self.assertEqual(archivemail.options.include_flagged, False) - - def testPreserveUnread(self): - """we should not preserve unread messages by default""" - self.assertEqual(archivemail.options.preserve_unread, False) - -class TestOptionParser(unittest.TestCase): - def setUp(self): - self.oldopts = copy.copy(archivemail.options) - - def testOptionDate(self): - """--date and -D options are parsed correctly""" - date_formats = ( - "%Y-%m-%d", # ISO format - "%d %b %Y" , # Internet format - "%d %B %Y" , # Internet format with full month names - ) - date = time.strptime("2000-07-29", "%Y-%m-%d") - unixdate = time.mktime(date) - for df in date_formats: - d = time.strftime(df, date) - for opt in '-D', '--date=': - archivemail.options.date_old_max = None - archivemail.options.parse_args([opt+d], "") - self.assertEqual(unixdate, archivemail.options.date_old_max) - - def testOptionPreserveUnread(self): - """--preserve-unread option is parsed correctly""" - archivemail.options.parse_args(["--preserve-unread"], "") - assert archivemail.options.preserve_unread - archivemail.options.preserve_unread = False - archivemail.options.parse_args(["-u"], "") - assert archivemail.options.preserve_unread - - def testOptionSuffix(self): - """--suffix and -s options are parsed correctly""" - for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"): - archivemail.options.parse_args(["--suffix="+suffix], "") - assert archivemail.options.archive_suffix == suffix - archivemail.options.suffix = None - archivemail.options.parse_args(["-s", suffix], "") - assert archivemail.options.archive_suffix == suffix - - def testOptionDryrun(self): - """--dry-run option is parsed correctly""" - archivemail.options.parse_args(["--dry-run"], "") - assert archivemail.options.dry_run - archivemail.options.preserve_unread = False - archivemail.options.parse_args(["-n"], "") - assert archivemail.options.dry_run - - def testOptionDays(self): - """--days and -d options are parsed correctly""" - archivemail.options.parse_args(["--days=11"], "") - self.assertEqual(archivemail.options.days_old_max, 11) - archivemail.options.days_old_max = None - archivemail.options.parse_args(["-d11"], "") - self.assertEqual(archivemail.options.days_old_max, 11) - - def testOptionDelete(self): - """--delete option is parsed correctly""" - archivemail.options.parse_args(["--delete"], "") - assert archivemail.options.delete_old_mail - - def testOptionCopy(self): - """--copy option is parsed correctly""" - archivemail.options.parse_args(["--copy"], "") - assert archivemail.options.copy_old_mail - - def testOptionOutputdir(self): - """--output-dir and -o options are parsed correctly""" - for path in "/just/some/path", "relative/path": - archivemail.options.parse_args(["--output-dir=%s" % path], "") - self.assertEqual(archivemail.options.output_dir, path) - archivemail.options.output_dir = None - archivemail.options.parse_args(["-o%s" % path], "") - self.assertEqual(archivemail.options.output_dir, path) - - def testOptionNocompress(self): - """--no-compress option is parsed correctly""" - archivemail.options.parse_args(["--no-compress"], "") - assert archivemail.options.no_compress - - def testOptionSize(self): - """--size and -S options are parsed correctly""" - size = "666" - archivemail.options.parse_args(["--size=%s" % size ], "") - self.assertEqual(archivemail.options.min_size, int(size)) - archivemail.options.parse_args(["-S%s" % size ], "") - self.assertEqual(archivemail.options.min_size, int(size)) - - def tearDown(self): - archivemail.options = self.oldopts - -########## archivemail.is_older_than_days() unit testing ################# - -class TestIsTooOld(unittest.TestCase): - def testVeryOld(self): - """with max_days=360, should be true for these dates > 1 year""" - for years in range(1, 10): - time_msg = time.time() - (years * 365 * 24 * 60 * 60) - assert archivemail.is_older_than_days(time_message=time_msg, - max_days=360) - - def testOld(self): - """with max_days=14, should be true for these dates > 14 days""" - for days in range(14, 360): - time_msg = time.time() - (days * 24 * 60 * 60) - assert archivemail.is_older_than_days(time_message=time_msg, - max_days=14) - - def testJustOld(self): - """with max_days=1, should be true for these dates >= 1 day""" - for minutes in range(0, 61): - time_msg = time.time() - (25 * 60 * 60) + (minutes * 60) - assert archivemail.is_older_than_days(time_message=time_msg, - max_days=1) - - def testNotOld(self): - """with max_days=9, should be false for these dates < 9 days""" - for days in range(0, 9): - time_msg = time.time() - (days * 24 * 60 * 60) - assert not archivemail.is_older_than_days(time_message=time_msg, - max_days=9) - - def testJustNotOld(self): - """with max_days=1, should be false for these hours <= 1 day""" - for minutes in range(0, 60): - time_msg = time.time() - (23 * 60 * 60) - (minutes * 60) - assert not archivemail.is_older_than_days(time_message=time_msg, - max_days=1) - - def testFuture(self): - """with max_days=1, should be false for times in the future""" - for minutes in range(0, 60): - time_msg = time.time() + (minutes * 60) - assert not archivemail.is_older_than_days(time_message=time_msg, - max_days=1) - -########## archivemail.parse_imap_url() unit testing ################# - -class TestParseIMAPUrl(unittest.TestCase): - def setUp(self): - archivemail.options.quiet = True - archivemail.options.verbose = False - archivemail.options.pwfile = None - - urls_withoutpass = [ - ('imaps://user@example.org@imap.example.org/upperbox/lowerbox', - ('user', None, 'example.org@imap.example.org', - 'upperbox/lowerbox')), - ('imaps://"user@example.org"@imap.example.org/upperbox/lowerbox', - ('user@example.org', None, 'imap.example.org', - 'upperbox/lowerbox')), - ('imaps://user@example.org"@imap.example.org/upperbox/lowerbox', - ('user', None, 'example.org"@imap.example.org', - 'upperbox/lowerbox')), - ('imaps://"user@example.org@imap.example.org/upperbox/lowerbox', - ('"user', None, 'example.org@imap.example.org', - 'upperbox/lowerbox')), - ('imaps://"us\\"er@example.org"@imap.example.org/upperbox/lowerbox', - ('us"er@example.org', None, 'imap.example.org', - 'upperbox/lowerbox')), - ('imaps://user\\@example.org@imap.example.org/upperbox/lowerbox', - ('user\\', None, 'example.org@imap.example.org', - 'upperbox/lowerbox')) - ] - urls_withpass = [ - ('imaps://user@example.org:passwd@imap.example.org/upperbox/lowerbox', - ('user@example.org', 'passwd', 'imap.example.org', - 'upperbox/lowerbox'), - ('user', None, 'example.org:passwd@imap.example.org', - 'upperbox/lowerbox')), - ('imaps://"user@example.org:passwd@imap.example.org/upperbox/lowerbox', - ('"user@example.org', "passwd", 'imap.example.org', - 'upperbox/lowerbox'), - ('"user', None, 'example.org:passwd@imap.example.org', - 'upperbox/lowerbox')), - ('imaps://u\\ser\\@example.org:"p@sswd"@imap.example.org/upperbox/lowerbox', - ('u\\ser\\@example.org', 'p@sswd', 'imap.example.org', - 'upperbox/lowerbox'), - ('u\\ser\\', None, 'example.org:"p@sswd"@imap.example.org', - 'upperbox/lowerbox')) - ] - # These are invalid when the password's not stripped. - urls_onlywithpass = [ - ('imaps://"user@example.org":passwd@imap.example.org/upperbox/lowerbox', - ('user@example.org', "passwd", 'imap.example.org', - 'upperbox/lowerbox')) - ] - def testUrlsWithoutPwfile(self): - """Parse test urls with --pwfile option unset. This parses a password in - the URL, if present.""" - archivemail.options.pwfile = None - for mbstr in self.urls_withpass + self.urls_withoutpass: - url = mbstr[0][mbstr[0].find('://')+3:] - result = archivemail.parse_imap_url(url) - self.assertEqual(result, mbstr[1]) - - def testUrlsWithPwfile(self): - """Parse test urls with --pwfile set. In this case the ':' character - loses its meaning as a delimiter.""" - archivemail.options.pwfile = "whocares.txt" - for mbstr in self.urls_withpass: - url = mbstr[0][mbstr[0].find('://')+3:] - result = archivemail.parse_imap_url(url) - self.assertEqual(result, mbstr[2]) - for mbstr in self.urls_onlywithpass: - url = mbstr[0][mbstr[0].find('://')+3:] - self.assertRaises(archivemail.UnexpectedError, - archivemail.parse_imap_url, url) - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.verbose = False - archivemail.options.pwfile = None - -########## acceptance testing ########### - -class TestArchive(TestCaseInTempdir): - """Base class defining helper functions for doing test archiving runs.""" - mbox = None # mbox file that will be processed by archivemail - good_archive = None # Uncompressed reference archive file to verify the - # archive after processing - good_mbox = None # Reference mbox file to verify the mbox after processing - - def verify(self): - assert os.path.exists(self.mbox) - if self.good_mbox is not None: - assertEqualContent(self.mbox, self.good_mbox) - else: - self.assertEqual(os.path.getsize(self.mbox), 0) - archive_name = self.mbox + "_archive" - if not archivemail.options.no_compress: - archive_name += ".gz" - iszipped = True - else: - assert not os.path.exists(archive_name + ".gz") - iszipped = False - if self.good_archive is not None: - assertEqualContent(archive_name, self.good_archive, iszipped) - else: - assert not os.path.exists(archive_name) - - def make_old_mbox(self, body=None, headers=None, messages=1, make_old_archive=False): - """Prepare for a test run with an old mbox by making an old mbox, - optionally an existing archive, and a reference archive to verify the - archive after archivemail has run.""" - self.mbox = make_mbox(body, headers, 181*24, messages) - archive_does_change = not (archivemail.options.dry_run or - archivemail.options.delete_old_mail) - mbox_does_not_change = archivemail.options.dry_run or \ - archivemail.options.copy_old_mail - if make_old_archive: - archive = archivemail.make_archive_name(self.mbox) - self.good_archive = make_archive_and_plain_copy(archive) - if archive_does_change: - append_file(self.mbox, self.good_archive) - elif archive_does_change: - self.good_archive = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox, self.good_archive) - if mbox_does_not_change: - if archive_does_change and not make_old_archive: - self.good_mbox = self.good_archive - else: - self.good_mbox = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox, self.good_mbox) - - def make_mixed_mbox(self, body=None, headers=None, messages=1, make_old_archive=False): - """Prepare for a test run with a mixed mbox by making a mixed mbox, - optionally an existing archive, a reference archive to verify the - archive after archivemail has run, and likewise a reference mbox to - verify the mbox.""" - self.make_old_mbox(body, headers, messages=messages, make_old_archive=make_old_archive) - new_mbox_name = make_mbox(body, headers, 179*24, messages) - append_file(new_mbox_name, self.mbox) - if self.good_mbox is None: - self.good_mbox = new_mbox_name - else: - if self.good_mbox == self.good_archive: - self.good_mbox = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox, self.good_mbox) - else: - append_file(new_mbox_name, self.good_mbox) - - def make_new_mbox(self, body=None, headers=None, messages=1, make_old_archive=False): - """Prepare for a test run with a new mbox by making a new mbox, - optionally an exiting archive, and a reference mbox to verify the mbox - after archivemail has run.""" - self.mbox = make_mbox(body, headers, 179*24, messages) - self.good_mbox = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox, self.good_mbox) - if make_old_archive: - archive = archivemail.make_archive_name(self.mbox) - self.good_archive = make_archive_and_plain_copy(archive) - - -class TestArchiveMbox(TestArchive): - """archiving should work based on the date of messages given""" - - def setUp(self): - self.oldopts = copy.copy(archivemail.options) - archivemail.options.quiet = True - super(TestArchiveMbox, self).setUp() - - def testOld(self): - """archiving an old mailbox""" - self.make_old_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testOldFromInBody(self): - """archiving an old mailbox with 'From ' in the body""" - body = """This is a message with ^From at the start of a line -From is on this line -This is after the ^From line""" - self.make_old_mbox(messages=3, body=body) - archivemail.archive(self.mbox) - self.verify() - - def testDateSystem(self): - """test that the --date option works as expected""" - test_headers = ( - { - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', - 'Date' : 'Fri, 28 Jul 2000 16:11:36 +1000', - }, - { - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', - 'Date' : None, - }, - { - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', - 'Date' : None, - 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000', - }, - { - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', - 'Date' : None, - 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000', - }, - ) - for headers in test_headers: - msg = make_message(default_headers=headers, wantobj=True) - date = time.strptime("2000-07-29", "%Y-%m-%d") - archivemail.options.date_old_max = time.mktime(date) - assert archivemail.should_archive(msg) - date = time.strptime("2000-07-27", "%Y-%m-%d") - archivemail.options.date_old_max = time.mktime(date) - assert not archivemail.should_archive(msg) - - def testMixed(self): - """archiving a mixed mailbox""" - self.make_mixed_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testNew(self): - """archiving a new mailbox""" - self.make_new_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testOldExisting(self): - """archiving an old mailbox with an existing archive""" - self.make_old_mbox(messages=3, make_old_archive=True) - archivemail.archive(self.mbox) - self.verify() - - def testOldWeirdHeaders(self): - """archiving old mailboxes with weird headers""" - weird_headers = ( - { # we should archive because of the date on the 'From_' line - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', - 'Date' : 'Friskhdfkjkh, 28 Jul 2002 1line noise6:11:36 +1000', - }, - { # we should archive because of the date on the 'From_' line - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', - 'Date' : None, - }, - { # we should archive because of the date in 'Delivery-date' - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', - 'Date' : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000', - 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000', - }, - { # we should archive because of the date in 'Delivery-date' - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', - 'Date' : None, - 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000', - }, - { # we should archive because of the date in 'Resent-Date' - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', - 'Date' : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000', - 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000', - }, - { # we should archive because of the date in 'Resent-Date' - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', - 'Date' : None, - 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000', - }, - { # completely blank dates were crashing < version 0.4.7 - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', - 'Date' : '', - }, - { # completely blank dates were crashing < version 0.4.7 - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', - 'Date' : '', - 'Resent-Date' : '', - }, - ) - fd, self.mbox = tempfile.mkstemp() - fp = os.fdopen(fd, "w") - for headers in weird_headers: - msg_text = make_message(default_headers=headers) - fp.write(msg_text*2) - fp.close() - self.good_archive = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox, self.good_archive) - archivemail.archive(self.mbox) - self.verify() - - def tearDown(self): - archivemail.options = self.oldopts - super(TestArchiveMbox, self).tearDown() - - -class TestArchiveMboxTimestamp(TestCaseInTempdir): - """original mbox timestamps should always be preserved""" - def setUp(self): - super(TestArchiveMboxTimestamp, self).setUp() - archivemail.options.quiet = True - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 180)) - self.mtime = os.path.getmtime(self.mbox_name) - 66 - self.atime = os.path.getatime(self.mbox_name) - 88 - os.utime(self.mbox_name, (self.atime, self.mtime)) - - def testNew(self): - """mbox timestamps should not change after no archival""" - archivemail.options.days_old_max = 181 - archivemail.archive(self.mbox_name) - self.verify() - - def testOld(self): - """mbox timestamps should not change after archival""" - archivemail.options.days_old_max = 179 - archivemail.archive(self.mbox_name) - self.verify() - - def verify(self): - assert os.path.exists(self.mbox_name) - new_atime = os.path.getatime(self.mbox_name) - new_mtime = os.path.getmtime(self.mbox_name) - self.assertAlmostEqual(self.mtime, new_mtime, utimes_precision) - self.assertAlmostEqual(self.atime, new_atime, utimes_precision) - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.days_old_max = 180 - os.remove(self.mbox_name) - super(TestArchiveMboxTimestamp, self).tearDown() - - -class TestArchiveMboxAll(unittest.TestCase): - def setUp(self): - archivemail.options.quiet = True - archivemail.options.archive_all = True - - def testNew(self): - """new messages should be archived with --all""" - self.msg = make_message(hours_old=24*179, wantobj=True) - assert archivemail.should_archive(self.msg) - - def testOld(self): - """old messages should be archived with --all""" - self.msg = make_message(hours_old=24*181, wantobj=True) - assert archivemail.should_archive(self.msg) - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.archive_all = False - -class TestArchiveMboxPreserveUnread(unittest.TestCase): - """make sure the 'preserve_unread' option works""" - def setUp(self): - archivemail.options.quiet = True - archivemail.options.preserve_unread = True - self.msg = make_message(hours_old=24*181, wantobj=True) - - def testOldRead(self): - """old read messages should be archived with --preserve-unread""" - self.msg["Status"] = "RO" - assert archivemail.should_archive(self.msg) - - def testOldUnread(self): - """old unread messages should not be archived with --preserve-unread""" - self.msg["Status"] = "O" - assert not archivemail.should_archive(self.msg) - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.preserve_unread = False - - -class TestArchiveMboxSuffix(unittest.TestCase): - """make sure the 'suffix' option works""" - def setUp(self): - self.old_suffix = archivemail.options.archive_suffix - archivemail.options.quiet = True - - def testSuffix(self): - """archiving with specified --suffix arguments""" - for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"): - mbox_name = "foobar" - archivemail.options.archive_suffix = suffix - days_old_max = 180 - parsed_suffix_time = time.time() - days_old_max*24*60*60 - parsed_suffix = time.strftime(suffix, - time.localtime(parsed_suffix_time)) - archive_name = mbox_name + parsed_suffix - self.assertEqual(archive_name, - archivemail.make_archive_name(mbox_name)) - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.archive_suffix = self.old_suffix - - -class TestArchiveDryRun(TestArchive): - """make sure the 'dry-run' option works""" - def setUp(self): - super(TestArchiveDryRun, self).setUp() - archivemail.options.quiet = True - archivemail.options.dry_run = True - - def testOld(self): - """archiving an old mailbox with the 'dry-run' option""" - self.make_old_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def tearDown(self): - archivemail.options.dry_run = False - archivemail.options.quiet = False - super(TestArchiveDryRun, self).tearDown() - - -class TestArchiveDelete(TestArchive): - """make sure the 'delete' option works""" - def setUp(self): - super(TestArchiveDelete, self).setUp() - archivemail.options.quiet = True - archivemail.options.delete_old_mail = True - - def testNew(self): - """archiving a new mailbox with the 'delete' option""" - self.make_new_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testMixed(self): - """archiving a mixed mailbox with the 'delete' option""" - self.make_mixed_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testOld(self): - """archiving an old mailbox with the 'delete' option""" - self.make_old_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def tearDown(self): - archivemail.options.delete_old_mail = False - archivemail.options.quiet = False - super(TestArchiveDelete, self).tearDown() - - -class TestArchiveCopy(TestArchive): - """make sure the 'copy' option works""" - def setUp(self): - super(TestArchiveCopy, self).setUp() - archivemail.options.quiet = True - archivemail.options.copy_old_mail = True - - def testNew(self): - """archiving a new mailbox with the 'copy' option""" - self.make_new_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testMixed(self): - """archiving a mixed mailbox with the 'copy' option""" - self.make_mixed_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testOld(self): - """archiving an old mailbox with the 'copy' option""" - self.make_old_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def tearDown(self): - archivemail.options.copy_old_mail = False - archivemail.options.quiet = False - super(TestArchiveCopy, self).tearDown() - - -class TestArchiveMboxFlagged(unittest.TestCase): - """make sure the 'include_flagged' option works""" - def setUp(self): - archivemail.options.include_flagged = False - archivemail.options.quiet = True - - def testOld(self): - """by default, old flagged messages should not be archived""" - msg = make_message(default_headers={"X-Status": "F"}, - hours_old=24*181, wantobj=True) - assert not archivemail.should_archive(msg) - - def testIncludeFlaggedNew(self): - """new flagged messages should not be archived with include_flagged""" - msg = make_message(default_headers={"X-Status": "F"}, - hours_old=24*179, wantobj=True) - assert not archivemail.should_archive(msg) - - def testIncludeFlaggedOld(self): - """old flagged messages should be archived with include_flagged""" - archivemail.options.include_flagged = True - msg = make_message(default_headers={"X-Status": "F"}, - hours_old=24*181, wantobj=True) - assert archivemail.should_archive(msg) - - def tearDown(self): - archivemail.options.include_flagged = False - archivemail.options.quiet = False - - -class TestArchiveMboxOutputDir(unittest.TestCase): - """make sure that the 'output-dir' option works""" - def setUp(self): - archivemail.options.quiet = True - - def testOld(self): - """archiving an old mailbox with a sepecified output dir""" - for dir in "/just/a/path", "relative/path": - archivemail.options.output_dir = dir - archive_dir = archivemail.make_archive_name("/tmp/mbox") - self.assertEqual(dir, os.path.dirname(archive_dir)) - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.output_dir = None - - -class TestArchiveMboxUncompressed(TestArchive): - """make sure that the 'no_compress' option works""" - mbox_name = None - new_mbox = None - old_mbox = None - copy_name = None - - def setUp(self): - archivemail.options.quiet = True - archivemail.options.no_compress = True - super(TestArchiveMboxUncompressed, self).setUp() - - def testOld(self): - """archiving an old mailbox uncompressed""" - self.make_old_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testNew(self): - """archiving a new mailbox uncompressed""" - self.make_new_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testMixed(self): - """archiving a mixed mailbox uncompressed""" - self.make_mixed_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testOldExists(self): - """archiving an old mailbox uncopressed with an existing archive""" - self.make_old_mbox(messages=3, make_old_archive=True) - archivemail.archive(self.mbox) - self.verify() - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.no_compress = False - super(TestArchiveMboxUncompressed, self).tearDown() - - -class TestArchiveSize(unittest.TestCase): - """check that the 'size' argument works""" - def setUp(self): - archivemail.options.quiet = True - msg_text = make_message(hours_old=24*181) - self.msg_size = len(msg_text) - fp = cStringIO.StringIO(msg_text) - self.msg = rfc822.Message(fp) - - def testSmaller(self): - """giving a size argument smaller than the message""" - archivemail.options.min_size = self.msg_size - 1 - assert archivemail.should_archive(self.msg) - - def testBigger(self): - """giving a size argument bigger than the message""" - archivemail.options.min_size = self.msg_size + 1 - assert not archivemail.should_archive(self.msg) - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.min_size = None - - -############# Test archiving maildirs ############### - -class TestArchiveMailboxdir(TestCaseInTempdir): - """Base class defining helper functions for doing test archive runs with - maildirs.""" - maildir = None # Maildir that will be processed by archivemail - orig_maildir_obj = None # A backup copy of the maildir, a SimpleMaildir object - remaining_msg = set() # Filenames of maildir messages that should be preserved - number_archived = 0 # Number of messages that get archived - orig_archive = None # An uncompressed copy of a pre-existing archive, - # if one exists - - def setUp(self): - super(TestArchiveMailboxdir, self).setUp() - self.orig_maildir_obj = SimpleMaildir() - - def verify(self): - self._verify_remaining() - self._verify_archive() - - def _verify_remaining(self): - """Verify that the preserved messages weren't altered.""" - assert self.maildir - # Compare maildir with backup object. - dcmp = filecmp.dircmp(self.maildir, self.orig_maildir_obj.root) - # Top-level has only directories cur, new, tmp and must be unchanged. - self.assertEqual(dcmp.left_list, dcmp.right_list) - found = set() - for d in dcmp.common_dirs: - dcmp2 = dcmp.subdirs[d] - # We need to verify three things. - # 1. directory is a subset of the original... - assert not dcmp2.left_only - # 2. all common files are identical... - self.assertEqual(dcmp2.common_files, dcmp2.same_files) - found = found.union([os.path.join(d, x) for x in dcmp2.common_files]) - # 3. exactly the `new' messages (recorded in self.remaining_msg) - # were preserved. - self.assertEqual(found, self.remaining_msg) - - def _verify_archive(self): - """Verify the archive correctness.""" - # TODO: currently make_archive_name does not include the .gz suffix. - # Is this something that should be fixed? - archive = archivemail.make_archive_name(self.maildir) - if archivemail.options.no_compress: - iszipped = False - else: - archive += '.gz' - iszipped = True - if self.number_archived == 0: - if self.orig_archive: - assertEqualContent(archive, self.orig_archive, iszipped) - else: - assert not os.path.exists(archive) - return - fp_new = fp_archive = tmp_archive_name = None - try: - if self.orig_archive: - new_size = os.path.getsize(archive) - # Brute force: split archive in old and new part and verify the - # parts separately. (Of course this destroys the archive.) - fp_archive = open(archive, "r+") - fp_archive.seek(self.orig_archive_size) - fd, tmp_archive_name = tempfile.mkstemp() - fp_new = os.fdopen(fd, "w") - shutil.copyfileobj(fp_archive, fp_new) - fp_new.close() - fp_archive.truncate(self.orig_archive_size) - fp_archive.close() - assertEqualContent(archive, self.orig_archive, iszipped) - new_archive = tmp_archive_name - else: - new_archive = archive - if archivemail.options.no_compress: - fp_archive = open(new_archive, "r") - else: - fp_archive = FixedGzipFile(new_archive, "r") - mb = mailbox.UnixMailbox(fp_archive) - found = 0 - for msg in mb: - self.verify_maildir_has_msg(self.orig_maildir_obj, msg) - found += 1 - self.assertEqual(found, self.number_archived) - finally: - if tmp_archive_name: - os.remove(tmp_archive_name) - if fp_new is not None: - fp_new.close() - if fp_archive is not None: - fp_archive.close() - - def verify_maildir_has_msg(self, maildir, msg): - """Assert that the given maildir has a copy of the rfc822 message.""" - mid = msg['Message-Id'] # Complains if there is no message-id - mdir_msg_str, mdir_flags = \ - maildir.get_message_and_mbox_status(mid) - mbox_flags = set(msg.get('status', '') + msg.get('x-status', '')) - self.assertEqual(mdir_flags, mbox_flags) - - headers = filter(lambda h: msg.isheader(h) not in ('status', 'x-status'), - msg.headers) - headers = "".join(headers) - msg.rewindbody() - # Discard last mbox LF which is not part of the message. - body = msg.fp.read()[:-1] - msg_str = headers + os.linesep + body - self.assertEqual(mdir_msg_str, msg_str) - - def add_messages(self, body=None, headers=None, hours_old=0, messages=1): - for count in range(messages): - msg = make_message(body, default_headers=headers, mkfrom=False, - hours_old=hours_old) - self.orig_maildir_obj.write(msg, new=False) - - def make_maildir(self, mkold, mknew, body=None, headers=None, messages=1, - make_old_archive=False): - mailbox_does_change = not (archivemail.options.dry_run or - archivemail.options.copy_old_mail) - archive_does_change = not (archivemail.options.dry_run or - archivemail.options.delete_old_mail) - if mknew: - self.add_messages(body, headers, 179*24, messages) - if archive_does_change and archivemail.options.archive_all: - self.number_archived += messages - if mailbox_does_change: - self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames()) - if mkold: - self.add_messages(body, headers, 181*24, messages) - if archive_does_change: - self.number_archived += messages - if not mailbox_does_change: - self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames()) - self.maildir = copy_maildir(self.orig_maildir_obj.root) - if make_old_archive: - archive = archivemail.make_archive_name(self.maildir) - self.orig_archive = make_archive_and_plain_copy(archive) - # FIXME: .gz extension handling is a mess II - if not archivemail.options.no_compress: - archive += '.gz' - self.orig_archive_size = os.path.getsize(archive) - -class TestEmptyMaildir(TestCaseInTempdir): - def setUp(self): - super(TestEmptyMaildir, self).setUp() - archivemail.options.quiet = True - - def testEmpty(self): - """Archiving an empty maildir should not result in an archive.""" - self.mdir = SimpleMaildir() - archivemail.archive(self.mdir.root) - assert not os.path.exists(self.mdir.root + '_archive.gz') - - def tearDown(self): - super(TestEmptyMaildir, self).tearDown() - archivemail.options.quiet = False - -class TestMaildir(TestArchiveMailboxdir): - def setUp(self): - super(TestMaildir, self).setUp() - archivemail.options.quiet = True - - def testOld(self): - self.make_maildir(True, False, messages=3) - archivemail.archive(self.maildir) - self.verify() - - def testNew(self): - self.make_maildir(False, True, messages=3) - archivemail.archive(self.maildir) - self.verify() - - def testMixed(self): - self.make_maildir(True, True, messages=3) - archivemail.archive(self.maildir) - self.verify() - - def testMixedExisting(self): - self.make_maildir(True, True, messages=3, make_old_archive=True) - archivemail.archive(self.maildir) - self.verify() - - def tearDown(self): - archivemail.options.quiet = False - super(TestMaildir, self).tearDown() - - -class TestMaildirPreserveUnread(TestCaseInTempdir): - """Test if the preserve_unread option works with maildirs.""" - def setUp(self): - super(TestMaildirPreserveUnread, self).setUp() - archivemail.options.quiet = True - archivemail.options.preserve_unread = True - - def testOldRead(self): - """--preserve-unread archives old read messages in a maildir.""" - smd = SimpleMaildir("orig") - msg = make_message(hours_old=24*181) - smd.write(msg, new=False, flags='S') - md = mailbox.Maildir(smd.root) - msg_obj = md.next() - assert archivemail.should_archive(msg_obj) - - def testOldUnread(self): - """--preserve-unread preserves old unread messages in a maildir.""" - smd = SimpleMaildir("orig") - msg = make_message(hours_old=24*181) - smd.write(msg, new=False) - md = mailbox.Maildir(smd.root) - msg_obj = md.next() - assert not archivemail.should_archive(msg_obj) - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.preserve_unread = False - super(TestMaildirPreserveUnread, self).tearDown() - -class TestMaildirAll(TestArchiveMailboxdir): - def setUp(self): - super(TestMaildirAll, self).setUp() - archivemail.options.quiet = True - archivemail.options.archive_all = True - - def testNew(self): - """New maildir messages should be archived with --all""" - self.add_messages(hours_old=24*181) - md = mailbox.Maildir(self.orig_maildir_obj.root) - msg_obj = md.next() - assert archivemail.should_archive(msg_obj) - - def testOld(self): - """Old maildir messages should be archived with --all""" - self.add_messages(hours_old=24*179) - md = mailbox.Maildir(self.orig_maildir_obj.root) - msg_obj = md.next() - assert archivemail.should_archive(msg_obj) - - def tearDown(self): - super(TestMaildirAll, self).tearDown() - archivemail.options.quiet = False - archivemail.options.archive_all = False - -class TestMaildirDryRun(TestArchiveMailboxdir): - def setUp(self): - super(TestMaildirDryRun, self).setUp() - archivemail.options.quiet = True - archivemail.options.dry_run = True - - def testOld(self): - """archiving an old maildir mailbox with the 'dry-run' option""" - self.make_maildir(True, False) - archivemail.archive(self.maildir) - self.verify() - - def tearDown(self): - super(TestMaildirDryRun, self).tearDown() - archivemail.options.quiet = False - archivemail.options.dry_run = False - -class TestMaildirDelete(TestArchiveMailboxdir): - def setUp(self): - super(TestMaildirDelete, self).setUp() - archivemail.options.quiet = True - archivemail.options.delete_old_mail = True - - def testOld(self): - """archiving an old maildir mailbox with the 'delete' option""" - self.make_maildir(True, False) - archivemail.archive(self.maildir) - self.verify() - - def testNew(self): - """archiving a new maildir mailbox with the 'delete' option""" - self.make_maildir(False, True) - archivemail.archive(self.maildir) - self.verify() - - def tearDown(self): - super(TestMaildirDelete, self).tearDown() - archivemail.options.quiet = False - archivemail.options.delete_old_mail = False - -class TestMaildirCopy(TestArchiveMailboxdir): - def setUp(self): - super(TestMaildirCopy, self).setUp() - archivemail.options.quiet = True - archivemail.options.copy_old_mail = True - - def testOld(self): - """archiving an old maildir mailbox with the 'copy' option""" - self.make_maildir(True, False) - archivemail.archive(self.maildir) - self.verify() - - def testNew(self): - """archiving a new maildir mailbox with the 'copy' option""" - self.make_maildir(False, True) - archivemail.archive(self.maildir) - self.verify() - - def tearDown(self): - super(TestMaildirCopy, self).tearDown() - archivemail.options.quiet = False - archivemail.options.copy_old_mail = False - -class TestArchiveMaildirFlagged(TestCaseInTempdir): - """make sure the 'include_flagged' option works with maildir messages""" - def setUp(self): - super(TestArchiveMaildirFlagged, self).setUp() - archivemail.options.include_flagged = False - archivemail.options.quiet = True - - def testOld(self): - """by default, old flagged maildir messages should not be archived""" - smd = SimpleMaildir("orig") - msg = make_message(hours_old=24*181) - smd.write(msg, new=False, flags='F') - md = mailbox.Maildir(smd.root) - msg_obj = md.next() - assert not archivemail.should_archive(msg_obj) - - def testIncludeFlaggedNew(self): - """new flagged maildir messages should not be archived with include_flagged""" - smd = SimpleMaildir("orig") - msg = make_message(hours_old=24*179) - smd.write(msg, new=False, flags='F') - md = mailbox.Maildir(smd.root) - msg_obj = md.next() - assert not archivemail.should_archive(msg_obj) - - def testIncludeFlaggedOld(self): - """old flagged maildir messages should be archived with include_flagged""" - archivemail.options.include_flagged = True - smd = SimpleMaildir("orig") - msg = make_message(hours_old=24*181) - smd.write(msg, new=False, flags='F') - md = mailbox.Maildir(smd.root) - msg_obj = md.next() - assert archivemail.should_archive(msg_obj) - - def tearDown(self): - super(TestArchiveMaildirFlagged, self).tearDown() - archivemail.options.include_flagged = False - archivemail.options.quiet = False - -class TestArchiveMaildirSize(TestCaseInTempdir): - """check that the 'size' argument works with maildir messages""" - def setUp(self): - super(TestArchiveMaildirSize, self).setUp() - archivemail.options.quiet = True - msg = make_message(hours_old=24*181) - self.msg_size = len(msg) - smd = SimpleMaildir("orig") - smd.write(msg, new=False) - md = mailbox.Maildir(smd.root) - self.msg_obj = md.next() - - def testSmaller(self): - """giving a size argument smaller than the maildir message""" - archivemail.options.min_size = self.msg_size - 1 - assert archivemail.should_archive(self.msg_obj) - - def testBigger(self): - """giving a size argument bigger than the maildir message""" - archivemail.options.min_size = self.msg_size + 1 - assert not archivemail.should_archive(self.msg_obj) - - def tearDown(self): - super(TestArchiveMaildirSize, self).tearDown() - archivemail.options.quiet = False - archivemail.options.min_size = None - -########## helper routines ############ - -def make_message(body=None, default_headers={}, hours_old=None, mkfrom=False, wantobj=False): - headers = copy.copy(default_headers) - if not headers: - headers = {} - headers['Message-Id'] = make_msgid() - if not headers.has_key('Date'): - time_message = time.time() - (60 * 60 * hours_old) - headers['Date'] = time.asctime(time.localtime(time_message)) - if not headers.has_key('From'): - headers['From'] = "sender@dummy.domain" - if not headers.has_key('To'): - headers['To'] = "receipient@dummy.domain" - if not headers.has_key('Subject'): - headers['Subject'] = "This is the subject" - if mkfrom and not headers.has_key('From_'): - headers['From_'] = "%s %s" % (headers['From'], headers['Date']) - if not body: - body = "This is the message body" - - msg = "" - if headers.has_key('From_'): - msg = msg + ("From %s\n" % headers['From_']) - del headers['From_'] - for key in headers.keys(): - if headers[key] is not None: - msg = msg + ("%s: %s\n" % (key, headers[key])) - msg = msg + "\n\n" + body + "\n\n" - if not wantobj: - return msg - fp = cStringIO.StringIO(msg) - return rfc822.Message(fp) - -def append_file(source, dest): - """appends the file named 'source' to the file named 'dest'""" - assert os.path.isfile(source) - assert os.path.isfile(dest) - read = open(source, "r") - write = open(dest, "a+") - shutil.copyfileobj(read,write) - read.close() - write.close() - - -def make_mbox(body=None, headers=None, hours_old=0, messages=1): - assert tempfile.tempdir - fd, name = tempfile.mkstemp() - file = os.fdopen(fd, "w") - for count in range(messages): - msg = make_message(body=body, default_headers=headers, - mkfrom=True, hours_old=hours_old) - file.write(msg) - file.close() - return name - -def make_archive_and_plain_copy(archive_name): - """Make an mbox archive of the given name like archivemail may have - created it. Also make an uncompressed copy of this archive and return its - name.""" - copy_fd, copy_name = tempfile.mkstemp() - copy_fp = os.fdopen(copy_fd, "w") - if archivemail.options.no_compress: - fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT) - fp = os.fdopen(fd, "w") - else: - archive_name += ".gz" - fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT) - rawfp = os.fdopen(fd, "w") - fp = gzip.GzipFile(fileobj=rawfp) - for count in range(3): - msg = make_message(hours_old=24*360) - fp.write(msg) - copy_fp.write(msg) - fp.close() - copy_fp.close() - if not archivemail.options.no_compress: - rawfp.close() - return copy_name - -def copy_maildir(maildir, prefix="tmp"): - """Create a copy of the given maildir and return the absolute path of the - new direcory.""" - newdir = tempfile.mkdtemp(prefix=prefix) - for d in "cur", "new", "tmp": - shutil.copytree(os.path.join(maildir, d), os.path.join(newdir, d)) - return newdir - -def assertEqualContent(firstfile, secondfile, zippedfirst=False): - """Verify that the two files exist and have identical content. If zippedfirst - is True, assume that firstfile is gzip-compressed.""" - assert os.path.exists(firstfile) - assert os.path.exists(secondfile) - if zippedfirst: - try: - fp1 = gzip.GzipFile(firstfile, "r") - fp2 = open(secondfile, "r") - assert cmp_fileobj(fp1, fp2) - finally: - fp1.close() - fp2.close() - else: - assert filecmp.cmp(firstfile, secondfile, shallow=0) - -def cmp_fileobj(fp1, fp2): - """Return if reading the fileobjects yields identical content.""" - bufsize = 8192 - while True: - b1 = fp1.read(bufsize) - b2 = fp2.read(bufsize) - if b1 != b2: - return False - if not b1: - return True - -if __name__ == "__main__": - unittest.main() -- cgit v1.2.3