diff options
author | Andrew Stitcher <astitcher@apache.org> | 2010-09-08 16:48:53 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2010-09-08 16:48:53 +0000 |
commit | 8db918da6cb8d883c2f6c506823293c9029f1b18 (patch) | |
tree | de9050bc4c795bd0c0a297a2281874701202b566 /cpp | |
parent | 067cc9ea6978d9f018f1d0ef9f741ac1a7b5343b (diff) | |
download | qpid-python-8db918da6cb8d883c2f6c506823293c9029f1b18.tar.gz |
Move rdma recv buffers to a single large allocation rather than piecemeal allocations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@995130 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_factories.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_factories.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_wrap.cpp | 36 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_wrap.h | 5 |
5 files changed, 53 insertions, 10 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index aa0dfbd5a4..3fb4395660 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -60,12 +60,8 @@ namespace Rdma { qp->notifyRecv(); qp->notifySend(); - // Prepost some recv buffers before we go any further - for (int i = 0; i<recvBufferCount; ++i) { - // Allocate recv buffer - Buffer* b = qp->createBuffer(bufferSize); - qp->postRecv(b); - } + // Prepost recv buffers before we go any further + qp->allocateRecvBuffers(recvBufferCount, bufferSize); for (int i = 0; i<xmitBufferCount; ++i) { // Allocate xmit buffer diff --git a/cpp/src/qpid/sys/rdma/rdma_factories.cpp b/cpp/src/qpid/sys/rdma/rdma_factories.cpp index 69741438fa..7090f12e89 100644 --- a/cpp/src/qpid/sys/rdma/rdma_factories.cpp +++ b/cpp/src/qpid/sys/rdma/rdma_factories.cpp @@ -42,6 +42,10 @@ namespace Rdma { if (p) (void) ::ibv_dealloc_pd(p); } + void deregMr(::ibv_mr* mr) throw () { + if (mr) (void) ::ibv_dereg_mr(mr); + } + void destroyCChannel(::ibv_comp_channel* c) throw () { if (c) (void) ::ibv_destroy_comp_channel(c); } @@ -79,10 +83,15 @@ namespace Rdma { } boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c) { - ::ibv_pd* pd = CHECK_NULL(ibv_alloc_pd(c)); + ::ibv_pd* pd = CHECK_NULL(::ibv_alloc_pd(c)); return boost::shared_ptr< ::ibv_pd >(pd, deallocPd); } + boost::shared_ptr< ::ibv_mr > regMr(::ibv_pd* pd, void* addr, size_t length, int access) { + ::ibv_mr* mr = CHECK_NULL(::ibv_reg_mr(pd, addr, length, access)); + return boost::shared_ptr< ::ibv_mr >(mr, deregMr); + } + boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c) { ::ibv_comp_channel* cc = CHECK_NULL(::ibv_create_comp_channel(c)); return boost::shared_ptr< ::ibv_comp_channel >(cc, destroyCChannel); @@ -90,7 +99,7 @@ namespace Rdma { boost::shared_ptr< ::ibv_cq > mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc) { - ::ibv_cq* cq = CHECK_NULL(ibv_create_cq(c, cqe, context, cc, 0)); + ::ibv_cq* cq = CHECK_NULL(::ibv_create_cq(c, cqe, context, cc, 0)); return boost::shared_ptr< ::ibv_cq >(cq, destroyCq); } } diff --git a/cpp/src/qpid/sys/rdma/rdma_factories.h b/cpp/src/qpid/sys/rdma/rdma_factories.h index 3432baf08c..eded68976c 100644 --- a/cpp/src/qpid/sys/rdma/rdma_factories.h +++ b/cpp/src/qpid/sys/rdma/rdma_factories.h @@ -32,6 +32,7 @@ namespace Rdma { boost::shared_ptr< ::rdma_cm_event > mkEvent(::rdma_cm_event* e); boost::shared_ptr< ::ibv_qp > mkQp(::ibv_qp* qp); boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c); + boost::shared_ptr< ::ibv_mr > regMr(::ibv_pd* pd, void* addr, size_t length, int access); boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c); boost::shared_ptr< ::ibv_cq > mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc); } diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp index b046b012db..071d453933 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp @@ -61,9 +61,20 @@ namespace Rdma { sge.lkey = mr->lkey; } + Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount) : + bufferSize(byteCount), + mr(0) + { + sge.addr = (uintptr_t) bytes; + sge.length = 0; + sge.lkey = lkey; + } + Buffer::~Buffer() { - (void) ::ibv_dereg_mr(mr); - delete [] bytes(); + if (mr) { + (void) ::ibv_dereg_mr(mr); + delete [] bytes(); + } } QueuePairEvent::QueuePairEvent() : @@ -155,6 +166,9 @@ namespace Rdma { // Reset back pointer in case someone else has the qp qp->qp_context = 0; + // Deallocate recv buffer memory + if (rmr) delete [] static_cast<char*>(rmr->addr); + // The buffers ptr_deque automatically deletes all the buffers we've allocated } @@ -165,6 +179,24 @@ namespace Rdma { return b; } + void QueuePair::allocateRecvBuffers(int recvBufferCount, int bufferSize) + { + assert(!rmr); + + // Round up buffersize to cacheline (64 bytes) + bufferSize = (bufferSize+63) & (~63); + + // Allocate memory block for all receive buffers + char* mem = new char [recvBufferCount * bufferSize]; + rmr = regMr(pd.get(), mem, recvBufferCount * bufferSize, ::IBV_ACCESS_LOCAL_WRITE); + for (int i = 0; i<recvBufferCount; ++i) { + // Allocate recv buffer + Buffer* b = new Buffer(rmr->lkey, &mem[i*bufferSize], bufferSize); + buffers.push_front(b); + postRecv(b); + } + } + // Make channel non-blocking by making // associated fd nonblocking void QueuePair::nonblocking() { diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h index 488cf8c646..73d133a306 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.h +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h @@ -57,6 +57,7 @@ namespace Rdma { ~Buffer(); private: + Buffer(uint32_t lkey, char* bytes, const int32_t byteCount); const int32_t bufferSize; ::ibv_mr* mr; ::ibv_sge sge; @@ -116,6 +117,7 @@ namespace Rdma { friend class Connection; boost::shared_ptr< ::ibv_pd > pd; + boost::shared_ptr< ::ibv_mr > rmr; boost::shared_ptr< ::ibv_comp_channel > cchannel; boost::shared_ptr< ::ibv_cq > scq; boost::shared_ptr< ::ibv_cq > rcq; @@ -133,6 +135,9 @@ namespace Rdma { // Create a buffer to use for writing Buffer* createBuffer(int s); + // Create and post recv buffers + void allocateRecvBuffers(int recvBufferCount, int bufferSize); + // Make channel non-blocking by making // associated fd nonblocking void nonblocking(); |