summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-08-15 22:40:30 +0000
committerAlan Conway <aconway@apache.org>2008-08-15 22:40:30 +0000
commit289fa5870d89af4e4cb1b38fa0af37e739dbe421 (patch)
treea248a70eb8204dfe675ed2d47d73934c4a217b67
parente19070a17640bbf2e82fbca281ac8033bd54daf6 (diff)
downloadqpid-python-289fa5870d89af4e4cb1b38fa0af37e739dbe421.tar.gz
Fix memory leak in Cluster and enable valgrind in ais_check
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@686409 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp14
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp22
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h2
-rwxr-xr-xqpid/cpp/src/tests/ais_check3
5 files changed, 32 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 84edfa201d..05ab9148b5 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -87,13 +87,21 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
mcastQueue.start(poller);
}
-Cluster::~Cluster() {}
+Cluster::~Cluster() {
+ for (ShadowConnectionMap::iterator i = shadowConnectionMap.begin();
+ i != shadowConnectionMap.end();
+ ++i)
+ {
+ i->second->dirtyClose();
+ }
+ std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1));
+}
// local connection initializes plugins
void Cluster::initialize(broker::Connection& c) {
bool isLocal = &c.getOutput() != &shadowOut;
if (isLocal)
- new ConnectionInterceptor(c, *this);
+ localConnectionSet.insert(new ConnectionInterceptor(c, *this));
}
void Cluster::leave() {
@@ -260,6 +268,8 @@ void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethod
case CLUSTER_CONNECTION_CLOSE_METHOD_ID: {
if (!connection->isLocal())
shadowConnectionMap.erase(connection->getShadowId());
+ else
+ localConnectionSet.erase(connection);
connection->deliverClosed();
break;
}
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 1c43bdac43..2b40193dd3 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -91,11 +91,12 @@ class Cluster : private Cpg::Handler, public RefCounted
// Cluster frame handing functions
void notify(const std::string& url);
void connectionClose();
-
+
private:
typedef Cpg::Id Id;
typedef std::map<Id, Member> MemberMap;
typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap;
+ typedef std::set<ConnectionInterceptor*> LocalConnectionSet;
/** Message sent over the cluster. */
struct Message {
@@ -154,6 +155,7 @@ class Cluster : private Cpg::Handler, public RefCounted
MemberMap members;
Id self;
ShadowConnectionMap shadowConnectionMap;
+ LocalConnectionSet localConnectionSet;
ShadowConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
MessageQueue deliverQueue;
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
index 81d496597a..c13651eccb 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
@@ -42,7 +42,6 @@ ConnectionInterceptor::ConnectionInterceptor(
}
ConnectionInterceptor::~ConnectionInterceptor() {
- assert(isClosed);
assert(connection == 0);
}
@@ -52,7 +51,6 @@ void ConnectionInterceptor::received(framing::AMQFrame& f) {
}
void ConnectionInterceptor::deliver(framing::AMQFrame& f) {
- // ostringstream os; os << f; printf("Received: %s\n", os.str().c_str()); // FIXME aconway 2008-08-08: remove
receivedNext(f);
}
@@ -82,9 +80,19 @@ void ConnectionInterceptor::deliverClosed() {
connection = 0;
}
+void ConnectionInterceptor::dirtyClose() {
+ // Not closed via cluster self-delivery but closed locally.
+ // Used for dirty cluster shutdown where active connections
+ // must be cleaned up.
+ connection = 0;
+}
+
bool ConnectionInterceptor::doOutput() {
+ // FIXME aconway 2008-08-15: this is not correct.
+ // Run in write threads so order of execution of doOutput is not determinate.
+ // Will only work reliably for in single-consumer tests.
+
if (connection->hasOutput()) {
- QPID_LOG(debug, "Intercept doOutput, call doOutputNext"); // FIXME aconway 2008-08-08: remove
cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this);
return doOutputNext();
}
@@ -92,13 +100,9 @@ bool ConnectionInterceptor::doOutput() {
}
void ConnectionInterceptor::deliverDoOutput() {
- if (isShadow()) {
- QPID_LOG(debug, "Shadow deliver do output, call doOutputNext"); // FIXME aconway 2008-08-08: remove
+ // FIXME aconway 2008-08-15: see comment in doOutput.
+ if (isShadow())
doOutputNext();
- }
- else {
- QPID_LOG(debug, "Primary deliver doOutput, ignore."); // FIXME aconway 2008-08-08: remove
- }
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
index a256738aeb..370572bd9d 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
+++ b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
@@ -48,6 +48,8 @@ class ConnectionInterceptor {
void deliverClosed();
void deliverDoOutput();
+ void dirtyClose();
+
private:
struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler {
void close() {}
diff --git a/qpid/cpp/src/tests/ais_check b/qpid/cpp/src/tests/ais_check
index e2d53b5870..d76841eb1d 100755
--- a/qpid/cpp/src/tests/ais_check
+++ b/qpid/cpp/src/tests/ais_check
@@ -1,4 +1,5 @@
#!/bin/sh
+srcdir=`dirname $0`
# Check AIS requirements tests if found.
id -nG | grep '\<ais\>' >/dev/null || \
@@ -31,6 +32,6 @@ with_ais_group() {
# Run the tests
srcdir=`dirname $0`
-with_ais_group ./cluster_test || ERROR=1
+with_ais_group $srcdir/run_test ./cluster_test || ERROR=1
exit $ERROR