summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/Dispatcher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/Dispatcher.cpp')
-rw-r--r--cpp/src/qpid/sys/Dispatcher.cpp403
1 files changed, 2 insertions, 401 deletions
diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp
index 02a62c8e66..5f52dcd990 100644
--- a/cpp/src/qpid/sys/Dispatcher.cpp
+++ b/cpp/src/qpid/sys/Dispatcher.cpp
@@ -19,9 +19,7 @@
*
*/
-#include "Dispatcher.h"
-
-#include <boost/cast.hpp>
+#include "qpid/sys/Dispatcher.h"
#include <assert.h>
@@ -36,404 +34,7 @@ Dispatcher::~Dispatcher() {
}
void Dispatcher::run() {
- do {
- Poller::Event event = poller->wait();
-
- // If can read/write then dispatch appropriate callbacks
- if (event.handle) {
- event.process();
- } else {
- // Handle shutdown
- switch (event.type) {
- case Poller::SHUTDOWN:
- goto dispatcher_shutdown;
- default:
- // This should be impossible
- assert(false);
- }
- }
- } while (true);
-
-dispatcher_shutdown:
- ;
-}
-
-DispatchHandle::~DispatchHandle() {
- stopWatch();
-}
-
-void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
- bool r = readableCallback;
- bool w = writableCallback;
-
- ScopedLock<Mutex> lock(stateLock);
- assert(state == IDLE);
-
- // If no callbacks set then do nothing (that is what we were asked to do!)
- // TODO: Maybe this should be an assert instead
- if (!r && !w) {
- state = INACTIVE;
- return;
- }
-
- Poller::Direction d = r ?
- (w ? Poller::INOUT : Poller::IN) :
- Poller::OUT;
-
- poller = poller0;
- poller->addFd(*this, d);
-
- state = r ?
- (w ? ACTIVE_RW : ACTIVE_R) :
- ACTIVE_W;
-}
-
-void DispatchHandle::rewatch() {
- bool r = readableCallback;
- bool w = writableCallback;
-
- ScopedLock<Mutex> lock(stateLock);
- switch(state) {
- case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_INACTIVE:
- state = r ?
- (w ? DELAYED_RW : DELAYED_R) :
- DELAYED_W;
- break;
- case DELAYED_DELETE:
- break;
- case INACTIVE:
- case ACTIVE_R:
- case ACTIVE_W: {
- assert(poller);
- Poller::Direction d = r ?
- (w ? Poller::INOUT : Poller::IN) :
- Poller::OUT;
- poller->modFd(*this, d);
- state = r ?
- (w ? ACTIVE_RW : ACTIVE_R) :
- ACTIVE_W;
- break;
- }
- case DELAYED_RW:
- case ACTIVE_RW:
- // Don't need to do anything already waiting for readable/writable
- break;
- }
-}
-
-void DispatchHandle::rewatchRead() {
- if (!readableCallback) {
- return;
- }
-
- ScopedLock<Mutex> lock(stateLock);
- switch(state) {
- case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_R:
- case DELAYED_RW:
- case DELAYED_DELETE:
- break;
- case DELAYED_W:
- state = DELAYED_RW;
- break;
- case DELAYED_INACTIVE:
- state = DELAYED_R;
- break;
- case ACTIVE_R:
- case ACTIVE_RW:
- // Nothing to do: already waiting for readable
- break;
- case INACTIVE:
- assert(poller);
- poller->modFd(*this, Poller::IN);
- state = ACTIVE_R;
- break;
- case ACTIVE_W:
- assert(poller);
- poller->modFd(*this, Poller::INOUT);
- state = ACTIVE_RW;
- break;
- }
-}
-
-void DispatchHandle::rewatchWrite() {
- if (!writableCallback) {
- return;
- }
-
- ScopedLock<Mutex> lock(stateLock);
- switch(state) {
- case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_DELETE:
- break;
- case DELAYED_R:
- state = DELAYED_RW;
- break;
- case DELAYED_INACTIVE:
- state = DELAYED_W;
- break;
- case INACTIVE:
- assert(poller);
- poller->modFd(*this, Poller::OUT);
- state = ACTIVE_W;
- break;
- case ACTIVE_R:
- assert(poller);
- poller->modFd(*this, Poller::INOUT);
- state = ACTIVE_RW;
- break;
- case ACTIVE_W:
- case ACTIVE_RW:
- // Nothing to do: already waiting for writable
- break;
- }
-}
-
-void DispatchHandle::unwatchRead() {
- if (!readableCallback) {
- return;
- }
-
- ScopedLock<Mutex> lock(stateLock);
- switch(state) {
- case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_R:
- state = DELAYED_INACTIVE;
- break;
- case DELAYED_RW:
- state = DELAYED_W;
- break;
- case DELAYED_W:
- case DELAYED_INACTIVE:
- case DELAYED_DELETE:
- break;
- case ACTIVE_R:
- assert(poller);
- poller->modFd(*this, Poller::NONE);
- state = INACTIVE;
- break;
- case ACTIVE_RW:
- assert(poller);
- poller->modFd(*this, Poller::OUT);
- state = ACTIVE_W;
- break;
- case ACTIVE_W:
- case INACTIVE:
- break;
- }
-}
-
-void DispatchHandle::unwatchWrite() {
- if (!writableCallback) {
- return;
- }
-
- ScopedLock<Mutex> lock(stateLock);
- switch(state) {
- case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_W:
- state = DELAYED_INACTIVE;
- break;
- case DELAYED_RW:
- state = DELAYED_R;
- break;
- case DELAYED_R:
- case DELAYED_INACTIVE:
- case DELAYED_DELETE:
- break;
- case ACTIVE_W:
- assert(poller);
- poller->modFd(*this, Poller::NONE);
- state = INACTIVE;
- break;
- case ACTIVE_RW:
- assert(poller);
- poller->modFd(*this, Poller::IN);
- state = ACTIVE_R;
- break;
- case ACTIVE_R:
- case INACTIVE:
- break;
- }
-}
-
-void DispatchHandle::unwatch() {
- ScopedLock<Mutex> lock(stateLock);
- switch (state) {
- case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_INACTIVE:
- state = DELAYED_INACTIVE;
- break;
- case DELAYED_DELETE:
- break;
- default:
- assert(poller);
- poller->modFd(*this, Poller::NONE);
- state = INACTIVE;
- break;
- }
-}
-
-void DispatchHandle::stopWatch() {
- ScopedLock<Mutex> lock(stateLock);
- switch (state) {
- case IDLE:
- case DELAYED_IDLE:
- case DELAYED_DELETE:
- return;
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_INACTIVE:
- state = DELAYED_IDLE;
- break;
- default:
- state = IDLE;
- break;
- }
- assert(poller);
- poller->delFd(*this);
- poller.reset();
-}
-
-// The slightly strange switch structure
-// is to ensure that the lock is released before
-// we do the delete
-void DispatchHandle::doDelete() {
- // Ensure that we're no longer watching anything
- stopWatch();
-
- // If we're in the middle of a callback defer the delete
- {
- ScopedLock<Mutex> lock(stateLock);
- switch (state) {
- case DELAYED_IDLE:
- case DELAYED_DELETE:
- state = DELAYED_DELETE;
- return;
- case IDLE:
- break;
- default:
- // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
- assert(false);
- }
- }
- // If we're not then do it right away
- delete this;
-}
-
-void DispatchHandle::processEvent(Poller::EventType type) {
- // Note that we are now doing the callbacks
- {
- ScopedLock<Mutex> lock(stateLock);
-
- // Set up to wait for same events next time unless reset
- switch(state) {
- case ACTIVE_R:
- state = DELAYED_R;
- break;
- case ACTIVE_W:
- state = DELAYED_W;
- break;
- case ACTIVE_RW:
- state = DELAYED_RW;
- break;
- // Can only get here in a DELAYED_* state in the rare case
- // that we're already here for reading and we get activated for
- // writing and we can write (it might be possible the other way
- // round too). In this case we're already processing the handle
- // in a different thread in this function so return right away
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_INACTIVE:
- case DELAYED_IDLE:
- case DELAYED_DELETE:
- return;
- default:
- assert(false);
- }
- }
-
- // Do callbacks - whilst we are doing the callbacks we are prevented from processing
- // the same handle until we re-enable it. To avoid rentering the callbacks for a single
- // handle re-enabling in the callbacks is actually deferred until they are complete.
- switch (type) {
- case Poller::READABLE:
- readableCallback(*this);
- break;
- case Poller::WRITABLE:
- writableCallback(*this);
- break;
- case Poller::READ_WRITABLE:
- readableCallback(*this);
- writableCallback(*this);
- break;
- case Poller::DISCONNECTED:
- {
- ScopedLock<Mutex> lock(stateLock);
- state = DELAYED_INACTIVE;
- }
- if (disconnectedCallback) {
- disconnectedCallback(*this);
- }
- break;
- default:
- assert(false);
- }
-
- // If any of the callbacks re-enabled reading/writing then actually
- // do it now
- {
- ScopedLock<Mutex> lock(stateLock);
- switch (state) {
- case DELAYED_R:
- poller->modFd(*this, Poller::IN);
- state = ACTIVE_R;
- return;
- case DELAYED_W:
- poller->modFd(*this, Poller::OUT);
- state = ACTIVE_W;
- return;
- case DELAYED_RW:
- poller->modFd(*this, Poller::INOUT);
- state = ACTIVE_RW;
- return;
- case DELAYED_INACTIVE:
- state = INACTIVE;
- return;
- case DELAYED_IDLE:
- state = IDLE;
- return;
- default:
- // This should be impossible
- assert(false);
- return;
- case DELAYED_DELETE:
- break;
- }
- }
- delete this;
+ poller->run();
}
}}