diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.h | 9 |
5 files changed, 43 insertions, 20 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f5db692e92..3ced6263df 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -153,7 +153,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 820783; +const uint32_t Cluster::CLUSTER_VERSION = 834052; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -250,7 +250,9 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { assert(c->getId().getMember() == self); // Announce the connection to the cluster. if (c->isLocalClient()) - mcast.mcastControl((ClusterConnectionAnnounceBody()), c->getId()); + mcast.mcastControl(ClusterConnectionAnnounceBody(ProtocolVersion(), + c->getBrokerConnection().getSSF() ), + c->getId()); } // Called in connection thread to insert an updated shadow connection. @@ -344,7 +346,13 @@ const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { body->getMethod()->isA<ClusterUpdateOfferBody>()) ? static_cast<const ClusterUpdateOfferBody*>(body) : 0; } - + +const ClusterConnectionAnnounceBody* castAnnounce( const framing::AMQBody *body) { + return (body && body->getMethod() && + body->getMethod()->isA<ClusterConnectionAnnounceBody>()) ? + static_cast<const ClusterConnectionAnnounceBody*>(body) : 0; +} + // Handler for deliverEventQueue. // This thread decodes frames from events. void Cluster::deliveredEvent(const Event& e) { @@ -452,8 +460,13 @@ ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) { } else { // New remote connection, create a shadow. std::ostringstream mgmtId; + unsigned int ssf; + const ClusterConnectionAnnounceBody *announce = castAnnounce(e.frame.getBody()); + mgmtId << id; - cp = new Connection(*this, shadowOut, mgmtId.str(), id); + ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0; + QPID_LOG(debug, *this << "new connection's ssf =" << ssf ); + cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf ); } connections.insert(ConnectionMap::value_type(id, cp)); } diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 841c3b610d..2eda84ae11 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -72,9 +72,10 @@ const std::string shadowPrefix("[shadow]"); // Shadow connection -Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id) + Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, + const ConnectionId& id, unsigned int ssf) : cluster(c), self(id), catchUp(false), output(*this, out), - connection(&output, cluster.getBroker(), shadowPrefix+logId), expectProtocolHeader(false), + connection(&output, cluster.getBroker(), shadowPrefix+logId, ssf), expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self), consumerNumbering(c.getUpdateReceiver().consumerNumbering) { init(); } @@ -82,10 +83,11 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std: // Local connection Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId member, - bool isCatchUp, bool isLink + bool isCatchUp, bool isLink, unsigned int ssf ) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out), connection(&output, cluster.getBroker(), isCatchUp ? shadowPrefix+logId : logId, + ssf, isLink, isCatchUp ? ++catchUpId : 0), expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 2799cc9fe1..57cca865db 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -63,10 +63,13 @@ class Connection : { public: + /** Local connection. */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink); + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink, + unsigned int ssf); /** Shadow connection. */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id); + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id, + unsigned int ssf); ~Connection(); ConnectionId getId() const { return self; } @@ -155,7 +158,7 @@ class Connection : void exchange(const std::string& encoded); void giveReadCredit(int credit); - void announce() {} // handled by Cluster. + void announce(uint32_t) {} // handled by Cluster. void abort(); void deliverClose(); diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp index 4ff8b0a4a3..8f6f1d9ad5 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -36,25 +36,27 @@ namespace cluster { using namespace framing; sys::ConnectionCodec* -ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) { +ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, + unsigned int ssf) { if (v == ProtocolVersion(0, 10)) - return new ConnectionCodec(v, out, id, cluster, false, false); + return new ConnectionCodec(v, out, id, cluster, false, false, ssf); else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection - return new ConnectionCodec(v, out, id, cluster, true, false); + return new ConnectionCodec(v, out, id, cluster, true, false, ssf); return 0; } // Used for outgoing Link connections sys::ConnectionCodec* -ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId) { - return new ConnectionCodec(ProtocolVersion(0,10), out, logId, cluster, false, true); +ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId, + unsigned int ssf) { + return new ConnectionCodec(ProtocolVersion(0,10), out, logId, cluster, false, true, ssf); } ConnectionCodec::ConnectionCodec( const ProtocolVersion& v, sys::OutputControl& out, - const std::string& logId, Cluster& cluster, bool catchUp, bool isLink + const std::string& logId, Cluster& cluster, bool catchUp, bool isLink, unsigned int ssf ) : codec(out, logId, isLink), - interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink)) + interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink, ssf)) { std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor)); codec.setInputHandler(ih); diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h index 4ff738b603..74cb3c507d 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.h +++ b/cpp/src/qpid/cluster/ConnectionCodec.h @@ -52,12 +52,15 @@ class ConnectionCodec : public sys::ConnectionCodec { Cluster& cluster; Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c) : next(f), cluster(c) {} - sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id); - sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id); + sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id, + unsigned int conn_ssf); + sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id, + unsigned int conn_ssf); }; ConnectionCodec(const framing::ProtocolVersion&, sys::OutputControl& out, - const std::string& logId, Cluster& c, bool catchUp, bool isLink); + const std::string& logId, Cluster& c, bool catchUp, bool isLink, + unsigned int ssf); ~ConnectionCodec(); // ConnectionCodec functions. |