summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-02-17 19:54:34 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-02-17 19:54:34 +0000
commit3b096e5e16d683590e6a46799d1c220a7fbd1b04 (patch)
treec6bf6938af0a4e1f2987f355586ee2b69340a28c
parent16a49ba6ef283a5093780d28efaaa8483fc9010d (diff)
downloadqpid-python-3b096e5e16d683590e6a46799d1c220a7fbd1b04.tar.gz
QPID-2935: refactor to use the new Queue Observer interface.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1071764 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h2
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp40
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.h11
-rw-r--r--qpid/cpp/src/tests/QueueFlowLimitTest.cpp82
5 files changed, 98 insertions, 49 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index cfb32749a0..235e30626d 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -708,9 +708,6 @@ void Queue::popAndDequeue()
void Queue::dequeued(const QueuedMessage& msg)
{
if (policy.get()) policy->dequeued(msg);
- /** todo KAG make flowLimit an observer */
- if (flowLimit.get())
- flowLimit->dequeued(msg);
mgntDeqStats(msg.payload);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -814,20 +811,18 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
- flowLimit = QueueFlowLimit::createQueueFlowLimit(this, _settings);
-
autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
if (autoDeleteTimeout)
QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
if (mgmtObject != 0) {
mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
- if (flowLimit.get())
- flowLimit->setManagementObject( mgmtObject );
}
if ( isDurable() && ! getPersistenceId() && ! recovering )
store->create(*this, _settings);
+
+ QueueFlowLimit::observe(*this, _settings);
}
void Queue::destroy()
@@ -1135,9 +1130,6 @@ void Queue::enqueued(const QueuedMessage& m)
if (policy.get()) {
policy->enqueued(m);
}
- /** todo make flowlimit an observer */
- if (flowLimit.get())
- flowLimit->enqueued(m);
mgntEnqStats(m.payload);
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index e8429128f7..3f2bf6fa9c 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -60,7 +60,6 @@ class QueueEvents;
class QueueRegistry;
class TransactionContext;
class Exchange;
-class QueueFlowLimit;
/**
* The brokers representation of an amqp queue. Messages are
@@ -114,7 +113,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
mutable uint64_t persistenceId;
framing::FieldTable settings;
std::auto_ptr<QueuePolicy> policy;
- std::auto_ptr<QueueFlowLimit> flowLimit;
bool policyExceeded;
QueueBindings bindings;
std::string alternateExchangeName;
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index e7ad74b8ab..10abdcecba 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -107,6 +107,10 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
maxCount = _queue->getPolicy()->getMaxCount();
}
broker = queue->getBroker();
+ queueMgmtObj = dynamic_cast<_qmfBroker::Queue*> (queue->GetManagementObject());
+ if (queueMgmtObj) {
+ queueMgmtObj->set_flowStopped(isFlowControlActive());
+ }
}
validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", queueName );
validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", queueName );
@@ -234,15 +238,6 @@ void QueueFlowLimit::setState(const QueuedMessage& msg, bool blocked)
}
-void QueueFlowLimit::setManagementObject(_qmfBroker::Queue *mgmtObject)
-{
- queueMgmtObj = mgmtObject;
- if (queueMgmtObj) {
- queueMgmtObj->set_flowStopped(isFlowControlActive());
- }
-}
-
-
void QueueFlowLimit::encode(Buffer& buffer) const
{
buffer.putLong(flowStopCount);
@@ -302,13 +297,23 @@ void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint
}
-std::auto_ptr<QueueFlowLimit> QueueFlowLimit::createQueueFlowLimit(Queue *queue, const qpid::framing::FieldTable& settings)
+void QueueFlowLimit::observe(Queue& queue, const qpid::framing::FieldTable& settings)
+{
+ QueueFlowLimit *ptr = createLimit( &queue, settings );
+ if (ptr) {
+ boost::shared_ptr<QueueFlowLimit> observer(ptr);
+ queue.addObserver(observer);
+ }
+}
+
+/** returns ptr to a QueueFlowLimit, else 0 if no limit */
+QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::FieldTable& settings)
{
std::string type(QueuePolicy::getType(settings));
if (type == QueuePolicy::RING || type == QueuePolicy::RING_STRICT) {
// The size of a RING queue is limited by design - no need for flow control.
- return std::auto_ptr<QueueFlowLimit>();
+ return 0;
}
if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey)) {
@@ -317,17 +322,16 @@ std::auto_ptr<QueueFlowLimit> QueueFlowLimit::createQueueFlowLimit(Queue *queue,
uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0);
if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control
- return std::auto_ptr<QueueFlowLimit>();
+ return 0;
}
/** todo KAG - remove once cluster support for flow control done. */
// TODO aconway 2011-02-16: is queue==0 only in tests?
if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
<< queue->getName());
- return std::auto_ptr<QueueFlowLimit>();
+ return 0;
}
- return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, flowStopCount, flowResumeCount,
- flowStopSize, flowResumeSize));
+ return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
}
if (defaultFlowStopRatio) {
@@ -339,12 +343,12 @@ std::auto_ptr<QueueFlowLimit> QueueFlowLimit::createQueueFlowLimit(Queue *queue,
if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
<< queue->getName());
- return std::auto_ptr<QueueFlowLimit>();
+ return 0;
}
- return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize));
+ return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize);
}
- return std::auto_ptr<QueueFlowLimit>();
+ return 0;
}
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
index 3686b1ff56..4d33007f0d 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
@@ -27,6 +27,7 @@
#include <memory>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Mutex.h"
@@ -52,7 +53,7 @@ class Broker;
* passing _either_ level may turn flow control ON, but _both_ must be
* below level before flow control will be turned OFF.
*/
-class QueueFlowLimit
+ class QueueFlowLimit : public QueueObserver
{
static uint64_t defaultMaxSize;
static uint defaultFlowStopRatio;
@@ -93,16 +94,17 @@ class QueueFlowLimit
uint32_t getFlowResumeCount() const { return flowResumeCount; }
uint64_t getFlowStopSize() const { return flowStopSize; }
uint64_t getFlowResumeSize() const { return flowResumeSize; }
+
+ uint32_t getFlowCount() const { return count; }
+ uint64_t getFlowSize() const { return size; }
bool isFlowControlActive() const { return flowStopped; }
bool monitorFlowControl() const { return flowStopCount || flowStopSize; }
- void setManagementObject(_qmfBroker::Queue *q);
-
void encode(framing::Buffer& buffer) const;
void decode(framing::Buffer& buffer);
uint32_t encodedSize() const;
- static QPID_BROKER_EXTERN std::auto_ptr<QueueFlowLimit> createQueueFlowLimit(Queue *queue, const qpid::framing::FieldTable& settings);
+ static QPID_BROKER_EXTERN void observe(Queue& queue, const qpid::framing::FieldTable& settings);
static QPID_BROKER_EXTERN void setDefaults(uint64_t defaultMaxSize, uint defaultFlowStopRatio, uint defaultFlowResumeRatio);
friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueueFlowLimit&);
@@ -119,6 +121,7 @@ class QueueFlowLimit
QueueFlowLimit(Queue *queue,
uint32_t flowStopCount, uint32_t flowResumeCount,
uint64_t flowStopSize, uint64_t flowResumeSize);
+ static QueueFlowLimit *createLimit(Queue *queue, const qpid::framing::FieldTable& settings);
};
}}
diff --git a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
index 3b3bf777d4..700267336b 100644
--- a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
+++ b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
@@ -39,6 +39,40 @@ namespace tests {
QPID_AUTO_TEST_SUITE(QueueFlowLimitTestSuite)
namespace {
+
+class TestFlow : public QueueFlowLimit
+{
+public:
+ TestFlow(uint32_t flowStopCount, uint32_t flowResumeCount,
+ uint64_t flowStopSize, uint64_t flowResumeSize) :
+ QueueFlowLimit(0, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)
+ {}
+ virtual ~TestFlow() {}
+
+ static TestFlow *createTestFlow(const qpid::framing::FieldTable& settings)
+ {
+ FieldTable::ValuePtr v;
+
+ v = settings.get(flowStopCountKey);
+ uint32_t flowStopCount = (v) ? (uint32_t)v->get<int64_t>() : 0;
+ v = settings.get(flowResumeCountKey);
+ uint32_t flowResumeCount = (v) ? (uint32_t)v->get<int64_t>() : 0;
+ v = settings.get(flowStopSizeKey);
+ uint64_t flowStopSize = (v) ? (uint64_t)v->get<int64_t>() : 0;
+ v = settings.get(flowResumeSizeKey);
+ uint64_t flowResumeSize = (v) ? (uint64_t)v->get<int64_t>() : 0;
+
+ return new TestFlow(flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
+ }
+
+ static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& settings)
+ {
+ return QueueFlowLimit::createLimit(0, settings);
+ }
+};
+
+
+
QueuedMessage createMessage(uint32_t size)
{
QueuedMessage msg;
@@ -54,7 +88,7 @@ QPID_AUTO_TEST_CASE(testFlowCount)
args.setInt(QueueFlowLimit::flowStopCountKey, 7);
args.setInt(QueueFlowLimit::flowResumeCountKey, 5);
- std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+ std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args));
BOOST_CHECK_EQUAL((uint32_t) 7, flow->getFlowStopCount());
BOOST_CHECK_EQUAL((uint32_t) 5, flow->getFlowResumeCount());
@@ -105,7 +139,7 @@ QPID_AUTO_TEST_CASE(testFlowSize)
args.setUInt64(QueueFlowLimit::flowStopSizeKey, 70);
args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 50);
- std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+ std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args));
BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount());
BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount());
@@ -121,6 +155,9 @@ QPID_AUTO_TEST_CASE(testFlowSize)
BOOST_CHECK(!flow->isFlowControlActive());
}
BOOST_CHECK(!flow->isFlowControlActive()); // 60 on queue
+ BOOST_CHECK_EQUAL(6, flow->getFlowCount());
+ BOOST_CHECK_EQUAL(60, flow->getFlowSize());
+
QueuedMessage msg_9 = createMessage(9);
flow->enqueued(msg_9);
BOOST_CHECK(!flow->isFlowControlActive()); // 69 on queue
@@ -134,6 +171,8 @@ QPID_AUTO_TEST_CASE(testFlowSize)
msgs.push_back(createMessage(10));
flow->enqueued(msgs.back());
BOOST_CHECK(flow->isFlowControlActive()); // 81 on queue
+ BOOST_CHECK_EQUAL(10, flow->getFlowCount());
+ BOOST_CHECK_EQUAL(81, flow->getFlowSize());
flow->dequeued(msgs.front());
msgs.pop_front();
@@ -158,6 +197,8 @@ QPID_AUTO_TEST_CASE(testFlowSize)
flow->dequeued(msgs.front());
msgs.pop_front();
BOOST_CHECK(!flow->isFlowControlActive()); // 20 on queue
+ BOOST_CHECK_EQUAL(2, flow->getFlowCount());
+ BOOST_CHECK_EQUAL(20, flow->getFlowSize());
}
QPID_AUTO_TEST_CASE(testFlowArgs)
@@ -170,7 +211,7 @@ QPID_AUTO_TEST_CASE(testFlowArgs)
args.setUInt64(QueueFlowLimit::flowStopSizeKey, stop);
args.setUInt64(QueueFlowLimit::flowResumeSizeKey, resume);
- std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+ std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args));
BOOST_CHECK_EQUAL((uint32_t) 30, flow->getFlowStopCount());
BOOST_CHECK_EQUAL((uint32_t) 21, flow->getFlowResumeCount());
@@ -196,7 +237,7 @@ QPID_AUTO_TEST_CASE(testFlowCombo)
QueuedMessage msg;
- std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+ std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args));
BOOST_CHECK(!flow->isFlowControlActive()); // count:0 size:0
// verify flow control comes ON when only count passes its stop point.
@@ -310,8 +351,10 @@ QPID_AUTO_TEST_CASE(testFlowDefaultArgs)
80, // 80% stop threshold
70); // 70% resume threshold
FieldTable args;
- std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+ QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
+ BOOST_CHECK(ptr);
+ std::auto_ptr<QueueFlowLimit> flow(ptr);
BOOST_CHECK_EQUAL((uint64_t) 2360001, flow->getFlowStopSize());
BOOST_CHECK_EQUAL((uint64_t) 2065000, flow->getFlowResumeSize());
BOOST_CHECK_EQUAL( 0u, flow->getFlowStopCount());
@@ -330,7 +373,10 @@ QPID_AUTO_TEST_CASE(testFlowOverrideArgs)
FieldTable args;
args.setInt(QueueFlowLimit::flowStopCountKey, 35000);
args.setInt(QueueFlowLimit::flowResumeCountKey, 30000);
- std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+ QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
+ BOOST_CHECK(ptr);
+ std::auto_ptr<QueueFlowLimit> flow(ptr);
BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount());
BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount());
@@ -343,7 +389,10 @@ QPID_AUTO_TEST_CASE(testFlowOverrideArgs)
FieldTable args;
args.setInt(QueueFlowLimit::flowStopSizeKey, 350000);
args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000);
- std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+ QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
+ BOOST_CHECK(ptr);
+ std::auto_ptr<QueueFlowLimit> flow(ptr);
BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount());
BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount());
@@ -358,7 +407,10 @@ QPID_AUTO_TEST_CASE(testFlowOverrideArgs)
args.setInt(QueueFlowLimit::flowResumeCountKey, 30000);
args.setInt(QueueFlowLimit::flowStopSizeKey, 350000);
args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000);
- std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+ QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
+ BOOST_CHECK(ptr);
+ std::auto_ptr<QueueFlowLimit> flow(ptr);
BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount());
BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount());
@@ -376,7 +428,9 @@ QPID_AUTO_TEST_CASE(testFlowOverrideDefaults)
97, // stop threshold
73); // resume threshold
FieldTable args;
- std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+ QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
+ BOOST_CHECK(ptr);
+ std::auto_ptr<QueueFlowLimit> flow(ptr);
BOOST_CHECK_EQUAL((uint32_t) 2861501, flow->getFlowStopSize());
BOOST_CHECK_EQUAL((uint32_t) 2153500, flow->getFlowResumeSize());
@@ -390,16 +444,14 @@ QPID_AUTO_TEST_CASE(testFlowDisable)
{
FieldTable args;
args.setInt(QueueFlowLimit::flowStopCountKey, 0);
- std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
-
- BOOST_CHECK(!flow.get());
+ QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
+ BOOST_CHECK(!ptr);
}
{
FieldTable args;
args.setInt(QueueFlowLimit::flowStopSizeKey, 0);
- std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
-
- BOOST_CHECK(!flow.get());
+ QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
+ BOOST_CHECK(!ptr);
}
}