diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
28 files changed, 417 insertions, 696 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index e6e3de64f2..dd4882774b 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -36,45 +36,45 @@ * * IMPORTANT NOTE: any time code is added to the broker that uses timers, * the cluster may need to be updated to take account of this. - * + * * * USE OF TIMESTAMPS IN THE BROKER - * + * * The following are the current areas where broker uses timers or timestamps: - * + * * - Producer flow control: broker::SemanticState uses * connection::getClusterOrderOutput. a FrameHandler that sends * frames to the client via the cluster. Used by broker::SessionState - * + * * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is * implemented by cluster::ExpiryPolicy. - * + * * - Connection heartbeat: sends connection controls, not part of * session command counting so OK to ignore. - * + * * - LinkRegistry: only cluster elder is ever active for links. - * + * * - management::ManagementBroker: uses MessageHandler supplied by cluster * to send messages to the broker via the cluster. + * + * - Dtx: not yet supported with cluster. * - * cluster::ExpiryPolicy uses cluster time. + * cluster::ExpiryPolicy implements the strategy for message expiry. * * ClusterTimer implements periodic timed events in the cluster context. - * Used for: - * - periodic management events. - * - DTX transaction timeouts. + * Used for periodic management events. * * <h1>CLUSTER PROTOCOL OVERVIEW</h1> - * + * * Messages sent to/from CPG are called Events. * * An Event carries a ConnectionId, which includes a MemberId and a * connection number. - * + * * Events are either * - Connection events: non-0 connection number and are associated with a connection. * - Cluster Events: 0 connection number, are not associated with a connection. - * + * * Events are further categorized as: * - Control: carries method frame(s) that affect cluster behavior. * - Data: carries raw data received from a client connection. @@ -146,7 +146,6 @@ #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterConfigChangeBody.h" -#include "qpid/framing/ClusterClockBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionAbortBody.h" #include "qpid/framing/ClusterRetractOfferBody.h" @@ -199,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1159329; +const uint32_t Cluster::CLUSTER_VERSION = 1058747; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -215,7 +214,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { { cluster.initialStatus( member, version, active, clusterId, - framing::cluster::StoreState(storeState), shutdownId, + framing::cluster::StoreState(storeState), shutdownId, firstConfig, l); } void ready(const std::string& url) { @@ -231,21 +230,21 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { cluster.updateOffer(member, updatee, l); } void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } + void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); } - void timerDrop(const std::string& name) { cluster.timerDrop(member, name, l); } + void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); } void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } void deliverToQueue(const std::string& queue, const std::string& message) { cluster.deliverToQueue(queue, message, l); } - void clock(uint64_t time) { cluster.clock(time, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : - settings(set), + settings(set), broker(b), mgmtObject(0), poller(b.getPoller()), @@ -254,7 +253,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : self(cpg.self()), clusterId(true), mAgent(0), - expiryPolicy(new ExpiryPolicy(*this)), + expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1), @@ -278,11 +277,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : lastBroker(false), updateRetracted(false), updateClosed(false), - error(*this), - acl(0) + error(*this) { - broker.setInCluster(true); - // We give ownership of the timer to the broker and keep a plain pointer. // This is OK as it means the timer has the same lifetime as the broker. timer = new ClusterTimer(*this); @@ -303,7 +299,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); - clusterId = store.getClusterId(); + clusterId = store.getClusterId(); QPID_LOG(notice, "Cluster store state: " << store) } cpg.join(name); @@ -364,15 +360,14 @@ void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { // Safe to use connections here because we're pre-catchup, stalled // and discarding, so deliveredFrame is not processing any // connection events. - assert(discarding); + assert(discarding); pair<ConnectionMap::iterator, bool> ib = connections.insert(ConnectionMap::value_type(c->getId(), c)); - // Like this to avoid tripping up unused variable warning when NDEBUG set - if (!ib.second) assert(ib.second); + assert(ib.second); } void Cluster::erase(const ConnectionId& id) { - Lock l(lock); + Lock l(lock); erase(id,l); } @@ -398,9 +393,9 @@ std::vector<Url> Cluster::getUrls() const { std::vector<Url> Cluster::getUrls(Lock&) const { return map.memberUrls(); -} +} -void Cluster::leave() { +void Cluster::leave() { Lock l(lock); leave(l); } @@ -410,7 +405,7 @@ void Cluster::leave() { QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \ } do {} while(0) -void Cluster::leave(Lock&) { +void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); @@ -429,7 +424,7 @@ void Cluster::deliver( uint32_t nodeid, uint32_t pid, void* msg, - int msg_len) + int msg_len) { MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); @@ -460,7 +455,7 @@ void Cluster::deliveredEvent(const Event& e) { EventFrame ef(e, e.getFrame()); // Stop the deliverEventQueue on update offers. // This preserves the connection decoder fragments for an update. - // Only do this for the two brokers that are directly involved in this + // Only do this for the two brokers that are directly involved in this // offer: the one making the offer, or the one receiving it. const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody()); if (offer && ( e.getMemberId() == self || MemberId(offer->getUpdatee()) == self) ) { @@ -470,7 +465,7 @@ void Cluster::deliveredEvent(const Event& e) { } deliverFrame(ef); } - else if(!discarding) { + else if(!discarding) { if (e.isControl()) deliverFrame(EventFrame(e, e.getFrame())); else { @@ -512,7 +507,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) { // the event queue. e.frame = AMQFrame( ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee())); - deliverEventQueue.start(); + deliverEventQueue.start(); } // Process each frame through the error checker. if (error.isUnresolved()) { @@ -520,14 +515,14 @@ void Cluster::deliveredFrame(const EventFrame& efConst) { while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); } - else + else processFrame(e, l); } void Cluster::processFrame(const EventFrame& e, Lock& l) { if (e.isCluster()) { - QPID_LOG_IF(trace, loggable(e.frame), *this << " DLVR: " << e); + QPID_LOG(trace, *this << " DLVR: " << e); ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); @@ -536,15 +531,14 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) { map.incrementFrameSeq(); ConnectionPtr connection = getConnection(e, l); if (connection) { - QPID_LOG_IF(trace, loggable(e.frame), - *this << " DLVR " << map.getFrameSeq() << ": " << e); + QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); connection->deliveredFrame(e); } else - throw Exception(QPID_MSG("Unknown connection: " << e)); + QPID_LOG(trace, *this << " DROP (no connection): " << e); } else // Drop connection frames while state < CATCHUP - QPID_LOG_IF(trace, loggable(e.frame), *this << " DROP (joining): " << e); + QPID_LOG(trace, *this << " DROP (joining): " << e); } // Called in deliverFrameQueue thread @@ -583,7 +577,7 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) { } // CPG config-change callback. -void Cluster::configChange ( +void Cluster::configChange ( cpg_handle_t /*handle*/, const cpg_name */*group*/, const cpg_address *members, int nMembers, @@ -613,7 +607,7 @@ void Cluster::setReady(Lock&) { } // Set the management status from the Cluster::state. -// +// // NOTE: Management updates are sent based on property changes. In // order to keep consistency across the cluster, we touch the local // management status property even if it is locally unchanged for any @@ -624,7 +618,7 @@ void Cluster::setMgmtStatus(Lock&) { } void Cluster::initMapCompleted(Lock& l) { - // Called on completion of the initial status map. + // Called on completion of the initial status map. QPID_LOG(debug, *this << " initial status map complete. "); setMgmtStatus(l); if (state == PRE_INIT) { @@ -671,8 +665,6 @@ void Cluster::initMapCompleted(Lock& l) { else { // I can go ready. discarding = false; setReady(l); - // Must be called *before* memberUpdate so first update will be generated. - failoverExchange->setReady(); memberUpdate(l); updateMgmtMembership(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); @@ -709,8 +701,8 @@ void Cluster::configChange(const MemberId&, if (initMap.isResendNeeded()) { mcast.mcastControl( ClusterInitialStatusBody( - ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, - store.getState(), store.getShutdownId(), + ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, + store.getState(), store.getShutdownId(), initMap.getFirstConfigStr() ), self); @@ -725,20 +717,6 @@ void Cluster::configChange(const MemberId&, updateMgmtMembership(l); // Update on every config change for consistency } -struct ClusterClockTask : public sys::TimerTask { - Cluster& cluster; - sys::Timer& timer; - - ClusterClockTask(Cluster& cluster, sys::Timer& timer, uint16_t clockInterval) - : TimerTask(Duration(clockInterval * TIME_MSEC),"ClusterClock"), cluster(cluster), timer(timer) {} - - void fire() { - cluster.sendClockUpdate(); - setupNextFire(); - timer.add(this); - } -}; - void Cluster::becomeElder(Lock&) { if (elder) return; // We were already the elder. // We are the oldest, reactive links if necessary @@ -746,8 +724,6 @@ void Cluster::becomeElder(Lock&) { elder = true; broker.getLinks().setPassive(false); timer->becomeElder(); - - clockTimer.add(new ClusterClockTask(*this, clockTimer, settings.clockInterval)); } void Cluster::makeOffer(const MemberId& id, Lock& ) { @@ -783,7 +759,7 @@ std::string Cluster::debugSnapshot() { // point we know the poller has stopped so no poller callbacks will be // invoked. We must ensure that CPG has also shut down so no CPG // callbacks will be invoked. -// +// void Cluster::brokerShutdown() { sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. try { cpg.shutdown(); } @@ -799,7 +775,7 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) } void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active, - const framing::Uuid& id, + const framing::Uuid& id, framing::cluster::StoreState store, const framing::Uuid& shutdownId, const std::string& firstConfig, @@ -857,8 +833,6 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) else if (updatee == self && url) { assert(state == JOINER); state = UPDATEE; - acl = broker.getAcl(); - broker.setAcl(0); // Disable ACL during update QPID_LOG(notice, *this << " receiving update from " << updater); checkUpdateIn(l); } @@ -870,7 +844,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) if (updatee != self && url) { QPID_LOG(debug, debugSnapshot()); if (mAgent) mAgent->clusterUpdate(); - // Updatee will call clusterUpdate() via checkUpdateIn() when update completes + // Updatee will call clusterUpdate when update completes } } @@ -951,15 +925,13 @@ void Cluster::checkUpdateIn(Lock& l) { if (!updateClosed) return; // Wait till update connection closes. if (updatedMap) { // We're up to date map = *updatedMap; + failoverExchange->setUrls(getUrls(l)); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; memberUpdate(l); - // Must be called *after* memberUpdate() to avoid sending an extra update. - failoverExchange->setReady(); // NB: don't updateMgmtMembership() here as we are not in the deliver // thread. It will be updated on delivery of the "ready" we just mcast. broker.setClusterUpdatee(false); - broker.setAcl(acl); // Restore ACL discarding = false; // OK to set, we're stalled for update. QPID_LOG(notice, *this << " update complete, starting catch-up."); QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled. @@ -969,10 +941,6 @@ void Cluster::checkUpdateIn(Lock& l) { mAgent->suppress(false); // Enable management output. mAgent->clusterUpdate(); } - // Restore alternate exchange settings on exchanges. - broker.getExchanges().eachExchange( - boost::bind(&broker::Exchange::recoveryComplete, _1, - boost::ref(broker.getExchanges()))); enableClusterSafe(); // Enable cluster-safe assertions deliverEventQueue.start(); } @@ -1001,7 +969,7 @@ void Cluster::updateOutDone(Lock& l) { void Cluster::updateOutError(const std::exception& e) { Monitor::ScopedLock l(lock); - QPID_LOG(error, *this << " error sending update: " << e.what()); + QPID_LOG(error, *this << " error sending update: " << e.what()); updateOutDone(l); } @@ -1099,7 +1067,7 @@ void Cluster::memberUpdate(Lock& l) { void Cluster::updateMgmtMembership(Lock& l) { if (!mgmtObject) return; std::vector<Url> urls = getUrls(l); - mgmtObject->set_clusterSize(urls.size()); + mgmtObject->set_clusterSize(urls.size()); string urlstr; for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) { if (i != urls.begin()) urlstr += ";"; @@ -1146,6 +1114,10 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) { QPID_LOG(notice, *this << " cluster-uuid = " << clusterId); } +void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { + expiryPolicy->deliverExpire(id); +} + void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) { // If we see an errorCheck here (rather than in the ErrorCheck // class) then we have processed succesfully past the point of the @@ -1183,35 +1155,6 @@ void Cluster::deliverToQueue(const std::string& queue, const std::string& messag q->deliver(msg); } -sys::AbsTime Cluster::getClusterTime() { - Mutex::ScopedLock l(lock); - return clusterTime; -} - -// This method is called during update on the updatee to set the initial cluster time. -void Cluster::clock(const uint64_t time) { - Mutex::ScopedLock l(lock); - clock(time, l); -} - -// called when broadcast message received -void Cluster::clock(const uint64_t time, Lock&) { - clusterTime = AbsTime(EPOCH, time); - AbsTime now = AbsTime::now(); - - if (!elder) { - clusterTimeOffset = Duration(now, clusterTime); - } -} - -// called by elder timer to send clock broadcast -void Cluster::sendClockUpdate() { - Mutex::ScopedLock l(lock); - int64_t nanosecondsSinceEpoch = Duration(EPOCH, now()); - nanosecondsSinceEpoch += clusterTimeOffset; - mcast.mcastControl(ClusterClockBody(ProtocolVersion(), nanosecondsSinceEpoch), self); -} - bool Cluster::deferDeliveryImpl(const std::string& queue, const boost::intrusive_ptr<broker::Message>& msg) { @@ -1224,12 +1167,4 @@ bool Cluster::deferDeliveryImpl(const std::string& queue, return true; } -bool Cluster::loggable(const AMQFrame& f) { - const AMQMethodBody* method = (f.getMethod()); - if (!method) return true; // Not a method - bool isClock = method->amqpClassId() == ClusterClockBody::CLASS_ID - && method->amqpMethodId() == ClusterClockBody::METHOD_ID; - return !isClock; -} - }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index ccec4948e6..8f73c6acca 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -56,25 +56,17 @@ namespace qpid { namespace broker { class Message; -class AclModule; } namespace framing { -class AMQFrame; class AMQBody; -struct Uuid; -} - -namespace sys { -class Timer; -class AbsTime; -class Duration; +class Uuid; } namespace cluster { class Connection; -struct EventFrame; +class EventFrame; class ClusterTimer; class UpdateDataExchange; @@ -97,10 +89,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { void initialize(); // Connection map. - void addLocalConnection(const ConnectionPtr&); - void addShadowConnection(const ConnectionPtr&); - void erase(const ConnectionId&); - + void addLocalConnection(const ConnectionPtr&); + void addShadowConnection(const ConnectionPtr&); + void erase(const ConnectionId&); + // URLs of current cluster members. std::vector<std::string> getIds() const; std::vector<Url> getUrls() const; @@ -115,7 +107,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void updateInRetracted(); // True if we are expecting to receive catch-up connections. bool isExpectingUpdate(); - + MemberId getId() const; broker::Broker& getBroker() const; Multicaster& getMulticast() { return mcast; } @@ -143,12 +135,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { bool deferDeliveryImpl(const std::string& queue, const boost::intrusive_ptr<broker::Message>& msg); - sys::AbsTime getClusterTime(); - void sendClockUpdate(); - void clock(const uint64_t time); - - static bool loggable(const framing::AMQFrame&); // True if the frame should be logged. - private: typedef sys::Monitor::ScopedLock Lock; @@ -158,10 +144,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { /** Version number of the cluster protocol, to avoid mixed versions. */ static const uint32_t CLUSTER_VERSION; - + // NB: A dummy Lock& parameter marks functions that must only be // called with Cluster::lock locked. - + void leave(Lock&); std::vector<std::string> getIds(Lock&) const; std::vector<Url> getUrls(Lock&) const; @@ -170,11 +156,11 @@ class Cluster : private Cpg::Handler, public management::Manageable { void brokerShutdown(); // == Called in deliverEventQueue thread - void deliveredEvent(const Event&); + void deliveredEvent(const Event&); // == Called in deliverFrameQueue thread - void deliveredFrame(const EventFrame&); - void processFrame(const EventFrame&, Lock&); + void deliveredFrame(const EventFrame&); + void processFrame(const EventFrame&, Lock&); // Cluster controls implement XML methods from cluster.xml. void updateRequest(const MemberId&, const std::string&, Lock&); @@ -194,12 +180,12 @@ class Cluster : private Cpg::Handler, public management::Manageable { const std::string& left, const std::string& joined, Lock& l); + void messageExpired(const MemberId&, uint64_t, Lock& l); void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); void timerWakeup(const MemberId&, const std::string& name, Lock&); void timerDrop(const MemberId&, const std::string& name, Lock&); void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&); void deliverToQueue(const std::string& queue, const std::string& message, Lock&); - void clock(const uint64_t time, Lock&); // Helper functions ConnectionPtr getConnection(const EventFrame&, Lock&); @@ -209,7 +195,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void setReady(Lock&); void memberUpdate(Lock&); void setClusterId(const framing::Uuid&, Lock&); - void erase(const ConnectionId&, Lock&); + void erase(const ConnectionId&, Lock&); void requestUpdate(Lock& ); void initMapCompleted(Lock&); void becomeElder(Lock&); @@ -217,7 +203,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void updateMgmtMembership(Lock&); // == Called in CPG dispatch thread - void deliver( // CPG deliver callback. + void deliver( // CPG deliver callback. cpg_handle_t /*handle*/, const struct cpg_name *group, uint32_t /*nodeid*/, @@ -226,7 +212,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { int /*msg_len*/); void deliverEvent(const Event&); - + void configChange( // CPG config change callback. cpg_handle_t /*handle*/, const struct cpg_name */*group*/, @@ -277,7 +263,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Used only in deliverEventQueue thread or when stalled for update. Decoder decoder; bool discarding; - + // Remaining members are protected by lock. mutable sys::Monitor lock; @@ -290,7 +276,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { JOINER, ///< Sent update request, waiting for update offer. UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete. CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event. - READY, ///< Fully operational + READY, ///< Fully operational OFFER, ///< Sent an offer, waiting for accept/reject. UPDATER, ///< Offer accepted, sending a state update. LEFT ///< Final state, left the cluster. @@ -310,13 +296,9 @@ class Cluster : private Cpg::Handler, public management::Manageable { ErrorCheck error; UpdateReceiver updateReceiver; ClusterTimer* timer; - sys::Timer clockTimer; - sys::AbsTime clusterTime; - sys::Duration clusterTimeOffset; - broker::AclModule* acl; friend std::ostream& operator<<(std::ostream&, const Cluster&); - friend struct ClusterDispatcher; + friend class ClusterDispatcher; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index a8389095c9..040e129970 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -50,6 +50,11 @@ void insertFieldTableFromMapValue(FieldTable& ft, const ClusterMap::Map::value_t ft.setString(vt.first.str(), vt.second.str()); } +void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) { + ft.clear(); + for_each(map.begin(), map.end(), bind(&insertFieldTableFromMapValue, ref(ft), _1)); +} + } ClusterMap::ClusterMap() : frameSeq(0) {} diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 69ba095f16..2962daaa07 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -72,7 +72,6 @@ struct ClusterOptions : public Options { ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.") #endif ("cluster-size", optValue(settings.size, "N"), "Wait for N cluster members before allowing clients to connect.") - ("cluster-clock-interval", optValue(settings.clockInterval,"N"), "How often to broadcast the current time to the cluster nodes, in milliseconds. A value between 5 and 1000 is recommended.") ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.") ; } diff --git a/cpp/src/qpid/cluster/ClusterSettings.h b/cpp/src/qpid/cluster/ClusterSettings.h index 2f7b5be20a..8e708aa139 100644 --- a/cpp/src/qpid/cluster/ClusterSettings.h +++ b/cpp/src/qpid/cluster/ClusterSettings.h @@ -35,9 +35,8 @@ struct ClusterSettings { size_t readMax; std::string username, password, mechanism; size_t size; - uint16_t clockInterval; - ClusterSettings() : quorum(false), readMax(10), size(1), clockInterval(10) + ClusterSettings() : quorum(false), readMax(10), size(1) {} Url getUrl(uint16_t port) const { diff --git a/cpp/src/qpid/cluster/ClusterTimer.cpp b/cpp/src/qpid/cluster/ClusterTimer.cpp index b4f7d00f38..f6e1c7a849 100644 --- a/cpp/src/qpid/cluster/ClusterTimer.cpp +++ b/cpp/src/qpid/cluster/ClusterTimer.cpp @@ -70,7 +70,6 @@ void ClusterTimer::add(intrusive_ptr<TimerTask> task) if (i != map.end()) throw Exception(QPID_MSG("Task already exists with name " << task->getName())); map[task->getName()] = task; - // Only the elder actually activates the task with the Timer base class. if (cluster.isElder()) { QPID_LOG(trace, "Elder activating cluster timer task " << task->getName()); @@ -113,9 +112,6 @@ void ClusterTimer::deliverWakeup(const std::string& name) { else { intrusive_ptr<TimerTask> t = i->second; map.erase(i); - // Move the nextFireTime so readyToFire() is true. This is to ensure we - // don't get an error if the fired task calls setupNextFire() - t->setFired(); Timer::fire(t); } } diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 394749aad2..e9b718e6de 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,8 +24,6 @@ #include "Cluster.h" #include "UpdateReceiver.h" #include "qpid/assert.h" -#include "qpid/broker/DtxAck.h" -#include "qpid/broker/DtxBuffer.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" #include "qpid/broker/TxBuffer.h" @@ -37,7 +35,6 @@ #include "qpid/broker/Fairshare.h" #include "qpid/broker/Link.h" #include "qpid/broker/Bridge.h" -#include "qpid/broker/StatefulQueueObserver.h" #include "qpid/broker/Queue.h" #include "qpid/framing/enum.h" #include "qpid/framing/AMQFrame.h" @@ -81,7 +78,7 @@ const std::string shadowPrefix("[shadow]"); Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id, const qpid::sys::SecuritySettings& external) - : cluster(c), self(id), catchUp(false), announced(false), output(*this, out), + : cluster(c), self(id), catchUp(false), output(*this, out), connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true), expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self), @@ -93,15 +90,13 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& mgmtId, MemberId member, bool isCatchUp, bool isLink, const qpid::sys::SecuritySettings& external -) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), announced(false), output(*this, out), +) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out), connectionCtor(&output, cluster.getBroker(), mgmtId, external, isLink, isCatchUp ? ++catchUpId : 0, - // The first catch-up connection is not considered a shadow - // as it needs to be authenticated. - isCatchUp && self.second > 1), + isCatchUp), // isCatchUp => shadow expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), updateIn(c.getUpdateReceiver()), @@ -118,7 +113,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, if (!updateIn.nextShadowMgmtId.empty()) connectionCtor.mgmtId = updateIn.nextShadowMgmtId; updateIn.nextShadowMgmtId.clear(); - } + } init(); QPID_LOG(debug, cluster << " local connection " << *this); } @@ -148,7 +143,7 @@ void Connection::init() { // Called when we have consumed a read buffer to give credit to the // connection layer to continue reading. void Connection::giveReadCredit(int credit) { - if (cluster.getSettings().readMax && credit) + if (cluster.getSettings().readMax && credit) output.giveReadCredit(credit); } @@ -171,7 +166,7 @@ void Connection::announce( AMQFrame frame; while (frame.decode(buf)) connection->received(frame); - connection->setUserId(username); + connection->setUserId(username); } // Do managment actions now that the connection is replicated. connection->raiseConnectEvent(); @@ -198,7 +193,7 @@ void Connection::received(framing::AMQFrame& f) { << *this << ": " << f); return; } - QPID_LOG_IF(trace, Cluster::loggable(f), cluster << " RECV " << *this << ": " << f); + QPID_LOG(trace, cluster << " RECV " << *this << ": " << f); if (isLocal()) { // Local catch-up connection. currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled()) @@ -206,7 +201,7 @@ void Connection::received(framing::AMQFrame& f) { } else { // Shadow or updated catch-up connection. if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { - if (isShadow()) + if (isShadow()) cluster.addShadowConnection(this); AMQFrame ok((ConnectionCloseOkBody())); connection->getOutput().send(ok); @@ -218,9 +213,16 @@ void Connection::received(framing::AMQFrame& f) { } } -bool Connection::checkUnsupported(const AMQBody&) { - // Throw an exception for unsupported commands. Currently all are supported. - return false; +bool Connection::checkUnsupported(const AMQBody& body) { + std::string message; + if (body.getMethod()) { + switch (body.getMethod()->amqpClassId()) { + case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break; + } + } + if (!message.empty()) + connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message); + return !message.empty(); } struct GiveReadCreditOnExit { @@ -239,7 +241,7 @@ void Connection::deliverDoOutput(uint32_t limit) { void Connection::deliveredFrame(const EventFrame& f) { GiveReadCreditOnExit gc(*this, f.readCredit); assert(!catchUp); - currentChannel = f.frame.getChannel(); + currentChannel = f.frame.getChannel(); if (f.frame.getBody() // frame can be emtpy with just readCredit && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. @@ -253,7 +255,7 @@ void Connection::deliveredFrame(const EventFrame& f) { } } -// A local connection is closed by the network layer. Called in the connection thread. +// A local connection is closed by the network layer. void Connection::closed() { try { if (isUpdated()) { @@ -270,9 +272,8 @@ void Connection::closed() { // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. output.closeOutput(); - if (announced) - cluster.getMulticast().mcastControl( - ClusterConnectionDeliverCloseBody(), self); + cluster.getMulticast().mcastControl( + ClusterConnectionDeliverCloseBody(), self); } } catch (const std::exception& e) { @@ -286,7 +287,7 @@ void Connection::deliverClose () { cluster.erase(self); } -// Close the connection +// Close the connection void Connection::close() { if (connection.get()) { QPID_LOG(debug, cluster << " closed connection " << *this); @@ -319,10 +320,10 @@ size_t Connection::decode(const char* data, size_t size) { while (localDecoder.decode(buf)) received(localDecoder.getFrame()); if (!wasOpen && connection->isOpen()) { - // Connections marked with setUserProxyAuth are allowed to proxy + // Connections marked as federation links are allowed to proxy // messages with user-ID that doesn't match the connection's // authenticated ID. This is important for updates. - connection->setUserProxyAuth(isCatchUp()); + connection->setFederationLink(isCatchUp()); } } else { // Multicast local connections. @@ -331,9 +332,9 @@ size_t Connection::decode(const char* data, size_t size) { if (!checkProtocolHeader(ptr, size)) // Updates ptr return 0; // Incomplete header - if (!connection->isOpen()) + if (!connection->isOpen()) processInitialFrames(ptr, end-ptr); // Updates ptr - + if (connection->isOpen() && end - ptr > 0) { // We're multi-casting, we will give read credit on delivery. grc.credit = 0; @@ -383,7 +384,6 @@ void Connection::processInitialFrames(const char*& ptr, size_t size) { connection->getUserId(), initialFrames), getId()); - announced = true; initialFrames.clear(); } } @@ -406,11 +406,11 @@ void Connection::shadowSetUser(const std::string& userId) { void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position) { - broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name); - c->position = position; - c->setBlocked(blocked); - if (notifyEnabled) c->enableNotify(); else c->disableNotify(); - updateIn.consumerNumbering.add(c); + broker::SemanticState::ConsumerImpl& c = semanticState().find(name); + c.position = position; + c.setBlocked(blocked); + if (notifyEnabled) c.enableNotify(); else c.disableNotify(); + updateIn.consumerNumbering.add(c.shared_from_this()); } @@ -421,8 +421,7 @@ void Connection::sessionState( const SequenceNumber& expected, const SequenceNumber& received, const SequenceSet& unknownCompleted, - const SequenceSet& receivedIncomplete, - bool dtxSelected) + const SequenceSet& receivedIncomplete) { sessionState().setState( replayStart, @@ -432,10 +431,8 @@ void Connection::sessionState( received, unknownCompleted, receivedIncomplete); - if (dtxSelected) semanticState().selectDtx(); - QPID_LOG(debug, cluster << " received session state update for " - << sessionState().getId()); - // The output tasks will be added later in the update process. + QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); + // The output tasks will be added later in the update process. connection->getOutputTasks().removeAll(); } @@ -444,7 +441,7 @@ void Connection::outputTask(uint16_t channel, const std::string& name) { if (!session) throw Exception(QPID_MSG(cluster << " channel not attached " << *this << "[" << channel << "] ")); - OutputTask* task = session->getSemanticState().find(name).get(); + OutputTask* task = &session->getSemanticState().find(name); connection->getOutputTasks().addOutputTask(task); } @@ -464,24 +461,11 @@ void Connection::shadowReady( output.setSendMax(sendMax); } -void Connection::setDtxBuffer(const UpdateReceiver::DtxBufferRef& bufRef) { - broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); - broker::DtxWorkRecord* record = mgr.getWork(bufRef.xid); - broker::DtxBuffer::shared_ptr buffer = (*record)[bufRef.index]; - if (bufRef.suspended) - bufRef.semanticState->getSuspendedXids()[bufRef.xid] = buffer; - else - bufRef.semanticState->setDtxBuffer(buffer); -} - -// Marks the end of the update. void Connection::membership(const FieldTable& joiners, const FieldTable& members, const framing::SequenceNumber& frameSeq) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); updateIn.consumerNumbering.clear(); - for_each(updateIn.dtxBuffers.begin(), updateIn.dtxBuffers.end(), - boost::bind(&Connection::setDtxBuffer, this, _1)); closeUpdated(); cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); } @@ -494,7 +478,7 @@ void Connection::retractOffer() { void Connection::closeUpdated() { self.second = 0; // Mark this as completed update connection. - if (connection.get()) + if (connection.get()) connection->close(connection::CLOSE_CODE_NORMAL, "OK"); } @@ -545,20 +529,12 @@ void Connection::deliveryRecord(const string& qname, m = getUpdateMessage(); m.queue = queue.get(); m.position = position; - if (enqueued) queue->updateEnqueued(m); //inform queue of the message + if (enqueued) queue->updateEnqueued(m); //inform queue of the message } else { // Message at original position in original queue - queue->find(position, m); + m = queue->find(position); } - // FIXME aconway 2011-08-19: removed: - // if (!m.payload) - // throw Exception(QPID_MSG("deliveryRecord no update message")); - // - // It seems this could happen legitimately in the case one - // session browses message M, then another session acquires - // it. In that case the browsers delivery record is !acquired - // but the message is not on its original Queue. In that case - // we'll get a deliveryRecord with no payload for the browser. - // + if (!m.payload) + throw Exception(QPID_MSG("deliveryRecord no update message")); } broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit); @@ -566,11 +542,7 @@ void Connection::deliveryRecord(const string& qname, if (cancelled) dr.cancel(dr.getTag()); if (completed) dr.complete(); if (ended) dr.setEnded(); // Exsitance of message - - if (dtxBuffer) // Record for next dtx-ack - dtxAckRecords.push_back(dr); - else - semanticState().record(dr); // Record on session's unacked list. + semanticState().record(dr); // Part of the session's unacked list. } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { @@ -584,46 +556,8 @@ void Connection::queueFairshareState(const std::string& qname, const uint8_t pri } } - -namespace { -// find a StatefulQueueObserver that matches a given identifier -class ObserverFinder { - const std::string id; - boost::shared_ptr<broker::QueueObserver> target; - ObserverFinder(const ObserverFinder&) {} - public: - ObserverFinder(const std::string& _id) : id(_id) {} - broker::StatefulQueueObserver *getObserver() - { - if (target) - return dynamic_cast<broker::StatefulQueueObserver *>(target.get()); - return 0; - } - void operator() (boost::shared_ptr<broker::QueueObserver> o) - { - if (!target) { - broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); - if (p && p->getId() == id) { - target = o; - } - } - } -}; -} - - -void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state) -{ - boost::shared_ptr<broker::Queue> queue(findQueue(qname)); - ObserverFinder finder(observerId); // find this observer - queue->eachObserver<ObserverFinder &>(finder); - broker::StatefulQueueObserver *so = finder.getObserver(); - if (so) { - so->setState( state ); - QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ..."); - return; - } - QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies."); +void Connection::expiryId(uint64_t id) { + cluster.getExpiryPolicy().setId(id); } std::ostream& operator<<(std::ostream& o, const Connection& c) { @@ -640,7 +574,6 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { void Connection::txStart() { txBuffer.reset(new broker::TxBuffer()); } - void Connection::txAccept(const framing::SequenceSet& acked) { txBuffer->enlist(boost::shared_ptr<broker::TxAccept>( new broker::TxAccept(acked, semanticState().getUnacked()))); @@ -656,11 +589,9 @@ void Connection::txEnqueue(const std::string& queue) { new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload))); } -void Connection::txPublish(const framing::Array& queues, bool delivered) -{ - boost::shared_ptr<broker::TxPublish> txPub( - new broker::TxPublish(getUpdateMessage().payload)); - for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) +void Connection::txPublish(const framing::Array& queues, bool delivered) { + boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload)); + for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) txPub->deliverTo(findQueue((*i)->get<std::string>())); txPub->delivered = delivered; txBuffer->enlist(txPub); @@ -674,51 +605,6 @@ void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) { semanticState().setAccumulatedAck(s); } -void Connection::dtxStart(const std::string& xid, - bool ended, - bool suspended, - bool failed, - bool expired) -{ - dtxBuffer.reset(new broker::DtxBuffer(xid, ended, suspended, failed, expired)); - txBuffer = dtxBuffer; -} - -void Connection::dtxEnd() { - broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); - std::string xid = dtxBuffer->getXid(); - if (mgr.exists(xid)) - mgr.join(xid, dtxBuffer); - else - mgr.start(xid, dtxBuffer); - dtxBuffer.reset(); - txBuffer.reset(); -} - -// Sent after all DeliveryRecords for a dtx-ack have been collected in dtxAckRecords -void Connection::dtxAck() { - dtxBuffer->enlist( - boost::shared_ptr<broker::DtxAck>(new broker::DtxAck(dtxAckRecords))); - dtxAckRecords.clear(); -} - -void Connection::dtxBufferRef(const std::string& xid, uint32_t index, bool suspended) { - // Save the association between DtxBuffers and the session so we - // can set the DtxBuffers at the end of the update when the - // DtxManager has been replicated. - updateIn.dtxBuffers.push_back( - UpdateReceiver::DtxBufferRef(xid, index, suspended, &semanticState())); -} - -// Sent at end of work record. -void Connection::dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout) -{ - broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); - if (timeout) mgr.setTimeout(xid, timeout); - if (prepared) mgr.prepare(xid); -} - - void Connection::exchange(const std::string& encoded) { Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf); @@ -728,6 +614,12 @@ void Connection::exchange(const std::string& encoded) { QPID_LOG(debug, cluster << " updated exchange " << ex->getName()); } +void Connection::queue(const std::string& encoded) { + Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); + broker::Queue::shared_ptr q = broker::Queue::decode(cluster.getBroker().getQueues(), buf); + QPID_LOG(debug, cluster << " updated queue " << q->getName()); +} + void Connection::sessionError(uint16_t , const std::string& msg) { // Ignore errors before isOpen(), we're not multicasting yet. if (connection->isOpen()) @@ -786,23 +678,6 @@ void Connection::config(const std::string& encoded) { else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); } -void Connection::doCatchupIoCallbacks() { - // We need to process IO callbacks during the catch-up phase in - // order to service asynchronous completions for messages - // transferred during catch-up. - - if (catchUp) getBrokerConnection()->doIoCallbacks(); -} - -void Connection::clock(uint64_t time) { - QPID_LOG(debug, "Cluster connection received time update"); - cluster.clock(time); -} - -void Connection::queueDequeueSincePurgeState(const std::string& qname, uint32_t dequeueSincePurge) { - boost::shared_ptr<broker::Queue> queue(findQueue(qname)); - queue->setDequeueSincePurge(dequeueSincePurge); -} }} // Namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index fe66b77238..7ee85bf1aa 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,12 +24,11 @@ #include "types.h" #include "OutputInterceptor.h" +#include "EventFrame.h" #include "McastFrameHandler.h" #include "UpdateReceiver.h" -#include "qpid/RefCounted.h" #include "qpid/broker/Connection.h" -#include "qpid/broker/DeliveryRecord.h" #include "qpid/broker/SecureConnection.h" #include "qpid/broker/SemanticState.h" #include "qpid/amqp_0_10/Connection.h" @@ -48,7 +47,7 @@ namespace framing { class AMQFrame; } namespace broker { class SemanticState; -struct QueuedMessage; +class QueuedMessage; class TxBuffer; class TxAccept; } @@ -56,7 +55,6 @@ class TxAccept; namespace cluster { class Cluster; class Event; -struct EventFrame; /** Intercept broker::Connection calls for shadow and local cluster connections. */ class Connection : @@ -64,7 +62,7 @@ class Connection : public sys::ConnectionInputHandler, public framing::AMQP_AllOperations::ClusterConnectionHandler, private broker::Connection::ErrorListener - + { public: @@ -75,7 +73,7 @@ class Connection : Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id, const qpid::sys::SecuritySettings& external); ~Connection(); - + ConnectionId getId() const { return self; } broker::Connection* getBrokerConnection() { return connection.get(); } const broker::Connection* getBrokerConnection() const { return connection.get(); } @@ -110,9 +108,9 @@ class Connection : void deliveredFrame(const EventFrame&); void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position); - + // ==== Used in catch-up mode to build initial state. - // + // // State update methods. void shadowPrepare(const std::string&); @@ -124,11 +122,10 @@ class Connection : const framing::SequenceNumber& expected, const framing::SequenceNumber& received, const framing::SequenceSet& unknownCompleted, - const SequenceSet& receivedIncomplete, - bool dtxSelected); - + const SequenceSet& receivedIncomplete); + void outputTask(uint16_t channel, const std::string& name); - + void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& managementId, @@ -156,7 +153,7 @@ class Connection : void queuePosition(const std::string&, const framing::SequenceNumber&); void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count); - void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&); + void expiryId(uint64_t); void txStart(); void txAccept(const framing::SequenceSet&); @@ -166,18 +163,8 @@ class Connection : void txEnd(); void accumulatedAck(const framing::SequenceSet&); - // Dtx state - void dtxStart(const std::string& xid, - bool ended, - bool suspended, - bool failed, - bool expired); - void dtxEnd(); - void dtxAck(); - void dtxBufferRef(const std::string& xid, uint32_t index, bool suspended); - void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout); - - // Encoded exchange replication. + // Encoded queue/exchange replication. + void queue(const std::string& encoded); void exchange(const std::string& encoded); void giveReadCredit(int credit); @@ -202,12 +189,6 @@ class Connection : void setSecureConnection ( broker::SecureConnection * sc ); - void doCatchupIoCallbacks(); - - void clock(uint64_t time); - - void queueDequeueSincePurgeState(const std::string&, uint32_t); - private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} @@ -252,7 +233,7 @@ class Connection : // Error listener functions void connectionError(const std::string&); void sessionError(uint16_t channel, const std::string&); - + void init(); bool checkUnsupported(const framing::AMQBody& body); void deliverDoOutput(uint32_t limit); @@ -264,11 +245,10 @@ class Connection : broker::SemanticState& semanticState(); broker::QueuedMessage getUpdateMessage(); void closeUpdated(); - void setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &); + Cluster& cluster; ConnectionId self; bool catchUp; - bool announced; OutputInterceptor output; framing::FrameDecoder localDecoder; ConnectionCtor connectionCtor; @@ -276,9 +256,6 @@ class Connection : framing::SequenceNumber deliverSeq; framing::ChannelId currentChannel; boost::shared_ptr<broker::TxBuffer> txBuffer; - boost::shared_ptr<broker::DtxBuffer> dtxBuffer; - broker::DeliveryRecords dtxAckRecords; - broker::DtxWorkRecord* dtxCurrent; bool expectProtocolHeader; McastFrameHandler mcastFrameHandler; UpdateReceiver& updateIn; diff --git a/cpp/src/qpid/cluster/Decoder.h b/cpp/src/qpid/cluster/Decoder.h index 3b5ada4a81..2e2af2868f 100644 --- a/cpp/src/qpid/cluster/Decoder.h +++ b/cpp/src/qpid/cluster/Decoder.h @@ -31,7 +31,7 @@ namespace qpid { namespace cluster { -struct EventFrame; +class EventFrame; class EventHeader; /** diff --git a/cpp/src/qpid/cluster/ErrorCheck.h b/cpp/src/qpid/cluster/ErrorCheck.h index a417b2ec25..de8cedafb3 100644 --- a/cpp/src/qpid/cluster/ErrorCheck.h +++ b/cpp/src/qpid/cluster/ErrorCheck.h @@ -33,7 +33,7 @@ namespace qpid { namespace cluster { -struct EventFrame; +class EventFrame; class Cluster; class Multicaster; class Connection; diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index da2bc89d8c..cd775ce2f1 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,7 +23,6 @@ #include "qpid/cluster/Cpg.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/RefCountedBuffer.h" #include "qpid/assert.h" #include <ostream> #include <iterator> diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index 13283edff7..07f74d3ba5 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,7 +23,7 @@ */ #include "qpid/cluster/types.h" -#include "qpid/BufferRef.h" +#include "qpid/RefCountedBuffer.h" #include "qpid/framing/AMQFrame.h" #include <sys/uio.h> // For iovec #include <iosfwd> @@ -53,7 +53,7 @@ class EventHeader { /** Size of payload data, excluding header. */ size_t getSize() const { return size; } - /** Size of header + payload. */ + /** Size of header + payload. */ size_t getStoreSize() const { return size + HEADER_SIZE; } bool isCluster() const { return connectionId.getNumber() == 0; } @@ -62,7 +62,7 @@ class EventHeader { protected: static const size_t HEADER_SIZE; - + EventType type; ConnectionId connectionId; size_t size; @@ -86,25 +86,25 @@ class Event : public EventHeader { /** Create a control event. */ static Event control(const framing::AMQFrame&, const ConnectionId&); - + // Data excluding header. - char* getData() { return store.begin() + HEADER_SIZE; } - const char* getData() const { return store.begin() + HEADER_SIZE; } + char* getData() { return store + HEADER_SIZE; } + const char* getData() const { return store + HEADER_SIZE; } // Store including header - char* getStore() { return store.begin(); } - const char* getStore() const { return store.begin(); } - - const framing::AMQFrame& getFrame() const; + char* getStore() { return store; } + const char* getStore() const { return store; } + const framing::AMQFrame& getFrame() const; + operator framing::Buffer() const; iovec toIovec() const; - + private: void encodeHeader() const; - BufferRef store; + RefCountedBuffer::pointer store; mutable framing::AMQFrame frame; }; diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h index 6b702a9bf8..61447c5525 100644 --- a/cpp/src/qpid/cluster/EventFrame.h +++ b/cpp/src/qpid/cluster/EventFrame.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -48,7 +48,7 @@ struct EventFrame ConnectionId connectionId; - framing::AMQFrame frame; + framing::AMQFrame frame; int readCredit; ///< last frame in an event, give credit when processed. EventType type; }; diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp index 0ef5c2a35d..d9a7b0122a 100644 --- a/cpp/src/qpid/cluster/ExpiryPolicy.cpp +++ b/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -21,21 +21,106 @@ #include "qpid/broker/Message.h" #include "qpid/cluster/ExpiryPolicy.h" -#include "qpid/cluster/Cluster.h" +#include "qpid/cluster/Multicaster.h" +#include "qpid/framing/ClusterMessageExpiredBody.h" #include "qpid/sys/Time.h" +#include "qpid/sys/Timer.h" #include "qpid/log/Statement.h" namespace qpid { namespace cluster { -ExpiryPolicy::ExpiryPolicy(Cluster& cluster) : cluster(cluster) {} +ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t) + : expiryId(1), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {} +struct ExpiryTask : public sys::TimerTask { + ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when) + : TimerTask(when,"ExpiryPolicy"), expiryPolicy(policy), expiryId(id) {} + void fire() { expiryPolicy->sendExpire(expiryId); } + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; + const uint64_t expiryId; +}; + +// Called while receiving an update +void ExpiryPolicy::setId(uint64_t id) { + sys::Mutex::ScopedLock l(lock); + expiryId = id; +} + +// Called while giving an update +uint64_t ExpiryPolicy::getId() const { + sys::Mutex::ScopedLock l(lock); + return expiryId; +} + +// Called in enqueuing connection thread +void ExpiryPolicy::willExpire(broker::Message& m) { + uint64_t id; + { + // When messages are fanned out to multiple queues, update sends + // them as independenty messages so we can have multiple messages + // with the same expiry ID. + // + sys::Mutex::ScopedLock l(lock); + id = expiryId++; + if (!id) { // This is an update of an already-expired message. + m.setExpiryPolicy(expiredPolicy); + } + else { + assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end()); + // If this is an update, the id may already exist + unexpiredById.insert(IdMessageMap::value_type(id, &m)); + unexpiredByMessage[&m] = id; + } + } + timer.add(new ExpiryTask(this, id, m.getExpiration())); +} + +// Called in dequeueing connection thread +void ExpiryPolicy::forget(broker::Message& m) { + sys::Mutex::ScopedLock l(lock); + MessageIdMap::iterator i = unexpiredByMessage.find(&m); + assert(i != unexpiredByMessage.end()); + unexpiredById.erase(i->second); + unexpiredByMessage.erase(i); +} + +// Called in dequeueing connection or cleanup thread. bool ExpiryPolicy::hasExpired(broker::Message& m) { - return m.getExpiration() < cluster.getClusterTime(); + sys::Mutex::ScopedLock l(lock); + return unexpiredByMessage.find(&m) == unexpiredByMessage.end(); +} + +// Called in timer thread +void ExpiryPolicy::sendExpire(uint64_t id) { + { + sys::Mutex::ScopedLock l(lock); + // Don't multicast an expiry notice if message is already forgotten. + if (unexpiredById.find(id) == unexpiredById.end()) return; + } + mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId); } -sys::AbsTime ExpiryPolicy::getCurrentTime() { - return cluster.getClusterTime(); +// Called in CPG deliver thread. +void ExpiryPolicy::deliverExpire(uint64_t id) { + sys::Mutex::ScopedLock l(lock); + std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id); + IdMessageMap::iterator i = expired.first; + while (i != expired.second) { + i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true; + unexpiredByMessage.erase(i->second); + unexpiredById.erase(i++); + } } +// Called in update thread on the updater. +boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) { + sys::Mutex::ScopedLock l(lock); + MessageIdMap::iterator i = unexpiredByMessage.find(&m); + return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second; +} + +bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; } +void ExpiryPolicy::Expired::willExpire(broker::Message&) { } + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.h b/cpp/src/qpid/cluster/ExpiryPolicy.h index d8ddbca8b3..77a656aa68 100644 --- a/cpp/src/qpid/cluster/ExpiryPolicy.h +++ b/cpp/src/qpid/cluster/ExpiryPolicy.h @@ -36,8 +36,12 @@ namespace broker { class Message; } +namespace sys { +class Timer; +} + namespace cluster { -class Cluster; +class Multicaster; /** * Cluster expiry policy @@ -45,13 +49,43 @@ class Cluster; class ExpiryPolicy : public broker::ExpiryPolicy { public: - ExpiryPolicy(Cluster& cluster); + ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&); + void willExpire(broker::Message&); bool hasExpired(broker::Message&); - qpid::sys::AbsTime getCurrentTime(); + void forget(broker::Message&); + + // Send expiration notice to cluster. + void sendExpire(uint64_t); + // Cluster delivers expiry notice. + void deliverExpire(uint64_t); + + void setId(uint64_t id); + uint64_t getId() const; + + boost::optional<uint64_t> getId(broker::Message&); + private: - Cluster& cluster; + typedef std::map<broker::Message*, uint64_t> MessageIdMap; + // When messages are fanned out to multiple queues, update sends + // them as independenty messages so we can have multiple messages + // with the same expiry ID. + typedef std::multimap<uint64_t, broker::Message*> IdMessageMap; + + struct Expired : public broker::ExpiryPolicy { + bool hasExpired(broker::Message&); + void willExpire(broker::Message&); + }; + + mutable sys::Mutex lock; + MessageIdMap unexpiredByMessage; + IdMessageMap unexpiredById; + uint64_t expiryId; + boost::intrusive_ptr<Expired> expiredPolicy; + Multicaster& mcast; + MemberId memberId; + sys::Timer& timer; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/FailoverExchange.cpp b/cpp/src/qpid/cluster/FailoverExchange.cpp index cfbe34a460..84232dac1b 100644 --- a/cpp/src/qpid/cluster/FailoverExchange.cpp +++ b/cpp/src/qpid/cluster/FailoverExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,10 +39,8 @@ using namespace broker; using namespace framing; const string FailoverExchange::typeName("amq.failover"); - -FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) - : Exchange(typeName, parent, b ), ready(false) -{ + +FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) : Exchange(typeName, parent, b ) { if (mgmtExchange != 0) mgmtExchange->set_type(typeName); } @@ -55,17 +53,16 @@ void FailoverExchange::setUrls(const vector<Url>& u) { void FailoverExchange::updateUrls(const vector<Url>& u) { Lock l(lock); urls=u; - if (ready && !urls.empty()) { - std::for_each(queues.begin(), queues.end(), - boost::bind(&FailoverExchange::sendUpdate, this, _1)); - } + if (urls.empty()) return; + std::for_each(queues.begin(), queues.end(), + boost::bind(&FailoverExchange::sendUpdate, this, _1)); } string FailoverExchange::getType() const { return typeName; } bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) { Lock l(lock); - if (ready) sendUpdate(queue); + sendUpdate(queue); return queues.insert(queue).second; } @@ -87,7 +84,7 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { // Called with lock held. if (urls.empty()) return; framing::Array array(0x95); - for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) + for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str()))); const ProtocolVersion v; boost::intrusive_ptr<Message> msg(new Message); @@ -99,12 +96,9 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { header.get<MessageProperties>(true)->getApplicationHeaders().setArray(typeName, array); AMQFrame headerFrame(header); headerFrame.setFirstSegment(false); - msg->getFrames().append(headerFrame); + msg->getFrames().append(headerFrame); DeliverableMessage(msg).deliverTo(queue); } -void FailoverExchange::setReady() { - ready = true; -} }} // namespace cluster diff --git a/cpp/src/qpid/cluster/FailoverExchange.h b/cpp/src/qpid/cluster/FailoverExchange.h index c3e50c6929..2e1edfc0ae 100644 --- a/cpp/src/qpid/cluster/FailoverExchange.h +++ b/cpp/src/qpid/cluster/FailoverExchange.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -46,8 +46,6 @@ class FailoverExchange : public broker::Exchange void setUrls(const std::vector<Url>&); /** Set the URLs and send an update.*/ void updateUrls(const std::vector<Url>&); - /** Flag the failover exchange as ready to generate updates (caught up) */ - void setReady(); // Exchange overrides std::string getType() const; @@ -58,7 +56,7 @@ class FailoverExchange : public broker::Exchange private: void sendUpdate(const boost::shared_ptr<broker::Queue>&); - + typedef sys::Mutex::ScopedLock Lock; typedef std::vector<Url> Urls; typedef std::set<boost::shared_ptr<broker::Queue> > Queues; @@ -66,7 +64,7 @@ class FailoverExchange : public broker::Exchange sys::Mutex lock; Urls urls; Queues queues; - bool ready; + }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 217641841c..8916de9628 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -21,7 +21,6 @@ #include "qpid/cluster/Multicaster.h" #include "qpid/cluster/Cpg.h" -#include "qpid/cluster/Cluster.h" #include "qpid/log/Statement.h" #include "qpid/framing/AMQBody.h" #include "qpid/framing/AMQFrame.h" @@ -59,7 +58,7 @@ void Multicaster::mcast(const Event& e) { return; } } - QPID_LOG_IF(trace, e.isControl() && Cluster::loggable(e.getFrame()), "MCAST " << e); + QPID_LOG(trace, "MCAST " << e); if (bypass) { // direct, don't queue iovec iov = e.toIovec(); while (!cpg.mcast(&iov, 1)) diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index 4bf03eefa2..1354dab17b 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -45,11 +45,12 @@ void OutputInterceptor::send(framing::AMQFrame& f) { } void OutputInterceptor::activateOutput() { - sys::Mutex::ScopedLock l(lock); - if (parent.isCatchUp()) + if (parent.isCatchUp()) { + sys::Mutex::ScopedLock l(lock); next->activateOutput(); + } else - sendDoOutput(sendMax, l); + sendDoOutput(sendMax); } void OutputInterceptor::abort() { @@ -65,38 +66,29 @@ void OutputInterceptor::giveReadCredit(int32_t credit) { } // Called in write thread when the IO layer has no more data to write. -// We only process IO callbacks in the write thread during catch-up. -// Normally we run doOutput only on delivery of doOutput requests. -bool OutputInterceptor::doOutput() { - parent.doCatchupIoCallbacks(); - return false; -} +// We do nothing in the write thread, we run doOutput only on delivery +// of doOutput requests. +bool OutputInterceptor::doOutput() { return false; } -// Send output up to limit, calculate new limit. +// Send output up to limit, calculate new limit. void OutputInterceptor::deliverDoOutput(uint32_t limit) { - sys::Mutex::ScopedLock l(lock); sentDoOutput = false; sendMax = limit; size_t newLimit = limit; if (parent.isLocal()) { - size_t buffered = next->getBuffered(); + size_t buffered = getBuffered(); if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit. - newLimit = sendMax*2; + newLimit = sendMax*2; else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit. newLimit = (sendMax + sent) / 2; } sent = 0; - while (sent < limit) { - { - sys::Mutex::ScopedUnlock u(lock); - if (!parent.getBrokerConnection()->doOutput()) break; - } + while (sent < limit && parent.getBrokerConnection()->doOutput()) ++sent; - } - if (sent == limit) sendDoOutput(newLimit, l); + if (sent == limit) sendDoOutput(newLimit); } -void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&) { +void OutputInterceptor::sendDoOutput(size_t newLimit) { if (parent.isLocal() && !sentDoOutput && !closing) { sentDoOutput = true; parent.getCluster().getMulticast().mcastControl( @@ -105,7 +97,6 @@ void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLo } } -// Called in connection thread when local connection closes. void OutputInterceptor::closeOutput() { sys::Mutex::ScopedLock l(lock); closing = true; diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h index 3abf5273a0..65bd82a4fc 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.h +++ b/cpp/src/qpid/cluster/OutputInterceptor.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -58,13 +58,13 @@ class OutputInterceptor : public sys::ConnectionOutputHandler { uint32_t getSendMax() const { return sendMax; } void setSendMax(uint32_t sendMax_) { sendMax=sendMax_; } - + cluster::Connection& parent; - + private: typedef sys::Mutex::ScopedLock Locker; - void sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&); + void sendDoOutput(size_t newLimit); mutable sys::Mutex lock; bool closing; diff --git a/cpp/src/qpid/cluster/SecureConnectionFactory.cpp b/cpp/src/qpid/cluster/SecureConnectionFactory.cpp index 2672d8360c..6ddef66226 100644 --- a/cpp/src/qpid/cluster/SecureConnectionFactory.cpp +++ b/cpp/src/qpid/cluster/SecureConnectionFactory.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -48,7 +48,7 @@ SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, cons if (clusterCodec) { SecureConnectionPtr sc(new SecureConnection()); clusterCodec->setSecureConnection(sc.get()); - sc->setCodec(codec); + sc->setCodec(codec); return sc.release(); } return 0; @@ -63,7 +63,7 @@ SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id, if (clusterCodec) { SecureConnectionPtr sc(new SecureConnection()); clusterCodec->setSecureConnection(sc.get()); - sc->setCodec(codec); + sc->setCodec(codec); return sc.release(); } return 0; diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 2446c12f2b..8f751add9b 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,9 +26,9 @@ #include "qpid/cluster/Decoder.h" #include "qpid/cluster/ExpiryPolicy.h" #include "qpid/cluster/UpdateDataExchange.h" -#include "qpid/client/SessionBase_0_10Access.h" -#include "qpid/client/ConnectionAccess.h" -#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/ConnectionAccess.h" +#include "qpid/client/SessionImpl.h" #include "qpid/client/ConnectionImpl.h" #include "qpid/client/Future.h" #include "qpid/broker/Broker.h" @@ -45,13 +45,10 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/TxOpVisitor.h" #include "qpid/broker/DtxAck.h" -#include "qpid/broker/DtxBuffer.h" -#include "qpid/broker/DtxWorkRecord.h" #include "qpid/broker/TxAccept.h" #include "qpid/broker/TxPublish.h" #include "qpid/broker/RecoveredDequeue.h" #include "qpid/broker/RecoveredEnqueue.h" -#include "qpid/broker/StatefulQueueObserver.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/ClusterConnectionMembershipBody.h" #include "qpid/framing/ClusterConnectionShadowReadyBody.h" @@ -67,7 +64,6 @@ #include <boost/bind.hpp> #include <boost/cast.hpp> #include <algorithm> -#include <iterator> #include <sstream> namespace qpid { @@ -86,20 +82,11 @@ using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; -// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. -const std::string UpdateClient::UPDATE("x-qpid.cluster-update"); -// Name for header used to carry expiration information. -const std::string UpdateClient::X_QPID_EXPIRATION = "x-qpid.expiration"; -// Headers used to flag headers/properties added by the UpdateClient so they can be -// removed on the other side. -const std::string UpdateClient::X_QPID_NO_MESSAGE_PROPS = "x-qpid.no-message-props"; -const std::string UpdateClient::X_QPID_NO_HEADERS = "x-qpid.no-headers"; - std::ostream& operator<<(std::ostream& o, const UpdateClient& c) { return o << "cluster(" << c.updaterId << " UPDATER)"; } -struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler +struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler { boost::shared_ptr<qpid::client::ConnectionImpl> connection; @@ -133,7 +120,7 @@ void send(client::AsyncSession& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_, + broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_, const Cluster::ConnectionVector& cons, Decoder& decoder_, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail, @@ -147,11 +134,13 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con UpdateClient::~UpdateClient() {} +// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. +const std::string UpdateClient::UPDATE("qpid.cluster-update"); + void UpdateClient::run() { try { connection.open(updateeUrl, connectionSettings); session = connection.newSession(UPDATE); - session.sync(); update(); done(); } catch (const std::exception& e) { @@ -165,13 +154,6 @@ void UpdateClient::update() { << " at " << updateeUrl); Broker& b = updaterBroker; - if(b.getExpiryPolicy()) { - QPID_LOG(debug, *this << "Updating updatee with cluster time"); - qpid::sys::AbsTime clusterTime = b.getExpiryPolicy()->getCurrentTime(); - int64_t time = qpid::sys::Duration(qpid::sys::EPOCH, clusterTime); - ClusterConnectionProxy(session).clock(time); - } - updateManagementSetupState(); b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); @@ -181,20 +163,16 @@ void UpdateClient::update() { // longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); - std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); - - // some Queue Observers need session state & msgs synced first, so sync observers now - b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1)); + session.queueDelete(arg::queue=UPDATE); // Update queue listeners: must come after sessions so consumerNumbering is populated b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); + ClusterConnectionProxy(session).expiryId(expiry.getId()); updateLinks(); updateManagementAgent(); - updateDtxManager(); - session.queueDelete(arg::queue=UPDATE); session.close(); @@ -206,7 +184,7 @@ void UpdateClient::update() { // NOTE: connection will be closed from the other end, don't close // it here as that causes a race. - + // TODO aconway 2010-03-15: This sleep avoids the race condition // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831. // It allows the connection to fully close before destroying the @@ -298,7 +276,7 @@ class MessageUpdater { framing::SequenceNumber lastPos; client::AsyncSession session; ExpiryPolicy& expiry; - + public: MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) { @@ -315,6 +293,7 @@ class MessageUpdater { } } + void updateQueuedMessage(const broker::QueuedMessage& message) { // Send the queue position if necessary. if (!haveLastPos || message.position - lastPos != 1) { @@ -323,23 +302,10 @@ class MessageUpdater { } lastPos = message.position; - // if the ttl > 0, we need to send the calculated expiration time to the updatee - const DeliveryProperties* dprops = - message.payload->getProperties<DeliveryProperties>(); - if (dprops && dprops->getTtl() > 0) { - bool hadMessageProps = - message.payload->hasProperties<framing::MessageProperties>(); - const framing::MessageProperties* mprops = - message.payload->getProperties<framing::MessageProperties>(); - bool hadApplicationHeaders = mprops->hasApplicationHeaders(); - message.payload->insertCustomProperty(UpdateClient::X_QPID_EXPIRATION, - sys::Duration(sys::EPOCH, message.payload->getExpiration())); - // If message properties or application headers didn't exist - // prior to us adding data, we want to remove them on the other side. - if (!hadMessageProps) - message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0); - else if (!hadApplicationHeaders) - message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_HEADERS, 0); + // Send the expiry ID if necessary. + if (message.payload->getProperties<DeliveryProperties>()->getTtl()) { + boost::optional<uint64_t> expiryId = expiry.getId(*message.payload); + ClusterConnectionProxy(session).expiryId(expiryId?*expiryId:0); } // We can't send a broker::Message via the normal client API, @@ -352,7 +318,7 @@ class MessageUpdater { framing::MessageTransferBody transfer( *message.payload->getFrames().as<framing::MessageTransferBody>()); transfer.setDestination(UpdateClient::UPDATE); - + sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased()); if (message.payload->isContentReleased()){ @@ -360,10 +326,9 @@ class MessageUpdater { uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) - { + { AMQFrame frame((AMQContentBody())); - morecontent = message.payload->getContentFrame( - *(message.queue), frame, maxContentSize, offset); + morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset); sb.get()->sendRawFrame(frame); } } @@ -392,8 +357,6 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr< if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) { ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count); } - - ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge()); } void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { @@ -409,11 +372,7 @@ void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue } void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& queue, const QueueBinding& binding) { - if (binding.exchange.size()) - s.exchangeBind(queue, binding.exchange, binding.key, binding.args); - //else its the default exchange and there is no need to replicate - //the binding, the creation of the queue will have done so - //automatically + s.exchangeBind(queue, binding.exchange, binding.key, binding.args); } void UpdateClient::updateOutputTask(const sys::OutputTask* task) { @@ -421,8 +380,8 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) { boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task); SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci); uint16_t channel = ci->getParent().getSession().getChannel(); - ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getTag()); - QPID_LOG(debug, *this << " updating output task " << ci->getTag() + ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName()); + QPID_LOG(debug, *this << " updating output task " << ci->getName() << " channel=" << channel); } @@ -430,7 +389,7 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda QPID_LOG(debug, *this << " updating connection " << *updateConnection); assert(updateConnection->getBrokerConnection()); broker::Connection& bc = *updateConnection->getBrokerConnection(); - + // Send the management ID first on the main connection. std::string mgmtId = updateConnection->getBrokerConnection()->getMgmtId(); ClusterConnectionProxy(session).shadowPrepare(mgmtId); @@ -467,7 +426,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, *this << " updating session " << ss->getId()); - // Create a client session to update session state. + // Create a client session to update session state. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); simpl->disableAutoDetach(); @@ -486,19 +445,19 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, *this << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); std::for_each(drs.begin(), drs.end(), - boost::bind(&UpdateClient::updateUnacked, this, _1, shadowSession)); + boost::bind(&UpdateClient::updateUnacked, this, _1)); - updateTransactionState(ss->getSemanticState()); + updateTxState(ss->getSemanticState()); // Tx transaction state. // Adjust command counter for message in progress, will be sent after state update. boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); SequenceNumber received = ss->receiverGetReceived().command; - if (inProgress) + if (inProgress) --received; // Sync the session to ensure all responses from broker have been processed. shadowSession.sync(); - + // Reset command-sequence state. proxy.sessionState( ss->senderGetReplayPoint().command, @@ -507,8 +466,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { std::max(received, ss->receiverGetExpected().command), received, ss->receiverGetUnknownComplete(), - ss->receiverGetIncomplete(), - ss->getSemanticState().getDtxSelected() + ss->receiverGetIncomplete() ); // Send frames for partial message in progress. @@ -521,13 +479,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { void UpdateClient::updateConsumer( const broker::SemanticState::ConsumerImpl::shared_ptr& ci) { - QPID_LOG(debug, *this << " updating consumer " << ci->getTag() << " on " + QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on " << shadowSession.getId()); using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), - arg::destination = ci->getTag(), + arg::destination = ci->getName(), arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE, arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED, arg::exclusive = ci->isExclusive(), @@ -535,32 +493,29 @@ void UpdateClient::updateConsumer( arg::resumeTtl = ci->getResumeTtl(), arg::arguments = ci->getArguments() ); - shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); - shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); - shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getByteCredit()); + shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); + shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); + shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); ClusterConnectionProxy(shadowSession).consumerState( - ci->getTag(), + ci->getName(), ci->isBlocked(), ci->isNotifyEnabled(), ci->position ); consumerNumbering.add(ci.get()); - QPID_LOG(debug, *this << " updated consumer " << ci->getTag() + QPID_LOG(debug, *this << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); } - -void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr, - client::AsyncSession& updateSession) -{ - if (!dr.isEnded() && dr.isAcquired()) { - assert(dr.getMessage().payload); + +void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { + if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { // If the message is acquired then it is no longer on the // updatees queue, put it on the update queue for updatee to pick up. // - MessageUpdater(UPDATE, updateSession, expiry).updateQueuedMessage(dr.getMessage()); + MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage()); } - ClusterConnectionProxy(updateSession).deliveryRecord( + ClusterConnectionProxy(shadowSession).deliveryRecord( dr.getQueue()->getName(), dr.getMessage().position, dr.getTag(), @@ -581,12 +536,10 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry) : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {} - void operator()(const broker::DtxAck& ack) { - std::for_each(ack.getPending().begin(), ack.getPending().end(), - boost::bind(&UpdateClient::updateUnacked, &parent, _1, session)); - proxy.dtxAck(); + void operator()(const broker::DtxAck& ) { + throw InternalErrorException("DTX transactions not currently supported by cluster."); } - + void operator()(const broker::RecoveredDequeue& rdeq) { updateMessage(rdeq.getMessage()); proxy.txEnqueue(rdeq.getQueue()->getName()); @@ -601,18 +554,13 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { proxy.txAccept(txAccept.getAcked()); } - typedef std::list<Queue::shared_ptr> QueueList; - - void copy(const QueueList& l, Array& a) { - for (QueueList::const_iterator i = l.begin(); i!=l.end(); ++i) - a.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); - } - void operator()(const broker::TxPublish& txPub) { updateMessage(txPub.getMessage()); - assert(txPub.getQueues().empty() || txPub.getPrepared().empty()); + typedef std::list<Queue::shared_ptr> QueueList; + const QueueList& qlist = txPub.getQueues(); Array qarray(TYPE_CODE_STR8); - copy(txPub.getQueues().empty() ? txPub.getPrepared() : txPub.getQueues(), qarray); + for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) + qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); proxy.txPublish(qarray, txPub.delivered); } @@ -621,44 +569,18 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { client::AsyncSession session; ClusterConnectionProxy proxy; }; - -void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended) -{ - ClusterConnectionProxy proxy(shadowSession); - broker::DtxWorkRecord* record = - updaterBroker.getDtxManager().getWork(dtx->getXid()); - proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended); - -} - -void UpdateClient::updateTransactionState(broker::SemanticState& s) { + +void UpdateClient::updateTxState(broker::SemanticState& s) { + QPID_LOG(debug, *this << " updating TX transaction state."); ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); - broker::TxBuffer::shared_ptr tx = s.getTxBuffer(); - broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer(); - if (dtx) { - updateBufferRef(dtx, false); // Current transaction. - } else if (tx) { + broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); + if (txBuffer) { proxy.txStart(); TxOpUpdater updater(*this, shadowSession, expiry); - tx->accept(updater); + txBuffer->accept(updater); proxy.txEnd(); } - for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin(); - i != s.getSuspendedXids().end(); - ++i) - { - updateBufferRef(i->second, true); - } -} - -void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) { - ClusterConnectionProxy proxy(session); - proxy.dtxStart( - dtx->getXid(), dtx->isEnded(), dtx->isSuspended(), dtx->isFailed(), dtx->isExpired()); - TxOpUpdater updater(*this, session, expiry); - dtx->accept(updater); - proxy.dtxEnd(); } void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) { @@ -693,35 +615,4 @@ void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) ClusterConnectionProxy(session).config(encode(*bridge)); } -void UpdateClient::updateQueueObservers(const boost::shared_ptr<broker::Queue>& q) -{ - q->eachObserver(boost::bind(&UpdateClient::updateObserver, this, q, _1)); -} - -void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q, - boost::shared_ptr<broker::QueueObserver> o) -{ - qpid::framing::FieldTable state; - broker::StatefulQueueObserver *so = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); - if (so) { - so->getState( state ); - std::string id(so->getId()); - QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s observer " << id); - ClusterConnectionProxy(session).queueObserverState( q->getName(), id, state ); - } -} - -void UpdateClient::updateDtxManager() { - broker::DtxManager& dtm = updaterBroker.getDtxManager(); - dtm.each(boost::bind(&UpdateClient::updateDtxWorkRecord, this, _1)); -} - -void UpdateClient::updateDtxWorkRecord(const broker::DtxWorkRecord& r) { - QPID_LOG(debug, *this << " updating DTX transaction: " << r.getXid()); - for (size_t i = 0; i < r.size(); ++i) - updateDtxBuffer(r[i]); - ClusterConnectionProxy(session).dtxWorkRecord( - r.getXid(), r.isPrepared(), r.getTimeout()); -} - }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index 481ee357c7..7520bb82cb 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,7 +34,7 @@ namespace qpid { -struct Url; +class Url; namespace broker { @@ -42,8 +42,8 @@ class Broker; class Queue; class Exchange; class QueueBindings; -struct QueueBinding; -struct QueuedMessage; +class QueueBinding; +class QueuedMessage; class SessionHandler; class DeliveryRecord; class SessionState; @@ -51,8 +51,7 @@ class SemanticState; class Decoder; class Link; class Bridge; -class QueueObserver; -class DtxBuffer; + } // namespace broker namespace cluster { @@ -69,26 +68,21 @@ class ExpiryPolicy; class UpdateClient : public sys::Runnable { public: static const std::string UPDATE; // Name for special update queue and exchange. - static const std::string X_QPID_EXPIRATION; // Update message expiration - // Flag to remove props/headers that were added by the UpdateClient - static const std::string X_QPID_NO_MESSAGE_PROPS; - static const std::string X_QPID_NO_HEADERS; - static client::Connection catchUpConnection(); - + UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry, const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&, const boost::function<void()>& done, const boost::function<void(const std::exception&)>& fail, - const client::ConnectionSettings& + const client::ConnectionSettings& ); ~UpdateClient(); void update(); void run(); // Will delete this when finished. - void updateUnacked(const broker::DeliveryRecord&, client::AsyncSession&); + void updateUnacked(const broker::DeliveryRecord&); private: void updateQueue(client::AsyncSession&, const boost::shared_ptr<broker::Queue>&); @@ -100,8 +94,7 @@ class UpdateClient : public sys::Runnable { void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding); void updateConnection(const boost::intrusive_ptr<Connection>& connection); void updateSession(broker::SessionHandler& s); - void updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx, bool suspended); - void updateTransactionState(broker::SemanticState& s); + void updateTxState(broker::SemanticState& s); void updateOutputTask(const sys::OutputTask* task); void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&); void updateQueueListeners(const boost::shared_ptr<broker::Queue>&); @@ -111,11 +104,6 @@ class UpdateClient : public sys::Runnable { void updateLinks(); void updateLink(const boost::shared_ptr<broker::Link>&); void updateBridge(const boost::shared_ptr<broker::Bridge>&); - void updateQueueObservers(const boost::shared_ptr<broker::Queue>&); - void updateObserver(const boost::shared_ptr<broker::Queue>&, boost::shared_ptr<broker::QueueObserver>); - void updateDtxManager(); - void updateDtxBuffer(const boost::shared_ptr<broker::DtxBuffer>& ); - void updateDtxWorkRecord(const broker::DtxWorkRecord&); Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering; diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/cpp/src/qpid/cluster/UpdateDataExchange.cpp index e5cd82e3d3..2a079b8881 100644 --- a/cpp/src/qpid/cluster/UpdateDataExchange.cpp +++ b/cpp/src/qpid/cluster/UpdateDataExchange.cpp @@ -36,8 +36,13 @@ const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents") const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas"); const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects"); +std::ostream& operator<<(std::ostream& o, const UpdateDataExchange& c) { + return o << "cluster(" << c.clusterId << " UPDATER)"; +} + UpdateDataExchange::UpdateDataExchange(Cluster& cluster) : - Exchange(EXCHANGE_NAME, &cluster) + Exchange(EXCHANGE_NAME, &cluster), + clusterId(cluster.getId()) {} void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey, @@ -57,9 +62,11 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size()); agent->importAgents(buf1); + QPID_LOG(debug, *this << " updated management agents."); framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size()); agent->importSchemas(buf2); + QPID_LOG(debug, *this << " updated management schemas."); using amqp_0_10::ListCodec; using types::Variant; @@ -71,6 +78,7 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen new management::ManagementAgent::DeletedObject(*i))); } agent->importDeletedObjects(objects); + QPID_LOG(debug, *this << " updated management deleted objects."); } diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.h b/cpp/src/qpid/cluster/UpdateDataExchange.h index d2f6c35ad0..8c493e400a 100644 --- a/cpp/src/qpid/cluster/UpdateDataExchange.h +++ b/cpp/src/qpid/cluster/UpdateDataExchange.h @@ -74,9 +74,11 @@ class UpdateDataExchange : public broker::Exchange void updateManagementAgent(management::ManagementAgent* agent); private: + MemberId clusterId; std::string managementAgents; std::string managementSchemas; std::string managementDeletedObjects; + friend std::ostream& operator<<(std::ostream&, const UpdateDataExchange&); }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateExchange.cpp b/cpp/src/qpid/cluster/UpdateExchange.cpp index cb1376004e..11937f296f 100644 --- a/cpp/src/qpid/cluster/UpdateExchange.cpp +++ b/cpp/src/qpid/cluster/UpdateExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,7 +19,6 @@ * */ #include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" #include "UpdateExchange.h" @@ -28,8 +27,6 @@ namespace cluster { using framing::MessageTransferBody; using framing::DeliveryProperties; -using framing::MessageProperties; -using framing::FieldTable; UpdateExchange::UpdateExchange(management::Manageable* parent) : broker::Exchange(UpdateClient::UPDATE, parent), @@ -37,7 +34,6 @@ UpdateExchange::UpdateExchange(management::Manageable* parent) void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) { - // Copy exchange name to destination property. MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>(); assert(transfer); const DeliveryProperties* props = msg->getProperties<DeliveryProperties>(); @@ -46,23 +42,6 @@ void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& transfer->setDestination(props->getExchange()); else transfer->clearDestinationFlag(); - - // Copy expiration from x-property if present. - if (msg->hasProperties<MessageProperties>()) { - const MessageProperties* mprops = msg->getProperties<MessageProperties>(); - if (mprops->hasApplicationHeaders()) { - const FieldTable& headers = mprops->getApplicationHeaders(); - if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) { - msg->setExpiration( - sys::AbsTime(sys::EPOCH, headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION))); - msg->removeCustomProperty(UpdateClient::X_QPID_EXPIRATION); - // Erase props/headers that were added by the UpdateClient - if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS)) - msg->eraseProperties<MessageProperties>(); - else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS)) - msg->clearApplicationHeadersFlag(); - } - } - } } + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateReceiver.h b/cpp/src/qpid/cluster/UpdateReceiver.h index 81ee3a5ffe..7e8ce47662 100644 --- a/cpp/src/qpid/cluster/UpdateReceiver.h +++ b/cpp/src/qpid/cluster/UpdateReceiver.h @@ -39,20 +39,6 @@ class UpdateReceiver { /** Management-id for the next shadow connection */ std::string nextShadowMgmtId; - - /** Record the position of a DtxBuffer in the DtxManager (xid + index) - * and the association with a session, either suspended or current. - */ - struct DtxBufferRef { - std::string xid; - uint32_t index; // Index in WorkRecord in DtxManager - bool suspended; // Is this a suspended or current transaction? - broker::SemanticState* semanticState; // Associated session - DtxBufferRef(const std::string& x, uint32_t i, bool s, broker::SemanticState* ss) - : xid(x), index(i), suspended(s), semanticState(ss) {} - }; - typedef std::vector<DtxBufferRef> DtxBuffers; - DtxBuffers dtxBuffers; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h index bfb4fd5b9e..0795e5e77a 100644 --- a/cpp/src/qpid/cluster/types.h +++ b/cpp/src/qpid/cluster/types.h @@ -24,7 +24,6 @@ #include "config.h" #include "qpid/Url.h" -#include "qpid/RefCounted.h" #include "qpid/sys/IntegerTypes.h" #include <boost/intrusive_ptr.hpp> #include <utility> |