#!/usr/bin/python3
import io
import os
import re
import subprocess
import threading

import humanfriendly

from structures import Progress


class Remove:
    """Recursively remove a path with ``rm`` while reporting progress.

    Removal happens in stages:

    1. On construction, a background thread counts the entries at and below
       ``path`` (the number of lines ``rm --verbose`` is expected to print:
       one per removed entry).
    2. Once counting completes (and was not cancelled), ``rm`` is started and
       its verbose output is piped into ``pv --line-mode``, which reports
       line-based progress on its stderr.
    3. ``pv``'s stderr is connected to a pipe whose read end is exposed by
       ``select_fd()`` and parsed by ``progress_read()``.

    Instances are context managers: leaving the ``with`` block terminates (or
    cancels) the removal, waits for it and closes the pipe.
    """

    # Parses pv status lines produced with '--format %b %a %e'; presumably
    # of the shape '1.23k [456/s] ETA 0:00:12'.
    # Groups: line count, average rate (without '/s'), ETA.
    _progress_re = re.compile(r'^\s*(\S+)\s+\[([^]]+)/s\]\s+ETA\s+(\S+)\s*$')

    def __init__(self, path):
        self.last_progress = Progress()
        self.path = path
        self.rm_proc = None
        self.pv_proc = None
        # pv writes its progress into the write end; we read the other end.
        self.pipe = os.pipe()
        self.read_fd = io.open(self.pipe[0], 'r', encoding='utf8')
        # Parent's copy of the write end; None once it has been closed.
        self._write_fd = self.pipe[1]
        self._proc_started = False
        self._filescount = 0
        # Serializes "check cancel event + start processes" (counting
        # thread) against terminate()/close() (caller thread), so a
        # cancellation cannot slip in between the check and the startup.
        # Must exist before the counting thread is started.
        self._lock = threading.Lock()
        self._counting_quit_event = threading.Event()
        self._counting_thread = threading.Thread(
            target=self._counting_worker,
            args=(self._counting_quit_event, path),
        )
        self._counting_thread.start()

    def _start_proc(self):
        """Start the ``rm | pv`` pipeline. Must be called with ``_lock`` held.

        Raises whatever ``subprocess.Popen`` raises if a process cannot be
        started; in that case an already-started ``rm`` is terminated.
        """
        common_parameters = dict(
            encoding='utf8',
            # run the tools in the C.UTF-8 locale so their output is not
            # localized (the progress regex expects the literal 'ETA') and
            # decodes with the encoding above.
            env={
                **os.environ,
                'LC_ALL': 'C.UTF-8',
                'LANG': 'C.UTF-8',
                'LANGUAGE': 'C.UTF-8',
            },
        )
        # For testing purposes, 'testtools/fake-rm' (taking just the path)
        # can be substituted for the rm command below.
        self.rm_proc = subprocess.Popen(
            [
                'rm',
                '--verbose',
                '--recursive',
                '--one-file-system',
                '--preserve-root=all',
                '--interactive=never',
                '--',
                self.path,
            ],
            stdout=subprocess.PIPE,
            **common_parameters,
        )
        try:
            self.pv_proc = subprocess.Popen(
                [
                    'pv',
                    '--force',
                    '--size', str(self._filescount),
                    '--format', '%b %a %e',
                    '--line-mode',
                ],
                stdin=self.rm_proc.stdout,
                stdout=subprocess.DEVNULL,
                stderr=self._write_fd,
                **common_parameters,
            )
        except BaseException:
            # pv could not be started (e.g. not installed): do not leave rm
            # deleting unattended with nobody draining its output.
            self.rm_proc.terminate()
            self.rm_proc.wait()
            raise
        finally:
            # pv holds its own copy of rm's stdout; drop the parent's copy so
            # rm receives SIGPIPE if pv exits early instead of blocking on a
            # full pipe, and so the fd is not leaked.
            self.rm_proc.stdout.close()

        # close the parent's copy of pv's stderr pipe, otherwise the reader
        # never sees EOF (and selectors hang) once pv exits.
        os.close(self._write_fd)
        self._write_fd = None
        self._proc_started = True

    def _counting_worker(self, event, path):
        """Count the lines ``rm --verbose`` should print, then start rm/pv.

        Runs in ``_counting_thread``. Counting a deep directory can take some
        time, so ``event`` is checked after each directory and, once it is
        set, counting stops and no process is started.
        """
        # rm prints one line per removed entry: the argument itself plus every
        # entry below it. A non-directory or a symlink argument is a single
        # entry (os.walk would follow a top-level symlink and overcount).
        total = 1
        if os.path.isdir(path) and not os.path.islink(path):
            for _, dirnames, filenames in os.walk(path):
                # symlinks to directories are listed in dirnames but not
                # descended into, so they are still counted exactly once.
                total += len(dirnames) + len(filenames)
                if event.is_set():
                    break
        # record the count (even a partial one when interrupted) before
        # deciding whether to start the processes.
        self._filescount = total
        with self._lock:
            # checking under the lock prevents racing with terminate()/close()
            if not event.is_set():
                self._start_proc()

    def select_fd(self):
        'useful to use selectors with the process most meaningful fd'
        return self.read_fd

    def progress_read(self):
        """Read one pv status line and return the latest ``Progress``.

        Blocks until a line (or EOF) is readable on ``select_fd()``, so call it
        once a selector reports that fd readable. Before the processes have
        started, or when the line does not parse, the previous progress is
        returned unchanged.
        """
        if not self._proc_started:
            return self.last_progress
        line = self.read_fd.readline()
        if match := self._progress_re.search(line):
            removed_count = humanfriendly.parse_size(match.group(1))
            # the count is >= 1 once started, but guard anyway; entries
            # created after counting can push the ratio past 100%, so clamp.
            percent = min(
                100, int(100 * removed_count / max(self._filescount, 1)))
            self.last_progress = Progress(
                removed_count,
                percent,
                humanfriendly.parse_size(match.group(2)),
                match.group(3),
            )
        return self.last_progress

    def terminate(self):
        """Terminate rm and pv if started, otherwise cancel the counting."""
        with self._lock:
            if self._proc_started:
                for proc in (self.rm_proc, self.pv_proc):
                    proc.terminate()
            else:
                self._counting_quit_event.set()

    def poll(self):
        'returns None if not terminated, otherwise return returncode'
        if not self._proc_started:
            return None
        return self.pv_proc.poll()

    def wait(self, timeout=None):
        """Wait for the removal to finish and return pv's exit code.

        Returns -1 when the processes were never started (counting cancelled
        or failed, or still running after ``timeout``). ``timeout`` applies to
        each wait individually. Raises ``subprocess.TimeoutExpired`` if a
        process is still running after ``timeout``.
        """
        if not self._proc_started:
            self._counting_thread.join(timeout)
            # counting may have completed and started the processes while we
            # were joining; if so, fall through and wait for them as well.
            if not self._proc_started:
                return -1
        self.rm_proc.wait(timeout)
        return self.pv_proc.wait(timeout)

    def close(self):
        """Release the pipe file descriptors held by this process.

        If the processes were never started, counting is cancelled first so
        the counting thread cannot start them on an already-closed pipe.
        """
        with self._lock:
            if not self._proc_started:
                self._counting_quit_event.set()
            if self._write_fd is not None:
                os.close(self._write_fd)
                self._write_fd = None
        self.read_fd.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, value, traceback):
        self.terminate()
        self.wait()
        self.close()


if __name__ == '__main__':
    import sys
    import contextlib
    import time
    import selectors

    if len(sys.argv) != 2:
        sys.exit(f'usage: {sys.argv[0]} PATH')

    with contextlib.ExitStack() as stack:
        # Ctrl-C ends the loop; the stack then terminates/waits/closes.
        stack.enter_context(contextlib.suppress(KeyboardInterrupt))
        remove = stack.enter_context(Remove(sys.argv[1]))
        selector = stack.enter_context(selectors.DefaultSelector())
        selector.register(remove.select_fd(), selectors.EVENT_READ)

        while remove.poll() is None:
            selector.select()
            progress = remove.progress_read()
            print(f'{progress.bytes}b {progress.percent}% '
                  f'{progress.speed}b/s {progress.eta}')

    rc = remove.poll()
    print(f'ended with code: {rc}')
    print('test main ended')