summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xtests/wqueue2.py51
1 files changed, 50 insertions, 1 deletions
diff --git a/tests/wqueue2.py b/tests/wqueue2.py
index 4910b2f..5858fcd 100755
--- a/tests/wqueue2.py
+++ b/tests/wqueue2.py
@@ -12,7 +12,46 @@ import shutils
u8open = functools.partial(open, encoding='utf8')
-class PersistentQueue:
+class PersistentJobQueue2:
+
+ def __init__(self, filename, loop=None):
+ self.queue = deque.deque()
+ self.current_job = deque.deque()
+ self.event = asyncio.Event(loop=loop)
+
+ def push_job(self, item):
+ self.queue.push_right(item)
+ self.event.set()
+
+ async def get_job(self, item):
+ if not self.current_job.empty():
+ return self.current_job[0]
+ if not self.queue.empty():
+ job_item = self.queue.pop()
+ else:
+ self.event.clear()
+ await self.event.wait()
+ job_item = self.queue.pop()
+ self.current_job.push_right(self.queue.pop())
+ return self.current_job[0]
+
+ async def mark_job_done(self):
+ assert not self.current_job.empty()
+ self.current_job.pop()
+
+
+ def dump_queue(self, filename):
+ with u8open(filename, 'w') as f:
+ f.write(self.current_job.dump())
+ f.write(self.queue.dump())
+
+ def reload_from_filename(self, filename):
+ with u8open(filename) as f:
+ items = f.read().parse()
+ if
+
+
+class PersistentJobQueue:
def __init__(self, filename):
self.filename = filename
@@ -21,10 +60,20 @@ class PersistentQueue:
def append(self, item):
async with lock:
+ self.queue.append(item)
with u8open(self.filename, 'a') as f:
await f.write(item)
await f.write('\n')
+ async def get_job(self):
+ async with lock:
+ self.current_job.append(self.queue.pop())
+ return self.current_job[0]
+
+ async def mark_job_done(self):
+ async with lock:
+ self.current_job.pop()
+
async def pop(self):
# TODO: when wanting to pop an empty queue the pop should wait
# asynchronously for a new filename to arrive