#!/usr/bin/python3

import os
import re
import subprocess
import time

import humanfriendly

from structures import Progress


class Extract:
    """Unpack an lzip-compressed tar archive through a `pv | lzip | tar` pipeline.

    `pv` reads the archive and reports progress on its stderr, `lzip` (or
    `plzip` when installed) decompresses, and `tar` extracts into the
    destination directory.  The object is a context manager: leaving the
    `with` block terminates and reaps all three processes.
    """

    # Matches one pv status line as produced by `--format '%b %a %e'`:
    #   group 1: first token (amount transferred, e.g. "12.3MiB")
    #   group 2: rate inside "[.../s]"
    #   group 3: token following "ETA"
    _progress_re = re.compile(r'^\s*(\S+)\s+\[([^]]+)/s\]\s+ETA\s+(\S+)\s*$')

    def __init__(self, src, dst):
        """Start the three-process pipeline.

        Args:
            src: path of the compressed archive to read.
            dst: directory passed to `tar -C` as the extraction target.

        Raises:
            OSError: if `src` cannot be stat'ed or a command cannot be started.
        """
        common_parameters = dict(
            encoding='utf8',
            # Force a fixed UTF-8 "C" locale so the tools' output is not
            # localized and stays parseable by `_progress_re`.
            env={
                **os.environ,
                'LC_ALL': 'C.UTF-8',
                'LANG': 'C.UTF-8',
                'LANGUAGE': 'C.UTF-8',
            },
        )
        self.src_size = os.stat(src).st_size
        self.pv_proc = subprocess.Popen(
            [
                'pv',
                '--force',  # emit progress even though stderr is a pipe
                '--format', '%b %a %e',
                src,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            **common_parameters,
        )

        # Prefer the parallel implementation when it is installed.
        lzip_command = '/usr/bin/lzip'
        plzip_command = '/usr/bin/plzip'
        lzip_command_to_use = lzip_command
        if os.path.exists(plzip_command):
            lzip_command_to_use = plzip_command

        self.zip_proc = subprocess.Popen(
            [
                lzip_command_to_use,
                '--decompress',
            ],
            stdin=self.pv_proc.stdout,
            stdout=subprocess.PIPE,
            **common_parameters,
        )
        # The child now owns its copy of the pipe; drop ours so pv receives
        # SIGPIPE (instead of blocking forever) if the decompressor exits.
        self.pv_proc.stdout.close()

        self.tar_proc = subprocess.Popen(
            [
                'tar',
                '-C', dst,
                '-xf', '-',
            ],
            stdin=self.zip_proc.stdout,
            **common_parameters,
        )
        # Same reasoning: let the decompressor see SIGPIPE if tar exits early.
        self.zip_proc.stdout.close()

        self.last_progress = Progress()

    def select_fd(self):
        """Return pv's stderr stream, the most meaningful fd to use with selectors."""
        return self.pv_proc.stderr

    def progress_read(self):
        """Read one status line from pv and return the latest known progress.

        Blocks until pv emits a line (or closes its stderr).  When the line
        does not match `_progress_re` (including the empty string returned at
        end of stream), the previously stored progress is returned unchanged.

        Returns:
            The `Progress` built from the last successfully parsed line.
        """
        line = self.pv_proc.stderr.readline()
        if match := self._progress_re.search(line):
            written_bytes = humanfriendly.parse_size(match.group(1))
            # An empty source file has size 0; report 0% instead of raising
            # ZeroDivisionError.
            if self.src_size:
                percent = int(100 * written_bytes / self.src_size)
            else:
                percent = 0
            # Positional order presumably (bytes, percent, speed, eta), as
            # suggested by the attribute names read in the test main below.
            self.last_progress = Progress(
                written_bytes,
                percent,
                humanfriendly.parse_size(match.group(2)),
                match.group(3),
            )
        return self.last_progress

    def terminate(self):
        """Ask every process of the pipeline to terminate."""
        for proc in (self.pv_proc, self.zip_proc, self.tar_proc):
            proc.terminate()

    def poll(self):
        """Return None if tar has not terminated, otherwise its return code."""
        return self.tar_proc.poll()

    def wait(self, timeout=None):
        """Wait for the whole pipeline and return tar's return code.

        Args:
            timeout: maximum number of seconds to wait for the pipeline as a
                whole, or None to wait indefinitely.

        Raises:
            subprocess.TimeoutExpired: if a process is still running once the
                time budget is exhausted.
        """
        # Share a single deadline across the three waits so the total time
        # spent never exceeds `timeout` (instead of up to three times it).
        deadline = None if timeout is None else time.monotonic() + timeout
        returncode = None
        for proc in (self.pv_proc, self.zip_proc, self.tar_proc):
            if deadline is None:
                remaining = None
            else:
                remaining = max(0.0, deadline - time.monotonic())
            returncode = proc.wait(remaining)
        # The loop ends on tar, so this is tar's return code.
        return returncode

    def __enter__(self):
        return self

    def __exit__(self, exc_type, value, traceback):
        """Terminate and reap the pipeline, then release pv's stderr pipe."""
        self.terminate()
        try:
            self.wait()
        finally:
            # Nothing reads progress after this point; avoid leaking the fd.
            self.pv_proc.stderr.close()


if __name__ == '__main__':
    # Manual test driver: extract argv[1] into argv[2], printing progress.
    import sys
    import contextlib

    with contextlib.suppress(KeyboardInterrupt):
        with Extract(sys.argv[1], sys.argv[2]) as extract:
            while extract.poll() is None:
                progress = extract.progress_read()
                print(f'{progress.bytes}b {progress.percent}% '
                      f'{progress.speed}b/s {progress.eta}')
            rc = extract.poll()
            print(f'ended with code: {rc}')
    print('test main ended')