diff options
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ErrorCheck.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ErrorCheck.h | 2 | ||||
-rw-r--r-- | qpid/cpp/xml/cluster.xml | 27 |
7 files changed, 41 insertions, 36 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index a472287a35..e7bec8633a 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -105,7 +105,6 @@ #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionAbortBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" -#include "qpid/framing/ClusterErrorCheckBody.h" #include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterUpdateOfferBody.h" @@ -134,7 +133,7 @@ using namespace qpid::framing; using namespace qpid::sys; using namespace std; using namespace qpid::cluster; -using namespace qpid::framing::cluster; +using namespace qpid::framing::cluster_connection; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; @@ -152,7 +151,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void configChange(const std::string& current) { cluster.configChange(member, current, l); } void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } - void errorCheck(uint8_t type, uint64_t seq) { cluster.errorCheck(member, type, seq, l); } + void shutdown() { cluster.shutdown(member, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } @@ -765,16 +764,4 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { expiryPolicy->deliverExpire(id); } -void Cluster::errorCheck(const MemberId& m, uint8_t type, uint64_t frameSeq, Lock&) { - // If we receive an errorCheck here, it's because we have processed past the point - // of the error so respond with ERROR_TYPE_NONE - assert(map.getFrameSeq() >= frameSeq); - if (type != framing::cluster::ERROR_TYPE_NONE) { // Don't respond to NONE. - QPID_LOG(debug, "Error " << frameSeq << " on " << m << " did not occur locally"); - mcast.mcastControl( - ClusterErrorCheckBody(ProtocolVersion(), - framing::cluster::ERROR_TYPE_NONE, frameSeq), self); - } -} - }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index c6b5f8499c..44d57dfaf5 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -144,7 +144,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& current, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); - void errorCheck(const MemberId&, uint8_t, uint64_t, Lock&); void shutdown(const MemberId&, Lock&); // Helper functions diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 2db8879eb5..42cb9556fb 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -38,6 +38,7 @@ #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" +#include "qpid/framing/ClusterConnectionErrorCheckBody.h" #include "qpid/log/Statement.h" #include <boost/current_function.hpp> @@ -54,7 +55,7 @@ namespace qpid { namespace cluster { using namespace framing; -using namespace framing::cluster; +using namespace framing::cluster_connection; qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); @@ -444,5 +445,19 @@ void Connection::connectionError(const std::string& ) { cluster.flagError(*this, ERROR_TYPE_CONNECTION); } +void Connection::errorCheck(uint8_t type, uint64_t frameSeq) { + // If we handle an errorCheck at this point (rather than in the + // ErrorCheck class) then we have processed succesfully past the + // point of the error so respond with ERROR_TYPE_NONE + if (type != ERROR_TYPE_NONE) { // Don't respond to NONE. + QPID_LOG(debug, cluster << " error " << frameSeq << " on " << *this + << " did not occur locally."); + cluster.getMulticast().mcastControl( + ClusterConnectionErrorCheckBody( + ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), self); + } +} + + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 0b7c151e8a..7f75d1e3dd 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -152,6 +152,7 @@ class Connection : void giveReadCredit(int credit); void abort(); void deliverClose(); + void errorCheck(uint8_t type, uint64_t frameSeq); OutputInterceptor& getOutput() { return output; } diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp index 87a7bb914b..9c2ba9c61a 100644 --- a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp +++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp @@ -22,7 +22,7 @@ #include "EventFrame.h" #include "ClusterMap.h" #include "Cluster.h" -#include "qpid/framing/ClusterErrorCheckBody.h" +#include "qpid/framing/ClusterConnectionErrorCheckBody.h" #include "qpid/framing/ClusterConfigChangeBody.h" #include "qpid/log/Statement.h" @@ -33,7 +33,7 @@ namespace cluster { using namespace std; using namespace framing; -using namespace framing::cluster; +using namespace framing::cluster_connection; ErrorCheck::ErrorCheck(Cluster& c) : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0) @@ -55,14 +55,16 @@ void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet connection = &c; QPID_LOG(debug, cluster << (type == ERROR_TYPE_SESSION ? " Session" : " Connection") << " error " << frameSeq << " unresolved: " << unresolved); - mcast.mcastControl(ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId()); + mcast.mcastControl( + ClusterConnectionErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId()); } void ErrorCheck::delivered(const EventFrame& e) { if (isUnresolved()) { - const ClusterErrorCheckBody* errorCheck = 0; + const ClusterConnectionErrorCheckBody* errorCheck = 0; if (e.frame.getBody()) - errorCheck = dynamic_cast<const ClusterErrorCheckBody*>(e.frame.getMethod()); + errorCheck = dynamic_cast<const ClusterConnectionErrorCheckBody*>( + e.frame.getMethod()); if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error if (errorCheck->getType() < type) { // my error is worse than his QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId()); diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.h b/qpid/cpp/src/qpid/cluster/ErrorCheck.h index 97b5f2bffd..606a959447 100644 --- a/qpid/cpp/src/qpid/cluster/ErrorCheck.h +++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.h @@ -48,7 +48,7 @@ class ErrorCheck { public: typedef std::set<MemberId> MemberSet; - typedef framing::cluster::ErrorType ErrorType; + typedef framing::cluster_connection::ErrorType ErrorType; ErrorCheck(Cluster&); diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 7ca3dc862f..8b1d47e56e 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -48,19 +48,6 @@ <field name="id" type="uint64"/> </control> - <domain name="error-type" type="uint8" label="Types of error"> - <enum> - <choice name="none" value="0"/> - <choice name="session" value="1"/> - <choice name="connection" value="2"/> - </enum> - </domain> - - <control name="error-check" code="0x13"> - <field name="type" type="error-type"/> - <field name="frame-seq" type="uint64"/> - </control> - <control name="shutdown" code="0x20" label="Shut down entire cluster"/> </class> @@ -80,6 +67,20 @@ <field name="limit" type="uint32"/> </control> + <domain name="error-type" type="uint8" label="Types of error"> + <enum> + <choice name="none" value="0"/> + <choice name="session" value="1"/> + <choice name="connection" value="2"/> + </enum> + </domain> + + <!-- Check for error consistency across the cluster --> + <control name="error-check" code="0x4"> + <field name="type" type="error-type"/> + <field name="frame-seq" type="uint64"/> + </control> + <!-- Update controls. Sent to a new broker in joining mode. A connection is updateed as followed: - open as a normal connection. |