summaryrefslogtreecommitdiff
path: root/cpp/lib/common/sys/posix/EventChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/common/sys/posix/EventChannel.cpp')
-rw-r--r--cpp/lib/common/sys/posix/EventChannel.cpp38
1 files changed, 9 insertions, 29 deletions
diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp
index 79bf8030cb..66350bca83 100644
--- a/cpp/lib/common/sys/posix/EventChannel.cpp
+++ b/cpp/lib/common/sys/posix/EventChannel.cpp
@@ -151,8 +151,7 @@ class EventChannel::Descriptor : private boost::noncopyable {
/**
- * Holds the epoll fd, Descriptor map and dispatch queue.
- * Most of the epoll work is done by the Descriptors.
+ * Holds a map of Descriptors, which do most of the work.
*/
class EventChannel::Impl {
public:
@@ -168,8 +167,6 @@ class EventChannel::Impl {
/** Wait for an event, return 0 on timeout */
Event* wait(Time timeout);
- Queue& getDispatchQueue() { return *dispatchQueue; }
-
void shutdown();
private:
@@ -179,9 +176,8 @@ class EventChannel::Impl {
Monitor monitor;
int epollFd;
DescriptorMap descriptors;
- int pipe[2];
+ int shutdownPipe[2];
AtomicCount nWaiters;
- Queue* dispatchQueue;
bool isShutdown;
};
@@ -336,7 +332,7 @@ Event* EventChannel::Descriptor::wake(uint32_t epollEvents) {
EventChannel::Impl::Impl(int epollSize):
- epollFd(-1), dispatchQueue(0), isShutdown(false)
+ epollFd(-1), isShutdown(false)
{
// Create the epoll file descriptor.
epollFd = epoll_create(epollSize);
@@ -345,17 +341,16 @@ EventChannel::Impl::Impl(int epollSize):
// Create a pipe and write a single byte. The byte is never
// read so the pipes read fd is always ready for read.
// We activate the FD when there are messages in the queue.
- QPID_POSIX_CHECK(::pipe(pipe));
+ QPID_POSIX_CHECK(::pipe(shutdownPipe));
static char zero = '\0';
- QPID_POSIX_CHECK(::write(pipe[1], &zero, 1));
- dispatchQueue = &getDescriptor(pipe[0]).getQueue(IN);
+ QPID_POSIX_CHECK(::write(shutdownPipe[1], &zero, 1));
}
EventChannel::Impl::~Impl() {
shutdown();
::close(epollFd);
- ::close(pipe[0]);
- ::close(pipe[1]);
+ ::close(shutdownPipe[0]);
+ ::close(shutdownPipe[1]);
}
@@ -369,18 +364,11 @@ void EventChannel::Impl::shutdown() {
// TODO aconway 2006-12-20: If I just close the epollFd will
// that wake all threads? If so with what? Would be simpler than:
- // Create a pipe and write a single byte. The byte is never
- // read so the pipes read fd is always ready for read.
- // Since we use level-triggered epoll this will wake up all
- // wait() threads.
- //
- QPID_POSIX_CHECK(::pipe(pipe));
- static char zero = '\0';
- QPID_POSIX_CHECK(::write(pipe[1], &zero, 1));
CleanStruct<epoll_event> ee;
ee.data.ptr = 0;
ee.events = EPOLLIN;
- QPID_POSIX_CHECK(epoll_ctl(epollFd, EPOLL_CTL_ADD, pipe[0], &ee));
+ QPID_POSIX_CHECK(
+ epoll_ctl(epollFd, EPOLL_CTL_ADD, shutdownPipe[0], &ee));
}
// Wait for nWaiters to get out.
while (nWaiters > 0) {
@@ -455,8 +443,6 @@ Event* EventChannel::Impl::wait(Time timeoutNs)
if (ed == 0) // We're being shut-down.
throw ShutdownException();
assert(ed != 0);
- // TODO aconway 2006-12-20: DEBUG
- cout << endl << epoll(ee.events) << endl;
event = ed->wake(ee.events);
}
return event;
@@ -591,10 +577,4 @@ void AcceptEvent::complete(EventChannel::Descriptor& ed)
}
}
-void DispatchEvent::prepare(EventChannel::Impl& impl) {
- impl.getDispatchQueue().push(this);
-}
-
-void DispatchEvent::complete(EventChannel::Descriptor&) {}
-
}}