summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp82
1 files changed, 58 insertions, 24 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 07ed4596e0..9db2a61a82 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -25,7 +25,7 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterJoiningBody.h"
+#include "qpid/framing/ClusterUrlNoticeBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
@@ -50,7 +50,7 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
Cluster& cluster;
MemberId member;
ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
- void joining(const std::string& u) { cluster.joining (member, u); }
+ void urlNotice(const std::string& u) { cluster.urlNotice(member, u); }
void ready() { cluster.ready(member); }
void members(const framing::FieldTable& , const framing::FieldTable& , const framing::FieldTable& ) {
@@ -58,6 +58,11 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
}
bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
+
+ virtual void map(const FieldTable& ,const FieldTable& ,const FieldTable& ) {
+ // FIXME aconway 2008-09-12: TODO
+ }
+
};
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
@@ -72,13 +77,14 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
0, // write
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
- deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1)))
+ connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
+ state(DISCARD)
{
QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str());
broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
cpg.join(name);
- deliverQueue.start(poller);
+ connectionEventQueue.start(poller);
cpgDispatchHandle.startWatch(poller);
}
@@ -103,6 +109,7 @@ void Cluster::erase(ConnectionId id) {
void Cluster::leave() {
QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str());
cpg.leave(name);
+ // Cluster will shut down in configChange when the cluster knows we've left.
}
template <class T> void decodePtr(Buffer& buf, T*& ptr) {
@@ -172,8 +179,23 @@ void Cluster::deliver(
{
try {
MemberId from(nodeid, pid);
- QPID_LOG(debug, "Cluster::deliver from " << from << " to " << self); // FIXME aconway 2008-09-10:
- deliverQueue.push(Event::delivered(from, msg, msg_len));
+ Event e = Event::delivered(from, msg, msg_len);
+ QPID_LOG(trace, "Cluster deliver: " << e);
+
+ // Process cluster controls immediately
+ if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control
+ Buffer buf(e);
+ AMQFrame frame;
+ while (frame.decode(buf))
+ if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame))
+ throw Exception("Invalid cluster control");
+ }
+ else { // Process connection controls & data via the connectionEventQueue.
+ if (state != DISCARD) {
+ e.setConnection(getConnection(e.getConnectionId()));
+ connectionEventQueue.push(e);
+ }
+ }
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
@@ -183,24 +205,15 @@ void Cluster::deliver(
}
}
-void Cluster::deliverEvent(const Event& e) {
- QPID_LOG(trace, "Delivered: " << e);
+void Cluster::connectionEvent(const Event& e) {
Buffer buf(e);
- if (e.getConnection().getConnectionPtr() == 0) { // Cluster control
+ assert(e.getConnection());
+ if (e.getType() == DATA)
+ e.getConnection()->deliverBuffer(buf);
+ else { // control
AMQFrame frame;
- while (frame.decode(buf))
- if (!ClusterOperations(*this, e.getConnection().getMember()).invoke(frame))
- throw Exception("Invalid cluster control");
- }
- else { // Connection data or control
- boost::intrusive_ptr<Connection> c = getConnection(e.getConnection());
- if (e.getType() == DATA)
- c->deliverBuffer(buf);
- else { // control
- AMQFrame frame;
- while (frame.decode(buf))
- c->deliver(frame);
- }
+ while (frame.decode(buf))
+ e.getConnection()->deliver(frame);
}
}
@@ -239,7 +252,7 @@ void Cluster::configChange(
if (nJoined) // Notfiy new members of my URL.
mcastFrame(
- AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), url.str())),
+ AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())),
ConnectionId(self,0));
if (find(left, left+nLeft, self) != left+nLeft) {
@@ -266,8 +279,15 @@ void Cluster::disconnect(sys::DispatchHandle& ) {
broker.shutdown();
}
-void Cluster::joining(const MemberId& m, const string& url) {
+void Cluster::urlNotice(const MemberId& m, const string& url) {
+ //FIXME aconway 2008-09-12: Rdo join logic using ClusterMap. Implement xml map function also.
+ //FIXME aconway 2008-09-11: Note multiple meanings of my own notice -
+ //from DISCARD->STALL and from STALL->READY via map.
+
QPID_LOG(info, "Cluster member " << m << " has URL " << url);
+ // My brain dump is up to this point, stall till it is complete.
+ if (m == self && state == DISCARD)
+ state = STALL;
urls.insert(UrlMap::value_type(m,Url(url)));
}
@@ -289,4 +309,18 @@ void Cluster::shutdown() {
broker::Broker& Cluster::getBroker(){ return broker; }
+void Cluster::stall() {
+ // Stop processing connection events. We still process config changes
+ // and cluster controls in deliver()
+
+ // FIXME aconway 2008-09-11: Flow control, we should slow down or
+ // stop reading from local connections while stalled to avoid an
+ // unbounded queue.
+ connectionEventQueue.stop();
+}
+
+void Cluster::unStall() {
+ connectionEventQueue.start(poller);
+}
+
}} // namespace qpid::cluster