summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp169
-rw-r--r--cpp/src/qpid/cluster/Cluster.h56
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.cpp5
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp1
-rw-r--r--cpp/src/qpid/cluster/ClusterSettings.h3
-rw-r--r--cpp/src/qpid/cluster/ClusterTimer.cpp4
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp235
-rw-r--r--cpp/src/qpid/cluster/Connection.h55
-rw-r--r--cpp/src/qpid/cluster/Decoder.h2
-rw-r--r--cpp/src/qpid/cluster/ErrorCheck.h2
-rw-r--r--cpp/src/qpid/cluster/Event.cpp5
-rw-r--r--cpp/src/qpid/cluster/Event.h28
-rw-r--r--cpp/src/qpid/cluster/EventFrame.h6
-rw-r--r--cpp/src/qpid/cluster/ExpiryPolicy.cpp95
-rw-r--r--cpp/src/qpid/cluster/ExpiryPolicy.h42
-rw-r--r--cpp/src/qpid/cluster/FailoverExchange.cpp26
-rw-r--r--cpp/src/qpid/cluster/FailoverExchange.h10
-rw-r--r--cpp/src/qpid/cluster/Multicaster.cpp3
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp39
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.h10
-rw-r--r--cpp/src/qpid/cluster/SecureConnectionFactory.cpp8
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp223
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h32
-rw-r--r--cpp/src/qpid/cluster/UpdateDataExchange.cpp10
-rw-r--r--cpp/src/qpid/cluster/UpdateDataExchange.h2
-rw-r--r--cpp/src/qpid/cluster/UpdateExchange.cpp27
-rw-r--r--cpp/src/qpid/cluster/UpdateReceiver.h14
-rw-r--r--cpp/src/qpid/cluster/types.h1
28 files changed, 417 insertions, 696 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index e6e3de64f2..dd4882774b 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -36,45 +36,45 @@
*
* IMPORTANT NOTE: any time code is added to the broker that uses timers,
* the cluster may need to be updated to take account of this.
- *
+ *
*
* USE OF TIMESTAMPS IN THE BROKER
- *
+ *
* The following are the current areas where broker uses timers or timestamps:
- *
+ *
* - Producer flow control: broker::SemanticState uses
* connection::getClusterOrderOutput. a FrameHandler that sends
* frames to the client via the cluster. Used by broker::SessionState
- *
+ *
* - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
* implemented by cluster::ExpiryPolicy.
- *
+ *
* - Connection heartbeat: sends connection controls, not part of
* session command counting so OK to ignore.
- *
+ *
* - LinkRegistry: only cluster elder is ever active for links.
- *
+ *
* - management::ManagementBroker: uses MessageHandler supplied by cluster
* to send messages to the broker via the cluster.
+ *
+ * - Dtx: not yet supported with cluster.
*
- * cluster::ExpiryPolicy uses cluster time.
+ * cluster::ExpiryPolicy implements the strategy for message expiry.
*
* ClusterTimer implements periodic timed events in the cluster context.
- * Used for:
- * - periodic management events.
- * - DTX transaction timeouts.
+ * Used for periodic management events.
*
* <h1>CLUSTER PROTOCOL OVERVIEW</h1>
- *
+ *
* Messages sent to/from CPG are called Events.
*
* An Event carries a ConnectionId, which includes a MemberId and a
* connection number.
- *
+ *
* Events are either
* - Connection events: non-0 connection number and are associated with a connection.
* - Cluster Events: 0 connection number, are not associated with a connection.
- *
+ *
* Events are further categorized as:
* - Control: carries method frame(s) that affect cluster behavior.
* - Data: carries raw data received from a client connection.
@@ -146,7 +146,6 @@
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
-#include "qpid/framing/ClusterClockBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAbortBody.h"
#include "qpid/framing/ClusterRetractOfferBody.h"
@@ -199,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1159329;
+const uint32_t Cluster::CLUSTER_VERSION = 1058747;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -215,7 +214,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
{
cluster.initialStatus(
member, version, active, clusterId,
- framing::cluster::StoreState(storeState), shutdownId,
+ framing::cluster::StoreState(storeState), shutdownId,
firstConfig, l);
}
void ready(const std::string& url) {
@@ -231,21 +230,21 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
cluster.updateOffer(member, updatee, l);
}
void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
+ void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) {
cluster.errorCheck(member, type, frameSeq, l);
}
void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); }
- void timerDrop(const std::string& name) { cluster.timerDrop(member, name, l); }
+ void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); }
void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
void deliverToQueue(const std::string& queue, const std::string& message) {
cluster.deliverToQueue(queue, message, l);
}
- void clock(uint64_t time) { cluster.clock(time, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
- settings(set),
+ settings(set),
broker(b),
mgmtObject(0),
poller(b.getPoller()),
@@ -254,7 +253,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
self(cpg.self()),
clusterId(true),
mAgent(0),
- expiryPolicy(new ExpiryPolicy(*this)),
+ expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
@@ -278,11 +277,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
lastBroker(false),
updateRetracted(false),
updateClosed(false),
- error(*this),
- acl(0)
+ error(*this)
{
- broker.setInCluster(true);
-
// We give ownership of the timer to the broker and keep a plain pointer.
// This is OK as it means the timer has the same lifetime as the broker.
timer = new ClusterTimer(*this);
@@ -303,7 +299,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
- clusterId = store.getClusterId();
+ clusterId = store.getClusterId();
QPID_LOG(notice, "Cluster store state: " << store)
}
cpg.join(name);
@@ -364,15 +360,14 @@ void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
// Safe to use connections here because we're pre-catchup, stalled
// and discarding, so deliveredFrame is not processing any
// connection events.
- assert(discarding);
+ assert(discarding);
pair<ConnectionMap::iterator, bool> ib
= connections.insert(ConnectionMap::value_type(c->getId(), c));
- // Like this to avoid tripping up unused variable warning when NDEBUG set
- if (!ib.second) assert(ib.second);
+ assert(ib.second);
}
void Cluster::erase(const ConnectionId& id) {
- Lock l(lock);
+ Lock l(lock);
erase(id,l);
}
@@ -398,9 +393,9 @@ std::vector<Url> Cluster::getUrls() const {
std::vector<Url> Cluster::getUrls(Lock&) const {
return map.memberUrls();
-}
+}
-void Cluster::leave() {
+void Cluster::leave() {
Lock l(lock);
leave(l);
}
@@ -410,7 +405,7 @@ void Cluster::leave() {
QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
} do {} while(0)
-void Cluster::leave(Lock&) {
+void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
@@ -429,7 +424,7 @@ void Cluster::deliver(
uint32_t nodeid,
uint32_t pid,
void* msg,
- int msg_len)
+ int msg_len)
{
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
@@ -460,7 +455,7 @@ void Cluster::deliveredEvent(const Event& e) {
EventFrame ef(e, e.getFrame());
// Stop the deliverEventQueue on update offers.
// This preserves the connection decoder fragments for an update.
- // Only do this for the two brokers that are directly involved in this
+ // Only do this for the two brokers that are directly involved in this
// offer: the one making the offer, or the one receiving it.
const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody());
if (offer && ( e.getMemberId() == self || MemberId(offer->getUpdatee()) == self) ) {
@@ -470,7 +465,7 @@ void Cluster::deliveredEvent(const Event& e) {
}
deliverFrame(ef);
}
- else if(!discarding) {
+ else if(!discarding) {
if (e.isControl())
deliverFrame(EventFrame(e, e.getFrame()));
else {
@@ -512,7 +507,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) {
// the event queue.
e.frame = AMQFrame(
ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee()));
- deliverEventQueue.start();
+ deliverEventQueue.start();
}
// Process each frame through the error checker.
if (error.isUnresolved()) {
@@ -520,14 +515,14 @@ void Cluster::deliveredFrame(const EventFrame& efConst) {
while (error.canProcess()) // There is a frame ready to process.
processFrame(error.getNext(), l);
}
- else
+ else
processFrame(e, l);
}
void Cluster::processFrame(const EventFrame& e, Lock& l) {
if (e.isCluster()) {
- QPID_LOG_IF(trace, loggable(e.frame), *this << " DLVR: " << e);
+ QPID_LOG(trace, *this << " DLVR: " << e);
ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
throw Exception(QPID_MSG("Invalid cluster control"));
@@ -536,15 +531,14 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) {
map.incrementFrameSeq();
ConnectionPtr connection = getConnection(e, l);
if (connection) {
- QPID_LOG_IF(trace, loggable(e.frame),
- *this << " DLVR " << map.getFrameSeq() << ": " << e);
+ QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
connection->deliveredFrame(e);
}
else
- throw Exception(QPID_MSG("Unknown connection: " << e));
+ QPID_LOG(trace, *this << " DROP (no connection): " << e);
}
else // Drop connection frames while state < CATCHUP
- QPID_LOG_IF(trace, loggable(e.frame), *this << " DROP (joining): " << e);
+ QPID_LOG(trace, *this << " DROP (joining): " << e);
}
// Called in deliverFrameQueue thread
@@ -583,7 +577,7 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) {
}
// CPG config-change callback.
-void Cluster::configChange (
+void Cluster::configChange (
cpg_handle_t /*handle*/,
const cpg_name */*group*/,
const cpg_address *members, int nMembers,
@@ -613,7 +607,7 @@ void Cluster::setReady(Lock&) {
}
// Set the management status from the Cluster::state.
-//
+//
// NOTE: Management updates are sent based on property changes. In
// order to keep consistency across the cluster, we touch the local
// management status property even if it is locally unchanged for any
@@ -624,7 +618,7 @@ void Cluster::setMgmtStatus(Lock&) {
}
void Cluster::initMapCompleted(Lock& l) {
- // Called on completion of the initial status map.
+ // Called on completion of the initial status map.
QPID_LOG(debug, *this << " initial status map complete. ");
setMgmtStatus(l);
if (state == PRE_INIT) {
@@ -671,8 +665,6 @@ void Cluster::initMapCompleted(Lock& l) {
else { // I can go ready.
discarding = false;
setReady(l);
- // Must be called *before* memberUpdate so first update will be generated.
- failoverExchange->setReady();
memberUpdate(l);
updateMgmtMembership(l);
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
@@ -709,8 +701,8 @@ void Cluster::configChange(const MemberId&,
if (initMap.isResendNeeded()) {
mcast.mcastControl(
ClusterInitialStatusBody(
- ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
- store.getState(), store.getShutdownId(),
+ ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
+ store.getState(), store.getShutdownId(),
initMap.getFirstConfigStr()
),
self);
@@ -725,20 +717,6 @@ void Cluster::configChange(const MemberId&,
updateMgmtMembership(l); // Update on every config change for consistency
}
-struct ClusterClockTask : public sys::TimerTask {
- Cluster& cluster;
- sys::Timer& timer;
-
- ClusterClockTask(Cluster& cluster, sys::Timer& timer, uint16_t clockInterval)
- : TimerTask(Duration(clockInterval * TIME_MSEC),"ClusterClock"), cluster(cluster), timer(timer) {}
-
- void fire() {
- cluster.sendClockUpdate();
- setupNextFire();
- timer.add(this);
- }
-};
-
void Cluster::becomeElder(Lock&) {
if (elder) return; // We were already the elder.
// We are the oldest, reactive links if necessary
@@ -746,8 +724,6 @@ void Cluster::becomeElder(Lock&) {
elder = true;
broker.getLinks().setPassive(false);
timer->becomeElder();
-
- clockTimer.add(new ClusterClockTask(*this, clockTimer, settings.clockInterval));
}
void Cluster::makeOffer(const MemberId& id, Lock& ) {
@@ -783,7 +759,7 @@ std::string Cluster::debugSnapshot() {
// point we know the poller has stopped so no poller callbacks will be
// invoked. We must ensure that CPG has also shut down so no CPG
// callbacks will be invoked.
-//
+//
void Cluster::brokerShutdown() {
sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
try { cpg.shutdown(); }
@@ -799,7 +775,7 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l)
}
void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active,
- const framing::Uuid& id,
+ const framing::Uuid& id,
framing::cluster::StoreState store,
const framing::Uuid& shutdownId,
const std::string& firstConfig,
@@ -857,8 +833,6 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
else if (updatee == self && url) {
assert(state == JOINER);
state = UPDATEE;
- acl = broker.getAcl();
- broker.setAcl(0); // Disable ACL during update
QPID_LOG(notice, *this << " receiving update from " << updater);
checkUpdateIn(l);
}
@@ -870,7 +844,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
if (updatee != self && url) {
QPID_LOG(debug, debugSnapshot());
if (mAgent) mAgent->clusterUpdate();
- // Updatee will call clusterUpdate() via checkUpdateIn() when update completes
+ // Updatee will call clusterUpdate when update completes
}
}
@@ -951,15 +925,13 @@ void Cluster::checkUpdateIn(Lock& l) {
if (!updateClosed) return; // Wait till update connection closes.
if (updatedMap) { // We're up to date
map = *updatedMap;
+ failoverExchange->setUrls(getUrls(l));
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
memberUpdate(l);
- // Must be called *after* memberUpdate() to avoid sending an extra update.
- failoverExchange->setReady();
// NB: don't updateMgmtMembership() here as we are not in the deliver
// thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
- broker.setAcl(acl); // Restore ACL
discarding = false; // OK to set, we're stalled for update.
QPID_LOG(notice, *this << " update complete, starting catch-up.");
QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
@@ -969,10 +941,6 @@ void Cluster::checkUpdateIn(Lock& l) {
mAgent->suppress(false); // Enable management output.
mAgent->clusterUpdate();
}
- // Restore alternate exchange settings on exchanges.
- broker.getExchanges().eachExchange(
- boost::bind(&broker::Exchange::recoveryComplete, _1,
- boost::ref(broker.getExchanges())));
enableClusterSafe(); // Enable cluster-safe assertions
deliverEventQueue.start();
}
@@ -1001,7 +969,7 @@ void Cluster::updateOutDone(Lock& l) {
void Cluster::updateOutError(const std::exception& e) {
Monitor::ScopedLock l(lock);
- QPID_LOG(error, *this << " error sending update: " << e.what());
+ QPID_LOG(error, *this << " error sending update: " << e.what());
updateOutDone(l);
}
@@ -1099,7 +1067,7 @@ void Cluster::memberUpdate(Lock& l) {
void Cluster::updateMgmtMembership(Lock& l) {
if (!mgmtObject) return;
std::vector<Url> urls = getUrls(l);
- mgmtObject->set_clusterSize(urls.size());
+ mgmtObject->set_clusterSize(urls.size());
string urlstr;
for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) {
if (i != urls.begin()) urlstr += ";";
@@ -1146,6 +1114,10 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) {
QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);
}
+void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
+ expiryPolicy->deliverExpire(id);
+}
+
void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) {
// If we see an errorCheck here (rather than in the ErrorCheck
// class) then we have processed succesfully past the point of the
@@ -1183,35 +1155,6 @@ void Cluster::deliverToQueue(const std::string& queue, const std::string& messag
q->deliver(msg);
}
-sys::AbsTime Cluster::getClusterTime() {
- Mutex::ScopedLock l(lock);
- return clusterTime;
-}
-
-// This method is called during update on the updatee to set the initial cluster time.
-void Cluster::clock(const uint64_t time) {
- Mutex::ScopedLock l(lock);
- clock(time, l);
-}
-
-// called when broadcast message received
-void Cluster::clock(const uint64_t time, Lock&) {
- clusterTime = AbsTime(EPOCH, time);
- AbsTime now = AbsTime::now();
-
- if (!elder) {
- clusterTimeOffset = Duration(now, clusterTime);
- }
-}
-
-// called by elder timer to send clock broadcast
-void Cluster::sendClockUpdate() {
- Mutex::ScopedLock l(lock);
- int64_t nanosecondsSinceEpoch = Duration(EPOCH, now());
- nanosecondsSinceEpoch += clusterTimeOffset;
- mcast.mcastControl(ClusterClockBody(ProtocolVersion(), nanosecondsSinceEpoch), self);
-}
-
bool Cluster::deferDeliveryImpl(const std::string& queue,
const boost::intrusive_ptr<broker::Message>& msg)
{
@@ -1224,12 +1167,4 @@ bool Cluster::deferDeliveryImpl(const std::string& queue,
return true;
}
-bool Cluster::loggable(const AMQFrame& f) {
- const AMQMethodBody* method = (f.getMethod());
- if (!method) return true; // Not a method
- bool isClock = method->amqpClassId() == ClusterClockBody::CLASS_ID
- && method->amqpMethodId() == ClusterClockBody::METHOD_ID;
- return !isClock;
-}
-
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index ccec4948e6..8f73c6acca 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -56,25 +56,17 @@ namespace qpid {
namespace broker {
class Message;
-class AclModule;
}
namespace framing {
-class AMQFrame;
class AMQBody;
-struct Uuid;
-}
-
-namespace sys {
-class Timer;
-class AbsTime;
-class Duration;
+class Uuid;
}
namespace cluster {
class Connection;
-struct EventFrame;
+class EventFrame;
class ClusterTimer;
class UpdateDataExchange;
@@ -97,10 +89,10 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void initialize();
// Connection map.
- void addLocalConnection(const ConnectionPtr&);
- void addShadowConnection(const ConnectionPtr&);
- void erase(const ConnectionId&);
-
+ void addLocalConnection(const ConnectionPtr&);
+ void addShadowConnection(const ConnectionPtr&);
+ void erase(const ConnectionId&);
+
// URLs of current cluster members.
std::vector<std::string> getIds() const;
std::vector<Url> getUrls() const;
@@ -115,7 +107,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void updateInRetracted();
// True if we are expecting to receive catch-up connections.
bool isExpectingUpdate();
-
+
MemberId getId() const;
broker::Broker& getBroker() const;
Multicaster& getMulticast() { return mcast; }
@@ -143,12 +135,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
bool deferDeliveryImpl(const std::string& queue,
const boost::intrusive_ptr<broker::Message>& msg);
- sys::AbsTime getClusterTime();
- void sendClockUpdate();
- void clock(const uint64_t time);
-
- static bool loggable(const framing::AMQFrame&); // True if the frame should be logged.
-
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -158,10 +144,10 @@ class Cluster : private Cpg::Handler, public management::Manageable {
/** Version number of the cluster protocol, to avoid mixed versions. */
static const uint32_t CLUSTER_VERSION;
-
+
// NB: A dummy Lock& parameter marks functions that must only be
// called with Cluster::lock locked.
-
+
void leave(Lock&);
std::vector<std::string> getIds(Lock&) const;
std::vector<Url> getUrls(Lock&) const;
@@ -170,11 +156,11 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void brokerShutdown();
// == Called in deliverEventQueue thread
- void deliveredEvent(const Event&);
+ void deliveredEvent(const Event&);
// == Called in deliverFrameQueue thread
- void deliveredFrame(const EventFrame&);
- void processFrame(const EventFrame&, Lock&);
+ void deliveredFrame(const EventFrame&);
+ void processFrame(const EventFrame&, Lock&);
// Cluster controls implement XML methods from cluster.xml.
void updateRequest(const MemberId&, const std::string&, Lock&);
@@ -194,12 +180,12 @@ class Cluster : private Cpg::Handler, public management::Manageable {
const std::string& left,
const std::string& joined,
Lock& l);
+ void messageExpired(const MemberId&, uint64_t, Lock& l);
void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
void timerWakeup(const MemberId&, const std::string& name, Lock&);
void timerDrop(const MemberId&, const std::string& name, Lock&);
void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
void deliverToQueue(const std::string& queue, const std::string& message, Lock&);
- void clock(const uint64_t time, Lock&);
// Helper functions
ConnectionPtr getConnection(const EventFrame&, Lock&);
@@ -209,7 +195,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void setReady(Lock&);
void memberUpdate(Lock&);
void setClusterId(const framing::Uuid&, Lock&);
- void erase(const ConnectionId&, Lock&);
+ void erase(const ConnectionId&, Lock&);
void requestUpdate(Lock& );
void initMapCompleted(Lock&);
void becomeElder(Lock&);
@@ -217,7 +203,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void updateMgmtMembership(Lock&);
// == Called in CPG dispatch thread
- void deliver( // CPG deliver callback.
+ void deliver( // CPG deliver callback.
cpg_handle_t /*handle*/,
const struct cpg_name *group,
uint32_t /*nodeid*/,
@@ -226,7 +212,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
int /*msg_len*/);
void deliverEvent(const Event&);
-
+
void configChange( // CPG config change callback.
cpg_handle_t /*handle*/,
const struct cpg_name */*group*/,
@@ -277,7 +263,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Used only in deliverEventQueue thread or when stalled for update.
Decoder decoder;
bool discarding;
-
+
// Remaining members are protected by lock.
mutable sys::Monitor lock;
@@ -290,7 +276,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
JOINER, ///< Sent update request, waiting for update offer.
UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete.
CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event.
- READY, ///< Fully operational
+ READY, ///< Fully operational
OFFER, ///< Sent an offer, waiting for accept/reject.
UPDATER, ///< Offer accepted, sending a state update.
LEFT ///< Final state, left the cluster.
@@ -310,13 +296,9 @@ class Cluster : private Cpg::Handler, public management::Manageable {
ErrorCheck error;
UpdateReceiver updateReceiver;
ClusterTimer* timer;
- sys::Timer clockTimer;
- sys::AbsTime clusterTime;
- sys::Duration clusterTimeOffset;
- broker::AclModule* acl;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
- friend struct ClusterDispatcher;
+ friend class ClusterDispatcher;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index a8389095c9..040e129970 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -50,6 +50,11 @@ void insertFieldTableFromMapValue(FieldTable& ft, const ClusterMap::Map::value_t
ft.setString(vt.first.str(), vt.second.str());
}
+void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) {
+ ft.clear();
+ for_each(map.begin(), map.end(), bind(&insertFieldTableFromMapValue, ref(ft), _1));
+}
+
}
ClusterMap::ClusterMap() : frameSeq(0) {}
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 69ba095f16..2962daaa07 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -72,7 +72,6 @@ struct ClusterOptions : public Options {
("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
("cluster-size", optValue(settings.size, "N"), "Wait for N cluster members before allowing clients to connect.")
- ("cluster-clock-interval", optValue(settings.clockInterval,"N"), "How often to broadcast the current time to the cluster nodes, in milliseconds. A value between 5 and 1000 is recommended.")
("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.")
;
}
diff --git a/cpp/src/qpid/cluster/ClusterSettings.h b/cpp/src/qpid/cluster/ClusterSettings.h
index 2f7b5be20a..8e708aa139 100644
--- a/cpp/src/qpid/cluster/ClusterSettings.h
+++ b/cpp/src/qpid/cluster/ClusterSettings.h
@@ -35,9 +35,8 @@ struct ClusterSettings {
size_t readMax;
std::string username, password, mechanism;
size_t size;
- uint16_t clockInterval;
- ClusterSettings() : quorum(false), readMax(10), size(1), clockInterval(10)
+ ClusterSettings() : quorum(false), readMax(10), size(1)
{}
Url getUrl(uint16_t port) const {
diff --git a/cpp/src/qpid/cluster/ClusterTimer.cpp b/cpp/src/qpid/cluster/ClusterTimer.cpp
index b4f7d00f38..f6e1c7a849 100644
--- a/cpp/src/qpid/cluster/ClusterTimer.cpp
+++ b/cpp/src/qpid/cluster/ClusterTimer.cpp
@@ -70,7 +70,6 @@ void ClusterTimer::add(intrusive_ptr<TimerTask> task)
if (i != map.end())
throw Exception(QPID_MSG("Task already exists with name " << task->getName()));
map[task->getName()] = task;
-
// Only the elder actually activates the task with the Timer base class.
if (cluster.isElder()) {
QPID_LOG(trace, "Elder activating cluster timer task " << task->getName());
@@ -113,9 +112,6 @@ void ClusterTimer::deliverWakeup(const std::string& name) {
else {
intrusive_ptr<TimerTask> t = i->second;
map.erase(i);
- // Move the nextFireTime so readyToFire() is true. This is to ensure we
- // don't get an error if the fired task calls setupNextFire()
- t->setFired();
Timer::fire(t);
}
}
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 394749aad2..e9b718e6de 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,8 +24,6 @@
#include "Cluster.h"
#include "UpdateReceiver.h"
#include "qpid/assert.h"
-#include "qpid/broker/DtxAck.h"
-#include "qpid/broker/DtxBuffer.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/TxBuffer.h"
@@ -37,7 +35,6 @@
#include "qpid/broker/Fairshare.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Bridge.h"
-#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
@@ -81,7 +78,7 @@ const std::string shadowPrefix("[shadow]");
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& mgmtId,
const ConnectionId& id, const qpid::sys::SecuritySettings& external)
- : cluster(c), self(id), catchUp(false), announced(false), output(*this, out),
+ : cluster(c), self(id), catchUp(false), output(*this, out),
connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true),
expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self),
@@ -93,15 +90,13 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& mgmtId, MemberId member,
bool isCatchUp, bool isLink, const qpid::sys::SecuritySettings& external
-) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), announced(false), output(*this, out),
+) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
connectionCtor(&output, cluster.getBroker(),
mgmtId,
external,
isLink,
isCatchUp ? ++catchUpId : 0,
- // The first catch-up connection is not considered a shadow
- // as it needs to be authenticated.
- isCatchUp && self.second > 1),
+ isCatchUp), // isCatchUp => shadow
expectProtocolHeader(isLink),
mcastFrameHandler(cluster.getMulticast(), self),
updateIn(c.getUpdateReceiver()),
@@ -118,7 +113,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
if (!updateIn.nextShadowMgmtId.empty())
connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
updateIn.nextShadowMgmtId.clear();
- }
+ }
init();
QPID_LOG(debug, cluster << " local connection " << *this);
}
@@ -148,7 +143,7 @@ void Connection::init() {
// Called when we have consumed a read buffer to give credit to the
// connection layer to continue reading.
void Connection::giveReadCredit(int credit) {
- if (cluster.getSettings().readMax && credit)
+ if (cluster.getSettings().readMax && credit)
output.giveReadCredit(credit);
}
@@ -171,7 +166,7 @@ void Connection::announce(
AMQFrame frame;
while (frame.decode(buf))
connection->received(frame);
- connection->setUserId(username);
+ connection->setUserId(username);
}
// Do managment actions now that the connection is replicated.
connection->raiseConnectEvent();
@@ -198,7 +193,7 @@ void Connection::received(framing::AMQFrame& f) {
<< *this << ": " << f);
return;
}
- QPID_LOG_IF(trace, Cluster::loggable(f), cluster << " RECV " << *this << ": " << f);
+ QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
if (isLocal()) { // Local catch-up connection.
currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled())
@@ -206,7 +201,7 @@ void Connection::received(framing::AMQFrame& f) {
}
else { // Shadow or updated catch-up connection.
if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
- if (isShadow())
+ if (isShadow())
cluster.addShadowConnection(this);
AMQFrame ok((ConnectionCloseOkBody()));
connection->getOutput().send(ok);
@@ -218,9 +213,16 @@ void Connection::received(framing::AMQFrame& f) {
}
}
-bool Connection::checkUnsupported(const AMQBody&) {
- // Throw an exception for unsupported commands. Currently all are supported.
- return false;
+bool Connection::checkUnsupported(const AMQBody& body) {
+ std::string message;
+ if (body.getMethod()) {
+ switch (body.getMethod()->amqpClassId()) {
+ case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
+ }
+ }
+ if (!message.empty())
+ connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message);
+ return !message.empty();
}
struct GiveReadCreditOnExit {
@@ -239,7 +241,7 @@ void Connection::deliverDoOutput(uint32_t limit) {
void Connection::deliveredFrame(const EventFrame& f) {
GiveReadCreditOnExit gc(*this, f.readCredit);
assert(!catchUp);
- currentChannel = f.frame.getChannel();
+ currentChannel = f.frame.getChannel();
if (f.frame.getBody() // frame can be emtpy with just readCredit
&& !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
&& !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
@@ -253,7 +255,7 @@ void Connection::deliveredFrame(const EventFrame& f) {
}
}
-// A local connection is closed by the network layer. Called in the connection thread.
+// A local connection is closed by the network layer.
void Connection::closed() {
try {
if (isUpdated()) {
@@ -270,9 +272,8 @@ void Connection::closed() {
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
output.closeOutput();
- if (announced)
- cluster.getMulticast().mcastControl(
- ClusterConnectionDeliverCloseBody(), self);
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionDeliverCloseBody(), self);
}
}
catch (const std::exception& e) {
@@ -286,7 +287,7 @@ void Connection::deliverClose () {
cluster.erase(self);
}
-// Close the connection
+// Close the connection
void Connection::close() {
if (connection.get()) {
QPID_LOG(debug, cluster << " closed connection " << *this);
@@ -319,10 +320,10 @@ size_t Connection::decode(const char* data, size_t size) {
while (localDecoder.decode(buf))
received(localDecoder.getFrame());
if (!wasOpen && connection->isOpen()) {
- // Connections marked with setUserProxyAuth are allowed to proxy
+ // Connections marked as federation links are allowed to proxy
// messages with user-ID that doesn't match the connection's
// authenticated ID. This is important for updates.
- connection->setUserProxyAuth(isCatchUp());
+ connection->setFederationLink(isCatchUp());
}
}
else { // Multicast local connections.
@@ -331,9 +332,9 @@ size_t Connection::decode(const char* data, size_t size) {
if (!checkProtocolHeader(ptr, size)) // Updates ptr
return 0; // Incomplete header
- if (!connection->isOpen())
+ if (!connection->isOpen())
processInitialFrames(ptr, end-ptr); // Updates ptr
-
+
if (connection->isOpen() && end - ptr > 0) {
// We're multi-casting, we will give read credit on delivery.
grc.credit = 0;
@@ -383,7 +384,6 @@ void Connection::processInitialFrames(const char*& ptr, size_t size) {
connection->getUserId(),
initialFrames),
getId());
- announced = true;
initialFrames.clear();
}
}
@@ -406,11 +406,11 @@ void Connection::shadowSetUser(const std::string& userId) {
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
{
- broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name);
- c->position = position;
- c->setBlocked(blocked);
- if (notifyEnabled) c->enableNotify(); else c->disableNotify();
- updateIn.consumerNumbering.add(c);
+ broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
+ c.position = position;
+ c.setBlocked(blocked);
+ if (notifyEnabled) c.enableNotify(); else c.disableNotify();
+ updateIn.consumerNumbering.add(c.shared_from_this());
}
@@ -421,8 +421,7 @@ void Connection::sessionState(
const SequenceNumber& expected,
const SequenceNumber& received,
const SequenceSet& unknownCompleted,
- const SequenceSet& receivedIncomplete,
- bool dtxSelected)
+ const SequenceSet& receivedIncomplete)
{
sessionState().setState(
replayStart,
@@ -432,10 +431,8 @@ void Connection::sessionState(
received,
unknownCompleted,
receivedIncomplete);
- if (dtxSelected) semanticState().selectDtx();
- QPID_LOG(debug, cluster << " received session state update for "
- << sessionState().getId());
- // The output tasks will be added later in the update process.
+ QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
+ // The output tasks will be added later in the update process.
connection->getOutputTasks().removeAll();
}
@@ -444,7 +441,7 @@ void Connection::outputTask(uint16_t channel, const std::string& name) {
if (!session)
throw Exception(QPID_MSG(cluster << " channel not attached " << *this
<< "[" << channel << "] "));
- OutputTask* task = session->getSemanticState().find(name).get();
+ OutputTask* task = &session->getSemanticState().find(name);
connection->getOutputTasks().addOutputTask(task);
}
@@ -464,24 +461,11 @@ void Connection::shadowReady(
output.setSendMax(sendMax);
}
-void Connection::setDtxBuffer(const UpdateReceiver::DtxBufferRef& bufRef) {
- broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
- broker::DtxWorkRecord* record = mgr.getWork(bufRef.xid);
- broker::DtxBuffer::shared_ptr buffer = (*record)[bufRef.index];
- if (bufRef.suspended)
- bufRef.semanticState->getSuspendedXids()[bufRef.xid] = buffer;
- else
- bufRef.semanticState->setDtxBuffer(buffer);
-}
-
-// Marks the end of the update.
void Connection::membership(const FieldTable& joiners, const FieldTable& members,
const framing::SequenceNumber& frameSeq)
{
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
updateIn.consumerNumbering.clear();
- for_each(updateIn.dtxBuffers.begin(), updateIn.dtxBuffers.end(),
- boost::bind(&Connection::setDtxBuffer, this, _1));
closeUpdated();
cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
}
@@ -494,7 +478,7 @@ void Connection::retractOffer() {
void Connection::closeUpdated() {
self.second = 0; // Mark this as completed update connection.
- if (connection.get())
+ if (connection.get())
connection->close(connection::CLOSE_CODE_NORMAL, "OK");
}
@@ -545,20 +529,12 @@ void Connection::deliveryRecord(const string& qname,
m = getUpdateMessage();
m.queue = queue.get();
m.position = position;
- if (enqueued) queue->updateEnqueued(m); //inform queue of the message
+ if (enqueued) queue->updateEnqueued(m); //inform queue of the message
} else { // Message at original position in original queue
- queue->find(position, m);
+ m = queue->find(position);
}
- // FIXME aconway 2011-08-19: removed:
- // if (!m.payload)
- // throw Exception(QPID_MSG("deliveryRecord no update message"));
- //
- // It seems this could happen legitimately in the case one
- // session browses message M, then another session acquires
- // it. In that case the browsers delivery record is !acquired
- // but the message is not on its original Queue. In that case
- // we'll get a deliveryRecord with no payload for the browser.
- //
+ if (!m.payload)
+ throw Exception(QPID_MSG("deliveryRecord no update message"));
}
broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
@@ -566,11 +542,7 @@ void Connection::deliveryRecord(const string& qname,
if (cancelled) dr.cancel(dr.getTag());
if (completed) dr.complete();
if (ended) dr.setEnded(); // Exsitance of message
-
- if (dtxBuffer) // Record for next dtx-ack
- dtxAckRecords.push_back(dr);
- else
- semanticState().record(dr); // Record on session's unacked list.
+ semanticState().record(dr); // Part of the session's unacked list.
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
@@ -584,46 +556,8 @@ void Connection::queueFairshareState(const std::string& qname, const uint8_t pri
}
}
-
-namespace {
-// find a StatefulQueueObserver that matches a given identifier
-class ObserverFinder {
- const std::string id;
- boost::shared_ptr<broker::QueueObserver> target;
- ObserverFinder(const ObserverFinder&) {}
- public:
- ObserverFinder(const std::string& _id) : id(_id) {}
- broker::StatefulQueueObserver *getObserver()
- {
- if (target)
- return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
- return 0;
- }
- void operator() (boost::shared_ptr<broker::QueueObserver> o)
- {
- if (!target) {
- broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
- if (p && p->getId() == id) {
- target = o;
- }
- }
- }
-};
-}
-
-
-void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state)
-{
- boost::shared_ptr<broker::Queue> queue(findQueue(qname));
- ObserverFinder finder(observerId); // find this observer
- queue->eachObserver<ObserverFinder &>(finder);
- broker::StatefulQueueObserver *so = finder.getObserver();
- if (so) {
- so->setState( state );
- QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ...");
- return;
- }
- QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies.");
+void Connection::expiryId(uint64_t id) {
+ cluster.getExpiryPolicy().setId(id);
}
std::ostream& operator<<(std::ostream& o, const Connection& c) {
@@ -640,7 +574,6 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) {
void Connection::txStart() {
txBuffer.reset(new broker::TxBuffer());
}
-
void Connection::txAccept(const framing::SequenceSet& acked) {
txBuffer->enlist(boost::shared_ptr<broker::TxAccept>(
new broker::TxAccept(acked, semanticState().getUnacked())));
@@ -656,11 +589,9 @@ void Connection::txEnqueue(const std::string& queue) {
new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload)));
}
-void Connection::txPublish(const framing::Array& queues, bool delivered)
-{
- boost::shared_ptr<broker::TxPublish> txPub(
- new broker::TxPublish(getUpdateMessage().payload));
- for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
+void Connection::txPublish(const framing::Array& queues, bool delivered) {
+ boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload));
+ for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
txPub->deliverTo(findQueue((*i)->get<std::string>()));
txPub->delivered = delivered;
txBuffer->enlist(txPub);
@@ -674,51 +605,6 @@ void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) {
semanticState().setAccumulatedAck(s);
}
-void Connection::dtxStart(const std::string& xid,
- bool ended,
- bool suspended,
- bool failed,
- bool expired)
-{
- dtxBuffer.reset(new broker::DtxBuffer(xid, ended, suspended, failed, expired));
- txBuffer = dtxBuffer;
-}
-
-void Connection::dtxEnd() {
- broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
- std::string xid = dtxBuffer->getXid();
- if (mgr.exists(xid))
- mgr.join(xid, dtxBuffer);
- else
- mgr.start(xid, dtxBuffer);
- dtxBuffer.reset();
- txBuffer.reset();
-}
-
-// Sent after all DeliveryRecords for a dtx-ack have been collected in dtxAckRecords
-void Connection::dtxAck() {
- dtxBuffer->enlist(
- boost::shared_ptr<broker::DtxAck>(new broker::DtxAck(dtxAckRecords)));
- dtxAckRecords.clear();
-}
-
-void Connection::dtxBufferRef(const std::string& xid, uint32_t index, bool suspended) {
- // Save the association between DtxBuffers and the session so we
- // can set the DtxBuffers at the end of the update when the
- // DtxManager has been replicated.
- updateIn.dtxBuffers.push_back(
- UpdateReceiver::DtxBufferRef(xid, index, suspended, &semanticState()));
-}
-
-// Sent at end of work record.
-void Connection::dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout)
-{
- broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
- if (timeout) mgr.setTimeout(xid, timeout);
- if (prepared) mgr.prepare(xid);
-}
-
-
void Connection::exchange(const std::string& encoded) {
Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);
@@ -728,6 +614,12 @@ void Connection::exchange(const std::string& encoded) {
QPID_LOG(debug, cluster << " updated exchange " << ex->getName());
}
+void Connection::queue(const std::string& encoded) {
+ Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ broker::Queue::shared_ptr q = broker::Queue::decode(cluster.getBroker().getQueues(), buf);
+ QPID_LOG(debug, cluster << " updated queue " << q->getName());
+}
+
void Connection::sessionError(uint16_t , const std::string& msg) {
// Ignore errors before isOpen(), we're not multicasting yet.
if (connection->isOpen())
@@ -786,23 +678,6 @@ void Connection::config(const std::string& encoded) {
else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
}
-void Connection::doCatchupIoCallbacks() {
- // We need to process IO callbacks during the catch-up phase in
- // order to service asynchronous completions for messages
- // transferred during catch-up.
-
- if (catchUp) getBrokerConnection()->doIoCallbacks();
-}
-
-void Connection::clock(uint64_t time) {
- QPID_LOG(debug, "Cluster connection received time update");
- cluster.clock(time);
-}
-
-void Connection::queueDequeueSincePurgeState(const std::string& qname, uint32_t dequeueSincePurge) {
- boost::shared_ptr<broker::Queue> queue(findQueue(qname));
- queue->setDequeueSincePurge(dequeueSincePurge);
-}
}} // Namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index fe66b77238..7ee85bf1aa 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,12 +24,11 @@
#include "types.h"
#include "OutputInterceptor.h"
+#include "EventFrame.h"
#include "McastFrameHandler.h"
#include "UpdateReceiver.h"
-#include "qpid/RefCounted.h"
#include "qpid/broker/Connection.h"
-#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/SecureConnection.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/amqp_0_10/Connection.h"
@@ -48,7 +47,7 @@ namespace framing { class AMQFrame; }
namespace broker {
class SemanticState;
-struct QueuedMessage;
+class QueuedMessage;
class TxBuffer;
class TxAccept;
}
@@ -56,7 +55,6 @@ class TxAccept;
namespace cluster {
class Cluster;
class Event;
-struct EventFrame;
/** Intercept broker::Connection calls for shadow and local cluster connections. */
class Connection :
@@ -64,7 +62,7 @@ class Connection :
public sys::ConnectionInputHandler,
public framing::AMQP_AllOperations::ClusterConnectionHandler,
private broker::Connection::ErrorListener
-
+
{
public:
@@ -75,7 +73,7 @@ class Connection :
Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id,
const qpid::sys::SecuritySettings& external);
~Connection();
-
+
ConnectionId getId() const { return self; }
broker::Connection* getBrokerConnection() { return connection.get(); }
const broker::Connection* getBrokerConnection() const { return connection.get(); }
@@ -110,9 +108,9 @@ class Connection :
void deliveredFrame(const EventFrame&);
void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position);
-
+
// ==== Used in catch-up mode to build initial state.
- //
+ //
// State update methods.
void shadowPrepare(const std::string&);
@@ -124,11 +122,10 @@ class Connection :
const framing::SequenceNumber& expected,
const framing::SequenceNumber& received,
const framing::SequenceSet& unknownCompleted,
- const SequenceSet& receivedIncomplete,
- bool dtxSelected);
-
+ const SequenceSet& receivedIncomplete);
+
void outputTask(uint16_t channel, const std::string& name);
-
+
void shadowReady(uint64_t memberId,
uint64_t connectionId,
const std::string& managementId,
@@ -156,7 +153,7 @@ class Connection :
void queuePosition(const std::string&, const framing::SequenceNumber&);
void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
- void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&);
+ void expiryId(uint64_t);
void txStart();
void txAccept(const framing::SequenceSet&);
@@ -166,18 +163,8 @@ class Connection :
void txEnd();
void accumulatedAck(const framing::SequenceSet&);
- // Dtx state
- void dtxStart(const std::string& xid,
- bool ended,
- bool suspended,
- bool failed,
- bool expired);
- void dtxEnd();
- void dtxAck();
- void dtxBufferRef(const std::string& xid, uint32_t index, bool suspended);
- void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout);
-
- // Encoded exchange replication.
+ // Encoded queue/exchange replication.
+ void queue(const std::string& encoded);
void exchange(const std::string& encoded);
void giveReadCredit(int credit);
@@ -202,12 +189,6 @@ class Connection :
void setSecureConnection ( broker::SecureConnection * sc );
- void doCatchupIoCallbacks();
-
- void clock(uint64_t time);
-
- void queueDequeueSincePurgeState(const std::string&, uint32_t);
-
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
@@ -252,7 +233,7 @@ class Connection :
// Error listener functions
void connectionError(const std::string&);
void sessionError(uint16_t channel, const std::string&);
-
+
void init();
bool checkUnsupported(const framing::AMQBody& body);
void deliverDoOutput(uint32_t limit);
@@ -264,11 +245,10 @@ class Connection :
broker::SemanticState& semanticState();
broker::QueuedMessage getUpdateMessage();
void closeUpdated();
- void setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &);
+
Cluster& cluster;
ConnectionId self;
bool catchUp;
- bool announced;
OutputInterceptor output;
framing::FrameDecoder localDecoder;
ConnectionCtor connectionCtor;
@@ -276,9 +256,6 @@ class Connection :
framing::SequenceNumber deliverSeq;
framing::ChannelId currentChannel;
boost::shared_ptr<broker::TxBuffer> txBuffer;
- boost::shared_ptr<broker::DtxBuffer> dtxBuffer;
- broker::DeliveryRecords dtxAckRecords;
- broker::DtxWorkRecord* dtxCurrent;
bool expectProtocolHeader;
McastFrameHandler mcastFrameHandler;
UpdateReceiver& updateIn;
diff --git a/cpp/src/qpid/cluster/Decoder.h b/cpp/src/qpid/cluster/Decoder.h
index 3b5ada4a81..2e2af2868f 100644
--- a/cpp/src/qpid/cluster/Decoder.h
+++ b/cpp/src/qpid/cluster/Decoder.h
@@ -31,7 +31,7 @@
namespace qpid {
namespace cluster {
-struct EventFrame;
+class EventFrame;
class EventHeader;
/**
diff --git a/cpp/src/qpid/cluster/ErrorCheck.h b/cpp/src/qpid/cluster/ErrorCheck.h
index a417b2ec25..de8cedafb3 100644
--- a/cpp/src/qpid/cluster/ErrorCheck.h
+++ b/cpp/src/qpid/cluster/ErrorCheck.h
@@ -33,7 +33,7 @@
namespace qpid {
namespace cluster {
-struct EventFrame;
+class EventFrame;
class Cluster;
class Multicaster;
class Connection;
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index da2bc89d8c..cd775ce2f1 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,7 +23,6 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/RefCountedBuffer.h"
#include "qpid/assert.h"
#include <ostream>
#include <iterator>
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
index 13283edff7..07f74d3ba5 100644
--- a/cpp/src/qpid/cluster/Event.h
+++ b/cpp/src/qpid/cluster/Event.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,7 +23,7 @@
*/
#include "qpid/cluster/types.h"
-#include "qpid/BufferRef.h"
+#include "qpid/RefCountedBuffer.h"
#include "qpid/framing/AMQFrame.h"
#include <sys/uio.h> // For iovec
#include <iosfwd>
@@ -53,7 +53,7 @@ class EventHeader {
/** Size of payload data, excluding header. */
size_t getSize() const { return size; }
- /** Size of header + payload. */
+ /** Size of header + payload. */
size_t getStoreSize() const { return size + HEADER_SIZE; }
bool isCluster() const { return connectionId.getNumber() == 0; }
@@ -62,7 +62,7 @@ class EventHeader {
protected:
static const size_t HEADER_SIZE;
-
+
EventType type;
ConnectionId connectionId;
size_t size;
@@ -86,25 +86,25 @@ class Event : public EventHeader {
/** Create a control event. */
static Event control(const framing::AMQFrame&, const ConnectionId&);
-
+
// Data excluding header.
- char* getData() { return store.begin() + HEADER_SIZE; }
- const char* getData() const { return store.begin() + HEADER_SIZE; }
+ char* getData() { return store + HEADER_SIZE; }
+ const char* getData() const { return store + HEADER_SIZE; }
// Store including header
- char* getStore() { return store.begin(); }
- const char* getStore() const { return store.begin(); }
-
- const framing::AMQFrame& getFrame() const;
+ char* getStore() { return store; }
+ const char* getStore() const { return store; }
+ const framing::AMQFrame& getFrame() const;
+
operator framing::Buffer() const;
iovec toIovec() const;
-
+
private:
void encodeHeader() const;
- BufferRef store;
+ RefCountedBuffer::pointer store;
mutable framing::AMQFrame frame;
};
diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h
index 6b702a9bf8..61447c5525 100644
--- a/cpp/src/qpid/cluster/EventFrame.h
+++ b/cpp/src/qpid/cluster/EventFrame.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,7 +48,7 @@ struct EventFrame
ConnectionId connectionId;
- framing::AMQFrame frame;
+ framing::AMQFrame frame;
int readCredit; ///< last frame in an event, give credit when processed.
EventType type;
};
diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp
index 0ef5c2a35d..d9a7b0122a 100644
--- a/cpp/src/qpid/cluster/ExpiryPolicy.cpp
+++ b/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -21,21 +21,106 @@
#include "qpid/broker/Message.h"
#include "qpid/cluster/ExpiryPolicy.h"
-#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/Multicaster.h"
+#include "qpid/framing/ClusterMessageExpiredBody.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace cluster {
-ExpiryPolicy::ExpiryPolicy(Cluster& cluster) : cluster(cluster) {}
+ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t)
+ : expiryId(1), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
+struct ExpiryTask : public sys::TimerTask {
+ ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
+ : TimerTask(when,"ExpiryPolicy"), expiryPolicy(policy), expiryId(id) {}
+ void fire() { expiryPolicy->sendExpire(expiryId); }
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+ const uint64_t expiryId;
+};
+
+// Called while receiving an update
+void ExpiryPolicy::setId(uint64_t id) {
+ sys::Mutex::ScopedLock l(lock);
+ expiryId = id;
+}
+
+// Called while giving an update
+uint64_t ExpiryPolicy::getId() const {
+ sys::Mutex::ScopedLock l(lock);
+ return expiryId;
+}
+
+// Called in enqueuing connection thread
+void ExpiryPolicy::willExpire(broker::Message& m) {
+ uint64_t id;
+ {
+ // When messages are fanned out to multiple queues, update sends
+ // them as independenty messages so we can have multiple messages
+ // with the same expiry ID.
+ //
+ sys::Mutex::ScopedLock l(lock);
+ id = expiryId++;
+ if (!id) { // This is an update of an already-expired message.
+ m.setExpiryPolicy(expiredPolicy);
+ }
+ else {
+ assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
+ // If this is an update, the id may already exist
+ unexpiredById.insert(IdMessageMap::value_type(id, &m));
+ unexpiredByMessage[&m] = id;
+ }
+ }
+ timer.add(new ExpiryTask(this, id, m.getExpiration()));
+}
+
+// Called in dequeueing connection thread
+void ExpiryPolicy::forget(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
+ MessageIdMap::iterator i = unexpiredByMessage.find(&m);
+ assert(i != unexpiredByMessage.end());
+ unexpiredById.erase(i->second);
+ unexpiredByMessage.erase(i);
+}
+
+// Called in dequeueing connection or cleanup thread.
bool ExpiryPolicy::hasExpired(broker::Message& m) {
- return m.getExpiration() < cluster.getClusterTime();
+ sys::Mutex::ScopedLock l(lock);
+ return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
+}
+
+// Called in timer thread
+void ExpiryPolicy::sendExpire(uint64_t id) {
+ {
+ sys::Mutex::ScopedLock l(lock);
+ // Don't multicast an expiry notice if message is already forgotten.
+ if (unexpiredById.find(id) == unexpiredById.end()) return;
+ }
+ mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
}
-sys::AbsTime ExpiryPolicy::getCurrentTime() {
- return cluster.getClusterTime();
+// Called in CPG deliver thread.
+void ExpiryPolicy::deliverExpire(uint64_t id) {
+ sys::Mutex::ScopedLock l(lock);
+ std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id);
+ IdMessageMap::iterator i = expired.first;
+ while (i != expired.second) {
+ i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true;
+ unexpiredByMessage.erase(i->second);
+ unexpiredById.erase(i++);
+ }
}
+// Called in update thread on the updater.
+boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
+ MessageIdMap::iterator i = unexpiredByMessage.find(&m);
+ return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
+}
+
+bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }
+void ExpiryPolicy::Expired::willExpire(broker::Message&) { }
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.h b/cpp/src/qpid/cluster/ExpiryPolicy.h
index d8ddbca8b3..77a656aa68 100644
--- a/cpp/src/qpid/cluster/ExpiryPolicy.h
+++ b/cpp/src/qpid/cluster/ExpiryPolicy.h
@@ -36,8 +36,12 @@ namespace broker {
class Message;
}
+namespace sys {
+class Timer;
+}
+
namespace cluster {
-class Cluster;
+class Multicaster;
/**
* Cluster expiry policy
@@ -45,13 +49,43 @@ class Cluster;
class ExpiryPolicy : public broker::ExpiryPolicy
{
public:
- ExpiryPolicy(Cluster& cluster);
+ ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&);
+ void willExpire(broker::Message&);
bool hasExpired(broker::Message&);
- qpid::sys::AbsTime getCurrentTime();
+ void forget(broker::Message&);
+
+ // Send expiration notice to cluster.
+ void sendExpire(uint64_t);
+ // Cluster delivers expiry notice.
+ void deliverExpire(uint64_t);
+
+ void setId(uint64_t id);
+ uint64_t getId() const;
+
+ boost::optional<uint64_t> getId(broker::Message&);
+
private:
- Cluster& cluster;
+ typedef std::map<broker::Message*, uint64_t> MessageIdMap;
+ // When messages are fanned out to multiple queues, update sends
+ // them as independenty messages so we can have multiple messages
+ // with the same expiry ID.
+ typedef std::multimap<uint64_t, broker::Message*> IdMessageMap;
+
+ struct Expired : public broker::ExpiryPolicy {
+ bool hasExpired(broker::Message&);
+ void willExpire(broker::Message&);
+ };
+
+ mutable sys::Mutex lock;
+ MessageIdMap unexpiredByMessage;
+ IdMessageMap unexpiredById;
+ uint64_t expiryId;
+ boost::intrusive_ptr<Expired> expiredPolicy;
+ Multicaster& mcast;
+ MemberId memberId;
+ sys::Timer& timer;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/FailoverExchange.cpp b/cpp/src/qpid/cluster/FailoverExchange.cpp
index cfbe34a460..84232dac1b 100644
--- a/cpp/src/qpid/cluster/FailoverExchange.cpp
+++ b/cpp/src/qpid/cluster/FailoverExchange.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,10 +39,8 @@ using namespace broker;
using namespace framing;
const string FailoverExchange::typeName("amq.failover");
-
-FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b)
- : Exchange(typeName, parent, b ), ready(false)
-{
+
+FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) : Exchange(typeName, parent, b ) {
if (mgmtExchange != 0)
mgmtExchange->set_type(typeName);
}
@@ -55,17 +53,16 @@ void FailoverExchange::setUrls(const vector<Url>& u) {
void FailoverExchange::updateUrls(const vector<Url>& u) {
Lock l(lock);
urls=u;
- if (ready && !urls.empty()) {
- std::for_each(queues.begin(), queues.end(),
- boost::bind(&FailoverExchange::sendUpdate, this, _1));
- }
+ if (urls.empty()) return;
+ std::for_each(queues.begin(), queues.end(),
+ boost::bind(&FailoverExchange::sendUpdate, this, _1));
}
string FailoverExchange::getType() const { return typeName; }
bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) {
Lock l(lock);
- if (ready) sendUpdate(queue);
+ sendUpdate(queue);
return queues.insert(queue).second;
}
@@ -87,7 +84,7 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) {
// Called with lock held.
if (urls.empty()) return;
framing::Array array(0x95);
- for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
+ for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
const ProtocolVersion v;
boost::intrusive_ptr<Message> msg(new Message);
@@ -99,12 +96,9 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) {
header.get<MessageProperties>(true)->getApplicationHeaders().setArray(typeName, array);
AMQFrame headerFrame(header);
headerFrame.setFirstSegment(false);
- msg->getFrames().append(headerFrame);
+ msg->getFrames().append(headerFrame);
DeliverableMessage(msg).deliverTo(queue);
}
-void FailoverExchange::setReady() {
- ready = true;
-}
}} // namespace cluster
diff --git a/cpp/src/qpid/cluster/FailoverExchange.h b/cpp/src/qpid/cluster/FailoverExchange.h
index c3e50c6929..2e1edfc0ae 100644
--- a/cpp/src/qpid/cluster/FailoverExchange.h
+++ b/cpp/src/qpid/cluster/FailoverExchange.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -46,8 +46,6 @@ class FailoverExchange : public broker::Exchange
void setUrls(const std::vector<Url>&);
/** Set the URLs and send an update.*/
void updateUrls(const std::vector<Url>&);
- /** Flag the failover exchange as ready to generate updates (caught up) */
- void setReady();
// Exchange overrides
std::string getType() const;
@@ -58,7 +56,7 @@ class FailoverExchange : public broker::Exchange
private:
void sendUpdate(const boost::shared_ptr<broker::Queue>&);
-
+
typedef sys::Mutex::ScopedLock Lock;
typedef std::vector<Url> Urls;
typedef std::set<boost::shared_ptr<broker::Queue> > Queues;
@@ -66,7 +64,7 @@ class FailoverExchange : public broker::Exchange
sys::Mutex lock;
Urls urls;
Queues queues;
- bool ready;
+
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp
index 217641841c..8916de9628 100644
--- a/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/cpp/src/qpid/cluster/Multicaster.cpp
@@ -21,7 +21,6 @@
#include "qpid/cluster/Multicaster.h"
#include "qpid/cluster/Cpg.h"
-#include "qpid/cluster/Cluster.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/AMQBody.h"
#include "qpid/framing/AMQFrame.h"
@@ -59,7 +58,7 @@ void Multicaster::mcast(const Event& e) {
return;
}
}
- QPID_LOG_IF(trace, e.isControl() && Cluster::loggable(e.getFrame()), "MCAST " << e);
+ QPID_LOG(trace, "MCAST " << e);
if (bypass) { // direct, don't queue
iovec iov = e.toIovec();
while (!cpg.mcast(&iov, 1))
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 4bf03eefa2..1354dab17b 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -45,11 +45,12 @@ void OutputInterceptor::send(framing::AMQFrame& f) {
}
void OutputInterceptor::activateOutput() {
- sys::Mutex::ScopedLock l(lock);
- if (parent.isCatchUp())
+ if (parent.isCatchUp()) {
+ sys::Mutex::ScopedLock l(lock);
next->activateOutput();
+ }
else
- sendDoOutput(sendMax, l);
+ sendDoOutput(sendMax);
}
void OutputInterceptor::abort() {
@@ -65,38 +66,29 @@ void OutputInterceptor::giveReadCredit(int32_t credit) {
}
// Called in write thread when the IO layer has no more data to write.
-// We only process IO callbacks in the write thread during catch-up.
-// Normally we run doOutput only on delivery of doOutput requests.
-bool OutputInterceptor::doOutput() {
- parent.doCatchupIoCallbacks();
- return false;
-}
+// We do nothing in the write thread, we run doOutput only on delivery
+// of doOutput requests.
+bool OutputInterceptor::doOutput() { return false; }
-// Send output up to limit, calculate new limit.
+// Send output up to limit, calculate new limit.
void OutputInterceptor::deliverDoOutput(uint32_t limit) {
- sys::Mutex::ScopedLock l(lock);
sentDoOutput = false;
sendMax = limit;
size_t newLimit = limit;
if (parent.isLocal()) {
- size_t buffered = next->getBuffered();
+ size_t buffered = getBuffered();
if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit.
- newLimit = sendMax*2;
+ newLimit = sendMax*2;
else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit.
newLimit = (sendMax + sent) / 2;
}
sent = 0;
- while (sent < limit) {
- {
- sys::Mutex::ScopedUnlock u(lock);
- if (!parent.getBrokerConnection()->doOutput()) break;
- }
+ while (sent < limit && parent.getBrokerConnection()->doOutput())
++sent;
- }
- if (sent == limit) sendDoOutput(newLimit, l);
+ if (sent == limit) sendDoOutput(newLimit);
}
-void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&) {
+void OutputInterceptor::sendDoOutput(size_t newLimit) {
if (parent.isLocal() && !sentDoOutput && !closing) {
sentDoOutput = true;
parent.getCluster().getMulticast().mcastControl(
@@ -105,7 +97,6 @@ void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLo
}
}
-// Called in connection thread when local connection closes.
void OutputInterceptor::closeOutput() {
sys::Mutex::ScopedLock l(lock);
closing = true;
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h
index 3abf5273a0..65bd82a4fc 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.h
+++ b/cpp/src/qpid/cluster/OutputInterceptor.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -58,13 +58,13 @@ class OutputInterceptor : public sys::ConnectionOutputHandler {
uint32_t getSendMax() const { return sendMax; }
void setSendMax(uint32_t sendMax_) { sendMax=sendMax_; }
-
+
cluster::Connection& parent;
-
+
private:
typedef sys::Mutex::ScopedLock Locker;
- void sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&);
+ void sendDoOutput(size_t newLimit);
mutable sys::Mutex lock;
bool closing;
diff --git a/cpp/src/qpid/cluster/SecureConnectionFactory.cpp b/cpp/src/qpid/cluster/SecureConnectionFactory.cpp
index 2672d8360c..6ddef66226 100644
--- a/cpp/src/qpid/cluster/SecureConnectionFactory.cpp
+++ b/cpp/src/qpid/cluster/SecureConnectionFactory.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,7 +48,7 @@ SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, cons
if (clusterCodec) {
SecureConnectionPtr sc(new SecureConnection());
clusterCodec->setSecureConnection(sc.get());
- sc->setCodec(codec);
+ sc->setCodec(codec);
return sc.release();
}
return 0;
@@ -63,7 +63,7 @@ SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id,
if (clusterCodec) {
SecureConnectionPtr sc(new SecureConnection());
clusterCodec->setSecureConnection(sc.get());
- sc->setCodec(codec);
+ sc->setCodec(codec);
return sc.release();
}
return 0;
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 2446c12f2b..8f751add9b 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,9 +26,9 @@
#include "qpid/cluster/Decoder.h"
#include "qpid/cluster/ExpiryPolicy.h"
#include "qpid/cluster/UpdateDataExchange.h"
-#include "qpid/client/SessionBase_0_10Access.h"
-#include "qpid/client/ConnectionAccess.h"
-#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/client/ConnectionAccess.h"
+#include "qpid/client/SessionImpl.h"
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Future.h"
#include "qpid/broker/Broker.h"
@@ -45,13 +45,10 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/TxOpVisitor.h"
#include "qpid/broker/DtxAck.h"
-#include "qpid/broker/DtxBuffer.h"
-#include "qpid/broker/DtxWorkRecord.h"
#include "qpid/broker/TxAccept.h"
#include "qpid/broker/TxPublish.h"
#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/broker/RecoveredEnqueue.h"
-#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/ClusterConnectionMembershipBody.h"
#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
@@ -67,7 +64,6 @@
#include <boost/bind.hpp>
#include <boost/cast.hpp>
#include <algorithm>
-#include <iterator>
#include <sstream>
namespace qpid {
@@ -86,20 +82,11 @@ using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
-// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
-const std::string UpdateClient::UPDATE("x-qpid.cluster-update");
-// Name for header used to carry expiration information.
-const std::string UpdateClient::X_QPID_EXPIRATION = "x-qpid.expiration";
-// Headers used to flag headers/properties added by the UpdateClient so they can be
-// removed on the other side.
-const std::string UpdateClient::X_QPID_NO_MESSAGE_PROPS = "x-qpid.no-message-props";
-const std::string UpdateClient::X_QPID_NO_HEADERS = "x-qpid.no-headers";
-
std::ostream& operator<<(std::ostream& o, const UpdateClient& c) {
return o << "cluster(" << c.updaterId << " UPDATER)";
}
-struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
+struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
{
boost::shared_ptr<qpid::client::ConnectionImpl> connection;
@@ -133,7 +120,7 @@ void send(client::AsyncSession& s, const AMQBody& body) {
// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
- broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_,
+ broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_,
const Cluster::ConnectionVector& cons, Decoder& decoder_,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail,
@@ -147,11 +134,13 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con
UpdateClient::~UpdateClient() {}
+// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
+const std::string UpdateClient::UPDATE("qpid.cluster-update");
+
void UpdateClient::run() {
try {
connection.open(updateeUrl, connectionSettings);
session = connection.newSession(UPDATE);
- session.sync();
update();
done();
} catch (const std::exception& e) {
@@ -165,13 +154,6 @@ void UpdateClient::update() {
<< " at " << updateeUrl);
Broker& b = updaterBroker;
- if(b.getExpiryPolicy()) {
- QPID_LOG(debug, *this << "Updating updatee with cluster time");
- qpid::sys::AbsTime clusterTime = b.getExpiryPolicy()->getCurrentTime();
- int64_t time = qpid::sys::Duration(qpid::sys::EPOCH, clusterTime);
- ClusterConnectionProxy(session).clock(time);
- }
-
updateManagementSetupState();
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
@@ -181,20 +163,16 @@ void UpdateClient::update() {
// longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
-
std::for_each(connections.begin(), connections.end(),
boost::bind(&UpdateClient::updateConnection, this, _1));
-
- // some Queue Observers need session state & msgs synced first, so sync observers now
- b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1));
+ session.queueDelete(arg::queue=UPDATE);
// Update queue listeners: must come after sessions so consumerNumbering is populated
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
+ ClusterConnectionProxy(session).expiryId(expiry.getId());
updateLinks();
updateManagementAgent();
- updateDtxManager();
- session.queueDelete(arg::queue=UPDATE);
session.close();
@@ -206,7 +184,7 @@ void UpdateClient::update() {
// NOTE: connection will be closed from the other end, don't close
// it here as that causes a race.
-
+
// TODO aconway 2010-03-15: This sleep avoids the race condition
// described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
// It allows the connection to fully close before destroying the
@@ -298,7 +276,7 @@ class MessageUpdater {
framing::SequenceNumber lastPos;
client::AsyncSession session;
ExpiryPolicy& expiry;
-
+
public:
MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) {
@@ -315,6 +293,7 @@ class MessageUpdater {
}
}
+
void updateQueuedMessage(const broker::QueuedMessage& message) {
// Send the queue position if necessary.
if (!haveLastPos || message.position - lastPos != 1) {
@@ -323,23 +302,10 @@ class MessageUpdater {
}
lastPos = message.position;
- // if the ttl > 0, we need to send the calculated expiration time to the updatee
- const DeliveryProperties* dprops =
- message.payload->getProperties<DeliveryProperties>();
- if (dprops && dprops->getTtl() > 0) {
- bool hadMessageProps =
- message.payload->hasProperties<framing::MessageProperties>();
- const framing::MessageProperties* mprops =
- message.payload->getProperties<framing::MessageProperties>();
- bool hadApplicationHeaders = mprops->hasApplicationHeaders();
- message.payload->insertCustomProperty(UpdateClient::X_QPID_EXPIRATION,
- sys::Duration(sys::EPOCH, message.payload->getExpiration()));
- // If message properties or application headers didn't exist
- // prior to us adding data, we want to remove them on the other side.
- if (!hadMessageProps)
- message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0);
- else if (!hadApplicationHeaders)
- message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_HEADERS, 0);
+ // Send the expiry ID if necessary.
+ if (message.payload->getProperties<DeliveryProperties>()->getTtl()) {
+ boost::optional<uint64_t> expiryId = expiry.getId(*message.payload);
+ ClusterConnectionProxy(session).expiryId(expiryId?*expiryId:0);
}
// We can't send a broker::Message via the normal client API,
@@ -352,7 +318,7 @@ class MessageUpdater {
framing::MessageTransferBody transfer(
*message.payload->getFrames().as<framing::MessageTransferBody>());
transfer.setDestination(UpdateClient::UPDATE);
-
+
sb.get()->send(transfer, message.payload->getFrames(),
!message.payload->isContentReleased());
if (message.payload->isContentReleased()){
@@ -360,10 +326,9 @@ class MessageUpdater {
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
- {
+ {
AMQFrame frame((AMQContentBody()));
- morecontent = message.payload->getContentFrame(
- *(message.queue), frame, maxContentSize, offset);
+ morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset);
sb.get()->sendRawFrame(frame);
}
}
@@ -392,8 +357,6 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) {
ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count);
}
-
- ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge());
}
void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
@@ -409,11 +372,7 @@ void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue
}
void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& queue, const QueueBinding& binding) {
- if (binding.exchange.size())
- s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
- //else its the default exchange and there is no need to replicate
- //the binding, the creation of the queue will have done so
- //automatically
+ s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
}
void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
@@ -421,8 +380,8 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task);
SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
uint16_t channel = ci->getParent().getSession().getChannel();
- ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getTag());
- QPID_LOG(debug, *this << " updating output task " << ci->getTag()
+ ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName());
+ QPID_LOG(debug, *this << " updating output task " << ci->getName()
<< " channel=" << channel);
}
@@ -430,7 +389,7 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
QPID_LOG(debug, *this << " updating connection " << *updateConnection);
assert(updateConnection->getBrokerConnection());
broker::Connection& bc = *updateConnection->getBrokerConnection();
-
+
// Send the management ID first on the main connection.
std::string mgmtId = updateConnection->getBrokerConnection()->getMgmtId();
ClusterConnectionProxy(session).shadowPrepare(mgmtId);
@@ -467,7 +426,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, *this << " updating session " << ss->getId());
- // Create a client session to update session state.
+ // Create a client session to update session state.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel());
simpl->disableAutoDetach();
@@ -486,19 +445,19 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, *this << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
std::for_each(drs.begin(), drs.end(),
- boost::bind(&UpdateClient::updateUnacked, this, _1, shadowSession));
+ boost::bind(&UpdateClient::updateUnacked, this, _1));
- updateTransactionState(ss->getSemanticState());
+ updateTxState(ss->getSemanticState()); // Tx transaction state.
// Adjust command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
SequenceNumber received = ss->receiverGetReceived().command;
- if (inProgress)
+ if (inProgress)
--received;
// Sync the session to ensure all responses from broker have been processed.
shadowSession.sync();
-
+
// Reset command-sequence state.
proxy.sessionState(
ss->senderGetReplayPoint().command,
@@ -507,8 +466,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
std::max(received, ss->receiverGetExpected().command),
received,
ss->receiverGetUnknownComplete(),
- ss->receiverGetIncomplete(),
- ss->getSemanticState().getDtxSelected()
+ ss->receiverGetIncomplete()
);
// Send frames for partial message in progress.
@@ -521,13 +479,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
void UpdateClient::updateConsumer(
const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
{
- QPID_LOG(debug, *this << " updating consumer " << ci->getTag() << " on "
+ QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on "
<< shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
- arg::destination = ci->getTag(),
+ arg::destination = ci->getName(),
arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE,
arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED,
arg::exclusive = ci->isExclusive(),
@@ -535,32 +493,29 @@ void UpdateClient::updateConsumer(
arg::resumeTtl = ci->getResumeTtl(),
arg::arguments = ci->getArguments()
);
- shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
- shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
- shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getByteCredit());
+ shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
+ shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
+ shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
ClusterConnectionProxy(shadowSession).consumerState(
- ci->getTag(),
+ ci->getName(),
ci->isBlocked(),
ci->isNotifyEnabled(),
ci->position
);
consumerNumbering.add(ci.get());
- QPID_LOG(debug, *this << " updated consumer " << ci->getTag()
+ QPID_LOG(debug, *this << " updated consumer " << ci->getName()
<< " on " << shadowSession.getId());
}
-
-void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr,
- client::AsyncSession& updateSession)
-{
- if (!dr.isEnded() && dr.isAcquired()) {
- assert(dr.getMessage().payload);
+
+void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
+ if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
// If the message is acquired then it is no longer on the
// updatees queue, put it on the update queue for updatee to pick up.
//
- MessageUpdater(UPDATE, updateSession, expiry).updateQueuedMessage(dr.getMessage());
+ MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage());
}
- ClusterConnectionProxy(updateSession).deliveryRecord(
+ ClusterConnectionProxy(shadowSession).deliveryRecord(
dr.getQueue()->getName(),
dr.getMessage().position,
dr.getTag(),
@@ -581,12 +536,10 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry)
: MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {}
- void operator()(const broker::DtxAck& ack) {
- std::for_each(ack.getPending().begin(), ack.getPending().end(),
- boost::bind(&UpdateClient::updateUnacked, &parent, _1, session));
- proxy.dtxAck();
+ void operator()(const broker::DtxAck& ) {
+ throw InternalErrorException("DTX transactions not currently supported by cluster.");
}
-
+
void operator()(const broker::RecoveredDequeue& rdeq) {
updateMessage(rdeq.getMessage());
proxy.txEnqueue(rdeq.getQueue()->getName());
@@ -601,18 +554,13 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
proxy.txAccept(txAccept.getAcked());
}
- typedef std::list<Queue::shared_ptr> QueueList;
-
- void copy(const QueueList& l, Array& a) {
- for (QueueList::const_iterator i = l.begin(); i!=l.end(); ++i)
- a.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
- }
-
void operator()(const broker::TxPublish& txPub) {
updateMessage(txPub.getMessage());
- assert(txPub.getQueues().empty() || txPub.getPrepared().empty());
+ typedef std::list<Queue::shared_ptr> QueueList;
+ const QueueList& qlist = txPub.getQueues();
Array qarray(TYPE_CODE_STR8);
- copy(txPub.getQueues().empty() ? txPub.getPrepared() : txPub.getQueues(), qarray);
+ for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
+ qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
proxy.txPublish(qarray, txPub.delivered);
}
@@ -621,44 +569,18 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
client::AsyncSession session;
ClusterConnectionProxy proxy;
};
-
-void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended)
-{
- ClusterConnectionProxy proxy(shadowSession);
- broker::DtxWorkRecord* record =
- updaterBroker.getDtxManager().getWork(dtx->getXid());
- proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended);
-
-}
-
-void UpdateClient::updateTransactionState(broker::SemanticState& s) {
+
+void UpdateClient::updateTxState(broker::SemanticState& s) {
+ QPID_LOG(debug, *this << " updating TX transaction state.");
ClusterConnectionProxy proxy(shadowSession);
proxy.accumulatedAck(s.getAccumulatedAck());
- broker::TxBuffer::shared_ptr tx = s.getTxBuffer();
- broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer();
- if (dtx) {
- updateBufferRef(dtx, false); // Current transaction.
- } else if (tx) {
+ broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
+ if (txBuffer) {
proxy.txStart();
TxOpUpdater updater(*this, shadowSession, expiry);
- tx->accept(updater);
+ txBuffer->accept(updater);
proxy.txEnd();
}
- for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin();
- i != s.getSuspendedXids().end();
- ++i)
- {
- updateBufferRef(i->second, true);
- }
-}
-
-void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) {
- ClusterConnectionProxy proxy(session);
- proxy.dtxStart(
- dtx->getXid(), dtx->isEnded(), dtx->isSuspended(), dtx->isFailed(), dtx->isExpired());
- TxOpUpdater updater(*this, session, expiry);
- dtx->accept(updater);
- proxy.dtxEnd();
}
void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) {
@@ -693,35 +615,4 @@ void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge)
ClusterConnectionProxy(session).config(encode(*bridge));
}
-void UpdateClient::updateQueueObservers(const boost::shared_ptr<broker::Queue>& q)
-{
- q->eachObserver(boost::bind(&UpdateClient::updateObserver, this, q, _1));
-}
-
-void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q,
- boost::shared_ptr<broker::QueueObserver> o)
-{
- qpid::framing::FieldTable state;
- broker::StatefulQueueObserver *so = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
- if (so) {
- so->getState( state );
- std::string id(so->getId());
- QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s observer " << id);
- ClusterConnectionProxy(session).queueObserverState( q->getName(), id, state );
- }
-}
-
-void UpdateClient::updateDtxManager() {
- broker::DtxManager& dtm = updaterBroker.getDtxManager();
- dtm.each(boost::bind(&UpdateClient::updateDtxWorkRecord, this, _1));
-}
-
-void UpdateClient::updateDtxWorkRecord(const broker::DtxWorkRecord& r) {
- QPID_LOG(debug, *this << " updating DTX transaction: " << r.getXid());
- for (size_t i = 0; i < r.size(); ++i)
- updateDtxBuffer(r[i]);
- ClusterConnectionProxy(session).dtxWorkRecord(
- r.getXid(), r.isPrepared(), r.getTimeout());
-}
-
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index 481ee357c7..7520bb82cb 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,7 +34,7 @@
namespace qpid {
-struct Url;
+class Url;
namespace broker {
@@ -42,8 +42,8 @@ class Broker;
class Queue;
class Exchange;
class QueueBindings;
-struct QueueBinding;
-struct QueuedMessage;
+class QueueBinding;
+class QueuedMessage;
class SessionHandler;
class DeliveryRecord;
class SessionState;
@@ -51,8 +51,7 @@ class SemanticState;
class Decoder;
class Link;
class Bridge;
-class QueueObserver;
-class DtxBuffer;
+
} // namespace broker
namespace cluster {
@@ -69,26 +68,21 @@ class ExpiryPolicy;
class UpdateClient : public sys::Runnable {
public:
static const std::string UPDATE; // Name for special update queue and exchange.
- static const std::string X_QPID_EXPIRATION; // Update message expiration
- // Flag to remove props/headers that were added by the UpdateClient
- static const std::string X_QPID_NO_MESSAGE_PROPS;
- static const std::string X_QPID_NO_HEADERS;
-
static client::Connection catchUpConnection();
-
+
UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry,
const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&,
const boost::function<void()>& done,
const boost::function<void(const std::exception&)>& fail,
- const client::ConnectionSettings&
+ const client::ConnectionSettings&
);
~UpdateClient();
void update();
void run(); // Will delete this when finished.
- void updateUnacked(const broker::DeliveryRecord&, client::AsyncSession&);
+ void updateUnacked(const broker::DeliveryRecord&);
private:
void updateQueue(client::AsyncSession&, const boost::shared_ptr<broker::Queue>&);
@@ -100,8 +94,7 @@ class UpdateClient : public sys::Runnable {
void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding);
void updateConnection(const boost::intrusive_ptr<Connection>& connection);
void updateSession(broker::SessionHandler& s);
- void updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx, bool suspended);
- void updateTransactionState(broker::SemanticState& s);
+ void updateTxState(broker::SemanticState& s);
void updateOutputTask(const sys::OutputTask* task);
void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&);
void updateQueueListeners(const boost::shared_ptr<broker::Queue>&);
@@ -111,11 +104,6 @@ class UpdateClient : public sys::Runnable {
void updateLinks();
void updateLink(const boost::shared_ptr<broker::Link>&);
void updateBridge(const boost::shared_ptr<broker::Bridge>&);
- void updateQueueObservers(const boost::shared_ptr<broker::Queue>&);
- void updateObserver(const boost::shared_ptr<broker::Queue>&, boost::shared_ptr<broker::QueueObserver>);
- void updateDtxManager();
- void updateDtxBuffer(const boost::shared_ptr<broker::DtxBuffer>& );
- void updateDtxWorkRecord(const broker::DtxWorkRecord&);
Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering;
diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/cpp/src/qpid/cluster/UpdateDataExchange.cpp
index e5cd82e3d3..2a079b8881 100644
--- a/cpp/src/qpid/cluster/UpdateDataExchange.cpp
+++ b/cpp/src/qpid/cluster/UpdateDataExchange.cpp
@@ -36,8 +36,13 @@ const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents")
const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas");
const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects");
+std::ostream& operator<<(std::ostream& o, const UpdateDataExchange& c) {
+ return o << "cluster(" << c.clusterId << " UPDATER)";
+}
+
UpdateDataExchange::UpdateDataExchange(Cluster& cluster) :
- Exchange(EXCHANGE_NAME, &cluster)
+ Exchange(EXCHANGE_NAME, &cluster),
+ clusterId(cluster.getId())
{}
void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey,
@@ -57,9 +62,11 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen
framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size());
agent->importAgents(buf1);
+ QPID_LOG(debug, *this << " updated management agents.");
framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size());
agent->importSchemas(buf2);
+ QPID_LOG(debug, *this << " updated management schemas.");
using amqp_0_10::ListCodec;
using types::Variant;
@@ -71,6 +78,7 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen
new management::ManagementAgent::DeletedObject(*i)));
}
agent->importDeletedObjects(objects);
+ QPID_LOG(debug, *this << " updated management deleted objects.");
}
diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.h b/cpp/src/qpid/cluster/UpdateDataExchange.h
index d2f6c35ad0..8c493e400a 100644
--- a/cpp/src/qpid/cluster/UpdateDataExchange.h
+++ b/cpp/src/qpid/cluster/UpdateDataExchange.h
@@ -74,9 +74,11 @@ class UpdateDataExchange : public broker::Exchange
void updateManagementAgent(management::ManagementAgent* agent);
private:
+ MemberId clusterId;
std::string managementAgents;
std::string managementSchemas;
std::string managementDeletedObjects;
+ friend std::ostream& operator<<(std::ostream&, const UpdateDataExchange&);
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/UpdateExchange.cpp b/cpp/src/qpid/cluster/UpdateExchange.cpp
index cb1376004e..11937f296f 100644
--- a/cpp/src/qpid/cluster/UpdateExchange.cpp
+++ b/cpp/src/qpid/cluster/UpdateExchange.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@
*
*/
#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
#include "UpdateExchange.h"
@@ -28,8 +27,6 @@ namespace cluster {
using framing::MessageTransferBody;
using framing::DeliveryProperties;
-using framing::MessageProperties;
-using framing::FieldTable;
UpdateExchange::UpdateExchange(management::Manageable* parent)
: broker::Exchange(UpdateClient::UPDATE, parent),
@@ -37,7 +34,6 @@ UpdateExchange::UpdateExchange(management::Manageable* parent)
void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) {
- // Copy exchange name to destination property.
MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>();
assert(transfer);
const DeliveryProperties* props = msg->getProperties<DeliveryProperties>();
@@ -46,23 +42,6 @@ void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>&
transfer->setDestination(props->getExchange());
else
transfer->clearDestinationFlag();
-
- // Copy expiration from x-property if present.
- if (msg->hasProperties<MessageProperties>()) {
- const MessageProperties* mprops = msg->getProperties<MessageProperties>();
- if (mprops->hasApplicationHeaders()) {
- const FieldTable& headers = mprops->getApplicationHeaders();
- if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) {
- msg->setExpiration(
- sys::AbsTime(sys::EPOCH, headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION)));
- msg->removeCustomProperty(UpdateClient::X_QPID_EXPIRATION);
- // Erase props/headers that were added by the UpdateClient
- if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS))
- msg->eraseProperties<MessageProperties>();
- else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS))
- msg->clearApplicationHeadersFlag();
- }
- }
- }
}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/UpdateReceiver.h b/cpp/src/qpid/cluster/UpdateReceiver.h
index 81ee3a5ffe..7e8ce47662 100644
--- a/cpp/src/qpid/cluster/UpdateReceiver.h
+++ b/cpp/src/qpid/cluster/UpdateReceiver.h
@@ -39,20 +39,6 @@ class UpdateReceiver {
/** Management-id for the next shadow connection */
std::string nextShadowMgmtId;
-
- /** Record the position of a DtxBuffer in the DtxManager (xid + index)
- * and the association with a session, either suspended or current.
- */
- struct DtxBufferRef {
- std::string xid;
- uint32_t index; // Index in WorkRecord in DtxManager
- bool suspended; // Is this a suspended or current transaction?
- broker::SemanticState* semanticState; // Associated session
- DtxBufferRef(const std::string& x, uint32_t i, bool s, broker::SemanticState* ss)
- : xid(x), index(i), suspended(s), semanticState(ss) {}
- };
- typedef std::vector<DtxBufferRef> DtxBuffers;
- DtxBuffers dtxBuffers;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h
index bfb4fd5b9e..0795e5e77a 100644
--- a/cpp/src/qpid/cluster/types.h
+++ b/cpp/src/qpid/cluster/types.h
@@ -24,7 +24,6 @@
#include "config.h"
#include "qpid/Url.h"
-#include "qpid/RefCounted.h"
#include "qpid/sys/IntegerTypes.h"
#include <boost/intrusive_ptr.hpp>
#include <utility>