aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xtest_archivemail.py174
1 files changed, 60 insertions, 114 deletions
diff --git a/test_archivemail.py b/test_archivemail.py
index 01f2c01..4fa827b 100755
--- a/test_archivemail.py
+++ b/test_archivemail.py
@@ -58,6 +58,7 @@ import stat
import tempfile
import time
import unittest
+import gzip
try:
import archivemail
@@ -413,11 +414,7 @@ class TestArchiveMbox(TestCaseInTempdir):
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
self.assertEqual(self.mbox_mode, new_mode)
archive_name = self.mbox_name + "_archive.gz"
- assert(os.path.exists(archive_name))
- self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
- archive_name = self.mbox_name + "_archive"
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
+ assertEqualContent(archive_name, self.copy_name, zipfirst=True)
def testOldFromInBody(self):
"""archiving an old mailbox with 'From ' in the body"""
@@ -431,11 +428,7 @@ This is after the ^From line"""
assert(os.path.exists(self.mbox_name))
self.assertEqual(os.path.getsize(self.mbox_name), 0)
archive_name = self.mbox_name + "_archive.gz"
- assert(os.path.exists(archive_name))
- self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
- archive_name = self.mbox_name + "_archive"
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
+ assertEqualContent(archive_name, self.copy_name, zipfirst=True)
def testDateSystem(self):
"""test that the --date option works as expected"""
@@ -470,11 +463,7 @@ This is after the ^From line"""
assert(os.path.exists(self.mbox_name))
self.assertEqual(os.path.getsize(self.mbox_name), 0)
archive_name = self.mbox_name + "_archive.gz"
- assert(os.path.exists(archive_name))
- self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
- archive_name = self.mbox_name + "_archive"
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
+ assertEqualContent(archive_name, self.copy_name, zipfirst=True)
for option in ('--date=2000-07-27', '-D2000-07-27',
'--date="27 Jul 2000"', '--date="27 July 2000"'):
self.mbox_name = make_mbox(messages=3, headers=headers)
@@ -482,8 +471,7 @@ This is after the ^From line"""
shutil.copyfile(self.mbox_name, self.copy_name)
run = "./archivemail.py -q %s %s" % (option, self.mbox_name)
self.assertEqual(os.system(run), 0)
- assert(os.path.exists(self.mbox_name))
- assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
+ assertEqualContent(self.mbox_name, self.copy_name)
archive_name = self.mbox_name + "_archive.gz"
assert(not os.path.exists(archive_name))
@@ -502,14 +490,9 @@ This is after the ^From line"""
self.assertEqual(os.system(run), 0)
else:
sys.exit(1)
- assert(os.path.exists(self.mbox_name))
- assert(filecmp.cmp(self.new_mbox, self.mbox_name, shallow=0))
+ assertEqualContent(self.mbox_name, self.new_mbox)
archive_name = self.mbox_name + "_archive.gz"
- assert(os.path.exists(archive_name))
- self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
- archive_name = self.mbox_name + "_archive"
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.old_mbox, shallow=0))
+ assertEqualContent(archive_name, self.old_mbox, zipfirst=True)
def testNew(self):
"""archiving a new mailbox"""
@@ -525,8 +508,7 @@ This is after the ^From line"""
self.assertEqual(os.system(run), 0)
else:
sys.exit(1)
- assert(os.path.exists(self.mbox_name))
- assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
+ assertEqualContent(self.mbox_name, self.copy_name)
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
self.assertEqual(self.mbox_mode, new_mode)
archive_name = self.mbox_name + "_archive.gz"
@@ -556,11 +538,7 @@ This is after the ^From line"""
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
self.assertEqual(self.mbox_mode, new_mode)
archive_name = self.mbox_name + "_archive.gz"
- assert(os.path.exists(archive_name))
- self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
- archive_name = self.mbox_name + "_archive"
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
+ assertEqualContent(archive_name, self.copy_name, zipfirst=True)
def testOldWeirdHeaders(self):
"""archiving old mailboxes with weird headers"""
@@ -611,11 +589,7 @@ This is after the ^From line"""
assert(os.path.exists(self.mbox_name))
self.assertEqual(os.path.getsize(self.mbox_name), 0)
archive_name = self.mbox_name + "_archive.gz"
- assert(os.path.exists(archive_name))
- self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
- archive_name = self.mbox_name + "_archive"
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
+ assertEqualContent(archive_name, self.copy_name, zipfirst=True)
def tearDown(self):
archivemail.options.quiet = 0
@@ -759,11 +733,7 @@ class TestArchiveMboxPreserveStatus(TestCaseInTempdir):
assert(os.path.exists(self.mbox_name))
self.assertEqual(os.path.getsize(self.mbox_name), 0)
archive_name = self.mbox_name + "_archive.gz"
- assert(os.path.exists(archive_name))
- self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
- archive_name = self.mbox_name + "_archive"
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
+ assertEqualContent(archive_name, self.copy_name, zipfirst=True)
def testOldUnread(self):
"""archiving an unread mailbox should not create an archive"""
@@ -782,8 +752,7 @@ class TestArchiveMboxPreserveStatus(TestCaseInTempdir):
self.assertEqual(os.system(run), 0)
else:
sys.exit(1)
- assert(os.path.exists(self.mbox_name))
- assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
+ assertEqualContent(self.mbox_name, self.copy_name)
archive_name = self.mbox_name + "_archive.gz"
assert(not os.path.exists(archive_name))
@@ -829,11 +798,7 @@ class TestArchiveMboxSuffix(TestCaseInTempdir):
time.localtime(parsed_suffix_time))
archive_name = self.mbox_name + parsed_suffix + ".gz"
- assert(os.path.exists(archive_name))
- self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
- archive_name = re.sub("\.gz$", "", archive_name)
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
+ assertEqualContent(archive_name, self.copy_name, zipfirst=True)
os.remove(archive_name)
def tearDown(self):
@@ -865,8 +830,7 @@ class TestArchiveDryRun(TestCaseInTempdir):
self.assertEqual(os.system(run), 0)
else:
sys.exit(1)
- assert(os.path.exists(self.mbox_name))
- assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
+ assertEqualContent(self.mbox_name, self.copy_name)
archive_name = self.mbox_name + "_archive.gz"
assert(not os.path.exists(archive_name))
@@ -902,11 +866,7 @@ class TestArchiveDays(TestCaseInTempdir):
assert(os.path.exists(self.mbox_name))
self.assertEqual(os.path.getsize(self.mbox_name), 0)
archive_name = self.mbox_name + "_archive.gz"
- assert(os.path.exists(archive_name))
- self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
- archive_name = self.mbox_name + "_archive"
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
+ assertEqualContent(archive_name, self.copy_name, zipfirst=True)
def testNew(self):
"""specifying the 'days' option on a newer mailbox"""
@@ -925,8 +885,7 @@ class TestArchiveDays(TestCaseInTempdir):
self.assertEqual(os.system(run), 0)
else:
sys.exit(1)
- assert(os.path.exists(self.mbox_name))
- assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
+ assertEqualContent(self.mbox_name, self.copy_name)
archive_name = self.mbox_name + "_archive.gz"
assert(not os.path.exists(archive_name))
@@ -961,8 +920,7 @@ class TestArchiveDelete(TestCaseInTempdir):
self.assertEqual(os.system(run), 0)
else:
sys.exit(1)
- assert(os.path.exists(self.mbox_name))
- assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
+ assertEqualContent(self.mbox_name, self.copy_name)
archive_name = self.mbox_name + "_archive.gz"
assert(not os.path.exists(archive_name))
@@ -981,8 +939,7 @@ class TestArchiveDelete(TestCaseInTempdir):
self.assertEqual(os.system(run), 0)
else:
sys.exit(1)
- assert(os.path.exists(self.mbox_name))
- assert(filecmp.cmp(self.new_mbox, self.mbox_name, shallow=0))
+ assertEqualContent(self.mbox_name, self.new_mbox)
archive_name = self.mbox_name + "_archive.gz"
assert(not os.path.exists(archive_name))
@@ -1035,11 +992,8 @@ class TestArchiveCopy(TestCaseInTempdir):
self.assertEqual(os.system(run), 0)
else:
sys.exit(1)
- assert(os.path.exists(self.mbox_name))
- # mbox must not have changed:
- assert(filecmp.cmp(self.mbox_name, self.mbox_backup_name, shallow=0))
+ assertEqualContent(self.mbox_name, self.mbox_backup_name)
archive_name = self.mbox_name + "_archive.gz"
- # There is no archive.
assert(not os.path.exists(archive_name))
def testMixed(self):
@@ -1059,16 +1013,9 @@ class TestArchiveCopy(TestCaseInTempdir):
self.assertEqual(os.system(run), 0)
else:
sys.exit(1)
- assert(os.path.exists(self.mbox_name))
- # mbox must not have changed:
- assert(filecmp.cmp(self.mbox_backup_name, self.mbox_name, shallow=0))
- # archive has the old messages.
+ assertEqualContent(self.mbox_name, self.mbox_backup_name)
archive_name = self.mbox_name + "_archive.gz"
- assert(os.path.exists(archive_name))
- self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
- archive_name = self.mbox_name + "_archive"
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.old_mbox, shallow=0))
+ assertEqualContent(archive_name, self.old_mbox, zipfirst=True)
def testOld(self):
"""archiving an old mailbox with the 'copy' option"""
@@ -1083,16 +1030,9 @@ class TestArchiveCopy(TestCaseInTempdir):
self.assertEqual(os.system(run), 0)
else:
sys.exit(1)
- assert(os.path.exists(self.mbox_name))
- # mbox must not have changed:
- assert(filecmp.cmp(self.mbox_backup_name, self.mbox_name, shallow=0))
- # archive has the old messages.
+ assertEqualContent(self.mbox_name, self.mbox_backup_name)
archive_name = self.mbox_name + "_archive.gz"
- assert(os.path.exists(archive_name))
- self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
- archive_name = self.mbox_name + "_archive"
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.mbox_name, shallow=0))
+ assertEqualContent(archive_name, self.mbox_name, zipfirst=True)
def tearDown(self):
archivemail.options.copy_old_mail = 0
@@ -1121,8 +1061,7 @@ class TestArchiveMboxFlagged(TestCaseInTempdir):
self.assertEqual(os.system(run), 0)
else:
sys.exit(1)
- assert(os.path.exists(self.mbox_name))
- assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
+ assertEqualContent(self.mbox_name, self.copy_name)
archive_name = self.mbox_name + "_archive.gz"
assert(not os.path.exists(archive_name))
@@ -1142,8 +1081,7 @@ class TestArchiveMboxFlagged(TestCaseInTempdir):
self.assertEqual(os.system(run), 0)
else:
sys.exit(1)
- assert(os.path.exists(self.mbox_name))
- assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
+ assertEqualContent(self.mbox_name, self.copy_name)
archive_name = self.mbox_name + "_archive.gz"
assert(not os.path.exists(archive_name))
@@ -1166,11 +1104,7 @@ class TestArchiveMboxFlagged(TestCaseInTempdir):
assert(os.path.exists(self.mbox_name))
self.assertEqual(os.path.getsize(self.mbox_name), 0)
archive_name = self.mbox_name + "_archive.gz"
- assert(os.path.exists(archive_name))
- self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
- archive_name = self.mbox_name + "_archive"
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
+ assertEqualContent(archive_name, self.copy_name, zipfirst=True)
def tearDown(self):
archivemail.options.include_flagged = 0
@@ -1208,11 +1142,7 @@ class TestArchiveMboxOutputDir(TestCaseInTempdir):
self.assertEqual(os.path.getsize(self.mbox_name), 0)
archive_name = self.dir_name + "/" + \
os.path.basename(self.mbox_name) + "_archive.gz"
- assert(os.path.exists(archive_name))
- self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
- archive_name = re.sub(".gz$", "", archive_name)
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
+ assertEqualContent(archive_name, self.copy_name, zipfirst=True)
def tearDown(self):
archivemail.options.quiet = 0
@@ -1252,8 +1182,7 @@ class TestArchiveMboxUncompressed(TestCaseInTempdir):
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
self.assertEqual(self.mbox_mode, new_mode)
archive_name = self.mbox_name + "_archive"
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
+ assertEqualContent(archive_name, self.copy_name)
assert(not os.path.exists(archive_name + ".gz"))
def testNew(self):
@@ -1271,8 +1200,7 @@ class TestArchiveMboxUncompressed(TestCaseInTempdir):
self.assertEqual(os.system(run), 0)
else:
sys.exit(1)
- assert(os.path.exists(self.mbox_name))
- assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
+ assertEqualContent(self.mbox_name, self.copy_name)
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
self.assertEqual(self.mbox_mode, new_mode)
archive_name = self.mbox_name + "_archive"
@@ -1295,11 +1223,9 @@ class TestArchiveMboxUncompressed(TestCaseInTempdir):
self.assertEqual(os.system(run), 0)
else:
sys.exit(1)
- assert(os.path.exists(self.mbox_name))
- assert(filecmp.cmp(self.new_mbox, self.mbox_name, shallow=0))
+ assertEqualContent(self.mbox_name, self.new_mbox)
archive_name = self.mbox_name + "_archive"
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.old_mbox, shallow=0))
+ assertEqualContent(archive_name, self.old_mbox)
assert(not os.path.exists(archive_name + ".gz"))
def testOldExists(self):
@@ -1325,8 +1251,7 @@ class TestArchiveMboxUncompressed(TestCaseInTempdir):
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
self.assertEqual(self.mbox_mode, new_mode)
archive_name = self.mbox_name + "_archive"
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
+ assertEqualContent(archive_name, self.copy_name)
assert(not os.path.exists(archive_name + ".gz"))
def tearDown(self):
@@ -1364,11 +1289,7 @@ class TestArchiveSize(TestCaseInTempdir):
assert(os.path.exists(self.mbox_name))
self.assertEqual(os.path.getsize(self.mbox_name), 0)
archive_name = self.mbox_name + "_archive.gz"
- assert(os.path.exists(archive_name))
- self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
- archive_name = self.mbox_name + "_archive"
- assert(os.path.exists(archive_name))
- assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
+ assertEqualContent(archive_name, self.copy_name, zipfirst=True)
def testBigger(self):
"""giving a size argument bigger than the message"""
@@ -1390,8 +1311,7 @@ class TestArchiveSize(TestCaseInTempdir):
self.assertEqual(os.system(run), 0)
else:
sys.exit(1)
- assert(os.path.exists(self.mbox_name))
- assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
+ assertEqualContent(self.mbox_name, self.copy_name)
archive_name = self.mbox_name + "_archive.gz"
assert(not os.path.exists(archive_name))
@@ -1506,6 +1426,32 @@ def make_mbox(body=None, headers=None, hours_old=0, messages=1):
file.close()
return name
+def assertEqualContent(firstfile, secondfile, zipfirst=False):
+ """Verify that the two files exist and have identical content. If zipfirst
+ is True, assume that firstfile is gzip-compressed."""
+ assert(os.path.exists(firstfile))
+ assert(os.path.exists(secondfile))
+ if zipfirst:
+ try:
+ fp1 = gzip.GzipFile(firstfile, "r")
+ fp2 = open(secondfile, "r")
+ assert(cmp_fileobj(fp1, fp2))
+ finally:
+ fp1.close()
+ fp2.close()
+ else:
+ assert(filecmp.cmp(firstfile, secondfile, shallow=0))
+
+def cmp_fileobj(fp1, fp2):
+ """Return if reading the fileobjects yields identical content."""
+ bufsize = 8192
+ while True:
+ b1 = fp1.read(bufsize)
+ b2 = fp2.read(bufsize)
+ if b1 != b2:
+ return False
+ if not b1:
+ return True
if __name__ == "__main__":
unittest.main()