From 6346635a11bae03cf42d792a230dbd6dd4f41477 Mon Sep 17 00:00:00 2001 From: VG Date: Wed, 12 Aug 2015 11:24:01 +0200 Subject: first commit --- README | 1 + forkcopy-server.py | 407 +++++++++++++++++++++++++++++++++++++++++++++++++++++ forkcopy.sh | 9 ++ 3 files changed, 417 insertions(+) create mode 100644 README create mode 100755 forkcopy-server.py create mode 100755 forkcopy.sh diff --git a/README b/README new file mode 100644 index 0000000..3fcbd4a --- /dev/null +++ b/README @@ -0,0 +1 @@ +README of project forkcopy diff --git a/forkcopy-server.py b/forkcopy-server.py new file mode 100755 index 0000000..81085f0 --- /dev/null +++ b/forkcopy-server.py @@ -0,0 +1,407 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import sys, os +import curses +import socket +import threading +import time +import shutil +import tempfile +import getpass + +scr = None +logfile = None + +CHUNK_SIZE = 1048576 + +def log(s): + print >> logfile, s + logfile.flush() + +def logerr(s): + log('ERROR: ' + s) + +class MainLoopInterface: + def main_loop(self): raise NotImplementedError() + def should_quit(self): raise NotImplementedError() + +class RemoteControlInterface: + def next(self): raise NotImplementedError() + +def convert_bytes(bytes): + bytes = float(bytes) + if bytes >= 1099511627776: + terabytes = bytes / 1099511627776 + size = '%.2fT' % terabytes + elif bytes >= 1073741824: + gigabytes = bytes / 1073741824 + size = '%.2fG' % gigabytes + elif bytes >= 1048576: + megabytes = bytes / 1048576 + size = '%.2fM' % megabytes + elif bytes >= 1024: + kilobytes = bytes / 1024 + size = '%.2fK' % kilobytes + else: + size = '%.2fb' % bytes + return size + +class RemoteControl(RemoteControlInterface, threading.Thread): + + def __init__(self): + super(RemoteControl, self).__init__() + self._lines = [] + self._lines_lock = threading.Lock() + self.daemon = True + + def next(self): + if not self._lines_lock.acquire(False): + return None + else: + line = None + try: + if len(self._lines): + line = self._lines.pop() + finally: + self._lines_lock.release() + return line + + def run(self): + sock_path = os.path.join(tempfile.gettempdir(), + 'forkcopy-socket_' + getpass.getuser()) + sock_server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + if os.path.exists(sock_path): + os.remove(sock_path) + sock_server.bind(sock_path) + sock_server.listen(1) + while True: + sock, addr = sock_server.accept() + data = sock.recv(4096) + buf = '' + while len(data) != 0: + buf += data + data = sock.recv(4096) + sock.send('OK') + sock.close() + with self._lines_lock: + self._lines.extend(buf.splitlines()) + sock_server.close() + logerr('should not be here deverror') + +class forkcopy: + + ## @param main_loop a MainLoopInterface + def __init__(self, main_loop): + self._chunk_size = CHUNK_SIZE + self._size_copied = 0 + self._is_active = False + self._main_loop = main_loop + + # note I don't not care about free space, code here will raise IOError if + # an error arise. + def copy(self, ifn, ofn): + if os.path.exists(ofn) or os.path.exists(ofn + '.part'): + logerr('file `%s\' or `%s.part\' already exists, skipping' % + (ofn, ofn)) + return + self._is_active = True + self._size_copied = 0 + try: + dst = os.path.dirname(ofn) + if not os.path.exists(dst): + os.makedirs(dst) + remove_ofn = False + with open(ofn + '.part', 'wb') as target, \ + open(ifn, 'rb') as source: + data = source.read(self._chunk_size) + while len(data) != 0: + target.write(data) + self._size_copied += len(data) + self._main_loop.main_loop() + if self._main_loop.should_quit(): + remove_ofn = True + break + data = source.read(self._chunk_size) + if remove_ofn: + os.remove(ofn + '.part') + else: + os.rename(ofn + '.part', ofn) + try: + shutil.copystat(ifn, ofn) + #except OSError, why: + except OSError: + # can't copy file access times on Windows and windows fs + #logerr('can\'t copy attributes, %s' % str(why)) + pass + finally: + self._is_active = False + + def is_active(self): return self._is_active + def copied_size(self) : return self._size_copied + def reset_size(self): self._size_copied = 0 + + + +class MainLoop(MainLoopInterface): + + def __init__(self): + self._rc = None + self._fc = None + self._quit = False + + self._currentbase = '.' + self._filelist = [] + self._currentfile = (None, 0, None, None) + self._previousfile = False + self._previousstatus = False + + self._currentfilecopiedsize = 0 + self._copiedsize = 0 + self._previouscopiedsize = 0 + self._totalsize = 0 + + self._speed = 0 + self._speed_timer = None + self._speed_timertime = 5 + self._speed_lastsize = 0 + self._speed_lock = threading.Lock() + + self._fc = forkcopy(self) + self._rc = RemoteControl() + self._rc.start() + self._speed_timer = threading.Timer(self._speed_timertime, + self._determine_speed) + self._speed_timer.start() + + curses.curs_set(0) + scr.nodelay(1) + + scr.addstr( + 'Filename :\n' + + 'File size (Copied/Total):\n' + + 'Current destination:\n' + + 'Base destination directory:\n' + + 'Copy size (Copied/Total):\n' + + 'Current speed:\n' + + 'Current progress:\n' + + 'Global progress:\n') + + scr.refresh() + + def should_quit(self): return self._quit + + def _determine_speed(self): + self._speed_timer = threading.Timer(self._speed_timertime, + self._determine_speed) + self._speed_timer.start() + with self._speed_lock: + self._speed = (self._copiedsize - self._speed_lastsize) / \ + self._speed_timertime + self._speed_lastsize = self._copiedsize + + def _manage_copy(self): + if self._fc.is_active(): + return + # add new files if available in the list + if len(self._filelist) != 0: + self._currentfile = self._filelist.pop() + ifn, size, ofn, base = self._currentfile + fcsize = self._fc.copied_size() + self._fc.reset_size() + self._copiedsize += fcsize + self._fc.copy(ifn, ofn) + else: + self._currentfile = ('Waiting for new files to arrive', + 0, '', self._currentbase) + time.sleep(1) # avoid wasting cpu cycles when waiting for files + + def _print_progressbar(self, percent, y): + if percent < 0 or percent > 100: percent = 0 + height, width = scr.getmaxyx() + width -= 2 # substract '[]' characters + width += 1 # take whole line (bad sums below...) + if width <= 4: return + scr.addstr(y, 0, '[') + nbsharp = width*percent/100 + for i in range(1, nbsharp): + scr.addstr(y, i, '#') + for i in range(1 + nbsharp, width): + scr.addstr(y, i, ' ') + scr.addstr(']\n') + + def _normalize_str(self, s, size): + if len(s) <= 5 or size <= 5: + return s + if len(s) >= size: + return '...' + s[len(s)-size-3:] + return s + + def _manage_display(self): + screenh, screenw = scr.getmaxyx() + fcsize = self._fc.copied_size() + self._fc.reset_size() + self._currentfilecopiedsize += fcsize + self._copiedsize += fcsize + if (self._currentfile != self._previousfile or + self._copiedsize != self._previouscopiedsize): + if self._currentfile != self._previousfile: + self._currentfilecopiedsize = fcsize + self._previousfile = self._currentfile + self._previouscopiedsize = self._copiedsize + (ifn, size, ofn, base) = self._currentfile + speed = 0 + with self._speed_lock: + speed = self._speed + perc = 0 + gperc = 0 + if size != 0: + perc = self._currentfilecopiedsize * 100 / size + if self._totalsize != 0: + gperc = self._copiedsize * 100 / self._totalsize + scr.addstr(0, 28, self._normalize_str(ifn, screenw - 28) + '\n') + scr.addstr(1, 28, convert_bytes(self._currentfilecopiedsize) + + '/' + convert_bytes(size) + '\n') + scr.addstr(2, 28, self._normalize_str(ofn, screenw - 28) + '\n') + scr.addstr(3, 28, self._normalize_str(base, screenw - 28) + '\n') + scr.addstr(4, 28, convert_bytes(self._copiedsize) + '/' + + convert_bytes(self._totalsize) + '\n') + scr.addstr(5, 28, convert_bytes(speed) + '/s\n') + scr.addstr(6, 28, str(perc) + '%\n') + scr.addstr(7, 28, str(gperc) + '%\n') + self._print_progressbar(perc, 8) + self._print_progressbar(gperc, 9) + scr.refresh() + + def _gettreefiles(self, src, prefix = None): + if src[len(src)-1] == os.sep: + src = src[0:len(src)-1] # avoid basename troubles + names = os.listdir(src) + dstbase = '' + if prefix is None: + dstbase = os.path.join(self._currentbase, os.path.basename(src)) + else: + dstbase = os.path.join(prefix, os.path.basename(src)) + ret = [] + for name in names: + srcname = os.path.join(src, name) + dstname = os.path.join(dstbase, name) + try: + if os.path.isdir(srcname): + ret.extend(self._gettreefiles(srcname, dstname)) + else: + size = 0 + try: + size = os.path.getsize(srcname) + except os.error: + logerr('`%s\' inaccessible or command error.' % + srcname) + continue + newfile = (srcname, size, dstname, self._currentbase) + ret.append(newfile) + self._totalsize += size + except (IOError, os.error), why: + logerr('`%s\', %s.' % (srcname, str(why))) + except shutil.Error, err: + logerr('`%s\', %s.' % (srcname, str(err.args))) + + return ret + + def _command_add(self, ifn): + if not os.path.isdir(ifn): + size = 0 + ofn = os.path.join(self._currentbase, + os.path.basename(ifn)) + try: + size = os.path.getsize(ifn) + except os.error: + logerr('`%s\' inaccessible or command error.' % + ifn) + return + newfile = (ifn, size, ofn, self._currentbase) + log('adding file: ' + str(newfile)) + self._filelist.append(newfile) + self._totalsize += size + else: + newfiles = self._gettreefiles(ifn) + self._filelist.extend(newfiles) + log('adding files: ' + str(newfiles)) + + def _command_iadd(self, arg): + logerr('iadd not implemented') + + def _declare_quit(self): + self._quit = True + if self._speed_timer is not None: + self._speed_timer.cancel() + self._speed_timer = None + + def main_loop(self): + if self._quit: + return # app is quitting, do not do any operation + + item = self._rc.next() + if item != None: + index = item.find(' ') + command = '' + arg = '' + if index != -1: + command = item[0:index] + arg = item[index + 1:] + else: + command = item + log('command: `%s\'\narg: `%s\'' % (command, arg)) + if command == 'add': + self._command_add(arg) + elif command == 'iadd': # add as immutable, copy dir structure + self._command_iadd(arg) + elif command == 'quit': + self._declare_quit() + return + else: + logerr('unrecognized command: `%s\' on line `%s\'' % + (command, item)) + + if not self._rc.is_alive(): + self._declare_quit() + return + + key = scr.getch() + if key != curses.ERR: + if key == ord('q') or key == ord('Q'): + self._declare_quit() + elif key == ord('p'): + scr.addstr(23, 0, 'Paused : touch the \'p\' key to continue') + scr.refresh() + while True: + key = scr.getch() + if key != curses.ERR and key == ord('p'): + break + else: + time.sleep(1) + scr.addstr(23, 0, ' ') + scr.refresh() + + self._manage_copy() + self._manage_display() + +def main(stdscr): + global scr + scr = stdscr + main_loop = MainLoop() + while not main_loop.should_quit(): + main_loop.main_loop() + +if __name__ == '__main__': + # redirecting stderr errors to stderr.txt since it's hard to see inside + # alternative curses screen. + stderr = open(os.path.join(tempfile.gettempdir(), + 'forkcopy-stderr_' + getpass.getuser() + '.txt'), 'wb') + sys.stderr.close() + sys.stderr = stderr + with open(os.path.join(tempfile.gettempdir(), + 'forkcopy-log_' + getpass.getuser() + '.txt'), 'wb') as logfile: + log('START') + curses.wrapper(main) + log('STOP') + sys.exit(0) diff --git a/forkcopy.sh b/forkcopy.sh new file mode 100755 index 0000000..f6ac3a2 --- /dev/null +++ b/forkcopy.sh @@ -0,0 +1,9 @@ +#!/bin/sh + +sockpath="/tmp/forkcopy-socket_$(id -nu)" +for i in "$@"; do +socat unix:"$sockpath" - <