diff options
author | Gordon Sim <gsim@apache.org> | 2008-03-07 13:20:02 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-03-07 13:20:02 +0000 |
commit | 5d8a9df4ec3a4f030ed80e143ce6986c19ab800a (patch) | |
tree | 8417c3abe9dd81e6a73084aa36371981e06f9e27 /cpp/src | |
parent | 9fd4909832e16734c47c13eebbe4aca66640b1b0 (diff) | |
download | qpid-python-5d8a9df4ec3a4f030ed80e143ce6986c19ab800a.tar.gz |
Altered management of delivery records to support separateion of completion (which drives flow control) and acceptance.
Converted flow control python tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@634661 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Consumer.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 64 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 23 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 24 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 4 |
7 files changed, 86 insertions, 46 deletions
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h index ed4bb176f6..65c60182b8 100644 --- a/cpp/src/qpid/broker/Consumer.h +++ b/cpp/src/qpid/broker/Consumer.h @@ -35,7 +35,7 @@ namespace qpid { { intrusive_ptr<Message> payload; framing::SequenceNumber position; - Queue* queue; + Queue* queue; QueuedMessage(Queue* q, intrusive_ptr<Message> msg, framing::SequenceNumber sn) : payload(msg), position(sn), queue(q) {} diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 154394e5de..ca90f32a5d 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -32,16 +32,20 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, const std::string _tag, DeliveryToken::shared_ptr _token, const DeliveryId _id, - bool _acquired, bool _confirmed) : msg(_msg), - queue(_queue), - tag(_tag), - token(_token), - id(_id), - acquired(_acquired), - confirmed(_confirmed), - pull(false), - cancelled(false) + bool _acquired, bool accepted) : msg(_msg), + queue(_queue), + tag(_tag), + token(_token), + id(_id), + acquired(_acquired), + pull(false), + cancelled(false), + credit(msg.payload ? msg.payload->getRequiredCredit() : 0), + size(msg.payload ? msg.payload->contentSize() : 0), + completed(false), + ended(accepted) { + if (accepted) setEnded(); } DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, @@ -50,14 +54,23 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, queue(_queue), id(_id), acquired(true), - confirmed(false), pull(true), - cancelled(false) + cancelled(false), + credit(msg.payload ? msg.payload->getRequiredCredit() : 0), + size(msg.payload ? msg.payload->contentSize() : 0), + completed(false), + ended(false) {} +void DeliveryRecord::setEnded() +{ + ended = true; + //reset msg pointer, don't need to hold on to it anymore + msg.payload = boost::intrusive_ptr<Message>(); +} void DeliveryRecord::dequeue(TransactionContext* ctxt) const{ - if (acquired && !confirmed) { + if (acquired && !ended) { queue->dequeue(ctxt, msg.payload); } } @@ -79,7 +92,7 @@ bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const } void DeliveryRecord::redeliver(SemanticState* const session) { - if (!confirmed) { + if (!ended) { if(pull || cancelled){ //if message was originally sent as response to get, we must requeue it @@ -96,7 +109,7 @@ void DeliveryRecord::redeliver(SemanticState* const session) { void DeliveryRecord::requeue() const { - if (acquired && !confirmed) { + if (acquired && !ended) { msg.payload->redeliver(); queue->requeue(msg); } @@ -104,9 +117,22 @@ void DeliveryRecord::requeue() const void DeliveryRecord::release() { - if (acquired && !confirmed) { + if (acquired && !ended) { queue->requeue(msg); acquired = false; + setEnded(); + } +} + +void DeliveryRecord::complete() +{ + completed = true; +} + +void DeliveryRecord::accept(TransactionContext* ctxt) { + if (acquired && !ended) { + queue->dequeue(ctxt, msg.payload); + setEnded(); } } @@ -124,9 +150,9 @@ void DeliveryRecord::reject() } } -void DeliveryRecord::updateByteCredit(uint32_t& credit) const +uint32_t DeliveryRecord::getCredit() const { - credit += msg.payload->getRequiredCredit(); + return credit; } @@ -134,7 +160,7 @@ void DeliveryRecord::addTo(Prefetch& prefetch) const{ if(!pull){ //ignore 'pulled' messages (i.e. those that were sent in //response to get) when calculating prefetch - prefetch.size += msg.payload->contentSize(); + prefetch.size += size; prefetch.count++; } } @@ -143,7 +169,7 @@ void DeliveryRecord::subtractFrom(Prefetch& prefetch) const{ if(!pull){ //ignore 'pulled' messages (i.e. those that were sent in //response to get) when calculating prefetch - prefetch.size -= msg.payload->contentSize(); + prefetch.size -= size; prefetch.count--; } } diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index eeb363bcfc..b2672345b4 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -47,32 +47,45 @@ class DeliveryRecord{ DeliveryToken::shared_ptr token; DeliveryId id; bool acquired; - const bool confirmed; const bool pull; bool cancelled; + const uint32_t credit; + const uint64_t size; + + bool completed; + bool ended; + + void setEnded(); public: DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token, const DeliveryId id, bool acquired, bool confirmed = false); DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id); - void dequeue(TransactionContext* ctxt = 0) const; bool matches(DeliveryId tag) const; bool matchOrAfter(DeliveryId tag) const; bool after(DeliveryId tag) const; bool coveredBy(const framing::AccumulatedAck* const range) const; + + void dequeue(TransactionContext* ctxt = 0) const; void requeue() const; void release(); void reject(); void cancel(const std::string& tag); void redeliver(SemanticState* const); - void updateByteCredit(uint32_t& credit) const; + void acquire(DeliveryIds& results); + void complete(); + void accept(TransactionContext* ctxt); + + bool isAcquired() const { return acquired; } + bool isComplete() const { return completed; } + bool isRedundant() const { return ended && completed; } + + uint32_t getCredit() const; void addTo(Prefetch&) const; void subtractFrom(Prefetch&) const; const std::string& getTag() const { return tag; } bool isPull() const { return pull; } - bool isAcquired() const { return acquired; } - void acquire(DeliveryIds& results); friend bool operator<(const DeliveryRecord&, const DeliveryRecord&); friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&); }; diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp index 676f9e4b3d..6c3d960d1f 100644 --- a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp +++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp @@ -64,7 +64,7 @@ void MultiVersionConnectionInputHandler::idleIn() bool MultiVersionConnectionInputHandler::doOutput() { - return check(false) && handler->doOutput(); + return handler.get() && handler->doOutput(); } qpid::framing::ProtocolInitiation MultiVersionConnectionInputHandler::getInitiation() @@ -74,17 +74,14 @@ qpid::framing::ProtocolInitiation MultiVersionConnectionInputHandler::getInitiat void MultiVersionConnectionInputHandler::closed() { - check(); - handler->closed(); + if (handler.get()) handler->closed(); + //else closed before initiated, nothing to do } -bool MultiVersionConnectionInputHandler::check(bool fail) +void MultiVersionConnectionInputHandler::check() { if (!handler.get()) { - if (fail) throw qpid::framing::InternalErrorException("Handler not initialised!"); - else return false; - } else { - return true; + throw qpid::framing::InternalErrorException("Handler not initialised!"); } } diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h index 4301eba57c..440c00c09a 100644 --- a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h +++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h @@ -38,7 +38,7 @@ class MultiVersionConnectionInputHandler : public qpid::sys::ConnectionInputHand Broker& broker; const std::string id; - bool check(bool fail = true); + void check(); public: MultiVersionConnectionInputHandler(qpid::sys::ConnectionOutputHandler* out, Broker& broker, const std::string& id); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 5851eeeafb..f372c60044 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -393,7 +393,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) ++end; } - for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1)); + for_each(start, end, boost::bind(&SemanticState::complete, this, _1)); if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just @@ -433,20 +433,23 @@ void SemanticState::requestDispatch(ConsumerImpl& c) } } -void SemanticState::adjustFlow(const DeliveryRecord& delivery) +void SemanticState::complete(DeliveryRecord& delivery) { delivery.subtractFrom(outstanding); ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { - get_pointer(i)->adjustFlow(delivery); + get_pointer(i)->complete(delivery); } } -void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery) +void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) { - if (windowing) { - if (msgCredit != 0xFFFFFFFF) msgCredit++; - if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit); + if (!delivery.isComplete()) { + delivery.complete(); + if (windowing) { + if (msgCredit != 0xFFFFFFFF) msgCredit++; + if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit(); + } } } @@ -662,15 +665,16 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last) dtxBuffer->enlist(txAck); } } else { - for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); - unacked.erase(range.start, range.end); + for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::accept), 0)); + unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); } } void SemanticState::completed(DeliveryId first, DeliveryId last) { AckRange range = findRange(first, last); - for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1)); + for_each(range.start, range.end, boost::bind(&SemanticState::complete, this, _1)); + unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); requestDispatch(); } diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 8648135cae..3d31d5a5a2 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -88,7 +88,7 @@ class SemanticState : public framing::FrameHandler::Chains, void addMessageCredit(uint32_t value); void flush(); void stop(); - void adjustFlow(const DeliveryRecord&); + void complete(DeliveryRecord&); Queue::shared_ptr getQueue() { return queue; } bool isBlocked() const { return blocked; } @@ -122,7 +122,7 @@ class SemanticState : public framing::FrameHandler::Chains, void checkDtxTimeout(); ConsumerImpl& find(const std::string& destination); void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); - void adjustFlow(const DeliveryRecord&); + void complete(DeliveryRecord&); AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); void requestDispatch(ConsumerImpl&); |