diff options
Diffstat (limited to 'cpp/src/qpid/sys/epoll/EpollPoller.cpp')
-rw-r--r-- | cpp/src/qpid/sys/epoll/EpollPoller.cpp | 178 |
1 files changed, 129 insertions, 49 deletions
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp index 42b5d8b1aa..d0623b86f4 100644 --- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -46,6 +46,7 @@ DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerH class PollerHandlePrivate { friend class Poller; + friend class PollerPrivate; friend class PollerHandle; enum FDStat { @@ -55,6 +56,7 @@ class PollerHandlePrivate { HUNGUP, MONITORED_HUNGUP, INTERRUPTED, + INTERRUPTED_HUNGUP, DELETED }; @@ -76,7 +78,9 @@ class PollerHandlePrivate { } void setActive() { - stat = (stat == HUNGUP) ? MONITORED_HUNGUP : MONITORED; + stat = (stat == HUNGUP || stat == INTERRUPTED_HUNGUP) + ? MONITORED_HUNGUP + : MONITORED; } bool isInactive() const { @@ -96,7 +100,10 @@ class PollerHandlePrivate { } bool isHungup() const { - return stat == MONITORED_HUNGUP || stat == HUNGUP; + return + stat == MONITORED_HUNGUP || + stat == HUNGUP || + stat == INTERRUPTED_HUNGUP; } void setHungup() { @@ -105,11 +112,13 @@ class PollerHandlePrivate { } bool isInterrupted() const { - return stat == INTERRUPTED; + return stat == INTERRUPTED || stat == INTERRUPTED_HUNGUP; } void setInterrupted() { - stat = INTERRUPTED; + stat = (stat == MONITORED_HUNGUP || stat == HUNGUP) + ? INTERRUPTED_HUNGUP + : INTERRUPTED; } bool isDeleted() const { @@ -131,13 +140,13 @@ PollerHandle::~PollerHandle() { if (impl->isDeleted()) { return; } + impl->pollerHandle = 0; if (impl->isInterrupted()) { impl->setDeleted(); return; } - if (impl->isActive()) { - impl->setDeleted(); - } + assert(impl->isIdle()); + impl->setDeleted(); } PollerHandleDeletionManager.markForDeletion(impl); } @@ -256,8 +265,13 @@ class PollerPrivate { ~PollerPrivate() { // It's probably okay to ignore any errors here as there can't be data loss ::close(epollFd); + + // Need to put the interruptHandle in idle state to delete it + static_cast<PollerHandle&>(interruptHandle).impl->setIdle(); } - + + void resetMode(PollerHandlePrivate& handle); + void interrupt() { ::epoll_event epe; // Use EPOLLONESHOT so we only wake a single thread @@ -279,74 +293,122 @@ class PollerPrivate { PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable; int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD(); -void Poller::addFd(PollerHandle& handle, Direction dir) { +void Poller::registerHandle(PollerHandle& handle) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); - ::epoll_event epe; - int op; + assert(eh.isIdle()); - if (eh.isIdle()) { - op = EPOLL_CTL_ADD; - epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; - } else { - assert(eh.isActive()); - op = EPOLL_CTL_MOD; - epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir); - } + ::epoll_event epe; + epe.events = ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd, &epe)); - // Record monitoring state of this fd - eh.events = epe.events; eh.setActive(); } -void Poller::delFd(PollerHandle& handle) { +void Poller::unregisterHandle(PollerHandle& handle) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); assert(!eh.isIdle()); + int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0); // Ignore EBADF since deleting a nonexistent fd has the overall required result! // And allows the case where a sloppy program closes the fd and then does the delFd() if (rc == -1 && errno != EBADF) { QPID_POSIX_CHECK(rc); } + eh.setIdle(); } -// modFd is equivalent to delFd followed by addFd -void Poller::modFd(PollerHandle& handle, Direction dir) { +void PollerPrivate::resetMode(PollerHandlePrivate& eh) { + PollerHandle* ph; + { + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isActive()); + + if (eh.isIdle() || eh.isDeleted()) { + return; + } + + if (eh.events==0) { + eh.setActive(); + return; + } + + if (!eh.isInterrupted()) { + ::epoll_event epe; + epe.events = eh.events | ::EPOLLONESHOT; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &eh; + + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); + + eh.setActive(); + return; + } + ph = eh.pollerHandle; + } + + PollerHandlePrivate& ihp = *static_cast<PollerHandle&>(interruptHandle).impl; + ScopedLock<Mutex> l(ihp.lock); + interruptHandle.addHandle(*ph); + ihp.setActive(); + interrupt(); +} + +void Poller::monitorHandle(PollerHandle& handle, Direction dir) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); assert(!eh.isIdle()); + ::__uint32_t oldEvents = eh.events; + eh.events |= PollerPrivate::directionToEpollEvent(dir); + + // If no change nothing more to do - avoid unnecessary system call + if (oldEvents==eh.events) { + return; + } + + // If we're not actually listening wait till we are to perform change + if (!eh.isActive()) { + return; + } + ::epoll_event epe; - epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; + epe.events = eh.events | ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); - - // Record monitoring state of this fd - eh.events = epe.events; - eh.setActive(); } -void Poller::rearmFd(PollerHandle& handle) { +void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); - assert(eh.isInactive()); + assert(!eh.isIdle()); + ::__uint32_t oldEvents = eh.events; + eh.events &= ~PollerPrivate::directionToEpollEvent(dir); + + // If no change nothing more to do - avoid unnecessary system call + if (oldEvents==eh.events) { + return; + } + + // If we're not actually listening wait till we are to perform change + if (!eh.isActive()) { + return; + } + ::epoll_event epe; - epe.events = eh.events; + epe.events = eh.events | ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); - - eh.setActive(); } void Poller::shutdown() { @@ -368,14 +430,25 @@ bool Poller::interrupt(PollerHandle& handle) { { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); - if (!eh.isActive()) { + if (eh.isIdle() || eh.isDeleted()) { return false; } + + if (eh.isInterrupted()) { + return true; + } + + // Stop monitoring handle for read or write ::epoll_event epe; epe.events = 0; epe.data.u64 = 0; // Keep valgrind happy QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); - eh.setInterrupted(); + + if (eh.isInactive()) { + eh.setInterrupted(); + return true; + } + eh.setInterrupted(); } PollerPrivate::InterruptHandle& ih = impl->interruptHandle; @@ -414,6 +487,7 @@ void Poller::run() { } Poller::Event Poller::wait(Duration timeout) { + static __thread PollerHandlePrivate* lastReturnedHandle = 0; epoll_event epe; int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC; AbsTime targetTimeout = @@ -421,6 +495,11 @@ Poller::Event Poller::wait(Duration timeout) { FAR_FUTURE : AbsTime(now(), timeout); + if (lastReturnedHandle) { + impl->resetMode(*lastReturnedHandle); + lastReturnedHandle = 0; + } + // Repeat until we weren't interrupted by signal do { PollerHandleDeletionManager.markAllUnusedInThisThread(); @@ -460,12 +539,19 @@ Poller::Event Poller::wait(Duration timeout) { } } if (wrappedHandle) { - ScopedLock<Mutex> l(wrappedHandle->impl->lock); - if (!wrappedHandle->impl->isDeleted()) { - wrappedHandle->impl->setInactive(); + PollerHandlePrivate& eh = *wrappedHandle->impl; + { + ScopedLock<Mutex> l(eh.lock); + if (!eh.isDeleted()) { + if (!eh.isIdle()) { + eh.setInactive(); + } + lastReturnedHandle = &eh; + assert(eh.pollerHandle == wrappedHandle); return Event(wrappedHandle, INTERRUPTED); } - PollerHandleDeletionManager.markForDeletion(wrappedHandle->impl); + } + PollerHandleDeletionManager.markForDeletion(&eh); } continue; } @@ -482,6 +568,7 @@ Poller::Event Poller::wait(Duration timeout) { // the handle could have gone inactive since we left the epoll_wait if (eh.isActive()) { PollerHandle* handle = eh.pollerHandle; + assert(handle); // If the connection has been hungup we could still be readable // (just not writable), allow us to readable until we get here again @@ -493,15 +580,8 @@ Poller::Event Poller::wait(Duration timeout) { } else { eh.setInactive(); } + lastReturnedHandle = &eh; return Event(handle, PollerPrivate::epollToDirection(epe.events)); - } else if (eh.isDeleted()) { - // The handle has been deleted whilst still active and so must be removed - // from the poller - int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0); - // Ignore EBADF since it's quite likely that we could race with closing the fd - if (rc == -1 && errno != EBADF) { - QPID_POSIX_CHECK(rc); - } } } // We only get here if one of the following: |