"""
Manage a queue of files for processing them sequentially.
"""


import asyncio
import os
import functools
import collections


u8open = functools.partial(open, encoding='utf8')


class PersistentQueue3:

    def __init__(self, filename=None):
        self.queue = collections.deque()
        self.current_job = collections.deque()
        self.event = asyncio.Event()
        self.filename = filename

    def push_job(self, item):
        self.queue.append(item)
        self.event.set()

    async def get_job(self):
        # If a job was already handed out but not yet marked done,
        # return that same job again.
        if len(self.current_job):
            return self.current_job[0]
        # Pop from the left so jobs are processed in arrival (FIFO) order.
        if len(self.queue):
            job_item = self.queue.popleft()
        else:
            self.event.clear()
            await self.event.wait()
            job_item = self.queue.popleft()
        self.current_job.append(job_item)
        return self.current_job[0]

    def mark_job_done(self):
        # Forget the job handed out by get_job(); nothing here awaits,
        # so this need not be a coroutine (callers do not await it).
        assert len(self.current_job)
        self.current_job.pop()

    def dump_queue(self, filename=None):
        '''
            Dump and clear the queue so that it can be edited externally.

            Note: the job currently being processed is not saved, since it
            would be inconsistent to reload it if its processing finishes
            in the meantime.

            Thus a job being processed is considered done if it completes
            between dump and reload, or failed if the queue is shut down
            before it finishes.

            The rationale: we never want to redo work on a file, so if the
            desired outcome is success, do not shut the queue down.
        '''
        filename = filename if filename else self.filename
        with u8open(filename, 'w') as f:
            for item in self.queue:
                print(item, file=f)
        self.queue.clear()

    def reload_from_filename(self, filename=None):
        '''
            Reload the queue after it has been edited externally.
        '''
        filename = filename if filename else self.filename
        with u8open(filename) as f:
            for item in f:
                # dump_queue() wrote one item per line; strip the newline
                # so items round-trip unchanged.
                self.queue.append(item.rstrip('\n'))
        self.event.set()
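

# A minimal usage sketch of the push/get/mark cycle and of the dump/reload
# round-trip described in dump_queue's docstring.  The demo_queue_usage()
# name and the 'queue_demo.txt' scratch path are illustrative only, not
# part of the original design.
async def demo_queue_usage():
    jobs = PersistentQueue3(filename='queue_demo.txt')
    jobs.push_job('file1.txt')
    job = await jobs.get_job()         # -> 'file1.txt'
    assert job == 'file1.txt'
    jobs.mark_job_done()               # only now is the job forgotten
    jobs.push_job('file2.txt')
    jobs.dump_queue()                  # queue written to disk and cleared
    # ... queue_demo.txt could be edited by hand here ...
    jobs.reload_from_filename()        # read it back; waiting consumers wake up
    assert await jobs.get_job() == 'file2.txt'
    jobs.mark_job_done()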


async def do_the_job(job=None):
    assert job
    print('='*50)
    print('Current job:', job)
    # Placeholder for the actual (awaitable) work on the job.
    await asyncio.sleep(1)
    print('Current job is done.')


async def manage_client(reader, writer):
    print('='*40)
    print('manage_client created')
    print('is this called only on new connections?')
    await asyncio.sleep(1)
    print('after sleep(1) in manage_client()')
    while True:
        line = await reader.readline()
        if not line:
            break
        print('line', line)
        writer.write(line)
    writer.close()
    print('end of manage_client')
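

# A minimal client sketch for exercising the echo server above.  The
# connect_and_echo() name is illustrative, and the server must already be
# listening on 'server_sock'.  A shell client such as
# `socat - UNIX-CONNECT:server_sock` works just as well.
async def connect_and_echo(message=b'hello\n'):
    reader, writer = await asyncio.open_unix_connection(path='server_sock')
    writer.write(message)              # manage_client() echoes the line back
    await writer.drain()
    echoed = await reader.readline()
    print('echoed:', echoed)
    writer.close()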


async def jobs_runner():
    jobs = PersistentQueue3(filename='queue.txt')
    while True:
        job = await jobs.get_job()
        await do_the_job(job=job)
        jobs.mark_job_done()


def main():
    print('getting event loop')
    loop = asyncio.get_event_loop()
    print('got event loop')
    # jobs_runner is a coroutine function: it must be wrapped in a Task,
    # not scheduled with call_soon (which expects a plain callable).
    loop.create_task(jobs_runner())
    try:
        if os.path.exists('server_sock'):
            # Remove a stale socket left over from a previous run.
            os.unlink('server_sock')
        print('creating coro...')
        coro = asyncio.start_unix_server(manage_client, path='server_sock')
        print('coro created')
        loop.run_until_complete(coro)
        print('coro returned')
        # loop.run_until_complete(test())
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    loop.close()
    print('loop closed')


if __name__ == '__main__':
    main()