#!python3
import collections
import contextlib
import copy
import queue
import threading
from enum import Enum, auto as enum_auto

Progress = collections.namedtuple('Progress', 'step index count percent')
Progress.step.__doc__ = 'name of the current step'
Progress.index.__doc__ = 'current step index out of count steps'
Progress.count.__doc__ = 'total number of steps'
Progress.percent.__doc__ = 'advance in percent of the current step'


class Stepper:
    '''
    Manage a worker processing steps in background and allow polling its
    progress.

    Takes steps as a dict of step_name/step_worker. A step_worker is a
    generator function called as ``step_worker(commandq, conf, status)``:

    - ``commandq`` is the queue on which a ``Stepper.Command.STOP`` is posted
      to ask the step to abort,
    - ``conf`` is the configuration object given to the constructor
      (a GameChestConfig according to the original documentation),
    - ``status`` is the deep copy of the status passed to ``start()``.

    It yields the progress of the step as a percentage.
    '''

    class Command(Enum):
        STOP = enum_auto()

    def _create_thread(self):
        '''Return a new, not yet started, daemon thread running worker().'''
        return threading.Thread(target=self.worker, daemon=True)

    def __init__(self, conf, steps):
        '''
        conf is handed untouched to every step worker.
        steps is a key/func-value dict (step name -> step worker); its
        iteration order is the execution order.
        '''
        self.conf = conf
        self.commandq = queue.Queue(maxsize=10)
        self.progressq = queue.Queue(maxsize=10)
        # steps are a key/func-value dict. (The original tried to record this
        # by assigning steps.__doc__, which raises AttributeError on a plain
        # dict and mutates the caller's object.)
        self.steps = steps
        self.count = len(steps)
        # Created eagerly so that is_alive() works before the first start().
        self.thread = self._create_thread()
        self.last_progress = Progress('unknown', 0, self.count, 0)
        self.status = {}

    def worker(self, *args):
        '''
        Thread body: run every step in order, publishing a Progress on the
        progress queue when a step begins (0 percent) and for each
        percentage it yields.
        '''
        # enumerate() yields (index, (name, worker)): the inner pair must be
        # unpacked explicitly.
        for index, (step, step_worker) in enumerate(self.steps.items()):
            self.progressq.put(Progress(step, index, self.count, 0))
            for percent in step_worker(self.commandq, self.conf, self.status):
                self.progressq.put(
                    Progress(step, index, self.count, percent))

    def start(self, status):
        '''
        Start processing the steps in background with a deep copy of status.
        Does nothing when a run is already in progress.
        '''
        if self.is_alive():
            return
        self.status = copy.deepcopy(status)
        # A Thread object can only be started once: always use a fresh one so
        # that start() also works after a previous run finished on its own.
        self.thread = self._create_thread()
        self.thread.start()

    def stop(self):
        '''
        Post a STOP command and wait for the worker thread to finish.
        Does nothing when no run is in progress.
        '''
        if not self.is_alive():
            return
        # NOTE(review): the command is consumed by whichever step reads it;
        # following steps are still started by worker(). Also, join() can
        # block if the worker is stuck on a full progress queue -- confirm
        # that callers keep polling or that steps honour STOP promptly.
        self.commandq.put(self.Command.STOP)
        self.thread.join()
        self.thread = self._create_thread()

    def is_alive(self):
        '''Return True while the worker thread is running.'''
        return self.thread.is_alive()

    def poll(self):
        '''
        Return the oldest unread Progress, or the last one returned when
        nothing new is available. Never blocks.
        '''
        with contextlib.suppress(queue.Empty):
            self.last_progress = self.progressq.get_nowait()
        return self.last_progress