diff options
author | Gordon Sim <gsim@apache.org> | 2013-10-03 13:31:12 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2013-10-03 13:31:12 +0000 |
commit | 515fe2b92acbbea5a269ff761561698eb0fdc076 (patch) | |
tree | d64e84ae8b4dbbf7937d9836047ab4c53009a70b /cpp/src | |
parent | a121612d15b9ac15bf0dc2ca612b94e9359e1b20 (diff) | |
download | qpid-python-515fe2b92acbbea5a269ff761561698eb0fdc076.tar.gz |
QPID-5199: take 0-10 header segment into account for message size
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1528852 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueFlowLimit.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ThresholdAlerts.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Message.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Message.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h | 1 | ||||
-rw-r--r-- | cpp/src/tests/MessageTest.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/QueueFlowLimitTest.cpp | 170 | ||||
-rw-r--r-- | cpp/src/tests/QueuePolicyTest.cpp | 13 |
13 files changed, 125 insertions, 112 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 0a25d57cd8..efd83a3225 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -135,7 +135,7 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b) if (mgmtExchange != 0) { qmf::org::apache::qpid::broker::Exchange::PerThreadStats *eStats = mgmtExchange->getStatistics(); - uint64_t contentSize = msg.getMessage().getContentSize(); + uint64_t contentSize = msg.getMessage().getMessageSize(); eStats->msgReceives += 1; eStats->byteReceives += contentSize; diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index e71125b4ab..deca238f22 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -71,9 +71,9 @@ bool Message::isPersistent() const return getEncoding().isPersistent(); } -uint64_t Message::getContentSize() const +uint64_t Message::getMessageSize() const { - return getEncoding().getContentSize(); + return getEncoding().getMessageSize(); } boost::intrusive_ptr<AsyncCompletion> Message::getIngressCompletion() const diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 84f62a771d..823207f10c 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -68,7 +68,7 @@ public: virtual std::string getRoutingKey() const = 0; virtual bool isPersistent() const = 0; virtual uint8_t getPriority() const = 0; - virtual uint64_t getContentSize() const = 0; + virtual uint64_t getMessageSize() const = 0; virtual qpid::amqp::MessageId getMessageId() const = 0; virtual qpid::amqp::MessageId getCorrelationId() const = 0; virtual std::string getPropertyAsString(const std::string& key) const = 0; @@ -119,7 +119,7 @@ public: QPID_BROKER_EXTERN qpid::types::Variant getProperty(const std::string& key) const; void processProperties(qpid::amqp::MapHandler&) const; - QPID_BROKER_EXTERN uint64_t getContentSize() const; + QPID_BROKER_EXTERN uint64_t getMessageSize() const; QPID_BROKER_EXTERN Encoding& getEncoding(); QPID_BROKER_EXTERN const Encoding& getEncoding() const; diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 25b4e23968..29e7c06e90 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -88,7 +88,7 @@ inline void mgntEnqStats(const Message& msg, _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); - uint64_t contentSize = msg.getContentSize(); + uint64_t contentSize = msg.getMessageSize(); qStats->msgTotalEnqueues +=1; bStats->msgTotalEnqueues += 1; qStats->byteTotalEnqueues += contentSize; @@ -111,7 +111,7 @@ inline void mgntDeqStats(const Message& msg, if (mgmtObject != 0){ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); - uint64_t contentSize = msg.getContentSize(); + uint64_t contentSize = msg.getMessageSize(); qStats->msgTotalDequeues += 1; bStats->msgTotalDequeues += 1; @@ -202,7 +202,7 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, redirectSource(false) { current.setCount(0);//always track depth in messages - if (settings.maxDepth.hasSize()) current.setSize(0);//track depth in bytes only if policy requires it + if (settings.maxDepth.getSize()) current.setSize(0);//track depth in bytes only if policy requires it if (settings.traceExcludes.size()) { split(traceExclude, settings.traceExcludes, ", "); } @@ -305,7 +305,7 @@ void Queue::deliverTo(Message msg, TxBuffer* txn) void Queue::recoverPrepared(const Message& msg) { Mutex::ScopedLock locker(messageLock); - current += QueueDepth(1, msg.getContentSize()); + current += QueueDepth(1, msg.getMessageSize()); } void Queue::recover(Message& msg) @@ -319,7 +319,7 @@ void Queue::process(Message& msg) push(msg); if (mgmtObject != 0){ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); - const uint64_t contentSize = msg.getContentSize(); + const uint64_t contentSize = msg.getMessageSize(); qStats->msgTxnEnqueues += 1; qStats->byteTxnEnqueues += contentSize; mgmtObject->statisticsUpdated(); @@ -861,7 +861,7 @@ bool Queue::enqueue(TransactionContext* ctxt, Message& msg) { Mutex::ScopedLock locker(messageLock); - if (!checkDepth(QueueDepth(1, msg.getContentSize()), msg)) { + if (!checkDepth(QueueDepth(1, msg.getMessageSize()), msg)) { return false; } } @@ -891,7 +891,7 @@ void Queue::enqueueAborted(const Message& msg) //Called when any transactional enqueue is aborted (including but //not limited to a recovered dtx transaction) Mutex::ScopedLock locker(messageLock); - current -= QueueDepth(1, msg.getContentSize()); + current -= QueueDepth(1, msg.getMessageSize()); } void Queue::enqueueCommited(Message& msg) @@ -919,7 +919,7 @@ void Queue::dequeueCommited(const Message& msg) observeDequeue(msg, locker, settings.autodelete ? &autodelete : 0); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); - mgmtObject->inc_byteTxnDequeues(msg.getContentSize()); + mgmtObject->inc_byteTxnDequeues(msg.getMessageSize()); } } @@ -962,7 +962,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor) Mutex::ScopedLock locker(messageLock); Message* msg = messages->find(cursor); if (msg) { - const uint64_t contentSize = msg->getContentSize(); + const uint64_t contentSize = msg->getMessageSize(); observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); @@ -986,7 +986,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor) */ void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock& lock, ScopedAutoDelete* autodelete) { - current -= QueueDepth(1, msg.getContentSize()); + current -= QueueDepth(1, msg.getMessageSize()); mgntDeqStats(msg, mgmtObject, brokerMgmtObject); for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp index 9b2e31c925..607dc09fe8 100644 --- a/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -112,7 +112,7 @@ void QueueFlowLimit::enqueued(const Message& msg) sys::Mutex::ScopedLock l(indexLock); ++count; - size += msg.getContentSize(); + size += msg.getMessageSize(); if (!flowStopped) { if (flowStopCount && count > flowStopCount) { @@ -150,7 +150,7 @@ void QueueFlowLimit::dequeued(const Message& msg) throw Exception(QPID_MSG("Flow limit count underflow on dequeue. Queue=" << queueName)); } - uint64_t _size = msg.getContentSize(); + uint64_t _size = msg.getMessageSize(); if (_size <= size) { size -= _size; } else { diff --git a/cpp/src/qpid/broker/ThresholdAlerts.cpp b/cpp/src/qpid/broker/ThresholdAlerts.cpp index 345b9d89d5..afb9d9ff4e 100644 --- a/cpp/src/qpid/broker/ThresholdAlerts.cpp +++ b/cpp/src/qpid/broker/ThresholdAlerts.cpp @@ -44,7 +44,7 @@ ThresholdAlerts::ThresholdAlerts(const std::string& n, void ThresholdAlerts::enqueued(const Message& m) { - size += m.getContentSize(); + size += m.getMessageSize(); ++count; if (sizeGoingUp && sizeThreshold && size >= sizeThreshold) { @@ -64,7 +64,7 @@ void ThresholdAlerts::enqueued(const Message& m) void ThresholdAlerts::dequeued(const Message& m) { - size -= m.getContentSize(); + size -= m.getMessageSize(); --count; if (!sizeGoingUp && sizeThreshold && size <= sizeThresholdDown) { diff --git a/cpp/src/qpid/broker/amqp/Message.cpp b/cpp/src/qpid/broker/amqp/Message.cpp index cf134a894d..7015db324b 100644 --- a/cpp/src/qpid/broker/amqp/Message.cpp +++ b/cpp/src/qpid/broker/amqp/Message.cpp @@ -202,11 +202,7 @@ std::string Message::getAnnotationAsString(const std::string& key) const } -//getContentSize() is primarily used in stats about the number of -//bytes enqueued/dequeued etc, not sure whether this is the right name -//and whether it should indeed only be the content that is thus -//measured -uint64_t Message::getContentSize() const { return data.size(); } +uint64_t Message::getMessageSize() const { return data.size(); } //getContent() is used primarily for decoding qmf messages in //management and ha, but also by the xml exchange std::string Message::getContent() const diff --git a/cpp/src/qpid/broker/amqp/Message.h b/cpp/src/qpid/broker/amqp/Message.h index 3a7c4529de..5bf49ef556 100644 --- a/cpp/src/qpid/broker/amqp/Message.h +++ b/cpp/src/qpid/broker/amqp/Message.h @@ -44,7 +44,7 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess std::string getRoutingKey() const; bool isPersistent() const; uint8_t getPriority() const; - uint64_t getContentSize() const; + uint64_t getMessageSize() const; std::string getPropertyAsString(const std::string& key) const; std::string getAnnotationAsString(const std::string& key) const; bool getTtl(uint64_t&) const; diff --git a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp index e3a967796b..0c8434f381 100644 --- a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp +++ b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp @@ -52,6 +52,11 @@ uint64_t MessageTransfer::getContentSize() const return frames.getContentSize(); } +uint64_t MessageTransfer::getMessageSize() const +{ + return getRequiredCredit(); +} + std::string MessageTransfer::getAnnotationAsString(const std::string& key) const { const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>(); diff --git a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h index d32c0d07fd..422446db51 100644 --- a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h +++ b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h @@ -45,6 +45,7 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro bool isPersistent() const; uint8_t getPriority() const; uint64_t getContentSize() const; + uint64_t getMessageSize() const; qpid::amqp::MessageId getMessageId() const; qpid::amqp::MessageId getCorrelationId() const; std::string getPropertyAsString(const std::string& key) const; diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp index 666a2c6297..28f8b0c2b8 100644 --- a/cpp/src/tests/MessageTest.cpp +++ b/cpp/src/tests/MessageTest.cpp @@ -65,7 +65,7 @@ QPID_AUTO_TEST_CASE(testEncodeDecode) msg = registry.decode(buffer); BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey()); - BOOST_CHECK_EQUAL((uint64_t) data.size(), msg.getContentSize()); + BOOST_CHECK_EQUAL((uint64_t) data.size(), msg.getContent().size()); BOOST_CHECK_EQUAL(data, msg.getContent()); //BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId()); BOOST_CHECK_EQUAL(string("xyz"), msg.getPropertyAsString("abc")); diff --git a/cpp/src/tests/QueueFlowLimitTest.cpp b/cpp/src/tests/QueueFlowLimitTest.cpp index d305ca452b..7b0a776062 100644 --- a/cpp/src/tests/QueueFlowLimitTest.cpp +++ b/cpp/src/tests/QueueFlowLimitTest.cpp @@ -77,8 +77,14 @@ public: Message createMessage(uint32_t size) { static uint32_t seqNum; - Message msg = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string (size, 'x')); - msg.setSequence(++seqNum); + //Need to compute what data size is required to make a given + //overall size (use one byte of content in test message to ensure + //content frame is added) + Message test = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string("x")); + size_t min = test.getMessageSize() - 1; + if (min > size) throw qpid::Exception("Can't create message that small!"); + Message msg = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string (size - min, 'x')); + msg.setSequence(++seqNum);//this doesn't affect message size return msg; } } @@ -100,18 +106,18 @@ QPID_AUTO_TEST_CASE(testFlowCount) std::deque<Message> msgs; for (size_t i = 0; i < 6; i++) { - msgs.push_back(createMessage(10)); + msgs.push_back(createMessage(100)); flow->enqueued(msgs.back()); BOOST_CHECK(!flow->isFlowControlActive()); } BOOST_CHECK(!flow->isFlowControlActive()); // 6 on queue - msgs.push_back(createMessage(10)); + msgs.push_back(createMessage(100)); flow->enqueued(msgs.back()); BOOST_CHECK(!flow->isFlowControlActive()); // 7 on queue - msgs.push_back(createMessage(10)); + msgs.push_back(createMessage(100)); flow->enqueued(msgs.back()); BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue, ON - msgs.push_back(createMessage(10)); + msgs.push_back(createMessage(100)); flow->enqueued(msgs.back()); BOOST_CHECK(flow->isFlowControlActive()); // 9 on queue, no change to flow control @@ -136,69 +142,69 @@ QPID_AUTO_TEST_CASE(testFlowCount) QPID_AUTO_TEST_CASE(testFlowSize) { FieldTable args; - args.setUInt64(QueueFlowLimit::flowStopSizeKey, 70); - args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 50); + args.setUInt64(QueueFlowLimit::flowStopSizeKey, 700); + args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 460); std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args)); BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount()); BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount()); - BOOST_CHECK_EQUAL((uint32_t) 70, flow->getFlowStopSize()); - BOOST_CHECK_EQUAL((uint32_t) 50, flow->getFlowResumeSize()); + BOOST_CHECK_EQUAL((uint32_t) 700, flow->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint32_t) 460, flow->getFlowResumeSize()); BOOST_CHECK(!flow->isFlowControlActive()); BOOST_CHECK(flow->monitorFlowControl()); std::deque<Message> msgs; for (size_t i = 0; i < 6; i++) { - msgs.push_back(createMessage(10)); + msgs.push_back(createMessage(100)); flow->enqueued(msgs.back()); BOOST_CHECK(!flow->isFlowControlActive()); } - BOOST_CHECK(!flow->isFlowControlActive()); // 60 on queue + BOOST_CHECK(!flow->isFlowControlActive()); // 600 on queue BOOST_CHECK_EQUAL(6u, flow->getFlowCount()); - BOOST_CHECK_EQUAL(60u, flow->getFlowSize()); + BOOST_CHECK_EQUAL(600u, flow->getFlowSize()); - Message msg_9 = createMessage(9); - flow->enqueued(msg_9); - BOOST_CHECK(!flow->isFlowControlActive()); // 69 on queue - Message tinyMsg_1 = createMessage(1); + Message msg_50 = createMessage(50); + flow->enqueued(msg_50); + BOOST_CHECK(!flow->isFlowControlActive()); // 650 on queue + Message tinyMsg_1 = createMessage(40); flow->enqueued(tinyMsg_1); - BOOST_CHECK(!flow->isFlowControlActive()); // 70 on queue + BOOST_CHECK(!flow->isFlowControlActive()); // 690 on queue - Message tinyMsg_2 = createMessage(1); + Message tinyMsg_2 = createMessage(40); flow->enqueued(tinyMsg_2); - BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue, ON - msgs.push_back(createMessage(10)); + BOOST_CHECK(flow->isFlowControlActive()); // 730 on queue, ON + msgs.push_back(createMessage(100)); flow->enqueued(msgs.back()); - BOOST_CHECK(flow->isFlowControlActive()); // 81 on queue + BOOST_CHECK(flow->isFlowControlActive()); // 830 on queue BOOST_CHECK_EQUAL(10u, flow->getFlowCount()); - BOOST_CHECK_EQUAL(81u, flow->getFlowSize()); + BOOST_CHECK_EQUAL(830u, flow->getFlowSize()); flow->dequeued(msgs.front()); msgs.pop_front(); - BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue + BOOST_CHECK(flow->isFlowControlActive()); // 730 on queue flow->dequeued(msgs.front()); msgs.pop_front(); - BOOST_CHECK(flow->isFlowControlActive()); // 61 on queue + BOOST_CHECK(flow->isFlowControlActive()); // 630 on queue flow->dequeued(msgs.front()); msgs.pop_front(); - BOOST_CHECK(flow->isFlowControlActive()); // 51 on queue + BOOST_CHECK(flow->isFlowControlActive()); // 530 on queue flow->dequeued(tinyMsg_1); - BOOST_CHECK(flow->isFlowControlActive()); // 50 on queue + BOOST_CHECK(flow->isFlowControlActive()); // 490 on queue flow->dequeued(tinyMsg_2); - BOOST_CHECK(!flow->isFlowControlActive()); // 49 on queue, OFF + BOOST_CHECK(!flow->isFlowControlActive()); // 450 on queue, OFF - flow->dequeued(msg_9); - BOOST_CHECK(!flow->isFlowControlActive()); // 40 on queue + flow->dequeued(msg_50); + BOOST_CHECK(!flow->isFlowControlActive()); // 400 on queue flow->dequeued(msgs.front()); msgs.pop_front(); - BOOST_CHECK(!flow->isFlowControlActive()); // 30 on queue + BOOST_CHECK(!flow->isFlowControlActive()); // 300 on queue flow->dequeued(msgs.front()); msgs.pop_front(); - BOOST_CHECK(!flow->isFlowControlActive()); // 20 on queue + BOOST_CHECK(!flow->isFlowControlActive()); // 200 on queue BOOST_CHECK_EQUAL(2u, flow->getFlowCount()); - BOOST_CHECK_EQUAL(20u, flow->getFlowSize()); + BOOST_CHECK_EQUAL(200u, flow->getFlowSize()); } QPID_AUTO_TEST_CASE(testFlowArgs) @@ -227,13 +233,13 @@ QPID_AUTO_TEST_CASE(testFlowCombo) FieldTable args; args.setInt(QueueFlowLimit::flowStopCountKey, 10); args.setInt(QueueFlowLimit::flowResumeCountKey, 5); - args.setUInt64(QueueFlowLimit::flowStopSizeKey, 200); - args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 100); + args.setUInt64(QueueFlowLimit::flowStopSizeKey, 2000); + args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 1000); - std::deque<Message> msgs_1; - std::deque<Message> msgs_10; std::deque<Message> msgs_50; std::deque<Message> msgs_100; + std::deque<Message> msgs_500; + std::deque<Message> msgs_1000; Message msg; @@ -243,104 +249,104 @@ QPID_AUTO_TEST_CASE(testFlowCombo) // verify flow control comes ON when only count passes its stop point. for (size_t i = 0; i < 10; i++) { - msgs_10.push_back(createMessage(10)); - flow->enqueued(msgs_10.back()); + msgs_100.push_back(createMessage(100)); + flow->enqueued(msgs_100.back()); BOOST_CHECK(!flow->isFlowControlActive()); } - // count:10 size:100 + // count:10 size:1000 - msgs_1.push_back(createMessage(1)); - flow->enqueued(msgs_1.back()); // count:11 size: 101 ->ON + msgs_50.push_back(createMessage(50)); + flow->enqueued(msgs_50.back()); // count:11 size: 1050 ->ON BOOST_CHECK(flow->isFlowControlActive()); for (size_t i = 0; i < 6; i++) { - flow->dequeued(msgs_10.front()); - msgs_10.pop_front(); + flow->dequeued(msgs_100.front()); + msgs_100.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); } - // count:5 size: 41 + // count:5 size: 450 - flow->dequeued(msgs_1.front()); // count: 4 size: 40 ->OFF - msgs_1.pop_front(); + flow->dequeued(msgs_50.front()); // count: 4 size: 400 ->OFF + msgs_50.pop_front(); BOOST_CHECK(!flow->isFlowControlActive()); for (size_t i = 0; i < 4; i++) { - flow->dequeued(msgs_10.front()); - msgs_10.pop_front(); + flow->dequeued(msgs_100.front()); + msgs_100.pop_front(); BOOST_CHECK(!flow->isFlowControlActive()); } // count:0 size:0 // verify flow control comes ON when only size passes its stop point. - msgs_100.push_back(createMessage(100)); - flow->enqueued(msgs_100.back()); // count:1 size: 100 + msgs_1000.push_back(createMessage(1000)); + flow->enqueued(msgs_1000.back()); // count:1 size: 1000 BOOST_CHECK(!flow->isFlowControlActive()); - msgs_50.push_back(createMessage(50)); - flow->enqueued(msgs_50.back()); // count:2 size: 150 + msgs_500.push_back(createMessage(500)); + flow->enqueued(msgs_500.back()); // count:2 size: 1500 BOOST_CHECK(!flow->isFlowControlActive()); - msgs_50.push_back(createMessage(50)); - flow->enqueued(msgs_50.back()); // count:3 size: 200 + msgs_500.push_back(createMessage(500)); + flow->enqueued(msgs_500.back()); // count:3 size: 2000 BOOST_CHECK(!flow->isFlowControlActive()); - msgs_1.push_back(createMessage(1)); - flow->enqueued(msgs_1.back()); // count:4 size: 201 ->ON + msgs_50.push_back(createMessage(50)); + flow->enqueued(msgs_50.back()); // count:4 size: 2050 ->ON BOOST_CHECK(flow->isFlowControlActive()); - flow->dequeued(msgs_100.front()); // count:3 size:101 - msgs_100.pop_front(); + flow->dequeued(msgs_1000.front()); // count:3 size:1050 + msgs_1000.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); - flow->dequeued(msgs_1.front()); // count:2 size:100 - msgs_1.pop_front(); + flow->dequeued(msgs_50.front()); // count:2 size:1000 + msgs_50.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); - flow->dequeued(msgs_50.front()); // count:1 size:50 ->OFF - msgs_50.pop_front(); + flow->dequeued(msgs_500.front()); // count:1 size:500 ->OFF + msgs_500.pop_front(); BOOST_CHECK(!flow->isFlowControlActive()); // verify flow control remains ON until both thresholds drop below their // resume point. for (size_t i = 0; i < 8; i++) { - msgs_10.push_back(createMessage(10)); - flow->enqueued(msgs_10.back()); + msgs_100.push_back(createMessage(100)); + flow->enqueued(msgs_100.back()); BOOST_CHECK(!flow->isFlowControlActive()); } - // count:9 size:130 + // count:9 size:1300 - msgs_10.push_back(createMessage(10)); - flow->enqueued(msgs_10.back()); // count:10 size: 140 + msgs_100.push_back(createMessage(100)); + flow->enqueued(msgs_100.back()); // count:10 size: 1400 BOOST_CHECK(!flow->isFlowControlActive()); - msgs_1.push_back(createMessage(1)); - flow->enqueued(msgs_1.back()); // count:11 size: 141 ->ON + msgs_50.push_back(createMessage(50)); + flow->enqueued(msgs_50.back()); // count:11 size: 1450 ->ON BOOST_CHECK(flow->isFlowControlActive()); - msgs_100.push_back(createMessage(100)); - flow->enqueued(msgs_100.back()); // count:12 size: 241 (both thresholds crossed) + msgs_1000.push_back(createMessage(1000)); + flow->enqueued(msgs_1000.back()); // count:12 size: 2450 (both thresholds crossed) BOOST_CHECK(flow->isFlowControlActive()); - // at this point: 9@10 + 1@50 + 1@100 + 1@1 == 12@241 + // at this point: 9@100 + 1@500 + 1@1000 + 1@50 == 12@2450 - flow->dequeued(msgs_50.front()); // count:11 size:191 - msgs_50.pop_front(); + flow->dequeued(msgs_500.front()); // count:11 size:1950 + msgs_500.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); for (size_t i = 0; i < 9; i++) { - flow->dequeued(msgs_10.front()); - msgs_10.pop_front(); + flow->dequeued(msgs_100.front()); + msgs_100.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); } - // count:2 size:101 - flow->dequeued(msgs_1.front()); // count:1 size:100 - msgs_1.pop_front(); + // count:2 size:1050 + flow->dequeued(msgs_50.front()); // count:1 size:1000 + msgs_50.pop_front(); BOOST_CHECK(flow->isFlowControlActive()); // still active due to size - flow->dequeued(msgs_100.front()); // count:0 size:0 ->OFF - msgs_100.pop_front(); + flow->dequeued(msgs_1000.front()); // count:0 size:0 ->OFF + msgs_1000.pop_front(); BOOST_CHECK(!flow->isFlowControlActive()); } diff --git a/cpp/src/tests/QueuePolicyTest.cpp b/cpp/src/tests/QueuePolicyTest.cpp index ce1b0addea..f61c283fd4 100644 --- a/cpp/src/tests/QueuePolicyTest.cpp +++ b/cpp/src/tests/QueuePolicyTest.cpp @@ -69,9 +69,15 @@ QPID_AUTO_TEST_CASE(testRingPolicyCount) QPID_AUTO_TEST_CASE(testRingPolicySize) { - std::string hundredBytes = std::string(100, 'h'); - std::string fourHundredBytes = std::string (400, 'f'); - std::string thousandBytes = std::string(1000, 't'); + //The message size now includes all headers as well as the content + //aka body, so compute the amount of data needed to hit a given + //overall size + std::string q("my-ring-queue"); + size_t minMessageSize = 25/*minimum size of headers*/ + q.size()/*routing key length*/ + 4/*default exchange, added by broker*/; + + std::string hundredBytes = std::string(100 - minMessageSize, 'h'); + std::string fourHundredBytes = std::string (400 - minMessageSize, 'f'); + std::string thousandBytes = std::string(1000 - minMessageSize, 't'); // Ring queue, 500 bytes maxSize @@ -79,7 +85,6 @@ QPID_AUTO_TEST_CASE(testRingPolicySize) args.setSizePolicy(RING, 500, 0); SessionFixture f; - std::string q("my-ring-queue"); f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); // A. Send messages 0 .. 5, each 100 bytes |