summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp169
1 files changed, 52 insertions, 117 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index e6e3de64f2..dd4882774b 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -36,45 +36,45 @@
*
* IMPORTANT NOTE: any time code is added to the broker that uses timers,
* the cluster may need to be updated to take account of this.
- *
+ *
*
* USE OF TIMESTAMPS IN THE BROKER
- *
+ *
* The following are the current areas where broker uses timers or timestamps:
- *
+ *
* - Producer flow control: broker::SemanticState uses
* connection::getClusterOrderOutput. a FrameHandler that sends
* frames to the client via the cluster. Used by broker::SessionState
- *
+ *
* - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
* implemented by cluster::ExpiryPolicy.
- *
+ *
* - Connection heartbeat: sends connection controls, not part of
* session command counting so OK to ignore.
- *
+ *
* - LinkRegistry: only cluster elder is ever active for links.
- *
+ *
* - management::ManagementBroker: uses MessageHandler supplied by cluster
* to send messages to the broker via the cluster.
+ *
+ * - Dtx: not yet supported with cluster.
*
- * cluster::ExpiryPolicy uses cluster time.
+ * cluster::ExpiryPolicy implements the strategy for message expiry.
*
* ClusterTimer implements periodic timed events in the cluster context.
- * Used for:
- * - periodic management events.
- * - DTX transaction timeouts.
+ * Used for periodic management events.
*
* <h1>CLUSTER PROTOCOL OVERVIEW</h1>
- *
+ *
* Messages sent to/from CPG are called Events.
*
* An Event carries a ConnectionId, which includes a MemberId and a
* connection number.
- *
+ *
* Events are either
* - Connection events: non-0 connection number and are associated with a connection.
* - Cluster Events: 0 connection number, are not associated with a connection.
- *
+ *
* Events are further categorized as:
* - Control: carries method frame(s) that affect cluster behavior.
* - Data: carries raw data received from a client connection.
@@ -146,7 +146,6 @@
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
-#include "qpid/framing/ClusterClockBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAbortBody.h"
#include "qpid/framing/ClusterRetractOfferBody.h"
@@ -199,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1159329;
+const uint32_t Cluster::CLUSTER_VERSION = 1058747;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -215,7 +214,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
{
cluster.initialStatus(
member, version, active, clusterId,
- framing::cluster::StoreState(storeState), shutdownId,
+ framing::cluster::StoreState(storeState), shutdownId,
firstConfig, l);
}
void ready(const std::string& url) {
@@ -231,21 +230,21 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
cluster.updateOffer(member, updatee, l);
}
void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
+ void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) {
cluster.errorCheck(member, type, frameSeq, l);
}
void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); }
- void timerDrop(const std::string& name) { cluster.timerDrop(member, name, l); }
+ void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); }
void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
void deliverToQueue(const std::string& queue, const std::string& message) {
cluster.deliverToQueue(queue, message, l);
}
- void clock(uint64_t time) { cluster.clock(time, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
- settings(set),
+ settings(set),
broker(b),
mgmtObject(0),
poller(b.getPoller()),
@@ -254,7 +253,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
self(cpg.self()),
clusterId(true),
mAgent(0),
- expiryPolicy(new ExpiryPolicy(*this)),
+ expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
@@ -278,11 +277,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
lastBroker(false),
updateRetracted(false),
updateClosed(false),
- error(*this),
- acl(0)
+ error(*this)
{
- broker.setInCluster(true);
-
// We give ownership of the timer to the broker and keep a plain pointer.
// This is OK as it means the timer has the same lifetime as the broker.
timer = new ClusterTimer(*this);
@@ -303,7 +299,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
- clusterId = store.getClusterId();
+ clusterId = store.getClusterId();
QPID_LOG(notice, "Cluster store state: " << store)
}
cpg.join(name);
@@ -364,15 +360,14 @@ void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
// Safe to use connections here because we're pre-catchup, stalled
// and discarding, so deliveredFrame is not processing any
// connection events.
- assert(discarding);
+ assert(discarding);
pair<ConnectionMap::iterator, bool> ib
= connections.insert(ConnectionMap::value_type(c->getId(), c));
- // Like this to avoid tripping up unused variable warning when NDEBUG set
- if (!ib.second) assert(ib.second);
+ assert(ib.second);
}
void Cluster::erase(const ConnectionId& id) {
- Lock l(lock);
+ Lock l(lock);
erase(id,l);
}
@@ -398,9 +393,9 @@ std::vector<Url> Cluster::getUrls() const {
std::vector<Url> Cluster::getUrls(Lock&) const {
return map.memberUrls();
-}
+}
-void Cluster::leave() {
+void Cluster::leave() {
Lock l(lock);
leave(l);
}
@@ -410,7 +405,7 @@ void Cluster::leave() {
QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
} do {} while(0)
-void Cluster::leave(Lock&) {
+void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
@@ -429,7 +424,7 @@ void Cluster::deliver(
uint32_t nodeid,
uint32_t pid,
void* msg,
- int msg_len)
+ int msg_len)
{
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
@@ -460,7 +455,7 @@ void Cluster::deliveredEvent(const Event& e) {
EventFrame ef(e, e.getFrame());
// Stop the deliverEventQueue on update offers.
// This preserves the connection decoder fragments for an update.
- // Only do this for the two brokers that are directly involved in this
+ // Only do this for the two brokers that are directly involved in this
// offer: the one making the offer, or the one receiving it.
const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody());
if (offer && ( e.getMemberId() == self || MemberId(offer->getUpdatee()) == self) ) {
@@ -470,7 +465,7 @@ void Cluster::deliveredEvent(const Event& e) {
}
deliverFrame(ef);
}
- else if(!discarding) {
+ else if(!discarding) {
if (e.isControl())
deliverFrame(EventFrame(e, e.getFrame()));
else {
@@ -512,7 +507,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) {
// the event queue.
e.frame = AMQFrame(
ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee()));
- deliverEventQueue.start();
+ deliverEventQueue.start();
}
// Process each frame through the error checker.
if (error.isUnresolved()) {
@@ -520,14 +515,14 @@ void Cluster::deliveredFrame(const EventFrame& efConst) {
while (error.canProcess()) // There is a frame ready to process.
processFrame(error.getNext(), l);
}
- else
+ else
processFrame(e, l);
}
void Cluster::processFrame(const EventFrame& e, Lock& l) {
if (e.isCluster()) {
- QPID_LOG_IF(trace, loggable(e.frame), *this << " DLVR: " << e);
+ QPID_LOG(trace, *this << " DLVR: " << e);
ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
throw Exception(QPID_MSG("Invalid cluster control"));
@@ -536,15 +531,14 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) {
map.incrementFrameSeq();
ConnectionPtr connection = getConnection(e, l);
if (connection) {
- QPID_LOG_IF(trace, loggable(e.frame),
- *this << " DLVR " << map.getFrameSeq() << ": " << e);
+ QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
connection->deliveredFrame(e);
}
else
- throw Exception(QPID_MSG("Unknown connection: " << e));
+ QPID_LOG(trace, *this << " DROP (no connection): " << e);
}
else // Drop connection frames while state < CATCHUP
- QPID_LOG_IF(trace, loggable(e.frame), *this << " DROP (joining): " << e);
+ QPID_LOG(trace, *this << " DROP (joining): " << e);
}
// Called in deliverFrameQueue thread
@@ -583,7 +577,7 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) {
}
// CPG config-change callback.
-void Cluster::configChange (
+void Cluster::configChange (
cpg_handle_t /*handle*/,
const cpg_name */*group*/,
const cpg_address *members, int nMembers,
@@ -613,7 +607,7 @@ void Cluster::setReady(Lock&) {
}
// Set the management status from the Cluster::state.
-//
+//
// NOTE: Management updates are sent based on property changes. In
// order to keep consistency across the cluster, we touch the local
// management status property even if it is locally unchanged for any
@@ -624,7 +618,7 @@ void Cluster::setMgmtStatus(Lock&) {
}
void Cluster::initMapCompleted(Lock& l) {
- // Called on completion of the initial status map.
+ // Called on completion of the initial status map.
QPID_LOG(debug, *this << " initial status map complete. ");
setMgmtStatus(l);
if (state == PRE_INIT) {
@@ -671,8 +665,6 @@ void Cluster::initMapCompleted(Lock& l) {
else { // I can go ready.
discarding = false;
setReady(l);
- // Must be called *before* memberUpdate so first update will be generated.
- failoverExchange->setReady();
memberUpdate(l);
updateMgmtMembership(l);
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
@@ -709,8 +701,8 @@ void Cluster::configChange(const MemberId&,
if (initMap.isResendNeeded()) {
mcast.mcastControl(
ClusterInitialStatusBody(
- ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
- store.getState(), store.getShutdownId(),
+ ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
+ store.getState(), store.getShutdownId(),
initMap.getFirstConfigStr()
),
self);
@@ -725,20 +717,6 @@ void Cluster::configChange(const MemberId&,
updateMgmtMembership(l); // Update on every config change for consistency
}
-struct ClusterClockTask : public sys::TimerTask {
- Cluster& cluster;
- sys::Timer& timer;
-
- ClusterClockTask(Cluster& cluster, sys::Timer& timer, uint16_t clockInterval)
- : TimerTask(Duration(clockInterval * TIME_MSEC),"ClusterClock"), cluster(cluster), timer(timer) {}
-
- void fire() {
- cluster.sendClockUpdate();
- setupNextFire();
- timer.add(this);
- }
-};
-
void Cluster::becomeElder(Lock&) {
if (elder) return; // We were already the elder.
// We are the oldest, reactive links if necessary
@@ -746,8 +724,6 @@ void Cluster::becomeElder(Lock&) {
elder = true;
broker.getLinks().setPassive(false);
timer->becomeElder();
-
- clockTimer.add(new ClusterClockTask(*this, clockTimer, settings.clockInterval));
}
void Cluster::makeOffer(const MemberId& id, Lock& ) {
@@ -783,7 +759,7 @@ std::string Cluster::debugSnapshot() {
// point we know the poller has stopped so no poller callbacks will be
// invoked. We must ensure that CPG has also shut down so no CPG
// callbacks will be invoked.
-//
+//
void Cluster::brokerShutdown() {
sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
try { cpg.shutdown(); }
@@ -799,7 +775,7 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l)
}
void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active,
- const framing::Uuid& id,
+ const framing::Uuid& id,
framing::cluster::StoreState store,
const framing::Uuid& shutdownId,
const std::string& firstConfig,
@@ -857,8 +833,6 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
else if (updatee == self && url) {
assert(state == JOINER);
state = UPDATEE;
- acl = broker.getAcl();
- broker.setAcl(0); // Disable ACL during update
QPID_LOG(notice, *this << " receiving update from " << updater);
checkUpdateIn(l);
}
@@ -870,7 +844,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
if (updatee != self && url) {
QPID_LOG(debug, debugSnapshot());
if (mAgent) mAgent->clusterUpdate();
- // Updatee will call clusterUpdate() via checkUpdateIn() when update completes
+ // Updatee will call clusterUpdate when update completes
}
}
@@ -951,15 +925,13 @@ void Cluster::checkUpdateIn(Lock& l) {
if (!updateClosed) return; // Wait till update connection closes.
if (updatedMap) { // We're up to date
map = *updatedMap;
+ failoverExchange->setUrls(getUrls(l));
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
memberUpdate(l);
- // Must be called *after* memberUpdate() to avoid sending an extra update.
- failoverExchange->setReady();
// NB: don't updateMgmtMembership() here as we are not in the deliver
// thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
- broker.setAcl(acl); // Restore ACL
discarding = false; // OK to set, we're stalled for update.
QPID_LOG(notice, *this << " update complete, starting catch-up.");
QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
@@ -969,10 +941,6 @@ void Cluster::checkUpdateIn(Lock& l) {
mAgent->suppress(false); // Enable management output.
mAgent->clusterUpdate();
}
- // Restore alternate exchange settings on exchanges.
- broker.getExchanges().eachExchange(
- boost::bind(&broker::Exchange::recoveryComplete, _1,
- boost::ref(broker.getExchanges())));
enableClusterSafe(); // Enable cluster-safe assertions
deliverEventQueue.start();
}
@@ -1001,7 +969,7 @@ void Cluster::updateOutDone(Lock& l) {
void Cluster::updateOutError(const std::exception& e) {
Monitor::ScopedLock l(lock);
- QPID_LOG(error, *this << " error sending update: " << e.what());
+ QPID_LOG(error, *this << " error sending update: " << e.what());
updateOutDone(l);
}
@@ -1099,7 +1067,7 @@ void Cluster::memberUpdate(Lock& l) {
void Cluster::updateMgmtMembership(Lock& l) {
if (!mgmtObject) return;
std::vector<Url> urls = getUrls(l);
- mgmtObject->set_clusterSize(urls.size());
+ mgmtObject->set_clusterSize(urls.size());
string urlstr;
for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) {
if (i != urls.begin()) urlstr += ";";
@@ -1146,6 +1114,10 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) {
QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);
}
+void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
+ expiryPolicy->deliverExpire(id);
+}
+
void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) {
// If we see an errorCheck here (rather than in the ErrorCheck
// class) then we have processed succesfully past the point of the
@@ -1183,35 +1155,6 @@ void Cluster::deliverToQueue(const std::string& queue, const std::string& messag
q->deliver(msg);
}
-sys::AbsTime Cluster::getClusterTime() {
- Mutex::ScopedLock l(lock);
- return clusterTime;
-}
-
-// This method is called during update on the updatee to set the initial cluster time.
-void Cluster::clock(const uint64_t time) {
- Mutex::ScopedLock l(lock);
- clock(time, l);
-}
-
-// called when broadcast message received
-void Cluster::clock(const uint64_t time, Lock&) {
- clusterTime = AbsTime(EPOCH, time);
- AbsTime now = AbsTime::now();
-
- if (!elder) {
- clusterTimeOffset = Duration(now, clusterTime);
- }
-}
-
-// called by elder timer to send clock broadcast
-void Cluster::sendClockUpdate() {
- Mutex::ScopedLock l(lock);
- int64_t nanosecondsSinceEpoch = Duration(EPOCH, now());
- nanosecondsSinceEpoch += clusterTimeOffset;
- mcast.mcastControl(ClusterClockBody(ProtocolVersion(), nanosecondsSinceEpoch), self);
-}
-
bool Cluster::deferDeliveryImpl(const std::string& queue,
const boost::intrusive_ptr<broker::Message>& msg)
{
@@ -1224,12 +1167,4 @@ bool Cluster::deferDeliveryImpl(const std::string& queue,
return true;
}
-bool Cluster::loggable(const AMQFrame& f) {
- const AMQMethodBody* method = (f.getMethod());
- if (!method) return true; // Not a method
- bool isClock = method->amqpClassId() == ClusterClockBody::CLASS_ID
- && method->amqpMethodId() == ClusterClockBody::METHOD_ID;
- return !isClock;
-}
-
}} // namespace qpid::cluster