Diffstat (limited to 'src')
-rw-r--r--   src/Makefile            10
-rwxr-xr-x   src/command.sh           2
-rw-r--r--   src/limitations.notes    4
-rw-r--r--   src/queue.txt           26
-rw-r--r--   src/setup.cfg            4
-rwxr-xr-x   src/wqueue.py           66
-rwxr-xr-x   src/wqueue2.py         130
7 files changed, 242 insertions, 0 deletions
diff --git a/src/Makefile b/src/Makefile
new file mode 100644
index 0000000..6929f5e
--- /dev/null
+++ b/src/Makefile
@@ -0,0 +1,10 @@
+# static Python checkers:
+# python3-flake8 (pyflakes + python3-pep8)
+# pylint3
+
+#@python3.5 -m flake8 wqueue2.py  => flake8 does not support the async keyword yet
+Q=
+
+test:
+	$(Q)python3.5 -m pep8 --first .
+	$(Q)PYTHONASYNCIODEBUG=1 PYTHONUNBUFFERED=1 python3.5 -Wdefault ./wqueue2.py
diff --git a/src/command.sh b/src/command.sh
new file mode 100755
index 0000000..c942ef4
--- /dev/null
+++ b/src/command.sh
@@ -0,0 +1,2 @@
+#!/bin/sh
+exec sleep 10
diff --git a/src/limitations.notes b/src/limitations.notes
new file mode 100644
index 0000000..d723c07
--- /dev/null
+++ b/src/limitations.notes
@@ -0,0 +1,4 @@
+The queue is not saved between invocations; it only lives in memory while
+the process runs. It can be edited (waiting items only), so if you want to
+save a queue you can always copy the list and restore it with another edit,
+but that is a workaround rather than a real feature.
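+
+For example (a sketch only, assuming the PersistentQueue3 API from wqueue2.py
+and an already-created queue object q):
+
+    q.dump_queue('queue.txt')              # write waiting items out, clear them
+    # ... copy or edit queue.txt externally ...
+    q.reload_from_filename('queue.txt')    # read the edited list back in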
diff --git a/src/queue.txt b/src/queue.txt
new file mode 100644
index 0000000..0edb856
--- /dev/null
+++ b/src/queue.txt
@@ -0,0 +1,26 @@
+a
+b
+c
+d
+e
+f
+g
+h
+i
+j
+k
+l
+m
+n
+o
+p
+q
+r
+s
+t
+u
+v
+w
+x
+y
+z
diff --git a/src/setup.cfg b/src/setup.cfg
new file mode 100644
index 0000000..2ebde94
--- /dev/null
+++ b/src/setup.cfg
@@ -0,0 +1,4 @@
+[pep8]
+ignore = E121,E123,E126,E226,E24,E704,E265
+show-pep8 = true
+exclude = wqueue.py
diff --git a/src/wqueue.py b/src/wqueue.py
new file mode 100755
index 0000000..7120bf4
--- /dev/null
+++ b/src/wqueue.py
@@ -0,0 +1,66 @@
+#!/usr/bin/python3
+
+import fcntl
+import time
+import subprocess
+import os
+import asyncio
+
+QUEUE_FILE = 'queue.txt'
+COMMAND = './command.sh'
+
+
+# loop = asyncio.get_event_loop()
+
+
+# class JobHandler(asyncio.SubprocessProtocol):
+#     def process_exited(self):
+#         print()
+
+
+def do_job(command=None, arg=None):
+    assert command
+    assert arg
+    print('arg:', arg)
+    print('doing operation with arg')
+    # loop.subprocess_exec() would be the asyncio way; a plain blocking call
+    # is enough for this synchronous prototype.
+    ret = subprocess.call([command, arg], stdin=subprocess.DEVNULL)
+    print('ret:', ret)
+    return ret == 0
+
+def read_next_job_arg():
+    with open(QUEUE_FILE, 'r') as f:
+        line = f.readline()
+        return line.strip()
+
+def pop_job_arg():
+    queue = QUEUE_FILE
+    queue_tmp = queue + '.tmp'
+    linecount = 0
+    with open(queue, 'r') as fi, open(queue_tmp, 'w') as fo:
+        for line in fi:
+            linecount += 1
+            if linecount == 1:
+                continue
+            fo.write(line)
+    os.rename(queue_tmp, queue)
+
+def check_queue():
+    # loop = asyncio.get_event_loop()
+    # loop.run_forever()
+    pass
+
+
+# with open('queue.txt', 'w') as f:
+#     fcntl.lockf(f, fcntl.LOCK_EX)
+#     time.sleep(9999)
+
+# while True:
+#     job_arg = read_next_job_arg()
+#     if do_job(command=COMMAND, arg=job_arg):
+#         pop_job_arg()
+
+loop = asyncio.get_event_loop()
+loop.run_forever()
diff --git a/src/wqueue2.py b/src/wqueue2.py
new file mode 100755
index 0000000..4c2cd4c
--- /dev/null
+++ b/src/wqueue2.py
@@ -0,0 +1,130 @@
+"""
+Manage a queue of files for processing them sequentially.
+"""
+
+
+import asyncio
+import os
+import functools
+import collections
+
+
+u8open = functools.partial(open, encoding='utf8')
+
+
+class PersistentQueue3:
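+    '''
+    Job queue held in memory, with an asyncio.Event used to wait for work
+    and dump/reload helpers so the waiting items can be edited externally.
+
+    Typical life cycle of a job (a sketch only; nothing in this prototype
+    pushes jobs into the queue yet):
+
+        q = PersistentQueue3(filename='queue.txt')
+        q.push_job('some-file')
+        job = await q.get_job()    # inside a coroutine; waits while empty
+        ...                        # process the job
+        q.mark_job_done()
+    '''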
+
+    def __init__(self, filename=None):
+        self.queue = collections.deque()
+        self.current_job = collections.deque()
+        self.event = asyncio.Event()
+        self.filename = filename
+
+    def push_job(self, item):
+        self.queue.append(item)
+        self.event.set()
+
+    async def get_job(self):
+        if len(self.current_job):
+            return self.current_job[0]
+        if len(self.queue):
+            # popleft() keeps FIFO order (push_job() appends on the right)
+            job_item = self.queue.popleft()
+        else:
+            self.event.clear()
+            await self.event.wait()
+            job_item = self.queue.popleft()
+        self.current_job.append(job_item)
+        return self.current_job[0]
+
+    def mark_job_done(self):
+        assert len(self.current_job)
+        self.current_job.pop()
+
+    def dump_queue(self, filename=None):
+        '''
+        Dump and clear the queue so that it can be edited.
+
+        Note: the file currently being processed is not saved, since it
+        would be inconsistent to put it back in the queue if it finishes
+        being processed between the dump and the reload.
+
+        Thus a file being processed is considered either done, if it
+        completes between dump and reload, or failed, if the queue is shut
+        down before it finishes.
+
+        The idea is that we never want to redo the work on a file; if the
+        wanted result is a success, do not shut the queue down.
+        '''
+        filename = filename if filename else self.filename
+        with u8open(filename, 'w') as f:
+            for item in self.queue:
+                print(item, file=f)
+        self.queue.clear()
+
+    def reload_from_filename(self, filename=None):
+        '''
+        Reload the queue after it has been edited externally.
+        '''
+        filename = filename if filename else self.filename
+        with u8open(filename) as f:
+            for item in f:
+                # strip the newline so items match what push_job() receives
+                self.queue.append(item.rstrip('\n'))
+        self.event.set()
+
+
+async def do_the_job(job=None):
+    assert job
+    print('='*50)
+    print('Current job:', job)
+    # placeholder: the real awaitable work on the job goes here
+    await asyncio.sleep(1)
+    print('Current job is done.')
+
+
+async def manage_client(reader, writer):
+    print('='*40)
+    print('manage_client created')
+    print('called on connection only ?')
+    await asyncio.sleep(1)
+    print('after sleep1 in server()')
+    while True:
+        line = await reader.readline()
+        if not line:
+            break
+        print('line', line)
+        writer.write(line)
+    writer.close()
+    print('end of manage_client')
+
+
+async def jobs_runner():
+    jobs = PersistentQueue3(filename='queue.txt')
+    while True:
+        job = await jobs.get_job()
+        await do_the_job(job=job)
+        jobs.mark_job_done()
+
+
+def main():
+    print('getting event loop')
+    loop = asyncio.get_event_loop()
+    print('got event loop')
+    # call_soon() only takes plain callbacks; wrap the coroutine in a task
+    # so the loop actually runs it.
+    loop.create_task(jobs_runner())
+    try:
+        if os.path.exists('server_sock'):
+            os.unlink('server_sock')
+        print('creating coro...')
+        coro = asyncio.start_unix_server(manage_client, path='server_sock')
+        print('coro created')
+        loop.run_until_complete(coro)
+        print('coro returned')
+        # loop.run_until_complete(test())
+        loop.run_forever()
+    except KeyboardInterrupt:
+        pass
+    loop.close()
+    print('loop closed')
+
+
+if __name__ == '__main__':
+    main()
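+
+
+# The unix-socket endpoint can be exercised by hand (a sketch only, assuming
+# the server is running and 'server_sock' is in the current directory); for
+# now manage_client() simply echoes the lines back:
+#
+#     import asyncio
+#
+#     async def send(line):
+#         reader, writer = await asyncio.open_unix_connection('server_sock')
+#         writer.write(line.encode() + b'\n')
+#         print(await reader.readline())
+#         writer.close()
+#
+#     asyncio.get_event_loop().run_until_complete(send('hello'))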