From 338f2196c07ea6ce2cd40941bca9ef11e95be2bf Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 3 Jul 2007 14:54:19 +0000 Subject: Fix (and test) for QPID-407. Messages are requeued at the head rather than the tail. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@552862 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/BrokerChannel.cpp | 8 ++++---- cpp/src/qpid/broker/BrokerQueue.cpp | 12 ++++++++++-- cpp/src/qpid/broker/BrokerQueue.h | 11 +++++++++-- cpp/src/qpid/broker/DeliveryRecord.cpp | 2 +- 4 files changed, 24 insertions(+), 9 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index f6a50a7ef5..86768f0d88 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -330,9 +330,9 @@ void Channel::ack(uint64_t firstTag, uint64_t lastTag){ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag)); - ack_iterator j = (firstTag == 0) ? - unacked.begin() : - find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag)); + ack_iterator j = (firstTag == 0) ? + unacked.begin() : + find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag)); if(i == unacked.end()){ throw ConnectionException(530, "Received ack for unrecognised delivery tag"); @@ -364,7 +364,7 @@ void Channel::recover(bool requeue){ outstanding.reset(); std::list copy = unacked; unacked.clear(); - for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue)); + for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); }else{ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); } diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index 1473ab6288..8ec2064680 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -75,6 +75,14 @@ void Queue::process(Message::shared_ptr& msg){ } } +void Queue::requeue(Message::shared_ptr& msg){ + Mutex::ScopedLock locker(lock); + if(queueing || !dispatch(msg)){ + queueing = true; + messages.push_front(msg); + } +} + bool Queue::dispatch(Message::shared_ptr& msg){ if(consumers.empty()){ return false; @@ -163,12 +171,12 @@ uint32_t Queue::purge(){ void Queue::pop(){ if (policy.get()) policy->dequeued(messages.front()->contentSize()); - messages.pop(); + messages.pop_front(); } void Queue::push(Message::shared_ptr& msg){ queueing = true; - messages.push(msg); + messages.push_back(msg); if (policy.get()) { policy->enqueued(msg->contentSize()); if (policy->limitExceeded()) { diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h index 35ffe2bc13..e39e58d35d 100644 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ b/cpp/src/qpid/broker/BrokerQueue.h @@ -23,7 +23,7 @@ */ #include #include -#include +#include #include #include "qpid/framing/amqp_types.h" #include "ConnectionToken.h" @@ -57,7 +57,7 @@ namespace qpid { */ class Queue : public PersistableQueue{ typedef std::vector Consumers; - typedef std::queue Messages; + typedef std::deque Messages; const string name; const bool autodelete; @@ -108,6 +108,13 @@ namespace qpid { * one is available or stores it for later if not. */ void process(Message::shared_ptr& msg); + /** + * Returns a message to the in-memory queue (due to lack + * of acknowledegement from a receiver). If a consumer is + * available it will be dispatched immediately, else it + * will be returned to the front of the queue. + */ + void requeue(Message::shared_ptr& msg); /** * Used during recovery to add stored messages back to the queue */ diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 95650b7b23..7e16adafda 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -65,7 +65,7 @@ void DeliveryRecord::redeliver(Channel* const channel) const{ void DeliveryRecord::requeue() const{ msg->redeliver(); - queue->process(msg); + queue->requeue(msg); } void DeliveryRecord::addTo(Prefetch* const prefetch) const{ -- cgit v1.2.1