summaryrefslogtreecommitdiff
path: root/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-10-05 15:08:25 +0000
committerAlan Conway <aconway@apache.org>2009-10-05 15:08:25 +0000
commit4f80172fc8a66475045df2299b45c4eb6d46a1b2 (patch)
tree3b80ced1287ae02a67264c64e23c86d57cd99582 /cpp/src/tests/cluster_test.cpp
parent6dec488633e43e6a4ebe95fb8289f73d70d00867 (diff)
downloadqpid-python-4f80172fc8a66475045df2299b45c4eb6d46a1b2.tar.gz
Fixed: cluster udpate did not presever deliver-properties.exchange on messages.
Also minor improvements: - Improved debug logging for consumers. - Cluster tests scripts work with latest corosync: don't check/set ais group. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@821830 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/cluster_test.cpp')
-rw-r--r--cpp/src/tests/cluster_test.cpp58
1 files changed, 54 insertions, 4 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 28fcdd13ad..bc7a666c95 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -85,6 +85,12 @@ void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) {
args += "--no-data-dir";
}
+ClusterFixture::Args prepareArgs(const bool durableFlag = false) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ return args;
+}
+
// Timeout for tests that wait for messages
const sys::Duration TIMEOUT=sys::TIME_SEC/4;
@@ -596,16 +602,19 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) {
}
}
-QPID_AUTO_TEST_CASE(testCatchupSharedState) {
+// Test that message data and delivery properties are updated properly.
+QPID_AUTO_TEST_CASE(testUpdateMessages) {
ClusterFixture::Args args;
prepareArgs(args, durableFlag);
ClusterFixture cluster(1, args, -1);
Client c0(cluster[0], "c0");
- // Create some shared state.
+ // Create messages with different delivery properties
c0.session.queueDeclare("q", arg::durable=durableFlag);
+ c0.session.exchangeBind(arg::exchange="amq.fanout", arg::queue="q");
c0.session.messageTransfer(arg::content=makeMessage("foo","q", durableFlag));
- c0.session.messageTransfer(arg::content=makeMessage("bar","q", durableFlag));
+ c0.session.messageTransfer(arg::content=makeMessage("bar","q", durableFlag),
+ arg::destination="amq.fanout");
while (c0.session.queueQuery("q").getMessageCount() != 2)
sys::usleep(1000); // Wait for message to show up on broker 0.
@@ -628,9 +637,12 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "foo");
+ BOOST_CHECK(m.getDeliveryProperties().hasExchange());
BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "");
BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "bar");
+ BOOST_CHECK(m.getDeliveryProperties().hasExchange());
+ BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "amq.fanout");
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
// Add another broker, don't wait for join - should be stalled till ready.
@@ -1100,6 +1112,44 @@ QPID_AUTO_TEST_CASE(testRelease) {
}
}
-QPID_AUTO_TEST_SUITE_END()
+// Browse for 1 message with byte credit, return true if a message was
+// received false if not.
+bool browseByteCredit(Client& c, const string& q, int n, Message& m) {
+ SubscriptionSettings browseSettings(
+ FlowControl(1, n, false), // 1 message, n bytes credit, no window
+ ACCEPT_MODE_NONE,
+ ACQUIRE_MODE_NOT_ACQUIRED,
+ 0 // No auto-ack.
+ );
+ LocalQueue lq;
+ Subscription s = c.subs.subscribe(lq, q, browseSettings);
+ c.session.messageFlush(arg::destination=q, arg::sync=true);
+ c.session.sync();
+ c.subs.getSubscription(q).cancel();
+ return lq.get(m, 0); // No timeout, flush should push message thru.
+}
+
+// Ensure cluster update preserves exact message size, use byte credt as test.
+QPID_AUTO_TEST_CASE(testExactByteCredit) {
+ ClusterFixture cluster(1, prepareArgs(), -1);
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=Message("MyMessage", "q"));
+ cluster.add();
+
+ int size=36; // Size of message on broker: headers+body
+ Client c1(cluster[1], "c1");
+ Message m;
+
+ // Ensure we get the message with exact credit.
+ BOOST_CHECK(browseByteCredit(c0, "q", size, m));
+ BOOST_CHECK(browseByteCredit(c1, "q", size, m));
+ // and not with one byte less.
+ BOOST_CHECK(!browseByteCredit(c0, "q", size-1, m));
+ BOOST_CHECK(!browseByteCredit(c1, "q", size-1, m));
+}
+
+
+QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests