From b7796ee265b7835e2c835b8374c9b2f06fc39471 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 21 Dec 2006 02:22:12 +0000 Subject: Simplify EventChannel.cpp: remove unnecessary DispatchEvent feature. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/event-queue-2006-12-20@489249 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/lib/common/sys/posix/EventChannel.cpp | 38 ++++++++----------------------- cpp/lib/common/sys/posix/EventChannel.h | 17 ++------------ cpp/tests/EventChannelTest.cpp | 13 ----------- 3 files changed, 11 insertions(+), 57 deletions(-) diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp index 79bf8030cb..66350bca83 100644 --- a/cpp/lib/common/sys/posix/EventChannel.cpp +++ b/cpp/lib/common/sys/posix/EventChannel.cpp @@ -151,8 +151,7 @@ class EventChannel::Descriptor : private boost::noncopyable { /** - * Holds the epoll fd, Descriptor map and dispatch queue. - * Most of the epoll work is done by the Descriptors. + * Holds a map of Descriptors, which do most of the work. */ class EventChannel::Impl { public: @@ -168,8 +167,6 @@ class EventChannel::Impl { /** Wait for an event, return 0 on timeout */ Event* wait(Time timeout); - Queue& getDispatchQueue() { return *dispatchQueue; } - void shutdown(); private: @@ -179,9 +176,8 @@ class EventChannel::Impl { Monitor monitor; int epollFd; DescriptorMap descriptors; - int pipe[2]; + int shutdownPipe[2]; AtomicCount nWaiters; - Queue* dispatchQueue; bool isShutdown; }; @@ -336,7 +332,7 @@ Event* EventChannel::Descriptor::wake(uint32_t epollEvents) { EventChannel::Impl::Impl(int epollSize): - epollFd(-1), dispatchQueue(0), isShutdown(false) + epollFd(-1), isShutdown(false) { // Create the epoll file descriptor. epollFd = epoll_create(epollSize); @@ -345,17 +341,16 @@ EventChannel::Impl::Impl(int epollSize): // Create a pipe and write a single byte. The byte is never // read so the pipes read fd is always ready for read. // We activate the FD when there are messages in the queue. - QPID_POSIX_CHECK(::pipe(pipe)); + QPID_POSIX_CHECK(::pipe(shutdownPipe)); static char zero = '\0'; - QPID_POSIX_CHECK(::write(pipe[1], &zero, 1)); - dispatchQueue = &getDescriptor(pipe[0]).getQueue(IN); + QPID_POSIX_CHECK(::write(shutdownPipe[1], &zero, 1)); } EventChannel::Impl::~Impl() { shutdown(); ::close(epollFd); - ::close(pipe[0]); - ::close(pipe[1]); + ::close(shutdownPipe[0]); + ::close(shutdownPipe[1]); } @@ -369,18 +364,11 @@ void EventChannel::Impl::shutdown() { // TODO aconway 2006-12-20: If I just close the epollFd will // that wake all threads? If so with what? Would be simpler than: - // Create a pipe and write a single byte. The byte is never - // read so the pipes read fd is always ready for read. - // Since we use level-triggered epoll this will wake up all - // wait() threads. - // - QPID_POSIX_CHECK(::pipe(pipe)); - static char zero = '\0'; - QPID_POSIX_CHECK(::write(pipe[1], &zero, 1)); CleanStruct ee; ee.data.ptr = 0; ee.events = EPOLLIN; - QPID_POSIX_CHECK(epoll_ctl(epollFd, EPOLL_CTL_ADD, pipe[0], &ee)); + QPID_POSIX_CHECK( + epoll_ctl(epollFd, EPOLL_CTL_ADD, shutdownPipe[0], &ee)); } // Wait for nWaiters to get out. while (nWaiters > 0) { @@ -455,8 +443,6 @@ Event* EventChannel::Impl::wait(Time timeoutNs) if (ed == 0) // We're being shut-down. throw ShutdownException(); assert(ed != 0); - // TODO aconway 2006-12-20: DEBUG - cout << endl << epoll(ee.events) << endl; event = ed->wake(ee.events); } return event; @@ -591,10 +577,4 @@ void AcceptEvent::complete(EventChannel::Descriptor& ed) } } -void DispatchEvent::prepare(EventChannel::Impl& impl) { - impl.getDispatchQueue().push(this); -} - -void DispatchEvent::complete(EventChannel::Descriptor&) {} - }} diff --git a/cpp/lib/common/sys/posix/EventChannel.h b/cpp/lib/common/sys/posix/EventChannel.h index 0181a9c780..0a28a80add 100644 --- a/cpp/lib/common/sys/posix/EventChannel.h +++ b/cpp/lib/common/sys/posix/EventChannel.h @@ -125,20 +125,7 @@ class Event friend class EventChannel::Queue; }; -/** - * An event that does not wait for anything, it is processed - * immediately by one of the channel threads. - */ -class DispatchEvent : public Event { - public: - DispatchEvent(Callback cb=0) : Event(cb) {} - - protected: - void prepare(EventChannel::Impl&); - void complete(EventChannel::Descriptor&); -}; - -// Utility base class. +/** Base class for events related to a file descriptor */ class FDEvent : public Event { public: int getDescriptor() const { return descriptor; } @@ -149,7 +136,7 @@ class FDEvent : public Event { int descriptor; }; -// Utility base class +/** Base class for read or write events. */ class IOEvent : public FDEvent { public: size_t getSize() const { return size; } diff --git a/cpp/tests/EventChannelTest.cpp b/cpp/tests/EventChannelTest.cpp index 46eb7d231b..8be9262c70 100644 --- a/cpp/tests/EventChannelTest.cpp +++ b/cpp/tests/EventChannelTest.cpp @@ -47,7 +47,6 @@ struct RunMe : public Runnable class EventChannelTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(EventChannelTest); - CPPUNIT_TEST(testDispatch); CPPUNIT_TEST(testRead); CPPUNIT_TEST(testPartialRead); CPPUNIT_TEST(testFailedRead); @@ -86,18 +85,6 @@ class EventChannelTest : public CppUnit::TestCase return &event == next; } - void testDispatch() - { - RunMe runMe; - CPPUNIT_ASSERT(!runMe.ran); - // Instances of Event just pass thru the channel immediately. - DispatchEvent e(runMe.functor()); - ec->post(e); - CPPUNIT_ASSERT(isNextEventOk(e)); - e.dispatch(); - CPPUNIT_ASSERT(runMe.ran); - } - void testRead() { ReadEvent re(pipe[0], readBuf, size); ec->post(re); -- cgit v1.2.1