diff options
author | Andrew Stitcher <astitcher@apache.org> | 2008-09-23 18:43:08 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2008-09-23 18:43:08 +0000 |
commit | 3e33a73b2f351be2c4d7bbf71eab6320bebb8204 (patch) | |
tree | 187ddcb2369a707713f7f263548ebaf94a93516a /cpp | |
parent | 1af11d7c52307f2cdd98db9ea9595bade6abbc9d (diff) | |
download | qpid-python-3e33a73b2f351be2c4d7bbf71eab6320bebb8204.tar.gz |
Removed the state lock from the RdmaIO code
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@698276 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/sys/AtomicValue_gcc.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 217 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.h | 6 |
3 files changed, 129 insertions, 96 deletions
diff --git a/cpp/src/qpid/sys/AtomicValue_gcc.h b/cpp/src/qpid/sys/AtomicValue_gcc.h index da60edad65..d022b07c1d 100644 --- a/cpp/src/qpid/sys/AtomicValue_gcc.h +++ b/cpp/src/qpid/sys/AtomicValue_gcc.h @@ -57,7 +57,7 @@ class AtomicValue /** If current value == testval then set to newval. Returns true if the swap was performed. */ bool boolCompareAndSwap(T testval, T newval) { return __sync_bool_compare_and_swap(&value, testval, newval); } - T get() const { return const_cast<AtomicValue<T>*>(this)->fetchAndAdd(0); } + T get() const { return const_cast<AtomicValue<T>*>(this)->fetchAndAdd(static_cast<T>(0)); } private: T value; diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index e3dc0cbf8f..77e766dd79 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -99,14 +99,25 @@ namespace Rdma { // Mark for deletion/Delete this object when we have no outstanding writes void AsynchIO::deferDelete() { - { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); - if (outstandingWrites > 0 || state != IDLE) { - deleting = true; + State oldState; + State newState; + bool doReturn; + //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + // It is safe to assign to deleting here as we either delete ourselves + // before leaving this function or deleting is set on exit + do { + newState = oldState = state.get(); + doReturn = false; + if (outstandingWrites > 0 || oldState != IDLE) { + deleting = true; + doReturn = true; + } else{ + newState = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor + } + } while (!state.boolCompareAndSwap(oldState, newState)); + if (doReturn) { return; } - state = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor - } delete this; } @@ -136,7 +147,7 @@ namespace Rdma { // Mark now closed (so we don't accept any more writes or make any idle callbacks) void AsynchIO::queueWriteClose() { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + // Don't think we actually need to lock here as transition is 1 way only to closed closed = true; } @@ -144,130 +155,150 @@ namespace Rdma { // As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not. // If we are then we just return as we know that we will eventually do the idle callback anyway. // - { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); // We can get here in any state (as the caller could be in any thread) - switch (state) { - case NOTIFY_WRITE: - case PENDING_NOTIFY: - // We only need to note a pending notify if we're already doing a notify as data processing - // is always followed by write notification processing - state = PENDING_NOTIFY; - return; - case PENDING_DATA: - return; - case DATA: - // Only need to return here as data processing will do the idleCallback itself anyway + State oldState; + State newState; + bool doReturn; + do { + newState = oldState = state.get(); + doReturn = false; + switch (oldState) { + case NOTIFY_WRITE: + case PENDING_NOTIFY: + // We only need to note a pending notify if we're already doing a notify as data processing + // is always followed by write notification processing + newState = PENDING_NOTIFY; + doReturn = true; + break; + case PENDING_DATA: + doReturn = true; + break; + case DATA: + // Only need to return here as data processing will do the idleCallback itself anyway + doReturn = true; + break; + case IDLE: + newState = NOTIFY_WRITE; + break; + case DELETED: + assert(oldState!=DELETED); + doReturn = true; + }; + } while (!state.boolCompareAndSwap(oldState, newState)); + if (doReturn) { return; - case IDLE: - state = NOTIFY_WRITE; - break; - case DELETED: - assert(state!=DELETED); - } } doWriteCallback(); // Keep track of what we need to do so that we can release the lock - enum {COMPLETION, NOTIFY} action; - { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + enum {COMPLETION, NOTIFY, RETURN, EXIT} action; // If there was pending data whilst we were doing this, process it now - switch (state) { - case NOTIFY_WRITE: - state = IDLE; - return; - case PENDING_DATA: - action = COMPLETION; - break; - case PENDING_NOTIFY: - action = NOTIFY; - break; - default: - assert(state!=IDLE && state!=DATA && state!=DELETED); - return; - } - // Using NOTIFY_WRITE for both cases is a bit strange, but we're making sure we get the + // + // Using NOTIFY_WRITE for both NOTIFY & COMPLETION is a bit strange, but we're making sure we get the // correct result if we reenter notifyPendingWrite(), in which case we want to // end up in PENDING_NOTIFY (entering dataEvent doesn't matter as it only checks // not IDLE) - state = NOTIFY_WRITE; - } do { + //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + do { + newState = oldState = state.get(); + action = RETURN; // Anything but COMPLETION + switch (oldState) { + case NOTIFY_WRITE: + newState = IDLE; + action = (action == COMPLETION) ? EXIT : RETURN; + break; + case PENDING_DATA: + newState = NOTIFY_WRITE; + action = COMPLETION; + break; + case PENDING_NOTIFY: + newState = NOTIFY_WRITE; + action = NOTIFY; + break; + default: + assert(oldState!=IDLE && oldState!=DATA && oldState!=DELETED); + action = RETURN; + } + } while (!state.boolCompareAndSwap(oldState, newState)); + // Note we only get here if we were in the PENDING_DATA or PENDING_NOTIFY state // so that we do need to process completions or notifications now switch (action) { case COMPLETION: processCompletions(); + // Fall through case NOTIFY: doWriteCallback(); break; - } - { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); - switch (state) { - case NOTIFY_WRITE: - state = IDLE; - goto exit; - case PENDING_DATA: - action = COMPLETION; - break; - case PENDING_NOTIFY: - action = NOTIFY; - break; - default: - assert(state!=IDLE && state!=DATA && state!=DELETED); + case RETURN: + return; + case EXIT: + // If we just processed completions we might need to delete ourselves + if (deleting && outstandingWrites == 0) { + delete this; + } return; - } - state = NOTIFY_WRITE; } } while (true); - exit: - // If we just processed completions we might need to delete ourselves - if (action == COMPLETION && deleting && outstandingWrites == 0) { - delete this; - } } void AsynchIO::dataEvent(qpid::sys::DispatchHandle&) { // Keep track of writable notifications - { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); - // We're already processing a notification - switch (state) { - case IDLE: - break; - default: - state = PENDING_DATA; + // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + State oldState; + State newState; + bool doReturn; + do { + newState = oldState = state.get(); + doReturn = false; + // We're already processing a notification + switch (oldState) { + case IDLE: + newState = DATA; + break; + default: + // Can't get here in DATA state as that would violate the serialisation rules + assert( oldState!=DATA ); + newState = PENDING_DATA; + doReturn = true; + } + } while (!state.boolCompareAndSwap(oldState, newState)); + if (doReturn) { return; } - // Can't get here in DATA state as that would violate the serialisation rules - assert( state==IDLE ); - state = DATA; - } processCompletions(); - { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); - assert( state==DATA ); - state = NOTIFY_WRITE; - } + //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + do { + newState = oldState = state.get(); + assert( oldState==DATA ); + newState = NOTIFY_WRITE; + } while (!state.boolCompareAndSwap(oldState, newState)); do { doWriteCallback(); - { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); - if ( state==NOTIFY_WRITE ) { - state = IDLE; + // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + bool doBreak; + do { + newState = oldState = state.get(); + doBreak = false; + if ( oldState==NOTIFY_WRITE ) { + newState = IDLE; + doBreak = true; + } else { + // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered + assert( oldState==PENDING_NOTIFY ); + newState = NOTIFY_WRITE; + } + } while (!state.boolCompareAndSwap(oldState, newState)); + if (doBreak) { break; } - // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered - assert( state==PENDING_NOTIFY ); - state = NOTIFY_WRITE; - } } while (true); // We might need to delete ourselves diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h index 29132b8967..57bf735307 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -23,6 +23,7 @@ #include "rdma_wrap.h" +#include "qpid/sys/AtomicValue.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Mutex.h" @@ -53,8 +54,9 @@ namespace Rdma { int outstandingWrites; bool closed; // TODO: Perhaps (probably) this state can be merged with the following... bool deleting; // TODO: Perhaps (probably) this state can be merged with the following... - enum { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED } state; - qpid::sys::Mutex stateLock; + enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED }; + qpid::sys::AtomicValue<State> state; + //qpid::sys::Mutex stateLock; std::deque<Buffer*> bufferQueue; qpid::sys::Mutex bufferQueueLock; boost::ptr_deque<Buffer> buffers; |