diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 31 |
1 files changed, 9 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 05ab9148b5..75ce19e835 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -25,12 +25,14 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/framing/ClusterConnectionCloseBody.h" +#include "qpid/framing/ClusterConnectionDoOutputBody.h" #include "qpid/log/Statement.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" #include <boost/bind.hpp> #include <boost/cast.hpp> +#include <boost/current_function.hpp> #include <algorithm> #include <iterator> #include <map> @@ -76,11 +78,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : cpg.join(name); notify(); - // FIXME aconway 2008-08-11: can we remove this loop? - // Dispatch till we show up in the cluster map. - while (empty()) - cpg.dispatchOne(); - // Start dispatching from the poller. cpgDispatchHandle.startWatch(poller); deliverQueue.start(poller); @@ -97,9 +94,8 @@ Cluster::~Cluster() { std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1)); } -// local connection initializes plugins void Cluster::initialize(broker::Connection& c) { - bool isLocal = &c.getOutput() != &shadowOut; + bool isLocal = c.getOutput().get() != &shadowOut; if (isLocal) localConnectionSet.insert(new ConnectionInterceptor(c, *this)); } @@ -107,10 +103,8 @@ void Cluster::initialize(broker::Connection& c) { void Cluster::leave() { Mutex::ScopedLock l(lock); if (!broker) return; // Already left. - // At this point the poller has already been shut down so - // no dispatches can occur thru the cpgDispatchHandle. - // - // FIXME aconway 2008-08-11: assert this is the cae. + // Leave is called by from Broker destructor after the poller has + // been shut down. No dispatches can occur. QPID_LOG(debug, "Leaving cluster " << *this); cpg.leave(name); @@ -173,13 +167,6 @@ Cluster::MemberList Cluster::getMembers() const { return result; } -// ################ HERE - leaking shadow connections. -// FIXME aconway 2008-08-11: revisit memory management for shadow -// connections, what if the Connection is closed other than via -// disconnect? Dangling pointer in shadow map. Use ptr_map for shadow -// map, add deleted state to ConnectionInterceptor? Interceptors need -// to know about map? Check how Connections can be deleted. - ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) { ShadowConnectionId id(member, remotePtr); ShadowConnectionMap::iterator i = shadowConnectionMap.find(id); @@ -274,7 +261,8 @@ void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethod break; } case CLUSTER_CONNECTION_DO_OUTPUT_METHOD_ID: { - connection->deliverDoOutput(); + ClusterConnectionDoOutputBody& doOutput = static_cast<ClusterConnectionDoOutputBody&>(method); + connection->deliverDoOutput(doOutput.getBytes()); break; } default: @@ -309,9 +297,8 @@ void Cluster::dispatch(sys::DispatchHandle& h) { void Cluster::disconnect(sys::DispatchHandle& h) { h.stopWatch(); - // FIXME aconway 2008-08-11: error handling if we are disconnected. - // Kill the broker? - assert(0); + QPID_LOG(critical, "Disconnected from cluster, shutting down"); + broker.shutdown(); } }} // namespace qpid::cluster |