summaryrefslogtreecommitdiff
path: root/cpp/tests/EventChannelThreadsTest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/tests/EventChannelThreadsTest.cpp')
-rw-r--r--cpp/tests/EventChannelThreadsTest.cpp247
1 files changed, 247 insertions, 0 deletions
diff --git a/cpp/tests/EventChannelThreadsTest.cpp b/cpp/tests/EventChannelThreadsTest.cpp
new file mode 100644
index 0000000000..285ed29518
--- /dev/null
+++ b/cpp/tests/EventChannelThreadsTest.cpp
@@ -0,0 +1,247 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <boost/bind.hpp>
+
+#include <sys/Socket.h>
+#include <posix/EventChannelThreads.h>
+#include <qpid_test_plugin.h>
+
+
+using namespace std;
+
+using namespace qpid::sys;
+
+const int nConnections = 5;
+const int nMessages = 10; // Messages read/written per connection.
+
+
+// Accepts + reads + writes.
+const int totalEvents = nConnections+2*nConnections*nMessages;
+
+/**
+ * Messages are numbered 0..nMessages.
+ * We count the total number of events, and the
+ * number of reads and writes for each message number.
+ */
+class TestResults : public Monitor {
+ public:
+ TestResults() : isShutdown(false), nEventsRemaining(totalEvents) {}
+
+ void countEvent() {
+ if (--nEventsRemaining == 0)
+ shutdown();
+ }
+
+ void countRead(int messageNo) {
+ ++reads[messageNo];
+ countEvent();
+ }
+
+ void countWrite(int messageNo) {
+ ++writes[messageNo];
+ countEvent();
+ }
+
+ void shutdown(const std::string& exceptionMsg = std::string()) {
+ ScopedLock lock(*this);
+ exception = exceptionMsg;
+ isShutdown = true;
+ notifyAll();
+ }
+
+ void wait() {
+ ScopedLock lock(*this);
+ Time deadline = now() + 10*TIME_SEC;
+ while (!isShutdown) {
+ CPPUNIT_ASSERT(Monitor::wait(deadline));
+ }
+ }
+
+ bool isShutdown;
+ std::string exception;
+ AtomicCount reads[nMessages];
+ AtomicCount writes[nMessages];
+ AtomicCount nEventsRemaining;
+};
+
+TestResults results;
+
+EventChannelThreads::shared_ptr threads;
+
+// Functor to wrap callbacks in try/catch.
+class SafeCallback {
+ public:
+ SafeCallback(Runnable& r) : callback(r.functor()) {}
+ SafeCallback(Event::Callback cb) : callback(cb) {}
+
+ void operator()() {
+ std::string exception;
+ try {
+ callback();
+ return;
+ }
+ catch (const std::exception& e) {
+ exception = e.what();
+ }
+ catch (...) {
+ exception = "Unknown exception.";
+ }
+ results.shutdown(exception);
+ }
+
+ private:
+ Event::Callback callback;
+};
+
+/** Repost an event N times. */
+class Repost {
+ public:
+ Repost(int n) : count (n) {}
+ virtual ~Repost() {}
+
+ void repost(Event* event) {
+ if (--count==0) {
+ delete event;
+ } else {
+ threads->postEvent(event);
+ }
+ }
+ private:
+ int count;
+};
+
+
+
+/** Repeating read event. */
+class TestReadEvent : public ReadEvent, public Runnable, private Repost {
+ public:
+ explicit TestReadEvent(int fd=-1) :
+ ReadEvent(fd, &value, sizeof(value), SafeCallback(*this)),
+ Repost(nMessages)
+ {}
+
+ void run() {
+ CPPUNIT_ASSERT_EQUAL(sizeof(value), getSize());
+ CPPUNIT_ASSERT(0 <= value);
+ CPPUNIT_ASSERT(value < nMessages);
+ results.countRead(value);
+ repost(this);
+ }
+
+ private:
+ int value;
+ ReadEvent original;
+};
+
+
+/** Fire and forget write event */
+class TestWriteEvent : public WriteEvent, public Runnable, private Repost {
+ public:
+ TestWriteEvent(int fd=-1) :
+ WriteEvent(fd, &value, sizeof(value), SafeCallback(*this)),
+ Repost(nMessages),
+ value(0)
+ {}
+
+ void run() {
+ CPPUNIT_ASSERT_EQUAL(sizeof(int), getSize());
+ results.countWrite(value++);
+ repost(this);
+ }
+
+ private:
+ int value;
+};
+
+/** Fire-and-forget Accept event, posts reads on the accepted connection. */
+class TestAcceptEvent : public AcceptEvent, public Runnable, private Repost {
+ public:
+ TestAcceptEvent(int fd=-1) :
+ AcceptEvent(fd, SafeCallback(*this)),
+ Repost(nConnections)
+ {}
+
+ void run() {
+ threads->postEvent(new TestReadEvent(getAcceptedDesscriptor()));
+ results.countEvent();
+ repost(this);
+ }
+};
+
+class EventChannelThreadsTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(EventChannelThreadsTest);
+ CPPUNIT_TEST(testThreads);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ void setUp() {
+ threads = EventChannelThreads::create(EventChannel::create());
+ }
+
+ void tearDown() {
+ threads.reset();
+ }
+
+ void testThreads()
+ {
+ Socket listener = Socket::createTcp();
+ int port = listener.listen();
+
+ // Post looping accept events, will repost nConnections times.
+ // The accept event will automatically post read events.
+ threads->postEvent(new TestAcceptEvent(listener.fd()));
+
+ // Make connections.
+ Socket connections[nConnections];
+ for (int i = 0; i < nConnections; ++i) {
+ connections[i] = Socket::createTcp();
+ connections[i].connect("localhost", port);
+ }
+
+ // Post looping write events.
+ for (int i = 0; i < nConnections; ++i) {
+ threads->postEvent(new TestWriteEvent(connections[i].fd()));
+ }
+
+ // Wait for all events to be dispatched.
+ results.wait();
+
+ if (!results.exception.empty()) CPPUNIT_FAIL(results.exception);
+ CPPUNIT_ASSERT_EQUAL(0, int(results.nEventsRemaining));
+
+ // Expect a read and write for each messageNo from each connection.
+ for (int messageNo = 0; messageNo < nMessages; ++messageNo) {
+ CPPUNIT_ASSERT_EQUAL(nConnections, int(results.reads[messageNo]));
+ CPPUNIT_ASSERT_EQUAL(nConnections, int(results.writes[messageNo]));
+ }
+
+ threads->shutdown();
+ threads->join();
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelThreadsTest);
+