summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/BrokerFixture.h8
-rw-r--r--cpp/src/tests/Makefile.am6
-rw-r--r--cpp/src/tests/QueueEvents.cpp233
-rwxr-xr-xcpp/src/tests/replication_test98
4 files changed, 342 insertions, 3 deletions
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h
index 2a4faa2fd4..205b4d90ef 100644
--- a/cpp/src/tests/BrokerFixture.h
+++ b/cpp/src/tests/BrokerFixture.h
@@ -66,8 +66,14 @@ struct BrokerFixture : private boost::noncopyable {
brokerThread = qpid::sys::Thread(*broker);
};
- ~BrokerFixture() {
+ void shutdownBroker()
+ {
broker->shutdown();
+ broker = BrokerPtr();
+ }
+
+ ~BrokerFixture() {
+ if (broker) broker->shutdown();
brokerThread.join();
}
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 8cc72da96b..314b90ba8b 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -88,7 +88,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
ForkedBroker.h \
ManagementTest.cpp \
MessageReplayTracker.cpp \
- ConsoleTest.cpp
+ ConsoleTest.cpp \
+ QueueEvents.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
@@ -198,7 +199,7 @@ DispatcherTest_LDADD=$(lib_common)
TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test
system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest
-TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests
+TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test
EXTRA_DIST += \
run_test vg_check \
@@ -219,6 +220,7 @@ EXTRA_DIST += \
MessageUtils.h \
TestMessageStore.h \
TxMocks.h \
+ replication_test \
start_cluster stop_cluster restart_cluster
check_LTLIBRARIES += libdlclose_noop.la
diff --git a/cpp/src/tests/QueueEvents.cpp b/cpp/src/tests/QueueEvents.cpp
new file mode 100644
index 0000000000..7aea23922d
--- /dev/null
+++ b/cpp/src/tests/QueueEvents.cpp
@@ -0,0 +1,233 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageUtils.h"
+#include "unit_test.h"
+#include "BrokerFixture.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueEvents.h"
+#include "qpid/client/QueueOptions.h"
+#include "qpid/framing/SequenceNumber.h"
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+
+QPID_AUTO_TEST_SUITE(QueueEventsSuite)
+
+using namespace qpid::client;
+using namespace qpid::broker;
+using namespace qpid::sys;
+using qpid::framing::SequenceNumber;
+
+struct DummyListener
+{
+ typedef std::deque<QueueEvents::Event> Events;
+
+ Events events;
+ boost::shared_ptr<Poller> poller;
+
+ void handle(QueueEvents::Event e)
+ {
+ if (events.empty()) {
+ BOOST_FAIL("Unexpected event received");
+ } else {
+ BOOST_CHECK_EQUAL(events.front().type, e.type);
+ BOOST_CHECK_EQUAL(events.front().msg.queue, e.msg.queue);
+ BOOST_CHECK_EQUAL(events.front().msg.payload, e.msg.payload);
+ BOOST_CHECK_EQUAL(events.front().msg.position, e.msg.position);
+ events.pop_front();
+ }
+ if (events.empty() && poller) poller->shutdown();
+ }
+
+ void expect(QueueEvents::Event e)
+ {
+ events.push_back(e);
+ }
+};
+
+QPID_AUTO_TEST_CASE(testBasicEventProcessing)
+{
+ boost::shared_ptr<Poller> poller(new Poller());
+ sys::Dispatcher dispatcher(poller);
+ Thread dispatchThread(dispatcher);
+ QueueEvents events(poller);
+ DummyListener listener;
+ listener.poller = poller;
+ events.registerListener("dummy", boost::bind(&DummyListener::handle, &listener, _1));
+ //signal occurence of some events:
+ Queue queue("queue1");
+ SequenceNumber id;
+ QueuedMessage event1(&queue, MessageUtils::createMessage(), id);
+ QueuedMessage event2(&queue, MessageUtils::createMessage(), ++id);
+
+ events.enqueued(event1);
+ events.enqueued(event2);
+ events.dequeued(event1);
+ //define events expected by listener:
+ listener.expect(QueueEvents::Event(QueueEvents::ENQUEUE, event1));
+ listener.expect(QueueEvents::Event(QueueEvents::ENQUEUE, event2));
+ listener.expect(QueueEvents::Event(QueueEvents::DEQUEUE, event1));
+
+ dispatchThread.join();
+ events.shutdown();
+ events.unregisterListener("dummy");
+}
+
+
+struct EventRecorder
+{
+ struct EventRecord
+ {
+ QueueEvents::EventType type;
+ std::string queue;
+ std::string content;
+ SequenceNumber position;
+ };
+
+ typedef std::deque<EventRecord> Events;
+
+ Events events;
+
+ void handle(QueueEvents::Event event)
+ {
+ EventRecord record;
+ record.type = event.type;
+ record.queue = event.msg.queue->getName();
+ event.msg.payload->getFrames().getContent(record.content);
+ record.position = event.msg.position;
+ events.push_back(record);
+ }
+
+ void check(QueueEvents::EventType type, const std::string& queue, const std::string& content, const SequenceNumber& position)
+ {
+ if (events.empty()) {
+ BOOST_FAIL("Missed event");
+ } else {
+ BOOST_CHECK_EQUAL(events.front().type, type);
+ BOOST_CHECK_EQUAL(events.front().queue, queue);
+ BOOST_CHECK_EQUAL(events.front().content, content);
+ BOOST_CHECK_EQUAL(events.front().position, position);
+ events.pop_front();
+ }
+ }
+ void checkEnqueue(const std::string& queue, const std::string& data, const SequenceNumber& position)
+ {
+ check(QueueEvents::ENQUEUE, queue, data, position);
+ }
+
+ void checkDequeue(const std::string& queue, const std::string& data, const SequenceNumber& position)
+ {
+ check(QueueEvents::DEQUEUE, queue, data, position);
+ }
+};
+
+QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing)
+{
+ ProxySessionFixture fixture;
+ //register dummy event listener to broker
+ EventRecorder listener;
+ fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1));
+
+ //declare queue with event options specified
+ QueueOptions options;
+ options.enableQueueEvents(false);
+ std::string q("queue-events-test");
+ fixture.session.queueDeclare(arg::queue=q, arg::arguments=options);
+ //send and consume some messages
+ LocalQueue incoming;
+ Subscription sub = fixture.subs.subscribe(incoming, q);
+ for (int i = 0; i < 5; i++) {
+ fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+ }
+ for (int i = 0; i < 3; i++) {
+ BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
+ }
+ for (int i = 5; i < 10; i++) {
+ fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+ }
+ for (int i = 3; i < 10; i++) {
+ BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
+ }
+ fixture.connection.close();
+ fixture.shutdownBroker();
+
+ //check listener was notified of all events, and in correct order
+ SequenceNumber enqueueId(1);
+ SequenceNumber dequeueId(1);
+ for (int i = 0; i < 5; i++) {
+ listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++);
+ }
+ for (int i = 0; i < 3; i++) {
+ listener.checkDequeue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), dequeueId++);
+ }
+ for (int i = 5; i < 10; i++) {
+ listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++);
+ }
+ for (int i = 3; i < 10; i++) {
+ listener.checkDequeue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), dequeueId++);
+ }
+}
+
+QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing_enqueuesOnly)
+{
+ ProxySessionFixture fixture;
+ //register dummy event listener to broker
+ EventRecorder listener;
+ fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1));
+
+ //declare queue with event options specified
+ QueueOptions options;
+ options.enableQueueEvents(true);
+ std::string q("queue-events-test");
+ fixture.session.queueDeclare(arg::queue=q, arg::arguments=options);
+ //send and consume some messages
+ LocalQueue incoming;
+ Subscription sub = fixture.subs.subscribe(incoming, q);
+ for (int i = 0; i < 5; i++) {
+ fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+ }
+ for (int i = 0; i < 3; i++) {
+ BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
+ }
+ for (int i = 5; i < 10; i++) {
+ fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+ }
+ for (int i = 3; i < 10; i++) {
+ BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
+ }
+ fixture.connection.close();
+ fixture.shutdownBroker();
+
+ //check listener was notified of all events, and in correct order
+ SequenceNumber enqueueId(1);
+ SequenceNumber dequeueId(1);
+ for (int i = 0; i < 5; i++) {
+ listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++);
+ }
+ for (int i = 5; i < 10; i++) {
+ listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++);
+ }
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+
diff --git a/cpp/src/tests/replication_test b/cpp/src/tests/replication_test
new file mode 100755
index 0000000000..6bf8041343
--- /dev/null
+++ b/cpp/src/tests/replication_test
@@ -0,0 +1,98 @@
+#!/bin/sh
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Run the federation tests.
+MY_DIR=`dirname \`which $0\``
+PYTHON_DIR=${MY_DIR}/../../../python
+
+trap stop_brokers INT TERM QUIT
+
+stop_brokers() {
+ if [[ $BROKER_A ]] ; then
+ ../qpidd -q --port $BROKER_A
+ unset BROKER_A
+ fi
+ if [[ $BROKER_B ]] ; then
+ ../qpidd -q --port $BROKER_B
+ unset BROKER_B
+ fi
+}
+
+if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e ../.libs/replication_exchange.so ; then
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true > qpidd.port
+ BROKER_A=`cat qpidd.port`
+
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so > qpidd.port
+ BROKER_B=`cat qpidd.port`
+
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication
+ $PYTHON_DIR/commands/qpid-route queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
+
+ #create test queues
+
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-a --generate-queue-events 2
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-b --generate-queue-events 2
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-c --generate-queue-events 1
+
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-a
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-b
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-c
+
+ #publish and consume from test queus on broker A:
+ rm -f queue-*.repl
+ for i in `seq 1 10`; do echo Message $i for A >> queue-a-input.repl; done
+ for i in `seq 1 20`; do echo Message $i for B >> queue-b-input.repl; done
+ for i in `seq 1 15`; do echo Message $i for C >> queue-c-input.repl; done
+
+ ./sender --port $BROKER_A --routing-key queue-a --send-eos 1 < queue-a-input.repl
+ ./sender --port $BROKER_A --routing-key queue-b --send-eos 1 < queue-b-input.repl
+ ./sender --port $BROKER_A --routing-key queue-c --send-eos 1 < queue-c-input.repl
+
+ ./receiver --port $BROKER_A --queue queue-a --messages 5 > /dev/null
+ ./receiver --port $BROKER_A --queue queue-b --messages 10 > /dev/null
+ ./receiver --port $BROKER_A --queue queue-c --messages 10 > /dev/null
+
+ #shutdown broker A then check that broker Bs versions of the queues are as expected
+ ../qpidd -q --port $BROKER_A
+ unset BROKER_A
+
+ #validate replicated queues:
+ ./receiver --port $BROKER_B --queue queue-a > queue-a-backup.repl
+ ./receiver --port $BROKER_B --queue queue-b > queue-b-backup.repl
+ ./receiver --port $BROKER_B --queue queue-c > queue-c-backup.repl
+ tail -5 queue-a-input.repl > queue-a-expected.repl
+ tail -10 queue-b-input.repl > queue-b-expected.repl
+ diff queue-a-backup.repl queue-a-expected.repl || FAIL=1
+ diff queue-b-backup.repl queue-b-expected.repl || FAIL=1
+ diff queue-c-backup.repl queue-c-input.repl || FAIL=1
+
+ if [[ $FAIL ]]; then
+ echo replication test failed: expectations not met!
+ else
+ echo queue state replicated as expected
+ rm queue-*.repl
+ fi
+
+ stop_brokers
+else
+ echo "Skipping replication test, plugins not built or python utils not located"
+fi
+