summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/ErrorCheck.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/ErrorCheck.cpp')
-rw-r--r--cpp/src/qpid/cluster/ErrorCheck.cpp41
1 files changed, 27 insertions, 14 deletions
diff --git a/cpp/src/qpid/cluster/ErrorCheck.cpp b/cpp/src/qpid/cluster/ErrorCheck.cpp
index 33e7f34766..c22ed17239 100644
--- a/cpp/src/qpid/cluster/ErrorCheck.cpp
+++ b/cpp/src/qpid/cluster/ErrorCheck.cpp
@@ -39,7 +39,7 @@ ErrorCheck::ErrorCheck(Cluster& c)
: cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0)
{}
-ostream& operator<<(ostream& o, ErrorCheck::MemberSet ms) {
+ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) {
copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " "));
return o;
}
@@ -60,45 +60,58 @@ void ErrorCheck::error(
<< " (unresolved: " << unresolved << ")");
mcast.mcastControl(
ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember());
+ // If there are already frames queued up by a previous error, review
+ // them with respect to this new error.
+ for (FrameQueue::iterator i = frames.begin(); i != frames.end(); i = review(i))
+ ;
}
void ErrorCheck::delivered(const EventFrame& e) {
+ FrameQueue::iterator i = frames.insert(frames.end(), e);
+ review(i);
+}
+
+// Review a frame in the queue with respect to the current error.
+ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& i) {
+ FrameQueue::iterator next = i+1;
if (isUnresolved()) {
const ClusterErrorCheckBody* errorCheck = 0;
- if (e.frame.getBody())
+ if (i->frame.getBody())
errorCheck = dynamic_cast<const ClusterErrorCheckBody*>(
- e.frame.getMethod());
+ i->frame.getMethod());
if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
+ next = frames.erase(i); // Drop matching error check controls
if (errorCheck->getType() < type) { // my error is worse than his
QPID_LOG(critical, cluster << " error " << frameSeq
- << " did not occur on " << e.getMemberId());
+ << " did not occur on " << i->getMemberId());
throw Exception("Aborted by local failure that did not occur on all replicas");
}
else { // his error is worse/same as mine.
QPID_LOG(debug, cluster << " error " << frameSeq
- << " outcome agrees with " << e.getMemberId());
- unresolved.erase(e.getMemberId());
+ << " outcome agrees with " << i->getMemberId());
+ unresolved.erase(i->getMemberId());
checkResolved();
}
}
else {
- frames.push_back(e); // Only drop matching errorCheck controls.
const ClusterConfigChangeBody* configChange = 0;
- if (e.frame.getBody())
- configChange = dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod());
+ if (i->frame.getBody())
+ configChange = dynamic_cast<const ClusterConfigChangeBody*>(i->frame.getMethod());
if (configChange) {
MemberSet members(ClusterMap::decode(configChange->getCurrent()));
- MemberSet result;
+ QPID_LOG(debug, cluster << " apply config change to unresolved: "
+ << members);
+
+ MemberSet intersect;
set_intersection(members.begin(), members.end(),
unresolved.begin(), unresolved.end(),
- inserter(result, result.begin()));
- unresolved.swap(result);
+ inserter(intersect, intersect.begin()));
+ unresolved.swap(intersect);
checkResolved();
}
}
}
- else
- frames.push_back(e);
+ return next;
}
void ErrorCheck::checkResolved() {