1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
|
import os
import re
import select
import struct
import subprocess
import threading
import zipfile
import humanfriendly
from ..structures import Progress
from .runnerbase import RunnerBase, neutral_locale_variables
class ExtractZip(RunnerBase):
    """
    Simple zip wrapper without multithreading. Should not be an issue as zip
    is used for little archives only, bigger archives are tar+zstd with
    multithreaded algorithms. So keep this class simple.

    The extraction runs in a single background thread started by the
    constructor. After every written chunk (and once more when it stops) the
    thread writes one byte to an internal pipe, so a caller can wait on
    ``get_read_fd()`` and then call ``progress_read()``.
    """

    def __init__(self, src, dst):
        """
        Start extracting the zip archive ``src`` into the directory ``dst``.

        Parameters:
            src: path of the zip archive to read.
            dst: directory the archive members are written under.
        """
        self.last_progress = Progress()
        self.read_fd, self.write_fd = os.pipe()
        self.cancel_event = threading.Event()
        # No need to lock this unless GIL is disabled.
        self.written_bytes = 0
        self.total_bytes = 0
        self.progress = 0
        # Exception that aborted the extraction thread, if any; read by poll().
        self.error = None

        def extract_and_report():
            try:
                chunk_size = 1024**2  # 1MiB chunks
                dst_root = os.path.abspath(dst)
                with zipfile.ZipFile(src, 'r') as zip_ref:
                    members = [
                        info for info in zip_ref.infolist()
                        if not info.is_dir()
                    ]
                    self.total_bytes = sum(
                        getattr(info, 'file_size', 0) for info in members
                    )
                    for file_info in members:
                        if self.cancel_event.is_set():
                            # Cancelled: do not create empty files for the
                            # entries that were not extracted yet.
                            break
                        target_path = os.path.join(dst, file_info.filename)
                        # Refuse entries whose name ('..' components or an
                        # absolute path) would land outside of dst.
                        resolved = os.path.abspath(target_path)
                        if os.path.commonpath((dst_root, resolved)) != dst_root:
                            raise ValueError(
                                f'zip entry {file_info.filename!r} would be '
                                f'extracted outside of {dst!r}'
                            )
                        with zip_ref.open(file_info) as source_file:
                            os.makedirs(os.path.dirname(target_path), exist_ok=True)
                            with open(target_path, 'wb') as target_file:
                                while not self.cancel_event.is_set():
                                    chunk = source_file.read(chunk_size)
                                    if not chunk or self.cancel_event.is_set():
                                        # if we have read everything from the
                                        # source file, OR, we got a cancel event
                                        # in-between the read and the write, we
                                        # break.
                                        break
                                    target_file.write(chunk)
                                    self.written_bytes += len(chunk)
                                    if self.total_bytes:
                                        self.progress = (self.written_bytes / self.total_bytes) * 100
                                    # Wake up whoever waits on the read end.
                                    os.write(self.write_fd, b'.')
            except Exception as exc:
                # Remember the failure so poll() does not report success, then
                # let the default thread exception hook report it as before.
                self.error = exc
                raise
            finally:
                self.cancel_event.set()
                self.progress = 100
                # Final wake-up so a blocked progress_read() returns.
                os.write(self.write_fd, b'.')

        # Create a thread for extraction
        self.extraction_thread = threading.Thread(target=extract_and_report)
        self.extraction_thread.start()

    def _percent(self):
        """Return the integer completion percentage, safe for empty archives."""
        if self.total_bytes > 0:
            return int(100 * self.written_bytes / self.total_bytes)
        # Nothing to write (empty archive, only empty members, or the archive
        # could not be opened): 100 only once the thread ended without error.
        finished_ok = self.cancel_event.is_set() and self.error is None
        return 100 if finished_ok else 0

    def get_read_fd(self):
        """Return the read end of the notification pipe (None after close())."""
        return self.read_fd

    def progress_read(self):
        """
        Block until the extraction thread signals activity, then return a
        fresh ``Progress``. Speed and ETA are not measured by this runner, so
        they are reported as ``0`` and ``"UNK"``.
        """
        if self.read_fd is None:
            # Already closed: nothing more will ever be signalled.
            return self.last_progress
        ready, _, _ = select.select([self.read_fd], [], [])
        if ready:
            # flush any present data from the pipe
            os.read(self.read_fd, 1024)
            self.last_progress = Progress(
                nbbytes=self.written_bytes,
                percent=self._percent(),
                speed=0,
                eta="UNK",
            )
        return self.last_progress

    def terminate(self):
        """Cancel the extraction; same as close(), can be called multiple times."""
        self.close()

    def poll(self):
        """
        Return None while the extraction thread is running, 0 once it has
        stopped (finished or cancelled), or 1 if it stopped on an exception.
        """
        if not self.cancel_event.is_set():
            return None
        return 0 if self.error is None else 1

    def close(self):
        """Request cancellation, wait for the thread, and release the pipe."""
        self.cancel_event.set()
        self.extraction_thread.join()
        # The thread is done, so nobody writes to the pipe anymore. Each fd is
        # cleared before closing so repeated close() calls stay harmless.
        for attr in ('read_fd', 'write_fd'):
            fd = getattr(self, attr)
            if fd is not None:
                setattr(self, attr, None)
                os.close(fd)
class ExtractTar(RunnerBase):
    """
    Extract a compressed tar archive through a three-process pipeline:
    ``pv`` (reads the source and prints progress on stderr), a decompressor
    (lzip, plzip or zstd) and ``tar`` (unpacks into the destination).
    """

    # Matches one pv progress line produced by the '%b %a %e' format:
    # group 1 = transferred size, group 2 = rate (without '/s'), group 3 = ETA.
    _progress_re = re.compile(r'^\s*(\S+)\s+\[([^]]+)/s\]\s+ETA\s+(\S+)\s*$')

    def __init__(self, src, dst, compression_type):
        """
        Start the extraction pipeline.

        Parameters:
            src: path of the compressed tar archive.
            dst: directory passed to ``tar -C``.
            compression_type: ``'lzip'`` or ``'zstd'``; any other value falls
                back to the plain ``lzip`` decompressor.
        """
        common_parameters = dict(
            encoding='utf8',
            env={
                **os.environ,
                **neutral_locale_variables,
            },
        )
        self.src_size = os.stat(src).st_size
        self.pv_proc = subprocess.Popen(
            [
                'pv',
                '--force',
                '--format', '%b %a %e',
                src,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            **common_parameters,
        )

        lzip_command = '/usr/bin/lzip'
        plzip_command = '/usr/bin/plzip'
        uncompress_command_to_use = [lzip_command, '--decompress']
        if compression_type == 'lzip':
            # Prefer plzip when it is installed.
            if os.path.exists(plzip_command):
                uncompress_command_to_use = [plzip_command, '--decompress']
        elif compression_type == 'zstd':
            uncompress_command_to_use = ['zstd', '-T0', '-d', '--stdout']

        self.zip_proc = subprocess.Popen(
            uncompress_command_to_use,
            stdin=self.pv_proc.stdout,
            stdout=subprocess.PIPE,
            **common_parameters,
        )
        # The decompressor owns its own copy of the pipe now; drop ours so pv
        # receives SIGPIPE if the decompressor exits early.
        self.pv_proc.stdout.close()

        self.tar_proc = subprocess.Popen(
            [
                'tar',
                '-C', dst,
                '-xf', '-'
            ],
            stdin=self.zip_proc.stdout,
            **common_parameters,
        )
        # Same reasoning: let the decompressor see SIGPIPE if tar exits early.
        self.zip_proc.stdout.close()

        self.last_progress = Progress()

    def get_read_fd(self):
        """Return pv's stderr stream, on which progress lines arrive."""
        return self.pv_proc.stderr

    def progress_read(self):
        """
        Read one line from pv's stderr and return the latest ``Progress``.
        Lines that do not match the expected format leave the previous
        progress unchanged.
        """
        stderr = self.pv_proc.stderr
        if stderr.closed:
            # close() already ran: nothing more to read.
            return self.last_progress
        line = stderr.readline()
        if match := self._progress_re.search(line):
            written_bytes = humanfriendly.parse_size(match.group(1))
            # An empty source file has nothing left to read; avoid dividing
            # by zero and report it as complete.
            if self.src_size:
                percent = int(100 * written_bytes / self.src_size)
            else:
                percent = 100
            self.last_progress = Progress(
                nbbytes=written_bytes,
                percent=percent,
                speed=humanfriendly.parse_size(match.group(2)),
                eta=match.group(3),
            )
        return self.last_progress

    def terminate(self):
        """Send the terminate signal to every process of the pipeline."""
        for proc in (self.pv_proc, self.zip_proc, self.tar_proc):
            proc.terminate()

    def poll(self):
        """Return tar's exit code, or None while tar is still running."""
        return self.tar_proc.poll()

    def close(self):
        """Wait for the whole pipeline to exit and release pv's stderr pipe."""
        stderr = self.pv_proc.stderr
        if not stderr.closed:
            # Drain pending progress output so pv cannot block on a full
            # stderr pipe while we wait for it.
            stderr.read()
        self.pv_proc.wait()
        self.zip_proc.wait()
        self.tar_proc.wait()
        stderr.close()
class Extract(RunnerBase):
    """
    Archive extractor that picks the concrete runner from the first four
    bytes of the source file and forwards every runner call to it.
    """

    def __init__(self, src, dst):
        """
        Inspect the magic number of ``src`` and start the matching runner.

        Zip archives are handled by ``ExtractZip``; everything else goes to
        ``ExtractTar`` with ``compression_type`` set to ``'lzip'``,
        ``'zstd'`` or ``None`` when the magic number is not recognised.

        Parameters:
            src: path of the archive to extract.
            dst: destination directory.
        """
        zip_magics = (
            b'PK\x03\x04',
            b'PK\x05\x06',  # empty
            b'PK\x07\x08',  # spanned
        )
        lzip_magic = b'LZIP'  # 0x4c5a4950
        zstd_magic = b'\x28\xb5\x2f\xfd'  # 0x28B52FFD

        with open(src, 'rb') as stream:
            # May be shorter than 4 bytes for tiny files; the raw bytes are
            # compared directly, so a short read simply matches no magic.
            magic = stream.read(4)

        compression_type = None
        if magic in zip_magics:
            compression_type = 'zip'
        elif magic == lzip_magic:
            compression_type = 'lzip'
        elif magic == zstd_magic:
            compression_type = 'zstd'

        if compression_type == 'zip':
            self.inner_class = ExtractZip(src, dst)
        else:
            self.inner_class = ExtractTar(src, dst, compression_type)

    def get_read_fd(self):
        """Forward to the selected runner's get_read_fd()."""
        return self.inner_class.get_read_fd()

    def progress_read(self):
        """Forward to the selected runner's progress_read()."""
        return self.inner_class.progress_read()

    def terminate(self):
        """Forward to the selected runner's terminate()."""
        return self.inner_class.terminate()

    def poll(self):
        """Forward to the selected runner's poll()."""
        return self.inner_class.poll()

    def close(self):
        """Forward to the selected runner's close()."""
        return self.inner_class.close()
if __name__ == '__main__':
    # Manual smoke test: extract the archive given as first argument into the
    # directory given as second argument, printing every progress update.
    import contextlib
    import sys

    # Ctrl-C simply ends the test run; Extract is used as a context manager
    # (presumably provided by RunnerBase -- confirm there).
    with contextlib.suppress(KeyboardInterrupt), \
            Extract(sys.argv[1], sys.argv[2]) as runner:
        while True:
            if runner.poll() is not None:
                break
            print(runner.progress_read())
        exit_code = runner.poll()
        print('ended with code:', exit_code)
    print('test main ended')
|