summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-10-03 13:31:12 +0000
committerGordon Sim <gsim@apache.org>2013-10-03 13:31:12 +0000
commit515fe2b92acbbea5a269ff761561698eb0fdc076 (patch)
treed64e84ae8b4dbbf7937d9836047ab4c53009a70b /cpp/src
parenta121612d15b9ac15bf0dc2ca612b94e9359e1b20 (diff)
downloadqpid-python-515fe2b92acbbea5a269ff761561698eb0fdc076.tar.gz
QPID-5199: take 0-10 header segment into account for message size
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1528852 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp2
-rw-r--r--cpp/src/qpid/broker/Message.cpp4
-rw-r--r--cpp/src/qpid/broker/Message.h4
-rw-r--r--cpp/src/qpid/broker/Queue.cpp20
-rw-r--r--cpp/src/qpid/broker/QueueFlowLimit.cpp4
-rw-r--r--cpp/src/qpid/broker/ThresholdAlerts.cpp4
-rw-r--r--cpp/src/qpid/broker/amqp/Message.cpp6
-rw-r--r--cpp/src/qpid/broker/amqp/Message.h2
-rw-r--r--cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp5
-rw-r--r--cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h1
-rw-r--r--cpp/src/tests/MessageTest.cpp2
-rw-r--r--cpp/src/tests/QueueFlowLimitTest.cpp170
-rw-r--r--cpp/src/tests/QueuePolicyTest.cpp13
13 files changed, 125 insertions, 112 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 0a25d57cd8..efd83a3225 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -135,7 +135,7 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
if (mgmtExchange != 0)
{
qmf::org::apache::qpid::broker::Exchange::PerThreadStats *eStats = mgmtExchange->getStatistics();
- uint64_t contentSize = msg.getMessage().getContentSize();
+ uint64_t contentSize = msg.getMessage().getMessageSize();
eStats->msgReceives += 1;
eStats->byteReceives += contentSize;
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index e71125b4ab..deca238f22 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -71,9 +71,9 @@ bool Message::isPersistent() const
return getEncoding().isPersistent();
}
-uint64_t Message::getContentSize() const
+uint64_t Message::getMessageSize() const
{
- return getEncoding().getContentSize();
+ return getEncoding().getMessageSize();
}
boost::intrusive_ptr<AsyncCompletion> Message::getIngressCompletion() const
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 84f62a771d..823207f10c 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -68,7 +68,7 @@ public:
virtual std::string getRoutingKey() const = 0;
virtual bool isPersistent() const = 0;
virtual uint8_t getPriority() const = 0;
- virtual uint64_t getContentSize() const = 0;
+ virtual uint64_t getMessageSize() const = 0;
virtual qpid::amqp::MessageId getMessageId() const = 0;
virtual qpid::amqp::MessageId getCorrelationId() const = 0;
virtual std::string getPropertyAsString(const std::string& key) const = 0;
@@ -119,7 +119,7 @@ public:
QPID_BROKER_EXTERN qpid::types::Variant getProperty(const std::string& key) const;
void processProperties(qpid::amqp::MapHandler&) const;
- QPID_BROKER_EXTERN uint64_t getContentSize() const;
+ QPID_BROKER_EXTERN uint64_t getMessageSize() const;
QPID_BROKER_EXTERN Encoding& getEncoding();
QPID_BROKER_EXTERN const Encoding& getEncoding() const;
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 25b4e23968..29e7c06e90 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -88,7 +88,7 @@ inline void mgntEnqStats(const Message& msg,
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
- uint64_t contentSize = msg.getContentSize();
+ uint64_t contentSize = msg.getMessageSize();
qStats->msgTotalEnqueues +=1;
bStats->msgTotalEnqueues += 1;
qStats->byteTotalEnqueues += contentSize;
@@ -111,7 +111,7 @@ inline void mgntDeqStats(const Message& msg,
if (mgmtObject != 0){
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
- uint64_t contentSize = msg.getContentSize();
+ uint64_t contentSize = msg.getMessageSize();
qStats->msgTotalDequeues += 1;
bStats->msgTotalDequeues += 1;
@@ -202,7 +202,7 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
redirectSource(false)
{
current.setCount(0);//always track depth in messages
- if (settings.maxDepth.hasSize()) current.setSize(0);//track depth in bytes only if policy requires it
+ if (settings.maxDepth.getSize()) current.setSize(0);//track depth in bytes only if policy requires it
if (settings.traceExcludes.size()) {
split(traceExclude, settings.traceExcludes, ", ");
}
@@ -305,7 +305,7 @@ void Queue::deliverTo(Message msg, TxBuffer* txn)
void Queue::recoverPrepared(const Message& msg)
{
Mutex::ScopedLock locker(messageLock);
- current += QueueDepth(1, msg.getContentSize());
+ current += QueueDepth(1, msg.getMessageSize());
}
void Queue::recover(Message& msg)
@@ -319,7 +319,7 @@ void Queue::process(Message& msg)
push(msg);
if (mgmtObject != 0){
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
- const uint64_t contentSize = msg.getContentSize();
+ const uint64_t contentSize = msg.getMessageSize();
qStats->msgTxnEnqueues += 1;
qStats->byteTxnEnqueues += contentSize;
mgmtObject->statisticsUpdated();
@@ -861,7 +861,7 @@ bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
{
Mutex::ScopedLock locker(messageLock);
- if (!checkDepth(QueueDepth(1, msg.getContentSize()), msg)) {
+ if (!checkDepth(QueueDepth(1, msg.getMessageSize()), msg)) {
return false;
}
}
@@ -891,7 +891,7 @@ void Queue::enqueueAborted(const Message& msg)
//Called when any transactional enqueue is aborted (including but
//not limited to a recovered dtx transaction)
Mutex::ScopedLock locker(messageLock);
- current -= QueueDepth(1, msg.getContentSize());
+ current -= QueueDepth(1, msg.getMessageSize());
}
void Queue::enqueueCommited(Message& msg)
@@ -919,7 +919,7 @@ void Queue::dequeueCommited(const Message& msg)
observeDequeue(msg, locker, settings.autodelete ? &autodelete : 0);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
- mgmtObject->inc_byteTxnDequeues(msg.getContentSize());
+ mgmtObject->inc_byteTxnDequeues(msg.getMessageSize());
}
}
@@ -962,7 +962,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor)
Mutex::ScopedLock locker(messageLock);
Message* msg = messages->find(cursor);
if (msg) {
- const uint64_t contentSize = msg->getContentSize();
+ const uint64_t contentSize = msg->getMessageSize();
observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
@@ -986,7 +986,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor)
*/
void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock& lock, ScopedAutoDelete* autodelete)
{
- current -= QueueDepth(1, msg.getContentSize());
+ current -= QueueDepth(1, msg.getMessageSize());
mgntDeqStats(msg, mgmtObject, brokerMgmtObject);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 9b2e31c925..607dc09fe8 100644
--- a/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -112,7 +112,7 @@ void QueueFlowLimit::enqueued(const Message& msg)
sys::Mutex::ScopedLock l(indexLock);
++count;
- size += msg.getContentSize();
+ size += msg.getMessageSize();
if (!flowStopped) {
if (flowStopCount && count > flowStopCount) {
@@ -150,7 +150,7 @@ void QueueFlowLimit::dequeued(const Message& msg)
throw Exception(QPID_MSG("Flow limit count underflow on dequeue. Queue=" << queueName));
}
- uint64_t _size = msg.getContentSize();
+ uint64_t _size = msg.getMessageSize();
if (_size <= size) {
size -= _size;
} else {
diff --git a/cpp/src/qpid/broker/ThresholdAlerts.cpp b/cpp/src/qpid/broker/ThresholdAlerts.cpp
index 345b9d89d5..afb9d9ff4e 100644
--- a/cpp/src/qpid/broker/ThresholdAlerts.cpp
+++ b/cpp/src/qpid/broker/ThresholdAlerts.cpp
@@ -44,7 +44,7 @@ ThresholdAlerts::ThresholdAlerts(const std::string& n,
void ThresholdAlerts::enqueued(const Message& m)
{
- size += m.getContentSize();
+ size += m.getMessageSize();
++count;
if (sizeGoingUp && sizeThreshold && size >= sizeThreshold) {
@@ -64,7 +64,7 @@ void ThresholdAlerts::enqueued(const Message& m)
void ThresholdAlerts::dequeued(const Message& m)
{
- size -= m.getContentSize();
+ size -= m.getMessageSize();
--count;
if (!sizeGoingUp && sizeThreshold && size <= sizeThresholdDown) {
diff --git a/cpp/src/qpid/broker/amqp/Message.cpp b/cpp/src/qpid/broker/amqp/Message.cpp
index cf134a894d..7015db324b 100644
--- a/cpp/src/qpid/broker/amqp/Message.cpp
+++ b/cpp/src/qpid/broker/amqp/Message.cpp
@@ -202,11 +202,7 @@ std::string Message::getAnnotationAsString(const std::string& key) const
}
-//getContentSize() is primarily used in stats about the number of
-//bytes enqueued/dequeued etc, not sure whether this is the right name
-//and whether it should indeed only be the content that is thus
-//measured
-uint64_t Message::getContentSize() const { return data.size(); }
+uint64_t Message::getMessageSize() const { return data.size(); }
//getContent() is used primarily for decoding qmf messages in
//management and ha, but also by the xml exchange
std::string Message::getContent() const
diff --git a/cpp/src/qpid/broker/amqp/Message.h b/cpp/src/qpid/broker/amqp/Message.h
index 3a7c4529de..5bf49ef556 100644
--- a/cpp/src/qpid/broker/amqp/Message.h
+++ b/cpp/src/qpid/broker/amqp/Message.h
@@ -44,7 +44,7 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess
std::string getRoutingKey() const;
bool isPersistent() const;
uint8_t getPriority() const;
- uint64_t getContentSize() const;
+ uint64_t getMessageSize() const;
std::string getPropertyAsString(const std::string& key) const;
std::string getAnnotationAsString(const std::string& key) const;
bool getTtl(uint64_t&) const;
diff --git a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
index e3a967796b..0c8434f381 100644
--- a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
+++ b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
@@ -52,6 +52,11 @@ uint64_t MessageTransfer::getContentSize() const
return frames.getContentSize();
}
+uint64_t MessageTransfer::getMessageSize() const
+{
+ return getRequiredCredit();
+}
+
std::string MessageTransfer::getAnnotationAsString(const std::string& key) const
{
const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>();
diff --git a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
index d32c0d07fd..422446db51 100644
--- a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
+++ b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
@@ -45,6 +45,7 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro
bool isPersistent() const;
uint8_t getPriority() const;
uint64_t getContentSize() const;
+ uint64_t getMessageSize() const;
qpid::amqp::MessageId getMessageId() const;
qpid::amqp::MessageId getCorrelationId() const;
std::string getPropertyAsString(const std::string& key) const;
diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp
index 666a2c6297..28f8b0c2b8 100644
--- a/cpp/src/tests/MessageTest.cpp
+++ b/cpp/src/tests/MessageTest.cpp
@@ -65,7 +65,7 @@ QPID_AUTO_TEST_CASE(testEncodeDecode)
msg = registry.decode(buffer);
BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey());
- BOOST_CHECK_EQUAL((uint64_t) data.size(), msg.getContentSize());
+ BOOST_CHECK_EQUAL((uint64_t) data.size(), msg.getContent().size());
BOOST_CHECK_EQUAL(data, msg.getContent());
//BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
BOOST_CHECK_EQUAL(string("xyz"), msg.getPropertyAsString("abc"));
diff --git a/cpp/src/tests/QueueFlowLimitTest.cpp b/cpp/src/tests/QueueFlowLimitTest.cpp
index d305ca452b..7b0a776062 100644
--- a/cpp/src/tests/QueueFlowLimitTest.cpp
+++ b/cpp/src/tests/QueueFlowLimitTest.cpp
@@ -77,8 +77,14 @@ public:
Message createMessage(uint32_t size)
{
static uint32_t seqNum;
- Message msg = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string (size, 'x'));
- msg.setSequence(++seqNum);
+ //Need to compute what data size is required to make a given
+ //overall size (use one byte of content in test message to ensure
+ //content frame is added)
+ Message test = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string("x"));
+ size_t min = test.getMessageSize() - 1;
+ if (min > size) throw qpid::Exception("Can't create message that small!");
+ Message msg = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string (size - min, 'x'));
+ msg.setSequence(++seqNum);//this doesn't affect message size
return msg;
}
}
@@ -100,18 +106,18 @@ QPID_AUTO_TEST_CASE(testFlowCount)
std::deque<Message> msgs;
for (size_t i = 0; i < 6; i++) {
- msgs.push_back(createMessage(10));
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
BOOST_CHECK(!flow->isFlowControlActive());
}
BOOST_CHECK(!flow->isFlowControlActive()); // 6 on queue
- msgs.push_back(createMessage(10));
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
BOOST_CHECK(!flow->isFlowControlActive()); // 7 on queue
- msgs.push_back(createMessage(10));
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue, ON
- msgs.push_back(createMessage(10));
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
BOOST_CHECK(flow->isFlowControlActive()); // 9 on queue, no change to flow control
@@ -136,69 +142,69 @@ QPID_AUTO_TEST_CASE(testFlowCount)
QPID_AUTO_TEST_CASE(testFlowSize)
{
FieldTable args;
- args.setUInt64(QueueFlowLimit::flowStopSizeKey, 70);
- args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 50);
+ args.setUInt64(QueueFlowLimit::flowStopSizeKey, 700);
+ args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 460);
std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args));
BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount());
BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount());
- BOOST_CHECK_EQUAL((uint32_t) 70, flow->getFlowStopSize());
- BOOST_CHECK_EQUAL((uint32_t) 50, flow->getFlowResumeSize());
+ BOOST_CHECK_EQUAL((uint32_t) 700, flow->getFlowStopSize());
+ BOOST_CHECK_EQUAL((uint32_t) 460, flow->getFlowResumeSize());
BOOST_CHECK(!flow->isFlowControlActive());
BOOST_CHECK(flow->monitorFlowControl());
std::deque<Message> msgs;
for (size_t i = 0; i < 6; i++) {
- msgs.push_back(createMessage(10));
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
BOOST_CHECK(!flow->isFlowControlActive());
}
- BOOST_CHECK(!flow->isFlowControlActive()); // 60 on queue
+ BOOST_CHECK(!flow->isFlowControlActive()); // 600 on queue
BOOST_CHECK_EQUAL(6u, flow->getFlowCount());
- BOOST_CHECK_EQUAL(60u, flow->getFlowSize());
+ BOOST_CHECK_EQUAL(600u, flow->getFlowSize());
- Message msg_9 = createMessage(9);
- flow->enqueued(msg_9);
- BOOST_CHECK(!flow->isFlowControlActive()); // 69 on queue
- Message tinyMsg_1 = createMessage(1);
+ Message msg_50 = createMessage(50);
+ flow->enqueued(msg_50);
+ BOOST_CHECK(!flow->isFlowControlActive()); // 650 on queue
+ Message tinyMsg_1 = createMessage(40);
flow->enqueued(tinyMsg_1);
- BOOST_CHECK(!flow->isFlowControlActive()); // 70 on queue
+ BOOST_CHECK(!flow->isFlowControlActive()); // 690 on queue
- Message tinyMsg_2 = createMessage(1);
+ Message tinyMsg_2 = createMessage(40);
flow->enqueued(tinyMsg_2);
- BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue, ON
- msgs.push_back(createMessage(10));
+ BOOST_CHECK(flow->isFlowControlActive()); // 730 on queue, ON
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
- BOOST_CHECK(flow->isFlowControlActive()); // 81 on queue
+ BOOST_CHECK(flow->isFlowControlActive()); // 830 on queue
BOOST_CHECK_EQUAL(10u, flow->getFlowCount());
- BOOST_CHECK_EQUAL(81u, flow->getFlowSize());
+ BOOST_CHECK_EQUAL(830u, flow->getFlowSize());
flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue
+ BOOST_CHECK(flow->isFlowControlActive()); // 730 on queue
flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(flow->isFlowControlActive()); // 61 on queue
+ BOOST_CHECK(flow->isFlowControlActive()); // 630 on queue
flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(flow->isFlowControlActive()); // 51 on queue
+ BOOST_CHECK(flow->isFlowControlActive()); // 530 on queue
flow->dequeued(tinyMsg_1);
- BOOST_CHECK(flow->isFlowControlActive()); // 50 on queue
+ BOOST_CHECK(flow->isFlowControlActive()); // 490 on queue
flow->dequeued(tinyMsg_2);
- BOOST_CHECK(!flow->isFlowControlActive()); // 49 on queue, OFF
+ BOOST_CHECK(!flow->isFlowControlActive()); // 450 on queue, OFF
- flow->dequeued(msg_9);
- BOOST_CHECK(!flow->isFlowControlActive()); // 40 on queue
+ flow->dequeued(msg_50);
+ BOOST_CHECK(!flow->isFlowControlActive()); // 400 on queue
flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(!flow->isFlowControlActive()); // 30 on queue
+ BOOST_CHECK(!flow->isFlowControlActive()); // 300 on queue
flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(!flow->isFlowControlActive()); // 20 on queue
+ BOOST_CHECK(!flow->isFlowControlActive()); // 200 on queue
BOOST_CHECK_EQUAL(2u, flow->getFlowCount());
- BOOST_CHECK_EQUAL(20u, flow->getFlowSize());
+ BOOST_CHECK_EQUAL(200u, flow->getFlowSize());
}
QPID_AUTO_TEST_CASE(testFlowArgs)
@@ -227,13 +233,13 @@ QPID_AUTO_TEST_CASE(testFlowCombo)
FieldTable args;
args.setInt(QueueFlowLimit::flowStopCountKey, 10);
args.setInt(QueueFlowLimit::flowResumeCountKey, 5);
- args.setUInt64(QueueFlowLimit::flowStopSizeKey, 200);
- args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 100);
+ args.setUInt64(QueueFlowLimit::flowStopSizeKey, 2000);
+ args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 1000);
- std::deque<Message> msgs_1;
- std::deque<Message> msgs_10;
std::deque<Message> msgs_50;
std::deque<Message> msgs_100;
+ std::deque<Message> msgs_500;
+ std::deque<Message> msgs_1000;
Message msg;
@@ -243,104 +249,104 @@ QPID_AUTO_TEST_CASE(testFlowCombo)
// verify flow control comes ON when only count passes its stop point.
for (size_t i = 0; i < 10; i++) {
- msgs_10.push_back(createMessage(10));
- flow->enqueued(msgs_10.back());
+ msgs_100.push_back(createMessage(100));
+ flow->enqueued(msgs_100.back());
BOOST_CHECK(!flow->isFlowControlActive());
}
- // count:10 size:100
+ // count:10 size:1000
- msgs_1.push_back(createMessage(1));
- flow->enqueued(msgs_1.back()); // count:11 size: 101 ->ON
+ msgs_50.push_back(createMessage(50));
+ flow->enqueued(msgs_50.back()); // count:11 size: 1050 ->ON
BOOST_CHECK(flow->isFlowControlActive());
for (size_t i = 0; i < 6; i++) {
- flow->dequeued(msgs_10.front());
- msgs_10.pop_front();
+ flow->dequeued(msgs_100.front());
+ msgs_100.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
}
- // count:5 size: 41
+ // count:5 size: 450
- flow->dequeued(msgs_1.front()); // count: 4 size: 40 ->OFF
- msgs_1.pop_front();
+ flow->dequeued(msgs_50.front()); // count: 4 size: 400 ->OFF
+ msgs_50.pop_front();
BOOST_CHECK(!flow->isFlowControlActive());
for (size_t i = 0; i < 4; i++) {
- flow->dequeued(msgs_10.front());
- msgs_10.pop_front();
+ flow->dequeued(msgs_100.front());
+ msgs_100.pop_front();
BOOST_CHECK(!flow->isFlowControlActive());
}
// count:0 size:0
// verify flow control comes ON when only size passes its stop point.
- msgs_100.push_back(createMessage(100));
- flow->enqueued(msgs_100.back()); // count:1 size: 100
+ msgs_1000.push_back(createMessage(1000));
+ flow->enqueued(msgs_1000.back()); // count:1 size: 1000
BOOST_CHECK(!flow->isFlowControlActive());
- msgs_50.push_back(createMessage(50));
- flow->enqueued(msgs_50.back()); // count:2 size: 150
+ msgs_500.push_back(createMessage(500));
+ flow->enqueued(msgs_500.back()); // count:2 size: 1500
BOOST_CHECK(!flow->isFlowControlActive());
- msgs_50.push_back(createMessage(50));
- flow->enqueued(msgs_50.back()); // count:3 size: 200
+ msgs_500.push_back(createMessage(500));
+ flow->enqueued(msgs_500.back()); // count:3 size: 2000
BOOST_CHECK(!flow->isFlowControlActive());
- msgs_1.push_back(createMessage(1));
- flow->enqueued(msgs_1.back()); // count:4 size: 201 ->ON
+ msgs_50.push_back(createMessage(50));
+ flow->enqueued(msgs_50.back()); // count:4 size: 2050 ->ON
BOOST_CHECK(flow->isFlowControlActive());
- flow->dequeued(msgs_100.front()); // count:3 size:101
- msgs_100.pop_front();
+ flow->dequeued(msgs_1000.front()); // count:3 size:1050
+ msgs_1000.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
- flow->dequeued(msgs_1.front()); // count:2 size:100
- msgs_1.pop_front();
+ flow->dequeued(msgs_50.front()); // count:2 size:1000
+ msgs_50.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
- flow->dequeued(msgs_50.front()); // count:1 size:50 ->OFF
- msgs_50.pop_front();
+ flow->dequeued(msgs_500.front()); // count:1 size:500 ->OFF
+ msgs_500.pop_front();
BOOST_CHECK(!flow->isFlowControlActive());
// verify flow control remains ON until both thresholds drop below their
// resume point.
for (size_t i = 0; i < 8; i++) {
- msgs_10.push_back(createMessage(10));
- flow->enqueued(msgs_10.back());
+ msgs_100.push_back(createMessage(100));
+ flow->enqueued(msgs_100.back());
BOOST_CHECK(!flow->isFlowControlActive());
}
- // count:9 size:130
+ // count:9 size:1300
- msgs_10.push_back(createMessage(10));
- flow->enqueued(msgs_10.back()); // count:10 size: 140
+ msgs_100.push_back(createMessage(100));
+ flow->enqueued(msgs_100.back()); // count:10 size: 1400
BOOST_CHECK(!flow->isFlowControlActive());
- msgs_1.push_back(createMessage(1));
- flow->enqueued(msgs_1.back()); // count:11 size: 141 ->ON
+ msgs_50.push_back(createMessage(50));
+ flow->enqueued(msgs_50.back()); // count:11 size: 1450 ->ON
BOOST_CHECK(flow->isFlowControlActive());
- msgs_100.push_back(createMessage(100));
- flow->enqueued(msgs_100.back()); // count:12 size: 241 (both thresholds crossed)
+ msgs_1000.push_back(createMessage(1000));
+ flow->enqueued(msgs_1000.back()); // count:12 size: 2450 (both thresholds crossed)
BOOST_CHECK(flow->isFlowControlActive());
- // at this point: 9@10 + 1@50 + 1@100 + 1@1 == 12@241
+ // at this point: 9@100 + 1@500 + 1@1000 + 1@50 == 12@2450
- flow->dequeued(msgs_50.front()); // count:11 size:191
- msgs_50.pop_front();
+ flow->dequeued(msgs_500.front()); // count:11 size:1950
+ msgs_500.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
for (size_t i = 0; i < 9; i++) {
- flow->dequeued(msgs_10.front());
- msgs_10.pop_front();
+ flow->dequeued(msgs_100.front());
+ msgs_100.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
}
- // count:2 size:101
- flow->dequeued(msgs_1.front()); // count:1 size:100
- msgs_1.pop_front();
+ // count:2 size:1050
+ flow->dequeued(msgs_50.front()); // count:1 size:1000
+ msgs_50.pop_front();
BOOST_CHECK(flow->isFlowControlActive()); // still active due to size
- flow->dequeued(msgs_100.front()); // count:0 size:0 ->OFF
- msgs_100.pop_front();
+ flow->dequeued(msgs_1000.front()); // count:0 size:0 ->OFF
+ msgs_1000.pop_front();
BOOST_CHECK(!flow->isFlowControlActive());
}
diff --git a/cpp/src/tests/QueuePolicyTest.cpp b/cpp/src/tests/QueuePolicyTest.cpp
index ce1b0addea..f61c283fd4 100644
--- a/cpp/src/tests/QueuePolicyTest.cpp
+++ b/cpp/src/tests/QueuePolicyTest.cpp
@@ -69,9 +69,15 @@ QPID_AUTO_TEST_CASE(testRingPolicyCount)
QPID_AUTO_TEST_CASE(testRingPolicySize)
{
- std::string hundredBytes = std::string(100, 'h');
- std::string fourHundredBytes = std::string (400, 'f');
- std::string thousandBytes = std::string(1000, 't');
+ //The message size now includes all headers as well as the content
+ //aka body, so compute the amount of data needed to hit a given
+ //overall size
+ std::string q("my-ring-queue");
+ size_t minMessageSize = 25/*minimum size of headers*/ + q.size()/*routing key length*/ + 4/*default exchange, added by broker*/;
+
+ std::string hundredBytes = std::string(100 - minMessageSize, 'h');
+ std::string fourHundredBytes = std::string (400 - minMessageSize, 'f');
+ std::string thousandBytes = std::string(1000 - minMessageSize, 't');
// Ring queue, 500 bytes maxSize
@@ -79,7 +85,6 @@ QPID_AUTO_TEST_CASE(testRingPolicySize)
args.setSizePolicy(RING, 500, 0);
SessionFixture f;
- std::string q("my-ring-queue");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
// A. Send messages 0 .. 5, each 100 bytes