summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r--  cpp/src/qpid/cluster/Cluster.cpp            79
-rw-r--r--  cpp/src/qpid/cluster/Cluster.h               8
-rw-r--r--  cpp/src/qpid/cluster/ClusterMap.cpp          2
-rw-r--r--  cpp/src/qpid/cluster/ClusterMap.h            3
-rw-r--r--  cpp/src/qpid/cluster/Event.cpp              18
-rw-r--r--  cpp/src/qpid/cluster/Event.h                 4
-rw-r--r--  cpp/src/qpid/cluster/LockedConnectionMap.h   1
-rw-r--r--  cpp/src/qpid/cluster/UpdateClient.cpp        3
-rw-r--r--  cpp/xml/cluster.xml                          1
9 files changed, 81 insertions, 38 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 78f7bf13fc..467c960674 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -144,6 +144,13 @@ using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = ::qmf::org::apache::qpid::cluster;
+/** NOTE: increment this number whenever any incompatible changes in
+ * cluster protocol/behavior are made. It allows early detection and
+ * sensible reporting of an attempt to mix different versions in a
+ * cluster.
+ */
+const uint32_t Cluster::CLUSTER_VERSION = 1;
+
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
MemberId member;
@@ -153,7 +160,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
- void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
+ void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) { cluster.updateOffer(member, updatee, id, version, l); }
void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); }
@@ -233,6 +240,7 @@ void Cluster::initialize() {
// Called in connection thread to insert a client connection.
void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
+ QPID_LOG(debug, *this << " add local connection " << c->getId());
localConnections.insert(c);
assert(c->getId().getMember() == self);
// Announce the connection to the cluster.
@@ -242,11 +250,14 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
// Called in connection thread to insert an updated shadow connection.
void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
+ QPID_LOG(debug, *this << " add shadow connection " << c->getId());
// Safe to use connections here because we're pre-catchup, either
// discarding or stalled, so deliveredFrame is not processing any
// connection events.
assert(discarding);
- connections.insert(ConnectionMap::value_type(c->getId(), c));
+ pair<ConnectionMap::iterator, bool> ib
+ = connections.insert(ConnectionMap::value_type(c->getId(), c));
+ assert(ib.second);
}
void Cluster::erase(const ConnectionId& id) {
@@ -317,11 +328,11 @@ void Cluster::deliver(
}
LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");)
-LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");)
+ LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");)
-void Cluster::deliverEvent(const Event& e) {
+ void Cluster::deliverEvent(const Event& e) {
LATENCY_TRACK(eventQueueLatencyTracker.start(e.getData());)
- deliverEventQueue.push(e);
+ deliverEventQueue.push(e);
}
void Cluster::deliverFrame(const EventFrame& e) {
@@ -339,16 +350,21 @@ const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
// This thread decodes frames from events.
void Cluster::deliveredEvent(const Event& e) {
LATENCY_TRACK(eventQueueLatencyTracker.finish(e.getData()));
- QPID_LOG(trace, *this << " DLVR: " << e);
if (e.isCluster()) {
+ QPID_LOG(trace, *this << " DLVR: " << e);
EventFrame ef(e, e.getFrame());
// Stop the deliverEventQueue on update offers.
// This preserves the connection decoder fragments for an update.
- if (castUpdateOffer(ef.frame.getBody()))
+ const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody());
+ if (offer) {
+ QPID_LOG(debug, *this << " stall for update offer from " << e.getMemberId()
+ << " to " << MemberId(offer->getUpdatee()));
deliverEventQueue.stop();
+ }
deliverFrame(ef);
}
else if(!discarding) {
+ QPID_LOG(trace, *this << " DLVR: " << e);
if (e.isControl())
deliverFrame(EventFrame(e, e.getFrame()));
else {
@@ -403,9 +419,8 @@ LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");)
while (error.canProcess()) // There is a frame ready to process.
processFrame(error.getNext(), l);
}
- else {
+ else
processFrame(e, l);
- }
}
@@ -447,7 +462,7 @@ ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) {
mgmtId << id;
cp = new Connection(*this, shadowOut, mgmtId.str(), id);
}
- connections.insert(ConnectionMap::value_type(id, cp));
+ connections.insert(ConnectionMap::value_type(id, cp));
}
return cp;
}
@@ -556,7 +571,8 @@ void Cluster::makeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
QPID_LOG(info, *this << " send update-offer to " << id);
- mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), self);
+ mcast.mcastControl(
+ ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId, CLUSTER_VERSION), self);
}
}
@@ -587,26 +603,29 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
}
}
-// Go back to normal processing after an offer that did not result in an update.
-void Cluster::cancelOffer(const MemberId& updatee, Lock& l) {
- QPID_LOG(info, *this << " cancelled offer to " << updatee);
- deliverEventQueue.start(); // Go back to normal processing
- setReady(l);
- makeOffer(map.firstJoiner(), l); // Maybe make another offer.
-}
-
-void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
+void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid,
+ uint32_t version, Lock& l) {
// NOTE: deliverEventQueue has been stopped at the update offer by
// deliveredEvent in case an update is required.
if (state == LEFT) return;
+ if (version != CLUSTER_VERSION) {
+ QPID_LOG(critical, *this << " incompatible cluster versions " <<
+ version << " != " << CLUSTER_VERSION);
+ leave(l);
+ return;
+ }
MemberId updatee(updateeInt);
boost::optional<Url> url = map.updateOffer(updater, updatee);
if (updater == self) {
assert(state == OFFER);
if (url) // My offer was first.
updateStart(updatee, *url, l);
- else // Another offer was first.
- cancelOffer(updatee, l);
+ else { // Another offer was first.
+ QPID_LOG(info, *this << " cancelled offer to " << updatee << " unstall");
+ setReady(l);
+ makeOffer(map.firstJoiner(), l); // Maybe make another offer.
+ deliverEventQueue.start(); // Go back to normal processing
+ }
}
else if (updatee == self && url) {
assert(state == JOINER);
@@ -615,8 +634,11 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
QPID_LOG(info, *this << " receiving update from " << updater);
checkUpdateIn(l);
}
- else
+ else {
+ QPID_LOG(debug,*this << " unstall, ignore update " << updater
+ << " to " << updatee);
deliverEventQueue.start(); // Not involved in update.
+ }
}
static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) {
@@ -629,21 +651,23 @@ static client::ConnectionSettings connectionSettings(const ClusterSettings& sett
void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) {
// An offer was received while handling an error, and converted to a retract.
+ // Behavior is very similar to updateOffer.
if (state == LEFT) return;
MemberId updatee(updateeInt);
boost::optional<Url> url = map.updateOffer(updater, updatee);
if (updater == self) {
assert(state == OFFER);
if (url) { // My offer was first.
- QPID_LOG(info, *this << " retracted offer to " << updatee);
+ QPID_LOG(info, *this << " retracting offer to " << updatee);
if (updateThread.id())
updateThread.join(); // Join the previous updateThread to avoid leaks.
updateThread = Thread(new RetractClient(*url, connectionSettings(settings)));
}
- cancelOffer(updatee, l);
+ setReady(l);
+ makeOffer(map.firstJoiner(), l); // Maybe make another offer.
+ // Don't unstall the event queue, that was already done in deliveredFrame
}
- else
- deliverEventQueue.start(); // Not involved in update.
+ QPID_LOG(debug,*this << " retracted offer " << updater << " to " << updatee);
}
void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
@@ -672,6 +696,7 @@ void Cluster::updateInDone(const ClusterMap& m) {
void Cluster::updateInRetracted() {
Lock l(lock);
updateRetracted = true;
+ map.clearStatus();
checkUpdateIn(l);
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index a89bd83ac0..170b465ff3 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -108,7 +108,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Called in deliverFrame thread to indicate an error from the broker.
void flagError(Connection&, ErrorCheck::ErrorType, const std::string& msg);
- void connectionError();
// Called only during update by Connection::shadowReady
Decoder& getDecoder() { return decoder; }
@@ -122,6 +121,9 @@ class Cluster : private Cpg::Handler, public management::Manageable {
typedef PollableQueue<EventFrame> PollableFrameQueue;
typedef std::map<ConnectionId, ConnectionPtr> ConnectionMap;
+ /** Version number of the cluster protocol, to avoid mixed versions. */
+ static const uint32_t CLUSTER_VERSION;
+
// NB: A dummy Lock& parameter marks functions that must only be
// called with Cluster::lock locked.
@@ -141,7 +143,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Cluster controls implement XML methods from cluster.xml.
void updateRequest(const MemberId&, const std::string&, Lock&);
- void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
+ void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&,
+ uint32_t version, Lock&);
void retractOffer(const MemberId& updater, uint64_t updatee, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& current, Lock& l);
@@ -159,7 +162,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void memberUpdate(Lock&);
void setClusterId(const framing::Uuid&, Lock&);
void erase(const ConnectionId&, Lock&);
- void cancelOffer(const MemberId&, Lock&);
// == Called in CPG dispatch thread
void deliver( // CPG deliver callback.
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index 0395ff6382..e8c421d4eb 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -158,8 +158,6 @@ bool ClusterMap::ready(const MemberId& id, const Url& url) {
bool ClusterMap::configChange(const std::string& addresses) {
bool memberChange = false;
Set update = decode(addresses);
- for (std::string::const_iterator i = addresses.begin(); i < addresses.end(); i += 8)
- update.insert(MemberId(std::string(i, i+8)));
Set removed;
std::set_difference(alive.begin(), alive.end(),
update.begin(), update.end(),
diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h
index 3359c7c1f3..7e42ed1c19 100644
--- a/cpp/src/qpid/cluster/ClusterMap.h
+++ b/cpp/src/qpid/cluster/ClusterMap.h
@@ -95,6 +95,9 @@ class ClusterMap {
uint64_t getFrameSeq() { return frameSeq; }
uint64_t incrementFrameSeq() { return ++frameSeq; }
+ /** Clear out all knowledge of joiners & members, just keep alive set */
+ void clearStatus() { joiners.clear(); members.clear(); }
+
private:
Url getUrl(const Map& map, const MemberId& id);
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index 52ea84b02b..ae62994e88 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -113,11 +113,12 @@ Event::operator Buffer() const {
return Buffer(const_cast<char*>(getData()), getSize());
}
-AMQFrame Event::getFrame() const {
+const AMQFrame& Event::getFrame() const {
assert(type == CONTROL);
+ if (!frame.getBody()) {
Buffer buf(*this);
- AMQFrame frame;
QPID_ASSERT(frame.decode(buf));
+ }
return frame;
}
@@ -128,8 +129,17 @@ std::ostream& operator << (std::ostream& o, EventType t) {
}
std::ostream& operator << (std::ostream& o, const EventHeader& e) {
- o << "Event[" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]";
- return o;
+ return o << "Event[" << e.getConnectionId() << " " << e.getType()
+ << " " << e.getSize() << " bytes]";
+}
+
+std::ostream& operator << (std::ostream& o, const Event& e) {
+ o << "Event[" << e.getConnectionId() << " ";
+ if (e.getType() == CONTROL)
+ o << e.getFrame();
+ else
+ o << " data " << e.getSize() << " bytes";
+ return o << "]";
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
index 76ba88a87f..3175dd9ed2 100644
--- a/cpp/src/qpid/cluster/Event.h
+++ b/cpp/src/qpid/cluster/Event.h
@@ -95,7 +95,7 @@ class Event : public EventHeader {
char* getStore() { return store; }
const char* getStore() const { return store; }
- framing::AMQFrame getFrame() const;
+ const framing::AMQFrame& getFrame() const;
operator framing::Buffer() const;
@@ -105,8 +105,10 @@ class Event : public EventHeader {
void encodeHeader() const;
RefCountedBuffer::pointer store;
+ mutable framing::AMQFrame frame;
};
+std::ostream& operator << (std::ostream&, const Event&);
std::ostream& operator << (std::ostream&, const EventHeader&);
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/LockedConnectionMap.h b/cpp/src/qpid/cluster/LockedConnectionMap.h
index 4df742d6c2..f4f1d7e832 100644
--- a/cpp/src/qpid/cluster/LockedConnectionMap.h
+++ b/cpp/src/qpid/cluster/LockedConnectionMap.h
@@ -37,6 +37,7 @@ class LockedConnectionMap
public:
void insert(const ConnectionPtr& c) {
sys::Mutex::ScopedLock l(lock);
+ assert(map.find(c->getId()) == map.end());
map[c->getId()] = c;
}
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index bad56de826..a8ab105395 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -146,7 +146,8 @@ void UpdateClient::update() {
client::ConnectionAccess::getImpl(connection)->handle(frame);
connection.close();
- QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl);
+ QPID_LOG(debug, updaterId << " update completed to " << updateeId
+ << " at " << updateeUrl << ": " << membership);
}
namespace {
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index 474c191b0b..ab66179a05 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -36,6 +36,7 @@
<control name = "update-offer" code="0x2">
<field name="updatee" type="uint64"/>
<field name="cluster-id" type="uuid"/>
+ <field name="version" type="uint32"/>
</control>
<!-- Sender retracts an offer to a new joiner. -->