diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 169 |
1 files changed, 52 insertions, 117 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index e6e3de64f2..dd4882774b 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -36,45 +36,45 @@ * * IMPORTANT NOTE: any time code is added to the broker that uses timers, * the cluster may need to be updated to take account of this. - * + * * * USE OF TIMESTAMPS IN THE BROKER - * + * * The following are the current areas where broker uses timers or timestamps: - * + * * - Producer flow control: broker::SemanticState uses * connection::getClusterOrderOutput. a FrameHandler that sends * frames to the client via the cluster. Used by broker::SessionState - * + * * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is * implemented by cluster::ExpiryPolicy. - * + * * - Connection heartbeat: sends connection controls, not part of * session command counting so OK to ignore. - * + * * - LinkRegistry: only cluster elder is ever active for links. - * + * * - management::ManagementBroker: uses MessageHandler supplied by cluster * to send messages to the broker via the cluster. + * + * - Dtx: not yet supported with cluster. * - * cluster::ExpiryPolicy uses cluster time. + * cluster::ExpiryPolicy implements the strategy for message expiry. * * ClusterTimer implements periodic timed events in the cluster context. - * Used for: - * - periodic management events. - * - DTX transaction timeouts. + * Used for periodic management events. * * <h1>CLUSTER PROTOCOL OVERVIEW</h1> - * + * * Messages sent to/from CPG are called Events. * * An Event carries a ConnectionId, which includes a MemberId and a * connection number. - * + * * Events are either * - Connection events: non-0 connection number and are associated with a connection. * - Cluster Events: 0 connection number, are not associated with a connection. - * + * * Events are further categorized as: * - Control: carries method frame(s) that affect cluster behavior. * - Data: carries raw data received from a client connection. @@ -146,7 +146,6 @@ #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterConfigChangeBody.h" -#include "qpid/framing/ClusterClockBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionAbortBody.h" #include "qpid/framing/ClusterRetractOfferBody.h" @@ -199,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1159329; +const uint32_t Cluster::CLUSTER_VERSION = 1058747; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -215,7 +214,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { { cluster.initialStatus( member, version, active, clusterId, - framing::cluster::StoreState(storeState), shutdownId, + framing::cluster::StoreState(storeState), shutdownId, firstConfig, l); } void ready(const std::string& url) { @@ -231,21 +230,21 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { cluster.updateOffer(member, updatee, l); } void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } + void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); } - void timerDrop(const std::string& name) { cluster.timerDrop(member, name, l); } + void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); } void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } void deliverToQueue(const std::string& queue, const std::string& message) { cluster.deliverToQueue(queue, message, l); } - void clock(uint64_t time) { cluster.clock(time, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : - settings(set), + settings(set), broker(b), mgmtObject(0), poller(b.getPoller()), @@ -254,7 +253,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : self(cpg.self()), clusterId(true), mAgent(0), - expiryPolicy(new ExpiryPolicy(*this)), + expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1), @@ -278,11 +277,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : lastBroker(false), updateRetracted(false), updateClosed(false), - error(*this), - acl(0) + error(*this) { - broker.setInCluster(true); - // We give ownership of the timer to the broker and keep a plain pointer. // This is OK as it means the timer has the same lifetime as the broker. timer = new ClusterTimer(*this); @@ -303,7 +299,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); - clusterId = store.getClusterId(); + clusterId = store.getClusterId(); QPID_LOG(notice, "Cluster store state: " << store) } cpg.join(name); @@ -364,15 +360,14 @@ void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { // Safe to use connections here because we're pre-catchup, stalled // and discarding, so deliveredFrame is not processing any // connection events. - assert(discarding); + assert(discarding); pair<ConnectionMap::iterator, bool> ib = connections.insert(ConnectionMap::value_type(c->getId(), c)); - // Like this to avoid tripping up unused variable warning when NDEBUG set - if (!ib.second) assert(ib.second); + assert(ib.second); } void Cluster::erase(const ConnectionId& id) { - Lock l(lock); + Lock l(lock); erase(id,l); } @@ -398,9 +393,9 @@ std::vector<Url> Cluster::getUrls() const { std::vector<Url> Cluster::getUrls(Lock&) const { return map.memberUrls(); -} +} -void Cluster::leave() { +void Cluster::leave() { Lock l(lock); leave(l); } @@ -410,7 +405,7 @@ void Cluster::leave() { QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \ } do {} while(0) -void Cluster::leave(Lock&) { +void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); @@ -429,7 +424,7 @@ void Cluster::deliver( uint32_t nodeid, uint32_t pid, void* msg, - int msg_len) + int msg_len) { MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); @@ -460,7 +455,7 @@ void Cluster::deliveredEvent(const Event& e) { EventFrame ef(e, e.getFrame()); // Stop the deliverEventQueue on update offers. // This preserves the connection decoder fragments for an update. - // Only do this for the two brokers that are directly involved in this + // Only do this for the two brokers that are directly involved in this // offer: the one making the offer, or the one receiving it. const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody()); if (offer && ( e.getMemberId() == self || MemberId(offer->getUpdatee()) == self) ) { @@ -470,7 +465,7 @@ void Cluster::deliveredEvent(const Event& e) { } deliverFrame(ef); } - else if(!discarding) { + else if(!discarding) { if (e.isControl()) deliverFrame(EventFrame(e, e.getFrame())); else { @@ -512,7 +507,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) { // the event queue. e.frame = AMQFrame( ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee())); - deliverEventQueue.start(); + deliverEventQueue.start(); } // Process each frame through the error checker. if (error.isUnresolved()) { @@ -520,14 +515,14 @@ void Cluster::deliveredFrame(const EventFrame& efConst) { while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); } - else + else processFrame(e, l); } void Cluster::processFrame(const EventFrame& e, Lock& l) { if (e.isCluster()) { - QPID_LOG_IF(trace, loggable(e.frame), *this << " DLVR: " << e); + QPID_LOG(trace, *this << " DLVR: " << e); ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); @@ -536,15 +531,14 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) { map.incrementFrameSeq(); ConnectionPtr connection = getConnection(e, l); if (connection) { - QPID_LOG_IF(trace, loggable(e.frame), - *this << " DLVR " << map.getFrameSeq() << ": " << e); + QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); connection->deliveredFrame(e); } else - throw Exception(QPID_MSG("Unknown connection: " << e)); + QPID_LOG(trace, *this << " DROP (no connection): " << e); } else // Drop connection frames while state < CATCHUP - QPID_LOG_IF(trace, loggable(e.frame), *this << " DROP (joining): " << e); + QPID_LOG(trace, *this << " DROP (joining): " << e); } // Called in deliverFrameQueue thread @@ -583,7 +577,7 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) { } // CPG config-change callback. -void Cluster::configChange ( +void Cluster::configChange ( cpg_handle_t /*handle*/, const cpg_name */*group*/, const cpg_address *members, int nMembers, @@ -613,7 +607,7 @@ void Cluster::setReady(Lock&) { } // Set the management status from the Cluster::state. -// +// // NOTE: Management updates are sent based on property changes. In // order to keep consistency across the cluster, we touch the local // management status property even if it is locally unchanged for any @@ -624,7 +618,7 @@ void Cluster::setMgmtStatus(Lock&) { } void Cluster::initMapCompleted(Lock& l) { - // Called on completion of the initial status map. + // Called on completion of the initial status map. QPID_LOG(debug, *this << " initial status map complete. "); setMgmtStatus(l); if (state == PRE_INIT) { @@ -671,8 +665,6 @@ void Cluster::initMapCompleted(Lock& l) { else { // I can go ready. discarding = false; setReady(l); - // Must be called *before* memberUpdate so first update will be generated. - failoverExchange->setReady(); memberUpdate(l); updateMgmtMembership(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); @@ -709,8 +701,8 @@ void Cluster::configChange(const MemberId&, if (initMap.isResendNeeded()) { mcast.mcastControl( ClusterInitialStatusBody( - ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, - store.getState(), store.getShutdownId(), + ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, + store.getState(), store.getShutdownId(), initMap.getFirstConfigStr() ), self); @@ -725,20 +717,6 @@ void Cluster::configChange(const MemberId&, updateMgmtMembership(l); // Update on every config change for consistency } -struct ClusterClockTask : public sys::TimerTask { - Cluster& cluster; - sys::Timer& timer; - - ClusterClockTask(Cluster& cluster, sys::Timer& timer, uint16_t clockInterval) - : TimerTask(Duration(clockInterval * TIME_MSEC),"ClusterClock"), cluster(cluster), timer(timer) {} - - void fire() { - cluster.sendClockUpdate(); - setupNextFire(); - timer.add(this); - } -}; - void Cluster::becomeElder(Lock&) { if (elder) return; // We were already the elder. // We are the oldest, reactive links if necessary @@ -746,8 +724,6 @@ void Cluster::becomeElder(Lock&) { elder = true; broker.getLinks().setPassive(false); timer->becomeElder(); - - clockTimer.add(new ClusterClockTask(*this, clockTimer, settings.clockInterval)); } void Cluster::makeOffer(const MemberId& id, Lock& ) { @@ -783,7 +759,7 @@ std::string Cluster::debugSnapshot() { // point we know the poller has stopped so no poller callbacks will be // invoked. We must ensure that CPG has also shut down so no CPG // callbacks will be invoked. -// +// void Cluster::brokerShutdown() { sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. try { cpg.shutdown(); } @@ -799,7 +775,7 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) } void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active, - const framing::Uuid& id, + const framing::Uuid& id, framing::cluster::StoreState store, const framing::Uuid& shutdownId, const std::string& firstConfig, @@ -857,8 +833,6 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) else if (updatee == self && url) { assert(state == JOINER); state = UPDATEE; - acl = broker.getAcl(); - broker.setAcl(0); // Disable ACL during update QPID_LOG(notice, *this << " receiving update from " << updater); checkUpdateIn(l); } @@ -870,7 +844,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) if (updatee != self && url) { QPID_LOG(debug, debugSnapshot()); if (mAgent) mAgent->clusterUpdate(); - // Updatee will call clusterUpdate() via checkUpdateIn() when update completes + // Updatee will call clusterUpdate when update completes } } @@ -951,15 +925,13 @@ void Cluster::checkUpdateIn(Lock& l) { if (!updateClosed) return; // Wait till update connection closes. if (updatedMap) { // We're up to date map = *updatedMap; + failoverExchange->setUrls(getUrls(l)); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; memberUpdate(l); - // Must be called *after* memberUpdate() to avoid sending an extra update. - failoverExchange->setReady(); // NB: don't updateMgmtMembership() here as we are not in the deliver // thread. It will be updated on delivery of the "ready" we just mcast. broker.setClusterUpdatee(false); - broker.setAcl(acl); // Restore ACL discarding = false; // OK to set, we're stalled for update. QPID_LOG(notice, *this << " update complete, starting catch-up."); QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled. @@ -969,10 +941,6 @@ void Cluster::checkUpdateIn(Lock& l) { mAgent->suppress(false); // Enable management output. mAgent->clusterUpdate(); } - // Restore alternate exchange settings on exchanges. - broker.getExchanges().eachExchange( - boost::bind(&broker::Exchange::recoveryComplete, _1, - boost::ref(broker.getExchanges()))); enableClusterSafe(); // Enable cluster-safe assertions deliverEventQueue.start(); } @@ -1001,7 +969,7 @@ void Cluster::updateOutDone(Lock& l) { void Cluster::updateOutError(const std::exception& e) { Monitor::ScopedLock l(lock); - QPID_LOG(error, *this << " error sending update: " << e.what()); + QPID_LOG(error, *this << " error sending update: " << e.what()); updateOutDone(l); } @@ -1099,7 +1067,7 @@ void Cluster::memberUpdate(Lock& l) { void Cluster::updateMgmtMembership(Lock& l) { if (!mgmtObject) return; std::vector<Url> urls = getUrls(l); - mgmtObject->set_clusterSize(urls.size()); + mgmtObject->set_clusterSize(urls.size()); string urlstr; for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) { if (i != urls.begin()) urlstr += ";"; @@ -1146,6 +1114,10 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) { QPID_LOG(notice, *this << " cluster-uuid = " << clusterId); } +void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { + expiryPolicy->deliverExpire(id); +} + void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) { // If we see an errorCheck here (rather than in the ErrorCheck // class) then we have processed succesfully past the point of the @@ -1183,35 +1155,6 @@ void Cluster::deliverToQueue(const std::string& queue, const std::string& messag q->deliver(msg); } -sys::AbsTime Cluster::getClusterTime() { - Mutex::ScopedLock l(lock); - return clusterTime; -} - -// This method is called during update on the updatee to set the initial cluster time. -void Cluster::clock(const uint64_t time) { - Mutex::ScopedLock l(lock); - clock(time, l); -} - -// called when broadcast message received -void Cluster::clock(const uint64_t time, Lock&) { - clusterTime = AbsTime(EPOCH, time); - AbsTime now = AbsTime::now(); - - if (!elder) { - clusterTimeOffset = Duration(now, clusterTime); - } -} - -// called by elder timer to send clock broadcast -void Cluster::sendClockUpdate() { - Mutex::ScopedLock l(lock); - int64_t nanosecondsSinceEpoch = Duration(EPOCH, now()); - nanosecondsSinceEpoch += clusterTimeOffset; - mcast.mcastControl(ClusterClockBody(ProtocolVersion(), nanosecondsSinceEpoch), self); -} - bool Cluster::deferDeliveryImpl(const std::string& queue, const boost::intrusive_ptr<broker::Message>& msg) { @@ -1224,12 +1167,4 @@ bool Cluster::deferDeliveryImpl(const std::string& queue, return true; } -bool Cluster::loggable(const AMQFrame& f) { - const AMQMethodBody* method = (f.getMethod()); - if (!method) return true; // Not a method - bool isClock = method->amqpClassId() == ClusterClockBody::CLASS_ID - && method->amqpMethodId() == ClusterClockBody::METHOD_ID; - return !isClock; -} - }} // namespace qpid::cluster |