summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp61
1 files changed, 40 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index bea336644f..69a63ad83c 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -21,6 +21,7 @@
#include "Connection.h"
#include "UpdateClient.h"
#include "FailoverExchange.h"
+#include "UpdateExchange.h"
#include "qpid/assert.h"
#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
@@ -106,13 +107,13 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
"Error delivering frames",
poller),
expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, self, broker.getTimer())),
+ eventId(0),
frameId(0),
initialized(false),
state(INIT),
connections(*this),
lastSize(0),
- lastBroker(false),
- sequence(0)
+ lastBroker(false)
{
mAgent = ManagementAgent::Singleton::getInstance();
if (mAgent != 0){
@@ -122,7 +123,13 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
mgmtObject->set_status("JOINING");
}
+ // Failover exchange provides membership updates to clients.
failoverExchange.reset(new FailoverExchange(this));
+ broker.getExchanges().registerExchange(failoverExchange);
+
+ // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange.
+ broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
if (settings.quorum) quorum.init();
cpg.join(name);
// pump the CPG dispatch manually till we get initialized.
@@ -212,7 +219,6 @@ void Cluster::deliver(
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
- e.setSequence(sequence++);
if (from == self) // Record self-deliveries for flow control.
mcast.selfDeliver(e);
deliver(e);
@@ -225,34 +231,40 @@ void Cluster::deliver(const Event& e) {
}
// Handler for deliverEventQueue
-void Cluster::deliveredEvent(const Event& e) {
- QPID_LATENCY_RECORD("delivered event queue", e);
+void Cluster::deliveredEvent(const Event& event) {
+ Event e(event);
Mutex::ScopedLock l(lock);
+ if (state >= CATCHUP) {
+ e.setId(++eventId);
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ }
if (e.isCluster()) { // Cluster control, process in this thread.
- AMQFrame frame(e.getFrame());
+ EventFrame ef(e, e.getFrame());
+ QPID_LOG(trace, *this << " DLVR: " << ef);
ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l);
- if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
+ if (!framing::invoke(dispatch, *ef.frame.getBody()).wasHandled())
throw Exception(QPID_MSG("Invalid cluster control"));
}
- else if (state >= CATCHUP) { // Connection frame, push onto deliver queue.
- if (e.getType() == CONTROL)
+ else if (state >= CATCHUP) { // Handle connection frames
+ if (e.getType() == CONTROL) {
connectionFrame(EventFrame(e, e.getFrame()));
+ }
else
connections.decode(e, e.getData());
}
- else // connection frame && state < CATCHUP. Drop.
- QPID_LOG(trace, *this << " DROP: " << e);
+ // Drop connection frames while state < CATCHUP
}
// Handler for deliverFrameQueue
-void Cluster::deliveredFrame(const EventFrame& e) {
+void Cluster::deliveredFrame(const EventFrame& event) {
Mutex::ScopedLock l(lock); // TODO aconway 2009-03-02: don't need this lock?
+ EventFrame e(event);
assert(!e.isCluster()); // Only connection frames on this queue.
- QPID_LOG(trace, *this << " DLVR: " << e);
- if (e.type == DATA) // Sequence number to identify data frames.
- const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ if (e.type == DATA) // Add cluster-id to to data frames.
+ e.frame.setClusterId(frameId++);
boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
- if (connection) // Ignore frames to closed local connections.
+ if (connection) // Ignore frames to closed local connections.
connection->deliveredFrame(e);
}
@@ -389,6 +401,10 @@ void Cluster::stall(Lock&) {
// Stop processing the deliveredEventQueue in order to send or
// recieve an update.
deliverEventQueue.stop();
+
+ // FIXME aconway 2009-03-04: if frame queue is re-enabled, we must
+ // also wait for it to be empty before we are stalled, so that
+ // our local model is up-to-date to give an update.
}
void Cluster::unstall(Lock&) {
@@ -434,17 +450,18 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
cs.password = settings.password;
cs.mechanism = settings.mechanism;
updateThread = Thread(
- new UpdateClient(self, updatee, url, broker, map, frameId, connections.values(),
+ new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(),
boost::bind(&Cluster::updateOutDone, this),
boost::bind(&Cluster::updateOutError, this, _1),
cs));
}
// Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) {
+void Cluster::updateInDone(const ClusterMap& m, uint64_t eventId_, uint64_t frameId_) {
Lock l(lock);
updatedMap = m;
- frameId = fid;
+ eventId = eventId_;
+ frameId = frameId_;
checkUpdateIn(l);
}
@@ -601,9 +618,11 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
}
void Cluster::connectionFrame(const EventFrame& frame) {
- // FIXME aconway 2009-03-02: bypassing deliverFrameQueue to avoid race condition.
- // Measure performance impact, restore with better locking.
+ // FIXME aconway 2009-03-02: bypass deliverFrameQueue to avoid race condition.
+ // Measure performance impact & review.
+ //
// deliverFrameQueue.push(frame);
+ //
deliveredFrame(frame);
}