1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
|
#!/usr/bin/python3
import io
import os
import re
import subprocess
import threading
import humanfriendly
from structures import Progress
class Remove:
    """Recursively remove a path with ``rm`` and report progress through ``pv``.

    On construction a background thread counts the entries below ``path``
    with ``os.walk``. Once counting finishes (and was not cancelled), ``rm
    --verbose`` is spawned with its stdout piped into ``pv --line-mode``;
    ``pv`` writes its status lines to an internal pipe whose read end is
    exposed through :meth:`select_fd` and parsed by :meth:`progress_read`.

    The object is a context manager: leaving the ``with`` block terminates
    whatever is running, waits for it and closes the pipe.
    """

    # One pv status line as produced by ``--format '%b %a %e'``:
    # "<count> [<rate>/s] ETA <eta>".
    _progress_re = re.compile(r'^\s*(\S+)\s+\[([^]]+)/s\]\s+ETA\s+(\S+)\s*$')

    def __init__(self, path):
        """Create the progress pipe and start counting entries below *path*."""
        self.last_progress = Progress()
        self.path = path
        self.rm_proc = None
        self.pv_proc = None
        self.pipe = os.pipe()
        self.read_fd = io.open(self.pipe[0], 'r', encoding='utf8')
        # write end of the pipe while the parent still owns it, None once
        # it has been closed.
        self._write_fd = self.pipe[1]
        self._proc_started = False
        self._filescount = 0
        # serialises "decide to start the processes" against "cancel", so
        # that terminate() can never miss processes started concurrently.
        self._state_lock = threading.Lock()
        self._counting_quit_event = threading.Event()
        self._counting_thread = threading.Thread(
            target=self._counting_worker,
            args=(self._counting_quit_event, path),
        )
        self._counting_thread.start()

    def _close_write_fd(self):
        """Close the parent's copy of the pipe write end (at most once)."""
        fd, self._write_fd = self._write_fd, None
        if fd is not None:
            os.close(fd)

    def _start_proc(self):
        """Spawn ``rm`` piped into ``pv``; pv's stderr feeds the read pipe."""
        common_parameters = dict(
            encoding='utf8',
            # force a known locale so that pv's output matches _progress_re.
            env={
                **os.environ,
                'LC_ALL': 'C.UTF-8',
                'LANG': 'C.UTF-8',
                'LANGUAGE': 'C.UTF-8',
            },
        )
        try:
            self.rm_proc = subprocess.Popen(
                [
                    'rm',
                    '--verbose',
                    '--recursive',
                    '--one-file-system',
                    '--preserve-root=all',
                    '--interactive=never',
                    '--',
                    self.path,
                ],
                stdout=subprocess.PIPE,
                **common_parameters,
            )
            #self.rm_proc = subprocess.Popen( # only for testing purposes
            #    [
            #        'testtools/fake-rm',
            #        self.path,
            #    ],
            #    stdout=subprocess.PIPE,
            #    **common_parameters,
            #)
            try:
                self.pv_proc = subprocess.Popen(
                    [
                        'pv',
                        '--force',
                        '--size', str(self._filescount),
                        '--format', '%b %a %e',
                        '--line-mode',
                    ],
                    stdin=self.rm_proc.stdout,
                    stdout=subprocess.DEVNULL,
                    stderr=self._write_fd,
                    **common_parameters,
                )
            finally:
                # drop the parent's copy of rm's stdout so that rm receives
                # SIGPIPE if pv exits (or could not be started) instead of
                # blocking on a pipe nobody reads.
                self.rm_proc.stdout.close()
        finally:
            # close child's pipe fd on parent's side or selectors will hang
            # if process closes. Done even on failure so readers see EOF.
            self._close_write_fd()
        self._proc_started = True

    def _counting_worker(self, event, path):
        """Count entries below *path*, then start the processes.

        Runs in the counting thread. *event* cancels the walk; when it is
        set the processes are not started.
        """
        # as counting files for a deep directory can take some time, make it
        # interruptible with a thread and event for cancelling.
        # using functools.reduce(x+1+len(y[2]), os.walk) would have been much
        # cleaner, but the code gets harder to read with the addition of the
        # event checking. Fall back to an accumulator and iterations.
        total = 0
        for _, _, files in os.walk(path):
            total += 1 + len(files)
            if event.is_set():
                break
        # os.walk yields nothing for a non-directory; an existing file (or
        # symlink) is still one entry for rm to remove.
        if total == 0 and os.path.lexists(path):
            total = 1
        # set filescount with calculated total (even when interrupted, as it
        # would be a better estimation than nothing).
        self._filescount = total
        # start next processes (if event was not set). Checked under the
        # lock so a concurrent terminate() either prevents the start or
        # sees the started processes.
        with self._state_lock:
            if not event.is_set():
                self._start_proc()

    def select_fd(self):
        'useful to use selectors with the process most meaningful fd'
        return self.read_fd

    def progress_read(self):
        """Read one pv status line and return the latest ``Progress``.

        Returns the previous progress unchanged while the processes have not
        started or when the line does not match the expected format.
        NOTE(review): Progress is built positionally as (count, percent,
        rate, eta) - confirm against structures.Progress.
        """
        if not self._proc_started:
            return self.last_progress
        line = self.read_fd.readline()
        if match := self._progress_re.search(line):
            done = humanfriendly.parse_size(match.group(1))
            total = self._filescount
            # total is 0 when nothing was found to remove; avoid dividing
            # by zero and report 0% in that case.
            percent = int(100 * done / total) if total else 0
            self.last_progress = Progress(
                done,
                percent,
                humanfriendly.parse_size(match.group(2)),
                match.group(3),
            )
        return self.last_progress

    def terminate(self):
        """Cancel counting and terminate ``rm``/``pv`` if they are running."""
        with self._state_lock:
            self._counting_quit_event.set()
            started = self._proc_started
        if started:
            for proc in (self.rm_proc, self.pv_proc):
                proc.terminate()

    def poll(self):
        'returns None if not terminated, otherwise return returncode'
        if not self._proc_started:
            return None
        return self.pv_proc.poll()

    def wait(self, timeout=None):
        """Wait for completion and return pv's return code.

        Returns -1 when the processes were never started (counting was
        cancelled or is still running after *timeout*).
        """
        if not self._proc_started:
            self._counting_thread.join(timeout)
            # the worker may have started the processes while we joined.
            if not self._proc_started:
                return -1
        self.rm_proc.wait(timeout)
        return self.pv_proc.wait(timeout)

    def close(self):
        """Release the pipe; call once nothing is running any more."""
        with self._state_lock:
            # still open here only when the processes were never started.
            self._close_write_fd()
        self.read_fd.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, value, traceback):
        try:
            self.terminate()
            self.wait()
        finally:
            self.close()
if __name__ == '__main__':
    # Manual smoke test: remove the path given as the only argument and
    # print every progress update until pv exits.
    import sys
    import contextlib
    import selectors

    if len(sys.argv) != 2:
        # a missing argument used to surface as a bare IndexError traceback.
        sys.exit(f'usage: {sys.argv[0]} PATH')

    with contextlib.ExitStack() as stack:
        # entered first so it is exited last: Ctrl-C still runs the cleanup
        # of the contexts below before being swallowed.
        stack.enter_context(contextlib.suppress(KeyboardInterrupt))
        remove = stack.enter_context(Remove(sys.argv[1]))
        selector = stack.enter_context(selectors.DefaultSelector())
        selector.register(remove.select_fd(), selectors.EVENT_READ)
        while remove.poll() is None:
            # block until pv wrote something (or closed its end of the pipe).
            selector.select()
            progress = remove.progress_read()
            print(f'{progress.bytes}b {progress.percent}% '
                  f'{progress.speed}b/s {progress.eta}')
        rc = remove.poll()
        print(f'ended with code: {rc}')
    print('test main ended')
|