diff options
author | Andrew Stitcher <astitcher@apache.org> | 2010-10-12 16:05:26 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2010-10-12 16:05:26 +0000 |
commit | a2bfe45d88da47239a974165f9a14dceb21670e4 (patch) | |
tree | f3b40cd8bdf3d4e9edd4b7edf504573e48cd3694 | |
parent | ef2d5950861b806b2064ae74dabfb4b0c3c1c864 (diff) | |
download | qpid-python-a2bfe45d88da47239a974165f9a14dceb21670e4.tar.gz |
Improve the performance of the Rdma::AsynchIO by using a very
simple state machine to reduce the context switch for notifyPendingWrite()
by allowing it to "hijack" existing concurrent processing on an IO thread.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1021823 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp | 62 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/RdmaIO.h | 7 |
2 files changed, 62 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 5616c30ae8..1caa9b7e72 100644 --- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -29,6 +29,8 @@ using qpid::sys::SocketAddress; using qpid::sys::DispatchHandle; using qpid::sys::Poller; +using qpid::sys::ScopedLock; +using qpid::sys::Mutex; namespace Rdma { AsynchIO::AsynchIO( @@ -55,7 +57,7 @@ namespace Rdma { idleCallback(ic), fullCallback(fc), errorCallback(ec), - pendingWriteAction(boost::bind(&AsynchIO::doWriteCallback, this)) + pendingWriteAction(boost::bind(&AsynchIO::writeEvent, this)) { qp->nonblocking(); qp->notifyRecv(); @@ -74,7 +76,7 @@ namespace Rdma { QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished"); // Turn off callbacks if necessary (before doing the deletes) - if (state.get() != STOPPED) { + if (state != STOPPED) { QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown"); dataHandle.stopWatch(); } @@ -89,6 +91,7 @@ namespace Rdma { // Mark for deletion/Delete this object when we have no outstanding writes void AsynchIO::stop(NotifyCallback nc) { + ScopedLock<Mutex> l(stateLock); state = STOPPED; notifyCallback = nc; dataHandle.call(boost::bind(&AsynchIO::doStoppedCallback, this)); @@ -140,15 +143,64 @@ namespace Rdma { } void AsynchIO::notifyPendingWrite() { - dataHandle.call(pendingWriteAction); + ScopedLock<Mutex> l(stateLock); + switch (state) { + case IDLE: + dataHandle.call(pendingWriteAction); + break; + case NOTIFY: + state = NOTIFY_PENDING; + break; + case NOTIFY_PENDING: + case STOPPED: + break; + } } void AsynchIO::dataEvent() { - if (state.get() == STOPPED) return; + { + ScopedLock<Mutex> l(stateLock); + + if (state == STOPPED) return; + state = NOTIFY_PENDING; + } processCompletions(); - doWriteCallback(); + writeEvent(); + } + + void AsynchIO::writeEvent() { + State newState; + do { + { + ScopedLock<Mutex> l(stateLock); + + switch (state) { + case STOPPED: + return; + default: + state = NOTIFY; + } + } + + doWriteCallback(); + + { + ScopedLock<Mutex> l(stateLock); + + newState = state; + switch (newState) { + case NOTIFY_PENDING: + state = NOTIFY; + break; + case STOPPED: + break; + default: + state = IDLE; + } + } + } while (newState == NOTIFY_PENDING); } void AsynchIO::processCompletions() { diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h index 62779e4e78..70c1a2a76a 100644 --- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -26,6 +26,7 @@ #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/DispatchHandle.h" +#include "qpid/sys/Mutex.h" #include "qpid/sys/SocketAddress.h" #include <netinet/in.h> @@ -51,8 +52,9 @@ namespace Rdma { int xmitBufferCount; int outstandingWrites; bool draining; - enum State {IDLE, STOPPED}; - qpid::sys::AtomicValue<State> state; + enum State {IDLE, NOTIFY, NOTIFY_PENDING, STOPPED}; + State state; + qpid::sys::Mutex stateLock; QueuePair::intrusive_ptr qp; qpid::sys::DispatchHandleRef dataHandle; @@ -101,6 +103,7 @@ namespace Rdma { const static int IgnoreData = 0x10000000; // Message contains no application data void dataEvent(); + void writeEvent(); void processCompletions(); void doWriteCallback(); void checkDrained(); |