summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/ErrorCheck.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/ErrorCheck.cpp')
-rw-r--r--cpp/src/qpid/cluster/ErrorCheck.cpp80
1 files changed, 51 insertions, 29 deletions
diff --git a/cpp/src/qpid/cluster/ErrorCheck.cpp b/cpp/src/qpid/cluster/ErrorCheck.cpp
index 0a16d492e4..35be055d06 100644
--- a/cpp/src/qpid/cluster/ErrorCheck.cpp
+++ b/cpp/src/qpid/cluster/ErrorCheck.cpp
@@ -45,11 +45,11 @@ ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) {
}
void ErrorCheck::error(
- Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms, const std::string& msg)
+ Connection& c, ErrorType t, framing::SequenceNumber seq, const MemberSet& ms, const std::string& msg)
{
// Detected a local error, inform cluster and set error state.
assert(t != ERROR_TYPE_NONE); // Must be an error.
- assert(type == ERROR_TYPE_NONE); // Can only be called while processing
+ assert(type == ERROR_TYPE_NONE); // Can't be called when already in an error state.
type = t;
unresolved = ms;
frameSeq = seq;
@@ -59,7 +59,7 @@ void ErrorCheck::error(
<< " error " << frameSeq << " on " << c << ": " << msg
<< " must be resolved with: " << unresolved);
mcast.mcastControl(
- ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember());
+ ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId());
// If there are already frames queued up by a previous error, review
// them with respect to this new error.
for (FrameQueue::iterator i = frames.begin(); i != frames.end(); i = review(i))
@@ -74,41 +74,52 @@ void ErrorCheck::delivered(const EventFrame& e) {
// Review a frame in the queue with respect to the current error.
ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& i) {
FrameQueue::iterator next = i+1;
- if (isUnresolved()) {
- const ClusterErrorCheckBody* errorCheck = 0;
- if (i->frame.getBody())
- errorCheck = dynamic_cast<const ClusterErrorCheckBody*>(
- i->frame.getMethod());
- if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
+ if(!isUnresolved() || !i->frame.getBody() || !i->frame.getMethod())
+ return next; // Only interested in control frames while unresolved.
+ const AMQMethodBody* method = i->frame.getMethod();
+ if (method->isA<const ClusterErrorCheckBody>()) {
+ const ClusterErrorCheckBody* errorCheck =
+ static_cast<const ClusterErrorCheckBody*>(method);
+
+ if (errorCheck->getFrameSeq() == frameSeq) { // Addresses current error
next = frames.erase(i); // Drop matching error check controls
if (errorCheck->getType() < type) { // my error is worse than his
QPID_LOG(critical, cluster << " error " << frameSeq
<< " did not occur on " << i->getMemberId());
- throw Exception("Aborted by failure that did not occur on all replicas");
+ throw Exception(QPID_MSG("Error " << frameSeq
+ << " did not occur on all members"));
}
else { // his error is worse/same as mine.
- QPID_LOG(notice, cluster << " error " << frameSeq
+ QPID_LOG(info, cluster << " error " << frameSeq
<< " resolved with " << i->getMemberId());
unresolved.erase(i->getMemberId());
checkResolved();
}
}
- else {
- const ClusterConfigChangeBody* configChange = 0;
- if (i->frame.getBody())
- configChange = dynamic_cast<const ClusterConfigChangeBody*>(
- i->frame.getMethod());
- if (configChange) {
- MemberSet members(ClusterMap::decode(configChange->getCurrent()));
- QPID_LOG(debug, cluster << " apply config change to unresolved: "
- << members);
- MemberSet intersect;
- set_intersection(members.begin(), members.end(),
- unresolved.begin(), unresolved.end(),
- inserter(intersect, intersect.begin()));
- unresolved.swap(intersect);
- checkResolved();
- }
+ else if (errorCheck->getFrameSeq() < frameSeq && errorCheck->getType() != NONE
+ && i->connectionId.getMember() != cluster.getId())
+ {
+ // This error occured before the current error so we
+ // have processed past it.
+ next = frames.erase(i); // Drop the error check control
+ respondNone(i->connectionId.getMember(), errorCheck->getType(),
+ errorCheck->getFrameSeq());
+ }
+ // if errorCheck->getFrameSeq() > frameSeq then leave it in the queue.
+ }
+ else if (method->isA<const ClusterConfigChangeBody>()) {
+ const ClusterConfigChangeBody* configChange =
+ static_cast<const ClusterConfigChangeBody*>(method);
+ if (configChange) {
+ MemberSet members(ClusterMap::decode(configChange->getCurrent()));
+ QPID_LOG(debug, cluster << " apply config change to error "
+ << frameSeq << ": " << members);
+ MemberSet intersect;
+ set_intersection(members.begin(), members.end(),
+ unresolved.begin(), unresolved.end(),
+ inserter(intersect, intersect.begin()));
+ unresolved.swap(intersect);
+ checkResolved();
}
}
return next;
@@ -117,10 +128,10 @@ ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator&
void ErrorCheck::checkResolved() {
if (unresolved.empty()) { // No more potentially conflicted members, we're clear.
type = ERROR_TYPE_NONE;
- QPID_LOG(notice, cluster << " error " << frameSeq << " resolved.");
+ QPID_LOG(info, cluster << " error " << frameSeq << " resolved.");
}
else
- QPID_LOG(notice, cluster << " error " << frameSeq
+ QPID_LOG(info, cluster << " error " << frameSeq
<< " must be resolved with " << unresolved);
}
@@ -131,4 +142,15 @@ EventFrame ErrorCheck::getNext() {
return e;
}
+void ErrorCheck::respondNone(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq) {
+ // Don't respond to non-errors or to my own errors.
+ if (type == ERROR_TYPE_NONE || from == cluster.getId())
+ return;
+ QPID_LOG(info, cluster << " error " << frameSeq << " did not occur locally.");
+ mcast.mcastControl(
+ ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq),
+ cluster.getId()
+ );
+}
+
}} // namespace qpid::cluster