diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys/DispatchHandle.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/sys/DispatchHandle.cpp | 129 |
1 files changed, 100 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/sys/DispatchHandle.cpp b/qpid/cpp/src/qpid/sys/DispatchHandle.cpp index cbdee7eda6..cd7dec7fa6 100644 --- a/qpid/cpp/src/qpid/sys/DispatchHandle.cpp +++ b/qpid/cpp/src/qpid/sys/DispatchHandle.cpp @@ -21,6 +21,8 @@ #include "DispatchHandle.h" +#include <algorithm> + #include <boost/cast.hpp> #include <assert.h> @@ -29,7 +31,6 @@ namespace qpid { namespace sys { DispatchHandle::~DispatchHandle() { - stopWatch(); } void DispatchHandle::startWatch(Poller::shared_ptr poller0) { @@ -37,13 +38,21 @@ void DispatchHandle::startWatch(Poller::shared_ptr poller0) { bool w = writableCallback; ScopedLock<Mutex> lock(stateLock); - assert(state == IDLE); + assert(state == IDLE || state == DELAYED_IDLE); // If no callbacks set then do nothing (that is what we were asked to do!) // TODO: Maybe this should be an assert instead if (!r && !w) { - state = INACTIVE; - return; + switch (state) { + case IDLE: + state = INACTIVE; + return; + case DELAYED_IDLE: + state = DELAYED_INACTIVE; + return; + default: + assert(state == IDLE || state == DELAYED_IDLE); + } } Poller::Direction d = r ? @@ -53,9 +62,20 @@ void DispatchHandle::startWatch(Poller::shared_ptr poller0) { poller = poller0; poller->addFd(*this, d); - state = r ? - (w ? ACTIVE_RW : ACTIVE_R) : - ACTIVE_W; + switch (state) { + case IDLE: + state = r ? + (w ? ACTIVE_RW : ACTIVE_R) : + ACTIVE_W; + return; + case DELAYED_IDLE: + state = r ? + (w ? DELAYED_RW : DELAYED_R) : + DELAYED_W; + return; + default: + assert(state == IDLE || state == DELAYED_IDLE); + } } void DispatchHandle::rewatch() { @@ -93,6 +113,8 @@ void DispatchHandle::rewatch() { case ACTIVE_RW: // Don't need to do anything already waiting for readable/writable break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -130,6 +152,8 @@ void DispatchHandle::rewatchRead() { poller->modFd(*this, Poller::INOUT); state = ACTIVE_RW; break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -167,6 +191,8 @@ void DispatchHandle::rewatchWrite() { case ACTIVE_RW: // Nothing to do: already waiting for writable break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -203,6 +229,8 @@ void DispatchHandle::unwatchRead() { case ACTIVE_W: case INACTIVE: break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -239,6 +267,8 @@ void DispatchHandle::unwatchWrite() { case ACTIVE_R: case INACTIVE: break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -261,6 +291,8 @@ void DispatchHandle::unwatch() { poller->modFd(*this, Poller::NONE); state = INACTIVE; break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } } @@ -280,47 +312,72 @@ void DispatchHandle::stopWatch() { default: state = IDLE; break; + case ACTIVE_DELETE: + assert(state != ACTIVE_DELETE); } assert(poller); poller->delFd(*this); poller.reset(); } +// If we are already in the IDLE state we can't do the callback as we might +// race to delete and callback at the same time +// TODO: might be able to fix this by adding a new state, but would make +// the state machine even more complex void DispatchHandle::call(Callback iCb) { assert(iCb); ScopedLock<Mutex> lock(stateLock); - interruptedCallbacks.push(iCb); - - (void) poller->interrupt(*this); + switch (state) { + case IDLE: + case ACTIVE_DELETE: + assert(false); + return; + default: + interruptedCallbacks.push(iCb); + assert(poller); + (void) poller->interrupt(*this); + } } // The slightly strange switch structure // is to ensure that the lock is released before // we do the delete void DispatchHandle::doDelete() { - // Ensure that we're no longer watching anything - stopWatch(); - - // If we're in the middle of a callback defer the delete { ScopedLock<Mutex> lock(stateLock); + // Ensure that we're no longer watching anything switch (state) { + case DELAYED_R: + case DELAYED_W: + case DELAYED_RW: + case DELAYED_INACTIVE: + assert(poller); + poller->delFd(*this); + poller.reset(); + // Fallthrough case DELAYED_IDLE: - case DELAYED_DELETE: state = DELAYED_DELETE; + // Fallthrough + case DELAYED_DELETE: + case ACTIVE_DELETE: return; case IDLE: break; default: - // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states - assert(false); + state = ACTIVE_DELETE; + assert(poller); + (void) poller->interrupt(*this); + poller->delFd(*this); + return; } } - // If we're not then do it right away + // If we're IDLE we can do this right away delete this; } void DispatchHandle::processEvent(Poller::EventType type) { + CallbackQueue callbacks; + // Note that we are now doing the callbacks { ScopedLock<Mutex> lock(stateLock); @@ -336,6 +393,16 @@ void DispatchHandle::processEvent(Poller::EventType type) { case ACTIVE_RW: state = DELAYED_RW; break; + case ACTIVE_DELETE: + // Need to make sure we clean up any pending callbacks in this case + std::swap(callbacks, interruptedCallbacks); + goto saybyebye; + // Can get here in idle if we are stopped in a different thread + // just after we return with this handle in Poller::wait + case IDLE: + // Can get here in INACTIVE if a non connection thread unwatches + // whilst we were stuck in the above lock + case INACTIVE: // Can only get here in a DELAYED_* state in the rare case // that we're already here for reading and we get activated for // writing and we can write (it might be possible the other way @@ -348,9 +415,9 @@ void DispatchHandle::processEvent(Poller::EventType type) { case DELAYED_IDLE: case DELAYED_DELETE: return; - default: - assert(false); } + + std::swap(callbacks, interruptedCallbacks); } // Do callbacks - whilst we are doing the callbacks we are prevented from processing @@ -378,8 +445,8 @@ void DispatchHandle::processEvent(Poller::EventType type) { break; case Poller::INTERRUPTED: { - ScopedLock<Mutex> lock(stateLock); - assert(interruptedCallbacks.size() > 0); + // We could only be interrupted if we also had a callback to do + assert(callbacks.size() > 0); // We'll actually do the interrupt below } break; @@ -387,16 +454,18 @@ void DispatchHandle::processEvent(Poller::EventType type) { assert(false); } - { - ScopedLock<Mutex> lock(stateLock); - // If we've got a pending interrupt do it now - while (interruptedCallbacks.size() > 0) { - Callback cb = interruptedCallbacks.front(); + // If we have any callbacks do them now - + // (because we use a copy from before the previous callbacks we won't + // do anything yet that was just added) + while (callbacks.size() > 0) { + Callback cb = callbacks.front(); assert(cb); cb(*this); - interruptedCallbacks.pop(); + callbacks.pop(); } + { + ScopedLock<Mutex> lock(stateLock); // If any of the callbacks re-enabled reading/writing then actually // do it now switch (state) { @@ -425,7 +494,9 @@ void DispatchHandle::processEvent(Poller::EventType type) { case DELAYED_DELETE: break; } - } + } + +saybyebye: delete this; } |