summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Session.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Session.cpp')
-rw-r--r--cpp/src/qpid/broker/Session.cpp55
1 files changed, 47 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp
index 26b694d073..a8b22cb12a 100644
--- a/cpp/src/qpid/broker/Session.cpp
+++ b/cpp/src/qpid/broker/Session.cpp
@@ -34,6 +34,7 @@
#include "TxPublish.h"
#include "qpid/QpidError.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -91,12 +92,12 @@ bool Session::exists(const string& consumerTag){
}
void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut,
- Queue::shared_ptr queue, bool nolocal, bool acks,
+ Queue::shared_ptr queue, bool nolocal, bool acks, bool acquire,
bool exclusive, const FieldTable*)
{
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
- std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal));
+ std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
queue->consume(c.get(), exclusive);//may throw exception
consumers.insert(tagInOut, c.release());
}
@@ -239,7 +240,9 @@ Session::ConsumerImpl::ConsumerImpl(Session* _parent,
bool ack,
bool _nolocal,
bool _acquire
-) : parent(_parent),
+ ) :
+ Consumer(_acquire),
+ parent(_parent),
token(_token),
name(_name),
queue(_queue),
@@ -266,7 +269,7 @@ bool Session::ConsumerImpl::deliver(QueuedMessage& msg)
DeliveryId deliveryTag =
parent->deliveryAdapter->deliver(msg.payload, token);
if (ackExpected) {
- parent->record(DeliveryRecord(msg, queue, name, deliveryTag));
+ parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire));
}
}
return !blocked;
@@ -312,7 +315,7 @@ void Session::ConsumerImpl::cancel()
void Session::ConsumerImpl::requestDispatch()
{
if(blocked)
- queue->requestDispatch();
+ queue->requestDispatch(this);
}
void Session::handle(Message::shared_ptr msg) {
@@ -532,9 +535,7 @@ void Session::ConsumerImpl::addMessageCredit(uint32_t value)
void Session::ConsumerImpl::flush()
{
- //TODO: need to wait until any messages that are available for
- //this consumer have been delivered... i.e. some sort of flush on
- //the queue...
+ queue->requestDispatch(this, true);
}
void Session::ConsumerImpl::stop()
@@ -559,4 +560,42 @@ Queue::shared_ptr Session::getQueue(const string& name) const {
return queue;
}
+AckRange Session::findRange(DeliveryId first, DeliveryId last)
+{
+ ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
+ ack_iterator end = start;
+
+ if (first == last) {
+ //just acked single element (move end past it)
+ ++end;
+ } else {
+ //need to find end (position it just after the last record in range)
+ end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+ }
+ return AckRange(start, end);
+}
+
+void Session::acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired)
+{
+ Mutex::ScopedLock locker(deliveryLock);
+ AckRange range = findRange(first, last);
+ for_each(range.start, range.end, AcquireFunctor(acquired));
+}
+
+void Session::release(DeliveryId first, DeliveryId last)
+{
+ Mutex::ScopedLock locker(deliveryLock);
+ AckRange range = findRange(first, last);
+ for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release));
+}
+
+void Session::reject(DeliveryId first, DeliveryId last)
+{
+ Mutex::ScopedLock locker(deliveryLock);
+ AckRange range = findRange(first, last);
+ for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
+ //need to remove the delivery records as well
+ unacked.erase(range.start, range.end);
+}
+
}} // namespace qpid::broker