summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/rdma/rdma_wrap.cpp')
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_wrap.cpp61
1 files changed, 37 insertions, 24 deletions
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
index 071d453933..c286782c96 100644
--- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
+++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
@@ -50,33 +50,14 @@ namespace Rdma {
return count;
}
- Buffer::Buffer(::ibv_pd* pd, const int32_t s) :
- bufferSize(s),
- mr(CHECK_NULL(::ibv_reg_mr(
- pd, new char[s], s,
- ::IBV_ACCESS_LOCAL_WRITE)))
- {
- sge.addr = (uintptr_t) mr->addr;
- sge.length = 0;
- sge.lkey = mr->lkey;
- }
-
Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount) :
- bufferSize(byteCount),
- mr(0)
+ bufferSize(byteCount)
{
sge.addr = (uintptr_t) bytes;
sge.length = 0;
sge.lkey = lkey;
}
- Buffer::~Buffer() {
- if (mr) {
- (void) ::ibv_dereg_mr(mr);
- delete [] bytes();
- }
- }
-
QueuePairEvent::QueuePairEvent() :
dir(NONE)
{}
@@ -169,16 +150,48 @@ namespace Rdma {
// Deallocate recv buffer memory
if (rmr) delete [] static_cast<char*>(rmr->addr);
+ // Deallocate recv buffer memory
+ if (smr) delete [] static_cast<char*>(smr->addr);
+
// The buffers ptr_deque automatically deletes all the buffers we've allocated
}
- // Create a buffer to use for writing
- Buffer* QueuePair::createBuffer(int s) {
- Buffer* b = new Buffer(pd.get(), s);
- buffers.push_front(b);
+ // Create buffers to use for writing
+ void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize)
+ {
+ assert(!smr);
+
+ // Round up buffersize to cacheline (64 bytes)
+ bufferSize = (bufferSize+63) & (~63);
+
+ // Allocate memory block for all receive buffers
+ char* mem = new char [sendBufferCount * bufferSize];
+ smr = regMr(pd.get(), mem, sendBufferCount * bufferSize, ::IBV_ACCESS_LOCAL_WRITE);
+ for (int i = 0; i<sendBufferCount; ++i) {
+ // Allocate xmit buffer
+ Buffer* b = new Buffer(smr->lkey, &mem[i*bufferSize], bufferSize);
+ buffers.push_front(b);
+ bufferQueue.push_back(b);
+ }
+ }
+
+ Buffer* QueuePair::getBuffer() {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
+ assert(!bufferQueue.empty());
+ Buffer* b = bufferQueue.back();
+ bufferQueue.pop_back();
return b;
}
+ void QueuePair::returnBuffer(Buffer* b) {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
+ bufferQueue.push_back(b);
+ }
+
+ bool QueuePair::bufferAvailable() const {
+ return !bufferQueue.empty();
+ }
+
void QueuePair::allocateRecvBuffers(int recvBufferCount, int bufferSize)
{
assert(!rmr);