summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-02-16 18:58:06 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-02-16 18:58:06 +0000
commitace71e8ab4846cab5258481da7553439a73ed242 (patch)
treef7dc599c7077b9d7e3e2f874118defd0a4c0189f
parentfd96d1ca556ffdcec5fa78bb155130786ed06612 (diff)
downloadqpid-python-ace71e8ab4846cab5258481da7553439a73ed242.tar.gz
QPID-2935: move completion from msg to session cmd context.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1071360 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/AsyncCompletion.h265
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.h17
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h2
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp64
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.h11
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp141
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h84
-rw-r--r--qpid/cpp/src/tests/MessageUtils.h14
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp2
-rw-r--r--qpid/cpp/src/tests/TxPublishTest.cpp4
-rw-r--r--qpid/cpp/src/tests/brokertest.py5
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py174
14 files changed, 560 insertions, 241 deletions
diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h
index c66609f8a6..1f3d11e0ee 100644
--- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h
+++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h
@@ -1,5 +1,5 @@
-#ifndef _Completion_
-#define _Completion_
+#ifndef _AsyncCompletion_
+#define _AsyncCompletion_
/*
*
@@ -26,148 +26,145 @@
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Monitor.h"
-#include <boost/function.hpp>
-#include <boost/shared_ptr.hpp>
namespace qpid {
- namespace broker {
-
- /**
- * Class to implement asynchronous notification of completion.
- *
- * Use-case: An "initiator" needs to wait for a set of "completers" to
- * finish a unit of work before an action can occur. This object
- * tracks the progress of the set of completers, and allows the action
- * to occur once all completers have signalled that they are done.
- *
- * The initiator and completers may be running in separate threads.
- *
- * The initiating thread is the thread that initiates the action,
- * i.e. the connection read thread.
- *
- * A completing thread is any thread that contributes to completion,
- * e.g. a store thread that does an async write.
- * There may be zero or more completers.
- *
- * When the work is complete, a callback is invoked. The callback
- * may be invoked in the Initiator thread, or one of the Completer
- * threads. The callback is passed a flag indicating whether or not
- * the callback is running under the context of the Initiator thread.
- *
- * Use model:
- * 1) Initiator thread invokes begin()
- * 2) After begin() has been invoked, zero or more Completers invoke
- * startCompleter(). Completers may be running in the same or
- * different thread as the Initiator, as long as they guarantee that
- * startCompleter() is invoked at least once before the Initiator invokes end().
- * 3) Completers may invoke finishCompleter() at any time, even after the
- * initiator has invoked end(). finishCompleter() may be called from any
- * thread.
- * 4) startCompleter()/finishCompleter() calls "nest": for each call to
- * startCompleter(), a corresponding call to finishCompleter() must be made.
- * Once the last finishCompleter() is called, the Completer must no longer
- * reference the completion object.
- * 5) The Initiator invokes end() at the point where it has finished
- * dispatching work to the Completers, and is prepared for the callback
- * handler to be invoked. Note: if there are no outstanding Completers
- * pending when the Initiator invokes end(), the callback will be invoked
- * directly, and the sync parameter will be set true. This indicates to the
- * Initiator that the callback is executing in the context of the end() call,
- * and the Initiator is free to optimize the handling of the completion,
- * assuming no need for synchronization with Completer threads.
- */
- class AsyncCompletion {
- public:
- // encapsulates the completion callback handler
- class CompletionHandler {
- public:
- virtual void operator() (bool) { /* bool == true if called via end() */}
- };
-
- private:
- mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded;
- mutable qpid::sys::Monitor callbackLock;
- bool inCallback;
- void invokeCallback(bool sync) {
- qpid::sys::Mutex::ScopedLock l(callbackLock);
- inCallback = true;
- if (handler) {
- boost::shared_ptr<CompletionHandler> tmp;
- tmp.swap(handler);
- {
- qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
- (*tmp)(sync);
- }
- }
- inCallback = false;
- callbackLock.notifyAll();
- }
+namespace broker {
- protected:
- /** Invoked when all completers have signalled that they have completed
- * (via calls to finishCompleter()).
- */
- boost::shared_ptr<CompletionHandler> handler;
+/**
+ * Class to implement asynchronous notification of completion.
+ *
+ * Use-case: An "initiator" needs to wait for a set of "completers" to
+ * finish a unit of work before an action can occur. This object
+ * tracks the progress of the set of completers, and allows the action
+ * to occur once all completers have signalled that they are done.
+ *
+ * The initiator and completers may be running in separate threads.
+ *
+ * The initiating thread is the thread that initiates the action,
+ * i.e. the connection read thread.
+ *
+ * A completing thread is any thread that contributes to completion,
+ * e.g. a store thread that does an async write.
+ * There may be zero or more completers.
+ *
+ * When the work is complete, a callback is invoked. The callback
+ * may be invoked in the Initiator thread, or one of the Completer
+ * threads. The callback is passed a flag indicating whether or not
+ * the callback is running under the context of the Initiator thread.
+ *
+ * Use model:
+ * 1) Initiator thread invokes begin()
+ * 2) After begin() has been invoked, zero or more Completers invoke
+ * startCompleter(). Completers may be running in the same or
+ * different thread as the Initiator, as long as they guarantee that
+ * startCompleter() is invoked at least once before the Initiator invokes end().
+ * 3) Completers may invoke finishCompleter() at any time, even after the
+ * initiator has invoked end(). finishCompleter() may be called from any
+ * thread.
+ * 4) startCompleter()/finishCompleter() calls "nest": for each call to
+ * startCompleter(), a corresponding call to finishCompleter() must be made.
+ * Once the last finishCompleter() is called, the Completer must no longer
+ * reference the completion object.
+ * 5) The Initiator invokes end() at the point where it has finished
+ * dispatching work to the Completers, and is prepared for the callback
+ * handler to be invoked. Note: if there are no outstanding Completers
+ * pending when the Initiator invokes end(), the callback will be invoked
+ * directly, and the sync parameter will be set true. This indicates to the
+ * Initiator that the callback is executing in the context of the end() call,
+ * and the Initiator is free to optimize the handling of the completion,
+ * assuming no need for synchronization with Completer threads.
+ */
- public:
- AsyncCompletion() : completionsNeeded(0), inCallback(false) {};
- virtual ~AsyncCompletion() { /* @todo KAG - assert(completionsNeeded.get() == 0); */ };
+class AsyncCompletion
+{
+ private:
+ mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded;
+ mutable qpid::sys::Monitor callbackLock;
+ bool inCallback, active;
- /** True when all outstanding operations have compeleted
- */
- bool isDone()
+ void invokeCallback(bool sync) {
+ qpid::sys::Mutex::ScopedLock l(callbackLock);
+ if (active) {
+ inCallback = true;
{
- qpid::sys::Mutex::ScopedLock l(callbackLock);
- return !inCallback && completionsNeeded.get() == 0;
+ qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
+ completed(sync);
}
+ inCallback = false;
+ active = false;
+ callbackLock.notifyAll();
+ }
+ }
- /** Called to signal the start of an asynchronous operation. The operation
- * is considered pending until finishCompleter() is called.
- * E.g. called when initiating an async store operation.
- */
- void startCompleter() { ++completionsNeeded; }
-
- /** Called by completer to signal that it has finished the operation started
- * when startCompleter() was invoked.
- * e.g. called when async write complete.
- */
- void finishCompleter()
- {
- if (--completionsNeeded == 0) {
- invokeCallback(false);
- }
- }
+ protected:
+ /** Invoked when all completers have signalled that they have completed
+ * (via calls to finishCompleter()). bool == true if called via end()
+ */
+ virtual void completed(bool) = 0;
- /** called by initiator before any calls to startCompleter can be done.
- */
- void begin() { startCompleter(); };
+ public:
+ AsyncCompletion() : completionsNeeded(0), inCallback(false), active(true) {};
+ virtual ~AsyncCompletion() { cancel(); }
- /** called by initiator after all potential completers have called
- * startCompleter().
- */
- void end(boost::shared_ptr<CompletionHandler> _handler)
- {
- assert(completionsNeeded.get() > 0); // ensure begin() has been called!
- handler = _handler;
- if (--completionsNeeded == 0) {
- invokeCallback(true);
- }
- }
+ /** True when all outstanding operations have compeleted
+ */
+ bool isDone()
+ {
+ qpid::sys::Mutex::ScopedLock l(callbackLock);
+ return !active;
+ }
- /** may be called by Initiator to cancel the callback registered by end()
- */
- void cancel() {
- qpid::sys::Mutex::ScopedLock l(callbackLock);
- while (inCallback) callbackLock.wait();
- handler.reset();
- }
+ /** Called to signal the start of an asynchronous operation. The operation
+ * is considered pending until finishCompleter() is called.
+ * E.g. called when initiating an async store operation.
+ */
+ void startCompleter() { ++completionsNeeded; }
+
+ /** Called by completer to signal that it has finished the operation started
+ * when startCompleter() was invoked.
+ * e.g. called when async write complete.
+ */
+ void finishCompleter()
+ {
+ if (--completionsNeeded == 0) {
+ invokeCallback(false);
+ }
+ }
+
+ /** called by initiator before any calls to startCompleter can be done.
+ */
+ void begin()
+ {
+ qpid::sys::Mutex::ScopedLock l(callbackLock);
+ ++completionsNeeded;
+ }
+
+ /** called by initiator after all potential completers have called
+ * startCompleter().
+ */
+ void end()
+ {
+ assert(completionsNeeded.get() > 0); // ensure begin() has been called!
+ if (--completionsNeeded == 0) {
+ invokeCallback(true);
+ }
+ }
+
+ /** may be called by Initiator to cancel the callback. Will wait for
+ * callback to complete if in progress.
+ */
+ virtual void cancel() {
+ qpid::sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
+ active = false;
+ }
- /** may be called by Initiator after all completers have been added but
- * prior to calling end(). Allows initiator to determine if it _really_
- * needs to wait for pending Completers (e.g. count > 1).
- */
- uint32_t getPendingCompleters() { return completionsNeeded.get(); }
- };
+ /** may be called by Initiator after all completers have been added but
+ * prior to calling end(). Allows initiator to determine if it _really_
+ * needs to wait for pending Completers (e.g. count > 1).
+ */
+ //uint32_t getPendingCompleters() { return completionsNeeded.get(); }
+};
- }} // qpid::broker::
-#endif /*!_Completion_*/
+}} // qpid::broker::
+#endif /*!_AsyncCompletion_*/
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 1cf81dfcf6..2370795d0d 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -60,6 +60,7 @@
#include "qpid/StringUtils.h"
#include "qpid/Url.h"
#include "qpid/Version.h"
+#include "qpid/sys/ClusterSafe.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -224,12 +225,19 @@ Broker::Broker(const Broker::Options& conf) :
}
QueuePolicy::setDefaultMaxSize(conf.queueLimit);
- QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
queues.setQueueEvents(&queueEvents);
// Early-Initialize plugins
Plugin::earlyInitAll(*this);
+ /** todo KAG - remove once cluster support for flow control done + (and ClusterSafe.h include above). */
+ if (sys::isCluster()) {
+ QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled by default.");
+ QueueFlowLimit::setDefaults(0, 0, 0);
+ } else {
+ QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
+ }
+
// If no plugin store module registered itself, set up the null store.
if (NullMessageStore::isNullStore(store.get()))
setStore();
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h
index e22d0ec4b8..a84aa45d76 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.h
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h
@@ -48,14 +48,15 @@ class PersistableMessage : public Persistable
sys::Mutex storeLock;
/**
+ * "Ingress" messages == messages sent _to_ the broker.
* Tracks the number of outstanding asynchronous operations that must
- * complete before the message can be considered safely received by the
+ * complete before an inbound message can be considered fully received by the
* broker. E.g. all enqueues have completed, the message has been written
* to store, credit has been replenished, etc. Once all outstanding
* operations have completed, the transfer of this message from the client
* may be considered complete.
*/
- AsyncCompletion receiveCompletion;
+ boost::shared_ptr<AsyncCompletion> ingressCompletion;
/**
* Tracks the number of outstanding asynchronous dequeue
@@ -113,9 +114,13 @@ class PersistableMessage : public Persistable
virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
- QPID_BROKER_EXTERN bool isReceiveComplete() { return receiveCompletion.isDone(); }
- QPID_BROKER_EXTERN void enqueueStart() { receiveCompletion.startCompleter(); }
- QPID_BROKER_EXTERN void enqueueComplete() { receiveCompletion.finishCompleter(); }
+ /** track the progress of a message received by the broker - see ingressCompletion above */
+ QPID_BROKER_EXTERN bool isIngressComplete() { return !ingressCompletion || ingressCompletion->isDone(); }
+ QPID_BROKER_EXTERN boost::shared_ptr<AsyncCompletion>& getIngressCompletion() { return ingressCompletion; }
+ QPID_BROKER_EXTERN void setIngressCompletion(boost::shared_ptr<AsyncCompletion>& ic) { ingressCompletion = ic; }
+
+ QPID_BROKER_EXTERN void enqueueStart() { if (ingressCompletion) ingressCompletion->startCompleter(); }
+ QPID_BROKER_EXTERN void enqueueComplete() { if (ingressCompletion) ingressCompletion->finishCompleter(); }
QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
MessageStore* _store);
@@ -131,8 +136,6 @@ class PersistableMessage : public Persistable
bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store);
-
- QPID_BROKER_EXTERN AsyncCompletion& getReceiveCompletion() { return receiveCompletion; }
};
}}
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 8fc4a8ec39..3de93ed74e 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -686,7 +686,7 @@ uint32_t Queue::getEnqueueCompleteMessageCount() const
//NOTE: don't need to use checkLvqReplace() here as it
//is only relevant for LVQ which does not support persistence
//so the enqueueComplete check has no effect
- if ( i->payload->isReceiveComplete() ) count ++;
+ if ( i->payload->isIngressComplete() ) count ++;
}
return count;
@@ -1219,6 +1219,12 @@ void Queue::flush()
if (u.acquired && store) store->flush(*this);
}
+const Broker* Queue::getBroker()
+{
+ return broker;
+}
+
+
Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
bool Queue::UsageBarrier::acquire()
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 57ef3dae6b..5af630f3c8 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -362,6 +362,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void recoverPrepared(boost::intrusive_ptr<Message>& msg);
void flush();
+
+ const Broker* getBroker();
};
}
}
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 02a776fd70..f3e6e088b9 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -19,12 +19,15 @@
*
*/
#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/Exception.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/broker/SessionState.h"
+#include "qpid/sys/ClusterSafe.h"
#include "qmf/org/apache/qpid/broker/Queue.h"
@@ -92,7 +95,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
: queue(_queue), queueName("<unknown>"),
flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
- flowStopped(false), count(0), size(0), queueMgmtObj(0)
+ flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0)
{
uint32_t maxCount(0);
uint64_t maxSize(0);
@@ -103,6 +106,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
maxSize = _queue->getPolicy()->getMaxSize();
maxCount = _queue->getPolicy()->getMaxCount();
}
+ broker = queue->getBroker();
}
validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", queueName );
validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", queueName );
@@ -140,7 +144,13 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg)
}
if (flowStopped || !index.empty()) {
- msg.payload->getReceiveCompletion().startCompleter(); // don't complete until flow resumes
+ // ignore flow control if we are populating the queue due to cluster replication:
+ if (broker && broker->isClusterUpdatee()) {
+ QPID_LOG(error, "KAG: Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.position);
+ return;
+ }
+ QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position);
+ msg.payload->getIngressCompletion()->startCompleter(); // don't complete until flow resumes
index.insert(msg.payload);
}
}
@@ -180,14 +190,14 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg)
// flow enabled - release all pending msgs
while (!index.empty()) {
std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin();
- (*itr)->getReceiveCompletion().finishCompleter();
+ (*itr)->getIngressCompletion()->finishCompleter();
index.erase(itr);
}
} else {
// even if flow controlled, we must release this msg as it is being dequeued
std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload);
if (itr != index.end()) { // this msg is flow controlled, release it:
- (*itr)->getReceiveCompletion().finishCompleter();
+ (*itr)->getIngressCompletion()->finishCompleter();
index.erase(itr);
}
}
@@ -195,6 +205,35 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg)
}
+/** used by clustering: is the given message's completion blocked due to flow
+ * control? True if message is blocked. (for the clustering updater: done
+ * after msgs have been replicated to the updatee).
+ */
+bool QueueFlowLimit::getState(const QueuedMessage& msg) const
+{
+ sys::Mutex::ScopedLock l(indexLock);
+ return (index.find(msg.payload) != index.end());
+}
+
+
+/** artificially force the flow control state of a given message
+ * (for the clustering updatee: done after msgs have been replicated to
+ * the updatee's queue)
+ */
+void QueueFlowLimit::setState(const QueuedMessage& msg, bool blocked)
+{
+ if (blocked && msg.payload) {
+
+ sys::Mutex::ScopedLock l(indexLock);
+ assert(index.find(msg.payload) == index.end());
+
+ QPID_LOG(error, "KAG TBD!!!: Queue \"" << queue->getName() << "\": forcing flow control for msg pos=" << msg.position << " for CLUSTER SYNC");
+ // KAG TBD!!!
+ index.insert(msg.payload);
+ }
+}
+
+
void QueueFlowLimit::setManagementObject(_qmfBroker::Queue *mgmtObject)
{
queueMgmtObj = mgmtObject;
@@ -284,6 +323,14 @@ std::auto_ptr<QueueFlowLimit> QueueFlowLimit::createQueueFlowLimit(Queue *queue,
if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control
return std::auto_ptr<QueueFlowLimit>();
}
+ /** todo KAG - remove once cluster support for flow control done. */
+ if (sys::isCluster()) {
+ if (queue) {
+ QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
+ << queue->getName());
+ }
+ return std::auto_ptr<QueueFlowLimit>();
+ }
return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, flowStopCount, flowResumeCount,
flowStopSize, flowResumeSize));
}
@@ -293,6 +340,15 @@ std::auto_ptr<QueueFlowLimit> QueueFlowLimit::createQueueFlowLimit(Queue *queue,
uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
+ /** todo KAG - remove once cluster support for flow control done. */
+ if (sys::isCluster()) {
+ if (queue) {
+ QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
+ << queue->getName());
+ }
+ return std::auto_ptr<QueueFlowLimit>();
+ }
+
return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize));
}
return std::auto_ptr<QueueFlowLimit>();
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
index 8533854dee..3686b1ff56 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
@@ -43,6 +43,8 @@ namespace _qmfBroker = qmf::org::apache::qpid::broker;
namespace qpid {
namespace broker {
+class Broker;
+
/**
* Producer flow control: when level is > flowStop*, flow control is ON.
* then level is < flowResume*, flow control is OFF. If == 0, flow control
@@ -82,6 +84,11 @@ class QueueFlowLimit
/** the queue has removed QueuedMessage. Returns true if flow state changes */
void dequeued(const QueuedMessage&);
+ /** for clustering: */
+ /** true if the given message is flow controlled, and cannot be completed. */
+ bool getState(const QueuedMessage&) const;
+ void setState(const QueuedMessage&, bool blocked);
+
uint32_t getFlowStopCount() const { return flowStopCount; }
uint32_t getFlowResumeCount() const { return flowResumeCount; }
uint64_t getFlowStopSize() const { return flowStopSize; }
@@ -103,10 +110,12 @@ class QueueFlowLimit
protected:
// msgs waiting for flow to become available.
std::set< boost::intrusive_ptr<Message> > index;
- qpid::sys::Mutex indexLock;
+ mutable qpid::sys::Mutex indexLock;
_qmfBroker::Queue *queueMgmtObj;
+ const Broker *broker;
+
QueueFlowLimit(Queue *queue,
uint32_t flowStopCount, uint32_t flowResumeCount,
uint64_t flowStopSize, uint64_t flowResumeSize);
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index d9af4b13c5..d572e37d00 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -61,7 +61,7 @@ SessionState::SessionState(
msgBuilder(&broker.getStore()),
mgmtObject(0),
rateFlowcontrol(0),
- scheduledRcvMsgs(new IncompleteRcvMsg::deque)
+ scheduledCmds(new std::list<SequenceNumber>)
{
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
@@ -95,20 +95,22 @@ SessionState::~SessionState() {
if (flowControlTimer)
flowControlTimer->cancel();
- // clean up any outstanding incomplete receive messages
- qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock);
- std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr> copy(incompleteRcvMsgs);
- incompleteRcvMsgs.clear();
+ // clean up any outstanding incomplete commands
+ qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
+ std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds);
+ incompleteCmds.clear();
while (!copy.empty()) {
- boost::shared_ptr<IncompleteRcvMsg> ref(copy.begin()->second);
+ boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second);
copy.erase(copy.begin());
{
// note: need to drop lock, as callback may attempt to take it.
- qpid::sys::ScopedUnlock<Mutex> ul(incompleteRcvMsgsLock);
+ qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock);
ref->cancel();
}
}
- scheduledRcvMsgs->clear(); // no need to lock - shared with IO thread.
+ // At this point, we are guaranteed no further completion callbacks will be
+ // made.
+ scheduledCmds->clear(); // keeps IO thread from running more completions.
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -265,10 +267,12 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
}
msg->setPublisher(&getConnection());
- msg->getReceiveCompletion().begin();
+ boost::shared_ptr<AsyncCompletion> ac(boost::dynamic_pointer_cast<AsyncCompletion>(createIngressMsgXferContext(msg)));
+ msg->setIngressCompletion( ac );
+ ac->begin();
semanticState.handle(msg);
msgBuilder.end();
- msg->getReceiveCompletion().end( createPendingMsg(msg) ); // allows msg to complete
+ ac->end(); // allows msg to complete xfer
}
// Handle producer session flow control
@@ -323,13 +327,15 @@ void SessionState::sendAcceptAndCompletion()
* its credit has been accounted for, etc). At this point, msg is considered
* by this receiver as 'completed' (as defined by AMQP 0_10)
*/
-void SessionState::completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg)
+void SessionState::completeRcvMsg(SequenceNumber id,
+ bool requiresAccept,
+ bool requiresSync)
{
bool callSendCompletion = false;
- receiverCompleted(msg->getCommandId());
- if (msg->requiresAccept())
+ receiverCompleted(id);
+ if (requiresAccept)
// will cause msg's seq to appear in the next message.accept we send.
- accepted.add(msg->getCommandId());
+ accepted.add(id);
// Are there any outstanding Execution.Sync commands pending the
// completion of this msg? If so, complete them.
@@ -343,7 +349,7 @@ void SessionState::completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> ms
}
// if the sender has requested immediate notification of the completion...
- if (msg->getFrames().getMethod()->isSync()) {
+ if (requiresSync) {
sendAcceptAndCompletion();
} else if (callSendCompletion) {
sendCompletion();
@@ -435,70 +441,83 @@ void SessionState::addPendingExecutionSync()
}
+/** factory for creating IncompleteIngressMsgXfer objects which
+ * can be references from Messages as ingress AsyncCompletion objects.
+ */
+boost::shared_ptr<SessionState::IncompleteIngressMsgXfer>
+SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg)
+{
+ SequenceNumber id = msg->getCommandId();
+ boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> cmd(new SessionState::IncompleteIngressMsgXfer(this, id, msg));
+ qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
+ incompleteCmds[id] = cmd;
+ return cmd;
+}
+
+
/** Invoked by the asynchronous completer associated with
* a received msg that is pending Completion. May be invoked
* by the SessionState directly (sync == true), or some external
* entity (!sync).
*/
-void SessionState::IncompleteRcvMsg::operator() (bool sync)
+void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
{
- QPID_LOG(debug, ": async completion callback for msg seq=" << msg->getCommandId() << " sync=" << sync);
-
- qpid::sys::ScopedLock<Mutex> l(session->incompleteRcvMsgsLock);
- std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr>::iterator i = session->incompleteRcvMsgs.find(this);
- if (i != session->incompleteRcvMsgs.end()) {
- boost::shared_ptr<IncompleteRcvMsg> tmp(i->second);
- session->incompleteRcvMsgs.erase(i);
-
- if (session->isAttached()) {
- if (sync) {
- qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteRcvMsgsLock);
- QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId());
- session->completeRcvMsg(msg);
+ qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
+ if (!sync) {
+ // note well: this path may execute in any thread.
+ QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
+ session->scheduledCmds->push_back(id);
+ if (session->scheduledCmds->size() == 1) {
+ session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter,
+ session->scheduledCmds,
+ session));
+ }
+ } else { // command is being completed in IO thread.
+ // this path runs only on the IO thread.
+ std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
+ cmd = session->incompleteCmds.find(id);
+ if (cmd != session->incompleteCmds.end()) {
+ boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
+ session->incompleteCmds.erase(cmd);
+
+ if (session->isAttached()) {
+ QPID_LOG(debug, ": receive completed for msg seq=" << id);
+ qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock);
+ session->completeRcvMsg(id, requiresAccept, requiresSync);
return;
- } else { // potentially called from a different thread
- QPID_LOG(debug, ": scheduling completion for msg seq=" << msg->getCommandId());
- session->scheduledRcvMsgs->push_back(tmp);
- if (session->scheduledRcvMsgs->size() == 1) {
- session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter,
- session->scheduledRcvMsgs));
- }
}
}
}
}
-/** Scheduled from IncompleteRcvMsg callback, completes all pending message
- * receives asynchronously.
+/** Scheduled from incomplete command's completed callback, safely completes all
+ * completed commands in the IO Thread. Guaranteed not to be running at the same
+ * time as the message receive code.
*/
-void SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr<deque> msgs)
+void SessionState::scheduledCompleter(boost::shared_ptr< std::list<SequenceNumber> > completedCmds,
+ SessionState *session)
{
- while (!msgs->empty()) {
- boost::shared_ptr<IncompleteRcvMsg> iMsg = msgs->front();
- msgs->pop_front();
- QPID_LOG(debug, ": scheduled completion for msg seq=" << iMsg->msg->getCommandId());
- if (iMsg->session && iMsg->session->isAttached()) {
- QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg seq=" << iMsg->msg->getCommandId());
- iMsg->session->completeRcvMsg(iMsg->msg);
+ // when session is destroyed, it clears the list below. If the list is empty,
+ // the passed session pointer is not valid - do nothing.
+ if (completedCmds->empty()) return;
+
+ qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
+ std::list<SequenceNumber> cmds(*completedCmds); // make copy so we can drop lock
+ completedCmds->clear();
+
+ while (!cmds.empty()) {
+ SequenceNumber id = cmds.front();
+ cmds.pop_front();
+ std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
+
+ cmd = session->incompleteCmds.find(id);
+ if (cmd != session->incompleteCmds.end()) {
+ qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock);
+ cmd->second->do_completion(); // retakes lock
}
}
}
-/** Cancels a pending incomplete receive message completion callback. Note
- * well: will wait for the callback to finish if it is currently in progress
- * on another thread.
- */
-void SessionState::IncompleteRcvMsg::cancel()
-{
- QPID_LOG(debug, session->getId() << ": cancelling outstanding completion for msg seq=" << msg->getCommandId());
- // Cancel the message complete callback. On return, we are guaranteed there
- // will be no outstanding calls to SessionState::IncompleteRcvMsg::operator() (bool sync)
- msg->getReceiveCompletion().cancel();
- // there may be calls to SessionState::IncompleteRcvMsg::scheduledCompleter() pending,
- // clear the session so scheduledCompleter() will ignore this IncompleteRcvMsg.
- session = 0;
-}
-
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index fc9fec7871..f4c10295b1 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -134,7 +134,7 @@ class SessionState : public qpid::SessionState,
// indicate that the given ingress msg has been completely received by the
// broker, and the msg's message.transfer command can be considered completed.
- void completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg);
+ void completeRcvMsg(SequenceNumber id, bool requiresAccept, bool requiresSync);
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
@@ -172,41 +172,65 @@ class SessionState : public qpid::SessionState,
std::queue<SequenceNumber> pendingExecutionSyncs;
bool currentCommandComplete;
- // A list of ingress messages whose message.transfer command is pending
- // completion. These messages are awaiting some set of asynchronous
- // operations to complete (eg: store, flow-control, etc). before
- // the message.transfer can be completed.
- class IncompleteRcvMsg : public AsyncCompletion::CompletionHandler
+ /** Abstract class that represents a command that is pending
+ * completion.
+ */
+ class IncompleteCommandContext : public AsyncCompletion
{
- public:
- IncompleteRcvMsg(SessionState& _session, boost::intrusive_ptr<Message> _msg)
- : session(&_session), msg(_msg) {}
- virtual void operator() (bool sync); // invoked when msg is completed.
- void cancel(); // cancel pending incomplete callback [operator() above].
+ public:
+ IncompleteCommandContext( SessionState *ss, SequenceNumber _id )
+ : id(_id), session(ss) {}
+ virtual ~IncompleteCommandContext() {}
+
+ /* allows manual invokation of completion, used by IO thread to
+ * complete a command that was originally finished on a different
+ * thread.
+ */
+ void do_completion() { completed(true); }
+
+ protected:
+ SequenceNumber id;
+ SessionState *session;
+ };
- typedef boost::shared_ptr<IncompleteRcvMsg> shared_ptr;
- typedef std::deque<shared_ptr> deque;
+ /** incomplete Message.transfer commands - inbound to broker from client
+ */
+ class IncompleteIngressMsgXfer : public SessionState::IncompleteCommandContext
+ {
+ public:
+ IncompleteIngressMsgXfer( SessionState *ss,
+ SequenceNumber _id,
+ boost::intrusive_ptr<Message> msg )
+ : IncompleteCommandContext(ss, _id),
+ requiresAccept(msg->requiresAccept()),
+ requiresSync(msg->getFrames().getMethod()->isSync()) {};
+ virtual ~IncompleteIngressMsgXfer() {};
+
+ protected:
+ virtual void completed(bool);
+
+ private:
+ /** meta-info required to complete the message */
+ bool requiresAccept;
+ bool requiresSync; // method's isSync() flag
+ };
+ /** creates a command context suitable for use as an AsyncCompletion in a message */
+ boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> createIngressMsgXferContext( boost::intrusive_ptr<Message> msg);
- private:
- SessionState *session;
- boost::intrusive_ptr<Message> msg;
+ /* A list of commands that are pending completion. These commands are
+ * awaiting some set of asynchronous operations to finish (eg: store,
+ * flow-control, etc). before the command can be completed to the client
+ */
+ std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > incompleteCmds;
+ // identifies those commands in incompleteCmds that are waiting for IO thread to run in order to be completed.
+ boost::shared_ptr< std::list<SequenceNumber> > scheduledCmds;
+ qpid::sys::Mutex incompleteCmdsLock; // locks both above containers
- static void scheduledCompleter(boost::shared_ptr<deque>);
- };
- std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr> incompleteRcvMsgs; // msgs pending completion
- qpid::sys::Mutex incompleteRcvMsgsLock;
- boost::shared_ptr<IncompleteRcvMsg> createPendingMsg(boost::intrusive_ptr<Message>& msg) {
- boost::shared_ptr<IncompleteRcvMsg> pending(new IncompleteRcvMsg(*this, msg));
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(incompleteRcvMsgsLock);
- incompleteRcvMsgs[pending.get()] = pending;
- return pending;
- }
-
- // holds msgs waiting for IO thread to run scheduledCompleter()
- boost::shared_ptr<IncompleteRcvMsg::deque> scheduledRcvMsgs;
+ /** runs in IO thread, completes commands that where finished asynchronously. */
+ static void scheduledCompleter(boost::shared_ptr< std::list<SequenceNumber> > scheduledCmds,
+ SessionState *session);
friend class SessionManager;
- friend class IncompleteRcvMsg;
};
diff --git a/qpid/cpp/src/tests/MessageUtils.h b/qpid/cpp/src/tests/MessageUtils.h
index a1b140d484..baca14cf4e 100644
--- a/qpid/cpp/src/tests/MessageUtils.h
+++ b/qpid/cpp/src/tests/MessageUtils.h
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/Message.h"
+#include "qpid/broker/AsyncCompletion.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/Uuid.h"
@@ -28,6 +29,17 @@ using namespace qpid;
using namespace broker;
using namespace framing;
+namespace {
+ class DummyCompletion : public AsyncCompletion
+ {
+ public:
+ DummyCompletion() {}
+ virtual ~DummyCompletion() {}
+ protected:
+ void completed(bool) {}
+ };
+}
+
namespace qpid {
namespace tests {
@@ -50,6 +62,8 @@ struct MessageUtils
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
if (durable)
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2);
+ boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion());
+ msg->setIngressCompletion(dc);
return msg;
}
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index 9b6e423b02..4d63d9bd97 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -88,6 +88,8 @@ intrusive_ptr<Message> create_message(std::string exchange, std::string routingK
msg->getFrames().append(method);
msg->getFrames().append(header);
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion());
+ msg->setIngressCompletion(dc);
return msg;
}
diff --git a/qpid/cpp/src/tests/TxPublishTest.cpp b/qpid/cpp/src/tests/TxPublishTest.cpp
index ffb0125302..210abf0a5b 100644
--- a/qpid/cpp/src/tests/TxPublishTest.cpp
+++ b/qpid/cpp/src/tests/TxPublishTest.cpp
@@ -74,7 +74,7 @@ QPID_AUTO_TEST_CASE(testPrepare)
BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[0].second);
BOOST_CHECK_EQUAL(string("queue2"), t.store.enqueued[1].first);
BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[1].second);
- BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isReceiveComplete());
+ BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isIngressComplete());
}
QPID_AUTO_TEST_CASE(testCommit)
@@ -87,7 +87,7 @@ QPID_AUTO_TEST_CASE(testCommit)
BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount());
intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload;
- BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isReceiveComplete());
+ BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isIngressComplete());
BOOST_CHECK_EQUAL(t.msg, msg_dequeue);
BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount());
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 98f58ebfdd..6e771bf5d6 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -29,6 +29,7 @@ from unittest import TestCase
from copy import copy
from threading import Thread, Lock, Condition
from logging import getLogger
+import qmf.console
log = getLogger("qpid.brokertest")
@@ -327,6 +328,10 @@ class Broker(Popen):
log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
self._log_ready = False
+ def startQmf(self, handler=None):
+ self.qmf_session = qmf.console.Session(handler)
+ self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port()))
+
def host(self): return self._host
def port(self):
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index cbad4010b4..8ff83ade60 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -304,6 +304,180 @@ acl allow all all
# Verify logs are consistent
cluster_test_logs.verify_logs()
+ def test_queue_flowlimit(self):
+ """Verify that the queue's flowlimit configuration and state are
+ correctly replicated.
+ """
+ return; # @todo enable once flow control works in clusters
+ # start a cluster of two brokers
+ args = ["--log-enable=info+:broker"]
+ cluster = self.cluster(2, args)
+
+ # configure a queue with a specific flow limit on broker 0
+ ssn0 = cluster[0].connect().session()
+ s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.max-count':99, 'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+ cluster[0].startQmf()
+ for q in cluster[0].qmf_session.getObjects(_class="queue"):
+ if q.name == "flq":
+ oid = q.getObjectId()
+ break
+ self.assertEqual(q.name, "flq")
+ self.assertEqual(q.flowStopCount, 5)
+ self.assertEqual(q.flowResumeCount, 3)
+ self.assertFalse(q.flowStopped)
+
+ # verify both brokers in cluster have same configuration
+ cluster[1].startQmf()
+ qs = cluster[1].qmf_session.getObjects(_objectId=oid)
+ self.assertEqual(len(qs), 1)
+ q = qs[0]
+ self.assertEqual(q.name, "flq")
+ self.assertEqual(q.flowStopCount, 5)
+ self.assertEqual(q.flowResumeCount, 3)
+ self.assertFalse(q.flowStopped)
+
+ # fill the queue on one broker until flow control is active
+ class BlockedSender(Thread):
+ def __init__(self): Thread.__init__(self)
+ def run(self):
+ for x in range(6):
+ s0.send(Message(str(x)))
+
+ sender = BlockedSender()
+ sender.start()
+
+ start = time.time()
+ while time.time() < start + 5:
+ q = cluster[0].qmf_session.getObjects(_objectId=oid)[0]
+ if q.flowStopped:
+ break;
+ self.assertTrue(q.flowStopped)
+
+ # verify flow control is active on other broker.
+ q = cluster[1].qmf_session.getObjects(_objectId=oid)[0]
+ self.assertTrue(q.flowStopped)
+
+ # now drain the queue using a session to the other broker
+ ssn1 = cluster[1].connect().session()
+ r1 = ssn1.receiver("flq", capacity=6)
+ try:
+ while r1.fetch(timeout=0):
+ ssn1.acknowledge()
+ except Empty:
+ pass
+ sender.join()
+
+ # and verify both brokers see an unblocked queue
+ q = cluster[0].qmf_session.getObjects(_objectId=oid)[0]
+ self.assertFalse(q.flowStopped)
+ q = cluster[1].qmf_session.getObjects(_objectId=oid)[0]
+ self.assertFalse(q.flowStopped)
+
+ ssn0.connection.close()
+ ssn1.connection.close()
+ cluster_test_logs.verify_logs()
+
+
+ def test_queue_flowlimit_join(self):
+ """Verify that the queue's flowlimit configuration and state are
+ correctly replicated to a newly joined broker.
+ """
+ return; # @todo enable once flow control works in clusters
+ # start a cluster of two brokers
+ #args = ["--log-enable=info+:broker"]
+ args = ["--log-enable=debug"]
+ cluster = self.cluster(2, args)
+
+ # configure a queue with a specific flow limit on broker 0
+ ssn0 = cluster[0].connect().session()
+ s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.max-count':99, 'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+ cluster[0].startQmf()
+ for q in cluster[0].qmf_session.getObjects(_class="queue"):
+ if q.name == "flq":
+ oid = q.getObjectId()
+ break
+ self.assertEqual(q.name, "flq")
+ self.assertEqual(q.flowStopCount, 5)
+ self.assertEqual(q.flowResumeCount, 3)
+ self.assertFalse(q.flowStopped)
+
+ # verify both brokers in cluster have same configuration
+ cluster[1].startQmf()
+ qs = cluster[1].qmf_session.getObjects(_objectId=oid)
+ self.assertEqual(len(qs), 1)
+ q = qs[0]
+ self.assertEqual(q.name, "flq")
+ self.assertEqual(q.flowStopCount, 5)
+ self.assertEqual(q.flowResumeCount, 3)
+ self.assertFalse(q.flowStopped)
+
+ # fill the queue on one broker until flow control is active
+ class BlockedSender(Thread):
+ def __init__(self): Thread.__init__(self)
+ def run(self):
+ for x in range(6):
+ s0.send(Message(str(x)))
+
+ sender = BlockedSender()
+ sender.start()
+
+ start = time.time()
+ while time.time() < start + 5:
+ q = cluster[0].qmf_session.getObjects(_objectId=oid)[0]
+ if q.flowStopped:
+ break;
+ self.assertTrue(q.flowStopped)
+
+ # verify flow control is active on other broker.
+ q = cluster[1].qmf_session.getObjects(_objectId=oid)[0]
+ self.assertTrue(q.flowStopped)
+
+ # add a new broker to the cluster
+ print("Start")
+ cluster.start()
+ print("Start Done")
+
+ # todo: enable verification:
+ # cluster[2].startQmf()
+ # qs = cluster[2].qmf_session.getObjects(_objectId=oid)
+ # self.assertEqual(len(qs), 1)
+ # q = qs[0]
+ # self.assertEqual(q.name, "flq")
+ # self.assertEqual(q.flowStopCount, 5)
+ # self.assertEqual(q.flowResumeCount, 3)
+ # self.assertEqual(q.msgDepth, 5)
+ # self.assertFalse(q.flowStopped)
+ # q = cluster[2].qmf_session.getObjects(_objectId=oid)[0]
+ # self.assertTrue(q.flowStopped)
+
+ # verify new member's queue config
+ # verify new member's queue flow setting
+
+
+
+
+ # now drain the queue using a session to the other broker
+ ssn1 = cluster[1].connect().session()
+ r1 = ssn1.receiver("flq", capacity=6)
+ try:
+ while r1.fetch(timeout=1):
+ ssn1.acknowledge()
+ except Empty:
+ pass
+ sender.join()
+
+ # and verify both brokers see an unblocked queue
+ q = cluster[0].qmf_session.getObjects(_objectId=oid)[0]
+ self.assertFalse(q.flowStopped)
+ q = cluster[1].qmf_session.getObjects(_objectId=oid)[0]
+ self.assertFalse(q.flowStopped)
+
+ ssn0.connection.close()
+ ssn1.connection.close()
+ cluster_test_logs.verify_logs()
+
+
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):