diff options
author | Alan Conway <aconway@apache.org> | 2009-07-10 15:42:36 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-07-10 15:42:36 +0000 |
commit | 82d0880a9a9a65029d35422ad2dff675b867bb6c (patch) | |
tree | b974a2a89e13000cd9ff680b05a74df7bcf09ea5 /cpp/src | |
parent | 072ce948be1894a1fbaccf4accb0eb9788275bd9 (diff) | |
download | qpid-python-82d0880a9a9a65029d35422ad2dff675b867bb6c.tar.gz |
Fix cluster handling of multiple errors.
If an error occured while there were frames on the error queue from a
previous error, the enqueued frames were not being processed for the
new error, which could lead to error-check or config-change frames
being missed.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@792991 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ErrorCheck.cpp | 41 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ErrorCheck.h | 12 |
3 files changed, 37 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 1984b4f4df..92859cc641 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -440,7 +440,7 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) { connection->deliveredFrame(e); } else - QPID_LOG(critical, *this << " FIXME DROP (no connection): " << e); + QPID_LOG(debug, *this << " DROP (no connection): " << e); } else // Drop connection frames while state < CATCHUP QPID_LOG(trace, *this << " DROP (joining): " << e); @@ -534,6 +534,7 @@ void Cluster::setReady(Lock&) { void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) { bool memberChange = map.configChange(current); + QPID_LOG(debug, *this << " applied config change: " << map); if (state == LEFT) return; if (!map.isAlive(self)) { // Final config change. diff --git a/cpp/src/qpid/cluster/ErrorCheck.cpp b/cpp/src/qpid/cluster/ErrorCheck.cpp index 33e7f34766..c22ed17239 100644 --- a/cpp/src/qpid/cluster/ErrorCheck.cpp +++ b/cpp/src/qpid/cluster/ErrorCheck.cpp @@ -39,7 +39,7 @@ ErrorCheck::ErrorCheck(Cluster& c) : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0) {} -ostream& operator<<(ostream& o, ErrorCheck::MemberSet ms) { +ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) { copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " ")); return o; } @@ -60,45 +60,58 @@ void ErrorCheck::error( << " (unresolved: " << unresolved << ")"); mcast.mcastControl( ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember()); + // If there are already frames queued up by a previous error, review + // them with respect to this new error. + for (FrameQueue::iterator i = frames.begin(); i != frames.end(); i = review(i)) + ; } void ErrorCheck::delivered(const EventFrame& e) { + FrameQueue::iterator i = frames.insert(frames.end(), e); + review(i); +} + +// Review a frame in the queue with respect to the current error. +ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& i) { + FrameQueue::iterator next = i+1; if (isUnresolved()) { const ClusterErrorCheckBody* errorCheck = 0; - if (e.frame.getBody()) + if (i->frame.getBody()) errorCheck = dynamic_cast<const ClusterErrorCheckBody*>( - e.frame.getMethod()); + i->frame.getMethod()); if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error + next = frames.erase(i); // Drop matching error check controls if (errorCheck->getType() < type) { // my error is worse than his QPID_LOG(critical, cluster << " error " << frameSeq - << " did not occur on " << e.getMemberId()); + << " did not occur on " << i->getMemberId()); throw Exception("Aborted by local failure that did not occur on all replicas"); } else { // his error is worse/same as mine. QPID_LOG(debug, cluster << " error " << frameSeq - << " outcome agrees with " << e.getMemberId()); - unresolved.erase(e.getMemberId()); + << " outcome agrees with " << i->getMemberId()); + unresolved.erase(i->getMemberId()); checkResolved(); } } else { - frames.push_back(e); // Only drop matching errorCheck controls. const ClusterConfigChangeBody* configChange = 0; - if (e.frame.getBody()) - configChange = dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod()); + if (i->frame.getBody()) + configChange = dynamic_cast<const ClusterConfigChangeBody*>(i->frame.getMethod()); if (configChange) { MemberSet members(ClusterMap::decode(configChange->getCurrent())); - MemberSet result; + QPID_LOG(debug, cluster << " apply config change to unresolved: " + << members); + + MemberSet intersect; set_intersection(members.begin(), members.end(), unresolved.begin(), unresolved.end(), - inserter(result, result.begin())); - unresolved.swap(result); + inserter(intersect, intersect.begin())); + unresolved.swap(intersect); checkResolved(); } } } - else - frames.push_back(e); + return next; } void ErrorCheck::checkResolved() { diff --git a/cpp/src/qpid/cluster/ErrorCheck.h b/cpp/src/qpid/cluster/ErrorCheck.h index d303ecea65..236b820116 100644 --- a/cpp/src/qpid/cluster/ErrorCheck.h +++ b/cpp/src/qpid/cluster/ErrorCheck.h @@ -41,8 +41,8 @@ class Connection; /** * Error checking logic. * - * When an error occurs stop processing frames and queue them until we - * can determine if all nodes experienced the error. If not, we shut down. + * When an error occurs queue up frames until we can determine if all + * nodes experienced the error. If not, we shut down. */ class ErrorCheck { @@ -59,18 +59,22 @@ class ErrorCheck /** Called when a frame is delivered */ void delivered(const EventFrame&); + /**@pre canProcess **/ EventFrame getNext(); bool canProcess() const; + bool isUnresolved() const; private: + typedef std::deque<EventFrame> FrameQueue; + FrameQueue::iterator review(const FrameQueue::iterator&); void checkResolved(); Cluster& cluster; Multicaster& mcast; - std::deque<EventFrame> frames; - std::set<MemberId> unresolved; + FrameQueue frames; + MemberSet unresolved; uint64_t frameSeq; ErrorType type; Connection* connection; |