diff options
author | Alan Conway <aconway@apache.org> | 2006-11-29 14:36:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-11-29 14:36:08 +0000 |
commit | b13e1a24fcca8797b7be5a242f164afbe17ec4f6 (patch) | |
tree | ef0362e52c125bc75b07ef3e374dabfa52254e98 /cpp/test | |
parent | 16d818e749462daf5e0e43079b2e48991646c619 (diff) | |
download | qpid-python-b13e1a24fcca8797b7be5a242f164afbe17ec4f6.tar.gz |
Posix EventChannel implementation using epoll. Placeholder for kevents.
Dynamic thread pool EventChannelThreads to serve EventChannel.
Misc cleanup/enhancements.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480582 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/test')
-rw-r--r-- | cpp/test/client/client_test.cpp | 4 | ||||
-rw-r--r-- | cpp/test/client/echo_service.cpp | 4 | ||||
-rw-r--r-- | cpp/test/client/topic_listener.cpp | 13 | ||||
-rw-r--r-- | cpp/test/client/topic_publisher.cpp | 12 | ||||
-rw-r--r-- | cpp/test/unit/qpid/ExceptionTest.cpp | 61 | ||||
-rw-r--r-- | cpp/test/unit/qpid/posix/EventChannelTest.cpp | 187 | ||||
-rw-r--r-- | cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp | 247 |
7 files changed, 514 insertions, 14 deletions
diff --git a/cpp/test/client/client_test.cpp b/cpp/test/client/client_test.cpp index 18b162ec8a..0a3c300f4a 100644 --- a/cpp/test/client/client_test.cpp +++ b/cpp/test/client/client_test.cpp @@ -90,7 +90,9 @@ int main(int argc, char**) con.close(); std::cout << "Closed connection." << std::endl; }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + std::cout << "Error [" << error.code << "] " << error.msg << " (" + << error.location.file << ":" << error.location.line + << ")" << std::endl; return 1; } return 0; diff --git a/cpp/test/client/echo_service.cpp b/cpp/test/client/echo_service.cpp index f0aa49fd4b..3df3da0b86 100644 --- a/cpp/test/client/echo_service.cpp +++ b/cpp/test/client/echo_service.cpp @@ -107,7 +107,7 @@ int main(int argc, char** argv){ connection.close(); } catch(qpid::QpidError error) { - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + std::cout << error.what() << std::endl; } } else { try { @@ -133,7 +133,7 @@ int main(int argc, char** argv){ connection.close(); } catch(qpid::QpidError error) { - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + std::cout << error.what() << std::endl; } } } diff --git a/cpp/test/client/topic_listener.cpp b/cpp/test/client/topic_listener.cpp index 413d482361..bd7cfdc62c 100644 --- a/cpp/test/client/topic_listener.cpp +++ b/cpp/test/client/topic_listener.cpp @@ -38,7 +38,7 @@ class Listener : public MessageListener{ const bool transactional; bool init; int count; - int64_t start; + Time start; void shutdown(); void report(); @@ -96,7 +96,7 @@ int main(int argc, char** argv){ channel.run(); connection.close(); }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + std::cout << error.what() << std::endl; } } } @@ -106,7 +106,7 @@ Listener::Listener(Channel* _channel, const std::string& _responseq, bool tx) : void Listener::received(Message& message){ if(!init){ - start = Time::now().msecs(); + start = now(); count = 0; init = true; } @@ -128,10 +128,11 @@ void Listener::shutdown(){ } void Listener::report(){ - int64_t finish = Time::now().msecs(); - int64_t time = finish - start; + Time finish = now(); + Time time = finish - start; std::stringstream reportstr; - reportstr << "Received " << count << " messages in " << time << " ms."; + reportstr << "Received " << count << " messages in " + << time/TIME_MSEC << " ms."; Message msg; msg.setData(reportstr.str()); channel->publish(msg, string(), responseQueue); diff --git a/cpp/test/client/topic_publisher.cpp b/cpp/test/client/topic_publisher.cpp index d9f271e2f0..97d589c1d1 100644 --- a/cpp/test/client/topic_publisher.cpp +++ b/cpp/test/client/topic_publisher.cpp @@ -114,11 +114,13 @@ int main(int argc, char** argv){ int64_t sum(0); for(int i = 0; i < batchSize; i++){ if(i > 0 && args.getDelay()) sleep(args.getDelay()); - int64_t time = publisher.publish(args.getMessages(), args.getSubscribers(), args.getSize()); + Time time = publisher.publish( + args.getMessages(), args.getSubscribers(), args.getSize()); if(!max || time > max) max = time; if(!min || time < min) min = time; sum += time; - std::cout << "Completed " << (i+1) << " of " << batchSize << " in " << time << "ms" << std::endl; + std::cout << "Completed " << (i+1) << " of " << batchSize + << " in " << time/TIME_MSEC << "ms" << std::endl; } publisher.terminate(); int64_t avg = sum / batchSize; @@ -129,7 +131,7 @@ int main(int argc, char** argv){ channel.close(); connection.close(); }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + std::cout << error.what() << std::endl; } } } @@ -153,7 +155,7 @@ void Publisher::waitForCompletion(int msgs){ int64_t Publisher::publish(int msgs, int listeners, int size){ Message msg; msg.setData(generateData(size)); - int64_t start = Time::now().msecs(); + Time start = now(); { Monitor::ScopedLock l(monitor); for(int i = 0; i < msgs; i++){ @@ -170,7 +172,7 @@ int64_t Publisher::publish(int msgs, int listeners, int size){ waitForCompletion(listeners); } - int64_t finish(Time::now().msecs()); + Time finish = now(); return finish - start; } diff --git a/cpp/test/unit/qpid/ExceptionTest.cpp b/cpp/test/unit/qpid/ExceptionTest.cpp new file mode 100644 index 0000000000..7c3261dc29 --- /dev/null +++ b/cpp/test/unit/qpid/ExceptionTest.cpp @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include <qpid/Exception.h> +#include <qpid_test_plugin.h> + +using namespace qpid; + +struct CountDestroyedException : public Exception { + int& count; + static int staticCount; + CountDestroyedException() : count(staticCount) { } + CountDestroyedException(int& n) : count(n) {} + ~CountDestroyedException() throw() { count++; } + void throwSelf() const { throw *this; } +}; + +int CountDestroyedException::staticCount = 0; + + +class ExceptionTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ExceptionTest); + CPPUNIT_TEST(testHeapException); + CPPUNIT_TEST_SUITE_END(); + public: + // Verify proper memory management for heap-allocated exceptions. + void testHeapException() { + int count = 0; + try { + std::auto_ptr<Exception> p( + new CountDestroyedException(count)); + p.release()->throwSelf(); + CPPUNIT_FAIL("Expected CountDestroyedException."); + } catch (const CountDestroyedException& e) { + CPPUNIT_ASSERT(&e.count == &count); + } + CPPUNIT_ASSERT_EQUAL(1, count); + } +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ExceptionTest); + diff --git a/cpp/test/unit/qpid/posix/EventChannelTest.cpp b/cpp/test/unit/qpid/posix/EventChannelTest.cpp new file mode 100644 index 0000000000..8846a0e340 --- /dev/null +++ b/cpp/test/unit/qpid/posix/EventChannelTest.cpp @@ -0,0 +1,187 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <qpid/posix/EventChannel.h> +#include <qpid/posix/check.h> +#include <qpid/sys/Runnable.h> +#include <qpid/sys/Socket.h> +#include <qpid/sys/Thread.h> +#include <qpid_test_plugin.h> + +#include <sys/socket.h> +#include <signal.h> +#include <netinet/in.h> +#include <netdb.h> +#include <iostream> + +using namespace qpid::sys; + + +const char hello[] = "hello"; +const size_t size = sizeof(hello); + +struct RunMe : public Runnable +{ + bool ran; + RunMe() : ran(false) {} + void run() { ran = true; } +}; + +class EventChannelTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(EventChannelTest); + CPPUNIT_TEST(testEvent); + CPPUNIT_TEST(testRead); + CPPUNIT_TEST(testFailedRead); + CPPUNIT_TEST(testWrite); + CPPUNIT_TEST(testFailedWrite); + CPPUNIT_TEST(testReadWrite); + CPPUNIT_TEST(testAccept); + CPPUNIT_TEST_SUITE_END(); + + private: + EventChannel::shared_ptr ec; + int pipe[2]; + char readBuf[size]; + + public: + + void setUp() + { + memset(readBuf, size, 0); + ec = EventChannel::create(); + if (::pipe(pipe) != 0) throw QPID_POSIX_ERROR(errno); + // Ignore SIGPIPE, otherwise we will crash writing to broken pipe. + signal(SIGPIPE, SIG_IGN); + } + + // Verify that calling getEvent returns event. + template <class T> bool isNextEvent(T& event) + { + return &event == dynamic_cast<T*>(ec->getEvent()); + } + + template <class T> bool isNextEventOk(T& event) + { + Event* next = ec->getEvent(); + if (next) next->throwIfError(); + return &event == next; + } + + void testEvent() + { + RunMe runMe; + CPPUNIT_ASSERT(!runMe.ran); + // Instances of Event just pass thru the channel immediately. + Event e(runMe.functor()); + ec->postEvent(e); + CPPUNIT_ASSERT(isNextEventOk(e)); + e.dispatch(); + CPPUNIT_ASSERT(runMe.ran); + } + + void testRead() { + ReadEvent re(pipe[0], readBuf, size); + ec->postEvent(re); + CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::write(pipe[1], hello, size)); + CPPUNIT_ASSERT(isNextEventOk(re)); + CPPUNIT_ASSERT_EQUAL(size, re.getSize()); + CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); + } + + void testFailedRead() + { + ReadEvent re(pipe[0], readBuf, size); + ec->postEvent(re); + + // EOF before all data read. + ::close(pipe[1]); + CPPUNIT_ASSERT(isNextEvent(re)); + CPPUNIT_ASSERT(re.hasError()); + try { + re.throwIfError(); + CPPUNIT_FAIL("Expected QpidError."); + } + catch (const qpid::QpidError&) { } + + // Bad file descriptor. Note in this case we fail + // in postEvent and throw immediately. + try { + ReadEvent bad; + ec->postEvent(bad); + CPPUNIT_FAIL("Expected QpidError."); + } + catch (const qpid::QpidError&) { } + } + + void testWrite() { + WriteEvent wr(pipe[1], hello, size); + ec->postEvent(wr); + CPPUNIT_ASSERT(isNextEventOk(wr)); + CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::read(pipe[0], readBuf, size));; + CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); + } + + void testFailedWrite() { + WriteEvent wr(pipe[1], hello, size); + ::close(pipe[0]); + ec->postEvent(wr); + CPPUNIT_ASSERT(isNextEvent(wr)); + CPPUNIT_ASSERT(wr.hasError()); + } + + void testReadWrite() + { + ReadEvent re(pipe[0], readBuf, size); + WriteEvent wr(pipe[1], hello, size); + ec->postEvent(re); + ec->postEvent(wr); + ec->getEvent(); + ec->getEvent(); + CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); + } + + void testAccept() { + Socket s = Socket::createTcp(); + int port = s.listen(0, 10); + CPPUNIT_ASSERT(port != 0); + + AcceptEvent ae(s.fd()); + ec->postEvent(ae); + Socket client = Socket::createTcp(); + client.connect("localhost", port); + CPPUNIT_ASSERT(isNextEvent(ae)); + ae.dispatch(); + + // Verify client writes are read by the accepted descriptor. + char readBuf[size]; + ReadEvent re(ae.getAcceptedDesscriptor(), readBuf, size); + ec->postEvent(re); + CPPUNIT_ASSERT_EQUAL(ssize_t(size), client.send(hello, sizeof(hello))); + CPPUNIT_ASSERT(isNextEvent(re)); + re.dispatch(); + CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelTest); + diff --git a/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp b/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp new file mode 100644 index 0000000000..5c467880be --- /dev/null +++ b/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp @@ -0,0 +1,247 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <boost/bind.hpp> + +#include <qpid/sys/Socket.h> +#include <qpid/posix/EventChannelThreads.h> +#include <qpid_test_plugin.h> + + +using namespace std; + +using namespace qpid::sys; + +const int nConnections = 5; +const int nMessages = 10; // Messages read/written per connection. + + +// Accepts + reads + writes. +const int totalEvents = nConnections+2*nConnections*nMessages; + +/** + * Messages are numbered 0..nMessages. + * We count the total number of events, and the + * number of reads and writes for each message number. + */ +class TestResults : public Monitor { + public: + TestResults() : isShutdown(false), nEventsRemaining(totalEvents) {} + + void countEvent() { + if (--nEventsRemaining == 0) + shutdown(); + } + + void countRead(int messageNo) { + ++reads[messageNo]; + countEvent(); + } + + void countWrite(int messageNo) { + ++writes[messageNo]; + countEvent(); + } + + void shutdown(const std::string& exceptionMsg = std::string()) { + ScopedLock lock(*this); + exception = exceptionMsg; + isShutdown = true; + notifyAll(); + } + + void wait() { + ScopedLock lock(*this); + Time deadline = now() + 10*TIME_SEC; + while (!isShutdown) { + CPPUNIT_ASSERT(Monitor::wait(deadline)); + } + } + + bool isShutdown; + std::string exception; + AtomicCount reads[nMessages]; + AtomicCount writes[nMessages]; + AtomicCount nEventsRemaining; +}; + +TestResults results; + +EventChannelThreads::shared_ptr threads; + +// Functor to wrap callbacks in try/catch. +class SafeCallback { + public: + SafeCallback(Runnable& r) : callback(r.functor()) {} + SafeCallback(Event::Callback cb) : callback(cb) {} + + void operator()() { + std::string exception; + try { + callback(); + return; + } + catch (const std::exception& e) { + exception = e.what(); + } + catch (...) { + exception = "Unknown exception."; + } + results.shutdown(exception); + } + + private: + Event::Callback callback; +}; + +/** Repost an event N times. */ +class Repost { + public: + Repost(int n) : count (n) {} + virtual ~Repost() {} + + void repost(Event* event) { + if (--count==0) { + delete event; + } else { + threads->postEvent(event); + } + } + private: + int count; +}; + + + +/** Repeating read event. */ +class TestReadEvent : public ReadEvent, public Runnable, private Repost { + public: + explicit TestReadEvent(int fd=-1) : + ReadEvent(fd, &value, sizeof(value), SafeCallback(*this)), + Repost(nMessages) + {} + + void run() { + CPPUNIT_ASSERT_EQUAL(sizeof(value), getSize()); + CPPUNIT_ASSERT(0 <= value); + CPPUNIT_ASSERT(value < nMessages); + results.countRead(value); + repost(this); + } + + private: + int value; + ReadEvent original; +}; + + +/** Fire and forget write event */ +class TestWriteEvent : public WriteEvent, public Runnable, private Repost { + public: + TestWriteEvent(int fd=-1) : + WriteEvent(fd, &value, sizeof(value), SafeCallback(*this)), + Repost(nMessages), + value(0) + {} + + void run() { + CPPUNIT_ASSERT_EQUAL(sizeof(int), getSize()); + results.countWrite(value++); + repost(this); + } + + private: + int value; +}; + +/** Fire-and-forget Accept event, posts reads on the accepted connection. */ +class TestAcceptEvent : public AcceptEvent, public Runnable, private Repost { + public: + TestAcceptEvent(int fd=-1) : + AcceptEvent(fd, SafeCallback(*this)), + Repost(nConnections) + {} + + void run() { + threads->postEvent(new TestReadEvent(getAcceptedDesscriptor())); + results.countEvent(); + repost(this); + } +}; + +class EventChannelThreadsTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(EventChannelThreadsTest); + CPPUNIT_TEST(testThreads); + CPPUNIT_TEST_SUITE_END(); + + public: + + void setUp() { + threads = EventChannelThreads::create(EventChannel::create()); + } + + void tearDown() { + threads.reset(); + } + + void testThreads() + { + Socket listener = Socket::createTcp(); + int port = listener.listen(); + + // Post looping accept events, will repost nConnections times. + // The accept event will automatically post read events. + threads->postEvent(new TestAcceptEvent(listener.fd())); + + // Make connections. + Socket connections[nConnections]; + for (int i = 0; i < nConnections; ++i) { + connections[i] = Socket::createTcp(); + connections[i].connect("localhost", port); + } + + // Post looping write events. + for (int i = 0; i < nConnections; ++i) { + threads->postEvent(new TestWriteEvent(connections[i].fd())); + } + + // Wait for all events to be dispatched. + results.wait(); + + if (!results.exception.empty()) CPPUNIT_FAIL(results.exception); + CPPUNIT_ASSERT_EQUAL(0, int(results.nEventsRemaining)); + + // Expect a read and write for each messageNo from each connection. + for (int messageNo = 0; messageNo < nMessages; ++messageNo) { + CPPUNIT_ASSERT_EQUAL(nConnections, int(results.reads[messageNo])); + CPPUNIT_ASSERT_EQUAL(nConnections, int(results.writes[messageNo])); + } + + threads->shutdown(); + threads->join(); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelThreadsTest); + |