summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-02-13 19:21:21 +0000
committerGordon Sim <gsim@apache.org>2009-02-13 19:21:21 +0000
commit492b05e6aa1a771d86cdb29c04dfe7d0ebefdd4b (patch)
treee68d7d95ea082c4faaaa1b71e909785fd663f446
parentf8b4543a97933d7736f89451be7726b411e8498b (diff)
downloadqpid-python-492b05e6aa1a771d86cdb29c04dfe7d0ebefdd4b.tar.gz
Ensure that the queue depth in bytes remains accurate for LVQ. This also ensures that there are no underflow exceptions from the policy that keeps this count.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@744222 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp4
-rw-r--r--qpid/cpp/src/tests/ClientSessionTest.cpp22
2 files changed, 25 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index bcce83af1e..3ae53c8ea9 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -563,8 +563,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){
listeners.populate(copy);
lvq[key] = msg;
}else {
+ boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
+ if (!old) old = i->second;
i->second->setReplacementMessage(msg,this);
- dequeued(QueuedMessage(qm.queue, i->second, qm.position));
+ dequeued(QueuedMessage(qm.queue, old, qm.position));
}
}else {
messages.push_back(qm);
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp
index e4c311d8a0..e156000f18 100644
--- a/qpid/cpp/src/tests/ClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/ClientSessionTest.cpp
@@ -21,6 +21,7 @@
#include "unit_test.h"
#include "test_tools.h"
#include "BrokerFixture.h"
+#include "qpid/client/QueueOptions.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
@@ -526,6 +527,27 @@ QPID_AUTO_TEST_CASE(testSessionCloseOnInvalidSession) {
session.close();
}
+QPID_AUTO_TEST_CASE(testLVQVariedSize) {
+ ClientSessionFixture fix;
+ std::string queue("my-lvq");
+ QueueOptions args;
+ args.setOrdering(LVQ_NO_BROWSE);
+ fix.session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
+
+ std::string key;
+ args.getLVQKey(key);
+
+ for (size_t i = 0; i < 10; i++) {
+ std::ostringstream data;
+ size_t size = 100 - ((i % 10) * 10);
+ data << std::string(size, 'x');
+
+ Message m(data.str(), queue);
+ m.getHeaders().setString(key, "abc");
+ fix.session.messageTransfer(arg::content=m);
+ }
+}
+
QPID_AUTO_TEST_SUITE_END()