summaryrefslogtreecommitdiff
path: root/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/cluster_test.cpp')
-rw-r--r--cpp/src/tests/cluster_test.cpp54
1 files changed, 54 insertions, 0 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 99ca5c7161..72440bbe88 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -31,6 +31,7 @@
#include "qpid/framing/AMQBody.h"
#include "qpid/framing/Uuid.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/enum.h"
#include "qpid/log/Logger.h"
#include <boost/bind.hpp>
@@ -201,6 +202,59 @@ template <class T> std::set<uint16_t> knownBrokerPorts(T& source, int n=-1) {
return s;
}
+class Sender {
+ public:
+ Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {}
+ void send(const AMQBody& body, bool firstSeg, bool lastSeg, bool firstFrame, bool lastFrame) {
+ AMQFrame f(body);
+ f.setChannel(channel);
+ f.setFirstSegment(firstSeg);
+ f.setLastSegment(lastSeg);
+ f.setFirstFrame(firstFrame);
+ f.setLastFrame(lastFrame);
+ connection->handle(f);
+ }
+
+ private:
+ boost::shared_ptr<ConnectionImpl> connection;
+ uint16_t channel;
+};
+
+QPID_AUTO_TEST_CASE(testDumpMessageBuilder) {
+ // Verify that we dump a partially recieved message to a new member.
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare("q");
+ Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel());
+
+ // Send first 2 frames of message.
+ MessageTransferBody transfer(
+ ProtocolVersion(), std::string(), // default exchange.
+ framing::message::ACCEPT_MODE_NONE,
+ framing::message::ACQUIRE_MODE_PRE_ACQUIRED);
+ sender.send(transfer, true, false, true, true);
+ AMQHeaderBody header;
+ header.get<DeliveryProperties>(true)->setRoutingKey("q");
+ sender.send(header, false, false, true, true);
+
+ // No reliable way to ensure the partial message has arrived
+ // before we start the new broker, so we sleep.
+ ::usleep(250);
+ cluster.add();
+
+ // Send final 2 frames of message.
+ sender.send(AMQContentBody("ab"), false, true, true, false);
+ sender.send(AMQContentBody("cd"), false, true, false, true);
+
+ // Verify message is enqued correctly on second member.
+ Message m;
+ Client c1(cluster[1], "c1");
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "abcd");
+
+ BOOST_CHECK_EQUAL(2u, getGlobalCluster().getUrls().size());
+}
+
QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");