diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp | 674 |
1 files changed, 674 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp new file mode 100644 index 0000000000..9ad05c71a3 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -0,0 +1,674 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Poller.h" +#include "qpid/sys/IOHandle.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/AtomicCount.h" +#include "qpid/sys/DeletionManager.h" +#include "qpid/sys/posix/check.h" +#include "qpid/sys/posix/PrivatePosix.h" +#include "qpid/log/Statement.h" + +#include <sys/epoll.h> +#include <errno.h> +#include <signal.h> + +#include <assert.h> +#include <queue> +#include <set> +#include <exception> + +namespace qpid { +namespace sys { + +// Deletion manager to handle deferring deletion of PollerHandles to when they definitely aren't being used +DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager; + +// Instantiate (and define) class static for DeletionManager +template <> +DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0); + +class PollerHandlePrivate { + friend class Poller; + friend class PollerPrivate; + friend class PollerHandle; + + enum FDStat { + ABSENT, + MONITORED, + INACTIVE, + HUNGUP, + MONITORED_HUNGUP, + INTERRUPTED, + INTERRUPTED_HUNGUP, + DELETED + }; + + ::__uint32_t events; + const IOHandlePrivate* ioHandle; + PollerHandle* pollerHandle; + FDStat stat; + Mutex lock; + + PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) : + events(0), + ioHandle(h), + pollerHandle(p), + stat(ABSENT) { + } + + int fd() const { + return toFd(ioHandle); + } + + bool isActive() const { + return stat == MONITORED || stat == MONITORED_HUNGUP; + } + + void setActive() { + stat = (stat == HUNGUP || stat == INTERRUPTED_HUNGUP) + ? MONITORED_HUNGUP + : MONITORED; + } + + bool isInactive() const { + return stat == INACTIVE || stat == HUNGUP; + } + + void setInactive() { + stat = INACTIVE; + } + + bool isIdle() const { + return stat == ABSENT; + } + + void setIdle() { + stat = ABSENT; + } + + bool isHungup() const { + return + stat == MONITORED_HUNGUP || + stat == HUNGUP || + stat == INTERRUPTED_HUNGUP; + } + + void setHungup() { + assert(stat == MONITORED); + stat = HUNGUP; + } + + bool isInterrupted() const { + return stat == INTERRUPTED || stat == INTERRUPTED_HUNGUP; + } + + void setInterrupted() { + stat = (stat == MONITORED_HUNGUP || stat == HUNGUP) + ? INTERRUPTED_HUNGUP + : INTERRUPTED; + } + + bool isDeleted() const { + return stat == DELETED; + } + + void setDeleted() { + stat = DELETED; + } +}; + +PollerHandle::PollerHandle(const IOHandle& h) : + impl(new PollerHandlePrivate(h.impl, this)) +{} + +PollerHandle::~PollerHandle() { + { + ScopedLock<Mutex> l(impl->lock); + if (impl->isDeleted()) { + return; + } + impl->pollerHandle = 0; + if (impl->isInterrupted()) { + impl->setDeleted(); + return; + } + assert(impl->isIdle()); + impl->setDeleted(); + } + PollerHandleDeletionManager.markForDeletion(impl); +} + +class HandleSet +{ + Mutex lock; + std::set<PollerHandle*> handles; + public: + void add(PollerHandle*); + void remove(PollerHandle*); + void cleanup(); +}; + +void HandleSet::add(PollerHandle* h) +{ + ScopedLock<Mutex> l(lock); + handles.insert(h); +} +void HandleSet::remove(PollerHandle* h) +{ + ScopedLock<Mutex> l(lock); + handles.erase(h); +} +void HandleSet::cleanup() +{ + // Inform all registered handles of disconnection + std::set<PollerHandle*> copy; + handles.swap(copy); + for (std::set<PollerHandle*>::const_iterator i = copy.begin(); i != copy.end(); ++i) { + Poller::Event event(*i, Poller::DISCONNECTED); + event.process(); + } +} + +/** + * Concrete implementation of Poller to use the Linux specific epoll + * interface + */ +class PollerPrivate { + friend class Poller; + + static const int DefaultFds = 256; + + struct ReadablePipe { + int fds[2]; + + /** + * This encapsulates an always readable pipe which we can add + * to the epoll set to force epoll_wait to return + */ + ReadablePipe() { + QPID_POSIX_CHECK(::pipe(fds)); + // Just write the pipe's fds to the pipe + QPID_POSIX_CHECK(::write(fds[1], fds, 2)); + } + + ~ReadablePipe() { + ::close(fds[0]); + ::close(fds[1]); + } + + int getFD() { + return fds[0]; + } + }; + + static ReadablePipe alwaysReadable; + static int alwaysReadableFd; + + class InterruptHandle: public PollerHandle { + std::queue<PollerHandle*> handles; + + void processEvent(Poller::EventType) { + PollerHandle* handle = handles.front(); + handles.pop(); + assert(handle); + + // Synthesise event + Poller::Event event(handle, Poller::INTERRUPTED); + + // Process synthesised event + event.process(); + } + + public: + InterruptHandle() : + PollerHandle(DummyIOHandle) + {} + + void addHandle(PollerHandle& h) { + handles.push(&h); + } + + PollerHandle* getHandle() { + PollerHandle* handle = handles.front(); + handles.pop(); + return handle; + } + + bool queuedHandles() { + return handles.size() > 0; + } + }; + + const int epollFd; + bool isShutdown; + InterruptHandle interruptHandle; + HandleSet registeredHandles; + AtomicCount threadCount; + + static ::__uint32_t directionToEpollEvent(Poller::Direction dir) { + switch (dir) { + case Poller::INPUT: return ::EPOLLIN; + case Poller::OUTPUT: return ::EPOLLOUT; + case Poller::INOUT: return ::EPOLLIN | ::EPOLLOUT; + default: return 0; + } + } + + static Poller::EventType epollToDirection(::__uint32_t events) { + // POLLOUT & POLLHUP are mutually exclusive really, but at least socketpairs + // can give you both! + events = (events & ::EPOLLHUP) ? events & ~::EPOLLOUT : events; + ::__uint32_t e = events & (::EPOLLIN | ::EPOLLOUT); + switch (e) { + case ::EPOLLIN: return Poller::READABLE; + case ::EPOLLOUT: return Poller::WRITABLE; + case ::EPOLLIN | ::EPOLLOUT: return Poller::READ_WRITABLE; + default: + return (events & (::EPOLLHUP | ::EPOLLERR)) ? + Poller::DISCONNECTED : Poller::INVALID; + } + } + + PollerPrivate() : + epollFd(::epoll_create(DefaultFds)), + isShutdown(false) { + QPID_POSIX_CHECK(epollFd); + // Add always readable fd into our set (but not listening to it yet) + ::epoll_event epe; + epe.events = 0; + epe.data.u64 = 1; + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_ADD, alwaysReadableFd, &epe)); + } + + ~PollerPrivate() { + // It's probably okay to ignore any errors here as there can't be data loss + ::close(epollFd); + + // Need to put the interruptHandle in idle state to delete it + static_cast<PollerHandle&>(interruptHandle).impl->setIdle(); + } + + void resetMode(PollerHandlePrivate& handle); + + void interrupt() { + ::epoll_event epe; + // Use EPOLLONESHOT so we only wake a single thread + epe.events = ::EPOLLIN | ::EPOLLONESHOT; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle); + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe)); + } + + void interruptAll() { + ::epoll_event epe; + // Not EPOLLONESHOT, so we eventually get all threads + epe.events = ::EPOLLIN; + epe.data.u64 = 2; // Keep valgrind happy + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe)); + } +}; + +PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable; +int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD(); + +void Poller::registerHandle(PollerHandle& handle) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(eh.isIdle()); + + ::epoll_event epe; + epe.events = ::EPOLLONESHOT; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &eh; + + impl->registeredHandles.add(&handle); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd(), &epe)); + + eh.setActive(); +} + +void Poller::unregisterHandle(PollerHandle& handle) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isIdle()); + + impl->registeredHandles.remove(&handle); + int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd(), 0); + // Ignore EBADF since deleting a nonexistent fd has the overall required result! + // And allows the case where a sloppy program closes the fd and then does the delFd() + if (rc == -1 && errno != EBADF) { + QPID_POSIX_CHECK(rc); + } + + eh.setIdle(); +} + +void PollerPrivate::resetMode(PollerHandlePrivate& eh) { + PollerHandle* ph; + { + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isActive()); + + if (eh.isIdle() || eh.isDeleted()) { + return; + } + + if (eh.events==0) { + eh.setActive(); + return; + } + + if (!eh.isInterrupted()) { + ::epoll_event epe; + epe.events = eh.events | ::EPOLLONESHOT; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &eh; + + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe)); + + eh.setActive(); + return; + } + ph = eh.pollerHandle; + } + + PollerHandlePrivate& ihp = *static_cast<PollerHandle&>(interruptHandle).impl; + ScopedLock<Mutex> l(ihp.lock); + interruptHandle.addHandle(*ph); + ihp.setActive(); + interrupt(); +} + +void Poller::monitorHandle(PollerHandle& handle, Direction dir) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isIdle()); + + ::__uint32_t oldEvents = eh.events; + eh.events |= PollerPrivate::directionToEpollEvent(dir); + + // If no change nothing more to do - avoid unnecessary system call + if (oldEvents==eh.events) { + return; + } + + // If we're not actually listening wait till we are to perform change + if (!eh.isActive()) { + return; + } + + ::epoll_event epe; + epe.events = eh.events | ::EPOLLONESHOT; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &eh; + + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe)); +} + +void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isIdle()); + + ::__uint32_t oldEvents = eh.events; + eh.events &= ~PollerPrivate::directionToEpollEvent(dir); + + // If no change nothing more to do - avoid unnecessary system call + if (oldEvents==eh.events) { + return; + } + + // If we're not actually listening wait till we are to perform change + if (!eh.isActive()) { + return; + } + + ::epoll_event epe; + epe.events = eh.events | ::EPOLLONESHOT; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &eh; + + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe)); +} + +void Poller::shutdown() { + // NB: this function must be async-signal safe, it must not + // call any function that is not async-signal safe. + + // Allow sloppy code to shut us down more than once + if (impl->isShutdown) + return; + + // Don't use any locking here - isShutdown will be visible to all + // after the epoll_ctl() anyway (it's a memory barrier) + impl->isShutdown = true; + + impl->interruptAll(); +} + +bool Poller::interrupt(PollerHandle& handle) { + { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + if (eh.isIdle() || eh.isDeleted()) { + return false; + } + + if (eh.isInterrupted()) { + return true; + } + + // Stop monitoring handle for read or write + ::epoll_event epe; + epe.events = 0; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &eh; + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe)); + + if (eh.isInactive()) { + eh.setInterrupted(); + return true; + } + eh.setInterrupted(); + } + + PollerPrivate::InterruptHandle& ih = impl->interruptHandle; + PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl; + ScopedLock<Mutex> l(eh.lock); + ih.addHandle(handle); + + impl->interrupt(); + eh.setActive(); + return true; +} + +void Poller::run() { + // Ensure that we exit thread responsibly under all circumstances + try { + // Make sure we can't be interrupted by signals at a bad time + ::sigset_t ss; + ::sigfillset(&ss); + ::pthread_sigmask(SIG_SETMASK, &ss, 0); + + ++(impl->threadCount); + do { + Event event = wait(); + + // If can read/write then dispatch appropriate callbacks + if (event.handle) { + event.process(); + } else { + // Handle shutdown + switch (event.type) { + case SHUTDOWN: + PollerHandleDeletionManager.destroyThreadState(); + //last thread to respond to shutdown cleans up: + if (--(impl->threadCount) == 0) impl->registeredHandles.cleanup(); + return; + default: + // This should be impossible + assert(false); + } + } + } while (true); + } catch (const std::exception& e) { + QPID_LOG(error, "IO worker thread exiting with unhandled exception: " << e.what()); + } + PollerHandleDeletionManager.destroyThreadState(); + --(impl->threadCount); +} + +bool Poller::hasShutdown() +{ + return impl->isShutdown; +} + +Poller::Event Poller::wait(Duration timeout) { + static __thread PollerHandlePrivate* lastReturnedHandle = 0; + epoll_event epe; + int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC; + AbsTime targetTimeout = + (timeout == TIME_INFINITE) ? + FAR_FUTURE : + AbsTime(now(), timeout); + + if (lastReturnedHandle) { + impl->resetMode(*lastReturnedHandle); + lastReturnedHandle = 0; + } + + // Repeat until we weren't interrupted by signal + do { + PollerHandleDeletionManager.markAllUnusedInThisThread(); + int rc = ::epoll_wait(impl->epollFd, &epe, 1, timeoutMs); + if (rc ==-1 && errno != EINTR) { + QPID_POSIX_CHECK(rc); + } else if (rc > 0) { + assert(rc == 1); + void* dataPtr = epe.data.ptr; + + // Check if this is an interrupt + PollerPrivate::InterruptHandle& interruptHandle = impl->interruptHandle; + if (dataPtr == &interruptHandle) { + // If we are shutting down we need to rearm the shutdown interrupt to + // ensure everyone still sees it. It's okay that this might be overridden + // below as we will be back here if it is. + if (impl->isShutdown) { + impl->interruptAll(); + } + PollerHandle* wrappedHandle = 0; + { + ScopedLock<Mutex> l(interruptHandle.impl->lock); + if (interruptHandle.impl->isActive()) { + wrappedHandle = interruptHandle.getHandle(); + // If there is an interrupt queued behind this one we need to arm it + // We do it this way so that another thread can pick it up + if (interruptHandle.queuedHandles()) { + impl->interrupt(); + interruptHandle.impl->setActive(); + } else { + interruptHandle.impl->setInactive(); + } + } + } + if (wrappedHandle) { + PollerHandlePrivate& eh = *wrappedHandle->impl; + { + ScopedLock<Mutex> l(eh.lock); + if (!eh.isDeleted()) { + if (!eh.isIdle()) { + eh.setInactive(); + } + lastReturnedHandle = &eh; + assert(eh.pollerHandle == wrappedHandle); + return Event(wrappedHandle, INTERRUPTED); + } + } + PollerHandleDeletionManager.markForDeletion(&eh); + } + continue; + } + + // Check for shutdown + if (impl->isShutdown) { + PollerHandleDeletionManager.markAllUnusedInThisThread(); + return Event(0, SHUTDOWN); + } + + PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(dataPtr); + ScopedLock<Mutex> l(eh.lock); + + // the handle could have gone inactive since we left the epoll_wait + if (eh.isActive()) { + PollerHandle* handle = eh.pollerHandle; + assert(handle); + + // If the connection has been hungup we could still be readable + // (just not writable), allow us to readable until we get here again + if (epe.events & ::EPOLLHUP) { + if (eh.isHungup()) { + eh.setInactive(); + // Don't set up last Handle so that we don't reset this handle + // on re-entering Poller::wait. This means that we will never + // be set active again once we've returned disconnected, and so + // can never be returned again. + return Event(handle, DISCONNECTED); + } + eh.setHungup(); + } else { + eh.setInactive(); + } + lastReturnedHandle = &eh; + return Event(handle, PollerPrivate::epollToDirection(epe.events)); + } + } + // We only get here if one of the following: + // * epoll_wait was interrupted by a signal + // * epoll_wait timed out + // * the state of the handle changed after being returned by epoll_wait + // + // The only things we can do here are return a timeout or wait more. + // Obviously if we timed out we return timeout; if the wait was meant to + // be indefinite then we should never return with a time out so we go again. + // If the wait wasn't indefinite, we check whether we are after the target wait + // time or not + if (timeoutMs == -1) { + continue; + } + if (rc == 0 && now() > targetTimeout) { + PollerHandleDeletionManager.markAllUnusedInThisThread(); + return Event(0, TIMEOUT); + } + } while (true); +} + +// Concrete constructors +Poller::Poller() : + impl(new PollerPrivate()) +{} + +Poller::~Poller() { + delete impl; +} + +}} |