summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2008-04-29 22:46:23 +0000
committerAndrew Stitcher <astitcher@apache.org>2008-04-29 22:46:23 +0000
commitc86a77f2ce6150ce8fc0770604d92502acd996b8 (patch)
tree77e5057ee8f96ffe07eb5abfac2d4c06c21823d6 /cpp/src/qpid/sys
parent0c8f372f71409444dd9f3bc38c481c1ec6ba4827 (diff)
downloadqpid-python-c86a77f2ce6150ce8fc0770604d92502acd996b8.tar.gz
More RDMA Work in Progress
Changes to client buffering Buffering improvement to server Removed unused state machine from RdmaIO code Move the write throttling due to limited write buffers into the RdmaIO code git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652180 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaClient.cpp37
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.cpp126
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.h22
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaServer.cpp28
4 files changed, 66 insertions, 147 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/cpp/src/qpid/sys/rdma/RdmaClient.cpp
index 7c2d9de505..afff96b72f 100644
--- a/cpp/src/qpid/sys/rdma/RdmaClient.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaClient.cpp
@@ -30,8 +30,8 @@ int64_t smsgs = 0;
int64_t sbytes = 0;
int64_t rmsgs = 0;
int64_t rbytes = 0;
-
-int outstandingwrites = 0;
+int64_t cmsgs = 0;
+int writable = true;
int target = 1000000;
int msgsize = 200;
@@ -42,17 +42,18 @@ Duration fullTestDuration(TIME_INFINITE);
vector<char> testString;
void write(Rdma::AsynchIO& aio) {
- //if ((smsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) {
- while (smsgs < target && outstandingwrites < (3*Rdma::DEFAULT_WR_ENTRIES/4)) {
+ if ((cmsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) {
+ while (writable) {
+ if (smsgs >= target)
+ return;
Rdma::Buffer* b = aio.getBuffer();
std::copy(testString.begin(), testString.end(), b->bytes);
b->dataCount = msgsize;
aio.queueWrite(b);
- ++outstandingwrites;
++smsgs;
sbytes += b->byteCount;
}
- //}
+ }
}
void dataError(Rdma::AsynchIO&) {
@@ -66,24 +67,27 @@ void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) {
// When all messages have been recvd stop
if (rmsgs < target) {
write(aio);
- return;
+ } else {
+ fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now()));
+ if (cmsgs >= target)
+ p->shutdown();
}
+}
- fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now()));
- if (outstandingwrites == 0)
- p->shutdown();
+void full(Rdma::AsynchIO&) {
+ writable = false;
}
void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) {
- --outstandingwrites;
+ writable = true;
+ ++cmsgs;
if (smsgs < target) {
write(aio);
- return;
+ } else {
+ sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now()));
+ if (rmsgs >= target && cmsgs >= target)
+ p->shutdown();
}
-
- sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now()));
- if (smsgs >= target && rmsgs >= target && outstandingwrites == 0)
- p->shutdown();
}
void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
@@ -93,6 +97,7 @@ void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), msgsize,
boost::bind(&data, poller, _1, _2),
boost::bind(&idle, poller, _1),
+ &full,
dataError);
startTime = AbsTime::now();
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index 31d109ea4d..755d6f17c4 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -9,14 +9,18 @@ namespace Rdma {
int s,
ReadCallback rc,
IdleCallback ic,
+ FullCallback fc,
ErrorCallback ec
) :
qp(q),
dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0),
bufferSize(s),
recvBufferCount(DEFAULT_WR_ENTRIES),
+ xmitBufferCount(DEFAULT_WR_ENTRIES),
+ outstandingWrites(0),
readCallback(rc),
idleCallback(ic),
+ fullCallback(fc),
errorCallback(ec)
{
qp->nonblocking();
@@ -40,20 +44,28 @@ namespace Rdma {
dataHandle.startWatch(poller);
}
- void AsynchIO::queueReadBuffer(Buffer*) {
- }
-
+ // TODO: Currently we don't prevent write buffer overrun we just advise
+ // when to stop writing.
void AsynchIO::queueWrite(Buffer* buff) {
qp->postSend(buff);
+ ++outstandingWrites;
+ if (outstandingWrites >= xmitBufferCount) {
+ fullCallback(*this);
+ }
}
void AsynchIO::notifyPendingWrite() {
+ // Just perform the idle callback (if possible)
+ if (outstandingWrites < xmitBufferCount) {
+ idleCallback(*this);
+ }
}
void AsynchIO::queueWriteClose() {
}
Buffer* AsynchIO::getBuffer() {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
if (bufferQueue.empty()) {
Buffer* b = qp->createBuffer(bufferSize);
buffers.push_front(b);
@@ -103,7 +115,12 @@ namespace Rdma {
// At this point the buffer has been consumed so put it back on the recv queue
qp->postRecv(b);
} else {
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
bufferQueue.push_front(b);
+ }
+ --outstandingWrites;
+ // TODO: maybe don't call idle unless we're low on write buffers
idleCallback(*this);
}
} while (true);
@@ -122,8 +139,7 @@ namespace Rdma {
connectedCallback(cc),
errorCallback(errc),
disconnectedCallback(dc),
- connectionRequestCallback(crc),
- state(IDLE)
+ connectionRequestCallback(crc)
{
ci->nonblocking();
}
@@ -131,7 +147,6 @@ namespace Rdma {
void Listener::start(Poller::shared_ptr poller) {
ci->bind(src_addr);
ci->listen();
- state = LISTENING;
handle.startWatch(poller);
}
@@ -194,15 +209,13 @@ namespace Rdma {
connectedCallback(cc),
errorCallback(errc),
disconnectedCallback(dc),
- rejectedCallback(rc),
- state(IDLE)
+ rejectedCallback(rc)
{
ci->nonblocking();
}
void Connector::start(Poller::shared_ptr poller) {
ci->resolve_addr(dst_addr);
- state = RESOLVE_ADDR;
handle.startWatch(poller);
}
@@ -214,138 +227,45 @@ namespace Rdma {
return;
::rdma_cm_event_type eventType = e.getEventType();
-#if 1
switch (eventType) {
case RDMA_CM_EVENT_ADDR_RESOLVED:
// RESOLVE_ADDR
- state = RESOLVE_ROUTE;
ci->resolve_route();
break;
case RDMA_CM_EVENT_ADDR_ERROR:
// RESOLVE_ADDR
- state = ERROR;
errorCallback(ci);
break;
case RDMA_CM_EVENT_ROUTE_RESOLVED:
// RESOLVE_ROUTE:
- state = CONNECTING;
ci->connect();
break;
case RDMA_CM_EVENT_ROUTE_ERROR:
// RESOLVE_ROUTE:
- state = ERROR;
errorCallback(ci);
break;
case RDMA_CM_EVENT_CONNECT_ERROR:
// CONNECTING
- state = ERROR;
errorCallback(ci);
break;
case RDMA_CM_EVENT_UNREACHABLE:
// CONNECTING
- state = ERROR;
errorCallback(ci);
break;
case RDMA_CM_EVENT_REJECTED:
// CONNECTING
- state = REJECTED;
rejectedCallback(ci);
break;
case RDMA_CM_EVENT_ESTABLISHED:
// CONNECTING
- state = ESTABLISHED;
connectedCallback(ci);
break;
case RDMA_CM_EVENT_DISCONNECTED:
// ESTABLISHED
- state = DISCONNECTED;
disconnectedCallback(ci);
break;
default:
- std::cerr << "Warning: unexpected event in " << state << " state - " << eventType << "\n";
- state = ERROR;
- }
-#else
- switch (state) {
- case IDLE:
- std::cerr << "Warning: event in IDLE state\n";
- break;
- case RESOLVE_ADDR:
- switch (eventType) {
- case RDMA_CM_EVENT_ADDR_RESOLVED:
- state = RESOLVE_ROUTE;
- ci->resolve_route();
- break;
- case RDMA_CM_EVENT_ADDR_ERROR:
- state = ERROR;
- errorCallback(ci);
- break;
- default:
- state = ERROR;
- std::cerr << "Warning: unexpected response to resolve_addr - " << eventType << "\n";
- }
- break;
- case RESOLVE_ROUTE:
- switch (eventType) {
- case RDMA_CM_EVENT_ROUTE_RESOLVED:
- state = CONNECTING;
- ci->connect();
- break;
- case RDMA_CM_EVENT_ROUTE_ERROR:
- state = ERROR;
- errorCallback(ci);
- break;
- default:
- state = ERROR;
- std::cerr << "Warning: unexpected response to resolve_route - " << eventType << "\n";
- }
- break;
- case CONNECTING:
- switch (eventType) {
- case RDMA_CM_EVENT_CONNECT_RESPONSE:
- std::cerr << "connect_response\n";
- break;
- case RDMA_CM_EVENT_CONNECT_ERROR:
- state = ERROR;
- errorCallback(ci);
- break;
- case RDMA_CM_EVENT_UNREACHABLE:
- state = ERROR;
- errorCallback(ci);
- break;
- case RDMA_CM_EVENT_REJECTED:
- state = REJECTED;
- rejectedCallback(ci);
- break;
- case RDMA_CM_EVENT_ESTABLISHED:
- state = ESTABLISHED;
- connectedCallback(ci);
- break;
- default:
- state = ERROR;
- std::cerr << "Warning: unexpected response to connect - " << eventType << "\n";
- }
- break;
- case ESTABLISHED:
- switch (eventType) {
- case RDMA_CM_EVENT_DISCONNECTED:
- disconnectedCallback(ci);
- break;
- default:
- std::cerr << "Warning: unexpected event in ESTABLISHED state - " << eventType << "\n";
- }
- break;
- case REJECTED:
- std::cerr << "Warning: event in REJECTED state - " << eventType << "\n";
- break;
- case ERROR:
- std::cerr << "Warning: event in ERROR state - " << eventType << "\n";
- break;
- case LISTENING:
- case ACCEPTING:
- std::cerr << "Warning: in an illegal state (and received event!) - " << eventType << "\n";
- break;
+ std::cerr << "Warning: unexpected event in connect: " << eventType << "\n";
}
-#endif
}
}
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h
index efa0ee7097..5c8d49607c 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.h
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -4,6 +4,7 @@
#include "rdma_wrap.h"
#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Mutex.h"
#include <netinet/in.h>
@@ -17,18 +18,6 @@ using qpid::sys::Poller;
namespace Rdma {
class Connection;
- enum ConnectionState {
- IDLE,
- RESOLVE_ADDR,
- RESOLVE_ROUTE,
- LISTENING,
- CONNECTING,
- ACCEPTING,
- ESTABLISHED,
- REJECTED,
- DISCONNECTED,
- ERROR
- };
typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ConnectedCallback;
typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ErrorCallback;
@@ -41,16 +30,21 @@ namespace Rdma {
typedef boost::function1<void, AsynchIO&> ErrorCallback;
typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
typedef boost::function1<void, AsynchIO&> IdleCallback;
+ typedef boost::function1<void, AsynchIO&> FullCallback;
QueuePair::intrusive_ptr qp;
DispatchHandle dataHandle;
int bufferSize;
int recvBufferCount;
+ int xmitBufferCount;
+ int outstandingWrites;
std::deque<Buffer*> bufferQueue;
+ qpid::sys::Mutex bufferQueueLock;
boost::ptr_deque<Buffer> buffers;
ReadCallback readCallback;
IdleCallback idleCallback;
+ FullCallback fullCallback;
ErrorCallback errorCallback;
public:
@@ -59,12 +53,12 @@ namespace Rdma {
int s,
ReadCallback rc,
IdleCallback ic,
+ FullCallback fc,
ErrorCallback ec
);
~AsynchIO();
void start(Poller::shared_ptr poller);
- void queueReadBuffer(Buffer* buff);
void queueWrite(Buffer* buff);
void notifyPendingWrite();
void queueWriteClose();
@@ -83,7 +77,6 @@ namespace Rdma {
ErrorCallback errorCallback;
DisconnectedCallback disconnectedCallback;
ConnectionRequestCallback connectionRequestCallback;
- ConnectionState state;
public:
Listener(
@@ -108,7 +101,6 @@ namespace Rdma {
ErrorCallback errorCallback;
DisconnectedCallback disconnectedCallback;
RejectedCallback rejectedCallback;
- ConnectionState state;
public:
Connector(
diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp
index 488fe28658..f7f739d6c2 100644
--- a/cpp/src/qpid/sys/rdma/RdmaServer.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp
@@ -22,12 +22,12 @@ using qpid::sys::Dispatcher;
struct ConRec {
Rdma::Connection::intrusive_ptr connection;
Rdma::AsynchIO* data;
- int outstandingWrites;
+ bool writable;
queue<Rdma::Buffer*> queuedWrites;
ConRec(Rdma::Connection::intrusive_ptr c) :
connection(c),
- outstandingWrites(0)
+ writable(true)
{}
};
@@ -40,23 +40,24 @@ void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) {
Rdma::Buffer* buf = a.getBuffer();
std::copy(b->bytes+b->dataStart, b->bytes+b->dataStart+b->dataCount, buf->bytes);
buf->dataCount = b->dataCount;
- if (cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) {
+ if (cr->queuedWrites.empty() && cr->writable) {
a.queueWrite(buf);
- ++(cr->outstandingWrites);
} else {
cr->queuedWrites.push(buf);
}
}
+void full(ConRec* cr, Rdma::AsynchIO&) {
+ cr->writable = false;
+}
+
void idle(ConRec* cr, Rdma::AsynchIO& a) {
- --(cr->outstandingWrites);
- //if (cr->outstandingWrites < Rdma::DEFAULT_WR_ENTRIES/4)
- while (!cr->queuedWrites.empty() && cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) {
- Rdma::Buffer* buf = cr->queuedWrites.front();
- cr->queuedWrites.pop();
- a.queueWrite(buf);
- ++(cr->outstandingWrites);
- }
+ cr->writable = true;
+ while (!cr->queuedWrites.empty() && cr->writable) {
+ Rdma::Buffer* buf = cr->queuedWrites.front();
+ cr->queuedWrites.pop();
+ a.queueWrite(buf);
+ }
}
void disconnected(Rdma::Connection::intrusive_ptr& ci) {
@@ -82,7 +83,7 @@ bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) {
// For fun reject alternate connection attempts
static bool x = false;
- x ^= 1;
+ x = true;
// Must create aio here so as to prepost buffers *before* we accept connection
if (x) {
@@ -91,6 +92,7 @@ bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) {
new Rdma::AsynchIO(ci->getQueuePair(), 8000,
boost::bind(data, cr, _1, _2),
boost::bind(idle, cr, _1),
+ boost::bind(full, cr, _1),
dataError);
ci->addContext(cr);
cr->data = aio;