diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 129 |
1 files changed, 89 insertions, 40 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 460f974b36..f8a875a30c 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -16,7 +16,8 @@ * */ -/** CLUSTER IMPLEMENTATION OVERVIEW +/** + * <h1>CLUSTER IMPLEMENTATION OVERVIEW</h1> * * The cluster works on the principle that if all members of the * cluster receive identical input, they will all produce identical @@ -41,12 +42,15 @@ * * The following are the current areas where broker uses timers or timestamps: * - * - Producer flow control: broker::SemanticState uses connection::getClusterOrderOutput. - * a FrameHandler that sends frames to the client via the cluster. Used by broker::SessionState + * - Producer flow control: broker::SemanticState uses + * connection::getClusterOrderOutput. a FrameHandler that sends + * frames to the client via the cluster. Used by broker::SessionState * - * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is implemented by cluster::ExpiryPolicy. + * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is + * implemented by cluster::ExpiryPolicy. * - * - Connection heartbeat: sends connection controls, not part of session command counting so OK to ignore. + * - Connection heartbeat: sends connection controls, not part of + * session command counting so OK to ignore. * * - LinkRegistry: only cluster elder is ever active for links. * @@ -57,7 +61,10 @@ * * cluster::ExpiryPolicy implements the strategy for message expiry. * - * CLUSTER PROTOCOL OVERVIEW + * ClusterTimer implements periodic timed events in the cluster context. + * Used for periodic management events. + * + * <h1>CLUSTER PROTOCOL OVERVIEW</h1> * * Messages sent to/from CPG are called Events. * @@ -84,12 +91,16 @@ * - Connection control events carrying non-cluster frames: frames sent to the client. * e.g. flow-control frames generated on a timer. * - * CLUSTER INITIALIZATION OVERVIEW + * <h1>CLUSTER INITIALIZATION OVERVIEW</h1> + * + * @see InitialStatusMap * * When a new member joins the CPG group, all members (including the * new one) multicast their "initial status." The new member is in - * INIT mode until it gets a complete set of initial status messages - * from all cluster members. + * PRE_INIT mode until it gets a complete set of initial status + * messages from all cluster members. In a newly-forming cluster is + * then in INIT mode until the configured cluster-size members have + * joined. * * The newcomer uses initial status to determine * - The cluster UUID @@ -97,11 +108,16 @@ * - Do I need to get an update from an existing active member? * - Can I recover from my own store? * - * Initialization happens in the Cluster constructor (plugin - * early-init phase) because it needs to be done before the store - * initializes. In INIT mode sending & receiving from the cluster are - * done single-threaded, bypassing the normal PollableQueues because - * the Poller is not active at this point to service them. + * Pre-initialization happens in the Cluster constructor (plugin + * early-init phase) because it needs to set the recovery flag before + * the store initializes. This phase lasts until inital-status is + * received for all active members. The PollableQueues and Multicaster + * are in "bypass" mode during this phase since the poller has not + * started so there are no threads to serve pollable queues. + * + * The remaining initialization happens in Cluster::initialize() or, + * if cluster-size=N is specified, in the deliver thread when an + * initial-status control is delivered that brings the total to N. */ #include "qpid/Exception.h" #include "qpid/cluster/Cluster.h" @@ -191,16 +207,19 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void initialStatus(uint32_t version, bool active, const Uuid& clusterId, uint8_t storeState, const Uuid& shutdownId, - const framing::SequenceNumber& configSeq, const std::string& firstConfig) { cluster.initialStatus( member, version, active, clusterId, - framing::cluster::StoreState(storeState), shutdownId, configSeq, + framing::cluster::StoreState(storeState), shutdownId, firstConfig, l); } - void ready(const std::string& url) { cluster.ready(member, url, l); } - void configChange(const std::string& current) { cluster.configChange(member, current, l); } + void ready(const std::string& url) { + cluster.ready(member, url, l); + } + void configChange(const std::string& current) { + cluster.configChange(member, current, l); + } void updateOffer(uint64_t updatee) { cluster.updateOffer(member, updatee, l); } @@ -241,7 +260,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : quorum(boost::bind(&Cluster::leave, this)), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), - state(INIT), + state(PRE_INIT), initMap(self, settings.size), store(broker.getDataDir().getPath()), elder(false), @@ -271,17 +290,18 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // without modifying delivery-properties.exchange. broker.getExchanges().registerExchange( boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); - if (store.getState() == STORE_STATE_DIRTY_STORE) - broker.setRecovery(false); // Ditch my current store. if (store.getClusterId()) clusterId = store.getClusterId(); // Use stored ID if there is one. QPID_LOG(notice, "Cluster store state: " << store) } - cpg.join(name); + // pump the CPG dispatch manually till we get past PRE_INIT. + while (state == PRE_INIT) + cpg.dispatchOne(); } Cluster::~Cluster() { @@ -298,9 +318,14 @@ void Cluster::initialize() { dispatcher.start(); deliverEventQueue.start(); deliverFrameQueue.start(); + mcast.start(); + + // Run initMapCompleted immediately to process the initial configuration. + assert(state == INIT); + initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context. // Add finalizer last for exception safety. - broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); + broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); } // Called in connection thread to insert a client connection. @@ -510,8 +535,13 @@ ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) { assert(cp); } else { // New remote connection, create a shadow. - unsigned int ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0; - cp = new Connection(*this, shadowOut, announce->getManagementId(), id, ssf); + qpid::sys::SecuritySettings secSettings; + if (announce) { + secSettings.ssf = announce->getSsf(); + secSettings.authid = announce->getAuthid(); + secSettings.nodict = announce->getNodict(); + } + cp = new Connection(*this, shadowOut, announce->getManagementId(), id, secSettings); } connections.insert(ConnectionMap::value_type(id, cp)); } @@ -571,9 +601,27 @@ void Cluster::setReady(Lock&) { void Cluster::initMapCompleted(Lock& l) { // Called on completion of the initial status map. QPID_LOG(debug, *this << " initial status map complete. "); - if (state == INIT) { - // We have status for all members so we can make join descisions. + if (state == PRE_INIT) { + // PRE_INIT means we're still in the earlyInitialize phase, in the constructor. + // We decide here whether we want to recover from our store. + // We won't recover if we are joining an active cluster or our store is dirty. + if (store.hasStore() && + (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE)) + broker.setRecovery(false); // Ditch my current store. + state = INIT; + } + else if (state == INIT) { + // INIT means we are past Cluster::initialize(). + + // If we're forming an initial cluster (no active members) + // then we wait to reach the configured cluster-size + if (!initMap.isActive() && initMap.getActualSize() < initMap.getRequiredSize()) { + QPID_LOG(info, *this << initMap.getActualSize() + << " members, waiting for at least " << initMap.getRequiredSize()); + return; + } initMap.checkConsistent(); + elders = initMap.getElders(); QPID_LOG(debug, *this << " elders: " << elders); if (elders.empty()) @@ -626,7 +674,7 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& mcast.mcastControl( ClusterInitialStatusBody( ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, - store.getState(), store.getShutdownId(), store.getConfigSeq(), + store.getState(), store.getShutdownId(), initMap.getFirstConfigStr() ), self); @@ -668,15 +716,13 @@ struct AppendQueue { // Log a snapshot of broker state, used for debugging inconsistency problems. // May only be called in deliver thread. -void Cluster::debugSnapshot(const char* prefix, Connection* connection) { +std::string Cluster::debugSnapshot() { assertClusterSafe(); std::ostringstream msg; - msg << prefix; - if (connection) msg << " " << connection->getId(); - msg << " snapshot " << map.getFrameSeq() << ":"; + msg << "queue snapshot at " << map.getFrameSeq() << ":"; AppendQueue append(msg); broker.getQueues().eachQueue(append); - QPID_LOG(trace, msg.str()); + return msg.str(); } // Called from Broker::~Broker when broker is shut down. At this @@ -702,7 +748,6 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ const framing::Uuid& id, framing::cluster::StoreState store, const framing::Uuid& shutdownId, - const framing::SequenceNumber& configSeq, const std::string& firstConfig, Lock& l) { @@ -715,7 +760,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ initMap.received( member, ClusterInitialStatusBody(ProtocolVersion(), version, active, id, - store, shutdownId, configSeq, firstConfig) + store, shutdownId, firstConfig) ); if (initMap.transitionToComplete()) initMapCompleted(l); } @@ -762,8 +807,9 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) deliverEventQueue.start(); // Not involved in update. } if (updatee != self && url) { - debugSnapshot("join"); + QPID_LOG(debug, debugSnapshot()); if (mAgent) mAgent->clusterUpdate(); + // Updatee will call clusterUpdate when update completes } } @@ -836,7 +882,7 @@ void Cluster::checkUpdateIn(Lock& l) { if (mAgent) mAgent->suppress(false); // Enable management output. discarding = false; // ok to set, we're stalled for update. QPID_LOG(notice, *this << " update complete, starting catch-up."); - debugSnapshot("initial"); + QPID_LOG(debug, debugSnapshot()); if (mAgent) mAgent->clusterUpdate(); deliverEventQueue.start(); } @@ -963,7 +1009,8 @@ void Cluster::memberUpdate(Lock& l) { std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { static const char* STATE[] = { - "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" + "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP", + "READY", "OFFER", "UPDATER", "LEFT" }; assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); o << "cluster(" << cluster.self << " " << STATE[cluster.state]; @@ -1003,12 +1050,14 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu } void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { - timer->deliverWakeup(name); + if (state >= CATCHUP) // Pre catchup our timer isn't set up. + timer->deliverWakeup(name); } void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) { QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name) - timer->deliverDrop(name); + if (state >= CATCHUP) // Pre catchup our timer isn't set up. + timer->deliverDrop(name); } bool Cluster::isElder() const { |