diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-03 14:54:19 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-03 14:54:19 +0000 |
commit | 338f2196c07ea6ce2cd40941bca9ef11e95be2bf (patch) | |
tree | 63ac3fe82c7d668850608ea19334997960089062 /cpp/src | |
parent | e7f82f3d5b39c77ee913ee4cd6b4bf1bdc8b13bc (diff) | |
download | qpid-python-338f2196c07ea6ce2cd40941bca9ef11e95be2bf.tar.gz |
Fix (and test) for QPID-407. Messages are requeued at the head rather than the tail.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@552862 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.h | 11 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 2 |
4 files changed, 24 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index f6a50a7ef5..86768f0d88 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -330,9 +330,9 @@ void Channel::ack(uint64_t firstTag, uint64_t lastTag){ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag)); - ack_iterator j = (firstTag == 0) ? - unacked.begin() : - find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag)); + ack_iterator j = (firstTag == 0) ? + unacked.begin() : + find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag)); if(i == unacked.end()){ throw ConnectionException(530, "Received ack for unrecognised delivery tag"); @@ -364,7 +364,7 @@ void Channel::recover(bool requeue){ outstanding.reset(); std::list<DeliveryRecord> copy = unacked; unacked.clear(); - for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue)); + for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); }else{ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); } diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index 1473ab6288..8ec2064680 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -75,6 +75,14 @@ void Queue::process(Message::shared_ptr& msg){ } } +void Queue::requeue(Message::shared_ptr& msg){ + Mutex::ScopedLock locker(lock); + if(queueing || !dispatch(msg)){ + queueing = true; + messages.push_front(msg); + } +} + bool Queue::dispatch(Message::shared_ptr& msg){ if(consumers.empty()){ return false; @@ -163,12 +171,12 @@ uint32_t Queue::purge(){ void Queue::pop(){ if (policy.get()) policy->dequeued(messages.front()->contentSize()); - messages.pop(); + messages.pop_front(); } void Queue::push(Message::shared_ptr& msg){ queueing = true; - messages.push(msg); + messages.push_back(msg); if (policy.get()) { policy->enqueued(msg->contentSize()); if (policy->limitExceeded()) { diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h index 35ffe2bc13..e39e58d35d 100644 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ b/cpp/src/qpid/broker/BrokerQueue.h @@ -23,7 +23,7 @@ */ #include <vector> #include <memory> -#include <queue> +#include <deque> #include <boost/shared_ptr.hpp> #include "qpid/framing/amqp_types.h" #include "ConnectionToken.h" @@ -57,7 +57,7 @@ namespace qpid { */ class Queue : public PersistableQueue{ typedef std::vector<Consumer*> Consumers; - typedef std::queue<Message::shared_ptr> Messages; + typedef std::deque<Message::shared_ptr> Messages; const string name; const bool autodelete; @@ -109,6 +109,13 @@ namespace qpid { */ void process(Message::shared_ptr& msg); /** + * Returns a message to the in-memory queue (due to lack + * of acknowledegement from a receiver). If a consumer is + * available it will be dispatched immediately, else it + * will be returned to the front of the queue. + */ + void requeue(Message::shared_ptr& msg); + /** * Used during recovery to add stored messages back to the queue */ void recover(Message::shared_ptr& msg); diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 95650b7b23..7e16adafda 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -65,7 +65,7 @@ void DeliveryRecord::redeliver(Channel* const channel) const{ void DeliveryRecord::requeue() const{ msg->redeliver(); - queue->process(msg); + queue->requeue(msg); } void DeliveryRecord::addTo(Prefetch* const prefetch) const{ |