diff options
| -rw-r--r-- | CHANGELOG | 8 | ||||
| -rw-r--r-- | TODO | 4 | ||||
| -rwxr-xr-x | archivemail.py | 138 | ||||
| -rwxr-xr-x | archivemail_test.py | 41 | ||||
| -rwxr-xr-x | test_archivemail.py | 424 | 
5 files changed, 521 insertions, 94 deletions
@@ -1,3 +1,11 @@ +Version 0.3.0   - ??? +  * We now preserve the last-accessed and last-modified timestamps correctly +  * Fixed a bug where lockfiles were being created that were not +    world-readable +  * Made archivemail work better when used as a python module so it can +    integrate better with unittest. +  * Budled a unit testing script for archivemail. +  Version 0.2.1   - 4 April 2002    * Since we might not have a parse-able 'Date-Received' or 'Date' field,      use 5 different ways to guess the date of a message.  @@ -1,9 +1,9 @@  Goals for next minor release (0.2.2):  ------------------------------------- -* Test exclusive locking works with another test process -* Perserve atime of original mailbox properly  * Finish man page +* If a mailbox has a mode of 660, preserve it.  +  (Especially in /var/spool/mail with groupid of 'mail')  Goals for next major release (0.3.0):  ------------------------------------- diff --git a/archivemail.py b/archivemail.py index 3eabe4e..965735a 100755 --- a/archivemail.py +++ b/archivemail.py @@ -16,12 +16,19 @@  # along with this program; if not, write to the Free Software  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA  ############################################################################ -  """  Archive and compress old mail in mbox or maildir-format mailboxes.  Website: http://archivemail.sourceforge.net/  """ +# global administrivia  +__version__ = "archivemail v0.3.0" +__cvs_id__ = "$Id$" +__copyright__ = """Copyright (C) 2002  Paul Rodger <paul@paulrodger.com> +This is free software; see the source for copying conditions. There is NO +warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.""" + +  import sys  def check_python_version():  @@ -51,15 +58,6 @@ import string  import tempfile  import time -# global administrivia  -__version__ = "archivemail v0.2.1" -__cvs_id__ = "$Id$" -__copyright__ = """Copyright (C) 2002  Paul Rodger <paul@paulrodger.com> -This is free software; see the source for copying conditions. There is NO -warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.""" - -_stale = None    # list of files to delete on abnormal exit -  ############## class definitions ###############  class Stats: @@ -83,7 +81,8 @@ class Stats:          assert(final_archive_name)          self.__start_time = time.time()          self.__mailbox_name = mailbox_name -        self.__archive_name = final_archive_name + _options.compressor_extension +        self.__archive_name = final_archive_name + \ +            _options.compressor_extension()      def another_message(self):          """Add one to the internal count of total messages processed""" @@ -138,8 +137,7 @@ class StaleFiles:  class Options:      """Class to store runtime options, including defaults"""      archive_suffix       = "_archive" -    compressor           = None -    compressor_extension = None +    compressor           = "gzip"      days_old_max         = 180      delete_old_mail      = 0      dry_run              = 0 @@ -171,6 +169,8 @@ class Options:                               "warn-duplicate"])          except getopt.error, msg:              user_error(msg) + +        chosen_compressor = None          for o, a in opts:              if o == '--delete':                  self.delete_old_mail = 1 @@ -195,25 +195,22 @@ class Options:                  print __version__ + "\n\n" + __copyright__                  sys.exit(0)              if o in ('-z', '--gzip'): -                if (self.compressor): +                if (chosen_compressor):                      user_error("conflicting compression options")                  self.compressor = "gzip" +                chosen_compressor = 1              if o in ('-Z', '--compress'): -                if (self.compressor): +                if (chosen_compressor):                      user_error("conflicting compression options")                  self.compressor = "compress" +                chosen_compressor = 1              if o in ('-I', '--bzip2'): -                if (self.compressor): +                if (chosen_compressor):                      user_error("conflicting compression options")                  self.compressor = "bzip2" +                chosen_compressor = 1          if not self.compressor:              self.compressor = "gzip" -        extensions = { -            "compress" : ".Z", -            "gzip"     : ".gz", -            "bzip2"    : ".bz2", -            } -        self.compressor_extension = extensions[self.compressor]          return args      def sanity_check(self): @@ -233,6 +230,15 @@ class Options:          if (self.days_old_max >= 10000):              user_error("argument to -d must be less than 10000") +    def compressor_extension(self): +        extensions = { +            "compress" : ".Z", +            "gzip"     : ".gz", +            "bzip2"    : ".bz2", +            } +        return extensions[self.compressor] + +  class Mbox(mailbox.PortableUnixMailbox):      """Class that allows read/write access to a 'mbox' mailbox.  @@ -240,18 +246,23 @@ class Mbox(mailbox.PortableUnixMailbox):      """      mbox_file = None   # file handle for the mbox file +    original_atime = None # last-accessed timestamp +    original_mtime = None # last-modified timestamp -    def __init__(self, path_name): +    def __init__(self, path, mode="r"):          """Constructor for opening an existing 'mbox' mailbox.          Extends constructor for mailbox.PortableUnixMailbox() -        Arguments: -        path_name -- file name of the 'mbox' file to be opened +        Named Arguments: +        path -- file name of the 'mbox' file to be opened +        mode -- mode to open the file in (default is read-only)          """ -        assert(path_name) +        assert(path)          try: -            self.mbox_file = open(path_name, "r") +            self.original_atime = os.path.getatime(path) +            self.original_mtime = os.path.getmtime(path) +            self.mbox_file = open(path, mode)          except IOError, msg:              unexpected_error(msg)          mailbox.PortableUnixMailbox.__init__(self, self.mbox_file) @@ -266,6 +277,8 @@ class Mbox(mailbox.PortableUnixMailbox):          """          assert(msg) +        assert(self.mbox_file) +          vprint("saving message to file '%s'" % self.mbox_file.name)          unix_from = msg.unixfrom          if not unix_from: @@ -300,6 +313,14 @@ class Mbox(mailbox.PortableUnixMailbox):              vprint("closing file '%s'" % self.mbox_file.name)              self.mbox_file.close() +    def reset_time_stamps(self): +        """Set the file timestamps to the original value""" +        assert(self.original_atime) +        assert(self.original_mtime) +        assert(self.mbox_file.name) +        os.utime(self.mbox_file.name, (self.original_atime,  \ +            self.original_mtime))  +      def exclusive_lock(self):          """Set an advisory lock on the 'mbox' mailbox"""          vprint("obtaining exclusive lock on file '%s'" % self.mbox_file.name) @@ -322,9 +343,11 @@ class Mbox(mailbox.PortableUnixMailbox):                  unexpected_error("Giving up waiting for procmail lock '%s'"                       % lock_name)          vprint("writing lockfile '%s'" % lock_name) +        old_umask = os.umask(022) # is this dodgy?          lock = open(lock_name, "w")          _stale.procmail_lock = lock_name          lock.close() +        old_umask = os.umask(old_umask)      def procmail_unlock(self):          """Delete the procmail lockfile on the 'mbox' mailbox""" @@ -342,11 +365,9 @@ class Mbox(mailbox.PortableUnixMailbox):          completely deleted."""          assert(os.path.isfile(self.mbox_file.name))          vprint("turning '%s' into a zero-length file" % self.mbox_file.name) -        atime = os.path.getatime(self.mbox_file.name)          mtime = os.path.getmtime(self.mbox_file.name)          blank_file = open(self.mbox_file.name, "w")          blank_file.close() -        os.utime(self.mbox_file.name, (atime, mtime)) # to original timestamps  class RetainMbox(Mbox): @@ -377,8 +398,14 @@ class RetainMbox(Mbox):          """Overwrite the original mailbox with this temporary mailbox."""          assert(self.__final_name)          self.close() + +        # make sure that the retained mailbox has the same timestamps and  +        # permission as the original mailbox          atime = os.path.getatime(self.__final_name)          mtime = os.path.getmtime(self.__final_name) +        mode =  os.stat(self.__final_name)[stat.ST_MODE] +        os.chmod(self.mbox_file.name, mode) +          vprint("renaming '%s' to '%s'" % (self.mbox_file.name, self.__final_name))          os.rename(self.mbox_file.name, self.__final_name)          os.utime(self.__final_name, (atime, mtime)) # reset to original timestamps @@ -414,7 +441,7 @@ class ArchiveMbox(Mbox):          """          assert(final_name)          compressor = _options.compressor -        compressedfilename = final_name + _options.compressor_extension +        compressedfilename = final_name + _options.compressor_extension()          if os.path.isfile(final_name):              unexpected_error("""There is already a file named '%s'! @@ -445,13 +472,14 @@ manually, and try running me again.""" % final_name)          self.close()          compressor = _options.compressor          compressed_archive_name = self.mbox_file.name +  \ -            _options.compressor_extension +            _options.compressor_extension()          compress = compressor + " " + self.mbox_file.name          vprint("running compressor: '%s'" % compress)          _stale.compressed_archive = compressed_archive_name          system_or_die(compress)          _stale.archive = None -        compressed_final_name = self.__final_name + _options.compressor_extension +        compressed_final_name = self.__final_name + \ +            _options.compressor_extension()          vprint("renaming '%s' to '%s'" % (compressed_archive_name,               compressed_final_name))          os.rename(compressed_archive_name, compressed_final_name) @@ -478,6 +506,7 @@ class IdentityCache:  # global class instances  _options = Options()  # the run-time options object +_stale = StaleFiles() # remember what we have to delete on abnormal exit  def main(args = sys.argv[1:]): @@ -519,17 +548,6 @@ Website: http://archivemail.sourceforge.net/ """ %   \          sys.exit(1)      _options.sanity_check() -    os.umask(077) # saves setting permissions on mailboxes/tempfiles - -    # Make sure we clean up nicely - we don't want to leave stale procmail -    # lockfiles about if something bad happens to us. This is quite  -    # important, even though procmail will delete stale files after a while. -    _stale = StaleFiles() # remember what we have to delete -    atexit.register(clean_up) # delete stale files on exceptions/normal exit -    signal.signal(signal.SIGHUP, clean_up_signal)   # signal 1 -    # SIGINT (signal 2) is handled as a python exception -    signal.signal(signal.SIGQUIT, clean_up_signal)  # signal 3 -    signal.signal(signal.SIGTERM, clean_up_signal)  # signal 15      for mailbox_path in args:          archive(mailbox_path) @@ -599,7 +617,7 @@ def guess_delivery_time(message):          if date:              try:                  time_message = time.mktime(date) -                assert(time_message, 'time.mktime() returned false') +                assert(time_message)                  vprint("using valid time found from '%s' header" % header)                  return time_message              except (ValueError, OverflowError): pass @@ -612,7 +630,7 @@ def guess_delivery_time(message):          if date:              try:                  time_message = time.mktime(date) -                assert(time_message, 'time.mktime() returned false') +                assert(time_message)                  vprint("using valid time found from unix 'From_' header")                  return time_message              except (ValueError, OverflowError): pass @@ -634,7 +652,7 @@ def guess_delivery_time(message):      return time_message -def is_too_old(time_message): +def is_too_old(time_message, max_days):      """Return true if a message is too old (and should be archived),       false otherwise. @@ -643,12 +661,14 @@ def is_too_old(time_message):                      since the epoch      """ -    assert(time_message) +    assert(time_message > 0) +    assert(max_days >= 1) +      time_now = time.time()      if time_message > time_now:          vprint("warning: message has date in the future")          return 0 -    secs_old_max = (_options.days_old_max * 24 * 60 * 60) +    secs_old_max = (max_days * 24 * 60 * 60)      days_old = (time_now - time_message) / 24 / 60 / 60      vprint("message is %.2f days old" % days_old)      if ((time_message + secs_old_max) < time_now): @@ -670,6 +690,9 @@ def archive(mailbox_name):      """      assert(mailbox_name)  +    set_signal_handlers() +    os.umask(077) # saves setting permissions on mailboxes/tempfiles +      final_archive_name = mailbox_name + _options.archive_suffix      if _options.output_dir:          final_archive_name = os.path.join(_options.output_dir,  @@ -730,7 +753,7 @@ def _archive_mbox(mailbox_name, final_archive_name):      archive = None      retain = None      stats = Stats(mailbox_name, final_archive_name) -    original = Mbox(mailbox_name) +    original = Mbox(path=mailbox_name)      cache = IdentityCache(mailbox_name)      original.procmail_lock() @@ -742,7 +765,7 @@ def _archive_mbox(mailbox_name, final_archive_name):          if _options.warn_duplicates:              cache.warn_if_dupe(msg)                       time_message = guess_delivery_time(msg) -        if is_too_old(time_message): +        if is_too_old(time_message, _options.days_old_max):              stats.another_archived()              if _options.delete_old_mail:                  vprint("decision: delete message") @@ -762,6 +785,7 @@ def _archive_mbox(mailbox_name, final_archive_name):      vprint("finished reading messages")       original.exclusive_unlock()      original.close() +    original.reset_time_stamps()      if not _options.dry_run:          if retain: retain.close()          if archive: archive.close() @@ -772,6 +796,7 @@ def _archive_mbox(mailbox_name, final_archive_name):              else:                  # nothing was retained - everything was deleted                  original.leave_empty() +                original.reset_time_stamps()          elif archive:              archive.finalise()              if retain: @@ -779,6 +804,7 @@ def _archive_mbox(mailbox_name, final_archive_name):              else:                  # nothing was retained - everything was deleted                  original.leave_empty() +                original.reset_time_stamps()          else:              # There was nothing to archive              if retain: @@ -816,7 +842,7 @@ def _archive_dir(mailbox_name, final_archive_name, type):          if _options.warn_duplicates:              cache.warn_if_dupe(msg)                       time_message = guess_delivery_time(msg) -        if is_too_old(time_message): +        if is_too_old(time_message, _options.days_old_max):              stats.another_archived()              if _options.delete_old_mail:                  vprint("decision: delete message") @@ -868,6 +894,16 @@ def choose_temp_dir(mailbox_name):      return temp_dir +def set_signal_handlers(): +    # Make sure we clean up nicely - we don't want to leave stale procmail +    # lockfiles about if something bad happens to us. This is quite  +    # important, even though procmail will delete stale files after a while. +    atexit.register(clean_up) # delete stale files on exceptions/normal exit +    signal.signal(signal.SIGHUP, clean_up_signal)   # signal 1 +    # SIGINT (signal 2) is handled as a python exception +    signal.signal(signal.SIGQUIT, clean_up_signal)  # signal 3 +    signal.signal(signal.SIGTERM, clean_up_signal)  # signal 15 +  def clean_up():      """Delete stale files -- to be registered with atexit.register()"""      vprint("cleaning up ...") diff --git a/archivemail_test.py b/archivemail_test.py deleted file mode 100755 index a412dae..0000000 --- a/archivemail_test.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python - -import archivemail -import os -import tempfile -import unittest - -class TempfileTestCase(unittest.TestCase): -    def setUp(self): -        self.output_dir = tempfile.mktemp() -        os.mkdir(self.output_dir) -        self.sub_dir = tempfile.mktemp() -        os.mkdir(self.sub_dir) - -    def testCurrentDir(self): -        archivemail._options.output_dir = None -        dir = archivemail.choose_temp_dir("dummy") -        self.assertEqual(dir, os.curdir) - -    def testSubDir(self): -        archivemail._options.output_dir = None -        dir = archivemail.choose_temp_dir(os.path.join(self.sub_dir, "dummy")) -        self.assertEqual(dir, self.sub_dir) - -    def testOutputDir(self): -        archivemail._options.output_dir = self.output_dir -        dir = archivemail.choose_temp_dir("dummy") -        self.assertEqual(dir, self.output_dir) - -    def testSubDirOutputDir(self): -        archivemail._options.output_dir = self.output_dir -        dir = archivemail.choose_temp_dir(os.path.join(self.sub_dir, "dummy")) -        self.assertEqual(dir, self.output_dir) - -    def tearDown(self): -        os.rmdir(self.output_dir) -        os.rmdir(self.sub_dir) - - -if __name__ == "__main__": -    unittest.main() diff --git a/test_archivemail.py b/test_archivemail.py new file mode 100755 index 0000000..df1677b --- /dev/null +++ b/test_archivemail.py @@ -0,0 +1,424 @@ +#!/usr/bin/env python +############################################################################ +# Copyright (C) 2002  Paul Rodger <paul@paulrodger.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA +############################################################################ +""" +Test archivemail works correctly using 'pyunit'. +""" + +import fcntl +import filecmp +import os +import shutil +import stat +import tempfile +import time +import unittest + +import archivemail + +__version__ = """$Id$""" + +############ Mbox Class testing ############## + +class TestMboxIsEmpty(unittest.TestCase): +    def setUp(self): +        self.empty_name = make_mbox(messages=0) +        self.not_empty_name = make_mbox(messages=1) + +    def testEmpty(self): +        mbox = archivemail.Mbox(self.empty_name) +        assert(mbox.is_empty()) + +    def testNotEmpty(self): +        mbox = archivemail.Mbox(self.not_empty_name) +        assert(not mbox.is_empty()) + +    def tearDown(self): +        if os.path.exists(self.empty_name): +            os.remove(self.empty_name) +        if os.path.exists(self.not_empty_name): +            os.remove(self.not_empty_name) + + +class TestMboxLeaveEmpty(unittest.TestCase): +    def setUp(self): +        self.mbox_name = make_mbox() +        self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] +        self.mbox = archivemail.Mbox(self.mbox_name) + +    def testLeaveEmpty(self): +        self.mbox.leave_empty() +        assert(os.path.isfile(self.mbox_name)) +        self.assertEqual(os.path.getsize(self.mbox_name), 0) +        new_mode = os.stat(self.mbox_name)[stat.ST_MODE] +        self.assertEqual(new_mode, self.mbox_mode) + +    def tearDown(self): +        if os.path.exists(self.mbox_name): +            os.remove(self.mbox_name) + + +class TestMboxProcmailLock(unittest.TestCase): +    def setUp(self): +        self.mbox_name = make_mbox() +        self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] +        self.mbox = archivemail.Mbox(self.mbox_name) + +    def testProcmailLock(self): +        lock = self.mbox_name + ".lock" +        self.mbox.procmail_lock() +        assert(os.path.isfile(lock)) +        assert(is_world_readable(lock)) +        self.mbox.procmail_unlock() +        assert(not os.path.isfile(lock)) + +    # TODO: add a test where the lock already exists + +    def tearDown(self): +        if os.path.exists(self.mbox_name): +            os.remove(self.mbox_name) + + +class TestMboxRemove(unittest.TestCase): +    def setUp(self): +        self.mbox_name = make_mbox() +        self.mbox = archivemail.Mbox(self.mbox_name) + +    def testProcmailLock(self): +        assert(os.path.exists(self.mbox_name)) +        self.mbox.remove() +        assert(not os.path.exists(self.mbox_name)) + +    def tearDown(self): +        if os.path.exists(self.mbox_name): +            os.remove(self.mbox_name) + + +class TestMboxExclusiveLock(unittest.TestCase): +    def setUp(self): +        self.mbox_name = make_mbox() +        self.mbox = archivemail.Mbox(self.mbox_name) + +    def testExclusiveLock(self): +        self.mbox.exclusive_lock() +        file = open(self.mbox_name, "r+") +        lock_nb = fcntl.LOCK_EX | fcntl.LOCK_NB +        self.assertRaises(IOError, fcntl.flock, file, lock_nb) + +        self.mbox.exclusive_unlock() +        fcntl.flock(file, lock_nb) +        fcntl.flock(file, fcntl.LOCK_UN) + +    def tearDown(self): +        if os.path.exists(self.mbox_name): +            os.remove(self.mbox_name) + + +class TestMboxNext(unittest.TestCase): +    def setUp(self): +        self.not_empty_name = make_mbox(messages=18) +        self.empty_name = make_mbox(messages=0) + +    def testNextEmpty(self): +        mbox = archivemail.Mbox(self.empty_name) +        msg = mbox.next() +        self.assertEqual(msg, None) + +    def testNextNotEmpty(self): +        mbox = archivemail.Mbox(self.not_empty_name) +        for count in range(18): +            msg = mbox.next() +            assert(msg) +        msg = mbox.next() +        self.assertEqual(msg, None) + +    def tearDown(self): +        if os.path.exists(self.not_empty_name): +            os.remove(self.not_empty_name) +        if os.path.exists(self.empty_name): +            os.remove(self.empty_name) + + +class TestMboxWrite(unittest.TestCase): +    def setUp(self): +        self.mbox_read = make_mbox(messages=3) +        self.mbox_write = make_mbox(messages=0) + +    def testWrite(self): +        read = archivemail.Mbox(self.mbox_read) +        write = archivemail.Mbox(self.mbox_write, mode="w") +        for count in range(3): +            msg = read.next() +            write.write(msg) +        read.close() +        write.close() +        assert(filecmp.cmp(self.mbox_read, self.mbox_write)) + +    def testWriteNone(self): +        write = archivemail.Mbox(self.mbox_write, mode="w") +        self.assertRaises(AssertionError, write.write, None) + +    def tearDown(self): +        if os.path.exists(self.mbox_write): +            os.remove(self.mbox_write) +        if os.path.exists(self.mbox_read): +            os.remove(self.mbox_read) + + +########## generic routine testing ################# + + +class TestIsTooOld(unittest.TestCase): +    def testOld(self): +        time_msg = time.time() - (15 * 24 * 60 * 60) # 15 days old +        assert(archivemail.is_too_old(time_message=time_msg, max_days=14)) + +    def testJustOld(self): +        time_msg = time.time() - (25 * 60 * 60) # 25 hours old +        assert(archivemail.is_too_old(time_message=time_msg, max_days=1)) + +    def testNotOld(self): +        time_msg = time.time() - (8 * 24 * 60 * 60) # 8 days old +        assert(not archivemail.is_too_old(time_message=time_msg, max_days=9)) + +    def testJustNotOld(self): +        time_msg = time.time() - (23 * 60 * 60) # 23 hours old +        assert(not archivemail.is_too_old(time_message=time_msg, max_days=1)) + +    def testFuture(self): +        time_msg = time.time() + (1 * 24 * 60 * 60) # tomorrow +        assert(not archivemail.is_too_old(time_message=time_msg, max_days=1)) + + +class TestChooseTempDir(unittest.TestCase): +    def setUp(self): +        self.output_dir = tempfile.mktemp() +        os.mkdir(self.output_dir) +        self.sub_dir = tempfile.mktemp() +        os.mkdir(self.sub_dir) + +    def testCurrentDir(self): +        archivemail._options.output_dir = None +        dir = archivemail.choose_temp_dir("dummy") +        self.assertEqual(dir, os.curdir) + +    def testSubDir(self): +        archivemail._options.output_dir = None +        dir = archivemail.choose_temp_dir(os.path.join(self.sub_dir, "dummy")) +        self.assertEqual(dir, self.sub_dir) + +    def testOutputDir(self): +        archivemail._options.output_dir = self.output_dir +        dir = archivemail.choose_temp_dir("dummy") +        self.assertEqual(dir, self.output_dir) + +    def testSubDirOutputDir(self): +        archivemail._options.output_dir = self.output_dir +        dir = archivemail.choose_temp_dir(os.path.join(self.sub_dir, "dummy")) +        self.assertEqual(dir, self.output_dir) + +    def tearDown(self): +        os.rmdir(self.output_dir) +        os.rmdir(self.sub_dir) + + +########## proper archival testing ########### + +class TestArchiveMboxTimestampNew(unittest.TestCase): +    def setUp(self): +        self.mbox_name = make_mbox(messages=3, hours_old=(24 * 179)) +        self.mtime = os.path.getmtime(self.mbox_name) - 66 +        self.atime = os.path.getatime(self.mbox_name) - 88 +        os.utime(self.mbox_name, (self.atime, self.mtime)) +        archivemail._options.quiet = 1 + +    def testTime(self): +        archivemail._options.compressor = "gzip" +        archivemail.archive(self.mbox_name) +        assert(os.path.exists(self.mbox_name)) +        new_atime = os.path.getatime(self.mbox_name) +        new_mtime = os.path.getmtime(self.mbox_name) +        self.assertEqual(self.mtime, new_mtime) +        self.assertEqual(self.atime, new_atime) + +    def tearDown(self): +        if os.path.exists(self.mbox_name): +            os.remove(self.mbox_name) +        archivemail._options.quiet = 0 + + +class TestArchiveMboxTimestampOld(unittest.TestCase): +    def setUp(self): +        self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181)) +        self.mtime = os.path.getmtime(self.mbox_name) - 66 +        self.atime = os.path.getatime(self.mbox_name) - 88 +        os.utime(self.mbox_name, (self.atime, self.mtime)) +        archivemail._options.quiet = 1 + +    def testTime(self): +        archivemail._options.compressor = "gzip" +        archive_name = self.mbox_name + "_archive.gz" +        archivemail.archive(self.mbox_name) +        assert(os.path.exists(self.mbox_name)) +        new_atime = os.path.getatime(self.mbox_name) +        new_mtime = os.path.getmtime(self.mbox_name) +        self.assertEqual(self.mtime, new_mtime) +        self.assertEqual(self.atime, new_atime) + +    def tearDown(self): +        if os.path.exists(self.mbox_name): +            os.remove(self.mbox_name) +        for ext in (".gz", ".bz2", ".Z"): +            if os.path.exists(self.mbox_name + ext): +                os.remove(self.mbox_name + ext) +        archivemail._options.quiet = 0 + + +class TestArchiveMboxOld(unittest.TestCase): +    def setUp(self): +        self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181)) +        self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] +        self.copy_name = tempfile.mktemp() +        shutil.copyfile(self.mbox_name, self.copy_name) +        archivemail._options.quiet = 1 + +    def testArchiveOldGzip(self): +        archivemail._options.compressor = "gzip" +        archivemail.archive(self.mbox_name) +        assert(os.path.exists(self.mbox_name)) +        self.assertEqual(os.path.getsize(self.mbox_name), 0) +        new_mode = os.stat(self.mbox_name)[stat.ST_MODE] +        self.assertEqual(self.mbox_mode, new_mode) + +        archive_name = self.mbox_name + "_archive.gz" +        assert(os.path.exists(archive_name)) +        os.system("gzip -d " + archive_name) + +        archive_name = self.mbox_name + "_archive" +        assert(os.path.exists(archive_name)) +        assert(filecmp.cmp(archive_name, self.copy_name)) +        self.tearDown() +        self.setUp() + +    def testArchiveOldBzip2(self): +        archivemail._options.compressor = "bzip2" +        archivemail.archive(self.mbox_name) +        assert(os.path.exists(self.mbox_name)) +        self.assertEqual(os.path.getsize(self.mbox_name), 0) +        new_mode = os.stat(self.mbox_name)[stat.ST_MODE] +        self.assertEqual(self.mbox_mode, new_mode) + +        archive_name = self.mbox_name + "_archive.bz2" +        assert(os.path.exists(archive_name)) +        os.system("bzip2 -d " + archive_name) + +        archive_name = self.mbox_name + "_archive" +        assert(os.path.exists(archive_name)) +        assert(filecmp.cmp(archive_name, self.copy_name)) +        self.tearDown() +        self.setUp() + +    def testArchiveOldCompress(self): +        archivemail._options.compressor = "compress" +        archivemail.archive(self.mbox_name) +        assert(os.path.exists(self.mbox_name)) +        self.assertEqual(os.path.getsize(self.mbox_name), 0) +        new_mode = os.stat(self.mbox_name)[stat.ST_MODE] +        self.assertEqual(self.mbox_mode, new_mode) + +        archive_name = self.mbox_name + "_archive.Z" +        assert(os.path.exists(archive_name)) +        os.system("compress -d " + archive_name) + +        archive_name = self.mbox_name + "_archive" +        assert(os.path.exists(archive_name)) +        assert(filecmp.cmp(archive_name, self.copy_name)) +        self.tearDown() +        self.setUp() + +    def tearDown(self): +        if os.path.exists(self.mbox_name): +            os.remove(self.mbox_name) +        for ext in (".gz", ".bz2", ".Z"): +            if os.path.exists(self.mbox_name + ext): +                os.remove(self.mbox_name + ext) +        if os.path.exists(self.copy_name): +            os.remove(self.copy_name) +        archivemail._options.quiet = 0 + + +class TestArchiveMboxNew(unittest.TestCase): +    def setUp(self): +        archivemail._options.quiet = 1 +        self.mbox_name = make_mbox(messages=3, hours_old=(24 * 179)) +        self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] +        self.copy_name = tempfile.mktemp() +        shutil.copyfile(self.mbox_name, self.copy_name) + +    def testArchiveNew(self): +        archivemail.archive(self.mbox_name) +        assert(os.path.exists(self.mbox_name)) +        assert(filecmp.cmp(self.mbox_name, self.copy_name)) +        new_mode = os.stat(self.mbox_name)[stat.ST_MODE] +        self.assertEqual(self.mbox_mode, new_mode) +         +        archive_name = self.mbox_name + "_archive.gz" +        assert(not os.path.exists(archive_name)) + +    def tearDown(self): +        archivemail._options.quiet = 0 +        if os.path.exists(self.mbox_name): +            os.remove(self.mbox_name) +        if os.path.exists(self.copy_name): +            os.remove(self.copy_name) + + +########## helper routines ############ + +def make_message(hours_old=0): +    time_message = time.time() - (60 * 60 * hours_old) +    time_string = time.asctime(time.localtime(time_message)) + +    return """From sender@domain %s +From: sender@domain +To: receipient@domain +Subject: This is a dummy message +Date: %s + +This is the message body. +It's very exciting. + + +""" % (time_string, time_string) + +def make_mbox(messages=1, hours_old=0): +    name = tempfile.mktemp() +    file = open(name, "w") +    for count in range(messages): +        file.write(make_message(hours_old=hours_old)) +    file.close() +    return name +     +def is_world_readable(path): +    """Return true if the path is world-readable, false otherwise""" +    assert(path) +    return (os.stat(path)[stat.ST_MODE] & stat.S_IROTH) + + +if __name__ == "__main__": +    unittest.main()  | 
