summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp44
1 files changed, 33 insertions, 11 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index e7bec8633a..093ca13c7a 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -109,6 +109,8 @@
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterUpdateOfferBody.h"
#include "qpid/framing/ClusterUpdateRequestBody.h"
+#include "qpid/framing/ClusterConnectionAnnounceBody.h"
+#include "qpid/framing/ClusterErrorCheckBody.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
@@ -133,7 +135,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
using namespace qpid::cluster;
-using namespace qpid::framing::cluster_connection;
+using namespace qpid::framing::cluster;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
@@ -151,6 +153,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
+ void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); }
void shutdown() { cluster.shutdown(member, l); }
@@ -227,6 +230,10 @@ void Cluster::initialize() {
// Called in connection thread to insert a client connection.
void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
localConnections.insert(c);
+ assert(c->getId().getMember() == self);
+ // Announce the connection to the cluster.
+ if (c->isLocalClient())
+ mcast.mcastControl((ClusterConnectionAnnounceBody()), c->getId());
}
// Called in connection thread to insert an updated shadow connection.
@@ -388,7 +395,7 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) {
LATENCY_TRACK(LatencyScope ls(processLatency));
map.incrementFrameSeq();
QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
- ConnectionPtr connection = getConnection(e.connectionId, l);
+ ConnectionPtr connection = getConnection(e, l);
if (connection)
connection->deliveredFrame(e);
}
@@ -397,21 +404,24 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) {
}
// Called in deliverFrameQueue thread
-ConnectionPtr Cluster::getConnection(const ConnectionId& id, Lock&) {
- ConnectionPtr cp;
+ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) {
+ ConnectionId id = e.connectionId;
ConnectionMap::iterator i = connections.find(id);
- if (i != connections.end())
- cp = i->second;
- else {
- if(id.getMember() == self)
+ if (i != connections.end()) return i->second;
+ ConnectionPtr cp;
+ // If the frame is an announcement for a new connection, add it.
+ if (e.frame.getBody() && e.frame.getMethod() &&
+ e.frame.getMethod()->isA<ClusterConnectionAnnounceBody>())
+ {
+ if (id.getMember() == self) { // Announces one of my own
cp = localConnections.getErase(id);
- else {
- // New remote connection, create a shadow.
+ assert(cp);
+ }
+ else { // New remote connection, create a shadow.
std::ostringstream mgmtId;
mgmtId << id;
cp = new Connection(*this, shadowOut, mgmtId.str(), id);
}
- if (cp)
connections.insert(ConnectionMap::value_type(id, cp));
}
return cp;
@@ -764,4 +774,16 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
expiryPolicy->deliverExpire(id);
}
+void Cluster::errorCheck(const MemberId&, uint8_t type, uint64_t frameSeq, Lock&) {
+ // If we handle an errorCheck at this point (rather than in the
+ // ErrorCheck class) then we have processed succesfully past the
+ // point of the error.
+ if (state >= CATCHUP && type != ERROR_TYPE_NONE) {
+ QPID_LOG(debug, *this << " error " << frameSeq << " did not occur locally.");
+ mcast.mcastControl(
+ ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), self);
+ }
+}
+
+
}} // namespace qpid::cluster