diff options
Diffstat (limited to 'qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 191 |
1 files changed, 152 insertions, 39 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index c880f30e6b..eee2df58cc 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -27,6 +27,7 @@ #include "qpid/client/ConnectionAccess.h" #include "qpid/client/Session.h" #include "qpid/client/FailoverListener.h" +#include "qpid/client/FailoverManager.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/Cpg.h" #include "qpid/cluster/UpdateClient.h" @@ -35,6 +36,8 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/enum.h" #include "qpid/log/Logger.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Thread.h" #include <boost/bind.hpp> #include <boost/shared_ptr.hpp> @@ -148,49 +151,65 @@ vector<string> browse(Client& c, const string& q, int n) { c.subs.subscribe(lq, q, browseSettings); vector<string> result; for (int i = 0; i < n; ++i) { - result.push_back(lq.get(TIMEOUT).getData()); + Message m; + if (!lq.get(m, TIMEOUT)) + break; + result.push_back(m.getData()); } c.subs.getSubscription(q).cancel(); return result; } +ConnectionSettings aclSettings(int port, const std::string& id) { + ConnectionSettings settings; + settings.port = port; + settings.mechanism = "PLAIN"; + settings.username = id; + settings.password = id; + return settings; +} + +#if 0 +// FIXME aconway 2009-03-10: This test passes but exposes a memory leak in the SASL client code. +// Enable it when the leak is fixed. + +QPID_AUTO_TEST_CASE(testAcl) { + ofstream policyFile("cluster_test.acl"); + // FIXME aconway 2009-02-12: guest -> qpidd? + policyFile << "acl allow foo@QPID create queue name=foo" << endl + << "acl allow foo@QPID create queue name=foo2" << endl + << "acl deny foo@QPID create queue name=bar" << endl + << "acl allow all all" << endl; + policyFile.close(); + char cwd[1024]; + BOOST_CHECK(::getcwd(cwd, sizeof(cwd))); + ClusterFixture cluster(2,-1, list_of<string> + ("--no-data-dir") + ("--auth=no") + ("--acl-file="+string(cwd)+"/cluster_test.acl") + ("--cluster-mechanism=PLAIN") + ("--cluster-username=cluster") + ("--cluster-password=cluster") + ("--load-module=../.libs/acl.so")); + + Client c0(aclSettings(cluster[0], "c0"), "c0"); + Client c1(aclSettings(cluster[1], "c1"), "c1"); + Client foo(aclSettings(cluster[1], "foo"), "foo"); + + foo.session.queueDeclare("foo"); + BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo"); + + BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException); + BOOST_CHECK(c0.session.queueQuery("bar").getQueue().empty()); + BOOST_CHECK(c1.session.queueQuery("bar").getQueue().empty()); -// FIXME aconway 2009-02-12: need to figure out how to test this properly. -// Current problems: -// - all brokers share the same data-dir (set ACL without data dir?) -// - updater's user name not making it through to updatee for ACL checks. -// -// QPID_AUTO_TEST_CASE(testAcl) { -// ofstream policyFile("cluster_test.acl"); -// // FIXME aconway 2009-02-12: guest -> qpidd? -// policyFile << "acl allow guest@QPID all all" << endl -// << "acl allow foo@QPID create queue name=foo" << endl -// << "acl allow bar@QPID create queue name=bar" << endl -// << "acl deny all create queue" << endl -// << "acl allow all all" << endl; -// policyFile.close(); -// ClusterFixture cluster(2,-1, list_of<string> -// ("--data-dir=.") ("--auth=no") -// ("--acl-file=cluster_test.acl") -// ("--cluster-mechanism=PLAIN") -// ("--load-module=../.libs/acl.so")); -// Client c0(cluster[0], "c0"); -// Client c1(cluster[1], "c1"); - -// ConnectionSettings settings; -// settings.port = cluster[0]; -// settings.username = "foo"; -// Client foo(settings, "foo"); - -// foo.session.queueDeclare("foo"); -// BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo"); -// BOOST_CHECK_EQUAL(c1.session.queueQuery("foo").getQueue(), "foo"); - -// BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), int); -// BOOST_CHECK_EQUAL(c0.session.queueQuery("bar").getQueue(), ""); -// BOOST_CHECK_EQUAL(c1.session.queueQuery("bar").getQueue(), ""); -// } + cluster.add(); + Client c2(aclSettings(cluster[2], "c2"), "c2"); + BOOST_CHECK_THROW(foo.session.queueDeclare("bar"), framing::NotAllowedException); + BOOST_CHECK(c2.session.queueQuery("bar").getQueue().empty()); +} +#endif QPID_AUTO_TEST_CASE(testMessageTimeToLive) { // Note: this doesn't actually test for cluster race conditions around TTL, @@ -199,13 +218,23 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) { ClusterFixture cluster(2); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); + c0.session.queueDeclare("p"); c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200)); c0.session.messageTransfer(arg::content=Message("b", "q")); - BOOST_CHECK_EQUAL(browse(c1, "q", 2), list_of<string>("a")("b")); - sys::usleep(300*1000); + c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000)); + c0.session.messageTransfer(arg::content=Message("y", "p")); + cluster.add(); + Client c2(cluster[1], "c2"); + + BOOST_CHECK_EQUAL(browse(c0, "p", 2), list_of<string>("x")("y")); + BOOST_CHECK_EQUAL(browse(c1, "p", 2), list_of<string>("x")("y")); + BOOST_CHECK_EQUAL(browse(c2, "p", 2), list_of<string>("x")("y")); + + sys::usleep(200*1000); BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<string>("b")); BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<string>("b")); + BOOST_CHECK_EQUAL(browse(c2, "q", 1), list_of<string>("b")); } QPID_AUTO_TEST_CASE(testSequenceOptions) { @@ -506,10 +535,11 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=Message("foo","q")); c0.session.messageTransfer(arg::content=Message("bar","q")); + while (c0.session.queueQuery("q").getMessageCount() != 2) sys::usleep(1000); // Wait for message to show up on broker 0. - // Add a new broker, it should catch up. + // Add a new broker, it will catch up. cluster.add(); // Do some work post-add @@ -527,6 +557,7 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "foo"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), ""); BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "bar"); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); @@ -628,4 +659,86 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); } +QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) +{ + struct Sender : FailoverManager::Command + { + std::string queue; + std::string content; + + Sender(const std::string& q, const std::string& c) : queue(q), content(c) {} + + void execute(AsyncSession& session, bool) + { + session.messageTransfer(arg::content=Message(content, queue)); + } + }; + + struct Receiver : FailoverManager::Command, MessageListener, qpid::sys::Runnable + { + FailoverManager& mgr; + std::string queue; + std::string expectedContent; + qpid::client::Subscription subscription; + qpid::sys::Monitor lock; + bool ready; + + Receiver(FailoverManager& m, const std::string& q, const std::string& c) : mgr(m), queue(q), expectedContent(c), ready(false) {} + + void received(Message& message) + { + BOOST_CHECK_EQUAL(expectedContent, message.getData()); + subscription.cancel(); + } + + void execute(AsyncSession& session, bool) + { + session.queueDeclare(arg::queue=queue); + SubscriptionManager subs(session); + subscription = subs.subscribe(*this, queue); + session.sync(); + setReady(); + subs.run(); + //cleanup: + session.queueDelete(arg::queue=queue); + } + + void run() + { + mgr.execute(*this); + } + + void waitForReady() + { + qpid::sys::Monitor::ScopedLock l(lock); + while (!ready) { + lock.wait(); + } + } + + void setReady() + { + qpid::sys::Monitor::ScopedLock l(lock); + ready = true; + lock.notify(); + } + }; + + ClusterFixture cluster(2); + ConnectionSettings settings; + settings.port = cluster[1]; + settings.heartbeat = 1; + FailoverManager fmgr(settings); + Sender sender("my-queue", "my-data"); + Receiver receiver(fmgr, "my-queue", "my-data"); + qpid::sys::Thread runner(receiver); + receiver.waitForReady(); + cluster.kill(1); + //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection: + ::usleep(2*1000*1000); + fmgr.execute(sender); + runner.join(); + fmgr.close(); +} + QPID_AUTO_TEST_SUITE_END() |