summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp33
1 files changed, 29 insertions, 4 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 7dd8c7e62c..3a6c902d29 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -474,6 +474,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) {
void Cluster::processFrame(const EventFrame& e, Lock& l) {
+ map.incrementFrameSeq();
if (e.isCluster()) {
QPID_LOG(trace, *this << " DLVR: " << e);
ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
@@ -481,7 +482,6 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) {
throw Exception(QPID_MSG("Invalid cluster control"));
}
else if (state >= CATCHUP) {
- map.incrementFrameSeq();
ConnectionPtr connection = getConnection(e, l);
if (connection) {
QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
@@ -578,7 +578,7 @@ void Cluster::initMapCompleted(Lock& l) {
elders = initMap.getElders();
QPID_LOG(debug, *this << " elders: " << elders);
if (elders.empty())
- becomeElder();
+ becomeElder(l);
else {
broker.getLinks().setPassive(true);
broker.getQueueEvents().disable();
@@ -635,11 +635,11 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock&
if (state >= CATCHUP && memberChange) {
memberUpdate(l);
- if (elders.empty()) becomeElder();
+ if (elders.empty()) becomeElder(l);
}
}
-void Cluster::becomeElder() {
+void Cluster::becomeElder(Lock&) {
if (elder) return; // We were already the elder.
// We are the oldest, reactive links if necessary
QPID_LOG(info, *this << " became the elder, active for links.");
@@ -656,6 +656,29 @@ void Cluster::makeOffer(const MemberId& id, Lock& ) {
}
}
+namespace {
+struct AppendQueue {
+ ostream* os;
+ AppendQueue(ostream& o) : os(&o) {}
+ void operator()(const boost::shared_ptr<broker::Queue>& q) {
+ (*os) << " " << q->getName() << "=" << q->getMessageCount();
+ }
+};
+} // namespace
+
+// Log a snapshot of broker state, used for debugging inconsistency problems.
+// May only be called in deliver thread.
+void Cluster::debugSnapshot(const char* prefix, Connection* connection) {
+ assertClusterSafe();
+ std::ostringstream msg;
+ msg << prefix;
+ if (connection) msg << " " << *connection;
+ msg << " snapshot " << map.getFrameSeq() << ":";
+ AppendQueue append(msg);
+ broker.getQueues().eachQueue(append);
+ QPID_LOG(trace, msg.str());
+}
+
// Called from Broker::~Broker when broker is shut down. At this
// point we know the poller has stopped so no poller callbacks will be
// invoked. We must ensure that CPG has also shut down so no CPG
@@ -738,6 +761,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
<< " to " << updatee);
deliverEventQueue.start(); // Not involved in update.
}
+ if (updatee != self && url) debugSnapshot("join");
}
static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) {
@@ -808,6 +832,7 @@ void Cluster::checkUpdateIn(Lock& l) {
broker.setClusterUpdatee(false);
discarding = false; // ok to set, we're stalled for update.
QPID_LOG(notice, *this << " update complete, starting catch-up.");
+ debugSnapshot("initial");
deliverEventQueue.start();
}
else if (updateRetracted) { // Update was retracted, request another update