From 9048ed46bb240aa3839f74d8b6daf837592186be Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Wed, 8 Sep 2010 16:48:58 +0000 Subject: Refactored Rdma write buffers to be controlled by the rdma_wrapper layer git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@995131 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/rdma/RdmaIO.cpp | 25 ++------------- cpp/src/qpid/sys/rdma/RdmaIO.h | 14 ++++++--- cpp/src/qpid/sys/rdma/rdma_wrap.cpp | 61 ++++++++++++++++++++++--------------- cpp/src/qpid/sys/rdma/rdma_wrap.h | 23 ++++++++++---- 4 files changed, 67 insertions(+), 56 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 3fb4395660..a72ed12af7 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -63,11 +63,8 @@ namespace Rdma { // Prepost recv buffers before we go any further qp->allocateRecvBuffers(recvBufferCount, bufferSize); - for (int i = 0; icreateBuffer(bufferSize); - bufferQueue.push_front(b); - } + // Create xmit buffers + qp->createSendBuffers(xmitBufferCount, bufferSize); } AsynchIO::~AsynchIO() { @@ -427,10 +424,7 @@ namespace Rdma { } } else { ++sendEvents; - { - qpid::sys::ScopedLock l(bufferQueueLock); - bufferQueue.push_front(b); - } + returnBuffer(b); --outstandingWrites; } } while (true); @@ -480,19 +474,6 @@ namespace Rdma { nc(*this); } - Buffer* AsynchIO::getBuffer() { - qpid::sys::ScopedLock l(bufferQueueLock); - assert(!bufferQueue.empty()); - Buffer* b = bufferQueue.front(); - bufferQueue.pop_front(); - return b; - } - - void AsynchIO::returnBuffer(Buffer* b) { - qpid::sys::ScopedLock l(bufferQueueLock); - bufferQueue.push_front(b); - } - ConnectionManager::ConnectionManager( ErrorCallback errc, DisconnectedCallback dc diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h index 9f55a7be7c..5876646b96 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -32,7 +32,6 @@ #include #include -#include namespace Rdma { @@ -56,8 +55,6 @@ namespace Rdma { enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DRAINED, SHUTDOWN }; qpid::sys::AtomicValue state; //qpid::sys::Mutex stateLock; - std::deque bufferQueue; - qpid::sys::Mutex bufferQueueLock; QueuePair::intrusive_ptr qp; qpid::sys::DispatchHandleRef dataHandle; @@ -126,8 +123,17 @@ namespace Rdma { } inline bool AsynchIO::bufferAvailable() const { - return !bufferQueue.empty(); + return qp->bufferAvailable(); } + + inline Buffer* AsynchIO::getBuffer() { + return qp->getBuffer(); + } + + inline void AsynchIO::returnBuffer(Buffer* b) { + qp->returnBuffer(b); + } + // These are the parameters necessary to start the conversation // * Each peer HAS to allocate buffers of the size of the maximum receive from its peer // * Each peer HAS to know the initial "credit" it has for transmitting to its peer diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp index 071d453933..c286782c96 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp @@ -50,33 +50,14 @@ namespace Rdma { return count; } - Buffer::Buffer(::ibv_pd* pd, const int32_t s) : - bufferSize(s), - mr(CHECK_NULL(::ibv_reg_mr( - pd, new char[s], s, - ::IBV_ACCESS_LOCAL_WRITE))) - { - sge.addr = (uintptr_t) mr->addr; - sge.length = 0; - sge.lkey = mr->lkey; - } - Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount) : - bufferSize(byteCount), - mr(0) + bufferSize(byteCount) { sge.addr = (uintptr_t) bytes; sge.length = 0; sge.lkey = lkey; } - Buffer::~Buffer() { - if (mr) { - (void) ::ibv_dereg_mr(mr); - delete [] bytes(); - } - } - QueuePairEvent::QueuePairEvent() : dir(NONE) {} @@ -169,16 +150,48 @@ namespace Rdma { // Deallocate recv buffer memory if (rmr) delete [] static_cast(rmr->addr); + // Deallocate recv buffer memory + if (smr) delete [] static_cast(smr->addr); + // The buffers ptr_deque automatically deletes all the buffers we've allocated } - // Create a buffer to use for writing - Buffer* QueuePair::createBuffer(int s) { - Buffer* b = new Buffer(pd.get(), s); - buffers.push_front(b); + // Create buffers to use for writing + void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize) + { + assert(!smr); + + // Round up buffersize to cacheline (64 bytes) + bufferSize = (bufferSize+63) & (~63); + + // Allocate memory block for all receive buffers + char* mem = new char [sendBufferCount * bufferSize]; + smr = regMr(pd.get(), mem, sendBufferCount * bufferSize, ::IBV_ACCESS_LOCAL_WRITE); + for (int i = 0; ilkey, &mem[i*bufferSize], bufferSize); + buffers.push_front(b); + bufferQueue.push_back(b); + } + } + + Buffer* QueuePair::getBuffer() { + qpid::sys::ScopedLock l(bufferQueueLock); + assert(!bufferQueue.empty()); + Buffer* b = bufferQueue.back(); + bufferQueue.pop_back(); return b; } + void QueuePair::returnBuffer(Buffer* b) { + qpid::sys::ScopedLock l(bufferQueueLock); + bufferQueue.push_back(b); + } + + bool QueuePair::bufferAvailable() const { + return !bufferQueue.empty(); + } + void QueuePair::allocateRecvBuffers(int recvBufferCount, int bufferSize) { assert(!rmr); diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h index 73d133a306..f951dcb0af 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.h +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h @@ -25,11 +25,14 @@ #include "qpid/RefCounted.h" #include "qpid/sys/IOHandle.h" +#include "qpid/sys/Mutex.h" #include #include #include +#include + namespace qpid { namespace sys { class SocketAddress; @@ -53,13 +56,9 @@ namespace Rdma { int32_t dataCount() const; void dataCount(int32_t); - Buffer(::ibv_pd* pd, const int32_t s); - ~Buffer(); - private: Buffer(uint32_t lkey, char* bytes, const int32_t byteCount); const int32_t bufferSize; - ::ibv_mr* mr; ::ibv_sge sge; }; @@ -117,6 +116,7 @@ namespace Rdma { friend class Connection; boost::shared_ptr< ::ibv_pd > pd; + boost::shared_ptr< ::ibv_mr > smr; boost::shared_ptr< ::ibv_mr > rmr; boost::shared_ptr< ::ibv_comp_channel > cchannel; boost::shared_ptr< ::ibv_cq > scq; @@ -125,6 +125,8 @@ namespace Rdma { int outstandingSendEvents; int outstandingRecvEvents; boost::ptr_deque buffers; + qpid::sys::Mutex bufferQueueLock; + std::vector bufferQueue; QueuePair(boost::shared_ptr< ::rdma_cm_id > id); ~QueuePair(); @@ -132,8 +134,17 @@ namespace Rdma { public: typedef boost::intrusive_ptr intrusive_ptr; - // Create a buffer to use for writing - Buffer* createBuffer(int s); + // Create a buffers to use for writing + void createSendBuffers(int sendBufferCount, int bufferSize); + + // Get a send buffer + Buffer* getBuffer(); + + // Return buffer to pool after use + void returnBuffer(Buffer* b); + + // Check whether any buffers are available + bool bufferAvailable() const; // Create and post recv buffers void allocateRecvBuffers(int recvBufferCount, int bufferSize); -- cgit v1.2.1