summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/DispatchHandle.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/DispatchHandle.cpp')
-rw-r--r--cpp/src/qpid/sys/DispatchHandle.cpp129
1 files changed, 100 insertions, 29 deletions
diff --git a/cpp/src/qpid/sys/DispatchHandle.cpp b/cpp/src/qpid/sys/DispatchHandle.cpp
index cbdee7eda6..cd7dec7fa6 100644
--- a/cpp/src/qpid/sys/DispatchHandle.cpp
+++ b/cpp/src/qpid/sys/DispatchHandle.cpp
@@ -21,6 +21,8 @@
#include "DispatchHandle.h"
+#include <algorithm>
+
#include <boost/cast.hpp>
#include <assert.h>
@@ -29,7 +31,6 @@ namespace qpid {
namespace sys {
DispatchHandle::~DispatchHandle() {
- stopWatch();
}
void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
@@ -37,13 +38,21 @@ void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
bool w = writableCallback;
ScopedLock<Mutex> lock(stateLock);
- assert(state == IDLE);
+ assert(state == IDLE || state == DELAYED_IDLE);
// If no callbacks set then do nothing (that is what we were asked to do!)
// TODO: Maybe this should be an assert instead
if (!r && !w) {
- state = INACTIVE;
- return;
+ switch (state) {
+ case IDLE:
+ state = INACTIVE;
+ return;
+ case DELAYED_IDLE:
+ state = DELAYED_INACTIVE;
+ return;
+ default:
+ assert(state == IDLE || state == DELAYED_IDLE);
+ }
}
Poller::Direction d = r ?
@@ -53,9 +62,20 @@ void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
poller = poller0;
poller->addFd(*this, d);
- state = r ?
- (w ? ACTIVE_RW : ACTIVE_R) :
- ACTIVE_W;
+ switch (state) {
+ case IDLE:
+ state = r ?
+ (w ? ACTIVE_RW : ACTIVE_R) :
+ ACTIVE_W;
+ return;
+ case DELAYED_IDLE:
+ state = r ?
+ (w ? DELAYED_RW : DELAYED_R) :
+ DELAYED_W;
+ return;
+ default:
+ assert(state == IDLE || state == DELAYED_IDLE);
+ }
}
void DispatchHandle::rewatch() {
@@ -93,6 +113,8 @@ void DispatchHandle::rewatch() {
case ACTIVE_RW:
// Don't need to do anything already waiting for readable/writable
break;
+ case ACTIVE_DELETE:
+ assert(state != ACTIVE_DELETE);
}
}
@@ -130,6 +152,8 @@ void DispatchHandle::rewatchRead() {
poller->modFd(*this, Poller::INOUT);
state = ACTIVE_RW;
break;
+ case ACTIVE_DELETE:
+ assert(state != ACTIVE_DELETE);
}
}
@@ -167,6 +191,8 @@ void DispatchHandle::rewatchWrite() {
case ACTIVE_RW:
// Nothing to do: already waiting for writable
break;
+ case ACTIVE_DELETE:
+ assert(state != ACTIVE_DELETE);
}
}
@@ -203,6 +229,8 @@ void DispatchHandle::unwatchRead() {
case ACTIVE_W:
case INACTIVE:
break;
+ case ACTIVE_DELETE:
+ assert(state != ACTIVE_DELETE);
}
}
@@ -239,6 +267,8 @@ void DispatchHandle::unwatchWrite() {
case ACTIVE_R:
case INACTIVE:
break;
+ case ACTIVE_DELETE:
+ assert(state != ACTIVE_DELETE);
}
}
@@ -261,6 +291,8 @@ void DispatchHandle::unwatch() {
poller->modFd(*this, Poller::NONE);
state = INACTIVE;
break;
+ case ACTIVE_DELETE:
+ assert(state != ACTIVE_DELETE);
}
}
@@ -280,47 +312,72 @@ void DispatchHandle::stopWatch() {
default:
state = IDLE;
break;
+ case ACTIVE_DELETE:
+ assert(state != ACTIVE_DELETE);
}
assert(poller);
poller->delFd(*this);
poller.reset();
}
+// If we are already in the IDLE state we can't do the callback as we might
+// race to delete and callback at the same time
+// TODO: might be able to fix this by adding a new state, but would make
+// the state machine even more complex
void DispatchHandle::call(Callback iCb) {
assert(iCb);
ScopedLock<Mutex> lock(stateLock);
- interruptedCallbacks.push(iCb);
-
- (void) poller->interrupt(*this);
+ switch (state) {
+ case IDLE:
+ case ACTIVE_DELETE:
+ assert(false);
+ return;
+ default:
+ interruptedCallbacks.push(iCb);
+ assert(poller);
+ (void) poller->interrupt(*this);
+ }
}
// The slightly strange switch structure
// is to ensure that the lock is released before
// we do the delete
void DispatchHandle::doDelete() {
- // Ensure that we're no longer watching anything
- stopWatch();
-
- // If we're in the middle of a callback defer the delete
{
ScopedLock<Mutex> lock(stateLock);
+ // Ensure that we're no longer watching anything
switch (state) {
+ case DELAYED_R:
+ case DELAYED_W:
+ case DELAYED_RW:
+ case DELAYED_INACTIVE:
+ assert(poller);
+ poller->delFd(*this);
+ poller.reset();
+ // Fallthrough
case DELAYED_IDLE:
- case DELAYED_DELETE:
state = DELAYED_DELETE;
+ // Fallthrough
+ case DELAYED_DELETE:
+ case ACTIVE_DELETE:
return;
case IDLE:
break;
default:
- // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
- assert(false);
+ state = ACTIVE_DELETE;
+ assert(poller);
+ (void) poller->interrupt(*this);
+ poller->delFd(*this);
+ return;
}
}
- // If we're not then do it right away
+ // If we're IDLE we can do this right away
delete this;
}
void DispatchHandle::processEvent(Poller::EventType type) {
+ CallbackQueue callbacks;
+
// Note that we are now doing the callbacks
{
ScopedLock<Mutex> lock(stateLock);
@@ -336,6 +393,16 @@ void DispatchHandle::processEvent(Poller::EventType type) {
case ACTIVE_RW:
state = DELAYED_RW;
break;
+ case ACTIVE_DELETE:
+ // Need to make sure we clean up any pending callbacks in this case
+ std::swap(callbacks, interruptedCallbacks);
+ goto saybyebye;
+ // Can get here in idle if we are stopped in a different thread
+ // just after we return with this handle in Poller::wait
+ case IDLE:
+ // Can get here in INACTIVE if a non connection thread unwatches
+ // whilst we were stuck in the above lock
+ case INACTIVE:
// Can only get here in a DELAYED_* state in the rare case
// that we're already here for reading and we get activated for
// writing and we can write (it might be possible the other way
@@ -348,9 +415,9 @@ void DispatchHandle::processEvent(Poller::EventType type) {
case DELAYED_IDLE:
case DELAYED_DELETE:
return;
- default:
- assert(false);
}
+
+ std::swap(callbacks, interruptedCallbacks);
}
// Do callbacks - whilst we are doing the callbacks we are prevented from processing
@@ -378,8 +445,8 @@ void DispatchHandle::processEvent(Poller::EventType type) {
break;
case Poller::INTERRUPTED:
{
- ScopedLock<Mutex> lock(stateLock);
- assert(interruptedCallbacks.size() > 0);
+ // We could only be interrupted if we also had a callback to do
+ assert(callbacks.size() > 0);
// We'll actually do the interrupt below
}
break;
@@ -387,16 +454,18 @@ void DispatchHandle::processEvent(Poller::EventType type) {
assert(false);
}
- {
- ScopedLock<Mutex> lock(stateLock);
- // If we've got a pending interrupt do it now
- while (interruptedCallbacks.size() > 0) {
- Callback cb = interruptedCallbacks.front();
+ // If we have any callbacks do them now -
+ // (because we use a copy from before the previous callbacks we won't
+ // do anything yet that was just added)
+ while (callbacks.size() > 0) {
+ Callback cb = callbacks.front();
assert(cb);
cb(*this);
- interruptedCallbacks.pop();
+ callbacks.pop();
}
+ {
+ ScopedLock<Mutex> lock(stateLock);
// If any of the callbacks re-enabled reading/writing then actually
// do it now
switch (state) {
@@ -425,7 +494,9 @@ void DispatchHandle::processEvent(Poller::EventType type) {
case DELAYED_DELETE:
break;
}
- }
+ }
+
+saybyebye:
delete this;
}