diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 44 |
1 files changed, 33 insertions, 11 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index e7bec8633a..093ca13c7a 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -109,6 +109,8 @@ #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterUpdateOfferBody.h" #include "qpid/framing/ClusterUpdateRequestBody.h" +#include "qpid/framing/ClusterConnectionAnnounceBody.h" +#include "qpid/framing/ClusterErrorCheckBody.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" @@ -133,7 +135,7 @@ using namespace qpid::framing; using namespace qpid::sys; using namespace std; using namespace qpid::cluster; -using namespace qpid::framing::cluster_connection; +using namespace qpid::framing::cluster; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; @@ -151,6 +153,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void configChange(const std::string& current) { cluster.configChange(member, current, l); } void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } + void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } void shutdown() { cluster.shutdown(member, l); } @@ -227,6 +230,10 @@ void Cluster::initialize() { // Called in connection thread to insert a client connection. void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { localConnections.insert(c); + assert(c->getId().getMember() == self); + // Announce the connection to the cluster. + if (c->isLocalClient()) + mcast.mcastControl((ClusterConnectionAnnounceBody()), c->getId()); } // Called in connection thread to insert an updated shadow connection. @@ -388,7 +395,7 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) { LATENCY_TRACK(LatencyScope ls(processLatency)); map.incrementFrameSeq(); QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); - ConnectionPtr connection = getConnection(e.connectionId, l); + ConnectionPtr connection = getConnection(e, l); if (connection) connection->deliveredFrame(e); } @@ -397,21 +404,24 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) { } // Called in deliverFrameQueue thread -ConnectionPtr Cluster::getConnection(const ConnectionId& id, Lock&) { - ConnectionPtr cp; +ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) { + ConnectionId id = e.connectionId; ConnectionMap::iterator i = connections.find(id); - if (i != connections.end()) - cp = i->second; - else { - if(id.getMember() == self) + if (i != connections.end()) return i->second; + ConnectionPtr cp; + // If the frame is an announcement for a new connection, add it. + if (e.frame.getBody() && e.frame.getMethod() && + e.frame.getMethod()->isA<ClusterConnectionAnnounceBody>()) + { + if (id.getMember() == self) { // Announces one of my own cp = localConnections.getErase(id); - else { - // New remote connection, create a shadow. + assert(cp); + } + else { // New remote connection, create a shadow. std::ostringstream mgmtId; mgmtId << id; cp = new Connection(*this, shadowOut, mgmtId.str(), id); } - if (cp) connections.insert(ConnectionMap::value_type(id, cp)); } return cp; @@ -764,4 +774,16 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { expiryPolicy->deliverExpire(id); } +void Cluster::errorCheck(const MemberId&, uint8_t type, uint64_t frameSeq, Lock&) { + // If we handle an errorCheck at this point (rather than in the + // ErrorCheck class) then we have processed succesfully past the + // point of the error. + if (state >= CATCHUP && type != ERROR_TYPE_NONE) { + QPID_LOG(debug, *this << " error " << frameSeq << " did not occur locally."); + mcast.mcastControl( + ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), self); + } +} + + }} // namespace qpid::cluster |