diff options
Diffstat (limited to 'fs/commands/fscp.py')
-rw-r--r-- | fs/commands/fscp.py | 184 |
1 files changed, 92 insertions, 92 deletions
diff --git a/fs/commands/fscp.py b/fs/commands/fscp.py index 58d6c1a..6d61c46 100644 --- a/fs/commands/fscp.py +++ b/fs/commands/fscp.py @@ -8,52 +8,52 @@ import time import threading -class FileOpThread(threading.Thread): - +class FileOpThread(threading.Thread): + def __init__(self, action, name, dest_fs, queue, on_done, on_error): - self.action = action + self.action = action self.dest_fs = dest_fs self.queue = queue self.on_done = on_done self.on_error = on_error self.finish_event = threading.Event() - super(FileOpThread, self).__init__() - - def run(self): - - while not self.finish_event.isSet(): + super(FileOpThread, self).__init__() + + def run(self): + + while not self.finish_event.isSet(): try: path_type, fs, path, dest_path = self.queue.get(timeout=0.1) except queue.Empty: continue - try: - if path_type == FScp.DIR: + try: + if path_type == FScp.DIR: self.dest_fs.makedir(path, recursive=True, allow_recreate=True) - else: - self.action(fs, path, self.dest_fs, dest_path, overwrite=True) - except Exception, e: - self.on_error(e) - self.queue.task_done() - break + else: + self.action(fs, path, self.dest_fs, dest_path, overwrite=True) + except Exception, e: + self.on_error(e) + self.queue.task_done() + break else: - self.queue.task_done() + self.queue.task_done() self.on_done(path_type, fs, path, self.dest_fs, dest_path) - - + + class FScp(Command): - + DIR, FILE = 0, 1 - + usage = """fscp [OPTION]... [SOURCE]... [DESTINATION] Copy SOURCE to DESTINATION""" - + def get_action(self): if self.options.threads > 1: return copyfile_non_atomic else: return copyfile - + def get_verb(self): return 'copying...' @@ -62,40 +62,40 @@ Copy SOURCE to DESTINATION""" optparse.add_option('-p', '--progress', dest='progress', action="store_true", default=False, help="show progress", metavar="PROGRESS") optparse.add_option('-t', '--threads', dest='threads', action="store", default=1, - help="number of threads to use", type="int", metavar="THREAD_COUNT") + help="number of threads to use", type="int", metavar="THREAD_COUNT") return optparse - + def do_run(self, options, args): - - self.options = options + + self.options = options if len(args) < 2: self.error("at least two filesystems required\n") return 1 - + srcs = args[:-1] - dst = args[-1] - + dst = args[-1] + dst_fs, dst_path = self.open_fs(dst, writeable=True, create_dir=True) - + if dst_path is not None and dst_fs.isfile(dst_path): self.error('Destination must be a directory\n') return 1 - + if dst_path: dst_fs = dst_fs.makeopendir(dst_path) - dst_path = None - + dst_path = None + copy_fs_paths = [] - - progress = options.progress - + + progress = options.progress + if progress: sys.stdout.write(self.progress_bar(len(srcs), 0, 'scanning...')) sys.stdout.flush() - - self.root_dirs = [] + + self.root_dirs = [] for i, fs_url in enumerate(srcs): - src_fs, src_path = self.open_fs(fs_url) + src_fs, src_path = self.open_fs(fs_url) if src_path is None: src_path = '/' @@ -103,44 +103,44 @@ Copy SOURCE to DESTINATION""" if iswildcard(src_path): for file_path in src_fs.listdir(wildcard=src_path, full=True): copy_fs_paths.append((self.FILE, src_fs, file_path, file_path)) - - else: - if src_fs.isdir(src_path): - self.root_dirs.append((src_fs, src_path)) + + else: + if src_fs.isdir(src_path): + self.root_dirs.append((src_fs, src_path)) src_sub_fs = src_fs.opendir(src_path) for dir_path, file_paths in src_sub_fs.walk(): - if dir_path not in ('', '/'): + if dir_path not in ('', '/'): copy_fs_paths.append((self.DIR, src_sub_fs, dir_path, dir_path)) sub_fs = src_sub_fs.opendir(dir_path) - for file_path in file_paths: + for file_path in file_paths: copy_fs_paths.append((self.FILE, sub_fs, file_path, pathjoin(dir_path, file_path))) else: if src_fs.exists(src_path): copy_fs_paths.append((self.FILE, src_fs, src_path, src_path)) else: self.error('%s is not a file or directory\n' % src_path) - return 1 - + return 1 + if progress: sys.stdout.write(self.progress_bar(len(srcs), i + 1, 'scanning...')) sys.stdout.flush() - + if progress: sys.stdout.write(self.progress_bar(len(copy_fs_paths), 0, self.get_verb())) sys.stdout.flush() - - if self.options.threads > 1: + + if self.options.threads > 1: copy_fs_dirs = [r for r in copy_fs_paths if r[0] == self.DIR] - copy_fs_paths = [r for r in copy_fs_paths if r[0] == self.FILE] - for path_type, fs, path, dest_path in copy_fs_dirs: - dst_fs.makedir(path, allow_recreate=True, recursive=True) - + copy_fs_paths = [r for r in copy_fs_paths if r[0] == self.FILE] + for path_type, fs, path, dest_path in copy_fs_dirs: + dst_fs.makedir(path, allow_recreate=True, recursive=True) + self.lock = threading.RLock() - + self.total_files = len(copy_fs_paths) self.done_files = 0 - - file_queue = queue.Queue() + + file_queue = queue.Queue() threads = [FileOpThread(self.get_action(), 'T%i' % i, dst_fs, @@ -148,59 +148,59 @@ Copy SOURCE to DESTINATION""" self.on_done, self.on_error) for i in xrange(options.threads)] - + for thread in threads: thread.start() - + self.action_errors = [] complete = False - try: - enqueue = file_queue.put + try: + enqueue = file_queue.put for resource in copy_fs_paths: enqueue(resource) - + while not file_queue.empty(): time.sleep(0) if self.any_error(): raise SystemExit # Can't use queue.join here, or KeyboardInterrupt will not be - # caught until the queue is finished + # caught until the queue is finished #file_queue.join() - - except KeyboardInterrupt: - options.progress = False + + except KeyboardInterrupt: + options.progress = False self.output("\nCancelling...\n") - + except SystemExit: - options.progress = False - + options.progress = False + finally: - sys.stdout.flush() + sys.stdout.flush() for thread in threads: - thread.finish_event.set() + thread.finish_event.set() for thread in threads: thread.join() complete = True if not self.any_error(): self.post_actions() - + dst_fs.close() - + if self.action_errors: for error in self.action_errors: - self.error(self.wrap_error(unicode(error)) + '\n') + self.error(self.wrap_error(unicode(error)) + '\n') sys.stdout.flush() else: if complete and options.progress: sys.stdout.write(self.progress_bar(self.total_files, self.done_files, '')) sys.stdout.write('\n') sys.stdout.flush() - + def post_actions(self): pass - - def on_done(self, path_type, src_fs, src_path, dst_fs, dst_path): - self.lock.acquire() + + def on_done(self, path_type, src_fs, src_path, dst_fs, dst_path): + self.lock.acquire() try: if self.options.verbose: if path_type == self.DIR: @@ -208,44 +208,44 @@ Copy SOURCE to DESTINATION""" else: print "%s -> %s" % (src_fs.desc(src_path), dst_fs.desc(dst_path)) elif self.options.progress: - self.done_files += 1 + self.done_files += 1 sys.stdout.write(self.progress_bar(self.total_files, self.done_files, self.get_verb())) sys.stdout.flush() finally: self.lock.release() - - def on_error(self, e): + + def on_error(self, e): self.lock.acquire() try: self.action_errors.append(e) finally: self.lock.release() - - def any_error(self): + + def any_error(self): self.lock.acquire() try: return bool(self.action_errors) finally: self.lock.release() - + def progress_bar(self, total, remaining, msg=''): bar_width = 20 throbber = '|/-\\' throb = throbber[remaining % len(throbber)] done = float(remaining) / total - + done_steps = int(done * bar_width) - bar_steps = ('#' * done_steps).ljust(bar_width) - - msg = '%s %i%%' % (msg, int(done * 100.0)) + bar_steps = ('#' * done_steps).ljust(bar_width) + + msg = '%s %i%%' % (msg, int(done * 100.0)) msg = msg.ljust(20) - + if total == remaining: throb = '' - + bar = '\r%s[%s] %s\r' % (throb, bar_steps, msg.lstrip()) return bar - + def run(): return FScp().run() |