author    Alan Conway <aconway@apache.org>  2008-09-15 19:39:22 +0000
committer Alan Conway <aconway@apache.org>  2008-09-15 19:39:22 +0000
commit    e60518c80a7ee6e96719a365d84b777aee59df4f (patch)
tree      ba4d2cc340b6497265df9624fb0385241a03b463 /cpp/src/qpid/cluster/Cluster.cpp
parent    6099da5735246f255eb62be535a2f462c7d3bab9 (diff)
Cluster member stalling, cluster map updates and unit tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@695593 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--  cpp/src/qpid/cluster/Cluster.cpp | 231
1 file changed, 140 insertions(+), 91 deletions(-)
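The diff below reworks the member-join logic around a small state machine (DISCARD, CATCHUP, HAVE_DUMP, DUMPING, READY). The following is a minimal, illustrative sketch of the transitions it implies; the state and event names are taken from the diff, but the enum/driver structure is an assumption for illustration, not the actual Qpid code.

```cpp
// Illustrative sketch only: a simplified view of the join/dump state machine
// that this commit introduces in Cluster.cpp. Types and the main() driver are
// hypothetical; they are not part of the Qpid source.
#include <cassert>
#include <iostream>

enum State { DISCARD, CATCHUP, HAVE_DUMP, DUMPING, READY };
enum Event { FIRST_IN_CLUSTER, OWN_DUMP_REQUEST, MAP_RECEIVED, DUMP_COMPLETE };

// Next state for a joining member, mirroring configChange(), dumpRequest(),
// mapInit() and dumpError() in the diff.
State next(State s, Event e) {
    switch (e) {
    case FIRST_IN_CLUSTER:                   // Sole member: no dump needed, go ready.
        return s == DISCARD ? READY : s;
    case OWN_DUMP_REQUEST:                   // Our own dumpRequest delivered back to us.
        if (s == DISCARD) return CATCHUP;    // Stall until the map/dump arrives.
        if (s == HAVE_DUMP) return READY;    // Map already arrived; go ready.
        return s;
    case MAP_RECEIVED:                       // Cluster map control delivered.
        if (s == DISCARD) return HAVE_DUMP;
        if (s == CATCHUP) return READY;
        return s;
    case DUMP_COMPLETE:                      // Dumper finished (or errored) its last dump.
        return s == DUMPING ? READY : s;
    }
    return s;
}

int main() {
    // A joining member whose dump request is delivered before the map control.
    State s = DISCARD;
    s = next(s, OWN_DUMP_REQUEST);           // DISCARD -> CATCHUP (stalled)
    s = next(s, MAP_RECEIVED);               // CATCHUP -> READY
    assert(s == READY);
    std::cout << "joined and ready\n";
    return 0;
}
```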
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 9db2a61a82..c441686def 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -25,7 +25,10 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterUrlNoticeBody.h"
+#include "qpid/framing/ClusterDumpRequestBody.h"
+#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterDumpErrorBody.h"
+#include "qpid/framing/ClusterMapBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
@@ -50,19 +53,14 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
Cluster& cluster;
MemberId member;
ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
- void urlNotice(const std::string& u) { cluster.urlNotice(member, u); }
- void ready() { cluster.ready(member); }
-
- void members(const framing::FieldTable& , const framing::FieldTable& , const framing::FieldTable& ) {
- assert(0); // Not passed to cluster, used to start a brain dump over TCP.
- }
-
bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
- virtual void map(const FieldTable& ,const FieldTable& ,const FieldTable& ) {
- // FIXME aconway 2008-09-12: TODO
+ void dumpRequest(const std::string& u) { cluster.dumpRequest(member, u); }
+ void dumpError(uint64_t dumpee) { cluster.dumpError(member, MemberId(dumpee)); }
+ void ready(const std::string& u) { cluster.ready(member, u); }
+ virtual void map(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) {
+ cluster.mapInit(members, dumpees, dumps);
}
-
};
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
@@ -80,17 +78,14 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
state(DISCARD)
{
- QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str());
+ QPID_LOG(notice, self << " joining cluster " << name.str());
broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
- cpg.join(name);
-
- connectionEventQueue.start(poller);
cpgDispatchHandle.startWatch(poller);
+ cpg.join(name);
+
}
-Cluster::~Cluster() {
- QPID_LOG(debug, "~Cluster()");
-}
+Cluster::~Cluster() {}
void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
Mutex::ScopedLock l(lock);
@@ -102,60 +97,47 @@ void Cluster::erase(ConnectionId id) {
connections.erase(id);
}
-// FIXME aconway 2008-09-10: leave is currently not called,
-// It should be called if we are shut down by a cluster admin command.
+// FIXME aconway 2008-09-10: call leave from cluster admin command.
// Any other type of exit is caught in disconnect().
//
void Cluster::leave() {
- QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str());
+ QPID_LOG(notice, self << " leaving cluster " << name.str());
cpg.leave(name);
- // Cluster will shut down in configChange when the cluster knows we've left.
-}
-
-template <class T> void decodePtr(Buffer& buf, T*& ptr) {
- uint64_t value = buf.getLongLong();
- ptr = reinterpret_cast<T*>(value);
-}
-
-template <class T> void encodePtr(Buffer& buf, T* ptr) {
- uint64_t value = reinterpret_cast<uint64_t>(ptr);
- buf.putLongLong(value);
+ // Defer shut down to the final configChange when the group knows we've left.
}
-void Cluster::mcastFrame(const AMQFrame& frame, const ConnectionId& connection) {
- QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
- Event e(CONTROL, connection, frame.size());
+void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) {
+ QPID_LOG(trace, "MCAST [" << self << "]: " << body);
+ AMQFrame f(body);
+ Event e(CONTROL, ConnectionId(self, cptr), f.size());
Buffer buf(e);
- frame.encode(buf);
+ f.encode(buf);
mcastEvent(e);
}
void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection) {
- QPID_LOG(trace, "MCAST [" << connection << "] " << size << "bytes of data");
Event e(DATA, connection, size);
memcpy(e.getData(), data, size);
mcastEvent(e);
}
void Cluster::mcastEvent(const Event& e) {
- QPID_LOG(trace, "Multicasting: " << e);
e.mcast(name, cpg);
}
size_t Cluster::size() const {
Mutex::ScopedLock l(lock);
- return urls.size();
+ return map.memberCount();
}
std::vector<Url> Cluster::getUrls() const {
Mutex::ScopedLock l(lock);
- std::vector<Url> result(urls.size());
- std::transform(urls.begin(), urls.end(), result.begin(),
- boost::bind(&UrlMap::value_type::second, _1));
- return result;
-}
+ return map.memberUrls();
+}
+// FIXME aconway 2008-09-15: volatile for locked/unlocked functions.
boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) {
+ Mutex::ScopedLock l(lock);
if (id.getMember() == self)
return boost::intrusive_ptr<Connection>(id.getConnectionPtr());
ConnectionMap::iterator i = connections.find(id);
@@ -180,17 +162,19 @@ void Cluster::deliver(
try {
MemberId from(nodeid, pid);
Event e = Event::delivered(from, msg, msg_len);
- QPID_LOG(trace, "Cluster deliver: " << e);
-
// Process cluster controls immediately
if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control
Buffer buf(e);
AMQFrame frame;
- while (frame.decode(buf))
+ while (frame.decode(buf)) {
+ QPID_LOG(trace, "DLVR [" << e.getConnectionId().getMember() << "]: " << *frame.getBody());
if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame))
- throw Exception("Invalid cluster control");
+ throw Exception(QPID_MSG("Invalid cluster control"));
+ }
}
- else { // Process connection controls & data via the connectionEventQueue.
+ else {
+ // Process connection controls & data via the connectionEventQueue
+ // unless we are in the DISCARD state, in which case ignore.
if (state != DISCARD) {
e.setConnection(getConnection(e.getConnectionId()));
connectionEventQueue.push(e);
@@ -227,15 +211,15 @@ ostream& operator<<(ostream& o, const AddrList& a) {
for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
const char* reasonString;
switch (p->reason) {
- case CPG_REASON_JOIN: reasonString = " joined "; break;
- case CPG_REASON_LEAVE: reasonString = " left ";break;
- case CPG_REASON_NODEDOWN: reasonString = " node-down ";break;
- case CPG_REASON_NODEUP: reasonString = " node-up ";break;
- case CPG_REASON_PROCDOWN: reasonString = " process-down ";break;
+ case CPG_REASON_JOIN: reasonString = " joined"; break;
+ case CPG_REASON_LEAVE: reasonString = " left";break;
+ case CPG_REASON_NODEDOWN: reasonString = " node-down";break;
+ case CPG_REASON_NODEUP: reasonString = " node-up";break;
+ case CPG_REASON_PROCDOWN: reasonString = " process-down";break;
default: reasonString = " ";
}
qpid::cluster::MemberId member(*p);
- o << member << reasonString;
+ o << member << reasonString << ((p+1 < a.addrs+a.count) ? ", " : "");
}
return o;
}
@@ -247,23 +231,28 @@ void Cluster::configChange(
cpg_address *left, int nLeft,
cpg_address *joined, int nJoined)
{
- QPID_LOG(info, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: "
- << AddrList(joined, nJoined) << AddrList(left, nLeft));
-
- if (nJoined) // Notfiy new members of my URL.
- mcastFrame(
- AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())),
- ConnectionId(self,0));
-
+ // FIXME aconway 2008-09-15: use group terminology not cluster. Member not node.
+ QPID_LOG(notice, "Current cluster: " << AddrList(current, nCurrent));
+ QPID_LOG_IF(notice, nLeft, "Left the cluster: " << AddrList(left, nLeft));
if (find(left, left+nLeft, self) != left+nLeft) {
// We have left the group, this is the final config change.
- QPID_LOG(notice, "Cluster member " << self << " left cluster " << name.str());
- broker.shutdown();
+ QPID_LOG(notice, self << " left cluster " << name.str());
+ broker.shutdown();
}
Mutex::ScopedLock l(lock);
- for (int i = 0; i < nLeft; ++i) urls.erase(left[i]);
- // Add new members when their URL notice arraives.
- lock.notifyAll(); // Threads waiting for membership changes.
+ if (state == DISCARD) {
+ if (nCurrent == 1 && *current == self) {
+ QPID_LOG(notice, self << " first in cluster.");
+ map.ready(self, url);
+ ready(); // First in cluster.
+ }
+ else if (find(joined, joined+nJoined, self) != joined+nJoined) {
+ QPID_LOG(notice, self << " requesting state dump."); // Just joined
+ mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
+ }
+ }
+ for (int i = 0; i < nLeft; ++i)
+ map.leave(left[i]);
}
void Cluster::dispatch(sys::DispatchHandle& h) {
@@ -275,24 +264,59 @@ void Cluster::disconnect(sys::DispatchHandle& ) {
// FIXME aconway 2008-09-11: this should be logged as critical,
// when we provide admin option to shut down cluster and let
// members leave cleanly.
- QPID_LOG(notice, "Cluster member " << self << " disconnected from cluster " << name.str());
+ QPID_LOG(notice, self << " disconnected from cluster " << name.str());
broker.shutdown();
}
-void Cluster::urlNotice(const MemberId& m, const string& url) {
- //FIXME aconway 2008-09-12: Rdo join logic using ClusterMap. Implement xml map function also.
- //FIXME aconway 2008-09-11: Note multiple meanings of my own notice -
- //from DISCARD->STALL and from STALL->READY via map.
+// FIXME aconway 2008-09-15: can't serve multiple dump requests, stall in wrong place.
+// Only one at a time to simplify things?
+void Cluster::dumpRequest(const MemberId& m, const string& urlStr) {
+ Mutex::ScopedLock l(lock);
+ Url url(urlStr);
+ if (self == m) {
+ switch (state) {
+ case DISCARD: state = CATCHUP; stall(); break;
+ case HAVE_DUMP: ready(); break; // FIXME aconway 2008-09-15: apply dump to map.
+ default: assert(0);
+ };
+ }
+ else if (self == map.dumpRequest(m, url)) {
+ assert(state == READY);
+ QPID_LOG(info, self << " dumping to " << url);
+ // state = DUMPING;
+ // stall();
+ // FIXME aconway 2008-09-15: need to stall map?
+ // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
+ mcastControl(map.toControl(), 0); // FIXME aconway 2008-09-15: stand-in for dump.
+ }
+}
+
+void Cluster::ready(const MemberId& m, const string& urlStr) {
+ Mutex::ScopedLock l(lock);
+ Url url(urlStr);
+ map.ready(m, url);
+}
+
+broker::Broker& Cluster::getBroker(){ return broker; }
+
+void Cluster::stall() {
+ Mutex::ScopedLock l(lock);
+ // Stop processing connection events. We still process config changes
+ // and cluster controls in deliver()
+ connectionEventQueue.stop();
- QPID_LOG(info, "Cluster member " << m << " has URL " << url);
- // My brain dump is up to this point, stall till it is complete.
- if (m == self && state == DISCARD)
- state = STALL;
- urls.insert(UrlMap::value_type(m,Url(url)));
+ // FIXME aconway 2008-09-11: Flow control, we should slow down or
+ // stop reading from local connections while stalled to avoid an
+ // unbounded queue.
}
-void Cluster::ready(const MemberId& ) {
- // FIXME aconway 2008-09-08: TODO
+void Cluster::ready() {
+ // Called with lock held
+ QPID_LOG(info, self << " ready with URL " << url);
+ state = READY;
+ mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
+ connectionEventQueue.start(poller);
+ // FIXME aconway 2008-09-15: stall/unstall map?
}
// Called from Broker::~Broker when broker is shut down. At this
@@ -301,26 +325,51 @@ void Cluster::ready(const MemberId& ) {
// callbacks will be invoked.
//
void Cluster::shutdown() {
- QPID_LOG(notice, "Cluster member " << self << " shutting down.");
+ QPID_LOG(notice, self << " shutting down.");
try { cpg.shutdown(); }
catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); }
delete this;
}
-broker::Broker& Cluster::getBroker(){ return broker; }
+/** Received from cluster */
+void Cluster::dumpError(const MemberId& dumper, const MemberId& dumpee) {
+ QPID_LOG(error, "Error in dump from " << dumper << " to " << dumpee);
+ Mutex::ScopedLock l(lock);
+ map.dumpError(dumpee);
+ if (state == DUMPING && map.dumps(self) == 0)
+ ready();
+}
-void Cluster::stall() {
- // Stop processing connection events. We still process config changes
- // and cluster controls in deliver()
+/** Called in local dump thread */
+void Cluster::dumpError(const MemberId& dumpee, const Url& url, const char* msg) {
+ assert(state == DUMPING);
+ QPID_LOG(error, "Error in local dump to " << dumpee << " at " << url << ": " << msg);
+ mcastControl(ClusterDumpErrorBody(ProtocolVersion(), dumpee), 0);
+ Mutex::ScopedLock l(lock);
+ map.dumpError(dumpee);
+ if (map.dumps(self) == 0) // Unstall immediately.
+ ready();
+}
- // FIXME aconway 2008-09-11: Flow control, we should slow down or
- // stop reading from local connections while stalled to avoid an
- // unbounded queue.
- connectionEventQueue.stop();
+void Cluster::mapInit(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) {
+ Mutex::ScopedLock l(lock);
+ // FIXME aconway 2008-09-15: faking out dump here.
+ switch (state) {
+ case DISCARD:
+ map.init(members, dumpees, dumps);
+ state = HAVE_DUMP;
+ break;
+ case CATCHUP:
+ map.init(members, dumpees, dumps);
+ ready();
+ break;
+ default:
+ break;
+ }
}
-void Cluster::unStall() {
- connectionEventQueue.start(poller);
+void Cluster::dumpTo(const Url& ) {
+ // FIXME aconway 2008-09-12: DumpClient
}
}} // namespace qpid::cluster