diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaClient.cpp | 37 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 126 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.h | 22 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaServer.cpp | 28 |
4 files changed, 66 insertions, 147 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/cpp/src/qpid/sys/rdma/RdmaClient.cpp index 7c2d9de505..afff96b72f 100644 --- a/cpp/src/qpid/sys/rdma/RdmaClient.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaClient.cpp @@ -30,8 +30,8 @@ int64_t smsgs = 0; int64_t sbytes = 0; int64_t rmsgs = 0; int64_t rbytes = 0; - -int outstandingwrites = 0; +int64_t cmsgs = 0; +int writable = true; int target = 1000000; int msgsize = 200; @@ -42,17 +42,18 @@ Duration fullTestDuration(TIME_INFINITE); vector<char> testString; void write(Rdma::AsynchIO& aio) { - //if ((smsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) { - while (smsgs < target && outstandingwrites < (3*Rdma::DEFAULT_WR_ENTRIES/4)) { + if ((cmsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) { + while (writable) { + if (smsgs >= target) + return; Rdma::Buffer* b = aio.getBuffer(); std::copy(testString.begin(), testString.end(), b->bytes); b->dataCount = msgsize; aio.queueWrite(b); - ++outstandingwrites; ++smsgs; sbytes += b->byteCount; } - //} + } } void dataError(Rdma::AsynchIO&) { @@ -66,24 +67,27 @@ void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) { // When all messages have been recvd stop if (rmsgs < target) { write(aio); - return; + } else { + fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now())); + if (cmsgs >= target) + p->shutdown(); } +} - fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now())); - if (outstandingwrites == 0) - p->shutdown(); +void full(Rdma::AsynchIO&) { + writable = false; } void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) { - --outstandingwrites; + writable = true; + ++cmsgs; if (smsgs < target) { write(aio); - return; + } else { + sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now())); + if (rmsgs >= target && cmsgs >= target) + p->shutdown(); } - - sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now())); - if (smsgs >= target && rmsgs >= target && outstandingwrites == 0) - p->shutdown(); } void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) { @@ -93,6 +97,7 @@ void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) { Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), msgsize, boost::bind(&data, poller, _1, _2), boost::bind(&idle, poller, _1), + &full, dataError); startTime = AbsTime::now(); diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 31d109ea4d..755d6f17c4 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -9,14 +9,18 @@ namespace Rdma { int s, ReadCallback rc, IdleCallback ic, + FullCallback fc, ErrorCallback ec ) : qp(q), dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0), bufferSize(s), recvBufferCount(DEFAULT_WR_ENTRIES), + xmitBufferCount(DEFAULT_WR_ENTRIES), + outstandingWrites(0), readCallback(rc), idleCallback(ic), + fullCallback(fc), errorCallback(ec) { qp->nonblocking(); @@ -40,20 +44,28 @@ namespace Rdma { dataHandle.startWatch(poller); } - void AsynchIO::queueReadBuffer(Buffer*) { - } - + // TODO: Currently we don't prevent write buffer overrun we just advise + // when to stop writing. void AsynchIO::queueWrite(Buffer* buff) { qp->postSend(buff); + ++outstandingWrites; + if (outstandingWrites >= xmitBufferCount) { + fullCallback(*this); + } } void AsynchIO::notifyPendingWrite() { + // Just perform the idle callback (if possible) + if (outstandingWrites < xmitBufferCount) { + idleCallback(*this); + } } void AsynchIO::queueWriteClose() { } Buffer* AsynchIO::getBuffer() { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); if (bufferQueue.empty()) { Buffer* b = qp->createBuffer(bufferSize); buffers.push_front(b); @@ -103,7 +115,12 @@ namespace Rdma { // At this point the buffer has been consumed so put it back on the recv queue qp->postRecv(b); } else { + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); bufferQueue.push_front(b); + } + --outstandingWrites; + // TODO: maybe don't call idle unless we're low on write buffers idleCallback(*this); } } while (true); @@ -122,8 +139,7 @@ namespace Rdma { connectedCallback(cc), errorCallback(errc), disconnectedCallback(dc), - connectionRequestCallback(crc), - state(IDLE) + connectionRequestCallback(crc) { ci->nonblocking(); } @@ -131,7 +147,6 @@ namespace Rdma { void Listener::start(Poller::shared_ptr poller) { ci->bind(src_addr); ci->listen(); - state = LISTENING; handle.startWatch(poller); } @@ -194,15 +209,13 @@ namespace Rdma { connectedCallback(cc), errorCallback(errc), disconnectedCallback(dc), - rejectedCallback(rc), - state(IDLE) + rejectedCallback(rc) { ci->nonblocking(); } void Connector::start(Poller::shared_ptr poller) { ci->resolve_addr(dst_addr); - state = RESOLVE_ADDR; handle.startWatch(poller); } @@ -214,138 +227,45 @@ namespace Rdma { return; ::rdma_cm_event_type eventType = e.getEventType(); -#if 1 switch (eventType) { case RDMA_CM_EVENT_ADDR_RESOLVED: // RESOLVE_ADDR - state = RESOLVE_ROUTE; ci->resolve_route(); break; case RDMA_CM_EVENT_ADDR_ERROR: // RESOLVE_ADDR - state = ERROR; errorCallback(ci); break; case RDMA_CM_EVENT_ROUTE_RESOLVED: // RESOLVE_ROUTE: - state = CONNECTING; ci->connect(); break; case RDMA_CM_EVENT_ROUTE_ERROR: // RESOLVE_ROUTE: - state = ERROR; errorCallback(ci); break; case RDMA_CM_EVENT_CONNECT_ERROR: // CONNECTING - state = ERROR; errorCallback(ci); break; case RDMA_CM_EVENT_UNREACHABLE: // CONNECTING - state = ERROR; errorCallback(ci); break; case RDMA_CM_EVENT_REJECTED: // CONNECTING - state = REJECTED; rejectedCallback(ci); break; case RDMA_CM_EVENT_ESTABLISHED: // CONNECTING - state = ESTABLISHED; connectedCallback(ci); break; case RDMA_CM_EVENT_DISCONNECTED: // ESTABLISHED - state = DISCONNECTED; disconnectedCallback(ci); break; default: - std::cerr << "Warning: unexpected event in " << state << " state - " << eventType << "\n"; - state = ERROR; - } -#else - switch (state) { - case IDLE: - std::cerr << "Warning: event in IDLE state\n"; - break; - case RESOLVE_ADDR: - switch (eventType) { - case RDMA_CM_EVENT_ADDR_RESOLVED: - state = RESOLVE_ROUTE; - ci->resolve_route(); - break; - case RDMA_CM_EVENT_ADDR_ERROR: - state = ERROR; - errorCallback(ci); - break; - default: - state = ERROR; - std::cerr << "Warning: unexpected response to resolve_addr - " << eventType << "\n"; - } - break; - case RESOLVE_ROUTE: - switch (eventType) { - case RDMA_CM_EVENT_ROUTE_RESOLVED: - state = CONNECTING; - ci->connect(); - break; - case RDMA_CM_EVENT_ROUTE_ERROR: - state = ERROR; - errorCallback(ci); - break; - default: - state = ERROR; - std::cerr << "Warning: unexpected response to resolve_route - " << eventType << "\n"; - } - break; - case CONNECTING: - switch (eventType) { - case RDMA_CM_EVENT_CONNECT_RESPONSE: - std::cerr << "connect_response\n"; - break; - case RDMA_CM_EVENT_CONNECT_ERROR: - state = ERROR; - errorCallback(ci); - break; - case RDMA_CM_EVENT_UNREACHABLE: - state = ERROR; - errorCallback(ci); - break; - case RDMA_CM_EVENT_REJECTED: - state = REJECTED; - rejectedCallback(ci); - break; - case RDMA_CM_EVENT_ESTABLISHED: - state = ESTABLISHED; - connectedCallback(ci); - break; - default: - state = ERROR; - std::cerr << "Warning: unexpected response to connect - " << eventType << "\n"; - } - break; - case ESTABLISHED: - switch (eventType) { - case RDMA_CM_EVENT_DISCONNECTED: - disconnectedCallback(ci); - break; - default: - std::cerr << "Warning: unexpected event in ESTABLISHED state - " << eventType << "\n"; - } - break; - case REJECTED: - std::cerr << "Warning: event in REJECTED state - " << eventType << "\n"; - break; - case ERROR: - std::cerr << "Warning: event in ERROR state - " << eventType << "\n"; - break; - case LISTENING: - case ACCEPTING: - std::cerr << "Warning: in an illegal state (and received event!) - " << eventType << "\n"; - break; + std::cerr << "Warning: unexpected event in connect: " << eventType << "\n"; } -#endif } } diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h index efa0ee7097..5c8d49607c 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -4,6 +4,7 @@ #include "rdma_wrap.h" #include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Mutex.h" #include <netinet/in.h> @@ -17,18 +18,6 @@ using qpid::sys::Poller; namespace Rdma { class Connection; - enum ConnectionState { - IDLE, - RESOLVE_ADDR, - RESOLVE_ROUTE, - LISTENING, - CONNECTING, - ACCEPTING, - ESTABLISHED, - REJECTED, - DISCONNECTED, - ERROR - }; typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ConnectedCallback; typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ErrorCallback; @@ -41,16 +30,21 @@ namespace Rdma { typedef boost::function1<void, AsynchIO&> ErrorCallback; typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback; typedef boost::function1<void, AsynchIO&> IdleCallback; + typedef boost::function1<void, AsynchIO&> FullCallback; QueuePair::intrusive_ptr qp; DispatchHandle dataHandle; int bufferSize; int recvBufferCount; + int xmitBufferCount; + int outstandingWrites; std::deque<Buffer*> bufferQueue; + qpid::sys::Mutex bufferQueueLock; boost::ptr_deque<Buffer> buffers; ReadCallback readCallback; IdleCallback idleCallback; + FullCallback fullCallback; ErrorCallback errorCallback; public: @@ -59,12 +53,12 @@ namespace Rdma { int s, ReadCallback rc, IdleCallback ic, + FullCallback fc, ErrorCallback ec ); ~AsynchIO(); void start(Poller::shared_ptr poller); - void queueReadBuffer(Buffer* buff); void queueWrite(Buffer* buff); void notifyPendingWrite(); void queueWriteClose(); @@ -83,7 +77,6 @@ namespace Rdma { ErrorCallback errorCallback; DisconnectedCallback disconnectedCallback; ConnectionRequestCallback connectionRequestCallback; - ConnectionState state; public: Listener( @@ -108,7 +101,6 @@ namespace Rdma { ErrorCallback errorCallback; DisconnectedCallback disconnectedCallback; RejectedCallback rejectedCallback; - ConnectionState state; public: Connector( diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp index 488fe28658..f7f739d6c2 100644 --- a/cpp/src/qpid/sys/rdma/RdmaServer.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp @@ -22,12 +22,12 @@ using qpid::sys::Dispatcher; struct ConRec { Rdma::Connection::intrusive_ptr connection; Rdma::AsynchIO* data; - int outstandingWrites; + bool writable; queue<Rdma::Buffer*> queuedWrites; ConRec(Rdma::Connection::intrusive_ptr c) : connection(c), - outstandingWrites(0) + writable(true) {} }; @@ -40,23 +40,24 @@ void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) { Rdma::Buffer* buf = a.getBuffer(); std::copy(b->bytes+b->dataStart, b->bytes+b->dataStart+b->dataCount, buf->bytes); buf->dataCount = b->dataCount; - if (cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) { + if (cr->queuedWrites.empty() && cr->writable) { a.queueWrite(buf); - ++(cr->outstandingWrites); } else { cr->queuedWrites.push(buf); } } +void full(ConRec* cr, Rdma::AsynchIO&) { + cr->writable = false; +} + void idle(ConRec* cr, Rdma::AsynchIO& a) { - --(cr->outstandingWrites); - //if (cr->outstandingWrites < Rdma::DEFAULT_WR_ENTRIES/4) - while (!cr->queuedWrites.empty() && cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) { - Rdma::Buffer* buf = cr->queuedWrites.front(); - cr->queuedWrites.pop(); - a.queueWrite(buf); - ++(cr->outstandingWrites); - } + cr->writable = true; + while (!cr->queuedWrites.empty() && cr->writable) { + Rdma::Buffer* buf = cr->queuedWrites.front(); + cr->queuedWrites.pop(); + a.queueWrite(buf); + } } void disconnected(Rdma::Connection::intrusive_ptr& ci) { @@ -82,7 +83,7 @@ bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) { // For fun reject alternate connection attempts static bool x = false; - x ^= 1; + x = true; // Must create aio here so as to prepost buffers *before* we accept connection if (x) { @@ -91,6 +92,7 @@ bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) { new Rdma::AsynchIO(ci->getQueuePair(), 8000, boost::bind(data, cr, _1, _2), boost::bind(idle, cr, _1), + boost::bind(full, cr, _1), dataError); ci->addContext(cr); cr->data = aio; |