diff options
Diffstat (limited to 'cpp/lib/common/sys/posix/EventChannel.cpp')
-rw-r--r-- | cpp/lib/common/sys/posix/EventChannel.cpp | 176 |
1 files changed, 124 insertions, 52 deletions
diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp index ee6b6bbe2b..37e3c2257a 100644 --- a/cpp/lib/common/sys/posix/EventChannel.cpp +++ b/cpp/lib/common/sys/posix/EventChannel.cpp @@ -43,6 +43,7 @@ #include "check.h" #include "EventChannel.h" +#include "sys/AtomicCount.h" using namespace std; @@ -57,7 +58,17 @@ namespace sys { namespace { typedef enum { IN, OUT } Direction; + typedef std::pair<Event*, Event*> EventPair; + +/** + * Template to zero out a C-struct on construction. Avoids uninitialized memory + * warnings from valgrind or other mem checking tool. + */ +template <class T> struct CleanStruct : public T { + CleanStruct() { memset(this, 0, sizeof(*this)); } +}; + } // namespace /** @@ -153,25 +164,29 @@ class EventChannel::Impl { Queue& getDispatchQueue() { return *dispatchQueue; } + void shutdown(); + private: typedef boost::ptr_map<int, Descriptor> DescriptorMap; - Mutex lock; + Monitor monitor; int epollFd; DescriptorMap descriptors; int pipe[2]; + AtomicCount nWaiters; Queue* dispatchQueue; + bool isShutdown; }; - // ================================================================ // EventChannel::Queue::implementation. static const char* shutdownMsg = "Event queue shut down."; -EventChannel::Queue::Queue(Descriptor& d, Direction dir) : lock(d.lock), descriptor(d), +EventChannel::Queue::Queue(Descriptor& d, Direction dir) : + lock(d.lock), descriptor(d), myEvent(dir==IN ? EPOLLIN : EPOLLOUT) {} @@ -193,11 +208,14 @@ void EventChannel::Queue::setBit(uint32_t &epollFlags) { Event* EventChannel::Queue::wake(uint32_t epollFlags) { // Called with lock held. if (!queue.empty() && (isMyEvent(epollFlags))) { - Event* e = queue.front()->complete(descriptor); - if (e) { - queue.pop_front(); - return e; - } + assert(!queue.empty()); + Event* e = queue.front(); + assert(e); + // TODO aconway 2006-12-20: Can/should we move event completion + // out into dispatch() so it doesn't happen in Descriptor locks? + e->complete(descriptor); + queue.pop_front(); + return e; } return 0; } @@ -250,9 +268,7 @@ void EventChannel::Descriptor::update() { void EventChannel::Descriptor::epollCtl(int op, uint32_t events) { // Caller holds lock - assert(!isShutdown()); - struct epoll_event ee; - memset(&ee, 0, sizeof(ee)); + CleanStruct<epoll_event> ee; ee.data.ptr = this; ee.events = events; int status = ::epoll_ctl(epollFd, op, myFd, &ee); @@ -263,7 +279,6 @@ void EventChannel::Descriptor::epollCtl(int op, uint32_t events) { EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) { Mutex::ScopedLock l(lock); - cout << "DEBUG: " << std::hex << epollEvents << std::dec << endl; // If we have an error: if (epollEvents & (EPOLLERR | EPOLLHUP)) { shutdownUnsafe(); @@ -282,7 +297,7 @@ EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) { EventChannel::Impl::Impl(int epollSize): - epollFd(-1), dispatchQueue(0) + epollFd(-1), dispatchQueue(0), isShutdown(false) { // Create the epoll file descriptor. epollFd = epoll_create(epollSize); @@ -298,22 +313,90 @@ EventChannel::Impl::Impl(int epollSize): } EventChannel::Impl::~Impl() { - close(epollFd); - close(pipe[0]); - close(pipe[1]); + shutdown(); + ::close(epollFd); + ::close(pipe[0]); + ::close(pipe[1]); } +void EventChannel::Impl::shutdown() { + Monitor::ScopedLock l(monitor); + if (!isShutdown) { // I'm starting shutdown. + isShutdown = true; + if (nWaiters == 0) + return; + + // TODO aconway 2006-12-20: If I just close the epollFd will + // that wake all threads? If so with what? Would be simpler than: + + // Create a pipe and write a single byte. The byte is never + // read so the pipes read fd is always ready for read. + // Since we use level-triggered epoll this will wake up all + // wait() threads. + // + QPID_POSIX_CHECK(::pipe(pipe)); + static char zero = '\0'; + QPID_POSIX_CHECK(::write(pipe[1], &zero, 1)); + CleanStruct<epoll_event> ee; + ee.data.ptr = 0; + ee.events = EPOLLIN; + QPID_POSIX_CHECK(epoll_ctl(epollFd, EPOLL_CTL_ADD, pipe[0], &ee)); + } + // Wait for nWaiters to get out. + while (nWaiters > 0) { + monitor.wait(); + } +} + +// TODO aconway 2006-12-20: DEBUG remove +struct epoll { + epoll(uint32_t e) : events(e) { } + uint32_t events; +}; + +#define BIT(X) out << ((e.events & X) ? __STRING(X) "." : "") +ostream& operator << (ostream& out, epoll e) { + out << "epoll_event.events: "; + BIT(EPOLLIN); + BIT(EPOLLPRI); + BIT(EPOLLOUT); + BIT(EPOLLRDNORM); + BIT(EPOLLRDBAND); + BIT(EPOLLWRNORM); + BIT(EPOLLWRBAND); + BIT(EPOLLMSG); + BIT(EPOLLERR); + BIT(EPOLLHUP); + BIT(EPOLLONESHOT); + BIT(EPOLLET); + return out; +} + + + /** * Wait for epoll to wake up, return the descriptor or 0 on timeout. */ Event* EventChannel::Impl::wait(Time timeoutNs) { + { + Monitor::ScopedLock l(monitor); + if (isShutdown) + throw ShutdownException(); + } + + // Increase nWaiters for the duration, notify the monitor if I'm + // the last one out. + // + AtomicCount::ScopedIncrement si( + nWaiters, boost::bind(&Monitor::notifyAll, &monitor)); + // No lock, all thread safe calls or local variables: // const long timeoutMs = (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC; - struct epoll_event ee; + CleanStruct<epoll_event> ee; Event* event = 0; bool doSwap = true; @@ -324,14 +407,18 @@ Event* EventChannel::Impl::wait(Time timeoutNs) int n = epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe. if (n == 0) // Timeout return 0; - if (n < 0 && errno != EINTR) // Interrupt, ignore it. + if (n < 0 && errno == EINTR) // Interrupt, ignore it. continue; if (n < 0) throw QPID_POSIX_ERROR(errno); assert(n == 1); Descriptor* ed = reinterpret_cast<Descriptor*>(ee.data.ptr); - assert(ed); + if (ed == 0) // We're being shut-down. + throw ShutdownException(); + assert(ed != 0); + // TODO aconway 2006-12-20: DEBUG + cout << endl << epoll(ee.events) << endl; EventPair ready = ed->wake(ee.events); // We can only return one event so if both completed push one @@ -352,7 +439,7 @@ Event* EventChannel::Impl::wait(Time timeoutNs) } EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) { - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(monitor); Descriptor& ed = descriptors[fd]; ed.activate(epollFd, fd); return ed; @@ -385,6 +472,10 @@ Event* EventChannel::wait(Time timeoutNs) return impl->wait(timeoutNs); } +void EventChannel::shutdown() { + impl->shutdown(); +} + // ================================================================ // Event and subclasses. @@ -430,31 +521,18 @@ void ReadEvent::prepare(EventChannel::Impl& impl) { impl.getDescriptor(descriptor).getQueue(IN).push(this); } -Event* ReadEvent::complete(EventChannel::Descriptor& ed) +void ReadEvent::complete(EventChannel::Descriptor& ed) { ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + bytesRead, size - bytesRead); - - if (n < 0 && errno != EAGAIN) { // Error - setException(QPID_POSIX_ERROR(errno)); - ed.shutdownUnsafe(); // Called with lock held. - } - else if (n == 0) { // End of file - // TODO aconway 2006-12-13: Don't treat EOF as exception - // unless we're partway thru a !noWait read. - setException(QPID_POSIX_ERROR(ENODATA)); - ed.shutdownUnsafe(); // Called with lock held. + if (n > 0) + bytesRead += n; + if (n == 0 || (n < 0 && errno != EAGAIN)) { + // Use ENODATA for file closed. + setException(QPID_POSIX_ERROR(n == 0 ? ENODATA : errno)); + ed.shutdownUnsafe(); } - else { - if (n > 0) // possible that n < 0 && errno == EAGAIN - bytesRead += n; - if (bytesRead < size && !noWait) { - // Continue reading, not enough data. - return 0; - } - } - return this; } @@ -463,42 +541,36 @@ void WriteEvent::prepare(EventChannel::Impl& impl) { } -Event* WriteEvent::complete(EventChannel::Descriptor& ed) +void WriteEvent::complete(EventChannel::Descriptor& ed) { ssize_t n = ::write(descriptor, static_cast<const char*>(buffer) + bytesWritten, size - bytesWritten); - if(n < 0 && errno == EAGAIN && noWait) { - return 0; - } - if (n < 0 || (bytesWritten += n) < size) { + if (n > 0) + bytesWritten += n; + if(n < 0 && errno != EAGAIN) { setException(QPID_POSIX_ERROR(errno)); ed.shutdownUnsafe(); // Called with lock held. } - return this; } void AcceptEvent::prepare(EventChannel::Impl& impl) { impl.getDescriptor(descriptor).getQueue(IN).push(this); } -Event* AcceptEvent::complete(EventChannel::Descriptor& ed) +void AcceptEvent::complete(EventChannel::Descriptor& ed) { accepted = ::accept(descriptor, 0, 0); if (accepted < 0) { setException(QPID_POSIX_ERROR(errno)); ed.shutdownUnsafe(); // Called with lock held. } - return this; } void DispatchEvent::prepare(EventChannel::Impl& impl) { impl.getDispatchQueue().push(this); } -Event* DispatchEvent::complete(EventChannel::Descriptor&) -{ - return this; -} +void DispatchEvent::complete(EventChannel::Descriptor&) {} }} |