From 4b59cc343e88cff459bc663bbc85e3c701fdd415 Mon Sep 17 00:00:00 2001 From: VG Date: Mon, 11 Apr 2016 13:21:10 +0200 Subject: Auto-commit on 6d1dbe8495b5fafbc5f50d80268d0ca5b7b097be --- src/Makefile | 10 ++++ src/command.sh | 2 + src/limitations.notes | 4 ++ src/queue.txt | 26 ++++++++++ src/setup.cfg | 4 ++ src/wqueue.py | 66 ++++++++++++++++++++++++ src/wqueue2.py | 130 ++++++++++++++++++++++++++++++++++++++++++++++++ tests/Makefile | 10 ---- tests/command.sh | 2 - tests/limitations.notes | 4 -- tests/queue.txt | 26 ---------- tests/setup.cfg | 4 -- tests/wqueue.py | 66 ------------------------ tests/wqueue2.py | 130 ------------------------------------------------ 14 files changed, 242 insertions(+), 242 deletions(-) create mode 100644 src/Makefile create mode 100755 src/command.sh create mode 100644 src/limitations.notes create mode 100644 src/queue.txt create mode 100644 src/setup.cfg create mode 100755 src/wqueue.py create mode 100755 src/wqueue2.py delete mode 100644 tests/Makefile delete mode 100755 tests/command.sh delete mode 100644 tests/limitations.notes delete mode 100644 tests/queue.txt delete mode 100644 tests/setup.cfg delete mode 100755 tests/wqueue.py delete mode 100755 tests/wqueue2.py diff --git a/src/Makefile b/src/Makefile new file mode 100644 index 0000000..6929f5e --- /dev/null +++ b/src/Makefile @@ -0,0 +1,10 @@ +# static python static checker +# python3-flake8 (pyflakes + python3-pep8) +# pylint3 + +#@python3.5 -m flake8 wqueue2.py => does not support async keyword for now +q= + +test: + $(Q)python3.5 -m pep8 --first . + $(Q)PYTHONASYNCIODEBUG=1 PYTHONUNBUFFERED=1 python3.5 -Wdefault ./wqueue2.py diff --git a/src/command.sh b/src/command.sh new file mode 100755 index 0000000..c942ef4 --- /dev/null +++ b/src/command.sh @@ -0,0 +1,2 @@ +#!/bin/sh +exec sleep 10 diff --git a/src/limitations.notes b/src/limitations.notes new file mode 100644 index 0000000..d723c07 --- /dev/null +++ b/src/limitations.notes @@ -0,0 +1,4 @@ +Queue is not saved between invocations of the queue. The queue is a living +queue only. It can be edited (for waiting items only), thus if you want to +save a queue, you can always copy the list and restore it with another edit, +but it is a hack. diff --git a/src/queue.txt b/src/queue.txt new file mode 100644 index 0000000..0edb856 --- /dev/null +++ b/src/queue.txt @@ -0,0 +1,26 @@ +a +b +c +d +e +f +g +h +i +j +k +l +m +n +o +p +q +r +s +t +u +v +w +x +y +z diff --git a/src/setup.cfg b/src/setup.cfg new file mode 100644 index 0000000..2ebde94 --- /dev/null +++ b/src/setup.cfg @@ -0,0 +1,4 @@ +[pep8] +ignore = E121,E123,E126,E226,E24,E704,E265 +show-pep8 = true +exclude = wqueue.py diff --git a/src/wqueue.py b/src/wqueue.py new file mode 100755 index 0000000..7120bf4 --- /dev/null +++ b/src/wqueue.py @@ -0,0 +1,66 @@ +#!/usr/bin/python3 + +import fcntl +import time +import subprocess +import os +import asyncio + +QUEUE_FILE = 'queue.txt' +COMMAND = './command.sh' + + +# loop = asyncio.get_event_loop() + + +# class JobHandler(asyncio.SubprocessProtocol): +# def process_exited(self): +# print() + + +def do_job(command=None, arg=None): + assert(command) + assert(arg) + print('arg:', arg) + print('doing operation with arg') + #ret = subprocess.check_call([command, arg]) + loop.suprocess_exec([command, arg], + stdin=subprocess.DEVNULL, + stdout=sys.stdout, + stderr=sys.stderr) + print('ret:', ret) + return ret == 0 + +def read_next_job_arg(): + with open(QUEUE_FILE, 'r') as f: + line = f.readline() + return line.strip() + +def pop_job_arg(): + queue = QUEUE_FILE + queue_tmp = queue + '.tmp' + linecount = 0 + with open(queue, 'r') as fi, open(queue_tmp, 'w') as fo: + for line in fi: + linecount += 1 + if linecount == 1: + continue + fo.write(line) + os.rename(queue_tmp, queue) + +def check_queue(): + #loop = asyncio.get_event_loop() + #loop.run_forever() + + +#with open('queue.txt', 'w') as f: +# fcntl.lockf(f, fcntl.LOCK_EX) +# time.sleep(9999) + +#while True: +# job_arg = read_next_job_arg() +# if do_job(command=COMMAND, arg=job_arg): +# pop_job_arg() + +#loop = asyncio.get_event_loop() +loop.run_forever() diff --git a/src/wqueue2.py b/src/wqueue2.py new file mode 100755 index 0000000..4c2cd4c --- /dev/null +++ b/src/wqueue2.py @@ -0,0 +1,130 @@ +""" +Manage a queue of files for processing them sequentialy. +""" + + +import asyncio +import os +import functools +import shutils +import collections + + +u8open = functools.partial(open, encoding='utf8') + + +class PersistentQueue3: + + def __init__(self, filename=None): + self.queue = collections.deque() + self.current_job = collections.deque() + self.event = asyncio.Event() + self.filename = filename + + def push_job(self, item): + self.queue.append(item) + self.event.set() + + async def get_job(self, item): + if len(self.current_job): + return self.current_job[0] + if len(self.queue): + job_item = self.queue.pop() + else: + self.event.clear() + await self.event.wait() + job_item = self.queue.pop() + self.current_job.append(job_item)) + return self.current_job[0] + + async def mark_job_done(self): + assert len(self.current_job) + self.current_job.pop() + + def dump_queue(self, filename=None): + ''' + dump and clean the queue in order for it to be edited. + + note: current file is not saved since it would be inconsistent if + the queue is reloaded but file has finished being processed + between. + + Thus a file being processed is considered either done if done + between dump and reload, or failed if the queue is shut down + before its end. + + The view here is we do not want to redo the work on a file, if the + wanted result is a success: do not close the queue. + ''' + filename = filename if filename else self.filename + with u8open(filename, 'w') as f: + for item in self.queue: + print(item, file=f) + self.queue.clear() + + def reload_from_filename(self, filename=None): + ''' + reloads a queue after editing it externaly. + ''' + filename = filename if filename else self.filename + with u8open(filename) as f: + for item in f: + self.queue.append(item) + self.event.set() + + +async def do_the_job(job=None): + assert job + print('='*50) + print('Current job:', item) + do_the_job_here with await + print('Current job is done.') + + +async def manage_client(reader, writer): + print('='*40) + print('manage_client created') + print('called on connection only ?') + await asyncio.sleep(1) + print('after sleep1 in server()') + while True: + line = await reader.readline() + if not line: + break + print('line', line) + writer.write(line) + writer.close() + print('end of manage_client') + + +async def jobs_runner(): + jobs = PersistentQueue3(filename='queue.txt') + queue = [] + while True: + job = await jobs.get_job() + await do_the_job(job=job) + jobs.mark_job_done() + + +def main(): + print('getting event loop') + loop = asyncio.get_event_loop() + print('got event loop') + loop.call_soon(jobs_runner) + try: + os.unlink('server_sock') + print('creating coro...') + coro = asyncio.start_unix_server(manage_client, path='server_sock') + print('coro created') + loop.run_until_complete(coro) + print('coro returned') + # loop.run_until_complete(test()) + loop.run_forever() + except KeyboardInterrupt: + pass + print('loop closed') + loop.close() + + +if __name__ == '__main__': + main() diff --git a/tests/Makefile b/tests/Makefile deleted file mode 100644 index 6929f5e..0000000 --- a/tests/Makefile +++ /dev/null @@ -1,10 +0,0 @@ -# static python static checker -# python3-flake8 (pyflakes + python3-pep8) -# pylint3 - -#@python3.5 -m flake8 wqueue2.py => does not support async keyword for now -q= - -test: - $(Q)python3.5 -m pep8 --first . - $(Q)PYTHONASYNCIODEBUG=1 PYTHONUNBUFFERED=1 python3.5 -Wdefault ./wqueue2.py diff --git a/tests/command.sh b/tests/command.sh deleted file mode 100755 index c942ef4..0000000 --- a/tests/command.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/sh -exec sleep 10 diff --git a/tests/limitations.notes b/tests/limitations.notes deleted file mode 100644 index d723c07..0000000 --- a/tests/limitations.notes +++ /dev/null @@ -1,4 +0,0 @@ -Queue is not saved between invocations of the queue. The queue is a living -queue only. It can be edited (for waiting items only), thus if you want to -save a queue, you can always copy the list and restore it with another edit, -but it is a hack. diff --git a/tests/queue.txt b/tests/queue.txt deleted file mode 100644 index 0edb856..0000000 --- a/tests/queue.txt +++ /dev/null @@ -1,26 +0,0 @@ -a -b -c -d -e -f -g -h -i -j -k -l -m -n -o -p -q -r -s -t -u -v -w -x -y -z diff --git a/tests/setup.cfg b/tests/setup.cfg deleted file mode 100644 index 2ebde94..0000000 --- a/tests/setup.cfg +++ /dev/null @@ -1,4 +0,0 @@ -[pep8] -ignore = E121,E123,E126,E226,E24,E704,E265 -show-pep8 = true -exclude = wqueue.py diff --git a/tests/wqueue.py b/tests/wqueue.py deleted file mode 100755 index 7120bf4..0000000 --- a/tests/wqueue.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/python3 - -import fcntl -import time -import subprocess -import os -import asyncio - -QUEUE_FILE = 'queue.txt' -COMMAND = './command.sh' - - -# loop = asyncio.get_event_loop() - - -# class JobHandler(asyncio.SubprocessProtocol): -# def process_exited(self): -# print() - - -def do_job(command=None, arg=None): - assert(command) - assert(arg) - print('arg:', arg) - print('doing operation with arg') - #ret = subprocess.check_call([command, arg]) - loop.suprocess_exec([command, arg], - stdin=subprocess.DEVNULL, - stdout=sys.stdout, - stderr=sys.stderr) - print('ret:', ret) - return ret == 0 - -def read_next_job_arg(): - with open(QUEUE_FILE, 'r') as f: - line = f.readline() - return line.strip() - -def pop_job_arg(): - queue = QUEUE_FILE - queue_tmp = queue + '.tmp' - linecount = 0 - with open(queue, 'r') as fi, open(queue_tmp, 'w') as fo: - for line in fi: - linecount += 1 - if linecount == 1: - continue - fo.write(line) - os.rename(queue_tmp, queue) - -def check_queue(): - #loop = asyncio.get_event_loop() - #loop.run_forever() - - -#with open('queue.txt', 'w') as f: -# fcntl.lockf(f, fcntl.LOCK_EX) -# time.sleep(9999) - -#while True: -# job_arg = read_next_job_arg() -# if do_job(command=COMMAND, arg=job_arg): -# pop_job_arg() - -#loop = asyncio.get_event_loop() -loop.run_forever() diff --git a/tests/wqueue2.py b/tests/wqueue2.py deleted file mode 100755 index 4c2cd4c..0000000 --- a/tests/wqueue2.py +++ /dev/null @@ -1,130 +0,0 @@ -""" -Manage a queue of files for processing them sequentialy. -""" - - -import asyncio -import os -import functools -import shutils -import collections - - -u8open = functools.partial(open, encoding='utf8') - - -class PersistentQueue3: - - def __init__(self, filename=None): - self.queue = collections.deque() - self.current_job = collections.deque() - self.event = asyncio.Event() - self.filename = filename - - def push_job(self, item): - self.queue.append(item) - self.event.set() - - async def get_job(self, item): - if len(self.current_job): - return self.current_job[0] - if len(self.queue): - job_item = self.queue.pop() - else: - self.event.clear() - await self.event.wait() - job_item = self.queue.pop() - self.current_job.append(job_item)) - return self.current_job[0] - - async def mark_job_done(self): - assert len(self.current_job) - self.current_job.pop() - - def dump_queue(self, filename=None): - ''' - dump and clean the queue in order for it to be edited. - - note: current file is not saved since it would be inconsistent if - the queue is reloaded but file has finished being processed - between. - - Thus a file being processed is considered either done if done - between dump and reload, or failed if the queue is shut down - before its end. - - The view here is we do not want to redo the work on a file, if the - wanted result is a success: do not close the queue. - ''' - filename = filename if filename else self.filename - with u8open(filename, 'w') as f: - for item in self.queue: - print(item, file=f) - self.queue.clear() - - def reload_from_filename(self, filename=None): - ''' - reloads a queue after editing it externaly. - ''' - filename = filename if filename else self.filename - with u8open(filename) as f: - for item in f: - self.queue.append(item) - self.event.set() - - -async def do_the_job(job=None): - assert job - print('='*50) - print('Current job:', item) - do_the_job_here with await - print('Current job is done.') - - -async def manage_client(reader, writer): - print('='*40) - print('manage_client created') - print('called on connection only ?') - await asyncio.sleep(1) - print('after sleep1 in server()') - while True: - line = await reader.readline() - if not line: - break - print('line', line) - writer.write(line) - writer.close() - print('end of manage_client') - - -async def jobs_runner(): - jobs = PersistentQueue3(filename='queue.txt') - queue = [] - while True: - job = await jobs.get_job() - await do_the_job(job=job) - jobs.mark_job_done() - - -def main(): - print('getting event loop') - loop = asyncio.get_event_loop() - print('got event loop') - loop.call_soon(jobs_runner) - try: - os.unlink('server_sock') - print('creating coro...') - coro = asyncio.start_unix_server(manage_client, path='server_sock') - print('coro created') - loop.run_until_complete(coro) - print('coro returned') - # loop.run_until_complete(test()) - loop.run_forever() - except KeyboardInterrupt: - pass - print('loop closed') - loop.close() - - -if __name__ == '__main__': - main() -- cgit v1.2.3