diff options
| author | VG <vg@devys.org> | 2016-04-05 11:26:37 +0200 | 
|---|---|---|
| committer | VG <vg@devys.org> | 2016-04-05 11:26:37 +0200 | 
| commit | 587b76de425fd84bdc3fa319af79dbc2d4245f05 (patch) | |
| tree | 4f97a7b38e7bfc3201e97689a50bca9548072731 | |
| parent | 309621421193c2eebd6362df6b8850407bc77df9 (diff) | |
| download | wqueue-587b76de425fd84bdc3fa319af79dbc2d4245f05.tar.gz wqueue-587b76de425fd84bdc3fa319af79dbc2d4245f05.tar.bz2 wqueue-587b76de425fd84bdc3fa319af79dbc2d4245f05.zip | |
wip
| -rwxr-xr-x | tests/wqueue2.py | 51 | 
1 files changed, 50 insertions, 1 deletions
| diff --git a/tests/wqueue2.py b/tests/wqueue2.py index 4910b2f..5858fcd 100755 --- a/tests/wqueue2.py +++ b/tests/wqueue2.py @@ -12,7 +12,46 @@ import shutils  u8open = functools.partial(open, encoding='utf8') -class PersistentQueue: +class PersistentJobQueue2: + +    def __init__(self, filename, loop=None): +        self.queue = deque.deque() +        self.current_job = deque.deque() +        self.event = asyncio.Event(loop=loop) + +    def push_job(self, item): +        self.queue.push_right(item) +        self.event.set() + +    async def get_job(self, item): +        if not self.current_job.empty(): +            return self.current_job[0] +        if not self.queue.empty(): +            job_item = self.queue.pop() +        else: +            self.event.clear() +            await self.event.wait() +            job_item = self.queue.pop() +        self.current_job.push_right(self.queue.pop()) +        return self.current_job[0] + +    async def mark_job_done(self): +        assert not self.current_job.empty() +        self.current_job.pop() + + +    def dump_queue(self, filename): +        with u8open(filename, 'w') as f: +            f.write(self.current_job.dump()) +            f.write(self.queue.dump()) + +    def reload_from_filename(self, filename): +        with u8open(filename) as f: +            items = f.read().parse() +        if  + + +class PersistentJobQueue:      def __init__(self, filename):          self.filename = filename @@ -21,10 +60,20 @@ class PersistentQueue:      def append(self, item):          async with lock: +            self.queue.append(item)              with u8open(self.filename, 'a') as f:                  await f.write(item)                  await f.write('\n') +    async def get_job(self): +        async with lock: +            self.current_job.append(self.queue.pop()) +        return self.current_job[0] + +    async def mark_job_done(self): +        async with lock: +            self.current_job.pop() +      async def pop(self):          # TODO: when wanting to pop an empty queue the pop should wait          # asynchronously for a new filename to arrive | 
