summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-03 20:56:38 +0000
committerAlan Conway <aconway@apache.org>2008-10-03 20:56:38 +0000
commitff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022 (patch)
treed8109d15ce3a85a9b6175ba2c9b3c51d8706fe9c /cpp/src
parent2141967346b884e592a42353ae596d37eb90fe7b (diff)
downloadqpid-python-ff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022.tar.gz
Cluster join & brain-dumps working.
cluster: improved join protocol, fixed race conditions. client/ConnectionHandler,ConnectionImpl: fixed connection close race causing client hang. src/qpid/sys/PollableQueue.h: fixed incorrect use of startWatch/stopWatch. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@701532 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/cluster.mk8
-rw-r--r--cpp/src/qpid/broker/Connection.cpp1
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp4
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.h1
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp8
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp478
-rw-r--r--cpp/src/qpid/cluster/Cluster.h171
-rw-r--r--cpp/src/qpid/cluster/ClusterHandler.cpp62
-rw-r--r--cpp/src/qpid/cluster/ClusterHandler.h70
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.cpp137
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.h66
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp20
-rw-r--r--cpp/src/qpid/cluster/Connection.h4
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.cpp9
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.h1
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp43
-rw-r--r--cpp/src/qpid/cluster/DumpClient.h13
-rw-r--r--cpp/src/qpid/cluster/Event.cpp11
-rw-r--r--cpp/src/qpid/cluster/Event.h6
-rw-r--r--cpp/src/qpid/cluster/JoiningHandler.cpp124
-rw-r--r--cpp/src/qpid/cluster/JoiningHandler.h60
-rw-r--r--cpp/src/qpid/cluster/MemberHandler.cpp96
-rw-r--r--cpp/src/qpid/cluster/MemberHandler.h63
-rw-r--r--cpp/src/qpid/cluster/types.h7
-rw-r--r--cpp/src/qpid/framing/FieldTable.h3
-rw-r--r--cpp/src/qpid/sys/LockPtr.h89
-rw-r--r--cpp/src/qpid/sys/PollableQueue.h99
-rw-r--r--cpp/src/tests/cluster_test.cpp31
29 files changed, 789 insertions, 897 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index c489af901b..f9c6e74bd8 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -595,6 +595,7 @@ nobase_include_HEADERS = \
qpid/sys/Dispatcher.h \
qpid/sys/IntegerTypes.h \
qpid/sys/IOHandle.h \
+ qpid/sys/LockPtr.h \
qpid/sys/Monitor.h \
qpid/sys/Mutex.h \
qpid/sys/OutputControl.h \
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index 8060e49b97..443db3fb15 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -28,13 +28,7 @@ cluster_la_SOURCES = \
qpid/cluster/DumpClient.h \
qpid/cluster/DumpClient.cpp \
qpid/cluster/ClusterMap.h \
- qpid/cluster/ClusterMap.cpp \
- qpid/cluster/ClusterHandler.h \
- qpid/cluster/ClusterHandler.cpp \
- qpid/cluster/JoiningHandler.h \
- qpid/cluster/JoiningHandler.cpp \
- qpid/cluster/MemberHandler.h \
- qpid/cluster/MemberHandler.cpp
+ qpid/cluster/ClusterMap.cpp
cluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 320efc42f1..5d4ebad4b9 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -167,6 +167,7 @@ void Connection::setFederationLink(bool b)
void Connection::close(
ReplyCode code, const string& text, ClassId classId, MethodId methodId)
{
+ QPID_LOG_IF(error, code != 200, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")");
adapter.close(code, text, classId, methodId);
channels.clear();
getOutput().close();
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index 0321c2e6aa..7f1cd5ce7f 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -198,5 +198,7 @@ bool ConnectionHandler::isOpen() const
bool ConnectionHandler::isClosed() const
{
int s = getState();
- return s == CLOSING || s == CLOSED || s == FAILED;
+ return s == CLOSED || s == FAILED;
}
+
+bool ConnectionHandler::isClosing() const { return getState() == CLOSING; }
diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h
index ffb612fae8..40be3a5237 100644
--- a/cpp/src/qpid/client/ConnectionHandler.h
+++ b/cpp/src/qpid/client/ConnectionHandler.h
@@ -99,6 +99,7 @@ public:
// Note that open and closed aren't related by open = !closed
bool isOpen() const;
bool isClosed() const;
+ bool isClosing() const;
CloseListener onClose;
ErrorListener onError;
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index f180c4f23e..d9ac65c1b3 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -143,11 +143,13 @@ static const std::string CONN_CLOSED("Connection closed by broker");
void ConnectionImpl::shutdown() {
Mutex::ScopedLock l(lock);
- // FIXME aconway 2008-06-06: exception use, connection-forced is incorrect here.
- setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
if (handler.isClosed()) return;
+ // FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have
+ // an appropriate close-code. connection-forced is not right.
+ if (!handler.isClosing())
+ closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
+ setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
handler.fail(CONN_CLOSED);
- closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
}
void ConnectionImpl::erase(uint16_t ch) {
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index b48443526c..9c503d6d13 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -18,19 +18,24 @@
#include "Cluster.h"
#include "Connection.h"
+#include "DumpClient.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterDumpRequestBody.h"
-#include "qpid/framing/ClusterUpdateBody.h"
#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterDumpOfferBody.h"
+#include "qpid/framing/ClusterDumpStartBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Thread.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
#include "qmf/org/apache/qpid/cluster/Package.h"
@@ -55,156 +60,221 @@ using qpid::management::Manageable;
using qpid::management::Args;
namespace qmf = qmf::org::apache::qpid::cluster;
+/**@file
+ Threading notes:
+ - Public functions may be called in local connection IO threads.
+ See Cluster.h.
+*/
+
+struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
+ qpid::cluster::Cluster& cluster;
+ MemberId member;
+ Cluster::Lock& l;
+ ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {}
+
+ void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url, l); }
+ void ready(const std::string& url) { cluster.ready(member, url, l); }
+ void dumpOffer(uint64_t dumpee) { cluster.dumpOffer(member, dumpee, l); }
+ void dumpStart(uint64_t dumpee, const std::string& url) { cluster.dumpStart(member, dumpee, url, l); }
+ void shutdown() { cluster.shutdown(member, l); }
+
+ bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
+};
+
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
broker(b),
poller(b.getPoller()),
cpg(*this),
name(name_),
- url(url_),
- self(cpg.self()),
- cpgDispatchHandle(cpg,
- boost::bind(&Cluster::dispatch, this, _1), // read
- 0, // write
- boost::bind(&Cluster::disconnect, this, _1) // disconnect
+ myUrl(url_),
+ memberId(cpg.self()),
+ cpgDispatchHandle(
+ cpg,
+ boost::bind(&Cluster::dispatch, this, _1), // read
+ 0, // write
+ boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
- connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
+ deliverQueue(boost::bind(&Cluster::process, this, _1), poller),
+ mcastId(0),
mgmtObject(0),
- handler(&joiningHandler),
- joiningHandler(*this),
- memberHandler(*this),
- mcastId(),
- lastSize(1)
+ state(INIT),
+ lastSize(1)
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
qmf::Package packageInit(agent);
- mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),url.str());
+ mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),myUrl.str());
agent->addObject (mgmtObject);
mgmtObject->set_status("JOINING");
-
// FIXME aconway 2008-09-24:
// if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
}
- QPID_LOG(notice, self << " joining cluster " << name.str());
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
cpgDispatchHandle.startWatch(poller);
cpg.join(name);
+ QPID_LOG(notice, *this << " joining cluster " << name.str());
}
-Cluster::~Cluster() {}
+Cluster::~Cluster() {
+ if (dumpThread.id()) dumpThread.join(); // Join the previous dump thread.
+}
void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(lock);
+ Lock l(lock);
+ assert(!c->isCatchUp());
connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
}
-void Cluster::dumpComplete() { handler->dumpComplete(); }
-
void Cluster::erase(ConnectionId id) {
- Mutex::ScopedLock l(lock);
+ Lock l(lock);
connections.erase(id);
}
-void Cluster::leave() {
- QPID_LOG(notice, self << " leaving cluster " << name.str());
- cpg.leave(name);
- // Defer shut down to the final configChange when the group knows we've left.
+void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) {
+ Lock l(lock);
+ mcastControl(body, cptr, l);
}
-void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) {
- AMQFrame f(body);
- Event e(CONTROL, ConnectionId(self, cptr), f.size(), ++mcastId);
- Buffer buf(e);
- f.encode(buf);
- QPID_LOG(trace, "MCAST " << e << " " << body);
- mcastEvent(e);
+void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr, Lock&) {
+ Lock l(lock);
+ Event e(Event::control(body, ConnectionId(memberId, cptr), ++mcastId));
+ QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
+ mcast(e, l);
}
void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id) {
+ Lock l(lock);
+ mcastBuffer(data, size, connection, id, l);
+}
+
+void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id, Lock&) {
+ Lock l(lock);
Event e(DATA, connection, size, id);
memcpy(e.getData(), data, size);
- QPID_LOG(trace, "MCAST " << e);
- mcastEvent(e);
+ QPID_LOG(trace, *this << " MCAST " << e);
+ mcast(e, l);
}
-void Cluster::mcastEvent(const Event& e) {
- e.mcast(name, cpg);
-}
+void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); }
-size_t Cluster::size() const {
- Mutex::ScopedLock l(lock);
- return map.size();
+void Cluster::mcast(const Event& e, Lock&) {
+ if (state == LEFT) return;
+ if (state < READY && e.isConnection()) {
+ // Stall outgoing connection events.
+ QPID_LOG(trace, *this << " MCAST deferred: " << e );
+ mcastQueue.push_back(e);
+ }
+ else
+ e.mcast(name, cpg);
}
std::vector<Url> Cluster::getUrls() const {
- Mutex::ScopedLock l(lock);
+ Lock l(lock);
+ return getUrls(l);
+}
+
+std::vector<Url> Cluster::getUrls(Lock&) const {
return map.memberUrls();
}
-// FIXME aconway 2008-09-15: volatile for locked/unlocked functions.
-// Check locking from Handler functions.
-boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) {
- Mutex::ScopedLock l(lock);
- if (id.getMember() == self)
- return boost::intrusive_ptr<Connection>(id.getConnectionPtr());
- ConnectionMap::iterator i = connections.find(id);
+void Cluster::leave() {
+ Lock l(lock);
+ leave(l);
+}
+
+void Cluster::leave(Lock&) {
+ if (state != LEFT) {
+ state = LEFT;
+ QPID_LOG(notice, *this << " leaving cluster " << name.str());
+
+ if (!deliverQueue.isStopped()) deliverQueue.stop();
+ if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
+ try { cpg.leave(name); }
+ catch (const std::exception& e) {
+ QPID_LOG(critical, *this << " error leaving process group: " << e.what());
+ }
+ try { broker.shutdown(); }
+ catch (const std::exception& e) {
+ QPID_LOG(critical, *this << " error during shutdown, aborting: " << e.what());
+ abort(); // Big trouble.
+ }
+ }
+}
+
+boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&) {
+ if (connectionId.getMember() == memberId)
+ return boost::intrusive_ptr<Connection>(connectionId.getPointer());
+ ConnectionMap::iterator i = connections.find(connectionId);
if (i == connections.end()) { // New shadow connection.
- assert(id.getMember() != self);
+ assert(connectionId.getMember() != memberId);
std::ostringstream mgmtId;
- mgmtId << name.str() << ":" << id;
- ConnectionMap::value_type value(id, new Connection(*this, shadowOut, mgmtId.str(), id));
+ mgmtId << name.str() << ":" << connectionId;
+ ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId));
i = connections.insert(value).first;
}
return i->second;
}
-void Cluster::deliver(
+Cluster::Connections Cluster::getConnections(Lock&) {
+ Connections result(connections.size());
+ std::transform(connections.begin(), connections.end(), result.begin(),
+ boost::bind(&ConnectionMap::value_type::second, _1));
+ return result;
+}
+
+void Cluster::deliver(
cpg_handle_t /*handle*/,
cpg_name* /*group*/,
uint32_t nodeid,
uint32_t pid,
void* msg,
- int msg_len)
+ int msg_len)
{
- try {
- MemberId from(nodeid, pid);
- Event e = Event::delivered(from, msg, msg_len);
+ Mutex::ScopedLock l(lock);
+ MemberId from(nodeid, pid);
+ Event e = Event::delivered(from, msg, msg_len);
+ if (state == LEFT) return;
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ if (e.isCluster() && state != DUMPEE) // Process cluster controls immediately unless in DUMPEE state.
+ process(e, l);
+ else if (state != NEWBIE) // Newbie discards events up to the dump offer.
+ deliverQueue.push(e);
+}
+
+void Cluster::process(const Event& e) {
+ Lock l(lock);
+ process(e,l);
+}
- // Process cluster controls immediately
- if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control
- Buffer buf(e);
- AMQFrame frame;
+void Cluster::process(const Event& e, Lock& l) {
+ try {
+ Buffer buf(e);
+ AMQFrame frame;
+ if (e.isCluster()) {
while (frame.decode(buf)) {
- QPID_LOG(trace, "DLVR " << e << " " << frame);
- if (!handler->invoke(e.getConnectionId().getMember(), frame))
+ QPID_LOG(trace, *this << " PROC: " << e << " " << frame);
+ ClusterDispatcher dispatch(*this, e.getMemberId(), l);
+ if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
throw Exception(QPID_MSG("Invalid cluster control"));
}
}
- else {
- QPID_LOG(trace, "DLVR" << (connectionEventQueue.isStopped() ? "(stalled)" : "") << " " << e);
- handler->deliver(e);
+ else { // e.isConnection()
+ boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l);
+ if (e.getType() == DATA) {
+ QPID_LOG(trace, *this << " PROC: " << e);
+ connection->deliverBuffer(buf);
+ }
+ else { // control
+ while (frame.decode(buf)) {
+ QPID_LOG(trace, *this << " PROC: " << e << " " << frame);
+ connection->delivered(frame);
+ }
+ }
}
}
catch (const std::exception& e) {
- QPID_LOG(critical, "Error in cluster deliver: " << e.what());
- leave();
- }
-}
-
-void Cluster::connectionEvent(const Event& e) {
- Buffer buf(e);
- boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId());
- assert(connection);
- if (e.getType() == DATA) {
- QPID_LOG(trace, "EXEC: " << e);
- connection->deliverBuffer(buf);
- }
- else { // control
- AMQFrame frame;
- while (frame.decode(buf)) {
- QPID_LOG(trace, "EXEC " << e << " " << frame);
- connection->delivered(frame);
- }
+ QPID_LOG(critical, *this << " error in cluster process: " << e.what());
+ leave(l);
}
}
@@ -236,16 +306,22 @@ ostream& operator<<(ostream& o, const AddrList& a) {
}
void Cluster::dispatch(sys::DispatchHandle& h) {
- cpg.dispatchAll();
- h.rewatch();
+ try {
+ cpg.dispatchAll();
+ h.rewatch();
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(critical, *this << " error in cluster deliver: " << e.what());
+ leave();
+ }
}
void Cluster::disconnect(sys::DispatchHandle& ) {
- QPID_LOG(critical, self << " unexpectedly disconnected from cluster, shutting down");
+ QPID_LOG(critical, *this << " disconnected from cluster, shutting down");
broker.shutdown();
}
-void Cluster::configChange(
+void Cluster::configChange (
cpg_handle_t /*handle*/,
cpg_name */*group*/,
cpg_address *current, int nCurrent,
@@ -253,49 +329,57 @@ void Cluster::configChange(
cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- QPID_LOG(debug, "Process members: " << AddrList(current, nCurrent)
+ QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "( ", ")"));
-
- if (find(left, left+nLeft, self) != left+nLeft) {
- // I have left the group, this is the final config change.
- QPID_LOG(notice, self << " left cluster " << name.str());
- broker.shutdown();
- return;
+ map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
+ updateMemberStats(l);
+ if (state == LEFT) return;
+ if (!map.isAlive(memberId)) { leave(l); return; }
+
+ if(state == INIT) { // First configChange
+ if (map.aliveCount() == 1) {
+ QPID_LOG(info, *this << " first in cluster at " << myUrl);
+ map = ClusterMap(memberId, myUrl, true);
+ unstall(l);
+ }
+ else { // Joining established group.
+ state = NEWBIE;
+ mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), 0, l);
+ QPID_LOG(debug, *this << " send dump-request " << myUrl);
+ }
}
-
- if (map.left(left, nLeft)) updateMemberStats();
- handler->configChange(current, nCurrent, left, nLeft, joined, nJoined);
}
-
-broker::Broker& Cluster::getBroker(){ return broker; }
-
-void Cluster::stall() {
- Mutex::ScopedLock l(lock);
- QPID_LOG(debug, self << " stalling.");
- // Stop processing connection events. We still process config changes
- // and cluster controls in deliver()
- connectionEventQueue.stop();
- if (mgmtObject!=0) mgmtObject->set_status("STALLED");
-
- // FIXME aconway 2008-09-11: Flow control, we should slow down or
- // stop reading from local connections while stalled to avoid an
- // unbounded queue.
+void Cluster::dumpInDone(const ClusterMap& m) {
+ Lock l(lock);
+ dumpedMap = m;
+ checkDumpIn(l);
}
-void Cluster::ready() {
- // Called with lock held
- QPID_LOG(debug, self << " ready at " << url);
- unstall();
- mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
+void Cluster::tryMakeOffer(const MemberId& id, Lock& l) {
+ if (state == READY && map.isNewbie(id)) {
+ state = OFFER;
+ QPID_LOG(debug, *this << " send dump-offer to " << id);
+ mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id), 0, l);
+ }
}
-void Cluster::unstall() {
+void Cluster::unstall(Lock& l) {
// Called with lock held
- QPID_LOG(debug, self << " un-stalling");
- handler = &memberHandler; // Member mode.
- connectionEventQueue.start(poller);
- if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
+ switch (state) {
+ case INIT: case DUMPEE: case DUMPER:
+ QPID_LOG(debug, *this << " unstall: deliver=" << deliverQueue.size()
+ << " mcast=" << mcastQueue.size());
+ deliverQueue.start();
+ state = READY;
+ for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
+ mcastQueue.clear();
+ if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
+ break;
+ case LEFT: break;
+ case NEWBIE: case READY: case OFFER:
+ assert(0);
+ }
}
// Called from Broker::~Broker when broker is shut down. At this
@@ -303,17 +387,106 @@ void Cluster::unstall() {
// invoked. We must ensure that CPG has also shut down so no CPG
// callbacks will be invoked.
//
-void Cluster::brokerShutdown() {
- QPID_LOG(notice, self << " shutting down.");
- try { cpg.shutdown(); }
- catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); }
+void Cluster::brokerShutdown() {
+ QPID_LOG(notice, *this << " shutting down ");
+ if (state != LEFT) {
+ try { cpg.shutdown(); }
+ catch (const std::exception& e) {
+ QPID_LOG(error, *this << " during shutdown: " << e.what());
+ }
+ }
delete this;
}
-ManagementObject* Cluster::GetManagementObject(void) const { return mgmtObject; }
+void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) {
+ map.dumpRequest(id, url);
+ tryMakeOffer(id, l);
+}
+
+void Cluster::ready(const MemberId& id, const std::string& url, Lock&) {
+ map.ready(id, Url(url));
+}
+
+void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) {
+ if (state == LEFT) return;
+ MemberId dumpee(dumpeeInt);
+ boost::optional<Url> url = map.dumpOffer(dumper, dumpee);
+ if (dumper == memberId) {
+ assert(state == OFFER);
+ if (url) { // My offer was first.
+ QPID_LOG(debug, *this << " mark dump point for dump to " << dumpee);
+ // Put dump-start on my own deliver queue to mark the stall point.
+ // We will stall when it is processed.
+ deliverQueue.push(Event::control(ClusterDumpStartBody(ProtocolVersion(), dumpee, url->str()), memberId));
+ }
+ else { // Another offer was first.
+ QPID_LOG(debug, *this << " cancel dump offer to " << dumpee);
+ state = READY;
+ tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer.
+ }
+ }
+ else if (dumpee == memberId && url) {
+ assert(state == NEWBIE);
+ QPID_LOG(debug, *this << " accepted dump-offer from " << dumper);
+ state = DUMPEE;
+ checkDumpIn(l);
+ }
+}
+
+void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock& l) {
+ if (state == LEFT) return;
+ MemberId dumpee(dumpeeInt);
+ Url url(urlStr);
+ assert(state == OFFER);
+ deliverQueue.stop();
+ QPID_LOG(debug, *this << " stall and dump to " << dumpee << " at " << urlStr);
+ state = DUMPER;
+ if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
+ dumpThread = Thread(
+ new DumpClient(memberId, dumpee, url, broker, map, getConnections(l),
+ boost::bind(&Cluster::dumpOutDone, this),
+ boost::bind(&Cluster::dumpOutError, this, _1)));
+}
+
+void Cluster::checkDumpIn(Lock& l) {
+ if (state == LEFT) return;
+ assert(state == DUMPEE || state == NEWBIE);
+ if (state == DUMPEE && dumpedMap) {
+ map = *dumpedMap;
+ QPID_LOG(debug, *this << " incoming dump complete. Members: " << map);
+ unstall(l);
+ mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l);
+ }
+}
+
+void Cluster::dumpOutDone() {
+ Monitor::ScopedLock l(lock);
+ dumpOutDone(l);
+}
+
+void Cluster::dumpOutDone(Lock& l) {
+ QPID_LOG(debug, *this << " finished sending dump.");
+ assert(state == DUMPER);
+ unstall(l);
+ tryMakeOffer(map.firstNewbie(), l); // Try another offer
+}
+
+void Cluster::dumpOutError(const std::exception& e) {
+ Monitor::ScopedLock l(lock);
+ QPID_LOG(error, *this << " error sending state dump: " << e.what());
+ dumpOutDone(l);
+}
+
+void Cluster ::shutdown(const MemberId& id, Lock& l) {
+ QPID_LOG(notice, *this << " received shutdown from " << id);
+ leave(l);
+}
+
+ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; }
Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) {
- QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
+ Lock l(lock);
+ QPID_LOG (debug, *this << " managementMethod [id=" << methodId << "]");
switch (methodId) {
case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(); break;
case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(); break;
@@ -322,30 +495,32 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string
return Manageable::STATUS_OK;
}
-void Cluster::stopClusterNode(void) {
- QPID_LOG(notice, self << " stopped by admin");
+void Cluster::stopClusterNode() {
+ QPID_LOG(notice, *this << " stopped by admin");
leave();
}
-void Cluster::stopFullCluster(void) {
- QPID_LOG(notice, self << " sending shutdown to cluster.");
- mcastControl(ClusterShutdownBody(), 0);
+void Cluster::stopFullCluster() {
+ Lock l(lock);
+ QPID_LOG(notice, *this << " shutting down cluster " << name.str());
+ mcastControl(ClusterShutdownBody(), 0, l);
}
-void Cluster::updateMemberStats() {
+void Cluster::updateMemberStats(Lock& l) {
if (mgmtObject) {
- if (lastSize != size() && size() ==1){
- QPID_LOG(info, "Last node standing, updating queue policies, size:" <<size());
- broker.getQueues().updateQueueClusterState(true);
- lastSize = size();
- }else if (lastSize != size() && size() > 1) {
- QPID_LOG(info, "Recover back from last node standing, updating queue policies, size:" <<size());
- broker.getQueues().updateQueueClusterState(false);
- lastSize = size();
- }
+ std::vector<Url> vectUrl = getUrls(l);
+ size_t size = vectUrl.size();
+ if (lastSize != size && size == 1){
+ QPID_LOG(info, *this << " last node standing, updating queue policies.");
+ broker.getQueues().updateQueueClusterState(true);
+ }
+ else if (lastSize != size && size > 1) {
+ QPID_LOG(info, *this << " recovered from last node standing, updating queue policies, size:" << size);
+ broker.getQueues().updateQueueClusterState(false);
+ }
+ lastSize = size;
- mgmtObject->set_clusterSize(size());
- std::vector<Url> vectUrl = getUrls();
+ mgmtObject->set_clusterSize(size);
string urlstr;
for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
if (iter != vectUrl.begin()) urlstr += "\n";
@@ -355,4 +530,17 @@ void Cluster::updateMemberStats() {
}
}
+std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
+ static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "READY", "OFFER", "DUMPER", "LEFT" };
+ return o << cluster.memberId << "(" << STATE[cluster.state] << ")";
+}
+
+MemberId Cluster::getId() const {
+ return memberId; // Immutable, no need to lock.
+}
+
+broker::Broker& Cluster::getBroker() const {
+ return broker; // Immutable, no need to lock.
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index a8c916a99b..d1cf4b752f 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -23,18 +23,18 @@
#include "Event.h"
#include "NoOpConnectionOutputHandler.h"
#include "ClusterMap.h"
-#include "JoiningHandler.h"
-#include "MemberHandler.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Monitor.h"
-#include "qpid/Url.h"
+#include "qpid/sys/LockPtr.h"
#include "qpid/management/Manageable.h"
+#include "qpid/Url.h"
#include "qmf/org/apache/qpid/cluster/Cluster.h"
#include <boost/intrusive_ptr.hpp>
#include <boost/bind.hpp>
+#include <boost/optional.hpp>
#include <algorithm>
#include <vector>
@@ -47,12 +47,13 @@ class Connection;
/**
* Connection to the cluster.
- * Keeps cluster membership data.
+ *
*/
-class Cluster : private Cpg::Handler, public management::Manageable
-{
+class Cluster : private Cpg::Handler, public management::Manageable {
public:
-
+ typedef boost::intrusive_ptr<Connection> ConnectionPtr;
+ typedef std::vector<ConnectionPtr> Connections;
+
/**
* Join a cluster.
* @param name of the cluster.
@@ -62,58 +63,68 @@ class Cluster : private Cpg::Handler, public management::Manageable
virtual ~Cluster();
- // FIXME aconway 2008-09-26: thread safety
- void insert(const boost::intrusive_ptr<Connection>&);
+ // Connection map
+ void insert(const ConnectionPtr&);
void erase(ConnectionId);
- void dumpComplete();
-
- /** Get the URLs of current cluster members. */
- std::vector<Url> getUrls() const;
-
- /** Number of members in the cluster. */
- size_t size() const;
-
- bool empty() const { return size() == 0; }
- /** Send to the cluster */
- void mcastControl(const framing::AMQBody& controlBody, Connection* cptr);
+ // Send to the cluster
+ void mcastControl(const framing::AMQBody& controlBody, Connection* cptr=0);
void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id);
- void mcastEvent(const Event& e);
-
- /** Leave the cluster */
- void leave();
+ void mcast(const Event& e);
- MemberId getSelf() const { return self; }
- MemberId getId() const { return self; }
+ // URLs of current cluster members.
+ std::vector<Url> getUrls() const;
- void ready();
- void stall();
- void unstall();
+ // Leave the cluster
+ void leave();
- void brokerShutdown();
+ // Incoming dump completed.
+ void dumpInDone(const ClusterMap&);
- broker::Broker& getBroker();
+ MemberId getId() const;
+ broker::Broker& getBroker() const;
- template <class F> void eachConnection(const F& f) {
- for (ConnectionMap::const_iterator i = connections.begin(); i != connections.end(); ++i)
- f(i->second);
- }
-
private:
+ typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr;
+ typedef sys::LockPtr<const Cluster,sys::Monitor> ConstLockPtr;
+ typedef sys::Monitor::ScopedLock Lock;
+
typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
- typedef sys::PollableQueue<Event> EventQueue;
- enum State {
- START, // Start state, no cluster update received yet.
- DISCARD, // Discard updates up to dump start point.
- CATCHUP, // Stalled at catchup point, waiting for dump.
- DUMPING, // Stalled while sending a state dump.
- READY // Normal processing.
- };
-
- void connectionEvent(const Event&);
-
- /** CPG deliver callback. */
- void deliver(
+ typedef sys::PollableQueue<Event> PollableEventQueue;
+ typedef std::deque<Event> PlainEventQueue;
+
+ // Unlocked versions of public functions
+ void mcastControl(const framing::AMQBody& controlBody, Connection* cptr, Lock&);
+ void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id, Lock&);
+ void mcast(const Event& e, Lock&);
+ void leave(Lock&);
+ std::vector<Url> getUrls(Lock&) const;
+
+ // Called via CPG, deliverQueue or DumpClient threads.
+ void tryMakeOffer(const MemberId&, Lock&);
+
+ // Called in CPG, connection IO and DumpClient threads.
+ void unstall(Lock&);
+
+ // Called in main thread in ~Broker.
+ void brokerShutdown();
+
+ // Cluster controls implement XML methods from cluster.xml.
+ // May be called in CPG thread via deliver() OR in deliverQueue thread.
+ //
+ void dumpRequest(const MemberId&, const std::string&, Lock&);
+ void dumpOffer(const MemberId& dumper, uint64_t dumpee, Lock&);
+ void dumpStart(const MemberId& dumper, uint64_t dumpeeInt, const std::string& urlStr, Lock&);
+ void ready(const MemberId&, const std::string&, Lock&);
+ void shutdown(const MemberId&, Lock&);
+ void process(const Event&); // deliverQueue callback
+ void process(const Event&, Lock&); // unlocked version
+
+ // CPG callbacks, called in CPG IO thread.
+ void dispatch(sys::DispatchHandle&); // Dispatch CPG events.
+ void disconnect(sys::DispatchHandle&); // CPG was disconnected
+
+ void deliver( // CPG deliver callback.
cpg_handle_t /*handle*/,
struct cpg_name *group,
uint32_t /*nodeid*/,
@@ -121,8 +132,7 @@ class Cluster : private Cpg::Handler, public management::Manageable
void* /*msg*/,
int /*msg_len*/);
- /** CPG config change callback */
- void configChange(
+ void configChange( // CPG config change callback.
cpg_handle_t /*handle*/,
struct cpg_name */*group*/,
struct cpg_address */*members*/, int /*nMembers*/,
@@ -130,45 +140,50 @@ class Cluster : private Cpg::Handler, public management::Manageable
struct cpg_address */*joined*/, int /*nJoined*/
);
- /** Callback to dispatch CPG events. */
- void dispatch(sys::DispatchHandle&);
- /** Callback if CPG fd is disconnected. */
- void disconnect(sys::DispatchHandle&);
+ boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&, Lock&);
+ Connections getConnections(Lock&);
- boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
-
- virtual qpid::management::ManagementObject* GetManagementObject(void) const;
+ virtual qpid::management::ManagementObject* GetManagementObject() const;
virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
- void stopClusterNode(void);
- void stopFullCluster(void);
- void updateMemberStats(void);
+ void stopClusterNode();
+ void stopFullCluster();
+ void updateMemberStats(Lock&);
+
+ // Called in connection IO threads.
+ void checkDumpIn(Lock&);
+
+ // Called in DumpClient thread.
+ void dumpOutDone();
+ void dumpOutError(const std::exception&);
+ void dumpOutDone(Lock&);
+
+ mutable sys::Monitor lock;
- mutable sys::Monitor lock; // Protect access to members.
broker::Broker& broker;
boost::shared_ptr<sys::Poller> poller;
Cpg cpg;
- Cpg::Name name;
- Url url;
- ClusterMap map;
- MemberId self;
+ const Cpg::Name name;
+ const Url myUrl;
+ const MemberId memberId;
+
ConnectionMap connections;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
- EventQueue connectionEventQueue;
- State state;
- qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle
+ PollableEventQueue deliverQueue;
+ PlainEventQueue mcastQueue;
+ uint32_t mcastId;
- // Handlers for different states.
- ClusterHandler* handler;
- JoiningHandler joiningHandler;
- MemberHandler memberHandler;
+ qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgmt owns lifecycle
- uint32_t mcastId;
- size_t lastSize;
+ enum { INIT, NEWBIE, DUMPEE, READY, OFFER, DUMPER, LEFT } state;
+ ClusterMap map;
+ sys::Thread dumpThread;
+ boost::optional<ClusterMap> dumpedMap;
+
+ size_t lastSize;
- friend class ClusterHandler;
- friend class JoiningHandler;
- friend class MemberHandler;
+ friend std::ostream& operator<<(std::ostream&, const Cluster&);
+ friend class ClusterDispatcher;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ClusterHandler.cpp b/cpp/src/qpid/cluster/ClusterHandler.cpp
deleted file mode 100644
index 7413f27192..0000000000
--- a/cpp/src/qpid/cluster/ClusterHandler.cpp
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "Cluster.h"
-#include "ClusterHandler.h"
-
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/FieldTable.h"
-#include "qpid/log/Statement.h"
-
-
-
-namespace qpid {
-namespace cluster {
-
-struct Operations : public framing::AMQP_AllOperations::ClusterHandler {
- qpid::cluster::ClusterHandler& handler;
- MemberId member;
- Operations(qpid::cluster::ClusterHandler& c, const MemberId& id) : handler(c), member(id) {}
-
- void update(const framing::FieldTable& members, uint64_t dumping) { handler.update(member, members, dumping); }
- void dumpRequest(const std::string& url) { handler.dumpRequest(member, url); }
- void ready(const std::string& url) { handler.ready(member, url); }
- void shutdown() { handler.shutdown(member); }
-};
-
-ClusterHandler::~ClusterHandler() {}
-
-ClusterHandler::ClusterHandler(Cluster& c) : cluster (c) {}
-
-bool ClusterHandler::invoke(const MemberId& id, framing::AMQFrame& frame) {
- Operations ops(*this, id);
- return framing::invoke(ops, *frame.getBody()).wasHandled();
-}
-
-void ClusterHandler::shutdown(const MemberId& id) {
- QPID_LOG(notice, cluster.self << " received shutdown from " << id);
- cluster.leave();
-}
-
-
-}} // namespace qpid::cluster
-
diff --git a/cpp/src/qpid/cluster/ClusterHandler.h b/cpp/src/qpid/cluster/ClusterHandler.h
deleted file mode 100644
index d8bcaa8fe8..0000000000
--- a/cpp/src/qpid/cluster/ClusterHandler.h
+++ /dev/null
@@ -1,70 +0,0 @@
-#ifndef QPID_CLUSTER_CLUSTERHANDLER_H
-#define QPID_CLUSTER_CLUSTERHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "Cpg.h"
-#include "types.h"
-#include <boost/intrusive_ptr.hpp>
-
-namespace qpid {
-
-namespace framing { class AMQFrame; }
-
-namespace cluster {
-
-class Connection;
-class Cluster;
-class Event;
-
-/**
- * Interface for handing cluster events.
- * Implementations provide different behavior for different states of a member..
- */
-class ClusterHandler
-{
- public:
- ClusterHandler(Cluster& c);
- virtual ~ClusterHandler();
-
- bool invoke(const MemberId&, framing::AMQFrame& f);
-
- virtual void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping) = 0;
- virtual void dumpRequest(const MemberId&, const std::string& url) = 0;
- virtual void ready(const MemberId&, const std::string& url) = 0;
- virtual void shutdown(const MemberId&);
-
- virtual void deliver(Event& e) = 0; // Deliver a connection event.
-
- virtual void configChange(cpg_address *current, int nCurrent,
- cpg_address *left, int nLeft,
- cpg_address *joined, int nJoined) = 0;
-
- virtual void dumpComplete() = 0;
-
- protected:
- Cluster& cluster;
-};
-
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_CLUSTERHANDLER_H*/
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index b5b71cd397..f3b5451afb 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -33,73 +33,118 @@ using namespace framing;
namespace cluster {
-ClusterMap::ClusterMap() {}
+namespace {
+void insertSet(ClusterMap::Set& set, const ClusterMap::Map::value_type& v) { set.insert(v.first); }
-MemberId ClusterMap::first() const {
- return (members.empty()) ? MemberId() : members.begin()->first;
+void insertMap(ClusterMap::Map& map, FieldTable::ValueMap::value_type vt) {
+ map.insert(ClusterMap::Map::value_type(vt.first, Url(vt.second->get<std::string>())));
}
-bool ClusterMap::left(const cpg_address* addrs, size_t nLeft) {
- bool changed=false;
- for (const cpg_address* a = addrs; a < addrs+nLeft; ++a)
- changed = members.erase(*a) || changed;
- if (dumper && !isMember(dumper))
- dumper = MemberId();
- QPID_LOG_IF(debug, changed, "Members left. " << *this);
- return changed;
+void assignMap(ClusterMap::Map& map, const FieldTable& ft) {
+ map.clear();
+ std::for_each(ft.begin(), ft.end(), boost::bind(&insertMap, boost::ref(map), _1));
}
-framing::ClusterUpdateBody ClusterMap::toControl() const {
- framing::ClusterUpdateBody b;
- for (Members::const_iterator i = members.begin(); i != members.end(); ++i)
- b.getMembers().setString(i->first.str(), i->second.str());
- b.setDumper(dumper);
- return b;
+void insertFieldTable(FieldTable& ft, const ClusterMap::Map::value_type& vt) {
+ return ft.setString(vt.first.str(), vt.second.str());
+}
+
+void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) {
+ ft.clear();
+ std::for_each(map.begin(), map.end(), boost::bind(&insertFieldTable, boost::ref(ft), _1));
+}
+}
+
+ClusterMap::ClusterMap() {}
+
+ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) {
+ alive.insert(id);
+ if (isMember)
+ members[id] = url;
+ else
+ newbies[id] = url;
}
-bool ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) {
- dumper = MemberId(dumper_);
- bool changed = false;
- framing:: FieldTable::ValueMap::const_iterator i;
- for (i = ftMembers.begin(); i != ftMembers.end(); ++i) {
- MemberId id(i->first);
- Url url(i->second->get<std::string>());
- changed = members.insert(Members::value_type(id, url)).second || changed;
+ClusterMap::ClusterMap(const FieldTable& newbiesFt, const FieldTable& membersFt) {
+ assignMap(newbies, newbiesFt);
+ assignMap(members, membersFt);
+ std::for_each(newbies.begin(), newbies.end(), boost::bind(&insertSet, boost::ref(alive), _1));
+ std::for_each(members.begin(), members.end(), boost::bind(&insertSet, boost::ref(alive), _1));
+}
+
+void ClusterMap::configChange(
+ cpg_address *current, int nCurrent,
+ cpg_address *left, int nLeft,
+ cpg_address */*joined*/, int /*nJoined*/)
+{
+ cpg_address* a;
+ for (a = left; a != left+nLeft; ++a) {
+ members.erase(*a);
+ newbies.erase(*a);
}
- QPID_LOG_IF(debug, changed, "Update: " << *this);
- return changed;
+ alive.clear();
+ std::copy(current, current+nCurrent, std::inserter(alive, alive.end()));
+}
+
+Url ClusterMap::getUrl(const Map& map, const MemberId& id) { // Look up the URL recorded for id in map.
+ Map::const_iterator i = map.find(id);
+ return i == map.end() ? Url() : i->second; // Default-constructed Url when id is not in map.
+}
+
+MemberId ClusterMap::firstNewbie() const {
+ return newbies.empty() ? MemberId() : newbies.begin()->first;
+}
+
+ClusterConnectionMembershipBody ClusterMap::asMethodBody() const {
+ framing::ClusterConnectionMembershipBody b;
+ assignFieldTable(b.getNewbies(), newbies);
+ assignFieldTable(b.getMembers(), members);
+ return b;
}
std::vector<Url> ClusterMap::memberUrls() const {
- std::vector<Url> result(size());
- std::transform(members.begin(), members.end(), result.begin(),
- boost::bind(&Members::value_type::second, _1));
- return result;
+ std::vector<Url> urls(members.size());
+ std::transform(members.begin(), members.end(), urls.begin(),
+ boost::bind(&Map::value_type::second, _1));
+ return urls;
+}
+
+std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) {
+ // Print only the member ids (not the URLs), each followed by a space so that
+ // consecutive ids do not run together in the output.
+ std::ostream_iterator<MemberId> oi(o, " ");
+ std::transform(m.begin(), m.end(), oi, boost::bind(&ClusterMap::Map::value_type::first, _1));
+ return o;
}
std::ostream& operator<<(std::ostream& o, const ClusterMap& m) {
- o << "Broker members:";
- for (ClusterMap::Members::const_iterator i=m.members.begin(); i != m.members.end(); ++i) {
- o << " " << i->first;
- if (i->first == m.dumper) o << "(dumping)";
+ for (ClusterMap::Set::const_iterator i = m.alive.begin(); i != m.alive.end(); ++i) {
+ o << *i;
+ if (m.isMember(*i)) o << "(member)";
+ if (m.isNewbie(*i)) o << "(newbie)";
+ o << " ";
}
return o;
}
-bool ClusterMap::sendUpdate(const MemberId& id) const {
- return dumper==id || (!dumper && first() == id);
+bool ClusterMap::dumpRequest(const MemberId& id, const std::string& url) { // Record id as a newbie; returns false if id is not alive.
+ if (isAlive(id)) {
+ newbies[id] = Url(url); // Replaces any URL previously recorded for this newbie.
+ return true;
+ }
+ return false;
+}
+
+void ClusterMap::ready(const MemberId& id, const Url& url) {
+ if (isAlive(id)) members[id] = url;
}
-bool ClusterMap::ready(const MemberId& id, const Url& url) {
- bool changed = members.insert(Members::value_type(id,url)).second;
- if (id == dumper) {
- dumper = MemberId();
- QPID_LOG(info, id << " finished dump. " << *this);
- }
- else {
- QPID_LOG(info, id << " joined, url=" << url << ". " << *this);
+boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const MemberId& to) {
+ Map::iterator i = newbies.find(to);
+ if (isAlive(from) && i != newbies.end()) {
+ Url url= i->second;
+ newbies.erase(i); // No longer a potential dumpee.
+ return url;
}
- return changed;
+ return boost::none;
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h
index 60fef75f0e..79afba7dc0 100644
--- a/cpp/src/qpid/cluster/ClusterMap.h
+++ b/cpp/src/qpid/cluster/ClusterMap.h
@@ -23,58 +23,68 @@
*/
#include "types.h"
-#include "qpid/framing/ClusterUpdateBody.h"
#include "qpid/Url.h"
+#include "qpid/framing/ClusterConnectionMembershipBody.h"
+
#include <boost/function.hpp>
+#include <boost/optional.hpp>
+
#include <vector>
#include <deque>
#include <map>
+#include <set>
#include <iosfwd>
namespace qpid {
namespace cluster {
/**
- * Map of established cluster members and brain-dumps in progress.
- * A dumper is an established member that is sending catch-up data.
- * A dumpee is an aspiring member that is receiving catch-up data.
+ * Map of established cluster members and newbies waiting for a brain dump.
*/
class ClusterMap {
public:
- typedef std::map<MemberId, Url> Members;
- Members members;
- MemberId dumper;
+ typedef std::map<MemberId, Url> Map;
+ typedef std::set<MemberId> Set;
ClusterMap();
-
- /** First member of the cluster in ID order, gets to perform one-off tasks. */
- MemberId first() const;
+ ClusterMap(const MemberId& id, const Url& url, bool isMember); // isMember: record id as an established member, otherwise as a newbie.
+ ClusterMap(const framing::FieldTable& newbiesFt, const framing::FieldTable& membersFt); // Each table maps a member id string to a URL string.
- /** Update for members leaving.
- *@return true if the cluster membership changed.
- */
- bool left(const cpg_address* addrs, size_t size);
+ /** Update from config change. */
+ void configChange(
+ cpg_address *current, int nCurrent,
+ cpg_address *left, int nLeft,
+ cpg_address *joined, int nJoined);
- /** Convert map contents to a cluster update body. */
- framing::ClusterUpdateBody toControl() const;
+ bool isNewbie(const MemberId& id) const { return newbies.find(id) != newbies.end(); }
+ bool isMember(const MemberId& id) const { return members.find(id) != members.end(); }
+ bool isAlive(const MemberId& id) const { return alive.find(id) != alive.end(); }
+
+ Url getNewbieUrl(const MemberId& id) { return getUrl(newbies, id); }
+ Url getMemberUrl(const MemberId& id) { return getUrl(members, id); }
- /** Add a new member or dump complete if id == dumper. */
- bool ready(const MemberId& id, const Url& url);
+ /** First newbie in the cluster in ID order, target for offers */
+ MemberId firstNewbie() const;
- /** Apply update delivered from cluster.
- *@return true if cluster membership changed.
- **/
- bool update(const framing::FieldTable& members, uint64_t dumper);
+ /** Convert map contents to a cluster control body. */
+ framing::ClusterConnectionMembershipBody asMethodBody() const;
- bool isMember(const MemberId& id) const { return members.find(id) != members.end(); }
-
- bool sendUpdate(const MemberId& id) const; // True if id should send an update.
+ size_t aliveCount() const { return alive.size(); }
+ size_t memberCount() const { return members.size(); }
std::vector<Url> memberUrls() const;
- size_t size() const { return members.size(); }
-
- bool empty() const { return members.empty(); }
+
+ bool dumpRequest(const MemberId& id, const std::string& url);
+ /** Return the newbie's Url if the offer is accepted, boost::none otherwise. */
+ boost::optional<Url> dumpOffer(const MemberId& from, const MemberId& to);
+ void ready(const MemberId& id, const Url&);
+
private:
+ Url getUrl(const Map& map, const MemberId& id);
+
+ Map newbies, members;
+ Set alive;
+ friend std::ostream& operator<<(std::ostream&, const Map&);
friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
};
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 6aab31c177..ae731ed25e 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -77,10 +77,10 @@ void Connection::received(framing::AMQFrame& f) {
QPID_LOG(trace, "RECV " << *this << ": " << f);
if (isShadow()) {
// Intercept the close that completes catch-up for a shadow connection.
- if (catchUp && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
+ if (isShadow() && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
catchUp = false;
- AMQFrame ok(in_place<ConnectionCloseOkBody>());
cluster.insert(boost::intrusive_ptr<Connection>(this));
+ AMQFrame ok(in_place<ConnectionCloseOkBody>());
connection.getOutput().send(ok);
output.setOutputHandler(discardHandler);
}
@@ -107,7 +107,11 @@ void Connection::delivered(framing::AMQFrame& f) {
void Connection::closed() {
try {
QPID_LOG(debug, "Connection closed " << *this);
- if (catchUp)
+ if (catchUp) {
+ QPID_LOG(critical, cluster << " error on catch-up connection " << *this);
+ cluster.leave();
+ }
+ else if (isDump())
connection.closed();
else if (isLocal()) {
// This was a local replicated connection. Multicast a deliver
@@ -131,7 +135,7 @@ void Connection::deliverClose () {
// Decode data from local clients.
size_t Connection::decode(const char* buffer, size_t size) {
- if (catchUp) { // Handle catch-up locally.
+ if (catchUp || isDump()) { // Handle catch-up locally.
Buffer buf(const_cast<char*>(buffer), size);
while (localDecoder.decode(buf))
received(localDecoder.frame);
@@ -179,11 +183,13 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
self = shadow;
}
-void Connection::dumpComplete() {
- cluster.dumpComplete();
+void Connection::membership(const FieldTable& urls, const FieldTable& states) {
+ cluster.dumpInDone(ClusterMap(urls,states));
+ catchUp = false;
+ self.second = 0; // Mark this as completed dump connection.
}
-bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; }
+bool Connection::isLocal() const { return self.first == cluster.getId() && self.second == this; }
std::ostream& operator<<(std::ostream& o, const Connection& c) {
return o << c.getId() << "(" << (c.isLocal() ? "local" : "shadow")
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index df3c035c8a..7d92987e01 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -69,6 +69,8 @@ class Connection :
/** True if the connection is in "catch-up" mode: building initial broker state. */
bool isCatchUp() const { return catchUp; }
+ bool isDump() const { return self.getPointer() == 0; }
+
Cluster& getCluster() { return cluster; }
// ConnectionInputHandler methods
@@ -98,7 +100,7 @@ class Connection :
void shadowReady(uint64_t memberId, uint64_t connectionId);
- void dumpComplete();
+ void membership(const framing::FieldTable&, const framing::FieldTable&);
private:
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp
index 1458a87923..44e40f0591 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -52,17 +52,20 @@ ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id)
ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp)
: codec(out, id, false),
- interceptor(new Connection(cluster, codec, id, cluster.getSelf(), catchUp)),
- id(interceptor->getId())
+ interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp)),
+ id(interceptor->getId()),
+ localId(id)
{
std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
codec.setInputHandler(ih);
- cluster.insert(interceptor);
+ if (!catchUp) // Don't put catchUp connections in the cluster map.
+ cluster.insert(interceptor);
}
ConnectionCodec::~ConnectionCodec() {}
size_t ConnectionCodec::decode(const char* buffer, size_t size) {
+ QPID_LOG(trace, "RECVB [" << localId << "]: " << size << " bytes");
return interceptor->decode(buffer, size);
}
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h
index e6ab7f5ba1..86fac270fa 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.h
+++ b/cpp/src/qpid/cluster/ConnectionCodec.h
@@ -72,6 +72,7 @@ class ConnectionCodec : public sys::ConnectionCodec {
amqp_0_10::Connection codec;
boost::intrusive_ptr<cluster::Connection> interceptor;
cluster::ConnectionId id;
+ std::string localId;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index 59542a2e95..ed339b2f85 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -20,6 +20,7 @@
*/
#include "DumpClient.h"
#include "Cluster.h"
+#include "ClusterMap.h"
#include "Connection.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/broker/Broker.h"
@@ -31,7 +32,7 @@
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/SessionState.h"
#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/ClusterConnectionDumpCompleteBody.h"
+#include "qpid/framing/ClusterConnectionMembershipBody.h"
#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/ProtocolVersion.h"
@@ -63,6 +64,10 @@ struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection {
ClusterConnectionProxy(client::Connection& c) :
AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {}
};
+struct ClusterProxy : public AMQP_AllProxy::Cluster {
+ ClusterProxy(client::Connection& c) :
+ AMQP_AllProxy::Cluster(*client::ConnectionAccess::getImpl(c)) {}
+};
// Create a connection with special version that marks it as a catch-up connection.
@@ -80,10 +85,11 @@ void send(client::Session& s, const AMQBody& body) {
// TODO aconway 2008-09-24: optimization: dump connections/sessions in parallel.
-DumpClient::DumpClient(const Url& url, Cluster& c,
+// `from` is the dumper and `to` is the dumpee, matching the (dumper, dumpee)
+// parameter order declared in DumpClient.h. The ids are only used to label log output.
+DumpClient::DumpClient(const MemberId& from, const MemberId& to, const Url& url,
+ broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail)
- : receiver(url), donor(c),
+ : dumperId(from), dumpeeId(to), dumpeeUrl(url), dumperBroker(broker), map(m), connections(cons),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail)
{
@@ -98,18 +104,19 @@ static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange";
static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS));
void DumpClient::dump() {
- QPID_LOG(debug, donor.getSelf() << " starting dump to " << receiver);
- Broker& b = donor.getBroker();
+ QPID_LOG(debug, dumperId << " dumping state to " << dumpeeId << " at " << dumpeeUrl);
+ Broker& b = dumperBroker;
b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
// Catch-up exchange is used to route messages to the proper queue without modifying routing key.
session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true);
b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
session.sync();
session.close();
- donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1));
- ClusterConnectionProxy(connection).dumpComplete();
+ std::for_each(connections.begin(), connections.end(), boost::bind(&DumpClient::dumpConnection, this, _1));
+ AMQFrame frame(map.asMethodBody());
+ client::ConnectionAccess::getImpl(connection)->handle(frame);
connection.close();
- QPID_LOG(debug, donor.getSelf() << " dumped all state to " << receiver);
+ QPID_LOG(debug, dumperId << " dumped state to " << dumpeeId << " at " << dumpeeUrl);
}
void DumpClient::run() {
@@ -160,25 +167,22 @@ void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& bindi
}
void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) {
+ QPID_LOG(debug, dumperId << " dumping connection " << *dumpConnection);
shadowConnection = catchUpConnection();
broker::Connection& bc = dumpConnection->getBrokerConnection();
// FIXME aconway 2008-09-19: Open with identical settings to dumpConnection: password, vhost, frame size,
// authentication etc. See ConnectionSettings.
- shadowConnection.open(receiver, bc.getUserId());
+ shadowConnection.open(dumpeeUrl, bc.getUserId());
dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
ClusterConnectionProxy(shadowConnection).shadowReady(
dumpConnection->getId().getMember(),
- reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr()));
+ reinterpret_cast<uint64_t>(dumpConnection->getId().getPointer()));
shadowConnection.close();
- QPID_LOG(debug, donor.getId() << " dumped connection " << *dumpConnection);
+ QPID_LOG(debug, dumperId << " dumped connection " << *dumpConnection);
}
-// FIXME aconway 2008-09-26: REMOVE
-void foo(broker::SemanticState::ConsumerImpl*) {}
-
-
void DumpClient::dumpSession(broker::SessionHandler& sh) {
- QPID_LOG(debug, donor.getId() << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
- << sh.getSession()->getId());
- broker::SessionState* s = sh.getSession();
- if (!s) return; // no session.
+ // Check for a missing session before logging: the log statement dereferences it.
+ broker::SessionState* s = sh.getSession();
+ if (!s) return; // no session.
+ QPID_LOG(debug, dumperId << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
+ << s->getId());
@@ -214,17 +218,18 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) {
// FIXME aconway 2008-09-23: session replay list.
- QPID_LOG(debug, donor.getId() << " dumped session " << sh.getSession()->getId());
+ QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId());
}
void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) {
+ QPID_LOG(debug, dumperId << " dumping consumer " << ci->getName() << " on " << shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
arg::destination = ci->getName(),
arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE,
arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED,
- arg::exclusive = false , // FIXME aconway 2008-09-23: how to read.
+ arg::exclusive = false , // FIXME aconway 2008-09-23: duplicate from consumer
// TODO aconway 2008-09-23: remaining args not used by current broker.
// Update this code when they are.
@@ -236,7 +241,7 @@ void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) {
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
// FIXME aconway 2008-09-23: need to replicate ConsumerImpl::blocked and notifyEnabled?
- QPID_LOG(debug, donor.getId() << " dumped consumer " << ci->getName() << " on " << shadowSession.getId());
+ QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId());
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h
index 6ce41a53a9..d61779319a 100644
--- a/cpp/src/qpid/cluster/DumpClient.h
+++ b/cpp/src/qpid/cluster/DumpClient.h
@@ -22,6 +22,7 @@
*
*/
+#include "ClusterMap.h"
#include "qpid/client/Connection.h"
#include "qpid/client/AsyncSession.h"
#include "qpid/broker/SemanticState.h"
@@ -49,13 +50,15 @@ namespace cluster {
class Cluster;
class Connection;
+class ClusterMap;
/**
* A client that dumps the contents of a local broker to a remote one using AMQP.
*/
class DumpClient : public sys::Runnable {
public:
- DumpClient(const Url& receiver, Cluster& donor,
+ DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url&,
+ broker::Broker& donor, const ClusterMap& map, const std::vector<boost::intrusive_ptr<Connection> >& ,
const boost::function<void()>& done,
const boost::function<void(const std::exception&)>& fail);
@@ -73,8 +76,12 @@ class DumpClient : public sys::Runnable {
void dumpConsumer(broker::SemanticState::ConsumerImpl*);
private:
- Url receiver;
- Cluster& donor;
+ MemberId dumperId;
+ MemberId dumpeeId;
+ Url dumpeeUrl;
+ broker::Broker& dumperBroker;
+ ClusterMap map;
+ std::vector<boost::intrusive_ptr<Connection> > connections;
client::Connection connection, shadowConnection;
client::AsyncSession session, shadowSession;
boost::function<void()> done;
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index 2531001504..f7389c1922 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -22,6 +22,7 @@
#include "Event.h"
#include "Cpg.h"
#include "qpid/framing/Buffer.h"
+#include "qpid/framing/AMQFrame.h"
#include <ostream>
#include <iterator>
#include <algorithm>
@@ -46,12 +47,20 @@ Event Event::delivered(const MemberId& m, void* d, size_t s) {
memcpy(e.getData(), static_cast<char*>(d)+OVERHEAD, s-OVERHEAD);
return e;
}
+
+Event Event::control(const framing::AMQBody& body, const ConnectionId& cid, uint32_t id) { // Build a CONTROL event carrying body.
+ framing::AMQFrame f(body); // Wrap the control body in a frame so it can be encoded.
+ Event e(CONTROL, cid, f.size(), id); // f.size() is passed as the event size -- presumably the payload length; confirm against Event's constructor.
+ Buffer buf(e); // Event declares a conversion to framing::Buffer; presumably a view over the event's data.
+ f.encode(buf);
+ return e;
+}
void Event::mcast (const Cpg::Name& name, Cpg& cpg) const {
char header[OVERHEAD];
Buffer b(header, OVERHEAD);
b.putOctet(type);
- b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getConnectionPtr()));
+ b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
b.putLong(id);
iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } };
cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
index 6d8655392e..9a2b12bf05 100644
--- a/cpp/src/qpid/cluster/Event.h
+++ b/cpp/src/qpid/cluster/Event.h
@@ -47,15 +47,21 @@ class Event {
/** Create an event copied from delivered data. */
static Event delivered(const MemberId& m, void* data, size_t size);
+
+ /** Create an event containing a control */
+ static Event control(const framing::AMQBody&, const ConnectionId&, uint32_t id=0);
void mcast(const Cpg::Name& name, Cpg& cpg) const;
EventType getType() const { return type; }
ConnectionId getConnectionId() const { return connectionId; }
+ MemberId getMemberId() const { return connectionId.getMember(); }
size_t getSize() const { return size; }
char* getData() { return data; }
const char* getData() const { return data; }
size_t getId() const { return id; }
+ bool isCluster() const { return connectionId.getPointer() == 0; }
+ bool isConnection() const { return connectionId.getPointer() != 0; }
operator framing::Buffer() const;
diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp
deleted file mode 100644
index dbee0ece61..0000000000
--- a/cpp/src/qpid/cluster/JoiningHandler.cpp
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "JoiningHandler.h"
-#include "Cluster.h"
-#include "qpid/framing/ClusterDumpRequestBody.h"
-#include "qpid/framing/ClusterReadyBody.h"
-#include "qpid/log/Statement.h"
-
-namespace qpid {
-namespace cluster {
-
-using namespace sys;
-using namespace framing;
-
-JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START), catchUpConnections(0) {}
-
-void JoiningHandler::configChange(
- cpg_address *current, int nCurrent,
- cpg_address */*left*/, int nLeft,
- cpg_address */*joined*/, int /*nJoined*/)
-{
- // FIXME aconway 2008-09-24: Called with lock held - volatile
- if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster.
- QPID_LOG(notice, cluster.self << " first in cluster.");
- cluster.map.ready(cluster.self, cluster.url);
- cluster.updateMemberStats();
- cluster.unstall();
- }
-}
-
-void JoiningHandler::deliver(Event& e) {
- Mutex::ScopedLock l(cluster.lock);
- // Discard connection events unless we are stalled to receive a dump.
- if (state == STALLED)
- cluster.connectionEventQueue.push(e);
- else
- QPID_LOG(trace, "Discarded pre-join event " << e);
-}
-
-void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) {
- Mutex::ScopedLock l(cluster.lock);
- if (cluster.map.update(members, dumper)) cluster.updateMemberStats();
- checkDumpRequest();
-}
-
-void JoiningHandler::checkDumpRequest() { // Call with lock held
- if (state == START && !cluster.map.dumper) {
- cluster.broker.getPort(); // ensure the broker is listening.
- state = DUMP_REQUESTED;
- cluster.mcastControl(ClusterDumpRequestBody(framing::ProtocolVersion(), cluster.url.str()), 0);
- }
-}
-
-void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
- Mutex::ScopedLock l(cluster.lock);
- if (cluster.map.dumper) { // Already a dump in progress.
- if (dumpee == cluster.self && state == DUMP_REQUESTED)
- state = START; // Need to make another request.
- }
- else { // Start a new dump
- cluster.map.dumper = cluster.map.first();
- QPID_LOG(debug, "Starting dump, dumper=" << cluster.map.dumper << " dumpee=" << dumpee);
- if (dumpee == cluster.self) { // My turn
- switch (state) {
- case START:
- case STALLED:
- assert(0); break;
-
- case DUMP_REQUESTED:
- QPID_LOG(debug, cluster.self << " stalling for dump from " << cluster.map.dumper);
- state = STALLED;
- cluster.stall();
- break;
-
- case DUMP_COMPLETE:
- QPID_LOG(debug, cluster.self << " at start point and dump complete, ready.");
- cluster.ready();
- break;
- }
- }
- }
-}
-
-void JoiningHandler::ready(const MemberId& id, const std::string& urlStr) {
- Mutex::ScopedLock l(cluster.lock);
- if (cluster.map.ready(id, Url(urlStr)))
- cluster.updateMemberStats();
- checkDumpRequest();
-}
-
-void JoiningHandler::dumpComplete() {
- Mutex::ScopedLock l(cluster.lock);
- if (state == STALLED) {
- QPID_LOG(debug, cluster.self << " received dump and stalled at start point, unstalling.");
- cluster.ready();
- }
- else {
- QPID_LOG(debug, cluster.self << " received dump, waiting for start point.");
- assert(state == DUMP_REQUESTED);
- state = DUMP_COMPLETE;
- }
- // FIXME aconway 2008-09-18: need to detect incomplete dump.
-}
-
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/JoiningHandler.h b/cpp/src/qpid/cluster/JoiningHandler.h
deleted file mode 100644
index cc47690ac5..0000000000
--- a/cpp/src/qpid/cluster/JoiningHandler.h
+++ /dev/null
@@ -1,60 +0,0 @@
-#ifndef QPID_CLUSTER_JOININGHANDLER_H
-#define QPID_CLUSTER_JOININGHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "ClusterHandler.h"
-
-namespace qpid {
-namespace cluster {
-
-/**
- * Cluster handler for the "joining" phase, before the process is a
- * full cluster member.
- */
-class JoiningHandler : public ClusterHandler
-{
- public:
- JoiningHandler(Cluster& c);
-
- void configChange(struct cpg_address */*members*/, int /*nMembers*/,
- struct cpg_address */*left*/, int /*nLeft*/,
- struct cpg_address */*joined*/, int /*nJoined*/
- );
-
- void deliver(Event& e);
-
- void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
- void dumpRequest(const MemberId&, const std::string& url);
- void ready(const MemberId&, const std::string& url);
-
- void dumpComplete();
-
- private:
- void checkDumpRequest();
-
- enum { START, DUMP_REQUESTED, STALLED, DUMP_COMPLETE } state;
- size_t catchUpConnections;
-
-};
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_JOININGHANDLER_H*/
diff --git a/cpp/src/qpid/cluster/MemberHandler.cpp b/cpp/src/qpid/cluster/MemberHandler.cpp
deleted file mode 100644
index 69fe2eec0b..0000000000
--- a/cpp/src/qpid/cluster/MemberHandler.cpp
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "MemberHandler.h"
-#include "Cluster.h"
-#include "DumpClient.h"
-#include "qpid/log/Statement.h"
-#include "qpid/framing/ClusterUpdateBody.h"
-#include "qpid/framing/enum.h"
-
-namespace qpid {
-namespace cluster {
-
-using namespace sys;
-using namespace framing;
-
-MemberHandler::MemberHandler(Cluster& c) : ClusterHandler(c) {}
-
-MemberHandler::~MemberHandler() {
- if (dumpThread.id())
- dumpThread.join(); // Join the last dumpthread.
-}
-
-void MemberHandler::configChange(
- cpg_address */*current*/, int /*nCurrent*/,
- cpg_address */*left*/, int /*nLeft*/,
- cpg_address */*joined*/, int nJoined)
-{
- // FIXME aconway 2008-09-24: Called with lock held - volatile
- if (nJoined && cluster.map.sendUpdate(cluster.self)) // New members need update
- cluster.mcastControl(cluster.map.toControl(), 0);
-}
-
-void MemberHandler::deliver(Event& e) {
- cluster.connectionEventQueue.push(e);
-}
-
-// Updates are for new joiners.
-void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {}
-
-void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlStr) {
- Mutex::ScopedLock l(cluster.lock);
- if (cluster.map.dumper) return; // dump in progress, ignore request.
-
- cluster.map.dumper = cluster.map.first();
- if (cluster.map.dumper != cluster.self) return;
-
- QPID_LOG(info, cluster.self << " sending state dump to " << dumpee);
- assert(!cluster.connectionEventQueue.isStopped()); // Not currently stalled.
- cluster.stall();
-
- if (dumpThread.id())
- dumpThread.join(); // Join the previous dumpthread.
- dumpThread = Thread(new DumpClient(Url(urlStr), cluster,
- boost::bind(&MemberHandler::dumpSent, this),
- boost::bind(&MemberHandler::dumpError, this, _1)));
-}
-
-void MemberHandler::ready(const MemberId& id, const std::string& urlStr) {
- Mutex::ScopedLock l(cluster.lock);
- if (cluster.map.ready(id, Url(urlStr)))
- cluster.updateMemberStats();
-}
-
-
-void MemberHandler::dumpSent() {
- Mutex::ScopedLock l(cluster.lock);
- QPID_LOG(debug, "Finished sending state dump.");
- cluster.ready();
-}
-
-void MemberHandler::dumpError(const std::exception& e) {
- QPID_LOG(error, cluster.self << " error sending state dump: " << e.what());
- dumpSent();
-}
-
-void MemberHandler::dumpComplete() { assert(0); }
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/MemberHandler.h b/cpp/src/qpid/cluster/MemberHandler.h
deleted file mode 100644
index 37cf653b7b..0000000000
--- a/cpp/src/qpid/cluster/MemberHandler.h
+++ /dev/null
@@ -1,63 +0,0 @@
-#ifndef QPID_CLUSTER_MEMBERHANDLER_H
-#define QPID_CLUSTER_MEMBERHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "ClusterHandler.h"
-#include "qpid/sys/Thread.h"
-
-namespace qpid {
-namespace cluster {
-
-/**
- * Cluster handler for the "member" phase, before the process is a
- * full cluster member.
- */
-class MemberHandler : public ClusterHandler
-{
- public:
- MemberHandler(Cluster& c);
- ~MemberHandler();
-
- void configChange(
- struct cpg_address */*members*/, int /*nMembers*/,
- struct cpg_address */*left*/, int /*nLeft*/,
- struct cpg_address */*joined*/, int /*nJoined*/
- );
-
- void deliver(Event& e);
-
- void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
- void dumpRequest(const MemberId&, const std::string& url);
- void ready(const MemberId&, const std::string& url);
-
- void dumpSent();
- void dumpError(const std::exception&);
-
- void dumpComplete();
-
- public:
- sys::Thread dumpThread;
-};
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_MEMBERHANDLER_H*/
-
diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h
index f16cc18f41..2154aa89ce 100644
--- a/cpp/src/qpid/cluster/types.h
+++ b/cpp/src/qpid/cluster/types.h
@@ -21,6 +21,9 @@
* under the License.
*
*/
+
+#include <qpid/Url.h>
+
#include <utility>
#include <iosfwd>
#include <string>
@@ -49,7 +52,7 @@ struct MemberId : std::pair<uint32_t, uint32_t> {
uint32_t getPid() const { return second; }
operator uint64_t() const { return (uint64_t(first)<<32ull) + second; }
- // Encode as string, network byte order.
+ // Encode as string, network byte order.
std::string str() const;
};
@@ -62,7 +65,7 @@ struct ConnectionId : public std::pair<MemberId, Connection*> {
ConnectionId(uint64_t m, uint64_t c)
: std::pair<MemberId, Connection*>(MemberId(m), reinterpret_cast<Connection*>(c)) {}
MemberId getMember() const { return first; }
- Connection* getConnectionPtr() const { return second; }
+ Connection* getPointer() const { return second; }
};
std::ostream& operator<<(std::ostream&, const ConnectionId&);
diff --git a/cpp/src/qpid/framing/FieldTable.h b/cpp/src/qpid/framing/FieldTable.h
index b56e3ce3ba..1b6ef0a124 100644
--- a/cpp/src/qpid/framing/FieldTable.h
+++ b/cpp/src/qpid/framing/FieldTable.h
@@ -89,6 +89,9 @@ class FieldTable
ValueMap::const_iterator end() const { return values.end(); }
ValueMap::const_iterator find(const std::string& s) const { return values.find(s); }
+ std::pair <ValueMap::iterator, bool> insert(const ValueMap::value_type&);
+ void clear() { values.clear(); }
+
// ### Hack Alert
ValueMap::iterator getValues() { return values.begin(); }
diff --git a/cpp/src/qpid/sys/LockPtr.h b/cpp/src/qpid/sys/LockPtr.h
new file mode 100644
index 0000000000..738a864317
--- /dev/null
+++ b/cpp/src/qpid/sys/LockPtr.h
@@ -0,0 +1,89 @@
+#ifndef QPID_SYS_LOCKPTR_H
+#define QPID_SYS_LOCKPTR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+class Mutex;
+
+/**
+ * LockPtr is a smart pointer to T. It is constructed from a volatile
+ * T* and a Lock (for example a Mutex). It const_casts away the
+ * volatile qualifier and locks the Lock for the duration of its lifetime.
+ *
+ * Used in conjunction with the "volatile" keyword to get the compiler
+ * to help enforce correct concurrent use of multi-threaded objects.
+ * See http://www.ddj.com/cpp/184403766 for a detailed discussion.
+ *
+ * To summarize the convention:
+ * - Declare thread-safe member functions as volatile.
+ * - Declare instances of the class that may be called concurrently as volatile.
+ * - Use LockPtr to cast away the volatile qualifier while taking a lock.
+ *
+ * This means that code calling on a concurrently-used object
+ * (declared volatile) can only call thread-safe (volatile) member
+ * functions. Code that needs to use thread-unsafe members must use a
+ * LockPtr, thereby acquiring the lock and making it safe to do so.
+ *
+ * A good type-safe pattern is the internally-locked object:
+ * - It has its own private lock member.
+ * - All public functions are thread safe and declared volatile.
+ * - Any thread-unsafe, non-volatile functions are private.
+ * - Only member function implementations use LockPtr to access private functions.
+ *
+ * This encapsulates all the locking logic inside the class.
+ *
+ * One nice feature of this convention is the common case where you
+ * need a public, locked version of some function foo() and also a
+ * private unlocked version to avoid recursive locks. They can be declared as
+ * volatile and non-volatile overloads of the same function:
+ *
+ * // public
+ * void Thing::foo() volatile { LockPtr<Thing, Mutex>(this, myLock)->foo(); }
+ * // private
+ * void Thing::foo() { ... do stuff ...}
+ */
+
+template <class T, class Lock> class LockPtr : public boost::noncopyable {
+ public:
+ LockPtr(volatile T* p, Lock& l) : ptr(const_cast<T*>(p)), lock(l) { lock.lock(); }
+ LockPtr(volatile T* p, volatile Lock& l) : ptr(const_cast<T*>(p)), lock(const_cast<Lock&>(l)) { lock.lock(); }
+ ~LockPtr() { lock.unlock(); }
+
+ T& operator*() { return *ptr; }
+ T* operator->() { return ptr; }
+
+ private:
+ T* ptr;
+ Lock& lock;
+};
+
+
+}} // namespace qpid::sys
+
+
+#endif /*!QPID_SYS_LOCKPTR_H*/
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h
index 3a94c60be0..8313196623 100644
--- a/cpp/src/qpid/sys/PollableQueue.h
+++ b/cpp/src/qpid/sys/PollableQueue.h
@@ -42,40 +42,31 @@ class Poller;
*/
template <class T>
class PollableQueue {
- typedef std::deque<T> Queue;
-
public:
- typedef typename Queue::iterator iterator;
-
/** Callback to process a range of items. */
- typedef boost::function<void (const iterator&, const iterator&)> Callback;
+ typedef boost::function<void (const T&)> Callback;
- /** @see forEach() */
- template <class F> struct ForEach {
- F handleOne;
- ForEach(F f) : handleOne(f) {}
- void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); }
- };
-
- /** Create a range callback from a functor that processes a single item. */
- template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); }
-
/** When the queue is selected by the poller, values are passed to callback cb. */
- explicit PollableQueue(const Callback& cb);
+ PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller);
+ ~PollableQueue();
+
/** Push a value onto the queue. Thread safe */
void push(const T& t);
/** Start polling. */
- void start(const boost::shared_ptr<sys::Poller>& poller);
+ void start();
/** Stop polling and wait for the current callback, if any, to complete. */
void stop();
/** Are we currently stopped?*/
- bool isStopped() const;
-
+ bool isStopped() const { ScopedLock l(lock); return stopped; }
+
+ size_t size() { ScopedLock l(lock); return queue.size(); }
+ bool empty() { ScopedLock l(lock); return queue.empty(); }
private:
+ typedef std::deque<T> Queue;
typedef sys::Monitor::ScopedLock ScopedLock;
typedef sys::Monitor::ScopedUnlock ScopedUnlock;
@@ -83,59 +74,67 @@ class PollableQueue {
mutable sys::Monitor lock;
Callback callback;
+ boost::shared_ptr<sys::Poller> poller;
PollableCondition condition;
- sys::DispatchHandle handle;
+ DispatchHandle handle;
Queue queue;
- Queue batch;
- bool dispatching, stopped;
+ Thread dispatcher;
+ bool stopped;
};
-template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12:
- : callback(cb),
- handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0),
- dispatching(false), stopped(true)
-{}
+template <class T> PollableQueue<T>::PollableQueue(
+ const Callback& cb, const boost::shared_ptr<sys::Poller>& p)
+ : callback(cb), poller(p),
+ handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), stopped(true)
+{
+ handle.startWatch(poller);
+ handle.unwatch();
+}
-template <class T> void PollableQueue<T>::start(const boost::shared_ptr<sys::Poller>& poller) {
+template <class T> void PollableQueue<T>::start() {
ScopedLock l(lock);
+ assert(stopped);
stopped = false;
- handle.startWatch(poller);
+ if (!queue.empty()) condition.set();
+ handle.rewatch();
+}
+
+template <class T> PollableQueue<T>::~PollableQueue() {
+ handle.stopWatch();
}
template <class T> void PollableQueue<T>::push(const T& t) {
ScopedLock l(lock);
+ if (queue.empty()) condition.set();
queue.push_back(t);
- condition.set();
}
template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
- ScopedLock l(lock);
- if (stopped) return;
- dispatching = true;
- condition.clear();
- batch.clear();
- batch.swap(queue); // Snapshot of current queue contents.
- {
- // Process outside the lock to allow concurrent push.
- ScopedUnlock u(lock);
- callback(batch.begin(), batch.end());
+ ScopedLock l(lock); // Prevent concurrent push
+ assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id());
+ dispatcher = Thread::current();
+ while (!stopped && !queue.empty()) {
+ T value = queue.front();
+ queue.pop_front();
+ { // callback outside the lock to allow concurrent push.
+ ScopedUnlock u(lock);
+ callback(value);
+ }
}
- batch.clear();
- dispatching = false;
+ if (queue.empty()) condition.clear();
if (stopped) lock.notifyAll();
- else h.rewatch();
+ dispatcher = Thread();
+ if (!stopped) h.rewatch();
}
template <class T> void PollableQueue<T>::stop() {
ScopedLock l(lock);
- handle.stopWatch();
+ assert(!stopped);
+ handle.unwatch();
stopped = true;
- while (dispatching) lock.wait();
-}
-
-template <class T> bool PollableQueue<T>::isStopped() const {
- ScopedLock l(lock);
- return stopped;
+ // No deadlock if stop is called from the dispatcher thread
+ while (dispatcher.id() && dispatcher.id() != Thread::current().id())
+ lock.wait();
}
}} // namespace qpid::sys
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 9573caf61d..7b67fed388 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -89,7 +89,7 @@ struct ClusterFixture : public vector<uint16_t> {
void waitFor(size_t n) {
size_t retry=1000; // TODO aconway 2008-07-16: nasty sleeps, clean this up.
- while (retry && getGlobalCluster().size() != n) {
+ while (retry && getGlobalCluster().getUrls().size() != n) {
::usleep(1000);
--retry;
}
@@ -101,7 +101,7 @@ ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()),
if (!init0) return; // Defer initialization of broker0
// Wait for all n members to join the cluster
waitFor(n);
- BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size());
+ BOOST_REQUIRE_EQUAL(n, getGlobalCluster().getUrls().size());
}
void ClusterFixture::add() {
@@ -227,7 +227,7 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
BOOST_CHECK_EQUAL(m.getData(), "bar");
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
- // Add & verify another broker.
+ // Add another broker, don't wait for join - should be stalled till ready.
cluster.add();
Client c2(cluster[2], "c2");
BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
@@ -321,29 +321,4 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
}
-QPID_AUTO_TEST_CASE(testStall) {
- ClusterFixture cluster(2);
- Client c0(cluster[0], "c0");
- Client c1(cluster[1], "c1");
-
- // Declare on all to avoid race condition.
- c0.session.queueDeclare("q");
- c1.session.queueDeclare("q");
-
- // Stall 0, verify it does not process deliverys while stalled.
- getGlobalCluster().stall();
- c1.session.messageTransfer(arg::content=Message("foo","q"));
- while (c1.session.queueQuery("q").getMessageCount() != 1)
- ::usleep(1000); // Wait for message to show up on broker 1.
- // But it should not be on broker 0.
- boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q");
- BOOST_REQUIRE(q0);
- BOOST_CHECK_EQUAL(q0->getMessageCount(), (unsigned)0);
- // Now unstall and we should get the message.
- getGlobalCluster().ready();
- Message m;
- BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "foo");
-}
-
QPID_AUTO_TEST_SUITE_END()