diff options
author | Gordon Sim <gsim@apache.org> | 2007-09-03 17:35:35 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-09-03 17:35:35 +0000 |
commit | 331b0e84ae06da0c057a82c0f5b67f550bcd636b (patch) | |
tree | 91342743f16ad473b456a5ef336409e4af38cd5a /cpp/src | |
parent | cadb67eb27e00abb493793363dcf37f4d2f563dc (diff) | |
download | qpid-python-331b0e84ae06da0c057a82c0f5b67f550bcd636b.tar.gz |
Initial implementation (plus very simple tests) of message.acquire, message.release, message.reject and message.flush.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@572394 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/AccumulatedAck.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 102 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Consumer.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 59 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 33 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 42 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Session.cpp | 55 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Session.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/framing/SequenceNumberSet.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/framing/SequenceNumberSet.h | 1 | ||||
-rw-r--r-- | cpp/src/tests/TxAckTest.cpp | 2 |
13 files changed, 281 insertions, 60 deletions
diff --git a/cpp/src/qpid/broker/AccumulatedAck.h b/cpp/src/qpid/broker/AccumulatedAck.h index b53f4a8ba5..9c7cc3d887 100644 --- a/cpp/src/qpid/broker/AccumulatedAck.h +++ b/cpp/src/qpid/broker/AccumulatedAck.h @@ -57,7 +57,7 @@ namespace qpid { */ std::list<Range> ranges; - AccumulatedAck(DeliveryId r) : mark(r) {} + explicit AccumulatedAck(DeliveryId r) : mark(r) {} void update(DeliveryId firstTag, DeliveryId lastTag); void consolidate(); void clear(); diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index a6e9c007cf..1a44b09188 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -306,7 +306,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, //also version specific behaviour now) if (newTag.empty()) newTag = tagGenerator.generate(); DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag)); - session.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields); + session.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields); if(!nowait) client.consumeOk(newTag); diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index 553f6016d2..a094c7a804 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -112,31 +112,50 @@ void Queue::requeue(const QueuedMessage& msg){ } - -void Queue::requestDispatch(){ - serializer.execute(dispatchCallback); +bool Queue::acquire(const QueuedMessage& msg) { + Mutex::ScopedLock locker(messageLock); + for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { + if (i->position == msg.position) { + messages.erase(i); + return true; + } + } + return false; } +void Queue::requestDispatch(Consumer* c, bool sync){ + if (!c || c->preAcquires()) { + if (sync) { + serializer.dispatch(); + } else { + serializer.execute(dispatchCallback); + } + } else { + //note: this is always done on the callers thread, regardless + // of sync; browsers of large queues should use flow control! + serviceBrowser(c); + } +} bool Queue::dispatch(QueuedMessage& msg){ RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide.... - if(consumers.empty()){ + if(acquirers.empty()){ return false; }else if(exclusive){ return exclusive->deliver(msg); }else{ //deliver to next consumer - next = next % consumers.size(); - Consumer* c = consumers[next]; + next = next % acquirers.size(); + Consumer* c = acquirers[next]; int start = next; while(c){ next++; if(c->deliver(msg)) return true; - next = next % consumers.size(); - c = next == start ? 0 : consumers[next]; + next = next % acquirers.size(); + c = next == start ? 0 : acquirers[next]; } return false; } @@ -153,34 +172,79 @@ void Queue::dispatch(){ } if( msg.payload->isEnqueueComplete() && dispatch(msg) ) { pop(); - } else { + } else { break; } - } + } + RWlock::ScopedRlock locker(consumerLock); + for (Consumers::iterator i = browsers.begin(); i != browsers.end(); i++) { + serviceBrowser(*i); + } +} + +void Queue::serviceBrowser(Consumer* browser) +{ + //This is a poorly performing implementation: + // + // * bad concurrency where browsers exist + // * inefficient for largish queues + // + //The queue needs to be based on a current data structure that + //does not invalidate iterators when modified. Subscribers could + //then use an iterator to continue from where they left off + + Mutex::ScopedLock locker(messageLock); + if (!messages.empty() && messages.back().position > browser->position) { + for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { + if (i->position > browser->position) { + if (browser->deliver(*i)) { + browser->position = i->position; + } else { + break; + } + } + } + } } void Queue::consume(Consumer* c, bool requestExclusive){ RWlock::ScopedWlock locker(consumerLock); - if(exclusive) + if(exclusive) { throw ChannelException( 403, format("Queue '%s' has an exclusive consumer." " No more consumers allowed.") % getName()); + } if(requestExclusive) { - if(!consumers.empty()) + if(acquirers.empty() && browsers.empty()) { + exclusive = c; + } else { throw ChannelException( - 403, format("Queue '%s' already has conumers." - "Exclusive access denied.") %getName()); - exclusive = c; + 403, format("Queue '%s' already has consumers." + "Exclusive access denied.") % getName()); + } + } + if (c->preAcquires()) { + acquirers.push_back(c); + } else { + browsers.push_back(c); } - consumers.push_back(c); } void Queue::cancel(Consumer* c){ RWlock::ScopedWlock locker(consumerLock); + if (c->preAcquires()) { + cancel(c, acquirers); + } else { + cancel(c, browsers); + } + if(exclusive == c) exclusive = 0; +} + +void Queue::cancel(Consumer* c, Consumers& consumers) +{ Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c); if (i != consumers.end()) consumers.erase(i); - if(exclusive == c) exclusive = 0; } QueuedMessage Queue::dequeue(){ @@ -233,12 +297,12 @@ uint32_t Queue::getMessageCount() const{ uint32_t Queue::getConsumerCount() const{ RWlock::ScopedRlock locker(consumerLock); - return consumers.size(); + return acquirers.size() + browsers.size(); } bool Queue::canAutoDelete() const{ RWlock::ScopedRlock locker(consumerLock); - return autodelete && consumers.size() == 0; + return autodelete && acquirers.empty() && browsers.empty(); } // return true if store exists, diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h index d15b5fc8c5..4b6070d11c 100644 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ b/cpp/src/qpid/broker/BrokerQueue.h @@ -68,7 +68,8 @@ namespace qpid { const bool autodelete; MessageStore* const store; const ConnectionToken* const owner; - Consumers consumers; + Consumers acquirers; + Consumers browsers; Messages messages; int next; mutable qpid::sys::RWlock consumerLock; @@ -91,6 +92,8 @@ namespace qpid { * only called by serilizer */ void dispatch(); + void cancel(Consumer* c, Consumers& set); + void serviceBrowser(Consumer* c); protected: /** @@ -114,6 +117,9 @@ namespace qpid { void destroy(); void bound(const string& exchange, const string& key, const qpid::framing::FieldTable& args); void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref); + + bool acquire(const QueuedMessage& msg); + /** * Delivers a message to the queue. Will record it as * enqueued if persistent then process it. @@ -141,7 +147,7 @@ namespace qpid { * at any time, so this call schedules the despatch based on * the serilizer policy. */ - void requestDispatch(); + void requestDispatch(Consumer* c = 0, bool sync = false); void consume(Consumer* c, bool exclusive = false); void cancel(Consumer* c); uint32_t purge(); diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h index 52da25082c..c482a44ab1 100644 --- a/cpp/src/qpid/broker/Consumer.h +++ b/cpp/src/qpid/broker/Consumer.h @@ -36,8 +36,13 @@ namespace qpid { }; - class Consumer{ + class Consumer { + const bool acquires; public: + framing::SequenceNumber position; + + Consumer(bool preAcquires = true) : acquires(preAcquires) {} + bool preAcquires() const { return acquires; } virtual bool deliver(QueuedMessage& msg) = 0; virtual ~Consumer(){} }; diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 5c7c632c05..7649715ade 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -19,7 +19,9 @@ * */ #include "DeliveryRecord.h" +#include "DeliverableMessage.h" #include "Session.h" +#include "qpid/log/Statement.h" using namespace qpid::broker; using std::string; @@ -27,29 +29,32 @@ using std::string; DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, Queue::shared_ptr _queue, const string _consumerTag, - const DeliveryId _deliveryTag) : msg(_msg), + const DeliveryId _id, + bool _acquired) : msg(_msg), queue(_queue), consumerTag(_consumerTag), - deliveryTag(_deliveryTag), - acquired(false), + id(_id), + acquired(_acquired), pull(false){} DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, Queue::shared_ptr _queue, - const DeliveryId _deliveryTag) : msg(_msg), + const DeliveryId _id) : msg(_msg), queue(_queue), consumerTag(""), - deliveryTag(_deliveryTag), - acquired(false), + id(_id), + acquired(true), pull(true){} void DeliveryRecord::dequeue(TransactionContext* ctxt) const{ - queue->dequeue(ctxt, msg.payload); + if (acquired) { + queue->dequeue(ctxt, msg.payload); + } } bool DeliveryRecord::matches(DeliveryId tag) const{ - return deliveryTag == tag; + return id == tag; } bool DeliveryRecord::matchOrAfter(DeliveryId tag) const{ @@ -57,11 +62,11 @@ bool DeliveryRecord::matchOrAfter(DeliveryId tag) const{ } bool DeliveryRecord::after(DeliveryId tag) const{ - return deliveryTag > tag; + return id > tag; } bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{ - return range->covers(deliveryTag); + return range->covers(id); } void DeliveryRecord::redeliver(Session* const session) const{ @@ -69,15 +74,36 @@ void DeliveryRecord::redeliver(Session* const session) const{ //if message was originally sent as response to get, we must requeue it requeue(); }else{ - session->deliver(msg.payload, consumerTag, deliveryTag); + session->deliver(msg.payload, consumerTag, id); } } -void DeliveryRecord::requeue() const{ +void DeliveryRecord::requeue() const +{ msg.payload->redeliver(); queue->requeue(msg); } +void DeliveryRecord::release() +{ + queue->requeue(msg); + acquired = false; +} + +void DeliveryRecord::reject() +{ + Exchange::shared_ptr alternate = queue->getAlternateExchange(); + if (alternate) { + DeliverableMessage delivery(msg.payload); + alternate->route(delivery, msg.payload->getRoutingKey(), &(msg.payload->getApplicationHeaders())); + QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to " + << alternate->getName()); + } else { + //just drop it + QPID_LOG(info, "Dropping rejected message from " << queue->getName()); + } +} + void DeliveryRecord::updateByteCredit(uint32_t& credit) const { credit += msg.payload->getRequiredCredit(); @@ -102,11 +128,18 @@ void DeliveryRecord::subtractFrom(Prefetch& prefetch) const{ } } +void DeliveryRecord::acquire(std::vector<DeliveryId>& results) { + if (queue->acquire(msg)) { + acquired = true; + results.push_back(id); + } +} + namespace qpid { namespace broker { std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) { - out << "{" << "id=" << r.deliveryTag.getValue(); + out << "{" << "id=" << r.id.getValue(); out << ", consumer=" << r.consumerTag; out << ", queue=" << r.queue->getName() << "}"; return out; diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index d453458f62..583579ac10 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -23,6 +23,7 @@ #include <algorithm> #include <list> +#include <vector> #include <ostream> #include "AccumulatedAck.h" #include "BrokerQueue.h" @@ -42,13 +43,14 @@ class DeliveryRecord{ mutable QueuedMessage msg; mutable Queue::shared_ptr queue; const std::string consumerTag; - const DeliveryId deliveryTag; + const DeliveryId id; bool acquired; const bool pull; public: - DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag, const DeliveryId deliveryTag); - DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId deliveryTag); + DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag, + const DeliveryId id, bool acquired); + DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id); void dequeue(TransactionContext* ctxt = 0) const; bool matches(DeliveryId tag) const; @@ -56,6 +58,8 @@ class DeliveryRecord{ bool after(DeliveryId tag) const; bool coveredBy(const AccumulatedAck* const range) const; void requeue() const; + void release(); + void reject(); void redeliver(Session* const) const; void updateByteCredit(uint32_t& credit) const; void addTo(Prefetch&) const; @@ -63,12 +67,33 @@ class DeliveryRecord{ const std::string& getConsumerTag() const { return consumerTag; } bool isPull() const { return pull; } bool isAcquired() const { return acquired; } - void setAcquired(bool isAcquired) { acquired = isAcquired; } + //void setAcquired(bool isAcquired) { acquired = isAcquired; } + void acquire(std::vector<DeliveryId>& results); friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&); }; typedef std::list<DeliveryRecord>::iterator ack_iterator; + +struct AckRange +{ + ack_iterator start; + ack_iterator end; + AckRange(ack_iterator _start, ack_iterator _end) : start(_start), end(_end) {} +}; + +struct AcquireFunctor +{ + std::vector<DeliveryId>& results; + + AcquireFunctor(std::vector<DeliveryId>& _results) : results(_results) {} + + void operator()(DeliveryRecord& record) + { + record.acquire(results); + } +}; + } } diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index 7529e3bb39..d9b91c1617 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -25,6 +25,7 @@ #include "MessageDelivery.h" #include "qpid/framing/MessageAppendBody.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/reply_exceptions.h" #include "BrokerAdapter.h" #include <boost/format.hpp> @@ -92,7 +93,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, const string& destination, bool noLocal, u_int8_t confirmMode, - u_int8_t acquireMode,//TODO: implement acquire modes + u_int8_t acquireMode, bool exclusive, const framing::FieldTable& filter ) { @@ -101,8 +102,10 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; + //NB: am assuming pre-acquired = 0 as discussed on SIG list as is + //the previously expected behaviour session.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), - tag, queue, noLocal, confirmMode == 1, exclusive, &filter); + tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } @@ -156,9 +159,15 @@ MessageHandlerImpl::recover(bool requeue) } void -MessageHandlerImpl::reject(const SequenceNumberSet& /*transfers*/, uint16_t /*code*/, const string& /*text*/ ) +MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const string& /*text*/ ) { - //TODO: implement + if (transfers.size() % 2) { //must be even number + throw InvalidArgumentException("Received odd number of elements in list of transfers"); + } + + for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { + session.reject(i->getValue(), (++i)->getValue()); + } } void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value) @@ -200,14 +209,31 @@ void MessageHandlerImpl::stop(const std::string& destination) session.stop(destination); } -void MessageHandlerImpl::acquire(const SequenceNumberSet& /*transfers*/, u_int8_t /*mode*/) +void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/) { - throw ConnectionException(540, "Not yet implemented"); + SequenceNumberSet results; + + if (transfers.size() % 2) { //must be even number + throw InvalidArgumentException("Received odd number of elements in list of transfers"); + } + + for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { + session.acquire(i->getValue(), (++i)->getValue(), results); + } + + results = results.condense(); + client.acquired(results); } -void MessageHandlerImpl::release(const SequenceNumberSet& /*transfers*/) +void MessageHandlerImpl::release(const SequenceNumberSet& transfers) { - throw ConnectionException(540, "Not yet implemented"); + if (transfers.size() % 2) { //must be even number + throw InvalidArgumentException("Received odd number of elements in list of transfers"); + } + + for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { + session.release(i->getValue(), (++i)->getValue()); + } } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp index 26b694d073..a8b22cb12a 100644 --- a/cpp/src/qpid/broker/Session.cpp +++ b/cpp/src/qpid/broker/Session.cpp @@ -34,6 +34,7 @@ #include "TxPublish.h" #include "qpid/QpidError.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <boost/format.hpp> @@ -91,12 +92,12 @@ bool Session::exists(const string& consumerTag){ } void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut, - Queue::shared_ptr queue, bool nolocal, bool acks, + Queue::shared_ptr queue, bool nolocal, bool acks, bool acquire, bool exclusive, const FieldTable*) { if(tagInOut.empty()) tagInOut = tagGenerator.generate(); - std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal)); + std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire)); queue->consume(c.get(), exclusive);//may throw exception consumers.insert(tagInOut, c.release()); } @@ -239,7 +240,9 @@ Session::ConsumerImpl::ConsumerImpl(Session* _parent, bool ack, bool _nolocal, bool _acquire -) : parent(_parent), + ) : + Consumer(_acquire), + parent(_parent), token(_token), name(_name), queue(_queue), @@ -266,7 +269,7 @@ bool Session::ConsumerImpl::deliver(QueuedMessage& msg) DeliveryId deliveryTag = parent->deliveryAdapter->deliver(msg.payload, token); if (ackExpected) { - parent->record(DeliveryRecord(msg, queue, name, deliveryTag)); + parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire)); } } return !blocked; @@ -312,7 +315,7 @@ void Session::ConsumerImpl::cancel() void Session::ConsumerImpl::requestDispatch() { if(blocked) - queue->requestDispatch(); + queue->requestDispatch(this); } void Session::handle(Message::shared_ptr msg) { @@ -532,9 +535,7 @@ void Session::ConsumerImpl::addMessageCredit(uint32_t value) void Session::ConsumerImpl::flush() { - //TODO: need to wait until any messages that are available for - //this consumer have been delivered... i.e. some sort of flush on - //the queue... + queue->requestDispatch(this, true); } void Session::ConsumerImpl::stop() @@ -559,4 +560,42 @@ Queue::shared_ptr Session::getQueue(const string& name) const { return queue; } +AckRange Session::findRange(DeliveryId first, DeliveryId last) +{ + ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first)); + ack_iterator end = start; + + if (first == last) { + //just acked single element (move end past it) + ++end; + } else { + //need to find end (position it just after the last record in range) + end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last)); + } + return AckRange(start, end); +} + +void Session::acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired) +{ + Mutex::ScopedLock locker(deliveryLock); + AckRange range = findRange(first, last); + for_each(range.start, range.end, AcquireFunctor(acquired)); +} + +void Session::release(DeliveryId first, DeliveryId last) +{ + Mutex::ScopedLock locker(deliveryLock); + AckRange range = findRange(first, last); + for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release)); +} + +void Session::reject(DeliveryId first, DeliveryId last) +{ + Mutex::ScopedLock locker(deliveryLock); + AckRange range = findRange(first, last); + for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject)); + //need to remove the delivery records as well + unacked.erase(range.start, range.end); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/Session.h index 61ed25f623..8458f4cabf 100644 --- a/cpp/src/qpid/broker/Session.h +++ b/cpp/src/qpid/broker/Session.h @@ -40,6 +40,7 @@ #include <boost/ptr_container/ptr_vector.hpp> #include <list> +#include <vector> namespace qpid { @@ -80,7 +81,7 @@ class Session : public framing::FrameHandler::Chains, public: ConsumerImpl(Session* parent, DeliveryToken::shared_ptr token, const string& name, Queue::shared_ptr queue, - bool ack, bool nolocal, bool acquire=true); + bool ack, bool nolocal, bool acquire); ~ConsumerImpl(); bool deliver(QueuedMessage& msg); void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag); @@ -131,6 +132,8 @@ class Session : public framing::FrameHandler::Chains, // FIXME aconway 2007-08-31: remove, temporary hack. SemanticHandler* semanticHandler; + + AckRange findRange(DeliveryId first, DeliveryId last); public: @@ -166,7 +169,7 @@ class Session : public framing::FrameHandler::Chains, *@param tagInOut - if empty it is updated with the generated token. */ void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, - bool nolocal, bool acks, bool exclusive, const framing::FieldTable* = 0); + bool nolocal, bool acks, bool acquire, bool exclusive, const framing::FieldTable* = 0); void cancel(const string& tag); @@ -192,6 +195,9 @@ class Session : public framing::FrameHandler::Chains, void recover(bool requeue); void flow(bool active); void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag); + void acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired); + void release(DeliveryId first, DeliveryId last); + void reject(DeliveryId first, DeliveryId last); void handle(Message::shared_ptr msg); diff --git a/cpp/src/qpid/framing/SequenceNumberSet.cpp b/cpp/src/qpid/framing/SequenceNumberSet.cpp index 357b5dabd7..3bee5fb09a 100644 --- a/cpp/src/qpid/framing/SequenceNumberSet.cpp +++ b/cpp/src/qpid/framing/SequenceNumberSet.cpp @@ -44,6 +44,22 @@ uint32_t SequenceNumberSet::encodedSize() const return 2 /*count*/ + (size() * 4); } +SequenceNumberSet SequenceNumberSet::condense() const +{ + SequenceNumberSet result; + const_iterator last = end(); + for (const_iterator i = begin(); i != end(); i++) { + if (last == end()) { + last = i; + } else if (*i - *last > 1) { + result.push_back(*last); + result.push_back(*i); + last = end(); + } + } + return result; +} + namespace qpid{ namespace framing{ diff --git a/cpp/src/qpid/framing/SequenceNumberSet.h b/cpp/src/qpid/framing/SequenceNumberSet.h index bcf78d4f22..7643d68071 100644 --- a/cpp/src/qpid/framing/SequenceNumberSet.h +++ b/cpp/src/qpid/framing/SequenceNumberSet.h @@ -39,6 +39,7 @@ public: void encode(Buffer& buffer) const; void decode(Buffer& buffer); uint32_t encodedSize() const; + SequenceNumberSet condense() const; friend std::ostream& operator<<(std::ostream&, const SequenceNumberSet&); }; diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp index 65426e4e21..34d0bcd156 100644 --- a/cpp/src/tests/TxAckTest.cpp +++ b/cpp/src/tests/TxAckTest.cpp @@ -78,7 +78,7 @@ public: messages.push_back(msg); QueuedMessage qm; qm.payload = msg; - deliveries.push_back(DeliveryRecord(qm, queue, "xyz", (i+1))); + deliveries.push_back(DeliveryRecord(qm, queue, "xyz", (i+1), true)); } //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not) |