summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp8
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp12
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.h11
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp2
4 files changed, 24 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index f6a50a7ef5..86768f0d88 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -330,9 +330,9 @@ void Channel::ack(uint64_t firstTag, uint64_t lastTag){
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
- ack_iterator j = (firstTag == 0) ?
- unacked.begin() :
- find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
+ ack_iterator j = (firstTag == 0) ?
+ unacked.begin() :
+ find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
if(i == unacked.end()){
throw ConnectionException(530, "Received ack for unrecognised delivery tag");
@@ -364,7 +364,7 @@ void Channel::recover(bool requeue){
outstanding.reset();
std::list<DeliveryRecord> copy = unacked;
unacked.clear();
- for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue));
+ for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
}else{
for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
}
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index 1473ab6288..8ec2064680 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -75,6 +75,14 @@ void Queue::process(Message::shared_ptr& msg){
}
}
+void Queue::requeue(Message::shared_ptr& msg){
+ Mutex::ScopedLock locker(lock);
+ if(queueing || !dispatch(msg)){
+ queueing = true;
+ messages.push_front(msg);
+ }
+}
+
bool Queue::dispatch(Message::shared_ptr& msg){
if(consumers.empty()){
return false;
@@ -163,12 +171,12 @@ uint32_t Queue::purge(){
void Queue::pop(){
if (policy.get()) policy->dequeued(messages.front()->contentSize());
- messages.pop();
+ messages.pop_front();
}
void Queue::push(Message::shared_ptr& msg){
queueing = true;
- messages.push(msg);
+ messages.push_back(msg);
if (policy.get()) {
policy->enqueued(msg->contentSize());
if (policy->limitExceeded()) {
diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h
index 35ffe2bc13..e39e58d35d 100644
--- a/cpp/src/qpid/broker/BrokerQueue.h
+++ b/cpp/src/qpid/broker/BrokerQueue.h
@@ -23,7 +23,7 @@
*/
#include <vector>
#include <memory>
-#include <queue>
+#include <deque>
#include <boost/shared_ptr.hpp>
#include "qpid/framing/amqp_types.h"
#include "ConnectionToken.h"
@@ -57,7 +57,7 @@ namespace qpid {
*/
class Queue : public PersistableQueue{
typedef std::vector<Consumer*> Consumers;
- typedef std::queue<Message::shared_ptr> Messages;
+ typedef std::deque<Message::shared_ptr> Messages;
const string name;
const bool autodelete;
@@ -109,6 +109,13 @@ namespace qpid {
*/
void process(Message::shared_ptr& msg);
/**
+ * Returns a message to the in-memory queue (due to lack
+ * of acknowledegement from a receiver). If a consumer is
+ * available it will be dispatched immediately, else it
+ * will be returned to the front of the queue.
+ */
+ void requeue(Message::shared_ptr& msg);
+ /**
* Used during recovery to add stored messages back to the queue
*/
void recover(Message::shared_ptr& msg);
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 95650b7b23..7e16adafda 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -65,7 +65,7 @@ void DeliveryRecord::redeliver(Channel* const channel) const{
void DeliveryRecord::requeue() const{
msg->redeliver();
- queue->process(msg);
+ queue->requeue(msg);
}
void DeliveryRecord::addTo(Prefetch* const prefetch) const{