diff options
author | Andrew Stitcher <astitcher@apache.org> | 2010-10-12 16:05:15 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2010-10-12 16:05:15 +0000 |
commit | ef2d5950861b806b2064ae74dabfb4b0c3c1c864 (patch) | |
tree | fec19513f8e3fc30498862a4917bfbd0815e77a5 | |
parent | fe9fca8fadffbb8658a001884b33c39fcd29f2c4 (diff) | |
download | qpid-python-ef2d5950861b806b2064ae74dabfb4b0c3c1c864.tar.gz |
Rewrite Rdma::AsynchIO to use deferred code rather than a state machine:
This eliminates a lot of difficult to understand error prone
state machine code
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1021822 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp | 255 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/RdmaIO.h | 9 |
2 files changed, 26 insertions, 238 deletions
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 23660a0b9f..5616c30ae8 100644 --- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -54,7 +54,8 @@ namespace Rdma { readCallback(rc), idleCallback(ic), fullCallback(fc), - errorCallback(ec) + errorCallback(ec), + pendingWriteAction(boost::bind(&AsynchIO::doWriteCallback, this)) { qp->nonblocking(); qp->notifyRecv(); @@ -73,7 +74,7 @@ namespace Rdma { QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished"); // Turn off callbacks if necessary (before doing the deletes) - if (state.get() != SHUTDOWN) { + if (state.get() != STOPPED) { QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown"); dataHandle.stopWatch(); } @@ -88,30 +89,9 @@ namespace Rdma { // Mark for deletion/Delete this object when we have no outstanding writes void AsynchIO::stop(NotifyCallback nc) { - State oldState; - State newState; - bool doReturn; - //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); - do { - newState = oldState = state.get(); - doReturn = true; - if (oldState == IDLE || oldState == DRAINED) { - doReturn = false; - newState = SHUTDOWN; - } - } while (!state.boolCompareAndSwap(oldState, newState)); - - // Ensure we can't get any more callbacks (except for the stopped callback) - dataHandle.stopWatch(); - - if (doReturn) { - notifyCallback = nc; - return; - } - // Callback, but don't store it - SHUTDOWN state means callback has been called - // we *are* allowed to delete the AsynchIO in this callback, so we have to return immediately - // after the callback - nc(*this); + state = STOPPED; + notifyCallback = nc; + dataHandle.call(boost::bind(&AsynchIO::doStoppedCallback, this)); } namespace { @@ -130,31 +110,8 @@ namespace Rdma { // Mark writing closed (so we don't accept any more writes or make any idle callbacks) void AsynchIO::drainWriteQueue(NotifyCallback nc) { - State oldState; - State newState; - bool doReturn; - //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); - do { - newState = oldState = state.get(); - doReturn = true; - switch (oldState) { - case IDLE: - if (outstandingWrites == 0) { - doReturn = false; - newState = DRAINED; - break; - } - /*FALLTHRU*/ - default: - draining = true; - break; - } - } while (!state.boolCompareAndSwap(oldState, newState)); - if (doReturn) { - notifyCallback = nc; - return; - } - nc(*this); + draining = true; + notifyCallback = nc; } void AsynchIO::queueWrite(Buffer* buff) { @@ -183,167 +140,15 @@ namespace Rdma { } void AsynchIO::notifyPendingWrite() { - // As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not. - // If we are then we just return as we know that we will eventually do the idle callback anyway. - // - // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); - // We can get here in any state (as the caller could be in any thread) - State oldState; - State newState; - bool doReturn; - do { - newState = oldState = state.get(); - doReturn = false; - switch (oldState) { - case NOTIFY_WRITE: - case PENDING_NOTIFY: - // We only need to note a pending notify if we're already doing a notify as data processing - // is always followed by write notification processing - newState = PENDING_NOTIFY; - doReturn = true; - break; - case PENDING_DATA: - doReturn = true; - break; - case DATA: - // Only need to return here as data processing will do the idleCallback itself anyway - doReturn = true; - break; - case IDLE: - newState = NOTIFY_WRITE; - break; - case SHUTDOWN: - // We can get here because it is too hard to eliminate all races of stop() and notifyPendingWrite() - // just do nothing. - doReturn = true; - case DRAINED: - // This is not allowed - we can't make any more writes as we're draining the write queue. - assert(oldState!=DRAINED); - doReturn = true; - }; - } while (!state.boolCompareAndSwap(oldState, newState)); - if (doReturn) { - return; - } - - doWriteCallback(); - - // Keep track of what we need to do so that we can release the lock - enum {COMPLETION, NOTIFY, RETURN, EXIT} action; - // If there was pending data whilst we were doing this, process it now - // - // Using NOTIFY_WRITE for both NOTIFY & COMPLETION is a bit strange, but we're making sure we get the - // correct result if we reenter notifyPendingWrite(), in which case we want to - // end up in PENDING_NOTIFY (entering dataEvent doesn't matter as it only checks - // not IDLE) - do { - //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); - do { - newState = oldState = state.get(); - action = RETURN; // Anything but COMPLETION - switch (oldState) { - case NOTIFY_WRITE: - newState = IDLE; - action = (action == COMPLETION) ? EXIT : RETURN; - break; - case PENDING_DATA: - newState = NOTIFY_WRITE; - action = COMPLETION; - break; - case PENDING_NOTIFY: - newState = NOTIFY_WRITE; - action = NOTIFY; - break; - default: - assert(oldState!=IDLE && oldState!=DATA && oldState!=SHUTDOWN); - action = RETURN; - } - } while (!state.boolCompareAndSwap(oldState, newState)); - - // Note we only get here if we were in the PENDING_DATA or PENDING_NOTIFY state - // so that we do need to process completions or notifications now - switch (action) { - case COMPLETION: - processCompletions(); - // Fall through - case NOTIFY: - doWriteCallback(); - break; - case RETURN: - return; - case EXIT: - // If we just processed completions we might need to delete ourselves - // TODO: XXX: can we delete ourselves correctly in notifyPendingWrite()? - checkDrainedStopped(); - return; - } - } while (true); + dataHandle.call(pendingWriteAction); } void AsynchIO::dataEvent() { - // Keep track of writable notifications - // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); - State oldState; - State newState; - bool doReturn; - do { - newState = oldState = state.get(); - doReturn = false; - // We're already processing a notification - switch (oldState) { - case IDLE: - newState = DATA; - break; - case SHUTDOWN: - doReturn = true; - // Fallthru - case DRAINED: - break; - default: - // Can't get here in DATA state as that would violate the serialisation rules - assert( oldState!=DATA ); - newState = PENDING_DATA; - doReturn = true; - } - } while (!state.boolCompareAndSwap(oldState, newState)); - if (doReturn) { - return; - } - + if (state.get() == STOPPED) return; + processCompletions(); - //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); - do { - newState = oldState = state.get(); - switch (oldState) { - case DATA: - newState = NOTIFY_WRITE; - break; - case DRAINED: - break; - default: - assert( oldState==DATA || oldState==DRAINED); - } - } while (!state.boolCompareAndSwap(oldState, newState)); - - while (newState==NOTIFY_WRITE) { - doWriteCallback(); - - // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); - do { - newState = oldState = state.get(); - if ( oldState==NOTIFY_WRITE ) { - newState = IDLE; - } else { - // Can't get DATA/PENDING_DATA/DRAINED here as dataEvent cannot be reentered - assert( oldState==PENDING_NOTIFY ); - newState = NOTIFY_WRITE; - } - } while (!state.boolCompareAndSwap(oldState, newState)); - } - - // We might delete ourselves in here so return immediately - checkDrainedStopped(); + doWriteCallback(); } void AsynchIO::processCompletions() { @@ -471,46 +276,30 @@ namespace Rdma { return; } } + + checkDrained(); } - void AsynchIO::checkDrainedStopped() { + void AsynchIO::checkDrained() { // If we've got all the write confirmations and we're draining // We might get deleted in the drained callback so return immediately if (draining) { if (outstandingWrites == 0) { draining = false; - doDrainedCallback(); + NotifyCallback nc; + nc.swap(notifyCallback); + nc(*this); } return; } - - // We might need to delete ourselves - if (notifyCallback) { - doStoppedCallback(); - } - } - - void AsynchIO::doDrainedCallback() { - NotifyCallback nc; - nc.swap(notifyCallback); - // Transition unconditionally to DRAINED - State oldState; - do { - oldState = state.get(); - assert(oldState==IDLE); - } while (!state.boolCompareAndSwap(oldState, DRAINED)); - nc(*this); } - + void AsynchIO::doStoppedCallback() { + // Ensure we can't get any more callbacks (except for the stopped callback) + dataHandle.stopWatch(); + NotifyCallback nc; nc.swap(notifyCallback); - // Transition unconditionally to SHUTDOWN - State oldState; - do { - oldState = state.get(); - assert(oldState==IDLE); - } while (!state.boolCompareAndSwap(oldState, SHUTDOWN)); nc(*this); } diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h index 00eba28716..62779e4e78 100644 --- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -50,10 +50,9 @@ namespace Rdma { int recvBufferCount; int xmitBufferCount; int outstandingWrites; - bool draining; // TODO: Perhaps (probably) this state can be merged with the following... - enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DRAINED, SHUTDOWN }; + bool draining; + enum State {IDLE, STOPPED}; qpid::sys::AtomicValue<State> state; - //qpid::sys::Mutex stateLock; QueuePair::intrusive_ptr qp; qpid::sys::DispatchHandleRef dataHandle; @@ -62,6 +61,7 @@ namespace Rdma { FullCallback fullCallback; ErrorCallback errorCallback; NotifyCallback notifyCallback; + qpid::sys::DispatchHandle::Callback pendingWriteAction; public: typedef boost::function1<void, AsynchIO&> RequestCallback; @@ -103,9 +103,8 @@ namespace Rdma { void dataEvent(); void processCompletions(); void doWriteCallback(); - void checkDrainedStopped(); + void checkDrained(); void doStoppedCallback(); - void doDrainedCallback(); }; // We're only writable if: |