summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/MessagingSessionTests.cpp
diff options
context:
space:
mode:
authorJonathan Robie <jonathan@apache.org>2011-02-07 18:21:24 +0000
committerJonathan Robie <jonathan@apache.org>2011-02-07 18:21:24 +0000
commiteb8e7cf77d382f4233d01cd6c4f96acb3c68f390 (patch)
tree1edda53e532d03b4f2eaf24d7a0e62f4c7dda710 /qpid/cpp/src/tests/MessagingSessionTests.cpp
parenta23886065e02cf0ea86dc62b2b752ebf35d1220b (diff)
downloadqpid-python-eb8e7cf77d382f4233d01cd6c4f96acb3c68f390.tar.gz
Ensures that messages acquired, but not acked, are released before a queue is deleted.
Otherwise, these messages are not routed to an alternate exchange, and the queue is not actually deleted. Resolves QPID-3040 / Red Hat BZ 674678. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1068042 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/MessagingSessionTests.cpp')
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp58
1 files changed, 47 insertions, 11 deletions
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index fc1632b4e1..5dfb0f38bb 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -404,7 +404,7 @@ struct QueueCreatePolicyFixture : public MessagingFixture
~QueueCreatePolicyFixture()
{
- admin.deleteQueue(address.getName());
+ admin.deleteQueue(address.getName());
}
};
@@ -448,7 +448,7 @@ struct ExchangeCreatePolicyFixture : public MessagingFixture
~ExchangeCreatePolicyFixture()
{
- admin.deleteExchange(address.getName());
+ admin.deleteExchange(address.getName());
}
};
@@ -597,7 +597,7 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueue)
s1.close();
Receiver r1 = fix.session.createReceiver(a1);
r1.close();
-
+
std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}";
Sender s2 = fix.session.createSender(a2);
s2.close();
@@ -711,7 +711,7 @@ QPID_AUTO_TEST_CASE(testOptionVerification)
{
MessagingFixture fix;
fix.session.createReceiver("my-queue; {create: always, assert: always, delete: always, node: {type: queue, durable: false, x-declare: {arguments: {a: b}}, x-bindings: [{exchange: amq.fanout}]}, link: {name: abc, durable: false, reliability: exactly-once, x-subscribe: {arguments:{a:b}}, x-bindings:[{exchange: amq.fanout}]}, mode: browse}");
- BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError);
+ BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError);
}
QPID_AUTO_TEST_CASE(testReceiveSpecialProperties)
@@ -775,21 +775,57 @@ QPID_AUTO_TEST_CASE(testExclusiveSubscriber)
QPID_AUTO_TEST_CASE(testExclusiveQueueSubscriberAndBrowser)
{
MessagingFixture fix;
-
+
std::string address = "exclusive-queue; { create: receiver, node : { x-declare : { auto-delete: true, exclusive: true } } }";
std::string browseAddress = "exclusive-queue; { mode: browse }";
Receiver receiver = fix.session.createReceiver(address);
fix.session.sync();
- Connection c2 = fix.newConnection();
+ Connection c2 = fix.newConnection();
c2.open();
Session s2 = c2.createSession();
-
+
BOOST_CHECK_NO_THROW(Receiver browser = s2.createReceiver(browseAddress));
- c2.close();
+ c2.close();
}
+
+QPID_AUTO_TEST_CASE(testDeleteQueueWithUnackedMessages)
+{
+ MessagingFixture fix;
+ const uint capacity = 5;
+
+ Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}");
+ Receiver receiver2 = fix.session.createReceiver("alternate.ex;{create:always,node:{type:topic}}");
+ Receiver receiver1 = fix.session.createReceiver("test.q;{create:always, delete:always,node:{type:queue, x-declare:{alternate-exchange:alternate.ex}},link:{x-bindings:[{exchange:test.ex,queue:test.q,key:#}]}}");
+
+ receiver1.setCapacity(capacity);
+ receiver2.setCapacity(2*capacity);
+
+ Message out("test-message");
+ for (uint i = 0; i < capacity*2; ++i) {
+ sender.send(out);
+ }
+
+ // Read half the messages, do not acknowledge
+ Message in;
+ for (uint i = 0; i < capacity; ++i) {
+ in = receiver1.fetch(Duration::SECOND * 5);
+ BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+ }
+
+ receiver1.close();
+
+ // Make sure all unacked messages were sent to the alternate
+ // exchange when the queue was deleted.
+ for (uint i = 0; i < capacity*2; ++i) {
+ in = receiver2.fetch(Duration::SECOND * 5);
+ BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+ }
+}
+
+
QPID_AUTO_TEST_CASE(testAuthenticatedUsername)
{
MessagingFixture fix;
@@ -828,7 +864,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
messages.push_back(msg);
}
const uint batch(10); //acknowledge first 10 messages only
- for (uint i = 0; i < batch; ++i) {
+ for (uint i = 0; i < batch; ++i) {
other.acknowledge(messages[i]);
}
messages.clear();
@@ -836,7 +872,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
other.close();
other = fix.connection.createSession();
- receiver = other.createReceiver(fix.queue);
+ receiver = other.createReceiver(fix.queue);
for (uint i = 0; i < (count-batch); ++i) {
Message msg = receiver.fetch();
BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str());
@@ -847,7 +883,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
//check unacknowledged messages are still enqueued
other = fix.connection.createSession();
- receiver = other.createReceiver(fix.queue);
+ receiver = other.createReceiver(fix.queue);
for (uint i = 0; i < ((count-batch)/2); ++i) {
Message msg = receiver.fetch();
BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % ((i*2)+1+batch)).str());