summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp85
1 files changed, 85 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index c880f30e6b..6702686c2a 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -27,6 +27,7 @@
#include "qpid/client/ConnectionAccess.h"
#include "qpid/client/Session.h"
#include "qpid/client/FailoverListener.h"
+#include "qpid/client/FailoverManager.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/UpdateClient.h"
@@ -35,6 +36,8 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/enum.h"
#include "qpid/log/Logger.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Thread.h"
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
@@ -628,4 +631,86 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
}
+QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
+{
+ struct Sender : FailoverManager::Command
+ {
+ std::string queue;
+ std::string content;
+
+ Sender(const std::string& q, const std::string& c) : queue(q), content(c) {}
+
+ void execute(AsyncSession& session, bool)
+ {
+ session.messageTransfer(arg::content=Message(content, queue));
+ }
+ };
+
+ struct Receiver : FailoverManager::Command, MessageListener, qpid::sys::Runnable
+ {
+ FailoverManager& mgr;
+ std::string queue;
+ std::string expectedContent;
+ qpid::client::Subscription subscription;
+ qpid::sys::Monitor lock;
+ bool ready;
+
+ Receiver(FailoverManager& m, const std::string& q, const std::string& c) : mgr(m), queue(q), expectedContent(c), ready(false) {}
+
+ void received(Message& message)
+ {
+ BOOST_CHECK_EQUAL(expectedContent, message.getData());
+ subscription.cancel();
+ }
+
+ void execute(AsyncSession& session, bool)
+ {
+ session.queueDeclare(arg::queue=queue);
+ SubscriptionManager subs(session);
+ subscription = subs.subscribe(*this, queue);
+ session.sync();
+ setReady();
+ subs.run();
+ //cleanup:
+ session.queueDelete(arg::queue=queue);
+ }
+
+ void run()
+ {
+ mgr.execute(*this);
+ }
+
+ void waitForReady()
+ {
+ qpid::sys::Monitor::ScopedLock l(lock);
+ while (!ready) {
+ lock.wait();
+ }
+ }
+
+ void setReady()
+ {
+ qpid::sys::Monitor::ScopedLock l(lock);
+ ready = true;
+ lock.notify();
+ }
+ };
+
+ ClusterFixture cluster(2);
+ ConnectionSettings settings;
+ settings.port = cluster[1];
+ settings.heartbeat = 1;
+ FailoverManager fmgr(settings);
+ Sender sender("my-queue", "my-data");
+ Receiver receiver(fmgr, "my-queue", "my-data");
+ qpid::sys::Thread runner(receiver);
+ receiver.waitForReady();
+ cluster.kill(1);
+ //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection:
+ ::usleep(2*1000*1000);
+ fmgr.execute(sender);
+ runner.join();
+ fmgr.close();
+}
+
QPID_AUTO_TEST_SUITE_END()