summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Robie <jonathan@apache.org>2010-10-01 13:20:54 +0000
committerJonathan Robie <jonathan@apache.org>2010-10-01 13:20:54 +0000
commit98147863bc1e7816eedf6c957d96390ae35ebc60 (patch)
treee3387108ad9ac7bbb5aa2017e287e21fedb90862
parentaf9aae7562262306d59c6119e476ae07c6130510 (diff)
downloadqpid-python-98147863bc1e7816eedf6c957d96390ae35ebc60.tar.gz
Fixes two bugs for ring queue policies that involve size.
- When messages vary in size, now correctly displaces enough smaller messages to make room for the new message. - When a message is larger than maximum queue size, now correctly rejects the message. Resolves JIRA QPID-2338 (https://issues.apache.org/jira/browse/QPID-2338). git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1003531 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.cpp61
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.h3
-rw-r--r--qpid/cpp/src/tests/QueuePolicyTest.cpp83
3 files changed, 125 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
index c8feaa8a62..4b185ef025 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -222,30 +222,51 @@ bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
- if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept
-
- QueuedMessage oldest;
- if (queue.empty()) {
+
+ // If the message is bigger than the queue size, give up
+ if (m->contentSize() > getMaxSize()) {
QPID_LOG(debug, "Message too large for ring queue " << name
<< " [" << *this << "] "
- << ": message size = " << m->contentSize() << " bytes");
- return false;
- }
- oldest = queue.front();
- if (oldest.queue->acquire(oldest) || !strict) {
- queue.pop_front();
- pendingDequeues.push_back(oldest);
- QPID_LOG(debug, "Ring policy triggered in " << name
- << ": removed message " << oldest.position << " to make way for new message");
- return true;
- } else {
- QPID_LOG(debug, "Ring policy could not be triggered in " << name
- << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
- //in strict mode, if oldest message has been delivered (hence
- //cannot be acquired) but not yet acked, it should not be
- //removed and the attempted enqueue should fail
+ << ": message size = " << m->contentSize() << " bytes"
+ << ": max queue size = " << getMaxSize() << " bytes");
return false;
}
+
+ // if within limits, ok to accept
+ if (QueuePolicy::checkLimit(m)) return true;
+
+ // At this point, we've exceeded maxSize, maxCount, or both.
+ //
+ // If we've exceeded maxCount, we've exceeded it by 1, so
+ // replacing the first message is sufficient. If we've exceeded
+ // maxSize, we need to pop enough messages to get the space we
+ // need.
+
+ unsigned int haveSpace = getMaxSize() - getCurrentQueueSize();
+
+ do {
+ QueuedMessage oldest = queue.front();
+
+ if (oldest.queue->acquire(oldest) || !strict) {
+ queue.pop_front();
+ pendingDequeues.push_back(oldest);
+ QPID_LOG(debug, "Ring policy triggered in " << name
+ << ": removed message " << oldest.position << " to make way for new message");
+
+ haveSpace += oldest.payload->contentSize();
+
+ } else {
+ //in strict mode, if oldest message has been delivered (hence
+ //cannot be acquired) but not yet acked, it should not be
+ //removed and the attempted enqueue should fail
+ QPID_LOG(debug, "Ring policy could not be triggered in " << name
+ << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
+ return false;
+ }
+ } while (haveSpace < m->contentSize());
+
+
+ return true;
}
void RingQueuePolicy::getPendingDequeues(Messages& result)
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h
index b2937e94c7..7006b617a1 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.h
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h
@@ -46,6 +46,9 @@ class QueuePolicy
static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
+ protected:
+ uint64_t getCurrentQueueSize() const { return size; }
+
public:
typedef std::deque<QueuedMessage> Messages;
static QPID_BROKER_EXTERN const std::string maxCountKey;
diff --git a/qpid/cpp/src/tests/QueuePolicyTest.cpp b/qpid/cpp/src/tests/QueuePolicyTest.cpp
index 875976db85..5992151e46 100644
--- a/qpid/cpp/src/tests/QueuePolicyTest.cpp
+++ b/qpid/cpp/src/tests/QueuePolicyTest.cpp
@@ -1,4 +1,4 @@
- /*
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include <sstream>
#include "unit_test.h"
#include "test_tools.h"
@@ -143,7 +144,7 @@ QPID_AUTO_TEST_CASE(testSettings)
BOOST_CHECK_EQUAL(a->getMaxSize(), b->getMaxSize());
}
-QPID_AUTO_TEST_CASE(testRingPolicy)
+QPID_AUTO_TEST_CASE(testRingPolicyCount)
{
FieldTable args;
std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING);
@@ -172,6 +173,84 @@ QPID_AUTO_TEST_CASE(testRingPolicy)
BOOST_CHECK(!f.subs.get(msg, q));
}
+QPID_AUTO_TEST_CASE(testRingPolicySize)
+{
+ std::string hundredBytes = std::string(100, 'h');
+ std::string fourHundredBytes = std::string (400, 'f');
+ std::string thousandBytes = std::string(1000, 't');
+
+ // Ring queue, 500 bytes maxSize
+
+ FieldTable args;
+ std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 0, 500, QueuePolicy::RING);
+ policy->update(args);
+
+ ProxySessionFixture f;
+ std::string q("my-ring-queue");
+ f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
+
+ // A. Send messages 0 .. 5, each 100 bytes
+
+ client::Message m(hundredBytes, q);
+
+ for (int i = 0; i < 6; i++) {
+ std::stringstream id;
+ id << i;
+ m.getMessageProperties().setCorrelationId(id.str());
+ f.session.messageTransfer(arg::content=m);
+ }
+
+ // should find 1 .. 5 on the queue, 0 is displaced by 5
+ client::Message msg;
+ for (int i = 1; i < 6; i++) {
+ std::stringstream id;
+ id << i;
+ BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC));
+ BOOST_CHECK_EQUAL(msg.getMessageProperties().getCorrelationId(), id.str());
+ }
+ BOOST_CHECK(!f.subs.get(msg, q));
+
+ // B. Now make sure that one 400 byte message displaces four 100 byte messages
+
+ // Send messages 0 .. 5, each 100 bytes
+ for (int i = 0; i < 6; i++) {
+ client::Message m(hundredBytes, q);
+ std::stringstream id;
+ id << i;
+ m.getMessageProperties().setCorrelationId(id.str());
+ f.session.messageTransfer(arg::content=m);
+ }
+
+ // Now send one 400 byte message
+ client::Message m2(fourHundredBytes, q);
+ m2.getMessageProperties().setCorrelationId("6");
+ f.session.messageTransfer(arg::content=m2);
+
+ // expect to see 5, 6 on the queue
+ for (int i = 5; i < 7; i++) {
+ std::stringstream id;
+ id << i;
+ BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC));
+ BOOST_CHECK_EQUAL(msg.getMessageProperties().getCorrelationId(), id.str());
+ }
+ BOOST_CHECK(!f.subs.get(msg, q));
+
+
+ // C. Try sending a 1000-byte message, should fail - exceeds maxSize of queue
+
+ client::Message m3(thousandBytes, q);
+ m3.getMessageProperties().setCorrelationId("6");
+ try {
+ ScopedSuppressLogging sl;
+ f.session.messageTransfer(arg::content=m3);
+ BOOST_FAIL("Ooops - successfully added a 1000 byte message to a 512 byte ring queue ...");
+ }
+ catch (...) {
+ }
+
+}
+
+
QPID_AUTO_TEST_CASE(testStrictRingPolicy)
{
FieldTable args;