diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 91 |
1 files changed, 67 insertions, 24 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 92e2b65fe2..f8a875a30c 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -16,7 +16,8 @@ * */ -/** CLUSTER IMPLEMENTATION OVERVIEW +/** + * <h1>CLUSTER IMPLEMENTATION OVERVIEW</h1> * * The cluster works on the principle that if all members of the * cluster receive identical input, they will all produce identical @@ -41,12 +42,15 @@ * * The following are the current areas where broker uses timers or timestamps: * - * - Producer flow control: broker::SemanticState uses connection::getClusterOrderOutput. - * a FrameHandler that sends frames to the client via the cluster. Used by broker::SessionState + * - Producer flow control: broker::SemanticState uses + * connection::getClusterOrderOutput. a FrameHandler that sends + * frames to the client via the cluster. Used by broker::SessionState * - * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is implemented by cluster::ExpiryPolicy. + * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is + * implemented by cluster::ExpiryPolicy. * - * - Connection heartbeat: sends connection controls, not part of session command counting so OK to ignore. + * - Connection heartbeat: sends connection controls, not part of + * session command counting so OK to ignore. * * - LinkRegistry: only cluster elder is ever active for links. * @@ -57,7 +61,10 @@ * * cluster::ExpiryPolicy implements the strategy for message expiry. * - * CLUSTER PROTOCOL OVERVIEW + * ClusterTimer implements periodic timed events in the cluster context. + * Used for periodic management events. + * + * <h1>CLUSTER PROTOCOL OVERVIEW</h1> * * Messages sent to/from CPG are called Events. * @@ -84,12 +91,16 @@ * - Connection control events carrying non-cluster frames: frames sent to the client. * e.g. flow-control frames generated on a timer. * - * CLUSTER INITIALIZATION OVERVIEW + * <h1>CLUSTER INITIALIZATION OVERVIEW</h1> + * + * @see InitialStatusMap * * When a new member joins the CPG group, all members (including the * new one) multicast their "initial status." The new member is in - * INIT mode until it gets a complete set of initial status messages - * from all cluster members. + * PRE_INIT mode until it gets a complete set of initial status + * messages from all cluster members. In a newly-forming cluster is + * then in INIT mode until the configured cluster-size members have + * joined. * * The newcomer uses initial status to determine * - The cluster UUID @@ -97,11 +108,16 @@ * - Do I need to get an update from an existing active member? * - Can I recover from my own store? * - * Initialization happens in the Cluster constructor (plugin - * early-init phase) because it needs to be done before the store - * initializes. In INIT mode sending & receiving from the cluster are - * done single-threaded, bypassing the normal PollableQueues because - * the Poller is not active at this point to service them. + * Pre-initialization happens in the Cluster constructor (plugin + * early-init phase) because it needs to set the recovery flag before + * the store initializes. This phase lasts until inital-status is + * received for all active members. The PollableQueues and Multicaster + * are in "bypass" mode during this phase since the poller has not + * started so there are no threads to serve pollable queues. + * + * The remaining initialization happens in Cluster::initialize() or, + * if cluster-size=N is specified, in the deliver thread when an + * initial-status control is delivered that brings the total to N. */ #include "qpid/Exception.h" #include "qpid/cluster/Cluster.h" @@ -244,7 +260,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : quorum(boost::bind(&Cluster::leave, this)), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), - state(INIT), + state(PRE_INIT), initMap(self, settings.size), store(broker.getDataDir().getPath()), elder(false), @@ -274,17 +290,18 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // without modifying delivery-properties.exchange. broker.getExchanges().registerExchange( boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); - if (store.getState() == STORE_STATE_DIRTY_STORE) - broker.setRecovery(false); // Ditch my current store. if (store.getClusterId()) clusterId = store.getClusterId(); // Use stored ID if there is one. QPID_LOG(notice, "Cluster store state: " << store) } - cpg.join(name); + // pump the CPG dispatch manually till we get past PRE_INIT. + while (state == PRE_INIT) + cpg.dispatchOne(); } Cluster::~Cluster() { @@ -301,9 +318,14 @@ void Cluster::initialize() { dispatcher.start(); deliverEventQueue.start(); deliverFrameQueue.start(); + mcast.start(); + + // Run initMapCompleted immediately to process the initial configuration. + assert(state == INIT); + initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context. // Add finalizer last for exception safety. - broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); + broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); } // Called in connection thread to insert a client connection. @@ -579,9 +601,27 @@ void Cluster::setReady(Lock&) { void Cluster::initMapCompleted(Lock& l) { // Called on completion of the initial status map. QPID_LOG(debug, *this << " initial status map complete. "); - if (state == INIT) { - // We have status for all members so we can make join descisions. + if (state == PRE_INIT) { + // PRE_INIT means we're still in the earlyInitialize phase, in the constructor. + // We decide here whether we want to recover from our store. + // We won't recover if we are joining an active cluster or our store is dirty. + if (store.hasStore() && + (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE)) + broker.setRecovery(false); // Ditch my current store. + state = INIT; + } + else if (state == INIT) { + // INIT means we are past Cluster::initialize(). + + // If we're forming an initial cluster (no active members) + // then we wait to reach the configured cluster-size + if (!initMap.isActive() && initMap.getActualSize() < initMap.getRequiredSize()) { + QPID_LOG(info, *this << initMap.getActualSize() + << " members, waiting for at least " << initMap.getRequiredSize()); + return; + } initMap.checkConsistent(); + elders = initMap.getElders(); QPID_LOG(debug, *this << " elders: " << elders); if (elders.empty()) @@ -969,7 +1009,8 @@ void Cluster::memberUpdate(Lock& l) { std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { static const char* STATE[] = { - "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" + "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP", + "READY", "OFFER", "UPDATER", "LEFT" }; assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); o << "cluster(" << cluster.self << " " << STATE[cluster.state]; @@ -1009,12 +1050,14 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu } void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { - timer->deliverWakeup(name); + if (state >= CATCHUP) // Pre catchup our timer isn't set up. + timer->deliverWakeup(name); } void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) { QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name) - timer->deliverDrop(name); + if (state >= CATCHUP) // Pre catchup our timer isn't set up. + timer->deliverDrop(name); } bool Cluster::isElder() const { |