diff options
author | Andrew Stitcher <astitcher@apache.org> | 2009-05-04 15:55:21 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2009-05-04 15:55:21 +0000 |
commit | 564d179640cf49feeb8ff84133f892499afb0e65 (patch) | |
tree | d6fb4c2f2c789379937c2b622772cea2aed69e1d /cpp | |
parent | c912884c11debf57e8c154fba7dbbcae8ea34d90 (diff) | |
download | qpid-python-564d179640cf49feeb8ff84133f892499afb0e65.tar.gz |
Refactored the DispatchHandle/Poller code to remove a long standing
set of race conditions.
- Changed Poller naming for better clarity with
new semantics.
- Changed Poller semantics to avoid DispatchHandle
keeping so much state
- Changed Poller so that it will never re-enable a
Handle until Poller::wait is called again on the same thread
that returned the Handle.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@771338 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/cluster/PollerDispatch.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/PollerDispatch.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/sys/DispatchHandle.cpp | 394 | ||||
-rw-r--r-- | cpp/src/qpid/sys/DispatchHandle.h | 19 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Poller.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/sys/epoll/EpollPoller.cpp | 178 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/AsynchIO.cpp | 8 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/IocpPoller.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/PollableCondition.cpp | 6 | ||||
-rw-r--r-- | cpp/src/tests/PollerTest.cpp | 58 |
11 files changed, 313 insertions, 381 deletions
diff --git a/cpp/src/qpid/cluster/PollerDispatch.cpp b/cpp/src/qpid/cluster/PollerDispatch.cpp index 528a21481c..c8e28c0d51 100644 --- a/cpp/src/qpid/cluster/PollerDispatch.cpp +++ b/cpp/src/qpid/cluster/PollerDispatch.cpp @@ -36,6 +36,10 @@ PollerDispatch::PollerDispatch(Cpg& c, boost::shared_ptr<sys::Poller> p, ) {} +PollerDispatch::~PollerDispatch() { + dispatchHandle.stopWatch(); +} + void PollerDispatch::start() { dispatchHandle.startWatch(poller); } diff --git a/cpp/src/qpid/cluster/PollerDispatch.h b/cpp/src/qpid/cluster/PollerDispatch.h index 24b3496eb5..52137b72a8 100644 --- a/cpp/src/qpid/cluster/PollerDispatch.h +++ b/cpp/src/qpid/cluster/PollerDispatch.h @@ -37,6 +37,9 @@ class PollerDispatch { public: PollerDispatch(Cpg&, boost::shared_ptr<sys::Poller> poller, boost::function<void()> onError) ; + + ~PollerDispatch(); + void start(); private: @@ -47,7 +50,7 @@ class PollerDispatch { Cpg& cpg; boost::shared_ptr<sys::Poller> poller; boost::function<void()> onError; - sys::DispatchHandle dispatchHandle; + sys::DispatchHandleRef dispatchHandle; }; diff --git a/cpp/src/qpid/sys/DispatchHandle.cpp b/cpp/src/qpid/sys/DispatchHandle.cpp index cd7dec7fa6..7bf305d275 100644 --- a/cpp/src/qpid/sys/DispatchHandle.cpp +++ b/cpp/src/qpid/sys/DispatchHandle.cpp @@ -30,6 +30,16 @@ namespace qpid { namespace sys { +DispatchHandle::DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) : + PollerHandle(h), + readableCallback(rCb), + writableCallback(wCb), + disconnectedCallback(dCb), + state(IDLE) +{ +} + + DispatchHandle::~DispatchHandle() { } @@ -38,123 +48,56 @@ void DispatchHandle::startWatch(Poller::shared_ptr poller0) { bool w = writableCallback; ScopedLock<Mutex> lock(stateLock); - assert(state == IDLE || state == DELAYED_IDLE); - - // If no callbacks set then do nothing (that is what we were asked to do!) - // TODO: Maybe this should be an assert instead - if (!r && !w) { - switch (state) { - case IDLE: - state = INACTIVE; - return; - case DELAYED_IDLE: - state = DELAYED_INACTIVE; - return; - default: - assert(state == IDLE || state == DELAYED_IDLE); - } - } - - Poller::Direction d = r ? - (w ? Poller::INOUT : Poller::INPUT) : - Poller::OUTPUT; + assert(state == IDLE); poller = poller0; - poller->addFd(*this, d); - - switch (state) { - case IDLE: - state = r ? - (w ? ACTIVE_RW : ACTIVE_R) : - ACTIVE_W; - return; - case DELAYED_IDLE: - state = r ? - (w ? DELAYED_RW : DELAYED_R) : - DELAYED_W; - return; - default: - assert(state == IDLE || state == DELAYED_IDLE); - } + poller->registerHandle(*this); + state = WAITING; + Poller::Direction dir = r ? + ( w ? Poller::INOUT : Poller::INPUT ) : + ( w ? Poller::OUTPUT : Poller::NONE ); + poller->monitorHandle(*this, dir); } void DispatchHandle::rewatch() { bool r = readableCallback; bool w = writableCallback; + if (!r && !w) { + return; + } + Poller::Direction dir = r ? + ( w ? Poller::INOUT : Poller::INPUT ) : + ( w ? Poller::OUTPUT : Poller::NONE ); ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_R: - case DELAYED_W: - case DELAYED_INACTIVE: - state = r ? - (w ? DELAYED_RW : DELAYED_R) : - DELAYED_W; - break; - case DELAYED_DELETE: - break; - case INACTIVE: - case ACTIVE_R: - case ACTIVE_W: { - assert(poller); - Poller::Direction d = r ? - (w ? Poller::INOUT : Poller::INPUT) : - Poller::OUTPUT; - poller->modFd(*this, d); - state = r ? - (w ? ACTIVE_RW : ACTIVE_R) : - ACTIVE_W; - break; - } - case DELAYED_RW: - case ACTIVE_RW: - // Don't need to do anything already waiting for readable/writable + case STOPPING: + case DELETING: + return; + default: break; - case ACTIVE_DELETE: - assert(state != ACTIVE_DELETE); } + assert(poller); + poller->monitorHandle(*this, dir); } void DispatchHandle::rewatchRead() { if (!readableCallback) { return; } - + ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_R: - case DELAYED_RW: - case DELAYED_DELETE: - break; - case DELAYED_W: - state = DELAYED_RW; - break; - case DELAYED_INACTIVE: - state = DELAYED_R; - break; - case ACTIVE_R: - case ACTIVE_RW: - // Nothing to do: already waiting for readable - break; - case INACTIVE: - assert(poller); - poller->modFd(*this, Poller::INPUT); - state = ACTIVE_R; - break; - case ACTIVE_W: - assert(poller); - poller->modFd(*this, Poller::INOUT); - state = ACTIVE_RW; + case STOPPING: + case DELETING: + return; + default: break; - case ACTIVE_DELETE: - assert(state != ACTIVE_DELETE); } + assert(poller); + poller->monitorHandle(*this, Poller::INPUT); } void DispatchHandle::rewatchWrite() { @@ -165,35 +108,14 @@ void DispatchHandle::rewatchWrite() { ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_W: - case DELAYED_RW: - case DELAYED_DELETE: - break; - case DELAYED_R: - state = DELAYED_RW; - break; - case DELAYED_INACTIVE: - state = DELAYED_W; - break; - case INACTIVE: - assert(poller); - poller->modFd(*this, Poller::OUTPUT); - state = ACTIVE_W; - break; - case ACTIVE_R: - assert(poller); - poller->modFd(*this, Poller::INOUT); - state = ACTIVE_RW; - break; - case ACTIVE_W: - case ACTIVE_RW: - // Nothing to do: already waiting for writable + case STOPPING: + case DELETING: + return; + default: break; - case ACTIVE_DELETE: - assert(state != ACTIVE_DELETE); - } + } + assert(poller); + poller->monitorHandle(*this, Poller::OUTPUT); } void DispatchHandle::unwatchRead() { @@ -204,34 +126,14 @@ void DispatchHandle::unwatchRead() { ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_R: - state = DELAYED_INACTIVE; - break; - case DELAYED_RW: - state = DELAYED_W; - break; - case DELAYED_W: - case DELAYED_INACTIVE: - case DELAYED_DELETE: - break; - case ACTIVE_R: - assert(poller); - poller->modFd(*this, Poller::NONE); - state = INACTIVE; - break; - case ACTIVE_RW: - assert(poller); - poller->modFd(*this, Poller::OUTPUT); - state = ACTIVE_W; - break; - case ACTIVE_W: - case INACTIVE: + case STOPPING: + case DELETING: + return; + default: break; - case ACTIVE_DELETE: - assert(state != ACTIVE_DELETE); } + assert(poller); + poller->unmonitorHandle(*this, Poller::INPUT); } void DispatchHandle::unwatchWrite() { @@ -242,95 +144,62 @@ void DispatchHandle::unwatchWrite() { ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_W: - state = DELAYED_INACTIVE; - break; - case DELAYED_RW: - state = DELAYED_R; - break; - case DELAYED_R: - case DELAYED_INACTIVE: - case DELAYED_DELETE: - break; - case ACTIVE_W: - assert(poller); - poller->modFd(*this, Poller::NONE); - state = INACTIVE; - break; - case ACTIVE_RW: - assert(poller); - poller->modFd(*this, Poller::INPUT); - state = ACTIVE_R; - break; - case ACTIVE_R: - case INACTIVE: + case STOPPING: + case DELETING: + return; + default: break; - case ACTIVE_DELETE: - assert(state != ACTIVE_DELETE); - } + } + assert(poller); + poller->unmonitorHandle(*this, Poller::OUTPUT); } void DispatchHandle::unwatch() { ScopedLock<Mutex> lock(stateLock); - switch (state) { + switch(state) { case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_R: - case DELAYED_W: - case DELAYED_RW: - case DELAYED_INACTIVE: - state = DELAYED_INACTIVE; - break; - case DELAYED_DELETE: - break; + case STOPPING: + case DELETING: + return; default: - assert(poller); - poller->modFd(*this, Poller::NONE); - state = INACTIVE; break; - case ACTIVE_DELETE: - assert(state != ACTIVE_DELETE); - } + } + assert(poller); + poller->unmonitorHandle(*this, Poller::INOUT); } void DispatchHandle::stopWatch() { ScopedLock<Mutex> lock(stateLock); switch (state) { case IDLE: - case DELAYED_IDLE: - case DELAYED_DELETE: + assert(state != IDLE); + return; + case STOPPING: + assert(state != STOPPING); return; - case DELAYED_R: - case DELAYED_W: - case DELAYED_RW: - case DELAYED_INACTIVE: - state = DELAYED_IDLE; + case CALLING: + state = STOPPING; break; - default: + case WAITING: state = IDLE; break; - case ACTIVE_DELETE: - assert(state != ACTIVE_DELETE); + case DELETING: + return; } assert(poller); - poller->delFd(*this); + poller->unregisterHandle(*this); poller.reset(); } -// If we are already in the IDLE state we can't do the callback as we might -// race to delete and callback at the same time -// TODO: might be able to fix this by adding a new state, but would make -// the state machine even more complex +// If we are in the IDLE/STOPPING state we can't do the callback as we've +// not/no longer got the fd registered in any poller void DispatchHandle::call(Callback iCb) { assert(iCb); ScopedLock<Mutex> lock(stateLock); switch (state) { case IDLE: - case ACTIVE_DELETE: - assert(false); + case STOPPING: + case DELETING: return; default: interruptedCallbacks.push(iCb); @@ -347,27 +216,24 @@ void DispatchHandle::doDelete() { ScopedLock<Mutex> lock(stateLock); // Ensure that we're no longer watching anything switch (state) { - case DELAYED_R: - case DELAYED_W: - case DELAYED_RW: - case DELAYED_INACTIVE: - assert(poller); - poller->delFd(*this); - poller.reset(); - // Fallthrough - case DELAYED_IDLE: - state = DELAYED_DELETE; - // Fallthrough - case DELAYED_DELETE: - case ACTIVE_DELETE: - return; case IDLE: + state = DELETING; break; - default: - state = ACTIVE_DELETE; + case STOPPING: + state = DELETING; + return; + case WAITING: + state = DELETING; assert(poller); (void) poller->interrupt(*this); - poller->delFd(*this); + poller->unregisterHandle(*this); + return; + case CALLING: + state = DELETING; + assert(poller); + poller->unregisterHandle(*this); + return; + case DELETING: return; } } @@ -378,43 +244,28 @@ void DispatchHandle::doDelete() { void DispatchHandle::processEvent(Poller::EventType type) { CallbackQueue callbacks; - // Note that we are now doing the callbacks + // Phase I { ScopedLock<Mutex> lock(stateLock); - // Set up to wait for same events next time unless reset switch(state) { - case ACTIVE_R: - state = DELAYED_R; - break; - case ACTIVE_W: - state = DELAYED_W; - break; - case ACTIVE_RW: - state = DELAYED_RW; + case IDLE: + // Can get here if a non connection thread stops watching + // whilst we were stuck in the above lock + return; + case WAITING: + state = CALLING; break; - case ACTIVE_DELETE: + case CALLING: + assert(state!=CALLING); + return; + case STOPPING: + assert(state!=STOPPING); + return; + case DELETING: // Need to make sure we clean up any pending callbacks in this case std::swap(callbacks, interruptedCallbacks); goto saybyebye; - // Can get here in idle if we are stopped in a different thread - // just after we return with this handle in Poller::wait - case IDLE: - // Can get here in INACTIVE if a non connection thread unwatches - // whilst we were stuck in the above lock - case INACTIVE: - // Can only get here in a DELAYED_* state in the rare case - // that we're already here for reading and we get activated for - // writing and we can write (it might be possible the other way - // round too). In this case we're already processing the handle - // in a different thread in this function so return right away - case DELAYED_R: - case DELAYED_W: - case DELAYED_RW: - case DELAYED_INACTIVE: - case DELAYED_IDLE: - case DELAYED_DELETE: - return; } std::swap(callbacks, interruptedCallbacks); @@ -434,10 +285,9 @@ void DispatchHandle::processEvent(Poller::EventType type) { readableCallback(*this); writableCallback(*this); break; - case Poller::DISCONNECTED: - { + case Poller::DISCONNECTED: { ScopedLock<Mutex> lock(stateLock); - state = DELAYED_INACTIVE; + poller->unmonitorHandle(*this, Poller::INOUT); } if (disconnectedCallback) { disconnectedCallback(*this); @@ -466,32 +316,20 @@ void DispatchHandle::processEvent(Poller::EventType type) { { ScopedLock<Mutex> lock(stateLock); - // If any of the callbacks re-enabled reading/writing then actually - // do it now switch (state) { - case DELAYED_R: - poller->modFd(*this, Poller::INPUT); - state = ACTIVE_R; - return; - case DELAYED_W: - poller->modFd(*this, Poller::OUTPUT); - state = ACTIVE_W; + case IDLE: + assert(state!=IDLE); return; - case DELAYED_RW: - poller->modFd(*this, Poller::INOUT); - state = ACTIVE_RW; + case STOPPING: + state = IDLE; return; - case DELAYED_INACTIVE: - state = INACTIVE; + case WAITING: + assert(state!=WAITING); return; - case DELAYED_IDLE: - state = IDLE; - return; - default: - // This should be impossible - assert(false); + case CALLING: + state = WAITING; return; - case DELAYED_DELETE: + case DELETING: break; } } diff --git a/cpp/src/qpid/sys/DispatchHandle.h b/cpp/src/qpid/sys/DispatchHandle.h index bc9f98775e..916d4c641a 100644 --- a/cpp/src/qpid/sys/DispatchHandle.h +++ b/cpp/src/qpid/sys/DispatchHandle.h @@ -64,10 +64,11 @@ private: Poller::shared_ptr poller; Mutex stateLock; enum { - IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW, - ACTIVE_DELETE, - DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW, - DELAYED_DELETE + IDLE, + STOPPING, + WAITING, + CALLING, + DELETING } state; public: @@ -83,14 +84,7 @@ public: *@param wCb Callback called when the handle is writable. *@param dCb Callback called when the handle is disconnected. */ - QPID_COMMON_EXTERN DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) : - PollerHandle(h), - readableCallback(rCb), - writableCallback(wCb), - disconnectedCallback(dCb), - state(IDLE) - {} - + QPID_COMMON_EXTERN DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb); QPID_COMMON_EXTERN ~DispatchHandle(); /** Add this DispatchHandle to the poller to be watched. */ @@ -122,7 +116,6 @@ public: QPID_COMMON_EXTERN void call(Callback iCb); protected: - /** Override to get extra processing done when the DispatchHandle is deleted. */ QPID_COMMON_EXTERN void doDelete(); private: diff --git a/cpp/src/qpid/sys/Poller.h b/cpp/src/qpid/sys/Poller.h index 825ad8bfed..fd45848b79 100644 --- a/cpp/src/qpid/sys/Poller.h +++ b/cpp/src/qpid/sys/Poller.h @@ -94,10 +94,10 @@ public: // Poller run loop QPID_COMMON_EXTERN void run(); - QPID_COMMON_EXTERN void addFd(PollerHandle& handle, Direction dir); - QPID_COMMON_EXTERN void delFd(PollerHandle& handle); - QPID_COMMON_EXTERN void modFd(PollerHandle& handle, Direction dir); - QPID_COMMON_EXTERN void rearmFd(PollerHandle& handle); + QPID_COMMON_EXTERN void registerHandle(PollerHandle& handle); + QPID_COMMON_EXTERN void unregisterHandle(PollerHandle& handle); + QPID_COMMON_EXTERN void monitorHandle(PollerHandle& handle, Direction dir); + QPID_COMMON_EXTERN void unmonitorHandle(PollerHandle& handle, Direction dir); QPID_COMMON_EXTERN Event wait(Duration timeout = TIME_INFINITE); }; @@ -108,6 +108,7 @@ class IOHandle; class PollerHandlePrivate; class PollerHandle { friend class Poller; + friend class PollerPrivate; friend struct Poller::Event; PollerHandlePrivate* const impl; diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp index 42b5d8b1aa..d0623b86f4 100644 --- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -46,6 +46,7 @@ DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerH class PollerHandlePrivate { friend class Poller; + friend class PollerPrivate; friend class PollerHandle; enum FDStat { @@ -55,6 +56,7 @@ class PollerHandlePrivate { HUNGUP, MONITORED_HUNGUP, INTERRUPTED, + INTERRUPTED_HUNGUP, DELETED }; @@ -76,7 +78,9 @@ class PollerHandlePrivate { } void setActive() { - stat = (stat == HUNGUP) ? MONITORED_HUNGUP : MONITORED; + stat = (stat == HUNGUP || stat == INTERRUPTED_HUNGUP) + ? MONITORED_HUNGUP + : MONITORED; } bool isInactive() const { @@ -96,7 +100,10 @@ class PollerHandlePrivate { } bool isHungup() const { - return stat == MONITORED_HUNGUP || stat == HUNGUP; + return + stat == MONITORED_HUNGUP || + stat == HUNGUP || + stat == INTERRUPTED_HUNGUP; } void setHungup() { @@ -105,11 +112,13 @@ class PollerHandlePrivate { } bool isInterrupted() const { - return stat == INTERRUPTED; + return stat == INTERRUPTED || stat == INTERRUPTED_HUNGUP; } void setInterrupted() { - stat = INTERRUPTED; + stat = (stat == MONITORED_HUNGUP || stat == HUNGUP) + ? INTERRUPTED_HUNGUP + : INTERRUPTED; } bool isDeleted() const { @@ -131,13 +140,13 @@ PollerHandle::~PollerHandle() { if (impl->isDeleted()) { return; } + impl->pollerHandle = 0; if (impl->isInterrupted()) { impl->setDeleted(); return; } - if (impl->isActive()) { - impl->setDeleted(); - } + assert(impl->isIdle()); + impl->setDeleted(); } PollerHandleDeletionManager.markForDeletion(impl); } @@ -256,8 +265,13 @@ class PollerPrivate { ~PollerPrivate() { // It's probably okay to ignore any errors here as there can't be data loss ::close(epollFd); + + // Need to put the interruptHandle in idle state to delete it + static_cast<PollerHandle&>(interruptHandle).impl->setIdle(); } - + + void resetMode(PollerHandlePrivate& handle); + void interrupt() { ::epoll_event epe; // Use EPOLLONESHOT so we only wake a single thread @@ -279,74 +293,122 @@ class PollerPrivate { PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable; int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD(); -void Poller::addFd(PollerHandle& handle, Direction dir) { +void Poller::registerHandle(PollerHandle& handle) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); - ::epoll_event epe; - int op; + assert(eh.isIdle()); - if (eh.isIdle()) { - op = EPOLL_CTL_ADD; - epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; - } else { - assert(eh.isActive()); - op = EPOLL_CTL_MOD; - epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir); - } + ::epoll_event epe; + epe.events = ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd, &epe)); - // Record monitoring state of this fd - eh.events = epe.events; eh.setActive(); } -void Poller::delFd(PollerHandle& handle) { +void Poller::unregisterHandle(PollerHandle& handle) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); assert(!eh.isIdle()); + int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0); // Ignore EBADF since deleting a nonexistent fd has the overall required result! // And allows the case where a sloppy program closes the fd and then does the delFd() if (rc == -1 && errno != EBADF) { QPID_POSIX_CHECK(rc); } + eh.setIdle(); } -// modFd is equivalent to delFd followed by addFd -void Poller::modFd(PollerHandle& handle, Direction dir) { +void PollerPrivate::resetMode(PollerHandlePrivate& eh) { + PollerHandle* ph; + { + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isActive()); + + if (eh.isIdle() || eh.isDeleted()) { + return; + } + + if (eh.events==0) { + eh.setActive(); + return; + } + + if (!eh.isInterrupted()) { + ::epoll_event epe; + epe.events = eh.events | ::EPOLLONESHOT; + epe.data.u64 = 0; // Keep valgrind happy + epe.data.ptr = &eh; + + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); + + eh.setActive(); + return; + } + ph = eh.pollerHandle; + } + + PollerHandlePrivate& ihp = *static_cast<PollerHandle&>(interruptHandle).impl; + ScopedLock<Mutex> l(ihp.lock); + interruptHandle.addHandle(*ph); + ihp.setActive(); + interrupt(); +} + +void Poller::monitorHandle(PollerHandle& handle, Direction dir) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); assert(!eh.isIdle()); + ::__uint32_t oldEvents = eh.events; + eh.events |= PollerPrivate::directionToEpollEvent(dir); + + // If no change nothing more to do - avoid unnecessary system call + if (oldEvents==eh.events) { + return; + } + + // If we're not actually listening wait till we are to perform change + if (!eh.isActive()) { + return; + } + ::epoll_event epe; - epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT; + epe.events = eh.events | ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); - - // Record monitoring state of this fd - eh.events = epe.events; - eh.setActive(); } -void Poller::rearmFd(PollerHandle& handle) { +void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); - assert(eh.isInactive()); + assert(!eh.isIdle()); + ::__uint32_t oldEvents = eh.events; + eh.events &= ~PollerPrivate::directionToEpollEvent(dir); + + // If no change nothing more to do - avoid unnecessary system call + if (oldEvents==eh.events) { + return; + } + + // If we're not actually listening wait till we are to perform change + if (!eh.isActive()) { + return; + } + ::epoll_event epe; - epe.events = eh.events; + epe.events = eh.events | ::EPOLLONESHOT; epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); - - eh.setActive(); } void Poller::shutdown() { @@ -368,14 +430,25 @@ bool Poller::interrupt(PollerHandle& handle) { { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); - if (!eh.isActive()) { + if (eh.isIdle() || eh.isDeleted()) { return false; } + + if (eh.isInterrupted()) { + return true; + } + + // Stop monitoring handle for read or write ::epoll_event epe; epe.events = 0; epe.data.u64 = 0; // Keep valgrind happy QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); - eh.setInterrupted(); + + if (eh.isInactive()) { + eh.setInterrupted(); + return true; + } + eh.setInterrupted(); } PollerPrivate::InterruptHandle& ih = impl->interruptHandle; @@ -414,6 +487,7 @@ void Poller::run() { } Poller::Event Poller::wait(Duration timeout) { + static __thread PollerHandlePrivate* lastReturnedHandle = 0; epoll_event epe; int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC; AbsTime targetTimeout = @@ -421,6 +495,11 @@ Poller::Event Poller::wait(Duration timeout) { FAR_FUTURE : AbsTime(now(), timeout); + if (lastReturnedHandle) { + impl->resetMode(*lastReturnedHandle); + lastReturnedHandle = 0; + } + // Repeat until we weren't interrupted by signal do { PollerHandleDeletionManager.markAllUnusedInThisThread(); @@ -460,12 +539,19 @@ Poller::Event Poller::wait(Duration timeout) { } } if (wrappedHandle) { - ScopedLock<Mutex> l(wrappedHandle->impl->lock); - if (!wrappedHandle->impl->isDeleted()) { - wrappedHandle->impl->setInactive(); + PollerHandlePrivate& eh = *wrappedHandle->impl; + { + ScopedLock<Mutex> l(eh.lock); + if (!eh.isDeleted()) { + if (!eh.isIdle()) { + eh.setInactive(); + } + lastReturnedHandle = &eh; + assert(eh.pollerHandle == wrappedHandle); return Event(wrappedHandle, INTERRUPTED); } - PollerHandleDeletionManager.markForDeletion(wrappedHandle->impl); + } + PollerHandleDeletionManager.markForDeletion(&eh); } continue; } @@ -482,6 +568,7 @@ Poller::Event Poller::wait(Duration timeout) { // the handle could have gone inactive since we left the epoll_wait if (eh.isActive()) { PollerHandle* handle = eh.pollerHandle; + assert(handle); // If the connection has been hungup we could still be readable // (just not writable), allow us to readable until we get here again @@ -493,15 +580,8 @@ Poller::Event Poller::wait(Duration timeout) { } else { eh.setInactive(); } + lastReturnedHandle = &eh; return Event(handle, PollerPrivate::epollToDirection(epe.events)); - } else if (eh.isDeleted()) { - // The handle has been deleted whilst still active and so must be removed - // from the poller - int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0); - // Ignore EBADF since it's quite likely that we could race with closing the fd - if (rc == -1 && errno != EBADF) { - QPID_POSIX_CHECK(rc); - } } } // We only get here if one of the following: diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index a914dc817a..ecfed6b42a 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -75,6 +75,7 @@ namespace sys { class AsynchAcceptorPrivate { public: AsynchAcceptorPrivate(const Socket& s, AsynchAcceptor::Callback callback); + ~AsynchAcceptorPrivate(); void start(Poller::shared_ptr poller); private: @@ -109,6 +110,10 @@ AsynchAcceptorPrivate::AsynchAcceptorPrivate(const Socket& s, s.setNonblocking(); } +AsynchAcceptorPrivate::~AsynchAcceptorPrivate() { + handle.stopWatch(); +} + void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) { handle.startWatch(poller); } diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp index 0a3c36452c..88be5d7634 100644 --- a/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -146,7 +146,7 @@ AsynchAcceptorPrivate::~AsynchAcceptorPrivate(void) { } void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) { - poller->addFd(PollerHandle(socket), Poller::INPUT); + poller->monitorHandle(PollerHandle(socket), Poller::INPUT); restart (); } @@ -426,7 +426,7 @@ void AsynchIO::queueForDeletion() { void AsynchIO::start(Poller::shared_ptr poller0) { poller = poller0; - poller->addFd(PollerHandle(socket), Poller::INPUT); + poller->monitorHandle(PollerHandle(socket), Poller::INPUT); if (writeQueue.size() > 0) // Already have data queued for write notifyPendingWrite(); startReading(); @@ -471,7 +471,7 @@ void AsynchIO::notifyPendingWrite() { boost::bind(&AsynchIO::completion, this, _1)); IOHandle h(hp); PollerHandle ph(h); - poller->addFd(ph, Poller::OUTPUT); + poller->monitorHandle(ph, Poller::OUTPUT); } void AsynchIO::queueWriteClose() { @@ -559,7 +559,7 @@ void AsynchIO::requestCallback(RequestCallback callback) { callback); IOHandle h(hp); PollerHandle ph(h); - poller->addFd(ph, Poller::INPUT); + poller->monitorHandle(ph, Poller::INPUT); } /** diff --git a/cpp/src/qpid/sys/windows/IocpPoller.cpp b/cpp/src/qpid/sys/windows/IocpPoller.cpp index 3760c26c00..467ef8facb 100755 --- a/cpp/src/qpid/sys/windows/IocpPoller.cpp +++ b/cpp/src/qpid/sys/windows/IocpPoller.cpp @@ -122,7 +122,7 @@ void Poller::run() { } while (true); } -void Poller::addFd(PollerHandle& handle, Direction dir) { +void Poller::monitorHandle(PollerHandle& handle, Direction dir) { HANDLE h = (HANDLE)(handle.impl->fd); if (h != INVALID_HANDLE_VALUE) { HANDLE iocpHandle = ::CreateIoCompletionPort (h, impl->iocp, 0, 0); @@ -146,9 +146,9 @@ void Poller::addFd(PollerHandle& handle, Direction dir) { } // All no-ops... -void Poller::delFd(PollerHandle& handle) {} -void Poller::modFd(PollerHandle& handle, Direction dir) {} -void Poller::rearmFd(PollerHandle& handle) {} +void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {} +void Poller::registerHandle(PollerHandle& handle) {} +void Poller::unregisterHandle(PollerHandle& handle) {} Poller::Event Poller::wait(Duration timeout) { DWORD timeoutMs = 0; diff --git a/cpp/src/qpid/sys/windows/PollableCondition.cpp b/cpp/src/qpid/sys/windows/PollableCondition.cpp index ed0f7c3917..82913934d6 100644 --- a/cpp/src/qpid/sys/windows/PollableCondition.cpp +++ b/cpp/src/qpid/sys/windows/PollableCondition.cpp @@ -76,15 +76,15 @@ void PollableConditionPrivate::poke() if (!armed) return; - // addFd will queue a completion for the IOCP; when it's handled, a + // monitorHandle will queue a completion for the IOCP; when it's handled, a // poller thread will call back to dispatch() below. PollerHandle ph(*this); - poller->addFd(ph, Poller::INPUT); + poller->monitorHandle(ph, Poller::INPUT); } void PollableConditionPrivate::dispatch(AsynchIoResult *result) { - delete result; // Poller::addFd() allocates this + delete result; // Poller::monitorHandle() allocates this cb(parent); } diff --git a/cpp/src/tests/PollerTest.cpp b/cpp/src/tests/PollerTest.cpp index 5b6b09ef65..19f641ca36 100644 --- a/cpp/src/tests/PollerTest.cpp +++ b/cpp/src/tests/PollerTest.cpp @@ -111,7 +111,8 @@ int main(int /*argc*/, char** /*argv*/) PollerHandle h0(f0); PollerHandle h1(f1); - poller->addFd(h0, Poller::INOUT); + poller->registerHandle(h0); + poller->monitorHandle(h0, Poller::INOUT); // h0 should be writable Poller::Event event = poller->wait(); @@ -123,19 +124,16 @@ int main(int /*argc*/, char** /*argv*/) cout << "Wrote(0): " << bytesWritten << " bytes\n"; // Wait for 500ms - h0 no longer writable - poller->rearmFd(h0); event = poller->wait(500000000); assert(event.handle == 0); // Test we can read it all now - poller->addFd(h1, Poller::INOUT); + poller->registerHandle(h1); + poller->monitorHandle(h1, Poller::INOUT); event = poller->wait(); assert(event.handle == &h1); assert(event.type == Poller::READ_WRITABLE); - // Can't interrupt, it's not active - assert(poller->interrupt(h1) == false); - bytesRead = readALot(sv[1]); assert(bytesRead == bytesWritten); cout << "Read(1): " << bytesRead << " bytes\n"; @@ -145,39 +143,52 @@ int main(int /*argc*/, char** /*argv*/) event = poller->wait(); assert(event.handle == &h0); assert(event.type == Poller::INTERRUPTED); - assert(poller->interrupt(h0) == false); // Test multiple interrupts - poller->rearmFd(h0); - poller->rearmFd(h1); assert(poller->interrupt(h0) == true); assert(poller->interrupt(h1) == true); - // Make sure we can't interrupt them again - assert(poller->interrupt(h0) == false); - assert(poller->interrupt(h1) == false); + // Make sure we can interrupt them again + assert(poller->interrupt(h0) == true); + assert(poller->interrupt(h1) == true); - // Make sure that they both come out in the correct order + // Make sure that they both come out event = poller->wait(); - assert(event.handle == &h0); assert(event.type == Poller::INTERRUPTED); - assert(poller->interrupt(h0) == false); + assert(event.handle == &h0 || event.handle == &h1); + if (event.handle == &h0) { + event = poller->wait(); + assert(event.type == Poller::INTERRUPTED); + assert(event.handle == &h1); + } else { + event = poller->wait(); + assert(event.type == Poller::INTERRUPTED); + assert(event.handle == &h0); + } + + poller->unmonitorHandle(h1, Poller::INOUT); + event = poller->wait(); - assert(event.handle == &h1); - assert(event.type == Poller::INTERRUPTED); - assert(poller->interrupt(h1) == false); + assert(event.handle == &h0); + assert(event.type == Poller::WRITABLE); - // At this point h1 should have been disabled from the poller - // (as it was just returned) and h0 can write again - poller->rearmFd(h0); + // We didn't write anything so it should still be writable event = poller->wait(); assert(event.handle == &h0); assert(event.type == Poller::WRITABLE); - // Now both the handles should be disabled + poller->unmonitorHandle(h0, Poller::INOUT); + event = poller->wait(500000000); assert(event.handle == 0); + poller->unregisterHandle(h1); + poller->unregisterHandle(h0); + + // Make sure we can't interrupt them now + assert(poller->interrupt(h0) == false); + assert(poller->interrupt(h1) == false); + // Test shutdown poller->shutdown(); event = poller->wait(); @@ -188,9 +199,6 @@ int main(int /*argc*/, char** /*argv*/) assert(event.handle == 0); assert(event.type == Poller::SHUTDOWN); - poller->delFd(h1); - poller->delFd(h0); - return 0; } catch (exception& e) { cout << "Caught exception " << e.what() << "\n"; |