diff options
author | willmcgugan <willmcgugan@67cdc799-7952-0410-af00-57a81ceafa0f> | 2010-01-02 18:06:03 +0000 |
---|---|---|
committer | willmcgugan <willmcgugan@67cdc799-7952-0410-af00-57a81ceafa0f> | 2010-01-02 18:06:03 +0000 |
commit | b7dbf13f40bf766fc19ce43d7861b98ca9edbcda (patch) | |
tree | 02a42783f330d45f76dd51cb0f9b00588f817f44 /fs/ftpfs.py | |
parent | 671cf321fcc774c03d093fbae257e71b1df3d0ea (diff) | |
download | pyfilesystem-b7dbf13f40bf766fc19ce43d7861b98ca9edbcda.tar.gz |
Optimized FTP fs by caching directory structure
git-svn-id: http://pyfilesystem.googlecode.com/svn/trunk@310 67cdc799-7952-0410-af00-57a81ceafa0f
Diffstat (limited to 'fs/ftpfs.py')
-rw-r--r-- | fs/ftpfs.py | 441 |
1 files changed, 252 insertions, 189 deletions
diff --git a/fs/ftpfs.py b/fs/ftpfs.py index 21de57d..cb5c297 100644 --- a/fs/ftpfs.py +++ b/fs/ftpfs.py @@ -22,7 +22,7 @@ try: from cStringIO import StringIO except ImportError: from StringIO import StringIO - + import time import sys @@ -33,7 +33,7 @@ import sys class Enum(object): def __init__(self, *names): self._names_map = dict((name, i) for i, name in enumerate(names)) - + def __getattr__(self, name): return self._names_map[name] @@ -512,13 +512,13 @@ def _skip(s, i, c): - + class _FTPFile(object): - + """ A file-like that provides access to a file being streamed over ftp.""" - + def __init__(self, ftpfs, ftp, path, mode): if not hasattr(self, '_lock'): self._lock = threading.RLock() @@ -532,14 +532,14 @@ class _FTPFile(object): if 'r' in mode or 'a' in mode: self.file_size = ftpfs.getsize(path) self.conn = None - + path = _encode(path) #self._lock = ftpfs._lock - + if 'r' in mode: self.ftp.voidcmd('TYPE I') self.conn = ftp.transfercmd('RETR '+path, None) - + #self._ftp_thread = threading.Thread(target=do_read) #self._ftp_thread.start() elif 'w' in mode or 'a' in mode: @@ -556,17 +556,17 @@ class _FTPFile(object): # if callback: callback(buf) #conn.close() #return self.voidresp() - + #self._ftp_thread = threading.Thread(target=do_write) #self._ftp_thread.start() - - @synchronize + + @synchronize def read(self, size=None): if self.conn is None: return '' - + chunks = [] - if size is None: + if size is None: while 1: data = self.conn.recv(4096) if not data: @@ -575,9 +575,9 @@ class _FTPFile(object): self.ftp.voidresp() break chunks.append(data) - self.read_pos += len(data) + self.read_pos += len(data) return ''.join(chunks) - + remaining_bytes = size while remaining_bytes: read_size = min(remaining_bytes, 4096) @@ -590,42 +590,42 @@ class _FTPFile(object): chunks.append(data) self.read_pos += len(data) remaining_bytes -= len(data) - + return ''.join(chunks) - @synchronize + @synchronize def write(self, data): - + data_pos = 0 remaining_data = len(data) - + while remaining_data: chunk_size = min(remaining_data, 4096) self.conn.sendall(data[data_pos:data_pos+chunk_size]) data_pos += chunk_size remaining_data -= chunk_size self.write_pos += chunk_size - + def __enter__(self): return self def __exit__(self,exc_type,exc_value,traceback): self.close() - + @synchronize def flush(self): - return - + return + def seek(self, pos, where=fs.SEEK_SET): # Ftp doesn't support a real seek, so we close the transfer and resume # it at the new position with the REST command # I'm not sure how reliable this method is! if not self.file_size: raise ValueError("Seek only works with files open for read") - + self._lock.acquire() try: - + current = self.tell() new_pos = None if where == fs.SEEK_SET: @@ -636,49 +636,51 @@ class _FTPFile(object): new_pos = self.file_size + pos if new_pos < 0: raise ValueError("Can't seek before start of file") - + if self.conn is not None: self.conn.close() - + finally: self._lock.release() - + self.close() self._lock.acquire() try: self.ftp = self.ftpfs._open_ftp() self.ftp.sendcmd('TYPE I') - self.ftp.sendcmd('REST %i' % (new_pos)) + self.ftp.sendcmd('REST %i' % (new_pos)) self.__init__(self.ftpfs, self.ftp, _encode(self.path), self.mode) self.read_pos = new_pos finally: self._lock.release() - + #raise UnsupportedError('ftp seek') - + @synchronize def tell(self): if 'r' in self.mode: return self.read_pos else: return self.write_pos - - @synchronize + + @synchronize def close(self): - if self.conn is not None: + if self.conn is not None: self.conn.close() self.conn = None self.ftp.voidresp() if self.ftp is not None: self.ftp.close() self.closed = True - + if 'w' in self.mode or 'a' in self.mode: + self.ftpfs._on_file_written(self.path) + def __iter__(self): return self.next() - + def next(self): """ Line iterator - + This isn't terribly efficient. It would probably be better to do a read followed by splitlines. """ @@ -686,13 +688,13 @@ class _FTPFile(object): chars = [] while True: char = self.read(1) - if not char: + if not char: yield ''.join(chars) del chars[:] break chars.append(char) if char in endings: - line = ''.join(chars) + line = ''.join(chars) del chars[:] c = self.read(1) if not char: @@ -702,23 +704,31 @@ class _FTPFile(object): yield line + c else: yield line - chars.append(c) - + chars.append(c) + def ftperrors(f): @wraps(f) def deco(self, *args, **kwargs): + self._lock.acquire() try: - ret = f(self, *args, **kwargs) - except Exception, e: - #import traceback - #traceback.print_exc() - self._translate_exception(args[0] if args else '', e) + self._enter_dircache() + try: + try: + ret = f(self, *args, **kwargs) + except Exception, e: + self._translate_exception(args[0] if args else '', e) + finally: + self._leave_dircache() + finally: + self._lock.release() + if not self.use_dircache: + self.clear_dircache() return ret return deco - - + + def _encode(s): if isinstance(s, unicode): return s.encode('utf-8') @@ -726,12 +736,12 @@ def _encode(s): class FTPFS(FS): - + _locals = threading.local() - + def __init__(self, host='', user='', passwd='', acct='', timeout=_GLOBAL_DEFAULT_TIMEOUT, port=21, - dircache=False, + dircache=True, max_buffer_size=128*1024*1024): """ :param host: @@ -739,30 +749,124 @@ class FTPFS(FS): :param passwd: :param timeout: :param dircache: If True then directory information will be cached, - which will speed up operations such as isdir and isfile, but changes - to the ftp file structure will not be visible (till clear_dircache) is + which will speed up operations such as getinfo, isdi, isfile, but changes + to the ftp file structure will not be visible untill clear_dircache is called :param max_buffer_size: Number of bytes to hold before blocking write operations. - + """ - + super(FTPFS, self).__init__() - + self.host = host self.port = port self.user = user self.passwd = passwd self.acct = acct self.timeout = timeout - - self._dircache = {} + self.use_dircache = dircache + self.get_dircache() + self.max_buffer_size = max_buffer_size - - self._locals._ftp = None + + self._cache_hint = False + self._locals._ftp = None self._thread_ftps = set() self.ftp + @synchronize + def cache_hint(self, enabled): + self._cache_hint = enabled + + @synchronize + def _enter_dircache(self): + self.get_dircache() + count = getattr(self._locals, '_dircache_count', 0) + count += 1 + self._locals._dircache_count = count + + @synchronize + def _leave_dircache(self): + self._locals._dircache_count -= 1 + if not self._locals._dircache_count and not self._cache_hint: + self.clear_dircache() + assert self._locals._dircache_count >= 0, "dircache count should never be negative" + + @synchronize + def get_dircache(self): + dircache = getattr(self._locals, '_dircache', None) + if dircache is None: + dircache = {} + self._locals._dircache = dircache + self._locals._dircache_count = 0 + return dircache + + @synchronize + def _on_file_written(self, path): + self.clear_dircache(dirname(path)) + + + @synchronize + def _readdir(self, path): + + dircache = self.get_dircache() + dircache_count = self._locals._dircache_count + + if dircache_count: + cached_dirlist = dircache.get(path) + if cached_dirlist is not None: + return cached_dirlist + dirlist = {} + + parser = FTPListDataParser() + + def on_line(line): + #print repr(line) + if not isinstance(line, unicode): + line = line.decode('utf-8') + info = parser.parse_line(line) + if info: + info = info.__dict__ + dirlist[info['name']] = info + + try: + self.ftp.dir(_encode(path), on_line) + except error_reply: + pass + dircache[path] = dirlist + + return dirlist + + @synchronize + def clear_dircache(self, *paths): + """ + Clear cached directory information. + + :path: Path of directory to clear cache for, or all directories if + None (the default) + + """ + dircache = self.get_dircache() + if not paths: + dircache.clear() + else: + for path in paths: + dircache.pop(path, None) + + @synchronize + def _check_path(self, path): + base, fname = pathsplit(abspath(path)) + dirlist = self._readdir(base) + if fname and fname not in dirlist: + raise ResourceNotFoundError(path) + return dirlist, fname + + def _get_dirlist(self, path): + base, fname = pathsplit(abspath(path)) + dirlist = self._readdir(base) + return dirlist, fname + @synchronize def get_ftp(self): @@ -771,6 +875,7 @@ class FTPFS(FS): ftp = self._locals._ftp self._thread_ftps.add(ftp) return self._locals._ftp + @synchronize def set_ftp(self, ftp): self._locals._ftp = ftp ftp = property(get_ftp, set_ftp) @@ -780,11 +885,11 @@ class FTPFS(FS): try: ftp = FTP() ftp.connect(self.host, self.port, self.timeout) - ftp.login(self.user, self.passwd, self.acct) - except socket_error, e: + ftp.login(self.user, self.passwd, self.acct) + except socket_error, e: raise RemoteConnectionError(str(e), details=e) return ftp - + def __getstate__(self): state = super(FTPFS, self).__getstate__() del state["_thread_ftps"] @@ -792,33 +897,33 @@ class FTPFS(FS): def __setstate__(self,state): super(FTPFS, self).__setstate__(state) - self._thread_ftps = set() + self._thread_ftps = set() self.ftp - + def __str__(self): return '<FTPFS %s>' % self.host - + def __unicode__(self): return u'<FTPFS %s>' % self.host - + @convert_os_errors def _translate_exception(self, path, exception): - + """ Translates exceptions that my be thrown by the ftp code in to FS exceptions - + TODO: Flesh this out with more specific exceptions - + """ if isinstance(exception, socket_error): raise RemoteConnectionError(str(exception), details=exception) - + elif isinstance(exception, error_temp): code, message = str(exception).split(' ', 1) raise RemoteConnectionError(str(exception), path=path, msg="FTP error: %s (see details)" % str(exception), details=exception) - + elif isinstance(exception, error_perm): code, message = str(exception).split(' ', 1) code = int(code) @@ -826,91 +931,35 @@ class FTPFS(FS): raise ResourceNotFoundError(path) raise PermissionDeniedError(str(exception), path=path, msg="FTP error: %s (see details)" % str(exception), details=exception) - raise exception - + @ftperrors - @synchronize def close(self): for ftp in self._thread_ftps: ftp.close() + + self._thread_ftps.clear() self.closed = True - + @ftperrors - @synchronize def open(self, path, mode='r'): + mode = mode.lower() if 'r' in mode: if not self.isfile(path): raise ResourceNotFoundError(path) + if 'w' in mode or 'a' in mode: + self.clear_dircache(dirname(path)) ftp = self._open_ftp() f = _FTPFile(self, ftp, path, mode) return f - @synchronize - def _readdir(self, path): - - if self.use_dircache: - cached_dirlist = self._dircache.get(path) - if cached_dirlist is not None: - return cached_dirlist - dirlist = {} - - parser = FTPListDataParser() - - def on_line(line): - #print repr(line) - if not isinstance(line, unicode): - line = line.decode('utf-8') - info = parser.parse_line(line) - if info: - info = info.__dict__ - dirlist[info['name']] = info - - try: - self.ftp.dir(_encode(path), on_line) - except error_reply: - pass - self._dircache[path] = dirlist - - return dirlist - - @synchronize - def clear_dircache(self, path=None): - """ - Clear cached directory information. - - :path: Path of directory to clear cache for, or all directories if - None (the default) - - """ - if path is None: - self._dircache.clear() - if path in self._dircache: - del self._dircache[path] - - @synchronize - @ftperrors - def _check_path(self, path, ignore_missing=False): - base, fname = pathsplit(abspath(path)) - dirlist = self._readdir(base) - if fname and fname not in dirlist: - raise ResourceNotFoundError(path) - return dirlist, fname - - def _get_dirlist(self, path): - base, fname = pathsplit(abspath(path)) - dirlist = self._readdir(base) - return dirlist, fname - - @synchronize @ftperrors def exists(self, path): if path in ('', '/'): return True dirlist, fname = self._get_dirlist(path) return fname in dirlist - - @synchronize + @ftperrors def isdir(self, path): if path in ('', '/'): @@ -920,20 +969,18 @@ class FTPFS(FS): if info is None: return False return info['try_cwd'] - - @synchronize + @ftperrors def isfile(self, path): if path in ('', '/'): return False - dirlist, fname = self._get_dirlist(path) + dirlist, fname = self._get_dirlist(path) info = dirlist.get(fname) if info is None: return False return not info['try_cwd'] @ftperrors - @synchronize def listdir(self, path="./", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False): path = normpath(path) if not self.exists(path): @@ -941,21 +988,21 @@ class FTPFS(FS): if not self.isdir(path): raise ResourceInvalidError(path) paths = self._readdir(path).keys() - + return self._listdir_helper(path, paths, wildcard, full, absolute, dirs_only, files_only) - - + + @ftperrors - @synchronize def makedir(self, path, recursive=False, allow_recreate=False): if path in ('', '/'): return def checkdir(path): + self.clear_dircache(dirname(path), path) try: self.ftp.mkd(_encode(path)) except error_reply: return - except error_perm, e: + except error_perm, e: if recursive or allow_recreate: return if str(e).split(' ', 1)[0]=='550': @@ -966,38 +1013,37 @@ class FTPFS(FS): for p in recursepath(path): checkdir(p) else: - base, dirname = pathsplit(path) + base = dirname(path) if not self.exists(base): raise ParentDirectoryMissingError(path) - - if not allow_recreate: + + if not allow_recreate: if self.exists(path): if self.isfile(path): raise ResourceInvalidError(path) raise DestinationExistsError(path) checkdir(path) - - + @ftperrors - @synchronize def remove(self, path): if not self.exists(path): raise ResourceNotFoundError(path) if not self.isfile(path): raise ResourceInvalidError(path) + self.clear_dircache(dirname(path)) self.ftp.delete(_encode(path)) - + @ftperrors - @synchronize def removedir(self, path, recursive=False, force=False): + if not self.exists(path): raise ResourceNotFoundError(path) if self.isfile(path): raise ResourceInvalidError(path) - + if not force: for checkpath in self.listdir(path): - raise DirectoryNotEmptyError(path) + raise DirectoryNotEmptyError(path) try: if force: for rpath in self.listdir(path, full=True): @@ -1008,6 +1054,7 @@ class FTPFS(FS): self.removedir(rpath, force=force) except FSError: pass + self.clear_dircache(dirname(path), path) self.ftp.rmd(_encode(path)) except error_reply: pass @@ -1016,17 +1063,16 @@ class FTPFS(FS): self.removedir(dirname(path), recursive=True) except DirectoryNotEmptyError: pass - + @ftperrors - @synchronize def rename(self, src, dst): + self.clear_dircache(dirname(src), dirname(dst), src, dst) try: self.ftp.rename(_encode(src), _encode(dst)) except error_reply: pass - + @ftperrors - @synchronize def getinfo(self, path): dirlist, fname = self._check_path(path) if not fname: @@ -1035,76 +1081,93 @@ class FTPFS(FS): info['modified_time'] = datetime.datetime.fromtimestamp(info['mtime']) info['created_time'] = info['modified_time'] return info - + @ftperrors - @synchronize - def getsize(self, path): + def getsize(self, path): + + size = None + if self._locals._dircache_count: + dirlist, fname = self._check_path(path) + size = dirlist[fname].get('size') + + if size is not None: + return size + self.ftp.sendcmd('TYPE I') size = self.ftp.size(_encode(path)) if size is None: - dirlist, fname = self._check_path(path) + dirlist, fname = self._check_path(path) size = dirlist[fname].get('size') if size is None: raise OperationFailedError('getsize', path) return size - + @ftperrors - @synchronize def desc(self, path): dirlist, fname = self._check_path(path) if fname not in dirlist: raise ResourceNotFoundError(path) return dirlist[fname].get('raw_line', 'No description available') - @ftperrors - @synchronize + @ftperrors def move(self, src, dst, overwrite=False, chunk_size=16384): - + if not overwrite and self.exists(dst): raise DestinationExistsError(dst) - try: + self.clear_dircache(dirname(src), dirname(dst)) + try: self.rename(src, dst) except error_reply: pass except: self.copy(src, dst) self.remove(src) - - + + @ftperrors + def movedir(self, src, dst, overwrite=False, ignore_errors=False, chunk_size=16384): + self.clear_dircache(src, dst, dirname(src), dirname(dst)) + super(FTPFS, self).movedir(src, dst, overwrite, ignore_errors, chunk_size) + + @ftperrors + def copydir(self, src, dst, overwrite=False, ignore_errors=False, chunk_size=16384): + self.clear_dircache(src, dst, dirname(src), dirname(dst)) + super(FTPFS, self).copydir(src, dst, overwrite, ignore_errors, chunk_size) + + if __name__ == "__main__": - + ftp_fs = FTPFS('ftp.ncsa.uiuc.edu') - #from fs.browsewin import browse - #browse(ftp_fs) - - ftp_fs = FTPFS('127.0.0.1', 'user', '12345', dircache=True) + ftp_fs.cache_hint(True) + from fs.browsewin import browse + browse(ftp_fs) + + #ftp_fs = FTPFS('127.0.0.1', 'user', '12345', dircache=True) #f = ftp_fs.open('testout.txt', 'w') #f.write("Testing writing to an ftp file!") #f.write("\nHai!") #f.close() - + #ftp_fs.createfile(u"\N{GREEK CAPITAL LETTER KAPPA}", 'unicode!') - + #kappa = u"\N{GREEK CAPITAL LETTER KAPPA}" #ftp_fs.makedir(kappa) - + #print repr(ftp_fs.listdir()) - + #print repr(ftp_fs.listdir()) - + #ftp_fs.makedir('a/b/c/d', recursive=True) #print ftp_fs.getsize('/testout.txt') - - + + #print f.read() #for p in ftp_fs: # print p - + #from fs.utils import print_fs #print_fs(ftp_fs) - + #print ftp_fs.getsize('test.txt') - - from fs.browsewin import browse - browse(ftp_fs) -
\ No newline at end of file + + #from fs.browsewin import browse + #browse(ftp_fs) |