summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2010-09-08 16:48:53 +0000
committerAndrew Stitcher <astitcher@apache.org>2010-09-08 16:48:53 +0000
commit8db918da6cb8d883c2f6c506823293c9029f1b18 (patch)
treede9050bc4c795bd0c0a297a2281874701202b566 /cpp
parent067cc9ea6978d9f018f1d0ef9f741ac1a7b5343b (diff)
downloadqpid-python-8db918da6cb8d883c2f6c506823293c9029f1b18.tar.gz
Move rdma recv buffers to a single large allocation rather than piecemeal allocations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@995130 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.cpp8
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_factories.cpp13
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_factories.h1
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_wrap.cpp36
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_wrap.h5
5 files changed, 53 insertions, 10 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index aa0dfbd5a4..3fb4395660 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -60,12 +60,8 @@ namespace Rdma {
qp->notifyRecv();
qp->notifySend();
- // Prepost some recv buffers before we go any further
- for (int i = 0; i<recvBufferCount; ++i) {
- // Allocate recv buffer
- Buffer* b = qp->createBuffer(bufferSize);
- qp->postRecv(b);
- }
+ // Prepost recv buffers before we go any further
+ qp->allocateRecvBuffers(recvBufferCount, bufferSize);
for (int i = 0; i<xmitBufferCount; ++i) {
// Allocate xmit buffer
diff --git a/cpp/src/qpid/sys/rdma/rdma_factories.cpp b/cpp/src/qpid/sys/rdma/rdma_factories.cpp
index 69741438fa..7090f12e89 100644
--- a/cpp/src/qpid/sys/rdma/rdma_factories.cpp
+++ b/cpp/src/qpid/sys/rdma/rdma_factories.cpp
@@ -42,6 +42,10 @@ namespace Rdma {
if (p) (void) ::ibv_dealloc_pd(p);
}
+ void deregMr(::ibv_mr* mr) throw () {
+ if (mr) (void) ::ibv_dereg_mr(mr);
+ }
+
void destroyCChannel(::ibv_comp_channel* c) throw () {
if (c) (void) ::ibv_destroy_comp_channel(c);
}
@@ -79,10 +83,15 @@ namespace Rdma {
}
boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c) {
- ::ibv_pd* pd = CHECK_NULL(ibv_alloc_pd(c));
+ ::ibv_pd* pd = CHECK_NULL(::ibv_alloc_pd(c));
return boost::shared_ptr< ::ibv_pd >(pd, deallocPd);
}
+ boost::shared_ptr< ::ibv_mr > regMr(::ibv_pd* pd, void* addr, size_t length, int access) {
+ ::ibv_mr* mr = CHECK_NULL(::ibv_reg_mr(pd, addr, length, access));
+ return boost::shared_ptr< ::ibv_mr >(mr, deregMr);
+ }
+
boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c) {
::ibv_comp_channel* cc = CHECK_NULL(::ibv_create_comp_channel(c));
return boost::shared_ptr< ::ibv_comp_channel >(cc, destroyCChannel);
@@ -90,7 +99,7 @@ namespace Rdma {
boost::shared_ptr< ::ibv_cq >
mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc) {
- ::ibv_cq* cq = CHECK_NULL(ibv_create_cq(c, cqe, context, cc, 0));
+ ::ibv_cq* cq = CHECK_NULL(::ibv_create_cq(c, cqe, context, cc, 0));
return boost::shared_ptr< ::ibv_cq >(cq, destroyCq);
}
}
diff --git a/cpp/src/qpid/sys/rdma/rdma_factories.h b/cpp/src/qpid/sys/rdma/rdma_factories.h
index 3432baf08c..eded68976c 100644
--- a/cpp/src/qpid/sys/rdma/rdma_factories.h
+++ b/cpp/src/qpid/sys/rdma/rdma_factories.h
@@ -32,6 +32,7 @@ namespace Rdma {
boost::shared_ptr< ::rdma_cm_event > mkEvent(::rdma_cm_event* e);
boost::shared_ptr< ::ibv_qp > mkQp(::ibv_qp* qp);
boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c);
+ boost::shared_ptr< ::ibv_mr > regMr(::ibv_pd* pd, void* addr, size_t length, int access);
boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c);
boost::shared_ptr< ::ibv_cq > mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc);
}
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
index b046b012db..071d453933 100644
--- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
+++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
@@ -61,9 +61,20 @@ namespace Rdma {
sge.lkey = mr->lkey;
}
+ Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount) :
+ bufferSize(byteCount),
+ mr(0)
+ {
+ sge.addr = (uintptr_t) bytes;
+ sge.length = 0;
+ sge.lkey = lkey;
+ }
+
Buffer::~Buffer() {
- (void) ::ibv_dereg_mr(mr);
- delete [] bytes();
+ if (mr) {
+ (void) ::ibv_dereg_mr(mr);
+ delete [] bytes();
+ }
}
QueuePairEvent::QueuePairEvent() :
@@ -155,6 +166,9 @@ namespace Rdma {
// Reset back pointer in case someone else has the qp
qp->qp_context = 0;
+ // Deallocate recv buffer memory
+ if (rmr) delete [] static_cast<char*>(rmr->addr);
+
// The buffers ptr_deque automatically deletes all the buffers we've allocated
}
@@ -165,6 +179,24 @@ namespace Rdma {
return b;
}
+ void QueuePair::allocateRecvBuffers(int recvBufferCount, int bufferSize)
+ {
+ assert(!rmr);
+
+ // Round up buffersize to cacheline (64 bytes)
+ bufferSize = (bufferSize+63) & (~63);
+
+ // Allocate memory block for all receive buffers
+ char* mem = new char [recvBufferCount * bufferSize];
+ rmr = regMr(pd.get(), mem, recvBufferCount * bufferSize, ::IBV_ACCESS_LOCAL_WRITE);
+ for (int i = 0; i<recvBufferCount; ++i) {
+ // Allocate recv buffer
+ Buffer* b = new Buffer(rmr->lkey, &mem[i*bufferSize], bufferSize);
+ buffers.push_front(b);
+ postRecv(b);
+ }
+ }
+
// Make channel non-blocking by making
// associated fd nonblocking
void QueuePair::nonblocking() {
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h
index 488cf8c646..73d133a306 100644
--- a/cpp/src/qpid/sys/rdma/rdma_wrap.h
+++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h
@@ -57,6 +57,7 @@ namespace Rdma {
~Buffer();
private:
+ Buffer(uint32_t lkey, char* bytes, const int32_t byteCount);
const int32_t bufferSize;
::ibv_mr* mr;
::ibv_sge sge;
@@ -116,6 +117,7 @@ namespace Rdma {
friend class Connection;
boost::shared_ptr< ::ibv_pd > pd;
+ boost::shared_ptr< ::ibv_mr > rmr;
boost::shared_ptr< ::ibv_comp_channel > cchannel;
boost::shared_ptr< ::ibv_cq > scq;
boost::shared_ptr< ::ibv_cq > rcq;
@@ -133,6 +135,9 @@ namespace Rdma {
// Create a buffer to use for writing
Buffer* createBuffer(int s);
+ // Create and post recv buffers
+ void allocateRecvBuffers(int recvBufferCount, int bufferSize);
+
// Make channel non-blocking by making
// associated fd nonblocking
void nonblocking();