diff options
author | Alan Conway <aconway@apache.org> | 2008-08-15 22:40:30 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-08-15 22:40:30 +0000 |
commit | 289fa5870d89af4e4cb1b38fa0af37e739dbe421 (patch) | |
tree | a248a70eb8204dfe675ed2d47d73934c4a217b67 | |
parent | e19070a17640bbf2e82fbca281ac8033bd54daf6 (diff) | |
download | qpid-python-289fa5870d89af4e4cb1b38fa0af37e739dbe421.tar.gz |
Fix memory leak in Cluster and enable valgrind in ais_check
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@686409 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp | 22 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ais_check | 3 |
5 files changed, 32 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 84edfa201d..05ab9148b5 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -87,13 +87,21 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : mcastQueue.start(poller); } -Cluster::~Cluster() {} +Cluster::~Cluster() { + for (ShadowConnectionMap::iterator i = shadowConnectionMap.begin(); + i != shadowConnectionMap.end(); + ++i) + { + i->second->dirtyClose(); + } + std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1)); +} // local connection initializes plugins void Cluster::initialize(broker::Connection& c) { bool isLocal = &c.getOutput() != &shadowOut; if (isLocal) - new ConnectionInterceptor(c, *this); + localConnectionSet.insert(new ConnectionInterceptor(c, *this)); } void Cluster::leave() { @@ -260,6 +268,8 @@ void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethod case CLUSTER_CONNECTION_CLOSE_METHOD_ID: { if (!connection->isLocal()) shadowConnectionMap.erase(connection->getShadowId()); + else + localConnectionSet.erase(connection); connection->deliverClosed(); break; } diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 1c43bdac43..2b40193dd3 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -91,11 +91,12 @@ class Cluster : private Cpg::Handler, public RefCounted // Cluster frame handing functions void notify(const std::string& url); void connectionClose(); - + private: typedef Cpg::Id Id; typedef std::map<Id, Member> MemberMap; typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap; + typedef std::set<ConnectionInterceptor*> LocalConnectionSet; /** Message sent over the cluster. */ struct Message { @@ -154,6 +155,7 @@ class Cluster : private Cpg::Handler, public RefCounted MemberMap members; Id self; ShadowConnectionMap shadowConnectionMap; + LocalConnectionSet localConnectionSet; ShadowConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; MessageQueue deliverQueue; diff --git a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp index 81d496597a..c13651eccb 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp @@ -42,7 +42,6 @@ ConnectionInterceptor::ConnectionInterceptor( } ConnectionInterceptor::~ConnectionInterceptor() { - assert(isClosed); assert(connection == 0); } @@ -52,7 +51,6 @@ void ConnectionInterceptor::received(framing::AMQFrame& f) { } void ConnectionInterceptor::deliver(framing::AMQFrame& f) { - // ostringstream os; os << f; printf("Received: %s\n", os.str().c_str()); // FIXME aconway 2008-08-08: remove receivedNext(f); } @@ -82,9 +80,19 @@ void ConnectionInterceptor::deliverClosed() { connection = 0; } +void ConnectionInterceptor::dirtyClose() { + // Not closed via cluster self-delivery but closed locally. + // Used for dirty cluster shutdown where active connections + // must be cleaned up. + connection = 0; +} + bool ConnectionInterceptor::doOutput() { + // FIXME aconway 2008-08-15: this is not correct. + // Run in write threads so order of execution of doOutput is not determinate. + // Will only work reliably for in single-consumer tests. + if (connection->hasOutput()) { - QPID_LOG(debug, "Intercept doOutput, call doOutputNext"); // FIXME aconway 2008-08-08: remove cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this); return doOutputNext(); } @@ -92,13 +100,9 @@ bool ConnectionInterceptor::doOutput() { } void ConnectionInterceptor::deliverDoOutput() { - if (isShadow()) { - QPID_LOG(debug, "Shadow deliver do output, call doOutputNext"); // FIXME aconway 2008-08-08: remove + // FIXME aconway 2008-08-15: see comment in doOutput. + if (isShadow()) doOutputNext(); - } - else { - QPID_LOG(debug, "Primary deliver doOutput, ignore."); // FIXME aconway 2008-08-08: remove - } } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h index a256738aeb..370572bd9d 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h +++ b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h @@ -48,6 +48,8 @@ class ConnectionInterceptor { void deliverClosed(); void deliverDoOutput(); + void dirtyClose(); + private: struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler { void close() {} diff --git a/qpid/cpp/src/tests/ais_check b/qpid/cpp/src/tests/ais_check index e2d53b5870..d76841eb1d 100755 --- a/qpid/cpp/src/tests/ais_check +++ b/qpid/cpp/src/tests/ais_check @@ -1,4 +1,5 @@ #!/bin/sh +srcdir=`dirname $0` # Check AIS requirements tests if found. id -nG | grep '\<ais\>' >/dev/null || \ @@ -31,6 +32,6 @@ with_ais_group() { # Run the tests srcdir=`dirname $0` -with_ais_group ./cluster_test || ERROR=1 +with_ais_group $srcdir/run_test ./cluster_test || ERROR=1 exit $ERROR |