summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp21
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp8
-rw-r--r--cpp/src/qpid/cluster/Connection.h9
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.cpp16
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.h9
5 files changed, 43 insertions, 20 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f5db692e92..3ced6263df 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -153,7 +153,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 820783;
+const uint32_t Cluster::CLUSTER_VERSION = 834052;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -250,7 +250,9 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
assert(c->getId().getMember() == self);
// Announce the connection to the cluster.
if (c->isLocalClient())
- mcast.mcastControl((ClusterConnectionAnnounceBody()), c->getId());
+ mcast.mcastControl(ClusterConnectionAnnounceBody(ProtocolVersion(),
+ c->getBrokerConnection().getSSF() ),
+ c->getId());
}
// Called in connection thread to insert an updated shadow connection.
@@ -344,7 +346,13 @@ const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
body->getMethod()->isA<ClusterUpdateOfferBody>()) ?
static_cast<const ClusterUpdateOfferBody*>(body) : 0;
}
-
+
+const ClusterConnectionAnnounceBody* castAnnounce( const framing::AMQBody *body) {
+ return (body && body->getMethod() &&
+ body->getMethod()->isA<ClusterConnectionAnnounceBody>()) ?
+ static_cast<const ClusterConnectionAnnounceBody*>(body) : 0;
+}
+
// Handler for deliverEventQueue.
// This thread decodes frames from events.
void Cluster::deliveredEvent(const Event& e) {
@@ -452,8 +460,13 @@ ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) {
}
else { // New remote connection, create a shadow.
std::ostringstream mgmtId;
+ unsigned int ssf;
+ const ClusterConnectionAnnounceBody *announce = castAnnounce(e.frame.getBody());
+
mgmtId << id;
- cp = new Connection(*this, shadowOut, mgmtId.str(), id);
+ ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0;
+ QPID_LOG(debug, *this << "new connection's ssf =" << ssf );
+ cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf );
}
connections.insert(ConnectionMap::value_type(id, cp));
}
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 841c3b610d..2eda84ae11 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -72,9 +72,10 @@ const std::string shadowPrefix("[shadow]");
// Shadow connection
-Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id)
+ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId,
+ const ConnectionId& id, unsigned int ssf)
: cluster(c), self(id), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), shadowPrefix+logId), expectProtocolHeader(false),
+ connection(&output, cluster.getBroker(), shadowPrefix+logId, ssf), expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self),
consumerNumbering(c.getUpdateReceiver().consumerNumbering)
{ init(); }
@@ -82,10 +83,11 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std:
// Local connection
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& logId, MemberId member,
- bool isCatchUp, bool isLink
+ bool isCatchUp, bool isLink, unsigned int ssf
) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
connection(&output, cluster.getBroker(),
isCatchUp ? shadowPrefix+logId : logId,
+ ssf,
isLink,
isCatchUp ? ++catchUpId : 0),
expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self),
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 2799cc9fe1..57cca865db 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -63,10 +63,13 @@ class Connection :
{
public:
+
/** Local connection. */
- Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink);
+ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink,
+ unsigned int ssf);
/** Shadow connection. */
- Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id);
+ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id,
+ unsigned int ssf);
~Connection();
ConnectionId getId() const { return self; }
@@ -155,7 +158,7 @@ class Connection :
void exchange(const std::string& encoded);
void giveReadCredit(int credit);
- void announce() {} // handled by Cluster.
+ void announce(uint32_t) {} // handled by Cluster.
void abort();
void deliverClose();
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp
index 4ff8b0a4a3..8f6f1d9ad5 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -36,25 +36,27 @@ namespace cluster {
using namespace framing;
sys::ConnectionCodec*
-ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
+ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
+ unsigned int ssf) {
if (v == ProtocolVersion(0, 10))
- return new ConnectionCodec(v, out, id, cluster, false, false);
+ return new ConnectionCodec(v, out, id, cluster, false, false, ssf);
else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection
- return new ConnectionCodec(v, out, id, cluster, true, false);
+ return new ConnectionCodec(v, out, id, cluster, true, false, ssf);
return 0;
}
// Used for outgoing Link connections
sys::ConnectionCodec*
-ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId) {
- return new ConnectionCodec(ProtocolVersion(0,10), out, logId, cluster, false, true);
+ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId,
+ unsigned int ssf) {
+ return new ConnectionCodec(ProtocolVersion(0,10), out, logId, cluster, false, true, ssf);
}
ConnectionCodec::ConnectionCodec(
const ProtocolVersion& v, sys::OutputControl& out,
- const std::string& logId, Cluster& cluster, bool catchUp, bool isLink
+ const std::string& logId, Cluster& cluster, bool catchUp, bool isLink, unsigned int ssf
) : codec(out, logId, isLink),
- interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink))
+ interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink, ssf))
{
std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
codec.setInputHandler(ih);
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h
index 4ff738b603..74cb3c507d 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.h
+++ b/cpp/src/qpid/cluster/ConnectionCodec.h
@@ -52,12 +52,15 @@ class ConnectionCodec : public sys::ConnectionCodec {
Cluster& cluster;
Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c)
: next(f), cluster(c) {}
- sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id);
- sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
+ sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id,
+ unsigned int conn_ssf);
+ sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id,
+ unsigned int conn_ssf);
};
ConnectionCodec(const framing::ProtocolVersion&, sys::OutputControl& out,
- const std::string& logId, Cluster& c, bool catchUp, bool isLink);
+ const std::string& logId, Cluster& c, bool catchUp, bool isLink,
+ unsigned int ssf);
~ConnectionCodec();
// ConnectionCodec functions.