diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 62 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.h | 7 |
2 files changed, 62 insertions, 7 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 5616c30ae8..1caa9b7e72 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -29,6 +29,8 @@ using qpid::sys::SocketAddress; using qpid::sys::DispatchHandle; using qpid::sys::Poller; +using qpid::sys::ScopedLock; +using qpid::sys::Mutex; namespace Rdma { AsynchIO::AsynchIO( @@ -55,7 +57,7 @@ namespace Rdma { idleCallback(ic), fullCallback(fc), errorCallback(ec), - pendingWriteAction(boost::bind(&AsynchIO::doWriteCallback, this)) + pendingWriteAction(boost::bind(&AsynchIO::writeEvent, this)) { qp->nonblocking(); qp->notifyRecv(); @@ -74,7 +76,7 @@ namespace Rdma { QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished"); // Turn off callbacks if necessary (before doing the deletes) - if (state.get() != STOPPED) { + if (state != STOPPED) { QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown"); dataHandle.stopWatch(); } @@ -89,6 +91,7 @@ namespace Rdma { // Mark for deletion/Delete this object when we have no outstanding writes void AsynchIO::stop(NotifyCallback nc) { + ScopedLock<Mutex> l(stateLock); state = STOPPED; notifyCallback = nc; dataHandle.call(boost::bind(&AsynchIO::doStoppedCallback, this)); @@ -140,15 +143,64 @@ namespace Rdma { } void AsynchIO::notifyPendingWrite() { - dataHandle.call(pendingWriteAction); + ScopedLock<Mutex> l(stateLock); + switch (state) { + case IDLE: + dataHandle.call(pendingWriteAction); + break; + case NOTIFY: + state = NOTIFY_PENDING; + break; + case NOTIFY_PENDING: + case STOPPED: + break; + } } void AsynchIO::dataEvent() { - if (state.get() == STOPPED) return; + { + ScopedLock<Mutex> l(stateLock); + + if (state == STOPPED) return; + state = NOTIFY_PENDING; + } processCompletions(); - doWriteCallback(); + writeEvent(); + } + + void AsynchIO::writeEvent() { + State newState; + do { + { + ScopedLock<Mutex> l(stateLock); + + switch (state) { + case STOPPED: + return; + default: + state = NOTIFY; + } + } + + doWriteCallback(); + + { + ScopedLock<Mutex> l(stateLock); + + newState = state; + switch (newState) { + case NOTIFY_PENDING: + state = NOTIFY; + break; + case STOPPED: + break; + default: + state = IDLE; + } + } + } while (newState == NOTIFY_PENDING); } void AsynchIO::processCompletions() { diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h index 62779e4e78..70c1a2a76a 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -26,6 +26,7 @@ #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/DispatchHandle.h" +#include "qpid/sys/Mutex.h" #include "qpid/sys/SocketAddress.h" #include <netinet/in.h> @@ -51,8 +52,9 @@ namespace Rdma { int xmitBufferCount; int outstandingWrites; bool draining; - enum State {IDLE, STOPPED}; - qpid::sys::AtomicValue<State> state; + enum State {IDLE, NOTIFY, NOTIFY_PENDING, STOPPED}; + State state; + qpid::sys::Mutex stateLock; QueuePair::intrusive_ptr qp; qpid::sys::DispatchHandleRef dataHandle; @@ -101,6 +103,7 @@ namespace Rdma { const static int IgnoreData = 0x10000000; // Message contains no application data void dataEvent(); + void writeEvent(); void processCompletions(); void doWriteCallback(); void checkDrained(); |