diff options
| author | Gordon Sim <gsim@apache.org> | 2011-02-21 17:30:17 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2011-02-21 17:30:17 +0000 |
| commit | 44d8c4e9eac85fa612b54b5515667c0806f1accb (patch) | |
| tree | a119d355ca1d863079a5dc1783023de11fdd9cfa /cpp/src/qpid | |
| parent | 20a5c70c4b369d0c288305d8424dbd74fc7f4934 (diff) | |
| download | qpid-python-44d8c4e9eac85fa612b54b5515667c0806f1accb.tar.gz | |
QPID-3051: Ensure credit window is moved correctly even if it contains rejected messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1073085 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 24 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 7 |
2 files changed, 18 insertions, 13 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 9443eb6ea5..64760bea36 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -131,18 +131,20 @@ void DeliveryRecord::committed() const{ void DeliveryRecord::reject() { - Exchange::shared_ptr alternate = queue->getAlternateExchange(); - if (alternate) { - DeliverableMessage delivery(msg.payload); - alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders()); - QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to " - << alternate->getName()); - } else { - //just drop it - QPID_LOG(info, "Dropping rejected message from " << queue->getName()); + if (acquired && !ended) { + Exchange::shared_ptr alternate = queue->getAlternateExchange(); + if (alternate) { + DeliverableMessage delivery(msg.payload); + alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders()); + QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to " + << alternate->getName()); + } else { + //just drop it + QPID_LOG(info, "Dropping rejected message from " << queue->getName()); + } + dequeue(); + setEnded(); } - - dequeue(); } uint32_t DeliveryRecord::getCredit() const diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index c91cfba2f8..cfc379f47c 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -697,8 +697,11 @@ void SemanticState::reject(DeliveryId first, DeliveryId last) { AckRange range = findRange(first, last); for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject)); - //need to remove the delivery records as well - unacked.erase(range.start, range.end); + //may need to remove the delivery records as well + for (DeliveryRecords::iterator i = range.start; i != unacked.end() && i->getId() <= last; ) { + if (i->isRedundant()) i = unacked.erase(i); + else i++; + } } bool SemanticState::ConsumerImpl::doOutput() |
