summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-04-28 12:25:59 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-04-28 12:25:59 +0000
commit9b7442210d74846fac84e5e86236f0f2fc21886c (patch)
tree6269e80bae30d0bf18f2ad72b8943f14f3bcaf6a /cpp/src
parent55c1e336b7ba8f30a9c673f59150eb75ff62505e (diff)
downloadqpid-python-9b7442210d74846fac84e5e86236f0f2fc21886c.tar.gz
QPID-3076: enable flow control for clustered broker configurations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1097432 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp8
-rw-r--r--cpp/src/qpid/broker/Queue.h5
-rw-r--r--cpp/src/qpid/broker/QueueFlowLimit.cpp135
-rw-r--r--cpp/src/qpid/broker/QueueFlowLimit.h11
-rw-r--r--cpp/src/qpid/broker/StatefulQueueObserver.h63
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp43
-rw-r--r--cpp/src/qpid/cluster/Connection.h1
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp23
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h3
-rw-r--r--cpp/src/tests/QueueFlowLimitTest.cpp2
-rwxr-xr-xcpp/src/tests/cluster_tests.py167
-rw-r--r--cpp/src/tests/queue_flow_limit_tests.py16
13 files changed, 385 insertions, 94 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 764da735e3..240766c443 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -248,13 +248,7 @@ Broker::Broker(const Broker::Options& conf) :
// Early-Initialize plugins
Plugin::earlyInitAll(*this);
- /** todo KAG - remove once cluster support for flow control done */
- if (isInCluster()) {
- QPID_LOG(info, "Producer Flow Control TBD for clustered brokers - queue flow control disabled by default.");
- QueueFlowLimit::setDefaults(0, 0, 0);
- } else {
- QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
- }
+ QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
// If no plugin store module registered itself, set up the null store.
if (NullMessageStore::isNullStore(store.get()))
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 73d52ec9ca..c4f1bcc07e 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -348,6 +348,11 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bindings.eachBinding(f);
}
+ /** Apply f to each Observer on the queue */
+ template <class F> void eachObserver(F f) {
+ std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
+ }
+
/** Set the position sequence number for the next message on the queue.
* Must be >= the current sequence number.
* Used by cluster to replicate queues.
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 3494288f7b..20679972ff 100644
--- a/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -92,7 +92,7 @@ namespace {
QueueFlowLimit::QueueFlowLimit(Queue *_queue,
uint32_t _flowStopCount, uint32_t _flowResumeCount,
uint64_t _flowStopSize, uint64_t _flowResumeSize)
- : queue(_queue), queueName("<unknown>"),
+ : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"),
flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0)
@@ -123,8 +123,6 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
void QueueFlowLimit::enqueued(const QueuedMessage& msg)
{
- if (!msg.payload) return;
-
sys::Mutex::ScopedLock l(indexLock);
++count;
@@ -152,7 +150,9 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg)
}
QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position);
msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes
- index.insert(msg.payload);
+ bool unique;
+ unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second;
+ assert(unique);
}
}
@@ -160,8 +160,6 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg)
void QueueFlowLimit::dequeued(const QueuedMessage& msg)
{
- if (!msg.payload) return;
-
sys::Mutex::ScopedLock l(indexLock);
if (count > 0) {
@@ -189,16 +187,16 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg)
if (!index.empty()) {
if (!flowStopped) {
// flow enabled - release all pending msgs
- while (!index.empty()) {
- std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin();
- (*itr)->getIngressCompletion().finishCompleter();
- index.erase(itr);
- }
+ for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+ itr != index.end(); ++itr)
+ if (itr->second)
+ itr->second->getIngressCompletion().finishCompleter();
+ index.clear();
} else {
// even if flow controlled, we must release this msg as it is being dequeued
- std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload);
+ std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.position);
if (itr != index.end()) { // this msg is flow controlled, release it:
- (*itr)->getIngressCompletion().finishCompleter();
+ msg.payload->getIngressCompletion().finishCompleter();
index.erase(itr);
}
}
@@ -206,34 +204,6 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg)
}
-/** used by clustering: is the given message's completion blocked due to flow
- * control? True if message is blocked. (for the clustering updater: done
- * after msgs have been replicated to the updatee).
- */
-bool QueueFlowLimit::getState(const QueuedMessage& msg) const
-{
- sys::Mutex::ScopedLock l(indexLock);
- return (index.find(msg.payload) != index.end());
-}
-
-
-/** artificially force the flow control state of a given message
- * (for the clustering updatee: done after msgs have been replicated to
- * the updatee's queue)
- */
-void QueueFlowLimit::setState(const QueuedMessage& msg, bool blocked)
-{
- if (blocked && msg.payload) {
-
- sys::Mutex::ScopedLock l(indexLock);
- assert(index.find(msg.payload) == index.end());
-
- QPID_LOG(debug, "Queue \"" << queue->getName() << "\": forcing flow control for msg pos=" << msg.position << " for CLUSTER SYNC");
- index.insert(msg.payload);
- }
-}
-
-
void QueueFlowLimit::encode(Buffer& buffer) const
{
buffer.putLong(flowStopCount);
@@ -281,7 +251,7 @@ void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint
defaultFlowStopRatio = flowStopRatio;
defaultFlowResumeRatio = flowResumeRatio;
- /** @todo Verify valid range on Broker::Options instead of here */
+ /** @todo KAG: Verify valid range on Broker::Options instead of here */
if (flowStopRatio > 100 || flowResumeRatio > 100)
throw InvalidArgumentException(QPID_MSG("Default queue flow ratios must be between 0 and 100, inclusive:"
<< " flowStopRatio=" << flowStopRatio
@@ -320,14 +290,6 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::F
if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control
return 0;
}
- /** @todo KAG - remove once cluster support for flow control done. */
- // TODO aconway 2011-02-16: is queue==0 only in tests?
- // TODO kgiusti 2011-02-19: yes! The unit tests test this class in isolation */
- if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
- QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
- << queue->getName());
- return 0;
- }
return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
}
@@ -335,17 +297,76 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::F
uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize);
uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
+ return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize);
+ }
+ return 0;
+}
- /** todo KAG - remove once cluster support for flow control done. */
- if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
- QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
- << queue->getName());
- return 0;
+/* Cluster replication */
+
+namespace {
+ /** pack a set of sequence number ranges into a framing::Array */
+ void buildSeqRangeArray(qpid::framing::Array *seqs,
+ const qpid::framing::SequenceNumber first,
+ const qpid::framing::SequenceNumber last)
+ {
+ seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(first)));
+ seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(last)));
+ }
+}
+
+/** Runs on UPDATER to snapshot current state */
+void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const
+{
+ sys::Mutex::ScopedLock l(indexLock);
+ state.clear();
+
+ framing::SequenceSet ss;
+ if (!index.empty()) {
+ /* replicate the set of messages pending flow control */
+ for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::const_iterator itr = index.begin();
+ itr != index.end(); ++itr) {
+ ss.add(itr->first);
}
+ framing::Array seqs(TYPE_CODE_UINT32);
+ ss.for_each(boost::bind(&buildSeqRangeArray, &seqs, _1, _2));
+ state.setArray("pendingMsgSeqs", seqs);
+ }
+ QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicating pending msgs, range=" << ss);
+}
- return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize);
+
+/** called on UPDATEE to set state from snapshot */
+void QueueFlowLimit::setState(const qpid::framing::FieldTable& state)
+{
+ sys::Mutex::ScopedLock l(indexLock);
+ index.clear();
+
+ framing::SequenceSet fcmsg;
+ framing::Array seqArray(TYPE_CODE_UINT32);
+ if (state.getArray("pendingMsgSeqs", seqArray)) {
+ assert((seqArray.count() & 0x01) == 0); // must be even since they are sequence ranges
+ framing::Array::const_iterator i = seqArray.begin();
+ while (i != seqArray.end()) {
+ framing::SequenceNumber first((*i)->getIntegerValue<uint32_t, 4>());
+ ++i;
+ framing::SequenceNumber last((*i)->getIntegerValue<uint32_t, 4>());
+ ++i;
+ fcmsg.add(first, last);
+ for (SequenceNumber seq = first; seq <= last; ++seq) {
+ QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked
+ bool unique;
+ unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
+ assert(unique);
+ }
+ }
}
- return 0;
+
+ flowStopped = index.size() != 0;
+ if (queueMgmtObj) {
+ queueMgmtObj->set_flowStopped(isFlowControlActive());
+ }
+ QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicated the pending msgs, range=" << fcmsg)
}
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.h b/cpp/src/qpid/broker/QueueFlowLimit.h
index 69d91df45a..5fdae39c29 100644
--- a/cpp/src/qpid/broker/QueueFlowLimit.h
+++ b/cpp/src/qpid/broker/QueueFlowLimit.h
@@ -27,7 +27,7 @@
#include <memory>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/QueuedMessage.h"
-#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Mutex.h"
@@ -53,7 +53,7 @@ class Broker;
* passing _either_ level may turn flow control ON, but _both_ must be
* below level before flow control will be turned OFF.
*/
- class QueueFlowLimit : public QueueObserver
+ class QueueFlowLimit : public StatefulQueueObserver
{
static uint64_t defaultMaxSize;
static uint defaultFlowStopRatio;
@@ -86,9 +86,8 @@ class Broker;
QPID_BROKER_EXTERN void dequeued(const QueuedMessage&);
/** for clustering: */
- /** true if the given message is flow controlled, and cannot be completed. */
- bool getState(const QueuedMessage&) const;
- void setState(const QueuedMessage&, bool blocked);
+ QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const;
+ QPID_BROKER_EXTERN void setState(const qpid::framing::FieldTable&);
uint32_t getFlowStopCount() const { return flowStopCount; }
uint32_t getFlowResumeCount() const { return flowResumeCount; }
@@ -111,7 +110,7 @@ class Broker;
protected:
// msgs waiting for flow to become available.
- std::set< boost::intrusive_ptr<Message> > index;
+ std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> > index;
mutable qpid::sys::Mutex indexLock;
_qmfBroker::Queue *queueMgmtObj;
diff --git a/cpp/src/qpid/broker/StatefulQueueObserver.h b/cpp/src/qpid/broker/StatefulQueueObserver.h
new file mode 100644
index 0000000000..c682d460b7
--- /dev/null
+++ b/cpp/src/qpid/broker/StatefulQueueObserver.h
@@ -0,0 +1,63 @@
+#ifndef QPID_BROKER_STATEFULQUEUEOBSERVER_H
+#define QPID_BROKER_STATEFULQUEUEOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/framing/FieldTable.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Specialized type of QueueObserver that maintains internal state that has to
+ * be replicated across clustered brokers.
+ */
+class StatefulQueueObserver : public QueueObserver
+{
+ public:
+ StatefulQueueObserver(std::string _id) : id(_id) {}
+ virtual ~StatefulQueueObserver() {}
+
+ /** This identifier must uniquely identify this particular observer amoung
+ * all observers on a queue. For cluster replication, this id will be used
+ * to identify the peer queue observer for synchronization across
+ * brokers.
+ */
+ const std::string& getId() const { return id; }
+
+ /** This method should return the observer's internal state as an opaque
+ * map.
+ */
+ virtual void getState(qpid::framing::FieldTable& state ) const = 0;
+
+ /** The input map represents the internal state of the peer observer that
+ * this observer should synchonize to.
+ */
+ virtual void setState(const qpid::framing::FieldTable&) = 0;
+
+
+ private:
+ std::string id;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_STATEFULQUEUEOBSERVER_H*/
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 30d6a6d13f..0daf0c7f5a 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -198,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1058747;
+const uint32_t Cluster::CLUSTER_VERSION = 1097431;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index f2ea466a9b..b9895290e9 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -35,6 +35,7 @@
#include "qpid/broker/Fairshare.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Bridge.h"
+#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
@@ -558,6 +559,48 @@ void Connection::queueFairshareState(const std::string& qname, const uint8_t pri
}
}
+
+namespace {
+ // find a StatefulQueueObserver that matches a given identifier
+ class ObserverFinder {
+ const std::string id;
+ boost::shared_ptr<broker::QueueObserver> target;
+ ObserverFinder(const ObserverFinder&) {}
+ public:
+ ObserverFinder(const std::string& _id) : id(_id) {}
+ broker::StatefulQueueObserver *getObserver()
+ {
+ if (target)
+ return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
+ return 0;
+ }
+ void operator() (boost::shared_ptr<broker::QueueObserver> o)
+ {
+ if (!target) {
+ broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+ if (p && p->getId() == id) {
+ target = o;
+ }
+ }
+ }
+ };
+}
+
+
+void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state)
+{
+ boost::shared_ptr<broker::Queue> queue(findQueue(qname));
+ ObserverFinder finder(observerId); // find this observer
+ queue->eachObserver<ObserverFinder &>(finder);
+ broker::StatefulQueueObserver *so = finder.getObserver();
+ if (so) {
+ so->setState( state );
+ QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ...");
+ return;
+ }
+ QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies.");
+}
+
void Connection::expiryId(uint64_t id) {
cluster.getExpiryPolicy().setId(id);
}
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index a4436e84a8..04ace724da 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -153,6 +153,7 @@ class Connection :
void queuePosition(const std::string&, const framing::SequenceNumber&);
void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
+ void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&);
void expiryId(uint64_t);
void txStart();
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 8f751add9b..a15c14ff48 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -49,6 +49,7 @@
#include "qpid/broker/TxPublish.h"
#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/broker/RecoveredEnqueue.h"
+#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/ClusterConnectionMembershipBody.h"
#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
@@ -167,6 +168,9 @@ void UpdateClient::update() {
boost::bind(&UpdateClient::updateConnection, this, _1));
session.queueDelete(arg::queue=UPDATE);
+ // some Queue Observers need session state & msgs synced first, so sync observers now
+ b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1));
+
// Update queue listeners: must come after sessions so consumerNumbering is populated
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
@@ -615,4 +619,23 @@ void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge)
ClusterConnectionProxy(session).config(encode(*bridge));
}
+void UpdateClient::updateQueueObservers(const boost::shared_ptr<broker::Queue>& q)
+{
+ q->eachObserver(boost::bind(&UpdateClient::updateObserver, this, q, _1));
+}
+
+void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q,
+ boost::shared_ptr<broker::QueueObserver> o)
+{
+ qpid::framing::FieldTable state;
+ broker::StatefulQueueObserver *so = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+ if (so) {
+ so->getState( state );
+ std::string id(so->getId());
+ QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s observer " << id);
+ ClusterConnectionProxy(session).queueObserverState( q->getName(), id, state );
+ }
+}
+
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index 7520bb82cb..bbf7a948bc 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -51,6 +51,7 @@ class SemanticState;
class Decoder;
class Link;
class Bridge;
+class QueueObserver;
} // namespace broker
@@ -104,6 +105,8 @@ class UpdateClient : public sys::Runnable {
void updateLinks();
void updateLink(const boost::shared_ptr<broker::Link>&);
void updateBridge(const boost::shared_ptr<broker::Bridge>&);
+ void updateQueueObservers(const boost::shared_ptr<broker::Queue>&);
+ void updateObserver(const boost::shared_ptr<broker::Queue>&, boost::shared_ptr<broker::QueueObserver>);
Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering;
diff --git a/cpp/src/tests/QueueFlowLimitTest.cpp b/cpp/src/tests/QueueFlowLimitTest.cpp
index 70184095cd..8a6923fb09 100644
--- a/cpp/src/tests/QueueFlowLimitTest.cpp
+++ b/cpp/src/tests/QueueFlowLimitTest.cpp
@@ -75,8 +75,10 @@ public:
QueuedMessage createMessage(uint32_t size)
{
+ static uint32_t seqNum;
QueuedMessage msg;
msg.payload = MessageUtils::createMessage();
+ msg.position = ++seqNum;
MessageUtils::addContent(msg.payload, std::string (size, 'x'));
return msg;
}
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 12f7a2ca9a..26298393ff 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -327,7 +327,7 @@ acl allow all all
Thread.__init__(self)
def run(self):
try:
- self.sender.send(self.msg)
+ self.sender.send(self.msg, sync=True)
self.condition.acquire()
try:
self.blocked = False
@@ -359,11 +359,12 @@ acl allow all all
ssn0 = brokers.first().connect().session()
s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
brokers.first().startQmf()
- q = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
- oid = q.getObjectId()
- self.assertEqual(q.name, "flq")
- self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
- assert not q.flowStopped
+ q1 = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ oid = q1.getObjectId()
+ self.assertEqual(q1.name, "flq")
+ self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert not q1.flowStopped
+ self.assertEqual(q1.flowStoppedCount, 0)
# fill the queue on one broker until flow control is active
for x in range(5): s0.send(Message(str(x)))
@@ -371,18 +372,20 @@ acl allow all all
sender.start() # Tests that sender does block
# Verify the broker queue goes into a flowStopped state
deadline = time.time() + 1
- while not q.flowStopped and time.time() < deadline: q.update()
- assert q.flowStopped
+ while not q1.flowStopped and time.time() < deadline: q1.update()
+ assert q1.flowStopped
+ self.assertEqual(q1.flowStoppedCount, 1)
sender.assert_blocked() # Still blocked
# Now verify the both brokers in cluster have same configuration
brokers.second().startQmf()
qs = brokers.second().qmf_session.getObjects(_objectId=oid)
self.assertEqual(len(qs), 1)
- q = qs[0]
- self.assertEqual(q.name, "flq")
- self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
- assert q.flowStopped
+ q2 = qs[0]
+ self.assertEqual(q2.name, "flq")
+ self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert q2.flowStopped
+ self.assertEqual(q2.flowStoppedCount, 1)
# now drain the queue using a session to the other broker
ssn1 = brokers.second().connect().session()
@@ -392,6 +395,12 @@ acl allow all all
ssn1.acknowledge()
sender.wait() # Verify no longer blocked.
+ # and re-verify state of queue on both brokers
+ q1.update()
+ assert not q1.flowStopped
+ q2.update()
+ assert not q2.flowStopped
+
ssn0.connection.close()
ssn1.connection.close()
cluster_test_logs.verify_logs()
@@ -405,7 +414,6 @@ acl allow all all
self.queue_flowlimit_test(Brokers())
def test_queue_flowlimit_cluster(self):
- return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935
cluster = self.cluster(2)
class Brokers:
def first(self): return cluster[0]
@@ -413,7 +421,6 @@ acl allow all all
self.queue_flowlimit_test(Brokers())
def test_queue_flowlimit_cluster_join(self):
- return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935
cluster = self.cluster(1)
class Brokers:
def first(self): return cluster[0]
@@ -422,6 +429,103 @@ acl allow all all
return cluster[1]
self.queue_flowlimit_test(Brokers())
+ def test_queue_flowlimit_replicate(self):
+ """ Verify that a queue which is in flow control BUT has drained BELOW
+ the flow control 'stop' threshold, is correctly replicated when a new
+ broker is added to the cluster.
+ """
+
+ class AsyncSender(Thread):
+ """Send a fixed number of msgs from a sender in a separate thread
+ so it may block without blocking the test.
+ """
+ def __init__(self, broker, address, count=1, size=4):
+ Thread.__init__(self)
+ self.daemon = True
+ self.broker = broker
+ self.queue = address
+ self.count = count
+ self.size = size
+ self.done = False
+
+ def run(self):
+ self.sender = subprocess.Popen(["qpid-send",
+ "--capacity=1",
+ "--content-size=%s" % self.size,
+ "--messages=%s" % self.count,
+ "--failover-updates",
+ "--connection-options={reconnect:true}",
+ "--address=%s" % self.queue,
+ "--broker=%s" % self.broker.host_port()])
+ self.sender.wait()
+ self.done = True
+
+ cluster = self.cluster(2)
+ # create a queue with rather draconian flow control settings
+ ssn0 = cluster[0].connect().session()
+ s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':100, 'qpid.flow_resume_count':20}}}}")
+
+ # fire off the sending thread to broker[0], and wait until the queue
+ # hits flow control on broker[1]
+ sender = AsyncSender(cluster[0], "flq", count=110);
+ sender.start();
+
+ cluster[1].startQmf()
+ q_obj = [q for q in cluster[1].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ deadline = time.time() + 10
+ while not q_obj.flowStopped and time.time() < deadline:
+ q_obj.update()
+ assert q_obj.flowStopped
+ assert not sender.done
+ assert q_obj.msgDepth < 110
+
+ # Now drain enough messages on broker[1] to drop below the flow stop
+ # threshold, but not relieve flow control...
+ receiver = subprocess.Popen(["qpid-receive",
+ "--messages=15",
+ "--timeout=1",
+ "--print-content=no",
+ "--failover-updates",
+ "--connection-options={reconnect:true}",
+ "--ack-frequency=1",
+ "--address=flq",
+ "--broker=%s" % cluster[1].host_port()])
+ receiver.wait()
+ q_obj.update()
+ assert q_obj.flowStopped
+ assert not sender.done
+ current_depth = q_obj.msgDepth
+
+ # add a new broker to the cluster, and verify that the queue is in flow
+ # control on that broker
+ cluster.start()
+ cluster[2].startQmf()
+ q_obj = [q for q in cluster[2].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ assert q_obj.flowStopped
+ assert q_obj.msgDepth == current_depth
+
+ # now drain the queue on broker[2], and verify that the sender becomes
+ # unblocked
+ receiver = subprocess.Popen(["qpid-receive",
+ "--messages=95",
+ "--timeout=1",
+ "--print-content=no",
+ "--failover-updates",
+ "--connection-options={reconnect:true}",
+ "--ack-frequency=1",
+ "--address=flq",
+ "--broker=%s" % cluster[2].host_port()])
+ receiver.wait()
+ q_obj.update()
+ assert not q_obj.flowStopped
+ assert q_obj.msgDepth == 0
+
+ # verify that the sender has become unblocked
+ sender.join(timeout=5)
+ assert not sender.isAlive()
+ assert sender.done
+
+
def test_alternate_exchange_update(self):
"""Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """
cluster = self.cluster(1)
@@ -688,6 +792,41 @@ class LongTests(BrokerTest):
for i in xrange(1000): cluster[0].connect().close()
cluster_test_logs.verify_logs()
+ def test_flowlimit_failover(self):
+ """Test fail-over during continuous send-receive with flow control
+ active.
+ """
+
+ # Original cluster will all be killed so expect exit with failure
+ cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
+ #for b in cluster: ErrorGenerator(b)
+
+ # create a queue with rather draconian flow control settings
+ ssn0 = cluster[0].connect().session()
+ s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}")
+
+ receiver = NumberedReceiver(cluster[2])
+ receiver.start()
+ senders = [NumberedSender(cluster[i]) for i in range(1,3)]
+ for s in senders:
+ s.start()
+
+ # Kill original brokers, start new ones for the duration.
+ endtime = time.time() + self.duration();
+ i = 0
+ while time.time() < endtime:
+ cluster[i].kill()
+ i += 1
+ b = cluster.start(expect=EXPECT_EXIT_FAIL)
+ #ErrorGenerator(b)
+ time.sleep(5)
+ #b = cluster[0]
+ #b.startQmf()
+ for s in senders:
+ s.stop()
+ receiver.stop()
+ for i in range(i, len(cluster)): cluster[i].kill()
+
class StoreTests(BrokerTest):
"""
diff --git a/cpp/src/tests/queue_flow_limit_tests.py b/cpp/src/tests/queue_flow_limit_tests.py
index bdd2a21b78..6639cdc393 100644
--- a/cpp/src/tests/queue_flow_limit_tests.py
+++ b/cpp/src/tests/queue_flow_limit_tests.py
@@ -24,7 +24,7 @@ from qpid import datatypes, messaging
from qpid.messaging import Message, Empty
from threading import Thread, Lock
from logging import getLogger
-from time import sleep
+from time import sleep, time
from os import environ, popen
class QueueFlowLimitTests(TestBase010):
@@ -145,11 +145,10 @@ class QueueFlowLimitTests(TestBase010):
totalMsgs = 1213 + 797 + 331
# wait until flow control is active
- count = 0
+ deadline = time() + 10
while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \
- count < 10:
- sleep(1);
- count += 1;
+ time() < deadline:
+ pass
self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
self.assertGreater(depth, 373)
@@ -200,11 +199,10 @@ class QueueFlowLimitTests(TestBase010):
totalBytes = 439 + 631 + 823
# wait until flow control is active
- count = 0
+ deadline = time() + 10
while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \
- count < 10:
- sleep(1);
- count += 1;
+ time() < deadline:
+ pass
self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
self.assertGreater(self.qmf.getObjects(_objectId=oid)[0].byteDepth, 351133)