summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2010-10-12 16:05:15 +0000
committerAndrew Stitcher <astitcher@apache.org>2010-10-12 16:05:15 +0000
commitef2d5950861b806b2064ae74dabfb4b0c3c1c864 (patch)
treefec19513f8e3fc30498862a4917bfbd0815e77a5
parentfe9fca8fadffbb8658a001884b33c39fcd29f2c4 (diff)
downloadqpid-python-ef2d5950861b806b2064ae74dabfb4b0c3c1c864.tar.gz
Rewrite Rdma::AsynchIO to use deferred code rather than a state machine:
This eliminates a lot of difficult to understand error prone state machine code git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1021822 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp255
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaIO.h9
2 files changed, 26 insertions, 238 deletions
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index 23660a0b9f..5616c30ae8 100644
--- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -54,7 +54,8 @@ namespace Rdma {
readCallback(rc),
idleCallback(ic),
fullCallback(fc),
- errorCallback(ec)
+ errorCallback(ec),
+ pendingWriteAction(boost::bind(&AsynchIO::doWriteCallback, this))
{
qp->nonblocking();
qp->notifyRecv();
@@ -73,7 +74,7 @@ namespace Rdma {
QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished");
// Turn off callbacks if necessary (before doing the deletes)
- if (state.get() != SHUTDOWN) {
+ if (state.get() != STOPPED) {
QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown");
dataHandle.stopWatch();
}
@@ -88,30 +89,9 @@ namespace Rdma {
// Mark for deletion/Delete this object when we have no outstanding writes
void AsynchIO::stop(NotifyCallback nc) {
- State oldState;
- State newState;
- bool doReturn;
- //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- do {
- newState = oldState = state.get();
- doReturn = true;
- if (oldState == IDLE || oldState == DRAINED) {
- doReturn = false;
- newState = SHUTDOWN;
- }
- } while (!state.boolCompareAndSwap(oldState, newState));
-
- // Ensure we can't get any more callbacks (except for the stopped callback)
- dataHandle.stopWatch();
-
- if (doReturn) {
- notifyCallback = nc;
- return;
- }
- // Callback, but don't store it - SHUTDOWN state means callback has been called
- // we *are* allowed to delete the AsynchIO in this callback, so we have to return immediately
- // after the callback
- nc(*this);
+ state = STOPPED;
+ notifyCallback = nc;
+ dataHandle.call(boost::bind(&AsynchIO::doStoppedCallback, this));
}
namespace {
@@ -130,31 +110,8 @@ namespace Rdma {
// Mark writing closed (so we don't accept any more writes or make any idle callbacks)
void AsynchIO::drainWriteQueue(NotifyCallback nc) {
- State oldState;
- State newState;
- bool doReturn;
- //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- do {
- newState = oldState = state.get();
- doReturn = true;
- switch (oldState) {
- case IDLE:
- if (outstandingWrites == 0) {
- doReturn = false;
- newState = DRAINED;
- break;
- }
- /*FALLTHRU*/
- default:
- draining = true;
- break;
- }
- } while (!state.boolCompareAndSwap(oldState, newState));
- if (doReturn) {
- notifyCallback = nc;
- return;
- }
- nc(*this);
+ draining = true;
+ notifyCallback = nc;
}
void AsynchIO::queueWrite(Buffer* buff) {
@@ -183,167 +140,15 @@ namespace Rdma {
}
void AsynchIO::notifyPendingWrite() {
- // As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not.
- // If we are then we just return as we know that we will eventually do the idle callback anyway.
- //
- // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- // We can get here in any state (as the caller could be in any thread)
- State oldState;
- State newState;
- bool doReturn;
- do {
- newState = oldState = state.get();
- doReturn = false;
- switch (oldState) {
- case NOTIFY_WRITE:
- case PENDING_NOTIFY:
- // We only need to note a pending notify if we're already doing a notify as data processing
- // is always followed by write notification processing
- newState = PENDING_NOTIFY;
- doReturn = true;
- break;
- case PENDING_DATA:
- doReturn = true;
- break;
- case DATA:
- // Only need to return here as data processing will do the idleCallback itself anyway
- doReturn = true;
- break;
- case IDLE:
- newState = NOTIFY_WRITE;
- break;
- case SHUTDOWN:
- // We can get here because it is too hard to eliminate all races of stop() and notifyPendingWrite()
- // just do nothing.
- doReturn = true;
- case DRAINED:
- // This is not allowed - we can't make any more writes as we're draining the write queue.
- assert(oldState!=DRAINED);
- doReturn = true;
- };
- } while (!state.boolCompareAndSwap(oldState, newState));
- if (doReturn) {
- return;
- }
-
- doWriteCallback();
-
- // Keep track of what we need to do so that we can release the lock
- enum {COMPLETION, NOTIFY, RETURN, EXIT} action;
- // If there was pending data whilst we were doing this, process it now
- //
- // Using NOTIFY_WRITE for both NOTIFY & COMPLETION is a bit strange, but we're making sure we get the
- // correct result if we reenter notifyPendingWrite(), in which case we want to
- // end up in PENDING_NOTIFY (entering dataEvent doesn't matter as it only checks
- // not IDLE)
- do {
- //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- do {
- newState = oldState = state.get();
- action = RETURN; // Anything but COMPLETION
- switch (oldState) {
- case NOTIFY_WRITE:
- newState = IDLE;
- action = (action == COMPLETION) ? EXIT : RETURN;
- break;
- case PENDING_DATA:
- newState = NOTIFY_WRITE;
- action = COMPLETION;
- break;
- case PENDING_NOTIFY:
- newState = NOTIFY_WRITE;
- action = NOTIFY;
- break;
- default:
- assert(oldState!=IDLE && oldState!=DATA && oldState!=SHUTDOWN);
- action = RETURN;
- }
- } while (!state.boolCompareAndSwap(oldState, newState));
-
- // Note we only get here if we were in the PENDING_DATA or PENDING_NOTIFY state
- // so that we do need to process completions or notifications now
- switch (action) {
- case COMPLETION:
- processCompletions();
- // Fall through
- case NOTIFY:
- doWriteCallback();
- break;
- case RETURN:
- return;
- case EXIT:
- // If we just processed completions we might need to delete ourselves
- // TODO: XXX: can we delete ourselves correctly in notifyPendingWrite()?
- checkDrainedStopped();
- return;
- }
- } while (true);
+ dataHandle.call(pendingWriteAction);
}
void AsynchIO::dataEvent() {
- // Keep track of writable notifications
- // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- State oldState;
- State newState;
- bool doReturn;
- do {
- newState = oldState = state.get();
- doReturn = false;
- // We're already processing a notification
- switch (oldState) {
- case IDLE:
- newState = DATA;
- break;
- case SHUTDOWN:
- doReturn = true;
- // Fallthru
- case DRAINED:
- break;
- default:
- // Can't get here in DATA state as that would violate the serialisation rules
- assert( oldState!=DATA );
- newState = PENDING_DATA;
- doReturn = true;
- }
- } while (!state.boolCompareAndSwap(oldState, newState));
- if (doReturn) {
- return;
- }
-
+ if (state.get() == STOPPED) return;
+
processCompletions();
- //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- do {
- newState = oldState = state.get();
- switch (oldState) {
- case DATA:
- newState = NOTIFY_WRITE;
- break;
- case DRAINED:
- break;
- default:
- assert( oldState==DATA || oldState==DRAINED);
- }
- } while (!state.boolCompareAndSwap(oldState, newState));
-
- while (newState==NOTIFY_WRITE) {
- doWriteCallback();
-
- // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- do {
- newState = oldState = state.get();
- if ( oldState==NOTIFY_WRITE ) {
- newState = IDLE;
- } else {
- // Can't get DATA/PENDING_DATA/DRAINED here as dataEvent cannot be reentered
- assert( oldState==PENDING_NOTIFY );
- newState = NOTIFY_WRITE;
- }
- } while (!state.boolCompareAndSwap(oldState, newState));
- }
-
- // We might delete ourselves in here so return immediately
- checkDrainedStopped();
+ doWriteCallback();
}
void AsynchIO::processCompletions() {
@@ -471,46 +276,30 @@ namespace Rdma {
return;
}
}
+
+ checkDrained();
}
- void AsynchIO::checkDrainedStopped() {
+ void AsynchIO::checkDrained() {
// If we've got all the write confirmations and we're draining
// We might get deleted in the drained callback so return immediately
if (draining) {
if (outstandingWrites == 0) {
draining = false;
- doDrainedCallback();
+ NotifyCallback nc;
+ nc.swap(notifyCallback);
+ nc(*this);
}
return;
}
-
- // We might need to delete ourselves
- if (notifyCallback) {
- doStoppedCallback();
- }
- }
-
- void AsynchIO::doDrainedCallback() {
- NotifyCallback nc;
- nc.swap(notifyCallback);
- // Transition unconditionally to DRAINED
- State oldState;
- do {
- oldState = state.get();
- assert(oldState==IDLE);
- } while (!state.boolCompareAndSwap(oldState, DRAINED));
- nc(*this);
}
-
+
void AsynchIO::doStoppedCallback() {
+ // Ensure we can't get any more callbacks (except for the stopped callback)
+ dataHandle.stopWatch();
+
NotifyCallback nc;
nc.swap(notifyCallback);
- // Transition unconditionally to SHUTDOWN
- State oldState;
- do {
- oldState = state.get();
- assert(oldState==IDLE);
- } while (!state.boolCompareAndSwap(oldState, SHUTDOWN));
nc(*this);
}
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
index 00eba28716..62779e4e78 100644
--- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -50,10 +50,9 @@ namespace Rdma {
int recvBufferCount;
int xmitBufferCount;
int outstandingWrites;
- bool draining; // TODO: Perhaps (probably) this state can be merged with the following...
- enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DRAINED, SHUTDOWN };
+ bool draining;
+ enum State {IDLE, STOPPED};
qpid::sys::AtomicValue<State> state;
- //qpid::sys::Mutex stateLock;
QueuePair::intrusive_ptr qp;
qpid::sys::DispatchHandleRef dataHandle;
@@ -62,6 +61,7 @@ namespace Rdma {
FullCallback fullCallback;
ErrorCallback errorCallback;
NotifyCallback notifyCallback;
+ qpid::sys::DispatchHandle::Callback pendingWriteAction;
public:
typedef boost::function1<void, AsynchIO&> RequestCallback;
@@ -103,9 +103,8 @@ namespace Rdma {
void dataEvent();
void processCompletions();
void doWriteCallback();
- void checkDrainedStopped();
+ void checkDrained();
void doStoppedCallback();
- void doDrainedCallback();
};
// We're only writable if: