diff options
author | Alan Conway <aconway@apache.org> | 2008-07-04 19:07:33 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-07-04 19:07:33 +0000 |
commit | 6a55caca6570a842bfa9541c541a846656d7284d (patch) | |
tree | 53049fb8770abc94e61538f7e42c977f74c90739 /qpid/cpp/src/tests/cluster_test.cpp | |
parent | 0a2270640b1b91915902b60d46cb3dd421218eda (diff) | |
download | qpid-python-6a55caca6570a842bfa9541c541a846656d7284d.tar.gz |
Cluster prototype: handles client-initiated commands (not dequeues)
Details
- Cluster.cpp: serializes all frames thru cluster (see below)
- broker/ConnectionManager: Added handler chain in front of Connection::received.
- sys::Fork and ForkWithMessage - abstractions for forking with posix impl.
- tests/ForkedBroker.h: test utility to fork a broker process.
- broker/SignalHandler: Encapsulated signal handling from qpidd.cpp
- Various minor logging & error message improvements to aid debugging.
NB: current impl will not scale. It is functional working starting point so we
can start testing & profiling to find the right optimizations.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@674107 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 91 |
1 files changed, 37 insertions, 54 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 63e3b257b3..2fa7cd325d 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -19,10 +19,14 @@ #include "test_tools.h" #include "unit_test.h" +#include "ForkedBroker.h" #include "BrokerFixture.h" #include "qpid/cluster/Cpg.h" #include "qpid/framing/AMQBody.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Session.h" +#include "qpid/framing/Uuid.h" #include <boost/bind.hpp> #include <boost/ptr_container/ptr_vector.hpp> @@ -33,15 +37,41 @@ #include <vector> #include <algorithm> +#include <signal.h> + QPID_AUTO_TEST_SUITE(CpgTestSuite) using namespace std; +using namespace qpid; using namespace qpid::cluster; using namespace qpid::framing; using namespace qpid::client; +using qpid::broker::Broker; using boost::ptr_vector; +struct ClusterFixture : public ptr_vector<ForkedBroker> { + string name; + + ClusterFixture(size_t n=0) : name(Uuid(true).str()) { add(n); } + void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } + void add(); +}; + +void ClusterFixture::add() { + broker::Broker::Options opts; + Plugin::Factory::addOptions(opts); // For cluster options. + const char* argv[] = { + "", "--cluster-name", name.c_str(), "--auth=no", "--no-data-dir" + }; + opts.parse(sizeof(argv)/sizeof(*argv), const_cast<char**>(argv)); + ostringstream prefix; + prefix << "b" << size() << "-"; + QPID_LOG(info, "ClusterFixture adding broker " << prefix.str()); + push_back(new ForkedBroker(opts, prefix.str())); + QPID_LOG(info, "ClusterFixture added broker " << prefix.str()); +} + // For debugging: op << for CPG types. ostream& operator<<(ostream& o, const cpg_name* n) { @@ -117,56 +147,8 @@ QPID_AUTO_TEST_CASE(CpgBasic) { } -QPID_AUTO_TEST_CASE(CpgMulti) { - // Verify using multiple handles in one process. - // - Cpg::Name group("CpgMulti"); - Callback cb1(group.str()); - Cpg cpg1(cb1); - - Callback cb2(group.str()); - Cpg cpg2(cb2); - - cpg1.join(group); - cpg2.join(group); - iovec iov1 = { (void*)"Hello1", 6 }; - iovec iov2 = { (void*)"Hello2", 6 }; - cpg1.mcast(group, &iov1, 1); - cpg2.mcast(group, &iov2, 1); - cpg1.leave(group); - cpg2.leave(group); - - cpg1.dispatchSome(); - BOOST_REQUIRE_EQUAL(2u, cb1.delivered.size()); - BOOST_CHECK_EQUAL("Hello1", cb1.delivered[0]); - BOOST_CHECK_EQUAL("Hello2", cb1.delivered[1]); - - cpg2.dispatchSome(); - BOOST_REQUIRE_EQUAL(2u, cb1.delivered.size()); - BOOST_CHECK_EQUAL("Hello1", cb1.delivered[0]); - BOOST_CHECK_EQUAL("Hello2", cb1.delivered[1]); -} - -// Test cluster of BrokerFixtures. -struct ClusterFixture : public ptr_vector<BrokerFixture> { - ClusterFixture(size_t n=0) { add(n); } - void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } - void add(); -}; - -void ClusterFixture::add() { - qpid::broker::Broker::Options opts; - // Assumes the cluster plugin is loaded. - qpid::Plugin::Factory::addOptions(opts); - const char* argv[] = { "--cluster-name", ::getenv("USERNAME") }; - // FIXME aconway 2008-06-26: fix parse() signature, should not need cast. - opts.parse(sizeof(argv)/sizeof(*argv), const_cast<char**>(argv)); - push_back(new BrokerFixture(opts)); -} - -#if 0 QPID_AUTO_TEST_CASE(testWiringReplication) { - ClusterFixture cluster(3); + ClusterFixture cluster(2); // FIXME aconway 2008-07-02: 3 brokers Client c0(cluster[0].getPort()); BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty()); BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); @@ -187,16 +169,17 @@ QPID_AUTO_TEST_CASE(testMessageReplication) { ClusterFixture cluster(2); Client c0(cluster[0].getPort()); c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=TransferContent("data", "q")); + c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); + c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); c0.session.close(); Client c1(cluster[1].getPort()); Message msg; BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC)); - BOOST_CHECK_EQUAL(string("data"), msg.getData()); + BOOST_CHECK_EQUAL(string("foo"), msg.getData()); + BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC)); + BOOST_CHECK_EQUAL(string("bar"), msg.getData()); } -// TODO aconway 2008-06-25: dequeue replication, exactly once delivery, failover. - -#endif +// TODO aconway 2008-06-25: dequeue replication, failover. QPID_AUTO_TEST_SUITE_END() |