summaryrefslogtreecommitdiffstats
path: root/tests/wqueue2.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/wqueue2.py')
-rwxr-xr-xtests/wqueue2.py43
1 files changed, 42 insertions, 1 deletions
diff --git a/tests/wqueue2.py b/tests/wqueue2.py
index a78f315..4910b2f 100755
--- a/tests/wqueue2.py
+++ b/tests/wqueue2.py
@@ -1,7 +1,46 @@
-#!/usr/bin/python3.5
+"""
+Manage a queue of files for processing them sequentialy.
+"""
+
import asyncio
import os
+import functools
+import shutils
+
+
+u8open = functools.partial(open, encoding='utf8')
+
+
+class PersistentQueue:
+
+ def __init__(self, filename):
+ self.filename = filename
+ self.lock = asyncio.Lock()
+
+
+ def append(self, item):
+ async with lock:
+ with u8open(self.filename, 'a') as f:
+ await f.write(item)
+ await f.write('\n')
+
+ async def pop(self):
+ # TODO: when wanting to pop an empty queue the pop should wait
+ # asynchronously for a new filename to arrive
+ error
+ firstline = None
+ async with lock:
+ with u8open(filename, 'r') as f, \
+ u8open(filename + '.tmp', 'w') as fo:
+ async for line in f:
+ if not firstline:
+ firstline = line
+ else:
+ await fo.write(line)
+ shutils.mv(filename + '.tmp', filename)
+ return firstline
+
async def test():
print('sleeping...')
@@ -26,8 +65,10 @@ async def manage_client(reader, writer):
async def manage_jobs():
+ pq = PersistentQueue('queue.txt')
queue = []
while True:
+ item = await pq.pop()
async with open('queue.txt') as f:
line = await next(f)
if not line: