diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-03-23 20:34:58 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-03-23 20:34:58 +0000 |
commit | 9486fbbc46abc818568d38a706153a3dec7cd62c (patch) | |
tree | de7e864d6b9b1a243b718df336b770ecd339eeee | |
parent | 33e5588c8d7c359b7b386e31542d0ca55dd25ba3 (diff) | |
download | qpid-python-9486fbbc46abc818568d38a706153a3dec7cd62c.tar.gz |
fixed resource leakage on repeated connection open/close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@926766 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qpid/compat.py | 27 | ||||
-rw-r--r-- | qpid/python/qpid/concurrency.py | 6 | ||||
-rw-r--r-- | qpid/python/qpid/messaging/driver.py | 3 | ||||
-rw-r--r-- | qpid/python/qpid/messaging/endpoints.py | 4 | ||||
-rw-r--r-- | qpid/python/qpid/tests/messaging/endpoints.py | 25 |
5 files changed, 59 insertions, 6 deletions
diff --git a/qpid/python/qpid/compat.py b/qpid/python/qpid/compat.py index c2b668a5e9..8b1f4b746b 100644 --- a/qpid/python/qpid/compat.py +++ b/qpid/python/qpid/compat.py @@ -84,6 +84,16 @@ if sys.platform in ('win32', 'cygwin'): def fileno(self): return self.read_sock.fileno() + def close(self): + if self.write_sock is not None: + self.write_sock.close() + self.write_sock = None + self.read_sock.close() + self.read_sock = None + + def __del__(self): + self.close() + def __repr__(self): return "SockWaiter(%r, %r)" % (self.read_sock, self.write_sock) @@ -102,9 +112,8 @@ else: class PipeWaiter(BaseWaiter): - def __init__(self, read_fd, write_fd): - self.read_fd = read_fd - self.write_fd = write_fd + def __init__(self): + self.read_fd, self.write_fd = os.pipe() def _do_write(self): os.write(self.write_fd, "\0") @@ -115,8 +124,18 @@ else: def fileno(self): return self.read_fd + def close(self): + if self.write_fd is not None: + os.close(self.write_fd) + self.write_fd = None + os.close(self.read_fd) + self.read_fd = None + + def __del__(self): + self.close() + def __repr__(self): return "PipeWaiter(%r, %r)" % (self.read_fd, self.write_fd) def selectable_waiter(): - return PipeWaiter(*os.pipe()) + return PipeWaiter() diff --git a/qpid/python/qpid/concurrency.py b/qpid/python/qpid/concurrency.py index 9837a3f0df..eefe0d445f 100644 --- a/qpid/python/qpid/concurrency.py +++ b/qpid/python/qpid/concurrency.py @@ -98,3 +98,9 @@ class Condition: self.lock._acquire_restore(st) self.waiting.remove(sw) self.waiters.append(sw) + + def gc(self): + assert self.lock._is_owned() + while self.waiters: + sw = self.waiters.pop(0) + sw.close() diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index ba53d94e33..01393d6d70 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -342,6 +342,9 @@ class Driver: def start(self): self._selector.register(self) + def stop(self): + self._selector.unregister(self) + def fileno(self): return self._socket.fileno() diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py index af2b1a8007..195c6e7ef7 100644 --- a/qpid/python/qpid/messaging/endpoints.py +++ b/qpid/python/qpid/messaging/endpoints.py @@ -102,7 +102,6 @@ class Connection: self.error = None from driver import Driver self._driver = Driver(self) - self._driver.start() def _wait(self, predicate, timeout=None): return self._waiter.wait(predicate, timeout=timeout) @@ -157,6 +156,7 @@ class Connection: Connect to the remote endpoint. """ self._connected = True + self._driver.start() self._wakeup() self._ewait(lambda: self._transport_connected and not self._unlinked(), exc=ConnectError) @@ -175,6 +175,8 @@ class Connection: self._connected = False self._wakeup() self._ewait(lambda: not self._transport_connected) + self._driver.stop() + self._condition.gc() @synchronized def connected(self): diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py index 5d4fc1646b..5888413f2f 100644 --- a/qpid/python/qpid/tests/messaging/endpoints.py +++ b/qpid/python/qpid/tests/messaging/endpoints.py @@ -20,7 +20,7 @@ # setup, usage, teardown, errors(sync), errors(async), stress, soak, # boundary-conditions, config -import time +import errno, os, time from qpid import compat from qpid.messaging import * from qpid.tests.messaging import Base @@ -48,6 +48,29 @@ class SetupTests(Base): # XXX: should verify that e includes appropriate diagnostic info pass + def use_fds(self): + fds = [] + try: + while True: + fds.append(os.open("/dev/null", os.O_RDONLY)) + except OSError, e: + if e.errno != errno.EMFILE: + raise e + else: + return fds + + def testOpenCloseResourceLeaks(self): + fds = self.use_fds() + try: + for i in range(32): + if fds: os.close(fds.pop()) + for i in xrange(64): + conn = Connection.open(self.broker.host, self.broker.port) + conn.close() + finally: + while fds: + os.close(fds.pop()) + class ConnectionTests(Base): def setup_connection(self): |