diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 23 |
1 files changed, 18 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 52aad75748..94d0cc87f7 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -132,6 +132,10 @@ bool SemanticState::cancel(const string& tag) //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag)); + //can also remove any records that are now redundant + DeliveryRecords::iterator removed = + remove_if(unacked.begin(), unacked.end(), bind(&DeliveryRecord::isRedundant, _1)); + unacked.erase(removed, unacked.end()); return true; } else { return false; @@ -177,8 +181,8 @@ void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join) if (!dtxSelected) { throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx")); } - dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); - txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer); + dtxBuffer.reset(new DtxBuffer(xid)); + txBuffer = dtxBuffer; if (join) { mgr.join(xid, dtxBuffer); } else { @@ -246,7 +250,7 @@ void SemanticState::resumeDtx(const std::string& xid) checkDtxTimeout(); dtxBuffer->setSuspended(false); - txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer); + txBuffer = dtxBuffer; } void SemanticState::checkDtxTimeout() @@ -284,6 +288,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, acquire(_acquire), blocked(true), windowing(true), + windowActive(false), exclusive(_exclusive), resumeId(_resumeId), tag(_tag), @@ -534,7 +539,7 @@ void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) { if (!delivery.isComplete()) { delivery.complete(); - if (windowing) { + if (windowing && windowActive) { if (msgCredit != 0xFFFFFFFF) msgCredit++; if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit(); } @@ -641,6 +646,7 @@ void SemanticState::ConsumerImpl::setCreditMode() void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { assertClusterSafe(); + if (windowing) windowActive = true; if (byteCredit != 0xFFFFFFFF) { if (value == 0xFFFFFFFF) byteCredit = value; else byteCredit += value; @@ -650,6 +656,7 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { assertClusterSafe(); + if (windowing) windowActive = true; if (msgCredit != 0xFFFFFFFF) { if (value == 0xFFFFFFFF) msgCredit = value; else msgCredit += value; @@ -670,7 +677,8 @@ void SemanticState::ConsumerImpl::flush() { while(haveCredit() && queue->dispatch(shared_from_this())) ; - stop(); + msgCredit = 0; + byteCredit = 0; } void SemanticState::ConsumerImpl::stop() @@ -678,6 +686,7 @@ void SemanticState::ConsumerImpl::stop() assertClusterSafe(); msgCredit = 0; byteCredit = 0; + windowActive = false; } Queue::shared_ptr SemanticState::getQueue(const string& name) const { @@ -711,6 +720,10 @@ void SemanticState::release(DeliveryId first, DeliveryId last, bool setRedeliver DeliveryRecords::reverse_iterator start(range.end); DeliveryRecords::reverse_iterator end(range.start); for_each(start, end, boost::bind(&DeliveryRecord::release, _1, setRedelivered)); + + DeliveryRecords::iterator removed = + remove_if(range.start, range.end, bind(&DeliveryRecord::isRedundant, _1)); + unacked.erase(removed, range.end); } void SemanticState::reject(DeliveryId first, DeliveryId last) |