diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 70 |
1 files changed, 36 insertions, 34 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index dd4882774b..fe5a1c806e 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -36,28 +36,28 @@ * * IMPORTANT NOTE: any time code is added to the broker that uses timers, * the cluster may need to be updated to take account of this. - * + * * * USE OF TIMESTAMPS IN THE BROKER - * + * * The following are the current areas where broker uses timers or timestamps: - * + * * - Producer flow control: broker::SemanticState uses * connection::getClusterOrderOutput. a FrameHandler that sends * frames to the client via the cluster. Used by broker::SessionState - * + * * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is * implemented by cluster::ExpiryPolicy. - * + * * - Connection heartbeat: sends connection controls, not part of * session command counting so OK to ignore. - * + * * - LinkRegistry: only cluster elder is ever active for links. - * + * * - management::ManagementBroker: uses MessageHandler supplied by cluster * to send messages to the broker via the cluster. - * - * - Dtx: not yet supported with cluster. + * + * - Dtx: not yet supported with cluster. * * cluster::ExpiryPolicy implements the strategy for message expiry. * @@ -65,16 +65,16 @@ * Used for periodic management events. * * <h1>CLUSTER PROTOCOL OVERVIEW</h1> - * + * * Messages sent to/from CPG are called Events. * * An Event carries a ConnectionId, which includes a MemberId and a * connection number. - * + * * Events are either * - Connection events: non-0 connection number and are associated with a connection. * - Cluster Events: 0 connection number, are not associated with a connection. - * + * * Events are further categorized as: * - Control: carries method frame(s) that affect cluster behavior. * - Data: carries raw data received from a client connection. @@ -214,7 +214,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { { cluster.initialStatus( member, version, active, clusterId, - framing::cluster::StoreState(storeState), shutdownId, + framing::cluster::StoreState(storeState), shutdownId, firstConfig, l); } void ready(const std::string& url) { @@ -244,7 +244,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { }; Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : - settings(set), + settings(set), broker(b), mgmtObject(0), poller(b.getPoller()), @@ -279,6 +279,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : updateClosed(false), error(*this) { + broker.setInCluster(true); + // We give ownership of the timer to the broker and keep a plain pointer. // This is OK as it means the timer has the same lifetime as the broker. timer = new ClusterTimer(*this); @@ -299,7 +301,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); - clusterId = store.getClusterId(); + clusterId = store.getClusterId(); QPID_LOG(notice, "Cluster store state: " << store) } cpg.join(name); @@ -360,14 +362,14 @@ void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { // Safe to use connections here because we're pre-catchup, stalled // and discarding, so deliveredFrame is not processing any // connection events. - assert(discarding); + assert(discarding); pair<ConnectionMap::iterator, bool> ib = connections.insert(ConnectionMap::value_type(c->getId(), c)); assert(ib.second); } void Cluster::erase(const ConnectionId& id) { - Lock l(lock); + Lock l(lock); erase(id,l); } @@ -393,9 +395,9 @@ std::vector<Url> Cluster::getUrls() const { std::vector<Url> Cluster::getUrls(Lock&) const { return map.memberUrls(); -} +} -void Cluster::leave() { +void Cluster::leave() { Lock l(lock); leave(l); } @@ -405,7 +407,7 @@ void Cluster::leave() { QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \ } do {} while(0) -void Cluster::leave(Lock&) { +void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); @@ -424,7 +426,7 @@ void Cluster::deliver( uint32_t nodeid, uint32_t pid, void* msg, - int msg_len) + int msg_len) { MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); @@ -455,7 +457,7 @@ void Cluster::deliveredEvent(const Event& e) { EventFrame ef(e, e.getFrame()); // Stop the deliverEventQueue on update offers. // This preserves the connection decoder fragments for an update. - // Only do this for the two brokers that are directly involved in this + // Only do this for the two brokers that are directly involved in this // offer: the one making the offer, or the one receiving it. const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody()); if (offer && ( e.getMemberId() == self || MemberId(offer->getUpdatee()) == self) ) { @@ -465,7 +467,7 @@ void Cluster::deliveredEvent(const Event& e) { } deliverFrame(ef); } - else if(!discarding) { + else if(!discarding) { if (e.isControl()) deliverFrame(EventFrame(e, e.getFrame())); else { @@ -507,7 +509,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) { // the event queue. e.frame = AMQFrame( ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee())); - deliverEventQueue.start(); + deliverEventQueue.start(); } // Process each frame through the error checker. if (error.isUnresolved()) { @@ -515,7 +517,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) { while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); } - else + else processFrame(e, l); } @@ -577,7 +579,7 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) { } // CPG config-change callback. -void Cluster::configChange ( +void Cluster::configChange ( cpg_handle_t /*handle*/, const cpg_name */*group*/, const cpg_address *members, int nMembers, @@ -607,7 +609,7 @@ void Cluster::setReady(Lock&) { } // Set the management status from the Cluster::state. -// +// // NOTE: Management updates are sent based on property changes. In // order to keep consistency across the cluster, we touch the local // management status property even if it is locally unchanged for any @@ -618,7 +620,7 @@ void Cluster::setMgmtStatus(Lock&) { } void Cluster::initMapCompleted(Lock& l) { - // Called on completion of the initial status map. + // Called on completion of the initial status map. QPID_LOG(debug, *this << " initial status map complete. "); setMgmtStatus(l); if (state == PRE_INIT) { @@ -701,8 +703,8 @@ void Cluster::configChange(const MemberId&, if (initMap.isResendNeeded()) { mcast.mcastControl( ClusterInitialStatusBody( - ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, - store.getState(), store.getShutdownId(), + ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, + store.getState(), store.getShutdownId(), initMap.getFirstConfigStr() ), self); @@ -759,7 +761,7 @@ std::string Cluster::debugSnapshot() { // point we know the poller has stopped so no poller callbacks will be // invoked. We must ensure that CPG has also shut down so no CPG // callbacks will be invoked. -// +// void Cluster::brokerShutdown() { sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. try { cpg.shutdown(); } @@ -775,7 +777,7 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) } void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active, - const framing::Uuid& id, + const framing::Uuid& id, framing::cluster::StoreState store, const framing::Uuid& shutdownId, const std::string& firstConfig, @@ -969,7 +971,7 @@ void Cluster::updateOutDone(Lock& l) { void Cluster::updateOutError(const std::exception& e) { Monitor::ScopedLock l(lock); - QPID_LOG(error, *this << " error sending update: " << e.what()); + QPID_LOG(error, *this << " error sending update: " << e.what()); updateOutDone(l); } @@ -1067,7 +1069,7 @@ void Cluster::memberUpdate(Lock& l) { void Cluster::updateMgmtMembership(Lock& l) { if (!mgmtObject) return; std::vector<Url> urls = getUrls(l); - mgmtObject->set_clusterSize(urls.size()); + mgmtObject->set_clusterSize(urls.size()); string urlstr; for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) { if (i != urls.begin()) urlstr += ";"; |