1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
import contextlib
import os
import selectors
import threading
from functools import partial
from ..structures import Progress
from .runnerbase import RunnerBase
def _create_pipe():
pipe_rd, pipe_wd = os.pipe()
return open(pipe_rd, 'rb', buffering=0), open(pipe_wd, 'wb', buffering=0)
class MultiSequencialRunnerBase(RunnerBase):
    """Run several runners one after another behind a single runner interface.

    ``runners`` is a sequence of callables; each call must return a context
    manager yielding a runner object that provides ``poll()``,
    ``get_read_fd()`` and ``progress_read()``.  The runners are executed
    sequentially in a background thread, stopping at the first one whose
    return code is not ``0``.

    Progress notifications are forwarded through an internal pipe: one byte
    is written for each progress update, so the file object returned by
    :meth:`get_read_fd` becomes readable whenever :meth:`progress_read` has
    something new to return.  A second pipe is used by :meth:`terminate` to
    wake up and stop the background thread.
    """

    def __init__(self, runners):
        """Create the internal pipes and start the background thread.

        Args:
            runners: sequence of zero-argument callables, each returning a
                context manager that yields a runner.
        """
        super().__init__()
        self.runners = runners
        self._pipe_rd, self._pipe_wd = _create_pipe()
        self._pipesig_rd, self._pipesig_wd = _create_pipe()
        # The result state must exist *before* the thread starts: the thread
        # updates these attributes, and initialising them afterwards could
        # overwrite a progress or return code it has already published.
        self._last_progress = Progress()
        self._last_rc = -1
        self._thread = threading.Thread(target=self._thread_target)
        self._thread.start()

    def _runner_run(self, runner_callable, step_index, runners_count):
        """Run a single runner until it finishes, forwarding its progress.

        Args:
            runner_callable: callable returning a context manager that
                yields the runner to execute.
            step_index: index of this runner in ``self.runners``.
            runners_count: total number of runners.

        Returns:
            The value returned by the runner's ``poll()`` once it is not
            ``None``, or ``-1`` if a termination request was received.
        """
        with contextlib.ExitStack() as stack:
            runner = stack.enter_context(runner_callable())
            selector = stack.enter_context(selectors.DefaultSelector())
            selector.register(self._pipesig_rd, selectors.EVENT_READ)
            selector.register(runner.get_read_fd(), selectors.EVENT_READ)
            while (rc := runner.poll()) is None:
                for key, _events in selector.select():
                    if key.fileobj is self._pipesig_rd:
                        # terminate() was called: consume the signal byte
                        # and give up; leaving the ExitStack exits the
                        # runner's context manager.
                        self._pipesig_rd.read(1)
                        return -1
                    progress = runner.progress_read()
                    # Tag the runner's own progress with its position in
                    # the whole sequence.
                    self._last_progress = progress._replace(
                        step=step_index,
                        steps_count=runners_count,
                    )
                    self._pipe_wd.write(b'p')  # (p)rogress, could be anything
            return rc

    def _thread_target(self):
        """Thread body: run every runner in order, stop at first failure."""
        runners_count = len(self.runners)
        try:
            for step_index, runner_callable in enumerate(self.runners):
                self._last_rc = self._runner_run(runner_callable, step_index,
                                                 runners_count)
                if self._last_rc != 0:
                    break
        except BaseException:
            # A crashing step must not leave the return code of a previous,
            # successful step behind: poll() would then report success.
            self._last_rc = -1
            raise
        finally:
            # Closing the writing end of the pipe allows a select on the
            # reading end to return immediately, and a read on the reading
            # end to return EOF (an empty bytes object in Python).  This is
            # done even on error so that consumers never wait forever.
            self._pipe_wd.close()

    def get_read_fd(self):
        """Return the reading end of the progress-notification pipe."""
        return self._pipe_rd

    def progress_read(self):
        """Consume one notification byte and return the latest progress."""
        # read: discard byte used to signal progress, and ignore if EOF.
        self._pipe_rd.read(1)
        return self._last_progress

    def terminate(self):
        """Ask the background thread to stop; safe to call several times."""
        # ignore ValueError, which is raised when writing to an already
        # closed _pipesig_wd if terminate is called multiple times.
        with contextlib.suppress(ValueError):
            self._pipesig_wd.write(b't')  # t for terminate, could be anything
            # close to avoid filling the pipe buffer uselessly if called
            # multiple times, since nothing will read the next sent bytes.
            self._pipesig_wd.close()

    def poll(self):
        """Return ``None`` while the thread runs, else the last return code."""
        return None if self._thread.is_alive() else self._last_rc

    def close(self):
        """Wait for the background thread to end, then close all pipe ends."""
        self._thread.join()
        for fd in (
            self._pipesig_rd,
            self._pipesig_wd,
            self._pipe_rd,
            self._pipe_wd,
        ):
            fd.close()
|