import os import re import struct import subprocess import humanfriendly from ..structures import Progress from .runnerbase import RunnerBase, neutral_locale_variables class Extract(RunnerBase): _progress_re = re.compile(r'^\s*(\S+)\s+\[([^]]+)/s\]\s+ETA\s+(\S+)\s*$') def __init__(self, src, dst): common_parameters = dict( encoding='utf8', env={**os.environ, **neutral_locale_variables, }, ) self.src_size = os.stat(src).st_size self.pv_proc = subprocess.Popen( [ 'pv', '--force', '--format', '%b %a %e', src, ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, **common_parameters, ) lzip_command = '/usr/bin/lzip' plzip_command = '/usr/bin/plzip' uncompress_command_to_use = [lzip_command, '--decompress'] first_word_magic = 0 with open(src, 'rb') as stream: first_word_magic = struct.unpack('>L', stream.read(4))[0] if first_word_magic == 0x4c5a4950: # lzip magic if os.path.exists(plzip_command): uncompress_command_to_use = [plzip_command, '--decompress'] elif first_word_magic == 0x28B52FFD: # zstd magic uncompress_command_to_use = ['zstd', '-T0', '-d', '--stdout'] self.zip_proc = subprocess.Popen( uncompress_command_to_use, stdin=self.pv_proc.stdout, stdout=subprocess.PIPE, **common_parameters, ) self.tar_proc = subprocess.Popen( [ 'tar', '-C', dst, '-xf', '-' ], stdin=self.zip_proc.stdout, **common_parameters, ) self.last_progress = Progress() def get_read_fd(self): return self.pv_proc.stderr def progress_read(self): line = self.pv_proc.stderr.readline() if match := self._progress_re.search(line): written_bytes = humanfriendly.parse_size(match.group(1)) self.last_progress = Progress( nbbytes=written_bytes, percent=int(100 * written_bytes / self.src_size), speed=humanfriendly.parse_size(match.group(2)), eta=match.group(3), ) return self.last_progress def terminate(self): for proc in (self.pv_proc, self.zip_proc, self.tar_proc): proc.terminate() def poll(self): return self.tar_proc.poll() def close(self): self.pv_proc.wait() self.zip_proc.wait() self.tar_proc.wait() if __name__ == '__main__': import contextlib import sys with contextlib.suppress(KeyboardInterrupt): with Extract(sys.argv[1], sys.argv[2]) as runner: while runner.poll() is None: print(runner.progress_read()) rc = runner.poll() print('ended with code:', rc) print('test main ended')