"""Manage a queue of files for processing them sequentially."""

import asyncio
import functools
import os
import shutil

# Text-mode ``open`` that always uses UTF-8, independent of the locale.
u8open = functools.partial(open, encoding='utf8')

# Default on-disk queue file used by the job helpers below.
QUEUE_FILE = 'queue.txt'
# Path of the Unix domain socket the server listens on.
SOCKET_PATH = 'server_sock'
# Seconds to wait before polling an empty queue again.
POLL_INTERVAL = 1


class PersistentQueue:
    """FIFO queue of text items stored one per line in a file.

    The ``asyncio.Lock`` only serializes coroutines that share this
    instance; it does not protect against other processes (or other
    instances) touching the same file.  File I/O is done with ordinary
    blocking calls, so it briefly blocks the event loop.
    """

    def __init__(self, filename):
        """Remember the backing file path; the file is not opened here."""
        self.filename = filename
        self.lock = asyncio.Lock()

    async def append(self, item):
        """Append *item* as a new last line of the queue file.

        *item* must be a string without embedded newlines, since one line
        is one queue entry.  The file is created if it does not exist.
        """
        async with self.lock:
            with u8open(self.filename, 'a') as f:
                f.write(item)
                f.write('\n')

    async def pop(self):
        """Remove and return the first item of the queue.

        Returns the first line without its trailing newline, or ``None``
        when the queue file is missing or empty.
        """
        # TODO: when popping an empty queue, wait asynchronously for a new
        # filename to arrive instead of returning None.
        tmp_name = self.filename + '.tmp'
        async with self.lock:
            try:
                src = u8open(self.filename, 'r')
            except FileNotFoundError:
                # Nothing was ever appended: treat as an empty queue.
                return None
            with src:
                first_line = src.readline()
                if not first_line:
                    # Empty file: nothing to pop and nothing to rewrite.
                    return None
                # Copy every remaining line to a temporary file ...
                with u8open(tmp_name, 'w') as dst:
                    shutil.copyfileobj(src, dst)
            # ... and swap it in only after both files are closed.
            shutil.move(tmp_name, self.filename)
        return first_line.rstrip('\n')


async def test():
    """Small demo coroutine: print, sleep three seconds, print again."""
    print('sleeping...')
    await asyncio.sleep(3)
    print('slept')


# The name starts with "test", so pytest would try to collect this coroutine
# whenever the module is star-imported into a test file; opt out explicitly.
test.__test__ = False


async def manage_client(reader, writer):
    """Echo every line received from a client back to it.

    Used as the ``client_connected_cb`` of the Unix socket server, so it
    runs once per connection.  The loop ends when the peer closes the
    connection (``readline`` returns an empty bytes object).
    """
    print('=' * 40)
    print('manage_client created')
    print('called on connection only ?')
    await asyncio.sleep(1)
    print('after sleep1 in server()')
    try:
        while True:
            line = await reader.readline()
            if not line:
                break
            print('line', line)
            writer.write(line)
            # Apply back-pressure so a slow client cannot grow the write
            # buffer without bound.
            await writer.drain()
    finally:
        # Release the transport even if reading or writing failed.
        writer.close()
    print('end of manage_client')


async def manage_jobs():
    """Pop items from the persistent queue forever and process them.

    When the queue is empty the coroutine sleeps ``POLL_INTERVAL`` seconds
    before looking again, so it never busy-loops.
    """
    pq = PersistentQueue(QUEUE_FILE)
    while True:
        item = await pq.pop()
        if item is None:
            await asyncio.sleep(POLL_INTERVAL)
            continue
        # TODO: actually run the job described by ``item``; for now the
        # dequeue is only logged.
        print('executing next scheduled job')


def read_next_job_arg(path=QUEUE_FILE):
    """Return the first line of the queue file without removing it.

    Args:
        path: queue file to inspect; defaults to ``QUEUE_FILE``.

    Returns:
        The first line without its trailing newline, or ``None`` when the
        file is missing or empty.
    """
    try:
        with u8open(path, 'r') as f:
            line = f.readline()
    except FileNotFoundError:
        return None
    if not line:
        return None
    return line.rstrip('\n')


def main():
    """Run the echo server on the Unix socket until interrupted.

    Only the socket server is started here; ``manage_jobs`` is not
    scheduled by this function.
    """
    print('getting event loop')
    # Create the loop explicitly: ``asyncio.get_event_loop()`` is deprecated
    # when no loop is running.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    print('got event loop')
    server = None
    try:
        # A stale socket file from a previous run would make bind() fail;
        # on a first run there is simply nothing to remove.
        try:
            os.unlink(SOCKET_PATH)
        except FileNotFoundError:
            pass
        print('creating coro...')
        coro = asyncio.start_unix_server(manage_client, path=SOCKET_PATH)
        print('coro created')
        server = loop.run_until_complete(coro)
        print('coro returned')
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        if server is not None:
            server.close()
        loop.close()
        print('loop closed')


if __name__ == '__main__':
    main()