diff options
Diffstat (limited to 'tests/wqueue2.py')
-rwxr-xr-x | tests/wqueue2.py | 130 |
1 files changed, 0 insertions, 130 deletions
diff --git a/tests/wqueue2.py b/tests/wqueue2.py deleted file mode 100755 index 4c2cd4c..0000000 --- a/tests/wqueue2.py +++ /dev/null @@ -1,130 +0,0 @@ -""" -Manage a queue of files for processing them sequentialy. -""" - - -import asyncio -import os -import functools -import shutils -import collections - - -u8open = functools.partial(open, encoding='utf8') - - -class PersistentQueue3: - - def __init__(self, filename=None): - self.queue = collections.deque() - self.current_job = collections.deque() - self.event = asyncio.Event() - self.filename = filename - - def push_job(self, item): - self.queue.append(item) - self.event.set() - - async def get_job(self, item): - if len(self.current_job): - return self.current_job[0] - if len(self.queue): - job_item = self.queue.pop() - else: - self.event.clear() - await self.event.wait() - job_item = self.queue.pop() - self.current_job.append(job_item)) - return self.current_job[0] - - async def mark_job_done(self): - assert len(self.current_job) - self.current_job.pop() - - def dump_queue(self, filename=None): - ''' - dump and clean the queue in order for it to be edited. - - note: current file is not saved since it would be inconsistent if - the queue is reloaded but file has finished being processed - between. - - Thus a file being processed is considered either done if done - between dump and reload, or failed if the queue is shut down - before its end. - - The view here is we do not want to redo the work on a file, if the - wanted result is a success: do not close the queue. - ''' - filename = filename if filename else self.filename - with u8open(filename, 'w') as f: - for item in self.queue: - print(item, file=f) - self.queue.clear() - - def reload_from_filename(self, filename=None): - ''' - reloads a queue after editing it externaly. - ''' - filename = filename if filename else self.filename - with u8open(filename) as f: - for item in f: - self.queue.append(item) - self.event.set() - - -async def do_the_job(job=None): - assert job - print('='*50) - print('Current job:', item) - do_the_job_here with await - print('Current job is done.') - - -async def manage_client(reader, writer): - print('='*40) - print('manage_client created') - print('called on connection only ?') - await asyncio.sleep(1) - print('after sleep1 in server()') - while True: - line = await reader.readline() - if not line: - break - print('line', line) - writer.write(line) - writer.close() - print('end of manage_client') - - -async def jobs_runner(): - jobs = PersistentQueue3(filename='queue.txt') - queue = [] - while True: - job = await jobs.get_job() - await do_the_job(job=job) - jobs.mark_job_done() - - -def main(): - print('getting event loop') - loop = asyncio.get_event_loop() - print('got event loop') - loop.call_soon(jobs_runner) - try: - os.unlink('server_sock') - print('creating coro...') - coro = asyncio.start_unix_server(manage_client, path='server_sock') - print('coro created') - loop.run_until_complete(coro) - print('coro returned') - # loop.run_until_complete(test()) - loop.run_forever() - except KeyboardInterrupt: - pass - print('loop closed') - loop.close() - - -if __name__ == '__main__': - main() |