/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "qpid/sys/Poller.h"
#include "qpid/sys/IOHandle.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/AtomicCount.h"
#include "qpid/sys/DeletionManager.h"
#include "qpid/sys/posix/check.h"
#include "qpid/sys/posix/PrivatePosix.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Condition.h"

#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <poll.h>
#include <signal.h>
#include <stdint.h>
#include <unistd.h>

#include <exception>
#include <queue>
#include <set>
#include <vector>

/*
 *
 * This is a qpid::sys::Poller implementation for Posix systems.
 *
 * This module follows the structure of the Linux EpollPoller as closely as possible
 * to simplify maintainability.  Noteworthy differences:
 *
 * The Linux epoll_xxx() calls present one event at a time to multiple callers whereas poll()
 * returns one or more events to a single caller.  The EventStream class layers a
 * "one event per call" view of the poll() result to multiple threads.
 *
 * The HandleSet is the master set of in-use PollerHandles.  The EventStream
 * maintains a snapshot copy taken just before the call to poll() that remains static
 * until all flagged events have been processed.
 *
 * There is an additional window where the PollerHandlePrivate class may survive the
 * parent PollerHandle destructor, i.e. between snapshots.
*
 * Safe interrupting of the Poller is implemented using the "self-pipe trick".
 *
 */

namespace qpid {
namespace sys {

// Deletion manager to handle deferring deletion of PollerHandles to when they definitely aren't being used
DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager;

//  Instantiate (and define) class static for DeletionManager
template <>
DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0);

/**
 * Per-handle bookkeeping shared between a PollerHandle and the Poller.
 *
 * Holds the requested poll() event mask, the owning PollerHandle (set to 0
 * once that object has been destroyed) and a small state machine (FDStat).
 * All state accessors expect the caller to hold `lock`.
 */
class PollerHandlePrivate {
    friend class Poller;
    friend class PollerPrivate;
    friend class PollerHandle;
    friend class HandleSet;

    enum FDStat {
        ABSENT,              // not registered with a Poller
        MONITORED,           // registered and eligible for poll()
        INACTIVE,            // event delivered, waiting for resetMode()
        HUNGUP,              // POLLHUP seen once, waiting for resetMode()
        MONITORED_HUNGUP,    // re-armed after a hangup was seen
        INTERRUPTED,         // Poller::interrupt() requested
        INTERRUPTED_HUNGUP,  // interrupt requested after a hangup was seen
        DELETED              // owning PollerHandle has been destroyed
    };

    short events;               // poll() event mask requested for this handle
    const IOHandle* ioHandle;
    PollerHandle* pollerHandle; // 0 once the PollerHandle destructor has run
    FDStat stat;
    Mutex lock;

    PollerHandlePrivate(const IOHandle* h, PollerHandle* p) :
      events(0),
      ioHandle(h),
      pollerHandle(p),
      stat(ABSENT) {
    }

    int fd() const {
        return ioHandle->fd;
    }

    bool isActive() const {
        return stat == MONITORED || stat == MONITORED_HUNGUP;
    }

    // Re-arm the handle, remembering whether a hangup has already been seen
    void setActive() {
        stat = (stat == HUNGUP || stat == INTERRUPTED_HUNGUP)
            ? MONITORED_HUNGUP
            : MONITORED;
    }

    bool isInactive() const {
        return stat == INACTIVE || stat == HUNGUP;
    }

    void setInactive() {
        stat = INACTIVE;
    }

    bool isIdle() const {
        return stat == ABSENT;
    }

    void setIdle() {
        stat = ABSENT;
    }

    bool isHungup() const {
        return
            stat == MONITORED_HUNGUP ||
            stat == HUNGUP ||
            stat == INTERRUPTED_HUNGUP;
    }

    void setHungup() {
        assert(stat == MONITORED);
        stat = HUNGUP;
    }

    bool isInterrupted() const {
        return stat == INTERRUPTED || stat == INTERRUPTED_HUNGUP;
    }

    // Mark as interrupted, remembering whether a hangup has already been seen
    void setInterrupted() {
        stat = (stat == MONITORED_HUNGUP || stat == HUNGUP)
            ? INTERRUPTED_HUNGUP
            : INTERRUPTED;
    }

    bool isDeleted() const {
        return stat == DELETED;
    }

    void setDeleted() {
        stat = DELETED;
    }
};

PollerHandle::PollerHandle(const IOHandle& h) :
    impl(new PollerHandlePrivate(&h, this))
{}

/**
 * Detach from the private state and hand it to the deletion manager.
 *
 * If an interrupt is still outstanding the private state is only marked
 * DELETED here; EventStream::getEvent() passes it to the deletion manager
 * when it dequeues the interrupt.
 */
PollerHandle::~PollerHandle() {
    {
    ScopedLock<Mutex> l(impl->lock);
    if (impl->isDeleted()) {
        return;
    }
    impl->pollerHandle = 0;
    if (impl->isInterrupted()) {
        impl->setDeleted();
        return;
    }
    assert(impl->isIdle());
    impl->setDeleted();
    }
    PollerHandleDeletionManager.markForDeletion(impl);
}

/**
 * Master set of registered handles plus a "stale" flag telling the polling
 * thread that its cached pollfd array must be rebuilt.
 */
class HandleSet
{
    Mutex lock;
    bool stale;
    std::set<PollerHandlePrivate*> handles;
  public:
    HandleSet() : stale(true) {}
    void add(PollerHandlePrivate*);
    void remove(PollerHandlePrivate*);
    void cleanup();
    bool snapshot(std::vector<PollerHandlePrivate*>& , std::vector<struct ::pollfd>&);
    void setStale();
};

void HandleSet::add(PollerHandlePrivate* h)
{
    ScopedLock<Mutex> l(lock);
    handles.insert(h);
}

void HandleSet::remove(PollerHandlePrivate* h)
{
    ScopedLock<Mutex> l(lock);
    handles.erase(h);
}

void HandleSet::cleanup()
{
    // Inform all registered handles of disconnection.
    // Work on a private copy so the master set is left empty.
    std::set<PollerHandlePrivate*> copy;
    handles.swap(copy);
    for (std::set<PollerHandlePrivate*>::const_iterator i = copy.begin(); i != copy.end(); ++i) {
        PollerHandlePrivate& eh = **i;
        {
            ScopedLock<Mutex> l(eh.lock);
            if (!eh.isDeleted()) {
                Poller::Event event(eh.pollerHandle, Poller::DISCONNECTED);
                event.process();
            }
        }
    }
}

void HandleSet::setStale()
{
    // invalidate cached pollfds for next snapshot
    ScopedLock<Mutex> l(lock);
    stale = true;
}

/**
 * Concrete implementation of Poller to use Posix poll()
 * interface
 */
class PollerPrivate {
    friend class Poller;
    friend class EventStream;
    friend class HandleSet;

    class SignalPipe {
        /**
         * Used to wakeup a thread in ::poll()
         */
        int fds[2];
        bool signaled;
        bool permanent;
        Mutex lock;
      public:
        SignalPipe() : signaled(false), permanent(false) {
            QPID_POSIX_CHECK(::pipe(fds));
        }

        ~SignalPipe() {
            ::close(fds[0]);
            ::close(fds[1]);
        }

        // Read end of the pipe, to be included in the poll() set
        int getFD() {
            return fds[0];
        }

        bool isSet() {
            return signaled;
        }

        // Make the pipe readable (at most one byte is outstanding at a time)
        void set() {
            ScopedLock<Mutex> l(lock);
            if (signaled)
                return;
            signaled = true;
            QPID_POSIX_CHECK(::write(fds[1], " ", 1));
        }

        // Drain the wakeup byte, unless the pipe was set permanently
        void reset() {
            if (permanent)
                return;
            ScopedLock<Mutex> l(lock);
            if (signaled) {
                char ignore;
                QPID_POSIX_CHECK(::read(fds[0], &ignore, 1));
                signaled = false;
            }
        }

        void setPermanently() {
            // async signal safe calls only.  No locking.
            permanent = true;
            signaled = true;
            // Writes two bytes: the space and the literal's terminating NUL.
            // NOTE(review): presumably so that a racing reset() that consumes
            // one byte still leaves the pipe readable - confirm before changing.
            QPID_POSIX_CHECK(::write(fds[1], "  ", 2));
            // poll() should never block now
        }
    };

    // Collect pending events and serialize access.  Maintain array of pollfd structs.
    class EventStream {
        typedef Poller::Event Event;
        PollerPrivate& pollerPrivate;
        SignalPipe& signalPipe;
        std::queue<PollerHandlePrivate*> interruptedHandles;
        std::vector<struct ::pollfd> pollfds;
        std::vector<PollerHandlePrivate*> pollHandles;
        Mutex streamLock;
        Mutex serializeLock;
        Condition serializer;
        bool busy;          // a thread is currently inside getEvent()
        int currentPollfd;  // next pollfds index to examine
        int pollCount;      // events from the last poll() not yet delivered
        int waiters;        // threads blocked in next() waiting for their turn

      public:
        EventStream(PollerPrivate* p) : pollerPrivate(*p), signalPipe(p->signalPipe), busy(false),
                                        currentPollfd(0), pollCount(0), waiters(0) {
            // The signal pipe is the first element of pollfds and pollHandles
            pollfds.reserve(8);
            pollfds.resize(1);
            pollfds[0].fd = pollerPrivate.signalPipe.getFD();
            pollfds[0].events = POLLIN;
            pollfds[0].revents = 0;
            pollHandles.reserve(8);
            pollHandles.resize(1);
            pollHandles[0] = 0;
        }

        // Queue a handle whose INTERRUPTED event should be delivered by getEvent()
        void addInterrupt(PollerHandle& handle) {
            ScopedLock<Mutex> l(streamLock);
            interruptedHandles.push(handle.impl);
        }

        // Serialize access to the stream.
        // Only one thread at a time runs getEvent(); the others wait on the
        // condition until it is their turn or targetTimeout passes.
        Event next(Duration timeout) {
            AbsTime targetTimeout =
                (timeout == TIME_INFINITE) ?
                FAR_FUTURE :
                AbsTime(now(), timeout);

            ScopedLock<Mutex> l(serializeLock);
            Event event(0, Poller::INVALID);

            while (busy) {
                waiters++;
                bool timedout = !serializer.wait(serializeLock, targetTimeout);
                waiters--;
                if (busy && timedout) {
                    return Event(0, Poller::TIMEOUT);
                }
            }

            busy = true;
            try {
                ScopedUnlock<Mutex> ul(serializeLock);
                event = getEvent(targetTimeout);
            } catch (...) {
                // serializeLock is held again here (ul was destroyed during
                // unwinding).  Release the stream so the remaining threads
                // are not blocked forever behind a failed poll().
                busy = false;
                if (waiters > 0)
                    serializer.notify();
                throw;
            }
            busy = false;

            if (waiters > 0)
                serializer.notify();
            return event;
        }

        // Produce exactly one event: a queued interrupt, shutdown, a pending
        // result of an earlier poll(), or a timeout.
        Event getEvent(AbsTime targetTimeout) {
            bool timeoutPending = false;

            ScopedLock<Mutex> l(streamLock); // hold lock except for poll()

            // loop until poll event, async interrupt, or timeout
            while (true) {

                // first check for any interrupts
                while (interruptedHandles.size() > 0) {
                    PollerHandlePrivate& eh = *interruptedHandles.front();
                    interruptedHandles.pop();
                    {
                        ScopedLock<Mutex> lk(eh.lock);
                        if (!eh.isDeleted()) {
                            if (!eh.isIdle()) {
                                eh.setInactive();
                            }

                            // nullify the corresponding pollfd event, if any
                            int ehfd = eh.fd();
                            std::vector<struct ::pollfd>::iterator i = pollfds.begin() + 1; // skip self pipe at front
                            for (; i != pollfds.end(); ++i) {
                                if (i->fd == ehfd) {
                                    i->events = 0;
                                    if (i->revents) {
                                        i->revents = 0;
                                        pollCount--;
                                    }
                                    break;
                                }
                            }
                            return Event(eh.pollerHandle, Poller::INTERRUPTED);
                        }
                    }
                    // Owner was destroyed while the interrupt was queued
                    PollerHandleDeletionManager.markForDeletion(&eh);
                }

                // Check for shutdown
                if (pollerPrivate.isShutdown) {
                    PollerHandleDeletionManager.markAllUnusedInThisThread();
                    return Event(0, Poller::SHUTDOWN);
                }

                // search for any remaining events from earlier poll()
                int nfds = pollfds.size();
                while ((pollCount > 0) && (currentPollfd < nfds)) {
                    int index = currentPollfd++;
                    short evt = pollfds[index].revents;
                    if (evt != 0) {
                        pollCount--;
                        // Mark the event as consumed so that a later interrupt for
                        // the same fd does not decrement pollCount a second time.
                        pollfds[index].revents = 0;
                        PollerHandlePrivate& eh = *pollHandles[index];
                        ScopedLock<Mutex> ehLock(eh.lock);
                        // stop polling this handle until resetMode()
                        pollfds[index].events = 0;

                        // the handle could have gone inactive since snapshot taken
                        if (eh.isActive()) {
                            PollerHandle* handle = eh.pollerHandle;
                            assert(handle);

                            // If the connection has been hungup we could still be readable
                            // (just not writable), allow us to be readable until we get here again
                            if (evt & POLLHUP) {
                                if (eh.isHungup()) {
                                    eh.setInactive();
                                    // Don't set up last Handle so that we don't reset this handle
                                    // on re-entering Poller::wait. This means that we will never
                                    // be set active again once we've returned disconnected, and so
                                    // can never be returned again.
                                    return Event(handle, Poller::DISCONNECTED);
                                }
                                eh.setHungup();
                            } else {
                                eh.setInactive();
                            }
                            return Event(handle, PollerPrivate::epollToDirection(evt));
                        }
                    }
                }

                if (timeoutPending) {
                    return Event(0, Poller::TIMEOUT);
                }

                // no outstanding events, poll() for more
                {
                    ScopedUnlock<Mutex> ul(streamLock);
                    bool refreshed = pollerPrivate.registeredHandles.snapshot(pollHandles, pollfds);
                    if (refreshed) {
                        // we just drained all interruptedHandles and got a fresh snapshot
                        PollerHandleDeletionManager.markAllUnusedInThisThread();
                    }

                    if (!signalPipe.isSet()) {
                        int timeoutMs = -1;
                        if (!(targetTimeout == FAR_FUTURE)) {
                            // Compute in 64 bits and clamp: poll() takes an int, and a
                            // long (but finite) timeout must not wrap around.
                            int64_t remainingMs = Duration(now(), targetTimeout) / TIME_MSEC;
                            if (remainingMs < 0)
                                timeoutMs = 0;
                            else if (remainingMs > INT_MAX)
                                timeoutMs = INT_MAX;
                            else
                                timeoutMs = static_cast<int>(remainingMs);
                        }

                        pollCount = ::poll(&pollfds[0], pollfds.size(), timeoutMs);

                        if (pollCount == -1) {
                            if (errno != EINTR) {
                                QPID_POSIX_CHECK(pollCount);
                            }
                            // interrupted by a signal: nothing to deliver, go round again
                            pollCount = 0;
                        }
                        else if (pollCount == 0) {
                            // timeout, unless shutdown or interrupt arrives in another thread
                            timeoutPending = true;
                        }
                        else {
                            if (pollfds[0].revents) {
                                pollCount--; // signal pipe doesn't count
                            }
                        }
                    }
                    else
                        pollCount = 0;
                    signalPipe.reset();
                }
                currentPollfd = 1;
            }
        }
    };

    bool isShutdown;
    HandleSet registeredHandles;
    AtomicCount threadCount;
    SignalPipe signalPipe;
    EventStream eventStream;   // must follow signalPipe: its constructor uses it

    // Translate a Poller direction into the poll() event mask
    static short directionToEpollEvent(Poller::Direction dir) {
        switch (dir) {
            case Poller::INPUT:  return POLLIN;
            case Poller::OUTPUT: return POLLOUT;
            case Poller::INOUT:  return POLLIN | POLLOUT;
            default: return 0;
        }
    }

    // Translate poll() revents into the Poller event type
    static Poller::EventType epollToDirection(short events) {
        // POLLOUT & POLLHUP are mutually exclusive really, but at least socketpairs
        // can give you both!
        events = (events & POLLHUP) ? events & ~POLLOUT : events;
        short e = events & (POLLIN | POLLOUT);
        switch (e) {
            case POLLIN: return Poller::READABLE;
            case POLLOUT: return Poller::WRITABLE;
            case POLLIN | POLLOUT: return Poller::READ_WRITABLE;
            default:
              return (events & (POLLHUP | POLLERR)) ?
                  Poller::DISCONNECTED : Poller::INVALID;
        }
    }

    PollerPrivate() :
        isShutdown(false), eventStream(this) {
    }

    ~PollerPrivate() {}

    void resetMode(PollerHandlePrivate& handle);

    // Wake the thread blocked in poll()
    void interrupt() {
        signalPipe.set();
    }

    void interruptAll() {
        // be async signal safe
        signalPipe.setPermanently();
    }
};

void Poller::registerHandle(PollerHandle& handle) {
    PollerHandlePrivate& eh = *handle.impl;
    ScopedLock<Mutex> l(eh.lock);
    assert(eh.isIdle());

    eh.setActive();
    impl->registeredHandles.add(handle.impl);
    // not stale until monitored
}

void Poller::unregisterHandle(PollerHandle& handle) {
    PollerHandlePrivate& eh = *handle.impl;
    ScopedLock<Mutex> l(eh.lock);
    assert(!eh.isIdle());

    eh.setIdle();
    impl->registeredHandles.remove(handle.impl);
    impl->registeredHandles.setStale();
    impl->interrupt();
}

void PollerPrivate::resetMode(PollerHandlePrivate& eh) {
    PollerHandle* ph;
    {
        // Called after an event has been processed for a handle
        ScopedLock<Mutex> l(eh.lock);
        assert(!eh.isActive());

        if (eh.isIdle() || eh.isDeleted()) {
            return;
        }

        if (eh.events==0) {
            eh.setActive();
            return;
        }

        if (!eh.isInterrupted()) {
            // Handle still in use, allow events to resume.
            eh.setActive();
            registeredHandles.setStale();
            // Ouch.  This scales poorly for large handle sets.
            // TODO: avoid new snapshot, perhaps create an index to pollfds or a
            // pending reset queue to be processed before each poll().  However, the real
            // scalable solution is to implement the OS-specific epoll equivalent.
            interrupt();
            return;
        }
        ph = eh.pollerHandle;
    }

    // An interrupt was requested while the event was being processed:
    // deliver it now, outside the handle lock.
    eventStream.addInterrupt(*ph);
    interrupt();
}

void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
    PollerHandlePrivate& eh = *handle.impl;
    ScopedLock<Mutex> l(eh.lock);
    assert(!eh.isIdle());

    short oldEvents = eh.events;
    eh.events |= PollerPrivate::directionToEpollEvent(dir);

    // If no change nothing more to do - avoid unnecessary system call
    if (oldEvents==eh.events) {
        return;
    }

    // If we're not actually listening wait till we are to perform change
    if (!eh.isActive()) {
        return;
    }

    // tell polling thread to update its pollfds
    impl->registeredHandles.setStale();
    impl->interrupt();
}

void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
    PollerHandlePrivate& eh = *handle.impl;
    ScopedLock<Mutex> l(eh.lock);
    assert(!eh.isIdle());

    short oldEvents = eh.events;
    eh.events &= ~PollerPrivate::directionToEpollEvent(dir);

    // If no change nothing more to do - avoid unnecessary system call
    if (oldEvents==eh.events) {
        return;
    }

    // If we're not actually listening wait till we are to perform change
    if (!eh.isActive()) {
        return;
    }

    // tell polling thread to update its pollfds
    impl->registeredHandles.setStale();
    impl->interrupt();
}

void Poller::shutdown() {
    // NB: this function must be async-signal safe, it must not
    // call any function that is not async-signal safe.

    // Allow sloppy code to shut us down more than once
    if (impl->isShutdown)
        return;

    // Don't use any locking here - isShutdown will be visible to all
    // after the write() anyway (it's a memory barrier)
    impl->isShutdown = true;
    impl->interruptAll();
}

/**
 * Request an INTERRUPTED event for the handle.
 *
 * Returns false if the handle is not registered or already deleted, true
 * otherwise.  If the handle is currently inactive (an event is being
 * processed) the interrupt is only recorded here and is queued later by
 * resetMode().
 */
bool Poller::interrupt(PollerHandle& handle) {
    {
        PollerHandlePrivate& eh = *handle.impl;
        ScopedLock<Mutex> l(eh.lock);
        if (eh.isIdle() || eh.isDeleted()) {
            return false;
        }

        if (eh.isInterrupted()) {
            return true;
        }

        if (eh.isInactive()) {
            eh.setInterrupted();
            return true;
        }
        eh.setInterrupted();
        eh.events = 0;
    }

    impl->registeredHandles.setStale();
    impl->eventStream.addInterrupt(handle);
    impl->interrupt();
    return true;
}

void Poller::run() {
    // Ensure that we exit thread responsibly under all circumstances
    try {
        // Make sure we can't be interrupted by signals at a bad time
        ::sigset_t ss;
        ::sigfillset(&ss);
        ::pthread_sigmask(SIG_SETMASK, &ss, 0);

        ++(impl->threadCount);
        do {
            Event event = wait();

            // If can read/write then dispatch appropriate callbacks
            if (event.handle) {
                event.process();
            } else {
                // Handle shutdown
                switch (event.type) {
                case SHUTDOWN:
                    //last thread to respond to shutdown cleans up:
                    if (--(impl->threadCount) == 0) impl->registeredHandles.cleanup();
                    PollerHandleDeletionManager.destroyThreadState();
                    return;
                default:
                    // This should be impossible
                    assert(false);
                }
            }
        } while (true);
    } catch (const std::exception& e) {
        QPID_LOG(error, "IO worker thread exiting with unhandled exception: " << e.what());
    }
    PollerHandleDeletionManager.destroyThreadState();
    --(impl->threadCount);
}

bool Poller::hasShutdown()
{
    return impl->isShutdown;
}

/**
 * Return the next event, waiting at most `timeout`.
 *
 * The handle returned by the previous call on this thread is re-armed
 * (resetMode) before waiting again.
 */
Poller::Event Poller::wait(Duration timeout) {
    static __thread PollerHandlePrivate* lastReturnedHandle = 0;

    if (lastReturnedHandle) {
        impl->resetMode(*lastReturnedHandle);
        lastReturnedHandle = 0;
    }

    Event event = impl->eventStream.next(timeout);

    switch (event.type) {
    case INTERRUPTED:
    case READABLE:
    case WRITABLE:
    case READ_WRITABLE:
        // remember the handle so it is re-armed on the next call
        lastReturnedHandle = event.handle->impl;
        break;
    default:
        ;
    }

    return event;
}

// Concrete constructors
Poller::Poller() :
    impl(new PollerPrivate())
{}

Poller::~Poller() {
    delete impl;
}

/**
 * Rebuild the caller's handle and pollfd vectors from the master set.
 *
 * Returns false, leaving both vectors untouched, if nothing changed since
 * the last snapshot; returns true after a rebuild.
 */
bool HandleSet::snapshot(std::vector<PollerHandlePrivate*>& hs , std::vector<struct ::pollfd>& fds)
{
    // Element 0 of the vectors is always the signal pipe, leave undisturbed
    {
        ScopedLock<Mutex> l(lock);
        if (!stale)
            return false;       // no refresh done

        hs.resize(1);
        for (std::set<PollerHandlePrivate*>::const_iterator i = handles.begin(); i != handles.end(); ++i) {
            hs.push_back(*i);
        }
        stale = false;
        // have copy of handle set (in vector form), drop the lock and build the pollfds
    }

    // sync pollfds to same sizing as the handles
    int sz = hs.size();
    fds.resize(sz);

    for (int j = 1; j < sz; ++j) {
        // create a pollfd entry for each handle
        struct ::pollfd& pollfd = fds[j];
        PollerHandlePrivate& eh = *hs[j];
        ScopedLock<Mutex> lk(eh.lock);

        if (!eh.isInactive() && !eh.isDeleted()) {
            pollfd.fd = eh.fd();
            pollfd.events = eh.events;
        } else {
            pollfd.fd = -1;     // tell poll() to ignore this fd
            pollfd.events = 0;
        }
        // Slots are re-assigned to different handles on a refresh, so any
        // result left over from an earlier poll() no longer applies.
        pollfd.revents = 0;
    }
    return true;
}

}} // namespace qpid::sys