summaryrefslogtreecommitdiffstats
path: root/gamechestcli/gamechest/runners/remove.py
blob: 99c4247aaedc8e2a88e0c4b0be29c92556f4f07c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import os
import re
import subprocess
import threading

import humanfriendly

from ..structures import Progress
from .runnerbase import RunnerBase, neutral_locale_variables


class Remove(RunnerBase):
    """Remove a file or directory tree with ``rm``, reporting progress.

    A background thread first counts the entries under ``path`` to get an
    estimate of the work to do. Once counting is done, ``rm --verbose`` is
    started and its output is piped into ``pv --line-mode``, which writes
    progress lines to a pipe readable through :meth:`get_read_fd`.
    """

    # Matches a pv progress line produced by the '%b %a %e' format:
    # "<count> [<rate>/s] ETA <eta>".
    _progress_re = re.compile(r'^\s*(\S+)\s+\[([^]]+)/s\]\s+ETA\s+(\S+)\s*$')

    def __init__(self, path):
        """Create the progress pipe and start counting entries under *path*.

        The ``rm`` and ``pv`` processes are not started here: the counting
        thread starts them when it finishes without having been cancelled.
        """
        self.last_progress = Progress(linemode=True)
        self.path = path
        self.rm_proc = None
        self.pv_proc = None
        self.pipe = os.pipe()
        self.read_fd = open(self.pipe[0], 'r', encoding='utf8')
        # Tracks whether the parent's copy of the pipe write end was closed,
        # so that close() can release it when the processes never started.
        self._write_end_closed = False
        self._proc_started = False
        self._filescount = 0
        self._counting_quit_event = threading.Event()
        self._counting_thread = threading.Thread(
            target=self._counting_worker,
            args=(self._counting_quit_event, path),
        )
        self._counting_thread.start()

    def _close_write_end(self):
        """Close the parent's copy of the pipe write end, at most once."""
        if not self._write_end_closed:
            self._write_end_closed = True
            os.close(self.pipe[1])

    def _start_proc(self):
        """Start the ``rm | pv`` pipeline; called from the counting thread."""
        common_parameters = dict(
            encoding='utf8',
            # NOTE(review): presumably forces a neutral locale so the tools'
            # output stays parseable - confirm in runnerbase.
            env={**os.environ,
                 **neutral_locale_variables,
                 },
        )
        # For testing, the rm command can be swapped for
        # ['testtools/fake-rm', self.path].
        self.rm_proc = subprocess.Popen(
            [
                'rm',
                '--verbose',
                '--recursive',
                '--one-file-system',
                '--preserve-root=all',
                '--interactive=never',
                '--',
                self.path,
            ],
            stdout=subprocess.PIPE,
            **common_parameters,
        )
        self.pv_proc = subprocess.Popen(
            [
                'pv',
                '--force',
                '--size', str(self._filescount),
                '--format', '%b %a %e',
                '--line-mode',
            ],
            stdin=self.rm_proc.stdout,
            stdout=subprocess.DEVNULL,
            stderr=self.pipe[1],
            **common_parameters,
        )
        # pv now owns its own copy of rm's stdout: close the parent's copy so
        # rm receives SIGPIPE if pv exits first, and so the descriptor is not
        # leaked.
        self.rm_proc.stdout.close()
        # close child's pipe fd on parent's side or selectors will hang if
        # process closes.
        self._close_write_end()
        self._proc_started = True

    def _counting_worker(self, event, path):
        """Count the entries under *path*, then start the removal processes.

        Runs in a dedicated thread. Setting *event* interrupts the counting
        and prevents the processes from being started.
        """
        # as counting files for a deep directory can take some time, make it
        # interruptible with a thread and event for cancelling.
        # using functools.reduce(x+1+len(y[2]), os.walk) would have been much
        # cleaner, but the code gets harder to read with the addition of the
        # event checking. Fallback to accumulator and iterations.
        total = 0
        for _, _, files in os.walk(path):
            # one for the directory itself, plus one per contained file
            total += 1 + len(files)
            if event.is_set():
                break
        # set filescount with calculated total (even when interrupted, as it
        # would be a better estimation than nothing).
        if os.path.isfile(path):
            # os.walk yields nothing for a regular file
            self._filescount = 1
        else:
            self._filescount = total
        # start next processes (if event was not set)
        if not event.is_set():
            self._start_proc()

    def get_read_fd(self):
        """Return the text file object from which progress lines are read."""
        return self.read_fd

    def progress_read(self):
        """Read one progress line and return the latest known ``Progress``.

        While the processes are not started, or when the line read is not a
        progress line, the previously known progress is returned unchanged.
        """
        if not self._proc_started:
            return self.last_progress
        line = self.read_fd.readline()
        if match := self._progress_re.search(line):
            # in line mode pv counts lines, i.e. entries removed by rm
            done = humanfriendly.parse_size(match.group(1))
            total = self._filescount
            self.last_progress = Progress(
                linemode=True,
                nbbytes=done,
                # the estimate is 0 when nothing was found under path; avoid
                # dividing by zero in that case.
                percent=int(100 * done / total) if total else 0,
                speed=humanfriendly.parse_size(match.group(2)),
                eta=match.group(3),
            )
        return self.last_progress

    def terminate(self):
        """Cancel the counting and terminate the processes if started."""
        self._counting_quit_event.set()
        if self._proc_started:
            for proc in (self.rm_proc, self.pv_proc):
                proc.terminate()

    def poll(self):
        """Return pv's exit code, or None while it has not finished.

        None is also returned as long as the processes were not started,
        which includes the case where counting was cancelled by terminate().
        The exit code of rm itself is not reported here.
        """
        if not self._proc_started:
            return None
        return self.pv_proc.poll()

    def close(self):
        """Wait for the thread and processes, then release the pipe."""
        self._counting_thread.join()
        if self._proc_started:
            for proc in (self.rm_proc, self.pv_proc):
                proc.wait()
        # when the processes were never started (counting cancelled or spawn
        # failure), the write end is still open on our side: release it.
        self._close_write_end()
        self.read_fd.close()


if __name__ == '__main__':
    import sys
    import contextlib
    import time
    import selectors

    # Manual test driver: removes the path given as first command line
    # argument and prints each progress update until pv has exited.
    # Ctrl-C is swallowed so that the context managers still get unwound.
    with contextlib.suppress(KeyboardInterrupt):
        with Remove(sys.argv[1]) as runner, \
                selectors.DefaultSelector() as selector:
            selector.register(runner.get_read_fd(), selectors.EVENT_READ)
            while True:
                rc = runner.poll()
                if rc is not None:
                    break
                # block until the progress pipe is readable
                if selector.select():
                    print(runner.progress_read())
            print('ended with code:', rc)
    print('test main ended')