summaryrefslogtreecommitdiff
path: root/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp')
-rw-r--r--trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp634
1 files changed, 0 insertions, 634 deletions
diff --git a/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
deleted file mode 100644
index d7f64f3b4c..0000000000
--- a/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ /dev/null
@@ -1,634 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/Poller.h"
-#include "qpid/sys/IOHandle.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/DeletionManager.h"
-#include "qpid/sys/posix/check.h"
-#include "qpid/sys/posix/PrivatePosix.h"
-#include "qpid/log/Statement.h"
-
-#include <sys/epoll.h>
-#include <errno.h>
-#include <signal.h>
-
-#include <assert.h>
-#include <queue>
-#include <exception>
-
-namespace qpid {
-namespace sys {
-
-// Deletion manager to handle deferring deletion of PollerHandles to when they definitely aren't being used
-DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager;
-
-// Instantiate (and define) class static for DeletionManager
-template <>
-DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0);
-
-class PollerHandlePrivate {
- friend class Poller;
- friend class PollerPrivate;
- friend class PollerHandle;
-
- enum FDStat {
- ABSENT,
- MONITORED,
- INACTIVE,
- HUNGUP,
- MONITORED_HUNGUP,
- INTERRUPTED,
- INTERRUPTED_HUNGUP,
- DELETED
- };
-
- ::__uint32_t events;
- const IOHandlePrivate* ioHandle;
- PollerHandle* pollerHandle;
- FDStat stat;
- Mutex lock;
-
- PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) :
- events(0),
- ioHandle(h),
- pollerHandle(p),
- stat(ABSENT) {
- }
-
- int fd() const {
- return toFd(ioHandle);
- }
-
- bool isActive() const {
- return stat == MONITORED || stat == MONITORED_HUNGUP;
- }
-
- void setActive() {
- stat = (stat == HUNGUP || stat == INTERRUPTED_HUNGUP)
- ? MONITORED_HUNGUP
- : MONITORED;
- }
-
- bool isInactive() const {
- return stat == INACTIVE || stat == HUNGUP;
- }
-
- void setInactive() {
- stat = INACTIVE;
- }
-
- bool isIdle() const {
- return stat == ABSENT;
- }
-
- void setIdle() {
- stat = ABSENT;
- }
-
- bool isHungup() const {
- return
- stat == MONITORED_HUNGUP ||
- stat == HUNGUP ||
- stat == INTERRUPTED_HUNGUP;
- }
-
- void setHungup() {
- assert(stat == MONITORED);
- stat = HUNGUP;
- }
-
- bool isInterrupted() const {
- return stat == INTERRUPTED || stat == INTERRUPTED_HUNGUP;
- }
-
- void setInterrupted() {
- stat = (stat == MONITORED_HUNGUP || stat == HUNGUP)
- ? INTERRUPTED_HUNGUP
- : INTERRUPTED;
- }
-
- bool isDeleted() const {
- return stat == DELETED;
- }
-
- void setDeleted() {
- stat = DELETED;
- }
-};
-
-PollerHandle::PollerHandle(const IOHandle& h) :
- impl(new PollerHandlePrivate(h.impl, this))
-{}
-
-PollerHandle::~PollerHandle() {
- {
- ScopedLock<Mutex> l(impl->lock);
- if (impl->isDeleted()) {
- return;
- }
- impl->pollerHandle = 0;
- if (impl->isInterrupted()) {
- impl->setDeleted();
- return;
- }
- assert(impl->isIdle());
- impl->setDeleted();
- }
- PollerHandleDeletionManager.markForDeletion(impl);
-}
-
-/**
- * Concrete implementation of Poller to use the Linux specific epoll
- * interface
- */
-class PollerPrivate {
- friend class Poller;
-
- static const int DefaultFds = 256;
-
- struct ReadablePipe {
- int fds[2];
-
- /**
- * This encapsulates an always readable pipe which we can add
- * to the epoll set to force epoll_wait to return
- */
- ReadablePipe() {
- QPID_POSIX_CHECK(::pipe(fds));
- // Just write the pipe's fds to the pipe
- QPID_POSIX_CHECK(::write(fds[1], fds, 2));
- }
-
- ~ReadablePipe() {
- ::close(fds[0]);
- ::close(fds[1]);
- }
-
- int getFD() {
- return fds[0];
- }
- };
-
- static ReadablePipe alwaysReadable;
- static int alwaysReadableFd;
-
- class InterruptHandle: public PollerHandle {
- std::queue<PollerHandle*> handles;
-
- void processEvent(Poller::EventType) {
- PollerHandle* handle = handles.front();
- handles.pop();
- assert(handle);
-
- // Synthesise event
- Poller::Event event(handle, Poller::INTERRUPTED);
-
- // Process synthesised event
- event.process();
- }
-
- public:
- InterruptHandle() :
- PollerHandle(DummyIOHandle)
- {}
-
- void addHandle(PollerHandle& h) {
- handles.push(&h);
- }
-
- PollerHandle* getHandle() {
- PollerHandle* handle = handles.front();
- handles.pop();
- return handle;
- }
-
- bool queuedHandles() {
- return handles.size() > 0;
- }
- };
-
- const int epollFd;
- bool isShutdown;
- InterruptHandle interruptHandle;
- ::sigset_t sigMask;
-
- static ::__uint32_t directionToEpollEvent(Poller::Direction dir) {
- switch (dir) {
- case Poller::INPUT: return ::EPOLLIN;
- case Poller::OUTPUT: return ::EPOLLOUT;
- case Poller::INOUT: return ::EPOLLIN | ::EPOLLOUT;
- default: return 0;
- }
- }
-
- static Poller::EventType epollToDirection(::__uint32_t events) {
- // POLLOUT & POLLHUP are mutually exclusive really, but at least socketpairs
- // can give you both!
- events = (events & ::EPOLLHUP) ? events & ~::EPOLLOUT : events;
- ::__uint32_t e = events & (::EPOLLIN | ::EPOLLOUT);
- switch (e) {
- case ::EPOLLIN: return Poller::READABLE;
- case ::EPOLLOUT: return Poller::WRITABLE;
- case ::EPOLLIN | ::EPOLLOUT: return Poller::READ_WRITABLE;
- default:
- return (events & (::EPOLLHUP | ::EPOLLERR)) ?
- Poller::DISCONNECTED : Poller::INVALID;
- }
- }
-
- PollerPrivate() :
- epollFd(::epoll_create(DefaultFds)),
- isShutdown(false) {
- QPID_POSIX_CHECK(epollFd);
- ::sigemptyset(&sigMask);
- // Add always readable fd into our set (but not listening to it yet)
- ::epoll_event epe;
- epe.events = 0;
- epe.data.u64 = 1;
- QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_ADD, alwaysReadableFd, &epe));
- }
-
- ~PollerPrivate() {
- // It's probably okay to ignore any errors here as there can't be data loss
- ::close(epollFd);
-
- // Need to put the interruptHandle in idle state to delete it
- static_cast<PollerHandle&>(interruptHandle).impl->setIdle();
- }
-
- void resetMode(PollerHandlePrivate& handle);
-
- void interrupt() {
- ::epoll_event epe;
- // Use EPOLLONESHOT so we only wake a single thread
- epe.events = ::EPOLLIN | ::EPOLLONESHOT;
- epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle);
- QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe));
- }
-
- void interruptAll() {
- ::epoll_event epe;
- // Not EPOLLONESHOT, so we eventually get all threads
- epe.events = ::EPOLLIN;
- epe.data.u64 = 2; // Keep valgrind happy
- QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe));
- }
-};
-
-PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable;
-int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD();
-
-void Poller::registerHandle(PollerHandle& handle) {
- PollerHandlePrivate& eh = *handle.impl;
- ScopedLock<Mutex> l(eh.lock);
- assert(eh.isIdle());
-
- ::epoll_event epe;
- epe.events = ::EPOLLONESHOT;
- epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &eh;
-
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd(), &epe));
-
- eh.setActive();
-}
-
-void Poller::unregisterHandle(PollerHandle& handle) {
- PollerHandlePrivate& eh = *handle.impl;
- ScopedLock<Mutex> l(eh.lock);
- assert(!eh.isIdle());
-
- int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd(), 0);
- // Ignore EBADF since deleting a nonexistent fd has the overall required result!
- // And allows the case where a sloppy program closes the fd and then does the delFd()
- if (rc == -1 && errno != EBADF) {
- QPID_POSIX_CHECK(rc);
- }
-
- eh.setIdle();
-}
-
-void PollerPrivate::resetMode(PollerHandlePrivate& eh) {
- PollerHandle* ph;
- {
- ScopedLock<Mutex> l(eh.lock);
- assert(!eh.isActive());
-
- if (eh.isIdle() || eh.isDeleted()) {
- return;
- }
-
- if (eh.events==0) {
- eh.setActive();
- return;
- }
-
- if (!eh.isInterrupted()) {
- ::epoll_event epe;
- epe.events = eh.events | ::EPOLLONESHOT;
- epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &eh;
-
- QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
-
- eh.setActive();
- return;
- }
- ph = eh.pollerHandle;
- }
-
- PollerHandlePrivate& ihp = *static_cast<PollerHandle&>(interruptHandle).impl;
- ScopedLock<Mutex> l(ihp.lock);
- interruptHandle.addHandle(*ph);
- ihp.setActive();
- interrupt();
-}
-
-void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
- PollerHandlePrivate& eh = *handle.impl;
- ScopedLock<Mutex> l(eh.lock);
- assert(!eh.isIdle());
-
- ::__uint32_t oldEvents = eh.events;
- eh.events |= PollerPrivate::directionToEpollEvent(dir);
-
- // If no change nothing more to do - avoid unnecessary system call
- if (oldEvents==eh.events) {
- return;
- }
-
- // If we're not actually listening wait till we are to perform change
- if (!eh.isActive()) {
- return;
- }
-
- ::epoll_event epe;
- epe.events = eh.events | ::EPOLLONESHOT;
- epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &eh;
-
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
-}
-
-void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
- PollerHandlePrivate& eh = *handle.impl;
- ScopedLock<Mutex> l(eh.lock);
- assert(!eh.isIdle());
-
- ::__uint32_t oldEvents = eh.events;
- eh.events &= ~PollerPrivate::directionToEpollEvent(dir);
-
- // If no change nothing more to do - avoid unnecessary system call
- if (oldEvents==eh.events) {
- return;
- }
-
- // If we're not actually listening wait till we are to perform change
- if (!eh.isActive()) {
- return;
- }
-
- ::epoll_event epe;
- epe.events = eh.events | ::EPOLLONESHOT;
- epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &eh;
-
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
-}
-
-void Poller::shutdown() {
- // NB: this function must be async-signal safe, it must not
- // call any function that is not async-signal safe.
-
- // Allow sloppy code to shut us down more than once
- if (impl->isShutdown)
- return;
-
- // Don't use any locking here - isShutdown will be visible to all
- // after the epoll_ctl() anyway (it's a memory barrier)
- impl->isShutdown = true;
-
- impl->interruptAll();
-}
-
-bool Poller::interrupt(PollerHandle& handle) {
- {
- PollerHandlePrivate& eh = *handle.impl;
- ScopedLock<Mutex> l(eh.lock);
- if (eh.isIdle() || eh.isDeleted()) {
- return false;
- }
-
- if (eh.isInterrupted()) {
- return true;
- }
-
- // Stop monitoring handle for read or write
- ::epoll_event epe;
- epe.events = 0;
- epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
-
- if (eh.isInactive()) {
- eh.setInterrupted();
- return true;
- }
- eh.setInterrupted();
- }
-
- PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
- PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl;
- ScopedLock<Mutex> l(eh.lock);
- ih.addHandle(handle);
-
- impl->interrupt();
- eh.setActive();
- return true;
-}
-
-void Poller::run() {
- // Ensure that we exit thread responsibly under all circumstances
- try {
- // Make sure we can't be interrupted by signals at a bad time
- ::sigset_t ss;
- ::sigfillset(&ss);
- ::pthread_sigmask(SIG_SETMASK, &ss, 0);
-
- do {
- Event event = wait();
-
- // If can read/write then dispatch appropriate callbacks
- if (event.handle) {
- event.process();
- } else {
- // Handle shutdown
- switch (event.type) {
- case SHUTDOWN:
- PollerHandleDeletionManager.destroyThreadState();
- return;
- default:
- // This should be impossible
- assert(false);
- }
- }
- } while (true);
- } catch (const std::exception& e) {
- QPID_LOG(error, "IO worker thread exiting with unhandled exception: " << e.what());
- }
- PollerHandleDeletionManager.destroyThreadState();
-}
-
-Poller::Event Poller::wait(Duration timeout) {
- static __thread PollerHandlePrivate* lastReturnedHandle = 0;
- epoll_event epe;
- int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC;
- AbsTime targetTimeout =
- (timeout == TIME_INFINITE) ?
- FAR_FUTURE :
- AbsTime(now(), timeout);
-
- if (lastReturnedHandle) {
- impl->resetMode(*lastReturnedHandle);
- lastReturnedHandle = 0;
- }
-
- // Repeat until we weren't interrupted by signal
- do {
- PollerHandleDeletionManager.markAllUnusedInThisThread();
- // Need to run on kernels without epoll_pwait()
- // - fortunately in this case we don't really need the atomicity of epoll_pwait()
-#if 1
- sigset_t os;
- pthread_sigmask(SIG_SETMASK, &impl->sigMask, &os);
- int rc = ::epoll_wait(impl->epollFd, &epe, 1, timeoutMs);
- pthread_sigmask(SIG_SETMASK, &os, 0);
-#else
- int rc = ::epoll_pwait(impl->epollFd, &epe, 1, timeoutMs, &impl->sigMask);
-#endif
-
- if (rc ==-1 && errno != EINTR) {
- QPID_POSIX_CHECK(rc);
- } else if (rc > 0) {
- assert(rc == 1);
- void* dataPtr = epe.data.ptr;
-
- // Check if this is an interrupt
- PollerPrivate::InterruptHandle& interruptHandle = impl->interruptHandle;
- if (dataPtr == &interruptHandle) {
- PollerHandle* wrappedHandle = 0;
- {
- ScopedLock<Mutex> l(interruptHandle.impl->lock);
- if (interruptHandle.impl->isActive()) {
- wrappedHandle = interruptHandle.getHandle();
- // If there is an interrupt queued behind this one we need to arm it
- // We do it this way so that another thread can pick it up
- if (interruptHandle.queuedHandles()) {
- impl->interrupt();
- interruptHandle.impl->setActive();
- } else {
- interruptHandle.impl->setInactive();
- }
- }
- }
- if (wrappedHandle) {
- PollerHandlePrivate& eh = *wrappedHandle->impl;
- {
- ScopedLock<Mutex> l(eh.lock);
- if (!eh.isDeleted()) {
- if (!eh.isIdle()) {
- eh.setInactive();
- }
- lastReturnedHandle = &eh;
- assert(eh.pollerHandle == wrappedHandle);
- return Event(wrappedHandle, INTERRUPTED);
- }
- }
- PollerHandleDeletionManager.markForDeletion(&eh);
- }
- continue;
- }
-
- // Check for shutdown
- if (impl->isShutdown) {
- PollerHandleDeletionManager.markAllUnusedInThisThread();
- return Event(0, SHUTDOWN);
- }
-
- PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(dataPtr);
- ScopedLock<Mutex> l(eh.lock);
-
- // the handle could have gone inactive since we left the epoll_wait
- if (eh.isActive()) {
- PollerHandle* handle = eh.pollerHandle;
- assert(handle);
-
- // If the connection has been hungup we could still be readable
- // (just not writable), allow us to readable until we get here again
- if (epe.events & ::EPOLLHUP) {
- if (eh.isHungup()) {
- eh.setInactive();
- // Don't set up last Handle so that we don't reset this handle
- // on re-entering Poller::wait. This means that we will never
- // be set active again once we've returned disconnected, and so
- // can never be returned again.
- return Event(handle, DISCONNECTED);
- }
- eh.setHungup();
- } else {
- eh.setInactive();
- }
- lastReturnedHandle = &eh;
- return Event(handle, PollerPrivate::epollToDirection(epe.events));
- }
- }
- // We only get here if one of the following:
- // * epoll_wait was interrupted by a signal
- // * epoll_wait timed out
- // * the state of the handle changed after being returned by epoll_wait
- //
- // The only things we can do here are return a timeout or wait more.
- // Obviously if we timed out we return timeout; if the wait was meant to
- // be indefinite then we should never return with a time out so we go again.
- // If the wait wasn't indefinite, we check whether we are after the target wait
- // time or not
- if (timeoutMs == -1) {
- continue;
- }
- if (rc == 0 && now() > targetTimeout) {
- PollerHandleDeletionManager.markAllUnusedInThisThread();
- return Event(0, TIMEOUT);
- }
- } while (true);
-}
-
-// Concrete constructors
-Poller::Poller() :
- impl(new PollerPrivate())
-{}
-
-Poller::~Poller() {
- delete impl;
-}
-
-}}