From 4b59cc343e88cff459bc663bbc85e3c701fdd415 Mon Sep 17 00:00:00 2001 From: VG Date: Mon, 11 Apr 2016 13:21:10 +0200 Subject: Auto-commit on 6d1dbe8495b5fafbc5f50d80268d0ca5b7b097be --- src/Makefile | 10 ++++ src/command.sh | 2 + src/limitations.notes | 4 ++ src/queue.txt | 26 ++++++++++ src/setup.cfg | 4 ++ src/wqueue.py | 66 +++++++++++++++++++++++++ src/wqueue2.py | 130 ++++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 242 insertions(+) create mode 100644 src/Makefile create mode 100755 src/command.sh create mode 100644 src/limitations.notes create mode 100644 src/queue.txt create mode 100644 src/setup.cfg create mode 100755 src/wqueue.py create mode 100755 src/wqueue2.py (limited to 'src') diff --git a/src/Makefile b/src/Makefile new file mode 100644 index 0000000..6929f5e --- /dev/null +++ b/src/Makefile @@ -0,0 +1,10 @@ +# static python static checker +# python3-flake8 (pyflakes + python3-pep8) +# pylint3 + +#@python3.5 -m flake8 wqueue2.py => does not support async keyword for now +q= + +test: + $(Q)python3.5 -m pep8 --first . + $(Q)PYTHONASYNCIODEBUG=1 PYTHONUNBUFFERED=1 python3.5 -Wdefault ./wqueue2.py diff --git a/src/command.sh b/src/command.sh new file mode 100755 index 0000000..c942ef4 --- /dev/null +++ b/src/command.sh @@ -0,0 +1,2 @@ +#!/bin/sh +exec sleep 10 diff --git a/src/limitations.notes b/src/limitations.notes new file mode 100644 index 0000000..d723c07 --- /dev/null +++ b/src/limitations.notes @@ -0,0 +1,4 @@ +Queue is not saved between invocations of the queue. The queue is a living +queue only. It can be edited (for waiting items only), thus if you want to +save a queue, you can always copy the list and restore it with another edit, +but it is a hack. diff --git a/src/queue.txt b/src/queue.txt new file mode 100644 index 0000000..0edb856 --- /dev/null +++ b/src/queue.txt @@ -0,0 +1,26 @@ +a +b +c +d +e +f +g +h +i +j +k +l +m +n +o +p +q +r +s +t +u +v +w +x +y +z diff --git a/src/setup.cfg b/src/setup.cfg new file mode 100644 index 0000000..2ebde94 --- /dev/null +++ b/src/setup.cfg @@ -0,0 +1,4 @@ +[pep8] +ignore = E121,E123,E126,E226,E24,E704,E265 +show-pep8 = true +exclude = wqueue.py diff --git a/src/wqueue.py b/src/wqueue.py new file mode 100755 index 0000000..7120bf4 --- /dev/null +++ b/src/wqueue.py @@ -0,0 +1,66 @@ +#!/usr/bin/python3 + +import fcntl +import time +import subprocess +import os +import asyncio + +QUEUE_FILE = 'queue.txt' +COMMAND = './command.sh' + + +# loop = asyncio.get_event_loop() + + +# class JobHandler(asyncio.SubprocessProtocol): +# def process_exited(self): +# print() + + +def do_job(command=None, arg=None): + assert(command) + assert(arg) + print('arg:', arg) + print('doing operation with arg') + #ret = subprocess.check_call([command, arg]) + loop.suprocess_exec([command, arg], + stdin=subprocess.DEVNULL, + stdout=sys.stdout, + stderr=sys.stderr) + print('ret:', ret) + return ret == 0 + +def read_next_job_arg(): + with open(QUEUE_FILE, 'r') as f: + line = f.readline() + return line.strip() + +def pop_job_arg(): + queue = QUEUE_FILE + queue_tmp = queue + '.tmp' + linecount = 0 + with open(queue, 'r') as fi, open(queue_tmp, 'w') as fo: + for line in fi: + linecount += 1 + if linecount == 1: + continue + fo.write(line) + os.rename(queue_tmp, queue) + +def check_queue(): + #loop = asyncio.get_event_loop() + #loop.run_forever() + + +#with open('queue.txt', 'w') as f: +# fcntl.lockf(f, fcntl.LOCK_EX) +# time.sleep(9999) + +#while True: +# job_arg = read_next_job_arg() +# if do_job(command=COMMAND, arg=job_arg): +# pop_job_arg() + +#loop = asyncio.get_event_loop() +loop.run_forever() diff --git a/src/wqueue2.py b/src/wqueue2.py new file mode 100755 index 0000000..4c2cd4c --- /dev/null +++ b/src/wqueue2.py @@ -0,0 +1,130 @@ +""" +Manage a queue of files for processing them sequentialy. +""" + + +import asyncio +import os +import functools +import shutils +import collections + + +u8open = functools.partial(open, encoding='utf8') + + +class PersistentQueue3: + + def __init__(self, filename=None): + self.queue = collections.deque() + self.current_job = collections.deque() + self.event = asyncio.Event() + self.filename = filename + + def push_job(self, item): + self.queue.append(item) + self.event.set() + + async def get_job(self, item): + if len(self.current_job): + return self.current_job[0] + if len(self.queue): + job_item = self.queue.pop() + else: + self.event.clear() + await self.event.wait() + job_item = self.queue.pop() + self.current_job.append(job_item)) + return self.current_job[0] + + async def mark_job_done(self): + assert len(self.current_job) + self.current_job.pop() + + def dump_queue(self, filename=None): + ''' + dump and clean the queue in order for it to be edited. + + note: current file is not saved since it would be inconsistent if + the queue is reloaded but file has finished being processed + between. + + Thus a file being processed is considered either done if done + between dump and reload, or failed if the queue is shut down + before its end. + + The view here is we do not want to redo the work on a file, if the + wanted result is a success: do not close the queue. + ''' + filename = filename if filename else self.filename + with u8open(filename, 'w') as f: + for item in self.queue: + print(item, file=f) + self.queue.clear() + + def reload_from_filename(self, filename=None): + ''' + reloads a queue after editing it externaly. + ''' + filename = filename if filename else self.filename + with u8open(filename) as f: + for item in f: + self.queue.append(item) + self.event.set() + + +async def do_the_job(job=None): + assert job + print('='*50) + print('Current job:', item) + do_the_job_here with await + print('Current job is done.') + + +async def manage_client(reader, writer): + print('='*40) + print('manage_client created') + print('called on connection only ?') + await asyncio.sleep(1) + print('after sleep1 in server()') + while True: + line = await reader.readline() + if not line: + break + print('line', line) + writer.write(line) + writer.close() + print('end of manage_client') + + +async def jobs_runner(): + jobs = PersistentQueue3(filename='queue.txt') + queue = [] + while True: + job = await jobs.get_job() + await do_the_job(job=job) + jobs.mark_job_done() + + +def main(): + print('getting event loop') + loop = asyncio.get_event_loop() + print('got event loop') + loop.call_soon(jobs_runner) + try: + os.unlink('server_sock') + print('creating coro...') + coro = asyncio.start_unix_server(manage_client, path='server_sock') + print('coro created') + loop.run_until_complete(coro) + print('coro returned') + # loop.run_until_complete(test()) + loop.run_forever() + except KeyboardInterrupt: + pass + print('loop closed') + loop.close() + + +if __name__ == '__main__': + main() -- cgit v1.2.3