summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp129
1 files changed, 89 insertions, 40 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 460f974b36..f8a875a30c 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -16,7 +16,8 @@
*
*/
-/** CLUSTER IMPLEMENTATION OVERVIEW
+/**
+ * <h1>CLUSTER IMPLEMENTATION OVERVIEW</h1>
*
* The cluster works on the principle that if all members of the
* cluster receive identical input, they will all produce identical
@@ -41,12 +42,15 @@
*
* The following are the current areas where broker uses timers or timestamps:
*
- * - Producer flow control: broker::SemanticState uses connection::getClusterOrderOutput.
- * a FrameHandler that sends frames to the client via the cluster. Used by broker::SessionState
+ * - Producer flow control: broker::SemanticState uses
+ * connection::getClusterOrderOutput, a FrameHandler that sends
+ * frames to the client via the cluster. Used by broker::SessionState
*
- * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is implemented by cluster::ExpiryPolicy.
+ * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
+ * implemented by cluster::ExpiryPolicy.
*
- * - Connection heartbeat: sends connection controls, not part of session command counting so OK to ignore.
+ * - Connection heartbeat: sends connection controls, not part of
+ * session command counting so OK to ignore.
*
* - LinkRegistry: only cluster elder is ever active for links.
*
@@ -57,7 +61,10 @@
*
* cluster::ExpiryPolicy implements the strategy for message expiry.
*
- * CLUSTER PROTOCOL OVERVIEW
+ * ClusterTimer implements periodic timed events in the cluster context.
+ * Used for periodic management events.
+ *
+ * <h1>CLUSTER PROTOCOL OVERVIEW</h1>
*
* Messages sent to/from CPG are called Events.
*
@@ -84,12 +91,16 @@
* - Connection control events carrying non-cluster frames: frames sent to the client.
* e.g. flow-control frames generated on a timer.
*
- * CLUSTER INITIALIZATION OVERVIEW
+ * <h1>CLUSTER INITIALIZATION OVERVIEW</h1>
+ *
+ * @see InitialStatusMap
*
* When a new member joins the CPG group, all members (including the
* new one) multicast their "initial status." The new member is in
- * INIT mode until it gets a complete set of initial status messages
- * from all cluster members.
+ * PRE_INIT mode until it gets a complete set of initial status
+ * messages from all cluster members. In a newly-forming cluster it
+ * is then in INIT mode until the configured cluster-size members have
+ * joined.
*
* The newcomer uses initial status to determine
* - The cluster UUID
@@ -97,11 +108,16 @@
* - Do I need to get an update from an existing active member?
* - Can I recover from my own store?
*
- * Initialization happens in the Cluster constructor (plugin
- * early-init phase) because it needs to be done before the store
- * initializes. In INIT mode sending & receiving from the cluster are
- * done single-threaded, bypassing the normal PollableQueues because
- * the Poller is not active at this point to service them.
+ * Pre-initialization happens in the Cluster constructor (plugin
+ * early-init phase) because it needs to set the recovery flag before
+ * the store initializes. This phase lasts until initial-status is
+ * received for all active members. The PollableQueues and Multicaster
+ * are in "bypass" mode during this phase since the poller has not
+ * started so there are no threads to serve pollable queues.
+ *
+ * The remaining initialization happens in Cluster::initialize() or,
+ * if cluster-size=N is specified, in the deliver thread when an
+ * initial-status control is delivered that brings the total to N.
*/
#include "qpid/Exception.h"
#include "qpid/cluster/Cluster.h"
@@ -191,16 +207,19 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
uint8_t storeState, const Uuid& shutdownId,
- const framing::SequenceNumber& configSeq,
const std::string& firstConfig)
{
cluster.initialStatus(
member, version, active, clusterId,
- framing::cluster::StoreState(storeState), shutdownId, configSeq,
+ framing::cluster::StoreState(storeState), shutdownId,
firstConfig, l);
}
- void ready(const std::string& url) { cluster.ready(member, url, l); }
- void configChange(const std::string& current) { cluster.configChange(member, current, l); }
+ void ready(const std::string& url) {
+ cluster.ready(member, url, l);
+ }
+ void configChange(const std::string& current) {
+ cluster.configChange(member, current, l);
+ }
void updateOffer(uint64_t updatee) {
cluster.updateOffer(member, updatee, l);
}
@@ -241,7 +260,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
quorum(boost::bind(&Cluster::leave, this)),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
- state(INIT),
+ state(PRE_INIT),
initMap(self, settings.size),
store(broker.getDataDir().getPath()),
elder(false),
@@ -271,17 +290,18 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// without modifying delivery-properties.exchange.
broker.getExchanges().registerExchange(
boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
- if (store.getState() == STORE_STATE_DIRTY_STORE)
- broker.setRecovery(false); // Ditch my current store.
if (store.getClusterId())
clusterId = store.getClusterId(); // Use stored ID if there is one.
QPID_LOG(notice, "Cluster store state: " << store)
}
-
cpg.join(name);
+ // pump the CPG dispatch manually till we get past PRE_INIT.
+ while (state == PRE_INIT)
+ cpg.dispatchOne();
}
Cluster::~Cluster() {
@@ -298,9 +318,14 @@ void Cluster::initialize() {
dispatcher.start();
deliverEventQueue.start();
deliverFrameQueue.start();
+ mcast.start();
+
+ // Run initMapCompleted immediately to process the initial configuration.
+ assert(state == INIT);
+ initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context.
// Add finalizer last for exception safety.
- broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
+ broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
}
// Called in connection thread to insert a client connection.
@@ -510,8 +535,13 @@ ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) {
assert(cp);
}
else { // New remote connection, create a shadow.
- unsigned int ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0;
- cp = new Connection(*this, shadowOut, announce->getManagementId(), id, ssf);
+ qpid::sys::SecuritySettings secSettings;
+ if (announce) {
+ secSettings.ssf = announce->getSsf();
+ secSettings.authid = announce->getAuthid();
+ secSettings.nodict = announce->getNodict();
+ }
+ cp = new Connection(*this, shadowOut, announce->getManagementId(), id, secSettings);
}
connections.insert(ConnectionMap::value_type(id, cp));
}
@@ -571,9 +601,27 @@ void Cluster::setReady(Lock&) {
void Cluster::initMapCompleted(Lock& l) {
// Called on completion of the initial status map.
QPID_LOG(debug, *this << " initial status map complete. ");
- if (state == INIT) {
- // We have status for all members so we can make join descisions.
+ if (state == PRE_INIT) {
+ // PRE_INIT means we're still in the earlyInitialize phase, in the constructor.
+ // We decide here whether we want to recover from our store.
+ // We won't recover if we are joining an active cluster or our store is dirty.
+ if (store.hasStore() &&
+ (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE))
+ broker.setRecovery(false); // Ditch my current store.
+ state = INIT;
+ }
+ else if (state == INIT) {
+ // INIT means we are past Cluster::initialize().
+
+ // If we're forming an initial cluster (no active members)
+ // then we wait to reach the configured cluster-size
+ if (!initMap.isActive() && initMap.getActualSize() < initMap.getRequiredSize()) {
+ QPID_LOG(info, *this << initMap.getActualSize()
+ << " members, waiting for at least " << initMap.getRequiredSize());
+ return;
+ }
initMap.checkConsistent();
+
elders = initMap.getElders();
QPID_LOG(debug, *this << " elders: " << elders);
if (elders.empty())
@@ -626,7 +674,7 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock&
mcast.mcastControl(
ClusterInitialStatusBody(
ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
- store.getState(), store.getShutdownId(), store.getConfigSeq(),
+ store.getState(), store.getShutdownId(),
initMap.getFirstConfigStr()
),
self);
@@ -668,15 +716,13 @@ struct AppendQueue {
// Log a snapshot of broker state, used for debugging inconsistency problems.
// May only be called in deliver thread.
-void Cluster::debugSnapshot(const char* prefix, Connection* connection) {
+std::string Cluster::debugSnapshot() {
assertClusterSafe();
std::ostringstream msg;
- msg << prefix;
- if (connection) msg << " " << connection->getId();
- msg << " snapshot " << map.getFrameSeq() << ":";
+ msg << "queue snapshot at " << map.getFrameSeq() << ":";
AppendQueue append(msg);
broker.getQueues().eachQueue(append);
- QPID_LOG(trace, msg.str());
+ return msg.str();
}
// Called from Broker::~Broker when broker is shut down. At this
@@ -702,7 +748,6 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ
const framing::Uuid& id,
framing::cluster::StoreState store,
const framing::Uuid& shutdownId,
- const framing::SequenceNumber& configSeq,
const std::string& firstConfig,
Lock& l)
{
@@ -715,7 +760,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ
initMap.received(
member,
ClusterInitialStatusBody(ProtocolVersion(), version, active, id,
- store, shutdownId, configSeq, firstConfig)
+ store, shutdownId, firstConfig)
);
if (initMap.transitionToComplete()) initMapCompleted(l);
}
@@ -762,8 +807,9 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
deliverEventQueue.start(); // Not involved in update.
}
if (updatee != self && url) {
- debugSnapshot("join");
+ QPID_LOG(debug, debugSnapshot());
if (mAgent) mAgent->clusterUpdate();
+ // Updatee will call clusterUpdate when update completes
}
}
@@ -836,7 +882,7 @@ void Cluster::checkUpdateIn(Lock& l) {
if (mAgent) mAgent->suppress(false); // Enable management output.
discarding = false; // ok to set, we're stalled for update.
QPID_LOG(notice, *this << " update complete, starting catch-up.");
- debugSnapshot("initial");
+ QPID_LOG(debug, debugSnapshot());
if (mAgent) mAgent->clusterUpdate();
deliverEventQueue.start();
}
@@ -963,7 +1009,8 @@ void Cluster::memberUpdate(Lock& l) {
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
static const char* STATE[] = {
- "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
+ "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP",
+ "READY", "OFFER", "UPDATER", "LEFT"
};
assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
o << "cluster(" << cluster.self << " " << STATE[cluster.state];
@@ -1003,12 +1050,14 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu
}
void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
- timer->deliverWakeup(name);
+ if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+ timer->deliverWakeup(name);
}
void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name)
- timer->deliverDrop(name);
+ if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+ timer->deliverDrop(name);
}
bool Cluster::isElder() const {