diff options
author | VG <vg@devys.org> | 2016-04-05 11:26:37 +0200 |
---|---|---|
committer | VG <vg@devys.org> | 2016-04-05 11:26:37 +0200 |
commit | 587b76de425fd84bdc3fa319af79dbc2d4245f05 (patch) | |
tree | 4f97a7b38e7bfc3201e97689a50bca9548072731 | |
parent | 309621421193c2eebd6362df6b8850407bc77df9 (diff) | |
download | wqueue-587b76de425fd84bdc3fa319af79dbc2d4245f05.tar.gz wqueue-587b76de425fd84bdc3fa319af79dbc2d4245f05.tar.bz2 wqueue-587b76de425fd84bdc3fa319af79dbc2d4245f05.zip |
wip
-rwxr-xr-x | tests/wqueue2.py | 51 |
1 files changed, 50 insertions, 1 deletions
diff --git a/tests/wqueue2.py b/tests/wqueue2.py index 4910b2f..5858fcd 100755 --- a/tests/wqueue2.py +++ b/tests/wqueue2.py @@ -12,7 +12,46 @@ import shutils u8open = functools.partial(open, encoding='utf8') -class PersistentQueue: +class PersistentJobQueue2: + + def __init__(self, filename, loop=None): + self.queue = deque.deque() + self.current_job = deque.deque() + self.event = asyncio.Event(loop=loop) + + def push_job(self, item): + self.queue.push_right(item) + self.event.set() + + async def get_job(self, item): + if not self.current_job.empty(): + return self.current_job[0] + if not self.queue.empty(): + job_item = self.queue.pop() + else: + self.event.clear() + await self.event.wait() + job_item = self.queue.pop() + self.current_job.push_right(self.queue.pop()) + return self.current_job[0] + + async def mark_job_done(self): + assert not self.current_job.empty() + self.current_job.pop() + + + def dump_queue(self, filename): + with u8open(filename, 'w') as f: + f.write(self.current_job.dump()) + f.write(self.queue.dump()) + + def reload_from_filename(self, filename): + with u8open(filename) as f: + items = f.read().parse() + if + + +class PersistentJobQueue: def __init__(self, filename): self.filename = filename @@ -21,10 +60,20 @@ class PersistentQueue: def append(self, item): async with lock: + self.queue.append(item) with u8open(self.filename, 'a') as f: await f.write(item) await f.write('\n') + async def get_job(self): + async with lock: + self.current_job.append(self.queue.pop()) + return self.current_job[0] + + async def mark_job_done(self): + async with lock: + self.current_job.pop() + async def pop(self): # TODO: when wanting to pop an empty queue the pop should wait # asynchronously for a new filename to arrive |