diff options
Diffstat (limited to 'cpp/src/qpid/sys/Dispatcher.cpp')
-rw-r--r-- | cpp/src/qpid/sys/Dispatcher.cpp | 203 |
1 files changed, 184 insertions, 19 deletions
diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp index 4838e5e4cd..9a20e2c3bc 100644 --- a/cpp/src/qpid/sys/Dispatcher.cpp +++ b/cpp/src/qpid/sys/Dispatcher.cpp @@ -36,23 +36,20 @@ Dispatcher::~Dispatcher() { void Dispatcher::run() { do { Poller::Event event = poller->wait(); - // Poller::wait guarantees to return an event DispatchHandle* h = static_cast<DispatchHandle*>(event.handle); - switch (event.dir) { - case Poller::IN: - h->readableCallback(*h); - break; - case Poller::OUT: - h->writableCallback(*h); - break; - case Poller::INOUT: - h->readableCallback(*h); - h->writableCallback(*h); - break; - case Poller::SHUTDOWN: - goto dispatcher_shutdown; - default: - ; + + // If can read/write then dispatch appropriate callbacks + if (h) { + h->dispatchCallbacks(event.dir); + } else { + // Handle shutdown + switch (event.dir) { + case Poller::SHUTDOWN: + goto dispatcher_shutdown; + default: + // This should be impossible + assert(false); + } } } while (true); @@ -63,11 +60,16 @@ dispatcher_shutdown: void DispatchHandle::watch(Poller::shared_ptr poller0) { bool r = readableCallback; bool w = writableCallback; - + + ScopedLock<Mutex> lock(stateLock); + assert(state == IDLE); + // If no callbacks set then do nothing (that is what we were asked to do!) // TODO: Maybe this should be an assert instead - if (!r && !w) + if (!r && !w) { + state = INACTIVE; return; + } Poller::Direction d = r ? (w ? Poller::INOUT : Poller::IN) : @@ -75,16 +77,179 @@ void DispatchHandle::watch(Poller::shared_ptr poller0) { poller = poller0; poller->addFd(*this, d); + + state = r ? + (w ? ACTIVE_RW : ACTIVE_R) : + ACTIVE_W; } void DispatchHandle::rewatch() { assert(poller); - poller->rearmFd(*this); + bool r = readableCallback; + bool w = writableCallback; + + ScopedLock<Mutex> lock(stateLock); + switch(state) { + case DispatchHandle::IDLE: + assert(false); + break; + case DispatchHandle::DELAYED_R: + case DispatchHandle::DELAYED_W: + case DispatchHandle::CALLBACK: + state = r ? + (w ? DELAYED_RW : DELAYED_R) : + DELAYED_W; + break; + case DispatchHandle::INACTIVE: + case DispatchHandle::ACTIVE_R: + case DispatchHandle::ACTIVE_W: { + Poller::Direction d = r ? + (w ? Poller::INOUT : Poller::IN) : + Poller::OUT; + poller->modFd(*this, d); + state = r ? + (w ? ACTIVE_RW : ACTIVE_R) : + ACTIVE_W; + break; + } + case DispatchHandle::DELAYED_RW: + case DispatchHandle::ACTIVE_RW: + // Don't need to do anything already waiting for readable/writable + break; + } +} + +void DispatchHandle::rewatchRead() { + assert(poller); + if (!readableCallback) { + return; + } + + ScopedLock<Mutex> lock(stateLock); + switch(state) { + case DispatchHandle::IDLE: + assert(false); + break; + case DispatchHandle::DELAYED_R: + case DispatchHandle::DELAYED_RW: + break; + case DispatchHandle::DELAYED_W: + state = DELAYED_RW; + break; + case DispatchHandle::CALLBACK: + state = DELAYED_R; + break; + case DispatchHandle::ACTIVE_R: + case DispatchHandle::ACTIVE_RW: + // Nothing to do: already wating for readable + break; + case DispatchHandle::INACTIVE: + poller->modFd(*this, Poller::IN); + state = ACTIVE_R; + break; + case DispatchHandle::ACTIVE_W: + poller->modFd(*this, Poller::INOUT); + state = ACTIVE_RW; + break; + } +} + +void DispatchHandle::rewatchWrite() { + assert(poller); + if (!writableCallback) { + return; + } + + ScopedLock<Mutex> lock(stateLock); + switch(state) { + case DispatchHandle::IDLE: + assert(false); + break; + case DispatchHandle::DELAYED_W: + case DispatchHandle::DELAYED_RW: + break; + case DispatchHandle::DELAYED_R: + state = DELAYED_RW; + break; + case DispatchHandle::CALLBACK: + state = DELAYED_W; + break; + case DispatchHandle::INACTIVE: + poller->modFd(*this, Poller::OUT); + state = ACTIVE_W; + break; + case DispatchHandle::ACTIVE_R: + poller->modFd(*this, Poller::INOUT); + state = ACTIVE_RW; + break; + case DispatchHandle::ACTIVE_W: + case DispatchHandle::ACTIVE_RW: + // Nothing to do: already waiting for writable + break; + } } void DispatchHandle::unwatch() { + assert(poller); + ScopedLock<Mutex> lock(stateLock); poller->delFd(*this); poller.reset(); + state = IDLE; +} + +void DispatchHandle::dispatchCallbacks(Poller::Direction dir) { + // Note that we are now doing the callbacks + { + ScopedLock<Mutex> lock(stateLock); + assert( + state == ACTIVE_R || + state == ACTIVE_W || + state == ACTIVE_RW); + + state = CALLBACK; + } + + // Do callbacks - whilst we are doing the callbacks we are prevented from processing + // the same handle until we re-enable it. To avoid rentering the callbacks for a single + // handle re-enabling in the callbacks is actually deferred until they are complete. + switch (dir) { + case Poller::IN: + readableCallback(*this); + break; + case Poller::OUT: + writableCallback(*this); + break; + case Poller::INOUT: + readableCallback(*this); + writableCallback(*this); + break; + default: + assert(false); + } + + // If any of the callbacks re-enabled reading/writing then actually + // do it now + ScopedLock<Mutex> lock(stateLock); + switch (state) { + case DELAYED_R: + poller->modFd(*this, Poller::IN); + state = ACTIVE_R; + break; + case DELAYED_W: + poller->modFd(*this, Poller::OUT); + state = ACTIVE_W; + break; + case DELAYED_RW: + poller->modFd(*this, Poller::INOUT); + state = ACTIVE_RW; + break; + case CALLBACK: + state = INACTIVE; + break; + default: + // This should be impossible + assert(false); + } } }} |