summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-03-23 20:34:58 +0000
committerRafael H. Schloming <rhs@apache.org>2010-03-23 20:34:58 +0000
commit9486fbbc46abc818568d38a706153a3dec7cd62c (patch)
treede7e864d6b9b1a243b718df336b770ecd339eeee
parent33e5588c8d7c359b7b386e31542d0ca55dd25ba3 (diff)
downloadqpid-python-9486fbbc46abc818568d38a706153a3dec7cd62c.tar.gz
fixed resource leakage on repeated connection open/close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@926766 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/compat.py27
-rw-r--r--qpid/python/qpid/concurrency.py6
-rw-r--r--qpid/python/qpid/messaging/driver.py3
-rw-r--r--qpid/python/qpid/messaging/endpoints.py4
-rw-r--r--qpid/python/qpid/tests/messaging/endpoints.py25
5 files changed, 59 insertions, 6 deletions
diff --git a/qpid/python/qpid/compat.py b/qpid/python/qpid/compat.py
index c2b668a5e9..8b1f4b746b 100644
--- a/qpid/python/qpid/compat.py
+++ b/qpid/python/qpid/compat.py
@@ -84,6 +84,16 @@ if sys.platform in ('win32', 'cygwin'):
def fileno(self):
return self.read_sock.fileno()
+ def close(self):
+ if self.write_sock is not None:
+ self.write_sock.close()
+ self.write_sock = None
+ self.read_sock.close()
+ self.read_sock = None
+
+ def __del__(self):
+ self.close()
+
def __repr__(self):
return "SockWaiter(%r, %r)" % (self.read_sock, self.write_sock)
@@ -102,9 +112,8 @@ else:
class PipeWaiter(BaseWaiter):
- def __init__(self, read_fd, write_fd):
- self.read_fd = read_fd
- self.write_fd = write_fd
+ def __init__(self):
+ self.read_fd, self.write_fd = os.pipe()
def _do_write(self):
os.write(self.write_fd, "\0")
@@ -115,8 +124,18 @@ else:
def fileno(self):
return self.read_fd
+ def close(self):
+ if self.write_fd is not None:
+ os.close(self.write_fd)
+ self.write_fd = None
+ os.close(self.read_fd)
+ self.read_fd = None
+
+ def __del__(self):
+ self.close()
+
def __repr__(self):
return "PipeWaiter(%r, %r)" % (self.read_fd, self.write_fd)
def selectable_waiter():
- return PipeWaiter(*os.pipe())
+ return PipeWaiter()
diff --git a/qpid/python/qpid/concurrency.py b/qpid/python/qpid/concurrency.py
index 9837a3f0df..eefe0d445f 100644
--- a/qpid/python/qpid/concurrency.py
+++ b/qpid/python/qpid/concurrency.py
@@ -98,3 +98,9 @@ class Condition:
self.lock._acquire_restore(st)
self.waiting.remove(sw)
self.waiters.append(sw)
+
+ def gc(self):
+ assert self.lock._is_owned()
+ while self.waiters:
+ sw = self.waiters.pop(0)
+ sw.close()
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index ba53d94e33..01393d6d70 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -342,6 +342,9 @@ class Driver:
def start(self):
self._selector.register(self)
+ def stop(self):
+ self._selector.unregister(self)
+
def fileno(self):
return self._socket.fileno()
diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py
index af2b1a8007..195c6e7ef7 100644
--- a/qpid/python/qpid/messaging/endpoints.py
+++ b/qpid/python/qpid/messaging/endpoints.py
@@ -102,7 +102,6 @@ class Connection:
self.error = None
from driver import Driver
self._driver = Driver(self)
- self._driver.start()
def _wait(self, predicate, timeout=None):
return self._waiter.wait(predicate, timeout=timeout)
@@ -157,6 +156,7 @@ class Connection:
Connect to the remote endpoint.
"""
self._connected = True
+ self._driver.start()
self._wakeup()
self._ewait(lambda: self._transport_connected and not self._unlinked(),
exc=ConnectError)
@@ -175,6 +175,8 @@ class Connection:
self._connected = False
self._wakeup()
self._ewait(lambda: not self._transport_connected)
+ self._driver.stop()
+ self._condition.gc()
@synchronized
def connected(self):
diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py
index 5d4fc1646b..5888413f2f 100644
--- a/qpid/python/qpid/tests/messaging/endpoints.py
+++ b/qpid/python/qpid/tests/messaging/endpoints.py
@@ -20,7 +20,7 @@
# setup, usage, teardown, errors(sync), errors(async), stress, soak,
# boundary-conditions, config
-import time
+import errno, os, time
from qpid import compat
from qpid.messaging import *
from qpid.tests.messaging import Base
@@ -48,6 +48,29 @@ class SetupTests(Base):
# XXX: should verify that e includes appropriate diagnostic info
pass
+ def use_fds(self):
+ fds = []
+ try:
+ while True:
+ fds.append(os.open("/dev/null", os.O_RDONLY))
+ except OSError, e:
+ if e.errno != errno.EMFILE:
+ raise e
+ else:
+ return fds
+
+ def testOpenCloseResourceLeaks(self):
+ fds = self.use_fds()
+ try:
+ for i in range(32):
+ if fds: os.close(fds.pop())
+ for i in xrange(64):
+ conn = Connection.open(self.broker.host, self.broker.port)
+ conn.close()
+ finally:
+ while fds:
+ os.close(fds.pop())
+
class ConnectionTests(Base):
def setup_connection(self):