summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2008-09-23 18:43:08 +0000
committerAndrew Stitcher <astitcher@apache.org>2008-09-23 18:43:08 +0000
commit3e33a73b2f351be2c4d7bbf71eab6320bebb8204 (patch)
tree187ddcb2369a707713f7f263548ebaf94a93516a /cpp
parent1af11d7c52307f2cdd98db9ea9595bade6abbc9d (diff)
downloadqpid-python-3e33a73b2f351be2c4d7bbf71eab6320bebb8204.tar.gz
Removed the state lock from the RdmaIO code
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@698276 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/sys/AtomicValue_gcc.h2
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.cpp217
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.h6
3 files changed, 129 insertions, 96 deletions
diff --git a/cpp/src/qpid/sys/AtomicValue_gcc.h b/cpp/src/qpid/sys/AtomicValue_gcc.h
index da60edad65..d022b07c1d 100644
--- a/cpp/src/qpid/sys/AtomicValue_gcc.h
+++ b/cpp/src/qpid/sys/AtomicValue_gcc.h
@@ -57,7 +57,7 @@ class AtomicValue
/** If current value == testval then set to newval. Returns true if the swap was performed. */
bool boolCompareAndSwap(T testval, T newval) { return __sync_bool_compare_and_swap(&value, testval, newval); }
- T get() const { return const_cast<AtomicValue<T>*>(this)->fetchAndAdd(0); }
+ T get() const { return const_cast<AtomicValue<T>*>(this)->fetchAndAdd(static_cast<T>(0)); }
private:
T value;
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index e3dc0cbf8f..77e766dd79 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -99,14 +99,25 @@ namespace Rdma {
// Mark for deletion/Delete this object when we have no outstanding writes
void AsynchIO::deferDelete() {
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- if (outstandingWrites > 0 || state != IDLE) {
- deleting = true;
+ State oldState;
+ State newState;
+ bool doReturn;
+ //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ // It is safe to assign to deleting here as we either delete ourselves
+ // before leaving this function or deleting is set on exit
+ do {
+ newState = oldState = state.get();
+ doReturn = false;
+ if (outstandingWrites > 0 || oldState != IDLE) {
+ deleting = true;
+ doReturn = true;
+ } else{
+ newState = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor
+ }
+ } while (!state.boolCompareAndSwap(oldState, newState));
+ if (doReturn) {
return;
}
- state = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor
- }
delete this;
}
@@ -136,7 +147,7 @@ namespace Rdma {
// Mark now closed (so we don't accept any more writes or make any idle callbacks)
void AsynchIO::queueWriteClose() {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ // Don't think we actually need to lock here as transition is 1 way only to closed
closed = true;
}
@@ -144,130 +155,150 @@ namespace Rdma {
// As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not.
// If we are then we just return as we know that we will eventually do the idle callback anyway.
//
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
// We can get here in any state (as the caller could be in any thread)
- switch (state) {
- case NOTIFY_WRITE:
- case PENDING_NOTIFY:
- // We only need to note a pending notify if we're already doing a notify as data processing
- // is always followed by write notification processing
- state = PENDING_NOTIFY;
- return;
- case PENDING_DATA:
- return;
- case DATA:
- // Only need to return here as data processing will do the idleCallback itself anyway
+ State oldState;
+ State newState;
+ bool doReturn;
+ do {
+ newState = oldState = state.get();
+ doReturn = false;
+ switch (oldState) {
+ case NOTIFY_WRITE:
+ case PENDING_NOTIFY:
+ // We only need to note a pending notify if we're already doing a notify as data processing
+ // is always followed by write notification processing
+ newState = PENDING_NOTIFY;
+ doReturn = true;
+ break;
+ case PENDING_DATA:
+ doReturn = true;
+ break;
+ case DATA:
+ // Only need to return here as data processing will do the idleCallback itself anyway
+ doReturn = true;
+ break;
+ case IDLE:
+ newState = NOTIFY_WRITE;
+ break;
+ case DELETED:
+ assert(oldState!=DELETED);
+ doReturn = true;
+ };
+ } while (!state.boolCompareAndSwap(oldState, newState));
+ if (doReturn) {
return;
- case IDLE:
- state = NOTIFY_WRITE;
- break;
- case DELETED:
- assert(state!=DELETED);
- }
}
doWriteCallback();
// Keep track of what we need to do so that we can release the lock
- enum {COMPLETION, NOTIFY} action;
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ enum {COMPLETION, NOTIFY, RETURN, EXIT} action;
// If there was pending data whilst we were doing this, process it now
- switch (state) {
- case NOTIFY_WRITE:
- state = IDLE;
- return;
- case PENDING_DATA:
- action = COMPLETION;
- break;
- case PENDING_NOTIFY:
- action = NOTIFY;
- break;
- default:
- assert(state!=IDLE && state!=DATA && state!=DELETED);
- return;
- }
- // Using NOTIFY_WRITE for both cases is a bit strange, but we're making sure we get the
+ //
+ // Using NOTIFY_WRITE for both NOTIFY & COMPLETION is a bit strange, but we're making sure we get the
// correct result if we reenter notifyPendingWrite(), in which case we want to
// end up in PENDING_NOTIFY (entering dataEvent doesn't matter as it only checks
// not IDLE)
- state = NOTIFY_WRITE;
- }
do {
+ //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ do {
+ newState = oldState = state.get();
+ action = RETURN; // Anything but COMPLETION
+ switch (oldState) {
+ case NOTIFY_WRITE:
+ newState = IDLE;
+ action = (action == COMPLETION) ? EXIT : RETURN;
+ break;
+ case PENDING_DATA:
+ newState = NOTIFY_WRITE;
+ action = COMPLETION;
+ break;
+ case PENDING_NOTIFY:
+ newState = NOTIFY_WRITE;
+ action = NOTIFY;
+ break;
+ default:
+ assert(oldState!=IDLE && oldState!=DATA && oldState!=DELETED);
+ action = RETURN;
+ }
+ } while (!state.boolCompareAndSwap(oldState, newState));
+
// Note we only get here if we were in the PENDING_DATA or PENDING_NOTIFY state
// so that we do need to process completions or notifications now
switch (action) {
case COMPLETION:
processCompletions();
+ // Fall through
case NOTIFY:
doWriteCallback();
break;
- }
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- switch (state) {
- case NOTIFY_WRITE:
- state = IDLE;
- goto exit;
- case PENDING_DATA:
- action = COMPLETION;
- break;
- case PENDING_NOTIFY:
- action = NOTIFY;
- break;
- default:
- assert(state!=IDLE && state!=DATA && state!=DELETED);
+ case RETURN:
+ return;
+ case EXIT:
+ // If we just processed completions we might need to delete ourselves
+ if (deleting && outstandingWrites == 0) {
+ delete this;
+ }
return;
- }
- state = NOTIFY_WRITE;
}
} while (true);
- exit:
- // If we just processed completions we might need to delete ourselves
- if (action == COMPLETION && deleting && outstandingWrites == 0) {
- delete this;
- }
}
void AsynchIO::dataEvent(qpid::sys::DispatchHandle&) {
// Keep track of writable notifications
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- // We're already processing a notification
- switch (state) {
- case IDLE:
- break;
- default:
- state = PENDING_DATA;
+ // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ State oldState;
+ State newState;
+ bool doReturn;
+ do {
+ newState = oldState = state.get();
+ doReturn = false;
+ // We're already processing a notification
+ switch (oldState) {
+ case IDLE:
+ newState = DATA;
+ break;
+ default:
+ // Can't get here in DATA state as that would violate the serialisation rules
+ assert( oldState!=DATA );
+ newState = PENDING_DATA;
+ doReturn = true;
+ }
+ } while (!state.boolCompareAndSwap(oldState, newState));
+ if (doReturn) {
return;
}
- // Can't get here in DATA state as that would violate the serialisation rules
- assert( state==IDLE );
- state = DATA;
- }
processCompletions();
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- assert( state==DATA );
- state = NOTIFY_WRITE;
- }
+ //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ do {
+ newState = oldState = state.get();
+ assert( oldState==DATA );
+ newState = NOTIFY_WRITE;
+ } while (!state.boolCompareAndSwap(oldState, newState));
do {
doWriteCallback();
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- if ( state==NOTIFY_WRITE ) {
- state = IDLE;
+ // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ bool doBreak;
+ do {
+ newState = oldState = state.get();
+ doBreak = false;
+ if ( oldState==NOTIFY_WRITE ) {
+ newState = IDLE;
+ doBreak = true;
+ } else {
+ // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered
+ assert( oldState==PENDING_NOTIFY );
+ newState = NOTIFY_WRITE;
+ }
+ } while (!state.boolCompareAndSwap(oldState, newState));
+ if (doBreak) {
break;
}
- // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered
- assert( state==PENDING_NOTIFY );
- state = NOTIFY_WRITE;
- }
} while (true);
// We might need to delete ourselves
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h
index 29132b8967..57bf735307 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.h
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -23,6 +23,7 @@
#include "rdma_wrap.h"
+#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Mutex.h"
@@ -53,8 +54,9 @@ namespace Rdma {
int outstandingWrites;
bool closed; // TODO: Perhaps (probably) this state can be merged with the following...
bool deleting; // TODO: Perhaps (probably) this state can be merged with the following...
- enum { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED } state;
- qpid::sys::Mutex stateLock;
+ enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED };
+ qpid::sys::AtomicValue<State> state;
+ //qpid::sys::Mutex stateLock;
std::deque<Buffer*> bufferQueue;
qpid::sys::Mutex bufferQueueLock;
boost::ptr_deque<Buffer> buffers;