summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp124
1 files changed, 60 insertions, 64 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 6a19b8e4ea..eaa4a720b1 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -20,7 +20,6 @@
#include "Connection.h"
#include "UpdateClient.h"
#include "FailoverExchange.h"
-#include "ClusterQueueHandler.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionState.h"
@@ -92,8 +91,16 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
writeEstimate(writeEstimate_),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
- deliverEventQueue(ClusterQueueHandler<Event>(this, boost::bind(&Cluster::deliveredEvent, this, _1), "event queue"), poller),
- deliverFrameQueue(ClusterQueueHandler<EventFrame>(this, boost::bind(&Cluster::deliveredFrame, this, _1), "frame queue"), poller),
+ deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
+ boost::bind(&Cluster::leave, this),
+ "Error decoding events",
+ poller),
+ deliverFrameQueue(boost::bind(&Cluster::deliveredFrame, this, _1),
+ boost::bind(&Cluster::leave, this),
+ "Error delivering frames",
+ poller),
+ connections(*this),
+ decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)),
state(INIT),
lastSize(0),
lastBroker(false),
@@ -121,12 +128,23 @@ Cluster::~Cluster() {
if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
}
-void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
- connections.insert(c->getId(), c);
+// Called in connection thread to insert a client connection.
+void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
+ Lock l(lock);
+ connections.insert(c);
}
-void Cluster::erase(ConnectionId id) {
+// Called in connection thread to insert an updated shadow connection.
+void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
+ Lock l(lock);
+ assert(state <= UPDATEE); // Only during update.
+ connections.insert(c);
+}
+
+void Cluster::erase(const ConnectionId& id) {
+ // Called only by Connection::deliverClose in deliver thread, no need to lock.
connections.erase(id);
+ decoder.erase(id);
}
std::vector<string> Cluster::getIds() const {
@@ -168,17 +186,7 @@ void Cluster::leave(Lock&) {
}
}
-boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId) {
- boost::intrusive_ptr<Connection> cp = connections.find(connectionId);
- if (!cp && connectionId.getMember() != myId) { // New shadow connection
- std::ostringstream mgmtId;
- mgmtId << name << ":" << connectionId;
- cp = new Connection(*this, shadowOut, mgmtId.str(), connectionId);
- connections.insert(connectionId, cp);
- }
- return cp;
-}
-
+// Deliver CPG message.
void Cluster::deliver(
cpg_handle_t /*handle*/,
cpg_name* /*group*/,
@@ -187,58 +195,52 @@ void Cluster::deliver(
void* msg,
int msg_len)
{
- Mutex::ScopedLock l(lock);
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
e.setSequence(sequence++);
if (from == myId) // Record self-deliveries for flow control.
mcast.selfDeliver(e);
- deliver(e, l);
+ deliver(e);
}
-void Cluster::deliver(const Event& e, Lock&) {
+void Cluster::deliver(const Event& e) {
if (state == LEFT) return;
QPID_LATENCY_INIT(e);
deliverEventQueue.push(e);
}
-// Entry point: called when deliverEventQueue has events to process.
+// Handler for deliverEventQueue
void Cluster::deliveredEvent(const Event& e) {
QPID_LATENCY_RECORD("delivered event queue", e);
Buffer buf(const_cast<char*>(e.getData()), e.getSize());
- boost::intrusive_ptr<Connection> connection;
- if (e.isConnection()) {
- if (state <= UPDATEE) {
- QPID_LOG(trace, *this << " DROP: " << e);
- return;
- }
- connection = getConnection(e.getConnectionId());
- if (!connection) return;
- }
if (e.getType() == CONTROL) {
AMQFrame frame;
- while (frame.decode(buf)) {
- deliverFrameQueue.push(EventFrame(connection, e, frame));
- }
- }
- else if (e.getType() == DATA) {
- connection->deliveredEvent(e, deliverFrameQueue);
+ while (frame.decode(buf))
+ deliverFrameQueue.push(EventFrame(e, frame));
}
+ else if (e.getType() == DATA)
+ decoder.decode(e, e.getData());
}
+// Handler for deliverFrameQueue
void Cluster::deliveredFrame(const EventFrame& e) {
+ Mutex::ScopedLock l(lock);
QPID_LOG(trace, *this << " DLVR: " << e);
QPID_LATENCY_RECORD("delivered frame queue", e.frame);
- if (e.connection) {
- e.connection->deliveredFrame(e);
- }
- else {
- Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big?
- ClusterDispatcher dispatch(*this, e.member, l);
+ if (e.isCluster()) { // Cluster control frame
+ ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
throw Exception(QPID_MSG("Invalid cluster control"));
}
+ else { // Connection frame.
+ if (state <= UPDATEE) {
+ QPID_LOG(trace, *this << " DROP: " << e);
+ return;
+ }
+ boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
+ connection->deliveredFrame(e);
+ }
QPID_LATENCY_RECORD("processed", e.frame);
}
@@ -282,7 +284,13 @@ void Cluster::configChange (
std::string addresses;
for (cpg_address* p = current; p < current+nCurrent; ++p)
addresses.append(MemberId(*p).str());
- deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId), l);
+ deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId));
+}
+
+void Cluster::setReady(Lock&) {
+ state = READY;
+ if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
+ mcast.release();
}
void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) {
@@ -296,12 +304,9 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
if (state == INIT) { // First configChange
if (map.aliveCount() == 1) {
- setClusterId(true);
- // FIXME aconway 2008-12-11: Centralize transition to READY and associated actions eg mcast.release()
- state = READY;
- mcast.release();
QPID_LOG(notice, *this << " first in cluster");
- if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
+ setClusterId(true);
+ setReady(l);
map = ClusterMap(myId, myUrl, true);
memberUpdate(l);
}
@@ -325,9 +330,6 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
}
}
-
-
-
void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
@@ -361,11 +363,8 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
if (map.ready(id, Url(url)))
memberUpdate(l);
if (state == CATCHUP && id == myId) {
- state = READY;
- mcast.release();
QPID_LOG(notice, *this << " caught up, active cluster member");
- if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
- mcast.release();
+ setReady(l);
}
}
@@ -379,8 +378,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
updateStart(updatee, *url, l);
}
else { // Another offer was first.
- state = READY;
- mcast.release();
+ setReady(l);
QPID_LOG(info, *this << " cancelled update offer to " << updatee);
tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer.
}
@@ -390,7 +388,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
setClusterId(uuid);
state = UPDATEE;
QPID_LOG(info, *this << " receiving update from " << updater);
- deliverEventQueue.stop();
+ deliverFrameQueue.stop();
checkUpdateIn(l);
}
}
@@ -400,7 +398,7 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
assert(state == OFFER);
state = UPDATER;
QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
- deliverEventQueue.stop();
+ deliverFrameQueue.stop();
if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
updateThread = Thread(
new UpdateClient(myId, updatee, url, broker, map, connections.values(),
@@ -422,7 +420,7 @@ void Cluster::checkUpdateIn(Lock& ) {
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
state = CATCHUP;
QPID_LOG(info, *this << " received update, starting catch-up");
- deliverEventQueue.start();
+ deliverFrameQueue.start();
}
}
@@ -432,11 +430,11 @@ void Cluster::updateOutDone() {
}
void Cluster::updateOutDone(Lock& l) {
+ QPID_LOG(info, *this << " sent update");
assert(state == UPDATER);
state = READY;
mcast.release();
- QPID_LOG(info, *this << " sent update");
- deliverEventQueue.start();
+ deliverFrameQueue.start();
tryMakeOffer(map.firstJoiner(), l); // Try another offer
}
@@ -504,8 +502,6 @@ void Cluster::memberUpdate(Lock& l) {
}
lastSize = size;
- //
-
if (mgmtObject) {
mgmtObject->set_clusterSize(size);
string urlstr;