summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-06-18 21:25:00 +0000
committerAlan Conway <aconway@apache.org>2009-06-18 21:25:00 +0000
commit1ab0573accff3416b1a22ac69eacb57c63bb69f0 (patch)
tree5c6146f277426c6e42a1e1d19528cf16da5cab0a
parent0ac0ba5e1d146a133abbd1ea0ddcabe6d25ab987 (diff)
downloadqpid-python-1ab0573accff3416b1a22ac69eacb57c63bb69f0.tar.gz
Make error-check a cluster-connection control rather than a cluster control.
Fixes bug if an error occurs during update. As cluster controls, error-checks were being processed out of sequence with the connection data they referred to. Making them connection controls ensures they are processed in the proper order. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@786294 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp17
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp17
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/ErrorCheck.cpp12
-rw-r--r--qpid/cpp/src/qpid/cluster/ErrorCheck.h2
-rw-r--r--qpid/cpp/xml/cluster.xml27
7 files changed, 41 insertions, 36 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index a472287a35..e7bec8633a 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -105,7 +105,6 @@
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAbortBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
-#include "qpid/framing/ClusterErrorCheckBody.h"
#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterUpdateOfferBody.h"
@@ -134,7 +133,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
using namespace qpid::cluster;
-using namespace qpid::framing::cluster;
+using namespace qpid::framing::cluster_connection;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
@@ -152,7 +151,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
- void errorCheck(uint8_t type, uint64_t seq) { cluster.errorCheck(member, type, seq, l); }
+
void shutdown() { cluster.shutdown(member, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -765,16 +764,4 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
expiryPolicy->deliverExpire(id);
}
-void Cluster::errorCheck(const MemberId& m, uint8_t type, uint64_t frameSeq, Lock&) {
- // If we receive an errorCheck here, it's because we have processed past the point
- // of the error so respond with ERROR_TYPE_NONE
- assert(map.getFrameSeq() >= frameSeq);
- if (type != framing::cluster::ERROR_TYPE_NONE) { // Don't respond to NONE.
- QPID_LOG(debug, "Error " << frameSeq << " on " << m << " did not occur locally");
- mcast.mcastControl(
- ClusterErrorCheckBody(ProtocolVersion(),
- framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
- }
-}
-
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index c6b5f8499c..44d57dfaf5 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -144,7 +144,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& current, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
- void errorCheck(const MemberId&, uint8_t, uint64_t, Lock&);
void shutdown(const MemberId&, Lock&);
// Helper functions
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 2db8879eb5..42cb9556fb 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -38,6 +38,7 @@
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
+#include "qpid/framing/ClusterConnectionErrorCheckBody.h"
#include "qpid/log/Statement.h"
#include <boost/current_function.hpp>
@@ -54,7 +55,7 @@ namespace qpid {
namespace cluster {
using namespace framing;
-using namespace framing::cluster;
+using namespace framing::cluster_connection;
qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
@@ -444,5 +445,19 @@ void Connection::connectionError(const std::string& ) {
cluster.flagError(*this, ERROR_TYPE_CONNECTION);
}
+void Connection::errorCheck(uint8_t type, uint64_t frameSeq) {
+ // If we handle an errorCheck at this point (rather than in the
+ // ErrorCheck class) then we have processed succesfully past the
+ // point of the error so respond with ERROR_TYPE_NONE
+ if (type != ERROR_TYPE_NONE) { // Don't respond to NONE.
+ QPID_LOG(debug, cluster << " error " << frameSeq << " on " << *this
+ << " did not occur locally.");
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionErrorCheckBody(
+ ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), self);
+ }
+}
+
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 0b7c151e8a..7f75d1e3dd 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -152,6 +152,7 @@ class Connection :
void giveReadCredit(int credit);
void abort();
void deliverClose();
+ void errorCheck(uint8_t type, uint64_t frameSeq);
OutputInterceptor& getOutput() { return output; }
diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
index 87a7bb914b..9c2ba9c61a 100644
--- a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
+++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
@@ -22,7 +22,7 @@
#include "EventFrame.h"
#include "ClusterMap.h"
#include "Cluster.h"
-#include "qpid/framing/ClusterErrorCheckBody.h"
+#include "qpid/framing/ClusterConnectionErrorCheckBody.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
#include "qpid/log/Statement.h"
@@ -33,7 +33,7 @@ namespace cluster {
using namespace std;
using namespace framing;
-using namespace framing::cluster;
+using namespace framing::cluster_connection;
ErrorCheck::ErrorCheck(Cluster& c)
: cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0)
@@ -55,14 +55,16 @@ void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet
connection = &c;
QPID_LOG(debug, cluster << (type == ERROR_TYPE_SESSION ? " Session" : " Connection")
<< " error " << frameSeq << " unresolved: " << unresolved);
- mcast.mcastControl(ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId());
+ mcast.mcastControl(
+ ClusterConnectionErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId());
}
void ErrorCheck::delivered(const EventFrame& e) {
if (isUnresolved()) {
- const ClusterErrorCheckBody* errorCheck = 0;
+ const ClusterConnectionErrorCheckBody* errorCheck = 0;
if (e.frame.getBody())
- errorCheck = dynamic_cast<const ClusterErrorCheckBody*>(e.frame.getMethod());
+ errorCheck = dynamic_cast<const ClusterConnectionErrorCheckBody*>(
+ e.frame.getMethod());
if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
if (errorCheck->getType() < type) { // my error is worse than his
QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId());
diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.h b/qpid/cpp/src/qpid/cluster/ErrorCheck.h
index 97b5f2bffd..606a959447 100644
--- a/qpid/cpp/src/qpid/cluster/ErrorCheck.h
+++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.h
@@ -48,7 +48,7 @@ class ErrorCheck
{
public:
typedef std::set<MemberId> MemberSet;
- typedef framing::cluster::ErrorType ErrorType;
+ typedef framing::cluster_connection::ErrorType ErrorType;
ErrorCheck(Cluster&);
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 7ca3dc862f..8b1d47e56e 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -48,19 +48,6 @@
<field name="id" type="uint64"/>
</control>
- <domain name="error-type" type="uint8" label="Types of error">
- <enum>
- <choice name="none" value="0"/>
- <choice name="session" value="1"/>
- <choice name="connection" value="2"/>
- </enum>
- </domain>
-
- <control name="error-check" code="0x13">
- <field name="type" type="error-type"/>
- <field name="frame-seq" type="uint64"/>
- </control>
-
<control name="shutdown" code="0x20" label="Shut down entire cluster"/>
</class>
@@ -80,6 +67,20 @@
<field name="limit" type="uint32"/>
</control>
+ <domain name="error-type" type="uint8" label="Types of error">
+ <enum>
+ <choice name="none" value="0"/>
+ <choice name="session" value="1"/>
+ <choice name="connection" value="2"/>
+ </enum>
+ </domain>
+
+ <!-- Check for error consistency across the cluster -->
+ <control name="error-check" code="0x4">
+ <field name="type" type="error-type"/>
+ <field name="frame-seq" type="uint64"/>
+ </control>
+
<!-- Update controls. Sent to a new broker in joining mode.
A connection is updateed as followed:
- open as a normal connection.