diff options
author | VG <vg@devys.org> | 2016-04-11 13:21:10 +0200 |
---|---|---|
committer | VG <vg@devys.org> | 2016-04-11 13:21:10 +0200 |
commit | 4b59cc343e88cff459bc663bbc85e3c701fdd415 (patch) | |
tree | d10be0cd65a3b234b03ad8d54382041db659116c /src/wqueue2.py | |
parent | 1c8b8d2a5faefdf73ec9fccaf24ebbe826f21b97 (diff) | |
download | wqueue-4b59cc343e88cff459bc663bbc85e3c701fdd415.tar.gz wqueue-4b59cc343e88cff459bc663bbc85e3c701fdd415.tar.bz2 wqueue-4b59cc343e88cff459bc663bbc85e3c701fdd415.zip |
Auto-commit on 6d1dbe8495b5fafbc5f50d80268d0ca5b7b097be
Diffstat (limited to 'src/wqueue2.py')
-rwxr-xr-x | src/wqueue2.py | 130 |
1 files changed, 130 insertions, 0 deletions
diff --git a/src/wqueue2.py b/src/wqueue2.py new file mode 100755 index 0000000..4c2cd4c --- /dev/null +++ b/src/wqueue2.py @@ -0,0 +1,130 @@ +""" +Manage a queue of files for processing them sequentialy. +""" + + +import asyncio +import os +import functools +import shutils +import collections + + +u8open = functools.partial(open, encoding='utf8') + + +class PersistentQueue3: + + def __init__(self, filename=None): + self.queue = collections.deque() + self.current_job = collections.deque() + self.event = asyncio.Event() + self.filename = filename + + def push_job(self, item): + self.queue.append(item) + self.event.set() + + async def get_job(self, item): + if len(self.current_job): + return self.current_job[0] + if len(self.queue): + job_item = self.queue.pop() + else: + self.event.clear() + await self.event.wait() + job_item = self.queue.pop() + self.current_job.append(job_item)) + return self.current_job[0] + + async def mark_job_done(self): + assert len(self.current_job) + self.current_job.pop() + + def dump_queue(self, filename=None): + ''' + dump and clean the queue in order for it to be edited. + + note: current file is not saved since it would be inconsistent if + the queue is reloaded but file has finished being processed + between. + + Thus a file being processed is considered either done if done + between dump and reload, or failed if the queue is shut down + before its end. + + The view here is we do not want to redo the work on a file, if the + wanted result is a success: do not close the queue. + ''' + filename = filename if filename else self.filename + with u8open(filename, 'w') as f: + for item in self.queue: + print(item, file=f) + self.queue.clear() + + def reload_from_filename(self, filename=None): + ''' + reloads a queue after editing it externaly. + ''' + filename = filename if filename else self.filename + with u8open(filename) as f: + for item in f: + self.queue.append(item) + self.event.set() + + +async def do_the_job(job=None): + assert job + print('='*50) + print('Current job:', item) + do_the_job_here with await + print('Current job is done.') + + +async def manage_client(reader, writer): + print('='*40) + print('manage_client created') + print('called on connection only ?') + await asyncio.sleep(1) + print('after sleep1 in server()') + while True: + line = await reader.readline() + if not line: + break + print('line', line) + writer.write(line) + writer.close() + print('end of manage_client') + + +async def jobs_runner(): + jobs = PersistentQueue3(filename='queue.txt') + queue = [] + while True: + job = await jobs.get_job() + await do_the_job(job=job) + jobs.mark_job_done() + + +def main(): + print('getting event loop') + loop = asyncio.get_event_loop() + print('got event loop') + loop.call_soon(jobs_runner) + try: + os.unlink('server_sock') + print('creating coro...') + coro = asyncio.start_unix_server(manage_client, path='server_sock') + print('coro created') + loop.run_until_complete(coro) + print('coro returned') + # loop.run_until_complete(test()) + loop.run_forever() + except KeyboardInterrupt: + pass + print('loop closed') + loop.close() + + +if __name__ == '__main__': + main() |