summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp21
1 files changed, 17 insertions, 4 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f5db692e92..3ced6263df 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -153,7 +153,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 820783;
+const uint32_t Cluster::CLUSTER_VERSION = 834052;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -250,7 +250,9 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
assert(c->getId().getMember() == self);
// Announce the connection to the cluster.
if (c->isLocalClient())
- mcast.mcastControl((ClusterConnectionAnnounceBody()), c->getId());
+ mcast.mcastControl(ClusterConnectionAnnounceBody(ProtocolVersion(),
+ c->getBrokerConnection().getSSF() ),
+ c->getId());
}
// Called in connection thread to insert an updated shadow connection.
@@ -344,7 +346,13 @@ const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
body->getMethod()->isA<ClusterUpdateOfferBody>()) ?
static_cast<const ClusterUpdateOfferBody*>(body) : 0;
}
-
+
+const ClusterConnectionAnnounceBody* castAnnounce( const framing::AMQBody *body) {
+ return (body && body->getMethod() &&
+ body->getMethod()->isA<ClusterConnectionAnnounceBody>()) ?
+ static_cast<const ClusterConnectionAnnounceBody*>(body) : 0;
+}
+
// Handler for deliverEventQueue.
// This thread decodes frames from events.
void Cluster::deliveredEvent(const Event& e) {
@@ -452,8 +460,13 @@ ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) {
}
else { // New remote connection, create a shadow.
std::ostringstream mgmtId;
+ unsigned int ssf;
+ const ClusterConnectionAnnounceBody *announce = castAnnounce(e.frame.getBody());
+
mgmtId << id;
- cp = new Connection(*this, shadowOut, mgmtId.str(), id);
+ ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0;
+ QPID_LOG(debug, *this << "new connection's ssf =" << ssf );
+ cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf );
}
connections.insert(ConnectionMap::value_type(id, cp));
}