summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/DispatchHandle.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/DispatchHandle.cpp')
-rw-r--r--cpp/src/qpid/sys/DispatchHandle.cpp394
1 files changed, 116 insertions, 278 deletions
diff --git a/cpp/src/qpid/sys/DispatchHandle.cpp b/cpp/src/qpid/sys/DispatchHandle.cpp
index cd7dec7fa6..7bf305d275 100644
--- a/cpp/src/qpid/sys/DispatchHandle.cpp
+++ b/cpp/src/qpid/sys/DispatchHandle.cpp
@@ -30,6 +30,16 @@
namespace qpid {
namespace sys {
+DispatchHandle::DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
+ PollerHandle(h),
+ readableCallback(rCb),
+ writableCallback(wCb),
+ disconnectedCallback(dCb),
+ state(IDLE)
+{
+}
+
+
DispatchHandle::~DispatchHandle() {
}
@@ -38,123 +48,56 @@ void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
bool w = writableCallback;
ScopedLock<Mutex> lock(stateLock);
- assert(state == IDLE || state == DELAYED_IDLE);
-
- // If no callbacks set then do nothing (that is what we were asked to do!)
- // TODO: Maybe this should be an assert instead
- if (!r && !w) {
- switch (state) {
- case IDLE:
- state = INACTIVE;
- return;
- case DELAYED_IDLE:
- state = DELAYED_INACTIVE;
- return;
- default:
- assert(state == IDLE || state == DELAYED_IDLE);
- }
- }
-
- Poller::Direction d = r ?
- (w ? Poller::INOUT : Poller::INPUT) :
- Poller::OUTPUT;
+ assert(state == IDLE);
poller = poller0;
- poller->addFd(*this, d);
-
- switch (state) {
- case IDLE:
- state = r ?
- (w ? ACTIVE_RW : ACTIVE_R) :
- ACTIVE_W;
- return;
- case DELAYED_IDLE:
- state = r ?
- (w ? DELAYED_RW : DELAYED_R) :
- DELAYED_W;
- return;
- default:
- assert(state == IDLE || state == DELAYED_IDLE);
- }
+ poller->registerHandle(*this);
+ state = WAITING;
+ Poller::Direction dir = r ?
+ ( w ? Poller::INOUT : Poller::INPUT ) :
+ ( w ? Poller::OUTPUT : Poller::NONE );
+ poller->monitorHandle(*this, dir);
}
void DispatchHandle::rewatch() {
bool r = readableCallback;
bool w = writableCallback;
+ if (!r && !w) {
+ return;
+ }
+ Poller::Direction dir = r ?
+ ( w ? Poller::INOUT : Poller::INPUT ) :
+ ( w ? Poller::OUTPUT : Poller::NONE );
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_INACTIVE:
- state = r ?
- (w ? DELAYED_RW : DELAYED_R) :
- DELAYED_W;
- break;
- case DELAYED_DELETE:
- break;
- case INACTIVE:
- case ACTIVE_R:
- case ACTIVE_W: {
- assert(poller);
- Poller::Direction d = r ?
- (w ? Poller::INOUT : Poller::INPUT) :
- Poller::OUTPUT;
- poller->modFd(*this, d);
- state = r ?
- (w ? ACTIVE_RW : ACTIVE_R) :
- ACTIVE_W;
- break;
- }
- case DELAYED_RW:
- case ACTIVE_RW:
- // Don't need to do anything already waiting for readable/writable
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
break;
- case ACTIVE_DELETE:
- assert(state != ACTIVE_DELETE);
}
+ assert(poller);
+ poller->monitorHandle(*this, dir);
}
void DispatchHandle::rewatchRead() {
if (!readableCallback) {
return;
}
-
+
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_R:
- case DELAYED_RW:
- case DELAYED_DELETE:
- break;
- case DELAYED_W:
- state = DELAYED_RW;
- break;
- case DELAYED_INACTIVE:
- state = DELAYED_R;
- break;
- case ACTIVE_R:
- case ACTIVE_RW:
- // Nothing to do: already waiting for readable
- break;
- case INACTIVE:
- assert(poller);
- poller->modFd(*this, Poller::INPUT);
- state = ACTIVE_R;
- break;
- case ACTIVE_W:
- assert(poller);
- poller->modFd(*this, Poller::INOUT);
- state = ACTIVE_RW;
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
break;
- case ACTIVE_DELETE:
- assert(state != ACTIVE_DELETE);
}
+ assert(poller);
+ poller->monitorHandle(*this, Poller::INPUT);
}
void DispatchHandle::rewatchWrite() {
@@ -165,35 +108,14 @@ void DispatchHandle::rewatchWrite() {
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_DELETE:
- break;
- case DELAYED_R:
- state = DELAYED_RW;
- break;
- case DELAYED_INACTIVE:
- state = DELAYED_W;
- break;
- case INACTIVE:
- assert(poller);
- poller->modFd(*this, Poller::OUTPUT);
- state = ACTIVE_W;
- break;
- case ACTIVE_R:
- assert(poller);
- poller->modFd(*this, Poller::INOUT);
- state = ACTIVE_RW;
- break;
- case ACTIVE_W:
- case ACTIVE_RW:
- // Nothing to do: already waiting for writable
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
break;
- case ACTIVE_DELETE:
- assert(state != ACTIVE_DELETE);
- }
+ }
+ assert(poller);
+ poller->monitorHandle(*this, Poller::OUTPUT);
}
void DispatchHandle::unwatchRead() {
@@ -204,34 +126,14 @@ void DispatchHandle::unwatchRead() {
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_R:
- state = DELAYED_INACTIVE;
- break;
- case DELAYED_RW:
- state = DELAYED_W;
- break;
- case DELAYED_W:
- case DELAYED_INACTIVE:
- case DELAYED_DELETE:
- break;
- case ACTIVE_R:
- assert(poller);
- poller->modFd(*this, Poller::NONE);
- state = INACTIVE;
- break;
- case ACTIVE_RW:
- assert(poller);
- poller->modFd(*this, Poller::OUTPUT);
- state = ACTIVE_W;
- break;
- case ACTIVE_W:
- case INACTIVE:
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
break;
- case ACTIVE_DELETE:
- assert(state != ACTIVE_DELETE);
}
+ assert(poller);
+ poller->unmonitorHandle(*this, Poller::INPUT);
}
void DispatchHandle::unwatchWrite() {
@@ -242,95 +144,62 @@ void DispatchHandle::unwatchWrite() {
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_W:
- state = DELAYED_INACTIVE;
- break;
- case DELAYED_RW:
- state = DELAYED_R;
- break;
- case DELAYED_R:
- case DELAYED_INACTIVE:
- case DELAYED_DELETE:
- break;
- case ACTIVE_W:
- assert(poller);
- poller->modFd(*this, Poller::NONE);
- state = INACTIVE;
- break;
- case ACTIVE_RW:
- assert(poller);
- poller->modFd(*this, Poller::INPUT);
- state = ACTIVE_R;
- break;
- case ACTIVE_R:
- case INACTIVE:
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
break;
- case ACTIVE_DELETE:
- assert(state != ACTIVE_DELETE);
- }
+ }
+ assert(poller);
+ poller->unmonitorHandle(*this, Poller::OUTPUT);
}
void DispatchHandle::unwatch() {
ScopedLock<Mutex> lock(stateLock);
- switch (state) {
+ switch(state) {
case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_INACTIVE:
- state = DELAYED_INACTIVE;
- break;
- case DELAYED_DELETE:
- break;
+ case STOPPING:
+ case DELETING:
+ return;
default:
- assert(poller);
- poller->modFd(*this, Poller::NONE);
- state = INACTIVE;
break;
- case ACTIVE_DELETE:
- assert(state != ACTIVE_DELETE);
- }
+ }
+ assert(poller);
+ poller->unmonitorHandle(*this, Poller::INOUT);
}
void DispatchHandle::stopWatch() {
ScopedLock<Mutex> lock(stateLock);
switch (state) {
case IDLE:
- case DELAYED_IDLE:
- case DELAYED_DELETE:
+ assert(state != IDLE);
+ return;
+ case STOPPING:
+ assert(state != STOPPING);
return;
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_INACTIVE:
- state = DELAYED_IDLE;
+ case CALLING:
+ state = STOPPING;
break;
- default:
+ case WAITING:
state = IDLE;
break;
- case ACTIVE_DELETE:
- assert(state != ACTIVE_DELETE);
+ case DELETING:
+ return;
}
assert(poller);
- poller->delFd(*this);
+ poller->unregisterHandle(*this);
poller.reset();
}
-// If we are already in the IDLE state we can't do the callback as we might
-// race to delete and callback at the same time
-// TODO: might be able to fix this by adding a new state, but would make
-// the state machine even more complex
+// If we are in the IDLE/STOPPING state we can't do the callback as we've
+// not/no longer got the fd registered in any poller
void DispatchHandle::call(Callback iCb) {
assert(iCb);
ScopedLock<Mutex> lock(stateLock);
switch (state) {
case IDLE:
- case ACTIVE_DELETE:
- assert(false);
+ case STOPPING:
+ case DELETING:
return;
default:
interruptedCallbacks.push(iCb);
@@ -347,27 +216,24 @@ void DispatchHandle::doDelete() {
ScopedLock<Mutex> lock(stateLock);
// Ensure that we're no longer watching anything
switch (state) {
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_INACTIVE:
- assert(poller);
- poller->delFd(*this);
- poller.reset();
- // Fallthrough
- case DELAYED_IDLE:
- state = DELAYED_DELETE;
- // Fallthrough
- case DELAYED_DELETE:
- case ACTIVE_DELETE:
- return;
case IDLE:
+ state = DELETING;
break;
- default:
- state = ACTIVE_DELETE;
+ case STOPPING:
+ state = DELETING;
+ return;
+ case WAITING:
+ state = DELETING;
assert(poller);
(void) poller->interrupt(*this);
- poller->delFd(*this);
+ poller->unregisterHandle(*this);
+ return;
+ case CALLING:
+ state = DELETING;
+ assert(poller);
+ poller->unregisterHandle(*this);
+ return;
+ case DELETING:
return;
}
}
@@ -378,43 +244,28 @@ void DispatchHandle::doDelete() {
void DispatchHandle::processEvent(Poller::EventType type) {
CallbackQueue callbacks;
- // Note that we are now doing the callbacks
+ // Phase I
{
ScopedLock<Mutex> lock(stateLock);
- // Set up to wait for same events next time unless reset
switch(state) {
- case ACTIVE_R:
- state = DELAYED_R;
- break;
- case ACTIVE_W:
- state = DELAYED_W;
- break;
- case ACTIVE_RW:
- state = DELAYED_RW;
+ case IDLE:
+ // Can get here if a non connection thread stops watching
+ // whilst we were stuck in the above lock
+ return;
+ case WAITING:
+ state = CALLING;
break;
- case ACTIVE_DELETE:
+ case CALLING:
+ assert(state!=CALLING);
+ return;
+ case STOPPING:
+ assert(state!=STOPPING);
+ return;
+ case DELETING:
// Need to make sure we clean up any pending callbacks in this case
std::swap(callbacks, interruptedCallbacks);
goto saybyebye;
- // Can get here in idle if we are stopped in a different thread
- // just after we return with this handle in Poller::wait
- case IDLE:
- // Can get here in INACTIVE if a non connection thread unwatches
- // whilst we were stuck in the above lock
- case INACTIVE:
- // Can only get here in a DELAYED_* state in the rare case
- // that we're already here for reading and we get activated for
- // writing and we can write (it might be possible the other way
- // round too). In this case we're already processing the handle
- // in a different thread in this function so return right away
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_INACTIVE:
- case DELAYED_IDLE:
- case DELAYED_DELETE:
- return;
}
std::swap(callbacks, interruptedCallbacks);
@@ -434,10 +285,9 @@ void DispatchHandle::processEvent(Poller::EventType type) {
readableCallback(*this);
writableCallback(*this);
break;
- case Poller::DISCONNECTED:
- {
+ case Poller::DISCONNECTED: {
ScopedLock<Mutex> lock(stateLock);
- state = DELAYED_INACTIVE;
+ poller->unmonitorHandle(*this, Poller::INOUT);
}
if (disconnectedCallback) {
disconnectedCallback(*this);
@@ -466,32 +316,20 @@ void DispatchHandle::processEvent(Poller::EventType type) {
{
ScopedLock<Mutex> lock(stateLock);
- // If any of the callbacks re-enabled reading/writing then actually
- // do it now
switch (state) {
- case DELAYED_R:
- poller->modFd(*this, Poller::INPUT);
- state = ACTIVE_R;
- return;
- case DELAYED_W:
- poller->modFd(*this, Poller::OUTPUT);
- state = ACTIVE_W;
+ case IDLE:
+ assert(state!=IDLE);
return;
- case DELAYED_RW:
- poller->modFd(*this, Poller::INOUT);
- state = ACTIVE_RW;
+ case STOPPING:
+ state = IDLE;
return;
- case DELAYED_INACTIVE:
- state = INACTIVE;
+ case WAITING:
+ assert(state!=WAITING);
return;
- case DELAYED_IDLE:
- state = IDLE;
- return;
- default:
- // This should be impossible
- assert(false);
+ case CALLING:
+ state = WAITING;
return;
- case DELAYED_DELETE:
+ case DELETING:
break;
}
}