diff options
Diffstat (limited to 'gamechestcli')
| -rw-r--r-- | gamechestcli/gamechest/runners/extract.py | 133 | 
1 files changed, 126 insertions, 7 deletions
diff --git a/gamechestcli/gamechest/runners/extract.py b/gamechestcli/gamechest/runners/extract.py index 4e538ef..23c3d34 100644 --- a/gamechestcli/gamechest/runners/extract.py +++ b/gamechestcli/gamechest/runners/extract.py @@ -1,7 +1,10 @@  import os  import re +import select  import struct  import subprocess +import threading +import zipfile  import humanfriendly @@ -9,11 +12,89 @@ from ..structures import Progress  from .runnerbase import RunnerBase, neutral_locale_variables -class Extract(RunnerBase): +class ExtractZip(RunnerBase): +    """ +    Simple zip wrapper without multithreading. Should not be an issue as zip +    is used for little archives only, bigger archives are tar+zstd with +    multithreaded algorithms. So keep this class simple. +    """ + +    def __init__(self, src, dst): +        self.zip = zipfile.ZipFile(src, 'r') +        self.last_progress = Progress() +        self.read_fd, self.write_fd = os.pipe() +        self.cancel_event = threading.Event() + +        # No need to lock this unless GIL is disabled. +        self.written_bytes = 0 +        self.total_bytes = 0 +        self.progress = 0 + +        def extract_and_report(): +            chunk_size = 1024**2 # 1MiB chunks +            with zipfile.ZipFile(src, 'r') as zip_ref: +                self.total_bytes = sum(getattr(file_info, 'file_size', 0) for file_info in zip_ref.infolist()) +                for file_info in zip_ref.infolist(): +                    with zip_ref.open(file_info) as source_file: +                        target_path = os.path.join(dst, file_info.filename) +                        os.makedirs(os.path.dirname(target_path), exist_ok=True) +                        with open(target_path, 'wb') as target_file: +                            while not self.cancel_event.is_set(): +                                chunk = source_file.read(chunk_size) +                                if not chunk or self.cancel_event.is_set(): +                                    # if we have read everything from the +                                    # source file, OR, we got a cancel event +                                    # in-between the read and the write, we +                                    # break. +                                    break +                                target_file.write(chunk) +                                self.written_bytes += len(chunk) +                                self.progress = (self.written_bytes / self.total_bytes) * 100 +                                #progress_message = f"Progress: {progress:.2f}% ({bytes_written} of {total_bytes} bytes)\n" +                                os.write(self.write_fd, b'.') + +        # Create a thread for extraction +        extraction_thread = threading.Thread(target=extract_and_report) +        extraction_thread.start() +        self.extraction_thread = extraction_thread + + +    def get_read_fd(self): +        return self.read_fd + +    def progress_read(self): +        ready, _, _ = select.select([self.read_fd], [], []) +        if ready: +            # flush any present data from the pipe +            os.read(self.read_fd, 1024) +            self.last_progress = Progress( +                nbbytes=self.written_bytes, +                percent=int(100 * self.written_bytes / self.total_bytes), +                #speed=humanfriendly.parse_size(match.group(2)), +                speed=0, +                eta="UNK", +            ) +        return self.last_progress + +    def terminate(self): +        self.close() # same as self.close, can be called multiple time + +    def poll(self): +        return None if self.zip else 0 + +    def close(self): +        self.cancel_event.set() +        self.extraction_thread.join() +        if self.zip: +            self.zip.close() +        self.zip = None + + +class ExtractTar(RunnerBase):      _progress_re = re.compile(r'^\s*(\S+)\s+\[([^]]+)/s\]\s+ETA\s+(\S+)\s*$') -    def __init__(self, src, dst): +    def __init__(self, src, dst, compression_type):          common_parameters = dict(              encoding='utf8',              env={**os.environ, @@ -35,13 +116,10 @@ class Extract(RunnerBase):          lzip_command = '/usr/bin/lzip'          plzip_command = '/usr/bin/plzip'          uncompress_command_to_use = [lzip_command, '--decompress'] -        first_word_magic = 0 -        with open(src, 'rb') as stream: -            first_word_magic = struct.unpack('>L', stream.read(4))[0] -        if first_word_magic == 0x4c5a4950: # lzip magic +        if compression_type == 'lzip':              if os.path.exists(plzip_command):                  uncompress_command_to_use = [plzip_command, '--decompress'] -        elif first_word_magic == 0x28B52FFD: # zstd magic +        elif compression_type == 'zstd':              uncompress_command_to_use = ['zstd', '-T0', '-d', '--stdout']          self.zip_proc = subprocess.Popen(              uncompress_command_to_use, @@ -88,6 +166,47 @@ class Extract(RunnerBase):          self.tar_proc.wait() +class Extract(RunnerBase): +    def __init__(self, src, dst): +        zip_magics = ( +            b'PK\x03\x04', +            b'PK\x05\x06', # empty +            b'PK\x07\x08', # spanned +        ) + +        first_word_magic = 0 +        first_word_magic_b = b'' +        compression_type = None +        with open(src, 'rb') as stream: +            first_word_magic_b = stream.read(4) +            first_word_magic = struct.unpack('>L', first_word_magic_b)[0] +        if first_word_magic_b in zip_magics: +            compression_type = 'zip' +        elif first_word_magic == 0x4c5a4950: # lzip magic +            compression_type = 'lzip' +        elif first_word_magic == 0x28B52FFD: # zstd magic +            compression_type = 'zstd' +        if compression_type == 'zip': +            self.inner_class = ExtractZip(src, dst) +        else: +            self.inner_class = ExtractTar(src, dst, compression_type) + +    def get_read_fd(self): +        return self.inner_class.get_read_fd() + +    def progress_read(self): +        return self.inner_class.progress_read() + +    def terminate(self): +        return self.inner_class.terminate() + +    def poll(self): +        return self.inner_class.poll() + +    def close(self): +        return self.inner_class.close() + +  if __name__ == '__main__':      import contextlib      import sys  | 
