#! /usr/bin/env python
############################################################################
# Copyright (C) 2002  Paul Rodger
#           (C) 2006-2008  Nikolaus Schulz
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
############################################################################
"""
Unit-test archivemail using 'PyUnit'.

TODO: add tests for:
    * dotlock locks already existing
    * archiving maildir-format mailboxes
    * archiving MH-format mailboxes
    * preservation of status information from maildir to mbox
    * a 3rd party process changing the mbox file being read

"""

import sys


def check_python_version():
    """Exit with status 1 unless we are running on python >= v2.3.

    Interpreters that do not even provide sys.version_info are treated as
    too old as well.
    """
    message = ("This test script requires python version 2.3 or later. "
               + "Your version of python is:\n%s" % sys.version)
    # we might not even have this attribute! :)
    version = getattr(sys, "version_info", None)
    if version is None or tuple(version[:2]) < (2, 3):
        print(message)
        sys.exit(1)


# define & run this early because 'unittest' requires Python >= 2.1
check_python_version()

import copy
import fcntl
import filecmp
import os
import re
import shutil
import stat
import tempfile
import time
import unittest
import gzip
import cStringIO
import rfc822
import mailbox

try:
    import archivemail
except ImportError:
    # The script under test must be importable as a module named
    # 'archivemail'; tell the user how to arrange that and give up.
    print("The archivemail script needs to be called 'archivemail.py'")
    print("and should be in the current directory in order to be imported")
    print("and tested. Sorry.")
    if os.path.isfile("archivemail"):
        print("Try renaming it from 'archivemail' to 'archivemail.py'.")
    sys.exit(1)

# We want to iterate over messages in a compressed archive mbox and verify
# them. This involves seeking in the mbox. The gzip.GzipFile.seek() in
# Python 2.5 doesn't understand whence; this is Python bug #1355023, triggered
# by mailbox._PartialFile.seek(). The bug is still pending as of Python
# 2.5.2. To work around it, we subclass gzip.GzipFile.
#
# It should be noted that seeking backwards in a GzipFile is emulated by
# re-reading the entire file from the beginning, which is extremely
# inefficient and won't work with large files; but our test archives are all
# small, so it's okay.
class FixedGzipFile(gzip.GzipFile):
    """GzipFile with seek method accepting whence parameter."""

    def seek(self, offset, whence=0):
        """Seek to offset, emulating `whence' if GzipFile.seek() lacks it.

        whence=0 (absolute) and whence=1 (relative to the current position)
        are supported; anything else raises ValueError when the underlying
        seek() does not accept a whence argument.
        """
        try:
            return gzip.GzipFile.seek(self, offset, whence)
        except TypeError:
            # Old GzipFile.seek() takes no whence; translate to an absolute
            # offset ourselves.
            if whence:
                if whence == 1:
                    offset = self.offset + offset
                else:
                    raise ValueError('Seek from end not supported')
            return gzip.GzipFile.seek(self, offset)


# precision of os.utime() when restoring mbox timestamps
utimes_precision = 5


class MessageIdFactory:
    """Factory to create `unique' message-ids."""

    def __init__(self):
        self.seq = 0

    def __call__(self):
        """Return a new message-id; each call yields a different one."""
        self.seq += 1
        # The sequence number is what makes the id unique.  (The format
        # string had been lost, leaving `"" % self.seq', which raises
        # TypeError.)
        return "<archivemail%d@localhost>" % self.seq


make_msgid = MessageIdFactory()


class IndexedMailboxDir:
    """An indexed mailbox directory, providing random message access by
    message-id. Intended as a base class for a maildir and an mh subclass."""

    def __init__(self, mdir_name):
        """Create the mailbox root below tempfile.tempdir, which must be set."""
        assert tempfile.tempdir
        self.root = tempfile.mkdtemp(prefix=mdir_name)
        self.msg_id_dict = {}   # maps message-id -> relative file path
        self.deliveries = 0

    def _add_to_index(self, msg_text, fpath):
        """Add the given message to the index, for later random access."""
        # Extract the message-id as index key
        msg_id = None
        for line in msg_text.splitlines():
            if line.lower().startswith("message-id:"):
                msg_id = line.split(":", 1)[-1].strip()
                break
        # Fails if there is no message-id line or if its value is empty.
        assert msg_id
        assert msg_id not in self.msg_id_dict
        self.msg_id_dict[msg_id] = fpath

    def get_all_filenames(self):
        """Return all relative pathnames of files in this mailbox."""
        return self.msg_id_dict.values()


class SimpleMaildir(IndexedMailboxDir):
    """Primitive Maildir class, just good enough for generating short-lived
    test maildirs."""

    def __init__(self, mdir_name='maildir'):
        IndexedMailboxDir.__init__(self, mdir_name)
        for d in "cur", "tmp", "new":
            os.mkdir(os.path.join(self.root, d))

    def write(self, msg_str, new=True, flags=()):
        """Store a message with the given flags.

        New messages go to new/ and must not carry flags; all others go to
        cur/.  The message is added to the message-id index.
        """
        assert not (new and flags)
        if new:
            subdir = "new"
        else:
            subdir = "cur"
        fname = self._mkname(new, flags)
        relpath = os.path.join(subdir, fname)
        path = os.path.join(self.root, relpath)
        assert not os.path.exists(path)
        f = open(path, "w")
        try:
            f.write(msg_str)
        finally:
            f.close()
        self._add_to_index(msg_str, relpath)

    def _mkname(self, new, flags):
        """Generate a unique filename for a new message."""
        validflags = 'DFPRST'
        for f in flags:
            assert f in validflags
        # This 'unique' name should be good enough, since nobody else
        # will ever write messages to this maildir folder.
        uniq = str(self.deliveries)
        self.deliveries += 1
        if new:
            return uniq
        # Messages in cur/ always get the ':2,' info suffix, followed by
        # the (possibly empty) sorted flag characters.
        return uniq + ':2,' + "".join(sorted(flags))

    def get_message_and_mbox_status(self, msgid):
        """For the Message-Id msgid, return the matching message in text
        format and its status, expressed as a set of mbox flags.

        Raises KeyError if msgid is unknown or if the file name carries a
        maildir flag that has no entry in the flag map below.
        """
        fpath = self.msg_id_dict[msgid]  # Barfs if not found
        fname = os.path.basename(fpath)
        if ':2,' in fname:
            mdir_flags = fname.rsplit(':2,', 1)[-1]
        else:
            # Names generated for new/ have no info suffix and hence no
            # flags; don't mistake the path itself for flag characters.
            mdir_flags = ''
        flagmap = {
            'F': 'F',
            'R': 'A',
            'S': 'R'
        }
        mbox_flags = set([flagmap[x] for x in mdir_flags])
        if fpath.startswith("cur/"):
            mbox_flags.add('O')
        fp = open(os.path.join(self.root, fpath), "r")
        try:
            msg = fp.read()
        finally:
            fp.close()
        return msg, mbox_flags


class TestCaseInTempdir(unittest.TestCase):
    """Base class for testcases that need to create temporary files.
    All testcases that create temporary files should be derived from this
    class, not directly from unittest.TestCase.
    TestCaseInTempdir provides these methods:

    setUp()     Creates a safe temporary directory and sets tempfile.tempdir.

    tearDown()  Recursively removes the temporary directory and unsets
                tempfile.tempdir.

    Overriding methods should call the ones above."""
    temproot = None

    def setUp(self):
        if not self.temproot:
            assert not tempfile.tempdir
            self.temproot = tempfile.tempdir = \
                    tempfile.mkdtemp(prefix="test-archivemail")

    def tearDown(self):
        assert tempfile.tempdir == self.temproot
        if self.temproot:
            shutil.rmtree(self.temproot)
            tempfile.tempdir = self.temproot = None


############ Mbox Class testing ##############

class TestMboxDotlock(TestCaseInTempdir):
    """Test dotlocking of archivemail.Mbox."""

    def setUp(self):
        super(TestMboxDotlock, self).setUp()
        self.mbox_name = make_mbox()
        self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
        self.mbox = archivemail.Mbox(self.mbox_name)

    def testDotlock(self):
        """dotlock_lock/unlock should create/delete a lockfile"""
        lock = self.mbox_name + ".lock"
        self.mbox._dotlock_lock()
        assert os.path.isfile(lock)
        self.mbox._dotlock_unlock()
        assert not os.path.isfile(lock)

    def testDotlockingSucceedsUponEACCES(self):
        """A dotlock should silently be omitted upon EACCES."""
        archivemail.options.quiet = True
        mbox_dir = os.path.dirname(self.mbox_name)
        # Mode 0500: the directory becomes read-only for its owner.
        os.chmod(mbox_dir, stat.S_IRUSR | stat.S_IXUSR)
        try:
            self.mbox._dotlock_lock()
        finally:
            # Mode 0700: restore write access for the owner.
            os.chmod(mbox_dir, stat.S_IRWXU)
            archivemail.options.quiet = False


class TestMboxPosixLock(TestCaseInTempdir):
    """Test fcntl locking of archivemail.Mbox."""

    def setUp(self):
        super(TestMboxPosixLock, self).setUp()
        self.mbox_name = make_mbox()
        self.mbox = archivemail.Mbox(self.mbox_name)

    def testPosixLock(self):
        """posix_lock/unlock should create/delete an advisory lock"""

        # The following code snippet heavily lends from the Python 2.5 mailbox
        # unittest.
        # BEGIN robbery:

        # Fork off a subprocess that will lock the file for 2 seconds,
        # unlock it, and then exit.
        pid = os.fork()
        if pid == 0:
            # In the child, lock the mailbox.  Whatever happens, the child
            # must never return into the test runner, so always _exit().
            status = 1
            try:
                self.mbox._posix_lock()
                time.sleep(2)
                self.mbox._posix_unlock()
                status = 0
            finally:
                os._exit(status)

        # In the parent, sleep a bit to give the child time to acquire
        # the lock.
        time.sleep(0.5)
        # The parent's file self.mbox.mbox_file shares fcntl locks with the
        # duplicated FD in the child; reopen it so we get a different file
        # table entry.
        mbox_file = open(self.mbox_name, "r+")
        try:
            lock_nb = fcntl.LOCK_EX | fcntl.LOCK_NB
            fd = mbox_file.fileno()
            try:
                self.assertRaises(IOError, fcntl.lockf, fd, lock_nb)
            finally:
                # Wait for child to exit. Locking should now succeed.
                os.waitpid(pid, 0)

            fcntl.lockf(fd, lock_nb)
            fcntl.lockf(fd, fcntl.LOCK_UN)
        finally:
            mbox_file.close()
        # END robbery


class TestMboxNext(TestCaseInTempdir):
    """Test iterating over an mbox with archivemail.Mbox.next()."""

    def setUp(self):
        super(TestMboxNext, self).setUp()
        self.not_empty_name = make_mbox(messages=18)
        self.empty_name = make_mbox(messages=0)

    def testNextEmpty(self):
        """mbox.next() should return None on an empty mailbox"""
        mbox = archivemail.Mbox(self.empty_name)
        msg = mbox.next()
        self.assertEqual(msg, None)

    def testNextNotEmpty(self):
        """mbox.next() should return a message on a populated mailbox"""
        mbox = archivemail.Mbox(self.not_empty_name)
        for count in range(18):
            msg = mbox.next()
            assert msg
        msg = mbox.next()
        self.assertEqual(msg, None)


############ TempMbox Class testing ##############

class TestTempMboxWrite(TestCaseInTempdir):
    """Test writing messages to an archivemail.TempMbox."""

    def setUp(self):
        super(TestTempMboxWrite, self).setUp()

    def testWrite(self):
        """mbox.write() should append messages to a mbox mailbox"""
        read_file = make_mbox(messages=3)
        mbox_read = archivemail.Mbox(read_file)
        mbox_write = archivemail.TempMbox()
        write_file = mbox_write.mbox_file_name
        for count in range(3):
            msg = mbox_read.next()
            mbox_write.write(msg)
        mbox_read.close()
        mbox_write.close()
        assert filecmp.cmp(read_file, write_file, shallow=0)

    def testWriteNone(self):
        """calling mbox.write() with no message should raise AssertionError"""
        write = archivemail.TempMbox()
        self.assertRaises(AssertionError, write.write, None)


class TestTempMboxRemove(TestCaseInTempdir):
    """Test removing an archivemail.TempMbox."""

    def setUp(self):
        super(TestTempMboxRemove, self).setUp()
        self.mbox = archivemail.TempMbox()
        self.mbox_name = self.mbox.mbox_file_name

    def testMboxRemove(self):
        """remove() should delete a mbox mailbox"""
        assert os.path.exists(self.mbox_name)
        self.mbox.remove()
        assert not os.path.exists(self.mbox_name)


########## options class testing
#################

class TestOptionDefaults(unittest.TestCase):
    """Check the default values of archivemail.options."""

    def testVerbose(self):
        """verbose should be off by default"""
        self.assertEqual(archivemail.options.verbose, False)

    def testDaysOldMax(self):
        """default archival time should be 180 days"""
        self.assertEqual(archivemail.options.days_old_max, 180)

    def testQuiet(self):
        """quiet should be off by default"""
        self.assertEqual(archivemail.options.quiet, False)

    def testDeleteOldMail(self):
        """we should not delete old mail by default"""
        self.assertEqual(archivemail.options.delete_old_mail, False)

    def testNoCompress(self):
        """no-compression should be off by default"""
        self.assertEqual(archivemail.options.no_compress, False)

    def testIncludeFlagged(self):
        """we should not archive flagged messages by default"""
        self.assertEqual(archivemail.options.include_flagged, False)

    def testPreserveUnread(self):
        """we should not preserve unread messages by default"""
        self.assertEqual(archivemail.options.preserve_unread, False)


class TestOptionParser(unittest.TestCase):
    """Check that command line options end up in archivemail.options.

    Each test that checks both a long and a short option resets the option
    in between, so that the second assertion cannot pass merely because of
    the value left behind by the first parse.
    """

    def setUp(self):
        # Work on the live options object; tearDown() restores this copy.
        self.oldopts = copy.copy(archivemail.options)

    def testOptionDate(self):
        """--date and -D options are parsed correctly"""
        date_formats = (
            "%Y-%m-%d",  # ISO format
            "%d %b %Y",  # Internet format
            "%d %B %Y",  # Internet format with full month names
        )
        date = time.strptime("2000-07-29", "%Y-%m-%d")
        unixdate = time.mktime(date)
        for df in date_formats:
            d = time.strftime(df, date)
            for opt in '-D', '--date=':
                archivemail.options.date_old_max = None
                archivemail.options.parse_args([opt+d], "")
                self.assertEqual(unixdate, archivemail.options.date_old_max)

    def testOptionPreserveUnread(self):
        """--preserve-unread option is parsed correctly"""
        archivemail.options.parse_args(["--preserve-unread"], "")
        assert archivemail.options.preserve_unread
        archivemail.options.preserve_unread = False
        archivemail.options.parse_args(["-u"], "")
        assert archivemail.options.preserve_unread

    def testOptionSuffix(self):
        """--suffix and -s options are parsed correctly"""
        for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"):
            archivemail.options.parse_args(["--suffix="+suffix], "")
            assert archivemail.options.archive_suffix == suffix
            # Reset the attribute that is actually asserted on below.
            archivemail.options.archive_suffix = None
            archivemail.options.parse_args(["-s", suffix], "")
            assert archivemail.options.archive_suffix == suffix

    def testOptionDryrun(self):
        """--dry-run option is parsed correctly"""
        archivemail.options.parse_args(["--dry-run"], "")
        assert archivemail.options.dry_run
        # Reset dry_run so that the check of -n is meaningful.
        archivemail.options.dry_run = False
        archivemail.options.parse_args(["-n"], "")
        assert archivemail.options.dry_run

    def testOptionDays(self):
        """--days and -d options are parsed correctly"""
        archivemail.options.parse_args(["--days=11"], "")
        self.assertEqual(archivemail.options.days_old_max, 11)
        archivemail.options.days_old_max = None
        archivemail.options.parse_args(["-d11"], "")
        self.assertEqual(archivemail.options.days_old_max, 11)

    def testOptionDelete(self):
        """--delete option is parsed correctly"""
        archivemail.options.parse_args(["--delete"], "")
        assert archivemail.options.delete_old_mail

    def testOptionCopy(self):
        """--copy option is parsed correctly"""
        archivemail.options.parse_args(["--copy"], "")
        assert archivemail.options.copy_old_mail

    def testOptionOutputdir(self):
        """--output-dir and -o options are parsed correctly"""
        for path in "/just/some/path", "relative/path":
            archivemail.options.parse_args(["--output-dir=%s" % path], "")
            self.assertEqual(archivemail.options.output_dir, path)
            archivemail.options.output_dir = None
            archivemail.options.parse_args(["-o%s" % path], "")
            self.assertEqual(archivemail.options.output_dir, path)

    def testOptionNocompress(self):
        """--no-compress option is parsed correctly"""
        archivemail.options.parse_args(["--no-compress"], "")
        assert archivemail.options.no_compress

    def testOptionSize(self):
        """--size and -S options are parsed correctly"""
        size = "666"
        archivemail.options.parse_args(["--size=%s" % size], "")
        self.assertEqual(archivemail.options.min_size, int(size))
        # Reset min_size so that the check of -S is meaningful.
        archivemail.options.min_size = None
        archivemail.options.parse_args(["-S%s" % size], "")
        self.assertEqual(archivemail.options.min_size, int(size))

    def tearDown(self):
        archivemail.options = self.oldopts


########## archivemail.is_older_than_days() unit testing #################

class TestIsTooOld(unittest.TestCase):
    """Check archivemail.is_older_than_days() around its threshold."""

    def testVeryOld(self):
        """with max_days=360, should be true for these dates > 1 year"""
        for years in range(1, 10):
            time_msg = time.time() - (years * 365 * 24 * 60 * 60)
            assert archivemail.is_older_than_days(time_message=time_msg,
                                                  max_days=360)

    def testOld(self):
        """with max_days=14, should be true for these dates > 14 days"""
        for days in range(14, 360):
            time_msg = time.time() - (days * 24 * 60 * 60)
            assert archivemail.is_older_than_days(time_message=time_msg,
                                                  max_days=14)

    def testJustOld(self):
        """with max_days=1, should be true for these dates >= 1 day"""
        for minutes in range(0, 61):
            time_msg = time.time() - (25 * 60 * 60) + (minutes * 60)
            assert archivemail.is_older_than_days(time_message=time_msg,
                                                  max_days=1)

    def testNotOld(self):
        """with max_days=9, should be false for these dates < 9 days"""
        for days in range(0, 9):
            time_msg = time.time() - (days * 24 * 60 * 60)
            assert not archivemail.is_older_than_days(time_message=time_msg,
                                                      max_days=9)

    def testJustNotOld(self):
        """with max_days=1, should be false for these hours <= 1 day"""
        for minutes in range(0, 60):
            time_msg = time.time() - (23 * 60 * 60) - (minutes * 60)
            assert not archivemail.is_older_than_days(time_message=time_msg,
                                                      max_days=1)

    def testFuture(self):
        """with max_days=1, should be false for times in the future"""
        for minutes in range(0, 60):
            time_msg = time.time() + (minutes * 60)
            assert not archivemail.is_older_than_days(time_message=time_msg,
                                                      max_days=1)


########## archivemail.parse_imap_url() unit testing #################

class TestParseIMAPUrl(unittest.TestCase):
    """Check archivemail.parse_imap_url() with and without --pwfile.

    Each fixture entry is a tuple whose first item is the complete URL; the
    following items are the expected parse results (see the test methods
    for which one applies).
    """

    def setUp(self):
        archivemail.options.quiet = True
        archivemail.options.verbose = False
        archivemail.options.pwfile = None

    urls_withoutpass = [
        ('imaps://user@example.org@imap.example.org/upperbox/lowerbox',
            ('user', None, 'example.org@imap.example.org',
             'upperbox/lowerbox')),
        ('imaps://"user@example.org"@imap.example.org/upperbox/lowerbox',
            ('user@example.org', None, 'imap.example.org',
             'upperbox/lowerbox')),
        ('imaps://user@example.org"@imap.example.org/upperbox/lowerbox',
            ('user', None, 'example.org"@imap.example.org',
             'upperbox/lowerbox')),
        ('imaps://"user@example.org@imap.example.org/upperbox/lowerbox',
            ('"user', None, 'example.org@imap.example.org',
             'upperbox/lowerbox')),
        ('imaps://"us\\"er@example.org"@imap.example.org/upperbox/lowerbox',
            ('us"er@example.org', None, 'imap.example.org',
             'upperbox/lowerbox')),
        ('imaps://user\\@example.org@imap.example.org/upperbox/lowerbox',
            ('user\\', None, 'example.org@imap.example.org',
             'upperbox/lowerbox'))
    ]
    # Third tuple item: expected result when --pwfile is set.
    urls_withpass = [
        ('imaps://user@example.org:passwd@imap.example.org/upperbox/lowerbox',
            ('user@example.org', 'passwd', 'imap.example.org',
             'upperbox/lowerbox'),
            ('user', None, 'example.org:passwd@imap.example.org',
             'upperbox/lowerbox')),
        ('imaps://"user@example.org:passwd@imap.example.org/upperbox/lowerbox',
            ('"user@example.org', "passwd", 'imap.example.org',
             'upperbox/lowerbox'),
            ('"user', None, 'example.org:passwd@imap.example.org',
             'upperbox/lowerbox')),
        ('imaps://u\\ser\\@example.org:"p@sswd"@imap.example.org/upperbox/lowerbox',
            ('u\\ser\\@example.org', 'p@sswd', 'imap.example.org',
             'upperbox/lowerbox'),
            ('u\\ser\\', None, 'example.org:"p@sswd"@imap.example.org',
             'upperbox/lowerbox'))
    ]
    # These are invalid when the password's not stripped.
    urls_onlywithpass = [
        ('imaps://"user@example.org":passwd@imap.example.org/upperbox/lowerbox',
            ('user@example.org', "passwd", 'imap.example.org',
             'upperbox/lowerbox'))
    ]

    def _strip_scheme(self, mbstr):
        """Return the URL mbstr without its leading 'scheme://' part."""
        return mbstr[mbstr.find('://')+3:]

    def testUrlsWithoutPwfile(self):
        """Parse test urls with --pwfile option unset. This parses a password in
        the URL, if present."""
        archivemail.options.pwfile = None
        for mbstr in self.urls_withpass + self.urls_withoutpass:
            url = self._strip_scheme(mbstr[0])
            result = archivemail.parse_imap_url(url)
            self.assertEqual(result, mbstr[1])

    def testUrlsWithPwfile(self):
        """Parse test urls with --pwfile set. In this case the ':' character
        loses its meaning as a delimiter."""
        archivemail.options.pwfile = "whocares.txt"
        for mbstr in self.urls_withpass:
            url = self._strip_scheme(mbstr[0])
            result = archivemail.parse_imap_url(url)
            self.assertEqual(result, mbstr[2])
        for mbstr in self.urls_onlywithpass:
            url = self._strip_scheme(mbstr[0])
            self.assertRaises(archivemail.UnexpectedError,
                              archivemail.parse_imap_url, url)

    def tearDown(self):
        archivemail.options.quiet = False
        archivemail.options.verbose = False
        archivemail.options.pwfile = None


########## acceptance testing ###########

class TestArchive(TestCaseInTempdir):
    """Base class defining helper functions for doing test archiving runs."""
    mbox = None          # mbox file that will be processed by archivemail
    good_archive = None  # Uncompressed reference archive file to verify the
                         # archive after processing
    good_mbox = None     # Reference mbox file to verify the mbox after processing

    def _copy_to_tempfile(self, src):
        """Copy the file src to a fresh temporary file and return its name."""
        fd, name = tempfile.mkstemp()
        # Only the name is needed; don't leak the descriptor.
        os.close(fd)
        shutil.copyfile(src, name)
        return name

    def verify(self):
        """Compare the processed mbox and the archive with the references.

        Without a reference mbox the processed mbox must be empty; without a
        reference archive no archive may exist.
        """
        assert os.path.exists(self.mbox)
        if self.good_mbox is not None:
            assertEqualContent(self.mbox, self.good_mbox)
        else:
            self.assertEqual(os.path.getsize(self.mbox), 0)
        archive_name = self.mbox + "_archive"
        if not archivemail.options.no_compress:
            archive_name += ".gz"
            iszipped = True
        else:
            assert not os.path.exists(archive_name + ".gz")
            iszipped = False
        if self.good_archive is not None:
            assertEqualContent(archive_name, self.good_archive, iszipped)
        else:
            assert not os.path.exists(archive_name)

    def make_old_mbox(self, body=None, headers=None, messages=1,
                      make_old_archive=False):
        """Prepare for a test run with an old mbox by making an old mbox,
        optionally an existing archive, and a reference archive to verify the
        archive after archivemail has run."""
        self.mbox = make_mbox(body, headers, 181*24, messages)
        archive_does_change = not (archivemail.options.dry_run or
                                   archivemail.options.delete_old_mail)
        mbox_does_not_change = archivemail.options.dry_run or \
                archivemail.options.copy_old_mail
        if make_old_archive:
            archive = archivemail.make_archive_name(self.mbox)
            self.good_archive = make_archive_and_plain_copy(archive)
            if archive_does_change:
                append_file(self.mbox, self.good_archive)
        elif archive_does_change:
            self.good_archive = self._copy_to_tempfile(self.mbox)
        if mbox_does_not_change:
            if archive_does_change and not make_old_archive:
                # The reference archive is an exact copy of the mbox
                # already; share it.
                self.good_mbox = self.good_archive
            else:
                self.good_mbox = self._copy_to_tempfile(self.mbox)

    def make_mixed_mbox(self, body=None, headers=None, messages=1,
                        make_old_archive=False):
        """Prepare for a test run with a mixed mbox by making a mixed mbox,
        optionally an existing archive, a reference archive to verify the
        archive after archivemail has run, and likewise a reference mbox to
        verify the mbox."""
        self.make_old_mbox(body, headers, messages=messages,
                           make_old_archive=make_old_archive)
        new_mbox_name = make_mbox(body, headers, 179*24, messages)
        append_file(new_mbox_name, self.mbox)
        if self.good_mbox is None:
            self.good_mbox = new_mbox_name
        else:
            if self.good_mbox == self.good_archive:
                # Reference mbox and archive share one file; the mbox
                # reference now needs a file of its own.
                self.good_mbox = self._copy_to_tempfile(self.mbox)
            else:
                append_file(new_mbox_name, self.good_mbox)

    def make_new_mbox(self, body=None, headers=None, messages=1,
                      make_old_archive=False):
        """Prepare for a test run with a new mbox by making a new mbox,
        optionally an existing archive, and a reference mbox to verify the mbox
        after archivemail has run."""
        self.mbox = make_mbox(body, headers, 179*24, messages)
        self.good_mbox = self._copy_to_tempfile(self.mbox)
        if make_old_archive:
            archive = archivemail.make_archive_name(self.mbox)
            self.good_archive = make_archive_and_plain_copy(archive)


class TestArchiveMbox(TestArchive):
    """archiving should work based on the date of messages given"""

    def setUp(self):
        self.oldopts = copy.copy(archivemail.options)
        archivemail.options.quiet = True
        super(TestArchiveMbox, self).setUp()

    def testOld(self):
        """archiving an old mailbox"""
        self.make_old_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testOldFromInBody(self):
        """archiving an old mailbox with 'From ' in the body"""
        body = """This is a message with ^From at the start of a line
From is on this line
This is after the ^From line"""
        self.make_old_mbox(messages=3, body=body)
        archivemail.archive(self.mbox)
        self.verify()

    def testDateSystem(self):
        """test that the --date option works as expected"""
        test_headers = (
            {
                'From_': 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
                'Date': 'Fri, 28 Jul 2000 16:11:36 +1000',
            },
            {
                'From_': 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
                'Date': None,
            },
            {
                'From_': 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
                'Date': None,
                'Delivery-date': 'Fri, 28 Jul 2000 16:11:36 +1000',
            },
            {
                'From_': 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
                'Date': None,
                'Resent-Date': 'Fri, 28 Jul 2000 16:11:36 +1000',
            },
        )
        for headers in test_headers:
            msg = make_message(default_headers=headers, wantobj=True)
            date = time.strptime("2000-07-29", "%Y-%m-%d")
            archivemail.options.date_old_max = time.mktime(date)
            assert archivemail.should_archive(msg)
            date = time.strptime("2000-07-27", "%Y-%m-%d")
            archivemail.options.date_old_max = time.mktime(date)
            assert not archivemail.should_archive(msg)

    def testMixed(self):
        """archiving a mixed mailbox"""
        self.make_mixed_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testNew(self):
        """archiving a new mailbox"""
        self.make_new_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testOldExisting(self):
        """archiving an old mailbox with an existing archive"""
        self.make_old_mbox(messages=3, make_old_archive=True)
        archivemail.archive(self.mbox)
        self.verify()

    def testOldWeirdHeaders(self):
        """archiving old mailboxes with weird headers"""
        weird_headers = (
            {   # we should archive because of the date on the 'From_' line
                'From_': 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
                'Date': 'Friskhdfkjkh, 28 Jul 2002 1line noise6:11:36 +1000',
            },
            {   # we should archive because of the date on the 'From_' line
                'From_': 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
                'Date': None,
            },
            {   # we should archive because of the date in 'Delivery-date'
                'From_': 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
                'Date': 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000',
                'Delivery-date': 'Fri, 28 Jul 2000 16:11:36 +1000',
            },
            {   # we should archive because of the date in 'Delivery-date'
                'From_': 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
                'Date': None,
                'Delivery-date': 'Fri, 28 Jul 2000 16:11:36 +1000',
            },
            {   # we should archive because of the date in 'Resent-Date'
                'From_': 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
                'Date': 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000',
                'Resent-Date': 'Fri, 28 Jul 2000 16:11:36 +1000',
            },
            {   # we should archive because of the date in 'Resent-Date'
                'From_': 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
                'Date': None,
                'Resent-Date': 'Fri, 28 Jul 2000 16:11:36 +1000',
            },
            {   # completely blank dates were crashing < version 0.4.7
                'From_': 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
                'Date': '',
            },
            {   # completely blank dates were crashing < version 0.4.7
                'From_': 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
                'Date': '',
                'Resent-Date': '',
            },
        )
        fd, self.mbox = tempfile.mkstemp()
        fp = os.fdopen(fd, "w")
        try:
            for headers in weird_headers:
                msg_text = make_message(default_headers=headers)
                fp.write(msg_text*2)
        finally:
            fp.close()
        # All messages are old, so the archive must equal the original mbox.
        self.good_archive = self._copy_to_tempfile(self.mbox)
        archivemail.archive(self.mbox)
        self.verify()

    def tearDown(self):
        archivemail.options = self.oldopts
        super(TestArchiveMbox, self).tearDown()


class TestArchiveMboxTimestamp(TestCaseInTempdir):
    """original mbox timestamps should always be preserved"""

    def setUp(self):
        super(TestArchiveMboxTimestamp, self).setUp()
        archivemail.options.quiet = True
        self.mbox_name = make_mbox(messages=3, hours_old=(24 * 180))
        # Shift both timestamps into the past (by different amounts) so
        # that a run which fails to restore them is detectable.
        self.mtime = os.path.getmtime(self.mbox_name) - 66
        self.atime = os.path.getatime(self.mbox_name) - 88
        os.utime(self.mbox_name, (self.atime, self.mtime))

    def testNew(self):
        """mbox timestamps should not change after no archival"""
        archivemail.options.days_old_max = 181
        archivemail.archive(self.mbox_name)
        self.verify()

    def testOld(self):
        """mbox timestamps should not change after archival"""
        archivemail.options.days_old_max = 179
        archivemail.archive(self.mbox_name)
        self.verify()

    def verify(self):
        """Check that the mbox still has the timestamps set in setUp()."""
        assert os.path.exists(self.mbox_name)
        new_atime = os.path.getatime(self.mbox_name)
        new_mtime = os.path.getmtime(self.mbox_name)
        self.assertAlmostEqual(self.mtime, new_mtime, utimes_precision)
        self.assertAlmostEqual(self.atime, new_atime, utimes_precision)

    def tearDown(self):
        archivemail.options.quiet = False
        archivemail.options.days_old_max = 180
        os.remove(self.mbox_name)
        super(TestArchiveMboxTimestamp, self).tearDown()


class TestArchiveMboxAll(unittest.TestCase):
    """make sure the 'archive_all' option works"""

    def setUp(self):
        archivemail.options.quiet = True
        archivemail.options.archive_all = True

    def testNew(self):
        """new messages should be archived with --all"""
        self.msg = make_message(hours_old=24*179, wantobj=True)
        assert archivemail.should_archive(self.msg)

    def testOld(self):
        """old messages should be archived with --all"""
        self.msg = make_message(hours_old=24*181, wantobj=True)
        assert archivemail.should_archive(self.msg)

    def tearDown(self):
        archivemail.options.quiet = False
        archivemail.options.archive_all = False


class TestArchiveMboxPreserveUnread(unittest.TestCase):
    """make sure the 'preserve_unread' option works"""

    def setUp(self):
        archivemail.options.quiet = True
        archivemail.options.preserve_unread = True
        self.msg = make_message(hours_old=24*181, wantobj=True)

    def testOldRead(self):
        """old read messages should be archived with --preserve-unread"""
        self.msg["Status"] = "RO"
        assert archivemail.should_archive(self.msg)

    def testOldUnread(self):
        """old unread messages should not be archived with --preserve-unread"""
        self.msg["Status"] = "O"
        assert not archivemail.should_archive(self.msg)

    def tearDown(self):
        archivemail.options.quiet = False
        archivemail.options.preserve_unread = False


class TestArchiveMboxSuffix(unittest.TestCase):
    """make sure the 'suffix' option works"""

    def setUp(self):
        self.old_suffix = archivemail.options.archive_suffix
        archivemail.options.quiet = True

    def testSuffix(self):
        """archiving with specified --suffix arguments"""
        for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"):
            mbox_name = "foobar"
            archivemail.options.archive_suffix = suffix
            days_old_max = 180
            # Expected name: the suffix expanded with strftime() for the
            # local time 180 days ago.
            parsed_suffix_time = time.time() - days_old_max*24*60*60
            parsed_suffix = time.strftime(suffix,
                    time.localtime(parsed_suffix_time))
            archive_name = mbox_name + parsed_suffix
            self.assertEqual(archive_name,
                    archivemail.make_archive_name(mbox_name))

    def tearDown(self):
        archivemail.options.quiet = False
        archivemail.options.archive_suffix = self.old_suffix


class TestArchiveDryRun(TestArchive):
    """make sure the 'dry-run' option works"""

    def setUp(self):
        super(TestArchiveDryRun, self).setUp()
        archivemail.options.quiet = True
        archivemail.options.dry_run = True

    def testOld(self):
        """archiving an old mailbox with the 'dry-run' option"""
        self.make_old_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def tearDown(self):
        archivemail.options.dry_run = False
        archivemail.options.quiet = False
        super(TestArchiveDryRun, self).tearDown()


class TestArchiveDelete(TestArchive):
    """make sure the 'delete' option works"""

    def setUp(self):
        super(TestArchiveDelete, self).setUp()
        archivemail.options.quiet = True
        archivemail.options.delete_old_mail = True

    def testNew(self):
        """archiving a new mailbox with the 'delete' option"""
        self.make_new_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testMixed(self):
        """archiving a mixed mailbox with the 'delete' option"""
        self.make_mixed_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testOld(self):
        """archiving an old mailbox with the 'delete' option"""
        self.make_old_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def tearDown(self):
        archivemail.options.delete_old_mail = False
        archivemail.options.quiet = False
        super(TestArchiveDelete, self).tearDown()


class TestArchiveCopy(TestArchive):
    """make sure the 'copy' option works"""

    def setUp(self):
        super(TestArchiveCopy, self).setUp()
        archivemail.options.quiet = True
        archivemail.options.copy_old_mail = True

    def testNew(self):
        """archiving a new mailbox with the 'copy' option"""
        self.make_new_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testMixed(self):
        """archiving a mixed mailbox with the 'copy' option"""
        self.make_mixed_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testOld(self):
        """archiving an old mailbox with the 'copy' option"""
        self.make_old_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def tearDown(self):
        archivemail.options.copy_old_mail = False
        archivemail.options.quiet = False
        super(TestArchiveCopy, self).tearDown()


class TestArchiveMboxFlagged(unittest.TestCase):
    """make sure the 'include_flagged' option works"""

    def setUp(self):
        archivemail.options.include_flagged = False
        archivemail.options.quiet = True

    def testOld(self):
        """by default, old flagged messages should not be archived"""
        msg = make_message(default_headers={"X-Status": "F"},
                hours_old=24*181, wantobj=True)
        assert not archivemail.should_archive(msg)

    def testIncludeFlaggedNew(self):
        """new flagged messages should not be archived with include_flagged"""
        # Enable the option this test is about; setUp() switches it off.
        archivemail.options.include_flagged = True
        msg = make_message(default_headers={"X-Status": "F"},
                hours_old=24*179, wantobj=True)
        assert not archivemail.should_archive(msg)

    def testIncludeFlaggedOld(self):
        """old flagged messages should be archived with include_flagged"""
        archivemail.options.include_flagged = True
        msg = make_message(default_headers={"X-Status": "F"},
                hours_old=24*181, wantobj=True)
        assert archivemail.should_archive(msg)

    def tearDown(self):
        archivemail.options.include_flagged = False
        archivemail.options.quiet = False


class TestArchiveMboxOutputDir(unittest.TestCase):
    """make sure that the 'output-dir' option works"""

    def setUp(self):
        archivemail.options.quiet = True

    def testOld(self):
        """archiving an old mailbox with a specified output dir"""
        for output_dir in "/just/a/path", "relative/path":
            archivemail.options.output_dir = output_dir
            archive_dir = archivemail.make_archive_name("/tmp/mbox")
            self.assertEqual(output_dir, os.path.dirname(archive_dir))

    def tearDown(self):
        archivemail.options.quiet = False
        archivemail.options.output_dir = None


class TestArchiveMboxUncompressed(TestArchive):
    """make sure that the 'no_compress' option works"""
    mbox_name = None
    new_mbox = None
    old_mbox = None
    copy_name = None

    def setUp(self):
        archivemail.options.quiet = True
        archivemail.options.no_compress = True
        super(TestArchiveMboxUncompressed, self).setUp()

    def testOld(self):
        """archiving an old mailbox uncompressed"""
        self.make_old_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testNew(self):
        """archiving a new mailbox uncompressed"""
        self.make_new_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testMixed(self):
        """archiving a mixed mailbox uncompressed"""
        self.make_mixed_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testOldExists(self):
        """archiving an old mailbox uncompressed with an existing archive"""
        self.make_old_mbox(messages=3, make_old_archive=True)
        archivemail.archive(self.mbox)
        self.verify()

    def tearDown(self):
        archivemail.options.quiet = False
        archivemail.options.no_compress = False
        super(TestArchiveMboxUncompressed, self).tearDown()


class TestArchiveSize(unittest.TestCase):
    """check that the 'size' argument works"""

    def setUp(self):
        archivemail.options.quiet = True
        msg_text = make_message(hours_old=24*181)
        self.msg_size = len(msg_text)
        fp = cStringIO.StringIO(msg_text)
        self.msg = rfc822.Message(fp)

    def testSmaller(self):
        """giving a size argument smaller than the message"""
        archivemail.options.min_size = self.msg_size - 1
        assert archivemail.should_archive(self.msg)

    def testBigger(self):
        """giving a size argument bigger than the message"""
        archivemail.options.min_size = self.msg_size + 1
        assert not archivemail.should_archive(self.msg)

    def tearDown(self):
        archivemail.options.quiet = False
        archivemail.options.min_size = None


############# Test archiving maildirs ###############

class TestArchiveMailboxdir(TestCaseInTempdir):
    """Base class defining helper functions for doing test archive runs with
    maildirs."""
    maildir = None           # Maildir that will be processed by archivemail
    orig_maildir_obj = None  # A backup copy of the maildir, a SimpleMaildir object
    remaining_msg = set()    # Filenames of maildir messages that should be preserved
    number_archived = 0      # Number of messages that get archived
    orig_archive = None      # An uncompressed copy of a pre-existing archive,
                             # if one exists

    def setUp(self):
        super(TestArchiveMailboxdir, self).setUp()
        self.orig_maildir_obj = SimpleMaildir()

    def verify(self):
        self._verify_remaining()
        self._verify_archive()

    def _verify_remaining(self):
        """Verify that the preserved messages weren't altered."""
        assert self.maildir
        # Compare maildir with backup object.
        dcmp = filecmp.dircmp(self.maildir, self.orig_maildir_obj.root)
        # Top-level has only directories cur, new, tmp and must be unchanged.
        self.assertEqual(dcmp.left_list, dcmp.right_list)
        found = set()
        for d in dcmp.common_dirs:
            dcmp2 = dcmp.subdirs[d]
            # We need to verify three things.
            # 1. directory is a subset of the original...
            assert not dcmp2.left_only
            # 2. all common files are identical...
            self.assertEqual(dcmp2.common_files, dcmp2.same_files)
            found = found.union([os.path.join(d, x) for x in dcmp2.common_files])
        # 3.
exactly the `new' messages (recorded in self.remaining_msg) # were preserved. self.assertEqual(found, self.remaining_msg) def _verify_archive(self): """Verify the archive correctness.""" # TODO: currently make_archive_name does not include the .gz suffix. # Is this something that should be fixed? archive = archivemail.make_archive_name(self.maildir) if archivemail.options.no_compress: iszipped = False else: archive += '.gz' iszipped = True if self.number_archived == 0: if self.orig_archive: assertEqualContent(archive, self.orig_archive, iszipped) else: assert not os.path.exists(archive) return fp_new = fp_archive = tmp_archive_name = None try: if self.orig_archive: new_size = os.path.getsize(archive) # Brute force: split archive in old and new part and verify the # parts separately. (Of course this destroys the archive.) fp_archive = open(archive, "r+") fp_archive.seek(self.orig_archive_size) fd, tmp_archive_name = tempfile.mkstemp() fp_new = os.fdopen(fd, "w") shutil.copyfileobj(fp_archive, fp_new) fp_new.close() fp_archive.truncate(self.orig_archive_size) fp_archive.close() assertEqualContent(archive, self.orig_archive, iszipped) new_archive = tmp_archive_name else: new_archive = archive if archivemail.options.no_compress: fp_archive = open(new_archive, "r") else: fp_archive = FixedGzipFile(new_archive, "r") mb = mailbox.UnixMailbox(fp_archive) found = 0 for msg in mb: self.verify_maildir_has_msg(self.orig_maildir_obj, msg) found += 1 self.assertEqual(found, self.number_archived) finally: if tmp_archive_name: os.remove(tmp_archive_name) if fp_new is not None: fp_new.close() if fp_archive is not None: fp_archive.close() def verify_maildir_has_msg(self, maildir, msg): """Assert that the given maildir has a copy of the rfc822 message.""" mid = msg['Message-Id'] # Complains if there is no message-id mdir_msg_str, mdir_flags = \ maildir.get_message_and_mbox_status(mid) mbox_flags = set(msg.get('status', '') + msg.get('x-status', '')) self.assertEqual(mdir_flags, 
mbox_flags) headers = filter(lambda h: msg.isheader(h) not in ('status', 'x-status'), msg.headers) headers = "".join(headers) msg.rewindbody() # Discard last mbox LF which is not part of the message. body = msg.fp.read()[:-1] msg_str = headers + os.linesep + body self.assertEqual(mdir_msg_str, msg_str) def add_messages(self, body=None, headers=None, hours_old=0, messages=1): for count in range(messages): msg = make_message(body, default_headers=headers, mkfrom=False, hours_old=hours_old) self.orig_maildir_obj.write(msg, new=False) def make_maildir(self, mkold, mknew, body=None, headers=None, messages=1, make_old_archive=False): mailbox_does_change = not (archivemail.options.dry_run or archivemail.options.copy_old_mail) archive_does_change = not (archivemail.options.dry_run or archivemail.options.delete_old_mail) if mknew: self.add_messages(body, headers, 179*24, messages) if archive_does_change and archivemail.options.archive_all: self.number_archived += messages if mailbox_does_change: self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames()) if mkold: self.add_messages(body, headers, 181*24, messages) if archive_does_change: self.number_archived += messages if not mailbox_does_change: self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames()) self.maildir = copy_maildir(self.orig_maildir_obj.root) if make_old_archive: archive = archivemail.make_archive_name(self.maildir) self.orig_archive = make_archive_and_plain_copy(archive) # FIXME: .gz extension handling is a mess II if not archivemail.options.no_compress: archive += '.gz' self.orig_archive_size = os.path.getsize(archive) class TestEmptyMaildir(TestCaseInTempdir): def setUp(self): super(TestEmptyMaildir, self).setUp() archivemail.options.quiet = True def testEmpty(self): """Archiving an empty maildir should not result in an archive.""" self.mdir = SimpleMaildir() archivemail.archive(self.mdir.root) assert not os.path.exists(self.mdir.root + '_archive.gz') def tearDown(self): 
super(TestEmptyMaildir, self).tearDown() archivemail.options.quiet = False class TestMaildir(TestArchiveMailboxdir): def setUp(self): super(TestMaildir, self).setUp() archivemail.options.quiet = True def testOld(self): self.make_maildir(True, False, messages=3) archivemail.archive(self.maildir) self.verify() def testNew(self): self.make_maildir(False, True, messages=3) archivemail.archive(self.maildir) self.verify() def testMixed(self): self.make_maildir(True, True, messages=3) archivemail.archive(self.maildir) self.verify() def testMixedExisting(self): self.make_maildir(True, True, messages=3, make_old_archive=True) archivemail.archive(self.maildir) self.verify() def tearDown(self): archivemail.options.quiet = False super(TestMaildir, self).tearDown() class TestMaildirPreserveUnread(TestCaseInTempdir): """Test if the preserve_unread option works with maildirs.""" def setUp(self): super(TestMaildirPreserveUnread, self).setUp() archivemail.options.quiet = True archivemail.options.preserve_unread = True def testOldRead(self): """--preserve-unread archives old read messages in a maildir.""" smd = SimpleMaildir("orig") msg = make_message(hours_old=24*181) smd.write(msg, new=False, flags='S') md = mailbox.Maildir(smd.root) msg_obj = md.next() assert archivemail.should_archive(msg_obj) def testOldUnread(self): """--preserve-unread preserves old unread messages in a maildir.""" smd = SimpleMaildir("orig") msg = make_message(hours_old=24*181) smd.write(msg, new=False) md = mailbox.Maildir(smd.root) msg_obj = md.next() assert not archivemail.should_archive(msg_obj) def tearDown(self): archivemail.options.quiet = False archivemail.options.preserve_unread = False super(TestMaildirPreserveUnread, self).tearDown() class TestMaildirAll(TestArchiveMailboxdir): def setUp(self): super(TestMaildirAll, self).setUp() archivemail.options.quiet = True archivemail.options.archive_all = True def testNew(self): """New maildir messages should be archived with --all""" 
self.add_messages(hours_old=24*181) md = mailbox.Maildir(self.orig_maildir_obj.root) msg_obj = md.next() assert archivemail.should_archive(msg_obj) def testOld(self): """Old maildir messages should be archived with --all""" self.add_messages(hours_old=24*179) md = mailbox.Maildir(self.orig_maildir_obj.root) msg_obj = md.next() assert archivemail.should_archive(msg_obj) def tearDown(self): super(TestMaildirAll, self).tearDown() archivemail.options.quiet = False archivemail.options.archive_all = False class TestMaildirDryRun(TestArchiveMailboxdir): def setUp(self): super(TestMaildirDryRun, self).setUp() archivemail.options.quiet = True archivemail.options.dry_run = True def testOld(self): """archiving an old maildir mailbox with the 'dry-run' option""" self.make_maildir(True, False) archivemail.archive(self.maildir) self.verify() def tearDown(self): super(TestMaildirDryRun, self).tearDown() archivemail.options.quiet = False archivemail.options.dry_run = False class TestMaildirDelete(TestArchiveMailboxdir): def setUp(self): super(TestMaildirDelete, self).setUp() archivemail.options.quiet = True archivemail.options.delete_old_mail = True def testOld(self): """archiving an old maildir mailbox with the 'delete' option""" self.make_maildir(True, False) archivemail.archive(self.maildir) self.verify() def testNew(self): """archiving a new maildir mailbox with the 'delete' option""" self.make_maildir(False, True) archivemail.archive(self.maildir) self.verify() def tearDown(self): super(TestMaildirDelete, self).tearDown() archivemail.options.quiet = False archivemail.options.delete_old_mail = False class TestMaildirCopy(TestArchiveMailboxdir): def setUp(self): super(TestMaildirCopy, self).setUp() archivemail.options.quiet = True archivemail.options.copy_old_mail = True def testOld(self): """archiving an old maildir mailbox with the 'copy' option""" self.make_maildir(True, False) archivemail.archive(self.maildir) self.verify() def testNew(self): """archiving a new maildir 
mailbox with the 'copy' option""" self.make_maildir(False, True) archivemail.archive(self.maildir) self.verify() def tearDown(self): super(TestMaildirCopy, self).tearDown() archivemail.options.quiet = False archivemail.options.copy_old_mail = False class TestArchiveMaildirFlagged(TestCaseInTempdir): """make sure the 'include_flagged' option works with maildir messages""" def setUp(self): super(TestArchiveMaildirFlagged, self).setUp() archivemail.options.include_flagged = False archivemail.options.quiet = True def testOld(self): """by default, old flagged maildir messages should not be archived""" smd = SimpleMaildir("orig") msg = make_message(hours_old=24*181) smd.write(msg, new=False, flags='F') md = mailbox.Maildir(smd.root) msg_obj = md.next() assert not archivemail.should_archive(msg_obj) def testIncludeFlaggedNew(self): """new flagged maildir messages should not be archived with include_flagged""" smd = SimpleMaildir("orig") msg = make_message(hours_old=24*179) smd.write(msg, new=False, flags='F') md = mailbox.Maildir(smd.root) msg_obj = md.next() assert not archivemail.should_archive(msg_obj) def testIncludeFlaggedOld(self): """old flagged maildir messages should be archived with include_flagged""" archivemail.options.include_flagged = True smd = SimpleMaildir("orig") msg = make_message(hours_old=24*181) smd.write(msg, new=False, flags='F') md = mailbox.Maildir(smd.root) msg_obj = md.next() assert archivemail.should_archive(msg_obj) def tearDown(self): super(TestArchiveMaildirFlagged, self).tearDown() archivemail.options.include_flagged = False archivemail.options.quiet = False class TestArchiveMaildirSize(TestCaseInTempdir): """check that the 'size' argument works with maildir messages""" def setUp(self): super(TestArchiveMaildirSize, self).setUp() archivemail.options.quiet = True msg = make_message(hours_old=24*181) self.msg_size = len(msg) smd = SimpleMaildir("orig") smd.write(msg, new=False) md = mailbox.Maildir(smd.root) self.msg_obj = md.next() def 
testSmaller(self): """giving a size argument smaller than the maildir message""" archivemail.options.min_size = self.msg_size - 1 assert archivemail.should_archive(self.msg_obj) def testBigger(self): """giving a size argument bigger than the maildir message""" archivemail.options.min_size = self.msg_size + 1 assert not archivemail.should_archive(self.msg_obj) def tearDown(self): super(TestArchiveMaildirSize, self).tearDown() archivemail.options.quiet = False archivemail.options.min_size = None ########## helper routines ############ def make_message(body=None, default_headers={}, hours_old=None, mkfrom=False, wantobj=False): headers = copy.copy(default_headers) if not headers: headers = {} headers['Message-Id'] = make_msgid() if not headers.has_key('Date'): time_message = time.time() - (60 * 60 * hours_old) headers['Date'] = time.asctime(time.localtime(time_message)) if not headers.has_key('From'): headers['From'] = "sender@dummy.domain" if not headers.has_key('To'): headers['To'] = "receipient@dummy.domain" if not headers.has_key('Subject'): headers['Subject'] = "This is the subject" if mkfrom and not headers.has_key('From_'): headers['From_'] = "%s %s" % (headers['From'], headers['Date']) if not body: body = "This is the message body" msg = "" if headers.has_key('From_'): msg = msg + ("From %s\n" % headers['From_']) del headers['From_'] for key in headers.keys(): if headers[key] is not None: msg = msg + ("%s: %s\n" % (key, headers[key])) msg = msg + "\n\n" + body + "\n\n" if not wantobj: return msg fp = cStringIO.StringIO(msg) return rfc822.Message(fp) def append_file(source, dest): """appends the file named 'source' to the file named 'dest'""" assert os.path.isfile(source) assert os.path.isfile(dest) read = open(source, "r") write = open(dest, "a+") shutil.copyfileobj(read,write) read.close() write.close() def make_mbox(body=None, headers=None, hours_old=0, messages=1): assert tempfile.tempdir fd, name = tempfile.mkstemp() file = os.fdopen(fd, "w") for count in 
range(messages): msg = make_message(body=body, default_headers=headers, mkfrom=True, hours_old=hours_old) file.write(msg) file.close() return name def make_archive_and_plain_copy(archive_name): """Make an mbox archive of the given name like archivemail may have created it. Also make an uncompressed copy of this archive and return its name.""" copy_fd, copy_name = tempfile.mkstemp() copy_fp = os.fdopen(copy_fd, "w") if archivemail.options.no_compress: fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT) fp = os.fdopen(fd, "w") else: archive_name += ".gz" fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT) rawfp = os.fdopen(fd, "w") fp = gzip.GzipFile(fileobj=rawfp) for count in range(3): msg = make_message(hours_old=24*360) fp.write(msg) copy_fp.write(msg) fp.close() copy_fp.close() if not archivemail.options.no_compress: rawfp.close() return copy_name def copy_maildir(maildir, prefix="tmp"): """Create a copy of the given maildir and return the absolute path of the new direcory.""" newdir = tempfile.mkdtemp(prefix=prefix) for d in "cur", "new", "tmp": shutil.copytree(os.path.join(maildir, d), os.path.join(newdir, d)) return newdir def assertEqualContent(firstfile, secondfile, zippedfirst=False): """Verify that the two files exist and have identical content. If zippedfirst is True, assume that firstfile is gzip-compressed.""" assert os.path.exists(firstfile) assert os.path.exists(secondfile) if zippedfirst: try: fp1 = gzip.GzipFile(firstfile, "r") fp2 = open(secondfile, "r") assert cmp_fileobj(fp1, fp2) finally: fp1.close() fp2.close() else: assert filecmp.cmp(firstfile, secondfile, shallow=0) def cmp_fileobj(fp1, fp2): """Return if reading the fileobjects yields identical content.""" bufsize = 8192 while True: b1 = fp1.read(bufsize) b2 = fp2.read(bufsize) if b1 != b2: return False if not b1: return True if __name__ == "__main__": unittest.main()