diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-16 18:58:06 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-16 18:58:06 +0000 |
commit | ace71e8ab4846cab5258481da7553439a73ed242 (patch) | |
tree | f7dc599c7077b9d7e3e2f874118defd0a4c0189f | |
parent | fd96d1ca556ffdcec5fa78bb155130786ed06612 (diff) | |
download | qpid-python-ace71e8ab4846cab5258481da7553439a73ed242.tar.gz |
QPID-2935: move completion from msg to session cmd context.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1071360 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/AsyncCompletion.h | 265 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PersistableMessage.h | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 64 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 141 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 84 | ||||
-rw-r--r-- | qpid/cpp/src/tests/MessageUtils.h | 14 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/TxPublishTest.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 5 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 174 |
14 files changed, 560 insertions, 241 deletions
diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h index c66609f8a6..1f3d11e0ee 100644 --- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h +++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h @@ -1,5 +1,5 @@ -#ifndef _Completion_ -#define _Completion_ +#ifndef _AsyncCompletion_ +#define _AsyncCompletion_ /* * @@ -26,148 +26,145 @@ #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Monitor.h" -#include <boost/function.hpp> -#include <boost/shared_ptr.hpp> namespace qpid { - namespace broker { - - /** - * Class to implement asynchronous notification of completion. - * - * Use-case: An "initiator" needs to wait for a set of "completers" to - * finish a unit of work before an action can occur. This object - * tracks the progress of the set of completers, and allows the action - * to occur once all completers have signalled that they are done. - * - * The initiator and completers may be running in separate threads. - * - * The initiating thread is the thread that initiates the action, - * i.e. the connection read thread. - * - * A completing thread is any thread that contributes to completion, - * e.g. a store thread that does an async write. - * There may be zero or more completers. - * - * When the work is complete, a callback is invoked. The callback - * may be invoked in the Initiator thread, or one of the Completer - * threads. The callback is passed a flag indicating whether or not - * the callback is running under the context of the Initiator thread. - * - * Use model: - * 1) Initiator thread invokes begin() - * 2) After begin() has been invoked, zero or more Completers invoke - * startCompleter(). Completers may be running in the same or - * different thread as the Initiator, as long as they guarantee that - * startCompleter() is invoked at least once before the Initiator invokes end(). - * 3) Completers may invoke finishCompleter() at any time, even after the - * initiator has invoked end(). finishCompleter() may be called from any - * thread. - * 4) startCompleter()/finishCompleter() calls "nest": for each call to - * startCompleter(), a corresponding call to finishCompleter() must be made. - * Once the last finishCompleter() is called, the Completer must no longer - * reference the completion object. - * 5) The Initiator invokes end() at the point where it has finished - * dispatching work to the Completers, and is prepared for the callback - * handler to be invoked. Note: if there are no outstanding Completers - * pending when the Initiator invokes end(), the callback will be invoked - * directly, and the sync parameter will be set true. This indicates to the - * Initiator that the callback is executing in the context of the end() call, - * and the Initiator is free to optimize the handling of the completion, - * assuming no need for synchronization with Completer threads. - */ - class AsyncCompletion { - public: - // encapsulates the completion callback handler - class CompletionHandler { - public: - virtual void operator() (bool) { /* bool == true if called via end() */} - }; - - private: - mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded; - mutable qpid::sys::Monitor callbackLock; - bool inCallback; - void invokeCallback(bool sync) { - qpid::sys::Mutex::ScopedLock l(callbackLock); - inCallback = true; - if (handler) { - boost::shared_ptr<CompletionHandler> tmp; - tmp.swap(handler); - { - qpid::sys::Mutex::ScopedUnlock ul(callbackLock); - (*tmp)(sync); - } - } - inCallback = false; - callbackLock.notifyAll(); - } +namespace broker { - protected: - /** Invoked when all completers have signalled that they have completed - * (via calls to finishCompleter()). - */ - boost::shared_ptr<CompletionHandler> handler; +/** + * Class to implement asynchronous notification of completion. + * + * Use-case: An "initiator" needs to wait for a set of "completers" to + * finish a unit of work before an action can occur. This object + * tracks the progress of the set of completers, and allows the action + * to occur once all completers have signalled that they are done. + * + * The initiator and completers may be running in separate threads. + * + * The initiating thread is the thread that initiates the action, + * i.e. the connection read thread. + * + * A completing thread is any thread that contributes to completion, + * e.g. a store thread that does an async write. + * There may be zero or more completers. + * + * When the work is complete, a callback is invoked. The callback + * may be invoked in the Initiator thread, or one of the Completer + * threads. The callback is passed a flag indicating whether or not + * the callback is running under the context of the Initiator thread. + * + * Use model: + * 1) Initiator thread invokes begin() + * 2) After begin() has been invoked, zero or more Completers invoke + * startCompleter(). Completers may be running in the same or + * different thread as the Initiator, as long as they guarantee that + * startCompleter() is invoked at least once before the Initiator invokes end(). + * 3) Completers may invoke finishCompleter() at any time, even after the + * initiator has invoked end(). finishCompleter() may be called from any + * thread. + * 4) startCompleter()/finishCompleter() calls "nest": for each call to + * startCompleter(), a corresponding call to finishCompleter() must be made. + * Once the last finishCompleter() is called, the Completer must no longer + * reference the completion object. + * 5) The Initiator invokes end() at the point where it has finished + * dispatching work to the Completers, and is prepared for the callback + * handler to be invoked. Note: if there are no outstanding Completers + * pending when the Initiator invokes end(), the callback will be invoked + * directly, and the sync parameter will be set true. This indicates to the + * Initiator that the callback is executing in the context of the end() call, + * and the Initiator is free to optimize the handling of the completion, + * assuming no need for synchronization with Completer threads. + */ - public: - AsyncCompletion() : completionsNeeded(0), inCallback(false) {}; - virtual ~AsyncCompletion() { /* @todo KAG - assert(completionsNeeded.get() == 0); */ }; +class AsyncCompletion +{ + private: + mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded; + mutable qpid::sys::Monitor callbackLock; + bool inCallback, active; - /** True when all outstanding operations have compeleted - */ - bool isDone() + void invokeCallback(bool sync) { + qpid::sys::Mutex::ScopedLock l(callbackLock); + if (active) { + inCallback = true; { - qpid::sys::Mutex::ScopedLock l(callbackLock); - return !inCallback && completionsNeeded.get() == 0; + qpid::sys::Mutex::ScopedUnlock ul(callbackLock); + completed(sync); } + inCallback = false; + active = false; + callbackLock.notifyAll(); + } + } - /** Called to signal the start of an asynchronous operation. The operation - * is considered pending until finishCompleter() is called. - * E.g. called when initiating an async store operation. - */ - void startCompleter() { ++completionsNeeded; } - - /** Called by completer to signal that it has finished the operation started - * when startCompleter() was invoked. - * e.g. called when async write complete. - */ - void finishCompleter() - { - if (--completionsNeeded == 0) { - invokeCallback(false); - } - } + protected: + /** Invoked when all completers have signalled that they have completed + * (via calls to finishCompleter()). bool == true if called via end() + */ + virtual void completed(bool) = 0; - /** called by initiator before any calls to startCompleter can be done. - */ - void begin() { startCompleter(); }; + public: + AsyncCompletion() : completionsNeeded(0), inCallback(false), active(true) {}; + virtual ~AsyncCompletion() { cancel(); } - /** called by initiator after all potential completers have called - * startCompleter(). - */ - void end(boost::shared_ptr<CompletionHandler> _handler) - { - assert(completionsNeeded.get() > 0); // ensure begin() has been called! - handler = _handler; - if (--completionsNeeded == 0) { - invokeCallback(true); - } - } + /** True when all outstanding operations have compeleted + */ + bool isDone() + { + qpid::sys::Mutex::ScopedLock l(callbackLock); + return !active; + } - /** may be called by Initiator to cancel the callback registered by end() - */ - void cancel() { - qpid::sys::Mutex::ScopedLock l(callbackLock); - while (inCallback) callbackLock.wait(); - handler.reset(); - } + /** Called to signal the start of an asynchronous operation. The operation + * is considered pending until finishCompleter() is called. + * E.g. called when initiating an async store operation. + */ + void startCompleter() { ++completionsNeeded; } + + /** Called by completer to signal that it has finished the operation started + * when startCompleter() was invoked. + * e.g. called when async write complete. + */ + void finishCompleter() + { + if (--completionsNeeded == 0) { + invokeCallback(false); + } + } + + /** called by initiator before any calls to startCompleter can be done. + */ + void begin() + { + qpid::sys::Mutex::ScopedLock l(callbackLock); + ++completionsNeeded; + } + + /** called by initiator after all potential completers have called + * startCompleter(). + */ + void end() + { + assert(completionsNeeded.get() > 0); // ensure begin() has been called! + if (--completionsNeeded == 0) { + invokeCallback(true); + } + } + + /** may be called by Initiator to cancel the callback. Will wait for + * callback to complete if in progress. + */ + virtual void cancel() { + qpid::sys::Mutex::ScopedLock l(callbackLock); + while (inCallback) callbackLock.wait(); + active = false; + } - /** may be called by Initiator after all completers have been added but - * prior to calling end(). Allows initiator to determine if it _really_ - * needs to wait for pending Completers (e.g. count > 1). - */ - uint32_t getPendingCompleters() { return completionsNeeded.get(); } - }; + /** may be called by Initiator after all completers have been added but + * prior to calling end(). Allows initiator to determine if it _really_ + * needs to wait for pending Completers (e.g. count > 1). + */ + //uint32_t getPendingCompleters() { return completionsNeeded.get(); } +}; - }} // qpid::broker:: -#endif /*!_Completion_*/ +}} // qpid::broker:: +#endif /*!_AsyncCompletion_*/ diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 1cf81dfcf6..2370795d0d 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -60,6 +60,7 @@ #include "qpid/StringUtils.h" #include "qpid/Url.h" #include "qpid/Version.h" +#include "qpid/sys/ClusterSafe.h" #include <boost/bind.hpp> #include <boost/format.hpp> @@ -224,12 +225,19 @@ Broker::Broker(const Broker::Options& conf) : } QueuePolicy::setDefaultMaxSize(conf.queueLimit); - QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio); queues.setQueueEvents(&queueEvents); // Early-Initialize plugins Plugin::earlyInitAll(*this); + /** todo KAG - remove once cluster support for flow control done + (and ClusterSafe.h include above). */ + if (sys::isCluster()) { + QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled by default."); + QueueFlowLimit::setDefaults(0, 0, 0); + } else { + QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio); + } + // If no plugin store module registered itself, set up the null store. if (NullMessageStore::isNullStore(store.get())) setStore(); diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h index e22d0ec4b8..a84aa45d76 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.h +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h @@ -48,14 +48,15 @@ class PersistableMessage : public Persistable sys::Mutex storeLock; /** + * "Ingress" messages == messages sent _to_ the broker. * Tracks the number of outstanding asynchronous operations that must - * complete before the message can be considered safely received by the + * complete before an inbound message can be considered fully received by the * broker. E.g. all enqueues have completed, the message has been written * to store, credit has been replenished, etc. Once all outstanding * operations have completed, the transfer of this message from the client * may be considered complete. */ - AsyncCompletion receiveCompletion; + boost::shared_ptr<AsyncCompletion> ingressCompletion; /** * Tracks the number of outstanding asynchronous dequeue @@ -113,9 +114,13 @@ class PersistableMessage : public Persistable virtual QPID_BROKER_EXTERN bool isPersistent() const = 0; - QPID_BROKER_EXTERN bool isReceiveComplete() { return receiveCompletion.isDone(); } - QPID_BROKER_EXTERN void enqueueStart() { receiveCompletion.startCompleter(); } - QPID_BROKER_EXTERN void enqueueComplete() { receiveCompletion.finishCompleter(); } + /** track the progress of a message received by the broker - see ingressCompletion above */ + QPID_BROKER_EXTERN bool isIngressComplete() { return !ingressCompletion || ingressCompletion->isDone(); } + QPID_BROKER_EXTERN boost::shared_ptr<AsyncCompletion>& getIngressCompletion() { return ingressCompletion; } + QPID_BROKER_EXTERN void setIngressCompletion(boost::shared_ptr<AsyncCompletion>& ic) { ingressCompletion = ic; } + + QPID_BROKER_EXTERN void enqueueStart() { if (ingressCompletion) ingressCompletion->startCompleter(); } + QPID_BROKER_EXTERN void enqueueComplete() { if (ingressCompletion) ingressCompletion->finishCompleter(); } QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store); @@ -131,8 +136,6 @@ class PersistableMessage : public Persistable bool isStoredOnQueue(PersistableQueue::shared_ptr queue); void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store); - - QPID_BROKER_EXTERN AsyncCompletion& getReceiveCompletion() { return receiveCompletion; } }; }} diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 8fc4a8ec39..3de93ed74e 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -686,7 +686,7 @@ uint32_t Queue::getEnqueueCompleteMessageCount() const //NOTE: don't need to use checkLvqReplace() here as it //is only relevant for LVQ which does not support persistence //so the enqueueComplete check has no effect - if ( i->payload->isReceiveComplete() ) count ++; + if ( i->payload->isIngressComplete() ) count ++; } return count; @@ -1219,6 +1219,12 @@ void Queue::flush() if (u.acquired && store) store->flush(*this); } +const Broker* Queue::getBroker() +{ + return broker; +} + + Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} bool Queue::UsageBarrier::acquire() diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 57ef3dae6b..5af630f3c8 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -362,6 +362,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, void recoverPrepared(boost::intrusive_ptr<Message>& msg); void flush(); + + const Broker* getBroker(); }; } } diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index 02a776fd70..f3e6e088b9 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -19,12 +19,15 @@ * */ #include "qpid/broker/QueueFlowLimit.h" +#include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/Exception.h" #include "qpid/framing/FieldValue.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" #include "qpid/sys/Mutex.h" +#include "qpid/broker/SessionState.h" +#include "qpid/sys/ClusterSafe.h" #include "qmf/org/apache/qpid/broker/Queue.h" @@ -92,7 +95,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, : queue(_queue), queueName("<unknown>"), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), - flowStopped(false), count(0), size(0), queueMgmtObj(0) + flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0) { uint32_t maxCount(0); uint64_t maxSize(0); @@ -103,6 +106,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, maxSize = _queue->getPolicy()->getMaxSize(); maxCount = _queue->getPolicy()->getMaxCount(); } + broker = queue->getBroker(); } validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", queueName ); validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", queueName ); @@ -140,7 +144,13 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) } if (flowStopped || !index.empty()) { - msg.payload->getReceiveCompletion().startCompleter(); // don't complete until flow resumes + // ignore flow control if we are populating the queue due to cluster replication: + if (broker && broker->isClusterUpdatee()) { + QPID_LOG(error, "KAG: Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.position); + return; + } + QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position); + msg.payload->getIngressCompletion()->startCompleter(); // don't complete until flow resumes index.insert(msg.payload); } } @@ -180,14 +190,14 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg) // flow enabled - release all pending msgs while (!index.empty()) { std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin(); - (*itr)->getReceiveCompletion().finishCompleter(); + (*itr)->getIngressCompletion()->finishCompleter(); index.erase(itr); } } else { // even if flow controlled, we must release this msg as it is being dequeued std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload); if (itr != index.end()) { // this msg is flow controlled, release it: - (*itr)->getReceiveCompletion().finishCompleter(); + (*itr)->getIngressCompletion()->finishCompleter(); index.erase(itr); } } @@ -195,6 +205,35 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg) } +/** used by clustering: is the given message's completion blocked due to flow + * control? True if message is blocked. (for the clustering updater: done + * after msgs have been replicated to the updatee). + */ +bool QueueFlowLimit::getState(const QueuedMessage& msg) const +{ + sys::Mutex::ScopedLock l(indexLock); + return (index.find(msg.payload) != index.end()); +} + + +/** artificially force the flow control state of a given message + * (for the clustering updatee: done after msgs have been replicated to + * the updatee's queue) + */ +void QueueFlowLimit::setState(const QueuedMessage& msg, bool blocked) +{ + if (blocked && msg.payload) { + + sys::Mutex::ScopedLock l(indexLock); + assert(index.find(msg.payload) == index.end()); + + QPID_LOG(error, "KAG TBD!!!: Queue \"" << queue->getName() << "\": forcing flow control for msg pos=" << msg.position << " for CLUSTER SYNC"); + // KAG TBD!!! + index.insert(msg.payload); + } +} + + void QueueFlowLimit::setManagementObject(_qmfBroker::Queue *mgmtObject) { queueMgmtObj = mgmtObject; @@ -284,6 +323,14 @@ std::auto_ptr<QueueFlowLimit> QueueFlowLimit::createQueueFlowLimit(Queue *queue, if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control return std::auto_ptr<QueueFlowLimit>(); } + /** todo KAG - remove once cluster support for flow control done. */ + if (sys::isCluster()) { + if (queue) { + QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue " + << queue->getName()); + } + return std::auto_ptr<QueueFlowLimit>(); + } return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)); } @@ -293,6 +340,15 @@ std::auto_ptr<QueueFlowLimit> QueueFlowLimit::createQueueFlowLimit(Queue *queue, uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5); uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0)); + /** todo KAG - remove once cluster support for flow control done. */ + if (sys::isCluster()) { + if (queue) { + QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue " + << queue->getName()); + } + return std::auto_ptr<QueueFlowLimit>(); + } + return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize)); } return std::auto_ptr<QueueFlowLimit>(); diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h index 8533854dee..3686b1ff56 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h @@ -43,6 +43,8 @@ namespace _qmfBroker = qmf::org::apache::qpid::broker; namespace qpid { namespace broker { +class Broker; + /** * Producer flow control: when level is > flowStop*, flow control is ON. * then level is < flowResume*, flow control is OFF. If == 0, flow control @@ -82,6 +84,11 @@ class QueueFlowLimit /** the queue has removed QueuedMessage. Returns true if flow state changes */ void dequeued(const QueuedMessage&); + /** for clustering: */ + /** true if the given message is flow controlled, and cannot be completed. */ + bool getState(const QueuedMessage&) const; + void setState(const QueuedMessage&, bool blocked); + uint32_t getFlowStopCount() const { return flowStopCount; } uint32_t getFlowResumeCount() const { return flowResumeCount; } uint64_t getFlowStopSize() const { return flowStopSize; } @@ -103,10 +110,12 @@ class QueueFlowLimit protected: // msgs waiting for flow to become available. std::set< boost::intrusive_ptr<Message> > index; - qpid::sys::Mutex indexLock; + mutable qpid::sys::Mutex indexLock; _qmfBroker::Queue *queueMgmtObj; + const Broker *broker; + QueueFlowLimit(Queue *queue, uint32_t flowStopCount, uint32_t flowResumeCount, uint64_t flowStopSize, uint64_t flowResumeSize); diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index d9af4b13c5..d572e37d00 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -61,7 +61,7 @@ SessionState::SessionState( msgBuilder(&broker.getStore()), mgmtObject(0), rateFlowcontrol(0), - scheduledRcvMsgs(new IncompleteRcvMsg::deque) + scheduledCmds(new std::list<SequenceNumber>) { uint32_t maxRate = broker.getOptions().maxSessionRate; if (maxRate) { @@ -95,20 +95,22 @@ SessionState::~SessionState() { if (flowControlTimer) flowControlTimer->cancel(); - // clean up any outstanding incomplete receive messages - qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock); - std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr> copy(incompleteRcvMsgs); - incompleteRcvMsgs.clear(); + // clean up any outstanding incomplete commands + qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock); + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds); + incompleteCmds.clear(); while (!copy.empty()) { - boost::shared_ptr<IncompleteRcvMsg> ref(copy.begin()->second); + boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second); copy.erase(copy.begin()); { // note: need to drop lock, as callback may attempt to take it. - qpid::sys::ScopedUnlock<Mutex> ul(incompleteRcvMsgsLock); + qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock); ref->cancel(); } } - scheduledRcvMsgs->clear(); // no need to lock - shared with IO thread. + // At this point, we are guaranteed no further completion callbacks will be + // made. + scheduledCmds->clear(); // keeps IO thread from running more completions. } AMQP_ClientProxy& SessionState::getProxy() { @@ -265,10 +267,12 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) } msg->setPublisher(&getConnection()); - msg->getReceiveCompletion().begin(); + boost::shared_ptr<AsyncCompletion> ac(boost::dynamic_pointer_cast<AsyncCompletion>(createIngressMsgXferContext(msg))); + msg->setIngressCompletion( ac ); + ac->begin(); semanticState.handle(msg); msgBuilder.end(); - msg->getReceiveCompletion().end( createPendingMsg(msg) ); // allows msg to complete + ac->end(); // allows msg to complete xfer } // Handle producer session flow control @@ -323,13 +327,15 @@ void SessionState::sendAcceptAndCompletion() * its credit has been accounted for, etc). At this point, msg is considered * by this receiver as 'completed' (as defined by AMQP 0_10) */ -void SessionState::completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg) +void SessionState::completeRcvMsg(SequenceNumber id, + bool requiresAccept, + bool requiresSync) { bool callSendCompletion = false; - receiverCompleted(msg->getCommandId()); - if (msg->requiresAccept()) + receiverCompleted(id); + if (requiresAccept) // will cause msg's seq to appear in the next message.accept we send. - accepted.add(msg->getCommandId()); + accepted.add(id); // Are there any outstanding Execution.Sync commands pending the // completion of this msg? If so, complete them. @@ -343,7 +349,7 @@ void SessionState::completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> ms } // if the sender has requested immediate notification of the completion... - if (msg->getFrames().getMethod()->isSync()) { + if (requiresSync) { sendAcceptAndCompletion(); } else if (callSendCompletion) { sendCompletion(); @@ -435,70 +441,83 @@ void SessionState::addPendingExecutionSync() } +/** factory for creating IncompleteIngressMsgXfer objects which + * can be references from Messages as ingress AsyncCompletion objects. + */ +boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> +SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg) +{ + SequenceNumber id = msg->getCommandId(); + boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> cmd(new SessionState::IncompleteIngressMsgXfer(this, id, msg)); + qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock); + incompleteCmds[id] = cmd; + return cmd; +} + + /** Invoked by the asynchronous completer associated with * a received msg that is pending Completion. May be invoked * by the SessionState directly (sync == true), or some external * entity (!sync). */ -void SessionState::IncompleteRcvMsg::operator() (bool sync) +void SessionState::IncompleteIngressMsgXfer::completed(bool sync) { - QPID_LOG(debug, ": async completion callback for msg seq=" << msg->getCommandId() << " sync=" << sync); - - qpid::sys::ScopedLock<Mutex> l(session->incompleteRcvMsgsLock); - std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr>::iterator i = session->incompleteRcvMsgs.find(this); - if (i != session->incompleteRcvMsgs.end()) { - boost::shared_ptr<IncompleteRcvMsg> tmp(i->second); - session->incompleteRcvMsgs.erase(i); - - if (session->isAttached()) { - if (sync) { - qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteRcvMsgsLock); - QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId()); - session->completeRcvMsg(msg); + qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock); + if (!sync) { + // note well: this path may execute in any thread. + QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id); + session->scheduledCmds->push_back(id); + if (session->scheduledCmds->size() == 1) { + session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter, + session->scheduledCmds, + session)); + } + } else { // command is being completed in IO thread. + // this path runs only on the IO thread. + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; + cmd = session->incompleteCmds.find(id); + if (cmd != session->incompleteCmds.end()) { + boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second); + session->incompleteCmds.erase(cmd); + + if (session->isAttached()) { + QPID_LOG(debug, ": receive completed for msg seq=" << id); + qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock); + session->completeRcvMsg(id, requiresAccept, requiresSync); return; - } else { // potentially called from a different thread - QPID_LOG(debug, ": scheduling completion for msg seq=" << msg->getCommandId()); - session->scheduledRcvMsgs->push_back(tmp); - if (session->scheduledRcvMsgs->size() == 1) { - session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter, - session->scheduledRcvMsgs)); - } } } } } -/** Scheduled from IncompleteRcvMsg callback, completes all pending message - * receives asynchronously. +/** Scheduled from incomplete command's completed callback, safely completes all + * completed commands in the IO Thread. Guaranteed not to be running at the same + * time as the message receive code. */ -void SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr<deque> msgs) +void SessionState::scheduledCompleter(boost::shared_ptr< std::list<SequenceNumber> > completedCmds, + SessionState *session) { - while (!msgs->empty()) { - boost::shared_ptr<IncompleteRcvMsg> iMsg = msgs->front(); - msgs->pop_front(); - QPID_LOG(debug, ": scheduled completion for msg seq=" << iMsg->msg->getCommandId()); - if (iMsg->session && iMsg->session->isAttached()) { - QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg seq=" << iMsg->msg->getCommandId()); - iMsg->session->completeRcvMsg(iMsg->msg); + // when session is destroyed, it clears the list below. If the list is empty, + // the passed session pointer is not valid - do nothing. + if (completedCmds->empty()) return; + + qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock); + std::list<SequenceNumber> cmds(*completedCmds); // make copy so we can drop lock + completedCmds->clear(); + + while (!cmds.empty()) { + SequenceNumber id = cmds.front(); + cmds.pop_front(); + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; + + cmd = session->incompleteCmds.find(id); + if (cmd != session->incompleteCmds.end()) { + qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock); + cmd->second->do_completion(); // retakes lock } } } -/** Cancels a pending incomplete receive message completion callback. Note - * well: will wait for the callback to finish if it is currently in progress - * on another thread. - */ -void SessionState::IncompleteRcvMsg::cancel() -{ - QPID_LOG(debug, session->getId() << ": cancelling outstanding completion for msg seq=" << msg->getCommandId()); - // Cancel the message complete callback. On return, we are guaranteed there - // will be no outstanding calls to SessionState::IncompleteRcvMsg::operator() (bool sync) - msg->getReceiveCompletion().cancel(); - // there may be calls to SessionState::IncompleteRcvMsg::scheduledCompleter() pending, - // clear the session so scheduledCompleter() will ignore this IncompleteRcvMsg. - session = 0; -} - }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index fc9fec7871..f4c10295b1 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -134,7 +134,7 @@ class SessionState : public qpid::SessionState, // indicate that the given ingress msg has been completely received by the // broker, and the msg's message.transfer command can be considered completed. - void completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg); + void completeRcvMsg(SequenceNumber id, bool requiresAccept, bool requiresSync); void handleIn(framing::AMQFrame& frame); void handleOut(framing::AMQFrame& frame); @@ -172,41 +172,65 @@ class SessionState : public qpid::SessionState, std::queue<SequenceNumber> pendingExecutionSyncs; bool currentCommandComplete; - // A list of ingress messages whose message.transfer command is pending - // completion. These messages are awaiting some set of asynchronous - // operations to complete (eg: store, flow-control, etc). before - // the message.transfer can be completed. - class IncompleteRcvMsg : public AsyncCompletion::CompletionHandler + /** Abstract class that represents a command that is pending + * completion. + */ + class IncompleteCommandContext : public AsyncCompletion { - public: - IncompleteRcvMsg(SessionState& _session, boost::intrusive_ptr<Message> _msg) - : session(&_session), msg(_msg) {} - virtual void operator() (bool sync); // invoked when msg is completed. - void cancel(); // cancel pending incomplete callback [operator() above]. + public: + IncompleteCommandContext( SessionState *ss, SequenceNumber _id ) + : id(_id), session(ss) {} + virtual ~IncompleteCommandContext() {} + + /* allows manual invokation of completion, used by IO thread to + * complete a command that was originally finished on a different + * thread. + */ + void do_completion() { completed(true); } + + protected: + SequenceNumber id; + SessionState *session; + }; - typedef boost::shared_ptr<IncompleteRcvMsg> shared_ptr; - typedef std::deque<shared_ptr> deque; + /** incomplete Message.transfer commands - inbound to broker from client + */ + class IncompleteIngressMsgXfer : public SessionState::IncompleteCommandContext + { + public: + IncompleteIngressMsgXfer( SessionState *ss, + SequenceNumber _id, + boost::intrusive_ptr<Message> msg ) + : IncompleteCommandContext(ss, _id), + requiresAccept(msg->requiresAccept()), + requiresSync(msg->getFrames().getMethod()->isSync()) {}; + virtual ~IncompleteIngressMsgXfer() {}; + + protected: + virtual void completed(bool); + + private: + /** meta-info required to complete the message */ + bool requiresAccept; + bool requiresSync; // method's isSync() flag + }; + /** creates a command context suitable for use as an AsyncCompletion in a message */ + boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> createIngressMsgXferContext( boost::intrusive_ptr<Message> msg); - private: - SessionState *session; - boost::intrusive_ptr<Message> msg; + /* A list of commands that are pending completion. These commands are + * awaiting some set of asynchronous operations to finish (eg: store, + * flow-control, etc). before the command can be completed to the client + */ + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > incompleteCmds; + // identifies those commands in incompleteCmds that are waiting for IO thread to run in order to be completed. + boost::shared_ptr< std::list<SequenceNumber> > scheduledCmds; + qpid::sys::Mutex incompleteCmdsLock; // locks both above containers - static void scheduledCompleter(boost::shared_ptr<deque>); - }; - std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr> incompleteRcvMsgs; // msgs pending completion - qpid::sys::Mutex incompleteRcvMsgsLock; - boost::shared_ptr<IncompleteRcvMsg> createPendingMsg(boost::intrusive_ptr<Message>& msg) { - boost::shared_ptr<IncompleteRcvMsg> pending(new IncompleteRcvMsg(*this, msg)); - qpid::sys::ScopedLock<qpid::sys::Mutex> l(incompleteRcvMsgsLock); - incompleteRcvMsgs[pending.get()] = pending; - return pending; - } - - // holds msgs waiting for IO thread to run scheduledCompleter() - boost::shared_ptr<IncompleteRcvMsg::deque> scheduledRcvMsgs; + /** runs in IO thread, completes commands that where finished asynchronously. */ + static void scheduledCompleter(boost::shared_ptr< std::list<SequenceNumber> > scheduledCmds, + SessionState *session); friend class SessionManager; - friend class IncompleteRcvMsg; }; diff --git a/qpid/cpp/src/tests/MessageUtils.h b/qpid/cpp/src/tests/MessageUtils.h index a1b140d484..baca14cf4e 100644 --- a/qpid/cpp/src/tests/MessageUtils.h +++ b/qpid/cpp/src/tests/MessageUtils.h @@ -20,6 +20,7 @@ */ #include "qpid/broker/Message.h" +#include "qpid/broker/AsyncCompletion.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/Uuid.h" @@ -28,6 +29,17 @@ using namespace qpid; using namespace broker; using namespace framing; +namespace { + class DummyCompletion : public AsyncCompletion + { + public: + DummyCompletion() {} + virtual ~DummyCompletion() {} + protected: + void completed(bool) {} + }; +} + namespace qpid { namespace tests { @@ -50,6 +62,8 @@ struct MessageUtils msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); if (durable) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2); + boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion()); + msg->setIngressCompletion(dc); return msg; } diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 9b6e423b02..4d63d9bd97 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -88,6 +88,8 @@ intrusive_ptr<Message> create_message(std::string exchange, std::string routingK msg->getFrames().append(method); msg->getFrames().append(header); msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); + boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion()); + msg->setIngressCompletion(dc); return msg; } diff --git a/qpid/cpp/src/tests/TxPublishTest.cpp b/qpid/cpp/src/tests/TxPublishTest.cpp index ffb0125302..210abf0a5b 100644 --- a/qpid/cpp/src/tests/TxPublishTest.cpp +++ b/qpid/cpp/src/tests/TxPublishTest.cpp @@ -74,7 +74,7 @@ QPID_AUTO_TEST_CASE(testPrepare) BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[0].second); BOOST_CHECK_EQUAL(string("queue2"), t.store.enqueued[1].first); BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[1].second); - BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isReceiveComplete()); + BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isIngressComplete()); } QPID_AUTO_TEST_CASE(testCommit) @@ -87,7 +87,7 @@ QPID_AUTO_TEST_CASE(testCommit) BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount()); intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload; - BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isReceiveComplete()); + BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isIngressComplete()); BOOST_CHECK_EQUAL(t.msg, msg_dequeue); BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount()); diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 98f58ebfdd..6e771bf5d6 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -29,6 +29,7 @@ from unittest import TestCase from copy import copy from threading import Thread, Lock, Condition from logging import getLogger +import qmf.console log = getLogger("qpid.brokertest") @@ -327,6 +328,10 @@ class Broker(Popen): log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log)) self._log_ready = False + def startQmf(self, handler=None): + self.qmf_session = qmf.console.Session(handler) + self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port())) + def host(self): return self._host def port(self): diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index cbad4010b4..8ff83ade60 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -304,6 +304,180 @@ acl allow all all # Verify logs are consistent cluster_test_logs.verify_logs() + def test_queue_flowlimit(self): + """Verify that the queue's flowlimit configuration and state are + correctly replicated. + """ + return; # @todo enable once flow control works in clusters + # start a cluster of two brokers + args = ["--log-enable=info+:broker"] + cluster = self.cluster(2, args) + + # configure a queue with a specific flow limit on broker 0 + ssn0 = cluster[0].connect().session() + s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.max-count':99, 'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") + cluster[0].startQmf() + for q in cluster[0].qmf_session.getObjects(_class="queue"): + if q.name == "flq": + oid = q.getObjectId() + break + self.assertEqual(q.name, "flq") + self.assertEqual(q.flowStopCount, 5) + self.assertEqual(q.flowResumeCount, 3) + self.assertFalse(q.flowStopped) + + # verify both brokers in cluster have same configuration + cluster[1].startQmf() + qs = cluster[1].qmf_session.getObjects(_objectId=oid) + self.assertEqual(len(qs), 1) + q = qs[0] + self.assertEqual(q.name, "flq") + self.assertEqual(q.flowStopCount, 5) + self.assertEqual(q.flowResumeCount, 3) + self.assertFalse(q.flowStopped) + + # fill the queue on one broker until flow control is active + class BlockedSender(Thread): + def __init__(self): Thread.__init__(self) + def run(self): + for x in range(6): + s0.send(Message(str(x))) + + sender = BlockedSender() + sender.start() + + start = time.time() + while time.time() < start + 5: + q = cluster[0].qmf_session.getObjects(_objectId=oid)[0] + if q.flowStopped: + break; + self.assertTrue(q.flowStopped) + + # verify flow control is active on other broker. + q = cluster[1].qmf_session.getObjects(_objectId=oid)[0] + self.assertTrue(q.flowStopped) + + # now drain the queue using a session to the other broker + ssn1 = cluster[1].connect().session() + r1 = ssn1.receiver("flq", capacity=6) + try: + while r1.fetch(timeout=0): + ssn1.acknowledge() + except Empty: + pass + sender.join() + + # and verify both brokers see an unblocked queue + q = cluster[0].qmf_session.getObjects(_objectId=oid)[0] + self.assertFalse(q.flowStopped) + q = cluster[1].qmf_session.getObjects(_objectId=oid)[0] + self.assertFalse(q.flowStopped) + + ssn0.connection.close() + ssn1.connection.close() + cluster_test_logs.verify_logs() + + + def test_queue_flowlimit_join(self): + """Verify that the queue's flowlimit configuration and state are + correctly replicated to a newly joined broker. + """ + return; # @todo enable once flow control works in clusters + # start a cluster of two brokers + #args = ["--log-enable=info+:broker"] + args = ["--log-enable=debug"] + cluster = self.cluster(2, args) + + # configure a queue with a specific flow limit on broker 0 + ssn0 = cluster[0].connect().session() + s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.max-count':99, 'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") + cluster[0].startQmf() + for q in cluster[0].qmf_session.getObjects(_class="queue"): + if q.name == "flq": + oid = q.getObjectId() + break + self.assertEqual(q.name, "flq") + self.assertEqual(q.flowStopCount, 5) + self.assertEqual(q.flowResumeCount, 3) + self.assertFalse(q.flowStopped) + + # verify both brokers in cluster have same configuration + cluster[1].startQmf() + qs = cluster[1].qmf_session.getObjects(_objectId=oid) + self.assertEqual(len(qs), 1) + q = qs[0] + self.assertEqual(q.name, "flq") + self.assertEqual(q.flowStopCount, 5) + self.assertEqual(q.flowResumeCount, 3) + self.assertFalse(q.flowStopped) + + # fill the queue on one broker until flow control is active + class BlockedSender(Thread): + def __init__(self): Thread.__init__(self) + def run(self): + for x in range(6): + s0.send(Message(str(x))) + + sender = BlockedSender() + sender.start() + + start = time.time() + while time.time() < start + 5: + q = cluster[0].qmf_session.getObjects(_objectId=oid)[0] + if q.flowStopped: + break; + self.assertTrue(q.flowStopped) + + # verify flow control is active on other broker. + q = cluster[1].qmf_session.getObjects(_objectId=oid)[0] + self.assertTrue(q.flowStopped) + + # add a new broker to the cluster + print("Start") + cluster.start() + print("Start Done") + + # todo: enable verification: + # cluster[2].startQmf() + # qs = cluster[2].qmf_session.getObjects(_objectId=oid) + # self.assertEqual(len(qs), 1) + # q = qs[0] + # self.assertEqual(q.name, "flq") + # self.assertEqual(q.flowStopCount, 5) + # self.assertEqual(q.flowResumeCount, 3) + # self.assertEqual(q.msgDepth, 5) + # self.assertFalse(q.flowStopped) + # q = cluster[2].qmf_session.getObjects(_objectId=oid)[0] + # self.assertTrue(q.flowStopped) + + # verify new member's queue config + # verify new member's queue flow setting + + + + + # now drain the queue using a session to the other broker + ssn1 = cluster[1].connect().session() + r1 = ssn1.receiver("flq", capacity=6) + try: + while r1.fetch(timeout=1): + ssn1.acknowledge() + except Empty: + pass + sender.join() + + # and verify both brokers see an unblocked queue + q = cluster[0].qmf_session.getObjects(_objectId=oid)[0] + self.assertFalse(q.flowStopped) + q = cluster[1].qmf_session.getObjects(_objectId=oid)[0] + self.assertFalse(q.flowStopped) + + ssn0.connection.close() + ssn1.connection.close() + cluster_test_logs.verify_logs() + + + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): |