summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp83
1 files changed, 79 insertions, 4 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 7b67fed388..5cfcbc262d 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -23,6 +23,7 @@
#include "qpid/client/Connection.h"
#include "qpid/client/Session.h"
+#include "qpid/client/FailoverListener.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/DumpClient.h"
@@ -38,7 +39,9 @@
#include <iostream>
#include <iterator>
#include <vector>
+#include <set>
#include <algorithm>
+#include <iterator>
namespace qpid {
namespace cluster {
@@ -46,6 +49,12 @@ Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp
}} // namespace qpid::cluster
+namespace std { // ostream operators in std:: namespace
+template <class T>
+ostream& operator<<(ostream& o, const std::set<T>& s) { return seqPrint(o, s); }
+}
+
+
QPID_AUTO_TEST_SUITE(cluster)
using namespace std;
@@ -88,11 +97,8 @@ struct ClusterFixture : public vector<uint16_t> {
}
void waitFor(size_t n) {
- size_t retry=1000; // TODO aconway 2008-07-16: nasty sleeps, clean this up.
- while (retry && getGlobalCluster().getUrls().size() != n) {
+ for (size_t retry = 1000; retry && getGlobalCluster().getUrls().size() != n; --retry)
::usleep(1000);
- --retry;
- }
}
};
@@ -164,6 +170,75 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
+template <class C> set<uint16_t> makeSet(const C& c) {
+ set<uint16_t> s;
+ std::copy(c.begin(), c.end(), std::inserter(s, s.begin()));
+ return s;
+}
+
+std::set<uint16_t> portsFromFailoverArray(const framing::Array& urlArray) {
+ std::set<uint16_t> ports;
+ for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i < urlArray.end(); ++i ) {
+ Url url((*i)->get<std::string>());
+ BOOST_REQUIRE(url.size() > 0);
+ BOOST_REQUIRE(url[0].get<TcpAddress>());
+ ports.insert(url[0].get<TcpAddress>()->port);
+ }
+ return ports;
+}
+
+std::set<uint16_t> portsFromFailoverMessage(const Message& m) {
+ framing::Array urlArray;
+ m.getHeaders().getArray("amq.failover", urlArray);
+ return portsFromFailoverArray(urlArray);
+}
+
+QPID_AUTO_TEST_CASE(FailoverExchange) {
+ ClusterFixture cluster(2);
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare("q");
+ c0.session.exchangeBind(arg::queue="q", arg::exchange="amq.failover");
+
+ Message m;
+ BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverMessage(m));
+
+ cluster.add();
+ BOOST_CHECK_EQUAL(1u, c0.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(makeSet(cluster),portsFromFailoverMessage(m));
+}
+
+std::set<uint16_t> portsFromFailoverListener(const FailoverListener& fl, size_t n) {
+ // Wait till there are n ports in the list.
+ vector<Url> kb = fl.getKnownBrokers();
+ for (size_t retry=1000; kb.size() != n && retry != 0; --retry) {
+ ::usleep(1000);
+ kb = fl.getKnownBrokers();
+ }
+ set<uint16_t> s;
+ for (vector<Url>::const_iterator i = kb.begin(); i != kb.end(); ++i) {
+ BOOST_MESSAGE("Failover URL: " << *i);
+ BOOST_CHECK(i->size() >= 1);
+ BOOST_CHECK((*i)[0].get<TcpAddress>());
+ s.insert((*i)[0].get<TcpAddress>()->port);
+ }
+ return s;
+}
+
+QPID_AUTO_TEST_CASE(testFailoverListener) {
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+ FailoverListener fl(c0.connection);
+
+ set<uint16_t> set0=makeSet(cluster);
+
+ BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1));
+ cluster.add();
+ BOOST_CHECK_EQUAL(makeSet(cluster), portsFromFailoverListener(fl, 2));
+ cluster.kill(1);
+ BOOST_CHECK_EQUAL(set0, portsFromFailoverListener(fl, 1));
+}
+
QPID_AUTO_TEST_CASE(DumpConsumers) {
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");