aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVG <vg@devys.org>2015-08-12 11:24:01 +0200
committerVG <vg@devys.org>2015-08-12 11:24:01 +0200
commit6346635a11bae03cf42d792a230dbd6dd4f41477 (patch)
tree7d99042e0a2c026ea70fd9c40a6c4b3ff9443d20
downloadforkcopy-6346635a11bae03cf42d792a230dbd6dd4f41477.tar.gz
forkcopy-6346635a11bae03cf42d792a230dbd6dd4f41477.tar.bz2
forkcopy-6346635a11bae03cf42d792a230dbd6dd4f41477.zip
first commit
-rw-r--r--README1
-rwxr-xr-xforkcopy-server.py407
-rwxr-xr-xforkcopy.sh9
3 files changed, 417 insertions, 0 deletions
diff --git a/README b/README
new file mode 100644
index 0000000..3fcbd4a
--- /dev/null
+++ b/README
@@ -0,0 +1 @@
+README of project forkcopy
diff --git a/forkcopy-server.py b/forkcopy-server.py
new file mode 100755
index 0000000..81085f0
--- /dev/null
+++ b/forkcopy-server.py
@@ -0,0 +1,407 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import sys, os
+import curses
+import socket
+import threading
+import time
+import shutil
+import tempfile
+import getpass
+
+scr = None
+logfile = None
+
+CHUNK_SIZE = 1048576
+
+def log(s):
+ print >> logfile, s
+ logfile.flush()
+
+def logerr(s):
+ log('ERROR: ' + s)
+
+class MainLoopInterface:
+ def main_loop(self): raise NotImplementedError()
+ def should_quit(self): raise NotImplementedError()
+
+class RemoteControlInterface:
+ def next(self): raise NotImplementedError()
+
+def convert_bytes(bytes):
+ bytes = float(bytes)
+ if bytes >= 1099511627776:
+ terabytes = bytes / 1099511627776
+ size = '%.2fT' % terabytes
+ elif bytes >= 1073741824:
+ gigabytes = bytes / 1073741824
+ size = '%.2fG' % gigabytes
+ elif bytes >= 1048576:
+ megabytes = bytes / 1048576
+ size = '%.2fM' % megabytes
+ elif bytes >= 1024:
+ kilobytes = bytes / 1024
+ size = '%.2fK' % kilobytes
+ else:
+ size = '%.2fb' % bytes
+ return size
+
+class RemoteControl(RemoteControlInterface, threading.Thread):
+
+ def __init__(self):
+ super(RemoteControl, self).__init__()
+ self._lines = []
+ self._lines_lock = threading.Lock()
+ self.daemon = True
+
+ def next(self):
+ if not self._lines_lock.acquire(False):
+ return None
+ else:
+ line = None
+ try:
+ if len(self._lines):
+ line = self._lines.pop()
+ finally:
+ self._lines_lock.release()
+ return line
+
+ def run(self):
+ sock_path = os.path.join(tempfile.gettempdir(),
+ 'forkcopy-socket_' + getpass.getuser())
+ sock_server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ if os.path.exists(sock_path):
+ os.remove(sock_path)
+ sock_server.bind(sock_path)
+ sock_server.listen(1)
+ while True:
+ sock, addr = sock_server.accept()
+ data = sock.recv(4096)
+ buf = ''
+ while len(data) != 0:
+ buf += data
+ data = sock.recv(4096)
+ sock.send('OK')
+ sock.close()
+ with self._lines_lock:
+ self._lines.extend(buf.splitlines())
+ sock_server.close()
+ logerr('should not be here deverror')
+
+class forkcopy:
+
+ ## @param main_loop a MainLoopInterface
+ def __init__(self, main_loop):
+ self._chunk_size = CHUNK_SIZE
+ self._size_copied = 0
+ self._is_active = False
+ self._main_loop = main_loop
+
+ # note I don't not care about free space, code here will raise IOError if
+ # an error arise.
+ def copy(self, ifn, ofn):
+ if os.path.exists(ofn) or os.path.exists(ofn + '.part'):
+ logerr('file `%s\' or `%s.part\' already exists, skipping' %
+ (ofn, ofn))
+ return
+ self._is_active = True
+ self._size_copied = 0
+ try:
+ dst = os.path.dirname(ofn)
+ if not os.path.exists(dst):
+ os.makedirs(dst)
+ remove_ofn = False
+ with open(ofn + '.part', 'wb') as target, \
+ open(ifn, 'rb') as source:
+ data = source.read(self._chunk_size)
+ while len(data) != 0:
+ target.write(data)
+ self._size_copied += len(data)
+ self._main_loop.main_loop()
+ if self._main_loop.should_quit():
+ remove_ofn = True
+ break
+ data = source.read(self._chunk_size)
+ if remove_ofn:
+ os.remove(ofn + '.part')
+ else:
+ os.rename(ofn + '.part', ofn)
+ try:
+ shutil.copystat(ifn, ofn)
+ #except OSError, why:
+ except OSError:
+ # can't copy file access times on Windows and windows fs
+ #logerr('can\'t copy attributes, %s' % str(why))
+ pass
+ finally:
+ self._is_active = False
+
+ def is_active(self): return self._is_active
+ def copied_size(self) : return self._size_copied
+ def reset_size(self): self._size_copied = 0
+
+
+
+class MainLoop(MainLoopInterface):
+
+ def __init__(self):
+ self._rc = None
+ self._fc = None
+ self._quit = False
+
+ self._currentbase = '.'
+ self._filelist = []
+ self._currentfile = (None, 0, None, None)
+ self._previousfile = False
+ self._previousstatus = False
+
+ self._currentfilecopiedsize = 0
+ self._copiedsize = 0
+ self._previouscopiedsize = 0
+ self._totalsize = 0
+
+ self._speed = 0
+ self._speed_timer = None
+ self._speed_timertime = 5
+ self._speed_lastsize = 0
+ self._speed_lock = threading.Lock()
+
+ self._fc = forkcopy(self)
+ self._rc = RemoteControl()
+ self._rc.start()
+ self._speed_timer = threading.Timer(self._speed_timertime,
+ self._determine_speed)
+ self._speed_timer.start()
+
+ curses.curs_set(0)
+ scr.nodelay(1)
+
+ scr.addstr(
+ 'Filename :\n' +
+ 'File size (Copied/Total):\n' +
+ 'Current destination:\n' +
+ 'Base destination directory:\n' +
+ 'Copy size (Copied/Total):\n' +
+ 'Current speed:\n' +
+ 'Current progress:\n' +
+ 'Global progress:\n')
+
+ scr.refresh()
+
+ def should_quit(self): return self._quit
+
+ def _determine_speed(self):
+ self._speed_timer = threading.Timer(self._speed_timertime,
+ self._determine_speed)
+ self._speed_timer.start()
+ with self._speed_lock:
+ self._speed = (self._copiedsize - self._speed_lastsize) / \
+ self._speed_timertime
+ self._speed_lastsize = self._copiedsize
+
+ def _manage_copy(self):
+ if self._fc.is_active():
+ return
+ # add new files if available in the list
+ if len(self._filelist) != 0:
+ self._currentfile = self._filelist.pop()
+ ifn, size, ofn, base = self._currentfile
+ fcsize = self._fc.copied_size()
+ self._fc.reset_size()
+ self._copiedsize += fcsize
+ self._fc.copy(ifn, ofn)
+ else:
+ self._currentfile = ('Waiting for new files to arrive',
+ 0, '', self._currentbase)
+ time.sleep(1) # avoid wasting cpu cycles when waiting for files
+
+ def _print_progressbar(self, percent, y):
+ if percent < 0 or percent > 100: percent = 0
+ height, width = scr.getmaxyx()
+ width -= 2 # substract '[]' characters
+ width += 1 # take whole line (bad sums below...)
+ if width <= 4: return
+ scr.addstr(y, 0, '[')
+ nbsharp = width*percent/100
+ for i in range(1, nbsharp):
+ scr.addstr(y, i, '#')
+ for i in range(1 + nbsharp, width):
+ scr.addstr(y, i, ' ')
+ scr.addstr(']\n')
+
+ def _normalize_str(self, s, size):
+ if len(s) <= 5 or size <= 5:
+ return s
+ if len(s) >= size:
+ return '...' + s[len(s)-size-3:]
+ return s
+
+ def _manage_display(self):
+ screenh, screenw = scr.getmaxyx()
+ fcsize = self._fc.copied_size()
+ self._fc.reset_size()
+ self._currentfilecopiedsize += fcsize
+ self._copiedsize += fcsize
+ if (self._currentfile != self._previousfile or
+ self._copiedsize != self._previouscopiedsize):
+ if self._currentfile != self._previousfile:
+ self._currentfilecopiedsize = fcsize
+ self._previousfile = self._currentfile
+ self._previouscopiedsize = self._copiedsize
+ (ifn, size, ofn, base) = self._currentfile
+ speed = 0
+ with self._speed_lock:
+ speed = self._speed
+ perc = 0
+ gperc = 0
+ if size != 0:
+ perc = self._currentfilecopiedsize * 100 / size
+ if self._totalsize != 0:
+ gperc = self._copiedsize * 100 / self._totalsize
+ scr.addstr(0, 28, self._normalize_str(ifn, screenw - 28) + '\n')
+ scr.addstr(1, 28, convert_bytes(self._currentfilecopiedsize)
+ + '/' + convert_bytes(size) + '\n')
+ scr.addstr(2, 28, self._normalize_str(ofn, screenw - 28) + '\n')
+ scr.addstr(3, 28, self._normalize_str(base, screenw - 28) + '\n')
+ scr.addstr(4, 28, convert_bytes(self._copiedsize) + '/' +
+ convert_bytes(self._totalsize) + '\n')
+ scr.addstr(5, 28, convert_bytes(speed) + '/s\n')
+ scr.addstr(6, 28, str(perc) + '%\n')
+ scr.addstr(7, 28, str(gperc) + '%\n')
+ self._print_progressbar(perc, 8)
+ self._print_progressbar(gperc, 9)
+ scr.refresh()
+
+ def _gettreefiles(self, src, prefix = None):
+ if src[len(src)-1] == os.sep:
+ src = src[0:len(src)-1] # avoid basename troubles
+ names = os.listdir(src)
+ dstbase = ''
+ if prefix is None:
+ dstbase = os.path.join(self._currentbase, os.path.basename(src))
+ else:
+ dstbase = os.path.join(prefix, os.path.basename(src))
+ ret = []
+ for name in names:
+ srcname = os.path.join(src, name)
+ dstname = os.path.join(dstbase, name)
+ try:
+ if os.path.isdir(srcname):
+ ret.extend(self._gettreefiles(srcname, dstname))
+ else:
+ size = 0
+ try:
+ size = os.path.getsize(srcname)
+ except os.error:
+ logerr('`%s\' inaccessible or command error.' %
+ srcname)
+ continue
+ newfile = (srcname, size, dstname, self._currentbase)
+ ret.append(newfile)
+ self._totalsize += size
+ except (IOError, os.error), why:
+ logerr('`%s\', %s.' % (srcname, str(why)))
+ except shutil.Error, err:
+ logerr('`%s\', %s.' % (srcname, str(err.args)))
+
+ return ret
+
+ def _command_add(self, ifn):
+ if not os.path.isdir(ifn):
+ size = 0
+ ofn = os.path.join(self._currentbase,
+ os.path.basename(ifn))
+ try:
+ size = os.path.getsize(ifn)
+ except os.error:
+ logerr('`%s\' inaccessible or command error.' %
+ ifn)
+ return
+ newfile = (ifn, size, ofn, self._currentbase)
+ log('adding file: ' + str(newfile))
+ self._filelist.append(newfile)
+ self._totalsize += size
+ else:
+ newfiles = self._gettreefiles(ifn)
+ self._filelist.extend(newfiles)
+ log('adding files: ' + str(newfiles))
+
+ def _command_iadd(self, arg):
+ logerr('iadd not implemented')
+
+ def _declare_quit(self):
+ self._quit = True
+ if self._speed_timer is not None:
+ self._speed_timer.cancel()
+ self._speed_timer = None
+
+ def main_loop(self):
+ if self._quit:
+ return # app is quitting, do not do any operation
+
+ item = self._rc.next()
+ if item != None:
+ index = item.find(' ')
+ command = ''
+ arg = ''
+ if index != -1:
+ command = item[0:index]
+ arg = item[index + 1:]
+ else:
+ command = item
+ log('command: `%s\'\narg: `%s\'' % (command, arg))
+ if command == 'add':
+ self._command_add(arg)
+ elif command == 'iadd': # add as immutable, copy dir structure
+ self._command_iadd(arg)
+ elif command == 'quit':
+ self._declare_quit()
+ return
+ else:
+ logerr('unrecognized command: `%s\' on line `%s\'' %
+ (command, item))
+
+ if not self._rc.is_alive():
+ self._declare_quit()
+ return
+
+ key = scr.getch()
+ if key != curses.ERR:
+ if key == ord('q') or key == ord('Q'):
+ self._declare_quit()
+ elif key == ord('p'):
+ scr.addstr(23, 0, 'Paused : touch the \'p\' key to continue')
+ scr.refresh()
+ while True:
+ key = scr.getch()
+ if key != curses.ERR and key == ord('p'):
+ break
+ else:
+ time.sleep(1)
+ scr.addstr(23, 0, ' ')
+ scr.refresh()
+
+ self._manage_copy()
+ self._manage_display()
+
+def main(stdscr):
+ global scr
+ scr = stdscr
+ main_loop = MainLoop()
+ while not main_loop.should_quit():
+ main_loop.main_loop()
+
+if __name__ == '__main__':
+ # redirecting stderr errors to stderr.txt since it's hard to see inside
+ # alternative curses screen.
+ stderr = open(os.path.join(tempfile.gettempdir(),
+ 'forkcopy-stderr_' + getpass.getuser() + '.txt'), 'wb')
+ sys.stderr.close()
+ sys.stderr = stderr
+ with open(os.path.join(tempfile.gettempdir(),
+ 'forkcopy-log_' + getpass.getuser() + '.txt'), 'wb') as logfile:
+ log('START')
+ curses.wrapper(main)
+ log('STOP')
+ sys.exit(0)
diff --git a/forkcopy.sh b/forkcopy.sh
new file mode 100755
index 0000000..f6ac3a2
--- /dev/null
+++ b/forkcopy.sh
@@ -0,0 +1,9 @@
+#!/bin/sh
+
+sockpath="/tmp/forkcopy-socket_$(id -nu)"
+for i in "$@"; do
+socat unix:"$sockpath" - <<EOF
+add $PWD/$i
+EOF
+echo
+done