From 77c76b4e0407250fc0ba8ee0309a528a4a8f1f09 Mon Sep 17 00:00:00 2001 From: vg Date: Tue, 30 Aug 2022 09:57:12 +0200 Subject: git-sync on boo --- gamechestcli/__init__.py | 0 gamechestcli/extract.py | 109 ++++++++++++++++++++++++++ gamechestcli/multitest.py | 10 +++ gamechestcli/remove.py | 170 +++++++++++++++++++++++++++++++++++++++++ gamechestcli/requirements.txt | 1 + gamechestcli/rsync.py | 80 +++++++++++++++---- gamechestcli/structures.py | 9 +++ gamechestcli/testtools/fake-rm | 2 + 8 files changed, 368 insertions(+), 13 deletions(-) create mode 100644 gamechestcli/__init__.py create mode 100644 gamechestcli/extract.py create mode 100644 gamechestcli/multitest.py create mode 100644 gamechestcli/remove.py create mode 100644 gamechestcli/requirements.txt create mode 100644 gamechestcli/structures.py create mode 100755 gamechestcli/testtools/fake-rm diff --git a/gamechestcli/__init__.py b/gamechestcli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gamechestcli/extract.py b/gamechestcli/extract.py new file mode 100644 index 0000000..2ac7fde --- /dev/null +++ b/gamechestcli/extract.py @@ -0,0 +1,109 @@ +#!/usr/bin/python3 + +import os +import re +import subprocess + +import humanfriendly + +from structures import Progress + +class Extract: + + _progress_re = re.compile(r'^\s*(\S+)\s+\[([^]]+)/s\]\s+ETA\s+(\S+)\s*$') + + def __init__(self, src, dst): + common_parameters = dict( + encoding='utf8', + env={**os.environ, + **{'LC_ALL':'C.UTF-8', + 'LANG':'C.UTF-8', + 'LANGUAGE':'C.UTF-8', + }}, + ) + self.src_size = os.stat(src).st_size + self.pv_proc = subprocess.Popen( + [ + 'pv', + '--force', + '--format', '%b %a %e', + src, + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + **common_parameters, + ) + lzip_command = '/usr/bin/lzip' + plzip_command = '/usr/bin/plzip' + lzip_command_to_use = lzip_command + if os.path.exists(plzip_command): + lzip_command_to_use = plzip_command + self.zip_proc = subprocess.Popen( + [ + lzip_command_to_use, + '--decompress', + ], + stdin=self.pv_proc.stdout, + stdout=subprocess.PIPE, + **common_parameters, + ) + self.tar_proc = subprocess.Popen( + [ + 'tar', + '-C', dst, + '-xf', '-' + ], + stdin=self.zip_proc.stdout, + **common_parameters, + ) + self.last_progress = Progress() + + def select_fd(self): + 'useful to use selectors with the process most meaningful fd' + return self.pv_proc.stderr + + def progress_read(self): + line = self.pv_proc.stderr.readline() + if match := self._progress_re.search(line): + written_bytes = humanfriendly.parse_size(match.group(1)) + self.last_progress = Progress( + written_bytes, + int(100 * written_bytes / self.src_size), + humanfriendly.parse_size(match.group(2)), + match.group(3), + ) + return self.last_progress + + def terminate(self): + for proc in (self.pv_proc, self.zip_proc, self.tar_proc): + proc.terminate() + + def poll(self): + 'returns None if not terminated, otherwise return returncode' + return self.tar_proc.poll() + + def wait(self, timeout=None): + self.pv_proc.wait(timeout) + self.zip_proc.wait(timeout) + return self.tar_proc.wait(timeout) + + def __enter__(self): + return self + + def __exit__(self, exc_type, value, traceback): + self.terminate() + self.wait() + + +if __name__ == '__main__': + import sys + import contextlib + with contextlib.suppress(KeyboardInterrupt): + with Extract(sys.argv[1], sys.argv[2]) as extract: + while extract.poll() is None: + progress = extract.progress_read() + print(f'{progress.bytes}b {progress.percent}% ' + f'{progress.speed}b/s {progress.eta}') + rc = extract.poll() + print(f'ended with code: {rc}') + print('test main ended') diff --git a/gamechestcli/multitest.py b/gamechestcli/multitest.py new file mode 100644 index 0000000..8023a4e --- /dev/null +++ b/gamechestcli/multitest.py @@ -0,0 +1,10 @@ + +class Multi: + + def __init__(self): + pass + + + def read_progress(self): + while True: + yield diff --git a/gamechestcli/remove.py b/gamechestcli/remove.py new file mode 100644 index 0000000..00247f7 --- /dev/null +++ b/gamechestcli/remove.py @@ -0,0 +1,170 @@ +#!/usr/bin/python3 + +import io +import os +import re +import subprocess +import threading + +import humanfriendly + +from structures import Progress + + +class Remove: + + _progress_re = re.compile(r'^\s*(\S+)\s+\[([^]]+)/s\]\s+ETA\s+(\S+)\s*$') + + def __init__(self, path): + self.last_progress = Progress() + self.path = path + self.rm_proc = None + self.pv_proc = None + self.pipe = os.pipe() + self.read_fd = io.open(self.pipe[0], 'r', encoding='utf8') + self._proc_started = False + self._filescount = 0 + self._counting_quit_event = threading.Event() + self._counting_thread = threading.Thread( + target=self._counting_worker, + args=(self._counting_quit_event, path), + ) + self._counting_thread.start() + + def _start_proc(self): + common_parameters = dict( + encoding='utf8', + env={**os.environ, + **{'LC_ALL':'C.UTF-8', + 'LANG':'C.UTF-8', + 'LANGUAGE':'C.UTF-8', + }}, + ) + self.rm_proc = subprocess.Popen( + [ + 'rm', + '--verbose', + '--recursive', + '--one-file-system', + '--preserve-root=all', + '--interactive=never', + '--', + self.path, + ], + stdout=subprocess.PIPE, + **common_parameters, + ) + #self.rm_proc = subprocess.Popen( # only for testing purposes + # [ + # 'testtools/fake-rm', + # self.path, + # ], + # stdout=subprocess.PIPE, + # **common_parameters, + #) + self.pv_proc = subprocess.Popen( + [ + 'pv', + '--force', + '--size', str(self._filescount), + '--format', '%b %a %e', + '--line-mode', + ], + stdin=self.rm_proc.stdout, + stdout=subprocess.DEVNULL, + stderr=self.pipe[1], + **common_parameters, + ) + # close child's pipe fd on parent's side or selectors will hang if + # process closes. + os.close(self.pipe[1]) + self._proc_started = True + + def _counting_worker(self, event, path): + # as counting files for a deep directory can take some time, make it + # interruptible with a thread and event for cancelling. + # using functools.reduce(x+1+len(y[2]), os.walk) would have been much + # cleaner, but the code gets harder to read with the addition of the + # event checking. Fallback to accumalator and iterations. + total = 0 + for _, _, files in os.walk(path): + total += 1 + len(files) + if event.is_set(): + break + # set filescount with calculated total (even then interrupted, as it + # would be a better estimation than nothing). + self._filescount = total + # start next processes (if event was not set) + if not event.is_set(): + self._start_proc() + + def select_fd(self): + 'useful to use selectors with the process most meaningful fd' + return self.read_fd + + def progress_read(self): + if not self._proc_started: + return self.last_progress + line = self.read_fd.readline() + if match := self._progress_re.search(line): + written_bytes = humanfriendly.parse_size(match.group(1)) + self.last_progress = Progress( + written_bytes, + int(100 * written_bytes / self._filescount), + humanfriendly.parse_size(match.group(2)), + match.group(3), + ) + return self.last_progress + + def terminate(self): + if self._proc_started: + for proc in (self.rm_proc, self.pv_proc): + proc.terminate() + else: + self._counting_quit_event.set() + + def poll(self): + 'returns None if not terminated, otherwise return returncode' + if not self._proc_started: + return None + return self.pv_proc.poll() + + def wait(self, timeout=None): + if self._proc_started: + self.rm_proc.wait(timeout) + return self.pv_proc.wait(timeout) + else: + self._counting_thread.join(timeout) + return -1 + + def close(self): + self.read_fd.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, value, traceback): + self.terminate() + self.wait() + self.close() + + +if __name__ == '__main__': + import sys + import contextlib + import time + import selectors + + with contextlib.ExitStack() as stack: + stack.enter_context(contextlib.suppress(KeyboardInterrupt)) + remove = stack.enter_context(Remove(sys.argv[1])) + selector = stack.enter_context(selectors.DefaultSelector()) + selector.register(remove.select_fd(), selectors.EVENT_READ) + while remove.poll() is None: + selector.select() + progress = remove.progress_read() + print(f'{progress.bytes}b {progress.percent}% ' + f'{progress.speed}b/s {progress.eta}') + rc = remove.poll() + print(f'ended with code: {rc}') + print('test main ended') diff --git a/gamechestcli/requirements.txt b/gamechestcli/requirements.txt new file mode 100644 index 0000000..f5368c4 --- /dev/null +++ b/gamechestcli/requirements.txt @@ -0,0 +1 @@ +humanfriendly diff --git a/gamechestcli/rsync.py b/gamechestcli/rsync.py index f27c4f3..37330f9 100644 --- a/gamechestcli/rsync.py +++ b/gamechestcli/rsync.py @@ -1,28 +1,82 @@ #!/usr/bin/python3 +import os +import re import subprocess +import humanfriendly + +from .structures import Progress + class Rsync: + _rsync_progress_re = re.compile(r'^\s*(\S+)\s+(\d+)%\s+(\S+)\s+(\S+)\s+$') + def __init__(self, src, dst): - pass + self.proc = subprocess.Popen( + [ + 'rsync', + '--partial', + # not human readable, easier to parse (but speed still appears + # in human form). + '--no-h', + '--info=progress2', + src, + dst, + ], + stdout=subprocess.PIPE, + #stderr=subprocess.DEVNULL, + encoding='utf8', + env={**os.environ, + **{'LC_ALL':'C.UTF-8', + 'LANG':'C.UTF-8', + 'LANGUAGE':'C.UTF-8', + }}, + ) + self.last_progress = Progress() - #def start(src, dst): + def select_fd(self): + 'useful to use selectors with the process stdout file descriptor' + return self.proc.stdout - def terminate(self): + def progress_read(self): + line = self.proc.stdout.readline() + if match := self._rsync_progress_re.search(line): + self.last_progress = Progress( + int(match.group(1)), + int(match.group(2)), + humanfriendly.parse_size(match.group(3), binary=True), + match.group(4), + ) + return self.last_progress - pass + def terminate(self): + self.proc.terminate() def poll(self): - pass + 'returns None if not terminated, otherwise return returncode' + return self.proc.poll() + + def wait(self, timeout=None): + return self.proc.wait(timeout) + + def __enter__(self): + return self + def __exit__(self, exc_type, value, traceback): + self.terminate() + self.wait() -def rsync(src, dst): - subprocess.run( - ['rsync', - '--partial', - # format: %l length in bytes, %b bytes actually transferred, %f - # filename. - '--out-format=%l %b %f', - ]) +if __name__ == '__main__': + import sys + import contextlib + with contextlib.suppress(KeyboardInterrupt): + with Rsync(sys.argv[1], sys.argv[2]) as rsync: + while rsync.poll() is None: + progress = rsync.progress_read() + print(f'{progress.bytes}b {progress.percent}% ' + f'{progress.speed}b/s {progress.eta}') + rc = rsync.poll() + print(f'rsync ended with code: {rc}') + print('Rsync test main ended') diff --git a/gamechestcli/structures.py b/gamechestcli/structures.py new file mode 100644 index 0000000..22b0e7e --- /dev/null +++ b/gamechestcli/structures.py @@ -0,0 +1,9 @@ +#!/usr/bin/python3 + +import collections + +Progress = collections.namedtuple( + 'CurrentProgress', + 'bytes percent speed eta', + defaults=[0, 0, 0, 'infinite']) + diff --git a/gamechestcli/testtools/fake-rm b/gamechestcli/testtools/fake-rm new file mode 100755 index 0000000..722b939 --- /dev/null +++ b/gamechestcli/testtools/fake-rm @@ -0,0 +1,2 @@ +#!/bin/bash +find "$1" -print0 | pv -0 --rate-limit 10 -l -q | xargs -0 -n1 echo rm -v -- -- cgit v1.2.3