summaryrefslogtreecommitdiff
path: root/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/cluster_test.cpp')
-rw-r--r--cpp/src/tests/cluster_test.cpp73
1 files changed, 52 insertions, 21 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index d082d74367..871aa0c657 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -21,13 +21,14 @@
#include "ForkedBroker.h"
#include "BrokerFixture.h"
-#include "qpid/cluster/Cpg.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/DumpClient.h"
#include "qpid/framing/AMQBody.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/Session.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Logger.h"
#include <boost/bind.hpp>
@@ -41,7 +42,7 @@
namespace qpid {
namespace cluster {
-boost::intrusive_ptr<Cluster> getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp
+Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp
}} // namespace qpid::cluster
@@ -81,11 +82,11 @@ ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) {
add(n);
// Wait for all n members to join the cluster
int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up.
- while (retry && getGlobalCluster()->size() != n) {
+ while (retry && getGlobalCluster().size() != n) {
::sleep(1);
--retry;
}
- BOOST_REQUIRE_EQUAL(n, getGlobalCluster()->size());
+ BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size());
}
void ClusterFixture::add() {
@@ -135,7 +136,37 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
-QPID_AUTO_TEST_CASE(testDumpClient) {
+#if 0 // FIXME aconway 2008-09-10: finish & enable
+QPID_AUTO_TEST_CASE(testDumpConsumers) {
+ ClusterFixture cluster(1);
+ Client a(cluster[0]);
+ a.session.queueDeclare("q");
+ a.subs.subscribe(a.lq, "q");
+
+ cluster.add();
+ Client b(cluster[1]);
+ try {
+ b.connection.newSession(a.session.getId().getName());
+ BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName());
+ } catch (const SessionBusyException&) {}
+
+ // Transfer some messages to the subscription by client a.
+ Message m;
+ a.session.messageTransfer(arg::bindingKey="q", arg::content=Message("aaa", "q"));
+ BOOST_CHECK(a.lq.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "aaa");
+
+ b.session.messageTransfer(arg::bindingKey="q", arg::content=Message("bbb", "q"));
+ BOOST_CHECK(a.lq.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "bbb");
+
+ // Verify that the queue has been drained on both brokers.
+ // This proves that the consumer was replicated when the second broker joined.
+ BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0);
+}
+#endif
+
+QPID_AUTO_TEST_CASE(testDumpClientSharedState) {
BrokerFixture donor, receiver;
{
Client c(donor.getPort());
@@ -146,13 +177,13 @@ QPID_AUTO_TEST_CASE(testDumpClient) {
c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct", arg::arguments=args);
c.session.exchangeBind(arg::exchange="exd", arg::queue="qa", arg::bindingKey="foo");
- c.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("one", "foo"));
+ c.session.messageTransfer(arg::destination="exd", arg::content=Message("one", "foo"));
c.session.exchangeDeclare("ext", arg::type="topic");
c.session.exchangeBind(arg::exchange="ext", arg::queue="qb", arg::bindingKey="bar");
c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0));
- c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("one", "bar"));
- c.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("two", "bar"));
+ c.session.messageTransfer(arg::destination="ext", arg::content=Message("one", "bar"));
+ c.session.messageTransfer(arg::destination="ext", arg::content=Message("two", "bar"));
c.session.close();
c.connection.close();
@@ -202,11 +233,11 @@ QPID_AUTO_TEST_CASE(testDumpClient) {
BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
// Verify bindings
- r.session.messageTransfer(arg::destination="exd", arg::content=TransferContent("xxx", "foo"));
+ r.session.messageTransfer(arg::destination="exd", arg::content=Message("xxx", "foo"));
BOOST_CHECK(r.subs.get(m, "qa"));
BOOST_CHECK_EQUAL(m.getData(), "xxx");
- r.session.messageTransfer(arg::destination="ext", arg::content=TransferContent("yyy", "bar"));
+ r.session.messageTransfer(arg::destination="ext", arg::content=Message("yyy", "bar"));
BOOST_CHECK(r.subs.get(m, "qb"));
BOOST_CHECK_EQUAL(m.getData(), "yyy");
@@ -254,8 +285,8 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) {
ClusterFixture cluster(2);
Client c0(cluster[0]);
c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
- c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
+ c0.session.messageTransfer(arg::content=Message("foo", "q"));
+ c0.session.messageTransfer(arg::content=Message("bar", "q"));
c0.session.close();
Client c1(cluster[1]);
Message msg;
@@ -268,19 +299,19 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) {
QPID_AUTO_TEST_CASE(testMessageDequeue) {
// Enqueue on one broker, dequeue on two others.
ClusterFixture cluster (3);
- Client c0(cluster[0]);
+ Client c0(cluster[0], "c0");
c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
- c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
+ c0.session.messageTransfer(arg::content=Message("foo", "q"));
+ c0.session.messageTransfer(arg::content=Message("bar", "q"));
Message msg;
// Dequeue on 2 others, ensure correct order.
- Client c1(cluster[1]);
+ Client c1(cluster[1], "c1");
BOOST_CHECK(c1.subs.get(msg, "q"));
BOOST_CHECK_EQUAL("foo", msg.getData());
- Client c2(cluster[2]);
+ Client c2(cluster[2], "c2");
BOOST_CHECK(c1.subs.get(msg, "q"));
BOOST_CHECK_EQUAL("bar", msg.getData());
@@ -298,8 +329,8 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
// Now send messages
Client c1(cluster[1]);
- c1.session.messageTransfer(arg::content=TransferContent("foo", "q"));
- c1.session.messageTransfer(arg::content=TransferContent("bar", "q"));
+ c1.session.messageTransfer(arg::content=Message("foo", "q"));
+ c1.session.messageTransfer(arg::content=Message("bar", "q"));
// Check they arrived
Message m;