aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG8
-rw-r--r--TODO4
-rwxr-xr-xarchivemail.py138
-rwxr-xr-xarchivemail_test.py41
-rwxr-xr-xtest_archivemail.py424
5 files changed, 521 insertions, 94 deletions
diff --git a/CHANGELOG b/CHANGELOG
index c19ebba..554a470 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,11 @@
+Version 0.3.0 - ???
+ * We now preserve the last-accessed and last-modified timestamps correctly
+ * Fixed a bug where lockfiles were being created that were not
+ world-readable
+ * Made archivemail work better when used as a python module so it can
+ integrate better with unittest.
+ * Budled a unit testing script for archivemail.
+
Version 0.2.1 - 4 April 2002
* Since we might not have a parse-able 'Date-Received' or 'Date' field,
use 5 different ways to guess the date of a message.
diff --git a/TODO b/TODO
index 35a9431..630e310 100644
--- a/TODO
+++ b/TODO
@@ -1,9 +1,9 @@
Goals for next minor release (0.2.2):
-------------------------------------
-* Test exclusive locking works with another test process
-* Perserve atime of original mailbox properly
* Finish man page
+* If a mailbox has a mode of 660, preserve it.
+ (Especially in /var/spool/mail with groupid of 'mail')
Goals for next major release (0.3.0):
-------------------------------------
diff --git a/archivemail.py b/archivemail.py
index 3eabe4e..965735a 100755
--- a/archivemail.py
+++ b/archivemail.py
@@ -16,12 +16,19 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
############################################################################
-
"""
Archive and compress old mail in mbox or maildir-format mailboxes.
Website: http://archivemail.sourceforge.net/
"""
+# global administrivia
+__version__ = "archivemail v0.3.0"
+__cvs_id__ = "$Id$"
+__copyright__ = """Copyright (C) 2002 Paul Rodger <paul@paulrodger.com>
+This is free software; see the source for copying conditions. There is NO
+warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE."""
+
+
import sys
def check_python_version():
@@ -51,15 +58,6 @@ import string
import tempfile
import time
-# global administrivia
-__version__ = "archivemail v0.2.1"
-__cvs_id__ = "$Id$"
-__copyright__ = """Copyright (C) 2002 Paul Rodger <paul@paulrodger.com>
-This is free software; see the source for copying conditions. There is NO
-warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE."""
-
-_stale = None # list of files to delete on abnormal exit
-
############## class definitions ###############
class Stats:
@@ -83,7 +81,8 @@ class Stats:
assert(final_archive_name)
self.__start_time = time.time()
self.__mailbox_name = mailbox_name
- self.__archive_name = final_archive_name + _options.compressor_extension
+ self.__archive_name = final_archive_name + \
+ _options.compressor_extension()
def another_message(self):
"""Add one to the internal count of total messages processed"""
@@ -138,8 +137,7 @@ class StaleFiles:
class Options:
"""Class to store runtime options, including defaults"""
archive_suffix = "_archive"
- compressor = None
- compressor_extension = None
+ compressor = "gzip"
days_old_max = 180
delete_old_mail = 0
dry_run = 0
@@ -171,6 +169,8 @@ class Options:
"warn-duplicate"])
except getopt.error, msg:
user_error(msg)
+
+ chosen_compressor = None
for o, a in opts:
if o == '--delete':
self.delete_old_mail = 1
@@ -195,25 +195,22 @@ class Options:
print __version__ + "\n\n" + __copyright__
sys.exit(0)
if o in ('-z', '--gzip'):
- if (self.compressor):
+ if (chosen_compressor):
user_error("conflicting compression options")
self.compressor = "gzip"
+ chosen_compressor = 1
if o in ('-Z', '--compress'):
- if (self.compressor):
+ if (chosen_compressor):
user_error("conflicting compression options")
self.compressor = "compress"
+ chosen_compressor = 1
if o in ('-I', '--bzip2'):
- if (self.compressor):
+ if (chosen_compressor):
user_error("conflicting compression options")
self.compressor = "bzip2"
+ chosen_compressor = 1
if not self.compressor:
self.compressor = "gzip"
- extensions = {
- "compress" : ".Z",
- "gzip" : ".gz",
- "bzip2" : ".bz2",
- }
- self.compressor_extension = extensions[self.compressor]
return args
def sanity_check(self):
@@ -233,6 +230,15 @@ class Options:
if (self.days_old_max >= 10000):
user_error("argument to -d must be less than 10000")
+ def compressor_extension(self):
+ extensions = {
+ "compress" : ".Z",
+ "gzip" : ".gz",
+ "bzip2" : ".bz2",
+ }
+ return extensions[self.compressor]
+
+
class Mbox(mailbox.PortableUnixMailbox):
"""Class that allows read/write access to a 'mbox' mailbox.
@@ -240,18 +246,23 @@ class Mbox(mailbox.PortableUnixMailbox):
"""
mbox_file = None # file handle for the mbox file
+ original_atime = None # last-accessed timestamp
+ original_mtime = None # last-modified timestamp
- def __init__(self, path_name):
+ def __init__(self, path, mode="r"):
"""Constructor for opening an existing 'mbox' mailbox.
Extends constructor for mailbox.PortableUnixMailbox()
- Arguments:
- path_name -- file name of the 'mbox' file to be opened
+ Named Arguments:
+ path -- file name of the 'mbox' file to be opened
+ mode -- mode to open the file in (default is read-only)
"""
- assert(path_name)
+ assert(path)
try:
- self.mbox_file = open(path_name, "r")
+ self.original_atime = os.path.getatime(path)
+ self.original_mtime = os.path.getmtime(path)
+ self.mbox_file = open(path, mode)
except IOError, msg:
unexpected_error(msg)
mailbox.PortableUnixMailbox.__init__(self, self.mbox_file)
@@ -266,6 +277,8 @@ class Mbox(mailbox.PortableUnixMailbox):
"""
assert(msg)
+ assert(self.mbox_file)
+
vprint("saving message to file '%s'" % self.mbox_file.name)
unix_from = msg.unixfrom
if not unix_from:
@@ -300,6 +313,14 @@ class Mbox(mailbox.PortableUnixMailbox):
vprint("closing file '%s'" % self.mbox_file.name)
self.mbox_file.close()
+ def reset_time_stamps(self):
+ """Set the file timestamps to the original value"""
+ assert(self.original_atime)
+ assert(self.original_mtime)
+ assert(self.mbox_file.name)
+ os.utime(self.mbox_file.name, (self.original_atime, \
+ self.original_mtime))
+
def exclusive_lock(self):
"""Set an advisory lock on the 'mbox' mailbox"""
vprint("obtaining exclusive lock on file '%s'" % self.mbox_file.name)
@@ -322,9 +343,11 @@ class Mbox(mailbox.PortableUnixMailbox):
unexpected_error("Giving up waiting for procmail lock '%s'"
% lock_name)
vprint("writing lockfile '%s'" % lock_name)
+ old_umask = os.umask(022) # is this dodgy?
lock = open(lock_name, "w")
_stale.procmail_lock = lock_name
lock.close()
+ old_umask = os.umask(old_umask)
def procmail_unlock(self):
"""Delete the procmail lockfile on the 'mbox' mailbox"""
@@ -342,11 +365,9 @@ class Mbox(mailbox.PortableUnixMailbox):
completely deleted."""
assert(os.path.isfile(self.mbox_file.name))
vprint("turning '%s' into a zero-length file" % self.mbox_file.name)
- atime = os.path.getatime(self.mbox_file.name)
mtime = os.path.getmtime(self.mbox_file.name)
blank_file = open(self.mbox_file.name, "w")
blank_file.close()
- os.utime(self.mbox_file.name, (atime, mtime)) # to original timestamps
class RetainMbox(Mbox):
@@ -377,8 +398,14 @@ class RetainMbox(Mbox):
"""Overwrite the original mailbox with this temporary mailbox."""
assert(self.__final_name)
self.close()
+
+ # make sure that the retained mailbox has the same timestamps and
+ # permission as the original mailbox
atime = os.path.getatime(self.__final_name)
mtime = os.path.getmtime(self.__final_name)
+ mode = os.stat(self.__final_name)[stat.ST_MODE]
+ os.chmod(self.mbox_file.name, mode)
+
vprint("renaming '%s' to '%s'" % (self.mbox_file.name, self.__final_name))
os.rename(self.mbox_file.name, self.__final_name)
os.utime(self.__final_name, (atime, mtime)) # reset to original timestamps
@@ -414,7 +441,7 @@ class ArchiveMbox(Mbox):
"""
assert(final_name)
compressor = _options.compressor
- compressedfilename = final_name + _options.compressor_extension
+ compressedfilename = final_name + _options.compressor_extension()
if os.path.isfile(final_name):
unexpected_error("""There is already a file named '%s'!
@@ -445,13 +472,14 @@ manually, and try running me again.""" % final_name)
self.close()
compressor = _options.compressor
compressed_archive_name = self.mbox_file.name + \
- _options.compressor_extension
+ _options.compressor_extension()
compress = compressor + " " + self.mbox_file.name
vprint("running compressor: '%s'" % compress)
_stale.compressed_archive = compressed_archive_name
system_or_die(compress)
_stale.archive = None
- compressed_final_name = self.__final_name + _options.compressor_extension
+ compressed_final_name = self.__final_name + \
+ _options.compressor_extension()
vprint("renaming '%s' to '%s'" % (compressed_archive_name,
compressed_final_name))
os.rename(compressed_archive_name, compressed_final_name)
@@ -478,6 +506,7 @@ class IdentityCache:
# global class instances
_options = Options() # the run-time options object
+_stale = StaleFiles() # remember what we have to delete on abnormal exit
def main(args = sys.argv[1:]):
@@ -519,17 +548,6 @@ Website: http://archivemail.sourceforge.net/ """ % \
sys.exit(1)
_options.sanity_check()
- os.umask(077) # saves setting permissions on mailboxes/tempfiles
-
- # Make sure we clean up nicely - we don't want to leave stale procmail
- # lockfiles about if something bad happens to us. This is quite
- # important, even though procmail will delete stale files after a while.
- _stale = StaleFiles() # remember what we have to delete
- atexit.register(clean_up) # delete stale files on exceptions/normal exit
- signal.signal(signal.SIGHUP, clean_up_signal) # signal 1
- # SIGINT (signal 2) is handled as a python exception
- signal.signal(signal.SIGQUIT, clean_up_signal) # signal 3
- signal.signal(signal.SIGTERM, clean_up_signal) # signal 15
for mailbox_path in args:
archive(mailbox_path)
@@ -599,7 +617,7 @@ def guess_delivery_time(message):
if date:
try:
time_message = time.mktime(date)
- assert(time_message, 'time.mktime() returned false')
+ assert(time_message)
vprint("using valid time found from '%s' header" % header)
return time_message
except (ValueError, OverflowError): pass
@@ -612,7 +630,7 @@ def guess_delivery_time(message):
if date:
try:
time_message = time.mktime(date)
- assert(time_message, 'time.mktime() returned false')
+ assert(time_message)
vprint("using valid time found from unix 'From_' header")
return time_message
except (ValueError, OverflowError): pass
@@ -634,7 +652,7 @@ def guess_delivery_time(message):
return time_message
-def is_too_old(time_message):
+def is_too_old(time_message, max_days):
"""Return true if a message is too old (and should be archived),
false otherwise.
@@ -643,12 +661,14 @@ def is_too_old(time_message):
since the epoch
"""
- assert(time_message)
+ assert(time_message > 0)
+ assert(max_days >= 1)
+
time_now = time.time()
if time_message > time_now:
vprint("warning: message has date in the future")
return 0
- secs_old_max = (_options.days_old_max * 24 * 60 * 60)
+ secs_old_max = (max_days * 24 * 60 * 60)
days_old = (time_now - time_message) / 24 / 60 / 60
vprint("message is %.2f days old" % days_old)
if ((time_message + secs_old_max) < time_now):
@@ -670,6 +690,9 @@ def archive(mailbox_name):
"""
assert(mailbox_name)
+ set_signal_handlers()
+ os.umask(077) # saves setting permissions on mailboxes/tempfiles
+
final_archive_name = mailbox_name + _options.archive_suffix
if _options.output_dir:
final_archive_name = os.path.join(_options.output_dir,
@@ -730,7 +753,7 @@ def _archive_mbox(mailbox_name, final_archive_name):
archive = None
retain = None
stats = Stats(mailbox_name, final_archive_name)
- original = Mbox(mailbox_name)
+ original = Mbox(path=mailbox_name)
cache = IdentityCache(mailbox_name)
original.procmail_lock()
@@ -742,7 +765,7 @@ def _archive_mbox(mailbox_name, final_archive_name):
if _options.warn_duplicates:
cache.warn_if_dupe(msg)
time_message = guess_delivery_time(msg)
- if is_too_old(time_message):
+ if is_too_old(time_message, _options.days_old_max):
stats.another_archived()
if _options.delete_old_mail:
vprint("decision: delete message")
@@ -762,6 +785,7 @@ def _archive_mbox(mailbox_name, final_archive_name):
vprint("finished reading messages")
original.exclusive_unlock()
original.close()
+ original.reset_time_stamps()
if not _options.dry_run:
if retain: retain.close()
if archive: archive.close()
@@ -772,6 +796,7 @@ def _archive_mbox(mailbox_name, final_archive_name):
else:
# nothing was retained - everything was deleted
original.leave_empty()
+ original.reset_time_stamps()
elif archive:
archive.finalise()
if retain:
@@ -779,6 +804,7 @@ def _archive_mbox(mailbox_name, final_archive_name):
else:
# nothing was retained - everything was deleted
original.leave_empty()
+ original.reset_time_stamps()
else:
# There was nothing to archive
if retain:
@@ -816,7 +842,7 @@ def _archive_dir(mailbox_name, final_archive_name, type):
if _options.warn_duplicates:
cache.warn_if_dupe(msg)
time_message = guess_delivery_time(msg)
- if is_too_old(time_message):
+ if is_too_old(time_message, _options.days_old_max):
stats.another_archived()
if _options.delete_old_mail:
vprint("decision: delete message")
@@ -868,6 +894,16 @@ def choose_temp_dir(mailbox_name):
return temp_dir
+def set_signal_handlers():
+ # Make sure we clean up nicely - we don't want to leave stale procmail
+ # lockfiles about if something bad happens to us. This is quite
+ # important, even though procmail will delete stale files after a while.
+ atexit.register(clean_up) # delete stale files on exceptions/normal exit
+ signal.signal(signal.SIGHUP, clean_up_signal) # signal 1
+ # SIGINT (signal 2) is handled as a python exception
+ signal.signal(signal.SIGQUIT, clean_up_signal) # signal 3
+ signal.signal(signal.SIGTERM, clean_up_signal) # signal 15
+
def clean_up():
"""Delete stale files -- to be registered with atexit.register()"""
vprint("cleaning up ...")
diff --git a/archivemail_test.py b/archivemail_test.py
deleted file mode 100755
index a412dae..0000000
--- a/archivemail_test.py
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/usr/bin/env python
-
-import archivemail
-import os
-import tempfile
-import unittest
-
-class TempfileTestCase(unittest.TestCase):
- def setUp(self):
- self.output_dir = tempfile.mktemp()
- os.mkdir(self.output_dir)
- self.sub_dir = tempfile.mktemp()
- os.mkdir(self.sub_dir)
-
- def testCurrentDir(self):
- archivemail._options.output_dir = None
- dir = archivemail.choose_temp_dir("dummy")
- self.assertEqual(dir, os.curdir)
-
- def testSubDir(self):
- archivemail._options.output_dir = None
- dir = archivemail.choose_temp_dir(os.path.join(self.sub_dir, "dummy"))
- self.assertEqual(dir, self.sub_dir)
-
- def testOutputDir(self):
- archivemail._options.output_dir = self.output_dir
- dir = archivemail.choose_temp_dir("dummy")
- self.assertEqual(dir, self.output_dir)
-
- def testSubDirOutputDir(self):
- archivemail._options.output_dir = self.output_dir
- dir = archivemail.choose_temp_dir(os.path.join(self.sub_dir, "dummy"))
- self.assertEqual(dir, self.output_dir)
-
- def tearDown(self):
- os.rmdir(self.output_dir)
- os.rmdir(self.sub_dir)
-
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/test_archivemail.py b/test_archivemail.py
new file mode 100755
index 0000000..df1677b
--- /dev/null
+++ b/test_archivemail.py
@@ -0,0 +1,424 @@
+#!/usr/bin/env python
+############################################################################
+# Copyright (C) 2002 Paul Rodger <paul@paulrodger.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+############################################################################
+"""
+Test archivemail works correctly using 'pyunit'.
+"""
+
+import fcntl
+import filecmp
+import os
+import shutil
+import stat
+import tempfile
+import time
+import unittest
+
+import archivemail
+
+__version__ = """$Id$"""
+
+############ Mbox Class testing ##############
+
+class TestMboxIsEmpty(unittest.TestCase):
+ def setUp(self):
+ self.empty_name = make_mbox(messages=0)
+ self.not_empty_name = make_mbox(messages=1)
+
+ def testEmpty(self):
+ mbox = archivemail.Mbox(self.empty_name)
+ assert(mbox.is_empty())
+
+ def testNotEmpty(self):
+ mbox = archivemail.Mbox(self.not_empty_name)
+ assert(not mbox.is_empty())
+
+ def tearDown(self):
+ if os.path.exists(self.empty_name):
+ os.remove(self.empty_name)
+ if os.path.exists(self.not_empty_name):
+ os.remove(self.not_empty_name)
+
+
+class TestMboxLeaveEmpty(unittest.TestCase):
+ def setUp(self):
+ self.mbox_name = make_mbox()
+ self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
+ self.mbox = archivemail.Mbox(self.mbox_name)
+
+ def testLeaveEmpty(self):
+ self.mbox.leave_empty()
+ assert(os.path.isfile(self.mbox_name))
+ self.assertEqual(os.path.getsize(self.mbox_name), 0)
+ new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
+ self.assertEqual(new_mode, self.mbox_mode)
+
+ def tearDown(self):
+ if os.path.exists(self.mbox_name):
+ os.remove(self.mbox_name)
+
+
+class TestMboxProcmailLock(unittest.TestCase):
+ def setUp(self):
+ self.mbox_name = make_mbox()
+ self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
+ self.mbox = archivemail.Mbox(self.mbox_name)
+
+ def testProcmailLock(self):
+ lock = self.mbox_name + ".lock"
+ self.mbox.procmail_lock()
+ assert(os.path.isfile(lock))
+ assert(is_world_readable(lock))
+ self.mbox.procmail_unlock()
+ assert(not os.path.isfile(lock))
+
+ # TODO: add a test where the lock already exists
+
+ def tearDown(self):
+ if os.path.exists(self.mbox_name):
+ os.remove(self.mbox_name)
+
+
+class TestMboxRemove(unittest.TestCase):
+ def setUp(self):
+ self.mbox_name = make_mbox()
+ self.mbox = archivemail.Mbox(self.mbox_name)
+
+ def testProcmailLock(self):
+ assert(os.path.exists(self.mbox_name))
+ self.mbox.remove()
+ assert(not os.path.exists(self.mbox_name))
+
+ def tearDown(self):
+ if os.path.exists(self.mbox_name):
+ os.remove(self.mbox_name)
+
+
+class TestMboxExclusiveLock(unittest.TestCase):
+ def setUp(self):
+ self.mbox_name = make_mbox()
+ self.mbox = archivemail.Mbox(self.mbox_name)
+
+ def testExclusiveLock(self):
+ self.mbox.exclusive_lock()
+ file = open(self.mbox_name, "r+")
+ lock_nb = fcntl.LOCK_EX | fcntl.LOCK_NB
+ self.assertRaises(IOError, fcntl.flock, file, lock_nb)
+
+ self.mbox.exclusive_unlock()
+ fcntl.flock(file, lock_nb)
+ fcntl.flock(file, fcntl.LOCK_UN)
+
+ def tearDown(self):
+ if os.path.exists(self.mbox_name):
+ os.remove(self.mbox_name)
+
+
+class TestMboxNext(unittest.TestCase):
+ def setUp(self):
+ self.not_empty_name = make_mbox(messages=18)
+ self.empty_name = make_mbox(messages=0)
+
+ def testNextEmpty(self):
+ mbox = archivemail.Mbox(self.empty_name)
+ msg = mbox.next()
+ self.assertEqual(msg, None)
+
+ def testNextNotEmpty(self):
+ mbox = archivemail.Mbox(self.not_empty_name)
+ for count in range(18):
+ msg = mbox.next()
+ assert(msg)
+ msg = mbox.next()
+ self.assertEqual(msg, None)
+
+ def tearDown(self):
+ if os.path.exists(self.not_empty_name):
+ os.remove(self.not_empty_name)
+ if os.path.exists(self.empty_name):
+ os.remove(self.empty_name)
+
+
+class TestMboxWrite(unittest.TestCase):
+ def setUp(self):
+ self.mbox_read = make_mbox(messages=3)
+ self.mbox_write = make_mbox(messages=0)
+
+ def testWrite(self):
+ read = archivemail.Mbox(self.mbox_read)
+ write = archivemail.Mbox(self.mbox_write, mode="w")
+ for count in range(3):
+ msg = read.next()
+ write.write(msg)
+ read.close()
+ write.close()
+ assert(filecmp.cmp(self.mbox_read, self.mbox_write))
+
+ def testWriteNone(self):
+ write = archivemail.Mbox(self.mbox_write, mode="w")
+ self.assertRaises(AssertionError, write.write, None)
+
+ def tearDown(self):
+ if os.path.exists(self.mbox_write):
+ os.remove(self.mbox_write)
+ if os.path.exists(self.mbox_read):
+ os.remove(self.mbox_read)
+
+
+########## generic routine testing #################
+
+
+class TestIsTooOld(unittest.TestCase):
+ def testOld(self):
+ time_msg = time.time() - (15 * 24 * 60 * 60) # 15 days old
+ assert(archivemail.is_too_old(time_message=time_msg, max_days=14))
+
+ def testJustOld(self):
+ time_msg = time.time() - (25 * 60 * 60) # 25 hours old
+ assert(archivemail.is_too_old(time_message=time_msg, max_days=1))
+
+ def testNotOld(self):
+ time_msg = time.time() - (8 * 24 * 60 * 60) # 8 days old
+ assert(not archivemail.is_too_old(time_message=time_msg, max_days=9))
+
+ def testJustNotOld(self):
+ time_msg = time.time() - (23 * 60 * 60) # 23 hours old
+ assert(not archivemail.is_too_old(time_message=time_msg, max_days=1))
+
+ def testFuture(self):
+ time_msg = time.time() + (1 * 24 * 60 * 60) # tomorrow
+ assert(not archivemail.is_too_old(time_message=time_msg, max_days=1))
+
+
+class TestChooseTempDir(unittest.TestCase):
+ def setUp(self):
+ self.output_dir = tempfile.mktemp()
+ os.mkdir(self.output_dir)
+ self.sub_dir = tempfile.mktemp()
+ os.mkdir(self.sub_dir)
+
+ def testCurrentDir(self):
+ archivemail._options.output_dir = None
+ dir = archivemail.choose_temp_dir("dummy")
+ self.assertEqual(dir, os.curdir)
+
+ def testSubDir(self):
+ archivemail._options.output_dir = None
+ dir = archivemail.choose_temp_dir(os.path.join(self.sub_dir, "dummy"))
+ self.assertEqual(dir, self.sub_dir)
+
+ def testOutputDir(self):
+ archivemail._options.output_dir = self.output_dir
+ dir = archivemail.choose_temp_dir("dummy")
+ self.assertEqual(dir, self.output_dir)
+
+ def testSubDirOutputDir(self):
+ archivemail._options.output_dir = self.output_dir
+ dir = archivemail.choose_temp_dir(os.path.join(self.sub_dir, "dummy"))
+ self.assertEqual(dir, self.output_dir)
+
+ def tearDown(self):
+ os.rmdir(self.output_dir)
+ os.rmdir(self.sub_dir)
+
+
+########## proper archival testing ###########
+
+class TestArchiveMboxTimestampNew(unittest.TestCase):
+ def setUp(self):
+ self.mbox_name = make_mbox(messages=3, hours_old=(24 * 179))
+ self.mtime = os.path.getmtime(self.mbox_name) - 66
+ self.atime = os.path.getatime(self.mbox_name) - 88
+ os.utime(self.mbox_name, (self.atime, self.mtime))
+ archivemail._options.quiet = 1
+
+ def testTime(self):
+ archivemail._options.compressor = "gzip"
+ archivemail.archive(self.mbox_name)
+ assert(os.path.exists(self.mbox_name))
+ new_atime = os.path.getatime(self.mbox_name)
+ new_mtime = os.path.getmtime(self.mbox_name)
+ self.assertEqual(self.mtime, new_mtime)
+ self.assertEqual(self.atime, new_atime)
+
+ def tearDown(self):
+ if os.path.exists(self.mbox_name):
+ os.remove(self.mbox_name)
+ archivemail._options.quiet = 0
+
+
+class TestArchiveMboxTimestampOld(unittest.TestCase):
+ def setUp(self):
+ self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181))
+ self.mtime = os.path.getmtime(self.mbox_name) - 66
+ self.atime = os.path.getatime(self.mbox_name) - 88
+ os.utime(self.mbox_name, (self.atime, self.mtime))
+ archivemail._options.quiet = 1
+
+ def testTime(self):
+ archivemail._options.compressor = "gzip"
+ archive_name = self.mbox_name + "_archive.gz"
+ archivemail.archive(self.mbox_name)
+ assert(os.path.exists(self.mbox_name))
+ new_atime = os.path.getatime(self.mbox_name)
+ new_mtime = os.path.getmtime(self.mbox_name)
+ self.assertEqual(self.mtime, new_mtime)
+ self.assertEqual(self.atime, new_atime)
+
+ def tearDown(self):
+ if os.path.exists(self.mbox_name):
+ os.remove(self.mbox_name)
+ for ext in (".gz", ".bz2", ".Z"):
+ if os.path.exists(self.mbox_name + ext):
+ os.remove(self.mbox_name + ext)
+ archivemail._options.quiet = 0
+
+
+class TestArchiveMboxOld(unittest.TestCase):
+ def setUp(self):
+ self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181))
+ self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
+ self.copy_name = tempfile.mktemp()
+ shutil.copyfile(self.mbox_name, self.copy_name)
+ archivemail._options.quiet = 1
+
+ def testArchiveOldGzip(self):
+ archivemail._options.compressor = "gzip"
+ archivemail.archive(self.mbox_name)
+ assert(os.path.exists(self.mbox_name))
+ self.assertEqual(os.path.getsize(self.mbox_name), 0)
+ new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
+ self.assertEqual(self.mbox_mode, new_mode)
+
+ archive_name = self.mbox_name + "_archive.gz"
+ assert(os.path.exists(archive_name))
+ os.system("gzip -d " + archive_name)
+
+ archive_name = self.mbox_name + "_archive"
+ assert(os.path.exists(archive_name))
+ assert(filecmp.cmp(archive_name, self.copy_name))
+ self.tearDown()
+ self.setUp()
+
+ def testArchiveOldBzip2(self):
+ archivemail._options.compressor = "bzip2"
+ archivemail.archive(self.mbox_name)
+ assert(os.path.exists(self.mbox_name))
+ self.assertEqual(os.path.getsize(self.mbox_name), 0)
+ new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
+ self.assertEqual(self.mbox_mode, new_mode)
+
+ archive_name = self.mbox_name + "_archive.bz2"
+ assert(os.path.exists(archive_name))
+ os.system("bzip2 -d " + archive_name)
+
+ archive_name = self.mbox_name + "_archive"
+ assert(os.path.exists(archive_name))
+ assert(filecmp.cmp(archive_name, self.copy_name))
+ self.tearDown()
+ self.setUp()
+
+ def testArchiveOldCompress(self):
+ archivemail._options.compressor = "compress"
+ archivemail.archive(self.mbox_name)
+ assert(os.path.exists(self.mbox_name))
+ self.assertEqual(os.path.getsize(self.mbox_name), 0)
+ new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
+ self.assertEqual(self.mbox_mode, new_mode)
+
+ archive_name = self.mbox_name + "_archive.Z"
+ assert(os.path.exists(archive_name))
+ os.system("compress -d " + archive_name)
+
+ archive_name = self.mbox_name + "_archive"
+ assert(os.path.exists(archive_name))
+ assert(filecmp.cmp(archive_name, self.copy_name))
+ self.tearDown()
+ self.setUp()
+
+ def tearDown(self):
+ if os.path.exists(self.mbox_name):
+ os.remove(self.mbox_name)
+ for ext in (".gz", ".bz2", ".Z"):
+ if os.path.exists(self.mbox_name + ext):
+ os.remove(self.mbox_name + ext)
+ if os.path.exists(self.copy_name):
+ os.remove(self.copy_name)
+ archivemail._options.quiet = 0
+
+
+class TestArchiveMboxNew(unittest.TestCase):
+ def setUp(self):
+ archivemail._options.quiet = 1
+ self.mbox_name = make_mbox(messages=3, hours_old=(24 * 179))
+ self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
+ self.copy_name = tempfile.mktemp()
+ shutil.copyfile(self.mbox_name, self.copy_name)
+
+ def testArchiveNew(self):
+ archivemail.archive(self.mbox_name)
+ assert(os.path.exists(self.mbox_name))
+ assert(filecmp.cmp(self.mbox_name, self.copy_name))
+ new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
+ self.assertEqual(self.mbox_mode, new_mode)
+
+ archive_name = self.mbox_name + "_archive.gz"
+ assert(not os.path.exists(archive_name))
+
+ def tearDown(self):
+ archivemail._options.quiet = 0
+ if os.path.exists(self.mbox_name):
+ os.remove(self.mbox_name)
+ if os.path.exists(self.copy_name):
+ os.remove(self.copy_name)
+
+
+########## helper routines ############
+
+def make_message(hours_old=0):
+ time_message = time.time() - (60 * 60 * hours_old)
+ time_string = time.asctime(time.localtime(time_message))
+
+ return """From sender@domain %s
+From: sender@domain
+To: receipient@domain
+Subject: This is a dummy message
+Date: %s
+
+This is the message body.
+It's very exciting.
+
+
+""" % (time_string, time_string)
+
+def make_mbox(messages=1, hours_old=0):
+ name = tempfile.mktemp()
+ file = open(name, "w")
+ for count in range(messages):
+ file.write(make_message(hours_old=hours_old))
+ file.close()
+ return name
+
+def is_world_readable(path):
+ """Return true if the path is world-readable, false otherwise"""
+ assert(path)
+ return (os.stat(path)[stat.ST_MODE] & stat.S_IROTH)
+
+
+if __name__ == "__main__":
+ unittest.main()