summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp2
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp21
2 files changed, 22 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 490ad91bfb..a105a67b3f 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -284,7 +284,7 @@ IncomingMessages::ProcessState IncomingMessages::process(Handler* handler, qpid:
for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
if (content->isA<MessageTransferBody>()) {
MessageTransfer transfer(content, *this);
- if (transfer.checkExpired() && handler->expire(transfer)) {
+ if (handler && transfer.checkExpired() && handler->expire(transfer)) {
QPID_LOG(debug, "Expired received transfer: " << *content->getMethod());
} else if (handler && handler->accept(transfer)) {
QPID_LOG(debug, "Delivered " << *content->getMethod() << " "
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index 6bc43bc8e1..3b7ba34fe9 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -1562,6 +1562,27 @@ QPID_AUTO_TEST_CASE(testClientExpiration)
BOOST_CHECK_EQUAL(b_count, 50);
}
+QPID_AUTO_TEST_CASE(testExpiredPrefetchOnClose)
+{
+ QueueFixture fix;
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Session other = fix.connection.createSession();
+ Receiver receiver2 = other.createReceiver("amq.fanout");
+ receiver.setCapacity(500);
+ Sender sender = fix.session.createSender(fix.queue);
+ for (uint i = 0; i < 500; ++i) {
+ Message msg((boost::format("a_%1%") % (i+1)).str());
+ msg.setSubject("a");
+ msg.setTtl(Duration(5));
+ sender.send(msg);
+ }
+ Sender sender2 = other.createSender("amq.fanout");
+ sender2.send(Message("done"));
+ BOOST_CHECK_EQUAL(receiver2.fetch().getContent(), "done");
+ qpid::sys::usleep(qpid::sys::TIME_MSEC*5);//sorry Alan, I can't see any way to avoid a sleep; need to ensure messages in prefetch have expired
+ receiver.close();
+}
+
QPID_AUTO_TEST_CASE(testPriorityRingEviction)
{
MessagingFixture fix;