diff options
author | willmcgugan <willmcgugan@67cdc799-7952-0410-af00-57a81ceafa0f> | 2010-12-18 19:34:54 +0000 |
---|---|---|
committer | willmcgugan <willmcgugan@67cdc799-7952-0410-af00-57a81ceafa0f> | 2010-12-18 19:34:54 +0000 |
commit | 26be977288347ef7202ebd4035b0020daf7edb8b (patch) | |
tree | 46aa6966a7d3edcd170f0e2601691a4af3fc39d8 | |
parent | 9c69df76498cb902cc6dde16a1ceda7dd6f4c452 (diff) | |
download | pyfilesystem-26be977288347ef7202ebd4035b0020daf7edb8b.tar.gz |
Added openers for Tahoe and Dav
git-svn-id: http://pyfilesystem.googlecode.com/svn/trunk@559 67cdc799-7952-0410-af00-57a81ceafa0f
-rw-r--r-- | ChangeLog | 3 | ||||
-rw-r--r-- | fs/base.py | 2 | ||||
-rw-r--r-- | fs/commands/fscp.py | 80 | ||||
-rw-r--r-- | fs/commands/fsmv.py | 9 | ||||
-rw-r--r-- | fs/commands/runner.py | 37 | ||||
-rw-r--r-- | fs/contrib/davfs/__init__.py | 9 | ||||
-rw-r--r-- | fs/contrib/tahoefs/__init__.py | 12 | ||||
-rw-r--r-- | fs/opener.py | 122 | ||||
-rw-r--r-- | fs/s3fs.py | 2 | ||||
-rw-r--r-- | fs/utils.py | 79 |
10 files changed, 284 insertions, 71 deletions
@@ -64,7 +64,8 @@ * Added a getmmap to base * Added command line scripts fsls, fstree, fscat, fscp, fsmv * Added command line scripts fsmkdir, fsmount - * Made automatically pick up keys if no other authentication is available + * Made SFTP automatically pick up keys if no other authentication is available * Optimized listdir and listdirinfo in SFTPFS * Made memoryfs work with threads + * Added copyfile_non_atomic and movefile_non_atomic for improved performance of multi-threaded copies @@ -845,6 +845,8 @@ class FS(object): :param overwrite: if True, then an existing file at the destination path will be silently overwritten; if False then an exception will be raised in this case. + :param overwrite: When True the destination will be overwritten (if it exists), + otherwise a DestinationExistsError will be thrown :type overwrite: bool :param chunk_size: Size of chunks to use when copying, if a simple copy is required diff --git a/fs/commands/fscp.py b/fs/commands/fscp.py index 48978e9..8d43bf2 100644 --- a/fs/commands/fscp.py +++ b/fs/commands/fscp.py @@ -1,5 +1,5 @@ from fs.opener import opener -from fs.utils import copyfile, copystructure +from fs.utils import copyfile, copyfile_non_atomic, copystructure from fs.path import pathjoin, iswildcard from fs.errors import FSError from fs.commands.runner import Command @@ -22,26 +22,26 @@ class FileOpThread(threading.Thread): def run(self): - try: - while not self.finish_event.isSet(): - try: - path_type, fs, path, dest_path = self.queue.get(timeout=0.1) - except queue.Empty: - continue - try: - if path_type == FScp.DIR: - self.dest_fs.makedir(path, recursive=True, allow_recreate=True) - else: - self.action(fs, path, self.dest_fs, dest_path, overwrite=True) - except Exception, e: - self.queue.task_done() - raise - else: - self.queue.task_done() - self.on_done(path_type, fs, path, self.dest_fs, dest_path) - - except Exception, e: - self.on_error(e) + while not self.finish_event.isSet(): + try: + path_type, fs, path, dest_path = self.queue.get(timeout=0.1) + except queue.Empty: + continue + try: + if path_type == FScp.DIR: + self.dest_fs.makedir(path, recursive=True, allow_recreate=True) + else: + self.action(fs, path, self.dest_fs, dest_path, overwrite=True) + except Exception, e: + print e + self.on_error(e) + self.queue.task_done() + break + else: + self.queue.task_done() + self.on_done(path_type, fs, path, self.dest_fs, dest_path) + + class FScp(Command): @@ -51,7 +51,10 @@ class FScp(Command): Copy SOURCE to DESTINATION""" def get_action(self): - return copyfile + if self.options.threads > 1: + return copyfile_non_atomic + else: + return copyfile def get_verb(self): return 'copying...' @@ -83,7 +86,7 @@ Copy SOURCE to DESTINATION""" if dst_path: dst_fs = dst_fs.makeopendir(dst_path) dst_path = None - + copy_fs_paths = [] progress = options.progress @@ -147,10 +150,11 @@ Copy SOURCE to DESTINATION""" self.on_done, self.on_error) for i in xrange(options.threads)] + for thread in threads: thread.start() - self.action_error = None + self.action_errors = [] complete = False try: enqueue = file_queue.put @@ -166,16 +170,11 @@ Copy SOURCE to DESTINATION""" #file_queue.join() except KeyboardInterrupt: - options.progress = False - if self.action_error: - self.error(self.wrap_error(unicode(self.action_error)) + '\n') - else: - self.output("\nCancelling...\n") + options.progress = False + self.output("\nCancelling...\n") except SystemExit: - options.progress = False - if self.action_error: - self.error(self.wrap_error(unicode(self.action_error)) + '\n') + options.progress = False finally: sys.stdout.flush() @@ -188,10 +187,16 @@ Copy SOURCE to DESTINATION""" dst_fs.close() - if complete and options.progress: - sys.stdout.write(self.progress_bar(self.total_files, self.done_files, '')) + if self.action_errors: + for error in self.action_errors: + self.error(self.wrap_error(unicode(error)) + '\n') sys.stdout.write('\n') - sys.stdout.flush() + sys.stdout.flush() + else: + if complete and options.progress: + sys.stdout.write(self.progress_bar(self.total_files, self.done_files, '')) + sys.stdout.write('\n') + sys.stdout.flush() def post_actions(self): pass @@ -212,16 +217,17 @@ Copy SOURCE to DESTINATION""" self.lock.release() def on_error(self, e): + print e self.lock.acquire() try: - self.action_error = e + self.action_errors.append(e) finally: self.lock.release() def any_error(self): self.lock.acquire() try: - return bool(self.action_error) + return bool(self.action_errors) finally: self.lock.release() diff --git a/fs/commands/fsmv.py b/fs/commands/fsmv.py index 2de4a4a..a8520fd 100644 --- a/fs/commands/fsmv.py +++ b/fs/commands/fsmv.py @@ -1,4 +1,4 @@ -from fs.utils import movefile, contains_files +from fs.utils import movefile, movefile_non_atomic, contains_files from fs.commands import fscp import sys @@ -10,8 +10,11 @@ Move files from SOURCE to DESTINATION""" def get_verb(self): return 'moving...' - def get_action(self): - return movefile + def get_action(self): + if self.options.threads > 1: + return movefile_non_atomic + else: + return movefile def post_actions(self): for fs, dirpath in self.root_dirs: diff --git a/fs/commands/runner.py b/fs/commands/runner.py index bc63ca1..2ed1cef 100644 --- a/fs/commands/runner.py +++ b/fs/commands/runner.py @@ -1,6 +1,6 @@ import sys from optparse import OptionParser -from fs.opener import opener, OpenerError +from fs.opener import opener, OpenerError, Opener from fs.errors import FSError from fs.path import splitext, pathsplit, isdotfile, iswildcard import platform @@ -117,12 +117,8 @@ class Command(object): return self.wrap_link(fs_url) return re.sub(re_fs, repl, text) - def open_fs(self, fs_url, writeable=False, create_dir=False): - try: - fs, path = opener.parse(fs_url, writeable=writeable, create_dir=create_dir) - except OpenerError, e: - self.error(str(e), '\n') - sys.exit(1) + def open_fs(self, fs_url, writeable=False, create_dir=False): + fs, path = opener.parse(fs_url, writeable=writeable, create_dir=create_dir) fs.cache_hint(True) return fs, path @@ -238,6 +234,8 @@ class Command(object): help="make output verbose", metavar="VERBOSE") optparse.add_option('--listopeners', dest='listopeners', action="store_true", default=False, help="list all FS openers", metavar="LISTOPENERS") + optparse.add_option('--fs', dest='fs', action='append', type="string", + help="import an FS opener e.g --fs foo.bar.MyOpener", metavar="OPENER") return optparse def list_openers(self): @@ -288,6 +286,31 @@ class Command(object): self.list_openers() return 0 + ilocals = {} + if options.fs: + for import_opener in options.fs: + module_name, opener_class = import_opener.rsplit('.', 1) + try: + opener_module = __import__(module_name, globals(), ilocals, [opener_class], -1) + except ImportError: + self.error("Unable to import opener %s\n" % import_opener) + return 0 + + new_opener = getattr(opener_module, opener_class) + + try: + if not issubclass(new_opener, Opener): + self.error('%s is not an fs.opener.Opener\n' % import_opener) + return 0 + except TypeError: + self.error('%s is not an opener class\n' % import_opener) + return 0 + + if options.verbose: + self.output('Imported opener %s\n' % import_opener) + + opener.add(new_opener) + args = [unicode(arg, sys.getfilesystemencoding()) for arg in args] self.verbose = options.verbose try: diff --git a/fs/contrib/davfs/__init__.py b/fs/contrib/davfs/__init__.py index cfba09d..652825a 100644 --- a/fs/contrib/davfs/__init__.py +++ b/fs/contrib/davfs/__init__.py @@ -106,11 +106,11 @@ class DAVFS(FS): # after any redirects have been followed. self.url = url resp = self._request("/","PROPFIND","",{"Depth":"0"}) - try: + try: if resp.status == 404: raise ResourceNotFoundError("/",msg="root url gives 404") if resp.status in (401,403): - raise PermissionDeniedError("listdir") + raise PermissionDeniedError("listdir (http %s)" % resp.status) if resp.status != 207: msg = "server at %s doesn't speak WebDAV" % (self.url,) raise RemoteConnectionError("",msg=msg,details=resp.read()) @@ -184,7 +184,7 @@ class DAVFS(FS): resp = None try: resp = self._raw_request(url,method,body,headers) - # Loop to retry for redirects and authentication responses. + # Loop to retry for redirects and authentication responses. while resp.status in (301,302,401,403): resp.close() if resp.status in (301,302,): @@ -196,7 +196,7 @@ class DAVFS(FS): raise OperationFailedError(msg="redirection seems to be looping") if len(visited) > 10: raise OperationFailedError("too much redirection") - elif resp.status in (401,403): + elif resp.status in (401,403): if self.get_credentials is None: break else: @@ -494,6 +494,7 @@ class DAVFS(FS): if response.status == 405: raise ResourceInvalidError(path) if response.status < 200 or response.status >= 300: + print response.read() raise_generic_error(response,"remove",path) return True diff --git a/fs/contrib/tahoefs/__init__.py b/fs/contrib/tahoefs/__init__.py index 981ae5a..c9eec02 100644 --- a/fs/contrib/tahoefs/__init__.py +++ b/fs/contrib/tahoefs/__init__.py @@ -1,7 +1,7 @@ '''
Example (it will use publicly available, but slow-as-hell Tahoe-LAFS cloud):
- from fs.tahoefs import TahoeFS, Connection
+ from fs.contrib.tahoefs import TahoeFS, Connection
dircap = TahoeFS.createdircap(webapi='http://pubgrid.tahoe-lafs.org')
print "Your dircap (unique key to your storage directory) is", dircap
print "Keep it safe!"
@@ -86,13 +86,13 @@ class TahoeFS(CacheFS): def __init__(self, dircap, timeout=60, autorun=True, largefilesize=10*1024*1024, webapi='http://127.0.0.1:3456'):
'''
Creates instance of TahoeFS.
- dircap - special hash allowing user to work with TahoeLAFS directory.
- timeout - how long should underlying CacheFS keep information about files
+ :param dircap: special hash allowing user to work with TahoeLAFS directory.
+ :param timeout: how long should underlying CacheFS keep information about files
before asking TahoeLAFS node again.
- autorun - Allow listing autorun files? Can be very dangerous on Windows!.
+ :param autorun: Allow listing autorun files? Can be very dangerous on Windows!.
This is temporary hack, as it should be part of Windows-specific middleware,
not Tahoe itself.
- largefilesize - Create placeholder file for files larger than this tresholf.
+ :param largefilesize: - Create placeholder file for files larger than this treshold.
Uploading and processing of large files can last extremely long (many hours),
so placing this placeholder can help you to remember that upload is processing.
Setting this to None will skip creating placeholder files for any uploads.
@@ -384,7 +384,7 @@ class _TahoeFS(FS): offset=offset, length=length)
@_fix_path
- def setcontents(self, path, file):
+ def setcontents(self, path, file, chunk_size=64*1024):
self._log(INFO, 'Uploading file %s' % path)
path = self.tahoeutil.fixwinpath(path, False)
size=None
diff --git a/fs/opener.py b/fs/opener.py index 68124ab..495f338 100644 --- a/fs/opener.py +++ b/fs/opener.py @@ -31,15 +31,20 @@ def _expand_syspath(path): return path -def _parse_credentials(url): +def _parse_credentials(url): + scheme = None + if '://' in url: + scheme, url = url.split('://', 1) username = None - password = None + password = None if '@' in url: credentials, url = url.split('@', 1) if ':' in credentials: username, password = credentials.split(':', 1) else: username = credentials + if scheme is not None: + url = '%s://%s' % (scheme, url) return username, password, url def _parse_name(fs_name): @@ -48,6 +53,13 @@ def _parse_name(fs_name): return fs_name, fs_name_params else: return fs_name, None + +def _split_url_path(url): + if '://' not in url: + url = 'http://' + url + scheme, netloc, path, params, query, fragment = urlparse(url) + url = '%s://%s' % (scheme, netloc) + return url, path class OpenerRegistry(object): @@ -133,6 +145,9 @@ class OpenerRegistry(object): return fs, resourcename fs_path = join(fs_path, path) + + if create_dir and fs_path: + fs.makedir(fs_path, allow_recreate=True) pathname, resourcename = pathsplit(fs_path or '') if pathname and resourcename: @@ -186,7 +201,7 @@ class OpenerRegistry(object): :param fs_url: an FS URL e.g. ftp://ftp.mozilla.org :param writeable: set to True (the default) if the FS must be writeable :param create_dir: create the directory references by the FS URL, if - it doesn't already exist + it doesn't already exist """ fs, path = self.parse(fs_url, writable=writeable, create_dir=create_dir) @@ -418,7 +433,7 @@ example: def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create_dir): from fs.wrapfs.debugfs import DebugFS if fs_path: - fs, path = registry.parse(fs_path, writeable=writeable, create=create_dir) + fs, path = registry.parse(fs_path, writeable=writeable, create_dir=create_dir) return DebugFS(fs, verbose=False), None if fs_name_params == 'ram': from fs.memoryfs import MemoryFS @@ -440,12 +455,100 @@ example: @classmethod def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create_dir): from fs.tempfs import TempFS - fs = TempFS(identifier=fs_name_params) - if create_dir and fs_path: - fs = fs.makeopendir(fs_path) - fs_path = pathsplit(fs_path) + fs = TempFS(identifier=fs_name_params) return fs, fs_path + +class S3Opener(Opener): + names = ['s3'] + desc = """Opens a filesystem stored on Amazon S3 storage + The environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY should be set""" + @classmethod + def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create_dir): + from fs.s3fs import S3FS + + bucket = fs_path + path ='' + if '/' in fs_path: + bucket, path = fs_path.split('/', 1) + + fs = S3FS(bucket) + + if path: + dirpath, resourcepath = pathsplit(path) + if dirpath: + fs = fs.opendir(dirpath) + path = resourcepath + + return fs, path + +class TahoeOpener(Opener): + names = ['tahoe'] + desc = """Opens a Tahoe-LAFS filesystem + + example: + * tahoe://http://pubgrid.tahoe-lafs.org/uri/URI:DIR2:h5bkxelehowscijdb [...]""" + + @classmethod + def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create_dir): + from fs.contrib.tahoefs import TahoeFS + + if '/uri/' not in fs_path: + raise OpenerError("""Tahoe url should be in the form <url>/uri/<dicap>""") + + url, dircap = fs_path.split('/uri/') + path = '' + if '/' in dircap: + dircap, path = dircap.split('/', 1) + + fs = TahoeFS(dircap, webapi=url) + + if '/' in path: + dirname, resourcename = pathsplit(path) + if createdir: + fs = fs.makeopendir(dirname) + else: + fs = fs.opendir(dirname) + path = '' + + return fs, path + + +class DavOpener(Opener): + names = ['dav'] + desc = """Opens a WebDAV server +example: +* dav://example.org/dav""" + + @classmethod + def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create_dir): + from fs.contrib.davfs import DAVFS + + url = fs_path + + if '://' not in url: + url = 'http://' + url + + scheme, url = url.split('://', 1) + + username, password, url = _parse_credentials(url) + + credentials = None + if username or password: + credentials = {} + if username: + credentials['username'] = username + if password: + credentials['password'] = password + + url = '%s://%s' % (scheme, url) + + fs = DAVFS(url, credentials=credentials) + + return fs, '' + + + opener = OpenerRegistry([OSFSOpener, ZipOpener, @@ -455,6 +558,9 @@ opener = OpenerRegistry([OSFSOpener, MemOpener, DebugOpener, TempOpener, + S3Opener, + TahoeOpener, + DavOpener, ]) @@ -72,7 +72,7 @@ class S3FS(FS): PATH_MAX = None NAME_MAX = None - def __init__(self, bucket, prefix="", aws_access_key=None, aws_secret_key=None, separator="/", thread_synchronize=True,key_sync_timeout=1): + def __init__(self, bucket, prefix="", aws_access_key=None, aws_secret_key=None, separator="/", thread_synchronize=True, key_sync_timeout=1): """Constructor for S3FS objects. S3FS objects require the name of the S3 bucket in which to store diff --git a/fs/utils.py b/fs/utils.py index bbe9c93..39ddd3d 100644 --- a/fs/utils.py +++ b/fs/utils.py @@ -53,6 +53,37 @@ def copyfile(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1 FS._shutil_copyfile(src_syspath, dst_syspath) return + src_lock = getattr(src_fs, '_lock', None) + + if src_lock is not None: + src_lock.acquire() + + try: + src = None + try: + src = src_fs.open(src_path, 'rb') + dst_fs.setcontents(dst_path, src, chunk_size=chunk_size) + finally: + if src is not None: + src.close() + finally: + if src_lock is not None: + src_lock.release() + +def copyfile_non_atomic(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1024): + """A non atomic version of copyfile (will not block other threads using src_fs or dst_fst) + + :param src_fs: Source filesystem object + :param src_path: -- Source path + :param dst_fs: Destination filesystem object + :param dst_path: Destination filesystem object + :param chunk_size: Size of chunks to move if system copyfile is not available (default 16K) + + """ + + if not overwrite and dst_fs.exists(dst_path): + raise DestinationExistsError(dst_path) + src = None dst = None try: @@ -97,24 +128,64 @@ def movefile(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1 FS._shutil_movefile(src_syspath, dst_syspath) return + src_lock = getattr(src_fs, '_lock', None) + + if src_lock is not None: + src_lock.acquire() + + try: + src = None + try: + # Chunk copy + src = src_fs.open(src_path, 'rb') + dst_fs.setcontents(dst_path, src, chunk_size=chunk_size) + except: + raise + else: + src_fs.remove(src_path) + finally: + if src is not None: + src.close() + finally: + if src_lock is not None: + src_lock.release() + + +def movefile_non_atomic(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1024): + """A non atomic version of movefile (wont block other threads using src_fs or dst_fs + + :param src_fs: Source filesystem object + :param src_path: Source path + :param dst_fs: Destination filesystem object + :param dst_path: Destination filesystem object + :param chunk_size: Size of chunks to move if system copyfile is not available (default 16K) + + """ + + if not overwrite and dst_fs.exists(dst_path): + raise DestinationExistsError(dst_path) + src = None dst = None try: # Chunk copy src = src_fs.open(src_path, 'rb') - dst = src_fs.open(dst_path, 'wb') + dst = dst_fs.open(dst_path, 'wb') write = dst.write read = src.read chunk = read(chunk_size) while chunk: write(chunk) - chunk = read(chunk_size) + chunk = read(chunk_size) + except: + raise + else: + src_fs.remove(src_path) finally: if src is not None: src.close() if dst is not None: - dst.close() - src_fs.remove(src_path) + dst.close() def movedir(fs1, fs2, overwrite=False, ignore_errors=False, chunk_size=64*1024): |