From 3b096e5e16d683590e6a46799d1c220a7fbd1b04 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Thu, 17 Feb 2011 19:54:34 +0000 Subject: QPID-2935: refactor to use the new Queue Observer interface. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1071764 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Queue.cpp | 12 +---- qpid/cpp/src/qpid/broker/Queue.h | 2 - qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 40 +++++++------- qpid/cpp/src/qpid/broker/QueueFlowLimit.h | 11 ++-- qpid/cpp/src/tests/QueueFlowLimitTest.cpp | 82 +++++++++++++++++++++++------ 5 files changed, 98 insertions(+), 49 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index cfb32749a0..235e30626d 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -708,9 +708,6 @@ void Queue::popAndDequeue() void Queue::dequeued(const QueuedMessage& msg) { if (policy.get()) policy->dequeued(msg); - /** todo KAG make flowLimit an observer */ - if (flowLimit.get()) - flowLimit->dequeued(msg); mgntDeqStats(msg.payload); for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ @@ -814,20 +811,18 @@ void Queue::configure(const FieldTable& _settings, bool recovering) FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers); if (p && p->convertsTo()) insertSequenceNumbers(p->get()); - flowLimit = QueueFlowLimit::createQueueFlowLimit(this, _settings); - autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout); if (autoDeleteTimeout) QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); if (mgmtObject != 0) { mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); - if (flowLimit.get()) - flowLimit->setManagementObject( mgmtObject ); } if ( isDurable() && ! getPersistenceId() && ! recovering ) store->create(*this, _settings); + + QueueFlowLimit::observe(*this, _settings); } void Queue::destroy() @@ -1135,9 +1130,6 @@ void Queue::enqueued(const QueuedMessage& m) if (policy.get()) { policy->enqueued(m); } - /** todo make flowlimit an observer */ - if (flowLimit.get()) - flowLimit->enqueued(m); mgntEnqStats(m.payload); } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index e8429128f7..3f2bf6fa9c 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -60,7 +60,6 @@ class QueueEvents; class QueueRegistry; class TransactionContext; class Exchange; -class QueueFlowLimit; /** * The brokers representation of an amqp queue. Messages are @@ -114,7 +113,6 @@ class Queue : public boost::enable_shared_from_this, mutable uint64_t persistenceId; framing::FieldTable settings; std::auto_ptr policy; - std::auto_ptr flowLimit; bool policyExceeded; QueueBindings bindings; std::string alternateExchangeName; diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index e7ad74b8ab..10abdcecba 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -107,6 +107,10 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, maxCount = _queue->getPolicy()->getMaxCount(); } broker = queue->getBroker(); + queueMgmtObj = dynamic_cast<_qmfBroker::Queue*> (queue->GetManagementObject()); + if (queueMgmtObj) { + queueMgmtObj->set_flowStopped(isFlowControlActive()); + } } validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", queueName ); validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", queueName ); @@ -234,15 +238,6 @@ void QueueFlowLimit::setState(const QueuedMessage& msg, bool blocked) } -void QueueFlowLimit::setManagementObject(_qmfBroker::Queue *mgmtObject) -{ - queueMgmtObj = mgmtObject; - if (queueMgmtObj) { - queueMgmtObj->set_flowStopped(isFlowControlActive()); - } -} - - void QueueFlowLimit::encode(Buffer& buffer) const { buffer.putLong(flowStopCount); @@ -302,13 +297,23 @@ void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint } -std::auto_ptr QueueFlowLimit::createQueueFlowLimit(Queue *queue, const qpid::framing::FieldTable& settings) +void QueueFlowLimit::observe(Queue& queue, const qpid::framing::FieldTable& settings) +{ + QueueFlowLimit *ptr = createLimit( &queue, settings ); + if (ptr) { + boost::shared_ptr observer(ptr); + queue.addObserver(observer); + } +} + +/** returns ptr to a QueueFlowLimit, else 0 if no limit */ +QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::FieldTable& settings) { std::string type(QueuePolicy::getType(settings)); if (type == QueuePolicy::RING || type == QueuePolicy::RING_STRICT) { // The size of a RING queue is limited by design - no need for flow control. - return std::auto_ptr(); + return 0; } if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey)) { @@ -317,17 +322,16 @@ std::auto_ptr QueueFlowLimit::createQueueFlowLimit(Queue *queue, uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0); uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0); if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control - return std::auto_ptr(); + return 0; } /** todo KAG - remove once cluster support for flow control done. */ // TODO aconway 2011-02-16: is queue==0 only in tests? if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) { QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue " << queue->getName()); - return std::auto_ptr(); + return 0; } - return std::auto_ptr(new QueueFlowLimit(queue, flowStopCount, flowResumeCount, - flowStopSize, flowResumeSize)); + return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); } if (defaultFlowStopRatio) { @@ -339,12 +343,12 @@ std::auto_ptr QueueFlowLimit::createQueueFlowLimit(Queue *queue, if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) { QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue " << queue->getName()); - return std::auto_ptr(); + return 0; } - return std::auto_ptr(new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize)); + return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize); } - return std::auto_ptr(); + return 0; } diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h index 3686b1ff56..4d33007f0d 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h @@ -27,6 +27,7 @@ #include #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/QueuedMessage.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Mutex.h" @@ -52,7 +53,7 @@ class Broker; * passing _either_ level may turn flow control ON, but _both_ must be * below level before flow control will be turned OFF. */ -class QueueFlowLimit + class QueueFlowLimit : public QueueObserver { static uint64_t defaultMaxSize; static uint defaultFlowStopRatio; @@ -93,16 +94,17 @@ class QueueFlowLimit uint32_t getFlowResumeCount() const { return flowResumeCount; } uint64_t getFlowStopSize() const { return flowStopSize; } uint64_t getFlowResumeSize() const { return flowResumeSize; } + + uint32_t getFlowCount() const { return count; } + uint64_t getFlowSize() const { return size; } bool isFlowControlActive() const { return flowStopped; } bool monitorFlowControl() const { return flowStopCount || flowStopSize; } - void setManagementObject(_qmfBroker::Queue *q); - void encode(framing::Buffer& buffer) const; void decode(framing::Buffer& buffer); uint32_t encodedSize() const; - static QPID_BROKER_EXTERN std::auto_ptr createQueueFlowLimit(Queue *queue, const qpid::framing::FieldTable& settings); + static QPID_BROKER_EXTERN void observe(Queue& queue, const qpid::framing::FieldTable& settings); static QPID_BROKER_EXTERN void setDefaults(uint64_t defaultMaxSize, uint defaultFlowStopRatio, uint defaultFlowResumeRatio); friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueueFlowLimit&); @@ -119,6 +121,7 @@ class QueueFlowLimit QueueFlowLimit(Queue *queue, uint32_t flowStopCount, uint32_t flowResumeCount, uint64_t flowStopSize, uint64_t flowResumeSize); + static QueueFlowLimit *createLimit(Queue *queue, const qpid::framing::FieldTable& settings); }; }} diff --git a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp index 3b3bf777d4..700267336b 100644 --- a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp +++ b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp @@ -39,6 +39,40 @@ namespace tests { QPID_AUTO_TEST_SUITE(QueueFlowLimitTestSuite) namespace { + +class TestFlow : public QueueFlowLimit +{ +public: + TestFlow(uint32_t flowStopCount, uint32_t flowResumeCount, + uint64_t flowStopSize, uint64_t flowResumeSize) : + QueueFlowLimit(0, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize) + {} + virtual ~TestFlow() {} + + static TestFlow *createTestFlow(const qpid::framing::FieldTable& settings) + { + FieldTable::ValuePtr v; + + v = settings.get(flowStopCountKey); + uint32_t flowStopCount = (v) ? (uint32_t)v->get() : 0; + v = settings.get(flowResumeCountKey); + uint32_t flowResumeCount = (v) ? (uint32_t)v->get() : 0; + v = settings.get(flowStopSizeKey); + uint64_t flowStopSize = (v) ? (uint64_t)v->get() : 0; + v = settings.get(flowResumeSizeKey); + uint64_t flowResumeSize = (v) ? (uint64_t)v->get() : 0; + + return new TestFlow(flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); + } + + static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& settings) + { + return QueueFlowLimit::createLimit(0, settings); + } +}; + + + QueuedMessage createMessage(uint32_t size) { QueuedMessage msg; @@ -54,7 +88,7 @@ QPID_AUTO_TEST_CASE(testFlowCount) args.setInt(QueueFlowLimit::flowStopCountKey, 7); args.setInt(QueueFlowLimit::flowResumeCountKey, 5); - std::auto_ptr flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + std::auto_ptr flow(TestFlow::createTestFlow(args)); BOOST_CHECK_EQUAL((uint32_t) 7, flow->getFlowStopCount()); BOOST_CHECK_EQUAL((uint32_t) 5, flow->getFlowResumeCount()); @@ -105,7 +139,7 @@ QPID_AUTO_TEST_CASE(testFlowSize) args.setUInt64(QueueFlowLimit::flowStopSizeKey, 70); args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 50); - std::auto_ptr flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + std::auto_ptr flow(TestFlow::createTestFlow(args)); BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount()); BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount()); @@ -121,6 +155,9 @@ QPID_AUTO_TEST_CASE(testFlowSize) BOOST_CHECK(!flow->isFlowControlActive()); } BOOST_CHECK(!flow->isFlowControlActive()); // 60 on queue + BOOST_CHECK_EQUAL(6, flow->getFlowCount()); + BOOST_CHECK_EQUAL(60, flow->getFlowSize()); + QueuedMessage msg_9 = createMessage(9); flow->enqueued(msg_9); BOOST_CHECK(!flow->isFlowControlActive()); // 69 on queue @@ -134,6 +171,8 @@ QPID_AUTO_TEST_CASE(testFlowSize) msgs.push_back(createMessage(10)); flow->enqueued(msgs.back()); BOOST_CHECK(flow->isFlowControlActive()); // 81 on queue + BOOST_CHECK_EQUAL(10, flow->getFlowCount()); + BOOST_CHECK_EQUAL(81, flow->getFlowSize()); flow->dequeued(msgs.front()); msgs.pop_front(); @@ -158,6 +197,8 @@ QPID_AUTO_TEST_CASE(testFlowSize) flow->dequeued(msgs.front()); msgs.pop_front(); BOOST_CHECK(!flow->isFlowControlActive()); // 20 on queue + BOOST_CHECK_EQUAL(2, flow->getFlowCount()); + BOOST_CHECK_EQUAL(20, flow->getFlowSize()); } QPID_AUTO_TEST_CASE(testFlowArgs) @@ -170,7 +211,7 @@ QPID_AUTO_TEST_CASE(testFlowArgs) args.setUInt64(QueueFlowLimit::flowStopSizeKey, stop); args.setUInt64(QueueFlowLimit::flowResumeSizeKey, resume); - std::auto_ptr flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + std::auto_ptr flow(TestFlow::createTestFlow(args)); BOOST_CHECK_EQUAL((uint32_t) 30, flow->getFlowStopCount()); BOOST_CHECK_EQUAL((uint32_t) 21, flow->getFlowResumeCount()); @@ -196,7 +237,7 @@ QPID_AUTO_TEST_CASE(testFlowCombo) QueuedMessage msg; - std::auto_ptr flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + std::auto_ptr flow(TestFlow::createTestFlow(args)); BOOST_CHECK(!flow->isFlowControlActive()); // count:0 size:0 // verify flow control comes ON when only count passes its stop point. @@ -310,8 +351,10 @@ QPID_AUTO_TEST_CASE(testFlowDefaultArgs) 80, // 80% stop threshold 70); // 70% resume threshold FieldTable args; - std::auto_ptr flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(ptr); + std::auto_ptr flow(ptr); BOOST_CHECK_EQUAL((uint64_t) 2360001, flow->getFlowStopSize()); BOOST_CHECK_EQUAL((uint64_t) 2065000, flow->getFlowResumeSize()); BOOST_CHECK_EQUAL( 0u, flow->getFlowStopCount()); @@ -330,7 +373,10 @@ QPID_AUTO_TEST_CASE(testFlowOverrideArgs) FieldTable args; args.setInt(QueueFlowLimit::flowStopCountKey, 35000); args.setInt(QueueFlowLimit::flowResumeCountKey, 30000); - std::auto_ptr flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + + QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(ptr); + std::auto_ptr flow(ptr); BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount()); BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount()); @@ -343,7 +389,10 @@ QPID_AUTO_TEST_CASE(testFlowOverrideArgs) FieldTable args; args.setInt(QueueFlowLimit::flowStopSizeKey, 350000); args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000); - std::auto_ptr flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + + QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(ptr); + std::auto_ptr flow(ptr); BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount()); BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount()); @@ -358,7 +407,10 @@ QPID_AUTO_TEST_CASE(testFlowOverrideArgs) args.setInt(QueueFlowLimit::flowResumeCountKey, 30000); args.setInt(QueueFlowLimit::flowStopSizeKey, 350000); args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000); - std::auto_ptr flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + + QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(ptr); + std::auto_ptr flow(ptr); BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount()); BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount()); @@ -376,7 +428,9 @@ QPID_AUTO_TEST_CASE(testFlowOverrideDefaults) 97, // stop threshold 73); // resume threshold FieldTable args; - std::auto_ptr flow(QueueFlowLimit::createQueueFlowLimit(0, args)); + QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(ptr); + std::auto_ptr flow(ptr); BOOST_CHECK_EQUAL((uint32_t) 2861501, flow->getFlowStopSize()); BOOST_CHECK_EQUAL((uint32_t) 2153500, flow->getFlowResumeSize()); @@ -390,16 +444,14 @@ QPID_AUTO_TEST_CASE(testFlowDisable) { FieldTable args; args.setInt(QueueFlowLimit::flowStopCountKey, 0); - std::auto_ptr flow(QueueFlowLimit::createQueueFlowLimit(0, args)); - - BOOST_CHECK(!flow.get()); + QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(!ptr); } { FieldTable args; args.setInt(QueueFlowLimit::flowStopSizeKey, 0); - std::auto_ptr flow(QueueFlowLimit::createQueueFlowLimit(0, args)); - - BOOST_CHECK(!flow.get()); + QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(!ptr); } } -- cgit v1.2.1