summaryrefslogtreecommitdiffstats
path: root/tests/wqueue2.py
blob: 0a3791f1475ba5b37ed13b0527ef1981fceeaaec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""
Manage a queue of files for processing them sequentialy.
"""


import asyncio
import functools
import os
import shutil  # fixed: 'shutils' does not exist; shutil is the stdlib module intended
from collections import deque


# open() preconfigured for UTF-8 text; used for all queue-file access below.
u8open = functools.partial(open, encoding='utf8')


class PersistentJobQueue2:
    """In-memory FIFO job queue with a one-slot "in flight" buffer.

    ``push_job`` appends work; ``get_job`` returns the job currently being
    processed (or waits for one to arrive); ``mark_job_done`` retires it.
    Persistence is explicit and partial: ``dump_queue`` /
    ``reload_from_filename`` serialize only the *pending* queue, one item
    per line (items are assumed to be newline-free strings).
    """

    def __init__(self, filename, loop=None):
        # filename is stored for callers' convenience; the queue itself
        # lives in memory until dump_queue() is called explicitly.
        self.filename = filename
        self.queue = deque()        # pending jobs, FIFO order
        self.current_job = deque()  # 0 or 1 items: the job in flight
        # `loop` is kept only for backward compatibility: asyncio.Event()
        # no longer accepts a loop argument (removed in Python 3.10).
        self.event = asyncio.Event()
        self.loop = loop

    def push_job(self, item):
        """Append a job and wake any waiter blocked in get_job()."""
        self.queue.append(item)
        self.event.set()

    async def get_job(self, item=None):
        """Return the job to process, waiting if no job is available.

        Re-entrant: while a job is in flight, it is returned again until
        mark_job_done() is called.  `item` is unused; it is kept (now
        optional) only for compatibility with the old signature.
        """
        if self.current_job:
            return self.current_job[0]
        while not self.queue:
            # Re-arm the event and wait for push_job() to fire it.
            self.event.clear()
            await self.event.wait()
        # FIFO: push_job appends on the right, so consume from the left.
        self.current_job.append(self.queue.popleft())
        return self.current_job[0]

    async def mark_job_done(self):
        """Retire the job returned by the last get_job()."""
        # RuntimeError instead of assert: asserts vanish under `python -O`.
        if not self.current_job:
            raise RuntimeError('mark_job_done() called with no job in flight')
        self.current_job.pop()

    def dump_queue(self, filename):
        '''
            Save the pending queue, one item per line.

            note: current job is not saved since it would be inconsistent if
            the queue is reloaded but the job has finished being processed
            in between.

            Thus a job being processed is considered either done if finished
            between dump and reload, or failed if the queue is shut down
            before its end.

            The view here is we do not want to redo the work on a file; if
            the wanted result is a success: do not close the queue.
        '''
        with u8open(filename, 'w') as f:
            # Items are assumed to be newline-free strings.
            f.write('\n'.join(self.queue))

    def reload_from_filename(self, filename):
        """Replace the pending queue with the items saved by dump_queue()."""
        with u8open(filename) as f:
            self.queue = deque(f.read().splitlines())


class PersistentJobQueue:
    """Job queue persisted to a text file, one job per line.

    ``append`` adds a line to the file (and mirrors it in memory);
    ``pop`` removes and returns the file's first line.  A single
    asyncio.Lock serializes access so concurrent coroutines cannot
    interleave the rewrite of the queue file.
    """

    def __init__(self, filename):
        self.filename = filename
        self.lock = asyncio.Lock()
        self.queue = []        # in-memory mirror of appended items
        self.current_job = []  # 0 or 1 items: the job in flight

    async def append(self, item):
        """Persist `item` (a newline-free string) and mirror it in memory."""
        async with self.lock:
            self.queue.append(item)
            # Built-in file objects are synchronous: no await on write().
            with open(self.filename, 'a', encoding='utf8') as f:
                f.write(item)
                f.write('\n')

    async def get_job(self):
        """Return the in-flight job, taking a new one only when idle."""
        async with self.lock:
            if not self.current_job:
                # FIFO: take the oldest appended item.
                self.current_job.append(self.queue.pop(0))
        return self.current_job[0]

    async def mark_job_done(self):
        """Retire the job returned by the last get_job()."""
        async with self.lock:
            self.current_job.pop()

    async def pop(self):
        """Remove and return the first line of the queue file.

        Returns None when the file is empty.
        TODO: when wanting to pop an empty queue the pop should wait
        asynchronously for a new filename to arrive.
        """
        firstline = None
        async with self.lock:
            with open(self.filename, 'r', encoding='utf8') as f, \
                    open(self.filename + '.tmp', 'w', encoding='utf8') as fo:
                for line in f:
                    if firstline is None:
                        firstline = line
                    else:
                        fo.write(line)
            # Atomically replace the queue file with the remainder.
            os.replace(self.filename + '.tmp', self.filename)
        return firstline


async def test():
    """Smoke-test coroutine: announce, sleep three seconds, announce again."""
    delay = 3
    print('sleeping...')
    await asyncio.sleep(delay)
    print('slept')


async def manage_client(reader, writer):
    """Connection callback for the unix-socket server: echo lines back.

    Reads lines from `reader` until EOF and writes each one back through
    `writer`, closing the writer when the client disconnects.
    """
    print('='*40)
    print('manage_client created')
    print('called on connection only ?')
    await asyncio.sleep(1)
    print('after sleep1 in server()')
    line = await reader.readline()
    while line:
        print('line', line)
        writer.write(line)
        line = await reader.readline()
    writer.close()
    print('end of manage_client')


async def manage_jobs():
    """Drain the persistent queue forever, one job at a time.

    NOTE(review): job execution is still a stub -- items are only logged.
    """
    pq = PersistentJobQueue('queue.txt')
    while True:
        item = await pq.pop()
        if item is None:
            # Queue file is empty: back off briefly instead of busy-looping.
            # TODO: pq.pop() should itself wait for new work to arrive.
            await asyncio.sleep(1)
            continue
        print('executing next scheduled job')

def read_next_job_arg(filename=None):
    """Return the first line of the job-queue file ('' at EOF).

    filename defaults to the module-level QUEUE_FILE.
    NOTE(review): QUEUE_FILE is not defined anywhere in this module --
    presumably supplied elsewhere; confirm before relying on the default.
    """
    if filename is None:
        filename = QUEUE_FILE
    with open(filename, 'r', encoding='utf8') as f:
        # Original read the line but never returned it.
        return f.readline()


def main():
    """Start the unix-socket echo server and run until interrupted."""
    print('getting event loop')
    loop = asyncio.get_event_loop()
    print('got event loop')
    # loop.call_soon(test)
    try:
        # Remove a stale socket from a previous run; first run has none,
        # and an unconditional unlink would raise FileNotFoundError.
        if os.path.exists('server_sock'):
            os.unlink('server_sock')
        print('creating coro...')
        coro = asyncio.start_unix_server(manage_client, path='server_sock')
        print('coro created')
        loop.run_until_complete(coro)
        print('coro returned')
        # loop.run_until_complete(test())
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Close the loop on any exit path; report only after it is closed.
        loop.close()
        print('loop closed')


# Script entry point: start the demo unix-socket echo server.
if __name__ == '__main__':
    main()