diff options
author | Jonathan Robie <jonathan@apache.org> | 2010-10-01 13:20:54 +0000 |
---|---|---|
committer | Jonathan Robie <jonathan@apache.org> | 2010-10-01 13:20:54 +0000 |
commit | 98147863bc1e7816eedf6c957d96390ae35ebc60 (patch) | |
tree | e3387108ad9ac7bbb5aa2017e287e21fedb90862 | |
parent | af9aae7562262306d59c6119e476ae07c6130510 (diff) | |
download | qpid-python-98147863bc1e7816eedf6c957d96390ae35ebc60.tar.gz |
Fixes two bugs for ring queue policies that involve size.
- When messages vary in size, now correctly displaces enough smaller messages
to make room for the new message.
- When a message is larger than maximum queue size, now correctly rejects
the message.
Resolves JIRA QPID-2338 (https://issues.apache.org/jira/browse/QPID-2338).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1003531 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.cpp | 61 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueuePolicyTest.cpp | 83 |
3 files changed, 125 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index c8feaa8a62..4b185ef025 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -222,30 +222,51 @@ bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) { - if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept - - QueuedMessage oldest; - if (queue.empty()) { + + // If the message is bigger than the queue size, give up + if (m->contentSize() > getMaxSize()) { QPID_LOG(debug, "Message too large for ring queue " << name << " [" << *this << "] " - << ": message size = " << m->contentSize() << " bytes"); - return false; - } - oldest = queue.front(); - if (oldest.queue->acquire(oldest) || !strict) { - queue.pop_front(); - pendingDequeues.push_back(oldest); - QPID_LOG(debug, "Ring policy triggered in " << name - << ": removed message " << oldest.position << " to make way for new message"); - return true; - } else { - QPID_LOG(debug, "Ring policy could not be triggered in " << name - << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued"); - //in strict mode, if oldest message has been delivered (hence - //cannot be acquired) but not yet acked, it should not be - //removed and the attempted enqueue should fail + << ": message size = " << m->contentSize() << " bytes" + << ": max queue size = " << getMaxSize() << " bytes"); return false; } + + // if within limits, ok to accept + if (QueuePolicy::checkLimit(m)) return true; + + // At this point, we've exceeded maxSize, maxCount, or both. + // + // If we've exceeded maxCount, we've exceeded it by 1, so + // replacing the first message is sufficient. If we've exceeded + // maxSize, we need to pop enough messages to get the space we + // need. + + unsigned int haveSpace = getMaxSize() - getCurrentQueueSize(); + + do { + QueuedMessage oldest = queue.front(); + + if (oldest.queue->acquire(oldest) || !strict) { + queue.pop_front(); + pendingDequeues.push_back(oldest); + QPID_LOG(debug, "Ring policy triggered in " << name + << ": removed message " << oldest.position << " to make way for new message"); + + haveSpace += oldest.payload->contentSize(); + + } else { + //in strict mode, if oldest message has been delivered (hence + //cannot be acquired) but not yet acked, it should not be + //removed and the attempted enqueue should fail + QPID_LOG(debug, "Ring policy could not be triggered in " << name + << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued"); + return false; + } + } while (haveSpace < m->contentSize()); + + + return true; } void RingQueuePolicy::getPendingDequeues(Messages& result) diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h index b2937e94c7..7006b617a1 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.h +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h @@ -46,6 +46,9 @@ class QueuePolicy static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue); + protected: + uint64_t getCurrentQueueSize() const { return size; } + public: typedef std::deque<QueuedMessage> Messages; static QPID_BROKER_EXTERN const std::string maxCountKey; diff --git a/qpid/cpp/src/tests/QueuePolicyTest.cpp b/qpid/cpp/src/tests/QueuePolicyTest.cpp index 875976db85..5992151e46 100644 --- a/qpid/cpp/src/tests/QueuePolicyTest.cpp +++ b/qpid/cpp/src/tests/QueuePolicyTest.cpp @@ -1,4 +1,4 @@ - /* +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,6 +18,7 @@ * under the License. * */ +#include <sstream> #include "unit_test.h" #include "test_tools.h" @@ -143,7 +144,7 @@ QPID_AUTO_TEST_CASE(testSettings) BOOST_CHECK_EQUAL(a->getMaxSize(), b->getMaxSize()); } -QPID_AUTO_TEST_CASE(testRingPolicy) +QPID_AUTO_TEST_CASE(testRingPolicyCount) { FieldTable args; std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING); @@ -172,6 +173,84 @@ QPID_AUTO_TEST_CASE(testRingPolicy) BOOST_CHECK(!f.subs.get(msg, q)); } +QPID_AUTO_TEST_CASE(testRingPolicySize) +{ + std::string hundredBytes = std::string(100, 'h'); + std::string fourHundredBytes = std::string (400, 'f'); + std::string thousandBytes = std::string(1000, 't'); + + // Ring queue, 500 bytes maxSize + + FieldTable args; + std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 0, 500, QueuePolicy::RING); + policy->update(args); + + ProxySessionFixture f; + std::string q("my-ring-queue"); + f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); + + // A. Send messages 0 .. 5, each 100 bytes + + client::Message m(hundredBytes, q); + + for (int i = 0; i < 6; i++) { + std::stringstream id; + id << i; + m.getMessageProperties().setCorrelationId(id.str()); + f.session.messageTransfer(arg::content=m); + } + + // should find 1 .. 5 on the queue, 0 is displaced by 5 + client::Message msg; + for (int i = 1; i < 6; i++) { + std::stringstream id; + id << i; + BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC)); + BOOST_CHECK_EQUAL(msg.getMessageProperties().getCorrelationId(), id.str()); + } + BOOST_CHECK(!f.subs.get(msg, q)); + + // B. Now make sure that one 400 byte message displaces four 100 byte messages + + // Send messages 0 .. 5, each 100 bytes + for (int i = 0; i < 6; i++) { + client::Message m(hundredBytes, q); + std::stringstream id; + id << i; + m.getMessageProperties().setCorrelationId(id.str()); + f.session.messageTransfer(arg::content=m); + } + + // Now send one 400 byte message + client::Message m2(fourHundredBytes, q); + m2.getMessageProperties().setCorrelationId("6"); + f.session.messageTransfer(arg::content=m2); + + // expect to see 5, 6 on the queue + for (int i = 5; i < 7; i++) { + std::stringstream id; + id << i; + BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC)); + BOOST_CHECK_EQUAL(msg.getMessageProperties().getCorrelationId(), id.str()); + } + BOOST_CHECK(!f.subs.get(msg, q)); + + + // C. Try sending a 1000-byte message, should fail - exceeds maxSize of queue + + client::Message m3(thousandBytes, q); + m3.getMessageProperties().setCorrelationId("6"); + try { + ScopedSuppressLogging sl; + f.session.messageTransfer(arg::content=m3); + BOOST_FAIL("Ooops - successfully added a 1000 byte message to a 512 byte ring queue ..."); + } + catch (...) { + } + +} + + QPID_AUTO_TEST_CASE(testStrictRingPolicy) { FieldTable args; |