diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-04-28 12:25:59 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-04-28 12:25:59 +0000 |
commit | 5f29278bbdc32b348847f866769dfb375761423c (patch) | |
tree | 384f45e9e5ce9a73eb295413cc0000f07139faa8 /qpid/cpp/src/qpid | |
parent | 70973bf809ce4e94d2486e082bc2ab47290ee88e (diff) | |
download | qpid-python-5f29278bbdc32b348847f866769dfb375761423c.tar.gz |
QPID-3076: enable flow control for clustered broker configurations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1097432 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 135 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/StatefulQueueObserver.h | 63 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 43 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 23 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.h | 3 |
10 files changed, 223 insertions, 71 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 764da735e3..240766c443 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -248,13 +248,7 @@ Broker::Broker(const Broker::Options& conf) : // Early-Initialize plugins Plugin::earlyInitAll(*this); - /** todo KAG - remove once cluster support for flow control done */ - if (isInCluster()) { - QPID_LOG(info, "Producer Flow Control TBD for clustered brokers - queue flow control disabled by default."); - QueueFlowLimit::setDefaults(0, 0, 0); - } else { - QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio); - } + QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio); // If no plugin store module registered itself, set up the null store. if (NullMessageStore::isNullStore(store.get())) diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 73d52ec9ca..c4f1bcc07e 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -348,6 +348,11 @@ class Queue : public boost::enable_shared_from_this<Queue>, bindings.eachBinding(f); } + /** Apply f to each Observer on the queue */ + template <class F> void eachObserver(F f) { + std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f); + } + /** Set the position sequence number for the next message on the queue. * Must be >= the current sequence number. * Used by cluster to replicate queues. diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index 3494288f7b..20679972ff 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -92,7 +92,7 @@ namespace { QueueFlowLimit::QueueFlowLimit(Queue *_queue, uint32_t _flowStopCount, uint32_t _flowResumeCount, uint64_t _flowStopSize, uint64_t _flowResumeSize) - : queue(_queue), queueName("<unknown>"), + : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0) @@ -123,8 +123,6 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, void QueueFlowLimit::enqueued(const QueuedMessage& msg) { - if (!msg.payload) return; - sys::Mutex::ScopedLock l(indexLock); ++count; @@ -152,7 +150,9 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) } QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position); msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes - index.insert(msg.payload); + bool unique; + unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second; + assert(unique); } } @@ -160,8 +160,6 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) void QueueFlowLimit::dequeued(const QueuedMessage& msg) { - if (!msg.payload) return; - sys::Mutex::ScopedLock l(indexLock); if (count > 0) { @@ -189,16 +187,16 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg) if (!index.empty()) { if (!flowStopped) { // flow enabled - release all pending msgs - while (!index.empty()) { - std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin(); - (*itr)->getIngressCompletion().finishCompleter(); - index.erase(itr); - } + for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin(); + itr != index.end(); ++itr) + if (itr->second) + itr->second->getIngressCompletion().finishCompleter(); + index.clear(); } else { // even if flow controlled, we must release this msg as it is being dequeued - std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload); + std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.position); if (itr != index.end()) { // this msg is flow controlled, release it: - (*itr)->getIngressCompletion().finishCompleter(); + msg.payload->getIngressCompletion().finishCompleter(); index.erase(itr); } } @@ -206,34 +204,6 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg) } -/** used by clustering: is the given message's completion blocked due to flow - * control? True if message is blocked. (for the clustering updater: done - * after msgs have been replicated to the updatee). - */ -bool QueueFlowLimit::getState(const QueuedMessage& msg) const -{ - sys::Mutex::ScopedLock l(indexLock); - return (index.find(msg.payload) != index.end()); -} - - -/** artificially force the flow control state of a given message - * (for the clustering updatee: done after msgs have been replicated to - * the updatee's queue) - */ -void QueueFlowLimit::setState(const QueuedMessage& msg, bool blocked) -{ - if (blocked && msg.payload) { - - sys::Mutex::ScopedLock l(indexLock); - assert(index.find(msg.payload) == index.end()); - - QPID_LOG(debug, "Queue \"" << queue->getName() << "\": forcing flow control for msg pos=" << msg.position << " for CLUSTER SYNC"); - index.insert(msg.payload); - } -} - - void QueueFlowLimit::encode(Buffer& buffer) const { buffer.putLong(flowStopCount); @@ -281,7 +251,7 @@ void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint defaultFlowStopRatio = flowStopRatio; defaultFlowResumeRatio = flowResumeRatio; - /** @todo Verify valid range on Broker::Options instead of here */ + /** @todo KAG: Verify valid range on Broker::Options instead of here */ if (flowStopRatio > 100 || flowResumeRatio > 100) throw InvalidArgumentException(QPID_MSG("Default queue flow ratios must be between 0 and 100, inclusive:" << " flowStopRatio=" << flowStopRatio @@ -320,14 +290,6 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::F if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control return 0; } - /** @todo KAG - remove once cluster support for flow control done. */ - // TODO aconway 2011-02-16: is queue==0 only in tests? - // TODO kgiusti 2011-02-19: yes! The unit tests test this class in isolation */ - if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) { - QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue " - << queue->getName()); - return 0; - } return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); } @@ -335,17 +297,76 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::F uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize); uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5); uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0)); + return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize); + } + return 0; +} - /** todo KAG - remove once cluster support for flow control done. */ - if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) { - QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue " - << queue->getName()); - return 0; +/* Cluster replication */ + +namespace { + /** pack a set of sequence number ranges into a framing::Array */ + void buildSeqRangeArray(qpid::framing::Array *seqs, + const qpid::framing::SequenceNumber first, + const qpid::framing::SequenceNumber last) + { + seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(first))); + seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(last))); + } +} + +/** Runs on UPDATER to snapshot current state */ +void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const +{ + sys::Mutex::ScopedLock l(indexLock); + state.clear(); + + framing::SequenceSet ss; + if (!index.empty()) { + /* replicate the set of messages pending flow control */ + for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::const_iterator itr = index.begin(); + itr != index.end(); ++itr) { + ss.add(itr->first); } + framing::Array seqs(TYPE_CODE_UINT32); + ss.for_each(boost::bind(&buildSeqRangeArray, &seqs, _1, _2)); + state.setArray("pendingMsgSeqs", seqs); + } + QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicating pending msgs, range=" << ss); +} - return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize); + +/** called on UPDATEE to set state from snapshot */ +void QueueFlowLimit::setState(const qpid::framing::FieldTable& state) +{ + sys::Mutex::ScopedLock l(indexLock); + index.clear(); + + framing::SequenceSet fcmsg; + framing::Array seqArray(TYPE_CODE_UINT32); + if (state.getArray("pendingMsgSeqs", seqArray)) { + assert((seqArray.count() & 0x01) == 0); // must be even since they are sequence ranges + framing::Array::const_iterator i = seqArray.begin(); + while (i != seqArray.end()) { + framing::SequenceNumber first((*i)->getIntegerValue<uint32_t, 4>()); + ++i; + framing::SequenceNumber last((*i)->getIntegerValue<uint32_t, 4>()); + ++i; + fcmsg.add(first, last); + for (SequenceNumber seq = first; seq <= last; ++seq) { + QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked + bool unique; + unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second; + assert(unique); + } + } } - return 0; + + flowStopped = index.size() != 0; + if (queueMgmtObj) { + queueMgmtObj->set_flowStopped(isFlowControlActive()); + } + QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicated the pending msgs, range=" << fcmsg) } diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h index 69d91df45a..5fdae39c29 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h @@ -27,7 +27,7 @@ #include <memory> #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/QueuedMessage.h" -#include "qpid/broker/QueueObserver.h" +#include "qpid/broker/StatefulQueueObserver.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Mutex.h" @@ -53,7 +53,7 @@ class Broker; * passing _either_ level may turn flow control ON, but _both_ must be * below level before flow control will be turned OFF. */ - class QueueFlowLimit : public QueueObserver + class QueueFlowLimit : public StatefulQueueObserver { static uint64_t defaultMaxSize; static uint defaultFlowStopRatio; @@ -86,9 +86,8 @@ class Broker; QPID_BROKER_EXTERN void dequeued(const QueuedMessage&); /** for clustering: */ - /** true if the given message is flow controlled, and cannot be completed. */ - bool getState(const QueuedMessage&) const; - void setState(const QueuedMessage&, bool blocked); + QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const; + QPID_BROKER_EXTERN void setState(const qpid::framing::FieldTable&); uint32_t getFlowStopCount() const { return flowStopCount; } uint32_t getFlowResumeCount() const { return flowResumeCount; } @@ -111,7 +110,7 @@ class Broker; protected: // msgs waiting for flow to become available. - std::set< boost::intrusive_ptr<Message> > index; + std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> > index; mutable qpid::sys::Mutex indexLock; _qmfBroker::Queue *queueMgmtObj; diff --git a/qpid/cpp/src/qpid/broker/StatefulQueueObserver.h b/qpid/cpp/src/qpid/broker/StatefulQueueObserver.h new file mode 100644 index 0000000000..c682d460b7 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/StatefulQueueObserver.h @@ -0,0 +1,63 @@ +#ifndef QPID_BROKER_STATEFULQUEUEOBSERVER_H +#define QPID_BROKER_STATEFULQUEUEOBSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/QueueObserver.h" +#include "qpid/framing/FieldTable.h" + +namespace qpid { +namespace broker { + +/** + * Specialized type of QueueObserver that maintains internal state that has to + * be replicated across clustered brokers. + */ +class StatefulQueueObserver : public QueueObserver +{ + public: + StatefulQueueObserver(std::string _id) : id(_id) {} + virtual ~StatefulQueueObserver() {} + + /** This identifier must uniquely identify this particular observer amoung + * all observers on a queue. For cluster replication, this id will be used + * to identify the peer queue observer for synchronization across + * brokers. + */ + const std::string& getId() const { return id; } + + /** This method should return the observer's internal state as an opaque + * map. + */ + virtual void getState(qpid::framing::FieldTable& state ) const = 0; + + /** The input map represents the internal state of the peer observer that + * this observer should synchonize to. + */ + virtual void setState(const qpid::framing::FieldTable&) = 0; + + + private: + std::string id; +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_STATEFULQUEUEOBSERVER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 30d6a6d13f..0daf0c7f5a 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -198,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1058747; +const uint32_t Cluster::CLUSTER_VERSION = 1097431; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index f2ea466a9b..b9895290e9 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -35,6 +35,7 @@ #include "qpid/broker/Fairshare.h" #include "qpid/broker/Link.h" #include "qpid/broker/Bridge.h" +#include "qpid/broker/StatefulQueueObserver.h" #include "qpid/broker/Queue.h" #include "qpid/framing/enum.h" #include "qpid/framing/AMQFrame.h" @@ -558,6 +559,48 @@ void Connection::queueFairshareState(const std::string& qname, const uint8_t pri } } + +namespace { + // find a StatefulQueueObserver that matches a given identifier + class ObserverFinder { + const std::string id; + boost::shared_ptr<broker::QueueObserver> target; + ObserverFinder(const ObserverFinder&) {} + public: + ObserverFinder(const std::string& _id) : id(_id) {} + broker::StatefulQueueObserver *getObserver() + { + if (target) + return dynamic_cast<broker::StatefulQueueObserver *>(target.get()); + return 0; + } + void operator() (boost::shared_ptr<broker::QueueObserver> o) + { + if (!target) { + broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); + if (p && p->getId() == id) { + target = o; + } + } + } + }; +} + + +void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state) +{ + boost::shared_ptr<broker::Queue> queue(findQueue(qname)); + ObserverFinder finder(observerId); // find this observer + queue->eachObserver<ObserverFinder &>(finder); + broker::StatefulQueueObserver *so = finder.getObserver(); + if (so) { + so->setState( state ); + QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ..."); + return; + } + QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies."); +} + void Connection::expiryId(uint64_t id) { cluster.getExpiryPolicy().setId(id); } diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index a4436e84a8..04ace724da 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -153,6 +153,7 @@ class Connection : void queuePosition(const std::string&, const framing::SequenceNumber&); void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count); + void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&); void expiryId(uint64_t); void txStart(); diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 8f751add9b..a15c14ff48 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -49,6 +49,7 @@ #include "qpid/broker/TxPublish.h" #include "qpid/broker/RecoveredDequeue.h" #include "qpid/broker/RecoveredEnqueue.h" +#include "qpid/broker/StatefulQueueObserver.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/ClusterConnectionMembershipBody.h" #include "qpid/framing/ClusterConnectionShadowReadyBody.h" @@ -167,6 +168,9 @@ void UpdateClient::update() { boost::bind(&UpdateClient::updateConnection, this, _1)); session.queueDelete(arg::queue=UPDATE); + // some Queue Observers need session state & msgs synced first, so sync observers now + b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1)); + // Update queue listeners: must come after sessions so consumerNumbering is populated b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); @@ -615,4 +619,23 @@ void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) ClusterConnectionProxy(session).config(encode(*bridge)); } +void UpdateClient::updateQueueObservers(const boost::shared_ptr<broker::Queue>& q) +{ + q->eachObserver(boost::bind(&UpdateClient::updateObserver, this, q, _1)); +} + +void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q, + boost::shared_ptr<broker::QueueObserver> o) +{ + qpid::framing::FieldTable state; + broker::StatefulQueueObserver *so = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); + if (so) { + so->getState( state ); + std::string id(so->getId()); + QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s observer " << id); + ClusterConnectionProxy(session).queueObserverState( q->getName(), id, state ); + } +} + + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h index 7520bb82cb..bbf7a948bc 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.h +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h @@ -51,6 +51,7 @@ class SemanticState; class Decoder; class Link; class Bridge; +class QueueObserver; } // namespace broker @@ -104,6 +105,8 @@ class UpdateClient : public sys::Runnable { void updateLinks(); void updateLink(const boost::shared_ptr<broker::Link>&); void updateBridge(const boost::shared_ptr<broker::Bridge>&); + void updateQueueObservers(const boost::shared_ptr<broker::Queue>&); + void updateObserver(const boost::shared_ptr<broker::Queue>&, boost::shared_ptr<broker::QueueObserver>); Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering; |