summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/Dispatcher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/Dispatcher.cpp')
-rw-r--r--cpp/src/qpid/sys/Dispatcher.cpp203
1 files changed, 184 insertions, 19 deletions
diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp
index 4838e5e4cd..9a20e2c3bc 100644
--- a/cpp/src/qpid/sys/Dispatcher.cpp
+++ b/cpp/src/qpid/sys/Dispatcher.cpp
@@ -36,23 +36,20 @@ Dispatcher::~Dispatcher() {
void Dispatcher::run() {
do {
Poller::Event event = poller->wait();
- // Poller::wait guarantees to return an event
DispatchHandle* h = static_cast<DispatchHandle*>(event.handle);
- switch (event.dir) {
- case Poller::IN:
- h->readableCallback(*h);
- break;
- case Poller::OUT:
- h->writableCallback(*h);
- break;
- case Poller::INOUT:
- h->readableCallback(*h);
- h->writableCallback(*h);
- break;
- case Poller::SHUTDOWN:
- goto dispatcher_shutdown;
- default:
- ;
+
+ // If can read/write then dispatch appropriate callbacks
+ if (h) {
+ h->dispatchCallbacks(event.dir);
+ } else {
+ // Handle shutdown
+ switch (event.dir) {
+ case Poller::SHUTDOWN:
+ goto dispatcher_shutdown;
+ default:
+ // This should be impossible
+ assert(false);
+ }
}
} while (true);
@@ -63,11 +60,16 @@ dispatcher_shutdown:
void DispatchHandle::watch(Poller::shared_ptr poller0) {
bool r = readableCallback;
bool w = writableCallback;
-
+
+ ScopedLock<Mutex> lock(stateLock);
+ assert(state == IDLE);
+
// If no callbacks set then do nothing (that is what we were asked to do!)
// TODO: Maybe this should be an assert instead
- if (!r && !w)
+ if (!r && !w) {
+ state = INACTIVE;
return;
+ }
Poller::Direction d = r ?
(w ? Poller::INOUT : Poller::IN) :
@@ -75,16 +77,179 @@ void DispatchHandle::watch(Poller::shared_ptr poller0) {
poller = poller0;
poller->addFd(*this, d);
+
+ state = r ?
+ (w ? ACTIVE_RW : ACTIVE_R) :
+ ACTIVE_W;
}
void DispatchHandle::rewatch() {
assert(poller);
- poller->rearmFd(*this);
+ bool r = readableCallback;
+ bool w = writableCallback;
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case DispatchHandle::IDLE:
+ assert(false);
+ break;
+ case DispatchHandle::DELAYED_R:
+ case DispatchHandle::DELAYED_W:
+ case DispatchHandle::CALLBACK:
+ state = r ?
+ (w ? DELAYED_RW : DELAYED_R) :
+ DELAYED_W;
+ break;
+ case DispatchHandle::INACTIVE:
+ case DispatchHandle::ACTIVE_R:
+ case DispatchHandle::ACTIVE_W: {
+ Poller::Direction d = r ?
+ (w ? Poller::INOUT : Poller::IN) :
+ Poller::OUT;
+ poller->modFd(*this, d);
+ state = r ?
+ (w ? ACTIVE_RW : ACTIVE_R) :
+ ACTIVE_W;
+ break;
+ }
+ case DispatchHandle::DELAYED_RW:
+ case DispatchHandle::ACTIVE_RW:
+ // Don't need to do anything already waiting for readable/writable
+ break;
+ }
+}
+
+void DispatchHandle::rewatchRead() {
+ assert(poller);
+ if (!readableCallback) {
+ return;
+ }
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case DispatchHandle::IDLE:
+ assert(false);
+ break;
+ case DispatchHandle::DELAYED_R:
+ case DispatchHandle::DELAYED_RW:
+ break;
+ case DispatchHandle::DELAYED_W:
+ state = DELAYED_RW;
+ break;
+ case DispatchHandle::CALLBACK:
+ state = DELAYED_R;
+ break;
+ case DispatchHandle::ACTIVE_R:
+ case DispatchHandle::ACTIVE_RW:
+ // Nothing to do: already wating for readable
+ break;
+ case DispatchHandle::INACTIVE:
+ poller->modFd(*this, Poller::IN);
+ state = ACTIVE_R;
+ break;
+ case DispatchHandle::ACTIVE_W:
+ poller->modFd(*this, Poller::INOUT);
+ state = ACTIVE_RW;
+ break;
+ }
+}
+
+void DispatchHandle::rewatchWrite() {
+ assert(poller);
+ if (!writableCallback) {
+ return;
+ }
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case DispatchHandle::IDLE:
+ assert(false);
+ break;
+ case DispatchHandle::DELAYED_W:
+ case DispatchHandle::DELAYED_RW:
+ break;
+ case DispatchHandle::DELAYED_R:
+ state = DELAYED_RW;
+ break;
+ case DispatchHandle::CALLBACK:
+ state = DELAYED_W;
+ break;
+ case DispatchHandle::INACTIVE:
+ poller->modFd(*this, Poller::OUT);
+ state = ACTIVE_W;
+ break;
+ case DispatchHandle::ACTIVE_R:
+ poller->modFd(*this, Poller::INOUT);
+ state = ACTIVE_RW;
+ break;
+ case DispatchHandle::ACTIVE_W:
+ case DispatchHandle::ACTIVE_RW:
+ // Nothing to do: already waiting for writable
+ break;
+ }
}
void DispatchHandle::unwatch() {
+ assert(poller);
+ ScopedLock<Mutex> lock(stateLock);
poller->delFd(*this);
poller.reset();
+ state = IDLE;
+}
+
+void DispatchHandle::dispatchCallbacks(Poller::Direction dir) {
+ // Note that we are now doing the callbacks
+ {
+ ScopedLock<Mutex> lock(stateLock);
+ assert(
+ state == ACTIVE_R ||
+ state == ACTIVE_W ||
+ state == ACTIVE_RW);
+
+ state = CALLBACK;
+ }
+
+ // Do callbacks - whilst we are doing the callbacks we are prevented from processing
+ // the same handle until we re-enable it. To avoid rentering the callbacks for a single
+ // handle re-enabling in the callbacks is actually deferred until they are complete.
+ switch (dir) {
+ case Poller::IN:
+ readableCallback(*this);
+ break;
+ case Poller::OUT:
+ writableCallback(*this);
+ break;
+ case Poller::INOUT:
+ readableCallback(*this);
+ writableCallback(*this);
+ break;
+ default:
+ assert(false);
+ }
+
+ // If any of the callbacks re-enabled reading/writing then actually
+ // do it now
+ ScopedLock<Mutex> lock(stateLock);
+ switch (state) {
+ case DELAYED_R:
+ poller->modFd(*this, Poller::IN);
+ state = ACTIVE_R;
+ break;
+ case DELAYED_W:
+ poller->modFd(*this, Poller::OUT);
+ state = ACTIVE_W;
+ break;
+ case DELAYED_RW:
+ poller->modFd(*this, Poller::INOUT);
+ state = ACTIVE_RW;
+ break;
+ case CALLBACK:
+ state = INACTIVE;
+ break;
+ default:
+ // This should be impossible
+ assert(false);
+ }
}
}}