summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author willmcgugan <willmcgugan@67cdc799-7952-0410-af00-57a81ceafa0f> 2010-12-18 19:34:54 +0000
committer willmcgugan <willmcgugan@67cdc799-7952-0410-af00-57a81ceafa0f> 2010-12-18 19:34:54 +0000
commit 26be977288347ef7202ebd4035b0020daf7edb8b (patch)
tree 46aa6966a7d3edcd170f0e2601691a4af3fc39d8
parent 9c69df76498cb902cc6dde16a1ceda7dd6f4c452 (diff)
download pyfilesystem-26be977288347ef7202ebd4035b0020daf7edb8b.tar.gz
Added openers for Tahoe and Dav
git-svn-id: http://pyfilesystem.googlecode.com/svn/trunk@559 67cdc799-7952-0410-af00-57a81ceafa0f
-rw-r--r-- ChangeLog 3
-rw-r--r-- fs/base.py 2
-rw-r--r-- fs/commands/fscp.py 80
-rw-r--r-- fs/commands/fsmv.py 9
-rw-r--r-- fs/commands/runner.py 37
-rw-r--r-- fs/contrib/davfs/__init__.py 9
-rw-r--r-- fs/contrib/tahoefs/__init__.py 12
-rw-r--r-- fs/opener.py 122
-rw-r--r-- fs/s3fs.py 2
-rw-r--r-- fs/utils.py 79
10 files changed, 284 insertions, 71 deletions
diff --git a/ChangeLog b/ChangeLog
index 9745af5..6a2b214 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -64,7 +64,8 @@
* Added a getmmap to base
* Added command line scripts fsls, fstree, fscat, fscp, fsmv
* Added command line scripts fsmkdir, fsmount
- * Made automatically pick up keys if no other authentication is available
+ * Made SFTP automatically pick up keys if no other authentication is available
* Optimized listdir and listdirinfo in SFTPFS
* Made memoryfs work with threads
+ * Added copyfile_non_atomic and movefile_non_atomic for improved performance of multi-threaded copies
diff --git a/fs/base.py b/fs/base.py
index 738910d..4983c6a 100644
--- a/fs/base.py
+++ b/fs/base.py
@@ -845,6 +845,8 @@ class FS(object):
:param overwrite: if True, then an existing file at the destination path
will be silently overwritten; if False then an exception
will be raised in this case.
+ :raises DestinationExistsError: if overwrite is False and the destination
+ already exists
:type overwrite: bool
:param chunk_size: Size of chunks to use when copying, if a simple copy
is required
diff --git a/fs/commands/fscp.py b/fs/commands/fscp.py
index 48978e9..8d43bf2 100644
--- a/fs/commands/fscp.py
+++ b/fs/commands/fscp.py
@@ -1,5 +1,5 @@
from fs.opener import opener
-from fs.utils import copyfile, copystructure
+from fs.utils import copyfile, copyfile_non_atomic, copystructure
from fs.path import pathjoin, iswildcard
from fs.errors import FSError
from fs.commands.runner import Command
@@ -22,26 +22,26 @@ class FileOpThread(threading.Thread):
def run(self):
- try:
- while not self.finish_event.isSet():
- try:
- path_type, fs, path, dest_path = self.queue.get(timeout=0.1)
- except queue.Empty:
- continue
- try:
- if path_type == FScp.DIR:
- self.dest_fs.makedir(path, recursive=True, allow_recreate=True)
- else:
- self.action(fs, path, self.dest_fs, dest_path, overwrite=True)
- except Exception, e:
- self.queue.task_done()
- raise
- else:
- self.queue.task_done()
- self.on_done(path_type, fs, path, self.dest_fs, dest_path)
-
- except Exception, e:
- self.on_error(e)
+ while not self.finish_event.isSet():
+ try:
+ path_type, fs, path, dest_path = self.queue.get(timeout=0.1)
+ except queue.Empty:
+ continue
+ try:
+ if path_type == FScp.DIR:
+ self.dest_fs.makedir(path, recursive=True, allow_recreate=True)
+ else:
+ self.action(fs, path, self.dest_fs, dest_path, overwrite=True)
+ except Exception, e:
+ print e
+ self.on_error(e)
+ self.queue.task_done()
+ break
+ else:
+ self.queue.task_done()
+ self.on_done(path_type, fs, path, self.dest_fs, dest_path)
+
+
class FScp(Command):
@@ -51,7 +51,10 @@ class FScp(Command):
Copy SOURCE to DESTINATION"""
def get_action(self):
- return copyfile
+ if self.options.threads > 1:
+ return copyfile_non_atomic
+ else:
+ return copyfile
def get_verb(self):
return 'copying...'
@@ -83,7 +86,7 @@ Copy SOURCE to DESTINATION"""
if dst_path:
dst_fs = dst_fs.makeopendir(dst_path)
dst_path = None
-
+
copy_fs_paths = []
progress = options.progress
@@ -147,10 +150,11 @@ Copy SOURCE to DESTINATION"""
self.on_done,
self.on_error)
for i in xrange(options.threads)]
+
for thread in threads:
thread.start()
- self.action_error = None
+ self.action_errors = []
complete = False
try:
enqueue = file_queue.put
@@ -166,16 +170,11 @@ Copy SOURCE to DESTINATION"""
#file_queue.join()
except KeyboardInterrupt:
- options.progress = False
- if self.action_error:
- self.error(self.wrap_error(unicode(self.action_error)) + '\n')
- else:
- self.output("\nCancelling...\n")
+ options.progress = False
+ self.output("\nCancelling...\n")
except SystemExit:
- options.progress = False
- if self.action_error:
- self.error(self.wrap_error(unicode(self.action_error)) + '\n')
+ options.progress = False
finally:
sys.stdout.flush()
@@ -188,10 +187,16 @@ Copy SOURCE to DESTINATION"""
dst_fs.close()
- if complete and options.progress:
- sys.stdout.write(self.progress_bar(self.total_files, self.done_files, ''))
+ if self.action_errors:
+ for error in self.action_errors:
+ self.error(self.wrap_error(unicode(error)) + '\n')
sys.stdout.write('\n')
- sys.stdout.flush()
+ sys.stdout.flush()
+ else:
+ if complete and options.progress:
+ sys.stdout.write(self.progress_bar(self.total_files, self.done_files, ''))
+ sys.stdout.write('\n')
+ sys.stdout.flush()
def post_actions(self):
pass
@@ -212,16 +217,17 @@ Copy SOURCE to DESTINATION"""
self.lock.release()
def on_error(self, e):
+ print e
self.lock.acquire()
try:
- self.action_error = e
+ self.action_errors.append(e)
finally:
self.lock.release()
def any_error(self):
self.lock.acquire()
try:
- return bool(self.action_error)
+ return bool(self.action_errors)
finally:
self.lock.release()
diff --git a/fs/commands/fsmv.py b/fs/commands/fsmv.py
index 2de4a4a..a8520fd 100644
--- a/fs/commands/fsmv.py
+++ b/fs/commands/fsmv.py
@@ -1,4 +1,4 @@
-from fs.utils import movefile, contains_files
+from fs.utils import movefile, movefile_non_atomic, contains_files
from fs.commands import fscp
import sys
@@ -10,8 +10,11 @@ Move files from SOURCE to DESTINATION"""
def get_verb(self):
return 'moving...'
- def get_action(self):
- return movefile
+ def get_action(self):
+ if self.options.threads > 1:
+ return movefile_non_atomic
+ else:
+ return movefile
def post_actions(self):
for fs, dirpath in self.root_dirs:
diff --git a/fs/commands/runner.py b/fs/commands/runner.py
index bc63ca1..2ed1cef 100644
--- a/fs/commands/runner.py
+++ b/fs/commands/runner.py
@@ -1,6 +1,6 @@
import sys
from optparse import OptionParser
-from fs.opener import opener, OpenerError
+from fs.opener import opener, OpenerError, Opener
from fs.errors import FSError
from fs.path import splitext, pathsplit, isdotfile, iswildcard
import platform
@@ -117,12 +117,8 @@ class Command(object):
return self.wrap_link(fs_url)
return re.sub(re_fs, repl, text)
- def open_fs(self, fs_url, writeable=False, create_dir=False):
- try:
- fs, path = opener.parse(fs_url, writeable=writeable, create_dir=create_dir)
- except OpenerError, e:
- self.error(str(e), '\n')
- sys.exit(1)
+ def open_fs(self, fs_url, writeable=False, create_dir=False):
+ fs, path = opener.parse(fs_url, writeable=writeable, create_dir=create_dir)
fs.cache_hint(True)
return fs, path
@@ -238,6 +234,8 @@ class Command(object):
help="make output verbose", metavar="VERBOSE")
optparse.add_option('--listopeners', dest='listopeners', action="store_true", default=False,
help="list all FS openers", metavar="LISTOPENERS")
+ optparse.add_option('--fs', dest='fs', action='append', type="string",
+ help="import an FS opener e.g --fs foo.bar.MyOpener", metavar="OPENER")
return optparse
def list_openers(self):
@@ -288,6 +286,31 @@ class Command(object):
self.list_openers()
return 0
+ ilocals = {}
+ if options.fs:
+ for import_opener in options.fs:
+ module_name, opener_class = import_opener.rsplit('.', 1)
+ try:
+ opener_module = __import__(module_name, globals(), ilocals, [opener_class], -1)
+ except ImportError:
+ self.error("Unable to import opener %s\n" % import_opener)
+ return 0
+
+ new_opener = getattr(opener_module, opener_class)
+
+ try:
+ if not issubclass(new_opener, Opener):
+ self.error('%s is not an fs.opener.Opener\n' % import_opener)
+ return 0
+ except TypeError:
+ self.error('%s is not an opener class\n' % import_opener)
+ return 0
+
+ if options.verbose:
+ self.output('Imported opener %s\n' % import_opener)
+
+ opener.add(new_opener)
+
args = [unicode(arg, sys.getfilesystemencoding()) for arg in args]
self.verbose = options.verbose
try:
diff --git a/fs/contrib/davfs/__init__.py b/fs/contrib/davfs/__init__.py
index cfba09d..652825a 100644
--- a/fs/contrib/davfs/__init__.py
+++ b/fs/contrib/davfs/__init__.py
@@ -106,11 +106,11 @@ class DAVFS(FS):
# after any redirects have been followed.
self.url = url
resp = self._request("/","PROPFIND","",{"Depth":"0"})
- try:
+ try:
if resp.status == 404:
raise ResourceNotFoundError("/",msg="root url gives 404")
if resp.status in (401,403):
- raise PermissionDeniedError("listdir")
+ raise PermissionDeniedError("listdir (http %s)" % resp.status)
if resp.status != 207:
msg = "server at %s doesn't speak WebDAV" % (self.url,)
raise RemoteConnectionError("",msg=msg,details=resp.read())
@@ -184,7 +184,7 @@ class DAVFS(FS):
resp = None
try:
resp = self._raw_request(url,method,body,headers)
- # Loop to retry for redirects and authentication responses.
+ # Loop to retry for redirects and authentication responses.
while resp.status in (301,302,401,403):
resp.close()
if resp.status in (301,302,):
@@ -196,7 +196,7 @@ class DAVFS(FS):
raise OperationFailedError(msg="redirection seems to be looping")
if len(visited) > 10:
raise OperationFailedError("too much redirection")
- elif resp.status in (401,403):
+ elif resp.status in (401,403):
if self.get_credentials is None:
break
else:
@@ -494,6 +494,7 @@ class DAVFS(FS):
if response.status == 405:
raise ResourceInvalidError(path)
if response.status < 200 or response.status >= 300:
+ print response.read()
raise_generic_error(response,"remove",path)
return True
diff --git a/fs/contrib/tahoefs/__init__.py b/fs/contrib/tahoefs/__init__.py
index 981ae5a..c9eec02 100644
--- a/fs/contrib/tahoefs/__init__.py
+++ b/fs/contrib/tahoefs/__init__.py
@@ -1,7 +1,7 @@
'''
Example (it will use publicly available, but slow-as-hell Tahoe-LAFS cloud):
- from fs.tahoefs import TahoeFS, Connection
+ from fs.contrib.tahoefs import TahoeFS, Connection
dircap = TahoeFS.createdircap(webapi='http://pubgrid.tahoe-lafs.org')
print "Your dircap (unique key to your storage directory) is", dircap
print "Keep it safe!"
@@ -86,13 +86,13 @@ class TahoeFS(CacheFS):
def __init__(self, dircap, timeout=60, autorun=True, largefilesize=10*1024*1024, webapi='http://127.0.0.1:3456'):
'''
Creates instance of TahoeFS.
- dircap - special hash allowing user to work with TahoeLAFS directory.
- timeout - how long should underlying CacheFS keep information about files
+ :param dircap: special hash allowing user to work with TahoeLAFS directory.
+ :param timeout: how long should underlying CacheFS keep information about files
before asking TahoeLAFS node again.
- autorun - Allow listing autorun files? Can be very dangerous on Windows!.
+ :param autorun: Allow listing autorun files? Can be very dangerous on Windows!
This is temporary hack, as it should be part of Windows-specific middleware,
not Tahoe itself.
- largefilesize - Create placeholder file for files larger than this tresholf.
+ :param largefilesize: Create placeholder file for files larger than this threshold.
Uploading and processing of large files can last extremely long (many hours),
so placing this placeholder can help you to remember that upload is processing.
Setting this to None will skip creating placeholder files for any uploads.
@@ -384,7 +384,7 @@ class _TahoeFS(FS):
offset=offset, length=length)
@_fix_path
- def setcontents(self, path, file):
+ def setcontents(self, path, file, chunk_size=64*1024):
self._log(INFO, 'Uploading file %s' % path)
path = self.tahoeutil.fixwinpath(path, False)
size=None
diff --git a/fs/opener.py b/fs/opener.py
index 68124ab..495f338 100644
--- a/fs/opener.py
+++ b/fs/opener.py
@@ -31,15 +31,20 @@ def _expand_syspath(path):
return path
-def _parse_credentials(url):
+def _parse_credentials(url):
+ scheme = None
+ if '://' in url:
+ scheme, url = url.split('://', 1)
username = None
- password = None
+ password = None
if '@' in url:
credentials, url = url.split('@', 1)
if ':' in credentials:
username, password = credentials.split(':', 1)
else:
username = credentials
+ if scheme is not None:
+ url = '%s://%s' % (scheme, url)
return username, password, url
def _parse_name(fs_name):
@@ -48,6 +53,13 @@ def _parse_name(fs_name):
return fs_name, fs_name_params
else:
return fs_name, None
+
+def _split_url_path(url):
+ if '://' not in url:
+ url = 'http://' + url
+ scheme, netloc, path, params, query, fragment = urlparse(url)
+ url = '%s://%s' % (scheme, netloc)
+ return url, path
class OpenerRegistry(object):
@@ -133,6 +145,9 @@ class OpenerRegistry(object):
return fs, resourcename
fs_path = join(fs_path, path)
+
+ if create_dir and fs_path:
+ fs.makedir(fs_path, allow_recreate=True)
pathname, resourcename = pathsplit(fs_path or '')
if pathname and resourcename:
@@ -186,7 +201,7 @@ class OpenerRegistry(object):
:param fs_url: an FS URL e.g. ftp://ftp.mozilla.org
:param writeable: set to True (the default) if the FS must be writeable
:param create_dir: create the directory references by the FS URL, if
- it doesn't already exist
+ it doesn't already exist
"""
fs, path = self.parse(fs_url, writable=writeable, create_dir=create_dir)
@@ -418,7 +433,7 @@ example:
def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create_dir):
from fs.wrapfs.debugfs import DebugFS
if fs_path:
- fs, path = registry.parse(fs_path, writeable=writeable, create=create_dir)
+ fs, path = registry.parse(fs_path, writeable=writeable, create_dir=create_dir)
return DebugFS(fs, verbose=False), None
if fs_name_params == 'ram':
from fs.memoryfs import MemoryFS
@@ -440,12 +455,100 @@ example:
@classmethod
def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create_dir):
from fs.tempfs import TempFS
- fs = TempFS(identifier=fs_name_params)
- if create_dir and fs_path:
- fs = fs.makeopendir(fs_path)
- fs_path = pathsplit(fs_path)
+ fs = TempFS(identifier=fs_name_params)
return fs, fs_path
+
+class S3Opener(Opener):
+ names = ['s3']
+ desc = """Opens a filesystem stored on Amazon S3 storage
+ The environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY should be set"""
+ @classmethod
+ def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create_dir):
+ from fs.s3fs import S3FS
+
+ bucket = fs_path
+ path =''
+ if '/' in fs_path:
+ bucket, path = fs_path.split('/', 1)
+
+ fs = S3FS(bucket)
+
+ if path:
+ dirpath, resourcepath = pathsplit(path)
+ if dirpath:
+ fs = fs.opendir(dirpath)
+ path = resourcepath
+
+ return fs, path
+
+class TahoeOpener(Opener):
+ names = ['tahoe']
+ desc = """Opens a Tahoe-LAFS filesystem
+
+ example:
+ * tahoe://http://pubgrid.tahoe-lafs.org/uri/URI:DIR2:h5bkxelehowscijdb [...]"""
+
+ @classmethod
+ def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create_dir):
+ from fs.contrib.tahoefs import TahoeFS
+
+ if '/uri/' not in fs_path:
+ raise OpenerError("""Tahoe url should be in the form <url>/uri/<dicap>""")
+
+ url, dircap = fs_path.split('/uri/')
+ path = ''
+ if '/' in dircap:
+ dircap, path = dircap.split('/', 1)
+
+ fs = TahoeFS(dircap, webapi=url)
+
+ if '/' in path:
+ dirname, resourcename = pathsplit(path)
+ if createdir:
+ fs = fs.makeopendir(dirname)
+ else:
+ fs = fs.opendir(dirname)
+ path = ''
+
+ return fs, path
+
+
+class DavOpener(Opener):
+ names = ['dav']
+ desc = """Opens a WebDAV server
+example:
+* dav://example.org/dav"""
+
+ @classmethod
+ def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create_dir):
+ from fs.contrib.davfs import DAVFS
+
+ url = fs_path
+
+ if '://' not in url:
+ url = 'http://' + url
+
+ scheme, url = url.split('://', 1)
+
+ username, password, url = _parse_credentials(url)
+
+ credentials = None
+ if username or password:
+ credentials = {}
+ if username:
+ credentials['username'] = username
+ if password:
+ credentials['password'] = password
+
+ url = '%s://%s' % (scheme, url)
+
+ fs = DAVFS(url, credentials=credentials)
+
+ return fs, ''
+
+
+
opener = OpenerRegistry([OSFSOpener,
ZipOpener,
@@ -455,6 +558,9 @@ opener = OpenerRegistry([OSFSOpener,
MemOpener,
DebugOpener,
TempOpener,
+ S3Opener,
+ TahoeOpener,
+ DavOpener,
])
diff --git a/fs/s3fs.py b/fs/s3fs.py
index 7e3d40c..785968c 100644
--- a/fs/s3fs.py
+++ b/fs/s3fs.py
@@ -72,7 +72,7 @@ class S3FS(FS):
PATH_MAX = None
NAME_MAX = None
- def __init__(self, bucket, prefix="", aws_access_key=None, aws_secret_key=None, separator="/", thread_synchronize=True,key_sync_timeout=1):
+ def __init__(self, bucket, prefix="", aws_access_key=None, aws_secret_key=None, separator="/", thread_synchronize=True, key_sync_timeout=1):
"""Constructor for S3FS objects.
S3FS objects require the name of the S3 bucket in which to store
diff --git a/fs/utils.py b/fs/utils.py
index bbe9c93..39ddd3d 100644
--- a/fs/utils.py
+++ b/fs/utils.py
@@ -53,6 +53,37 @@ def copyfile(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1
FS._shutil_copyfile(src_syspath, dst_syspath)
return
+ src_lock = getattr(src_fs, '_lock', None)
+
+ if src_lock is not None:
+ src_lock.acquire()
+
+ try:
+ src = None
+ try:
+ src = src_fs.open(src_path, 'rb')
+ dst_fs.setcontents(dst_path, src, chunk_size=chunk_size)
+ finally:
+ if src is not None:
+ src.close()
+ finally:
+ if src_lock is not None:
+ src_lock.release()
+
+def copyfile_non_atomic(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1024):
+ """A non atomic version of copyfile (will not block other threads using src_fs or dst_fs)
+
+ :param src_fs: Source filesystem object
+ :param src_path: Source path
+ :param dst_fs: Destination filesystem object
+ :param dst_path: Destination path
+ :param chunk_size: Size of chunks to use when copying (default 64K)
+
+ """
+
+ if not overwrite and dst_fs.exists(dst_path):
+ raise DestinationExistsError(dst_path)
+
src = None
dst = None
try:
@@ -97,24 +128,64 @@ def movefile(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1
FS._shutil_movefile(src_syspath, dst_syspath)
return
+ src_lock = getattr(src_fs, '_lock', None)
+
+ if src_lock is not None:
+ src_lock.acquire()
+
+ try:
+ src = None
+ try:
+ # Chunk copy
+ src = src_fs.open(src_path, 'rb')
+ dst_fs.setcontents(dst_path, src, chunk_size=chunk_size)
+ except:
+ raise
+ else:
+ src_fs.remove(src_path)
+ finally:
+ if src is not None:
+ src.close()
+ finally:
+ if src_lock is not None:
+ src_lock.release()
+
+
+def movefile_non_atomic(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1024):
+ """A non atomic version of movefile (won't block other threads using src_fs or dst_fs)
+
+ :param src_fs: Source filesystem object
+ :param src_path: Source path
+ :param dst_fs: Destination filesystem object
+ :param dst_path: Destination path
+ :param chunk_size: Size of chunks to use when moving (default 64K)
+
+ """
+
+ if not overwrite and dst_fs.exists(dst_path):
+ raise DestinationExistsError(dst_path)
+
src = None
dst = None
try:
# Chunk copy
src = src_fs.open(src_path, 'rb')
- dst = src_fs.open(dst_path, 'wb')
+ dst = dst_fs.open(dst_path, 'wb')
write = dst.write
read = src.read
chunk = read(chunk_size)
while chunk:
write(chunk)
- chunk = read(chunk_size)
+ chunk = read(chunk_size)
+ except:
+ raise
+ else:
+ src_fs.remove(src_path)
finally:
if src is not None:
src.close()
if dst is not None:
- dst.close()
- src_fs.remove(src_path)
+ dst.close()
def movedir(fs1, fs2, overwrite=False, ignore_errors=False, chunk_size=64*1024):