From 01a75ea85ceda350213229967f132b40bce404e8 Mon Sep 17 00:00:00 2001 From: "willmcgugan@gmail.com" Date: Sun, 10 Feb 2013 11:27:33 +0000 Subject: Fix for xmlrpc git-svn-id: http://pyfilesystem.googlecode.com/svn/trunk@844 67cdc799-7952-0410-af00-57a81ceafa0f --- fs/expose/xmlrpc.py | 20 +++++---- fs/mountfs.py | 38 ++++++++-------- fs/rpcfs.py | 78 +++++++++++++++++---------------- fs/tests/test_archivefs.py | 10 ++++- fs/tests/test_expose.py | 106 ++------------------------------------------- fs/tests/test_rpcfs.py | 100 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 184 insertions(+), 168 deletions(-) create mode 100644 fs/tests/test_rpcfs.py diff --git a/fs/expose/xmlrpc.py b/fs/expose/xmlrpc.py index a11eb56..b24368f 100644 --- a/fs/expose/xmlrpc.py +++ b/fs/expose/xmlrpc.py @@ -41,23 +41,27 @@ class RPCFSInterface(object): is base64-encoded UTF-8. """ if PY3: - return path + return path return path.encode("utf8").encode("base64") def decode_path(self, path): """Decode paths arriving over the wire.""" if PY3: return path - return path.decode("base64").decode("utf8") + return path.decode("base64").decode("utf8") def getmeta(self, meta_name): meta = self.fs.getmeta(meta_name) + if isinstance(meta, basestring): + meta = meta.decode('base64') return meta - + def getmeta_default(self, meta_name, default): meta = self.fs.getmeta(meta_name, default) + if isinstance(meta, basestring): + meta = meta.decode('base64') return meta - + def hasmeta(self, meta_name): return self.fs.hasmeta(meta_name) @@ -98,7 +102,7 @@ class RPCFSInterface(object): def removedir(self, path, recursive=False, force=False): path = self.decode_path(path) return self.fs.removedir(path, recursive, force) - + def rename(self, src, dst): src = self.decode_path(src) dst = self.decode_path(dst) @@ -109,12 +113,12 @@ class RPCFSInterface(object): if isinstance(accessed_time, xmlrpclib.DateTime): accessed_time = datetime.strptime(accessed_time.value, "%Y%m%dT%H:%M:%S") if isinstance(modified_time, xmlrpclib.DateTime): - modified_time = datetime.strptime(modified_time.value, "%Y%m%dT%H:%M:%S") + modified_time = datetime.strptime(modified_time.value, "%Y%m%dT%H:%M:%S") return self.fs.settimes(path, accessed_time, modified_time) - def getinfo(self, path): + def getinfo(self, path): path = self.decode_path(path) - info = self.fs.getinfo(path) + info = self.fs.getinfo(path) return info def desc(self, path): diff --git a/fs/mountfs.py b/fs/mountfs.py index 41c75b8..18f6f88 100644 --- a/fs/mountfs.py +++ b/fs/mountfs.py @@ -8,12 +8,12 @@ For example, lets say we have two filesystems containing config files and resour [config_fs] |-- config.cfg - `-- defaults.cfg + `-- defaults.cfg [resources_fs] |-- images | |-- logo.jpg - | `-- photo.jpg + | `-- photo.jpg `-- data.dat We can combine these filesystems in to a single filesystem with the following code:: @@ -31,7 +31,7 @@ This will create a single filesystem where paths under `config` map to `config_f | `-- defaults.cfg `-- resources |-- images - | |-- logo.jpg + | |-- logo.jpg | `-- photo.jpg `-- data.dat @@ -39,7 +39,7 @@ Now both filesystems can be accessed with the same path structure:: print combined_fs.getcontents('/config/defaults.cfg') read_jpg(combined_fs.open('/resources/images/logo.jpg') - + """ from fs.base import * @@ -51,14 +51,14 @@ from fs import _thread_synchronize_default class DirMount(object): def __init__(self, path, fs): self.path = path - self.fs = fs + self.fs = fs def __str__(self): return "" % (self.path, self.fs) - + def __repr__(self): return "" % (self.path, self.fs) - + def __unicode__(self): return u"" % (self.path, self.fs) @@ -77,7 +77,7 @@ class MountFS(FS): _meta = { 'virtual': True, 'read_only' : False, 'unicode_paths' : True, - 'case_insensitive_paths' : False, + 'case_insensitive_paths' : False, } DirMount = DirMount @@ -86,7 +86,7 @@ class MountFS(FS): def __init__(self, auto_close=True, thread_synchronize=_thread_synchronize_default): self.auto_close = auto_close super(MountFS, self).__init__(thread_synchronize=thread_synchronize) - self.mount_tree = PathMap() + self.mount_tree = PathMap() def __str__(self): return "<%s [%s]>" % (self.__class__.__name__,self.mount_tree.items(),) @@ -128,11 +128,11 @@ class MountFS(FS): def close(self): # Explicitly closes children if requested if self.auto_close: - for mount in self.mount_tree.itervalues(): + for mount in self.mount_tree.itervalues(): mount.fs.close() # Free references (which may incidently call the close method of the child filesystems) - self.mount_tree.clear() - super(MountFS, self).close() + self.mount_tree.clear() + super(MountFS, self).close() def getsyspath(self, path, allow_none=False): fs, _mount_path, delegate_path = self._delegate(path) @@ -142,7 +142,7 @@ class MountFS(FS): else: raise NoSysPathError(path=path) return fs.getsyspath(delegate_path, allow_none=allow_none) - + def getpathurl(self, path, allow_none=False): fs, _mount_path, delegate_path = self._delegate(path) if fs is self or fs is None: @@ -160,7 +160,7 @@ class MountFS(FS): return "Mount dir" else: return "Mounted file" - return "Mounted dir, maps to path %s on %s" % (delegate_path, str(fs)) + return "Mounted dir, maps to path %s on %s" % (delegate_path or '/', str(fs)) @synchronize def isdir(self, path): @@ -283,7 +283,7 @@ class MountFS(FS): if not delegate_path: if allow_recreate: return - else: + else: raise DestinationExistsError(path, msg="Can not create a directory that already exists (try allow_recreate=True): %(path)s") return fs.makedir(delegate_path, recursive=recursive, allow_recreate=allow_recreate) @@ -396,9 +396,9 @@ class MountFS(FS): @synchronize def mountdir(self, path, fs): """Mounts a host FS object on a given path. - + :param path: A path within the MountFS - :param fs: A filesystem object to mount + :param fs: A filesystem object to mount """ path = abspath(normpath(path)) @@ -408,11 +408,11 @@ class MountFS(FS): @synchronize def mountfile(self, path, open_callable=None, info_callable=None): """Mounts a single file path. - + :param path: A path within the MountFS :param open_callable: A callable that returns a file-like object :param info_callable: A callable that returns a dictionary with information regarding the file-like object - + """ self.mount_tree[path] = MountFS.FileMount(path, callable, info_callable) diff --git a/fs/rpcfs.py b/fs/rpcfs.py index 831a977..2c4be4e 100644 --- a/fs/rpcfs.py +++ b/fs/rpcfs.py @@ -23,22 +23,22 @@ from six import PY3, b def re_raise_faults(func): """Decorator to re-raise XML-RPC faults as proper exceptions.""" - def wrapper(*args,**kwds): + def wrapper(*args,**kwds): try: return func(*args,**kwds) - except xmlrpclib.Fault, f: + except (xmlrpclib.Fault), f: # Make sure it's in a form we can handle - bits = f.faultString.split(" ") + bits = f.faultString.split(" ") if bits[0] not in [":") - cls = bits[0] - msg = ">:".join(bits[1:]) - cls = cls.strip('\'') + cls = bits[0] + msg = ">:".join(bits[1:]) + cls = cls.strip('\'') cls = _object_by_name(cls) # Re-raise using the remainder of the fault code as message - if cls: + if cls: if issubclass(cls,FSError): raise cls('', msg=msg) else: @@ -66,7 +66,7 @@ def _object_by_name(name,root=None): return _object_by_name(".".join(bits[1:]),obj) else: return obj - + class ReRaiseFaults: """XML-RPC proxy wrapper that re-raises Faults as proper Exceptions.""" @@ -94,9 +94,9 @@ class RPCFS(FS): """ - _meta = {'thread_safe' : True, - 'virtual': False, - 'network' : True, + _meta = {'thread_safe' : True, + 'virtual': False, + 'network' : True, } def __init__(self, uri, transport=None): @@ -105,30 +105,30 @@ class RPCFS(FS): The only required argument is the URI of the server to connect to. This will be passed to the underlying XML-RPC server proxy object, along with the 'transport' argument if it is provided. - - :param uri: address of the server - + + :param uri: address of the server + """ super(RPCFS, self).__init__(thread_synchronize=True) self.uri = uri self._transport = transport - self.proxy = self._make_proxy() + self.proxy = self._make_proxy() self.isdir('/') @synchronize def _make_proxy(self): kwds = dict(allow_none=True, use_datetime=True) - + if self._transport is not None: proxy = xmlrpclib.ServerProxy(self.uri,self._transport,**kwds) else: - proxy = xmlrpclib.ServerProxy(self.uri,**kwds) - + proxy = xmlrpclib.ServerProxy(self.uri,**kwds) + return ReRaiseFaults(proxy) def __str__(self): return '' % (self.uri,) - + def __repr__(self): return '' % (self.uri,) @@ -140,10 +140,10 @@ class RPCFS(FS): except KeyError: pass return state - + def __setstate__(self, state): - super(RPCFS, self).__setstate__(state) - self.proxy = self._make_proxy() + super(RPCFS, self).__setstate__(state) + self.proxy = self._make_proxy() def encode_path(self, path): """Encode a filesystem path for sending over the wire. @@ -154,23 +154,27 @@ class RPCFS(FS): """ if PY3: return path - return path.encode("utf8").encode("base64") + return path.encode("utf8").encode("base64") def decode_path(self, path): """Decode paths arriving over the wire.""" if PY3: - return path - return path.decode("base64").decode("utf8") - + return path + return path.decode("base64").decode("utf8") + @synchronize def getmeta(self, meta_name, default=NoDefaultMeta): - if default is NoDefaultMeta: - return self.proxy.getmeta(meta_name) + if default is NoDefaultMeta: + meta = self.proxy.getmeta(meta_name) else: - return self.proxy.getmeta_default(meta_name, default) - - @synchronize - def hasmeta(self, meta_name): + meta = self.proxy.getmeta_default(meta_name, default) + if isinstance(meta, basestring): + # To allow transport of meta with invalid xml chars (like null) + meta = meta.encode('base64') + return meta + + @synchronize + def hasmeta(self, meta_name): return self.proxy.hasmeta(meta_name) @synchronize @@ -197,7 +201,7 @@ class RPCFS(FS): f.seek(0,2) oldflush = f.flush oldclose = f.close - oldtruncate = f.truncate + oldtruncate = f.truncate def newflush(): self._lock.acquire() try: @@ -219,7 +223,7 @@ class RPCFS(FS): f.flush() finally: self._lock.release() - + f.flush = newflush f.close = newclose f.truncate = newtruncate @@ -241,7 +245,7 @@ class RPCFS(FS): return self.proxy.isfile(path) @synchronize - def listdir(self, path="./", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False): + def listdir(self, path="./", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False): enc_path = self.encode_path(path) if not callable(wildcard): entries = self.proxy.listdir(enc_path,wildcard,full,absolute, @@ -272,7 +276,7 @@ class RPCFS(FS): def removedir(self, path, recursive=False, force=False): path = self.encode_path(path) return self.proxy.removedir(path,recursive,force) - + @synchronize def rename(self, src, dst): src = self.encode_path(src) @@ -286,7 +290,7 @@ class RPCFS(FS): @synchronize def getinfo(self, path): - path = self.encode_path(path) + path = self.encode_path(path) return self.proxy.getinfo(path) @synchronize diff --git a/fs/tests/test_archivefs.py b/fs/tests/test_archivefs.py index 217990f..30019b6 100644 --- a/fs/tests/test_archivefs.py +++ b/fs/tests/test_archivefs.py @@ -13,12 +13,20 @@ import shutil import fs.tests from fs.path import * -from fs.contrib import archivefs +try: + from fs.contrib import archivefs +except ImportError: + libarchive_available = False +else: + libarchive_available = True + from six import PY3, b class TestReadArchiveFS(unittest.TestCase): + __test__ = libarchive_available + def setUp(self): self.temp_filename = "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(6))+".zip" self.temp_filename = os.path.join(tempfile.gettempdir(), self.temp_filename) diff --git a/fs/tests/test_expose.py b/fs/tests/test_expose.py index e052e6c..3c070c5 100644 --- a/fs/tests/test_expose.py +++ b/fs/tests/test_expose.py @@ -24,99 +24,7 @@ from fs.expose.xmlrpc import RPCFSServer import six from six import PY3, b -class TestRPCFS(unittest.TestCase, FSTestCases, ThreadingTestCases): - - def makeServer(self,fs,addr): - return RPCFSServer(fs,addr,logRequests=False) - - def startServer(self): - port = 3000 - self.temp_fs = TempFS() - self.server = None - - self.serve_more_requests = True - self.server_thread = threading.Thread(target=self.runServer) - self.server_thread.setDaemon(True) - - self.start_event = threading.Event() - self.end_event = threading.Event() - - self.server_thread.start() - - self.start_event.wait() - - def runServer(self): - """Run the server, swallowing shutdown-related execptions.""" - - port = 3000 - while not self.server: - try: - self.server = self.makeServer(self.temp_fs,("127.0.0.1",port)) - except socket.error, e: - if e.args[1] == "Address already in use": - port += 1 - else: - raise - self.server_addr = ("127.0.0.1", port) - - self.server.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - -# if sys.platform != "win32": -# try: -# self.server.socket.settimeout(1) -# except socket.error: -# pass -# - self.start_event.set() - - try: - #self.server.serve_forever() - while self.serve_more_requests: - self.server.handle_request() - except Exception, e: - pass - - self.end_event.set() - - def setUp(self): - self.startServer() - self.fs = rpcfs.RPCFS("http://%s:%d" % self.server_addr) - - def tearDown(self): - self.serve_more_requests = False - #self.server.socket.close() -# self.server.socket.shutdown(socket.SHUT_RDWR) -# self.server.socket.close() -# self.temp_fs.close() - #self.server_thread.join() - - #self.end_event.wait() - #return - - try: - self.bump() - self.server.server_close() - except Exception: - pass - #self.server_thread.join() - self.temp_fs.close() - - def bump(self): - host, port = self.server_addr - for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): - af, socktype, proto, cn, sa = res - sock = None - try: - sock = socket.socket(af, socktype, proto) - sock.settimeout(.1) - sock.connect(sa) - sock.send(b("\n")) - except socket.error, e: - pass - finally: - if sock is not None: - sock.close() - +from fs.tests.test_rpcfs import TestRPCFS try: from fs import sftpfs @@ -125,7 +33,7 @@ except ImportError: if not PY3: raise class TestSFTPFS(TestRPCFS): - + __test__ = not PY3 def makeServer(self,fs,addr): @@ -134,14 +42,6 @@ class TestSFTPFS(TestRPCFS): def setUp(self): self.startServer() self.fs = sftpfs.SFTPFS(self.server_addr, no_auth=True) - - #def runServer(self): - # self.server.serve_forever() - # - #def tearDown(self): - # self.server.shutdown() - # self.server_thread.join() - # self.temp_fs.close() def bump(self): # paramiko doesn't like being bumped, just wait for it to timeout. @@ -158,7 +58,7 @@ else: class TestFUSE(unittest.TestCase,FSTestCases,ThreadingTestCases): def setUp(self): - self.temp_fs = TempFS() + self.temp_fs = TempFS() self.temp_fs.makedir("root") self.temp_fs.makedir("mount") self.mounted_fs = self.temp_fs.opendir("root") diff --git a/fs/tests/test_rpcfs.py b/fs/tests/test_rpcfs.py new file mode 100644 index 0000000..a6f789c --- /dev/null +++ b/fs/tests/test_rpcfs.py @@ -0,0 +1,100 @@ + +import unittest +import sys +import os, os.path +import socket +import threading +import time + +from fs.tests import FSTestCases, ThreadingTestCases +from fs.tempfs import TempFS +from fs.osfs import OSFS +from fs.memoryfs import MemoryFS +from fs.path import * +from fs.errors import * + +from fs import rpcfs +from fs.expose.xmlrpc import RPCFSServer + +import six +from six import PY3, b + + +class TestRPCFS(unittest.TestCase, FSTestCases, ThreadingTestCases): + + def makeServer(self,fs,addr): + return RPCFSServer(fs,addr,logRequests=False) + + def startServer(self): + port = 3000 + self.temp_fs = TempFS() + self.server = None + + self.serve_more_requests = True + self.server_thread = threading.Thread(target=self.runServer) + self.server_thread.setDaemon(True) + + self.start_event = threading.Event() + self.end_event = threading.Event() + + self.server_thread.start() + + self.start_event.wait() + + def runServer(self): + """Run the server, swallowing shutdown-related execptions.""" + + port = 3000 + while not self.server: + try: + self.server = self.makeServer(self.temp_fs,("127.0.0.1",port)) + except socket.error, e: + if e.args[1] == "Address already in use": + port += 1 + else: + raise + self.server_addr = ("127.0.0.1", port) + + self.server.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + self.start_event.set() + + try: + #self.server.serve_forever() + while self.serve_more_requests: + self.server.handle_request() + except Exception, e: + pass + + self.end_event.set() + + def setUp(self): + self.startServer() + self.fs = rpcfs.RPCFS("http://%s:%d" % self.server_addr) + + def tearDown(self): + self.serve_more_requests = False + + try: + self.bump() + self.server.server_close() + except Exception: + pass + #self.server_thread.join() + self.temp_fs.close() + + def bump(self): + host, port = self.server_addr + for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): + af, socktype, proto, cn, sa = res + sock = None + try: + sock = socket.socket(af, socktype, proto) + sock.settimeout(.1) + sock.connect(sa) + sock.send(b("\n")) + except socket.error, e: + pass + finally: + if sock is not None: + sock.close() -- cgit v1.2.1