summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp12
1 files changed, 10 insertions, 2 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index d10e1fd458..eb6428d394 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -590,6 +590,7 @@ void Cluster::initMapCompleted(Lock& l) {
if (initMap.isUpdateNeeded()) { // Joining established cluster.
broker.setRecovery(false); // Ditch my current store.
broker.setClusterUpdatee(true);
+ if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update.
state = JOINER;
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
QPID_LOG(notice, *this << " joining cluster " << name);
@@ -672,7 +673,7 @@ void Cluster::debugSnapshot(const char* prefix, Connection* connection) {
assertClusterSafe();
std::ostringstream msg;
msg << prefix;
- if (connection) msg << " " << *connection;
+ if (connection) msg << " " << connection->getId();
msg << " snapshot " << map.getFrameSeq() << ":";
AppendQueue append(msg);
broker.getQueues().eachQueue(append);
@@ -761,7 +762,10 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
<< " to " << updatee);
deliverEventQueue.start(); // Not involved in update.
}
- if (updatee != self && url) debugSnapshot("join");
+ if (updatee != self && url) {
+ debugSnapshot("join");
+ if (mAgent) mAgent->clusterUpdate();
+ }
}
static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) {
@@ -830,9 +834,11 @@ void Cluster::checkUpdateIn(Lock& l) {
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
broker.setClusterUpdatee(false);
+ if (mAgent) mAgent->suppress(false); // Enable management output.
discarding = false; // ok to set, we're stalled for update.
QPID_LOG(notice, *this << " update complete, starting catch-up.");
debugSnapshot("initial");
+ if (mAgent) mAgent->clusterUpdate();
deliverEventQueue.start();
}
else if (updateRetracted) { // Update was retracted, request another update
@@ -992,10 +998,12 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu
}
void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
+ QPID_LOG(debug, "Cluster timer wakeup " << map.getFrameSeq() << ": " << name)
timer->deliverWakeup(name);
}
void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
+ QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name)
timer->deliverDrop(name);
}