diff options
Diffstat (limited to 'fs/tests/test_expose.py')
-rw-r--r-- | fs/tests/test_expose.py | 106 |
1 files changed, 3 insertions, 103 deletions
diff --git a/fs/tests/test_expose.py b/fs/tests/test_expose.py index e052e6c..3c070c5 100644 --- a/fs/tests/test_expose.py +++ b/fs/tests/test_expose.py @@ -24,99 +24,7 @@ from fs.expose.xmlrpc import RPCFSServer import six from six import PY3, b -class TestRPCFS(unittest.TestCase, FSTestCases, ThreadingTestCases): - - def makeServer(self,fs,addr): - return RPCFSServer(fs,addr,logRequests=False) - - def startServer(self): - port = 3000 - self.temp_fs = TempFS() - self.server = None - - self.serve_more_requests = True - self.server_thread = threading.Thread(target=self.runServer) - self.server_thread.setDaemon(True) - - self.start_event = threading.Event() - self.end_event = threading.Event() - - self.server_thread.start() - - self.start_event.wait() - - def runServer(self): - """Run the server, swallowing shutdown-related execptions.""" - - port = 3000 - while not self.server: - try: - self.server = self.makeServer(self.temp_fs,("127.0.0.1",port)) - except socket.error, e: - if e.args[1] == "Address already in use": - port += 1 - else: - raise - self.server_addr = ("127.0.0.1", port) - - self.server.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - -# if sys.platform != "win32": -# try: -# self.server.socket.settimeout(1) -# except socket.error: -# pass -# - self.start_event.set() - - try: - #self.server.serve_forever() - while self.serve_more_requests: - self.server.handle_request() - except Exception, e: - pass - - self.end_event.set() - - def setUp(self): - self.startServer() - self.fs = rpcfs.RPCFS("http://%s:%d" % self.server_addr) - - def tearDown(self): - self.serve_more_requests = False - #self.server.socket.close() -# self.server.socket.shutdown(socket.SHUT_RDWR) -# self.server.socket.close() -# self.temp_fs.close() - #self.server_thread.join() - - #self.end_event.wait() - #return - - try: - self.bump() - self.server.server_close() - except Exception: - pass - #self.server_thread.join() - self.temp_fs.close() - - def bump(self): - host, port = self.server_addr - for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): - af, socktype, proto, cn, sa = res - sock = None - try: - sock = socket.socket(af, socktype, proto) - sock.settimeout(.1) - sock.connect(sa) - sock.send(b("\n")) - except socket.error, e: - pass - finally: - if sock is not None: - sock.close() - +from fs.tests.test_rpcfs import TestRPCFS try: from fs import sftpfs @@ -125,7 +33,7 @@ except ImportError: if not PY3: raise class TestSFTPFS(TestRPCFS): - + __test__ = not PY3 def makeServer(self,fs,addr): @@ -134,14 +42,6 @@ class TestSFTPFS(TestRPCFS): def setUp(self): self.startServer() self.fs = sftpfs.SFTPFS(self.server_addr, no_auth=True) - - #def runServer(self): - # self.server.serve_forever() - # - #def tearDown(self): - # self.server.shutdown() - # self.server_thread.join() - # self.temp_fs.close() def bump(self): # paramiko doesn't like being bumped, just wait for it to timeout. @@ -158,7 +58,7 @@ else: class TestFUSE(unittest.TestCase,FSTestCases,ThreadingTestCases): def setUp(self): - self.temp_fs = TempFS() + self.temp_fs = TempFS() self.temp_fs.makedir("root") self.temp_fs.makedir("mount") self.mounted_fs = self.temp_fs.opendir("root") |