summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/Dispatcher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/Dispatcher.cpp')
-rw-r--r--cpp/src/qpid/sys/Dispatcher.cpp256
1 files changed, 203 insertions, 53 deletions
diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp
index 9a20e2c3bc..3a1da13bd0 100644
--- a/cpp/src/qpid/sys/Dispatcher.cpp
+++ b/cpp/src/qpid/sys/Dispatcher.cpp
@@ -40,10 +40,10 @@ void Dispatcher::run() {
// If can read/write then dispatch appropriate callbacks
if (h) {
- h->dispatchCallbacks(event.dir);
+ h->dispatchCallbacks(event.type);
} else {
// Handle shutdown
- switch (event.dir) {
+ switch (event.type) {
case Poller::SHUTDOWN:
goto dispatcher_shutdown;
default:
@@ -57,7 +57,11 @@ dispatcher_shutdown:
;
}
-void DispatchHandle::watch(Poller::shared_ptr poller0) {
+DispatchHandle::~DispatchHandle() {
+ stopWatch();
+}
+
+void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
bool r = readableCallback;
bool w = writableCallback;
@@ -84,25 +88,26 @@ void DispatchHandle::watch(Poller::shared_ptr poller0) {
}
void DispatchHandle::rewatch() {
- assert(poller);
bool r = readableCallback;
bool w = writableCallback;
ScopedLock<Mutex> lock(stateLock);
switch(state) {
- case DispatchHandle::IDLE:
- assert(false);
+ case IDLE:
break;
- case DispatchHandle::DELAYED_R:
- case DispatchHandle::DELAYED_W:
- case DispatchHandle::CALLBACK:
+ case DELAYED_R:
+ case DELAYED_W:
+ case CALLBACK:
state = r ?
(w ? DELAYED_RW : DELAYED_R) :
DELAYED_W;
break;
- case DispatchHandle::INACTIVE:
- case DispatchHandle::ACTIVE_R:
- case DispatchHandle::ACTIVE_W: {
+ case DELAYED_DELETE:
+ break;
+ case INACTIVE:
+ case ACTIVE_R:
+ case ACTIVE_W: {
+ assert(poller);
Poller::Direction d = r ?
(w ? Poller::INOUT : Poller::IN) :
Poller::OUT;
@@ -112,42 +117,43 @@ void DispatchHandle::rewatch() {
ACTIVE_W;
break;
}
- case DispatchHandle::DELAYED_RW:
- case DispatchHandle::ACTIVE_RW:
+ case DELAYED_RW:
+ case ACTIVE_RW:
// Don't need to do anything already waiting for readable/writable
break;
}
}
void DispatchHandle::rewatchRead() {
- assert(poller);
if (!readableCallback) {
return;
}
ScopedLock<Mutex> lock(stateLock);
switch(state) {
- case DispatchHandle::IDLE:
- assert(false);
+ case IDLE:
break;
- case DispatchHandle::DELAYED_R:
- case DispatchHandle::DELAYED_RW:
+ case DELAYED_R:
+ case DELAYED_RW:
+ case DELAYED_DELETE:
break;
- case DispatchHandle::DELAYED_W:
+ case DELAYED_W:
state = DELAYED_RW;
break;
- case DispatchHandle::CALLBACK:
+ case CALLBACK:
state = DELAYED_R;
break;
- case DispatchHandle::ACTIVE_R:
- case DispatchHandle::ACTIVE_RW:
- // Nothing to do: already wating for readable
+ case ACTIVE_R:
+ case ACTIVE_RW:
+ // Nothing to do: already waiting for readable
break;
- case DispatchHandle::INACTIVE:
+ case INACTIVE:
+ assert(poller);
poller->modFd(*this, Poller::IN);
state = ACTIVE_R;
break;
- case DispatchHandle::ACTIVE_W:
+ case ACTIVE_W:
+ assert(poller);
poller->modFd(*this, Poller::INOUT);
state = ACTIVE_RW;
break;
@@ -155,101 +161,245 @@ void DispatchHandle::rewatchRead() {
}
void DispatchHandle::rewatchWrite() {
- assert(poller);
if (!writableCallback) {
return;
}
ScopedLock<Mutex> lock(stateLock);
switch(state) {
- case DispatchHandle::IDLE:
- assert(false);
+ case IDLE:
break;
- case DispatchHandle::DELAYED_W:
- case DispatchHandle::DELAYED_RW:
+ case DELAYED_W:
+ case DELAYED_RW:
+ case DELAYED_DELETE:
break;
- case DispatchHandle::DELAYED_R:
+ case DELAYED_R:
state = DELAYED_RW;
break;
- case DispatchHandle::CALLBACK:
+ case CALLBACK:
state = DELAYED_W;
break;
- case DispatchHandle::INACTIVE:
+ case INACTIVE:
+ assert(poller);
poller->modFd(*this, Poller::OUT);
state = ACTIVE_W;
break;
- case DispatchHandle::ACTIVE_R:
+ case ACTIVE_R:
+ assert(poller);
poller->modFd(*this, Poller::INOUT);
state = ACTIVE_RW;
break;
- case DispatchHandle::ACTIVE_W:
- case DispatchHandle::ACTIVE_RW:
+ case ACTIVE_W:
+ case ACTIVE_RW:
// Nothing to do: already waiting for writable
break;
}
}
+void DispatchHandle::unwatchRead() {
+ if (!readableCallback) {
+ return;
+ }
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case IDLE:
+ break;
+ case DELAYED_R:
+ state = CALLBACK;
+ break;
+ case DELAYED_RW:
+ state = DELAYED_W;
+ break;
+ case DELAYED_W:
+ case CALLBACK:
+ case DELAYED_DELETE:
+ break;
+ case ACTIVE_R:
+ assert(poller);
+ poller->modFd(*this, Poller::NONE);
+ state = INACTIVE;
+ break;
+ case ACTIVE_RW:
+ assert(poller);
+ poller->modFd(*this, Poller::OUT);
+ state = ACTIVE_W;
+ break;
+ case ACTIVE_W:
+ case INACTIVE:
+ break;
+ }
+}
+
+void DispatchHandle::unwatchWrite() {
+ if (!writableCallback) {
+ return;
+ }
+
+ ScopedLock<Mutex> lock(stateLock);
+ switch(state) {
+ case IDLE:
+ break;
+ case DELAYED_W:
+ state = CALLBACK;
+ break;
+ case DELAYED_RW:
+ state = DELAYED_R;
+ break;
+ case DELAYED_R:
+ case CALLBACK:
+ case DELAYED_DELETE:
+ break;
+ case ACTIVE_W:
+ assert(poller);
+ poller->modFd(*this, Poller::NONE);
+ state = INACTIVE;
+ break;
+ case ACTIVE_RW:
+ assert(poller);
+ poller->modFd(*this, Poller::IN);
+ state = ACTIVE_R;
+ break;
+ case ACTIVE_R:
+ case INACTIVE:
+ break;
+ }
+}
+
void DispatchHandle::unwatch() {
- assert(poller);
ScopedLock<Mutex> lock(stateLock);
+ switch (state) {
+ case IDLE:
+ break;
+ case DELAYED_R:
+ case DELAYED_W:
+ case DELAYED_RW:
+ case CALLBACK:
+ state = CALLBACK;
+ break;
+ case DELAYED_DELETE:
+ break;
+ default:
+ assert(poller);
+ poller->modFd(*this, Poller::NONE);
+ state = INACTIVE;
+ break;
+ }
+}
+
+void DispatchHandle::stopWatch() {
+ ScopedLock<Mutex> lock(stateLock);
+ if ( state == IDLE) {
+ return;
+ }
+ assert(poller);
poller->delFd(*this);
poller.reset();
state = IDLE;
}
-void DispatchHandle::dispatchCallbacks(Poller::Direction dir) {
- // Note that we are now doing the callbacks
+// The slightly strange switch structure
+// is to ensure that the lock is released before
+// we do the delete
+void DispatchHandle::doDelete() {
+ // If we're in the middle of a callback defer the delete
{
ScopedLock<Mutex> lock(stateLock);
- assert(
- state == ACTIVE_R ||
- state == ACTIVE_W ||
- state == ACTIVE_RW);
+ switch (state) {
+ case DELAYED_R:
+ case DELAYED_W:
+ case DELAYED_RW:
+ case CALLBACK:
+ case DELAYED_DELETE:
+ state = DELAYED_DELETE;
+ return;
+ default:
+ break;
+ }
+ }
+ // If we're not then do it right away
+ delete this;
+}
- state = CALLBACK;
+void DispatchHandle::dispatchCallbacks(Poller::EventType type) {
+ // Note that we are now doing the callbacks
+ {
+ ScopedLock<Mutex> lock(stateLock);
+
+ // Set up to wait for same events next time unless reset
+ switch(state) {
+ case ACTIVE_R:
+ state = DELAYED_R;
+ break;
+ case ACTIVE_W:
+ state = DELAYED_W;
+ break;
+ case ACTIVE_RW:
+ state = DELAYED_RW;
+ break;
+ default:
+ assert(false);
+ }
}
// Do callbacks - whilst we are doing the callbacks we are prevented from processing
// the same handle until we re-enable it. To avoid rentering the callbacks for a single
// handle re-enabling in the callbacks is actually deferred until they are complete.
- switch (dir) {
- case Poller::IN:
+ switch (type) {
+ case Poller::READABLE:
readableCallback(*this);
break;
- case Poller::OUT:
+ case Poller::WRITABLE:
writableCallback(*this);
break;
- case Poller::INOUT:
+ case Poller::READ_WRITABLE:
readableCallback(*this);
writableCallback(*this);
break;
+ case Poller::DISCONNECTED:
+ {
+ ScopedLock<Mutex> lock(stateLock);
+ state = CALLBACK;
+ }
+ if (disconnectedCallback) {
+ disconnectedCallback(*this);
+ }
+ break;
default:
assert(false);
}
// If any of the callbacks re-enabled reading/writing then actually
// do it now
+ {
ScopedLock<Mutex> lock(stateLock);
switch (state) {
case DELAYED_R:
poller->modFd(*this, Poller::IN);
state = ACTIVE_R;
- break;
+ return;
case DELAYED_W:
poller->modFd(*this, Poller::OUT);
state = ACTIVE_W;
- break;
+ return;
case DELAYED_RW:
poller->modFd(*this, Poller::INOUT);
state = ACTIVE_RW;
- break;
+ return;
case CALLBACK:
state = INACTIVE;
- break;
+ return;
+ case IDLE:
+ return;
default:
// This should be impossible
assert(false);
+ return;
+ case DELAYED_DELETE:
+ break;
+ }
}
+ delete this;
}
}}