summaryrefslogtreecommitdiffstats
path: root/gamechestcli/gamechest/runners/runnermulti.py
diff options
context:
space:
mode:
Diffstat (limited to 'gamechestcli/gamechest/runners/runnermulti.py')
-rw-r--r--gamechestcli/gamechest/runners/runnermulti.py91
1 files changed, 91 insertions, 0 deletions
diff --git a/gamechestcli/gamechest/runners/runnermulti.py b/gamechestcli/gamechest/runners/runnermulti.py
new file mode 100644
index 0000000..2a5e17c
--- /dev/null
+++ b/gamechestcli/gamechest/runners/runnermulti.py
@@ -0,0 +1,91 @@
+import contextlib
+import os
+import selectors
+import threading
+from functools import partial
+
+from ..structures import Progress
+from .runnerbase import RunnerBase
+
+
+def _create_pipe():
+ pipe_rd, pipe_wd = os.pipe()
+ return open(pipe_rd, 'rb', buffering=0), open(pipe_wd, 'wb', buffering=0)
+
+
+class MultiSequencialRunnerBase(RunnerBase):
+
+ def __init__(self, runners):
+ super().__init__()
+ self.runners = runners
+ self._pipe_rd, self._pipe_wd = _create_pipe()
+ self._pipesig_rd, self._pipesig_wd = _create_pipe()
+ self._thread = threading.Thread(target=self._thread_target)
+ self._thread.start()
+ self._last_progress = Progress()
+ self._last_rc = -1
+
+ def _runner_run(self,
+ runner_callable,
+ step_index,
+ runners_count):
+ with contextlib.ExitStack() as stack:
+ runner = stack.enter_context(runner_callable())
+ selector = stack.enter_context(selectors.DefaultSelector())
+ selector.register(self._pipesig_rd, selectors.EVENT_READ)
+ selector.register(runner.get_read_fd(), selectors.EVENT_READ)
+ while (rc := runner.poll()) is None:
+ for key, events in selector.select():
+ if key.fileobj is self._pipesig_rd:
+ self._pipesig_rd.read(1)
+ return -1
+ progress = runner.progress_read()
+ self._last_progress = progress._replace(
+ step=step_index,
+ steps_count=runners_count,
+ )
+ self._pipe_wd.write(b'p') # (p)rogress, could be anything
+ return rc
+
+ def _thread_target(self):
+ runners_count = len(self.runners)
+ with contextlib.ExitStack() as stack:
+ for step_index, runner_callable in enumerate(self.runners):
+ self._last_rc = self._runner_run(runner_callable, step_index,
+ runners_count)
+ if self._last_rc != 0:
+ break
+ # closing writing end of a pipe, allows select on reading-end to
+ # immediately return and reading on reading-end will returns EOF (or
+ # empty string on python).
+ self._pipe_wd.close()
+
+ def get_read_fd(self):
+ return self._pipe_rd
+
+ def progress_read(self):
+ # read: discard byte used to signal progress, and ignore if EOF.
+ self._pipe_rd.read(1)
+ return self._last_progress
+
+ def terminate(self):
+ # ignore ValueError, can arrive on closed _pipesig_wd if terminate is
+ # called multiple times.
+ with contextlib.suppress(ValueError):
+ self._pipesig_wd.write(b't') # t for terminate, could be anything
+ # close to avoid filling the pipe buffer uselessly if called multiple
+ # times, since nothing will read next sent bytes.
+ self._pipesig_wd.close()
+
+ def poll(self):
+ return None if self._thread.is_alive() else self._last_rc
+
+ def close(self):
+ self._thread.join()
+ for fd in (
+ self._pipesig_rd,
+ self._pipesig_wd,
+ self._pipe_rd,
+ self._pipe_wd,
+ ):
+ fd.close()