diff options
-rw-r--r-- | fs/expose/xmlrpc.py | 7 | ||||
-rw-r--r-- | fs/rpcfs.py | 66 | ||||
-rw-r--r-- | fs/sftpfs.py | 27 | ||||
-rw-r--r-- | tox.ini | 20 |
4 files changed, 100 insertions, 20 deletions
diff --git a/fs/expose/xmlrpc.py b/fs/expose/xmlrpc.py index ea05d38..16af5a0 100644 --- a/fs/expose/xmlrpc.py +++ b/fs/expose/xmlrpc.py @@ -17,7 +17,7 @@ an FS object, which can then be exposed using whatever server you choose import xmlrpclib from SimpleXMLRPCServer import SimpleXMLRPCServer - +from datetime import datetime class RPCFSInterface(object): """Wrapper to expose an FS via a XML-RPC compatible interface. @@ -27,6 +27,7 @@ class RPCFSInterface(object): """ def __init__(self, fs): + super(RPCFSInterface, self).__init__() self.fs = fs def encode_path(self, path): @@ -98,6 +99,10 @@ class RPCFSInterface(object): def settimes(self, path, accessed_time, modified_time): path = self.decode_path(path) + if isinstance(accessed_time, xmlrpclib.DateTime): + accessed_time = datetime.strptime(accessed_time.value, "%Y%m%dT%H:%M:%S") + if isinstance(modified_time, xmlrpclib.DateTime): + modified_time = datetime.strptime(modified_time.value, "%Y%m%dT%H:%M:%S") return self.fs.settimes(path, accessed_time, modified_time) def getinfo(self, path): diff --git a/fs/rpcfs.py b/fs/rpcfs.py index 405b035..99267b2 100644 --- a/fs/rpcfs.py +++ b/fs/rpcfs.py @@ -10,6 +10,7 @@ class from the :mod:`fs.expose.xmlrpc` module. import xmlrpclib import socket +import threading from fs.base import * from fs.errors import * @@ -106,14 +107,15 @@ class RPCFS(FS): :param uri: address of the server """ + super(RPCFS, self).__init__(thread_synchronize=True) self.uri = uri self._transport = transport - self.proxy = self._make_proxy() - FS.__init__(self,thread_synchronize=False) + self.proxy = self._make_proxy() self.isdir('/') + @synchronize def _make_proxy(self): - kwds = dict(allow_none=True) + kwds = dict(allow_none=True, use_datetime=True) if self._transport is not None: proxy = xmlrpclib.ServerProxy(self.uri,self._transport,**kwds) @@ -125,6 +127,7 @@ class RPCFS(FS): def __str__(self): return '<RPCFS: %s>' % (self.uri,) + @synchronize def __getstate__(self): state = super(RPCFS,self).__getstate__() try: @@ -132,11 +135,12 @@ class RPCFS(FS): except KeyError: pass return state - + def __setstate__(self, state): for (k,v) in state.iteritems(): self.__dict__[k] = v - self.proxy = self._make_proxy() + self._lock = threading.RLock() + self.proxy = self._make_proxy() def encode_path(self, path): """Encode a filesystem path for sending over the wire. @@ -151,14 +155,18 @@ class RPCFS(FS): """Decode paths arriving over the wire.""" return path.decode("base64").decode("utf8") + @synchronize def getmeta(self, meta_name, default=NoDefaultMeta): if default is NoDefaultMeta: return self.proxy.getmeta(meta_name) else: - return self.proxy.getmeta_default(meta_name, default) + return self.proxy.getmeta_default(meta_name, default) + + @synchronize def hasmeta(self, meta_name): return self.proxy.hasmeta(meta_name) + @synchronize def open(self, path, mode="r"): # TODO: chunked transport of large files path = self.encode_path(path) @@ -182,33 +190,50 @@ class RPCFS(FS): f.seek(0,2) oldflush = f.flush oldclose = f.close - oldtruncate = f.truncate + oldtruncate = f.truncate def newflush(): - oldflush() - self.proxy.set_contents(path,xmlrpclib.Binary(f.getvalue())) + self._lock.acquire() + try: + oldflush() + self.proxy.set_contents(path,xmlrpclib.Binary(f.getvalue())) + finally: + self._lock.release() def newclose(): - f.flush() - oldclose() + self._lock.acquire() + try: + f.flush() + oldclose() + finally: + self._lock.release() def newtruncate(size=None): - oldtruncate(size) - f.flush() + self._lock.acquire() + try: + oldtruncate(size) + f.flush() + finally: + self._lock.release() + f.flush = newflush f.close = newclose f.truncate = newtruncate return f + @synchronize def exists(self, path): path = self.encode_path(path) return self.proxy.exists(path) + @synchronize def isdir(self, path): path = self.encode_path(path) return self.proxy.isdir(path) + @synchronize def isfile(self, path): path = self.encode_path(path) return self.proxy.isfile(path) + @synchronize def listdir(self, path="./", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False): enc_path = self.encode_path(path) if not callable(wildcard): @@ -226,69 +251,84 @@ class RPCFS(FS): entries = [abspath(pathjoin(path,e)) for e in entries] return entries + @synchronize def makedir(self, path, recursive=False, allow_recreate=False): path = self.encode_path(path) return self.proxy.makedir(path,recursive,allow_recreate) + @synchronize def remove(self, path): path = self.encode_path(path) return self.proxy.remove(path) + @synchronize def removedir(self, path, recursive=False, force=False): path = self.encode_path(path) return self.proxy.removedir(path,recursive,force) + @synchronize def rename(self, src, dst): src = self.encode_path(src) dst = self.encode_path(dst) return self.proxy.rename(src,dst) + @synchronize def settimes(self, path, accessed_time, modified_time): path = self.encode_path(path) return self.proxy.settimes(path, accessed_time, modified_time) + @synchronize def getinfo(self, path): path = self.encode_path(path) return self.proxy.getinfo(path) + @synchronize def desc(self, path): path = self.encode_path(path) return self.proxy.desc(path) + @synchronize def getxattr(self, path, attr, default=None): path = self.encode_path(path) attr = self.encode_path(attr) return self.fs.getxattr(path,attr,default) + @synchronize def setxattr(self, path, attr, value): path = self.encode_path(path) attr = self.encode_path(attr) return self.fs.setxattr(path,attr,value) + @synchronize def delxattr(self, path, attr): path = self.encode_path(path) attr = self.encode_path(attr) return self.fs.delxattr(path,attr) + @synchronize def listxattrs(self, path): path = self.encode_path(path) return [self.decode_path(a) for a in self.fs.listxattrs(path)] + @synchronize def copy(self, src, dst, overwrite=False, chunk_size=16384): src = self.encode_path(src) dst = self.encode_path(dst) return self.proxy.copy(src,dst,overwrite,chunk_size) + @synchronize def move(self, src, dst, overwrite=False, chunk_size=16384): src = self.encode_path(src) dst = self.encode_path(dst) return self.proxy.move(src,dst,overwrite,chunk_size) + @synchronize def movedir(self, src, dst, overwrite=False, ignore_errors=False, chunk_size=16384): src = self.encode_path(src) dst = self.encode_path(dst) return self.proxy.movedir(src, dst, overwrite, ignore_errors, chunk_size) + @synchronize def copydir(self, src, dst, overwrite=False, ignore_errors=False, chunk_size=16384): src = self.encode_path(src) dst = self.encode_path(dst) diff --git a/fs/sftpfs.py b/fs/sftpfs.py index 2a2f807..18a8f72 100644 --- a/fs/sftpfs.py +++ b/fs/sftpfs.py @@ -198,16 +198,18 @@ class SFTPFS(FS): def __del__(self): self.close() + @synchronize def __getstate__(self): state = super(SFTPFS,self).__getstate__() del state["_tlocal"] if self._owns_transport: state['_transport'] = self._transport.getpeername() return state - + def __setstate__(self,state): for (k,v) in state.iteritems(): self.__dict__[k] = v + self._lock = threading.RLock() self._tlocal = thread_local() if self._owns_transport: self._transport = paramiko.Transport(self._transport) @@ -218,12 +220,13 @@ class SFTPFS(FS): try: return self._tlocal.client except AttributeError: - if self._transport is None: - return self._client + #if self._transport is None: + # return self._client client = paramiko.SFTPClient.from_transport(self._transport) self._tlocal.client = client return client + @synchronize def close(self): """Close the connection to the remote server.""" if not self.closed: @@ -256,6 +259,7 @@ class SFTPFS(FS): url = 'sftp://%s%s' % (self.hostname.rstrip('/'), abspath(path)) return url + @synchronize @convert_os_errors def open(self,path,mode="rb",bufsize=-1): npath = self._normpath(path) @@ -275,6 +279,7 @@ class SFTPFS(FS): f.truncate = new_truncate return f + @synchronize def desc(self, path): npath = self._normpath(path) if self.hostname: @@ -283,6 +288,7 @@ class SFTPFS(FS): addr, port = self._transport.getpeername() return u'sftp://%s:%i%s' % (addr, port, self.client.normalize(npath)) + @synchronize @convert_os_errors def exists(self,path): if path in ('', '/'): @@ -296,6 +302,7 @@ class SFTPFS(FS): raise return True + @synchronize @convert_os_errors def isdir(self,path): if path in ('', '/'): @@ -309,6 +316,7 @@ class SFTPFS(FS): raise return statinfo.S_ISDIR(stat.st_mode) + @synchronize @convert_os_errors def isfile(self,path): npath = self._normpath(path) @@ -320,6 +328,7 @@ class SFTPFS(FS): raise return statinfo.S_ISREG(stat.st_mode) + @synchronize @convert_os_errors def listdir(self,path="./",wildcard=None,full=False,absolute=False,dirs_only=False,files_only=False): npath = self._normpath(path) @@ -360,7 +369,7 @@ class SFTPFS(FS): return self._listdir_helper(path, paths, wildcard, full, absolute, False, False) - + @synchronize @convert_os_errors def listdirinfo(self,path="./",wildcard=None,full=False,absolute=False,dirs_only=False,files_only=False): npath = self._normpath(path) @@ -404,12 +413,13 @@ class SFTPFS(FS): return [(p, getinfo(p)) for p in self._listdir_helper(path, paths, wildcard, full, absolute, False, False)] + @synchronize @convert_os_errors def makedir(self,path,recursive=False,allow_recreate=False): npath = self._normpath(path) try: self.client.mkdir(npath) - except IOError, e: + except IOError, _e: # Error code is unreliable, try to figure out what went wrong try: stat = self.client.stat(npath) @@ -431,6 +441,7 @@ class SFTPFS(FS): else: raise ResourceInvalidError(path,msg="Can't create directory, there's already a file of that name: %(path)s") + @synchronize @convert_os_errors def remove(self,path): npath = self._normpath(path) @@ -443,6 +454,7 @@ class SFTPFS(FS): raise ResourceInvalidError(path,msg="Cannot use remove() on a directory: %(path)s") raise + @synchronize @convert_os_errors def removedir(self,path,recursive=False,force=False): npath = self._normpath(path) @@ -473,6 +485,7 @@ class SFTPFS(FS): except DirectoryNotEmptyError: pass + @synchronize @convert_os_errors def rename(self,src,dst): nsrc = self._normpath(src) @@ -486,6 +499,7 @@ class SFTPFS(FS): raise ParentDirectoryMissingError(dst) raise + @synchronize @convert_os_errors def move(self,src,dst,overwrite=False,chunk_size=16384): nsrc = self._normpath(src) @@ -503,6 +517,7 @@ class SFTPFS(FS): raise ParentDirectoryMissingError(dst,msg="Destination directory does not exist: %(path)s") raise + @synchronize @convert_os_errors def movedir(self,src,dst,overwrite=False,ignore_errors=False,chunk_size=16384): nsrc = self._normpath(src) @@ -537,6 +552,7 @@ class SFTPFS(FS): info['modified_time'] = fromtimestamp(mt) return info + @synchronize @convert_os_errors def getinfo(self, path): npath = self._normpath(path) @@ -554,6 +570,7 @@ class SFTPFS(FS): info['modified_time'] = datetime.datetime.fromtimestamp(mt) return info + @synchronize @convert_os_errors def getsize(self, path): npath = self._normpath(path) @@ -1,4 +1,22 @@ [tox] envlist = py25,py26,py27 [testenv] -commands = nosetests +deps=dexml + paramiko + boto + nose + mako + pyftpdlib + +[testenv:py25] +commands = nosetests -v \ + [] + +[testenv:py26] +commands = nosetests -v \ + [] + +[testenv:py27] +commands = nosetests -v \ + [] + |