diff options
author | Clifford Allan Jansen <cliffjansen@apache.org> | 2012-02-09 16:31:45 +0000 |
---|---|---|
committer | Clifford Allan Jansen <cliffjansen@apache.org> | 2012-02-09 16:31:45 +0000 |
commit | 261f22ddb7a44201a9f9581e6016f0ad83ed625d (patch) | |
tree | bb07dbaaa07c2a881bfb4a5d554988639861f65b | |
parent | ef7ba8bbf8dbfaa52ed36a184018040856747672 (diff) | |
download | qpid-python-261f22ddb7a44201a9f9581e6016f0ad83ed625d.tar.gz |
QPID-3571 PosixPoller implementation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1242380 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp | 793 |
1 files changed, 793 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp b/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp new file mode 100644 index 0000000000..eb0c3384d1 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp @@ -0,0 +1,793 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Poller.h" +#include "qpid/sys/IOHandle.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/AtomicCount.h" +#include "qpid/sys/DeletionManager.h" +#include "qpid/sys/posix/check.h" +#include "qpid/sys/posix/PrivatePosix.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/Condition.h" + +#include <poll.h> +#include <errno.h> +#include <signal.h> + +#include <assert.h> +#include <queue> +#include <set> +#include <exception> + +/* + * + * This is a qpid::sys::Poller implementation for Posix systems. + * + * This module follows the structure of the Linux EpollPoller as closely as possible + * to simplify maintainability. Noteworthy differences: + * + * The Linux epoll_xxx() calls present one event at a time to multiple callers whereas poll() + * returns one or more events to a single caller. The EventStream class layers a + * "one event per call" view of the poll() result to multiple threads. + * + * The HandleSet is the master set of in-use PollerHandles. The EventStream + * maintains a snapshot copy taken just before the call to poll() that remains static + * until all flagged events have been processed. + * + * There is an additional window where the PollerHandlePrivate class may survive the + * parent PollerHandle destructor, i.e. between snapshots. + * + * Safe interrupting of the Poller is implemented using the "self-pipe trick". + * + */ + +namespace qpid { +namespace sys { + +// Deletion manager to handle deferring deletion of PollerHandles to when they definitely aren't being used +DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager; + +// Instantiate (and define) class static for DeletionManager +template <> +DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0); + +class PollerHandlePrivate { + friend class Poller; + friend class PollerPrivate; + friend class PollerHandle; + friend class HandleSet; + + enum FDStat { + ABSENT, + MONITORED, + INACTIVE, + HUNGUP, + MONITORED_HUNGUP, + INTERRUPTED, + INTERRUPTED_HUNGUP, + DELETED + }; + + short events; + const IOHandlePrivate* ioHandle; + PollerHandle* pollerHandle; + FDStat stat; + Mutex lock; + + PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) : + events(0), + ioHandle(h), + pollerHandle(p), + stat(ABSENT) { + } + + int fd() const { + return toFd(ioHandle); + } + + bool isActive() const { + return stat == MONITORED || stat == MONITORED_HUNGUP; + } + + void setActive() { + stat = (stat == HUNGUP || stat == INTERRUPTED_HUNGUP) + ? MONITORED_HUNGUP + : MONITORED; + } + + bool isInactive() const { + return stat == INACTIVE || stat == HUNGUP; + } + + void setInactive() { + stat = INACTIVE; + } + + bool isIdle() const { + return stat == ABSENT; + } + + void setIdle() { + stat = ABSENT; + } + + bool isHungup() const { + return + stat == MONITORED_HUNGUP || + stat == HUNGUP || + stat == INTERRUPTED_HUNGUP; + } + + void setHungup() { + assert(stat == MONITORED); + stat = HUNGUP; + } + + bool isInterrupted() const { + return stat == INTERRUPTED || stat == INTERRUPTED_HUNGUP; + } + + void setInterrupted() { + stat = (stat == MONITORED_HUNGUP || stat == HUNGUP) + ? INTERRUPTED_HUNGUP + : INTERRUPTED; + } + + bool isDeleted() const { + return stat == DELETED; + } + + void setDeleted() { + stat = DELETED; + } +}; + +PollerHandle::PollerHandle(const IOHandle& h) : + impl(new PollerHandlePrivate(h.impl, this)) +{} + +PollerHandle::~PollerHandle() { + { + ScopedLock<Mutex> l(impl->lock); + if (impl->isDeleted()) { + return; + } + impl->pollerHandle = 0; + if (impl->isInterrupted()) { + impl->setDeleted(); + return; + } + assert(impl->isIdle()); + impl->setDeleted(); + } + PollerHandleDeletionManager.markForDeletion(impl); +} + +class HandleSet +{ + Mutex lock; + bool stale; + std::set<PollerHandlePrivate*> handles; + public: + HandleSet() : stale(true) {} + void add(PollerHandlePrivate*); + void remove(PollerHandlePrivate*); + void cleanup(); + bool snapshot(std::vector<PollerHandlePrivate *>& , std::vector<struct ::pollfd>&); + void setStale(); +}; + +void HandleSet::add(PollerHandlePrivate* h) +{ + ScopedLock<Mutex> l(lock); + handles.insert(h); +} +void HandleSet::remove(PollerHandlePrivate* h) +{ + ScopedLock<Mutex> l(lock); + handles.erase(h); +} +void HandleSet::cleanup() +{ + // Inform all registered handles of disconnection + std::set<PollerHandlePrivate*> copy; + handles.swap(copy); + for (std::set<PollerHandlePrivate*>::const_iterator i = copy.begin(); i != copy.end(); ++i) { + PollerHandlePrivate& eh = **i; + { + ScopedLock<Mutex> l(eh.lock); + if (!eh.isDeleted()) { + Poller::Event event((*i)->pollerHandle, Poller::DISCONNECTED); + event.process(); + } + } + } +} +void HandleSet::setStale() +{ + // invalidate cached pollfds for next snapshot + ScopedLock<Mutex> l(lock); + stale = true; +} + +/** + * Concrete implementation of Poller to use Posix poll() + * interface + */ +class PollerPrivate { + friend class Poller; + friend class EventStream; + friend class HandleSet; + + class SignalPipe { + /** + * Used to wakeup a thread in ::poll() + */ + int fds[2]; + bool signaled; + bool permanent; + Mutex lock; + public: + SignalPipe() : signaled(false), permanent(false) { + QPID_POSIX_CHECK(::pipe(fds)); + } + + ~SignalPipe() { + ::close(fds[0]); + ::close(fds[1]); + } + + int getFD() { + return fds[0]; + } + + bool isSet() { + return signaled; + } + + void set() { + ScopedLock<Mutex> l(lock); + if (signaled) + return; + signaled = true; + QPID_POSIX_CHECK(::write(fds[1], " ", 1)); + } + + void reset() { + if (permanent) + return; + ScopedLock<Mutex> l(lock); + if (signaled) { + char ignore; + QPID_POSIX_CHECK(::read(fds[0], &ignore, 1)); + signaled = false; + } + } + + void setPermanently() { + // async signal safe calls only. No locking. + permanent = true; + signaled = true; + QPID_POSIX_CHECK(::write(fds[1], " ", 2)); + // poll() should never block now + } + }; + + // Collect pending events and serialize access. Maintain array of pollfd structs. + class EventStream { + typedef Poller::Event Event; + PollerPrivate& pollerPrivate; + SignalPipe& signalPipe; + std::queue<PollerHandlePrivate*> interruptedHandles; + std::vector<struct ::pollfd> pollfds; + std::vector<PollerHandlePrivate*> pollHandles; + Mutex streamLock; + Mutex serializeLock; + Condition serializer; + bool busy; + int currentPollfd; + int pollCount; + int waiters; + + public: + + EventStream(PollerPrivate* p) : pollerPrivate(*p), signalPipe(p->signalPipe), busy(false), + currentPollfd(0), pollCount(0), waiters(0) { + // The signal pipe is the first element of pollfds and pollHandles + pollfds.reserve(8); + pollfds.resize(1); + pollfds[0].fd = pollerPrivate.signalPipe.getFD(); + pollfds[0].events = POLLIN; + pollfds[0].revents = 0; + + pollHandles.reserve(8); + pollHandles.resize(1); + pollHandles[0] = 0; + } + + void addInterrupt(PollerHandle& handle) { + ScopedLock<Mutex> l(streamLock); + interruptedHandles.push(handle.impl); + } + + // Serialize access to the stream. + Event next(Duration timeout) { + AbsTime targetTimeout = + (timeout == TIME_INFINITE) ? + FAR_FUTURE : + AbsTime(now(), timeout); + + + ScopedLock<Mutex> l(serializeLock); + Event event(0, Poller::INVALID); + while (busy) { + waiters++; + bool timedout = !serializer.wait(serializeLock, targetTimeout); + waiters--; + + if (busy && timedout) { + return Event(0, Poller::TIMEOUT); + } + } + busy = true; + { + ScopedUnlock<Mutex> ul(serializeLock); + event = getEvent(targetTimeout); + } + busy = false; + + if (waiters > 0) + serializer.notify(); + return event; + } + + Event getEvent(AbsTime targetTimeout) { + bool timeoutPending = false; + + ScopedLock<Mutex> l(streamLock); // hold lock except for poll() + + // loop until poll event, async interrupt, or timeout + while (true) { + + // first check for any interrupts + while (interruptedHandles.size() > 0) { + PollerHandlePrivate& eh = *interruptedHandles.front(); + interruptedHandles.pop(); + { + ScopedLock<Mutex> lk(eh.lock); + if (!eh.isDeleted()) { + if (!eh.isIdle()) { + eh.setInactive(); + } + + // nullify the corresponding pollfd event, if any + int ehfd = eh.fd(); + std::vector<struct ::pollfd>::iterator i = pollfds.begin() + 1; // skip self pipe at front + for (; i != pollfds.end(); i++) { + if (i->fd == ehfd) { + i->events = 0; + if (i->revents) { + i->revents = 0; + pollCount--; + } + break; + } + } + return Event(eh.pollerHandle, Poller::INTERRUPTED); + } + } + PollerHandleDeletionManager.markForDeletion(&eh); + } + + // Check for shutdown + if (pollerPrivate.isShutdown) { + PollerHandleDeletionManager.markAllUnusedInThisThread(); + return Event(0, Poller::SHUTDOWN); + } + + // search for any remaining events from earlier poll() + int nfds = pollfds.size(); + while ((pollCount > 0) && (currentPollfd < nfds)) { + int index = currentPollfd++; + short evt = pollfds[index].revents; + if (evt != 0) { + pollCount--; + PollerHandlePrivate& eh = *pollHandles[index]; + ScopedLock<Mutex> l(eh.lock); + // stop polling this handle until resetMode() + pollfds[index].events = 0; + + // the handle could have gone inactive since snapshot taken + if (eh.isActive()) { + PollerHandle* handle = eh.pollerHandle; + assert(handle); + + // If the connection has been hungup we could still be readable + // (just not writable), allow us to readable until we get here again + if (evt & POLLHUP) { + if (eh.isHungup()) { + eh.setInactive(); + // Don't set up last Handle so that we don't reset this handle + // on re-entering Poller::wait. This means that we will never + // be set active again once we've returned disconnected, and so + // can never be returned again. + return Event(handle, Poller::DISCONNECTED); + } + eh.setHungup(); + } else { + eh.setInactive(); + } + return Event(handle, PollerPrivate::epollToDirection(evt)); + } + } + } + + if (timeoutPending) { + return Event(0, Poller::TIMEOUT); + } + + // no outstanding events, poll() for more + { + ScopedUnlock<Mutex> ul(streamLock); + + bool refreshed = pollerPrivate.registeredHandles.snapshot(pollHandles, pollfds); + if (refreshed) { + // we just drained all interruptedHandles and got a fresh snapshot + PollerHandleDeletionManager.markAllUnusedInThisThread(); + } + + if (!signalPipe.isSet()) { + int timeoutMs = -1; + if (!(targetTimeout == FAR_FUTURE)) { + timeoutMs = Duration(now(), targetTimeout) / TIME_MSEC; + if (timeoutMs < 0) + timeoutMs = 0; + } + + pollCount = ::poll(&pollfds[0], pollfds.size(), timeoutMs); + + if (pollCount ==-1 && errno != EINTR) { + QPID_POSIX_CHECK(pollCount); + } + else if (pollCount == 0) { + // timeout, unless shutdown or interrupt arrives in another thread + timeoutPending = true; + } + else { + if (pollfds[0].revents) { + pollCount--; // signal pipe doesn't count + } + } + } + else + pollCount = 0; + signalPipe.reset(); + } + currentPollfd = 1; + } + } + }; + + bool isShutdown; + HandleSet registeredHandles; + AtomicCount threadCount; + SignalPipe signalPipe; + EventStream eventStream; + + static short directionToEpollEvent(Poller::Direction dir) { + switch (dir) { + case Poller::INPUT: return POLLIN; + case Poller::OUTPUT: return POLLOUT; + case Poller::INOUT: return POLLIN | POLLOUT; + default: return 0; + } + } + + static Poller::EventType epollToDirection(short events) { + // POLLOUT & POLLHUP are mutually exclusive really, but at least socketpairs + // can give you both! + events = (events & POLLHUP) ? events & ~POLLOUT : events; + short e = events & (POLLIN | POLLOUT); + switch (e) { + case POLLIN: return Poller::READABLE; + case POLLOUT: return Poller::WRITABLE; + case POLLIN | POLLOUT: return Poller::READ_WRITABLE; + default: + return (events & (POLLHUP | POLLERR)) ? + Poller::DISCONNECTED : Poller::INVALID; + } + } + + PollerPrivate() : + isShutdown(false), eventStream(this) { + } + + ~PollerPrivate() {} + + void resetMode(PollerHandlePrivate& handle); + + void interrupt() { + signalPipe.set(); + } + + void interruptAll() { + // be async signal safe + signalPipe.setPermanently(); + } +}; + + +void Poller::registerHandle(PollerHandle& handle) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(eh.isIdle()); + + eh.setActive(); + impl->registeredHandles.add(handle.impl); + // not stale until monitored +} + +void Poller::unregisterHandle(PollerHandle& handle) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isIdle()); + + eh.setIdle(); + impl->registeredHandles.remove(handle.impl); + impl->registeredHandles.setStale(); + impl->interrupt(); +} + +void PollerPrivate::resetMode(PollerHandlePrivate& eh) { + PollerHandle* ph; + { + // Called after an event has been processed for a handle + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isActive()); + + if (eh.isIdle() || eh.isDeleted()) { + return; + } + + if (eh.events==0) { + eh.setActive(); + return; + } + + if (!eh.isInterrupted()) { + // Handle still in use, allow events to resume. + eh.setActive(); + registeredHandles.setStale(); + // Ouch. This scales poorly for large handle sets. + // TODO: avoid new snapshot, perhaps create an index to pollfds or a + // pending reset queue to be processed before each poll(). However, the real + // scalable solution is to implement the OS-specific epoll equivalent. + interrupt(); + return; + } + ph = eh.pollerHandle; + } + + eventStream.addInterrupt(*ph); + interrupt(); +} + +void Poller::monitorHandle(PollerHandle& handle, Direction dir) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isIdle()); + + short oldEvents = eh.events; + eh.events |= PollerPrivate::directionToEpollEvent(dir); + + // If no change nothing more to do - avoid unnecessary system call + if (oldEvents==eh.events) { + return; + } + + // If we're not actually listening wait till we are to perform change + if (!eh.isActive()) { + return; + } + + // tell polling thread to update its pollfds + impl->registeredHandles.setStale(); + impl->interrupt(); +} + +void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isIdle()); + + short oldEvents = eh.events; + eh.events &= ~PollerPrivate::directionToEpollEvent(dir); + + // If no change nothing more to do - avoid unnecessary system call + if (oldEvents==eh.events) { + return; + } + + // If we're not actually listening wait till we are to perform change + if (!eh.isActive()) { + return; + } + + impl->registeredHandles.setStale(); + impl->interrupt(); +} + +void Poller::shutdown() { + // NB: this function must be async-signal safe, it must not + // call any function that is not async-signal safe. + + // Allow sloppy code to shut us down more than once + if (impl->isShutdown) + return; + + // Don't use any locking here - isShutdown will be visible to all + // after the write() anyway (it's a memory barrier) + impl->isShutdown = true; + + impl->interruptAll(); +} + +bool Poller::interrupt(PollerHandle& handle) { + { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + if (eh.isIdle() || eh.isDeleted()) { + return false; + } + + if (eh.isInterrupted()) { + return true; + } + + if (eh.isInactive()) { + eh.setInterrupted(); + return true; + } + eh.setInterrupted(); + eh.events = 0; + } + + impl->registeredHandles.setStale(); + impl->eventStream.addInterrupt(handle); + impl->interrupt(); + return true; +} + +void Poller::run() { + // Ensure that we exit thread responsibly under all circumstances + try { + // Make sure we can't be interrupted by signals at a bad time + ::sigset_t ss; + ::sigfillset(&ss); + ::pthread_sigmask(SIG_SETMASK, &ss, 0); + + ++(impl->threadCount); + do { + Event event = wait(); + + // If can read/write then dispatch appropriate callbacks + if (event.handle) { + event.process(); + } else { + // Handle shutdown + switch (event.type) { + case SHUTDOWN: + //last thread to respond to shutdown cleans up: + if (--(impl->threadCount) == 0) impl->registeredHandles.cleanup(); + PollerHandleDeletionManager.destroyThreadState(); + return; + default: + // This should be impossible + assert(false); + } + } + } while (true); + } catch (const std::exception& e) { + QPID_LOG(error, "IO worker thread exiting with unhandled exception: " << e.what()); + } + PollerHandleDeletionManager.destroyThreadState(); + --(impl->threadCount); +} + +bool Poller::hasShutdown() +{ + return impl->isShutdown; +} + +Poller::Event Poller::wait(Duration timeout) { + static __thread PollerHandlePrivate* lastReturnedHandle = 0; + + if (lastReturnedHandle) { + impl->resetMode(*lastReturnedHandle); + lastReturnedHandle = 0; + } + + Event event = impl->eventStream.next(timeout); + + switch (event.type) { + case INTERRUPTED: + case READABLE: + case WRITABLE: + case READ_WRITABLE: + lastReturnedHandle = event.handle->impl; + break; + default: + ; + } + + return event; +} + +// Concrete constructors +Poller::Poller() : + impl(new PollerPrivate()) +{} + +Poller::~Poller() { + delete impl; +} + + +bool HandleSet::snapshot(std::vector<PollerHandlePrivate *>& hs , std::vector<struct ::pollfd>& fds) +{ + // Element 0 of the vectors is always the signal pipe, leave undisturbed + { + ScopedLock<Mutex> l(lock); + if (!stale) + return false; // no refresh done + + hs.resize(1); + for (std::set<PollerHandlePrivate*>::const_iterator i = handles.begin(); i != handles.end(); ++i) { + hs.push_back(*i); + } + stale = false; + // have copy of handle set (in vector form), drop the lock and build the pollfds + } + + // sync pollfds to same sizing as the handles + int sz = hs.size(); + fds.resize(sz); + + for (int j = 1; j < sz; ++j) { + // create a pollfd entry for each handle + struct ::pollfd& pollfd = fds[j]; + PollerHandlePrivate& eh = *hs[j]; + ScopedLock<Mutex> lk(eh.lock); + + if (!eh.isInactive() && !eh.isDeleted()) { + pollfd.fd = eh.fd(); + pollfd.events = eh.events; + } else { + pollfd.fd = -1; // tell poll() to ignore this fd + pollfd.events = 0; + } + } + return true; +} + + +}} |