summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-08-20 17:24:40 +0000
committerGordon Sim <gsim@apache.org>2013-08-20 17:24:40 +0000
commitef87423f4ee8dcb6a91903030ae28dba2deefdf8 (patch)
tree4af2485fb2442893594a13a19de4e353173dcdbf
parent6c7c3d5a561c93463240add32d5eb77a09c959ae (diff)
downloadqpid-python-ef87423f4ee8dcb6a91903030ae28dba2deefdf8.tar.gz
QPID-5085: ensure messages released 'behind' a cursor on a priority queue don't get missed (merged from r1515793)
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.24@1515891 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.cpp7
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp20
2 files changed, 26 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
index 99488ded13..5e60fe5cce 100644
--- a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -135,7 +135,12 @@ void PriorityQueue::publish(const Message& published)
Message* PriorityQueue::release(const QueueCursor& cursor)
{
MessagePointer* ptr = fifo.release(cursor);
- return ptr ? &(ptr->holder->message) : 0;
+ if (ptr) {
+ messages[ptr->holder->priority].resetCursors();
+ return &(ptr->holder->message);
+ } else {
+ return 0;
+ }
}
void PriorityQueue::foreach(Functor f)
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index 2cf9648be4..2133cb5605 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -1332,6 +1332,26 @@ QPID_AUTO_TEST_CASE(testReroutingRingQueue)
}
}
+QPID_AUTO_TEST_CASE(testReleaseOnPriorityQueue)
+{
+ MessagingFixture fix;
+ std::string queue("queue; {create:always, node:{x-declare:{auto-delete:True, arguments:{qpid.priorities:10}}}}");
+ std::string text("my message");
+ Sender sender = fix.session.createSender(queue);
+ sender.send(Message(text));
+ Receiver receiver = fix.session.createReceiver(queue);
+ Message msg;
+ for (uint i = 0; i < 10; ++i) {
+ if (receiver.fetch(msg, Duration::SECOND)) {
+ BOOST_CHECK_EQUAL(msg.getContent(), text);
+ fix.session.release(msg);
+ } else {
+ BOOST_FAIL("Released message not redelivered as expected.");
+ }
+ }
+ fix.session.acknowledge();
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests