diff options
author | Gordon Sim <gsim@apache.org> | 2011-02-21 17:30:17 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2011-02-21 17:30:17 +0000 |
commit | e12ff7bd297345f9a40ceee4308d4915ffb01938 (patch) | |
tree | 557f8a102bcf71980410ed4e89f901bc2c69161b | |
parent | 3d7df7067b2824d64f4c9193e7416eace56495be (diff) | |
download | qpid-python-e12ff7bd297345f9a40ceee4308d4915ffb01938.tar.gz |
QPID-3051: Ensure credit window is moved correctly even if it contains rejected messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1073085 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.cpp | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 41 |
3 files changed, 59 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index 9443eb6ea5..64760bea36 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -131,18 +131,20 @@ void DeliveryRecord::committed() const{ void DeliveryRecord::reject() { - Exchange::shared_ptr alternate = queue->getAlternateExchange(); - if (alternate) { - DeliverableMessage delivery(msg.payload); - alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders()); - QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to " - << alternate->getName()); - } else { - //just drop it - QPID_LOG(info, "Dropping rejected message from " << queue->getName()); + if (acquired && !ended) { + Exchange::shared_ptr alternate = queue->getAlternateExchange(); + if (alternate) { + DeliverableMessage delivery(msg.payload); + alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders()); + QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to " + << alternate->getName()); + } else { + //just drop it + QPID_LOG(info, "Dropping rejected message from " << queue->getName()); + } + dequeue(); + setEnded(); } - - dequeue(); } uint32_t DeliveryRecord::getCredit() const diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index c91cfba2f8..cfc379f47c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -697,8 +697,11 @@ void SemanticState::reject(DeliveryId first, DeliveryId last) { AckRange range = findRange(first, last); for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject)); - //need to remove the delivery records as well - unacked.erase(range.start, range.end); + //may need to remove the delivery records as well + for (DeliveryRecords::iterator i = range.start; i != unacked.end() && i->getId() <= last; ) { + if (i->isRedundant()) i = unacked.erase(i); + else i++; + } } bool SemanticState::ConsumerImpl::doOutput() diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index f9a8b0e4c1..20ed16dece 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -937,6 +937,47 @@ QPID_AUTO_TEST_CASE(testQmfCreateAndDelete) } } +QPID_AUTO_TEST_CASE(testRejectAndCredit) +{ + //Ensure credit is restored on completing rejected messages + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + Receiver receiver = fix.session.createReceiver(fix.queue); + + const uint count(10); + receiver.setCapacity(count); + for (uint i = 0; i < count; i++) { + sender.send(Message((boost::format("Message_%1%") % (i+1)).str())); + } + + Message in; + for (uint i = 0; i < count; ++i) { + if (receiver.fetch(in, Duration::SECOND)) { + BOOST_CHECK_EQUAL(in.getContent(), (boost::format("Message_%1%") % (i+1)).str()); + fix.session.reject(in); + } else { + BOOST_FAIL((boost::format("Message_%1% not received as expected") % (i+1)).str()); + break; + } + } + //send another batch of messages + for (uint i = 0; i < count; i++) { + sender.send(Message((boost::format("Message_%1%") % (i+count)).str())); + } + + for (uint i = 0; i < count; ++i) { + if (receiver.fetch(in, Duration::SECOND)) { + BOOST_CHECK_EQUAL(in.getContent(), (boost::format("Message_%1%") % (i+count)).str()); + } else { + BOOST_FAIL((boost::format("Message_%1% not received as expected") % (i+count)).str()); + break; + } + } + fix.session.acknowledge(); + receiver.close(); + sender.close(); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |