summaryrefslogtreecommitdiff
path: root/fs/ftpfs.py
diff options
context:
space:
mode:
authorwillmcgugan <willmcgugan@67cdc799-7952-0410-af00-57a81ceafa0f>2010-01-02 18:06:03 +0000
committerwillmcgugan <willmcgugan@67cdc799-7952-0410-af00-57a81ceafa0f>2010-01-02 18:06:03 +0000
commitb7dbf13f40bf766fc19ce43d7861b98ca9edbcda (patch)
tree02a42783f330d45f76dd51cb0f9b00588f817f44 /fs/ftpfs.py
parent671cf321fcc774c03d093fbae257e71b1df3d0ea (diff)
downloadpyfilesystem-b7dbf13f40bf766fc19ce43d7861b98ca9edbcda.tar.gz
Optimized FTP fs by caching directory structure
git-svn-id: http://pyfilesystem.googlecode.com/svn/trunk@310 67cdc799-7952-0410-af00-57a81ceafa0f
Diffstat (limited to 'fs/ftpfs.py')
-rw-r--r--fs/ftpfs.py441
1 files changed, 252 insertions, 189 deletions
diff --git a/fs/ftpfs.py b/fs/ftpfs.py
index 21de57d..cb5c297 100644
--- a/fs/ftpfs.py
+++ b/fs/ftpfs.py
@@ -22,7 +22,7 @@ try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
-
+
import time
import sys
@@ -33,7 +33,7 @@ import sys
class Enum(object):
def __init__(self, *names):
self._names_map = dict((name, i) for i, name in enumerate(names))
-
+
def __getattr__(self, name):
return self._names_map[name]
@@ -512,13 +512,13 @@ def _skip(s, i, c):
-
+
class _FTPFile(object):
-
+
""" A file-like that provides access to a file being streamed over ftp."""
-
+
def __init__(self, ftpfs, ftp, path, mode):
if not hasattr(self, '_lock'):
self._lock = threading.RLock()
@@ -532,14 +532,14 @@ class _FTPFile(object):
if 'r' in mode or 'a' in mode:
self.file_size = ftpfs.getsize(path)
self.conn = None
-
+
path = _encode(path)
#self._lock = ftpfs._lock
-
+
if 'r' in mode:
self.ftp.voidcmd('TYPE I')
self.conn = ftp.transfercmd('RETR '+path, None)
-
+
#self._ftp_thread = threading.Thread(target=do_read)
#self._ftp_thread.start()
elif 'w' in mode or 'a' in mode:
@@ -556,17 +556,17 @@ class _FTPFile(object):
# if callback: callback(buf)
#conn.close()
#return self.voidresp()
-
+
#self._ftp_thread = threading.Thread(target=do_write)
#self._ftp_thread.start()
-
- @synchronize
+
+ @synchronize
def read(self, size=None):
if self.conn is None:
return ''
-
+
chunks = []
- if size is None:
+ if size is None:
while 1:
data = self.conn.recv(4096)
if not data:
@@ -575,9 +575,9 @@ class _FTPFile(object):
self.ftp.voidresp()
break
chunks.append(data)
- self.read_pos += len(data)
+ self.read_pos += len(data)
return ''.join(chunks)
-
+
remaining_bytes = size
while remaining_bytes:
read_size = min(remaining_bytes, 4096)
@@ -590,42 +590,42 @@ class _FTPFile(object):
chunks.append(data)
self.read_pos += len(data)
remaining_bytes -= len(data)
-
+
return ''.join(chunks)
- @synchronize
+ @synchronize
def write(self, data):
-
+
data_pos = 0
remaining_data = len(data)
-
+
while remaining_data:
chunk_size = min(remaining_data, 4096)
self.conn.sendall(data[data_pos:data_pos+chunk_size])
data_pos += chunk_size
remaining_data -= chunk_size
self.write_pos += chunk_size
-
+
def __enter__(self):
return self
def __exit__(self,exc_type,exc_value,traceback):
self.close()
-
+
@synchronize
def flush(self):
- return
-
+ return
+
def seek(self, pos, where=fs.SEEK_SET):
# Ftp doesn't support a real seek, so we close the transfer and resume
# it at the new position with the REST command
# I'm not sure how reliable this method is!
if not self.file_size:
raise ValueError("Seek only works with files open for read")
-
+
self._lock.acquire()
try:
-
+
current = self.tell()
new_pos = None
if where == fs.SEEK_SET:
@@ -636,49 +636,51 @@ class _FTPFile(object):
new_pos = self.file_size + pos
if new_pos < 0:
raise ValueError("Can't seek before start of file")
-
+
if self.conn is not None:
self.conn.close()
-
+
finally:
self._lock.release()
-
+
self.close()
self._lock.acquire()
try:
self.ftp = self.ftpfs._open_ftp()
self.ftp.sendcmd('TYPE I')
- self.ftp.sendcmd('REST %i' % (new_pos))
+ self.ftp.sendcmd('REST %i' % (new_pos))
self.__init__(self.ftpfs, self.ftp, _encode(self.path), self.mode)
self.read_pos = new_pos
finally:
self._lock.release()
-
+
#raise UnsupportedError('ftp seek')
-
+
@synchronize
def tell(self):
if 'r' in self.mode:
return self.read_pos
else:
return self.write_pos
-
- @synchronize
+
+ @synchronize
def close(self):
- if self.conn is not None:
+ if self.conn is not None:
self.conn.close()
self.conn = None
self.ftp.voidresp()
if self.ftp is not None:
self.ftp.close()
self.closed = True
-
+ if 'w' in self.mode or 'a' in self.mode:
+ self.ftpfs._on_file_written(self.path)
+
def __iter__(self):
return self.next()
-
+
def next(self):
""" Line iterator
-
+
This isn't terribly efficient. It would probably be better to do
a read followed by splitlines.
"""
@@ -686,13 +688,13 @@ class _FTPFile(object):
chars = []
while True:
char = self.read(1)
- if not char:
+ if not char:
yield ''.join(chars)
del chars[:]
break
chars.append(char)
if char in endings:
- line = ''.join(chars)
+ line = ''.join(chars)
del chars[:]
c = self.read(1)
if not char:
@@ -702,23 +704,31 @@ class _FTPFile(object):
yield line + c
else:
yield line
- chars.append(c)
-
+ chars.append(c)
+
def ftperrors(f):
@wraps(f)
def deco(self, *args, **kwargs):
+ self._lock.acquire()
try:
- ret = f(self, *args, **kwargs)
- except Exception, e:
- #import traceback
- #traceback.print_exc()
- self._translate_exception(args[0] if args else '', e)
+ self._enter_dircache()
+ try:
+ try:
+ ret = f(self, *args, **kwargs)
+ except Exception, e:
+ self._translate_exception(args[0] if args else '', e)
+ finally:
+ self._leave_dircache()
+ finally:
+ self._lock.release()
+ if not self.use_dircache:
+ self.clear_dircache()
return ret
return deco
-
-
+
+
def _encode(s):
if isinstance(s, unicode):
return s.encode('utf-8')
@@ -726,12 +736,12 @@ def _encode(s):
class FTPFS(FS):
-
+
_locals = threading.local()
-
+
def __init__(self, host='', user='', passwd='', acct='', timeout=_GLOBAL_DEFAULT_TIMEOUT,
port=21,
- dircache=False,
+ dircache=True,
max_buffer_size=128*1024*1024):
"""
:param host:
@@ -739,30 +749,124 @@ class FTPFS(FS):
:param passwd:
:param timeout:
:param dircache: If True then directory information will be cached,
- which will speed up operations such as isdir and isfile, but changes
- to the ftp file structure will not be visible (till clear_dircache) is
+ which will speed up operations such as getinfo, isdi, isfile, but changes
+ to the ftp file structure will not be visible untill clear_dircache is
called
:param max_buffer_size: Number of bytes to hold before blocking write operations.
-
+
"""
-
+
super(FTPFS, self).__init__()
-
+
self.host = host
self.port = port
self.user = user
self.passwd = passwd
self.acct = acct
self.timeout = timeout
-
- self._dircache = {}
+
self.use_dircache = dircache
+ self.get_dircache()
+
self.max_buffer_size = max_buffer_size
-
- self._locals._ftp = None
+
+ self._cache_hint = False
+ self._locals._ftp = None
self._thread_ftps = set()
self.ftp
+ @synchronize
+ def cache_hint(self, enabled):
+ self._cache_hint = enabled
+
+ @synchronize
+ def _enter_dircache(self):
+ self.get_dircache()
+ count = getattr(self._locals, '_dircache_count', 0)
+ count += 1
+ self._locals._dircache_count = count
+
+ @synchronize
+ def _leave_dircache(self):
+ self._locals._dircache_count -= 1
+ if not self._locals._dircache_count and not self._cache_hint:
+ self.clear_dircache()
+ assert self._locals._dircache_count >= 0, "dircache count should never be negative"
+
+ @synchronize
+ def get_dircache(self):
+ dircache = getattr(self._locals, '_dircache', None)
+ if dircache is None:
+ dircache = {}
+ self._locals._dircache = dircache
+ self._locals._dircache_count = 0
+ return dircache
+
+ @synchronize
+ def _on_file_written(self, path):
+ self.clear_dircache(dirname(path))
+
+
+ @synchronize
+ def _readdir(self, path):
+
+ dircache = self.get_dircache()
+ dircache_count = self._locals._dircache_count
+
+ if dircache_count:
+ cached_dirlist = dircache.get(path)
+ if cached_dirlist is not None:
+ return cached_dirlist
+ dirlist = {}
+
+ parser = FTPListDataParser()
+
+ def on_line(line):
+ #print repr(line)
+ if not isinstance(line, unicode):
+ line = line.decode('utf-8')
+ info = parser.parse_line(line)
+ if info:
+ info = info.__dict__
+ dirlist[info['name']] = info
+
+ try:
+ self.ftp.dir(_encode(path), on_line)
+ except error_reply:
+ pass
+ dircache[path] = dirlist
+
+ return dirlist
+
+ @synchronize
+ def clear_dircache(self, *paths):
+ """
+ Clear cached directory information.
+
+ :path: Path of directory to clear cache for, or all directories if
+ None (the default)
+
+ """
+ dircache = self.get_dircache()
+ if not paths:
+ dircache.clear()
+ else:
+ for path in paths:
+ dircache.pop(path, None)
+
+ @synchronize
+ def _check_path(self, path):
+ base, fname = pathsplit(abspath(path))
+ dirlist = self._readdir(base)
+ if fname and fname not in dirlist:
+ raise ResourceNotFoundError(path)
+ return dirlist, fname
+
+ def _get_dirlist(self, path):
+ base, fname = pathsplit(abspath(path))
+ dirlist = self._readdir(base)
+ return dirlist, fname
+
@synchronize
def get_ftp(self):
@@ -771,6 +875,7 @@ class FTPFS(FS):
ftp = self._locals._ftp
self._thread_ftps.add(ftp)
return self._locals._ftp
+ @synchronize
def set_ftp(self, ftp):
self._locals._ftp = ftp
ftp = property(get_ftp, set_ftp)
@@ -780,11 +885,11 @@ class FTPFS(FS):
try:
ftp = FTP()
ftp.connect(self.host, self.port, self.timeout)
- ftp.login(self.user, self.passwd, self.acct)
- except socket_error, e:
+ ftp.login(self.user, self.passwd, self.acct)
+ except socket_error, e:
raise RemoteConnectionError(str(e), details=e)
return ftp
-
+
def __getstate__(self):
state = super(FTPFS, self).__getstate__()
del state["_thread_ftps"]
@@ -792,33 +897,33 @@ class FTPFS(FS):
def __setstate__(self,state):
super(FTPFS, self).__setstate__(state)
- self._thread_ftps = set()
+ self._thread_ftps = set()
self.ftp
-
+
def __str__(self):
return '<FTPFS %s>' % self.host
-
+
def __unicode__(self):
return u'<FTPFS %s>' % self.host
-
+
@convert_os_errors
def _translate_exception(self, path, exception):
-
+
""" Translates exceptions that my be thrown by the ftp code in to
FS exceptions
-
+
TODO: Flesh this out with more specific exceptions
-
+
"""
if isinstance(exception, socket_error):
raise RemoteConnectionError(str(exception), details=exception)
-
+
elif isinstance(exception, error_temp):
code, message = str(exception).split(' ', 1)
raise RemoteConnectionError(str(exception), path=path, msg="FTP error: %s (see details)" % str(exception), details=exception)
-
+
elif isinstance(exception, error_perm):
code, message = str(exception).split(' ', 1)
code = int(code)
@@ -826,91 +931,35 @@ class FTPFS(FS):
raise ResourceNotFoundError(path)
raise PermissionDeniedError(str(exception), path=path, msg="FTP error: %s (see details)" % str(exception), details=exception)
-
raise exception
-
+
@ftperrors
- @synchronize
def close(self):
for ftp in self._thread_ftps:
ftp.close()
+
+ self._thread_ftps.clear()
self.closed = True
-
+
@ftperrors
- @synchronize
def open(self, path, mode='r'):
+ mode = mode.lower()
if 'r' in mode:
if not self.isfile(path):
raise ResourceNotFoundError(path)
+ if 'w' in mode or 'a' in mode:
+ self.clear_dircache(dirname(path))
ftp = self._open_ftp()
f = _FTPFile(self, ftp, path, mode)
return f
- @synchronize
- def _readdir(self, path):
-
- if self.use_dircache:
- cached_dirlist = self._dircache.get(path)
- if cached_dirlist is not None:
- return cached_dirlist
- dirlist = {}
-
- parser = FTPListDataParser()
-
- def on_line(line):
- #print repr(line)
- if not isinstance(line, unicode):
- line = line.decode('utf-8')
- info = parser.parse_line(line)
- if info:
- info = info.__dict__
- dirlist[info['name']] = info
-
- try:
- self.ftp.dir(_encode(path), on_line)
- except error_reply:
- pass
- self._dircache[path] = dirlist
-
- return dirlist
-
- @synchronize
- def clear_dircache(self, path=None):
- """
- Clear cached directory information.
-
- :path: Path of directory to clear cache for, or all directories if
- None (the default)
-
- """
- if path is None:
- self._dircache.clear()
- if path in self._dircache:
- del self._dircache[path]
-
- @synchronize
- @ftperrors
- def _check_path(self, path, ignore_missing=False):
- base, fname = pathsplit(abspath(path))
- dirlist = self._readdir(base)
- if fname and fname not in dirlist:
- raise ResourceNotFoundError(path)
- return dirlist, fname
-
- def _get_dirlist(self, path):
- base, fname = pathsplit(abspath(path))
- dirlist = self._readdir(base)
- return dirlist, fname
-
- @synchronize
@ftperrors
def exists(self, path):
if path in ('', '/'):
return True
dirlist, fname = self._get_dirlist(path)
return fname in dirlist
-
- @synchronize
+
@ftperrors
def isdir(self, path):
if path in ('', '/'):
@@ -920,20 +969,18 @@ class FTPFS(FS):
if info is None:
return False
return info['try_cwd']
-
- @synchronize
+
@ftperrors
def isfile(self, path):
if path in ('', '/'):
return False
- dirlist, fname = self._get_dirlist(path)
+ dirlist, fname = self._get_dirlist(path)
info = dirlist.get(fname)
if info is None:
return False
return not info['try_cwd']
@ftperrors
- @synchronize
def listdir(self, path="./", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False):
path = normpath(path)
if not self.exists(path):
@@ -941,21 +988,21 @@ class FTPFS(FS):
if not self.isdir(path):
raise ResourceInvalidError(path)
paths = self._readdir(path).keys()
-
+
return self._listdir_helper(path, paths, wildcard, full, absolute, dirs_only, files_only)
-
-
+
+
@ftperrors
- @synchronize
def makedir(self, path, recursive=False, allow_recreate=False):
if path in ('', '/'):
return
def checkdir(path):
+ self.clear_dircache(dirname(path), path)
try:
self.ftp.mkd(_encode(path))
except error_reply:
return
- except error_perm, e:
+ except error_perm, e:
if recursive or allow_recreate:
return
if str(e).split(' ', 1)[0]=='550':
@@ -966,38 +1013,37 @@ class FTPFS(FS):
for p in recursepath(path):
checkdir(p)
else:
- base, dirname = pathsplit(path)
+ base = dirname(path)
if not self.exists(base):
raise ParentDirectoryMissingError(path)
-
- if not allow_recreate:
+
+ if not allow_recreate:
if self.exists(path):
if self.isfile(path):
raise ResourceInvalidError(path)
raise DestinationExistsError(path)
checkdir(path)
-
-
+
@ftperrors
- @synchronize
def remove(self, path):
if not self.exists(path):
raise ResourceNotFoundError(path)
if not self.isfile(path):
raise ResourceInvalidError(path)
+ self.clear_dircache(dirname(path))
self.ftp.delete(_encode(path))
-
+
@ftperrors
- @synchronize
def removedir(self, path, recursive=False, force=False):
+
if not self.exists(path):
raise ResourceNotFoundError(path)
if self.isfile(path):
raise ResourceInvalidError(path)
-
+
if not force:
for checkpath in self.listdir(path):
- raise DirectoryNotEmptyError(path)
+ raise DirectoryNotEmptyError(path)
try:
if force:
for rpath in self.listdir(path, full=True):
@@ -1008,6 +1054,7 @@ class FTPFS(FS):
self.removedir(rpath, force=force)
except FSError:
pass
+ self.clear_dircache(dirname(path), path)
self.ftp.rmd(_encode(path))
except error_reply:
pass
@@ -1016,17 +1063,16 @@ class FTPFS(FS):
self.removedir(dirname(path), recursive=True)
except DirectoryNotEmptyError:
pass
-
+
@ftperrors
- @synchronize
def rename(self, src, dst):
+ self.clear_dircache(dirname(src), dirname(dst), src, dst)
try:
self.ftp.rename(_encode(src), _encode(dst))
except error_reply:
pass
-
+
@ftperrors
- @synchronize
def getinfo(self, path):
dirlist, fname = self._check_path(path)
if not fname:
@@ -1035,76 +1081,93 @@ class FTPFS(FS):
info['modified_time'] = datetime.datetime.fromtimestamp(info['mtime'])
info['created_time'] = info['modified_time']
return info
-
+
@ftperrors
- @synchronize
- def getsize(self, path):
+ def getsize(self, path):
+
+ size = None
+ if self._locals._dircache_count:
+ dirlist, fname = self._check_path(path)
+ size = dirlist[fname].get('size')
+
+ if size is not None:
+ return size
+
self.ftp.sendcmd('TYPE I')
size = self.ftp.size(_encode(path))
if size is None:
- dirlist, fname = self._check_path(path)
+ dirlist, fname = self._check_path(path)
size = dirlist[fname].get('size')
if size is None:
raise OperationFailedError('getsize', path)
return size
-
+
@ftperrors
- @synchronize
def desc(self, path):
dirlist, fname = self._check_path(path)
if fname not in dirlist:
raise ResourceNotFoundError(path)
return dirlist[fname].get('raw_line', 'No description available')
- @ftperrors
- @synchronize
+ @ftperrors
def move(self, src, dst, overwrite=False, chunk_size=16384):
-
+
if not overwrite and self.exists(dst):
raise DestinationExistsError(dst)
- try:
+ self.clear_dircache(dirname(src), dirname(dst))
+ try:
self.rename(src, dst)
except error_reply:
pass
except:
self.copy(src, dst)
self.remove(src)
-
-
+
+ @ftperrors
+ def movedir(self, src, dst, overwrite=False, ignore_errors=False, chunk_size=16384):
+ self.clear_dircache(src, dst, dirname(src), dirname(dst))
+ super(FTPFS, self).movedir(src, dst, overwrite, ignore_errors, chunk_size)
+
+ @ftperrors
+ def copydir(self, src, dst, overwrite=False, ignore_errors=False, chunk_size=16384):
+ self.clear_dircache(src, dst, dirname(src), dirname(dst))
+ super(FTPFS, self).copydir(src, dst, overwrite, ignore_errors, chunk_size)
+
+
if __name__ == "__main__":
-
+
ftp_fs = FTPFS('ftp.ncsa.uiuc.edu')
- #from fs.browsewin import browse
- #browse(ftp_fs)
-
- ftp_fs = FTPFS('127.0.0.1', 'user', '12345', dircache=True)
+ ftp_fs.cache_hint(True)
+ from fs.browsewin import browse
+ browse(ftp_fs)
+
+ #ftp_fs = FTPFS('127.0.0.1', 'user', '12345', dircache=True)
#f = ftp_fs.open('testout.txt', 'w')
#f.write("Testing writing to an ftp file!")
#f.write("\nHai!")
#f.close()
-
+
#ftp_fs.createfile(u"\N{GREEK CAPITAL LETTER KAPPA}", 'unicode!')
-
+
#kappa = u"\N{GREEK CAPITAL LETTER KAPPA}"
#ftp_fs.makedir(kappa)
-
+
#print repr(ftp_fs.listdir())
-
+
#print repr(ftp_fs.listdir())
-
+
#ftp_fs.makedir('a/b/c/d', recursive=True)
#print ftp_fs.getsize('/testout.txt')
-
-
+
+
#print f.read()
#for p in ftp_fs:
# print p
-
+
#from fs.utils import print_fs
#print_fs(ftp_fs)
-
+
#print ftp_fs.getsize('test.txt')
-
- from fs.browsewin import browse
- browse(ftp_fs)
- \ No newline at end of file
+
+ #from fs.browsewin import browse
+ #browse(ftp_fs)