summaryrefslogtreecommitdiff
path: root/cpp/test
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-11-29 14:36:08 +0000
committerAlan Conway <aconway@apache.org>2006-11-29 14:36:08 +0000
commitb13e1a24fcca8797b7be5a242f164afbe17ec4f6 (patch)
treeef0362e52c125bc75b07ef3e374dabfa52254e98 /cpp/test
parent16d818e749462daf5e0e43079b2e48991646c619 (diff)
downloadqpid-python-b13e1a24fcca8797b7be5a242f164afbe17ec4f6.tar.gz
Posix EventChannel implementation using epoll. Placeholder for kevents.
Dynamic thread pool EventChannelThreads to serve EventChannel. Misc cleanup/enhancements. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480582 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/test')
-rw-r--r--cpp/test/client/client_test.cpp4
-rw-r--r--cpp/test/client/echo_service.cpp4
-rw-r--r--cpp/test/client/topic_listener.cpp13
-rw-r--r--cpp/test/client/topic_publisher.cpp12
-rw-r--r--cpp/test/unit/qpid/ExceptionTest.cpp61
-rw-r--r--cpp/test/unit/qpid/posix/EventChannelTest.cpp187
-rw-r--r--cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp247
7 files changed, 514 insertions, 14 deletions
diff --git a/cpp/test/client/client_test.cpp b/cpp/test/client/client_test.cpp
index 18b162ec8a..0a3c300f4a 100644
--- a/cpp/test/client/client_test.cpp
+++ b/cpp/test/client/client_test.cpp
@@ -90,7 +90,9 @@ int main(int argc, char**)
con.close();
std::cout << "Closed connection." << std::endl;
}catch(qpid::QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ std::cout << "Error [" << error.code << "] " << error.msg << " ("
+ << error.location.file << ":" << error.location.line
+ << ")" << std::endl;
return 1;
}
return 0;
diff --git a/cpp/test/client/echo_service.cpp b/cpp/test/client/echo_service.cpp
index f0aa49fd4b..3df3da0b86 100644
--- a/cpp/test/client/echo_service.cpp
+++ b/cpp/test/client/echo_service.cpp
@@ -107,7 +107,7 @@ int main(int argc, char** argv){
connection.close();
} catch(qpid::QpidError error) {
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ std::cout << error.what() << std::endl;
}
} else {
try {
@@ -133,7 +133,7 @@ int main(int argc, char** argv){
connection.close();
} catch(qpid::QpidError error) {
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ std::cout << error.what() << std::endl;
}
}
}
diff --git a/cpp/test/client/topic_listener.cpp b/cpp/test/client/topic_listener.cpp
index 413d482361..bd7cfdc62c 100644
--- a/cpp/test/client/topic_listener.cpp
+++ b/cpp/test/client/topic_listener.cpp
@@ -38,7 +38,7 @@ class Listener : public MessageListener{
const bool transactional;
bool init;
int count;
- int64_t start;
+ Time start;
void shutdown();
void report();
@@ -96,7 +96,7 @@ int main(int argc, char** argv){
channel.run();
connection.close();
}catch(qpid::QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ std::cout << error.what() << std::endl;
}
}
}
@@ -106,7 +106,7 @@ Listener::Listener(Channel* _channel, const std::string& _responseq, bool tx) :
void Listener::received(Message& message){
if(!init){
- start = Time::now().msecs();
+ start = now();
count = 0;
init = true;
}
@@ -128,10 +128,11 @@ void Listener::shutdown(){
}
void Listener::report(){
- int64_t finish = Time::now().msecs();
- int64_t time = finish - start;
+ Time finish = now();
+ Time time = finish - start;
std::stringstream reportstr;
- reportstr << "Received " << count << " messages in " << time << " ms.";
+ reportstr << "Received " << count << " messages in "
+ << time/TIME_MSEC << " ms.";
Message msg;
msg.setData(reportstr.str());
channel->publish(msg, string(), responseQueue);
diff --git a/cpp/test/client/topic_publisher.cpp b/cpp/test/client/topic_publisher.cpp
index d9f271e2f0..97d589c1d1 100644
--- a/cpp/test/client/topic_publisher.cpp
+++ b/cpp/test/client/topic_publisher.cpp
@@ -114,11 +114,13 @@ int main(int argc, char** argv){
int64_t sum(0);
for(int i = 0; i < batchSize; i++){
if(i > 0 && args.getDelay()) sleep(args.getDelay());
- int64_t time = publisher.publish(args.getMessages(), args.getSubscribers(), args.getSize());
+ Time time = publisher.publish(
+ args.getMessages(), args.getSubscribers(), args.getSize());
if(!max || time > max) max = time;
if(!min || time < min) min = time;
sum += time;
- std::cout << "Completed " << (i+1) << " of " << batchSize << " in " << time << "ms" << std::endl;
+ std::cout << "Completed " << (i+1) << " of " << batchSize
+ << " in " << time/TIME_MSEC << "ms" << std::endl;
}
publisher.terminate();
int64_t avg = sum / batchSize;
@@ -129,7 +131,7 @@ int main(int argc, char** argv){
channel.close();
connection.close();
}catch(qpid::QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ std::cout << error.what() << std::endl;
}
}
}
@@ -153,7 +155,7 @@ void Publisher::waitForCompletion(int msgs){
int64_t Publisher::publish(int msgs, int listeners, int size){
Message msg;
msg.setData(generateData(size));
- int64_t start = Time::now().msecs();
+ Time start = now();
{
Monitor::ScopedLock l(monitor);
for(int i = 0; i < msgs; i++){
@@ -170,7 +172,7 @@ int64_t Publisher::publish(int msgs, int listeners, int size){
waitForCompletion(listeners);
}
- int64_t finish(Time::now().msecs());
+ Time finish = now();
return finish - start;
}
diff --git a/cpp/test/unit/qpid/ExceptionTest.cpp b/cpp/test/unit/qpid/ExceptionTest.cpp
new file mode 100644
index 0000000000..7c3261dc29
--- /dev/null
+++ b/cpp/test/unit/qpid/ExceptionTest.cpp
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <qpid/Exception.h>
+#include <qpid_test_plugin.h>
+
+using namespace qpid;
+
+struct CountDestroyedException : public Exception {
+ int& count;
+ static int staticCount;
+ CountDestroyedException() : count(staticCount) { }
+ CountDestroyedException(int& n) : count(n) {}
+ ~CountDestroyedException() throw() { count++; }
+ void throwSelf() const { throw *this; }
+};
+
+int CountDestroyedException::staticCount = 0;
+
+
+class ExceptionTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(ExceptionTest);
+ CPPUNIT_TEST(testHeapException);
+ CPPUNIT_TEST_SUITE_END();
+ public:
+ // Verify proper memory management for heap-allocated exceptions.
+ void testHeapException() {
+ int count = 0;
+ try {
+ std::auto_ptr<Exception> p(
+ new CountDestroyedException(count));
+ p.release()->throwSelf();
+ CPPUNIT_FAIL("Expected CountDestroyedException.");
+ } catch (const CountDestroyedException& e) {
+ CPPUNIT_ASSERT(&e.count == &count);
+ }
+ CPPUNIT_ASSERT_EQUAL(1, count);
+ }
+};
+
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(ExceptionTest);
+
diff --git a/cpp/test/unit/qpid/posix/EventChannelTest.cpp b/cpp/test/unit/qpid/posix/EventChannelTest.cpp
new file mode 100644
index 0000000000..8846a0e340
--- /dev/null
+++ b/cpp/test/unit/qpid/posix/EventChannelTest.cpp
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/posix/EventChannel.h>
+#include <qpid/posix/check.h>
+#include <qpid/sys/Runnable.h>
+#include <qpid/sys/Socket.h>
+#include <qpid/sys/Thread.h>
+#include <qpid_test_plugin.h>
+
+#include <sys/socket.h>
+#include <signal.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <iostream>
+
+using namespace qpid::sys;
+
+
+const char hello[] = "hello";
+const size_t size = sizeof(hello);
+
+struct RunMe : public Runnable
+{
+ bool ran;
+ RunMe() : ran(false) {}
+ void run() { ran = true; }
+};
+
+class EventChannelTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(EventChannelTest);
+ CPPUNIT_TEST(testEvent);
+ CPPUNIT_TEST(testRead);
+ CPPUNIT_TEST(testFailedRead);
+ CPPUNIT_TEST(testWrite);
+ CPPUNIT_TEST(testFailedWrite);
+ CPPUNIT_TEST(testReadWrite);
+ CPPUNIT_TEST(testAccept);
+ CPPUNIT_TEST_SUITE_END();
+
+ private:
+ EventChannel::shared_ptr ec;
+ int pipe[2];
+ char readBuf[size];
+
+ public:
+
+ void setUp()
+ {
+ memset(readBuf, size, 0);
+ ec = EventChannel::create();
+ if (::pipe(pipe) != 0) throw QPID_POSIX_ERROR(errno);
+ // Ignore SIGPIPE, otherwise we will crash writing to broken pipe.
+ signal(SIGPIPE, SIG_IGN);
+ }
+
+ // Verify that calling getEvent returns event.
+ template <class T> bool isNextEvent(T& event)
+ {
+ return &event == dynamic_cast<T*>(ec->getEvent());
+ }
+
+ template <class T> bool isNextEventOk(T& event)
+ {
+ Event* next = ec->getEvent();
+ if (next) next->throwIfError();
+ return &event == next;
+ }
+
+ void testEvent()
+ {
+ RunMe runMe;
+ CPPUNIT_ASSERT(!runMe.ran);
+ // Instances of Event just pass thru the channel immediately.
+ Event e(runMe.functor());
+ ec->postEvent(e);
+ CPPUNIT_ASSERT(isNextEventOk(e));
+ e.dispatch();
+ CPPUNIT_ASSERT(runMe.ran);
+ }
+
+ void testRead() {
+ ReadEvent re(pipe[0], readBuf, size);
+ ec->postEvent(re);
+ CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::write(pipe[1], hello, size));
+ CPPUNIT_ASSERT(isNextEventOk(re));
+ CPPUNIT_ASSERT_EQUAL(size, re.getSize());
+ CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
+ }
+
+ void testFailedRead()
+ {
+ ReadEvent re(pipe[0], readBuf, size);
+ ec->postEvent(re);
+
+ // EOF before all data read.
+ ::close(pipe[1]);
+ CPPUNIT_ASSERT(isNextEvent(re));
+ CPPUNIT_ASSERT(re.hasError());
+ try {
+ re.throwIfError();
+ CPPUNIT_FAIL("Expected QpidError.");
+ }
+ catch (const qpid::QpidError&) { }
+
+ // Bad file descriptor. Note in this case we fail
+ // in postEvent and throw immediately.
+ try {
+ ReadEvent bad;
+ ec->postEvent(bad);
+ CPPUNIT_FAIL("Expected QpidError.");
+ }
+ catch (const qpid::QpidError&) { }
+ }
+
+ void testWrite() {
+ WriteEvent wr(pipe[1], hello, size);
+ ec->postEvent(wr);
+ CPPUNIT_ASSERT(isNextEventOk(wr));
+ CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::read(pipe[0], readBuf, size));;
+ CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
+ }
+
+ void testFailedWrite() {
+ WriteEvent wr(pipe[1], hello, size);
+ ::close(pipe[0]);
+ ec->postEvent(wr);
+ CPPUNIT_ASSERT(isNextEvent(wr));
+ CPPUNIT_ASSERT(wr.hasError());
+ }
+
+ void testReadWrite()
+ {
+ ReadEvent re(pipe[0], readBuf, size);
+ WriteEvent wr(pipe[1], hello, size);
+ ec->postEvent(re);
+ ec->postEvent(wr);
+ ec->getEvent();
+ ec->getEvent();
+ CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
+ }
+
+ void testAccept() {
+ Socket s = Socket::createTcp();
+ int port = s.listen(0, 10);
+ CPPUNIT_ASSERT(port != 0);
+
+ AcceptEvent ae(s.fd());
+ ec->postEvent(ae);
+ Socket client = Socket::createTcp();
+ client.connect("localhost", port);
+ CPPUNIT_ASSERT(isNextEvent(ae));
+ ae.dispatch();
+
+ // Verify client writes are read by the accepted descriptor.
+ char readBuf[size];
+ ReadEvent re(ae.getAcceptedDesscriptor(), readBuf, size);
+ ec->postEvent(re);
+ CPPUNIT_ASSERT_EQUAL(ssize_t(size), client.send(hello, sizeof(hello)));
+ CPPUNIT_ASSERT(isNextEvent(re));
+ re.dispatch();
+ CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelTest);
+
diff --git a/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp b/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp
new file mode 100644
index 0000000000..5c467880be
--- /dev/null
+++ b/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp
@@ -0,0 +1,247 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <boost/bind.hpp>
+
+#include <qpid/sys/Socket.h>
+#include <qpid/posix/EventChannelThreads.h>
+#include <qpid_test_plugin.h>
+
+
+using namespace std;
+
+using namespace qpid::sys;
+
+const int nConnections = 5;
+const int nMessages = 10; // Messages read/written per connection.
+
+
+// Accepts + reads + writes.
+const int totalEvents = nConnections+2*nConnections*nMessages;
+
+/**
+ * Messages are numbered 0..nMessages.
+ * We count the total number of events, and the
+ * number of reads and writes for each message number.
+ */
+class TestResults : public Monitor {
+ public:
+ TestResults() : isShutdown(false), nEventsRemaining(totalEvents) {}
+
+ void countEvent() {
+ if (--nEventsRemaining == 0)
+ shutdown();
+ }
+
+ void countRead(int messageNo) {
+ ++reads[messageNo];
+ countEvent();
+ }
+
+ void countWrite(int messageNo) {
+ ++writes[messageNo];
+ countEvent();
+ }
+
+ void shutdown(const std::string& exceptionMsg = std::string()) {
+ ScopedLock lock(*this);
+ exception = exceptionMsg;
+ isShutdown = true;
+ notifyAll();
+ }
+
+ void wait() {
+ ScopedLock lock(*this);
+ Time deadline = now() + 10*TIME_SEC;
+ while (!isShutdown) {
+ CPPUNIT_ASSERT(Monitor::wait(deadline));
+ }
+ }
+
+ bool isShutdown;
+ std::string exception;
+ AtomicCount reads[nMessages];
+ AtomicCount writes[nMessages];
+ AtomicCount nEventsRemaining;
+};
+
+TestResults results;
+
+EventChannelThreads::shared_ptr threads;
+
+// Functor to wrap callbacks in try/catch.
+class SafeCallback {
+ public:
+ SafeCallback(Runnable& r) : callback(r.functor()) {}
+ SafeCallback(Event::Callback cb) : callback(cb) {}
+
+ void operator()() {
+ std::string exception;
+ try {
+ callback();
+ return;
+ }
+ catch (const std::exception& e) {
+ exception = e.what();
+ }
+ catch (...) {
+ exception = "Unknown exception.";
+ }
+ results.shutdown(exception);
+ }
+
+ private:
+ Event::Callback callback;
+};
+
+/** Repost an event N times. */
+class Repost {
+ public:
+ Repost(int n) : count (n) {}
+ virtual ~Repost() {}
+
+ void repost(Event* event) {
+ if (--count==0) {
+ delete event;
+ } else {
+ threads->postEvent(event);
+ }
+ }
+ private:
+ int count;
+};
+
+
+
+/** Repeating read event. */
+class TestReadEvent : public ReadEvent, public Runnable, private Repost {
+ public:
+ explicit TestReadEvent(int fd=-1) :
+ ReadEvent(fd, &value, sizeof(value), SafeCallback(*this)),
+ Repost(nMessages)
+ {}
+
+ void run() {
+ CPPUNIT_ASSERT_EQUAL(sizeof(value), getSize());
+ CPPUNIT_ASSERT(0 <= value);
+ CPPUNIT_ASSERT(value < nMessages);
+ results.countRead(value);
+ repost(this);
+ }
+
+ private:
+ int value;
+ ReadEvent original;
+};
+
+
+/** Fire and forget write event */
+class TestWriteEvent : public WriteEvent, public Runnable, private Repost {
+ public:
+ TestWriteEvent(int fd=-1) :
+ WriteEvent(fd, &value, sizeof(value), SafeCallback(*this)),
+ Repost(nMessages),
+ value(0)
+ {}
+
+ void run() {
+ CPPUNIT_ASSERT_EQUAL(sizeof(int), getSize());
+ results.countWrite(value++);
+ repost(this);
+ }
+
+ private:
+ int value;
+};
+
+/** Fire-and-forget Accept event, posts reads on the accepted connection. */
+class TestAcceptEvent : public AcceptEvent, public Runnable, private Repost {
+ public:
+ TestAcceptEvent(int fd=-1) :
+ AcceptEvent(fd, SafeCallback(*this)),
+ Repost(nConnections)
+ {}
+
+ void run() {
+ threads->postEvent(new TestReadEvent(getAcceptedDesscriptor()));
+ results.countEvent();
+ repost(this);
+ }
+};
+
+class EventChannelThreadsTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(EventChannelThreadsTest);
+ CPPUNIT_TEST(testThreads);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ void setUp() {
+ threads = EventChannelThreads::create(EventChannel::create());
+ }
+
+ void tearDown() {
+ threads.reset();
+ }
+
+ void testThreads()
+ {
+ Socket listener = Socket::createTcp();
+ int port = listener.listen();
+
+ // Post looping accept events, will repost nConnections times.
+ // The accept event will automatically post read events.
+ threads->postEvent(new TestAcceptEvent(listener.fd()));
+
+ // Make connections.
+ Socket connections[nConnections];
+ for (int i = 0; i < nConnections; ++i) {
+ connections[i] = Socket::createTcp();
+ connections[i].connect("localhost", port);
+ }
+
+ // Post looping write events.
+ for (int i = 0; i < nConnections; ++i) {
+ threads->postEvent(new TestWriteEvent(connections[i].fd()));
+ }
+
+ // Wait for all events to be dispatched.
+ results.wait();
+
+ if (!results.exception.empty()) CPPUNIT_FAIL(results.exception);
+ CPPUNIT_ASSERT_EQUAL(0, int(results.nEventsRemaining));
+
+ // Expect a read and write for each messageNo from each connection.
+ for (int messageNo = 0; messageNo < nMessages; ++messageNo) {
+ CPPUNIT_ASSERT_EQUAL(nConnections, int(results.reads[messageNo]));
+ CPPUNIT_ASSERT_EQUAL(nConnections, int(results.writes[messageNo]));
+ }
+
+ threads->shutdown();
+ threads->join();
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelThreadsTest);
+