summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-03-05 20:24:41 +0000
committerAlan Conway <aconway@apache.org>2009-03-05 20:24:41 +0000
commit5f9b4a56232ad922d3e25a408924cb5bef0036d8 (patch)
tree60f2884f112278156a00d0dc806246eccfe8d214 /cpp/src/qpid/cluster/Cluster.cpp
parent97d5254a47121a42d435e1ca808cb4c56cdbf18f (diff)
downloadqpid-python-5f9b4a56232ad922d3e25a408924cb5bef0036d8.tar.gz
Cluster: restore separate event/frame threads.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@750574 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp87
1 files changed, 39 insertions, 48 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 69a63ad83c..8946a71446 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -96,6 +96,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
self(cpg.self()),
readMax(settings.readMax),
writeEstimate(settings.writeEstimate),
+ expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
@@ -106,12 +107,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
boost::bind(&Cluster::leave, this),
"Error delivering frames",
poller),
- expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, self, broker.getTimer())),
- eventId(0),
+ connections(*this),
frameId(0),
initialized(false),
state(INIT),
- connections(*this),
+ eventId(0),
lastSize(0),
lastBroker(false)
{
@@ -156,19 +156,15 @@ void Cluster::initialize() {
// Called in connection thread to insert a client connection.
void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
- Lock l(lock);
connections.insert(c);
}
// Called in connection thread to insert an updated shadow connection.
void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
- Lock l(lock);
- assert(state <= UPDATEE); // Only during update.
connections.insert(c);
}
void Cluster::erase(const ConnectionId& id) {
- // Called only by Connection::deliverClose in deliver thread with lock held.
connections.erase(id);
}
@@ -225,12 +221,11 @@ void Cluster::deliver(
}
void Cluster::deliver(const Event& e) {
- if (state == LEFT) return;
- QPID_LATENCY_INIT(e);
deliverEventQueue.push(e);
}
-// Handler for deliverEventQueue
+// Handler for deliverEventQueue.
+// This thread executes cluster controls and decodes connection data events.
void Cluster::deliveredEvent(const Event& event) {
Event e(event);
Mutex::ScopedLock l(lock);
@@ -246,26 +241,34 @@ void Cluster::deliveredEvent(const Event& event) {
throw Exception(QPID_MSG("Invalid cluster control"));
}
else if (state >= CATCHUP) { // Handle connection frames
- if (e.getType() == CONTROL) {
+ if (e.getType() == CONTROL)
connectionFrame(EventFrame(e, e.getFrame()));
- }
else
connections.decode(e, e.getData());
}
// Drop connection frames while state < CATCHUP
}
-// Handler for deliverFrameQueue
+void Cluster::connectionFrame(const EventFrame& frame) {
+ deliverFrameQueue.push(frame);
+}
+
+// Handler for deliverFrameQueue.
+// This thread executes connection control and data frames.
void Cluster::deliveredFrame(const EventFrame& event) {
- Mutex::ScopedLock l(lock); // TODO aconway 2009-03-02: don't need this lock?
+ // No lock, only use connections, not Cluster state.
EventFrame e(event);
- assert(!e.isCluster()); // Only connection frames on this queue.
- QPID_LOG(trace, *this << " DLVR: " << e);
- if (e.type == DATA) // Add cluster-id to to data frames.
- e.frame.setClusterId(frameId++);
- boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
- if (connection) // Ignore frames to closed local connections.
- connection->deliveredFrame(e);
+ if(!e.frame.getBody()) { // marks the stall point, start the update task.
+ updateThread=Thread(*updateTask);
+ }
+ else {
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ if (e.type == DATA) // Add cluster-id to to data frames.
+ e.frame.setClusterId(frameId++);
+ boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
+ if (connection)
+ connection->deliveredFrame(e);
+ }
}
struct AddrList {
@@ -333,7 +336,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
if (state == INIT) { // First configChange
if (map.aliveCount() == 1) {
- setClusterId(true);
+ setClusterId(true, l);
setReady(l);
map = ClusterMap(self, myUrl, true);
memberUpdate(l);
@@ -358,8 +361,6 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
}
}
-bool Cluster::isLeader() const { return elders.empty(); }
-
void Cluster::makeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
@@ -374,11 +375,9 @@ void Cluster::makeOffer(const MemberId& id, Lock& ) {
// callbacks will be invoked.
//
void Cluster::brokerShutdown() {
- if (state != LEFT) {
- try { cpg.shutdown(); }
- catch (const std::exception& e) {
- QPID_LOG(error, *this << " shutting down CPG: " << e.what());
- }
+ try { cpg.shutdown(); }
+ catch (const std::exception& e) {
+ QPID_LOG(error, *this << " shutting down CPG: " << e.what());
}
delete this;
}
@@ -401,10 +400,6 @@ void Cluster::stall(Lock&) {
// Stop processing the deliveredEventQueue in order to send or
// recieve an update.
deliverEventQueue.stop();
-
- // FIXME aconway 2009-03-04: if frame queue is re-enabled, we must
- // also wait for it to be empty before we are stalled, so that
- // our local model is up-to-date to give an update.
}
void Cluster::unstall(Lock&) {
@@ -430,7 +425,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
}
else if (updatee == self && url) {
assert(state == JOINER);
- setClusterId(uuid);
+ setClusterId(uuid, l);
state = UPDATEE;
QPID_LOG(info, *this << " receiving update from " << updater);
stall(l);
@@ -444,16 +439,20 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
state = UPDATER;
QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
stall(l);
- if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+
+ if (updateThread.id())
+ updateThread.join(); // Join the previous updateThread to avoid leaks.
client::ConnectionSettings cs;
cs.username = settings.username;
cs.password = settings.password;
cs.mechanism = settings.mechanism;
- updateThread = Thread(
- new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(),
+ updateTask = new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(),
boost::bind(&Cluster::updateOutDone, this),
boost::bind(&Cluster::updateOutError, this, _1),
- cs));
+ cs);
+ // Push an empty frame onto the deliverFrameQueue to mark the stall point.
+ // The deliverFrameQueue thread will start the update at that point.
+ deliverFrameQueue.push(EventFrame(EventHeader(), AMQFrame()));
}
// Called in update thread.
@@ -461,6 +460,7 @@ void Cluster::updateInDone(const ClusterMap& m, uint64_t eventId_, uint64_t fram
Lock l(lock);
updatedMap = m;
eventId = eventId_;
+ // Safe to use frameId here because we are stalled: deliveredFrame cannot be called concurrently.
frameId = frameId_;
checkUpdateIn(l);
}
@@ -602,7 +602,7 @@ void Cluster::checkQuorum() {
}
}
-void Cluster::setClusterId(const Uuid& uuid) {
+void Cluster::setClusterId(const Uuid& uuid, Lock&) {
clusterId = uuid;
if (mgmtObject) {
stringstream stream;
@@ -617,13 +617,4 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
expiryPolicy->deliverExpire(id);
}
-void Cluster::connectionFrame(const EventFrame& frame) {
- // FIXME aconway 2009-03-02: bypass deliverFrameQueue to avoid race condition.
- // Measure performance impact & review.
- //
- // deliverFrameQueue.push(frame);
- //
- deliveredFrame(frame);
-}
-
}} // namespace qpid::cluster