diff options
Diffstat (limited to 'qpid/cpp/src/tests/MessagingSessionTests.cpp')
-rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 72 |
1 files changed, 72 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index 6aa4c63ed7..fae45a94d0 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -992,6 +992,78 @@ QPID_AUTO_TEST_CASE(testTtlForever) BOOST_CHECK(in.getTtl() == Duration::FOREVER); } +QPID_AUTO_TEST_CASE(testExclusiveTopicSubscriber) +{ + TopicFixture fix; + std::string address = (boost::format("%1%; { link: { name: 'my-subscription', x-declare: { auto-delete: true, exclusive: true }}}") % fix.topic).str(); + Sender sender = fix.session.createSender(fix.topic); + Receiver receiver1 = fix.session.createReceiver(address); + { + ScopedSuppressLogging sl; + try { + fix.session.createReceiver(address); + fix.session.sync(); + BOOST_FAIL("Expected exception."); + } catch (const MessagingException& /*e*/) {} + } +} + +QPID_AUTO_TEST_CASE(testNonExclusiveSubscriber) +{ + TopicFixture fix; + std::string address = (boost::format("%1%; {node:{type:topic}, link:{name:'my-subscription', x-declare:{auto-delete:true, exclusive:false}}}") % fix.topic).str(); + Receiver receiver1 = fix.session.createReceiver(address); + Receiver receiver2 = fix.session.createReceiver(address); + Sender sender = fix.session.createSender(fix.topic); + sender.send(Message("one"), true); + Message in = receiver1.fetch(Duration::IMMEDIATE); + BOOST_CHECK_EQUAL(in.getContent(), std::string("one")); + sender.send(Message("two"), true); + in = receiver2.fetch(Duration::IMMEDIATE); + BOOST_CHECK_EQUAL(in.getContent(), std::string("two")); + fix.session.acknowledge(); +} + +QPID_AUTO_TEST_CASE(testAcknowledgeUpTo) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + const uint count(20); + for (uint i = 0; i < count; ++i) { + sender.send(Message((boost::format("Message_%1%") % (i+1)).str())); + } + + Session other = fix.connection.createSession(); + Receiver receiver = other.createReceiver(fix.queue); + std::vector<Message> messages; + for (uint i = 0; i < count; ++i) { + Message msg = receiver.fetch(); + BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str()); + messages.push_back(msg); + } + const uint batch = 10; + other.acknowledgeUpTo(messages[batch-1]);//acknowledge first 10 messages only + + messages.clear(); + other.sync(); + other.close(); + + other = fix.connection.createSession(); + receiver = other.createReceiver(fix.queue); + Message msg; + for (uint i = 0; i < (count-batch); ++i) { + msg = receiver.fetch(); + BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str()); + } + other.acknowledgeUpTo(msg); + other.sync(); + other.close(); + + Message m; + //check queue is empty + BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE)); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |