aboutsummaryrefslogtreecommitdiffstats
path: root/test_archivemail.py
diff options
context:
space:
mode:
Diffstat (limited to 'test_archivemail.py')
-rwxr-xr-xtest_archivemail.py43
1 files changed, 38 insertions, 5 deletions
diff --git a/test_archivemail.py b/test_archivemail.py
index 78c5c91..ef46949 100755
--- a/test_archivemail.py
+++ b/test_archivemail.py
@@ -158,14 +158,47 @@ class TestMboxExclusiveLock(unittest.TestCase):
def testExclusiveLock(self):
"""exclusive_lock/unlock should create/delete an advisory lock"""
- self.mbox.exclusive_lock()
+
+ # We're using flock(2) locks; these aren't completely portable, and on
+ # some systems (e.g. Solaris) they may be emulated with fcntl(2) locks,
+ # which have pretty different semantics. We could test real flock
+ # locks within this process, but that doesn't work for fcntl locks.
+ #
+ # The following code snippet heavily lends from the Python 2.5 mailbox
+ # unittest.
+ # BEGIN robbery:
+
+ # Fork off a subprocess that will lock the file for 2 seconds,
+ # unlock it, and then exit.
+ if not hasattr(os, 'fork'):
+ return
+ pid = os.fork()
+ if pid == 0:
+ # In the child, lock the mailbox.
+ self.mbox.exclusive_lock()
+ time.sleep(2)
+ self.mbox.exclusive_unlock()
+ os._exit(0)
+
+ # In the parent, sleep a bit to give the child time to acquire
+ # the lock.
+ time.sleep(0.5)
+ # The parent's file self.mbox.mbox_file shares flock locks with the
+ # duplicated FD in the child; reopen it so we get a different file
+ # table entry.
file = open(self.mbox_name, "r+")
lock_nb = fcntl.LOCK_EX | fcntl.LOCK_NB
- self.assertRaises(IOError, fcntl.flock, file.fileno(), lock_nb)
+ fd = file.fileno()
+ try:
+ self.assertRaises(IOError, fcntl.flock, fd, lock_nb)
- self.mbox.exclusive_unlock()
- fcntl.flock(file.fileno(), lock_nb)
- fcntl.flock(file.fileno(), fcntl.LOCK_UN)
+ finally:
+ # Wait for child to exit. Locking should now succeed.
+ exited_pid, status = os.waitpid(pid, 0)
+
+ fcntl.flock(fd, lock_nb)
+ fcntl.flock(fd, fcntl.LOCK_UN)
+ # END robbery
def tearDown(self):
if os.path.exists(self.mbox_name):