From febd030e1461d4f412dfb98e58d6219eeb01ebd2 Mon Sep 17 00:00:00 2001 From: Nikolaus Schulz Date: Thu, 29 Jul 2010 20:49:39 +0200 Subject: Drop .py extension from the unittest script --- test_archivemail.py | 1593 --------------------------------------------------- 1 file changed, 1593 deletions(-) delete mode 100755 test_archivemail.py (limited to 'test_archivemail.py') diff --git a/test_archivemail.py b/test_archivemail.py deleted file mode 100755 index 4a11734..0000000 --- a/test_archivemail.py +++ /dev/null @@ -1,1593 +0,0 @@ -#! /usr/bin/env python -############################################################################ -# Copyright (C) 2002 Paul Rodger -# (C) 2006-2010 Nikolaus Schulz -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -############################################################################ -""" -Unit-test archivemail using 'PyUnit'. - -TODO: add tests for: - * dotlock locks already existing - * archiving MH-format mailboxes - * a 3rd party process changing the mbox file being read - -""" - -import sys - -def check_python_version(): - """Abort if we are running on python < v2.3""" - too_old_error = "This test script requires python version 2.3 or later. " + \ - "Your version of python is:\n%s" % sys.version - try: - version = sys.version_info # we might not even have this function! :) - if (version[0] < 2) or (version[0] == 2 and version[1] < 3): - print too_old_error - sys.exit(1) - except AttributeError: - print too_old_error - sys.exit(1) - -# define & run this early because 'unittest' requires Python >= 2.1 -check_python_version() - -import copy -import fcntl -import filecmp -import os -import re -import shutil -import stat -import tempfile -import time -import unittest -import gzip -import cStringIO -import rfc822 -import mailbox - -try: - import archivemail -except ImportError: - print "The archivemail script needs to be called 'archivemail.py'" - print "and should be in the current directory in order to be imported" - print "and tested. Sorry." - if os.path.isfile("archivemail"): - print "Try renaming it from 'archivemail' to 'archivemail.py'." - sys.exit(1) - -# We want to iterate over messages in a compressed archive mbox and verify -# them. This involves seeking in the mbox. The gzip.Gzipfile.seek() in -# Python 2.5 doesn't understand whence; this is Python bug #1355023, triggered -# by mailbox._PartialFile.seek(). The bug is still pending as of Python -# 2.5.2. To work around it, we subclass gzip.GzipFile. -# -# It should be noted that seeking backwards in a GzipFile is emulated by -# re-reading the entire file from the beginning, which is extremely -# inefficient and won't work with large files; but our test archives are all -# small, so it's okay. - -class FixedGzipFile(gzip.GzipFile): - """GzipFile with seek method accepting whence parameter.""" - def seek(self, offset, whence=0): - try: - gzip.GzipFile.seek(self, offset, whence) - except TypeError: - if whence: - if whence == 1: - offset = self.offset + offset - else: - raise ValueError('Seek from end not supported') - gzip.GzipFile.seek(self, offset) - -# precision of os.utime() when restoring mbox timestamps -utimes_precision = 5 - -class MessageIdFactory: - """Factory to create `uniqe' message-ids.""" - def __init__(self): - self.seq = 0 - def __call__(self): - self.seq += 1 - return "" % self.seq - -make_msgid = MessageIdFactory() - -class IndexedMailboxDir: - """An indexed mailbox directory, providing random message access by - message-id. Intended as a base class for a maildir and an mh subclass.""" - - def __init__(self, mdir_name): - assert tempfile.tempdir - self.root = tempfile.mkdtemp(prefix=mdir_name) - self.msg_id_dict = {} - self.deliveries = 0 - - def _add_to_index(self, msg_text, fpath): - """Add the given message to the index, for later random access.""" - # Extract the message-id as index key - msg_id = None - fp = cStringIO.StringIO(msg_text) - while True: - line = fp.readline() - # line empty means we didn't find a message-id - assert line - if line.lower().startswith("message-id:"): - msg_id = line.split(":", 1)[-1].strip() - assert msg_id - break - assert not self.msg_id_dict.has_key(msg_id) - self.msg_id_dict[msg_id] = fpath - - def get_all_filenames(self): - """Return all relative pathnames of files in this mailbox.""" - return self.msg_id_dict.values() - -class SimpleMaildir(IndexedMailboxDir): - """Primitive Maildir class, just good enough for generating short-lived - test maildirs.""" - - def __init__(self, mdir_name='maildir'): - IndexedMailboxDir.__init__(self, mdir_name) - for d in "cur", "tmp", "new": - os.mkdir(os.path.join(self.root, d)) - - def write(self, msg_str, new=True, flags=[]): - """Store a message with the given flags.""" - assert not (new and flags) - if new: - subdir = "new" - else: - subdir = "cur" - fname = self._mkname(new, flags) - relpath = os.path.join(subdir, fname) - path = os.path.join(self.root, relpath) - assert not os.path.exists(path) - f = open(path, "w") - f.write(msg_str) - f.close() - self._add_to_index(msg_str, relpath) - - def _mkname(self, new, flags): - """Generate a unique filename for a new message.""" - validflags = 'DFPRST' - for f in flags: - assert f in validflags - # This 'unique' name should be good enough, since nobody else - # will ever write messages to this maildir folder. - uniq = str(self.deliveries) - self.deliveries += 1 - if new: - return uniq - if not flags: - return uniq + ':2,' - finfo = "".join(sorted(flags)) - return uniq + ':2,' + finfo - - def get_message_and_mbox_status(self, msgid): - """For the Message-Id msgid, return the matching message in text - format and its status, expressed as a set of mbox flags.""" - fpath = self.msg_id_dict[msgid] # Barfs if not found - mdir_flags = fpath.rsplit('2,', 1)[-1] - flagmap = { - 'F': 'F', - 'R': 'A', - 'S': 'R' - } - mbox_flags = set([flagmap[x] for x in mdir_flags]) - if fpath.startswith("cur/"): - mbox_flags.add('O') - fp = open(os.path.join(self.root, fpath), "r") - msg = fp.read() - fp.close() - return msg, mbox_flags - - -class TestCaseInTempdir(unittest.TestCase): - """Base class for testcases that need to create temporary files. - All testcases that create temporary files should be derived from this - class, not directly from unittest.TestCase. - TestCaseInTempdir provides these methods: - - setUp() Creates a safe temporary directory and sets tempfile.tempdir. - - tearDown() Recursively removes the temporary directory and unsets - tempfile.tempdir. - - Overriding methods should call the ones above.""" - temproot = None - - def setUp(self): - if not self.temproot: - assert not tempfile.tempdir - self.temproot = tempfile.tempdir = \ - tempfile.mkdtemp(prefix="test-archivemail") - - def tearDown(self): - assert tempfile.tempdir == self.temproot - if self.temproot: - shutil.rmtree(self.temproot) - tempfile.tempdir = self.temproot = None - - -############ Mbox Class testing ############## - -class TestMboxDotlock(TestCaseInTempdir): - def setUp(self): - super(TestMboxDotlock, self).setUp() - self.mbox_name = make_mbox() - self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] - self.mbox = archivemail.Mbox(self.mbox_name) - - def testDotlock(self): - """dotlock_lock/unlock should create/delete a lockfile""" - lock = self.mbox_name + ".lock" - self.mbox._dotlock_lock() - assert os.path.isfile(lock) - self.mbox._dotlock_unlock() - assert not os.path.isfile(lock) - - def testDotlockingSucceedsUponEACCES(self): - """A dotlock should silently be omitted upon EACCES.""" - archivemail.options.quiet = True - mbox_dir = os.path.dirname(self.mbox_name) - os.chmod(mbox_dir, 0500) - try: - self.mbox._dotlock_lock() - finally: - os.chmod(mbox_dir, 0700) - archivemail.options.quiet = False - -class TestMboxPosixLock(TestCaseInTempdir): - def setUp(self): - super(TestMboxPosixLock, self).setUp() - self.mbox_name = make_mbox() - self.mbox = archivemail.Mbox(self.mbox_name) - - def testPosixLock(self): - """posix_lock/unlock should create/delete an advisory lock""" - - # The following code snippet heavily lends from the Python 2.5 mailbox - # unittest. - # BEGIN robbery: - - # Fork off a subprocess that will lock the file for 2 seconds, - # unlock it, and then exit. - pid = os.fork() - if pid == 0: - # In the child, lock the mailbox. - self.mbox._posix_lock() - time.sleep(2) - self.mbox._posix_unlock() - os._exit(0) - - # In the parent, sleep a bit to give the child time to acquire - # the lock. - time.sleep(0.5) - # The parent's file self.mbox.mbox_file shares fcntl locks with the - # duplicated FD in the child; reopen it so we get a different file - # table entry. - file = open(self.mbox_name, "r+") - lock_nb = fcntl.LOCK_EX | fcntl.LOCK_NB - fd = file.fileno() - try: - self.assertRaises(IOError, fcntl.lockf, fd, lock_nb) - - finally: - # Wait for child to exit. Locking should now succeed. - exited_pid, status = os.waitpid(pid, 0) - - fcntl.lockf(fd, lock_nb) - fcntl.lockf(fd, fcntl.LOCK_UN) - # END robbery - - -class TestMboxNext(TestCaseInTempdir): - def setUp(self): - super(TestMboxNext, self).setUp() - self.not_empty_name = make_mbox(messages=18) - self.empty_name = make_mbox(messages=0) - - def testNextEmpty(self): - """mbox.next() should return None on an empty mailbox""" - mbox = archivemail.Mbox(self.empty_name) - msg = mbox.next() - self.assertEqual(msg, None) - - def testNextNotEmpty(self): - """mbox.next() should a message on a populated mailbox""" - mbox = archivemail.Mbox(self.not_empty_name) - for count in range(18): - msg = mbox.next() - assert msg - msg = mbox.next() - self.assertEqual(msg, None) - - -############ TempMbox Class testing ############## - -class TestTempMboxWrite(TestCaseInTempdir): - def setUp(self): - super(TestTempMboxWrite, self).setUp() - - def testWrite(self): - """mbox.write() should append messages to a mbox mailbox""" - read_file = make_mbox(messages=3) - mbox_read = archivemail.Mbox(read_file) - mbox_write = archivemail.TempMbox() - write_file = mbox_write.mbox_file_name - for count in range(3): - msg = mbox_read.next() - mbox_write.write(msg) - mbox_read.close() - mbox_write.close() - assert filecmp.cmp(read_file, write_file, shallow=0) - - def testWriteNone(self): - """calling mbox.write() with no message should raise AssertionError""" - write = archivemail.TempMbox() - self.assertRaises(AssertionError, write.write, None) - -class TestTempMboxRemove(TestCaseInTempdir): - def setUp(self): - super(TestTempMboxRemove, self).setUp() - self.mbox = archivemail.TempMbox() - self.mbox_name = self.mbox.mbox_file_name - - def testMboxRemove(self): - """remove() should delete a mbox mailbox""" - assert os.path.exists(self.mbox_name) - self.mbox.remove() - assert not os.path.exists(self.mbox_name) - - - -########## options class testing ################# - -class TestOptionDefaults(unittest.TestCase): - def testVerbose(self): - """verbose should be off by default""" - self.assertEqual(archivemail.options.verbose, False) - - def testDaysOldMax(self): - """default archival time should be 180 days""" - self.assertEqual(archivemail.options.days_old_max, 180) - - def testQuiet(self): - """quiet should be off by default""" - self.assertEqual(archivemail.options.quiet, False) - - def testDeleteOldMail(self): - """we should not delete old mail by default""" - self.assertEqual(archivemail.options.delete_old_mail, False) - - def testNoCompress(self): - """no-compression should be off by default""" - self.assertEqual(archivemail.options.no_compress, False) - - def testIncludeFlagged(self): - """we should not archive flagged messages by default""" - self.assertEqual(archivemail.options.include_flagged, False) - - def testPreserveUnread(self): - """we should not preserve unread messages by default""" - self.assertEqual(archivemail.options.preserve_unread, False) - -class TestOptionParser(unittest.TestCase): - def setUp(self): - self.oldopts = copy.copy(archivemail.options) - - def testOptionDate(self): - """--date and -D options are parsed correctly""" - date_formats = ( - "%Y-%m-%d", # ISO format - "%d %b %Y" , # Internet format - "%d %B %Y" , # Internet format with full month names - ) - date = time.strptime("2000-07-29", "%Y-%m-%d") - unixdate = time.mktime(date) - for df in date_formats: - d = time.strftime(df, date) - for opt in '-D', '--date=': - archivemail.options.date_old_max = None - archivemail.options.parse_args([opt+d], "") - self.assertEqual(unixdate, archivemail.options.date_old_max) - - def testOptionPreserveUnread(self): - """--preserve-unread option is parsed correctly""" - archivemail.options.parse_args(["--preserve-unread"], "") - assert archivemail.options.preserve_unread - archivemail.options.preserve_unread = False - archivemail.options.parse_args(["-u"], "") - assert archivemail.options.preserve_unread - - def testOptionSuffix(self): - """--suffix and -s options are parsed correctly""" - for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"): - archivemail.options.parse_args(["--suffix="+suffix], "") - assert archivemail.options.archive_suffix == suffix - archivemail.options.suffix = None - archivemail.options.parse_args(["-s", suffix], "") - assert archivemail.options.archive_suffix == suffix - - def testOptionDryrun(self): - """--dry-run option is parsed correctly""" - archivemail.options.parse_args(["--dry-run"], "") - assert archivemail.options.dry_run - archivemail.options.preserve_unread = False - archivemail.options.parse_args(["-n"], "") - assert archivemail.options.dry_run - - def testOptionDays(self): - """--days and -d options are parsed correctly""" - archivemail.options.parse_args(["--days=11"], "") - self.assertEqual(archivemail.options.days_old_max, 11) - archivemail.options.days_old_max = None - archivemail.options.parse_args(["-d11"], "") - self.assertEqual(archivemail.options.days_old_max, 11) - - def testOptionDelete(self): - """--delete option is parsed correctly""" - archivemail.options.parse_args(["--delete"], "") - assert archivemail.options.delete_old_mail - - def testOptionCopy(self): - """--copy option is parsed correctly""" - archivemail.options.parse_args(["--copy"], "") - assert archivemail.options.copy_old_mail - - def testOptionOutputdir(self): - """--output-dir and -o options are parsed correctly""" - for path in "/just/some/path", "relative/path": - archivemail.options.parse_args(["--output-dir=%s" % path], "") - self.assertEqual(archivemail.options.output_dir, path) - archivemail.options.output_dir = None - archivemail.options.parse_args(["-o%s" % path], "") - self.assertEqual(archivemail.options.output_dir, path) - - def testOptionNocompress(self): - """--no-compress option is parsed correctly""" - archivemail.options.parse_args(["--no-compress"], "") - assert archivemail.options.no_compress - - def testOptionSize(self): - """--size and -S options are parsed correctly""" - size = "666" - archivemail.options.parse_args(["--size=%s" % size ], "") - self.assertEqual(archivemail.options.min_size, int(size)) - archivemail.options.parse_args(["-S%s" % size ], "") - self.assertEqual(archivemail.options.min_size, int(size)) - - def tearDown(self): - archivemail.options = self.oldopts - -########## archivemail.is_older_than_days() unit testing ################# - -class TestIsTooOld(unittest.TestCase): - def testVeryOld(self): - """with max_days=360, should be true for these dates > 1 year""" - for years in range(1, 10): - time_msg = time.time() - (years * 365 * 24 * 60 * 60) - assert archivemail.is_older_than_days(time_message=time_msg, - max_days=360) - - def testOld(self): - """with max_days=14, should be true for these dates > 14 days""" - for days in range(14, 360): - time_msg = time.time() - (days * 24 * 60 * 60) - assert archivemail.is_older_than_days(time_message=time_msg, - max_days=14) - - def testJustOld(self): - """with max_days=1, should be true for these dates >= 1 day""" - for minutes in range(0, 61): - time_msg = time.time() - (25 * 60 * 60) + (minutes * 60) - assert archivemail.is_older_than_days(time_message=time_msg, - max_days=1) - - def testNotOld(self): - """with max_days=9, should be false for these dates < 9 days""" - for days in range(0, 9): - time_msg = time.time() - (days * 24 * 60 * 60) - assert not archivemail.is_older_than_days(time_message=time_msg, - max_days=9) - - def testJustNotOld(self): - """with max_days=1, should be false for these hours <= 1 day""" - for minutes in range(0, 60): - time_msg = time.time() - (23 * 60 * 60) - (minutes * 60) - assert not archivemail.is_older_than_days(time_message=time_msg, - max_days=1) - - def testFuture(self): - """with max_days=1, should be false for times in the future""" - for minutes in range(0, 60): - time_msg = time.time() + (minutes * 60) - assert not archivemail.is_older_than_days(time_message=time_msg, - max_days=1) - -########## archivemail.parse_imap_url() unit testing ################# - -class TestParseIMAPUrl(unittest.TestCase): - def setUp(self): - archivemail.options.quiet = True - archivemail.options.verbose = False - archivemail.options.pwfile = None - - urls_withoutpass = [ - ('imaps://user@example.org@imap.example.org/upperbox/lowerbox', - ('user', None, 'example.org@imap.example.org', - 'upperbox/lowerbox')), - ('imaps://"user@example.org"@imap.example.org/upperbox/lowerbox', - ('user@example.org', None, 'imap.example.org', - 'upperbox/lowerbox')), - ('imaps://user@example.org"@imap.example.org/upperbox/lowerbox', - ('user', None, 'example.org"@imap.example.org', - 'upperbox/lowerbox')), - ('imaps://"user@example.org@imap.example.org/upperbox/lowerbox', - ('"user', None, 'example.org@imap.example.org', - 'upperbox/lowerbox')), - ('imaps://"us\\"er@example.org"@imap.example.org/upperbox/lowerbox', - ('us"er@example.org', None, 'imap.example.org', - 'upperbox/lowerbox')), - ('imaps://user\\@example.org@imap.example.org/upperbox/lowerbox', - ('user\\', None, 'example.org@imap.example.org', - 'upperbox/lowerbox')) - ] - urls_withpass = [ - ('imaps://user@example.org:passwd@imap.example.org/upperbox/lowerbox', - ('user@example.org', 'passwd', 'imap.example.org', - 'upperbox/lowerbox'), - ('user', None, 'example.org:passwd@imap.example.org', - 'upperbox/lowerbox')), - ('imaps://"user@example.org:passwd@imap.example.org/upperbox/lowerbox', - ('"user@example.org', "passwd", 'imap.example.org', - 'upperbox/lowerbox'), - ('"user', None, 'example.org:passwd@imap.example.org', - 'upperbox/lowerbox')), - ('imaps://u\\ser\\@example.org:"p@sswd"@imap.example.org/upperbox/lowerbox', - ('u\\ser\\@example.org', 'p@sswd', 'imap.example.org', - 'upperbox/lowerbox'), - ('u\\ser\\', None, 'example.org:"p@sswd"@imap.example.org', - 'upperbox/lowerbox')) - ] - # These are invalid when the password's not stripped. - urls_onlywithpass = [ - ('imaps://"user@example.org":passwd@imap.example.org/upperbox/lowerbox', - ('user@example.org', "passwd", 'imap.example.org', - 'upperbox/lowerbox')) - ] - def testUrlsWithoutPwfile(self): - """Parse test urls with --pwfile option unset. This parses a password in - the URL, if present.""" - archivemail.options.pwfile = None - for mbstr in self.urls_withpass + self.urls_withoutpass: - url = mbstr[0][mbstr[0].find('://')+3:] - result = archivemail.parse_imap_url(url) - self.assertEqual(result, mbstr[1]) - - def testUrlsWithPwfile(self): - """Parse test urls with --pwfile set. In this case the ':' character - loses its meaning as a delimiter.""" - archivemail.options.pwfile = "whocares.txt" - for mbstr in self.urls_withpass: - url = mbstr[0][mbstr[0].find('://')+3:] - result = archivemail.parse_imap_url(url) - self.assertEqual(result, mbstr[2]) - for mbstr in self.urls_onlywithpass: - url = mbstr[0][mbstr[0].find('://')+3:] - self.assertRaises(archivemail.UnexpectedError, - archivemail.parse_imap_url, url) - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.verbose = False - archivemail.options.pwfile = None - -########## acceptance testing ########### - -class TestArchive(TestCaseInTempdir): - """Base class defining helper functions for doing test archiving runs.""" - mbox = None # mbox file that will be processed by archivemail - good_archive = None # Uncompressed reference archive file to verify the - # archive after processing - good_mbox = None # Reference mbox file to verify the mbox after processing - - def verify(self): - assert os.path.exists(self.mbox) - if self.good_mbox is not None: - assertEqualContent(self.mbox, self.good_mbox) - else: - self.assertEqual(os.path.getsize(self.mbox), 0) - archive_name = self.mbox + "_archive" - if not archivemail.options.no_compress: - archive_name += ".gz" - iszipped = True - else: - assert not os.path.exists(archive_name + ".gz") - iszipped = False - if self.good_archive is not None: - assertEqualContent(archive_name, self.good_archive, iszipped) - else: - assert not os.path.exists(archive_name) - - def make_old_mbox(self, body=None, headers=None, messages=1, make_old_archive=False): - """Prepare for a test run with an old mbox by making an old mbox, - optionally an existing archive, and a reference archive to verify the - archive after archivemail has run.""" - self.mbox = make_mbox(body, headers, 181*24, messages) - archive_does_change = not (archivemail.options.dry_run or - archivemail.options.delete_old_mail) - mbox_does_not_change = archivemail.options.dry_run or \ - archivemail.options.copy_old_mail - if make_old_archive: - archive = archivemail.make_archive_name(self.mbox) - self.good_archive = make_archive_and_plain_copy(archive) - if archive_does_change: - append_file(self.mbox, self.good_archive) - elif archive_does_change: - self.good_archive = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox, self.good_archive) - if mbox_does_not_change: - if archive_does_change and not make_old_archive: - self.good_mbox = self.good_archive - else: - self.good_mbox = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox, self.good_mbox) - - def make_mixed_mbox(self, body=None, headers=None, messages=1, make_old_archive=False): - """Prepare for a test run with a mixed mbox by making a mixed mbox, - optionally an existing archive, a reference archive to verify the - archive after archivemail has run, and likewise a reference mbox to - verify the mbox.""" - self.make_old_mbox(body, headers, messages=messages, make_old_archive=make_old_archive) - new_mbox_name = make_mbox(body, headers, 179*24, messages) - append_file(new_mbox_name, self.mbox) - if self.good_mbox is None: - self.good_mbox = new_mbox_name - else: - if self.good_mbox == self.good_archive: - self.good_mbox = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox, self.good_mbox) - else: - append_file(new_mbox_name, self.good_mbox) - - def make_new_mbox(self, body=None, headers=None, messages=1, make_old_archive=False): - """Prepare for a test run with a new mbox by making a new mbox, - optionally an exiting archive, and a reference mbox to verify the mbox - after archivemail has run.""" - self.mbox = make_mbox(body, headers, 179*24, messages) - self.good_mbox = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox, self.good_mbox) - if make_old_archive: - archive = archivemail.make_archive_name(self.mbox) - self.good_archive = make_archive_and_plain_copy(archive) - - -class TestArchiveMbox(TestArchive): - """archiving should work based on the date of messages given""" - - def setUp(self): - self.oldopts = copy.copy(archivemail.options) - archivemail.options.quiet = True - super(TestArchiveMbox, self).setUp() - - def testOld(self): - """archiving an old mailbox""" - self.make_old_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testOldFromInBody(self): - """archiving an old mailbox with 'From ' in the body""" - body = """This is a message with ^From at the start of a line -From is on this line -This is after the ^From line""" - self.make_old_mbox(messages=3, body=body) - archivemail.archive(self.mbox) - self.verify() - - def testDateSystem(self): - """test that the --date option works as expected""" - test_headers = ( - { - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', - 'Date' : 'Fri, 28 Jul 2000 16:11:36 +1000', - }, - { - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', - 'Date' : None, - }, - { - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', - 'Date' : None, - 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000', - }, - { - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', - 'Date' : None, - 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000', - }, - ) - for headers in test_headers: - msg = make_message(default_headers=headers, wantobj=True) - date = time.strptime("2000-07-29", "%Y-%m-%d") - archivemail.options.date_old_max = time.mktime(date) - assert archivemail.should_archive(msg) - date = time.strptime("2000-07-27", "%Y-%m-%d") - archivemail.options.date_old_max = time.mktime(date) - assert not archivemail.should_archive(msg) - - def testMixed(self): - """archiving a mixed mailbox""" - self.make_mixed_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testNew(self): - """archiving a new mailbox""" - self.make_new_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testOldExisting(self): - """archiving an old mailbox with an existing archive""" - self.make_old_mbox(messages=3, make_old_archive=True) - archivemail.archive(self.mbox) - self.verify() - - def testOldWeirdHeaders(self): - """archiving old mailboxes with weird headers""" - weird_headers = ( - { # we should archive because of the date on the 'From_' line - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', - 'Date' : 'Friskhdfkjkh, 28 Jul 2002 1line noise6:11:36 +1000', - }, - { # we should archive because of the date on the 'From_' line - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', - 'Date' : None, - }, - { # we should archive because of the date in 'Delivery-date' - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', - 'Date' : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000', - 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000', - }, - { # we should archive because of the date in 'Delivery-date' - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', - 'Date' : None, - 'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000', - }, - { # we should archive because of the date in 'Resent-Date' - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', - 'Date' : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000', - 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000', - }, - { # we should archive because of the date in 'Resent-Date' - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030', - 'Date' : None, - 'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000', - }, - { # completely blank dates were crashing < version 0.4.7 - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', - 'Date' : '', - }, - { # completely blank dates were crashing < version 0.4.7 - 'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000', - 'Date' : '', - 'Resent-Date' : '', - }, - ) - fd, self.mbox = tempfile.mkstemp() - fp = os.fdopen(fd, "w") - for headers in weird_headers: - msg_text = make_message(default_headers=headers) - fp.write(msg_text*2) - fp.close() - self.good_archive = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox, self.good_archive) - archivemail.archive(self.mbox) - self.verify() - - def tearDown(self): - archivemail.options = self.oldopts - super(TestArchiveMbox, self).tearDown() - - -class TestArchiveMboxTimestamp(TestCaseInTempdir): - """original mbox timestamps should always be preserved""" - def setUp(self): - super(TestArchiveMboxTimestamp, self).setUp() - archivemail.options.quiet = True - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 180)) - self.mtime = os.path.getmtime(self.mbox_name) - 66 - self.atime = os.path.getatime(self.mbox_name) - 88 - os.utime(self.mbox_name, (self.atime, self.mtime)) - - def testNew(self): - """mbox timestamps should not change after no archival""" - archivemail.options.days_old_max = 181 - archivemail.archive(self.mbox_name) - self.verify() - - def testOld(self): - """mbox timestamps should not change after archival""" - archivemail.options.days_old_max = 179 - archivemail.archive(self.mbox_name) - self.verify() - - def verify(self): - assert os.path.exists(self.mbox_name) - new_atime = os.path.getatime(self.mbox_name) - new_mtime = os.path.getmtime(self.mbox_name) - self.assertAlmostEqual(self.mtime, new_mtime, utimes_precision) - self.assertAlmostEqual(self.atime, new_atime, utimes_precision) - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.days_old_max = 180 - os.remove(self.mbox_name) - super(TestArchiveMboxTimestamp, self).tearDown() - - -class TestArchiveMboxAll(unittest.TestCase): - def setUp(self): - archivemail.options.quiet = True - archivemail.options.archive_all = True - - def testNew(self): - """new messages should be archived with --all""" - self.msg = make_message(hours_old=24*179, wantobj=True) - assert archivemail.should_archive(self.msg) - - def testOld(self): - """old messages should be archived with --all""" - self.msg = make_message(hours_old=24*181, wantobj=True) - assert archivemail.should_archive(self.msg) - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.archive_all = False - -class TestArchiveMboxPreserveUnread(unittest.TestCase): - """make sure the 'preserve_unread' option works""" - def setUp(self): - archivemail.options.quiet = True - archivemail.options.preserve_unread = True - self.msg = make_message(hours_old=24*181, wantobj=True) - - def testOldRead(self): - """old read messages should be archived with --preserve-unread""" - self.msg["Status"] = "RO" - assert archivemail.should_archive(self.msg) - - def testOldUnread(self): - """old unread messages should not be archived with --preserve-unread""" - self.msg["Status"] = "O" - assert not archivemail.should_archive(self.msg) - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.preserve_unread = False - - -class TestArchiveMboxSuffix(unittest.TestCase): - """make sure the 'suffix' option works""" - def setUp(self): - self.old_suffix = archivemail.options.archive_suffix - archivemail.options.quiet = True - - def testSuffix(self): - """archiving with specified --suffix arguments""" - for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"): - mbox_name = "foobar" - archivemail.options.archive_suffix = suffix - days_old_max = 180 - parsed_suffix_time = time.time() - days_old_max*24*60*60 - parsed_suffix = time.strftime(suffix, - time.localtime(parsed_suffix_time)) - archive_name = mbox_name + parsed_suffix - self.assertEqual(archive_name, - archivemail.make_archive_name(mbox_name)) - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.archive_suffix = self.old_suffix - - -class TestArchiveDryRun(TestArchive): - """make sure the 'dry-run' option works""" - def setUp(self): - super(TestArchiveDryRun, self).setUp() - archivemail.options.quiet = True - archivemail.options.dry_run = True - - def testOld(self): - """archiving an old mailbox with the 'dry-run' option""" - self.make_old_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def tearDown(self): - archivemail.options.dry_run = False - archivemail.options.quiet = False - super(TestArchiveDryRun, self).tearDown() - - -class TestArchiveDelete(TestArchive): - """make sure the 'delete' option works""" - def setUp(self): - super(TestArchiveDelete, self).setUp() - archivemail.options.quiet = True - archivemail.options.delete_old_mail = True - - def testNew(self): - """archiving a new mailbox with the 'delete' option""" - self.make_new_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testMixed(self): - """archiving a mixed mailbox with the 'delete' option""" - self.make_mixed_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testOld(self): - """archiving an old mailbox with the 'delete' option""" - self.make_old_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def tearDown(self): - archivemail.options.delete_old_mail = False - archivemail.options.quiet = False - super(TestArchiveDelete, self).tearDown() - - -class TestArchiveCopy(TestArchive): - """make sure the 'copy' option works""" - def setUp(self): - super(TestArchiveCopy, self).setUp() - archivemail.options.quiet = True - archivemail.options.copy_old_mail = True - - def testNew(self): - """archiving a new mailbox with the 'copy' option""" - self.make_new_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testMixed(self): - """archiving a mixed mailbox with the 'copy' option""" - self.make_mixed_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testOld(self): - """archiving an old mailbox with the 'copy' option""" - self.make_old_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def tearDown(self): - archivemail.options.copy_old_mail = False - archivemail.options.quiet = False - super(TestArchiveCopy, self).tearDown() - - -class TestArchiveMboxFlagged(unittest.TestCase): - """make sure the 'include_flagged' option works""" - def setUp(self): - archivemail.options.include_flagged = False - archivemail.options.quiet = True - - def testOld(self): - """by default, old flagged messages should not be archived""" - msg = make_message(default_headers={"X-Status": "F"}, - hours_old=24*181, wantobj=True) - assert not archivemail.should_archive(msg) - - def testIncludeFlaggedNew(self): - """new flagged messages should not be archived with include_flagged""" - msg = make_message(default_headers={"X-Status": "F"}, - hours_old=24*179, wantobj=True) - assert not archivemail.should_archive(msg) - - def testIncludeFlaggedOld(self): - """old flagged messages should be archived with include_flagged""" - archivemail.options.include_flagged = True - msg = make_message(default_headers={"X-Status": "F"}, - hours_old=24*181, wantobj=True) - assert archivemail.should_archive(msg) - - def tearDown(self): - archivemail.options.include_flagged = False - archivemail.options.quiet = False - - -class TestArchiveMboxOutputDir(unittest.TestCase): - """make sure that the 'output-dir' option works""" - def setUp(self): - archivemail.options.quiet = True - - def testOld(self): - """archiving an old mailbox with a sepecified output dir""" - for dir in "/just/a/path", "relative/path": - archivemail.options.output_dir = dir - archive_dir = archivemail.make_archive_name("/tmp/mbox") - self.assertEqual(dir, os.path.dirname(archive_dir)) - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.output_dir = None - - -class TestArchiveMboxUncompressed(TestArchive): - """make sure that the 'no_compress' option works""" - mbox_name = None - new_mbox = None - old_mbox = None - copy_name = None - - def setUp(self): - archivemail.options.quiet = True - archivemail.options.no_compress = True - super(TestArchiveMboxUncompressed, self).setUp() - - def testOld(self): - """archiving an old mailbox uncompressed""" - self.make_old_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testNew(self): - """archiving a new mailbox uncompressed""" - self.make_new_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testMixed(self): - """archiving a mixed mailbox uncompressed""" - self.make_mixed_mbox(messages=3) - archivemail.archive(self.mbox) - self.verify() - - def testOldExists(self): - """archiving an old mailbox uncopressed with an existing archive""" - self.make_old_mbox(messages=3, make_old_archive=True) - archivemail.archive(self.mbox) - self.verify() - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.no_compress = False - super(TestArchiveMboxUncompressed, self).tearDown() - - -class TestArchiveSize(unittest.TestCase): - """check that the 'size' argument works""" - def setUp(self): - archivemail.options.quiet = True - msg_text = make_message(hours_old=24*181) - self.msg_size = len(msg_text) - fp = cStringIO.StringIO(msg_text) - self.msg = rfc822.Message(fp) - - def testSmaller(self): - """giving a size argument smaller than the message""" - archivemail.options.min_size = self.msg_size - 1 - assert archivemail.should_archive(self.msg) - - def testBigger(self): - """giving a size argument bigger than the message""" - archivemail.options.min_size = self.msg_size + 1 - assert not archivemail.should_archive(self.msg) - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.min_size = None - - -############# Test archiving maildirs ############### - -class TestArchiveMailboxdir(TestCaseInTempdir): - """Base class defining helper functions for doing test archive runs with - maildirs.""" - maildir = None # Maildir that will be processed by archivemail - orig_maildir_obj = None # A backup copy of the maildir, a SimpleMaildir object - remaining_msg = set() # Filenames of maildir messages that should be preserved - number_archived = 0 # Number of messages that get archived - orig_archive = None # An uncompressed copy of a pre-existing archive, - # if one exists - - def setUp(self): - super(TestArchiveMailboxdir, self).setUp() - self.orig_maildir_obj = SimpleMaildir() - - def verify(self): - self._verify_remaining() - self._verify_archive() - - def _verify_remaining(self): - """Verify that the preserved messages weren't altered.""" - assert self.maildir - # Compare maildir with backup object. - dcmp = filecmp.dircmp(self.maildir, self.orig_maildir_obj.root) - # Top-level has only directories cur, new, tmp and must be unchanged. - self.assertEqual(dcmp.left_list, dcmp.right_list) - found = set() - for d in dcmp.common_dirs: - dcmp2 = dcmp.subdirs[d] - # We need to verify three things. - # 1. directory is a subset of the original... - assert not dcmp2.left_only - # 2. all common files are identical... - self.assertEqual(dcmp2.common_files, dcmp2.same_files) - found = found.union([os.path.join(d, x) for x in dcmp2.common_files]) - # 3. exactly the `new' messages (recorded in self.remaining_msg) - # were preserved. - self.assertEqual(found, self.remaining_msg) - - def _verify_archive(self): - """Verify the archive correctness.""" - # TODO: currently make_archive_name does not include the .gz suffix. - # Is this something that should be fixed? - archive = archivemail.make_archive_name(self.maildir) - if archivemail.options.no_compress: - iszipped = False - else: - archive += '.gz' - iszipped = True - if self.number_archived == 0: - if self.orig_archive: - assertEqualContent(archive, self.orig_archive, iszipped) - else: - assert not os.path.exists(archive) - return - fp_new = fp_archive = tmp_archive_name = None - try: - if self.orig_archive: - new_size = os.path.getsize(archive) - # Brute force: split archive in old and new part and verify the - # parts separately. (Of course this destroys the archive.) - fp_archive = open(archive, "r+") - fp_archive.seek(self.orig_archive_size) - fd, tmp_archive_name = tempfile.mkstemp() - fp_new = os.fdopen(fd, "w") - shutil.copyfileobj(fp_archive, fp_new) - fp_new.close() - fp_archive.truncate(self.orig_archive_size) - fp_archive.close() - assertEqualContent(archive, self.orig_archive, iszipped) - new_archive = tmp_archive_name - else: - new_archive = archive - if archivemail.options.no_compress: - fp_archive = open(new_archive, "r") - else: - fp_archive = FixedGzipFile(new_archive, "r") - mb = mailbox.UnixMailbox(fp_archive) - found = 0 - for msg in mb: - self.verify_maildir_has_msg(self.orig_maildir_obj, msg) - found += 1 - self.assertEqual(found, self.number_archived) - finally: - if tmp_archive_name: - os.remove(tmp_archive_name) - if fp_new is not None: - fp_new.close() - if fp_archive is not None: - fp_archive.close() - - def verify_maildir_has_msg(self, maildir, msg): - """Assert that the given maildir has a copy of the rfc822 message.""" - mid = msg['Message-Id'] # Complains if there is no message-id - mdir_msg_str, mdir_flags = \ - maildir.get_message_and_mbox_status(mid) - mbox_flags = set(msg.get('status', '') + msg.get('x-status', '')) - self.assertEqual(mdir_flags, mbox_flags) - - headers = filter(lambda h: msg.isheader(h) not in ('status', 'x-status'), - msg.headers) - headers = "".join(headers) - msg.rewindbody() - # Discard last mbox LF which is not part of the message. - body = msg.fp.read()[:-1] - msg_str = headers + os.linesep + body - self.assertEqual(mdir_msg_str, msg_str) - - def add_messages(self, body=None, headers=None, hours_old=0, messages=1): - for count in range(messages): - msg = make_message(body, default_headers=headers, mkfrom=False, - hours_old=hours_old) - self.orig_maildir_obj.write(msg, new=False) - - def make_maildir(self, mkold, mknew, body=None, headers=None, messages=1, - make_old_archive=False): - mailbox_does_change = not (archivemail.options.dry_run or - archivemail.options.copy_old_mail) - archive_does_change = not (archivemail.options.dry_run or - archivemail.options.delete_old_mail) - if mknew: - self.add_messages(body, headers, 179*24, messages) - if archive_does_change and archivemail.options.archive_all: - self.number_archived += messages - if mailbox_does_change: - self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames()) - if mkold: - self.add_messages(body, headers, 181*24, messages) - if archive_does_change: - self.number_archived += messages - if not mailbox_does_change: - self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames()) - self.maildir = copy_maildir(self.orig_maildir_obj.root) - if make_old_archive: - archive = archivemail.make_archive_name(self.maildir) - self.orig_archive = make_archive_and_plain_copy(archive) - # FIXME: .gz extension handling is a mess II - if not archivemail.options.no_compress: - archive += '.gz' - self.orig_archive_size = os.path.getsize(archive) - -class TestEmptyMaildir(TestCaseInTempdir): - def setUp(self): - super(TestEmptyMaildir, self).setUp() - archivemail.options.quiet = True - - def testEmpty(self): - """Archiving an empty maildir should not result in an archive.""" - self.mdir = SimpleMaildir() - archivemail.archive(self.mdir.root) - assert not os.path.exists(self.mdir.root + '_archive.gz') - - def tearDown(self): - super(TestEmptyMaildir, self).tearDown() - archivemail.options.quiet = False - -class TestMaildir(TestArchiveMailboxdir): - def setUp(self): - super(TestMaildir, self).setUp() - archivemail.options.quiet = True - - def testOld(self): - self.make_maildir(True, False, messages=3) - archivemail.archive(self.maildir) - self.verify() - - def testNew(self): - self.make_maildir(False, True, messages=3) - archivemail.archive(self.maildir) - self.verify() - - def testMixed(self): - self.make_maildir(True, True, messages=3) - archivemail.archive(self.maildir) - self.verify() - - def testMixedExisting(self): - self.make_maildir(True, True, messages=3, make_old_archive=True) - archivemail.archive(self.maildir) - self.verify() - - def tearDown(self): - archivemail.options.quiet = False - super(TestMaildir, self).tearDown() - - -class TestMaildirPreserveUnread(TestCaseInTempdir): - """Test if the preserve_unread option works with maildirs.""" - def setUp(self): - super(TestMaildirPreserveUnread, self).setUp() - archivemail.options.quiet = True - archivemail.options.preserve_unread = True - - def testOldRead(self): - """--preserve-unread archives old read messages in a maildir.""" - smd = SimpleMaildir("orig") - msg = make_message(hours_old=24*181) - smd.write(msg, new=False, flags='S') - md = mailbox.Maildir(smd.root) - msg_obj = md.next() - assert archivemail.should_archive(msg_obj) - - def testOldUnread(self): - """--preserve-unread preserves old unread messages in a maildir.""" - smd = SimpleMaildir("orig") - msg = make_message(hours_old=24*181) - smd.write(msg, new=False) - md = mailbox.Maildir(smd.root) - msg_obj = md.next() - assert not archivemail.should_archive(msg_obj) - - def tearDown(self): - archivemail.options.quiet = False - archivemail.options.preserve_unread = False - super(TestMaildirPreserveUnread, self).tearDown() - -class TestMaildirAll(TestArchiveMailboxdir): - def setUp(self): - super(TestMaildirAll, self).setUp() - archivemail.options.quiet = True - archivemail.options.archive_all = True - - def testNew(self): - """New maildir messages should be archived with --all""" - self.add_messages(hours_old=24*181) - md = mailbox.Maildir(self.orig_maildir_obj.root) - msg_obj = md.next() - assert archivemail.should_archive(msg_obj) - - def testOld(self): - """Old maildir messages should be archived with --all""" - self.add_messages(hours_old=24*179) - md = mailbox.Maildir(self.orig_maildir_obj.root) - msg_obj = md.next() - assert archivemail.should_archive(msg_obj) - - def tearDown(self): - super(TestMaildirAll, self).tearDown() - archivemail.options.quiet = False - archivemail.options.archive_all = False - -class TestMaildirDryRun(TestArchiveMailboxdir): - def setUp(self): - super(TestMaildirDryRun, self).setUp() - archivemail.options.quiet = True - archivemail.options.dry_run = True - - def testOld(self): - """archiving an old maildir mailbox with the 'dry-run' option""" - self.make_maildir(True, False) - archivemail.archive(self.maildir) - self.verify() - - def tearDown(self): - super(TestMaildirDryRun, self).tearDown() - archivemail.options.quiet = False - archivemail.options.dry_run = False - -class TestMaildirDelete(TestArchiveMailboxdir): - def setUp(self): - super(TestMaildirDelete, self).setUp() - archivemail.options.quiet = True - archivemail.options.delete_old_mail = True - - def testOld(self): - """archiving an old maildir mailbox with the 'delete' option""" - self.make_maildir(True, False) - archivemail.archive(self.maildir) - self.verify() - - def testNew(self): - """archiving a new maildir mailbox with the 'delete' option""" - self.make_maildir(False, True) - archivemail.archive(self.maildir) - self.verify() - - def tearDown(self): - super(TestMaildirDelete, self).tearDown() - archivemail.options.quiet = False - archivemail.options.delete_old_mail = False - -class TestMaildirCopy(TestArchiveMailboxdir): - def setUp(self): - super(TestMaildirCopy, self).setUp() - archivemail.options.quiet = True - archivemail.options.copy_old_mail = True - - def testOld(self): - """archiving an old maildir mailbox with the 'copy' option""" - self.make_maildir(True, False) - archivemail.archive(self.maildir) - self.verify() - - def testNew(self): - """archiving a new maildir mailbox with the 'copy' option""" - self.make_maildir(False, True) - archivemail.archive(self.maildir) - self.verify() - - def tearDown(self): - super(TestMaildirCopy, self).tearDown() - archivemail.options.quiet = False - archivemail.options.copy_old_mail = False - -class TestArchiveMaildirFlagged(TestCaseInTempdir): - """make sure the 'include_flagged' option works with maildir messages""" - def setUp(self): - super(TestArchiveMaildirFlagged, self).setUp() - archivemail.options.include_flagged = False - archivemail.options.quiet = True - - def testOld(self): - """by default, old flagged maildir messages should not be archived""" - smd = SimpleMaildir("orig") - msg = make_message(hours_old=24*181) - smd.write(msg, new=False, flags='F') - md = mailbox.Maildir(smd.root) - msg_obj = md.next() - assert not archivemail.should_archive(msg_obj) - - def testIncludeFlaggedNew(self): - """new flagged maildir messages should not be archived with include_flagged""" - smd = SimpleMaildir("orig") - msg = make_message(hours_old=24*179) - smd.write(msg, new=False, flags='F') - md = mailbox.Maildir(smd.root) - msg_obj = md.next() - assert not archivemail.should_archive(msg_obj) - - def testIncludeFlaggedOld(self): - """old flagged maildir messages should be archived with include_flagged""" - archivemail.options.include_flagged = True - smd = SimpleMaildir("orig") - msg = make_message(hours_old=24*181) - smd.write(msg, new=False, flags='F') - md = mailbox.Maildir(smd.root) - msg_obj = md.next() - assert archivemail.should_archive(msg_obj) - - def tearDown(self): - super(TestArchiveMaildirFlagged, self).tearDown() - archivemail.options.include_flagged = False - archivemail.options.quiet = False - -class TestArchiveMaildirSize(TestCaseInTempdir): - """check that the 'size' argument works with maildir messages""" - def setUp(self): - super(TestArchiveMaildirSize, self).setUp() - archivemail.options.quiet = True - msg = make_message(hours_old=24*181) - self.msg_size = len(msg) - smd = SimpleMaildir("orig") - smd.write(msg, new=False) - md = mailbox.Maildir(smd.root) - self.msg_obj = md.next() - - def testSmaller(self): - """giving a size argument smaller than the maildir message""" - archivemail.options.min_size = self.msg_size - 1 - assert archivemail.should_archive(self.msg_obj) - - def testBigger(self): - """giving a size argument bigger than the maildir message""" - archivemail.options.min_size = self.msg_size + 1 - assert not archivemail.should_archive(self.msg_obj) - - def tearDown(self): - super(TestArchiveMaildirSize, self).tearDown() - archivemail.options.quiet = False - archivemail.options.min_size = None - -########## helper routines ############ - -def make_message(body=None, default_headers={}, hours_old=None, mkfrom=False, wantobj=False): - headers = copy.copy(default_headers) - if not headers: - headers = {} - headers['Message-Id'] = make_msgid() - if not headers.has_key('Date'): - time_message = time.time() - (60 * 60 * hours_old) - headers['Date'] = time.asctime(time.localtime(time_message)) - if not headers.has_key('From'): - headers['From'] = "sender@dummy.domain" - if not headers.has_key('To'): - headers['To'] = "receipient@dummy.domain" - if not headers.has_key('Subject'): - headers['Subject'] = "This is the subject" - if mkfrom and not headers.has_key('From_'): - headers['From_'] = "%s %s" % (headers['From'], headers['Date']) - if not body: - body = "This is the message body" - - msg = "" - if headers.has_key('From_'): - msg = msg + ("From %s\n" % headers['From_']) - del headers['From_'] - for key in headers.keys(): - if headers[key] is not None: - msg = msg + ("%s: %s\n" % (key, headers[key])) - msg = msg + "\n\n" + body + "\n\n" - if not wantobj: - return msg - fp = cStringIO.StringIO(msg) - return rfc822.Message(fp) - -def append_file(source, dest): - """appends the file named 'source' to the file named 'dest'""" - assert os.path.isfile(source) - assert os.path.isfile(dest) - read = open(source, "r") - write = open(dest, "a+") - shutil.copyfileobj(read,write) - read.close() - write.close() - - -def make_mbox(body=None, headers=None, hours_old=0, messages=1): - assert tempfile.tempdir - fd, name = tempfile.mkstemp() - file = os.fdopen(fd, "w") - for count in range(messages): - msg = make_message(body=body, default_headers=headers, - mkfrom=True, hours_old=hours_old) - file.write(msg) - file.close() - return name - -def make_archive_and_plain_copy(archive_name): - """Make an mbox archive of the given name like archivemail may have - created it. Also make an uncompressed copy of this archive and return its - name.""" - copy_fd, copy_name = tempfile.mkstemp() - copy_fp = os.fdopen(copy_fd, "w") - if archivemail.options.no_compress: - fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT) - fp = os.fdopen(fd, "w") - else: - archive_name += ".gz" - fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT) - rawfp = os.fdopen(fd, "w") - fp = gzip.GzipFile(fileobj=rawfp) - for count in range(3): - msg = make_message(hours_old=24*360) - fp.write(msg) - copy_fp.write(msg) - fp.close() - copy_fp.close() - if not archivemail.options.no_compress: - rawfp.close() - return copy_name - -def copy_maildir(maildir, prefix="tmp"): - """Create a copy of the given maildir and return the absolute path of the - new direcory.""" - newdir = tempfile.mkdtemp(prefix=prefix) - for d in "cur", "new", "tmp": - shutil.copytree(os.path.join(maildir, d), os.path.join(newdir, d)) - return newdir - -def assertEqualContent(firstfile, secondfile, zippedfirst=False): - """Verify that the two files exist and have identical content. If zippedfirst - is True, assume that firstfile is gzip-compressed.""" - assert os.path.exists(firstfile) - assert os.path.exists(secondfile) - if zippedfirst: - try: - fp1 = gzip.GzipFile(firstfile, "r") - fp2 = open(secondfile, "r") - assert cmp_fileobj(fp1, fp2) - finally: - fp1.close() - fp2.close() - else: - assert filecmp.cmp(firstfile, secondfile, shallow=0) - -def cmp_fileobj(fp1, fp2): - """Return if reading the fileobjects yields identical content.""" - bufsize = 8192 - while True: - b1 = fp1.read(bufsize) - b2 = fp2.read(bufsize) - if b1 != b2: - return False - if not b1: - return True - -if __name__ == "__main__": - unittest.main() -- cgit v1.2.3