From 06d158c576feaa46f27f23f7e15eecbe4115a65d Mon Sep 17 00:00:00 2001 From: Nikolaus Schulz Date: Fri, 20 Oct 2006 22:50:38 +0000 Subject: Fixed unittest TestMboxExclusiveLock: on some systems flock(2) is emulated with fcntl(2) calls. fcntl locks don't support interlocking within a process, so we need to fork() to correctly test them. --- test_archivemail.py | 43 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 38 insertions(+), 5 deletions(-) diff --git a/test_archivemail.py b/test_archivemail.py index 78c5c91..ef46949 100755 --- a/test_archivemail.py +++ b/test_archivemail.py @@ -158,14 +158,47 @@ class TestMboxExclusiveLock(unittest.TestCase): def testExclusiveLock(self): """exclusive_lock/unlock should create/delete an advisory lock""" - self.mbox.exclusive_lock() + + # We're using flock(2) locks; these aren't completely portable, and on + # some systems (e.g. Solaris) they may be emulated with fcntl(2) locks, + # which have pretty different semantics. We could test real flock + # locks within this process, but that doesn't work for fcntl locks. + # + # The following code snippet heavily lends from the Python 2.5 mailbox + # unittest. + # BEGIN robbery: + + # Fork off a subprocess that will lock the file for 2 seconds, + # unlock it, and then exit. + if not hasattr(os, 'fork'): + return + pid = os.fork() + if pid == 0: + # In the child, lock the mailbox. + self.mbox.exclusive_lock() + time.sleep(2) + self.mbox.exclusive_unlock() + os._exit(0) + + # In the parent, sleep a bit to give the child time to acquire + # the lock. + time.sleep(0.5) + # The parent's file self.mbox.mbox_file shares flock locks with the + # duplicated FD in the child; reopen it so we get a different file + # table entry. file = open(self.mbox_name, "r+") lock_nb = fcntl.LOCK_EX | fcntl.LOCK_NB - self.assertRaises(IOError, fcntl.flock, file.fileno(), lock_nb) + fd = file.fileno() + try: + self.assertRaises(IOError, fcntl.flock, fd, lock_nb) - self.mbox.exclusive_unlock() - fcntl.flock(file.fileno(), lock_nb) - fcntl.flock(file.fileno(), fcntl.LOCK_UN) + finally: + # Wait for child to exit. Locking should now succeed. + exited_pid, status = os.waitpid(pid, 0) + + fcntl.flock(fd, lock_nb) + fcntl.flock(fd, fcntl.LOCK_UN) + # END robbery def tearDown(self): if os.path.exists(self.mbox_name): -- cgit v1.2.3