diff options
Diffstat (limited to 'qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 83 |
1 files changed, 79 insertions, 4 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 7b67fed388..5cfcbc262d 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -23,6 +23,7 @@ #include "qpid/client/Connection.h" #include "qpid/client/Session.h" +#include "qpid/client/FailoverListener.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/Cpg.h" #include "qpid/cluster/DumpClient.h" @@ -38,7 +39,9 @@ #include <iostream> #include <iterator> #include <vector> +#include <set> #include <algorithm> +#include <iterator> namespace qpid { namespace cluster { @@ -46,6 +49,12 @@ Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp }} // namespace qpid::cluster +namespace std { // ostream operators in std:: namespace +template <class T> +ostream& operator<<(ostream& o, const std::set<T>& s) { return seqPrint(o, s); } +} + + QPID_AUTO_TEST_SUITE(cluster) using namespace std; @@ -88,11 +97,8 @@ struct ClusterFixture : public vector<uint16_t> { } void waitFor(size_t n) { - size_t retry=1000; // TODO aconway 2008-07-16: nasty sleeps, clean this up. - while (retry && getGlobalCluster().getUrls().size() != n) { + for (size_t retry = 1000; retry && getGlobalCluster().getUrls().size() != n; --retry) ::usleep(1000); - --retry; - } } }; @@ -164,6 +170,75 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } +template <class C> set<uint16_t> makeSet(const C& c) { + set<uint16_t> s; + std::copy(c.begin(), c.end(), std::inserter(s, s.begin())); + return s; +} + +std::set<uint16_t> portsFromFailoverArray(const framing::Array& urlArray) { + std::set<uint16_t> ports; + for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i < urlArray.end(); ++i ) { + Url url((*i)->get<std::string>()); + BOOST_REQUIRE(url.size() > 0); + BOOST_REQUIRE(url[0].get<TcpAddress>()); + ports.insert(url[0].get<TcpAddress>()->port); + } + return ports; +} + +std::set<uint16_t> portsFromFailoverMessage(const Message& m) { + framing::Array urlArray; + m.getHeaders().getArray("amq.failover", urlArray); + return portsFromFailoverArray(urlArray); +} + +QPID_AUTO_TEST_CASE(FailoverExchange) { + ClusterFixture cluster(2); + Client c0(cluster[0], "c0"); + c0.session.queueDeclare("q"); + c0.session.exchangeBind(arg::queue="q", arg::exchange="amq.failover"); + + Message m; + BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverMessage(m)); + + cluster.add(); + BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(makeSet(cluster),portsFromFailoverMessage(m)); +} + +std::set<uint16_t> portsFromFailoverListener(const FailoverListener& fl, size_t n) { + // Wait till there are n ports in the list. + vector<Url> kb = fl.getKnownBrokers(); + for (size_t retry=1000; kb.size() != n && retry != 0; --retry) { + ::usleep(1000); + kb = fl.getKnownBrokers(); + } + set<uint16_t> s; + for (vector<Url>::const_iterator i = kb.begin(); i != kb.end(); ++i) { + BOOST_MESSAGE("Failover URL: " << *i); + BOOST_CHECK(i->size() >= 1); + BOOST_CHECK((*i)[0].get<TcpAddress>()); + s.insert((*i)[0].get<TcpAddress>()->port); + } + return s; +} + +QPID_AUTO_TEST_CASE(testFailoverListener) { + ClusterFixture cluster(1); + Client c0(cluster[0], "c0"); + FailoverListener fl(c0.connection); + + set<uint16_t> set0=makeSet(cluster); + + BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1)); + cluster.add(); + BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverListener(fl, 2)); + cluster.kill(1); + BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1)); +} + QPID_AUTO_TEST_CASE(DumpConsumers) { ClusterFixture cluster(1); Client c0(cluster[0], "c0"); |