summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/Plugin.cpp17
-rw-r--r--qpid/cpp/src/qpid/Plugin.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/Cpg.cpp2
-rw-r--r--qpid/cpp/src/qpidd.cpp8
-rw-r--r--qpid/cpp/src/tests/BrokerFixture.h44
-rw-r--r--qpid/cpp/src/tests/Cpg.cpp115
-rwxr-xr-xqpid/cpp/src/tests/ais_check13
-rw-r--r--qpid/cpp/src/tests/cluster.mk8
-rw-r--r--qpid/cpp/src/tests/cluster_client.cpp108
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp215
-rwxr-xr-xqpid/cpp/src/tests/start_cluster26
-rwxr-xr-xqpid/cpp/src/tests/stop_cluster14
12 files changed, 272 insertions, 301 deletions
diff --git a/qpid/cpp/src/qpid/Plugin.cpp b/qpid/cpp/src/qpid/Plugin.cpp
index 77627c3742..733d134334 100644
--- a/qpid/cpp/src/qpid/Plugin.cpp
+++ b/qpid/cpp/src/qpid/Plugin.cpp
@@ -19,16 +19,16 @@
*/
#include "Plugin.h"
+#include "qpid/Options.h"
namespace qpid {
namespace {
-// This is a single threaded singleton implementation so
-// it is important to be sure that the first use of this
-// singleton is when the program is still single threaded
Plugin::Plugins& thePlugins() {
+ // This is a single threaded singleton implementation so
+ // it is important to be sure that the first use of this
+ // singleton is when the program is still single threaded
static Plugin::Plugins plugins;
-
return plugins;
}
}
@@ -42,8 +42,13 @@ Plugin::~Plugin() {}
Options* Plugin::getOptions() { return 0; }
-const Plugin::Plugins& Plugin::getPlugins() {
- return thePlugins();
+const Plugin::Plugins& Plugin::getPlugins() { return thePlugins(); }
+
+void Plugin::addOptions(Options& opts) {
+ for (Plugins::const_iterator i = getPlugins().begin(); i != getPlugins().end(); ++i) {
+ if ((*i)->getOptions())
+ opts.add(*(*i)->getOptions());
+ }
}
} // namespace qpid
diff --git a/qpid/cpp/src/qpid/Plugin.h b/qpid/cpp/src/qpid/Plugin.h
index 7b01c83273..3ead770129 100644
--- a/qpid/cpp/src/qpid/Plugin.h
+++ b/qpid/cpp/src/qpid/Plugin.h
@@ -88,6 +88,9 @@ class Plugin : boost::noncopyable
* Caller must not delete plugin pointers.
*/
static const Plugins& getPlugins();
+
+ /** For each registered plugin, add plugin.getOptions() to opts. */
+ static void addOptions(Options& opts);
};
} // namespace qpid
diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp
index 01d97d2a17..ce8aa0dc33 100644
--- a/qpid/cpp/src/qpid/cluster/Cpg.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp
@@ -92,7 +92,7 @@ Cpg::Cpg(Handler& h) : handler(h) {
cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
handles.put(handle, &handler);
- QPID_LOG(debug, "Initialize CPG handle " << handle);
+ QPID_LOG(debug, "Initialize CPG handle 0x" << std::hex << handle);
}
Cpg::~Cpg() {
diff --git a/qpid/cpp/src/qpidd.cpp b/qpid/cpp/src/qpidd.cpp
index 0194f9af23..67c94383e8 100644
--- a/qpid/cpp/src/qpidd.cpp
+++ b/qpid/cpp/src/qpidd.cpp
@@ -102,13 +102,7 @@ struct QpiddOptions : public qpid::Options {
add(broker);
add(daemon);
add(log);
- const Plugin::Plugins& plugins=
- Plugin::getPlugins();
- for (Plugin::Plugins::const_iterator i = plugins.begin();
- i != plugins.end();
- ++i)
- if ((*i)->getOptions() != 0)
- add(*(*i)->getOptions());
+ Plugin::addOptions(*this);
}
void usage() const {
diff --git a/qpid/cpp/src/tests/BrokerFixture.h b/qpid/cpp/src/tests/BrokerFixture.h
index 31f63d71a0..b6837f6553 100644
--- a/qpid/cpp/src/tests/BrokerFixture.h
+++ b/qpid/cpp/src/tests/BrokerFixture.h
@@ -29,19 +29,19 @@
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Session.h"
#include "qpid/client/SubscriptionManager.h"
+#include <boost/noncopyable.hpp>
/**
* A fixture with an in-process broker.
*/
-struct BrokerFixture {
+struct BrokerFixture : private boost::noncopyable {
typedef qpid::broker::Broker Broker;
typedef boost::shared_ptr<Broker> BrokerPtr;
BrokerPtr broker;
qpid::sys::Thread brokerThread;
- BrokerFixture() {
- Broker::Options opts;
+ BrokerFixture(Broker::Options opts=Broker::Options()) {
opts.port=0;
// Management doesn't play well with multiple in-process brokers.
opts.enableMgmt=false;
@@ -65,8 +65,11 @@ struct BrokerFixture {
void open(qpid::client::Connection& c) {
c.open("localhost", broker->getPort());
}
+
+ uint16_t getPort() { return broker->getPort(); }
};
+/** Connection that opens in its constructor */
struct LocalConnection : public qpid::client::Connection {
LocalConnection(uint16_t port) { open("localhost", port); }
};
@@ -80,25 +83,36 @@ struct ProxyConnection : public qpid::client::Connection {
~ProxyConnection() { close(); }
};
-/**
- * A BrokerFixture with open Connection, Session and
- * SubscriptionManager and LocalQueue for convenience.
+/** Convenience class to create and open a connection and session
+ * and some related useful objects.
*/
-template <class ConnectionType>
-struct SessionFixtureT : BrokerFixture {
+template <class ConnectionType=ProxyConnection, class SessionType=qpid::client::Session>
+struct ClientT {
ConnectionType connection;
- qpid::client::Session session;
+ SessionType session;
qpid::client::SubscriptionManager subs;
qpid::client::LocalQueue lq;
+ ClientT(uint16_t port) : connection(port),
+ session(connection.newSession("Client")),
+ subs(session)
+ {}
+
+ ~ClientT() { connection.close(); }
+};
- SessionFixtureT() : connection(broker->getPort()),
- session(connection.newSession("SessionFixture")),
- subs(session)
+typedef ClientT<> Client;
+
+/**
+ * A BrokerFixture and ready-connected BrokerFixture::Client all in one.
+ */
+template <class ConnectionType, class SessionType=qpid::client::Session>
+struct SessionFixtureT : BrokerFixture, ClientT<ConnectionType,SessionType> {
+
+ SessionFixtureT(Broker::Options opts=Broker::Options()) :
+ BrokerFixture(opts),
+ ClientT<ConnectionType,SessionType>(broker->getPort())
{}
- ~SessionFixtureT() {
- connection.close();
- }
};
typedef SessionFixtureT<LocalConnection> SessionFixture;
diff --git a/qpid/cpp/src/tests/Cpg.cpp b/qpid/cpp/src/tests/Cpg.cpp
deleted file mode 100644
index db7debaa08..0000000000
--- a/qpid/cpp/src/tests/Cpg.cpp
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-
-#include "test_tools.h"
-#include "qpid/cluster/Cpg.h"
-#include "qpid/framing/AMQBody.h"
-
-#include <boost/bind.hpp>
-#include "unit_test.h"
-
-#include <string>
-#include <iostream>
-#include <iterator>
-#include <vector>
-#include <algorithm>
-
-QPID_AUTO_TEST_SUITE(CpgTestSuite)
-
-
-using namespace std;
-using namespace qpid::cluster;
-using namespace qpid::framing;
-
-// For debugging: op << for CPG types.
-
-ostream& operator<<(ostream& o, const cpg_name* n) {
- return o << qpid::cluster::Cpg::str(*n);
-}
-
-ostream& operator<<(ostream& o, const cpg_address& a) {
- return o << "(" << a.nodeid <<","<<a.pid<<","<<a.reason<<")";
-}
-
-template <class T>
-ostream& operator<<(ostream& o, const pair<T*, int>& array) {
- o << "{ ";
- ostream_iterator<cpg_address> i(o, " ");
- copy(array.first, array.first+array.second, i);
- o << "}";
- return o;
-}
-
-struct Callback : public Cpg::Handler {
- Callback(const string group_) : group(group_) {}
- string group;
- vector<string> delivered;
- vector<int> configChanges;
-
- void deliver (
- cpg_handle_t /*handle*/,
- struct cpg_name *grp,
- uint32_t /*nodeid*/,
- uint32_t /*pid*/,
- void* msg,
- int msg_len)
- {
- BOOST_CHECK_EQUAL(group, Cpg::str(*grp));
- delivered.push_back(string((char*)msg,msg_len));
- }
-
- void configChange(
- cpg_handle_t /*handle*/,
- struct cpg_name *grp,
- struct cpg_address */*members*/, int nMembers,
- struct cpg_address */*left*/, int nLeft,
- struct cpg_address */*joined*/, int nJoined
- )
- {
- BOOST_CHECK_EQUAL(group, Cpg::str(*grp));
- configChanges.push_back(nMembers);
- BOOST_MESSAGE("configChange: "<<
- nLeft<<" left "<<
- nJoined<<" joined "<<
- nMembers<<" members.");
- }
-};
-
-QPID_AUTO_TEST_CASE(CpgBasic) {
- // Verify basic functionality of cpg. This will catch any
- // openais configuration or permission errors.
- //
- Cpg::Name group("CpgBasic");
- Callback cb(group.str());
- Cpg cpg(cb);
- cpg.join(group);
- iovec iov = { (void*)"Hello!", 6 };
- cpg.mcast(group, &iov, 1);
- cpg.leave(group);
- cpg.dispatchSome();
-
- BOOST_REQUIRE_EQUAL(1u, cb.delivered.size());
- BOOST_CHECK_EQUAL("Hello!", cb.delivered.front());
- BOOST_REQUIRE_EQUAL(2u, cb.configChanges.size());
- BOOST_CHECK_EQUAL(1, cb.configChanges[0]);
- BOOST_CHECK_EQUAL(0, cb.configChanges[1]);
-}
-
-
-QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/ais_check b/qpid/cpp/src/tests/ais_check
index f16480b7f0..e2d53b5870 100755
--- a/qpid/cpp/src/tests/ais_check
+++ b/qpid/cpp/src/tests/ais_check
@@ -23,11 +23,14 @@ EOF
exit 0; # A warning, not a failure.
fi
+# Execute command with the ais group set.
+with_ais_group() {
+ id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the ais group."; exit 1; }
+ echo $* | newgrp ais
+}
+
# Run the tests
srcdir=`dirname $0`
-$srcdir/start_cluster 4
-./ais_test
-ret=$?
-$srcdir/stop_cluster
-exit $ret
+with_ais_group ./cluster_test || ERROR=1
+exit $ERROR
diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk
index ba49a6774d..d2961f0954 100644
--- a/qpid/cpp/src/tests/cluster.mk
+++ b/qpid/cpp/src/tests/cluster.mk
@@ -11,10 +11,10 @@ lib_cluster = $(abs_builddir)/../libqpidcluster.la
# ais_check checks conditions for AIS tests and runs if ok.
TESTS+=ais_check
-EXTRA_DIST+=ais_check start_cluster stop_cluster
+EXTRA_DIST+=ais_check
-check_PROGRAMS+=ais_test
-ais_test_SOURCES=ais_test.cpp Cpg.cpp
-ais_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
+check_PROGRAMS+=cluster_test
+cluster_test_SOURCES=unit_test.cpp cluster_test.cpp
+cluster_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
endif
diff --git a/qpid/cpp/src/tests/cluster_client.cpp b/qpid/cpp/src/tests/cluster_client.cpp
deleted file mode 100644
index efb6e04aa8..0000000000
--- a/qpid/cpp/src/tests/cluster_client.cpp
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "unit_test.h"
-#include "BrokerFixture.h"
-#include "qpid/client/Session.h"
-
-#include <fstream>
-#include <vector>
-#include <functional>
-
-QPID_AUTO_TEST_SUITE(cluster_clientTestSuite)
-
-using namespace qpid;
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace qpid::client::arg;
-using framing::TransferContent;
-using std::vector;
-using std::string;
-using std::ifstream;
-using std::ws;
-
-struct ClusterConnections : public vector<shared_ptr<Connection> > {
- ClusterConnections() {
- ifstream portfile("cluster.ports");
- BOOST_REQUIRE(portfile.good());
- portfile >> ws;
- while (portfile.good()) {
- uint16_t port;
- portfile >> port >> ws;
- push_back(make_shared_ptr(new Connection(port)));
- back()->open("localhost", port);
- }
- BOOST_REQUIRE(size() > 1);
- }
-
- ~ClusterConnections() {
- for (iterator i = begin(); i != end(); ++i ){
- (*i)->close();
- }
- }
-};
-
-QPID_AUTO_TEST_CASE(testWiringReplication) {
- // Declare on one broker, use on others.
- ClusterConnections cluster;
- BOOST_REQUIRE(cluster.size() > 1);
-
- Session broker0 = cluster[0]->newSession();
- broker0.exchangeDeclare(exchange="ex");
- broker0.queueDeclare(queue="q");
- broker0.queueBind(exchange="ex", queue="q", routingKey="key");
- broker0.close();
-
- for (size_t i = 1; i < cluster.size(); ++i) {
- Session s = cluster[i]->newSession();
- s.messageTransfer(content=TransferContent("data", "key", "ex"));
- s.messageSubscribe(queue="q", destination="q");
- s.messageFlow(destination="q", unit=0, value=1);//messages
- FrameSet::shared_ptr msg = s.get();
- BOOST_CHECK(msg->isA<MessageTransferBody>());
- BOOST_CHECK_EQUAL(string("data"), msg->getContent());
- s.getExecution().completed(msg->getId(), true, true);
- cluster[i]->close();
- }
-}
-
-QPID_AUTO_TEST_CASE(testMessageReplication) {
- // Enqueue on one broker, dequeue on another.
- ClusterConnections cluster;
- BOOST_REQUIRE(cluster.size() > 1);
-
- Session broker0 = cluster[0]->newSession();
- broker0.queueDeclare(queue="q");
- broker0.messageTransfer(content=TransferContent("data", "q"));
- broker0.close();
-
- Session broker1 = cluster[1]->newSession();
- broker1.
- s.messageSubscribe(queue="q", destination="q");
- s.messageFlow(destination="q", unit=0, value=1);//messages
- FrameSet::shared_ptr msg = s.get();
- BOOST_CHECK(msg->isA<MessageTransferBody>());
- BOOST_CHECK_EQUAL(string("data"), msg->getContent());
- s.getExecution().completed(msg->getId(), true, true);
- cluster[i]->close();
- }
-}
-
-// TODO aconway 2008-06-25: dequeue replication, exactly once delivery, failover.
-
-QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
new file mode 100644
index 0000000000..19dffe2ee4
--- /dev/null
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -0,0 +1,215 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+#include "test_tools.h"
+#include "unit_test.h"
+#include "BrokerFixture.h"
+
+#include "qpid/cluster/Cpg.h"
+#include "qpid/framing/AMQBody.h"
+
+#include <boost/bind.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+
+#include <string>
+#include <iostream>
+#include <iterator>
+#include <vector>
+#include <algorithm>
+
+QPID_AUTO_TEST_SUITE(CpgTestSuite)
+
+
+using namespace std;
+using namespace qpid::cluster;
+using namespace qpid::framing;
+using namespace qpid::client;
+using boost::ptr_vector;
+
+// For debugging: op << for CPG types.
+
+ostream& operator<<(ostream& o, const cpg_name* n) {
+ return o << qpid::cluster::Cpg::str(*n);
+}
+
+ostream& operator<<(ostream& o, const cpg_address& a) {
+ return o << "(" << a.nodeid <<","<<a.pid<<","<<a.reason<<")";
+}
+
+template <class T>
+ostream& operator<<(ostream& o, const pair<T*, int>& array) {
+ o << "{ ";
+ ostream_iterator<cpg_address> i(o, " ");
+ copy(array.first, array.first+array.second, i);
+ o << "}";
+ return o;
+}
+
+struct Callback : public Cpg::Handler {
+ Callback(const string group_) : group(group_) {}
+ string group;
+ vector<string> delivered;
+ vector<int> configChanges;
+
+ void deliver (
+ cpg_handle_t /*handle*/,
+ struct cpg_name *grp,
+ uint32_t /*nodeid*/,
+ uint32_t /*pid*/,
+ void* msg,
+ int msg_len)
+ {
+ BOOST_CHECK_EQUAL(group, Cpg::str(*grp));
+ delivered.push_back(string((char*)msg,msg_len));
+ }
+
+ void configChange(
+ cpg_handle_t /*handle*/,
+ struct cpg_name *grp,
+ struct cpg_address */*members*/, int nMembers,
+ struct cpg_address */*left*/, int nLeft,
+ struct cpg_address */*joined*/, int nJoined
+ )
+ {
+ BOOST_CHECK_EQUAL(group, Cpg::str(*grp));
+ configChanges.push_back(nMembers);
+ BOOST_MESSAGE("configChange: "<<
+ nLeft<<" left "<<
+ nJoined<<" joined "<<
+ nMembers<<" members.");
+ }
+};
+
+QPID_AUTO_TEST_CASE(CpgBasic) {
+ // Verify basic functionality of cpg. This will catch any
+ // openais configuration or permission errors.
+ //
+ Cpg::Name group("CpgBasic");
+ Callback cb(group.str());
+ Cpg cpg(cb);
+ cpg.join(group);
+ iovec iov = { (void*)"Hello!", 6 };
+ cpg.mcast(group, &iov, 1);
+ cpg.leave(group);
+ cpg.dispatchSome();
+
+ BOOST_REQUIRE_EQUAL(1u, cb.delivered.size());
+ BOOST_CHECK_EQUAL("Hello!", cb.delivered.front());
+ BOOST_REQUIRE_EQUAL(2u, cb.configChanges.size());
+ BOOST_CHECK_EQUAL(1, cb.configChanges[0]);
+ BOOST_CHECK_EQUAL(0, cb.configChanges[1]);
+}
+
+
+QPID_AUTO_TEST_CASE(CpgMulti) {
+ // Verify using multiple handles in one process.
+ //
+ Cpg::Name group("CpgMulti");
+ Callback cb1(group.str());
+ Cpg cpg1(cb1);
+
+ Callback cb2(group.str());
+ Cpg cpg2(cb2);
+
+ cpg1.join(group);
+ cpg2.join(group);
+ iovec iov1 = { (void*)"Hello1", 6 };
+ iovec iov2 = { (void*)"Hello2", 6 };
+ cpg1.mcast(group, &iov1, 1);
+ cpg2.mcast(group, &iov2, 1);
+ cpg1.leave(group);
+ cpg2.leave(group);
+
+ cpg1.dispatchSome();
+ BOOST_REQUIRE_EQUAL(2u, cb1.delivered.size());
+ BOOST_CHECK_EQUAL("Hello1", cb1.delivered[0]);
+ BOOST_CHECK_EQUAL("Hello2", cb1.delivered[1]);
+
+ cpg2.dispatchSome();
+ BOOST_REQUIRE_EQUAL(2u, cb1.delivered.size());
+ BOOST_CHECK_EQUAL("Hello1", cb1.delivered[0]);
+ BOOST_CHECK_EQUAL("Hello2", cb1.delivered[1]);
+}
+
+// Test cluster of BrokerFixtures.
+struct ClusterFixture : public ptr_vector<BrokerFixture> {
+ ClusterFixture(size_t n=0) { add(n); }
+ void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
+ void add();
+};
+
+void ClusterFixture::add() {
+ qpid::broker::Broker::Options opts;
+ // Assumes the cluster plugin is loaded.
+ qpid::Plugin::addOptions(opts);
+ const char* argv[] = { "--cluster-name=$CLUSTER" };
+ // FIXME aconway 2008-06-26: fix parse() signature, should not need cast.
+ opts.parse(sizeof(argv)/sizeof(*argv), const_cast<char**>(argv));
+ push_back(new BrokerFixture(opts));
+}
+
+#if 0 // FIXME aconway 2008-06-26: TODO
+QPID_AUTO_TEST_CASE(testWiringReplication) {
+ const size_t SIZE=3;
+ ClusterFixture cluster(SIZE);
+ Client c0(cluster[0].getPort());
+ BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
+ BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty());
+ c0.session.queueDeclare("q");
+ c0.session.exchangeDeclare("ex", arg::type="direct");
+ BOOST_CHECK_EQUAL("q", c0.session.queueQuery("q").getQueue());
+ BOOST_CHECK_EQUAL("direct", c0.session.exchangeQuery("ex").getType());
+ c0.close();
+
+ // Verify all brokers get wiring update.
+ for (size_t i = 1; i < cluster.size(); ++i) {
+ Client c(cluster[i].getPort());
+ BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue());
+ BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType());
+ c.close();
+ }
+}
+
+QPID_AUTO_TEST_CASE(testMessageReplication) {
+ // Enqueue on one broker, dequeue on another.
+ ClusterConnections cluster;
+ BOOST_REQUIRE(cluster.size() > 1);
+
+ Session broker0 = cluster[0]->newSession();
+ broker0.queueDeclare(queue="q");
+ broker0.messageTransfer(content=TransferContent("data", "q"));
+ broker0.close();
+
+ Session broker1 = cluster[1]->newSession();
+ broker1.
+ c.session.messageSubscribe(queue="q", destination="q");
+ c.session.messageFlow(destination="q", unit=0, value=1);//messages
+ FrameSet::shared_ptr msg = c.session.get();
+ BOOST_CHECK(msg->isA<MessageTransferBody>());
+ BOOST_CHECK_EQUAL(string("data"), msg->getContent());
+ c.session.getExecution().completed(msg->getId(), true, true);
+ cluster[i]->close();
+ }
+}
+
+// TODO aconway 2008-06-25: dequeue replication, exactly once delivery, failover.
+
+#endif
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/start_cluster b/qpid/cpp/src/tests/start_cluster
deleted file mode 100755
index 46ecbad9c5..0000000000
--- a/qpid/cpp/src/tests/start_cluster
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/bin/sh
-# Start a cluster of brokers on local host.
-# Print the cluster's URL.
-#
-
-# Execute command with the ais group set.
-with_ais_group() {
- id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the ais group."; exit 1; }
- echo $* | newgrp ais
-}
-
-test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; }
-test -z "$*" && { echo "Usage: $0 cluster-size [options]"; exit 1; }
-
-rm -f cluster*.log cluster.ports
-SIZE=$1
-shift
-CLUSTER=`whoami` # Cluster name=user name, avoid clashes.
-OPTS="--load-module ../.libs/libqpidcluster.so -dp0 --log-output=cluster$i.log --cluster-name=$CLUSTER --no-data-dir --auth=no $*"
-
-for (( i=0; i<SIZE; ++i )); do
- PORT=`with_ais_group ../qpidd $OPTS` || exit 1
- echo $PORT >> cluster.ports
-done
-
-
diff --git a/qpid/cpp/src/tests/stop_cluster b/qpid/cpp/src/tests/stop_cluster
deleted file mode 100755
index 6afcb527e5..0000000000
--- a/qpid/cpp/src/tests/stop_cluster
+++ /dev/null
@@ -1,14 +0,0 @@
-#!/bin/sh
-# Stop brokers on ports listed in cluster.ports
-
-
-PORTS=`cat cluster.ports`
-for PORT in $PORTS ; do
- ../qpidd -qp $PORT || ERROR="$ERROR $PORT"
-done
-rm -f cluster.ports
-
-if [ -n "$ERROR" ]; then
- echo "Errors stopping brokers on ports: $ERROR"
- exit 1
-fi