diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 21 |
1 files changed, 17 insertions, 4 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f5db692e92..3ced6263df 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -153,7 +153,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 820783; +const uint32_t Cluster::CLUSTER_VERSION = 834052; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -250,7 +250,9 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { assert(c->getId().getMember() == self); // Announce the connection to the cluster. if (c->isLocalClient()) - mcast.mcastControl((ClusterConnectionAnnounceBody()), c->getId()); + mcast.mcastControl(ClusterConnectionAnnounceBody(ProtocolVersion(), + c->getBrokerConnection().getSSF() ), + c->getId()); } // Called in connection thread to insert an updated shadow connection. @@ -344,7 +346,13 @@ const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { body->getMethod()->isA<ClusterUpdateOfferBody>()) ? static_cast<const ClusterUpdateOfferBody*>(body) : 0; } - + +const ClusterConnectionAnnounceBody* castAnnounce( const framing::AMQBody *body) { + return (body && body->getMethod() && + body->getMethod()->isA<ClusterConnectionAnnounceBody>()) ? + static_cast<const ClusterConnectionAnnounceBody*>(body) : 0; +} + // Handler for deliverEventQueue. // This thread decodes frames from events. void Cluster::deliveredEvent(const Event& e) { @@ -452,8 +460,13 @@ ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) { } else { // New remote connection, create a shadow. std::ostringstream mgmtId; + unsigned int ssf; + const ClusterConnectionAnnounceBody *announce = castAnnounce(e.frame.getBody()); + mgmtId << id; - cp = new Connection(*this, shadowOut, mgmtId.str(), id); + ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0; + QPID_LOG(debug, *this << "new connection's ssf =" << ssf ); + cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf ); } connections.insert(ConnectionMap::value_type(id, cp)); } |