summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.cpp62
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.h7
2 files changed, 62 insertions, 7 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index 5616c30ae8..1caa9b7e72 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -29,6 +29,8 @@
using qpid::sys::SocketAddress;
using qpid::sys::DispatchHandle;
using qpid::sys::Poller;
+using qpid::sys::ScopedLock;
+using qpid::sys::Mutex;
namespace Rdma {
AsynchIO::AsynchIO(
@@ -55,7 +57,7 @@ namespace Rdma {
idleCallback(ic),
fullCallback(fc),
errorCallback(ec),
- pendingWriteAction(boost::bind(&AsynchIO::doWriteCallback, this))
+ pendingWriteAction(boost::bind(&AsynchIO::writeEvent, this))
{
qp->nonblocking();
qp->notifyRecv();
@@ -74,7 +76,7 @@ namespace Rdma {
QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished");
// Turn off callbacks if necessary (before doing the deletes)
- if (state.get() != STOPPED) {
+ if (state != STOPPED) {
QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown");
dataHandle.stopWatch();
}
@@ -89,6 +91,7 @@ namespace Rdma {
// Mark for deletion/Delete this object when we have no outstanding writes
void AsynchIO::stop(NotifyCallback nc) {
+ ScopedLock<Mutex> l(stateLock);
state = STOPPED;
notifyCallback = nc;
dataHandle.call(boost::bind(&AsynchIO::doStoppedCallback, this));
@@ -140,15 +143,64 @@ namespace Rdma {
}
void AsynchIO::notifyPendingWrite() {
- dataHandle.call(pendingWriteAction);
+ ScopedLock<Mutex> l(stateLock);
+ switch (state) {
+ case IDLE:
+ dataHandle.call(pendingWriteAction);
+ break;
+ case NOTIFY:
+ state = NOTIFY_PENDING;
+ break;
+ case NOTIFY_PENDING:
+ case STOPPED:
+ break;
+ }
}
void AsynchIO::dataEvent() {
- if (state.get() == STOPPED) return;
+ {
+ ScopedLock<Mutex> l(stateLock);
+
+ if (state == STOPPED) return;
+ state = NOTIFY_PENDING;
+ }
processCompletions();
- doWriteCallback();
+ writeEvent();
+ }
+
+ void AsynchIO::writeEvent() {
+ State newState;
+ do {
+ {
+ ScopedLock<Mutex> l(stateLock);
+
+ switch (state) {
+ case STOPPED:
+ return;
+ default:
+ state = NOTIFY;
+ }
+ }
+
+ doWriteCallback();
+
+ {
+ ScopedLock<Mutex> l(stateLock);
+
+ newState = state;
+ switch (newState) {
+ case NOTIFY_PENDING:
+ state = NOTIFY;
+ break;
+ case STOPPED:
+ break;
+ default:
+ state = IDLE;
+ }
+ }
+ } while (newState == NOTIFY_PENDING);
}
void AsynchIO::processCompletions() {
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h
index 62779e4e78..70c1a2a76a 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.h
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -26,6 +26,7 @@
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/DispatchHandle.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/sys/SocketAddress.h"
#include <netinet/in.h>
@@ -51,8 +52,9 @@ namespace Rdma {
int xmitBufferCount;
int outstandingWrites;
bool draining;
- enum State {IDLE, STOPPED};
- qpid::sys::AtomicValue<State> state;
+ enum State {IDLE, NOTIFY, NOTIFY_PENDING, STOPPED};
+ State state;
+ qpid::sys::Mutex stateLock;
QueuePair::intrusive_ptr qp;
qpid::sys::DispatchHandleRef dataHandle;
@@ -101,6 +103,7 @@ namespace Rdma {
const static int IgnoreData = 0x10000000; // Message contains no application data
void dataEvent();
+ void writeEvent();
void processCompletions();
void doWriteCallback();
void checkDrained();