summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/QueueEvents.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-01-15 11:29:38 +0000
committerGordon Sim <gsim@apache.org>2009-01-15 11:29:38 +0000
commit62371078f1f853a05d0ff2e31190baf018e4afc9 (patch)
tree1dcdb048c61b02aa8b88156782956d220e7d9ba7 /qpid/cpp/src/tests/QueueEvents.cpp
parent0e301ae9e147701bc0c0e522a02ddb41683ff607 (diff)
downloadqpid-python-62371078f1f853a05d0ff2e31190baf018e4afc9.tar.gz
QPID-1567: Initial support for asynchronous queue state replication
* Added QueueEvents class with per broker instance * Modified qpid::broker::Queue to notify QueueEvents of enqueues and dequeues (based on configuration) * Added replication subdir containing two plugins: - an event listener that registers with QueueEvents and creates messages representing received events on a replication queue - a custom exchange type for processing messages of the format created by the listener plugin * Added new option for controlling event generation to qpid::client::QueueOptions * Added new queue option to qpid-config script for the same git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@734674 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/QueueEvents.cpp')
-rw-r--r--qpid/cpp/src/tests/QueueEvents.cpp233
1 files changed, 233 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/QueueEvents.cpp b/qpid/cpp/src/tests/QueueEvents.cpp
new file mode 100644
index 0000000000..7aea23922d
--- /dev/null
+++ b/qpid/cpp/src/tests/QueueEvents.cpp
@@ -0,0 +1,233 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageUtils.h"
+#include "unit_test.h"
+#include "BrokerFixture.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueEvents.h"
+#include "qpid/client/QueueOptions.h"
+#include "qpid/framing/SequenceNumber.h"
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+
+QPID_AUTO_TEST_SUITE(QueueEventsSuite)
+
+using namespace qpid::client;
+using namespace qpid::broker;
+using namespace qpid::sys;
+using qpid::framing::SequenceNumber;
+
+struct DummyListener
+{
+ typedef std::deque<QueueEvents::Event> Events;
+
+ Events events;
+ boost::shared_ptr<Poller> poller;
+
+ void handle(QueueEvents::Event e)
+ {
+ if (events.empty()) {
+ BOOST_FAIL("Unexpected event received");
+ } else {
+ BOOST_CHECK_EQUAL(events.front().type, e.type);
+ BOOST_CHECK_EQUAL(events.front().msg.queue, e.msg.queue);
+ BOOST_CHECK_EQUAL(events.front().msg.payload, e.msg.payload);
+ BOOST_CHECK_EQUAL(events.front().msg.position, e.msg.position);
+ events.pop_front();
+ }
+ if (events.empty() && poller) poller->shutdown();
+ }
+
+ void expect(QueueEvents::Event e)
+ {
+ events.push_back(e);
+ }
+};
+
+QPID_AUTO_TEST_CASE(testBasicEventProcessing)
+{
+ boost::shared_ptr<Poller> poller(new Poller());
+ sys::Dispatcher dispatcher(poller);
+ Thread dispatchThread(dispatcher);
+ QueueEvents events(poller);
+ DummyListener listener;
+ listener.poller = poller;
+ events.registerListener("dummy", boost::bind(&DummyListener::handle, &listener, _1));
+ //signal occurence of some events:
+ Queue queue("queue1");
+ SequenceNumber id;
+ QueuedMessage event1(&queue, MessageUtils::createMessage(), id);
+ QueuedMessage event2(&queue, MessageUtils::createMessage(), ++id);
+
+ events.enqueued(event1);
+ events.enqueued(event2);
+ events.dequeued(event1);
+ //define events expected by listener:
+ listener.expect(QueueEvents::Event(QueueEvents::ENQUEUE, event1));
+ listener.expect(QueueEvents::Event(QueueEvents::ENQUEUE, event2));
+ listener.expect(QueueEvents::Event(QueueEvents::DEQUEUE, event1));
+
+ dispatchThread.join();
+ events.shutdown();
+ events.unregisterListener("dummy");
+}
+
+
+struct EventRecorder
+{
+ struct EventRecord
+ {
+ QueueEvents::EventType type;
+ std::string queue;
+ std::string content;
+ SequenceNumber position;
+ };
+
+ typedef std::deque<EventRecord> Events;
+
+ Events events;
+
+ void handle(QueueEvents::Event event)
+ {
+ EventRecord record;
+ record.type = event.type;
+ record.queue = event.msg.queue->getName();
+ event.msg.payload->getFrames().getContent(record.content);
+ record.position = event.msg.position;
+ events.push_back(record);
+ }
+
+ void check(QueueEvents::EventType type, const std::string& queue, const std::string& content, const SequenceNumber& position)
+ {
+ if (events.empty()) {
+ BOOST_FAIL("Missed event");
+ } else {
+ BOOST_CHECK_EQUAL(events.front().type, type);
+ BOOST_CHECK_EQUAL(events.front().queue, queue);
+ BOOST_CHECK_EQUAL(events.front().content, content);
+ BOOST_CHECK_EQUAL(events.front().position, position);
+ events.pop_front();
+ }
+ }
+ void checkEnqueue(const std::string& queue, const std::string& data, const SequenceNumber& position)
+ {
+ check(QueueEvents::ENQUEUE, queue, data, position);
+ }
+
+ void checkDequeue(const std::string& queue, const std::string& data, const SequenceNumber& position)
+ {
+ check(QueueEvents::DEQUEUE, queue, data, position);
+ }
+};
+
+QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing)
+{
+ ProxySessionFixture fixture;
+ //register dummy event listener to broker
+ EventRecorder listener;
+ fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1));
+
+ //declare queue with event options specified
+ QueueOptions options;
+ options.enableQueueEvents(false);
+ std::string q("queue-events-test");
+ fixture.session.queueDeclare(arg::queue=q, arg::arguments=options);
+ //send and consume some messages
+ LocalQueue incoming;
+ Subscription sub = fixture.subs.subscribe(incoming, q);
+ for (int i = 0; i < 5; i++) {
+ fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+ }
+ for (int i = 0; i < 3; i++) {
+ BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
+ }
+ for (int i = 5; i < 10; i++) {
+ fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+ }
+ for (int i = 3; i < 10; i++) {
+ BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
+ }
+ fixture.connection.close();
+ fixture.shutdownBroker();
+
+ //check listener was notified of all events, and in correct order
+ SequenceNumber enqueueId(1);
+ SequenceNumber dequeueId(1);
+ for (int i = 0; i < 5; i++) {
+ listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++);
+ }
+ for (int i = 0; i < 3; i++) {
+ listener.checkDequeue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), dequeueId++);
+ }
+ for (int i = 5; i < 10; i++) {
+ listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++);
+ }
+ for (int i = 3; i < 10; i++) {
+ listener.checkDequeue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), dequeueId++);
+ }
+}
+
+QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing_enqueuesOnly)
+{
+ ProxySessionFixture fixture;
+ //register dummy event listener to broker
+ EventRecorder listener;
+ fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1));
+
+ //declare queue with event options specified
+ QueueOptions options;
+ options.enableQueueEvents(true);
+ std::string q("queue-events-test");
+ fixture.session.queueDeclare(arg::queue=q, arg::arguments=options);
+ //send and consume some messages
+ LocalQueue incoming;
+ Subscription sub = fixture.subs.subscribe(incoming, q);
+ for (int i = 0; i < 5; i++) {
+ fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+ }
+ for (int i = 0; i < 3; i++) {
+ BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
+ }
+ for (int i = 5; i < 10; i++) {
+ fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+ }
+ for (int i = 3; i < 10; i++) {
+ BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
+ }
+ fixture.connection.close();
+ fixture.shutdownBroker();
+
+ //check listener was notified of all events, and in correct order
+ SequenceNumber enqueueId(1);
+ SequenceNumber dequeueId(1);
+ for (int i = 0; i < 5; i++) {
+ listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++);
+ }
+ for (int i = 5; i < 10; i++) {
+ listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++);
+ }
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+