diff options
Diffstat (limited to 'qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 58 |
1 files changed, 54 insertions, 4 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 28fcdd13ad..bc7a666c95 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -85,6 +85,12 @@ void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) { args += "--no-data-dir"; } +ClusterFixture::Args prepareArgs(const bool durableFlag = false) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + return args; +} + // Timeout for tests that wait for messages const sys::Duration TIMEOUT=sys::TIME_SEC/4; @@ -596,16 +602,19 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) { } } -QPID_AUTO_TEST_CASE(testCatchupSharedState) { +// Test that message data and delivery properties are updated properly. +QPID_AUTO_TEST_CASE(testUpdateMessages) { ClusterFixture::Args args; prepareArgs(args, durableFlag); ClusterFixture cluster(1, args, -1); Client c0(cluster[0], "c0"); - // Create some shared state. + // Create messages with different delivery properties c0.session.queueDeclare("q", arg::durable=durableFlag); + c0.session.exchangeBind(arg::exchange="amq.fanout", arg::queue="q"); c0.session.messageTransfer(arg::content=makeMessage("foo","q", durableFlag)); - c0.session.messageTransfer(arg::content=makeMessage("bar","q", durableFlag)); + c0.session.messageTransfer(arg::content=makeMessage("bar","q", durableFlag), + arg::destination="amq.fanout"); while (c0.session.queueQuery("q").getMessageCount() != 2) sys::usleep(1000); // Wait for message to show up on broker 0. @@ -628,9 +637,12 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "foo"); + BOOST_CHECK(m.getDeliveryProperties().hasExchange()); BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), ""); BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "bar"); + BOOST_CHECK(m.getDeliveryProperties().hasExchange()); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "amq.fanout"); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); // Add another broker, don't wait for join - should be stalled till ready. @@ -1100,6 +1112,44 @@ QPID_AUTO_TEST_CASE(testRelease) { } } -QPID_AUTO_TEST_SUITE_END() +// Browse for 1 message with byte credit, return true if a message was +// received false if not. +bool browseByteCredit(Client& c, const string& q, int n, Message& m) { + SubscriptionSettings browseSettings( + FlowControl(1, n, false), // 1 message, n bytes credit, no window + ACCEPT_MODE_NONE, + ACQUIRE_MODE_NOT_ACQUIRED, + 0 // No auto-ack. + ); + LocalQueue lq; + Subscription s = c.subs.subscribe(lq, q, browseSettings); + c.session.messageFlush(arg::destination=q, arg::sync=true); + c.session.sync(); + c.subs.getSubscription(q).cancel(); + return lq.get(m, 0); // No timeout, flush should push message thru. +} + +// Ensure cluster update preserves exact message size, use byte credt as test. +QPID_AUTO_TEST_CASE(testExactByteCredit) { + ClusterFixture cluster(1, prepareArgs(), -1); + Client c0(cluster[0], "c0"); + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=Message("MyMessage", "q")); + cluster.add(); + + int size=36; // Size of message on broker: headers+body + Client c1(cluster[1], "c1"); + Message m; + + // Ensure we get the message with exact credit. + BOOST_CHECK(browseByteCredit(c0, "q", size, m)); + BOOST_CHECK(browseByteCredit(c1, "q", size, m)); + // and not with one byte less. + BOOST_CHECK(!browseByteCredit(c0, "q", size-1, m)); + BOOST_CHECK(!browseByteCredit(c1, "q", size-1, m)); +} + + +QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |