diff options
author | Gordon Sim <gsim@apache.org> | 2009-02-25 11:02:20 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-02-25 11:02:20 +0000 |
commit | ebc7fef02649eda4e245c449146f8911ae730e31 (patch) | |
tree | ec31c587a5874123fe0b6e07e04590bdf3cb55ba /cpp/src | |
parent | d0ce4843db075979f1e0b45d38c3300b17763a13 (diff) | |
download | qpid-python-ebc7fef02649eda4e245c449146f8911ae730e31.tar.gz |
QPID-1685: Fixed ring queue policy
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@747744 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueuePolicy.cpp | 26 | ||||
-rw-r--r-- | cpp/src/tests/QueuePolicyTest.cpp | 9 |
3 files changed, 23 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 3ae53c8ea9..8c50f26abd 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -676,9 +676,9 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { - if (policy.get() && !policy->isEnqueued(msg)) return false; { Mutex::ScopedLock locker(messageLock); + if (policy.get() && !policy->isEnqueued(msg)) return false; if (!ctxt) { dequeued(msg); } diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index 41a6709d27..fc65387f08 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -197,11 +197,12 @@ void RingQueuePolicy::enqueued(const QueuedMessage& m) void RingQueuePolicy::dequeued(const QueuedMessage& m) { qpid::sys::Mutex::ScopedLock l(lock); - QueuePolicy::dequeued(m); //find and remove m from queue - for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) { - if (i->position == m.position) { + for (Messages::iterator i = queue.begin(); i != queue.end(); i++) { + if (i->payload == m.payload) { queue.erase(i); + //now update count and size + QueuePolicy::dequeued(m); break; } } @@ -210,9 +211,11 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m) bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) { qpid::sys::Mutex::ScopedLock l(lock); - //for non-strict ring policy, a message can be dequeued before acked; need to detect this - for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) { - if (i->position == m.position) { + //for non-strict ring policy, a message can be replaced (and + //therefore dequeued) before it is accepted or released by + //subscriber; need to detect this + for (Messages::const_iterator i = queue.begin(); i != queue.end(); i++) { + if (i->payload == m.payload) { return true; } } @@ -236,13 +239,10 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m) oldest = queue.front(); } if (oldest.queue->acquire(oldest) || !strict) { - qpid::sys::Mutex::ScopedLock l(lock); - if (oldest.position == queue.front().position) { - queue.pop_front(); - QPID_LOG(debug, "Ring policy triggered in queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) - << ": removed message " << oldest.position << " to make way for " << m.position); - } + oldest.queue->dequeue(0, oldest); + QPID_LOG(debug, "Ring policy triggered in queue " + << (m.queue ? m.queue->getName() : std::string("unknown queue")) + << ": removed message " << oldest.position << " to make way for " << m.position); return true; } else { QPID_LOG(debug, "Ring policy could not be triggered in queue " diff --git a/cpp/src/tests/QueuePolicyTest.cpp b/cpp/src/tests/QueuePolicyTest.cpp index 6c650169c7..7c7f8b7a10 100644 --- a/cpp/src/tests/QueuePolicyTest.cpp +++ b/cpp/src/tests/QueuePolicyTest.cpp @@ -158,6 +158,15 @@ QPID_AUTO_TEST_CASE(testRingPolicy) BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData()); } BOOST_CHECK(!f.subs.get(msg, q)); + + for (int i = 10; i < 20; i++) { + f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q)); + } + for (int i = 15; i < 20; i++) { + BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC)); + BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData()); + } + BOOST_CHECK(!f.subs.get(msg, q)); } QPID_AUTO_TEST_CASE(testStrictRingPolicy) |