From 587b76de425fd84bdc3fa319af79dbc2d4245f05 Mon Sep 17 00:00:00 2001 From: VG Date: Tue, 5 Apr 2016 11:26:37 +0200 Subject: wip --- tests/wqueue2.py | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/tests/wqueue2.py b/tests/wqueue2.py index 4910b2f..5858fcd 100755 --- a/tests/wqueue2.py +++ b/tests/wqueue2.py @@ -12,7 +12,46 @@ import shutils u8open = functools.partial(open, encoding='utf8') -class PersistentQueue: +class PersistentJobQueue2: + + def __init__(self, filename, loop=None): + self.queue = deque.deque() + self.current_job = deque.deque() + self.event = asyncio.Event(loop=loop) + + def push_job(self, item): + self.queue.push_right(item) + self.event.set() + + async def get_job(self, item): + if not self.current_job.empty(): + return self.current_job[0] + if not self.queue.empty(): + job_item = self.queue.pop() + else: + self.event.clear() + await self.event.wait() + job_item = self.queue.pop() + self.current_job.push_right(self.queue.pop()) + return self.current_job[0] + + async def mark_job_done(self): + assert not self.current_job.empty() + self.current_job.pop() + + + def dump_queue(self, filename): + with u8open(filename, 'w') as f: + f.write(self.current_job.dump()) + f.write(self.queue.dump()) + + def reload_from_filename(self, filename): + with u8open(filename) as f: + items = f.read().parse() + if + + +class PersistentJobQueue: def __init__(self, filename): self.filename = filename @@ -21,10 +60,20 @@ class PersistentQueue: def append(self, item): async with lock: + self.queue.append(item) with u8open(self.filename, 'a') as f: await f.write(item) await f.write('\n') + async def get_job(self): + async with lock: + self.current_job.append(self.queue.pop()) + return self.current_job[0] + + async def mark_job_done(self): + async with lock: + self.current_job.pop() + async def pop(self): # TODO: when wanting to pop an empty queue the pop should wait # asynchronously for a new filename to arrive -- cgit v1.2.3