summaryrefslogtreecommitdiffstats
path: root/gamechestcli/gamechest/runners
diff options
context:
space:
mode:
authorvg <vgm+dev@devys.org>2022-09-16 14:51:12 +0200
committervg <vgm+dev@devys.org>2022-09-16 14:51:12 +0200
commit5143dc3af380f65536ef624472e27ca8c86b65df (patch)
tree8245e5dca2882a0c5a7622301350cb930f12c5d1 /gamechestcli/gamechest/runners
parent77c76b4e0407250fc0ba8ee0309a528a4a8f1f09 (diff)
downloadgamechest-5143dc3af380f65536ef624472e27ca8c86b65df.tar.gz
gamechest-5143dc3af380f65536ef624472e27ca8c86b65df.tar.bz2
gamechest-5143dc3af380f65536ef624472e27ca8c86b65df.zip
git-sync on seele
Diffstat (limited to 'gamechestcli/gamechest/runners')
-rw-r--r--gamechestcli/gamechest/runners/download.py68
-rw-r--r--gamechestcli/gamechest/runners/extract.py96
-rw-r--r--gamechestcli/gamechest/runners/install.py42
-rw-r--r--gamechestcli/gamechest/runners/remove.py150
-rw-r--r--gamechestcli/gamechest/runners/runnerbase.py44
-rw-r--r--gamechestcli/gamechest/runners/runnermulti.py91
6 files changed, 491 insertions, 0 deletions
diff --git a/gamechestcli/gamechest/runners/download.py b/gamechestcli/gamechest/runners/download.py
new file mode 100644
index 0000000..0dfbd05
--- /dev/null
+++ b/gamechestcli/gamechest/runners/download.py
@@ -0,0 +1,68 @@
+import os
+import re
+import subprocess
+
+import humanfriendly
+
+from ..structures import Progress
+from .runnerbase import RunnerBase, neutral_locale_variables
+
+
+class Download(RunnerBase):
+
+ _rsync_progress_re = re.compile(r'^\s*(\S+)\s+(\d+)%\s+(\S+)\s+(\S+)\s+$')
+
+ def __init__(self, src, dst):
+ self.proc = subprocess.Popen(
+ [
+ 'rsync',
+ '--partial',
+ # --no-h: not human, easier to parse (but speed still appears
+ # in human form).
+ '--no-h',
+ '--info=progress2',
+ src,
+ dst,
+ ],
+ stdout=subprocess.PIPE,
+ encoding='utf8',
+ env={**os.environ,
+ **neutral_locale_variables,
+ },
+ )
+ self.last_progress = Progress()
+
+ def get_read_fd(self):
+ return self.proc.stdout
+
+ def progress_read(self):
+ line = self.proc.stdout.readline()
+ if match := self._rsync_progress_re.search(line):
+ self.last_progress = Progress(
+ nbbytes=int(match.group(1)),
+ percent=int(match.group(2)),
+ speed=humanfriendly.parse_size(match.group(3), binary=True),
+ eta=match.group(4),
+ )
+ return self.last_progress
+
+ def terminate(self):
+ self.proc.terminate()
+
+ def poll(self):
+ return self.proc.poll()
+
+ def close(self):
+ self.proc.wait()
+
+
+if __name__ == '__main__':
+ import sys
+ import contextlib
+ with contextlib.ExitStack() as stack:
+ stack.enter_context(contextlib.suppress(KeyboardInterrupt))
+ runner = stack.enter_context(Download(sys.argv[1], sys.argv[2]))
+ while (rc := runner.poll()) is None:
+ print(runner.progress_read())
+ print('runner ended with code:', rc)
+ print('test main ended')
diff --git a/gamechestcli/gamechest/runners/extract.py b/gamechestcli/gamechest/runners/extract.py
new file mode 100644
index 0000000..e8011a7
--- /dev/null
+++ b/gamechestcli/gamechest/runners/extract.py
@@ -0,0 +1,96 @@
+import os
+import re
+import subprocess
+
+import humanfriendly
+
+from ..structures import Progress
+from .runnerbase import RunnerBase, neutral_locale_variables
+
+
+class Extract(RunnerBase):
+
+ _progress_re = re.compile(r'^\s*(\S+)\s+\[([^]]+)/s\]\s+ETA\s+(\S+)\s*$')
+
+ def __init__(self, src, dst):
+ common_parameters = dict(
+ encoding='utf8',
+ env={**os.environ,
+ **neutral_locale_variables,
+ },
+ )
+ self.src_size = os.stat(src).st_size
+ self.pv_proc = subprocess.Popen(
+ [
+ 'pv',
+ '--force',
+ '--format', '%b %a %e',
+ src,
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ **common_parameters,
+ )
+ lzip_command = '/usr/bin/lzip'
+ plzip_command = '/usr/bin/plzip'
+ lzip_command_to_use = lzip_command
+ if os.path.exists(plzip_command):
+ lzip_command_to_use = plzip_command
+ self.zip_proc = subprocess.Popen(
+ [
+ lzip_command_to_use,
+ '--decompress',
+ ],
+ stdin=self.pv_proc.stdout,
+ stdout=subprocess.PIPE,
+ **common_parameters,
+ )
+ self.tar_proc = subprocess.Popen(
+ [
+ 'tar',
+ '-C', dst,
+ '-xf', '-'
+ ],
+ stdin=self.zip_proc.stdout,
+ **common_parameters,
+ )
+ self.last_progress = Progress()
+
+ def get_read_fd(self):
+ return self.pv_proc.stderr
+
+ def progress_read(self):
+ line = self.pv_proc.stderr.readline()
+ if match := self._progress_re.search(line):
+ written_bytes = humanfriendly.parse_size(match.group(1))
+ self.last_progress = Progress(
+ nbbytes=written_bytes,
+ percent=int(100 * written_bytes / self.src_size),
+ speed=humanfriendly.parse_size(match.group(2)),
+ eta=match.group(3),
+ )
+ return self.last_progress
+
+ def terminate(self):
+ for proc in (self.pv_proc, self.zip_proc, self.tar_proc):
+ proc.terminate()
+
+ def poll(self):
+ return self.tar_proc.poll()
+
+ def close(self):
+ self.pv_proc.wait()
+ self.zip_proc.wait()
+ self.tar_proc.wait()
+
+
+if __name__ == '__main__':
+ import sys
+ import contextlib
+ with contextlib.suppress(KeyboardInterrupt):
+ with Extract(sys.argv[1], sys.argv[2]) as runner:
+ while runner.poll() is None:
+ print(runner.progress_read())
+ rc = runner.poll()
+ print('ended with code:', rc)
+ print('test main ended')
diff --git a/gamechestcli/gamechest/runners/install.py b/gamechestcli/gamechest/runners/install.py
new file mode 100644
index 0000000..a827e25
--- /dev/null
+++ b/gamechestcli/gamechest/runners/install.py
@@ -0,0 +1,42 @@
+import functools
+import os
+
+from .runnermulti import MultiSequencialRunnerBase
+from .download import Download
+from .extract import Extract
+from .remove import Remove
+
+
+class Install(MultiSequencialRunnerBase):
+
+ def __init__(self, source, dest):
+ filename = os.path.split(source)[1]
+ tmpdest = os.path.join(dest, f'{filename}.rsynctmp')
+ runners = [
+ functools.partial(Download, source, tmpdest),
+ functools.partial(Extract, tmpdest, dest),
+ functools.partial(Remove, tmpdest),
+ ]
+ super().__init__(runners)
+
+
+if __name__ == '__main__':
+ import sys
+ import contextlib
+ import time
+ import selectors
+ print('main test')
+ with contextlib.ExitStack() as stack:
+ stack.enter_context(contextlib.suppress(KeyboardInterrupt))
+ runner = stack.enter_context(Install(sys.argv[1], sys.argv[2]))
+ selector = stack.enter_context(selectors.DefaultSelector())
+ selector.register(runner.get_read_fd(), selectors.EVENT_READ)
+ last_progress = None
+ while (rc := runner.poll()) is None:
+ if selector.select():
+ progress = runner.progress_read()
+ if progress != last_progress:
+ print(progress)
+ last_progress = progress
+ print('ended with code:', rc)
+ print('test main ended')
diff --git a/gamechestcli/gamechest/runners/remove.py b/gamechestcli/gamechest/runners/remove.py
new file mode 100644
index 0000000..99c4247
--- /dev/null
+++ b/gamechestcli/gamechest/runners/remove.py
@@ -0,0 +1,150 @@
+import os
+import re
+import subprocess
+import threading
+
+import humanfriendly
+
+from ..structures import Progress
+from .runnerbase import RunnerBase, neutral_locale_variables
+
+
+class Remove(RunnerBase):
+
+ _progress_re = re.compile(r'^\s*(\S+)\s+\[([^]]+)/s\]\s+ETA\s+(\S+)\s*$')
+
+ def __init__(self, path):
+ self.last_progress = Progress(linemode=True)
+ self.path = path
+ self.rm_proc = None
+ self.pv_proc = None
+ self.pipe = os.pipe()
+ self.read_fd = open(self.pipe[0], 'r', encoding='utf8')
+ #self.read_fd = open(self.pipe[0], 'rb', buffering=0)
+ self._proc_started = False
+ self._filescount = 0
+ self._counting_quit_event = threading.Event()
+ self._counting_thread = threading.Thread(
+ target=self._counting_worker,
+ args=(self._counting_quit_event, path),
+ )
+ self._counting_thread.start()
+
+ def _start_proc(self):
+ common_parameters = dict(
+ encoding='utf8',
+ env={**os.environ,
+ **neutral_locale_variables,
+ },
+ )
+ self.rm_proc = subprocess.Popen(
+ [
+ 'rm',
+ '--verbose',
+ '--recursive',
+ '--one-file-system',
+ '--preserve-root=all',
+ '--interactive=never',
+ '--',
+ self.path,
+ ],
+ # [ # only for testing
+ # 'testtools/fake-rm',
+ # self.path,
+ # ],
+ stdout=subprocess.PIPE,
+ **common_parameters,
+ )
+ self.pv_proc = subprocess.Popen(
+ [
+ 'pv',
+ '--force',
+ '--size', str(self._filescount),
+ '--format', '%b %a %e',
+ '--line-mode',
+ ],
+ stdin=self.rm_proc.stdout,
+ stdout=subprocess.DEVNULL,
+ stderr=self.pipe[1],
+ **common_parameters,
+ )
+ # close child's pipe fd on parent's side or selectors will hang if
+ # process closes.
+ os.close(self.pipe[1])
+ self._proc_started = True
+
+ def _counting_worker(self, event, path):
+ # as counting files for a deep directory can take some time, make it
+ # interruptible with a thread and event for cancelling.
+ # using functools.reduce(x+1+len(y[2]), os.walk) would have been much
+ # cleaner, but the code gets harder to read with the addition of the
+ # event checking. Fallback to accumalator and iterations.
+ total = 0
+ for _, _, files in os.walk(path):
+ total += 1 + len(files)
+ if event.is_set():
+ break
+ # set filescount with calculated total (even then interrupted, as it
+ # would be a better estimation than nothing).
+ if os.path.isfile(path):
+ self._filescount = 1
+ else:
+ self._filescount = total
+ # start next processes (if event was not set)
+ if not event.is_set():
+ self._start_proc()
+
+ def get_read_fd(self):
+ return self.read_fd
+
+ def progress_read(self):
+ if not self._proc_started:
+ return self.last_progress
+ line = self.read_fd.readline()
+ #line = self.read_fd.readline().decode('utf8')
+ if match := self._progress_re.search(line):
+ written_bytes = humanfriendly.parse_size(match.group(1))
+ self.last_progress = Progress(
+ linemode=True,
+ nbbytes=written_bytes,
+ percent=int(100 * written_bytes / self._filescount),
+ speed=humanfriendly.parse_size(match.group(2)),
+ eta=match.group(3),
+ )
+ return self.last_progress
+
+ def terminate(self):
+ self._counting_quit_event.set()
+ if self._proc_started:
+ for proc in (self.rm_proc, self.pv_proc):
+ proc.terminate()
+
+ def poll(self):
+ if not self._proc_started:
+ return None
+ return self.pv_proc.poll()
+
+ def close(self):
+ self._counting_thread.join()
+ if self._proc_started:
+ for proc in (self.rm_proc, self.pv_proc):
+ proc.wait()
+ self.read_fd.close()
+
+
+if __name__ == '__main__':
+ import sys
+ import contextlib
+ import time
+ import selectors
+
+ with contextlib.ExitStack() as stack:
+ stack.enter_context(contextlib.suppress(KeyboardInterrupt))
+ runner = stack.enter_context(Remove(sys.argv[1]))
+ selector = stack.enter_context(selectors.DefaultSelector())
+ selector.register(runner.get_read_fd(), selectors.EVENT_READ)
+ while (rc := runner.poll()) is None:
+ if selector.select():
+ print(runner.progress_read())
+ print('ended with code:', rc)
+ print('test main ended')
diff --git a/gamechestcli/gamechest/runners/runnerbase.py b/gamechestcli/gamechest/runners/runnerbase.py
new file mode 100644
index 0000000..ac64ebb
--- /dev/null
+++ b/gamechestcli/gamechest/runners/runnerbase.py
@@ -0,0 +1,44 @@
+from abc import ABCMeta, abstractmethod
+
+
+class RunnerBase(metaclass=ABCMeta):
+
+ @abstractmethod
+ def get_read_fd(self):
+ 'fd to select for read evts to know when to progress_read nonblockingly'
+ raise NotImplementedError
+
+ @abstractmethod
+ def progress_read(self):
+ 'parse the fd given by get_read_fd and returns current progress'
+ raise NotImplementedError
+
+ @abstractmethod
+ def terminate(self):
+ '''signal runner(s) to cancel immediately the operation, must
+ be idempotent if already terminated (not running)'''
+ raise NotImplementedError
+
+ @abstractmethod
+ def poll(self):
+ 'returns None if still running, otherwise return returncode'
+ raise NotImplementedError
+
+ def close(self):
+ 'frees resources, clean/unzombify/gc/close-fds, can block if running'
+
+ def __enter__(self):
+ 'returns self'
+ return self
+
+ def __exit__(self, exc_type, value, traceback):
+ 'terminate and wait garbage-collect'
+ self.terminate()
+ self.close()
+
+
+neutral_locale_variables = {
+ 'LC_ALL':'C.UTF-8',
+ 'LANG':'C.UTF-8',
+ 'LANGUAGE':'C.UTF-8',
+}
diff --git a/gamechestcli/gamechest/runners/runnermulti.py b/gamechestcli/gamechest/runners/runnermulti.py
new file mode 100644
index 0000000..2a5e17c
--- /dev/null
+++ b/gamechestcli/gamechest/runners/runnermulti.py
@@ -0,0 +1,91 @@
+import contextlib
+import os
+import selectors
+import threading
+from functools import partial
+
+from ..structures import Progress
+from .runnerbase import RunnerBase
+
+
+def _create_pipe():
+ pipe_rd, pipe_wd = os.pipe()
+ return open(pipe_rd, 'rb', buffering=0), open(pipe_wd, 'wb', buffering=0)
+
+
+class MultiSequencialRunnerBase(RunnerBase):
+
+ def __init__(self, runners):
+ super().__init__()
+ self.runners = runners
+ self._pipe_rd, self._pipe_wd = _create_pipe()
+ self._pipesig_rd, self._pipesig_wd = _create_pipe()
+ self._thread = threading.Thread(target=self._thread_target)
+ self._thread.start()
+ self._last_progress = Progress()
+ self._last_rc = -1
+
+ def _runner_run(self,
+ runner_callable,
+ step_index,
+ runners_count):
+ with contextlib.ExitStack() as stack:
+ runner = stack.enter_context(runner_callable())
+ selector = stack.enter_context(selectors.DefaultSelector())
+ selector.register(self._pipesig_rd, selectors.EVENT_READ)
+ selector.register(runner.get_read_fd(), selectors.EVENT_READ)
+ while (rc := runner.poll()) is None:
+ for key, events in selector.select():
+ if key.fileobj is self._pipesig_rd:
+ self._pipesig_rd.read(1)
+ return -1
+ progress = runner.progress_read()
+ self._last_progress = progress._replace(
+ step=step_index,
+ steps_count=runners_count,
+ )
+ self._pipe_wd.write(b'p') # (p)rogress, could be anything
+ return rc
+
+ def _thread_target(self):
+ runners_count = len(self.runners)
+ with contextlib.ExitStack() as stack:
+ for step_index, runner_callable in enumerate(self.runners):
+ self._last_rc = self._runner_run(runner_callable, step_index,
+ runners_count)
+ if self._last_rc != 0:
+ break
+ # closing writing end of a pipe, allows select on reading-end to
+ # immediately return and reading on reading-end will returns EOF (or
+ # empty string on python).
+ self._pipe_wd.close()
+
+ def get_read_fd(self):
+ return self._pipe_rd
+
+ def progress_read(self):
+ # read: discard byte used to signal progress, and ignore if EOF.
+ self._pipe_rd.read(1)
+ return self._last_progress
+
+ def terminate(self):
+ # ignore ValueError, can arrive on closed _pipesig_wd if terminate is
+ # called multiple times.
+ with contextlib.suppress(ValueError):
+ self._pipesig_wd.write(b't') # t for terminate, could be anything
+ # close to avoid filling the pipe buffer uselessly if called multiple
+ # times, since nothing will read next sent bytes.
+ self._pipesig_wd.close()
+
+ def poll(self):
+ return None if self._thread.is_alive() else self._last_rc
+
+ def close(self):
+ self._thread.join()
+ for fd in (
+ self._pipesig_rd,
+ self._pipesig_wd,
+ self._pipe_rd,
+ self._pipe_wd,
+ ):
+ fd.close()