"""Manage a queue of files for processing them sequentially.

Two queue flavours are provided:

* ``PersistentJobQueue2`` keeps the jobs in memory and can dump them to, or
  reload them from, a text file on request.
* ``PersistentJobQueue`` appends every job to a text file as it arrives and
  can pop the oldest entry straight from that file.

Both store one job per line, so jobs must be strings without newlines.
The module also contains a small unix-socket echo server (``main``).
"""

import asyncio
import collections
import functools
import os
import shutil

# ``test`` is a manual asyncio smoke check; it is left out of star-imports
# so that test runners importing this module do not try to collect it.
__all__ = [
    'QUEUE_FILE',
    'SOCKET_PATH',
    'u8open',
    'PersistentJobQueue2',
    'PersistentJobQueue',
    'manage_client',
    'manage_jobs',
    'read_next_job_arg',
    'main',
]

# File holding the pending jobs, one per line.
QUEUE_FILE = 'queue.txt'
# Path of the unix socket the echo server listens on.
SOCKET_PATH = 'server_sock'
# Seconds to wait before looking at an empty job file again.
_POLL_INTERVAL = 1

u8open = functools.partial(open, encoding='utf8')


class PersistentJobQueue2:
    """In-memory FIFO job queue that can be saved to and restored from a file.

    At most one job is "current" (being processed) at a time; it stays
    current until ``mark_job_done`` is called.
    """

    def __init__(self, filename, loop=None):
        """Create an empty queue.

        Args:
            filename: path associated with this queue; stored but not read
                or written here (``dump_queue`` / ``reload_from_filename``
                take an explicit path).
            loop: accepted for backward compatibility and ignored;
                ``asyncio.Event`` no longer takes a ``loop`` argument.
        """
        self.filename = filename
        self.queue = collections.deque()
        self.current_job = collections.deque()
        self.event = asyncio.Event()

    def push_job(self, item):
        """Append ``item`` to the queue and wake up any waiting ``get_job``."""
        self.queue.append(item)
        self.event.set()

    async def get_job(self, item=None):
        """Return the job being processed, waiting for one if necessary.

        While a job is in progress the same job is returned on every call.
        Otherwise the oldest queued job becomes the current job; if the
        queue is empty this waits until ``push_job`` provides one.

        Args:
            item: unused; kept so existing callers passing it still work.
        """
        if self.current_job:
            return self.current_job[0]
        # Loop rather than wait once: another waiter may have taken the job
        # that triggered the event.
        while not self.queue:
            self.event.clear()
            await self.event.wait()
        self.current_job.append(self.queue.popleft())
        return self.current_job[0]

    async def mark_job_done(self):
        """Forget the current job so that ``get_job`` moves on to the next.

        Raises:
            AssertionError: if no job is currently in progress.
        """
        if not self.current_job:
            # Raised explicitly so the check survives ``python -O``.
            raise AssertionError('no job is currently in progress')
        self.current_job.popleft()

    def dump_queue(self, filename):
        """Write the current job (if any) then the queued jobs to ``filename``.

        One job is written per line; jobs must be ``str``.
        """
        with u8open(filename, 'w') as f:
            f.writelines(
                job + '\n' for job in (*self.current_job, *self.queue))

    def reload_from_filename(self, filename):
        """Replace the queue content with the jobs stored in ``filename``.

        Every non-empty line becomes a queued job, in file order. No job is
        current afterwards, so a job that was in progress when the file was
        dumped is handed out again first.

        Raises:
            OSError: if ``filename`` cannot be opened.
        """
        with u8open(filename) as f:
            items = [line.rstrip('\n') for line in f]
        items = [job for job in items if job]
        self.current_job.clear()
        self.queue = collections.deque(items)
        if items:
            self.event.set()


class PersistentJobQueue:
    """FIFO job queue that mirrors every appended job into a text file.

    ``append``, ``get_job`` and ``mark_job_done`` work on the in-memory
    queue (``append`` also writes to the file); ``pop`` works on the file
    only. File accesses are short blocking calls made under ``self.lock``.
    """

    def __init__(self, filename):
        """Create an empty queue backed by ``filename`` (not opened yet)."""
        self.filename = filename
        self.lock = asyncio.Lock()
        self.queue = collections.deque()
        self.current_job = collections.deque()

    async def append(self, item):
        """Queue ``item`` in memory and append it as a line to the file.

        ``item`` must be a ``str`` without newline.
        """
        async with self.lock:
            self.queue.append(item)
            with u8open(self.filename, 'a') as f:
                f.write(item + '\n')

    async def get_job(self):
        """Return the job being processed, taking the oldest queued one if
        none is in progress.

        Raises:
            IndexError: if no job is in progress and the queue is empty.
        """
        async with self.lock:
            if not self.current_job:
                self.current_job.append(self.queue.popleft())
            return self.current_job[0]

    async def mark_job_done(self):
        """Forget the current job.

        Raises:
            IndexError: if no job is currently in progress.
        """
        async with self.lock:
            self.current_job.popleft()

    async def pop(self):
        """Remove the first line of the backing file and return it.

        The remaining lines are copied to ``<filename>.tmp`` which then
        replaces the original file. The in-memory queue used by
        ``get_job`` is not modified.

        Returns:
            The first line without its trailing newline, or ``None`` when
            the file is missing or empty.
        """
        # TODO: when wanting to pop an empty queue the pop should wait
        # asynchronously for a new filename to arrive
        tmp_name = self.filename + '.tmp'
        async with self.lock:
            try:
                src = u8open(self.filename)
            except FileNotFoundError:
                # Nothing was ever appended: same as an empty queue.
                return None
            with src:
                firstline = src.readline()
                if not firstline:
                    return None
                with u8open(tmp_name, 'w') as dst:
                    shutil.copyfileobj(src, dst)
            # Both files are closed here, so the swap also works on
            # platforms that refuse to replace an open file.
            os.replace(tmp_name, self.filename)
        return firstline.rstrip('\n')


async def test():
    """Sleep three seconds, printing a message before and after."""
    print('sleeping...')
    await asyncio.sleep(3)
    print('slept')


async def manage_client(reader, writer):
    """Echo every line received from a client back to it.

    Used as the ``client_connected_cb`` of the server started in ``main``.
    The loop ends when the client closes its side of the connection; the
    writer is closed even if reading or writing fails.
    """
    print('=' * 40)
    print('manage_client created')
    print('called on connection only ?')
    await asyncio.sleep(1)
    print('after sleep1 in server()')
    try:
        while True:
            line = await reader.readline()
            if not line:
                # EOF: the client closed the connection.
                break
            print('line', line)
            writer.write(line)
            # Wait for the transport buffer to flush so a fast client
            # cannot make it grow without bound.
            await writer.drain()
    finally:
        writer.close()
    print('end of manage_client')


async def manage_jobs():
    """Consume jobs from ``QUEUE_FILE`` forever, one at a time.

    When the file holds no job, waits ``_POLL_INTERVAL`` seconds before
    checking again.
    """
    pq = PersistentJobQueue(QUEUE_FILE)
    while True:
        item = await pq.pop()
        if item is None:
            await asyncio.sleep(_POLL_INTERVAL)
            continue
        # schedule new job
        # TODO: actually run the job described by ``item``; only the log
        # line exists so far.
        print('executing next scheduled job')
        # Give other tasks a chance to run between two jobs.
        await asyncio.sleep(0)


def read_next_job_arg():
    """Return the first line of ``QUEUE_FILE`` without removing it.

    Returns:
        The line without its trailing newline, or ``None`` when the file
        is missing or empty.
    """
    try:
        with u8open(QUEUE_FILE) as f:
            line = f.readline()
    except FileNotFoundError:
        return None
    return line.rstrip('\n') if line else None


def main():
    """Run the echo server on ``SOCKET_PATH`` until interrupted (Ctrl-C)."""
    print('getting event loop')
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated
    # when no loop is running.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    print('got event loop')
    # loop.call_soon(test)
    server = None
    try:
        try:
            # Remove a stale socket left behind by a previous run.
            os.unlink(SOCKET_PATH)
        except FileNotFoundError:
            pass
        print('creating coro...')
        coro = asyncio.start_unix_server(manage_client, path=SOCKET_PATH)
        print('coro created')
        server = loop.run_until_complete(coro)
        print('coro returned')
        # loop.run_until_complete(test())
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        if server is not None:
            server.close()
        print('loop closed')
        loop.close()


if __name__ == '__main__':
    main()