diff options
author | vg <vgm+dev@devys.org> | 2022-09-16 14:51:12 +0200 |
---|---|---|
committer | vg <vgm+dev@devys.org> | 2022-09-16 14:51:12 +0200 |
commit | 5143dc3af380f65536ef624472e27ca8c86b65df (patch) | |
tree | 8245e5dca2882a0c5a7622301350cb930f12c5d1 /gamechestcli/gamechest/runners | |
parent | 77c76b4e0407250fc0ba8ee0309a528a4a8f1f09 (diff) | |
download | gamechest-5143dc3af380f65536ef624472e27ca8c86b65df.tar.gz gamechest-5143dc3af380f65536ef624472e27ca8c86b65df.tar.bz2 gamechest-5143dc3af380f65536ef624472e27ca8c86b65df.zip |
git-sync on seele
Diffstat (limited to 'gamechestcli/gamechest/runners')
-rw-r--r-- | gamechestcli/gamechest/runners/download.py | 68 | ||||
-rw-r--r-- | gamechestcli/gamechest/runners/extract.py | 96 | ||||
-rw-r--r-- | gamechestcli/gamechest/runners/install.py | 42 | ||||
-rw-r--r-- | gamechestcli/gamechest/runners/remove.py | 150 | ||||
-rw-r--r-- | gamechestcli/gamechest/runners/runnerbase.py | 44 | ||||
-rw-r--r-- | gamechestcli/gamechest/runners/runnermulti.py | 91 |
6 files changed, 491 insertions, 0 deletions
diff --git a/gamechestcli/gamechest/runners/download.py b/gamechestcli/gamechest/runners/download.py new file mode 100644 index 0000000..0dfbd05 --- /dev/null +++ b/gamechestcli/gamechest/runners/download.py @@ -0,0 +1,68 @@ +import os +import re +import subprocess + +import humanfriendly + +from ..structures import Progress +from .runnerbase import RunnerBase, neutral_locale_variables + + +class Download(RunnerBase): + + _rsync_progress_re = re.compile(r'^\s*(\S+)\s+(\d+)%\s+(\S+)\s+(\S+)\s+$') + + def __init__(self, src, dst): + self.proc = subprocess.Popen( + [ + 'rsync', + '--partial', + # --no-h: not human, easier to parse (but speed still appears + # in human form). + '--no-h', + '--info=progress2', + src, + dst, + ], + stdout=subprocess.PIPE, + encoding='utf8', + env={**os.environ, + **neutral_locale_variables, + }, + ) + self.last_progress = Progress() + + def get_read_fd(self): + return self.proc.stdout + + def progress_read(self): + line = self.proc.stdout.readline() + if match := self._rsync_progress_re.search(line): + self.last_progress = Progress( + nbbytes=int(match.group(1)), + percent=int(match.group(2)), + speed=humanfriendly.parse_size(match.group(3), binary=True), + eta=match.group(4), + ) + return self.last_progress + + def terminate(self): + self.proc.terminate() + + def poll(self): + return self.proc.poll() + + def close(self): + self.proc.wait() + + +if __name__ == '__main__': + import sys + import contextlib + with contextlib.ExitStack() as stack: + stack.enter_context(contextlib.suppress(KeyboardInterrupt)) + runner = stack.enter_context(Download(sys.argv[1], sys.argv[2])) + while (rc := runner.poll()) is None: + print(runner.progress_read()) + print('runner ended with code:', rc) + print('test main ended') diff --git a/gamechestcli/gamechest/runners/extract.py b/gamechestcli/gamechest/runners/extract.py new file mode 100644 index 0000000..e8011a7 --- /dev/null +++ b/gamechestcli/gamechest/runners/extract.py @@ -0,0 +1,96 @@ +import os +import re +import subprocess + +import humanfriendly + +from ..structures import Progress +from .runnerbase import RunnerBase, neutral_locale_variables + + +class Extract(RunnerBase): + + _progress_re = re.compile(r'^\s*(\S+)\s+\[([^]]+)/s\]\s+ETA\s+(\S+)\s*$') + + def __init__(self, src, dst): + common_parameters = dict( + encoding='utf8', + env={**os.environ, + **neutral_locale_variables, + }, + ) + self.src_size = os.stat(src).st_size + self.pv_proc = subprocess.Popen( + [ + 'pv', + '--force', + '--format', '%b %a %e', + src, + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + **common_parameters, + ) + lzip_command = '/usr/bin/lzip' + plzip_command = '/usr/bin/plzip' + lzip_command_to_use = lzip_command + if os.path.exists(plzip_command): + lzip_command_to_use = plzip_command + self.zip_proc = subprocess.Popen( + [ + lzip_command_to_use, + '--decompress', + ], + stdin=self.pv_proc.stdout, + stdout=subprocess.PIPE, + **common_parameters, + ) + self.tar_proc = subprocess.Popen( + [ + 'tar', + '-C', dst, + '-xf', '-' + ], + stdin=self.zip_proc.stdout, + **common_parameters, + ) + self.last_progress = Progress() + + def get_read_fd(self): + return self.pv_proc.stderr + + def progress_read(self): + line = self.pv_proc.stderr.readline() + if match := self._progress_re.search(line): + written_bytes = humanfriendly.parse_size(match.group(1)) + self.last_progress = Progress( + nbbytes=written_bytes, + percent=int(100 * written_bytes / self.src_size), + speed=humanfriendly.parse_size(match.group(2)), + eta=match.group(3), + ) + return self.last_progress + + def terminate(self): + for proc in (self.pv_proc, self.zip_proc, self.tar_proc): + proc.terminate() + + def poll(self): + return self.tar_proc.poll() + + def close(self): + self.pv_proc.wait() + self.zip_proc.wait() + self.tar_proc.wait() + + +if __name__ == '__main__': + import sys + import contextlib + with contextlib.suppress(KeyboardInterrupt): + with Extract(sys.argv[1], sys.argv[2]) as runner: + while runner.poll() is None: + print(runner.progress_read()) + rc = runner.poll() + print('ended with code:', rc) + print('test main ended') diff --git a/gamechestcli/gamechest/runners/install.py b/gamechestcli/gamechest/runners/install.py new file mode 100644 index 0000000..a827e25 --- /dev/null +++ b/gamechestcli/gamechest/runners/install.py @@ -0,0 +1,42 @@ +import functools +import os + +from .runnermulti import MultiSequencialRunnerBase +from .download import Download +from .extract import Extract +from .remove import Remove + + +class Install(MultiSequencialRunnerBase): + + def __init__(self, source, dest): + filename = os.path.split(source)[1] + tmpdest = os.path.join(dest, f'{filename}.rsynctmp') + runners = [ + functools.partial(Download, source, tmpdest), + functools.partial(Extract, tmpdest, dest), + functools.partial(Remove, tmpdest), + ] + super().__init__(runners) + + +if __name__ == '__main__': + import sys + import contextlib + import time + import selectors + print('main test') + with contextlib.ExitStack() as stack: + stack.enter_context(contextlib.suppress(KeyboardInterrupt)) + runner = stack.enter_context(Install(sys.argv[1], sys.argv[2])) + selector = stack.enter_context(selectors.DefaultSelector()) + selector.register(runner.get_read_fd(), selectors.EVENT_READ) + last_progress = None + while (rc := runner.poll()) is None: + if selector.select(): + progress = runner.progress_read() + if progress != last_progress: + print(progress) + last_progress = progress + print('ended with code:', rc) + print('test main ended') diff --git a/gamechestcli/gamechest/runners/remove.py b/gamechestcli/gamechest/runners/remove.py new file mode 100644 index 0000000..99c4247 --- /dev/null +++ b/gamechestcli/gamechest/runners/remove.py @@ -0,0 +1,150 @@ +import os +import re +import subprocess +import threading + +import humanfriendly + +from ..structures import Progress +from .runnerbase import RunnerBase, neutral_locale_variables + + +class Remove(RunnerBase): + + _progress_re = re.compile(r'^\s*(\S+)\s+\[([^]]+)/s\]\s+ETA\s+(\S+)\s*$') + + def __init__(self, path): + self.last_progress = Progress(linemode=True) + self.path = path + self.rm_proc = None + self.pv_proc = None + self.pipe = os.pipe() + self.read_fd = open(self.pipe[0], 'r', encoding='utf8') + #self.read_fd = open(self.pipe[0], 'rb', buffering=0) + self._proc_started = False + self._filescount = 0 + self._counting_quit_event = threading.Event() + self._counting_thread = threading.Thread( + target=self._counting_worker, + args=(self._counting_quit_event, path), + ) + self._counting_thread.start() + + def _start_proc(self): + common_parameters = dict( + encoding='utf8', + env={**os.environ, + **neutral_locale_variables, + }, + ) + self.rm_proc = subprocess.Popen( + [ + 'rm', + '--verbose', + '--recursive', + '--one-file-system', + '--preserve-root=all', + '--interactive=never', + '--', + self.path, + ], + # [ # only for testing + # 'testtools/fake-rm', + # self.path, + # ], + stdout=subprocess.PIPE, + **common_parameters, + ) + self.pv_proc = subprocess.Popen( + [ + 'pv', + '--force', + '--size', str(self._filescount), + '--format', '%b %a %e', + '--line-mode', + ], + stdin=self.rm_proc.stdout, + stdout=subprocess.DEVNULL, + stderr=self.pipe[1], + **common_parameters, + ) + # close child's pipe fd on parent's side or selectors will hang if + # process closes. + os.close(self.pipe[1]) + self._proc_started = True + + def _counting_worker(self, event, path): + # as counting files for a deep directory can take some time, make it + # interruptible with a thread and event for cancelling. + # using functools.reduce(x+1+len(y[2]), os.walk) would have been much + # cleaner, but the code gets harder to read with the addition of the + # event checking. Fallback to accumalator and iterations. + total = 0 + for _, _, files in os.walk(path): + total += 1 + len(files) + if event.is_set(): + break + # set filescount with calculated total (even then interrupted, as it + # would be a better estimation than nothing). + if os.path.isfile(path): + self._filescount = 1 + else: + self._filescount = total + # start next processes (if event was not set) + if not event.is_set(): + self._start_proc() + + def get_read_fd(self): + return self.read_fd + + def progress_read(self): + if not self._proc_started: + return self.last_progress + line = self.read_fd.readline() + #line = self.read_fd.readline().decode('utf8') + if match := self._progress_re.search(line): + written_bytes = humanfriendly.parse_size(match.group(1)) + self.last_progress = Progress( + linemode=True, + nbbytes=written_bytes, + percent=int(100 * written_bytes / self._filescount), + speed=humanfriendly.parse_size(match.group(2)), + eta=match.group(3), + ) + return self.last_progress + + def terminate(self): + self._counting_quit_event.set() + if self._proc_started: + for proc in (self.rm_proc, self.pv_proc): + proc.terminate() + + def poll(self): + if not self._proc_started: + return None + return self.pv_proc.poll() + + def close(self): + self._counting_thread.join() + if self._proc_started: + for proc in (self.rm_proc, self.pv_proc): + proc.wait() + self.read_fd.close() + + +if __name__ == '__main__': + import sys + import contextlib + import time + import selectors + + with contextlib.ExitStack() as stack: + stack.enter_context(contextlib.suppress(KeyboardInterrupt)) + runner = stack.enter_context(Remove(sys.argv[1])) + selector = stack.enter_context(selectors.DefaultSelector()) + selector.register(runner.get_read_fd(), selectors.EVENT_READ) + while (rc := runner.poll()) is None: + if selector.select(): + print(runner.progress_read()) + print('ended with code:', rc) + print('test main ended') diff --git a/gamechestcli/gamechest/runners/runnerbase.py b/gamechestcli/gamechest/runners/runnerbase.py new file mode 100644 index 0000000..ac64ebb --- /dev/null +++ b/gamechestcli/gamechest/runners/runnerbase.py @@ -0,0 +1,44 @@ +from abc import ABCMeta, abstractmethod + + +class RunnerBase(metaclass=ABCMeta): + + @abstractmethod + def get_read_fd(self): + 'fd to select for read evts to know when to progress_read nonblockingly' + raise NotImplementedError + + @abstractmethod + def progress_read(self): + 'parse the fd given by get_read_fd and returns current progress' + raise NotImplementedError + + @abstractmethod + def terminate(self): + '''signal runner(s) to cancel immediately the operation, must + be idempotent if already terminated (not running)''' + raise NotImplementedError + + @abstractmethod + def poll(self): + 'returns None if still running, otherwise return returncode' + raise NotImplementedError + + def close(self): + 'frees resources, clean/unzombify/gc/close-fds, can block if running' + + def __enter__(self): + 'returns self' + return self + + def __exit__(self, exc_type, value, traceback): + 'terminate and wait garbage-collect' + self.terminate() + self.close() + + +neutral_locale_variables = { + 'LC_ALL':'C.UTF-8', + 'LANG':'C.UTF-8', + 'LANGUAGE':'C.UTF-8', +} diff --git a/gamechestcli/gamechest/runners/runnermulti.py b/gamechestcli/gamechest/runners/runnermulti.py new file mode 100644 index 0000000..2a5e17c --- /dev/null +++ b/gamechestcli/gamechest/runners/runnermulti.py @@ -0,0 +1,91 @@ +import contextlib +import os +import selectors +import threading +from functools import partial + +from ..structures import Progress +from .runnerbase import RunnerBase + + +def _create_pipe(): + pipe_rd, pipe_wd = os.pipe() + return open(pipe_rd, 'rb', buffering=0), open(pipe_wd, 'wb', buffering=0) + + +class MultiSequencialRunnerBase(RunnerBase): + + def __init__(self, runners): + super().__init__() + self.runners = runners + self._pipe_rd, self._pipe_wd = _create_pipe() + self._pipesig_rd, self._pipesig_wd = _create_pipe() + self._thread = threading.Thread(target=self._thread_target) + self._thread.start() + self._last_progress = Progress() + self._last_rc = -1 + + def _runner_run(self, + runner_callable, + step_index, + runners_count): + with contextlib.ExitStack() as stack: + runner = stack.enter_context(runner_callable()) + selector = stack.enter_context(selectors.DefaultSelector()) + selector.register(self._pipesig_rd, selectors.EVENT_READ) + selector.register(runner.get_read_fd(), selectors.EVENT_READ) + while (rc := runner.poll()) is None: + for key, events in selector.select(): + if key.fileobj is self._pipesig_rd: + self._pipesig_rd.read(1) + return -1 + progress = runner.progress_read() + self._last_progress = progress._replace( + step=step_index, + steps_count=runners_count, + ) + self._pipe_wd.write(b'p') # (p)rogress, could be anything + return rc + + def _thread_target(self): + runners_count = len(self.runners) + with contextlib.ExitStack() as stack: + for step_index, runner_callable in enumerate(self.runners): + self._last_rc = self._runner_run(runner_callable, step_index, + runners_count) + if self._last_rc != 0: + break + # closing writing end of a pipe, allows select on reading-end to + # immediately return and reading on reading-end will returns EOF (or + # empty string on python). + self._pipe_wd.close() + + def get_read_fd(self): + return self._pipe_rd + + def progress_read(self): + # read: discard byte used to signal progress, and ignore if EOF. + self._pipe_rd.read(1) + return self._last_progress + + def terminate(self): + # ignore ValueError, can arrive on closed _pipesig_wd if terminate is + # called multiple times. + with contextlib.suppress(ValueError): + self._pipesig_wd.write(b't') # t for terminate, could be anything + # close to avoid filling the pipe buffer uselessly if called multiple + # times, since nothing will read next sent bytes. + self._pipesig_wd.close() + + def poll(self): + return None if self._thread.is_alive() else self._last_rc + + def close(self): + self._thread.join() + for fd in ( + self._pipesig_rd, + self._pipesig_wd, + self._pipe_rd, + self._pipe_wd, + ): + fd.close() |