summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp1176
1 files changed, 1176 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
new file mode 100644
index 0000000000..0daf0c7f5a
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -0,0 +1,1176 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * <h1>CLUSTER IMPLEMENTATION OVERVIEW</h1>
+ *
+ * The cluster works on the principle that if all members of the
+ * cluster receive identical input, they will all produce identical
+ * results. cluster::Connections intercept data received from clients
+ * and multicast it via CPG. The data is processed (passed to the
+ * broker::Connection) only when it is received from CPG in cluster
+ * order. Each cluster member has Connection objects for directly
+ * connected clients and "shadow" Connection objects for connections
+ * to other members.
+ *
+ * This assumes that all broker actions occur deterministically in
+ * response to data arriving on client connections. There are two
+ * situations where this assumption fails:
+ * - sending data in response to polling local connections for writability.
+ * - taking actions based on a timer or timestamp comparison.
+ *
+ * IMPORTANT NOTE: any time code is added to the broker that uses timers,
+ * the cluster may need to be updated to take account of this.
+ *
+ *
+ * USE OF TIMESTAMPS IN THE BROKER
+ *
+ * The following are the current areas where broker uses timers or timestamps:
+ *
+ * - Producer flow control: broker::SemanticState uses
+ * connection::getClusterOrderOutput, a FrameHandler that sends
+ * frames to the client via the cluster. Used by broker::SessionState.
+ *
+ * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
+ * implemented by cluster::ExpiryPolicy.
+ *
+ * - Connection heartbeat: sends connection controls, not part of
+ * session command counting so OK to ignore.
+ *
+ * - LinkRegistry: only cluster elder is ever active for links.
+ *
+ * - management::ManagementBroker: uses MessageHandler supplied by cluster
+ * to send messages to the broker via the cluster.
+ *
+ * - Dtx: not yet supported with cluster.
+ *
+ * cluster::ExpiryPolicy implements the strategy for message expiry.
+ *
+ * ClusterTimer implements periodic timed events in the cluster context.
+ * Used for periodic management events.
+ *
+ * <h1>CLUSTER PROTOCOL OVERVIEW</h1>
+ *
+ * Messages sent to/from CPG are called Events.
+ *
+ * An Event carries a ConnectionId, which includes a MemberId and a
+ * connection number.
+ *
+ * Events are either
+ * - Connection events: non-0 connection number and are associated with a connection.
+ * - Cluster Events: 0 connection number, are not associated with a connection.
+ *
+ * Events are further categorized as:
+ * - Control: carries method frame(s) that affect cluster behavior.
+ * - Data: carries raw data received from a client connection.
+ *
+ * The cluster defines extensions to the AMQP command set in ../../../xml/cluster.xml
+ * which defines two classes:
+ * - cluster: cluster control information.
+ * - cluster.connection: control information for a specific connection.
+ *
+ * The following combinations are legal:
+ * - Data frames carrying connection data.
+ * - Cluster control events carrying cluster commands.
+ * - Connection control events carrying cluster.connection commands.
+ * - Connection control events carrying non-cluster frames: frames sent to the client.
+ * e.g. flow-control frames generated on a timer.
+ *
+ * <h1>CLUSTER INITIALIZATION OVERVIEW</h1>
+ *
+ * @see InitialStatusMap
+ *
+ * When a new member joins the CPG group, all members (including the
+ * new one) multicast their "initial status." The new member is in
+ * PRE_INIT mode until it gets a complete set of initial status
+ * messages from all cluster members. In a newly-forming cluster it is
+ * then in INIT mode until the configured cluster-size members have
+ * joined.
+ *
+ * The newcomer uses initial status to determine
+ * - The cluster UUID
+ * - Am I speaking the correct version of the cluster protocol?
+ * - Do I need to get an update from an existing active member?
+ * - Can I recover from my own store?
+ *
+ * Pre-initialization happens in the Cluster constructor (plugin
+ * early-init phase) because it needs to set the recovery flag before
+ * the store initializes. This phase lasts until initial-status is
+ * received for all active members. The PollableQueues and Multicaster
+ * are in "bypass" mode during this phase since the poller has not
+ * started so there are no threads to serve pollable queues.
+ *
+ * The remaining initialization happens in Cluster::initialize() or,
+ * if cluster-size=N is specified, in the deliver thread when an
+ * initial-status control is delivered that brings the total to N.
+ */
+#include "qpid/Exception.h"
+#include "qpid/cluster/Cluster.h"
+#include "qpid/sys/ClusterSafe.h"
+#include "qpid/cluster/ClusterSettings.h"
+#include "qpid/cluster/Connection.h"
+#include "qpid/cluster/UpdateClient.h"
+#include "qpid/cluster/RetractClient.h"
+#include "qpid/cluster/FailoverExchange.h"
+#include "qpid/cluster/UpdateDataExchange.h"
+#include "qpid/cluster/UpdateExchange.h"
+#include "qpid/cluster/ClusterTimer.h"
+
+#include "qpid/assert.h"
+#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
+#include "qmf/org/apache/qpid/cluster/Package.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/SessionState.h"
+#include "qpid/broker/SignalHandler.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/ClusterConfigChangeBody.h"
+#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
+#include "qpid/framing/ClusterConnectionAbortBody.h"
+#include "qpid/framing/ClusterRetractOfferBody.h"
+#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
+#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterShutdownBody.h"
+#include "qpid/framing/ClusterUpdateOfferBody.h"
+#include "qpid/framing/ClusterUpdateRequestBody.h"
+#include "qpid/framing/ClusterConnectionAnnounceBody.h"
+#include "qpid/framing/ClusterErrorCheckBody.h"
+#include "qpid/framing/ClusterTimerWakeupBody.h"
+#include "qpid/framing/ClusterDeliverToQueueBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Helpers.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/memory.h"
+#include "qpid/sys/Thread.h"
+
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+#include <boost/cast.hpp>
+#include <boost/current_function.hpp>
+#include <algorithm>
+#include <iterator>
+#include <map>
+#include <ostream>
+
+
+namespace qpid {
+namespace cluster {
+using namespace qpid;
+using namespace qpid::framing;
+using namespace qpid::sys;
+using namespace qpid::cluster;
+using namespace framing::cluster;
+using namespace std;
+using management::ManagementAgent;
+using management::ManagementObject;
+using management::Manageable;
+using management::Args;
+namespace _qmf = ::qmf::org::apache::qpid::cluster;
+
+/**
+ * NOTE: must increment this number whenever any incompatible changes in
+ * cluster protocol/behavior are made. It allows early detection and
+ * sensible reporting of an attempt to mix different versions in a
+ * cluster.
+ *
+ * Currently use SVN revision to avoid clashes with versions from
+ * different branches.
+ */
+const uint32_t Cluster::CLUSTER_VERSION = 1097431;
+
+/**
+ * Adapter that routes decoded cluster control methods to the owning Cluster.
+ *
+ * Each handler forwards its arguments to the Cluster method of the same
+ * name, adding the lock reference supplied at construction. All handlers
+ * except deliverToQueue also pass the id of the sending member.
+ */
+struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
+    qpid::cluster::Cluster& cluster;
+    MemberId member;
+    Cluster::Lock& l;
+
+    ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_)
+        : cluster(c), member(id), l(l_)
+    {}
+
+    void updateRequest(const std::string& url) {
+        cluster.updateRequest(member, url, l);
+    }
+
+    void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
+                       uint8_t storeState, const Uuid& shutdownId,
+                       const std::string& firstConfig)
+    {
+        // The wire value is a plain octet; convert it to the StoreState enum.
+        cluster.initialStatus(member, version, active, clusterId,
+                              framing::cluster::StoreState(storeState),
+                              shutdownId, firstConfig, l);
+    }
+
+    void ready(const std::string& url) {
+        cluster.ready(member, url, l);
+    }
+
+    void configChange(const std::string& members,
+                      const std::string& left,
+                      const std::string& joined)
+    {
+        cluster.configChange(member, members, left, joined, l);
+    }
+
+    void updateOffer(uint64_t updatee) {
+        cluster.updateOffer(member, updatee, l);
+    }
+
+    void retractOffer(uint64_t updatee) {
+        cluster.retractOffer(member, updatee, l);
+    }
+
+    void messageExpired(uint64_t id) {
+        cluster.messageExpired(member, id, l);
+    }
+
+    void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) {
+        cluster.errorCheck(member, type, frameSeq, l);
+    }
+
+    void timerWakeup(const std::string& name) {
+        cluster.timerWakeup(member, name, l);
+    }
+
+    void timerDrop(const std::string& name) {
+        cluster.timerDrop(member, name, l);
+    }
+
+    void shutdown(const Uuid& id) {
+        cluster.shutdown(member, id, l);
+    }
+
+    void deliverToQueue(const std::string& queue, const std::string& message) {
+        cluster.deliverToQueue(queue, message, l);
+    }
+
+    // Returns true if body was one of the cluster controls handled above.
+    bool invoke(AMQBody& body) {
+        return framing::invoke(*this, body).wasHandled();
+    }
+};
+
+Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
+ settings(set),
+ broker(b),
+ mgmtObject(0),
+ poller(b.getPoller()),
+ cpg(*this),
+ name(settings.name),
+ self(cpg.self()),
+ clusterId(true),
+ mAgent(0),
+ expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())), // NOTE(review): mcast is named here before its own initializer below; members are built in declaration order, so confirm against Cluster.h that ExpiryPolicy only stores a reference.
+ mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
+ dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
+ deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
+ boost::bind(&Cluster::leave, this),
+ "Error decoding events, may indicate a broker version mismatch",
+ poller),
+ deliverFrameQueue(boost::bind(&Cluster::deliveredFrame, this, _1),
+ boost::bind(&Cluster::leave, this),
+ "Error delivering frames",
+ poller),
+ failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)),
+ updateDataExchange(new UpdateDataExchange(*this)),
+ quorum(boost::bind(&Cluster::leave, this)),
+ decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
+ discarding(true),
+ state(PRE_INIT),
+ initMap(self, settings.size),
+ store(broker.getDataDir().getPath()),
+ elder(false),
+ lastAliveCount(0),
+ lastBroker(false),
+ updateRetracted(false),
+ updateClosed(false),
+ error(*this)
+{ /* Pre-initialization: marks the broker as clustered, installs the cluster
+ * timer and the cluster exchanges, loads the store status, joins the CPG
+ * group and then dispatches CPG events inline until state leaves PRE_INIT.
+ */
+ broker.setInCluster(true);
+
+ // We give ownership of the timer to the broker and keep a plain pointer.
+ // This is OK as it means the timer has the same lifetime as the broker.
+ timer = new ClusterTimer(*this);
+ broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer));
+
+ // Failover exchange provides membership updates to clients.
+ broker.getExchanges().registerExchange(failoverExchange);
+
+ // Update exchange is used during updates to replicate messages
+ // without modifying delivery-properties.exchange.
+ broker.getExchanges().registerExchange(
+ boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
+ // Update-data exchange is used for passing data that may be too large
+ // for single control frame.
+ broker.getExchanges().registerExchange(updateDataExchange);
+
+ // Load my store status before we go into initialization
+ if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
+ store.load();
+ clusterId = store.getClusterId(); // Replace the initial id with the one recorded in the store.
+ QPID_LOG(notice, "Cluster store state: " << store)
+ }
+ cpg.join(name);
+ // pump the CPG dispatch manually till we get past PRE_INIT. The state is moved on by initMapCompleted().
+ while (state == PRE_INIT)
+ cpg.dispatchOne();
+}
+
+Cluster::~Cluster() {
+    // Delete cluster timer by replacing it with an empty pointer.
+    broker.setClusterTimer(std::auto_ptr<sys::Timer>(0));
+    // Join the previous update thread, if there is one.
+    if (updateThread) {
+        updateThread.join();
+    }
+}
+
+void Cluster::initialize() { /* Second initialization stage: starts quorum if
+ * configured, settles this member's URL, installs cluster callbacks on the
+ * broker, takes the deliver queues out of bypass mode and starts them,
+ * creates the management object, runs initMapCompleted() and finally starts
+ * dispatching CPG events.
+ */
+ if (settings.quorum) quorum.start(poller);
+ if (settings.url.empty()) // No URL configured: derive one from the broker's TCP port.
+ myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
+ else
+ myUrl = settings.url;
+ broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
+ broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2);
+ broker.setExpiryPolicy(expiryPolicy);
+ deliverEventQueue.bypassOff();
+ deliverEventQueue.start();
+ deliverFrameQueue.bypassOff();
+ deliverFrameQueue.start();
+ mcast.start();
+
+ /// Create management object
+ mAgent = broker.getManagementAgent();
+ if (mAgent != 0){
+ _qmf::Package packageInit(mAgent);
+ mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str());
+ mAgent->addObject (mgmtObject);
+ }
+
+ // Run initMapCompleted immediately to process the initial configuration
+ // that allowed us to transition out of PRE_INIT
+ assert(state == INIT);
+ initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context. NOTE(review): this binds a reference to a dereferenced null pointer, which is undefined behaviour in C++; consider passing a real Lock if no callee re-acquires it.
+
+ // Add finalizer last for exception safety.
+ broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
+
+ // Start dispatching CPG events.
+ dispatcher.start();
+}
+
+// Registers a client connection that belongs to this member.
+// Called in connection thread.
+void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
+    // A local connection must carry this member's own id.
+    assert(c->getId().getMember() == self);
+    localConnections.insert(c);
+}
+
+// Registers a shadow connection received as part of an update.
+// Called in connection thread.
+void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
+    QPID_LOG(debug, *this << " new shadow connection " << c->getId());
+    // Touching the connection map is safe here: we are pre-catchup, stalled
+    // and discarding, so deliveredFrame is not processing any connection
+    // events.
+    assert(discarding);
+    const ConnectionMap::value_type entry(c->getId(), c);
+    pair<ConnectionMap::iterator, bool> inserted = connections.insert(entry);
+    // The id must not already be present.
+    assert(inserted.second);
+}
+
+// Removes a connection, taking the cluster lock for the duration.
+void Cluster::erase(const ConnectionId& id) {
+    Lock guard(lock);
+    erase(id, guard);
+}
+
+// Called by Connection::deliverClose() in deliverFrameQueue thread.
+// Forgets the connection in both the connection map and the decoder.
+void Cluster::erase(const ConnectionId& id, Lock&) {
+    connections.erase(id);
+    decoder.erase(id);
+}
+
+// Returns the member ids held by the cluster map; takes the cluster lock.
+std::vector<string> Cluster::getIds() const {
+    Lock guard(lock);
+    return getIds(guard);
+}
+
+// As above, for callers that already hold the lock.
+std::vector<string> Cluster::getIds(Lock&) const {
+    return map.memberIds();
+}
+
+// Returns the member URLs held by the cluster map; takes the cluster lock.
+std::vector<Url> Cluster::getUrls() const {
+    Lock guard(lock);
+    return getUrls(guard);
+}
+
+// As above, for callers that already hold the lock.
+std::vector<Url> Cluster::getUrls(Lock&) const {
+    return map.memberUrls();
+}
+
+// Leaves the cluster, taking the cluster lock for the duration.
+void Cluster::leave() {
+    Lock guard(lock);
+    leave(guard);
+}
+
+// Runs STMT and logs, rather than propagates, any std::exception it throws.
+// The trailing do-while consumes the semicolon written after the macro call.
+#define LEAVE_TRY(STMT) try { STMT; } \
+    catch (const std::exception& e) { \
+        QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
+    } do {} while(0)
+
+// Moves to the LEFT state, drops all connections and requests broker
+// shutdown. Does nothing if the state is already LEFT.
+void Cluster::leave(Lock&) {
+    if (state == LEFT) return;
+    state = LEFT;
+    QPID_LOG(notice, *this << " leaving cluster " << name);
+    // Finalize connections now to avoid problems later in destructor.
+    ClusterSafeScope css; // Don't trigger cluster-safe assertions.
+    LEAVE_TRY(localConnections.clear());
+    LEAVE_TRY(connections.clear());
+    LEAVE_TRY(broker::SignalHandler::shutdown());
+}
+
+// Deliver CPG message: decode the raw bytes into an Event attributed to the
+// sending member (nodeid, pid) and hand it to deliverEvent.
+void Cluster::deliver(
+    cpg_handle_t /*handle*/,
+    const cpg_name* /*group*/,
+    uint32_t nodeid,
+    uint32_t pid,
+    void* msg,
+    int msg_len)
+{
+    MemberId sender(nodeid, pid);
+    framing::Buffer encoded(static_cast<char*>(msg), msg_len);
+    deliverEvent(Event::decodeCopy(sender, encoded));
+}
+
+// Queue an event for the deliverEventQueue handler (deliveredEvent).
+void Cluster::deliverEvent(const Event& e) {
+    deliverEventQueue.push(e);
+}
+
+// Queue a frame for the deliverFrameQueue handler (deliveredFrame).
+void Cluster::deliverFrame(const EventFrame& e) {
+    deliverFrameQueue.push(e);
+}
+
+// Returns body as a ClusterUpdateOfferBody if it is a method body of that
+// type, otherwise 0. A null body yields 0.
+const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
+    if (!body || !body->getMethod())
+        return 0;
+    if (!body->getMethod()->isA<ClusterUpdateOfferBody>())
+        return 0;
+    return static_cast<const ClusterUpdateOfferBody*>(body);
+}
+
+// Returns body as a ClusterConnectionAnnounceBody if it is a method body of
+// that type, otherwise 0. A null body yields 0.
+const ClusterConnectionAnnounceBody* castAnnounce( const framing::AMQBody *body) {
+    if (!body || !body->getMethod())
+        return 0;
+    if (!body->getMethod()->isA<ClusterConnectionAnnounceBody>())
+        return 0;
+    return static_cast<const ClusterConnectionAnnounceBody*>(body);
+}
+
+// Handler for deliverEventQueue.
+// This thread decodes frames from events and passes them to deliverFrame.
+// Cluster events are always forwarded; connection events are dropped while
+// discarding is set.
+void Cluster::deliveredEvent(const Event& e) {
+    if (e.isCluster()) {
+        EventFrame clusterFrame(e, e.getFrame());
+        // Stop the deliverEventQueue on update offers.
+        // This preserves the connection decoder fragments for an update.
+        // Only do this for the two brokers that are directly involved in this
+        // offer: the one making the offer, or the one receiving it.
+        const ClusterUpdateOfferBody* offer = castUpdateOffer(clusterFrame.frame.getBody());
+        const bool involved = offer &&
+            (e.getMemberId() == self || MemberId(offer->getUpdatee()) == self);
+        if (involved) {
+            QPID_LOG(info, *this << " stall for update offer from " << e.getMemberId()
+                     << " to " << MemberId(offer->getUpdatee()));
+            deliverEventQueue.stop();
+        }
+        deliverFrame(clusterFrame);
+        return;
+    }
+
+    if (discarding) return;
+
+    if (e.isControl()) {
+        deliverFrame(EventFrame(e, e.getFrame()));
+        return;
+    }
+
+    // Data event: feed the raw bytes to the per-connection decoder.
+    try {
+        decoder.decode(e, e.getData());
+    }
+    catch (const Exception& ex) {
+        // Close a connection that is sending us invalid data.
+        QPID_LOG(error, *this << " aborting connection "
+                 << e.getConnectionId() << ": " << ex.what());
+        framing::AMQFrame abort((ClusterConnectionAbortBody()));
+        deliverFrame(EventFrame(EventHeader(CONTROL, e.getConnectionId()), abort));
+    }
+}
+
+void Cluster::flagError(
+ Connection& connection, ErrorCheck::ErrorType type, const std::string& msg)
+{
+ Mutex::ScopedLock l(lock);
+ if (connection.isCatchUp()) {
+ QPID_LOG(critical, *this << " error on update connection " << connection
+ << ": " << msg);
+ leave(l);
+ }
+ error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg);
+}
+
+// Handler for deliverFrameQueue.
+/* This thread executes the main logic.
+ * Holds the cluster lock for the whole call and ignores frames once state is
+ * LEFT. While an error is unresolved, frames are handed to the error checker
+ * and only the frames it makes available are processed; otherwise the frame
+ * is processed directly.
+ */
+void Cluster::deliveredFrame(const EventFrame& efConst) {
+ Mutex::ScopedLock l(lock);
+ sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
+ if (state == LEFT) return;
+ EventFrame e(efConst); // Mutable copy: the frame may be replaced below.
+ const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody());
+ if (offer && error.isUnresolved()) {
+ // We can't honour an update offer that is delivered while an
+ // error is in progress so replace it with a retractOffer and re-start
+ // the event queue.
+ e.frame = AMQFrame(
+ ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee()));
+ deliverEventQueue.start(); // deliveredEvent may have stopped the queue for this offer.
+ }
+ // Process each frame through the error checker.
+ if (error.isUnresolved()) {
+ error.delivered(e);
+ while (error.canProcess()) // There is a frame ready to process.
+ processFrame(error.getNext(), l);
+ }
+ else
+ processFrame(e, l);
+}
+
+
+// Processes one delivered frame.
+// Cluster controls are dispatched through ClusterDispatcher. Connection
+// frames are passed to their Connection once state >= CATCHUP and are
+// dropped before that.
+// Throws Exception for an unhandled cluster control or an unknown connection.
+void Cluster::processFrame(const EventFrame& e, Lock& l) {
+    if (e.isCluster()) {
+        QPID_LOG(trace, *this << " DLVR: " << e);
+        ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
+        const bool handled = framing::invoke(dispatch, *e.frame.getBody()).wasHandled();
+        if (!handled)
+            throw Exception(QPID_MSG("Invalid cluster control"));
+        return;
+    }
+
+    if (state < CATCHUP) { // Drop connection frames while state < CATCHUP
+        QPID_LOG(trace, *this << " DROP (joining): " << e);
+        return;
+    }
+
+    // Every connection frame that is not dropped advances the frame sequence.
+    map.incrementFrameSeq();
+    ConnectionPtr connection = getConnection(e, l);
+    if (!connection)
+        throw Exception(QPID_MSG("Unknown connection: " << e));
+    QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
+    connection->deliveredFrame(e);
+}
+
+// Called in deliverFrameQueue thread
+// Looks up the connection a frame belongs to. If the connection is unknown
+// and the frame is a connection announcement, the connection is added to the
+// map: a pending local connection for this member's own id, a new shadow
+// connection otherwise. Returns a null pointer for an unknown connection that
+// is not being announced.
+ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) {
+    ConnectionId id = e.connectionId;
+    ConnectionMap::iterator known = connections.find(id);
+    if (known != connections.end())
+        return known->second;
+
+    ConnectionPtr cp;
+    // If the frame is an announcement for a new connection, add it.
+    const ClusterConnectionAnnounceBody *announce = castAnnounce(e.frame.getBody());
+    if (!(e.frame.getBody() && e.frame.getMethod() && announce))
+        return cp;
+
+    if (id.getMember() == self) { // Announces one of my own
+        cp = localConnections.getErase(id);
+        assert(cp);
+    }
+    else { // New remote connection, create a shadow.
+        // announce is known to be non-null here.
+        qpid::sys::SecuritySettings secSettings;
+        secSettings.ssf = announce->getSsf();
+        secSettings.authid = announce->getAuthid();
+        secSettings.nodict = announce->getNodict();
+        cp = new Connection(*this, shadowOut, announce->getManagementId(), id, secSettings);
+    }
+    connections.insert(ConnectionMap::value_type(id, cp));
+    return cp;
+}
+
+// Returns a copy of every connection pointer currently in the connection map.
+Cluster::ConnectionVector Cluster::getConnections(Lock&) {
+    ConnectionVector result(connections.size());
+    ConnectionVector::iterator out = result.begin();
+    for (ConnectionMap::iterator i = connections.begin(); i != connections.end(); ++i, ++out)
+        *out = i->second;
+    return result;
+}
+
+namespace {
+// Concatenates the string form of count member ids starting at first.
+std::string encodeMemberIds(const cpg_address* first, int count) {
+    std::string encoded;
+    for (const cpg_address* p = first; p < first + count; ++p)
+        encoded.append(MemberId(*p).str());
+    return encoded;
+}
+} // namespace
+
+// CPG config-change callback.
+// Encodes the member, left and joined sets and enqueues them as a
+// config-change control event so the change can be executed in the correct
+// thread.
+void Cluster::configChange (
+    cpg_handle_t /*handle*/,
+    const cpg_name */*group*/,
+    const cpg_address *members, int nMembers,
+    const cpg_address *left, int nLeft,
+    const cpg_address *joined, int nJoined)
+{
+    Mutex::ScopedLock guard(lock);
+    const string membersStr = encodeMemberIds(members, nMembers);
+    const string leftStr = encodeMemberIds(left, nLeft);
+    const string joinedStr = encodeMemberIds(joined, nJoined);
+    ClusterConfigChangeBody change(ProtocolVersion(), membersStr, leftStr, joinedStr);
+    deliverEvent(Event::control(change, self));
+}
+
+void Cluster::setReady(Lock&) { // Switch this member to the READY state.
+ state = READY;
+ mcast.setReady(); // Tell the multicaster as well.
+ broker.getQueueEvents().enable();
+ enableClusterSafe(); // Enable cluster-safe assertions.
+}
+
+// Set the management status from the Cluster::state.
+//
+// NOTE: Management updates are sent based on property changes. To keep
+// the cluster consistent we touch the local management status property, even
+// when it is locally unchanged, for any event that could have caused a
+// cluster property change on any cluster member.
+void Cluster::setMgmtStatus(Lock&) {
+    if (!mgmtObject) return; // No management object was created.
+    const char* status = (state >= CATCHUP) ? "ACTIVE" : "JOINING";
+    mgmtObject->set_status(status);
+}
+
+void Cluster::initMapCompleted(Lock& l) {
+ /* Called on completion of the initial status map.
+ * In PRE_INIT this only decides whether to recover from the local store and
+ * moves to INIT. In INIT it either returns to wait for the required cluster
+ * size, requests an update (state JOINER) or goes straight to READY.
+ * In any other state only the management status is refreshed.
+ */
+ QPID_LOG(debug, *this << " initial status map complete. ");
+ setMgmtStatus(l);
+ if (state == PRE_INIT) {
+ // PRE_INIT means we're still in the earlyInitialize phase, in the constructor.
+ // We decide here whether we want to recover from our store.
+ // We won't recover if we are joining an active cluster or our store is dirty.
+ if (store.hasStore() &&
+ store.getState() != STORE_STATE_EMPTY_STORE &&
+ (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE))
+ broker.setRecovery(false); // Ditch my current store.
+ state = INIT; // Lets the constructor's CPG dispatch loop finish.
+ }
+ else if (state == INIT) {
+ // INIT means we are past Cluster::initialize().
+
+ // If we're forming an initial cluster (no active members)
+ // then we wait to reach the configured cluster-size
+ if (!initMap.isActive() && initMap.getActualSize() < initMap.getRequiredSize()) {
+ QPID_LOG(info, *this << initMap.getActualSize()
+ << " members, waiting for at least " << initMap.getRequiredSize()); // NOTE(review): no separator is written between *this and the member count.
+ return;
+ }
+
+ initMap.checkConsistent();
+ elders = initMap.getElders();
+ QPID_LOG(debug, *this << " elders: " << elders);
+ if (elders.empty())
+ becomeElder(l);
+ else {
+ broker.getLinks().setPassive(true);
+ broker.getQueueEvents().disable();
+ QPID_LOG(info, *this << " not active for links.");
+ }
+ setClusterId(initMap.getClusterId(), l);
+
+ if (initMap.isUpdateNeeded()) { // Joining established cluster.
+ broker.setRecovery(false); // Ditch my current store.
+ broker.setClusterUpdatee(true);
+ if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update.
+ state = JOINER;
+ mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
+ QPID_LOG(notice, *this << " joining cluster " << name);
+ }
+ else { // I can go ready.
+ discarding = false; // deliveredEvent now forwards connection events.
+ setReady(l);
+ memberUpdate(l);
+ updateMgmtMembership(l);
+ mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
+ QPID_LOG(notice, *this << " joined cluster " << name);
+ }
+ }
+}
+
+void Cluster::configChange(const MemberId&,
+ const std::string& membersStr,
+ const std::string& leftStr,
+ const std::string& joinedStr,
+ Lock& l)
+{ /* Handles a delivered config-change control. Decodes the three member
+ * sets, prunes elders to the current membership, updates the member map and
+ * the initial-status map, re-sends this member's initial status when the
+ * initial-status map asks for it, and refreshes management membership.
+ * Ignored once state is LEFT.
+ */
+ if (state == LEFT) return;
+ MemberSet members = decodeMemberSet(membersStr);
+ MemberSet left = decodeMemberSet(leftStr);
+ MemberSet joined = decodeMemberSet(joinedStr);
+ QPID_LOG(notice, *this << " configuration change: " << members);
+ QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left);
+ QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined);
+
+ // If we are still joining, make sure there is someone to give us an update.
+ elders = intersection(elders, members);
+ if (elders.empty() && INIT < state && state < CATCHUP) {
+ QPID_LOG(critical, "Cannot update, all potential updaters left the cluster.");
+ leave(l);
+ return;
+ }
+ bool memberChange = map.configChange(members);
+
+ // Update initial status for members joining or leaving.
+ initMap.configChange(members);
+ if (initMap.isResendNeeded()) {
+ mcast.mcastControl(
+ ClusterInitialStatusBody(
+ ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
+ store.getState(), store.getShutdownId(),
+ initMap.getFirstConfigStr()
+ ),
+ self);
+ }
+ if (initMap.transitionToComplete()) initMapCompleted(l);
+
+ if (state >= CATCHUP && memberChange) {
+ memberUpdate(l);
+ if (elders.empty()) becomeElder(l); // No elders remain in the membership: take over the elder role.
+ }
+
+ updateMgmtMembership(l); // Update on every config change for consistency
+}
+
+void Cluster::becomeElder(Lock&) { // Take on the elder role; a no-op if this member already has it.
+ if (elder) return; // We were already the elder.
+ // We are the oldest, reactivate links if necessary
+ QPID_LOG(info, *this << " became the elder, active for links.");
+ elder = true;
+ broker.getLinks().setPassive(false);
+ timer->becomeElder(); // Let the cluster timer know as well.
+}
+
+// Offers an update to member id. Only acts when this member is READY and the
+// map lists id as a joiner; in that case the state becomes OFFER and an
+// update-offer control is multicast.
+void Cluster::makeOffer(const MemberId& id, Lock& ) {
+    if (state != READY || !map.isJoiner(id))
+        return;
+    state = OFFER;
+    QPID_LOG(info, *this << " send update-offer to " << id);
+    mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id), self);
+}
+
+namespace {
+// Functor that writes " <queue name>=<message count>" to a stream for each
+// queue it is applied to.
+struct AppendQueue {
+    ostream* os;
+
+    AppendQueue(ostream& o) : os(&o) {}
+
+    void operator()(const boost::shared_ptr<broker::Queue>& q) {
+        ostream& out = *os;
+        out << " " << q->getName() << "=" << q->getMessageCount();
+    }
+};
+} // namespace
+
+// Builds a text snapshot of broker state (frame sequence plus the message
+// count of every queue), used for debugging inconsistency problems.
+// May only be called in deliver thread.
+std::string Cluster::debugSnapshot() {
+    assertClusterSafe();
+    std::ostringstream snapshot;
+    snapshot << "Member joined, frameSeq=" << map.getFrameSeq() << ", queue snapshot:";
+    AppendQueue appendEach(snapshot);
+    broker.getQueues().eachQueue(appendEach);
+    return snapshot.str();
+}
+
+// Called from Broker::~Broker when broker is shut down. At this
+// point we know the poller has stopped so no poller callbacks will be
+// invoked. We must ensure that CPG has also shut down so no CPG
+// callbacks will be invoked.
+// Deletes this Cluster object as its final step.
+void Cluster::brokerShutdown() {
+ sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
+ try { cpg.shutdown(); } // A failure here is logged, not propagated.
+ catch (const std::exception& e) {
+ QPID_LOG(error, *this << " shutting down CPG: " << e.what());
+ }
+ delete this; // No member may be touched after this line.
+}
+
+// Handles an update-request control: records the request from member id in
+// the map, then tries to offer that member an update.
+void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) {
+    map.updateRequest(id, url);
+    makeOffer(id, l);
+}
+
+// Handles an initial-status control from a member.
+// A sender with a different CLUSTER_VERSION makes this member leave the
+// cluster. Otherwise the status is recorded in the initial-status map and,
+// if that completes the map, initMapCompleted is run.
+void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active,
+                            const framing::Uuid& id,
+                            framing::cluster::StoreState store,
+                            const framing::Uuid& shutdownId,
+                            const std::string& firstConfig,
+                            Lock& l)
+{
+    if (version != CLUSTER_VERSION) {
+        QPID_LOG(critical, *this << " incompatible cluster versions " <<
+                 version << " != " << CLUSTER_VERSION);
+        leave(l);
+        return;
+    }
+    QPID_LOG_IF(debug, state == PRE_INIT, *this
+                << " received initial status from " << member);
+    // Rebuild the control body from its fields to store it in the map.
+    ClusterInitialStatusBody status(ProtocolVersion(), version, active, id,
+                                    store, shutdownId, firstConfig);
+    initMap.received(member, status);
+    if (initMap.transitionToComplete())
+        initMapCompleted(l);
+}
+
+// Handles a ready control from member id.
+// Updates the map and, if this member was catching up and the control is its
+// own, switches to READY. An invalid URL is logged and otherwise ignored.
+void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
+    try {
+        const bool mapChanged = map.ready(id, Url(url));
+        if (mapChanged)
+            memberUpdate(l);
+        const bool ownReadyWhileCatchingUp = (state == CATCHUP && id == self);
+        if (ownReadyWhileCatchingUp) {
+            setReady(l);
+            QPID_LOG(notice, *this << " caught up.");
+        }
+    } catch (const Url::Invalid&) {
+        QPID_LOG(error, "Invalid URL in cluster ready command: " << url);
+    }
+    // Update management on every ready event to be consistent across cluster.
+    setMgmtStatus(l);
+    updateMgmtMembership(l);
+}
+
+void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) { // Handle an update-offer control sent by updater for updatee.
+ // NOTE: deliverEventQueue has been stopped at the update offer by
+ // deliveredEvent in case an update is required.
+ if (state == LEFT) return;
+ MemberId updatee(updateeInt);
+ boost::optional<Url> url = map.updateOffer(updater, updatee); // Per the branches below, url is set when this offer was the first one.
+ if (updater == self) {
+ assert(state == OFFER);
+ if (url) // My offer was first.
+ updateStart(updatee, *url, l);
+ else { // Another offer was first.
+ QPID_LOG(info, *this << " cancelled offer to " << updatee << " unstall");
+ setReady(l);
+ makeOffer(map.firstJoiner(), l); // Maybe make another offer.
+ deliverEventQueue.start(); // Go back to normal processing
+ }
+ }
+ else if (updatee == self && url) {
+ assert(state == JOINER);
+ state = UPDATEE; // checkUpdateIn acts only in this state.
+ QPID_LOG(notice, *this << " receiving update from " << updater);
+ checkUpdateIn(l);
+ }
+ else {
+ QPID_LOG(info, *this << " unstall, ignore update " << updater
+ << " to " << updatee);
+ deliverEventQueue.start(); // Not involved in update.
+ }
+ if (updatee != self && url) { // An accepted offer for some other member.
+ QPID_LOG(debug, debugSnapshot());
+ if (mAgent) mAgent->clusterUpdate();
+ // Updatee will call clusterUpdate when update completes
+ }
+}
+
+// Builds client connection settings carrying the username, password and
+// mechanism from the cluster settings; all other fields keep their defaults.
+static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) {
+    client::ConnectionSettings result;
+    result.username = settings.username;
+    result.password = settings.password;
+    result.mechanism = settings.mechanism;
+    return result;
+}
+
+// Handles a retract-offer control: an update offer that was received while
+// handling an error and was converted to a retract.
+// Behavior is very similar to updateOffer.
+void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) {
+    if (state == LEFT) return;
+    MemberId updatee(updateeInt);
+    boost::optional<Url> offeredUrl = map.updateOffer(updater, updatee);
+    if (updater == self) {
+        assert(state == OFFER);
+        if (offeredUrl) { // My offer was first.
+            // Join the previous updateThread to avoid leaks.
+            if (updateThread)
+                updateThread.join();
+            updateThread = Thread(new RetractClient(*offeredUrl, connectionSettings(settings)));
+        }
+        setReady(l);
+        makeOffer(map.firstJoiner(), l); // Maybe make another offer.
+        // Don't unstall the event queue, that was already done in deliveredFrame
+    }
+    QPID_LOG(debug,*this << " retracted offer " << updater << " to " << updatee);
+}
+
+// Starts sending an update to updatee at url on a new update thread and moves
+// this member from OFFER to UPDATER. Ignored once state is LEFT.
+// NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent.
+void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
+    if (state == LEFT) return;
+    assert(state == OFFER);
+    state = UPDATER;
+    QPID_LOG(notice, *this << " sending update to " << updatee << " at " << url);
+    // Join the previous updateThread to avoid leaks.
+    if (updateThread) {
+        updateThread.join();
+    }
+    updateThread = Thread(
+        new UpdateClient(
+            self, updatee, url, broker, map, *expiryPolicy,
+            getConnections(l), decoder,
+            boost::bind(&Cluster::updateOutDone, this),
+            boost::bind(&Cluster::updateOutError, this, _1),
+            connectionSettings(settings)));
+}
+
+ // Called in network thread.
+ // Records that the update connection has closed and re-checks whether the
+ // incoming update can be completed.
+ void Cluster::updateInClosed() {
+     Lock guard(lock);
+     assert(!updateClosed);      // The close must not already have been recorded.
+     updateClosed = true;
+     checkUpdateIn(guard);
+ }
+
+ // Called in update thread.
+ // Stores the supplied cluster map in updatedMap and re-checks whether the
+ // incoming update can be completed.
+ void Cluster::updateInDone(const ClusterMap& m) {
+     Lock guard(lock);
+     updatedMap = m;
+     checkUpdateIn(guard);
+ }
+
+ // Flags the incoming update as retracted, clears the status held in the
+ // cluster map and re-checks the update state (see checkUpdateIn).
+ void Cluster::updateInRetracted() {
+     Lock guard(lock);
+     updateRetracted = true;
+     map.clearStatus();
+     checkUpdateIn(guard);
+ }
+
+ // Returns true while the cluster state has not advanced beyond UPDATEE.
+ bool Cluster::isExpectingUpdate() {
+     Lock guard(lock);
+     const bool pastUpdatee = state > UPDATEE;
+     return !pastUpdatee;
+ }
+
+ // Called in update thread or deliver thread.
+ // Once we are in the UPDATEE state and the update connection has closed,
+ // either adopt the received map and move to CATCHUP, or, if the update was
+ // retracted, return to JOINER and multicast a new update request.
+ void Cluster::checkUpdateIn(Lock& l) {
+     // Wait till we reach the stall point and the update connection closes.
+     if (state != UPDATEE || !updateClosed) return;
+     if (updatedMap) {               // We're up to date
+         map = *updatedMap;
+         failoverExchange->setUrls(getUrls(l));
+         mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
+         state = CATCHUP;
+         memberUpdate(l);
+         // NB: don't updateMgmtMembership() here as we are not in the deliver
+         // thread. It will be updated on delivery of the "ready" we just mcast.
+         broker.setClusterUpdatee(false);
+         discarding = false;         // OK to set, we're stalled for update.
+         QPID_LOG(notice, *this << " update complete, starting catch-up.");
+         QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
+         if (mAgent) {
+             // Update management agent now, after all update activity is complete.
+             updateDataExchange->updateManagementAgent(mAgent);
+             mAgent->suppress(false); // Enable management output.
+             mAgent->clusterUpdate();
+         }
+         // Restore alternate exchange settings on exchanges.
+         broker.getExchanges().eachExchange(
+             boost::bind(&broker::Exchange::recoveryComplete, _1,
+                         boost::ref(broker.getExchanges())));
+         enableClusterSafe();        // Enable cluster-safe assertions
+         deliverEventQueue.start();
+         return;
+     }
+     if (!updateRetracted) return;   // Neither complete nor retracted: keep waiting.
+     // Update was retracted, request another update
+     updateRetracted = false;
+     updateClosed = false;
+     state = JOINER;
+     QPID_LOG(notice, *this << " update retracted, sending new update request.");
+     mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
+     deliverEventQueue.start();
+ }
+
+ // Locking entry point: takes the cluster lock and forwards to
+ // updateOutDone(Lock&). Bound as the completion callback in updateStart.
+ void Cluster::updateOutDone() {
+     Monitor::ScopedLock guard(lock);
+     updateOutDone(guard);
+ }
+
+ // Finish an outgoing update with the lock held: go from UPDATER back to
+ // READY, restart event delivery and try to make another offer.
+ void Cluster::updateOutDone(Lock& l) {
+     QPID_LOG(notice, *this << " update sent");
+     assert(state == UPDATER);
+     state = READY;
+     // Start processing events again.
+     deliverEventQueue.start();
+     // Try another offer
+     makeOffer(map.firstJoiner(), l);
+ }
+
+void Cluster::updateOutError(const std::exception& e) {
+ Monitor::ScopedLock l(lock);
+ QPID_LOG(error, *this << " error sending update: " << e.what());
+ updateOutDone(l);
+}
+
+ // Handle a cluster shutdown: if there is a store, mark it clean with the
+ // given id, then leave the cluster. The sender's MemberId is not used.
+ void Cluster::shutdown(const MemberId& , const Uuid& id, Lock& l) {
+     QPID_LOG(notice, *this << " cluster shut down by administrator.");
+     if (store.hasStore()) {
+         store.clean(id);
+     }
+     leave(l);
+ }
+
+ // Returns the management object held in mgmtObject.
+ ManagementObject* Cluster::GetManagementObject() const {
+     return mgmtObject;
+ }
+
+ // Dispatch a management method call.
+ //  - METHOD_STOPCLUSTERNODE: stops this node only if the requested broker id
+ //    equals the string form of `self`.
+ //  - METHOD_STOPFULLCLUSTER: requests shutdown of the whole cluster.
+ // Returns STATUS_UNKNOWN_METHOD for any other id, STATUS_OK otherwise.
+ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, string&) {
+     Lock l(lock);
+     QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]");
+     Manageable::status_t result = Manageable::STATUS_OK;
+     switch (methodId) {
+     case _qmf::Cluster::METHOD_STOPCLUSTERNODE : {
+         _qmf::ArgsClusterStopClusterNode& stopArgs = (_qmf::ArgsClusterStopClusterNode&) args;
+         stringstream selfId;
+         selfId << self;
+         if (stopArgs.i_brokerId == selfId.str()) {
+             stopClusterNode(l);
+         }
+         break;
+     }
+     case _qmf::Cluster::METHOD_STOPFULLCLUSTER :
+         stopFullCluster(l);
+         break;
+     default:
+         result = Manageable::STATUS_UNKNOWN_METHOD;
+         break;
+     }
+     return result;
+ }
+
+ // Administrative stop of this member: log it and leave the cluster.
+ void Cluster::stopClusterNode(Lock& l) {
+     QPID_LOG(notice, *this << " cluster member stopped by administrator.");
+     leave(l);
+ }
+
+ // Administrative stop of the whole cluster: multicast a ClusterShutdownBody
+ // control carrying a Uuid(true). The lock argument is not used.
+ void Cluster::stopFullCluster(Lock& ) {
+     QPID_LOG(notice, *this << " shutting down cluster " << name);
+     mcast.mcastControl(ClusterShutdownBody(ProtocolVersion(), Uuid(true)), self);
+ }
+
+ // React to a change in cluster membership: refresh the failover URLs, mark
+ // the store clean/dirty, switch queue policies when we become (or stop being)
+ // the last broker standing, and drop connections of departed members.
+ void Cluster::memberUpdate(Lock& l) {
+     // Ignore config changes while we are joining.
+     if (state < CATCHUP) return;
+     QPID_LOG(info, *this << " member update: " << map);
+     size_t aliveCount = map.aliveCount();
+     assert(map.isAlive(self));
+     failoverExchange->updateUrls(getUrls(l));
+
+     // Mark store clean if I am the only broker, dirty otherwise.
+     if (store.hasStore()) {
+         const bool soleMember = (aliveCount == 1);
+         if (soleMember && store.getState() != STORE_STATE_CLEAN_STORE) {
+             QPID_LOG(notice, *this << "Sole member of cluster, marking store clean.");
+             store.clean(Uuid(true));
+         }
+         else if (!soleMember && store.getState() != STORE_STATE_DIRTY_STORE) {
+             QPID_LOG(notice, "Running in a cluster, marking store dirty.");
+             store.dirty();
+         }
+     }
+
+     // If I am the last member standing, set queue policies.
+     if (aliveCount == 1 && lastAliveCount > 1 && state >= CATCHUP) {
+         QPID_LOG(notice, *this << " last broker standing, update queue policies");
+         lastBroker = true;
+         broker.getQueues().updateQueueClusterState(true);
+     }
+     else if (aliveCount > 1 && lastBroker) {
+         QPID_LOG(notice, *this << " last broker standing joined by " << aliveCount-1
+                  << " replicas, updating queue policies.");
+         lastBroker = false;
+         broker.getQueues().updateQueueClusterState(false);
+     }
+     lastAliveCount = aliveCount;
+
+     // Close connections belonging to members that have left the cluster.
+     // The iterator is advanced before the current entry may be erased.
+     for (ConnectionMap::iterator next = connections.begin(); next != connections.end(); ) {
+         ConnectionMap::iterator current = next++;
+         MemberId owner = current->second->getId().getMember();
+         if (owner != self && !map.isMember(owner)) {
+             current->second->close();
+             erase(current->second->getId(), l);
+         }
+     }
+ }
+
+ // See comment on Cluster::setMgmtStatus
+ // Publishes the cluster size and the ';'-separated member URLs and member
+ // ids to the management object, if there is one.
+ void Cluster::updateMgmtMembership(Lock& l) {
+     if (!mgmtObject) return;
+     std::vector<Url> urls = getUrls(l);
+     mgmtObject->set_clusterSize(urls.size());
+     string urlList;
+     for (size_t n = 0; n < urls.size(); ++n) {
+         if (n > 0) urlList += ";";
+         urlList += urls[n].str();
+     }
+     std::vector<string> ids = getIds(l);
+     string idList;
+     for (size_t n = 0; n < ids.size(); ++n) {
+         if (n > 0) idList += ";";
+         idList += ids[n];
+     }
+     mgmtObject->set_members(urlList);
+     mgmtObject->set_memberIDs(idList);
+ }
+
+ // Stream a short description of the cluster: "cluster(<self> <STATE>)", with
+ // "/error" appended to the state name while an error is unresolved.
+ std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
+     // One name per state, indexed by cluster.state.
+     static const char* STATE_NAMES[] = {
+         "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP",
+         "READY", "OFFER", "UPDATER", "LEFT"
+     };
+     assert(sizeof(STATE_NAMES)/sizeof(*STATE_NAMES) == Cluster::LEFT+1);
+     o << "cluster(" << cluster.self << " " << STATE_NAMES[cluster.state];
+     if (cluster.error.isUnresolved()) {
+         o << "/error";
+     }
+     o << ")";
+     return o;
+ }
+
+ // Returns this member's id. Immutable, no need to lock.
+ MemberId Cluster::getId() const {
+     return self;
+ }
+
+ // Returns the broker this cluster object is attached to.
+ // Immutable, no need to lock.
+ broker::Broker& Cluster::getBroker() const {
+     return broker;
+ }
+
+ // Record the cluster UUID, pass it to the store when one is present, and
+ // publish the cluster id and this member's id to the management object.
+ void Cluster::setClusterId(const Uuid& uuid, Lock&) {
+     clusterId = uuid;
+     if (store.hasStore()) {
+         store.setClusterId(uuid);
+     }
+     if (mgmtObject) {
+         // The member id is published in its streamed (textual) form.
+         stringstream memberText;
+         memberText << self;
+         mgmtObject->set_clusterID(clusterId.str());
+         mgmtObject->set_memberID(memberText.str());
+     }
+     QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);
+ }
+
+ // Forward the expiry of message `id` to the expiry policy. The sender and
+ // the lock are not used here.
+ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
+     expiryPolicy->deliverExpire(id);
+ }
+
+ // If we see an errorCheck here (rather than in the ErrorCheck class) then we
+ // have processed successfully past the point of the error.
+ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) {
+     // Don't respond pre catchup, we don't know what happened
+     if (state >= CATCHUP) {
+         error.respondNone(from, type, frameSeq);
+     }
+ }
+
+ // Deliver a wakeup for the timer task `name` to the cluster timer.
+ void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
+     // Pre catchup our timer isn't set up.
+     if (state >= CATCHUP) {
+         timer->deliverWakeup(name);
+     }
+ }
+
+ // Deliver a drop for the timer task `name` to the cluster timer, after
+ // logging the current frame sequence number.
+ void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
+     // The log statement is terminated with ';' like every other QPID_LOG call
+     // in this file; without it the code only compiles if the macro expansion
+     // happens to tolerate a missing terminator.
+     QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name);
+     if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+         timer->deliverDrop(name);
+ }
+
+ // Returns the current value of the `elder` flag.
+ bool Cluster::isElder() const {
+     const bool result = elder;
+     return result;
+ }
+
+ // Decode `message` (encoded header followed by content) and deliver it to the
+ // queue called `queue`. If no such queue exists this is logged as critical
+ // and the member leaves the cluster.
+ void Cluster::deliverToQueue(const std::string& queue, const std::string& message, Lock& l)
+ {
+     broker::Queue::shared_ptr q = broker.getQueues().find(queue);
+     if (!q) {
+         QPID_LOG(critical, *this << " cluster delivery to non-existent queue: " << queue);
+         leave(l);
+         // Stop here: q is null, so falling through to q->deliver() below
+         // would dereference a null pointer.
+         return;
+     }
+     // framing::Buffer takes a non-const char*, hence the const_cast.
+     framing::Buffer buf(const_cast<char*>(message.data()), message.size());
+     boost::intrusive_ptr<broker::Message> msg(new broker::Message);
+     msg->decodeHeader(buf);
+     msg->decodeContent(buf);
+     q->deliver(msg);
+ }
+
+ // When not in a cluster-safe context, encode `msg` and multicast it as a
+ // ClusterDeliverToQueueBody for `queue` instead of delivering it directly.
+ // Returns true if the delivery was deferred this way, false if the caller
+ // should go ahead itself.
+ bool Cluster::deferDeliveryImpl(const std::string& queue,
+                                 const boost::intrusive_ptr<broker::Message>& msg)
+ {
+     if (isClusterSafe()) {
+         return false;
+     }
+     // Encode the message into a string sized to hold exactly its encoding.
+     std::string encoded;
+     encoded.resize(msg->encodedSize());
+     framing::Buffer out(const_cast<char*>(encoded.data()), encoded.size());
+     msg->encode(out);
+     mcast.mcastControl(ClusterDeliverToQueueBody(ProtocolVersion(), queue, encoded), self);
+     return true;
+ }
+
+}} // namespace qpid::cluster