summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp31
1 files changed, 9 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 05ab9148b5..75ce19e835 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -25,12 +25,14 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/framing/ClusterConnectionCloseBody.h"
+#include "qpid/framing/ClusterConnectionDoOutputBody.h"
#include "qpid/log/Statement.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
+#include <boost/current_function.hpp>
#include <algorithm>
#include <iterator>
#include <map>
@@ -76,11 +78,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
cpg.join(name);
notify();
- // FIXME aconway 2008-08-11: can we remove this loop?
- // Dispatch till we show up in the cluster map.
- while (empty())
- cpg.dispatchOne();
-
// Start dispatching from the poller.
cpgDispatchHandle.startWatch(poller);
deliverQueue.start(poller);
@@ -97,9 +94,8 @@ Cluster::~Cluster() {
std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1));
}
-// local connection initializes plugins
void Cluster::initialize(broker::Connection& c) {
- bool isLocal = &c.getOutput() != &shadowOut;
+ bool isLocal = c.getOutput().get() != &shadowOut;
if (isLocal)
localConnectionSet.insert(new ConnectionInterceptor(c, *this));
}
@@ -107,10 +103,8 @@ void Cluster::initialize(broker::Connection& c) {
void Cluster::leave() {
Mutex::ScopedLock l(lock);
if (!broker) return; // Already left.
- // At this point the poller has already been shut down so
- // no dispatches can occur thru the cpgDispatchHandle.
- //
- // FIXME aconway 2008-08-11: assert this is the cae.
+ // Leave is called by from Broker destructor after the poller has
+ // been shut down. No dispatches can occur.
QPID_LOG(debug, "Leaving cluster " << *this);
cpg.leave(name);
@@ -173,13 +167,6 @@ Cluster::MemberList Cluster::getMembers() const {
return result;
}
-// ################ HERE - leaking shadow connections.
-// FIXME aconway 2008-08-11: revisit memory management for shadow
-// connections, what if the Connection is closed other than via
-// disconnect? Dangling pointer in shadow map. Use ptr_map for shadow
-// map, add deleted state to ConnectionInterceptor? Interceptors need
-// to know about map? Check how Connections can be deleted.
-
ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) {
ShadowConnectionId id(member, remotePtr);
ShadowConnectionMap::iterator i = shadowConnectionMap.find(id);
@@ -274,7 +261,8 @@ void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethod
break;
}
case CLUSTER_CONNECTION_DO_OUTPUT_METHOD_ID: {
- connection->deliverDoOutput();
+ ClusterConnectionDoOutputBody& doOutput = static_cast<ClusterConnectionDoOutputBody&>(method);
+ connection->deliverDoOutput(doOutput.getBytes());
break;
}
default:
@@ -309,9 +297,8 @@ void Cluster::dispatch(sys::DispatchHandle& h) {
void Cluster::disconnect(sys::DispatchHandle& h) {
h.stopWatch();
- // FIXME aconway 2008-08-11: error handling if we are disconnected.
- // Kill the broker?
- assert(0);
+ QPID_LOG(critical, "Disconnected from cluster, shutting down");
+ broker.shutdown();
}
}} // namespace qpid::cluster