summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp3
-rw-r--r--cpp/src/qpid/cluster/ErrorCheck.cpp41
-rw-r--r--cpp/src/qpid/cluster/ErrorCheck.h12
3 files changed, 37 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 1984b4f4df..92859cc641 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -440,7 +440,7 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) {
connection->deliveredFrame(e);
}
else
- QPID_LOG(critical, *this << " FIXME DROP (no connection): " << e);
+ QPID_LOG(debug, *this << " DROP (no connection): " << e);
}
else // Drop connection frames while state < CATCHUP
QPID_LOG(trace, *this << " DROP (joining): " << e);
@@ -534,6 +534,7 @@ void Cluster::setReady(Lock&) {
void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) {
bool memberChange = map.configChange(current);
+ QPID_LOG(debug, *this << " applied config change: " << map);
if (state == LEFT) return;
if (!map.isAlive(self)) { // Final config change.
diff --git a/cpp/src/qpid/cluster/ErrorCheck.cpp b/cpp/src/qpid/cluster/ErrorCheck.cpp
index 33e7f34766..c22ed17239 100644
--- a/cpp/src/qpid/cluster/ErrorCheck.cpp
+++ b/cpp/src/qpid/cluster/ErrorCheck.cpp
@@ -39,7 +39,7 @@ ErrorCheck::ErrorCheck(Cluster& c)
: cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0)
{}
-ostream& operator<<(ostream& o, ErrorCheck::MemberSet ms) {
+ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) {
copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " "));
return o;
}
@@ -60,45 +60,58 @@ void ErrorCheck::error(
<< " (unresolved: " << unresolved << ")");
mcast.mcastControl(
ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember());
+ // If there are already frames queued up by a previous error, review
+ // them with respect to this new error.
+ for (FrameQueue::iterator i = frames.begin(); i != frames.end(); i = review(i))
+ ;
}
void ErrorCheck::delivered(const EventFrame& e) {
+ FrameQueue::iterator i = frames.insert(frames.end(), e);
+ review(i);
+}
+
+// Review a frame in the queue with respect to the current error.
+ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& i) {
+ FrameQueue::iterator next = i+1;
if (isUnresolved()) {
const ClusterErrorCheckBody* errorCheck = 0;
- if (e.frame.getBody())
+ if (i->frame.getBody())
errorCheck = dynamic_cast<const ClusterErrorCheckBody*>(
- e.frame.getMethod());
+ i->frame.getMethod());
if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
+ next = frames.erase(i); // Drop matching error check controls
if (errorCheck->getType() < type) { // my error is worse than his
QPID_LOG(critical, cluster << " error " << frameSeq
- << " did not occur on " << e.getMemberId());
+ << " did not occur on " << i->getMemberId());
throw Exception("Aborted by local failure that did not occur on all replicas");
}
else { // his error is worse/same as mine.
QPID_LOG(debug, cluster << " error " << frameSeq
- << " outcome agrees with " << e.getMemberId());
- unresolved.erase(e.getMemberId());
+ << " outcome agrees with " << i->getMemberId());
+ unresolved.erase(i->getMemberId());
checkResolved();
}
}
else {
- frames.push_back(e); // Only drop matching errorCheck controls.
const ClusterConfigChangeBody* configChange = 0;
- if (e.frame.getBody())
- configChange = dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod());
+ if (i->frame.getBody())
+ configChange = dynamic_cast<const ClusterConfigChangeBody*>(i->frame.getMethod());
if (configChange) {
MemberSet members(ClusterMap::decode(configChange->getCurrent()));
- MemberSet result;
+ QPID_LOG(debug, cluster << " apply config change to unresolved: "
+ << members);
+
+ MemberSet intersect;
set_intersection(members.begin(), members.end(),
unresolved.begin(), unresolved.end(),
- inserter(result, result.begin()));
- unresolved.swap(result);
+ inserter(intersect, intersect.begin()));
+ unresolved.swap(intersect);
checkResolved();
}
}
}
- else
- frames.push_back(e);
+ return next;
}
void ErrorCheck::checkResolved() {
diff --git a/cpp/src/qpid/cluster/ErrorCheck.h b/cpp/src/qpid/cluster/ErrorCheck.h
index d303ecea65..236b820116 100644
--- a/cpp/src/qpid/cluster/ErrorCheck.h
+++ b/cpp/src/qpid/cluster/ErrorCheck.h
@@ -41,8 +41,8 @@ class Connection;
/**
* Error checking logic.
*
- * When an error occurs stop processing frames and queue them until we
- * can determine if all nodes experienced the error. If not, we shut down.
+ * When an error occurs queue up frames until we can determine if all
+ * nodes experienced the error. If not, we shut down.
*/
class ErrorCheck
{
@@ -59,18 +59,22 @@ class ErrorCheck
/** Called when a frame is delivered */
void delivered(const EventFrame&);
+ /**@pre canProcess **/
EventFrame getNext();
bool canProcess() const;
+
bool isUnresolved() const;
private:
+ typedef std::deque<EventFrame> FrameQueue;
+ FrameQueue::iterator review(const FrameQueue::iterator&);
void checkResolved();
Cluster& cluster;
Multicaster& mcast;
- std::deque<EventFrame> frames;
- std::set<MemberId> unresolved;
+ FrameQueue frames;
+ MemberSet unresolved;
uint64_t frameSeq;
ErrorType type;
Connection* connection;