diff options
| author | VG <vg@devys.org> | 2016-03-29 17:43:04 +0200 | 
|---|---|---|
| committer | VG <vg@devys.org> | 2016-03-29 17:43:04 +0200 | 
| commit | 309621421193c2eebd6362df6b8850407bc77df9 (patch) | |
| tree | cdd120b47f958a03746fa5b49d48cec85e28664b /tests | |
| parent | 976602f3d3d6cd22c7357821117a14b67d9edf46 (diff) | |
| download | wqueue-309621421193c2eebd6362df6b8850407bc77df9.tar.gz wqueue-309621421193c2eebd6362df6b8850407bc77df9.tar.bz2 wqueue-309621421193c2eebd6362df6b8850407bc77df9.zip | |
wip
Diffstat (limited to 'tests')
| -rwxr-xr-x | tests/wqueue2.py | 43 | 
1 files changed, 42 insertions, 1 deletions
| diff --git a/tests/wqueue2.py b/tests/wqueue2.py index a78f315..4910b2f 100755 --- a/tests/wqueue2.py +++ b/tests/wqueue2.py @@ -1,7 +1,46 @@ -#!/usr/bin/python3.5 +""" +Manage a queue of files for processing them sequentialy. +""" +  import asyncio  import os +import functools +import shutils + + +u8open = functools.partial(open, encoding='utf8') + + +class PersistentQueue: + +    def __init__(self, filename): +        self.filename = filename +        self.lock = asyncio.Lock() + + +    def append(self, item): +        async with lock: +            with u8open(self.filename, 'a') as f: +                await f.write(item) +                await f.write('\n') + +    async def pop(self): +        # TODO: when wanting to pop an empty queue the pop should wait +        # asynchronously for a new filename to arrive +        error +        firstline = None +        async with lock: +            with u8open(filename, 'r') as f, \ +                       u8open(filename + '.tmp', 'w') as fo: +                async for line in f: +                    if not firstline: +                        firstline = line +                    else: +                        await fo.write(line) +            shutils.mv(filename + '.tmp', filename) +        return firstline +  async def test():      print('sleeping...') @@ -26,8 +65,10 @@ async def manage_client(reader, writer):  async def manage_jobs(): +    pq = PersistentQueue('queue.txt')      queue = []      while True: +        item = await pq.pop()          async with open('queue.txt') as f:              line = await next(f)              if not line: | 
