diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 82 |
1 files changed, 58 insertions, 24 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 07ed4596e0..9db2a61a82 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -25,7 +25,7 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" -#include "qpid/framing/ClusterJoiningBody.h" +#include "qpid/framing/ClusterUrlNoticeBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/log/Statement.h" @@ -50,7 +50,7 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { Cluster& cluster; MemberId member; ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {} - void joining(const std::string& u) { cluster.joining (member, u); } + void urlNotice(const std::string& u) { cluster.urlNotice(member, u); } void ready() { cluster.ready(member); } void members(const framing::FieldTable& , const framing::FieldTable& , const framing::FieldTable& ) { @@ -58,6 +58,11 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { } bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); } + + virtual void map(const FieldTable& ,const FieldTable& ,const FieldTable& ) { + // FIXME aconway 2008-09-12: TODO + } + }; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : @@ -72,13 +77,14 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : 0, // write boost::bind(&Cluster::disconnect, this, _1) // disconnect ), - deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1))) + connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))), + state(DISCARD) { QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str()); broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); cpg.join(name); - deliverQueue.start(poller); + connectionEventQueue.start(poller); cpgDispatchHandle.startWatch(poller); } @@ -103,6 +109,7 @@ void Cluster::erase(ConnectionId id) { void Cluster::leave() { QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str()); cpg.leave(name); + // Cluster will shut down in configChange when the cluster knows we've left. } template <class T> void decodePtr(Buffer& buf, T*& ptr) { @@ -172,8 +179,23 @@ void Cluster::deliver( { try { MemberId from(nodeid, pid); - QPID_LOG(debug, "Cluster::deliver from " << from << " to " << self); // FIXME aconway 2008-09-10: - deliverQueue.push(Event::delivered(from, msg, msg_len)); + Event e = Event::delivered(from, msg, msg_len); + QPID_LOG(trace, "Cluster deliver: " << e); + + // Process cluster controls immediately + if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control + Buffer buf(e); + AMQFrame frame; + while (frame.decode(buf)) + if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame)) + throw Exception("Invalid cluster control"); + } + else { // Process connection controls & data via the connectionEventQueue. + if (state != DISCARD) { + e.setConnection(getConnection(e.getConnectionId())); + connectionEventQueue.push(e); + } + } } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. @@ -183,24 +205,15 @@ void Cluster::deliver( } } -void Cluster::deliverEvent(const Event& e) { - QPID_LOG(trace, "Delivered: " << e); +void Cluster::connectionEvent(const Event& e) { Buffer buf(e); - if (e.getConnection().getConnectionPtr() == 0) { // Cluster control + assert(e.getConnection()); + if (e.getType() == DATA) + e.getConnection()->deliverBuffer(buf); + else { // control AMQFrame frame; - while (frame.decode(buf)) - if (!ClusterOperations(*this, e.getConnection().getMember()).invoke(frame)) - throw Exception("Invalid cluster control"); - } - else { // Connection data or control - boost::intrusive_ptr<Connection> c = getConnection(e.getConnection()); - if (e.getType() == DATA) - c->deliverBuffer(buf); - else { // control - AMQFrame frame; - while (frame.decode(buf)) - c->deliver(frame); - } + while (frame.decode(buf)) + e.getConnection()->deliver(frame); } } @@ -239,7 +252,7 @@ void Cluster::configChange( if (nJoined) // Notfiy new members of my URL. mcastFrame( - AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), url.str())), + AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())), ConnectionId(self,0)); if (find(left, left+nLeft, self) != left+nLeft) { @@ -266,8 +279,15 @@ void Cluster::disconnect(sys::DispatchHandle& ) { broker.shutdown(); } -void Cluster::joining(const MemberId& m, const string& url) { +void Cluster::urlNotice(const MemberId& m, const string& url) { + //FIXME aconway 2008-09-12: Rdo join logic using ClusterMap. Implement xml map function also. + //FIXME aconway 2008-09-11: Note multiple meanings of my own notice - + //from DISCARD->STALL and from STALL->READY via map. + QPID_LOG(info, "Cluster member " << m << " has URL " << url); + // My brain dump is up to this point, stall till it is complete. + if (m == self && state == DISCARD) + state = STALL; urls.insert(UrlMap::value_type(m,Url(url))); } @@ -289,4 +309,18 @@ void Cluster::shutdown() { broker::Broker& Cluster::getBroker(){ return broker; } +void Cluster::stall() { + // Stop processing connection events. We still process config changes + // and cluster controls in deliver() + + // FIXME aconway 2008-09-11: Flow control, we should slow down or + // stop reading from local connections while stalled to avoid an + // unbounded queue. + connectionEventQueue.stop(); +} + +void Cluster::unStall() { + connectionEventQueue.start(poller); +} + }} // namespace qpid::cluster |