diff options
Diffstat (limited to 'cpp/src/qpid/cluster/ErrorCheck.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/ErrorCheck.cpp | 41 |
1 files changed, 27 insertions, 14 deletions
diff --git a/cpp/src/qpid/cluster/ErrorCheck.cpp b/cpp/src/qpid/cluster/ErrorCheck.cpp index 33e7f34766..c22ed17239 100644 --- a/cpp/src/qpid/cluster/ErrorCheck.cpp +++ b/cpp/src/qpid/cluster/ErrorCheck.cpp @@ -39,7 +39,7 @@ ErrorCheck::ErrorCheck(Cluster& c) : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0) {} -ostream& operator<<(ostream& o, ErrorCheck::MemberSet ms) { +ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) { copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " ")); return o; } @@ -60,45 +60,58 @@ void ErrorCheck::error( << " (unresolved: " << unresolved << ")"); mcast.mcastControl( ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember()); + // If there are already frames queued up by a previous error, review + // them with respect to this new error. + for (FrameQueue::iterator i = frames.begin(); i != frames.end(); i = review(i)) + ; } void ErrorCheck::delivered(const EventFrame& e) { + FrameQueue::iterator i = frames.insert(frames.end(), e); + review(i); +} + +// Review a frame in the queue with respect to the current error. +ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& i) { + FrameQueue::iterator next = i+1; if (isUnresolved()) { const ClusterErrorCheckBody* errorCheck = 0; - if (e.frame.getBody()) + if (i->frame.getBody()) errorCheck = dynamic_cast<const ClusterErrorCheckBody*>( - e.frame.getMethod()); + i->frame.getMethod()); if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error + next = frames.erase(i); // Drop matching error check controls if (errorCheck->getType() < type) { // my error is worse than his QPID_LOG(critical, cluster << " error " << frameSeq - << " did not occur on " << e.getMemberId()); + << " did not occur on " << i->getMemberId()); throw Exception("Aborted by local failure that did not occur on all replicas"); } else { // his error is worse/same as mine. QPID_LOG(debug, cluster << " error " << frameSeq - << " outcome agrees with " << e.getMemberId()); - unresolved.erase(e.getMemberId()); + << " outcome agrees with " << i->getMemberId()); + unresolved.erase(i->getMemberId()); checkResolved(); } } else { - frames.push_back(e); // Only drop matching errorCheck controls. const ClusterConfigChangeBody* configChange = 0; - if (e.frame.getBody()) - configChange = dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod()); + if (i->frame.getBody()) + configChange = dynamic_cast<const ClusterConfigChangeBody*>(i->frame.getMethod()); if (configChange) { MemberSet members(ClusterMap::decode(configChange->getCurrent())); - MemberSet result; + QPID_LOG(debug, cluster << " apply config change to unresolved: " + << members); + + MemberSet intersect; set_intersection(members.begin(), members.end(), unresolved.begin(), unresolved.end(), - inserter(result, result.begin())); - unresolved.swap(result); + inserter(intersect, intersect.begin())); + unresolved.swap(intersect); checkResolved(); } } } - else - frames.push_back(e); + return next; } void ErrorCheck::checkResolved() { |