aboutsummaryrefslogtreecommitdiffstats
path: root/tests/wqueue2.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/wqueue2.py')
-rwxr-xr-xtests/wqueue2.py130
1 files changed, 0 insertions, 130 deletions
diff --git a/tests/wqueue2.py b/tests/wqueue2.py
deleted file mode 100755
index 4c2cd4c..0000000
--- a/tests/wqueue2.py
+++ /dev/null
@@ -1,130 +0,0 @@
-"""
-Manage a queue of files for processing them sequentialy.
-"""
-
-
-import asyncio
-import os
-import functools
-import shutils
-import collections
-
-
-u8open = functools.partial(open, encoding='utf8')
-
-
-class PersistentQueue3:
-
- def __init__(self, filename=None):
- self.queue = collections.deque()
- self.current_job = collections.deque()
- self.event = asyncio.Event()
- self.filename = filename
-
- def push_job(self, item):
- self.queue.append(item)
- self.event.set()
-
- async def get_job(self, item):
- if len(self.current_job):
- return self.current_job[0]
- if len(self.queue):
- job_item = self.queue.pop()
- else:
- self.event.clear()
- await self.event.wait()
- job_item = self.queue.pop()
- self.current_job.append(job_item))
- return self.current_job[0]
-
- async def mark_job_done(self):
- assert len(self.current_job)
- self.current_job.pop()
-
- def dump_queue(self, filename=None):
- '''
- dump and clean the queue in order for it to be edited.
-
- note: current file is not saved since it would be inconsistent if
- the queue is reloaded but file has finished being processed
- between.
-
- Thus a file being processed is considered either done if done
- between dump and reload, or failed if the queue is shut down
- before its end.
-
- The view here is we do not want to redo the work on a file, if the
- wanted result is a success: do not close the queue.
- '''
- filename = filename if filename else self.filename
- with u8open(filename, 'w') as f:
- for item in self.queue:
- print(item, file=f)
- self.queue.clear()
-
- def reload_from_filename(self, filename=None):
- '''
- reloads a queue after editing it externaly.
- '''
- filename = filename if filename else self.filename
- with u8open(filename) as f:
- for item in f:
- self.queue.append(item)
- self.event.set()
-
-
-async def do_the_job(job=None):
- assert job
- print('='*50)
- print('Current job:', item)
- do_the_job_here with await
- print('Current job is done.')
-
-
-async def manage_client(reader, writer):
- print('='*40)
- print('manage_client created')
- print('called on connection only ?')
- await asyncio.sleep(1)
- print('after sleep1 in server()')
- while True:
- line = await reader.readline()
- if not line:
- break
- print('line', line)
- writer.write(line)
- writer.close()
- print('end of manage_client')
-
-
-async def jobs_runner():
- jobs = PersistentQueue3(filename='queue.txt')
- queue = []
- while True:
- job = await jobs.get_job()
- await do_the_job(job=job)
- jobs.mark_job_done()
-
-
-def main():
- print('getting event loop')
- loop = asyncio.get_event_loop()
- print('got event loop')
- loop.call_soon(jobs_runner)
- try:
- os.unlink('server_sock')
- print('creating coro...')
- coro = asyncio.start_unix_server(manage_client, path='server_sock')
- print('coro created')
- loop.run_until_complete(coro)
- print('coro returned')
- # loop.run_until_complete(test())
- loop.run_forever()
- except KeyboardInterrupt:
- pass
- print('loop closed')
- loop.close()
-
-
-if __name__ == '__main__':
- main()