diff options
Diffstat (limited to 'gamechestcli/gamechest/runners/runnermulti.py')
-rw-r--r-- | gamechestcli/gamechest/runners/runnermulti.py | 91 |
1 files changed, 91 insertions, 0 deletions
diff --git a/gamechestcli/gamechest/runners/runnermulti.py b/gamechestcli/gamechest/runners/runnermulti.py new file mode 100644 index 0000000..2a5e17c --- /dev/null +++ b/gamechestcli/gamechest/runners/runnermulti.py @@ -0,0 +1,91 @@ +import contextlib +import os +import selectors +import threading +from functools import partial + +from ..structures import Progress +from .runnerbase import RunnerBase + + +def _create_pipe(): + pipe_rd, pipe_wd = os.pipe() + return open(pipe_rd, 'rb', buffering=0), open(pipe_wd, 'wb', buffering=0) + + +class MultiSequencialRunnerBase(RunnerBase): + + def __init__(self, runners): + super().__init__() + self.runners = runners + self._pipe_rd, self._pipe_wd = _create_pipe() + self._pipesig_rd, self._pipesig_wd = _create_pipe() + self._thread = threading.Thread(target=self._thread_target) + self._thread.start() + self._last_progress = Progress() + self._last_rc = -1 + + def _runner_run(self, + runner_callable, + step_index, + runners_count): + with contextlib.ExitStack() as stack: + runner = stack.enter_context(runner_callable()) + selector = stack.enter_context(selectors.DefaultSelector()) + selector.register(self._pipesig_rd, selectors.EVENT_READ) + selector.register(runner.get_read_fd(), selectors.EVENT_READ) + while (rc := runner.poll()) is None: + for key, events in selector.select(): + if key.fileobj is self._pipesig_rd: + self._pipesig_rd.read(1) + return -1 + progress = runner.progress_read() + self._last_progress = progress._replace( + step=step_index, + steps_count=runners_count, + ) + self._pipe_wd.write(b'p') # (p)rogress, could be anything + return rc + + def _thread_target(self): + runners_count = len(self.runners) + with contextlib.ExitStack() as stack: + for step_index, runner_callable in enumerate(self.runners): + self._last_rc = self._runner_run(runner_callable, step_index, + runners_count) + if self._last_rc != 0: + break + # closing writing end of a pipe, allows select on reading-end to + # immediately return and reading on reading-end will returns EOF (or + # empty string on python). + self._pipe_wd.close() + + def get_read_fd(self): + return self._pipe_rd + + def progress_read(self): + # read: discard byte used to signal progress, and ignore if EOF. + self._pipe_rd.read(1) + return self._last_progress + + def terminate(self): + # ignore ValueError, can arrive on closed _pipesig_wd if terminate is + # called multiple times. + with contextlib.suppress(ValueError): + self._pipesig_wd.write(b't') # t for terminate, could be anything + # close to avoid filling the pipe buffer uselessly if called multiple + # times, since nothing will read next sent bytes. + self._pipesig_wd.close() + + def poll(self): + return None if self._thread.is_alive() else self._last_rc + + def close(self): + self._thread.join() + for fd in ( + self._pipesig_rd, + self._pipesig_wd, + self._pipe_rd, + self._pipe_wd, + ): + fd.close() |