summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/AccumulatedAck.h2
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp2
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp102
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.h10
-rw-r--r--cpp/src/qpid/broker/Consumer.h7
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp59
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h33
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp42
-rw-r--r--cpp/src/qpid/broker/Session.cpp55
-rw-r--r--cpp/src/qpid/broker/Session.h10
-rw-r--r--cpp/src/qpid/framing/SequenceNumberSet.cpp16
-rw-r--r--cpp/src/qpid/framing/SequenceNumberSet.h1
-rw-r--r--cpp/src/tests/TxAckTest.cpp2
13 files changed, 281 insertions, 60 deletions
diff --git a/cpp/src/qpid/broker/AccumulatedAck.h b/cpp/src/qpid/broker/AccumulatedAck.h
index b53f4a8ba5..9c7cc3d887 100644
--- a/cpp/src/qpid/broker/AccumulatedAck.h
+++ b/cpp/src/qpid/broker/AccumulatedAck.h
@@ -57,7 +57,7 @@ namespace qpid {
*/
std::list<Range> ranges;
- AccumulatedAck(DeliveryId r) : mark(r) {}
+ explicit AccumulatedAck(DeliveryId r) : mark(r) {}
void update(DeliveryId firstTag, DeliveryId lastTag);
void consolidate();
void clear();
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index a6e9c007cf..1a44b09188 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -306,7 +306,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
//also version specific behaviour now)
if (newTag.empty()) newTag = tagGenerator.generate();
DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
- session.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields);
+ session.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
if(!nowait) client.consumeOk(newTag);
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index 553f6016d2..a094c7a804 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -112,31 +112,50 @@ void Queue::requeue(const QueuedMessage& msg){
}
-
-void Queue::requestDispatch(){
- serializer.execute(dispatchCallback);
+bool Queue::acquire(const QueuedMessage& msg) {
+ Mutex::ScopedLock locker(messageLock);
+ for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
+ if (i->position == msg.position) {
+ messages.erase(i);
+ return true;
+ }
+ }
+ return false;
}
+void Queue::requestDispatch(Consumer* c, bool sync){
+ if (!c || c->preAcquires()) {
+ if (sync) {
+ serializer.dispatch();
+ } else {
+ serializer.execute(dispatchCallback);
+ }
+ } else {
+ //note: this is always done on the callers thread, regardless
+ // of sync; browsers of large queues should use flow control!
+ serviceBrowser(c);
+ }
+}
bool Queue::dispatch(QueuedMessage& msg){
RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide....
- if(consumers.empty()){
+ if(acquirers.empty()){
return false;
}else if(exclusive){
return exclusive->deliver(msg);
}else{
//deliver to next consumer
- next = next % consumers.size();
- Consumer* c = consumers[next];
+ next = next % acquirers.size();
+ Consumer* c = acquirers[next];
int start = next;
while(c){
next++;
if(c->deliver(msg)) return true;
- next = next % consumers.size();
- c = next == start ? 0 : consumers[next];
+ next = next % acquirers.size();
+ c = next == start ? 0 : acquirers[next];
}
return false;
}
@@ -153,34 +172,79 @@ void Queue::dispatch(){
}
if( msg.payload->isEnqueueComplete() && dispatch(msg) ) {
pop();
- } else {
+ } else {
break;
}
- }
+ }
+ RWlock::ScopedRlock locker(consumerLock);
+ for (Consumers::iterator i = browsers.begin(); i != browsers.end(); i++) {
+ serviceBrowser(*i);
+ }
+}
+
+void Queue::serviceBrowser(Consumer* browser)
+{
+ //This is a poorly performing implementation:
+ //
+ // * bad concurrency where browsers exist
+ // * inefficient for largish queues
+ //
+ //The queue needs to be based on a current data structure that
+ //does not invalidate iterators when modified. Subscribers could
+ //then use an iterator to continue from where they left off
+
+ Mutex::ScopedLock locker(messageLock);
+ if (!messages.empty() && messages.back().position > browser->position) {
+ for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
+ if (i->position > browser->position) {
+ if (browser->deliver(*i)) {
+ browser->position = i->position;
+ } else {
+ break;
+ }
+ }
+ }
+ }
}
void Queue::consume(Consumer* c, bool requestExclusive){
RWlock::ScopedWlock locker(consumerLock);
- if(exclusive)
+ if(exclusive) {
throw ChannelException(
403, format("Queue '%s' has an exclusive consumer."
" No more consumers allowed.") % getName());
+ }
if(requestExclusive) {
- if(!consumers.empty())
+ if(acquirers.empty() && browsers.empty()) {
+ exclusive = c;
+ } else {
throw ChannelException(
- 403, format("Queue '%s' already has conumers."
- "Exclusive access denied.") %getName());
- exclusive = c;
+ 403, format("Queue '%s' already has consumers."
+ "Exclusive access denied.") % getName());
+ }
+ }
+ if (c->preAcquires()) {
+ acquirers.push_back(c);
+ } else {
+ browsers.push_back(c);
}
- consumers.push_back(c);
}
void Queue::cancel(Consumer* c){
RWlock::ScopedWlock locker(consumerLock);
+ if (c->preAcquires()) {
+ cancel(c, acquirers);
+ } else {
+ cancel(c, browsers);
+ }
+ if(exclusive == c) exclusive = 0;
+}
+
+void Queue::cancel(Consumer* c, Consumers& consumers)
+{
Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
if (i != consumers.end())
consumers.erase(i);
- if(exclusive == c) exclusive = 0;
}
QueuedMessage Queue::dequeue(){
@@ -233,12 +297,12 @@ uint32_t Queue::getMessageCount() const{
uint32_t Queue::getConsumerCount() const{
RWlock::ScopedRlock locker(consumerLock);
- return consumers.size();
+ return acquirers.size() + browsers.size();
}
bool Queue::canAutoDelete() const{
RWlock::ScopedRlock locker(consumerLock);
- return autodelete && consumers.size() == 0;
+ return autodelete && acquirers.empty() && browsers.empty();
}
// return true if store exists,
diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h
index d15b5fc8c5..4b6070d11c 100644
--- a/cpp/src/qpid/broker/BrokerQueue.h
+++ b/cpp/src/qpid/broker/BrokerQueue.h
@@ -68,7 +68,8 @@ namespace qpid {
const bool autodelete;
MessageStore* const store;
const ConnectionToken* const owner;
- Consumers consumers;
+ Consumers acquirers;
+ Consumers browsers;
Messages messages;
int next;
mutable qpid::sys::RWlock consumerLock;
@@ -91,6 +92,8 @@ namespace qpid {
* only called by serilizer
*/
void dispatch();
+ void cancel(Consumer* c, Consumers& set);
+ void serviceBrowser(Consumer* c);
protected:
/**
@@ -114,6 +117,9 @@ namespace qpid {
void destroy();
void bound(const string& exchange, const string& key, const qpid::framing::FieldTable& args);
void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref);
+
+ bool acquire(const QueuedMessage& msg);
+
/**
* Delivers a message to the queue. Will record it as
* enqueued if persistent then process it.
@@ -141,7 +147,7 @@ namespace qpid {
* at any time, so this call schedules the despatch based on
* the serilizer policy.
*/
- void requestDispatch();
+ void requestDispatch(Consumer* c = 0, bool sync = false);
void consume(Consumer* c, bool exclusive = false);
void cancel(Consumer* c);
uint32_t purge();
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
index 52da25082c..c482a44ab1 100644
--- a/cpp/src/qpid/broker/Consumer.h
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -36,8 +36,13 @@ namespace qpid {
};
- class Consumer{
+ class Consumer {
+ const bool acquires;
public:
+ framing::SequenceNumber position;
+
+ Consumer(bool preAcquires = true) : acquires(preAcquires) {}
+ bool preAcquires() const { return acquires; }
virtual bool deliver(QueuedMessage& msg) = 0;
virtual ~Consumer(){}
};
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 5c7c632c05..7649715ade 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -19,7 +19,9 @@
*
*/
#include "DeliveryRecord.h"
+#include "DeliverableMessage.h"
#include "Session.h"
+#include "qpid/log/Statement.h"
using namespace qpid::broker;
using std::string;
@@ -27,29 +29,32 @@ using std::string;
DeliveryRecord::DeliveryRecord(QueuedMessage& _msg,
Queue::shared_ptr _queue,
const string _consumerTag,
- const DeliveryId _deliveryTag) : msg(_msg),
+ const DeliveryId _id,
+ bool _acquired) : msg(_msg),
queue(_queue),
consumerTag(_consumerTag),
- deliveryTag(_deliveryTag),
- acquired(false),
+ id(_id),
+ acquired(_acquired),
pull(false){}
DeliveryRecord::DeliveryRecord(QueuedMessage& _msg,
Queue::shared_ptr _queue,
- const DeliveryId _deliveryTag) : msg(_msg),
+ const DeliveryId _id) : msg(_msg),
queue(_queue),
consumerTag(""),
- deliveryTag(_deliveryTag),
- acquired(false),
+ id(_id),
+ acquired(true),
pull(true){}
void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
- queue->dequeue(ctxt, msg.payload);
+ if (acquired) {
+ queue->dequeue(ctxt, msg.payload);
+ }
}
bool DeliveryRecord::matches(DeliveryId tag) const{
- return deliveryTag == tag;
+ return id == tag;
}
bool DeliveryRecord::matchOrAfter(DeliveryId tag) const{
@@ -57,11 +62,11 @@ bool DeliveryRecord::matchOrAfter(DeliveryId tag) const{
}
bool DeliveryRecord::after(DeliveryId tag) const{
- return deliveryTag > tag;
+ return id > tag;
}
bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
- return range->covers(deliveryTag);
+ return range->covers(id);
}
void DeliveryRecord::redeliver(Session* const session) const{
@@ -69,15 +74,36 @@ void DeliveryRecord::redeliver(Session* const session) const{
//if message was originally sent as response to get, we must requeue it
requeue();
}else{
- session->deliver(msg.payload, consumerTag, deliveryTag);
+ session->deliver(msg.payload, consumerTag, id);
}
}
-void DeliveryRecord::requeue() const{
+void DeliveryRecord::requeue() const
+{
msg.payload->redeliver();
queue->requeue(msg);
}
+void DeliveryRecord::release()
+{
+ queue->requeue(msg);
+ acquired = false;
+}
+
+void DeliveryRecord::reject()
+{
+ Exchange::shared_ptr alternate = queue->getAlternateExchange();
+ if (alternate) {
+ DeliverableMessage delivery(msg.payload);
+ alternate->route(delivery, msg.payload->getRoutingKey(), &(msg.payload->getApplicationHeaders()));
+ QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to "
+ << alternate->getName());
+ } else {
+ //just drop it
+ QPID_LOG(info, "Dropping rejected message from " << queue->getName());
+ }
+}
+
void DeliveryRecord::updateByteCredit(uint32_t& credit) const
{
credit += msg.payload->getRequiredCredit();
@@ -102,11 +128,18 @@ void DeliveryRecord::subtractFrom(Prefetch& prefetch) const{
}
}
+void DeliveryRecord::acquire(std::vector<DeliveryId>& results) {
+ if (queue->acquire(msg)) {
+ acquired = true;
+ results.push_back(id);
+ }
+}
+
namespace qpid {
namespace broker {
std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) {
- out << "{" << "id=" << r.deliveryTag.getValue();
+ out << "{" << "id=" << r.id.getValue();
out << ", consumer=" << r.consumerTag;
out << ", queue=" << r.queue->getName() << "}";
return out;
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index d453458f62..583579ac10 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -23,6 +23,7 @@
#include <algorithm>
#include <list>
+#include <vector>
#include <ostream>
#include "AccumulatedAck.h"
#include "BrokerQueue.h"
@@ -42,13 +43,14 @@ class DeliveryRecord{
mutable QueuedMessage msg;
mutable Queue::shared_ptr queue;
const std::string consumerTag;
- const DeliveryId deliveryTag;
+ const DeliveryId id;
bool acquired;
const bool pull;
public:
- DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag, const DeliveryId deliveryTag);
- DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId deliveryTag);
+ DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag,
+ const DeliveryId id, bool acquired);
+ DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id);
void dequeue(TransactionContext* ctxt = 0) const;
bool matches(DeliveryId tag) const;
@@ -56,6 +58,8 @@ class DeliveryRecord{
bool after(DeliveryId tag) const;
bool coveredBy(const AccumulatedAck* const range) const;
void requeue() const;
+ void release();
+ void reject();
void redeliver(Session* const) const;
void updateByteCredit(uint32_t& credit) const;
void addTo(Prefetch&) const;
@@ -63,12 +67,33 @@ class DeliveryRecord{
const std::string& getConsumerTag() const { return consumerTag; }
bool isPull() const { return pull; }
bool isAcquired() const { return acquired; }
- void setAcquired(bool isAcquired) { acquired = isAcquired; }
+ //void setAcquired(bool isAcquired) { acquired = isAcquired; }
+ void acquire(std::vector<DeliveryId>& results);
friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
};
typedef std::list<DeliveryRecord>::iterator ack_iterator;
+
+struct AckRange
+{
+ ack_iterator start;
+ ack_iterator end;
+ AckRange(ack_iterator _start, ack_iterator _end) : start(_start), end(_end) {}
+};
+
+struct AcquireFunctor
+{
+ std::vector<DeliveryId>& results;
+
+ AcquireFunctor(std::vector<DeliveryId>& _results) : results(_results) {}
+
+ void operator()(DeliveryRecord& record)
+ {
+ record.acquire(results);
+ }
+};
+
}
}
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 7529e3bb39..d9b91c1617 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -25,6 +25,7 @@
#include "MessageDelivery.h"
#include "qpid/framing/MessageAppendBody.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
#include "BrokerAdapter.h"
#include <boost/format.hpp>
@@ -92,7 +93,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
const string& destination,
bool noLocal,
u_int8_t confirmMode,
- u_int8_t acquireMode,//TODO: implement acquire modes
+ u_int8_t acquireMode,
bool exclusive,
const framing::FieldTable& filter )
{
@@ -101,8 +102,10 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
+ //NB: am assuming pre-acquired = 0 as discussed on SIG list as is
+ //the previously expected behaviour
session.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
- tag, queue, noLocal, confirmMode == 1, exclusive, &filter);
+ tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
@@ -156,9 +159,15 @@ MessageHandlerImpl::recover(bool requeue)
}
void
-MessageHandlerImpl::reject(const SequenceNumberSet& /*transfers*/, uint16_t /*code*/, const string& /*text*/ )
+MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const string& /*text*/ )
{
- //TODO: implement
+ if (transfers.size() % 2) { //must be even number
+ throw InvalidArgumentException("Received odd number of elements in list of transfers");
+ }
+
+ for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
+ session.reject(i->getValue(), (++i)->getValue());
+ }
}
void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value)
@@ -200,14 +209,31 @@ void MessageHandlerImpl::stop(const std::string& destination)
session.stop(destination);
}
-void MessageHandlerImpl::acquire(const SequenceNumberSet& /*transfers*/, u_int8_t /*mode*/)
+void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
{
- throw ConnectionException(540, "Not yet implemented");
+ SequenceNumberSet results;
+
+ if (transfers.size() % 2) { //must be even number
+ throw InvalidArgumentException("Received odd number of elements in list of transfers");
+ }
+
+ for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
+ session.acquire(i->getValue(), (++i)->getValue(), results);
+ }
+
+ results = results.condense();
+ client.acquired(results);
}
-void MessageHandlerImpl::release(const SequenceNumberSet& /*transfers*/)
+void MessageHandlerImpl::release(const SequenceNumberSet& transfers)
{
- throw ConnectionException(540, "Not yet implemented");
+ if (transfers.size() % 2) { //must be even number
+ throw InvalidArgumentException("Received odd number of elements in list of transfers");
+ }
+
+ for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
+ session.release(i->getValue(), (++i)->getValue());
+ }
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp
index 26b694d073..a8b22cb12a 100644
--- a/cpp/src/qpid/broker/Session.cpp
+++ b/cpp/src/qpid/broker/Session.cpp
@@ -34,6 +34,7 @@
#include "TxPublish.h"
#include "qpid/QpidError.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -91,12 +92,12 @@ bool Session::exists(const string& consumerTag){
}
void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut,
- Queue::shared_ptr queue, bool nolocal, bool acks,
+ Queue::shared_ptr queue, bool nolocal, bool acks, bool acquire,
bool exclusive, const FieldTable*)
{
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
- std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal));
+ std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
queue->consume(c.get(), exclusive);//may throw exception
consumers.insert(tagInOut, c.release());
}
@@ -239,7 +240,9 @@ Session::ConsumerImpl::ConsumerImpl(Session* _parent,
bool ack,
bool _nolocal,
bool _acquire
-) : parent(_parent),
+ ) :
+ Consumer(_acquire),
+ parent(_parent),
token(_token),
name(_name),
queue(_queue),
@@ -266,7 +269,7 @@ bool Session::ConsumerImpl::deliver(QueuedMessage& msg)
DeliveryId deliveryTag =
parent->deliveryAdapter->deliver(msg.payload, token);
if (ackExpected) {
- parent->record(DeliveryRecord(msg, queue, name, deliveryTag));
+ parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire));
}
}
return !blocked;
@@ -312,7 +315,7 @@ void Session::ConsumerImpl::cancel()
void Session::ConsumerImpl::requestDispatch()
{
if(blocked)
- queue->requestDispatch();
+ queue->requestDispatch(this);
}
void Session::handle(Message::shared_ptr msg) {
@@ -532,9 +535,7 @@ void Session::ConsumerImpl::addMessageCredit(uint32_t value)
void Session::ConsumerImpl::flush()
{
- //TODO: need to wait until any messages that are available for
- //this consumer have been delivered... i.e. some sort of flush on
- //the queue...
+ queue->requestDispatch(this, true);
}
void Session::ConsumerImpl::stop()
@@ -559,4 +560,42 @@ Queue::shared_ptr Session::getQueue(const string& name) const {
return queue;
}
+AckRange Session::findRange(DeliveryId first, DeliveryId last)
+{
+ ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
+ ack_iterator end = start;
+
+ if (first == last) {
+ //just acked single element (move end past it)
+ ++end;
+ } else {
+ //need to find end (position it just after the last record in range)
+ end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+ }
+ return AckRange(start, end);
+}
+
+void Session::acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired)
+{
+ Mutex::ScopedLock locker(deliveryLock);
+ AckRange range = findRange(first, last);
+ for_each(range.start, range.end, AcquireFunctor(acquired));
+}
+
+void Session::release(DeliveryId first, DeliveryId last)
+{
+ Mutex::ScopedLock locker(deliveryLock);
+ AckRange range = findRange(first, last);
+ for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release));
+}
+
+void Session::reject(DeliveryId first, DeliveryId last)
+{
+ Mutex::ScopedLock locker(deliveryLock);
+ AckRange range = findRange(first, last);
+ for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
+ //need to remove the delivery records as well
+ unacked.erase(range.start, range.end);
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/Session.h
index 61ed25f623..8458f4cabf 100644
--- a/cpp/src/qpid/broker/Session.h
+++ b/cpp/src/qpid/broker/Session.h
@@ -40,6 +40,7 @@
#include <boost/ptr_container/ptr_vector.hpp>
#include <list>
+#include <vector>
namespace qpid {
@@ -80,7 +81,7 @@ class Session : public framing::FrameHandler::Chains,
public:
ConsumerImpl(Session* parent, DeliveryToken::shared_ptr token,
const string& name, Queue::shared_ptr queue,
- bool ack, bool nolocal, bool acquire=true);
+ bool ack, bool nolocal, bool acquire);
~ConsumerImpl();
bool deliver(QueuedMessage& msg);
void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag);
@@ -131,6 +132,8 @@ class Session : public framing::FrameHandler::Chains,
// FIXME aconway 2007-08-31: remove, temporary hack.
SemanticHandler* semanticHandler;
+
+ AckRange findRange(DeliveryId first, DeliveryId last);
public:
@@ -166,7 +169,7 @@ class Session : public framing::FrameHandler::Chains,
*@param tagInOut - if empty it is updated with the generated token.
*/
void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue,
- bool nolocal, bool acks, bool exclusive, const framing::FieldTable* = 0);
+ bool nolocal, bool acks, bool acquire, bool exclusive, const framing::FieldTable* = 0);
void cancel(const string& tag);
@@ -192,6 +195,9 @@ class Session : public framing::FrameHandler::Chains,
void recover(bool requeue);
void flow(bool active);
void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag);
+ void acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired);
+ void release(DeliveryId first, DeliveryId last);
+ void reject(DeliveryId first, DeliveryId last);
void handle(Message::shared_ptr msg);
diff --git a/cpp/src/qpid/framing/SequenceNumberSet.cpp b/cpp/src/qpid/framing/SequenceNumberSet.cpp
index 357b5dabd7..3bee5fb09a 100644
--- a/cpp/src/qpid/framing/SequenceNumberSet.cpp
+++ b/cpp/src/qpid/framing/SequenceNumberSet.cpp
@@ -44,6 +44,22 @@ uint32_t SequenceNumberSet::encodedSize() const
return 2 /*count*/ + (size() * 4);
}
+SequenceNumberSet SequenceNumberSet::condense() const
+{
+ SequenceNumberSet result;
+ const_iterator last = end();
+ for (const_iterator i = begin(); i != end(); i++) {
+ if (last == end()) {
+ last = i;
+ } else if (*i - *last > 1) {
+ result.push_back(*last);
+ result.push_back(*i);
+ last = end();
+ }
+ }
+ return result;
+}
+
namespace qpid{
namespace framing{
diff --git a/cpp/src/qpid/framing/SequenceNumberSet.h b/cpp/src/qpid/framing/SequenceNumberSet.h
index bcf78d4f22..7643d68071 100644
--- a/cpp/src/qpid/framing/SequenceNumberSet.h
+++ b/cpp/src/qpid/framing/SequenceNumberSet.h
@@ -39,6 +39,7 @@ public:
void encode(Buffer& buffer) const;
void decode(Buffer& buffer);
uint32_t encodedSize() const;
+ SequenceNumberSet condense() const;
friend std::ostream& operator<<(std::ostream&, const SequenceNumberSet&);
};
diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp
index 65426e4e21..34d0bcd156 100644
--- a/cpp/src/tests/TxAckTest.cpp
+++ b/cpp/src/tests/TxAckTest.cpp
@@ -78,7 +78,7 @@ public:
messages.push_back(msg);
QueuedMessage qm;
qm.payload = msg;
- deliveries.push_back(DeliveryRecord(qm, queue, "xyz", (i+1)));
+ deliveries.push_back(DeliveryRecord(qm, queue, "xyz", (i+1), true));
}
//assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)