diff options
Diffstat (limited to 'cpp/src/qpid/broker/Session.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Session.cpp | 55 |
1 files changed, 47 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp index 26b694d073..a8b22cb12a 100644 --- a/cpp/src/qpid/broker/Session.cpp +++ b/cpp/src/qpid/broker/Session.cpp @@ -34,6 +34,7 @@ #include "TxPublish.h" #include "qpid/QpidError.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <boost/format.hpp> @@ -91,12 +92,12 @@ bool Session::exists(const string& consumerTag){ } void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut, - Queue::shared_ptr queue, bool nolocal, bool acks, + Queue::shared_ptr queue, bool nolocal, bool acks, bool acquire, bool exclusive, const FieldTable*) { if(tagInOut.empty()) tagInOut = tagGenerator.generate(); - std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal)); + std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire)); queue->consume(c.get(), exclusive);//may throw exception consumers.insert(tagInOut, c.release()); } @@ -239,7 +240,9 @@ Session::ConsumerImpl::ConsumerImpl(Session* _parent, bool ack, bool _nolocal, bool _acquire -) : parent(_parent), + ) : + Consumer(_acquire), + parent(_parent), token(_token), name(_name), queue(_queue), @@ -266,7 +269,7 @@ bool Session::ConsumerImpl::deliver(QueuedMessage& msg) DeliveryId deliveryTag = parent->deliveryAdapter->deliver(msg.payload, token); if (ackExpected) { - parent->record(DeliveryRecord(msg, queue, name, deliveryTag)); + parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire)); } } return !blocked; @@ -312,7 +315,7 @@ void Session::ConsumerImpl::cancel() void Session::ConsumerImpl::requestDispatch() { if(blocked) - queue->requestDispatch(); + queue->requestDispatch(this); } void Session::handle(Message::shared_ptr msg) { @@ -532,9 +535,7 @@ void Session::ConsumerImpl::addMessageCredit(uint32_t value) void Session::ConsumerImpl::flush() { - //TODO: need to wait until any messages that are available for - //this consumer have been delivered... i.e. some sort of flush on - //the queue... + queue->requestDispatch(this, true); } void Session::ConsumerImpl::stop() @@ -559,4 +560,42 @@ Queue::shared_ptr Session::getQueue(const string& name) const { return queue; } +AckRange Session::findRange(DeliveryId first, DeliveryId last) +{ + ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first)); + ack_iterator end = start; + + if (first == last) { + //just acked single element (move end past it) + ++end; + } else { + //need to find end (position it just after the last record in range) + end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last)); + } + return AckRange(start, end); +} + +void Session::acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired) +{ + Mutex::ScopedLock locker(deliveryLock); + AckRange range = findRange(first, last); + for_each(range.start, range.end, AcquireFunctor(acquired)); +} + +void Session::release(DeliveryId first, DeliveryId last) +{ + Mutex::ScopedLock locker(deliveryLock); + AckRange range = findRange(first, last); + for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release)); +} + +void Session::reject(DeliveryId first, DeliveryId last) +{ + Mutex::ScopedLock locker(deliveryLock); + AckRange range = findRange(first, last); + for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject)); + //need to remove the delivery records as well + unacked.erase(range.start, range.end); +} + }} // namespace qpid::broker |