summaryrefslogtreecommitdiffstats
path: root/src/wqueue2.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/wqueue2.py')
-rwxr-xr-xsrc/wqueue2.py130
1 files changed, 130 insertions, 0 deletions
diff --git a/src/wqueue2.py b/src/wqueue2.py
new file mode 100755
index 0000000..4c2cd4c
--- /dev/null
+++ b/src/wqueue2.py
@@ -0,0 +1,130 @@
+"""
+Manage a queue of files for processing them sequentialy.
+"""
+
+
+import asyncio
+import os
+import functools
+import shutils
+import collections
+
+
+u8open = functools.partial(open, encoding='utf8')
+
+
+class PersistentQueue3:
+
+ def __init__(self, filename=None):
+ self.queue = collections.deque()
+ self.current_job = collections.deque()
+ self.event = asyncio.Event()
+ self.filename = filename
+
+ def push_job(self, item):
+ self.queue.append(item)
+ self.event.set()
+
+ async def get_job(self, item):
+ if len(self.current_job):
+ return self.current_job[0]
+ if len(self.queue):
+ job_item = self.queue.pop()
+ else:
+ self.event.clear()
+ await self.event.wait()
+ job_item = self.queue.pop()
+ self.current_job.append(job_item))
+ return self.current_job[0]
+
+ async def mark_job_done(self):
+ assert len(self.current_job)
+ self.current_job.pop()
+
+ def dump_queue(self, filename=None):
+ '''
+ dump and clean the queue in order for it to be edited.
+
+ note: current file is not saved since it would be inconsistent if
+ the queue is reloaded but file has finished being processed
+ between.
+
+ Thus a file being processed is considered either done if done
+ between dump and reload, or failed if the queue is shut down
+ before its end.
+
+ The view here is we do not want to redo the work on a file, if the
+ wanted result is a success: do not close the queue.
+ '''
+ filename = filename if filename else self.filename
+ with u8open(filename, 'w') as f:
+ for item in self.queue:
+ print(item, file=f)
+ self.queue.clear()
+
+ def reload_from_filename(self, filename=None):
+ '''
+ reloads a queue after editing it externaly.
+ '''
+ filename = filename if filename else self.filename
+ with u8open(filename) as f:
+ for item in f:
+ self.queue.append(item)
+ self.event.set()
+
+
+async def do_the_job(job=None):
+ assert job
+ print('='*50)
+ print('Current job:', item)
+ do_the_job_here with await
+ print('Current job is done.')
+
+
+async def manage_client(reader, writer):
+ print('='*40)
+ print('manage_client created')
+ print('called on connection only ?')
+ await asyncio.sleep(1)
+ print('after sleep1 in server()')
+ while True:
+ line = await reader.readline()
+ if not line:
+ break
+ print('line', line)
+ writer.write(line)
+ writer.close()
+ print('end of manage_client')
+
+
+async def jobs_runner():
+ jobs = PersistentQueue3(filename='queue.txt')
+ queue = []
+ while True:
+ job = await jobs.get_job()
+ await do_the_job(job=job)
+ jobs.mark_job_done()
+
+
+def main():
+ print('getting event loop')
+ loop = asyncio.get_event_loop()
+ print('got event loop')
+ loop.call_soon(jobs_runner)
+ try:
+ os.unlink('server_sock')
+ print('creating coro...')
+ coro = asyncio.start_unix_server(manage_client, path='server_sock')
+ print('coro created')
+ loop.run_until_complete(coro)
+ print('coro returned')
+ # loop.run_until_complete(test())
+ loop.run_forever()
+ except KeyboardInterrupt:
+ pass
+ print('loop closed')
+ loop.close()
+
+
+if __name__ == '__main__':
+ main()