From ab129de8ffa271b543ca03a4e25026ffb8d65d2c Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 29 Apr 2016 14:16:42 +0000 Subject: QPID-7240: use protper cursor type when purging priority levels git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1741635 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/PriorityQueue.cpp | 2 +- qpid/cpp/src/tests/MessagingSessionTests.cpp | 40 ++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp index 5e60fe5cce..58764637b1 100644 --- a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp @@ -77,7 +77,7 @@ Message* PriorityQueue::next(QueueCursor& cursor) { boost::shared_ptr ctxt = boost::dynamic_pointer_cast(cursor.context); if (!ctxt) { - ctxt = boost::shared_ptr(new PriorityContext(levels, CONSUMER)); + ctxt = boost::shared_ptr(new PriorityContext(levels, cursor.type)); cursor.context = ctxt; } if (cursor.type == REPLICATOR) { diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index fe9f6d2665..6bc43bc8e1 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -1562,6 +1562,46 @@ QPID_AUTO_TEST_CASE(testClientExpiration) BOOST_CHECK_EQUAL(b_count, 50); } +QPID_AUTO_TEST_CASE(testPriorityRingEviction) +{ + MessagingFixture fix; + std::string queue("queue; {create:always, node:{x-declare:{auto-delete:True, arguments:{qpid.priorities:10, qpid.max_count:5, qpid.policy_type:ring}}}}"); + Sender sender = fix.session.createSender(queue); + Receiver receiver = fix.session.createReceiver(queue); + std::vector acquired; + for (uint i = 0; i < 5; ++i) { + Message msg((boost::format("msg_%1%") % (i+1)).str()); + sender.send(msg); + } + //fetch but don't acknowledge messages, leaving them in acquired state + for (uint i = 0; i < 5; ++i) { + Message msg; + BOOST_CHECK(receiver.fetch(msg, Duration::IMMEDIATE)); + BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("msg_%1%") % (i+1)).str()); + acquired.push_back(msg); + } + //send 5 more messages to the queue, which should cause all the + //acquired messages to be dropped + for (uint i = 5; i < 10; ++i) { + Message msg((boost::format("msg_%1%") % (i+1)).str()); + sender.send(msg); + } + //now release the acquired messages, which should have been evicted... + for (std::vector::iterator i = acquired.begin(); i != acquired.end(); ++i) { + fix.session.release(*i); + } + acquired.clear(); + //and check that the newest five are received + for (uint i = 5; i < 10; ++i) { + Message msg; + BOOST_CHECK(receiver.fetch(msg, Duration::IMMEDIATE)); + BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("msg_%1%") % (i+1)).str()); + acquired.push_back(msg); + } + Message msg; + BOOST_CHECK(!receiver.fetch(msg, Duration::IMMEDIATE)); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests -- cgit v1.2.1