summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp138
1 files changed, 78 insertions, 60 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 312d1e90e3..bea336644f 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -22,6 +22,7 @@
#include "UpdateClient.h"
#include "FailoverExchange.h"
+#include "qpid/assert.h"
#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
#include "qmf/org/apache/qpid/cluster/Package.h"
#include "qpid/broker/Broker.h"
@@ -91,7 +92,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
cpg(*this),
name(settings.name),
myUrl(settings.url.empty() ? Url() : Url(settings.url)),
- myId(cpg.self()),
+ self(cpg.self()),
readMax(settings.readMax),
writeEstimate(settings.writeEstimate),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
@@ -104,8 +105,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
boost::bind(&Cluster::leave, this),
"Error delivering frames",
poller),
- decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1), connections),
- expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())),
+ expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, self, broker.getTimer())),
frameId(0),
initialized(false),
state(INIT),
@@ -213,7 +213,7 @@ void Cluster::deliver(
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
e.setSequence(sequence++);
- if (from == myId) // Record self-deliveries for flow control.
+ if (from == self) // Record self-deliveries for flow control.
mcast.selfDeliver(e);
deliver(e);
}
@@ -227,42 +227,33 @@ void Cluster::deliver(const Event& e) {
// Handler for deliverEventQueue
void Cluster::deliveredEvent(const Event& e) {
QPID_LATENCY_RECORD("delivered event queue", e);
- Buffer buf(const_cast<char*>(e.getData()), e.getSize());
- if (e.getType() == CONTROL) {
- AMQFrame frame;
- while (frame.decode(buf)) {
- // Check for deliver close here so we can erase the
- // connection decoder safely in this thread.
- if (frame.getMethod()->isA<ClusterConnectionDeliverCloseBody>())
- decoder.erase(e.getConnectionId());
- deliverFrameQueue.push(EventFrame(e, frame));
- }
+ Mutex::ScopedLock l(lock);
+ if (e.isCluster()) { // Cluster control, process in this thread.
+ AMQFrame frame(e.getFrame());
+ ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l);
+ if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
+ throw Exception(QPID_MSG("Invalid cluster control"));
}
- else if (e.getType() == DATA)
- decoder.decode(e, e.getData());
+ else if (state >= CATCHUP) { // Connection frame, push onto deliver queue.
+ if (e.getType() == CONTROL)
+ connectionFrame(EventFrame(e, e.getFrame()));
+ else
+ connections.decode(e, e.getData());
+ }
+ else // connection frame && state < CATCHUP. Drop.
+ QPID_LOG(trace, *this << " DROP: " << e);
}
// Handler for deliverFrameQueue
void Cluster::deliveredFrame(const EventFrame& e) {
- Mutex::ScopedLock l(lock);
- const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
+ Mutex::ScopedLock l(lock); // TODO aconway 2009-03-02: don't need this lock?
+ assert(!e.isCluster()); // Only connection frames on this queue.
QPID_LOG(trace, *this << " DLVR: " << e);
- QPID_LATENCY_RECORD("delivered frame queue", e.frame);
- if (e.isCluster()) { // Cluster control frame
- ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
- if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
- throw Exception(QPID_MSG("Invalid cluster control"));
- }
- else { // Connection frame.
- if (state <= UPDATEE) {
- QPID_LOG(trace, *this << " DROP: " << e);
- return;
- }
- boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
- if (connection) // Ignore frames to closed local connections.
- connection->deliveredFrame(e);
- }
- QPID_LATENCY_RECORD("processed", e.frame);
+ if (e.type == DATA) // Sequence number to identify data frames.
+ const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
+ boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
+ if (connection) // Ignore frames to closed local connections.
+ connection->deliveredFrame(e);
}
struct AddrList {
@@ -310,7 +301,7 @@ void Cluster::configChange (
std::string addresses;
for (cpg_address* p = current; p < current+nCurrent; ++p)
addresses.append(MemberId(*p).str());
- deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId));
+ deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
}
void Cluster::setReady(Lock&) {
@@ -323,7 +314,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
bool memberChange = map.configChange(addresses);
if (state == LEFT) return;
- if (!map.isAlive(myId)) { // Final config change.
+ if (!map.isAlive(self)) { // Final config change.
leave(l);
return;
}
@@ -332,16 +323,16 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
if (map.aliveCount() == 1) {
setClusterId(true);
setReady(l);
- map = ClusterMap(myId, myUrl, true);
+ map = ClusterMap(self, myUrl, true);
memberUpdate(l);
QPID_LOG(notice, *this << " first in cluster");
}
else { // Joining established group.
state = JOINER;
QPID_LOG(info, *this << " joining cluster: " << map);
- mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId);
+ mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
elders = map.getAlive();
- elders.erase(myId);
+ elders.erase(self);
broker.getLinks().setPassive(true);
}
}
@@ -361,7 +352,7 @@ void Cluster::makeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
QPID_LOG(info, *this << " send update-offer to " << id);
- mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), myId);
+ mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), self);
}
}
@@ -388,17 +379,29 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l)
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
if (map.ready(id, Url(url)))
memberUpdate(l);
- if (state == CATCHUP && id == myId) {
+ if (state == CATCHUP && id == self) {
setReady(l);
QPID_LOG(notice, *this << " caught up, active cluster member");
}
}
+void Cluster::stall(Lock&) {
+ // Stop processing the deliverEventQueue in order to send or
+ // receive an update.
+ deliverEventQueue.stop();
+}
+
+void Cluster::unstall(Lock&) {
+ // Resume processing the deliverEventQueue after sending or
+ // receiving an update.
+ deliverEventQueue.start();
+}
+
void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
if (state == LEFT) return;
MemberId updatee(updateeInt);
boost::optional<Url> url = map.updateOffer(updater, updatee);
- if (updater == myId) {
+ if (updater == self) {
assert(state == OFFER);
if (url) { // My offer was first.
updateStart(updatee, *url, l);
@@ -409,29 +412,29 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
makeOffer(map.firstJoiner(), l); // Maybe make another offer.
}
}
- else if (updatee == myId && url) {
+ else if (updatee == self && url) {
assert(state == JOINER);
setClusterId(uuid);
state = UPDATEE;
QPID_LOG(info, *this << " receiving update from " << updater);
- deliverFrameQueue.stop();
+ stall(l);
checkUpdateIn(l);
}
}
-void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
+void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
if (state == LEFT) return;
assert(state == OFFER);
state = UPDATER;
QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
- deliverFrameQueue.stop();
+ stall(l);
if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
client::ConnectionSettings cs;
cs.username = settings.username;
cs.password = settings.password;
cs.mechanism = settings.mechanism;
updateThread = Thread(
- new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(),
+ new UpdateClient(self, updatee, url, broker, map, frameId, connections.values(),
boost::bind(&Cluster::updateOutDone, this),
boost::bind(&Cluster::updateOutError, this, _1),
cs));
@@ -445,13 +448,13 @@ void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) {
checkUpdateIn(l);
}
-void Cluster::checkUpdateIn(Lock& ) {
+void Cluster::checkUpdateIn(Lock& l) {
if (state == UPDATEE && updatedMap) {
map = *updatedMap;
- mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
+ mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
QPID_LOG(info, *this << " received update, starting catch-up");
- deliverFrameQueue.start();
+ unstall(l);
}
}
@@ -465,7 +468,7 @@ void Cluster::updateOutDone(Lock& l) {
assert(state == UPDATER);
state = READY;
mcast.release();
- deliverFrameQueue.start();
+ unstall(l);
makeOffer(map.firstJoiner(), l); // Try another offer
}
@@ -490,7 +493,7 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s
{
_qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args;
stringstream stream;
- stream << myId;
+ stream << self;
if (iargs.i_brokerId == stream.str())
stopClusterNode(l);
}
@@ -511,7 +514,7 @@ void Cluster::stopClusterNode(Lock& l) {
void Cluster::stopFullCluster(Lock& ) {
QPID_LOG(notice, *this << " shutting down cluster " << name);
- mcast.mcastControl(ClusterShutdownBody(), myId);
+ mcast.mcastControl(ClusterShutdownBody(), self);
}
void Cluster::memberUpdate(Lock& l) {
@@ -522,12 +525,12 @@ void Cluster::memberUpdate(Lock& l) {
failoverExchange->setUrls(urls);
if (size == 1 && lastSize > 1 && state >= CATCHUP) {
- QPID_LOG(info, *this << " last broker standing, update queue policies");
+ QPID_LOG(notice, *this << " last broker standing, update queue policies");
lastBroker = true;
broker.getQueues().updateQueueClusterState(true);
}
else if (size > 1 && lastBroker) {
- QPID_LOG(info, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
+ QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
lastBroker = false;
broker.getQueues().updateQueueClusterState(false);
}
@@ -549,17 +552,25 @@ void Cluster::memberUpdate(Lock& l) {
mgmtObject->set_memberIDs(idstr);
}
- // Close connections belonging to members that have now been excluded
- connections.update(myId, map);
+ // Generate a deliver-close control frame for connections
+ // belonging to defunct members, so they will be erased in the
+ // deliverFrameQueue thread.
+ ConnectionMap::Vector c = connections.values();
+ for (ConnectionMap::Vector::iterator i = c.begin(); i != c.end(); ++i) {
+ ConnectionId cid = (*i)->getId();
+ MemberId mid = cid.getMember();
+ if (mid != self && !map.isMember(mid))
+ connectionFrame(EventFrame(EventHeader(CONTROL, cid), AMQFrame(ClusterConnectionDeliverCloseBody())));
+ }
}
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
- return o << cluster.myId << "(" << STATE[cluster.state] << ")";
+ return o << cluster.self << "(" << STATE[cluster.state] << ")";
}
MemberId Cluster::getId() const {
- return myId; // Immutable, no need to lock.
+ return self; // Immutable, no need to lock.
}
broker::Broker& Cluster::getBroker() const {
@@ -578,7 +589,7 @@ void Cluster::setClusterId(const Uuid& uuid) {
clusterId = uuid;
if (mgmtObject) {
stringstream stream;
- stream << myId;
+ stream << self;
mgmtObject->set_clusterID(clusterId.str());
mgmtObject->set_memberID(stream.str());
}
@@ -589,4 +600,11 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
expiryPolicy->deliverExpire(id);
}
+void Cluster::connectionFrame(const EventFrame& frame) {
+ // FIXME aconway 2009-03-02: bypassing deliverFrameQueue to avoid race condition.
+ // Measure performance impact, restore with better locking.
+ // deliverFrameQueue.push(frame);
+ deliveredFrame(frame);
+}
+
}} // namespace qpid::cluster