diff options
Diffstat (limited to 'cpp/src/qpid/sys/Dispatcher.cpp')
-rw-r--r-- | cpp/src/qpid/sys/Dispatcher.cpp | 256 |
1 files changed, 203 insertions, 53 deletions
diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp index 9a20e2c3bc..3a1da13bd0 100644 --- a/cpp/src/qpid/sys/Dispatcher.cpp +++ b/cpp/src/qpid/sys/Dispatcher.cpp @@ -40,10 +40,10 @@ void Dispatcher::run() { // If can read/write then dispatch appropriate callbacks if (h) { - h->dispatchCallbacks(event.dir); + h->dispatchCallbacks(event.type); } else { // Handle shutdown - switch (event.dir) { + switch (event.type) { case Poller::SHUTDOWN: goto dispatcher_shutdown; default: @@ -57,7 +57,11 @@ dispatcher_shutdown: ; } -void DispatchHandle::watch(Poller::shared_ptr poller0) { +DispatchHandle::~DispatchHandle() { + stopWatch(); +} + +void DispatchHandle::startWatch(Poller::shared_ptr poller0) { bool r = readableCallback; bool w = writableCallback; @@ -84,25 +88,26 @@ void DispatchHandle::watch(Poller::shared_ptr poller0) { } void DispatchHandle::rewatch() { - assert(poller); bool r = readableCallback; bool w = writableCallback; ScopedLock<Mutex> lock(stateLock); switch(state) { - case DispatchHandle::IDLE: - assert(false); + case IDLE: break; - case DispatchHandle::DELAYED_R: - case DispatchHandle::DELAYED_W: - case DispatchHandle::CALLBACK: + case DELAYED_R: + case DELAYED_W: + case CALLBACK: state = r ? (w ? DELAYED_RW : DELAYED_R) : DELAYED_W; break; - case DispatchHandle::INACTIVE: - case DispatchHandle::ACTIVE_R: - case DispatchHandle::ACTIVE_W: { + case DELAYED_DELETE: + break; + case INACTIVE: + case ACTIVE_R: + case ACTIVE_W: { + assert(poller); Poller::Direction d = r ? (w ? Poller::INOUT : Poller::IN) : Poller::OUT; @@ -112,42 +117,43 @@ void DispatchHandle::rewatch() { ACTIVE_W; break; } - case DispatchHandle::DELAYED_RW: - case DispatchHandle::ACTIVE_RW: + case DELAYED_RW: + case ACTIVE_RW: // Don't need to do anything already waiting for readable/writable break; } } void DispatchHandle::rewatchRead() { - assert(poller); if (!readableCallback) { return; } ScopedLock<Mutex> lock(stateLock); switch(state) { - case DispatchHandle::IDLE: - assert(false); + case IDLE: break; - case DispatchHandle::DELAYED_R: - case DispatchHandle::DELAYED_RW: + case DELAYED_R: + case DELAYED_RW: + case DELAYED_DELETE: break; - case DispatchHandle::DELAYED_W: + case DELAYED_W: state = DELAYED_RW; break; - case DispatchHandle::CALLBACK: + case CALLBACK: state = DELAYED_R; break; - case DispatchHandle::ACTIVE_R: - case DispatchHandle::ACTIVE_RW: - // Nothing to do: already wating for readable + case ACTIVE_R: + case ACTIVE_RW: + // Nothing to do: already waiting for readable break; - case DispatchHandle::INACTIVE: + case INACTIVE: + assert(poller); poller->modFd(*this, Poller::IN); state = ACTIVE_R; break; - case DispatchHandle::ACTIVE_W: + case ACTIVE_W: + assert(poller); poller->modFd(*this, Poller::INOUT); state = ACTIVE_RW; break; @@ -155,101 +161,245 @@ void DispatchHandle::rewatchRead() { } void DispatchHandle::rewatchWrite() { - assert(poller); if (!writableCallback) { return; } ScopedLock<Mutex> lock(stateLock); switch(state) { - case DispatchHandle::IDLE: - assert(false); + case IDLE: break; - case DispatchHandle::DELAYED_W: - case DispatchHandle::DELAYED_RW: + case DELAYED_W: + case DELAYED_RW: + case DELAYED_DELETE: break; - case DispatchHandle::DELAYED_R: + case DELAYED_R: state = DELAYED_RW; break; - case DispatchHandle::CALLBACK: + case CALLBACK: state = DELAYED_W; break; - case DispatchHandle::INACTIVE: + case INACTIVE: + assert(poller); poller->modFd(*this, Poller::OUT); state = ACTIVE_W; break; - case DispatchHandle::ACTIVE_R: + case ACTIVE_R: + assert(poller); poller->modFd(*this, Poller::INOUT); state = ACTIVE_RW; break; - case DispatchHandle::ACTIVE_W: - case DispatchHandle::ACTIVE_RW: + case ACTIVE_W: + case ACTIVE_RW: // Nothing to do: already waiting for writable break; } } +void DispatchHandle::unwatchRead() { + if (!readableCallback) { + return; + } + + ScopedLock<Mutex> lock(stateLock); + switch(state) { + case IDLE: + break; + case DELAYED_R: + state = CALLBACK; + break; + case DELAYED_RW: + state = DELAYED_W; + break; + case DELAYED_W: + case CALLBACK: + case DELAYED_DELETE: + break; + case ACTIVE_R: + assert(poller); + poller->modFd(*this, Poller::NONE); + state = INACTIVE; + break; + case ACTIVE_RW: + assert(poller); + poller->modFd(*this, Poller::OUT); + state = ACTIVE_W; + break; + case ACTIVE_W: + case INACTIVE: + break; + } +} + +void DispatchHandle::unwatchWrite() { + if (!writableCallback) { + return; + } + + ScopedLock<Mutex> lock(stateLock); + switch(state) { + case IDLE: + break; + case DELAYED_W: + state = CALLBACK; + break; + case DELAYED_RW: + state = DELAYED_R; + break; + case DELAYED_R: + case CALLBACK: + case DELAYED_DELETE: + break; + case ACTIVE_W: + assert(poller); + poller->modFd(*this, Poller::NONE); + state = INACTIVE; + break; + case ACTIVE_RW: + assert(poller); + poller->modFd(*this, Poller::IN); + state = ACTIVE_R; + break; + case ACTIVE_R: + case INACTIVE: + break; + } +} + void DispatchHandle::unwatch() { - assert(poller); ScopedLock<Mutex> lock(stateLock); + switch (state) { + case IDLE: + break; + case DELAYED_R: + case DELAYED_W: + case DELAYED_RW: + case CALLBACK: + state = CALLBACK; + break; + case DELAYED_DELETE: + break; + default: + assert(poller); + poller->modFd(*this, Poller::NONE); + state = INACTIVE; + break; + } +} + +void DispatchHandle::stopWatch() { + ScopedLock<Mutex> lock(stateLock); + if ( state == IDLE) { + return; + } + assert(poller); poller->delFd(*this); poller.reset(); state = IDLE; } -void DispatchHandle::dispatchCallbacks(Poller::Direction dir) { - // Note that we are now doing the callbacks +// The slightly strange switch structure +// is to ensure that the lock is released before +// we do the delete +void DispatchHandle::doDelete() { + // If we're in the middle of a callback defer the delete { ScopedLock<Mutex> lock(stateLock); - assert( - state == ACTIVE_R || - state == ACTIVE_W || - state == ACTIVE_RW); + switch (state) { + case DELAYED_R: + case DELAYED_W: + case DELAYED_RW: + case CALLBACK: + case DELAYED_DELETE: + state = DELAYED_DELETE; + return; + default: + break; + } + } + // If we're not then do it right away + delete this; +} - state = CALLBACK; +void DispatchHandle::dispatchCallbacks(Poller::EventType type) { + // Note that we are now doing the callbacks + { + ScopedLock<Mutex> lock(stateLock); + + // Set up to wait for same events next time unless reset + switch(state) { + case ACTIVE_R: + state = DELAYED_R; + break; + case ACTIVE_W: + state = DELAYED_W; + break; + case ACTIVE_RW: + state = DELAYED_RW; + break; + default: + assert(false); + } } // Do callbacks - whilst we are doing the callbacks we are prevented from processing // the same handle until we re-enable it. To avoid rentering the callbacks for a single // handle re-enabling in the callbacks is actually deferred until they are complete. - switch (dir) { - case Poller::IN: + switch (type) { + case Poller::READABLE: readableCallback(*this); break; - case Poller::OUT: + case Poller::WRITABLE: writableCallback(*this); break; - case Poller::INOUT: + case Poller::READ_WRITABLE: readableCallback(*this); writableCallback(*this); break; + case Poller::DISCONNECTED: + { + ScopedLock<Mutex> lock(stateLock); + state = CALLBACK; + } + if (disconnectedCallback) { + disconnectedCallback(*this); + } + break; default: assert(false); } // If any of the callbacks re-enabled reading/writing then actually // do it now + { ScopedLock<Mutex> lock(stateLock); switch (state) { case DELAYED_R: poller->modFd(*this, Poller::IN); state = ACTIVE_R; - break; + return; case DELAYED_W: poller->modFd(*this, Poller::OUT); state = ACTIVE_W; - break; + return; case DELAYED_RW: poller->modFd(*this, Poller::INOUT); state = ACTIVE_RW; - break; + return; case CALLBACK: state = INACTIVE; - break; + return; + case IDLE: + return; default: // This should be impossible assert(false); + return; + case DELAYED_DELETE: + break; + } } + delete this; } }} |