diff options
Diffstat (limited to 'tests/wqueue2.py')
-rwxr-xr-x | tests/wqueue2.py | 43 |
1 files changed, 42 insertions, 1 deletions
diff --git a/tests/wqueue2.py b/tests/wqueue2.py index a78f315..4910b2f 100755 --- a/tests/wqueue2.py +++ b/tests/wqueue2.py @@ -1,7 +1,46 @@ -#!/usr/bin/python3.5 +""" +Manage a queue of files for processing them sequentialy. +""" + import asyncio import os +import functools +import shutils + + +u8open = functools.partial(open, encoding='utf8') + + +class PersistentQueue: + + def __init__(self, filename): + self.filename = filename + self.lock = asyncio.Lock() + + + def append(self, item): + async with lock: + with u8open(self.filename, 'a') as f: + await f.write(item) + await f.write('\n') + + async def pop(self): + # TODO: when wanting to pop an empty queue the pop should wait + # asynchronously for a new filename to arrive + error + firstline = None + async with lock: + with u8open(filename, 'r') as f, \ + u8open(filename + '.tmp', 'w') as fo: + async for line in f: + if not firstline: + firstline = line + else: + await fo.write(line) + shutils.mv(filename + '.tmp', filename) + return firstline + async def test(): print('sleeping...') @@ -26,8 +65,10 @@ async def manage_client(reader, writer): async def manage_jobs(): + pq = PersistentQueue('queue.txt') queue = [] while True: + item = await pq.pop() async with open('queue.txt') as f: line = await next(f) if not line: |