summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--fs/expose/xmlrpc.py7
-rw-r--r--fs/rpcfs.py66
-rw-r--r--fs/sftpfs.py27
-rw-r--r--tox.ini20
4 files changed, 100 insertions, 20 deletions
diff --git a/fs/expose/xmlrpc.py b/fs/expose/xmlrpc.py
index ea05d38..16af5a0 100644
--- a/fs/expose/xmlrpc.py
+++ b/fs/expose/xmlrpc.py
@@ -17,7 +17,7 @@ an FS object, which can then be exposed using whatever server you choose
import xmlrpclib
from SimpleXMLRPCServer import SimpleXMLRPCServer
-
+from datetime import datetime
class RPCFSInterface(object):
"""Wrapper to expose an FS via a XML-RPC compatible interface.
@@ -27,6 +27,7 @@ class RPCFSInterface(object):
"""
def __init__(self, fs):
+ super(RPCFSInterface, self).__init__()
self.fs = fs
def encode_path(self, path):
@@ -98,6 +99,10 @@ class RPCFSInterface(object):
def settimes(self, path, accessed_time, modified_time):
path = self.decode_path(path)
+ if isinstance(accessed_time, xmlrpclib.DateTime):
+ accessed_time = datetime.strptime(accessed_time.value, "%Y%m%dT%H:%M:%S")
+ if isinstance(modified_time, xmlrpclib.DateTime):
+ modified_time = datetime.strptime(modified_time.value, "%Y%m%dT%H:%M:%S")
return self.fs.settimes(path, accessed_time, modified_time)
def getinfo(self, path):
diff --git a/fs/rpcfs.py b/fs/rpcfs.py
index 405b035..99267b2 100644
--- a/fs/rpcfs.py
+++ b/fs/rpcfs.py
@@ -10,6 +10,7 @@ class from the :mod:`fs.expose.xmlrpc` module.
import xmlrpclib
import socket
+import threading
from fs.base import *
from fs.errors import *
@@ -106,14 +107,15 @@ class RPCFS(FS):
:param uri: address of the server
"""
+ super(RPCFS, self).__init__(thread_synchronize=True)
self.uri = uri
self._transport = transport
- self.proxy = self._make_proxy()
- FS.__init__(self,thread_synchronize=False)
+ self.proxy = self._make_proxy()
self.isdir('/')
+ @synchronize
def _make_proxy(self):
- kwds = dict(allow_none=True)
+ kwds = dict(allow_none=True, use_datetime=True)
if self._transport is not None:
proxy = xmlrpclib.ServerProxy(self.uri,self._transport,**kwds)
@@ -125,6 +127,7 @@ class RPCFS(FS):
def __str__(self):
return '<RPCFS: %s>' % (self.uri,)
+ @synchronize
def __getstate__(self):
state = super(RPCFS,self).__getstate__()
try:
@@ -132,11 +135,12 @@ class RPCFS(FS):
except KeyError:
pass
return state
-
+
def __setstate__(self, state):
for (k,v) in state.iteritems():
self.__dict__[k] = v
- self.proxy = self._make_proxy()
+ self._lock = threading.RLock()
+ self.proxy = self._make_proxy()
def encode_path(self, path):
"""Encode a filesystem path for sending over the wire.
@@ -151,14 +155,18 @@ class RPCFS(FS):
"""Decode paths arriving over the wire."""
return path.decode("base64").decode("utf8")
+ @synchronize
def getmeta(self, meta_name, default=NoDefaultMeta):
if default is NoDefaultMeta:
return self.proxy.getmeta(meta_name)
else:
- return self.proxy.getmeta_default(meta_name, default)
+ return self.proxy.getmeta_default(meta_name, default)
+
+ @synchronize
def hasmeta(self, meta_name):
return self.proxy.hasmeta(meta_name)
+ @synchronize
def open(self, path, mode="r"):
# TODO: chunked transport of large files
path = self.encode_path(path)
@@ -182,33 +190,50 @@ class RPCFS(FS):
f.seek(0,2)
oldflush = f.flush
oldclose = f.close
- oldtruncate = f.truncate
+ oldtruncate = f.truncate
def newflush():
- oldflush()
- self.proxy.set_contents(path,xmlrpclib.Binary(f.getvalue()))
+ self._lock.acquire()
+ try:
+ oldflush()
+ self.proxy.set_contents(path,xmlrpclib.Binary(f.getvalue()))
+ finally:
+ self._lock.release()
def newclose():
- f.flush()
- oldclose()
+ self._lock.acquire()
+ try:
+ f.flush()
+ oldclose()
+ finally:
+ self._lock.release()
def newtruncate(size=None):
- oldtruncate(size)
- f.flush()
+ self._lock.acquire()
+ try:
+ oldtruncate(size)
+ f.flush()
+ finally:
+ self._lock.release()
+
f.flush = newflush
f.close = newclose
f.truncate = newtruncate
return f
+ @synchronize
def exists(self, path):
path = self.encode_path(path)
return self.proxy.exists(path)
+ @synchronize
def isdir(self, path):
path = self.encode_path(path)
return self.proxy.isdir(path)
+ @synchronize
def isfile(self, path):
path = self.encode_path(path)
return self.proxy.isfile(path)
+ @synchronize
def listdir(self, path="./", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False):
enc_path = self.encode_path(path)
if not callable(wildcard):
@@ -226,69 +251,84 @@ class RPCFS(FS):
entries = [abspath(pathjoin(path,e)) for e in entries]
return entries
+ @synchronize
def makedir(self, path, recursive=False, allow_recreate=False):
path = self.encode_path(path)
return self.proxy.makedir(path,recursive,allow_recreate)
+ @synchronize
def remove(self, path):
path = self.encode_path(path)
return self.proxy.remove(path)
+ @synchronize
def removedir(self, path, recursive=False, force=False):
path = self.encode_path(path)
return self.proxy.removedir(path,recursive,force)
+ @synchronize
def rename(self, src, dst):
src = self.encode_path(src)
dst = self.encode_path(dst)
return self.proxy.rename(src,dst)
+ @synchronize
def settimes(self, path, accessed_time, modified_time):
path = self.encode_path(path)
return self.proxy.settimes(path, accessed_time, modified_time)
+ @synchronize
def getinfo(self, path):
path = self.encode_path(path)
return self.proxy.getinfo(path)
+ @synchronize
def desc(self, path):
path = self.encode_path(path)
return self.proxy.desc(path)
+ @synchronize
def getxattr(self, path, attr, default=None):
path = self.encode_path(path)
attr = self.encode_path(attr)
return self.fs.getxattr(path,attr,default)
+ @synchronize
def setxattr(self, path, attr, value):
path = self.encode_path(path)
attr = self.encode_path(attr)
return self.fs.setxattr(path,attr,value)
+ @synchronize
def delxattr(self, path, attr):
path = self.encode_path(path)
attr = self.encode_path(attr)
return self.fs.delxattr(path,attr)
+ @synchronize
def listxattrs(self, path):
path = self.encode_path(path)
return [self.decode_path(a) for a in self.fs.listxattrs(path)]
+ @synchronize
def copy(self, src, dst, overwrite=False, chunk_size=16384):
src = self.encode_path(src)
dst = self.encode_path(dst)
return self.proxy.copy(src,dst,overwrite,chunk_size)
+ @synchronize
def move(self, src, dst, overwrite=False, chunk_size=16384):
src = self.encode_path(src)
dst = self.encode_path(dst)
return self.proxy.move(src,dst,overwrite,chunk_size)
+ @synchronize
def movedir(self, src, dst, overwrite=False, ignore_errors=False, chunk_size=16384):
src = self.encode_path(src)
dst = self.encode_path(dst)
return self.proxy.movedir(src, dst, overwrite, ignore_errors, chunk_size)
+ @synchronize
def copydir(self, src, dst, overwrite=False, ignore_errors=False, chunk_size=16384):
src = self.encode_path(src)
dst = self.encode_path(dst)
diff --git a/fs/sftpfs.py b/fs/sftpfs.py
index 2a2f807..18a8f72 100644
--- a/fs/sftpfs.py
+++ b/fs/sftpfs.py
@@ -198,16 +198,18 @@ class SFTPFS(FS):
def __del__(self):
self.close()
+ @synchronize
def __getstate__(self):
state = super(SFTPFS,self).__getstate__()
del state["_tlocal"]
if self._owns_transport:
state['_transport'] = self._transport.getpeername()
return state
-
+
def __setstate__(self,state):
for (k,v) in state.iteritems():
self.__dict__[k] = v
+ self._lock = threading.RLock()
self._tlocal = thread_local()
if self._owns_transport:
self._transport = paramiko.Transport(self._transport)
@@ -218,12 +220,13 @@ class SFTPFS(FS):
try:
return self._tlocal.client
except AttributeError:
- if self._transport is None:
- return self._client
+ #if self._transport is None:
+ # return self._client
client = paramiko.SFTPClient.from_transport(self._transport)
self._tlocal.client = client
return client
+ @synchronize
def close(self):
"""Close the connection to the remote server."""
if not self.closed:
@@ -256,6 +259,7 @@ class SFTPFS(FS):
url = 'sftp://%s%s' % (self.hostname.rstrip('/'), abspath(path))
return url
+ @synchronize
@convert_os_errors
def open(self,path,mode="rb",bufsize=-1):
npath = self._normpath(path)
@@ -275,6 +279,7 @@ class SFTPFS(FS):
f.truncate = new_truncate
return f
+ @synchronize
def desc(self, path):
npath = self._normpath(path)
if self.hostname:
@@ -283,6 +288,7 @@ class SFTPFS(FS):
addr, port = self._transport.getpeername()
return u'sftp://%s:%i%s' % (addr, port, self.client.normalize(npath))
+ @synchronize
@convert_os_errors
def exists(self,path):
if path in ('', '/'):
@@ -296,6 +302,7 @@ class SFTPFS(FS):
raise
return True
+ @synchronize
@convert_os_errors
def isdir(self,path):
if path in ('', '/'):
@@ -309,6 +316,7 @@ class SFTPFS(FS):
raise
return statinfo.S_ISDIR(stat.st_mode)
+ @synchronize
@convert_os_errors
def isfile(self,path):
npath = self._normpath(path)
@@ -320,6 +328,7 @@ class SFTPFS(FS):
raise
return statinfo.S_ISREG(stat.st_mode)
+ @synchronize
@convert_os_errors
def listdir(self,path="./",wildcard=None,full=False,absolute=False,dirs_only=False,files_only=False):
npath = self._normpath(path)
@@ -360,7 +369,7 @@ class SFTPFS(FS):
return self._listdir_helper(path, paths, wildcard, full, absolute, False, False)
-
+ @synchronize
@convert_os_errors
def listdirinfo(self,path="./",wildcard=None,full=False,absolute=False,dirs_only=False,files_only=False):
npath = self._normpath(path)
@@ -404,12 +413,13 @@ class SFTPFS(FS):
return [(p, getinfo(p)) for p in
self._listdir_helper(path, paths, wildcard, full, absolute, False, False)]
+ @synchronize
@convert_os_errors
def makedir(self,path,recursive=False,allow_recreate=False):
npath = self._normpath(path)
try:
self.client.mkdir(npath)
- except IOError, e:
+ except IOError, _e:
# Error code is unreliable, try to figure out what went wrong
try:
stat = self.client.stat(npath)
@@ -431,6 +441,7 @@ class SFTPFS(FS):
else:
raise ResourceInvalidError(path,msg="Can't create directory, there's already a file of that name: %(path)s")
+ @synchronize
@convert_os_errors
def remove(self,path):
npath = self._normpath(path)
@@ -443,6 +454,7 @@ class SFTPFS(FS):
raise ResourceInvalidError(path,msg="Cannot use remove() on a directory: %(path)s")
raise
+ @synchronize
@convert_os_errors
def removedir(self,path,recursive=False,force=False):
npath = self._normpath(path)
@@ -473,6 +485,7 @@ class SFTPFS(FS):
except DirectoryNotEmptyError:
pass
+ @synchronize
@convert_os_errors
def rename(self,src,dst):
nsrc = self._normpath(src)
@@ -486,6 +499,7 @@ class SFTPFS(FS):
raise ParentDirectoryMissingError(dst)
raise
+ @synchronize
@convert_os_errors
def move(self,src,dst,overwrite=False,chunk_size=16384):
nsrc = self._normpath(src)
@@ -503,6 +517,7 @@ class SFTPFS(FS):
raise ParentDirectoryMissingError(dst,msg="Destination directory does not exist: %(path)s")
raise
+ @synchronize
@convert_os_errors
def movedir(self,src,dst,overwrite=False,ignore_errors=False,chunk_size=16384):
nsrc = self._normpath(src)
@@ -537,6 +552,7 @@ class SFTPFS(FS):
info['modified_time'] = fromtimestamp(mt)
return info
+ @synchronize
@convert_os_errors
def getinfo(self, path):
npath = self._normpath(path)
@@ -554,6 +570,7 @@ class SFTPFS(FS):
info['modified_time'] = datetime.datetime.fromtimestamp(mt)
return info
+ @synchronize
@convert_os_errors
def getsize(self, path):
npath = self._normpath(path)
diff --git a/tox.ini b/tox.ini
index adb492c..eae4ef8 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,4 +1,22 @@
[tox]
envlist = py25,py26,py27
[testenv]
-commands = nosetests
+deps=dexml
+ paramiko
+ boto
+ nose
+ mako
+ pyftpdlib
+
+[testenv:py25]
+commands = nosetests -v \
+ []
+
+[testenv:py26]
+commands = nosetests -v \
+ []
+
+[testenv:py27]
+commands = nosetests -v \
+ []
+