aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xarchivemail.py78
-rwxr-xr-xtest_archivemail.py112
2 files changed, 95 insertions, 95 deletions
diff --git a/archivemail.py b/archivemail.py
index a575929..b98a552 100755
--- a/archivemail.py
+++ b/archivemail.py
@@ -103,8 +103,8 @@ class Stats:
compression extension (eg .gz)
"""
- assert(mailbox_name)
- assert(final_archive_name)
+ assert mailbox_name
+ assert final_archive_name
self.__start_time = time.time()
self.__mailbox_name = mailbox_name
self.__archive_name = final_archive_name + ".gz"
@@ -326,7 +326,7 @@ class LockableMboxMixin:
def lock(self):
"""Lock this mbox with both a dotlock and a posix lock."""
- assert(not self._locked)
+ assert not self._locked
attempt = 1
while True:
try:
@@ -347,7 +347,7 @@ class LockableMboxMixin:
def unlock(self):
"""Unlock this mbox."""
- assert(self._locked)
+ assert self._locked
self._dotlock_unlock()
self._posix_unlock()
self._locked = False
@@ -411,7 +411,7 @@ class LockableMboxMixin:
def _dotlock_unlock(self):
"""Delete the dotlock file for the 'mbox' mailbox."""
- assert(self.mbox_file_name)
+ assert self.mbox_file_name
lock_name = self.mbox_file_name + options.lockfile_extension
vprint("removing lockfile '%s'" % lock_name)
os.remove(lock_name)
@@ -425,7 +425,7 @@ class LockableMboxMixin:
def close(self):
"""Close the mbox file"""
vprint("closing file '%s'" % self.mbox_file_name)
- assert(not self._locked)
+ assert not self._locked
self.mbox_file.close()
@@ -440,7 +440,7 @@ class Mbox(mailbox.UnixMailbox, LockableMboxMixin):
Named Arguments:
path -- file name of the 'mbox' file to be opened
"""
- assert(path)
+ assert path
fd = safe_open_existing(path)
st = os.fstat(fd)
self.original_atime = st.st_atime
@@ -453,9 +453,9 @@ class Mbox(mailbox.UnixMailbox, LockableMboxMixin):
def reset_timestamps(self):
"""Set the file timestamps to the original values"""
- assert(self.original_atime)
- assert(self.original_mtime)
- assert(self.mbox_file_name)
+ assert self.original_atime
+ assert self.original_mtime
+ assert self.mbox_file_name
os.utime(self.mbox_file_name, (self.original_atime, \
self.original_mtime))
@@ -481,7 +481,7 @@ class ArchiveMbox(LockableMboxMixin):
def append(self, filename):
"""Append the content of the given file to the mbox."""
- assert(self._locked)
+ assert self._locked
fin = open(filename, "r")
oldsize = os.fstat(self.mbox_file.fileno()).st_size
try:
@@ -516,8 +516,8 @@ class TempMbox:
msg -- rfc822 message object to be written
"""
- assert(msg)
- assert(self.mbox_file)
+ assert msg
+ assert self.mbox_file
self.empty = False
vprint("saving message to file '%s'" % self.mbox_file_name)
@@ -528,13 +528,13 @@ class TempMbox:
msg_has_mbox_format = False
unix_from = make_mbox_from(msg)
self.mbox_file.write(unix_from)
- assert(msg.headers)
+ assert msg.headers
self.mbox_file.writelines(msg.headers)
self.mbox_file.write(os.linesep)
# The following while loop is about twice as fast in
# practice to 'self.mbox_file.writelines(msg.fp.readlines())'
- assert(options.read_buffer_size > 0)
+ assert options.read_buffer_size > 0
linebuf = ""
while 1:
body = msg.fp.read(options.read_buffer_size)
@@ -606,14 +606,14 @@ class IdentityCache:
def __init__(self, mailbox_name):
"""Constructor: takes the mailbox name as an argument"""
- assert(mailbox_name)
+ assert mailbox_name
self.mailbox_name = mailbox_name
def warn_if_dupe(self, msg):
"""Print a warning message if the message has already appeared"""
- assert(msg)
+ assert msg
message_id = msg.get('Message-ID')
- assert(message_id)
+ assert message_id
if self.seen_ids.has_key(message_id):
user_warning("duplicate message id: '%s' in mailbox '%s'" %
(message_id, self.mailbox_name))
@@ -724,11 +724,11 @@ def make_mbox_from(message):
message -- the rfc822 message object
"""
- assert(message)
+ assert message
address = guess_return_path(message)
time_message = guess_delivery_time(message)
date = time.localtime(time_message)
- assert(date)
+ assert date
date_string = time.asctime(date)
mbox_from = "From %s %s\n" % (address, date_string)
return mbox_from
@@ -736,7 +736,7 @@ def make_mbox_from(message):
def guess_return_path(message):
"""Return a guess at the Return Path address of an rfc822 message"""
- assert(message)
+ assert message
for header in ('Return-path', 'From'):
address_header = message.get(header)
@@ -747,13 +747,13 @@ def guess_return_path(message):
# argh, we can't find any valid 'Return-path' guesses - just
# just use the current unix username like mutt does
login = pwd.getpwuid(os.getuid())[0]
- assert(login)
+ assert login
return login
def guess_delivery_time(message):
"""Return a guess at the delivery date of an rfc822 message"""
- assert(message)
+ assert message
# try to guess the delivery date from various headers
# get more desparate as we go through the array
for header in 'Delivery-date', 'Received', 'Resent-Date', 'Date':
@@ -928,7 +928,7 @@ def is_unread(message):
def sizeof_message(message):
"""Return size of message in bytes (octets)."""
- assert(message)
+ assert message
file_name = None
message_size = None
try:
@@ -955,8 +955,8 @@ def sizeof_message(message):
def is_smaller(message, size):
"""Return true if the message is smaller than size bytes, false otherwise"""
- assert(message)
- assert(size > 0)
+ assert message
+ assert size > 0
message_size = sizeof_message(message)
if message_size < size:
vprint("message is too small (%d bytes), minimum bytes : %d" % \
@@ -1064,13 +1064,13 @@ def archive(mailbox_name):
Arguments:
mailbox_name -- the filename/dirname/url of the mailbox to be archived
"""
- assert(mailbox_name)
+ assert mailbox_name
# strip any trailing slash (we could be archiving a maildir or MH format
# mailbox and somebody was pressing <tab> in bash) - we don't want to use
# the trailing slash in the archive name
mailbox_name = mailbox_name.rstrip("/")
- assert(mailbox_name)
+ assert mailbox_name
set_signal_handlers()
os.umask(077) # saves setting permissions on mailboxes/tempfiles
@@ -1099,7 +1099,7 @@ def archive(mailbox_name):
# create a temporary directory for us to work in securely
tempfile.tempdir = None
new_temp_dir = tempfile.mkdtemp('archivemail')
- assert(new_temp_dir)
+ assert new_temp_dir
_stale.temp_dir = new_temp_dir
tempfile.tempdir = new_temp_dir
vprint("set tempfile directory to '%s'" % new_temp_dir)
@@ -1139,8 +1139,8 @@ def _archive_mbox(mailbox_name, final_archive_name):
old messages to - appending if the archive
already exists
"""
- assert(mailbox_name)
- assert(final_archive_name)
+ assert mailbox_name
+ assert final_archive_name
stats = Stats(mailbox_name, final_archive_name)
cache = IdentityCache(mailbox_name)
original = Mbox(path=mailbox_name)
@@ -1215,9 +1215,9 @@ def _archive_mbox(mailbox_name, final_archive_name):
def _archive_dir(mailbox_name, final_archive_name, type):
"""Archive a 'maildir' or 'MH' style mailbox - used by archive_mailbox()"""
- assert(mailbox_name)
- assert(final_archive_name)
- assert(type)
+ assert mailbox_name
+ assert final_archive_name
+ assert type
stats = Stats(mailbox_name, final_archive_name)
delete_queue = []
@@ -1267,8 +1267,8 @@ def _archive_dir(mailbox_name, final_archive_name, type):
def _archive_imap(mailbox_name, final_archive_name):
"""Archive an imap mailbox - used by archive_mailbox()"""
- assert(mailbox_name)
- assert(final_archive_name)
+ assert mailbox_name
+ assert final_archive_name
import imaplib
import cStringIO
import getpass
@@ -1456,7 +1456,7 @@ def imap_getdelim(imap_server):
def imap_get_namespace(srv):
"""Return the IMAP namespace prefixes and hierarchy delimiters."""
- assert('NAMESPACE' in srv.capabilities)
+ assert 'NAMESPACE' in srv.capabilities
result, response = srv.namespace()
if result != 'OK':
unexpected_error("Cannot retrieve IMAP namespace; server says: '%s'"
@@ -1637,7 +1637,7 @@ def make_archive_name(mailbox_name):
def check_sane_destdir(dir):
"""Do a very primitive check if the given directory looks like a reasonable
destination directory and bail out if it doesn't."""
- assert(dir)
+ assert dir
if not os.path.isdir(dir):
user_error("output directory does not exist: '%s'" % dir)
if not os.access(dir, os.W_OK):
@@ -1685,7 +1685,7 @@ def get_filename(msg):
# (msg from mailbox.Maildir, Python >= 2.5)
# File object is msg.fp._file, we do want that.
if msg.fp.__class__ == mailbox._ProxyFile:
- assert(hasattr(mailbox, "_PartialFile"))
+ assert hasattr(mailbox, "_PartialFile")
return msg.fp._file.name
raise
diff --git a/test_archivemail.py b/test_archivemail.py
index 738a5c2..329bd33 100755
--- a/test_archivemail.py
+++ b/test_archivemail.py
@@ -92,12 +92,12 @@ class TestCaseInTempdir(unittest.TestCase):
def setUp(self):
if not self.temproot:
- assert(not tempfile.tempdir)
+ assert not tempfile.tempdir
self.temproot = tempfile.tempdir = \
tempfile.mkdtemp(prefix="test-archivemail")
def tearDown(self):
- assert(tempfile.tempdir == self.temproot)
+ assert tempfile.tempdir == self.temproot
if self.temproot:
shutil.rmtree(self.temproot)
tempfile.tempdir = self.temproot = None
@@ -116,9 +116,9 @@ class TestMboxDotlock(TestCaseInTempdir):
"""dotlock_lock/unlock should create/delete a lockfile"""
lock = self.mbox_name + ".lock"
self.mbox._dotlock_lock()
- assert(os.path.isfile(lock))
+ assert os.path.isfile(lock)
self.mbox._dotlock_unlock()
- assert(not os.path.isfile(lock))
+ assert not os.path.isfile(lock)
def testDotlockingSucceedsUponEACCES(self):
"""A dotlock should silently be omitted upon EACCES."""
@@ -192,7 +192,7 @@ class TestMboxNext(TestCaseInTempdir):
mbox = archivemail.Mbox(self.not_empty_name)
for count in range(18):
msg = mbox.next()
- assert(msg)
+ assert msg
msg = mbox.next()
self.assertEqual(msg, None)
@@ -214,7 +214,7 @@ class TestTempMboxWrite(TestCaseInTempdir):
mbox_write.write(msg)
mbox_read.close()
mbox_write.close()
- assert(filecmp.cmp(read_file, write_file, shallow=0))
+ assert filecmp.cmp(read_file, write_file, shallow=0)
def testWriteNone(self):
"""calling mbox.write() with no message should raise AssertionError"""
@@ -229,9 +229,9 @@ class TestTempMboxRemove(TestCaseInTempdir):
def testMboxRemove(self):
"""remove() should delete a mbox mailbox"""
- assert(os.path.exists(self.mbox_name))
+ assert os.path.exists(self.mbox_name)
self.mbox.remove()
- assert(not os.path.exists(self.mbox_name))
+ assert not os.path.exists(self.mbox_name)
@@ -289,27 +289,27 @@ class TestOptionParser(unittest.TestCase):
def testOptionPreserveUnread(self):
"""--preserve-unread option is parsed correctly"""
archivemail.options.parse_args(["--preserve-unread"], "")
- assert(archivemail.options.preserve_unread)
+ assert archivemail.options.preserve_unread
archivemail.options.preserve_unread = 0
archivemail.options.parse_args(["-u"], "")
- assert(archivemail.options.preserve_unread)
+ assert archivemail.options.preserve_unread
def testOptionSuffix(self):
"""--suffix and -s options are parsed correctly"""
for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"):
archivemail.options.parse_args(["--suffix="+suffix], "")
- assert(archivemail.options.archive_suffix == suffix)
+ assert archivemail.options.archive_suffix == suffix
archivemail.options.suffix = None
archivemail.options.parse_args(["-s", suffix], "")
- assert(archivemail.options.archive_suffix == suffix)
+ assert archivemail.options.archive_suffix == suffix
def testOptionDryrun(self):
"""--dry-run option is parsed correctly"""
archivemail.options.parse_args(["--dry-run"], "")
- assert(archivemail.options.dry_run)
+ assert archivemail.options.dry_run
archivemail.options.preserve_unread = 0
archivemail.options.parse_args(["-n"], "")
- assert(archivemail.options.dry_run)
+ assert archivemail.options.dry_run
def testOptionDays(self):
"""--days and -d options are parsed correctly"""
@@ -322,12 +322,12 @@ class TestOptionParser(unittest.TestCase):
def testOptionDelete(self):
"""--delete option is parsed correctly"""
archivemail.options.parse_args(["--delete"], "")
- assert(archivemail.options.delete_old_mail)
+ assert archivemail.options.delete_old_mail
def testOptionCopy(self):
"""--copy option is parsed correctly"""
archivemail.options.parse_args(["--copy"], "")
- assert(archivemail.options.copy_old_mail)
+ assert archivemail.options.copy_old_mail
def testOptionOutputdir(self):
"""--output-dir and -o options are parsed correctly"""
@@ -341,7 +341,7 @@ class TestOptionParser(unittest.TestCase):
def testOptionNocompress(self):
"""--no-compress option is parsed correctly"""
archivemail.options.parse_args(["--no-compress"], "")
- assert(archivemail.options.no_compress)
+ assert archivemail.options.no_compress
def testOptionSize(self):
"""--size and -S options are parsed correctly"""
@@ -361,43 +361,43 @@ class TestIsTooOld(unittest.TestCase):
"""with max_days=360, should be true for these dates > 1 year"""
for years in range(1, 10):
time_msg = time.time() - (years * 365 * 24 * 60 * 60)
- assert(archivemail.is_older_than_days(time_message=time_msg,
- max_days=360))
+ assert archivemail.is_older_than_days(time_message=time_msg,
+ max_days=360)
def testOld(self):
"""with max_days=14, should be true for these dates > 14 days"""
for days in range(14, 360):
time_msg = time.time() - (days * 24 * 60 * 60)
- assert(archivemail.is_older_than_days(time_message=time_msg,
- max_days=14))
+ assert archivemail.is_older_than_days(time_message=time_msg,
+ max_days=14)
def testJustOld(self):
"""with max_days=1, should be true for these dates >= 1 day"""
for minutes in range(0, 61):
time_msg = time.time() - (25 * 60 * 60) + (minutes * 60)
- assert(archivemail.is_older_than_days(time_message=time_msg,
- max_days=1))
+ assert archivemail.is_older_than_days(time_message=time_msg,
+ max_days=1)
def testNotOld(self):
"""with max_days=9, should be false for these dates < 9 days"""
for days in range(0, 9):
time_msg = time.time() - (days * 24 * 60 * 60)
- assert(not archivemail.is_older_than_days(time_message=time_msg,
- max_days=9))
+ assert not archivemail.is_older_than_days(time_message=time_msg,
+ max_days=9)
def testJustNotOld(self):
"""with max_days=1, should be false for these hours <= 1 day"""
for minutes in range(0, 60):
time_msg = time.time() - (23 * 60 * 60) - (minutes * 60)
- assert(not archivemail.is_older_than_days(time_message=time_msg,
- max_days=1))
+ assert not archivemail.is_older_than_days(time_message=time_msg,
+ max_days=1)
def testFuture(self):
"""with max_days=1, should be false for times in the future"""
for minutes in range(0, 60):
time_msg = time.time() + (minutes * 60)
- assert(not archivemail.is_older_than_days(time_message=time_msg,
- max_days=1))
+ assert not archivemail.is_older_than_days(time_message=time_msg,
+ max_days=1)
########## archivemail.parse_imap_url() unit testing #################
@@ -469,7 +469,7 @@ class TestParseIMAPUrl(unittest.TestCase):
self.assertEqual(result, mbstr[2])
for mbstr in self.urls_onlywithpass:
url = mbstr[0][mbstr[0].find('://')+3:]
- self.assertRaises(archivemail.UnexpectedError,
+ self.assertRaises(archivemail.UnexpectedError,
archivemail.parse_imap_url, url)
def tearDown(self):
@@ -485,7 +485,7 @@ class TestArchive(TestCaseInTempdir):
good_archive = good_mbox = None
def verify(self):
- assert(os.path.exists(self.mbox))
+ assert os.path.exists(self.mbox)
if self.good_mbox is not None:
assertEqualContent(self.mbox, self.good_mbox)
else:
@@ -495,12 +495,12 @@ class TestArchive(TestCaseInTempdir):
archive_name += ".gz"
iszipped = True
else:
- assert(not os.path.exists(archive_name + ".gz"))
+ assert not os.path.exists(archive_name + ".gz")
iszipped = False
if self.good_archive is not None:
assertEqualContent(archive_name, self.good_archive, iszipped)
else:
- assert(not os.path.exists(archive_name))
+ assert not os.path.exists(archive_name)
def make_old_mbox(self, body=None, headers=None, messages=1, old_archive=False):
"""Make an old mbox, optionally an existing archive, and make a reference
@@ -601,10 +601,10 @@ This is after the ^From line"""
msg = make_message(default_headers=headers, wantobj=True)
date = time.strptime("2000-07-29", "%Y-%m-%d")
archivemail.options.date_old_max = time.mktime(date)
- assert(archivemail.should_archive(msg))
+ assert archivemail.should_archive(msg)
date = time.strptime("2000-07-27", "%Y-%m-%d")
archivemail.options.date_old_max = time.mktime(date)
- assert(not archivemail.should_archive(msg))
+ assert not archivemail.should_archive(msg)
def testMixed(self):
"""archiving a mixed mailbox"""
@@ -704,7 +704,7 @@ class TestArchiveMboxTimestamp(TestCaseInTempdir):
self.verify()
def verify(self):
- assert(os.path.exists(self.mbox_name))
+ assert os.path.exists(self.mbox_name)
new_atime = os.path.getatime(self.mbox_name)
new_mtime = os.path.getmtime(self.mbox_name)
self.assertAlmostEqual(self.mtime, new_mtime, utimes_precision)
@@ -725,12 +725,12 @@ class TestArchiveMboxAll(unittest.TestCase):
def testNew(self):
"""new messages should be archived with --all"""
self.msg = make_message(hours_old=24*179, wantobj=True)
- assert(archivemail.should_archive(self.msg))
+ assert archivemail.should_archive(self.msg)
def testOld(self):
"""old messages should be archived with --all"""
self.msg = make_message(hours_old=24*181, wantobj=True)
- assert(archivemail.should_archive(self.msg))
+ assert archivemail.should_archive(self.msg)
def tearDown(self):
archivemail.options.quiet = 0
@@ -746,12 +746,12 @@ class TestArchiveMboxPreserveUnread(unittest.TestCase):
def testOldRead(self):
"""old read messages should be archived with --preserve-unread"""
self.msg["Status"] = "RO"
- assert(archivemail.should_archive(self.msg))
+ assert archivemail.should_archive(self.msg)
def testOldUnread(self):
"""old unread messages should not be archived with --preserve-unread"""
self.msg["Status"] = "O"
- assert(not archivemail.should_archive(self.msg))
+ assert not archivemail.should_archive(self.msg)
def tearDown(self):
archivemail.options.quiet = 0
@@ -873,20 +873,20 @@ class TestArchiveMboxFlagged(unittest.TestCase):
"""by default, old flagged messages should not be archived"""
msg = make_message(default_headers={"X-Status": "F"},
hours_old=24*181, wantobj=True)
- assert(not archivemail.should_archive(msg))
+ assert not archivemail.should_archive(msg)
def testIncludeFlaggedNew(self):
"""new flagged messages should not be archived with include_flagged"""
msg = make_message(default_headers={"X-Status": "F"},
hours_old=24*179, wantobj=True)
- assert(not archivemail.should_archive(msg))
+ assert not archivemail.should_archive(msg)
def testIncludeFlaggedOld(self):
"""old flagged messages should be archived with include_flagged"""
archivemail.options.include_flagged = 1
msg = make_message(default_headers={"X-Status": "F"},
hours_old=24*181, wantobj=True)
- assert(archivemail.should_archive(msg))
+ assert archivemail.should_archive(msg)
def tearDown(self):
archivemail.options.include_flagged = 0
@@ -964,12 +964,12 @@ class TestArchiveSize(unittest.TestCase):
def testSmaller(self):
"""giving a size argument smaller than the message"""
archivemail.options.min_size = self.msg_size - 1
- assert(archivemail.should_archive(self.msg))
+ assert archivemail.should_archive(self.msg)
def testBigger(self):
"""giving a size argument bigger than the message"""
archivemail.options.min_size = self.msg_size + 1
- assert(not archivemail.should_archive(self.msg))
+ assert not archivemail.should_archive(self.msg)
def tearDown(self):
archivemail.options.quiet = 0
@@ -989,8 +989,8 @@ class TestArchiveMboxMode(TestCaseInTempdir):
os.chmod(self.mbox_name, mode)
archivemail.archive(self.mbox_name)
archive_name = self.mbox_name + "_archive.gz"
- assert(os.path.exists(self.mbox_name))
- assert(os.path.exists(archive_name))
+ assert os.path.exists(self.mbox_name)
+ assert os.path.exists(archive_name)
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
self.assertEqual(mode, stat.S_IMODE(new_mode))
archive_mode = os.stat(archive_name)[stat.ST_MODE]
@@ -1003,8 +1003,8 @@ class TestArchiveMboxMode(TestCaseInTempdir):
os.chmod(self.mbox_name, mode)
archivemail.archive(self.mbox_name)
archive_name = self.mbox_name + "_archive.gz"
- assert(not os.path.exists(archive_name))
- assert(os.path.exists(self.mbox_name))
+ assert not os.path.exists(archive_name)
+ assert os.path.exists(self.mbox_name)
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
self.assertEqual(mode, stat.S_IMODE(new_mode))
os.remove(self.mbox_name)
@@ -1049,8 +1049,8 @@ def make_message(body=None, default_headers={}, hours_old=None, wantobj=False):
def append_file(source, dest):
"""appends the file named 'source' to the file named 'dest'"""
- assert(os.path.isfile(source))
- assert(os.path.isfile(dest))
+ assert os.path.isfile(source)
+ assert os.path.isfile(dest)
read = open(source, "r")
write = open(dest, "a+")
shutil.copyfileobj(read,write)
@@ -1059,7 +1059,7 @@ def append_file(source, dest):
def make_mbox(body=None, headers=None, hours_old=0, messages=1):
- assert(tempfile.tempdir)
+ assert tempfile.tempdir
fd, name = tempfile.mkstemp()
file = os.fdopen(fd, "w")
for count in range(messages):
@@ -1097,18 +1097,18 @@ def make_old_archive(mailbox_name):
def assertEqualContent(firstfile, secondfile, zipfirst=False):
"""Verify that the two files exist and have identical content. If zipfirst
is True, assume that firstfile is gzip-compressed."""
- assert(os.path.exists(firstfile))
- assert(os.path.exists(secondfile))
+ assert os.path.exists(firstfile)
+ assert os.path.exists(secondfile)
if zipfirst:
try:
fp1 = gzip.GzipFile(firstfile, "r")
fp2 = open(secondfile, "r")
- assert(cmp_fileobj(fp1, fp2))
+ assert cmp_fileobj(fp1, fp2)
finally:
fp1.close()
fp2.close()
else:
- assert(filecmp.cmp(firstfile, secondfile, shallow=0))
+ assert filecmp.cmp(firstfile, secondfile, shallow=0)
def cmp_fileobj(fp1, fp2):
"""Return if reading the fileobjects yields identical content."""