diff options
author | Jonathan Robie <jonathan@apache.org> | 2011-02-07 18:21:24 +0000 |
---|---|---|
committer | Jonathan Robie <jonathan@apache.org> | 2011-02-07 18:21:24 +0000 |
commit | eb8e7cf77d382f4233d01cd6c4f96acb3c68f390 (patch) | |
tree | 1edda53e532d03b4f2eaf24d7a0e62f4c7dda710 /qpid/cpp/src/tests/MessagingSessionTests.cpp | |
parent | a23886065e02cf0ea86dc62b2b752ebf35d1220b (diff) | |
download | qpid-python-eb8e7cf77d382f4233d01cd6c4f96acb3c68f390.tar.gz |
Ensures that messages acquired, but not acked, are released before a queue is deleted.
Otherwise, these messages are not routed to an alternate exchange, and the queue is not actually deleted.
Resolves QPID-3040 / Red Hat BZ 674678.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1068042 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/MessagingSessionTests.cpp')
-rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 58 |
1 files changed, 47 insertions, 11 deletions
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index fc1632b4e1..5dfb0f38bb 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -404,7 +404,7 @@ struct QueueCreatePolicyFixture : public MessagingFixture ~QueueCreatePolicyFixture() { - admin.deleteQueue(address.getName()); + admin.deleteQueue(address.getName()); } }; @@ -448,7 +448,7 @@ struct ExchangeCreatePolicyFixture : public MessagingFixture ~ExchangeCreatePolicyFixture() { - admin.deleteExchange(address.getName()); + admin.deleteExchange(address.getName()); } }; @@ -597,7 +597,7 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueue) s1.close(); Receiver r1 = fix.session.createReceiver(a1); r1.close(); - + std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}"; Sender s2 = fix.session.createSender(a2); s2.close(); @@ -711,7 +711,7 @@ QPID_AUTO_TEST_CASE(testOptionVerification) { MessagingFixture fix; fix.session.createReceiver("my-queue; {create: always, assert: always, delete: always, node: {type: queue, durable: false, x-declare: {arguments: {a: b}}, x-bindings: [{exchange: amq.fanout}]}, link: {name: abc, durable: false, reliability: exactly-once, x-subscribe: {arguments:{a:b}}, x-bindings:[{exchange: amq.fanout}]}, mode: browse}"); - BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError); + BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError); } QPID_AUTO_TEST_CASE(testReceiveSpecialProperties) @@ -775,21 +775,57 @@ QPID_AUTO_TEST_CASE(testExclusiveSubscriber) QPID_AUTO_TEST_CASE(testExclusiveQueueSubscriberAndBrowser) { MessagingFixture fix; - + std::string address = "exclusive-queue; { create: receiver, node : { x-declare : { auto-delete: true, exclusive: true } } }"; std::string browseAddress = "exclusive-queue; { mode: browse }"; Receiver receiver = fix.session.createReceiver(address); fix.session.sync(); - Connection c2 = fix.newConnection(); + Connection c2 = fix.newConnection(); c2.open(); Session s2 = c2.createSession(); - + BOOST_CHECK_NO_THROW(Receiver browser = s2.createReceiver(browseAddress)); - c2.close(); + c2.close(); } + +QPID_AUTO_TEST_CASE(testDeleteQueueWithUnackedMessages) +{ + MessagingFixture fix; + const uint capacity = 5; + + Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}"); + Receiver receiver2 = fix.session.createReceiver("alternate.ex;{create:always,node:{type:topic}}"); + Receiver receiver1 = fix.session.createReceiver("test.q;{create:always, delete:always,node:{type:queue, x-declare:{alternate-exchange:alternate.ex}},link:{x-bindings:[{exchange:test.ex,queue:test.q,key:#}]}}"); + + receiver1.setCapacity(capacity); + receiver2.setCapacity(2*capacity); + + Message out("test-message"); + for (uint i = 0; i < capacity*2; ++i) { + sender.send(out); + } + + // Read half the messages, do not acknowledge + Message in; + for (uint i = 0; i < capacity; ++i) { + in = receiver1.fetch(Duration::SECOND * 5); + BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); + } + + receiver1.close(); + + // Make sure all unacked messages were sent to the alternate + // exchange when the queue was deleted. + for (uint i = 0; i < capacity*2; ++i) { + in = receiver2.fetch(Duration::SECOND * 5); + BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); + } +} + + QPID_AUTO_TEST_CASE(testAuthenticatedUsername) { MessagingFixture fix; @@ -828,7 +864,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge) messages.push_back(msg); } const uint batch(10); //acknowledge first 10 messages only - for (uint i = 0; i < batch; ++i) { + for (uint i = 0; i < batch; ++i) { other.acknowledge(messages[i]); } messages.clear(); @@ -836,7 +872,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge) other.close(); other = fix.connection.createSession(); - receiver = other.createReceiver(fix.queue); + receiver = other.createReceiver(fix.queue); for (uint i = 0; i < (count-batch); ++i) { Message msg = receiver.fetch(); BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str()); @@ -847,7 +883,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge) //check unacknowledged messages are still enqueued other = fix.connection.createSession(); - receiver = other.createReceiver(fix.queue); + receiver = other.createReceiver(fix.queue); for (uint i = 0; i < ((count-batch)/2); ++i) { Message msg = receiver.fetch(); BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % ((i*2)+1+batch)).str()); |