summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2010-10-12 16:05:26 +0000
committerAndrew Stitcher <astitcher@apache.org>2010-10-12 16:05:26 +0000
commita2bfe45d88da47239a974165f9a14dceb21670e4 (patch)
treef3b40cd8bdf3d4e9edd4b7edf504573e48cd3694
parentef2d5950861b806b2064ae74dabfb4b0c3c1c864 (diff)
downloadqpid-python-a2bfe45d88da47239a974165f9a14dceb21670e4.tar.gz
Improve the performance of the Rdma::AsynchIO by using a very
simple state machine to reduce the context switch for notifyPendingWrite() by allowing it to "hijack" existing concurrent processing on an IO thread. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1021823 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp62
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaIO.h7
2 files changed, 62 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index 5616c30ae8..1caa9b7e72 100644
--- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -29,6 +29,8 @@
using qpid::sys::SocketAddress;
using qpid::sys::DispatchHandle;
using qpid::sys::Poller;
+using qpid::sys::ScopedLock;
+using qpid::sys::Mutex;
namespace Rdma {
AsynchIO::AsynchIO(
@@ -55,7 +57,7 @@ namespace Rdma {
idleCallback(ic),
fullCallback(fc),
errorCallback(ec),
- pendingWriteAction(boost::bind(&AsynchIO::doWriteCallback, this))
+ pendingWriteAction(boost::bind(&AsynchIO::writeEvent, this))
{
qp->nonblocking();
qp->notifyRecv();
@@ -74,7 +76,7 @@ namespace Rdma {
QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished");
// Turn off callbacks if necessary (before doing the deletes)
- if (state.get() != STOPPED) {
+ if (state != STOPPED) {
QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown");
dataHandle.stopWatch();
}
@@ -89,6 +91,7 @@ namespace Rdma {
// Mark for deletion/Delete this object when we have no outstanding writes
void AsynchIO::stop(NotifyCallback nc) {
+ ScopedLock<Mutex> l(stateLock);
state = STOPPED;
notifyCallback = nc;
dataHandle.call(boost::bind(&AsynchIO::doStoppedCallback, this));
@@ -140,15 +143,64 @@ namespace Rdma {
}
void AsynchIO::notifyPendingWrite() {
- dataHandle.call(pendingWriteAction);
+ ScopedLock<Mutex> l(stateLock);
+ switch (state) {
+ case IDLE:
+ dataHandle.call(pendingWriteAction);
+ break;
+ case NOTIFY:
+ state = NOTIFY_PENDING;
+ break;
+ case NOTIFY_PENDING:
+ case STOPPED:
+ break;
+ }
}
void AsynchIO::dataEvent() {
- if (state.get() == STOPPED) return;
+ {
+ ScopedLock<Mutex> l(stateLock);
+
+ if (state == STOPPED) return;
+ state = NOTIFY_PENDING;
+ }
processCompletions();
- doWriteCallback();
+ writeEvent();
+ }
+
+ void AsynchIO::writeEvent() {
+ State newState;
+ do {
+ {
+ ScopedLock<Mutex> l(stateLock);
+
+ switch (state) {
+ case STOPPED:
+ return;
+ default:
+ state = NOTIFY;
+ }
+ }
+
+ doWriteCallback();
+
+ {
+ ScopedLock<Mutex> l(stateLock);
+
+ newState = state;
+ switch (newState) {
+ case NOTIFY_PENDING:
+ state = NOTIFY;
+ break;
+ case STOPPED:
+ break;
+ default:
+ state = IDLE;
+ }
+ }
+ } while (newState == NOTIFY_PENDING);
}
void AsynchIO::processCompletions() {
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
index 62779e4e78..70c1a2a76a 100644
--- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -26,6 +26,7 @@
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/DispatchHandle.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/sys/SocketAddress.h"
#include <netinet/in.h>
@@ -51,8 +52,9 @@ namespace Rdma {
int xmitBufferCount;
int outstandingWrites;
bool draining;
- enum State {IDLE, STOPPED};
- qpid::sys::AtomicValue<State> state;
+ enum State {IDLE, NOTIFY, NOTIFY_PENDING, STOPPED};
+ State state;
+ qpid::sys::Mutex stateLock;
QueuePair::intrusive_ptr qp;
qpid::sys::DispatchHandleRef dataHandle;
@@ -101,6 +103,7 @@ namespace Rdma {
const static int IgnoreData = 0x10000000; // Message contains no application data
void dataEvent();
+ void writeEvent();
void processCompletions();
void doWriteCallback();
void checkDrained();