diff options
Diffstat (limited to 'cpp/src/qpid/sys/DispatchHandle.cpp')
-rw-r--r-- | cpp/src/qpid/sys/DispatchHandle.cpp | 394 |
1 files changed, 116 insertions, 278 deletions
diff --git a/cpp/src/qpid/sys/DispatchHandle.cpp b/cpp/src/qpid/sys/DispatchHandle.cpp index cd7dec7fa6..7bf305d275 100644 --- a/cpp/src/qpid/sys/DispatchHandle.cpp +++ b/cpp/src/qpid/sys/DispatchHandle.cpp @@ -30,6 +30,16 @@ namespace qpid { namespace sys { +DispatchHandle::DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) : + PollerHandle(h), + readableCallback(rCb), + writableCallback(wCb), + disconnectedCallback(dCb), + state(IDLE) +{ +} + + DispatchHandle::~DispatchHandle() { } @@ -38,123 +48,56 @@ void DispatchHandle::startWatch(Poller::shared_ptr poller0) { bool w = writableCallback; ScopedLock<Mutex> lock(stateLock); - assert(state == IDLE || state == DELAYED_IDLE); - - // If no callbacks set then do nothing (that is what we were asked to do!) - // TODO: Maybe this should be an assert instead - if (!r && !w) { - switch (state) { - case IDLE: - state = INACTIVE; - return; - case DELAYED_IDLE: - state = DELAYED_INACTIVE; - return; - default: - assert(state == IDLE || state == DELAYED_IDLE); - } - } - - Poller::Direction d = r ? - (w ? Poller::INOUT : Poller::INPUT) : - Poller::OUTPUT; + assert(state == IDLE); poller = poller0; - poller->addFd(*this, d); - - switch (state) { - case IDLE: - state = r ? - (w ? ACTIVE_RW : ACTIVE_R) : - ACTIVE_W; - return; - case DELAYED_IDLE: - state = r ? - (w ? DELAYED_RW : DELAYED_R) : - DELAYED_W; - return; - default: - assert(state == IDLE || state == DELAYED_IDLE); - } + poller->registerHandle(*this); + state = WAITING; + Poller::Direction dir = r ? + ( w ? Poller::INOUT : Poller::INPUT ) : + ( w ? Poller::OUTPUT : Poller::NONE ); + poller->monitorHandle(*this, dir); } void DispatchHandle::rewatch() { bool r = readableCallback; bool w = writableCallback; + if (!r && !w) { + return; + } + Poller::Direction dir = r ? + ( w ? Poller::INOUT : Poller::INPUT ) : + ( w ? Poller::OUTPUT : Poller::NONE ); ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_R: - case DELAYED_W: - case DELAYED_INACTIVE: - state = r ? - (w ? DELAYED_RW : DELAYED_R) : - DELAYED_W; - break; - case DELAYED_DELETE: - break; - case INACTIVE: - case ACTIVE_R: - case ACTIVE_W: { - assert(poller); - Poller::Direction d = r ? - (w ? Poller::INOUT : Poller::INPUT) : - Poller::OUTPUT; - poller->modFd(*this, d); - state = r ? - (w ? ACTIVE_RW : ACTIVE_R) : - ACTIVE_W; - break; - } - case DELAYED_RW: - case ACTIVE_RW: - // Don't need to do anything already waiting for readable/writable + case STOPPING: + case DELETING: + return; + default: break; - case ACTIVE_DELETE: - assert(state != ACTIVE_DELETE); } + assert(poller); + poller->monitorHandle(*this, dir); } void DispatchHandle::rewatchRead() { if (!readableCallback) { return; } - + ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_R: - case DELAYED_RW: - case DELAYED_DELETE: - break; - case DELAYED_W: - state = DELAYED_RW; - break; - case DELAYED_INACTIVE: - state = DELAYED_R; - break; - case ACTIVE_R: - case ACTIVE_RW: - // Nothing to do: already waiting for readable - break; - case INACTIVE: - assert(poller); - poller->modFd(*this, Poller::INPUT); - state = ACTIVE_R; - break; - case ACTIVE_W: - assert(poller); - poller->modFd(*this, Poller::INOUT); - state = ACTIVE_RW; + case STOPPING: + case DELETING: + return; + default: break; - case ACTIVE_DELETE: - assert(state != ACTIVE_DELETE); } + assert(poller); + poller->monitorHandle(*this, Poller::INPUT); } void DispatchHandle::rewatchWrite() { @@ -165,35 +108,14 @@ void DispatchHandle::rewatchWrite() { ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_W: - case DELAYED_RW: - case DELAYED_DELETE: - break; - case DELAYED_R: - state = DELAYED_RW; - break; - case DELAYED_INACTIVE: - state = DELAYED_W; - break; - case INACTIVE: - assert(poller); - poller->modFd(*this, Poller::OUTPUT); - state = ACTIVE_W; - break; - case ACTIVE_R: - assert(poller); - poller->modFd(*this, Poller::INOUT); - state = ACTIVE_RW; - break; - case ACTIVE_W: - case ACTIVE_RW: - // Nothing to do: already waiting for writable + case STOPPING: + case DELETING: + return; + default: break; - case ACTIVE_DELETE: - assert(state != ACTIVE_DELETE); - } + } + assert(poller); + poller->monitorHandle(*this, Poller::OUTPUT); } void DispatchHandle::unwatchRead() { @@ -204,34 +126,14 @@ void DispatchHandle::unwatchRead() { ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_R: - state = DELAYED_INACTIVE; - break; - case DELAYED_RW: - state = DELAYED_W; - break; - case DELAYED_W: - case DELAYED_INACTIVE: - case DELAYED_DELETE: - break; - case ACTIVE_R: - assert(poller); - poller->modFd(*this, Poller::NONE); - state = INACTIVE; - break; - case ACTIVE_RW: - assert(poller); - poller->modFd(*this, Poller::OUTPUT); - state = ACTIVE_W; - break; - case ACTIVE_W: - case INACTIVE: + case STOPPING: + case DELETING: + return; + default: break; - case ACTIVE_DELETE: - assert(state != ACTIVE_DELETE); } + assert(poller); + poller->unmonitorHandle(*this, Poller::INPUT); } void DispatchHandle::unwatchWrite() { @@ -242,95 +144,62 @@ void DispatchHandle::unwatchWrite() { ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_W: - state = DELAYED_INACTIVE; - break; - case DELAYED_RW: - state = DELAYED_R; - break; - case DELAYED_R: - case DELAYED_INACTIVE: - case DELAYED_DELETE: - break; - case ACTIVE_W: - assert(poller); - poller->modFd(*this, Poller::NONE); - state = INACTIVE; - break; - case ACTIVE_RW: - assert(poller); - poller->modFd(*this, Poller::INPUT); - state = ACTIVE_R; - break; - case ACTIVE_R: - case INACTIVE: + case STOPPING: + case DELETING: + return; + default: break; - case ACTIVE_DELETE: - assert(state != ACTIVE_DELETE); - } + } + assert(poller); + poller->unmonitorHandle(*this, Poller::OUTPUT); } void DispatchHandle::unwatch() { ScopedLock<Mutex> lock(stateLock); - switch (state) { + switch(state) { case IDLE: - case DELAYED_IDLE: - break; - case DELAYED_R: - case DELAYED_W: - case DELAYED_RW: - case DELAYED_INACTIVE: - state = DELAYED_INACTIVE; - break; - case DELAYED_DELETE: - break; + case STOPPING: + case DELETING: + return; default: - assert(poller); - poller->modFd(*this, Poller::NONE); - state = INACTIVE; break; - case ACTIVE_DELETE: - assert(state != ACTIVE_DELETE); - } + } + assert(poller); + poller->unmonitorHandle(*this, Poller::INOUT); } void DispatchHandle::stopWatch() { ScopedLock<Mutex> lock(stateLock); switch (state) { case IDLE: - case DELAYED_IDLE: - case DELAYED_DELETE: + assert(state != IDLE); + return; + case STOPPING: + assert(state != STOPPING); return; - case DELAYED_R: - case DELAYED_W: - case DELAYED_RW: - case DELAYED_INACTIVE: - state = DELAYED_IDLE; + case CALLING: + state = STOPPING; break; - default: + case WAITING: state = IDLE; break; - case ACTIVE_DELETE: - assert(state != ACTIVE_DELETE); + case DELETING: + return; } assert(poller); - poller->delFd(*this); + poller->unregisterHandle(*this); poller.reset(); } -// If we are already in the IDLE state we can't do the callback as we might -// race to delete and callback at the same time -// TODO: might be able to fix this by adding a new state, but would make -// the state machine even more complex +// If we are in the IDLE/STOPPING state we can't do the callback as we've +// not/no longer got the fd registered in any poller void DispatchHandle::call(Callback iCb) { assert(iCb); ScopedLock<Mutex> lock(stateLock); switch (state) { case IDLE: - case ACTIVE_DELETE: - assert(false); + case STOPPING: + case DELETING: return; default: interruptedCallbacks.push(iCb); @@ -347,27 +216,24 @@ void DispatchHandle::doDelete() { ScopedLock<Mutex> lock(stateLock); // Ensure that we're no longer watching anything switch (state) { - case DELAYED_R: - case DELAYED_W: - case DELAYED_RW: - case DELAYED_INACTIVE: - assert(poller); - poller->delFd(*this); - poller.reset(); - // Fallthrough - case DELAYED_IDLE: - state = DELAYED_DELETE; - // Fallthrough - case DELAYED_DELETE: - case ACTIVE_DELETE: - return; case IDLE: + state = DELETING; break; - default: - state = ACTIVE_DELETE; + case STOPPING: + state = DELETING; + return; + case WAITING: + state = DELETING; assert(poller); (void) poller->interrupt(*this); - poller->delFd(*this); + poller->unregisterHandle(*this); + return; + case CALLING: + state = DELETING; + assert(poller); + poller->unregisterHandle(*this); + return; + case DELETING: return; } } @@ -378,43 +244,28 @@ void DispatchHandle::doDelete() { void DispatchHandle::processEvent(Poller::EventType type) { CallbackQueue callbacks; - // Note that we are now doing the callbacks + // Phase I { ScopedLock<Mutex> lock(stateLock); - // Set up to wait for same events next time unless reset switch(state) { - case ACTIVE_R: - state = DELAYED_R; - break; - case ACTIVE_W: - state = DELAYED_W; - break; - case ACTIVE_RW: - state = DELAYED_RW; + case IDLE: + // Can get here if a non connection thread stops watching + // whilst we were stuck in the above lock + return; + case WAITING: + state = CALLING; break; - case ACTIVE_DELETE: + case CALLING: + assert(state!=CALLING); + return; + case STOPPING: + assert(state!=STOPPING); + return; + case DELETING: // Need to make sure we clean up any pending callbacks in this case std::swap(callbacks, interruptedCallbacks); goto saybyebye; - // Can get here in idle if we are stopped in a different thread - // just after we return with this handle in Poller::wait - case IDLE: - // Can get here in INACTIVE if a non connection thread unwatches - // whilst we were stuck in the above lock - case INACTIVE: - // Can only get here in a DELAYED_* state in the rare case - // that we're already here for reading and we get activated for - // writing and we can write (it might be possible the other way - // round too). In this case we're already processing the handle - // in a different thread in this function so return right away - case DELAYED_R: - case DELAYED_W: - case DELAYED_RW: - case DELAYED_INACTIVE: - case DELAYED_IDLE: - case DELAYED_DELETE: - return; } std::swap(callbacks, interruptedCallbacks); @@ -434,10 +285,9 @@ void DispatchHandle::processEvent(Poller::EventType type) { readableCallback(*this); writableCallback(*this); break; - case Poller::DISCONNECTED: - { + case Poller::DISCONNECTED: { ScopedLock<Mutex> lock(stateLock); - state = DELAYED_INACTIVE; + poller->unmonitorHandle(*this, Poller::INOUT); } if (disconnectedCallback) { disconnectedCallback(*this); @@ -466,32 +316,20 @@ void DispatchHandle::processEvent(Poller::EventType type) { { ScopedLock<Mutex> lock(stateLock); - // If any of the callbacks re-enabled reading/writing then actually - // do it now switch (state) { - case DELAYED_R: - poller->modFd(*this, Poller::INPUT); - state = ACTIVE_R; - return; - case DELAYED_W: - poller->modFd(*this, Poller::OUTPUT); - state = ACTIVE_W; + case IDLE: + assert(state!=IDLE); return; - case DELAYED_RW: - poller->modFd(*this, Poller::INOUT); - state = ACTIVE_RW; + case STOPPING: + state = IDLE; return; - case DELAYED_INACTIVE: - state = INACTIVE; + case WAITING: + assert(state!=WAITING); return; - case DELAYED_IDLE: - state = IDLE; - return; - default: - // This should be impossible - assert(false); + case CALLING: + state = WAITING; return; - case DELAYED_DELETE: + case DELETING: break; } } |