summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaClient.cpp37
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.cpp126
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.h22
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaServer.cpp28
4 files changed, 66 insertions, 147 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/cpp/src/qpid/sys/rdma/RdmaClient.cpp
index 7c2d9de505..afff96b72f 100644
--- a/cpp/src/qpid/sys/rdma/RdmaClient.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaClient.cpp
@@ -30,8 +30,8 @@ int64_t smsgs = 0;
int64_t sbytes = 0;
int64_t rmsgs = 0;
int64_t rbytes = 0;
-
-int outstandingwrites = 0;
+int64_t cmsgs = 0;
+int writable = true;
int target = 1000000;
int msgsize = 200;
@@ -42,17 +42,18 @@ Duration fullTestDuration(TIME_INFINITE);
vector<char> testString;
void write(Rdma::AsynchIO& aio) {
- //if ((smsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) {
- while (smsgs < target && outstandingwrites < (3*Rdma::DEFAULT_WR_ENTRIES/4)) {
+ if ((cmsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) {
+ while (writable) {
+ if (smsgs >= target)
+ return;
Rdma::Buffer* b = aio.getBuffer();
std::copy(testString.begin(), testString.end(), b->bytes);
b->dataCount = msgsize;
aio.queueWrite(b);
- ++outstandingwrites;
++smsgs;
sbytes += b->byteCount;
}
- //}
+ }
}
void dataError(Rdma::AsynchIO&) {
@@ -66,24 +67,27 @@ void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) {
// When all messages have been recvd stop
if (rmsgs < target) {
write(aio);
- return;
+ } else {
+ fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now()));
+ if (cmsgs >= target)
+ p->shutdown();
}
+}
- fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now()));
- if (outstandingwrites == 0)
- p->shutdown();
+void full(Rdma::AsynchIO&) {
+ writable = false;
}
void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) {
- --outstandingwrites;
+ writable = true;
+ ++cmsgs;
if (smsgs < target) {
write(aio);
- return;
+ } else {
+ sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now()));
+ if (rmsgs >= target && cmsgs >= target)
+ p->shutdown();
}
-
- sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now()));
- if (smsgs >= target && rmsgs >= target && outstandingwrites == 0)
- p->shutdown();
}
void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
@@ -93,6 +97,7 @@ void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), msgsize,
boost::bind(&data, poller, _1, _2),
boost::bind(&idle, poller, _1),
+ &full,
dataError);
startTime = AbsTime::now();
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index 31d109ea4d..755d6f17c4 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -9,14 +9,18 @@ namespace Rdma {
int s,
ReadCallback rc,
IdleCallback ic,
+ FullCallback fc,
ErrorCallback ec
) :
qp(q),
dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0),
bufferSize(s),
recvBufferCount(DEFAULT_WR_ENTRIES),
+ xmitBufferCount(DEFAULT_WR_ENTRIES),
+ outstandingWrites(0),
readCallback(rc),
idleCallback(ic),
+ fullCallback(fc),
errorCallback(ec)
{
qp->nonblocking();
@@ -40,20 +44,28 @@ namespace Rdma {
dataHandle.startWatch(poller);
}
- void AsynchIO::queueReadBuffer(Buffer*) {
- }
-
+ // TODO: Currently we don't prevent write buffer overrun we just advise
+ // when to stop writing.
void AsynchIO::queueWrite(Buffer* buff) {
qp->postSend(buff);
+ ++outstandingWrites;
+ if (outstandingWrites >= xmitBufferCount) {
+ fullCallback(*this);
+ }
}
void AsynchIO::notifyPendingWrite() {
+ // Just perform the idle callback (if possible)
+ if (outstandingWrites < xmitBufferCount) {
+ idleCallback(*this);
+ }
}
void AsynchIO::queueWriteClose() {
}
Buffer* AsynchIO::getBuffer() {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
if (bufferQueue.empty()) {
Buffer* b = qp->createBuffer(bufferSize);
buffers.push_front(b);
@@ -103,7 +115,12 @@ namespace Rdma {
// At this point the buffer has been consumed so put it back on the recv queue
qp->postRecv(b);
} else {
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
bufferQueue.push_front(b);
+ }
+ --outstandingWrites;
+ // TODO: maybe don't call idle unless we're low on write buffers
idleCallback(*this);
}
} while (true);
@@ -122,8 +139,7 @@ namespace Rdma {
connectedCallback(cc),
errorCallback(errc),
disconnectedCallback(dc),
- connectionRequestCallback(crc),
- state(IDLE)
+ connectionRequestCallback(crc)
{
ci->nonblocking();
}
@@ -131,7 +147,6 @@ namespace Rdma {
void Listener::start(Poller::shared_ptr poller) {
ci->bind(src_addr);
ci->listen();
- state = LISTENING;
handle.startWatch(poller);
}
@@ -194,15 +209,13 @@ namespace Rdma {
connectedCallback(cc),
errorCallback(errc),
disconnectedCallback(dc),
- rejectedCallback(rc),
- state(IDLE)
+ rejectedCallback(rc)
{
ci->nonblocking();
}
void Connector::start(Poller::shared_ptr poller) {
ci->resolve_addr(dst_addr);
- state = RESOLVE_ADDR;
handle.startWatch(poller);
}
@@ -214,138 +227,45 @@ namespace Rdma {
return;
::rdma_cm_event_type eventType = e.getEventType();
-#if 1
switch (eventType) {
case RDMA_CM_EVENT_ADDR_RESOLVED:
// RESOLVE_ADDR
- state = RESOLVE_ROUTE;
ci->resolve_route();
break;
case RDMA_CM_EVENT_ADDR_ERROR:
// RESOLVE_ADDR
- state = ERROR;
errorCallback(ci);
break;
case RDMA_CM_EVENT_ROUTE_RESOLVED:
// RESOLVE_ROUTE:
- state = CONNECTING;
ci->connect();
break;
case RDMA_CM_EVENT_ROUTE_ERROR:
// RESOLVE_ROUTE:
- state = ERROR;
errorCallback(ci);
break;
case RDMA_CM_EVENT_CONNECT_ERROR:
// CONNECTING
- state = ERROR;
errorCallback(ci);
break;
case RDMA_CM_EVENT_UNREACHABLE:
// CONNECTING
- state = ERROR;
errorCallback(ci);
break;
case RDMA_CM_EVENT_REJECTED:
// CONNECTING
- state = REJECTED;
rejectedCallback(ci);
break;
case RDMA_CM_EVENT_ESTABLISHED:
// CONNECTING
- state = ESTABLISHED;
connectedCallback(ci);
break;
case RDMA_CM_EVENT_DISCONNECTED:
// ESTABLISHED
- state = DISCONNECTED;
disconnectedCallback(ci);
break;
default:
- std::cerr << "Warning: unexpected event in " << state << " state - " << eventType << "\n";
- state = ERROR;
- }
-#else
- switch (state) {
- case IDLE:
- std::cerr << "Warning: event in IDLE state\n";
- break;
- case RESOLVE_ADDR:
- switch (eventType) {
- case RDMA_CM_EVENT_ADDR_RESOLVED:
- state = RESOLVE_ROUTE;
- ci->resolve_route();
- break;
- case RDMA_CM_EVENT_ADDR_ERROR:
- state = ERROR;
- errorCallback(ci);
- break;
- default:
- state = ERROR;
- std::cerr << "Warning: unexpected response to resolve_addr - " << eventType << "\n";
- }
- break;
- case RESOLVE_ROUTE:
- switch (eventType) {
- case RDMA_CM_EVENT_ROUTE_RESOLVED:
- state = CONNECTING;
- ci->connect();
- break;
- case RDMA_CM_EVENT_ROUTE_ERROR:
- state = ERROR;
- errorCallback(ci);
- break;
- default:
- state = ERROR;
- std::cerr << "Warning: unexpected response to resolve_route - " << eventType << "\n";
- }
- break;
- case CONNECTING:
- switch (eventType) {
- case RDMA_CM_EVENT_CONNECT_RESPONSE:
- std::cerr << "connect_response\n";
- break;
- case RDMA_CM_EVENT_CONNECT_ERROR:
- state = ERROR;
- errorCallback(ci);
- break;
- case RDMA_CM_EVENT_UNREACHABLE:
- state = ERROR;
- errorCallback(ci);
- break;
- case RDMA_CM_EVENT_REJECTED:
- state = REJECTED;
- rejectedCallback(ci);
- break;
- case RDMA_CM_EVENT_ESTABLISHED:
- state = ESTABLISHED;
- connectedCallback(ci);
- break;
- default:
- state = ERROR;
- std::cerr << "Warning: unexpected response to connect - " << eventType << "\n";
- }
- break;
- case ESTABLISHED:
- switch (eventType) {
- case RDMA_CM_EVENT_DISCONNECTED:
- disconnectedCallback(ci);
- break;
- default:
- std::cerr << "Warning: unexpected event in ESTABLISHED state - " << eventType << "\n";
- }
- break;
- case REJECTED:
- std::cerr << "Warning: event in REJECTED state - " << eventType << "\n";
- break;
- case ERROR:
- std::cerr << "Warning: event in ERROR state - " << eventType << "\n";
- break;
- case LISTENING:
- case ACCEPTING:
- std::cerr << "Warning: in an illegal state (and received event!) - " << eventType << "\n";
- break;
+ std::cerr << "Warning: unexpected event in connect: " << eventType << "\n";
}
-#endif
}
}
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h
index efa0ee7097..5c8d49607c 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.h
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -4,6 +4,7 @@
#include "rdma_wrap.h"
#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Mutex.h"
#include <netinet/in.h>
@@ -17,18 +18,6 @@ using qpid::sys::Poller;
namespace Rdma {
class Connection;
- enum ConnectionState {
- IDLE,
- RESOLVE_ADDR,
- RESOLVE_ROUTE,
- LISTENING,
- CONNECTING,
- ACCEPTING,
- ESTABLISHED,
- REJECTED,
- DISCONNECTED,
- ERROR
- };
typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ConnectedCallback;
typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ErrorCallback;
@@ -41,16 +30,21 @@ namespace Rdma {
typedef boost::function1<void, AsynchIO&> ErrorCallback;
typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
typedef boost::function1<void, AsynchIO&> IdleCallback;
+ typedef boost::function1<void, AsynchIO&> FullCallback;
QueuePair::intrusive_ptr qp;
DispatchHandle dataHandle;
int bufferSize;
int recvBufferCount;
+ int xmitBufferCount;
+ int outstandingWrites;
std::deque<Buffer*> bufferQueue;
+ qpid::sys::Mutex bufferQueueLock;
boost::ptr_deque<Buffer> buffers;
ReadCallback readCallback;
IdleCallback idleCallback;
+ FullCallback fullCallback;
ErrorCallback errorCallback;
public:
@@ -59,12 +53,12 @@ namespace Rdma {
int s,
ReadCallback rc,
IdleCallback ic,
+ FullCallback fc,
ErrorCallback ec
);
~AsynchIO();
void start(Poller::shared_ptr poller);
- void queueReadBuffer(Buffer* buff);
void queueWrite(Buffer* buff);
void notifyPendingWrite();
void queueWriteClose();
@@ -83,7 +77,6 @@ namespace Rdma {
ErrorCallback errorCallback;
DisconnectedCallback disconnectedCallback;
ConnectionRequestCallback connectionRequestCallback;
- ConnectionState state;
public:
Listener(
@@ -108,7 +101,6 @@ namespace Rdma {
ErrorCallback errorCallback;
DisconnectedCallback disconnectedCallback;
RejectedCallback rejectedCallback;
- ConnectionState state;
public:
Connector(
diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp
index 488fe28658..f7f739d6c2 100644
--- a/cpp/src/qpid/sys/rdma/RdmaServer.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp
@@ -22,12 +22,12 @@ using qpid::sys::Dispatcher;
struct ConRec {
Rdma::Connection::intrusive_ptr connection;
Rdma::AsynchIO* data;
- int outstandingWrites;
+ bool writable;
queue<Rdma::Buffer*> queuedWrites;
ConRec(Rdma::Connection::intrusive_ptr c) :
connection(c),
- outstandingWrites(0)
+ writable(true)
{}
};
@@ -40,23 +40,24 @@ void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) {
Rdma::Buffer* buf = a.getBuffer();
std::copy(b->bytes+b->dataStart, b->bytes+b->dataStart+b->dataCount, buf->bytes);
buf->dataCount = b->dataCount;
- if (cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) {
+ if (cr->queuedWrites.empty() && cr->writable) {
a.queueWrite(buf);
- ++(cr->outstandingWrites);
} else {
cr->queuedWrites.push(buf);
}
}
+void full(ConRec* cr, Rdma::AsynchIO&) {
+ cr->writable = false;
+}
+
void idle(ConRec* cr, Rdma::AsynchIO& a) {
- --(cr->outstandingWrites);
- //if (cr->outstandingWrites < Rdma::DEFAULT_WR_ENTRIES/4)
- while (!cr->queuedWrites.empty() && cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) {
- Rdma::Buffer* buf = cr->queuedWrites.front();
- cr->queuedWrites.pop();
- a.queueWrite(buf);
- ++(cr->outstandingWrites);
- }
+ cr->writable = true;
+ while (!cr->queuedWrites.empty() && cr->writable) {
+ Rdma::Buffer* buf = cr->queuedWrites.front();
+ cr->queuedWrites.pop();
+ a.queueWrite(buf);
+ }
}
void disconnected(Rdma::Connection::intrusive_ptr& ci) {
@@ -82,7 +83,7 @@ bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) {
// For fun reject alternate connection attempts
static bool x = false;
- x ^= 1;
+ x = true;
// Must create aio here so as to prepost buffers *before* we accept connection
if (x) {
@@ -91,6 +92,7 @@ bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) {
new Rdma::AsynchIO(ci->getQueuePair(), 8000,
boost::bind(data, cr, _1, _2),
boost::bind(idle, cr, _1),
+ boost::bind(full, cr, _1),
dataError);
ci->addContext(cr);
cr->data = aio;