summaryrefslogtreecommitdiffstats
path: root/tests/wqueue2.py
diff options
context:
space:
mode:
authorVG <vg@devys.org>2016-03-29 17:43:04 +0200
committerVG <vg@devys.org>2016-03-29 17:43:04 +0200
commit309621421193c2eebd6362df6b8850407bc77df9 (patch)
treecdd120b47f958a03746fa5b49d48cec85e28664b /tests/wqueue2.py
parent976602f3d3d6cd22c7357821117a14b67d9edf46 (diff)
downloadwqueue-309621421193c2eebd6362df6b8850407bc77df9.tar.gz
wqueue-309621421193c2eebd6362df6b8850407bc77df9.tar.bz2
wqueue-309621421193c2eebd6362df6b8850407bc77df9.zip
wip
Diffstat (limited to 'tests/wqueue2.py')
-rwxr-xr-xtests/wqueue2.py43
1 files changed, 42 insertions, 1 deletions
diff --git a/tests/wqueue2.py b/tests/wqueue2.py
index a78f315..4910b2f 100755
--- a/tests/wqueue2.py
+++ b/tests/wqueue2.py
@@ -1,7 +1,46 @@
-#!/usr/bin/python3.5
+"""
+Manage a queue of files for processing them sequentialy.
+"""
+
import asyncio
import os
+import functools
+import shutils
+
+
+u8open = functools.partial(open, encoding='utf8')
+
+
+class PersistentQueue:
+
+ def __init__(self, filename):
+ self.filename = filename
+ self.lock = asyncio.Lock()
+
+
+ def append(self, item):
+ async with lock:
+ with u8open(self.filename, 'a') as f:
+ await f.write(item)
+ await f.write('\n')
+
+ async def pop(self):
+ # TODO: when wanting to pop an empty queue the pop should wait
+ # asynchronously for a new filename to arrive
+ error
+ firstline = None
+ async with lock:
+ with u8open(filename, 'r') as f, \
+ u8open(filename + '.tmp', 'w') as fo:
+ async for line in f:
+ if not firstline:
+ firstline = line
+ else:
+ await fo.write(line)
+ shutils.mv(filename + '.tmp', filename)
+ return firstline
+
async def test():
print('sleeping...')
@@ -26,8 +65,10 @@ async def manage_client(reader, writer):
async def manage_jobs():
+ pq = PersistentQueue('queue.txt')
queue = []
while True:
+ item = await pq.pop()
async with open('queue.txt') as f:
line = await next(f)
if not line: