diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 61 |
1 files changed, 40 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index bea336644f..69a63ad83c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -21,6 +21,7 @@ #include "Connection.h" #include "UpdateClient.h" #include "FailoverExchange.h" +#include "UpdateExchange.h" #include "qpid/assert.h" #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" @@ -106,13 +107,13 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : "Error delivering frames", poller), expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, self, broker.getTimer())), + eventId(0), frameId(0), initialized(false), state(INIT), connections(*this), lastSize(0), - lastBroker(false), - sequence(0) + lastBroker(false) { mAgent = ManagementAgent::Singleton::getInstance(); if (mAgent != 0){ @@ -122,7 +123,13 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : mgmtObject->set_status("JOINING"); } + // Failover exchange provides membership updates to clients. failoverExchange.reset(new FailoverExchange(this)); + broker.getExchanges().registerExchange(failoverExchange); + + // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange. + broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + if (settings.quorum) quorum.init(); cpg.join(name); // pump the CPG dispatch manually till we get initialized. @@ -212,7 +219,6 @@ void Cluster::deliver( MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); - e.setSequence(sequence++); if (from == self) // Record self-deliveries for flow control. mcast.selfDeliver(e); deliver(e); @@ -225,34 +231,40 @@ void Cluster::deliver(const Event& e) { } // Handler for deliverEventQueue -void Cluster::deliveredEvent(const Event& e) { - QPID_LATENCY_RECORD("delivered event queue", e); +void Cluster::deliveredEvent(const Event& event) { + Event e(event); Mutex::ScopedLock l(lock); + if (state >= CATCHUP) { + e.setId(++eventId); + QPID_LOG(trace, *this << " DLVR: " << e); + } if (e.isCluster()) { // Cluster control, process in this thread. - AMQFrame frame(e.getFrame()); + EventFrame ef(e, e.getFrame()); + QPID_LOG(trace, *this << " DLVR: " << ef); ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l); - if (!framing::invoke(dispatch, *frame.getBody()).wasHandled()) + if (!framing::invoke(dispatch, *ef.frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); } - else if (state >= CATCHUP) { // Connection frame, push onto deliver queue. - if (e.getType() == CONTROL) + else if (state >= CATCHUP) { // Handle connection frames + if (e.getType() == CONTROL) { connectionFrame(EventFrame(e, e.getFrame())); + } else connections.decode(e, e.getData()); } - else // connection frame && state < CATCHUP. Drop. - QPID_LOG(trace, *this << " DROP: " << e); + // Drop connection frames while state < CATCHUP } // Handler for deliverFrameQueue -void Cluster::deliveredFrame(const EventFrame& e) { +void Cluster::deliveredFrame(const EventFrame& event) { Mutex::ScopedLock l(lock); // TODO aconway 2009-03-02: don't need this lock? + EventFrame e(event); assert(!e.isCluster()); // Only connection frames on this queue. - QPID_LOG(trace, *this << " DLVR: " << e); - if (e.type == DATA) // Sequence number to identify data frames. - const_cast<AMQFrame&>(e.frame).setClusterId(frameId++); + QPID_LOG(trace, *this << " DLVR: " << e); + if (e.type == DATA) // Add cluster-id to to data frames. + e.frame.setClusterId(frameId++); boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId); - if (connection) // Ignore frames to closed local connections. + if (connection) // Ignore frames to closed local connections. connection->deliveredFrame(e); } @@ -389,6 +401,10 @@ void Cluster::stall(Lock&) { // Stop processing the deliveredEventQueue in order to send or // recieve an update. deliverEventQueue.stop(); + + // FIXME aconway 2009-03-04: if frame queue is re-enabled, we must + // also wait for it to be empty before we are stalled, so that + // our local model is up-to-date to give an update. } void Cluster::unstall(Lock&) { @@ -434,17 +450,18 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { cs.password = settings.password; cs.mechanism = settings.mechanism; updateThread = Thread( - new UpdateClient(self, updatee, url, broker, map, frameId, connections.values(), + new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(), boost::bind(&Cluster::updateOutDone, this), boost::bind(&Cluster::updateOutError, this, _1), cs)); } // Called in update thread. -void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) { +void Cluster::updateInDone(const ClusterMap& m, uint64_t eventId_, uint64_t frameId_) { Lock l(lock); updatedMap = m; - frameId = fid; + eventId = eventId_; + frameId = frameId_; checkUpdateIn(l); } @@ -601,9 +618,11 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { } void Cluster::connectionFrame(const EventFrame& frame) { - // FIXME aconway 2009-03-02: bypassing deliverFrameQueue to avoid race condition. - // Measure performance impact, restore with better locking. + // FIXME aconway 2009-03-02: bypass deliverFrameQueue to avoid race condition. + // Measure performance impact & review. + // // deliverFrameQueue.push(frame); + // deliveredFrame(frame); } |