#! /usr/bin/env python
############################################################################
# Copyright (C) 2002  Paul Rodger <paul@paulrodger.com>
#           (C) 2006-2010  Nikolaus Schulz <microschulz@web.de>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
############################################################################
"""
Unit-test archivemail using 'PyUnit'.

TODO: add tests for:
    * dotlock locks already existing
    * archiving MH-format mailboxes
    * a 3rd party process changing the mbox file being read

"""

import sys

def check_python_version(): 
    """Abort if we are running on python < v2.3"""
    too_old_error = "This test script requires python version 2.3 or later. " + \
      "Your version of python is:\n%s" % sys.version
    try: 
        version = sys.version_info  # we might not even have this function! :)
        if (version[0] < 2) or (version[0] == 2 and version[1] < 3):
            print too_old_error
            sys.exit(1)
    except AttributeError:
        print too_old_error
        sys.exit(1)

# define & run this early because 'unittest' requires Python >= 2.1
check_python_version()  

import copy
import fcntl
import filecmp
import os
import re
import shutil
import stat
import tempfile
import time
import unittest
import gzip
import cStringIO
import rfc822
import mailbox

try:
    import archivemail
except ImportError:
    print "The archivemail script needs to be called 'archivemail.py'"
    print "and should be in the current directory in order to be imported"
    print "and tested. Sorry."
    if os.path.isfile("archivemail"):
        print "Try renaming it from 'archivemail' to 'archivemail.py'."
    sys.exit(1)

# We want to iterate over messages in a compressed archive mbox and verify
# them.  This involves seeking in the mbox.  The gzip.Gzipfile.seek() in
# Python 2.5 doesn't understand whence; this is Python bug #1355023, triggered
# by mailbox._PartialFile.seek().  The bug is still pending as of Python
# 2.5.2.  To work around it, we subclass gzip.GzipFile.
#
# It should be noted that seeking backwards in a GzipFile is emulated by
# re-reading the entire file from the beginning, which is extremely
# inefficient and won't work with large files; but our test archives are all
# small, so it's okay.

class FixedGzipFile(gzip.GzipFile):
    """GzipFile with seek method accepting whence parameter."""
    def seek(self, offset, whence=0):
        try:
            gzip.GzipFile.seek(self, offset, whence)
        except TypeError:
            if whence:
                if whence == 1:
                    offset = self.offset + offset
                else:
                    raise ValueError('Seek from end not supported')
            gzip.GzipFile.seek(self, offset)

# precision of os.utime() when restoring mbox timestamps
utimes_precision = 5

class MessageIdFactory:
    """Factory to create `uniqe' message-ids."""
    def __init__(self):
        self.seq = 0
    def __call__(self):
        self.seq += 1
        return "<archivemail%d@localhost>" % self.seq

make_msgid = MessageIdFactory()

class IndexedMailboxDir:
    """An indexed mailbox directory, providing random message access by
    message-id. Intended as a base class for a maildir and an mh subclass."""

    def __init__(self, mdir_name):
        assert tempfile.tempdir
        self.root = tempfile.mkdtemp(prefix=mdir_name)
        self.msg_id_dict = {}
        self.deliveries = 0

    def _add_to_index(self, msg_text, fpath):
        """Add the given message to the index, for later random access."""
        # Extract the message-id as index key
        msg_id = None
        fp = cStringIO.StringIO(msg_text)
        while True:
            line = fp.readline()
            # line empty means we didn't find a message-id
            assert line
            if line.lower().startswith("message-id:"):
                msg_id = line.split(":", 1)[-1].strip()
                assert msg_id
                break
        assert not self.msg_id_dict.has_key(msg_id)
        self.msg_id_dict[msg_id] = fpath

    def get_all_filenames(self):
        """Return all relative pathnames of files in this mailbox."""
        return self.msg_id_dict.values()

class SimpleMaildir(IndexedMailboxDir):
    """Primitive Maildir class, just good enough for generating short-lived
    test maildirs."""

    def __init__(self, mdir_name='maildir'):
        IndexedMailboxDir.__init__(self, mdir_name)
        for d in "cur", "tmp", "new":
            os.mkdir(os.path.join(self.root, d))

    def write(self, msg_str, new=True, flags=[]):
        """Store a message with the given flags."""
        assert not (new and flags)
        if new:
            subdir = "new"
        else:
            subdir = "cur"
        fname = self._mkname(new, flags)
        relpath = os.path.join(subdir, fname)
        path = os.path.join(self.root, relpath)
        assert not os.path.exists(path)
        f = open(path, "w")
        f.write(msg_str)
        f.close()
        self._add_to_index(msg_str, relpath)

    def _mkname(self, new, flags):
        """Generate a unique filename for a new message."""
        validflags = 'DFPRST'
        for f in flags:
            assert f in validflags
        # This 'unique' name should be good enough, since nobody else
        # will ever write messages to this maildir folder.
        uniq = str(self.deliveries)
        self.deliveries += 1
        if new:
            return uniq
        if not flags:
            return uniq + ':2,'
        finfo = "".join(sorted(flags))
        return uniq + ':2,' + finfo

    def get_message_and_mbox_status(self, msgid):
        """For the Message-Id msgid, return the matching message in text
        format and its status, expressed as a set of mbox flags."""
        fpath = self.msg_id_dict[msgid] # Barfs if not found
        mdir_flags = fpath.rsplit('2,', 1)[-1]
        flagmap = {
                'F': 'F',
                'R': 'A',
                'S': 'R'
        }
        mbox_flags = set([flagmap[x] for x in mdir_flags])
        if fpath.startswith("cur/"):
            mbox_flags.add('O')
        fp = open(os.path.join(self.root, fpath), "r")
        msg = fp.read()
        fp.close()
        return msg, mbox_flags


class TestCaseInTempdir(unittest.TestCase):
    """Base class for testcases that need to create temporary files. 
    All testcases that create temporary files should be derived from this
    class, not directly from unittest.TestCase.
    TestCaseInTempdir provides these methods:
    
    setUp()     Creates a safe temporary directory and sets tempfile.tempdir.
                
    tearDown()  Recursively removes the temporary directory and unsets
                tempfile.tempdir.

    Overriding methods should call the ones above."""
    temproot = None

    def setUp(self):
        if not self.temproot:
            assert not tempfile.tempdir
            self.temproot = tempfile.tempdir = \
                tempfile.mkdtemp(prefix="test-archivemail")
     
    def tearDown(self):
        assert tempfile.tempdir == self.temproot
        if self.temproot:
            shutil.rmtree(self.temproot)
            tempfile.tempdir = self.temproot = None


############ Mbox Class testing ##############

class TestMboxDotlock(TestCaseInTempdir):
    def setUp(self):
        super(TestMboxDotlock, self).setUp()
        self.mbox_name = make_mbox()
        self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
        self.mbox = archivemail.Mbox(self.mbox_name)

    def testDotlock(self):
        """dotlock_lock/unlock should create/delete a lockfile"""
        lock = self.mbox_name + ".lock"
        self.mbox._dotlock_lock()
        assert os.path.isfile(lock)
        self.mbox._dotlock_unlock()
        assert not os.path.isfile(lock)

    def testDotlockingSucceedsUponEACCES(self):
        """A dotlock should silently be omitted upon EACCES."""
        archivemail.options.quiet = True
        mbox_dir = os.path.dirname(self.mbox_name)
        os.chmod(mbox_dir, 0500)
        try:
            self.mbox._dotlock_lock()
        finally:
            os.chmod(mbox_dir, 0700)
            archivemail.options.quiet = False

class TestMboxPosixLock(TestCaseInTempdir):
    def setUp(self):
        super(TestMboxPosixLock, self).setUp()
        self.mbox_name = make_mbox()
        self.mbox = archivemail.Mbox(self.mbox_name)

    def testPosixLock(self):
        """posix_lock/unlock should create/delete an advisory lock"""
        
        # The following code snippet heavily lends from the Python 2.5 mailbox
        # unittest.
        # BEGIN robbery:

        # Fork off a subprocess that will lock the file for 2 seconds,
        # unlock it, and then exit.
        pid = os.fork()
        if pid == 0:
            # In the child, lock the mailbox.
            self.mbox._posix_lock()
            time.sleep(2)
            self.mbox._posix_unlock()
            os._exit(0)

        # In the parent, sleep a bit to give the child time to acquire
        # the lock.
        time.sleep(0.5)
        # The parent's file self.mbox.mbox_file shares fcntl locks with the
        # duplicated FD in the child; reopen it so we get a different file
        # table entry.
        file = open(self.mbox_name, "r+")
        lock_nb = fcntl.LOCK_EX | fcntl.LOCK_NB
        fd = file.fileno()
        try:
            self.assertRaises(IOError, fcntl.lockf, fd, lock_nb)

        finally:
            # Wait for child to exit.  Locking should now succeed.
            exited_pid, status = os.waitpid(pid, 0)

        fcntl.lockf(fd, lock_nb)
        fcntl.lockf(fd, fcntl.LOCK_UN)
        # END robbery


class TestMboxNext(TestCaseInTempdir):
    def setUp(self):
        super(TestMboxNext, self).setUp()
        self.not_empty_name = make_mbox(messages=18)
        self.empty_name = make_mbox(messages=0)

    def testNextEmpty(self):
        """mbox.next() should return None on an empty mailbox"""
        mbox = archivemail.Mbox(self.empty_name)
        msg = mbox.next()
        self.assertEqual(msg, None)

    def testNextNotEmpty(self):
        """mbox.next() should a message on a populated mailbox"""
        mbox = archivemail.Mbox(self.not_empty_name)
        for count in range(18):
            msg = mbox.next()
            assert msg
        msg = mbox.next()
        self.assertEqual(msg, None)


############ TempMbox Class testing ##############

class TestTempMboxWrite(TestCaseInTempdir):
    def setUp(self):
        super(TestTempMboxWrite, self).setUp()

    def testWrite(self):
        """mbox.write() should append messages to a mbox mailbox"""
        read_file = make_mbox(messages=3)
        mbox_read = archivemail.Mbox(read_file)
        mbox_write = archivemail.TempMbox()
        write_file = mbox_write.mbox_file_name
        for count in range(3):
            msg = mbox_read.next()
            mbox_write.write(msg)
        mbox_read.close()
        mbox_write.close()
        assert filecmp.cmp(read_file, write_file, shallow=0)

    def testWriteNone(self):
        """calling mbox.write() with no message should raise AssertionError"""
        write = archivemail.TempMbox()
        self.assertRaises(AssertionError, write.write, None)

class TestTempMboxRemove(TestCaseInTempdir):
    def setUp(self):
        super(TestTempMboxRemove, self).setUp()
        self.mbox = archivemail.TempMbox()
        self.mbox_name = self.mbox.mbox_file_name

    def testMboxRemove(self):
        """remove() should delete a mbox mailbox"""
        assert os.path.exists(self.mbox_name)
        self.mbox.remove()
        assert not os.path.exists(self.mbox_name)



########## options class testing #################

class TestOptionDefaults(unittest.TestCase):
    def testVerbose(self):
        """verbose should be off by default"""
        self.assertEqual(archivemail.options.verbose, False)

    def testDaysOldMax(self):
        """default archival time should be 180 days"""
        self.assertEqual(archivemail.options.days_old_max, 180)

    def testQuiet(self):
        """quiet should be off by default"""
        self.assertEqual(archivemail.options.quiet, False)

    def testDeleteOldMail(self):
        """we should not delete old mail by default"""
        self.assertEqual(archivemail.options.delete_old_mail, False)

    def testNoCompress(self):
        """no-compression should be off by default"""
        self.assertEqual(archivemail.options.no_compress, False)

    def testIncludeFlagged(self):
        """we should not archive flagged messages by default"""
        self.assertEqual(archivemail.options.include_flagged, False)

    def testPreserveUnread(self):
        """we should not preserve unread messages by default"""
        self.assertEqual(archivemail.options.preserve_unread, False)

class TestOptionParser(unittest.TestCase):
    def setUp(self):
        self.oldopts = copy.copy(archivemail.options)

    def testOptionDate(self):
        """--date and -D options are parsed correctly"""
        date_formats = (
            "%Y-%m-%d",  # ISO format
            "%d %b %Y" , # Internet format
            "%d %B %Y" , # Internet format with full month names
        )
        date = time.strptime("2000-07-29", "%Y-%m-%d")
        unixdate = time.mktime(date)
        for df in date_formats:
            d = time.strftime(df, date)
            for opt in '-D', '--date=':
                archivemail.options.date_old_max = None
                archivemail.options.parse_args([opt+d], "")
                self.assertEqual(unixdate, archivemail.options.date_old_max)

    def testOptionPreserveUnread(self):
        """--preserve-unread option is parsed correctly"""
        archivemail.options.parse_args(["--preserve-unread"], "")
        assert archivemail.options.preserve_unread
        archivemail.options.preserve_unread = False
        archivemail.options.parse_args(["-u"], "")
        assert archivemail.options.preserve_unread

    def testOptionSuffix(self):
        """--suffix and -s options are parsed correctly"""
        for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"):
            archivemail.options.parse_args(["--suffix="+suffix], "")
            assert archivemail.options.archive_suffix == suffix
            archivemail.options.suffix = None
            archivemail.options.parse_args(["-s", suffix], "")
            assert archivemail.options.archive_suffix == suffix

    def testOptionDryrun(self):
        """--dry-run option is parsed correctly"""
        archivemail.options.parse_args(["--dry-run"], "")
        assert archivemail.options.dry_run
        archivemail.options.preserve_unread = False
        archivemail.options.parse_args(["-n"], "")
        assert archivemail.options.dry_run

    def testOptionDays(self):
        """--days and -d options are parsed correctly"""
        archivemail.options.parse_args(["--days=11"], "")
        self.assertEqual(archivemail.options.days_old_max, 11)
        archivemail.options.days_old_max = None
        archivemail.options.parse_args(["-d11"], "")
        self.assertEqual(archivemail.options.days_old_max, 11)

    def testOptionDelete(self):
        """--delete option is parsed correctly"""
        archivemail.options.parse_args(["--delete"], "")
        assert archivemail.options.delete_old_mail

    def testOptionCopy(self):
        """--copy option is parsed correctly"""
        archivemail.options.parse_args(["--copy"], "")
        assert archivemail.options.copy_old_mail

    def testOptionOutputdir(self):
        """--output-dir and -o options are parsed correctly"""
        for path in "/just/some/path", "relative/path":
            archivemail.options.parse_args(["--output-dir=%s" % path], "")
            self.assertEqual(archivemail.options.output_dir, path)
            archivemail.options.output_dir = None
            archivemail.options.parse_args(["-o%s" % path], "")
            self.assertEqual(archivemail.options.output_dir, path)

    def testOptionNocompress(self):
        """--no-compress option is parsed correctly"""
        archivemail.options.parse_args(["--no-compress"], "")
        assert archivemail.options.no_compress

    def testOptionSize(self):
        """--size and -S options are parsed correctly"""
        size = "666"
        archivemail.options.parse_args(["--size=%s" % size ], "")
        self.assertEqual(archivemail.options.min_size, int(size))
        archivemail.options.parse_args(["-S%s" % size ], "")
        self.assertEqual(archivemail.options.min_size, int(size))

    def tearDown(self):
        archivemail.options = self.oldopts

########## archivemail.is_older_than_days() unit testing #################

class TestIsTooOld(unittest.TestCase):
    def testVeryOld(self):
        """with max_days=360, should be true for these dates > 1 year"""
        for years in range(1, 10):
            time_msg = time.time() - (years * 365 * 24 * 60 * 60)
            assert archivemail.is_older_than_days(time_message=time_msg,
                max_days=360)

    def testOld(self):
        """with max_days=14, should be true for these dates > 14 days"""
        for days in range(14, 360):
            time_msg = time.time() - (days * 24 * 60 * 60)
            assert archivemail.is_older_than_days(time_message=time_msg,
                max_days=14)

    def testJustOld(self):
        """with max_days=1, should be true for these dates >= 1 day"""
        for minutes in range(0, 61):
            time_msg = time.time() - (25 * 60 * 60) + (minutes * 60)
            assert archivemail.is_older_than_days(time_message=time_msg,
                max_days=1)

    def testNotOld(self):
        """with max_days=9, should be false for these dates < 9 days"""
        for days in range(0, 9):
            time_msg = time.time() - (days * 24 * 60 * 60)
            assert not archivemail.is_older_than_days(time_message=time_msg,
                max_days=9)

    def testJustNotOld(self):
        """with max_days=1, should be false for these hours <= 1 day"""
        for minutes in range(0, 60):
            time_msg = time.time() - (23 * 60 * 60) - (minutes * 60)
            assert not archivemail.is_older_than_days(time_message=time_msg,
                max_days=1)

    def testFuture(self):
        """with max_days=1, should be false for times in the future"""
        for minutes in range(0, 60):
            time_msg = time.time() + (minutes * 60)
            assert not archivemail.is_older_than_days(time_message=time_msg,
                max_days=1)

########## archivemail.parse_imap_url() unit testing #################

class TestParseIMAPUrl(unittest.TestCase): 
    def setUp(self):
        archivemail.options.quiet = True
        archivemail.options.verbose = False
        archivemail.options.pwfile = None
        
    urls_withoutpass = [
            ('imaps://user@example.org@imap.example.org/upperbox/lowerbox',
                ('user', None, 'example.org@imap.example.org',
                'upperbox/lowerbox')), 
            ('imaps://"user@example.org"@imap.example.org/upperbox/lowerbox',
                ('user@example.org', None, 'imap.example.org',
                'upperbox/lowerbox')), 
            ('imaps://user@example.org"@imap.example.org/upperbox/lowerbox',
                ('user', None, 'example.org"@imap.example.org',
                'upperbox/lowerbox')), 
            ('imaps://"user@example.org@imap.example.org/upperbox/lowerbox',
                ('"user', None, 'example.org@imap.example.org',
                'upperbox/lowerbox')), 
            ('imaps://"us\\"er@example.org"@imap.example.org/upperbox/lowerbox',
                ('us"er@example.org', None, 'imap.example.org',
                'upperbox/lowerbox')), 
            ('imaps://user\\@example.org@imap.example.org/upperbox/lowerbox',
                ('user\\', None, 'example.org@imap.example.org',
                'upperbox/lowerbox'))
    ]
    urls_withpass = [
            ('imaps://user@example.org:passwd@imap.example.org/upperbox/lowerbox',
                ('user@example.org', 'passwd', 'imap.example.org',
                'upperbox/lowerbox'), 
                ('user', None, 'example.org:passwd@imap.example.org',
                'upperbox/lowerbox')), 
            ('imaps://"user@example.org:passwd@imap.example.org/upperbox/lowerbox',
                ('"user@example.org', "passwd", 'imap.example.org',
                'upperbox/lowerbox'), 
                ('"user', None, 'example.org:passwd@imap.example.org',
                'upperbox/lowerbox')), 
            ('imaps://u\\ser\\@example.org:"p@sswd"@imap.example.org/upperbox/lowerbox', 
                ('u\\ser\\@example.org', 'p@sswd', 'imap.example.org',
                'upperbox/lowerbox'),
                ('u\\ser\\', None, 'example.org:"p@sswd"@imap.example.org',
                'upperbox/lowerbox'))
    ]
    # These are invalid when the password's not stripped. 
    urls_onlywithpass = [
            ('imaps://"user@example.org":passwd@imap.example.org/upperbox/lowerbox',
                ('user@example.org', "passwd", 'imap.example.org',
                'upperbox/lowerbox'))
    ]
    def testUrlsWithoutPwfile(self):
        """Parse test urls with --pwfile option unset. This parses a password in
        the URL, if present."""
        archivemail.options.pwfile = None
        for mbstr in self.urls_withpass + self.urls_withoutpass:
            url = mbstr[0][mbstr[0].find('://')+3:]
            result = archivemail.parse_imap_url(url)
            self.assertEqual(result, mbstr[1])

    def testUrlsWithPwfile(self):
        """Parse test urls with --pwfile set.  In this case the ':' character
        loses its meaning as a delimiter."""
        archivemail.options.pwfile = "whocares.txt"
        for mbstr in self.urls_withpass: 
            url = mbstr[0][mbstr[0].find('://')+3:]
            result = archivemail.parse_imap_url(url)
            self.assertEqual(result, mbstr[2])
        for mbstr in self.urls_onlywithpass: 
            url = mbstr[0][mbstr[0].find('://')+3:]
            self.assertRaises(archivemail.UnexpectedError,
                    archivemail.parse_imap_url, url)

    def tearDown(self): 
        archivemail.options.quiet = False
        archivemail.options.verbose = False
        archivemail.options.pwfile = None

########## acceptance testing ###########

class TestArchive(TestCaseInTempdir):
    """Base class defining helper functions for doing test archiving runs."""
    mbox = None         # mbox file that will be processed by archivemail
    good_archive = None # Uncompressed reference archive file to verify the
                        # archive after processing
    good_mbox = None    # Reference mbox file to verify the mbox after processing

    def verify(self):
        assert os.path.exists(self.mbox)
        if self.good_mbox is not None:
            assertEqualContent(self.mbox, self.good_mbox)
        else:
            self.assertEqual(os.path.getsize(self.mbox), 0)
        archive_name = self.mbox + "_archive"
        if not archivemail.options.no_compress:
            archive_name += ".gz"
            iszipped = True
        else:
            assert not os.path.exists(archive_name + ".gz")
            iszipped = False
        if self.good_archive is not None:
            assertEqualContent(archive_name, self.good_archive, iszipped)
        else:
            assert not os.path.exists(archive_name)

    def make_old_mbox(self, body=None, headers=None, messages=1, make_old_archive=False):
        """Prepare for a test run with an old mbox by making an old mbox,
        optionally an existing archive, and a reference archive to verify the
        archive after archivemail has run."""
        self.mbox = make_mbox(body, headers, 181*24, messages)
        archive_does_change = not (archivemail.options.dry_run or
                archivemail.options.delete_old_mail)
        mbox_does_not_change = archivemail.options.dry_run or \
                archivemail.options.copy_old_mail
        if make_old_archive:
            archive = archivemail.make_archive_name(self.mbox)
            self.good_archive = make_archive_and_plain_copy(archive)
            if archive_does_change:
                append_file(self.mbox, self.good_archive)
        elif archive_does_change:
            self.good_archive = tempfile.mkstemp()[1]
            shutil.copyfile(self.mbox, self.good_archive)
        if mbox_does_not_change:
            if archive_does_change and not make_old_archive:
                self.good_mbox = self.good_archive
            else:
                self.good_mbox = tempfile.mkstemp()[1]
                shutil.copyfile(self.mbox, self.good_mbox)

    def make_mixed_mbox(self, body=None, headers=None, messages=1, make_old_archive=False):
        """Prepare for a test run with a mixed mbox by making a mixed mbox,
        optionally an existing archive, a reference archive to verify the
        archive after archivemail has run, and likewise a reference mbox to
        verify the mbox."""
        self.make_old_mbox(body, headers, messages=messages, make_old_archive=make_old_archive)
        new_mbox_name = make_mbox(body, headers, 179*24, messages)
        append_file(new_mbox_name, self.mbox)
        if self.good_mbox is None:
            self.good_mbox = new_mbox_name
        else:
            if self.good_mbox == self.good_archive:
                self.good_mbox = tempfile.mkstemp()[1]
                shutil.copyfile(self.mbox, self.good_mbox)
            else:
                append_file(new_mbox_name, self.good_mbox)

    def make_new_mbox(self, body=None, headers=None, messages=1, make_old_archive=False):
        """Prepare for a test run with a new mbox by making a new mbox,
        optionally an exiting archive, and a reference mbox to verify the mbox
        after archivemail has run."""
        self.mbox = make_mbox(body, headers, 179*24, messages)
        self.good_mbox = tempfile.mkstemp()[1]
        shutil.copyfile(self.mbox, self.good_mbox)
        if make_old_archive:
            archive = archivemail.make_archive_name(self.mbox)
            self.good_archive = make_archive_and_plain_copy(archive)


class TestArchiveMbox(TestArchive):
    """archiving should work based on the date of messages given"""

    def setUp(self):
        self.oldopts = copy.copy(archivemail.options)
        archivemail.options.quiet = True
        super(TestArchiveMbox, self).setUp()
 
    def testOld(self):
        """archiving an old mailbox"""
        self.make_old_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testOldFromInBody(self):
        """archiving an old mailbox with 'From ' in the body"""
        body = """This is a message with ^From at the start of a line
From is on this line
This is after the ^From line"""
        self.make_old_mbox(messages=3, body=body)
        archivemail.archive(self.mbox)
        self.verify()

    def testDateSystem(self):
        """test that the --date option works as expected"""
        test_headers = (
            {
                'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
                'Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
            },
            {
                'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
                'Date' : None,
            },
            {
                'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
                'Date' : None,
                'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
            },
            {
                'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
                'Date' : None,
                'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
            },
        )
        for headers in test_headers:
            msg = make_message(default_headers=headers, wantobj=True)
            date = time.strptime("2000-07-29", "%Y-%m-%d")
            archivemail.options.date_old_max = time.mktime(date)
            assert archivemail.should_archive(msg)
            date = time.strptime("2000-07-27", "%Y-%m-%d")
            archivemail.options.date_old_max = time.mktime(date)
            assert not archivemail.should_archive(msg)

    def testMixed(self):
        """archiving a mixed mailbox"""
        self.make_mixed_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testNew(self):
        """archiving a new mailbox"""
        self.make_new_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testOldExisting(self):
        """archiving an old mailbox with an existing archive"""
        self.make_old_mbox(messages=3, make_old_archive=True)
        archivemail.archive(self.mbox)
        self.verify()

    def testOldWeirdHeaders(self):
        """archiving old mailboxes with weird headers"""
        weird_headers = (
            {   # we should archive because of the date on the 'From_' line
                'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
                'Date'  : 'Friskhdfkjkh, 28 Jul 2002 1line noise6:11:36 +1000',
            },
            {   # we should archive because of the date on the 'From_' line
                'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
                'Date'  : None,
            },
            {   # we should archive because of the date in 'Delivery-date'
                'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
                'Date'  : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000',
                'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
            },
            {   # we should archive because of the date in 'Delivery-date'
                'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
                'Date' : None,
                'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
            },
            {   # we should archive because of the date in 'Resent-Date'
                'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
                'Date'  : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000',
                'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
            },
            {   # we should archive because of the date in 'Resent-Date'
                'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
                'Date' : None,
                'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
            },
            {   # completely blank dates were crashing < version 0.4.7
                'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
                'Date'  : '',
            },
            {   # completely blank dates were crashing < version 0.4.7
                'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
                'Date'  : '',
                'Resent-Date'  : '',
            },
        )
        fd, self.mbox = tempfile.mkstemp()
        fp = os.fdopen(fd, "w")
        for headers in weird_headers:
            msg_text = make_message(default_headers=headers)
            fp.write(msg_text*2)
        fp.close()
        self.good_archive = tempfile.mkstemp()[1]
        shutil.copyfile(self.mbox, self.good_archive)
        archivemail.archive(self.mbox)
        self.verify()

    def tearDown(self):
        archivemail.options = self.oldopts
        super(TestArchiveMbox, self).tearDown()


class TestArchiveMboxTimestamp(TestCaseInTempdir):
    """original mbox timestamps should always be preserved"""
    def setUp(self):
        super(TestArchiveMboxTimestamp, self).setUp() 
        archivemail.options.quiet = True
        self.mbox_name = make_mbox(messages=3, hours_old=(24 * 180))
        self.mtime = os.path.getmtime(self.mbox_name) - 66
        self.atime = os.path.getatime(self.mbox_name) - 88
        os.utime(self.mbox_name, (self.atime, self.mtime))

    def testNew(self):
        """mbox timestamps should not change after no archival"""
        archivemail.options.days_old_max = 181
        archivemail.archive(self.mbox_name)
        self.verify()

    def testOld(self):
        """mbox timestamps should not change after archival"""
        archivemail.options.days_old_max = 179
        archivemail.archive(self.mbox_name)
        self.verify()

    def verify(self):
        assert os.path.exists(self.mbox_name)
        new_atime = os.path.getatime(self.mbox_name)
        new_mtime = os.path.getmtime(self.mbox_name)
        self.assertAlmostEqual(self.mtime, new_mtime, utimes_precision)
        self.assertAlmostEqual(self.atime, new_atime, utimes_precision)

    def tearDown(self):
        archivemail.options.quiet = False
        archivemail.options.days_old_max = 180
        os.remove(self.mbox_name)
        super(TestArchiveMboxTimestamp, self).tearDown()


class TestArchiveMboxAll(unittest.TestCase):
    def setUp(self):
        archivemail.options.quiet = True
        archivemail.options.archive_all = True

    def testNew(self):
        """new messages should be archived with --all"""
        self.msg = make_message(hours_old=24*179, wantobj=True)
        assert archivemail.should_archive(self.msg)

    def testOld(self):
        """old messages should be archived with --all"""
        self.msg = make_message(hours_old=24*181, wantobj=True)
        assert archivemail.should_archive(self.msg)

    def tearDown(self):
        archivemail.options.quiet = False
        archivemail.options.archive_all = False

class TestArchiveMboxPreserveUnread(unittest.TestCase):
    """make sure the 'preserve_unread' option works"""
    def setUp(self):
        archivemail.options.quiet = True
        archivemail.options.preserve_unread = True
        self.msg = make_message(hours_old=24*181, wantobj=True)

    def testOldRead(self):
        """old read messages should be archived with --preserve-unread"""
        self.msg["Status"] = "RO"
        assert archivemail.should_archive(self.msg)

    def testOldUnread(self):
        """old unread messages should not be archived with --preserve-unread"""
        self.msg["Status"] = "O"
        assert not archivemail.should_archive(self.msg)

    def tearDown(self):
        archivemail.options.quiet = False
        archivemail.options.preserve_unread = False


class TestArchiveMboxSuffix(unittest.TestCase):
    """make sure the 'suffix' option works"""
    def setUp(self):
        self.old_suffix = archivemail.options.archive_suffix
        archivemail.options.quiet = True

    def testSuffix(self):
        """archiving with specified --suffix arguments"""
        for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"):
            mbox_name = "foobar"
            archivemail.options.archive_suffix = suffix
            days_old_max = 180
            parsed_suffix_time = time.time() - days_old_max*24*60*60
            parsed_suffix = time.strftime(suffix,
                time.localtime(parsed_suffix_time))
            archive_name = mbox_name + parsed_suffix
            self.assertEqual(archive_name,
                    archivemail.make_archive_name(mbox_name))

    def tearDown(self):
        archivemail.options.quiet = False
        archivemail.options.archive_suffix = self.old_suffix


class TestArchiveDryRun(TestArchive):
    """make sure the 'dry-run' option works"""
    def setUp(self):
        super(TestArchiveDryRun, self).setUp()
        archivemail.options.quiet = True
        archivemail.options.dry_run = True

    def testOld(self):
        """archiving an old mailbox with the 'dry-run' option"""
        self.make_old_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def tearDown(self):
        archivemail.options.dry_run = False
        archivemail.options.quiet = False
        super(TestArchiveDryRun, self).tearDown()


class TestArchiveDelete(TestArchive):
    """make sure the 'delete' option works"""
    def setUp(self):
        super(TestArchiveDelete, self).setUp()
        archivemail.options.quiet = True
        archivemail.options.delete_old_mail = True

    def testNew(self):
        """archiving a new mailbox with the 'delete' option"""
        self.make_new_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testMixed(self):
        """archiving a mixed mailbox with the 'delete' option"""
        self.make_mixed_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testOld(self):
        """archiving an old mailbox with the 'delete' option"""
        self.make_old_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def tearDown(self):
        archivemail.options.delete_old_mail = False
        archivemail.options.quiet = False
        super(TestArchiveDelete, self).tearDown()


class TestArchiveCopy(TestArchive):
    """make sure the 'copy' option works"""
    def setUp(self):
        super(TestArchiveCopy, self).setUp()
        archivemail.options.quiet = True
        archivemail.options.copy_old_mail = True

    def testNew(self):
        """archiving a new mailbox with the 'copy' option"""
        self.make_new_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testMixed(self):
        """archiving a mixed mailbox with the 'copy' option"""
        self.make_mixed_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testOld(self):
        """archiving an old mailbox with the 'copy' option"""
        self.make_old_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def tearDown(self):
        archivemail.options.copy_old_mail = False
        archivemail.options.quiet = False
        super(TestArchiveCopy, self).tearDown()


class TestArchiveMboxFlagged(unittest.TestCase):
    """make sure the 'include_flagged' option works"""
    def setUp(self):
        archivemail.options.include_flagged = False
        archivemail.options.quiet = True

    def testOld(self):
        """by default, old flagged messages should not be archived"""
        msg = make_message(default_headers={"X-Status": "F"},
                hours_old=24*181, wantobj=True)
        assert not archivemail.should_archive(msg)

    def testIncludeFlaggedNew(self):
        """new flagged messages should not be archived with include_flagged"""
        msg = make_message(default_headers={"X-Status": "F"},
                hours_old=24*179, wantobj=True)
        assert not archivemail.should_archive(msg)

    def testIncludeFlaggedOld(self):
        """old flagged messages should be archived with include_flagged"""
        archivemail.options.include_flagged = True
        msg = make_message(default_headers={"X-Status": "F"},
                hours_old=24*181, wantobj=True)
        assert archivemail.should_archive(msg)

    def tearDown(self):
        archivemail.options.include_flagged = False
        archivemail.options.quiet = False


class TestArchiveMboxOutputDir(unittest.TestCase):
    """make sure that the 'output-dir' option works"""
    def setUp(self):
        archivemail.options.quiet = True

    def testOld(self):
        """archiving an old mailbox with a sepecified output dir"""
        for dir in "/just/a/path", "relative/path":
            archivemail.options.output_dir = dir
            archive_dir = archivemail.make_archive_name("/tmp/mbox")
            self.assertEqual(dir, os.path.dirname(archive_dir))

    def tearDown(self):
        archivemail.options.quiet = False
        archivemail.options.output_dir = None


class TestArchiveMboxUncompressed(TestArchive):
    """make sure that the 'no_compress' option works"""
    mbox_name = None
    new_mbox = None
    old_mbox = None
    copy_name = None

    def setUp(self):
        archivemail.options.quiet = True
        archivemail.options.no_compress = True
        super(TestArchiveMboxUncompressed, self).setUp()

    def testOld(self):
        """archiving an old mailbox uncompressed"""
        self.make_old_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testNew(self):
        """archiving a new mailbox uncompressed"""
        self.make_new_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testMixed(self):
        """archiving a mixed mailbox uncompressed"""
        self.make_mixed_mbox(messages=3)
        archivemail.archive(self.mbox)
        self.verify()

    def testOldExists(self):
        """archiving an old mailbox uncopressed with an existing archive"""
        self.make_old_mbox(messages=3, make_old_archive=True)
        archivemail.archive(self.mbox)
        self.verify()

    def tearDown(self):
        archivemail.options.quiet = False
        archivemail.options.no_compress = False
        super(TestArchiveMboxUncompressed, self).tearDown()


class TestArchiveSize(unittest.TestCase):
    """check that the 'size' argument works"""
    def setUp(self):
        archivemail.options.quiet = True
        msg_text = make_message(hours_old=24*181)
        self.msg_size = len(msg_text)
        fp = cStringIO.StringIO(msg_text)
        self.msg = rfc822.Message(fp)

    def testSmaller(self):
        """giving a size argument smaller than the message"""
        archivemail.options.min_size = self.msg_size - 1
        assert archivemail.should_archive(self.msg)

    def testBigger(self):
        """giving a size argument bigger than the message"""
        archivemail.options.min_size = self.msg_size + 1
        assert not archivemail.should_archive(self.msg)

    def tearDown(self):
        archivemail.options.quiet = False
        archivemail.options.min_size = None


############# Test archiving maildirs ###############

class TestArchiveMailboxdir(TestCaseInTempdir):
    """Base class defining helper functions for doing test archive runs with
    maildirs."""
    maildir = None           # Maildir that will be processed by archivemail
    orig_maildir_obj = None  # A backup copy of the maildir, a SimpleMaildir object
    remaining_msg = set()    # Filenames of maildir messages that should be preserved
    number_archived = 0      # Number of messages that get archived
    orig_archive = None      # An uncompressed copy of a pre-existing archive,
                             # if one exists

    def setUp(self):
        super(TestArchiveMailboxdir, self).setUp()
        self.orig_maildir_obj = SimpleMaildir()

    def verify(self):
        self._verify_remaining()
        self._verify_archive()

    def _verify_remaining(self):
        """Verify that the preserved messages weren't altered."""
        assert self.maildir
        # Compare maildir with backup object.
        dcmp = filecmp.dircmp(self.maildir, self.orig_maildir_obj.root)
        # Top-level has only directories cur, new, tmp and must be unchanged.
        self.assertEqual(dcmp.left_list, dcmp.right_list)
        found = set()
        for d in dcmp.common_dirs:
            dcmp2 = dcmp.subdirs[d]
            # We need to verify three things.
            # 1. directory is a subset of the original...
            assert not dcmp2.left_only
            # 2. all common files are identical...
            self.assertEqual(dcmp2.common_files, dcmp2.same_files)
            found = found.union([os.path.join(d, x) for x in dcmp2.common_files])
        # 3. exactly the `new' messages (recorded in self.remaining_msg)
        #    were preserved.
        self.assertEqual(found, self.remaining_msg)

    def _verify_archive(self):
        """Verify the archive correctness."""
        # TODO: currently make_archive_name does not include the .gz suffix.
        # Is this something that should be fixed?
        archive = archivemail.make_archive_name(self.maildir)
        if archivemail.options.no_compress:
            iszipped = False
        else:
            archive += '.gz'
            iszipped = True
        if self.number_archived == 0:
            if self.orig_archive:
                assertEqualContent(archive, self.orig_archive, iszipped)
            else:
                assert not os.path.exists(archive)
            return
        fp_new = fp_archive = tmp_archive_name = None
        try:
            if self.orig_archive:
                new_size = os.path.getsize(archive)
                # Brute force: split archive in old and new part and verify the
                # parts separately.  (Of course this destroys the archive.)
                fp_archive = open(archive, "r+")
                fp_archive.seek(self.orig_archive_size)
                fd, tmp_archive_name = tempfile.mkstemp()
                fp_new = os.fdopen(fd, "w")
                shutil.copyfileobj(fp_archive, fp_new)
                fp_new.close()
                fp_archive.truncate(self.orig_archive_size)
                fp_archive.close()
                assertEqualContent(archive, self.orig_archive, iszipped)
                new_archive = tmp_archive_name
            else:
                new_archive = archive
            if archivemail.options.no_compress:
                fp_archive = open(new_archive, "r")
            else:
                fp_archive = FixedGzipFile(new_archive, "r")
            mb = mailbox.UnixMailbox(fp_archive)
            found = 0
            for msg in mb:
                self.verify_maildir_has_msg(self.orig_maildir_obj, msg)
                found += 1
            self.assertEqual(found, self.number_archived)
        finally:
            if tmp_archive_name:
                os.remove(tmp_archive_name)
            if fp_new is not None:
                fp_new.close()
            if fp_archive is not None:
                fp_archive.close()

    def verify_maildir_has_msg(self, maildir, msg):
        """Assert that the given maildir has a copy of the rfc822 message."""
        mid = msg['Message-Id'] # Complains if there is no message-id
        mdir_msg_str, mdir_flags = \
            maildir.get_message_and_mbox_status(mid)
        mbox_flags = set(msg.get('status', '') + msg.get('x-status', ''))
        self.assertEqual(mdir_flags, mbox_flags)

        headers = filter(lambda h: msg.isheader(h) not in ('status', 'x-status'),
                msg.headers)
        headers = "".join(headers)
        msg.rewindbody()
        # Discard last mbox LF which is not part of the message.
        body = msg.fp.read()[:-1]
        msg_str = headers + os.linesep + body
        self.assertEqual(mdir_msg_str, msg_str)

    def add_messages(self, body=None, headers=None, hours_old=0, messages=1):
        for count in range(messages):
            msg = make_message(body, default_headers=headers, mkfrom=False,
                    hours_old=hours_old)
            self.orig_maildir_obj.write(msg, new=False)

    def make_maildir(self, mkold, mknew, body=None, headers=None, messages=1,
            make_old_archive=False):
        mailbox_does_change = not (archivemail.options.dry_run or
                archivemail.options.copy_old_mail)
        archive_does_change = not (archivemail.options.dry_run or
                archivemail.options.delete_old_mail)
        if mknew:
            self.add_messages(body, headers, 179*24, messages)
            if archive_does_change and archivemail.options.archive_all:
                self.number_archived += messages
            if mailbox_does_change:
                self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames())
        if mkold:
            self.add_messages(body, headers, 181*24, messages)
            if archive_does_change:
                self.number_archived += messages
        if not mailbox_does_change:
            self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames())
        self.maildir = copy_maildir(self.orig_maildir_obj.root)
        if make_old_archive:
            archive = archivemail.make_archive_name(self.maildir)
            self.orig_archive = make_archive_and_plain_copy(archive)
            # FIXME: .gz extension handling is a mess II
            if not archivemail.options.no_compress:
                archive += '.gz'
            self.orig_archive_size = os.path.getsize(archive)

class TestEmptyMaildir(TestCaseInTempdir):
    def setUp(self):
        super(TestEmptyMaildir, self).setUp()
        archivemail.options.quiet = True

    def testEmpty(self):
        """Archiving an empty maildir should not result in an archive."""
        self.mdir = SimpleMaildir()
        archivemail.archive(self.mdir.root)
        assert not os.path.exists(self.mdir.root + '_archive.gz')

    def tearDown(self):
        super(TestEmptyMaildir, self).tearDown()
        archivemail.options.quiet = False

class TestMaildir(TestArchiveMailboxdir):
    def setUp(self):
        super(TestMaildir, self).setUp()
        archivemail.options.quiet = True

    def testOld(self):
        self.make_maildir(True, False, messages=3)
        archivemail.archive(self.maildir)
        self.verify()

    def testNew(self):
        self.make_maildir(False, True, messages=3)
        archivemail.archive(self.maildir)
        self.verify()

    def testMixed(self):
        self.make_maildir(True, True, messages=3)
        archivemail.archive(self.maildir)
        self.verify()

    def testMixedExisting(self):
        self.make_maildir(True, True, messages=3, make_old_archive=True)
        archivemail.archive(self.maildir)
        self.verify()

    def tearDown(self):
        archivemail.options.quiet = False
        super(TestMaildir, self).tearDown()


class TestMaildirPreserveUnread(TestCaseInTempdir):
    """Test if the preserve_unread option works with maildirs."""
    def setUp(self):
        super(TestMaildirPreserveUnread, self).setUp()
        archivemail.options.quiet = True
        archivemail.options.preserve_unread = True

    def testOldRead(self):
        """--preserve-unread archives old read messages in a maildir."""
        smd = SimpleMaildir("orig")
        msg = make_message(hours_old=24*181)
        smd.write(msg, new=False, flags='S')
        md = mailbox.Maildir(smd.root)
        msg_obj = md.next()
        assert archivemail.should_archive(msg_obj)

    def testOldUnread(self):
        """--preserve-unread preserves old unread messages in a maildir."""
        smd = SimpleMaildir("orig")
        msg = make_message(hours_old=24*181)
        smd.write(msg, new=False)
        md = mailbox.Maildir(smd.root)
        msg_obj = md.next()
        assert not archivemail.should_archive(msg_obj)

    def tearDown(self):
        archivemail.options.quiet = False
        archivemail.options.preserve_unread = False
        super(TestMaildirPreserveUnread, self).tearDown()

class TestMaildirAll(TestArchiveMailboxdir):
    def setUp(self):
        super(TestMaildirAll, self).setUp()
        archivemail.options.quiet = True
        archivemail.options.archive_all = True

    def testNew(self):
        """New maildir messages should be archived with --all"""
        self.add_messages(hours_old=24*181)
        md = mailbox.Maildir(self.orig_maildir_obj.root)
        msg_obj = md.next()
        assert archivemail.should_archive(msg_obj)

    def testOld(self):
        """Old maildir messages should be archived with --all"""
        self.add_messages(hours_old=24*179)
        md = mailbox.Maildir(self.orig_maildir_obj.root)
        msg_obj = md.next()
        assert archivemail.should_archive(msg_obj)

    def tearDown(self):
        super(TestMaildirAll, self).tearDown()
        archivemail.options.quiet = False
        archivemail.options.archive_all = False

class TestMaildirDryRun(TestArchiveMailboxdir):
    def setUp(self):
        super(TestMaildirDryRun, self).setUp()
        archivemail.options.quiet = True
        archivemail.options.dry_run = True

    def testOld(self):
        """archiving an old maildir mailbox with the 'dry-run' option"""
        self.make_maildir(True, False)
        archivemail.archive(self.maildir)
        self.verify()

    def tearDown(self):
        super(TestMaildirDryRun, self).tearDown()
        archivemail.options.quiet = False
        archivemail.options.dry_run = False

class TestMaildirDelete(TestArchiveMailboxdir):
    def setUp(self):
        super(TestMaildirDelete, self).setUp()
        archivemail.options.quiet = True
        archivemail.options.delete_old_mail = True

    def testOld(self):
        """archiving an old maildir mailbox with the 'delete' option"""
        self.make_maildir(True, False)
        archivemail.archive(self.maildir)
        self.verify()

    def testNew(self):
        """archiving a new maildir mailbox with the 'delete' option"""
        self.make_maildir(False, True)
        archivemail.archive(self.maildir)
        self.verify()

    def tearDown(self):
        super(TestMaildirDelete, self).tearDown()
        archivemail.options.quiet = False
        archivemail.options.delete_old_mail = False

class TestMaildirCopy(TestArchiveMailboxdir):
    def setUp(self):
        super(TestMaildirCopy, self).setUp()
        archivemail.options.quiet = True
        archivemail.options.copy_old_mail = True

    def testOld(self):
        """archiving an old maildir mailbox with the 'copy' option"""
        self.make_maildir(True, False)
        archivemail.archive(self.maildir)
        self.verify()

    def testNew(self):
        """archiving a new maildir mailbox with the 'copy' option"""
        self.make_maildir(False, True)
        archivemail.archive(self.maildir)
        self.verify()

    def tearDown(self):
        super(TestMaildirCopy, self).tearDown()
        archivemail.options.quiet = False
        archivemail.options.copy_old_mail = False

class TestArchiveMaildirFlagged(TestCaseInTempdir):
    """make sure the 'include_flagged' option works with maildir messages"""
    def setUp(self):
        super(TestArchiveMaildirFlagged, self).setUp()
        archivemail.options.include_flagged = False
        archivemail.options.quiet = True

    def testOld(self):
        """by default, old flagged maildir messages should not be archived"""
        smd = SimpleMaildir("orig")
        msg = make_message(hours_old=24*181)
        smd.write(msg, new=False, flags='F')
        md = mailbox.Maildir(smd.root)
        msg_obj = md.next()
        assert not archivemail.should_archive(msg_obj)

    def testIncludeFlaggedNew(self):
        """new flagged maildir messages should not be archived with include_flagged"""
        smd = SimpleMaildir("orig")
        msg = make_message(hours_old=24*179)
        smd.write(msg, new=False, flags='F')
        md = mailbox.Maildir(smd.root)
        msg_obj = md.next()
        assert not archivemail.should_archive(msg_obj)

    def testIncludeFlaggedOld(self):
        """old flagged maildir messages should be archived with include_flagged"""
        archivemail.options.include_flagged = True
        smd = SimpleMaildir("orig")
        msg = make_message(hours_old=24*181)
        smd.write(msg, new=False, flags='F')
        md = mailbox.Maildir(smd.root)
        msg_obj = md.next()
        assert archivemail.should_archive(msg_obj)

    def tearDown(self):
        super(TestArchiveMaildirFlagged, self).tearDown()
        archivemail.options.include_flagged = False
        archivemail.options.quiet = False

class TestArchiveMaildirSize(TestCaseInTempdir):
    """check that the 'size' argument works with maildir messages"""
    def setUp(self):
        super(TestArchiveMaildirSize, self).setUp()
        archivemail.options.quiet = True
        msg = make_message(hours_old=24*181)
        self.msg_size = len(msg)
        smd = SimpleMaildir("orig")
        smd.write(msg, new=False)
        md = mailbox.Maildir(smd.root)
        self.msg_obj = md.next()

    def testSmaller(self):
        """giving a size argument smaller than the maildir message"""
        archivemail.options.min_size = self.msg_size - 1
        assert archivemail.should_archive(self.msg_obj)

    def testBigger(self):
        """giving a size argument bigger than the maildir message"""
        archivemail.options.min_size = self.msg_size + 1
        assert not archivemail.should_archive(self.msg_obj)

    def tearDown(self):
        super(TestArchiveMaildirSize, self).tearDown()
        archivemail.options.quiet = False
        archivemail.options.min_size = None

########## helper routines ############

def make_message(body=None, default_headers={}, hours_old=None, mkfrom=False, wantobj=False):
    headers = copy.copy(default_headers)
    if not headers:
        headers = {}
    headers['Message-Id'] = make_msgid()
    if not headers.has_key('Date'):
        time_message = time.time() - (60 * 60 * hours_old)
        headers['Date'] = time.asctime(time.localtime(time_message))
    if not headers.has_key('From'):
        headers['From'] = "sender@dummy.domain"        
    if not headers.has_key('To'):
        headers['To'] = "receipient@dummy.domain"        
    if not headers.has_key('Subject'):
        headers['Subject'] = "This is the subject"
    if mkfrom and not headers.has_key('From_'):
        headers['From_'] = "%s %s" % (headers['From'], headers['Date'])
    if not body:
        body = "This is the message body"

    msg = ""
    if headers.has_key('From_'):
        msg = msg + ("From %s\n" % headers['From_'])
        del headers['From_']
    for key in headers.keys():
        if headers[key] is not None:
            msg = msg + ("%s: %s\n" % (key, headers[key]))
    msg = msg + "\n\n" + body + "\n\n"
    if not wantobj:
        return msg
    fp = cStringIO.StringIO(msg)
    return rfc822.Message(fp)

def append_file(source, dest):
    """appends the file named 'source' to the file named 'dest'"""
    assert os.path.isfile(source)
    assert os.path.isfile(dest)
    read = open(source, "r")
    write = open(dest, "a+")
    shutil.copyfileobj(read,write)
    read.close()
    write.close()


def make_mbox(body=None, headers=None, hours_old=0, messages=1):
    assert tempfile.tempdir
    fd, name = tempfile.mkstemp()
    file = os.fdopen(fd, "w")
    for count in range(messages):
        msg = make_message(body=body, default_headers=headers, 
            mkfrom=True, hours_old=hours_old)
        file.write(msg)
    file.close()
    return name

def make_archive_and_plain_copy(archive_name):
    """Make an mbox archive of the given name like archivemail may have
    created it.  Also make an uncompressed copy of this archive and return its
    name."""
    copy_fd, copy_name = tempfile.mkstemp()
    copy_fp = os.fdopen(copy_fd, "w")
    if archivemail.options.no_compress:
        fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT)
        fp = os.fdopen(fd, "w")
    else:
        archive_name += ".gz"
        fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT)
        rawfp = os.fdopen(fd, "w")
        fp = gzip.GzipFile(fileobj=rawfp)
    for count in range(3):
        msg = make_message(hours_old=24*360)
        fp.write(msg)
        copy_fp.write(msg)
    fp.close()
    copy_fp.close()
    if not archivemail.options.no_compress:
        rawfp.close()
    return copy_name

def copy_maildir(maildir, prefix="tmp"):
    """Create a copy of the given maildir and return the absolute path of the
    new direcory."""
    newdir = tempfile.mkdtemp(prefix=prefix)
    for d in "cur", "new", "tmp":
        shutil.copytree(os.path.join(maildir, d), os.path.join(newdir, d))
    return newdir

def assertEqualContent(firstfile, secondfile, zippedfirst=False):
    """Verify that the two files exist and have identical content. If zippedfirst
    is True, assume that firstfile is gzip-compressed."""
    assert os.path.exists(firstfile)
    assert os.path.exists(secondfile)
    if zippedfirst:
        try:
            fp1 = gzip.GzipFile(firstfile, "r")
            fp2 = open(secondfile, "r")
            assert cmp_fileobj(fp1, fp2)
        finally:
            fp1.close()
            fp2.close()
    else:
        assert filecmp.cmp(firstfile, secondfile, shallow=0)

def cmp_fileobj(fp1, fp2):
    """Return if reading the fileobjects yields identical content."""
    bufsize = 8192
    while True:
        b1 = fp1.read(bufsize)
        b2 = fp2.read(bufsize)
        if b1 != b2:
            return False
        if not b1:
            return True

if __name__ == "__main__":
    unittest.main()