summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f>2009-01-15 22:24:58 +0000
committerrfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f>2009-01-15 22:24:58 +0000
commit53830923df0d66c6aad4c7df0f99b45626e26ed4 (patch)
tree84319f22750da7c9401a5f08e9c615bebb1f7df7
parent28066b4fabbd3866e0f9a0eb766de5d4b8823c26 (diff)
downloadpyfilesystem-53830923df0d66c6aad4c7df0f99b45626e26ed4.tar.gz
added RPCFS, which exposes a filesystem using XML-RPC
git-svn-id: http://pyfilesystem.googlecode.com/svn/trunk@110 67cdc799-7952-0410-af00-57a81ceafa0f
-rw-r--r--fs/rpcfs.py306
-rw-r--r--fs/tests.py58
2 files changed, 363 insertions, 1 deletions
diff --git a/fs/rpcfs.py b/fs/rpcfs.py
new file mode 100644
index 0000000..4d2a676
--- /dev/null
+++ b/fs/rpcfs.py
@@ -0,0 +1,306 @@
+"""
+
+ fs.rpcfs: Client and Server to expose an FS via XML-RPC
+
+This module provides the following pair of classes that can be used to expose
+a remote filesystem using XML-RPC:
+
+ RPCFSServer: a subclass of SimpleXMLRPCServer that exposes the methods
+ of an FS instance via XML-RPC
+
+ RPCFS: a subclass of FS that delegates all filesystem operations to
+ a remote server using XML-RPC.
+
+If you need to use a more powerful server than SimpleXMLRPCServer, you can
+use the RPCFSInterface class to provide an XML-RPC-compatible wrapper around
+an FS object, which can then be exposed using whatever server you choose
+(e.g. Twisted's XML-RPC server).
+
+"""
+
+import xmlrpclib
+from SimpleXMLRPCServer import SimpleXMLRPCServer
+
+from fs.base import *
+
+from StringIO import StringIO
+
+
+class ObjProxy:
+ """Simple object proxy allowing us to replace read-only attributes.
+
+ This is used to put a modified 'close' method on files returned by
+ open(), such that they will be uploaded to the server when closed.
+ """
+
+ def __init__(self,obj):
+ self._obj = obj
+
+ def __getattr__(self,attr):
+ return getattr(self._obj,attr)
+
+class ReRaiseErrors:
+ """XML-RPC proxy wrapper that tries to re-raise Faults as actual errors."""
+ def __init__(self,proxy):
+ self._proxy = proxy
+ def __getattr__(self,attr):
+ val = getattr(self._proxy,attr)
+ if not callable(val):
+ return val
+ def func(*args,**kwds):
+ try:
+ val(*args,**kwds)
+ except xmlrpclib.Fault, e:
+ print "ERROR", e, e.faultCode, e.faultString
+
+def re_raise_faults(func):
+ """Decorator to re-raise XML-RPC faults as proper exceptions."""
+ def wrapper(*args,**kwds):
+ try:
+ return func(*args,**kwds)
+ except xmlrpclib.Fault, f:
+ # Make sure it's in a form we can handle
+ bits = f.faultString.split(" ")
+ if bits[0] not in ["<type","<class"]:
+ raise f
+ # Find the class/type object
+ bits = " ".join(bits[1:]).split(">:")
+ cls = bits[0]
+ msg = ">:".join(bits[1:])
+ while cls[0] in ["'",'"']:
+ cls = cls[1:]
+ while cls[-1] in ["'",'"']:
+ cls = cls[:-1]
+ cls = _object_by_name(cls)
+ # Re-raise using the remainder of the fault code as message
+ if cls:
+ raise cls(msg)
+ raise f
+ return wrapper
+
+
+def _object_by_name(name,root=None):
+ """Look up an object by dotted-name notation."""
+ bits = name.split(".")
+ if root is None:
+ try:
+ obj = globals()[bits[0]]
+ except KeyError:
+ try:
+ obj = __builtins__[bits[0]]
+ except KeyError:
+ obj = __import__(bits[0],globals())
+ else:
+ obj = getattr(root,bits[0])
+ if len(bits) > 1:
+ return _object_by_name(".".join(bits[1:]),obj)
+ else:
+ return obj
+
+
+class ReRaiseFaults:
+ """XML-RPC proxy wrapper that re-raises Faults as proper Exceptions."""
+
+ def __init__(self,obj):
+ self._obj = obj
+
+ def __getattr__(self,attr):
+ val = getattr(self._obj,attr)
+ if callable(val):
+ val = re_raise_faults(val)
+ return val
+
+
+class RPCFS(FS):
+ """Access a filesystem exposed via XML-RPC.
+
+ This class provides the client-side logic for accessing a remote FS
+ object, and is dual to the RPCFSServer class also defined in this module.
+
+ Example:
+
+ fs = RPCFS("http://my.server.com/filesystem/location/")
+
+ """
+
+ def __init__(self, uri, transport=None):
+ """Constructor for RPCFS objects.
+
+ The only required argument is the uri of the server to connect
+ to. This will be passed to the underlying XML-RPC server proxy
+ object along with any other arguments if they are provided.
+ """
+ self.uri = uri
+ if transport is not None:
+ proxy = xmlrpclib.ServerProxy(uri,transport,allow_none=True)
+ else:
+ proxy = xmlrpclib.ServerProxy(uri,allow_none=True)
+ self.proxy = ReRaiseFaults(proxy)
+
+ def __str__(self):
+ return '<RPCFS: %s>' % (self.uri,)
+
+ __repr__ = __str__
+
+ def open(self,path,mode):
+ # TODO: chunked transport of large files
+ if "r" in mode or "a" in mode:
+ try:
+ data = self.proxy.get_contents(path).data
+ except IOError:
+ if "w" not in mode and "a" not in mode:
+ raise ResourceNotFoundError("NO_FILE",path)
+ if not self.isdir(dirname(path)):
+ raise OperationFailedError("OPEN_FAILED", path,msg="Parent directory does not exist")
+ else:
+ data = ""
+ f = ObjProxy(StringIO(data))
+ if "a" not in mode:
+ f.seek(0,0)
+ else:
+ f.seek(0,2)
+ oldclose = f.close
+ def newclose():
+ f.flush()
+ self.proxy.set_contents(path,xmlrpclib.Binary(f.getvalue()))
+ oldclose()
+ f.close = newclose
+ return f
+
+ def exists(self,path):
+ return self.proxy.exists(path)
+
+ def isdir(self,path):
+ return self.proxy.isdir(path)
+
+ def isfile(self,path):
+ return self.proxy.isfile(path)
+
+ def listdir(self,path="./",wildcard=None,full=False,absolute=False,hidden=True,dirs_only=False,files_only=False):
+ return self.proxy.listdir(path,wildcard,full,absolute,hidden,dirs_only,files_only)
+
+ def makedir(self,path,mode=0777,recursive=False,allow_recreate=False):
+ return self.proxy.makedir(path,mode,recursive,allow_recreate)
+
+ def remove(self,path):
+ return self.proxy.remove(path)
+
+ def removedir(self,path,recursive=False):
+ return self.proxy.removedir(path,recursive)
+
+ def rename(self,src,dst):
+ return self.proxy.rename(src,dst)
+
+ def getinfo(self,path):
+ return self.proxy.getinfo(path)
+
+ def desc(self,path):
+ return self.proxy.desc(path)
+
+ def getattr(self,path,attr):
+ return self.proxy.getattr(path,attr)
+
+ def setattr(self,path,attr,value):
+ return self.proxy.setattr(path,attr,value)
+
+ def copy(self,src,dst,overwrite=False,chunk_size=16384):
+ return self.proxy.copy(src,dst,overwrite,chunk_size)
+
+ def move(self,src,dst,chunk_size=16384):
+ return self.proxy.move(src,dst,chunk_size)
+
+ def movedir(self,src,dst,ignore_errors=False,chunk_size=16384):
+ return self.proxy.movedir(src,dst,ignore_errors,chunk_size)
+
+ def copydir(self,src,dst,ignore_errors=False,chunk_size=16384):
+ return self.proxy.copydir(src,dst,ignore_errors,chunk_size)
+
+
+class RPCFSInterface(object):
+ """Wrapper to expose an FS via a XML-RPC compatible interface."""
+
+ def __init__(self,fs):
+ self.fs = fs
+
+ def get_contents(self,path):
+ data = self.fs.getcontents(path)
+ return xmlrpclib.Binary(data)
+
+ def set_contents(self,path,data):
+ self.fs.createfile(path,data.data)
+
+ def exists(self,path):
+ return self.fs.exists(path)
+
+ def isdir(self,path):
+ return self.fs.isdir(path)
+
+ def isfile(self,path):
+ return self.fs.isfile(path)
+
+ def listdir(self,path="./",wildcard=None,full=False,absolute=False,hidden=True,dirs_only=False,files_only=False):
+ return list(self.fs.listdir(path,wildcard,full,absolute,hidden,dirs_only,files_only))
+
+ def makedir(self,path,mode=0777,recursive=False,allow_recreate=False):
+ return self.fs.makedir(path,mode,recursive,allow_recreate)
+
+ def remove(self,path):
+ return self.fs.remove(path)
+
+ def removedir(self,path,recursive=False):
+ return self.fs.removedir(path,recursive)
+
+ def rename(self,src,dst):
+ return self.fs.rename(src,dst)
+
+ def getinfo(self,path):
+ return self.fs.getinfo(path)
+
+ def desc(self,path):
+ return self.fs.desc(path)
+
+ def getattr(self,path,attr):
+ return self.fs.getattr(path,attr)
+
+ def setattr(self,path,attr,value):
+ return self.fs.setattr(path,attr,value)
+
+ def copy(self,src,dst,overwrite=False,chunk_size=16384):
+ return self.fs.copy(src,dst,overwrite,chunk_size)
+
+ def move(self,src,dst,chunk_size=16384):
+ return self.fs.move(src,dst,chunk_size)
+
+ def movedir(self,src,dst,ignore_errors=False,chunk_size=16384):
+ return self.fs.movedir(src,dst,ignore_errors,chunk_size)
+
+ def copydir(self,src,dst,ignore_errors=False,chunk_size=16384):
+ return self.fs.copydir(src,dst,ignore_errors,chunk_size)
+
+
+class RPCFSServer(SimpleXMLRPCServer):
+ """Server to expose an FS object via XML-RPC.
+
+ Example:
+
+ fs = OSFS('/var/srv/myfiles')
+ s = RPCFSServer(fs,("",8080))
+ s.serve_forever()
+
+ """
+
+ def __init__(self,fs,addr,requestHandler=None,logRequests=None):
+ kwds = dict(allow_none=True)
+ if requestHandler is not None:
+ kwds['requestHandler'] = requestHandler
+ if logRequests is not None:
+ kwds['logRequests'] = logRequests
+ self.serve_more_requests = True
+ SimpleXMLRPCServer.__init__(self,addr,**kwds)
+ self.register_instance(RPCFSInterface(fs))
+
+ def serve_forever(self):
+ """Override serve_forever to allow graceful shutdown."""
+ while self.serve_more_requests:
+ self.handle_request()
+
diff --git a/fs/tests.py b/fs/tests.py
index 0552229..aed5f3e 100644
--- a/fs/tests.py
+++ b/fs/tests.py
@@ -617,10 +617,66 @@ class TestAppendZipFS(TestWriteZipFS):
zip_fs.close()
+import s3fs
+class TestS3FS(TestOSFS):
+
+ bucket = "test-s3fs.rfk.id.au"
+
+ def setUp(self):
+ self.fs = s3fs.S3FS(self.bucket,"/unittest/files")
+ self._clear()
+
+ def _clear(self):
+ for (path,files) in self.fs.walk(search="depth"):
+ for fn in files:
+ self.fs.remove(pathjoin(path,fn))
+ if path and path != "/":
+ self.fs.removedir(path)
+ def tearDown(self):
+ self._clear()
+ for k in self.fs._s3bukt.list():
+ self.fs._s3bukt.delete_key(k)
+ self.fs._s3conn.delete_bucket(self.bucket)
+
+ def check(self, p):
+ return self.fs.exists(p)
+
+
+import rpcfs
+import threading
+import time
+class TestRPCFS(TestOSFS):
+
+ def setUp(self):
+ self.server = rpcfs.RPCFSServer(tempfs.TempFS(),("localhost",8000),logRequests=False)
+ self.server_thread = threading.Thread(target=self._run_server)
+ self.server_thread.start()
+ self.fs = rpcfs.RPCFS("http://localhost:8000")
+
+ def _run_server(self):
+ """Run the server, swallowing shutdown-related execptions."""
+ try:
+ self.server.serve_forever()
+ except:
+ pass
+
+ def tearDown(self):
+ try:
+ # Shut the server down. We send one final request to
+ # bump the socket and make it recognise the shutdown.
+ self.server.serve_more_requests = False
+ self.server.server_close()
+ self.fs.exists("/")
+ except Exception:
+ pass
+
+ def check(self, p):
+ return self.fs.exists(p)
+
if __name__ == "__main__":
#t = TestFS()
#t.setUp()
#t.tearDown()
import nose
- nose.main() \ No newline at end of file
+ nose.main()