summaryrefslogtreecommitdiffstats
path: root/gamechestcli/gamechest/runners/runnermulti.py
blob: 2a5e17ced9fbf0d4e349c78f37437a6deb7f3434 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import contextlib
import os
import selectors
import threading
from functools import partial

from ..structures import Progress
from .runnerbase import RunnerBase


def _create_pipe():
    pipe_rd, pipe_wd = os.pipe()
    return open(pipe_rd, 'rb', buffering=0), open(pipe_wd, 'wb', buffering=0)


class MultiSequencialRunnerBase(RunnerBase):
    """Run several runners one after the other behind a single runner API.

    The sub-runners are executed sequentially in a background thread. The
    sequence stops at the first sub-runner whose return code is not 0, or
    when `terminate()` is called.

    Two internal pipes are used:

    * the *progress* pipe (`_pipe_rd` / `_pipe_wd`): the worker thread writes
      one byte each time a new progress value is available, and closes the
      writing end when it is done, so a consumer selecting on
      `get_read_fd()` gets woken up in both cases;
    * the *signal* pipe (`_pipesig_rd` / `_pipesig_wd`): `terminate()` writes
      one byte to ask the worker thread to stop.
    """

    def __init__(self, runners):
        """Create the pipes and immediately start the worker thread.

        `runners` is a sized iterable of zero-argument callables. Each
        callable must return a context manager yielding a runner object
        providing `poll()`, `get_read_fd()` and `progress_read()`.
        """
        super().__init__()
        self.runners = runners
        self._pipe_rd, self._pipe_wd = _create_pipe()
        self._pipesig_rd, self._pipesig_wd = _create_pipe()
        # State shared with the worker thread must be initialised *before*
        # the thread starts, otherwise a fast worker could publish a value
        # that would then be overwritten by these defaults.
        self._last_progress = Progress()
        self._last_rc = -1
        self._thread = threading.Thread(target=self._thread_target)
        self._thread.start()

    def _runner_run(self,
                    runner_callable,
                    step_index,
                    runners_count):
        """Run one sub-runner to completion, forwarding its progress.

        Returns the sub-runner return code (the first non-None value from
        its `poll()`), or -1 if termination was requested through the
        signal pipe while it was running.
        """
        with contextlib.ExitStack() as stack:
            runner = stack.enter_context(runner_callable())
            selector = stack.enter_context(selectors.DefaultSelector())
            selector.register(self._pipesig_rd, selectors.EVENT_READ)
            selector.register(runner.get_read_fd(), selectors.EVENT_READ)
            while (rc := runner.poll()) is None:
                for key, events in selector.select():
                    if key.fileobj is self._pipesig_rd:
                        # terminate() was called: consume the signal byte
                        # and abort; leaving the ExitStack exits the
                        # sub-runner context manager.
                        self._pipesig_rd.read(1)
                        return -1
                    progress = runner.progress_read()
                    # presumably Progress is a namedtuple with `step` and
                    # `steps_count` fields (it supports `_replace`).
                    self._last_progress = progress._replace(
                        step=step_index,
                        steps_count=runners_count,
                    )
                    self._pipe_wd.write(b'p') # (p)rogress, could be anything
        return rc

    def _thread_target(self):
        """Worker thread body: run every sub-runner in order.

        Stops at the first step whose return code is not 0. The writing end
        of the progress pipe is always closed on exit, even when a step
        raises, so the consumer is never left waiting forever.
        """
        runners_count = len(self.runners)
        try:
            for step_index, runner_callable in enumerate(self.runners):
                # Pessimistic default: if this step raises, poll() must not
                # report the success (0) of the previous step.
                self._last_rc = -1
                self._last_rc = self._runner_run(runner_callable, step_index,
                                                 runners_count)
                if self._last_rc != 0:
                    break
        finally:
            # Closing the writing end of the pipe makes a select on the
            # reading end return immediately, and a read on the reading end
            # return EOF (an empty bytes object in Python).
            self._pipe_wd.close()

    def get_read_fd(self):
        """Return the reading end of the progress pipe, to be selected on."""
        return self._pipe_rd

    def progress_read(self):
        """Consume one progress notification and return the last progress."""
        # read: discard byte used to signal progress, and ignore if EOF.
        self._pipe_rd.read(1)
        return self._last_progress

    def terminate(self):
        """Ask the worker thread to stop; safe to call several times."""
        # ignore ValueError, which is raised on a closed _pipesig_wd if
        # terminate is called multiple times.
        with contextlib.suppress(ValueError):
            self._pipesig_wd.write(b't') # t for terminate, could be anything
        # close to avoid filling the pipe buffer uselessly if called multiple
        # times, since nothing will read the bytes sent afterwards.
        self._pipesig_wd.close()

    def poll(self):
        """Return None while the worker thread runs, else the last rc."""
        return None if self._thread.is_alive() else self._last_rc

    def close(self):
        """Wait for the worker thread to finish, then close both pipes."""
        self._thread.join()
        for fd in (
            self._pipesig_rd,
            self._pipesig_wd,
            self._pipe_rd,
            self._pipe_wd,
        ):
            fd.close()