diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 27 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.h | 2 |
8 files changed, 65 insertions, 13 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 0a40493350..7b40328f1c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -323,10 +323,20 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& state = NEWBIE; QPID_LOG(info, *this << " joining cluster: " << map); mcast.mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), myId); + ClusterMap::Set members = map.getAlive(); + members.erase(myId); + myElders = members; + broker.getLinks().setPassive(true); } } - else if (state >= READY && memberChange) + else if (state >= READY && memberChange) { memberUpdate(l); + myElders = ClusterMap::intersection(myElders, map.getAlive()); + if (myElders.empty()) { + //assume we are oldest, reactive links if necessary + broker.getLinks().setPassive(false); + } + } } @@ -496,6 +506,8 @@ void Cluster::memberUpdate(Lock& l) { } lastSize = size; + // + if (mgmtObject) { mgmtObject->set_clusterSize(size); string urlstr; diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index c93cd21876..8d235c7caf 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -184,6 +184,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { const size_t writeEstimate; framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; + ClusterMap::Set myElders; // Thread safe members Multicaster mcast; diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index 873f0be928..cc9ea29093 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -114,6 +114,10 @@ std::vector<Url> ClusterMap::memberUrls() const { return urls; } +ClusterMap::Set ClusterMap::getAlive() const { + return alive; +} + std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) { std::ostream_iterator<MemberId> oi(o); std::transform(m.begin(), m.end(), oi, boost::bind(&ClusterMap::Map::value_type::first, _1)); @@ -170,4 +174,13 @@ boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const MemberId& return boost::optional<Url>(); } +ClusterMap::Set ClusterMap::intersection(const ClusterMap::Set& a, const ClusterMap::Set& b) +{ + Set intersection; + std::set_intersection(a.begin(), a.end(), + b.begin(), b.end(), + std::inserter(intersection, intersection.begin())); + return intersection; + +} }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h index 5c1981269f..507fee9a72 100644 --- a/cpp/src/qpid/cluster/ClusterMap.h +++ b/cpp/src/qpid/cluster/ClusterMap.h @@ -76,6 +76,7 @@ class ClusterMap { size_t aliveCount() const { return alive.size(); } size_t memberCount() const { return members.size(); } std::vector<Url> memberUrls() const; + Set getAlive() const; bool dumpRequest(const MemberId& id, const std::string& url); /** Return non-empty Url if accepted */ @@ -84,6 +85,10 @@ class ClusterMap { /**@return true If this is a new member */ bool ready(const MemberId& id, const Url&); + /** + * Utility method to return intersection of two member sets + */ + static Set intersection(const Set& a, const Set& b); private: Url getUrl(const Map& map, const MemberId& id); diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 3988abd491..ac4b9dcdf2 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -62,14 +62,15 @@ NoOpConnectionOutputHandler Connection::discardHandler; Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, ConnectionId myId) : cluster(c), self(myId), catchUp(false), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), readCredit(0) + connection(&output, cluster.getBroker(), wrappedId), readCredit(0), expectProtocolHeader(false) { init(); } // Local connections Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& wrappedId, MemberId myId, bool isCatchUp) + const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink) : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), readCredit(0) + connection(&output, cluster.getBroker(), wrappedId, isLink), readCredit(0), + expectProtocolHeader(isLink) { init(); } void Connection::init() { @@ -213,7 +214,25 @@ size_t Connection::decode(const char* buffer, size_t size) { } else { // Multicast local connections. assert(isLocal()); - cluster.getMulticast().mcastBuffer(buffer, size, self); + const char* remainingData = buffer; + size_t remainingSize = size; + if (expectProtocolHeader) { + //If this is an outgoing link, we will receive a protocol + //header which needs to be decoded first + framing::ProtocolInitiation pi; + Buffer buf(const_cast<char*>(buffer), size); + if (pi.decode(buf)) { + //TODO: check the version is correct + QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")"); + expectProtocolHeader = false; + remainingData = buffer + pi.encodedSize(); + remainingSize = size - pi.encodedSize(); + } else { + QPID_LOG(debug, "Not enough data for protocol header on outgoing clustered link"); + return 0; + } + } + cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self); } return size; } diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index ec46d62cc2..5d46b7e81d 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -64,7 +64,7 @@ class Connection : typedef sys::PollableQueue<EventFrame> EventFrameQueue; /** Local connection, use this in ConnectionId */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp); + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp, bool isLink); /** Shadow connection */ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId); ~Connection(); @@ -172,6 +172,7 @@ class Connection : framing::ChannelId currentChannel; boost::shared_ptr<broker::TxBuffer> txBuffer; int readCredit; + bool expectProtocolHeader; friend std::ostream& operator<<(std::ostream&, const Connection&); }; diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp index 44e40f0591..28d2750ff9 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -38,21 +38,22 @@ using namespace framing; sys::ConnectionCodec* ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) { if (v == ProtocolVersion(0, 10)) - return new ConnectionCodec(out, id, cluster, false); + return new ConnectionCodec(out, id, cluster, false, false); else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) - return new ConnectionCodec(out, id, cluster, true); // Catch-up connection + return new ConnectionCodec(out, id, cluster, true, false); // Catch-up connection return 0; } // Used for outgoing Link connections, we don't care. sys::ConnectionCodec* ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) { - return next->create(out, id); + return new ConnectionCodec(out, id, cluster, false, true); + //return next->create(out, id); } -ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp) - : codec(out, id, false), - interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp)), +ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp, bool isLink) + : codec(out, id, isLink), + interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp, isLink)), id(interceptor->getId()), localId(id) { diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h index 86fac270fa..69c2b0c3c8 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.h +++ b/cpp/src/qpid/cluster/ConnectionCodec.h @@ -56,7 +56,7 @@ class ConnectionCodec : public sys::ConnectionCodec { sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id); }; - ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp); + ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp, bool isLink); ~ConnectionCodec(); // ConnectionCodec functions. |