summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2010-09-08 16:48:58 +0000
committerAndrew Stitcher <astitcher@apache.org>2010-09-08 16:48:58 +0000
commit9048ed46bb240aa3839f74d8b6daf837592186be (patch)
treebfd8c6e2d704a93b36ab575395bf4af6297a9b60 /cpp/src
parent8db918da6cb8d883c2f6c506823293c9029f1b18 (diff)
downloadqpid-python-9048ed46bb240aa3839f74d8b6daf837592186be.tar.gz
Refactored Rdma write buffers to be controlled by the rdma_wrapper layer
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@995131 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.cpp25
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.h14
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_wrap.cpp61
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_wrap.h23
4 files changed, 67 insertions, 56 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index 3fb4395660..a72ed12af7 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -63,11 +63,8 @@ namespace Rdma {
// Prepost recv buffers before we go any further
qp->allocateRecvBuffers(recvBufferCount, bufferSize);
- for (int i = 0; i<xmitBufferCount; ++i) {
- // Allocate xmit buffer
- Buffer* b = qp->createBuffer(bufferSize);
- bufferQueue.push_front(b);
- }
+ // Create xmit buffers
+ qp->createSendBuffers(xmitBufferCount, bufferSize);
}
AsynchIO::~AsynchIO() {
@@ -427,10 +424,7 @@ namespace Rdma {
}
} else {
++sendEvents;
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
- bufferQueue.push_front(b);
- }
+ returnBuffer(b);
--outstandingWrites;
}
} while (true);
@@ -480,19 +474,6 @@ namespace Rdma {
nc(*this);
}
- Buffer* AsynchIO::getBuffer() {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
- assert(!bufferQueue.empty());
- Buffer* b = bufferQueue.front();
- bufferQueue.pop_front();
- return b;
- }
-
- void AsynchIO::returnBuffer(Buffer* b) {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
- bufferQueue.push_front(b);
- }
-
ConnectionManager::ConnectionManager(
ErrorCallback errc,
DisconnectedCallback dc
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h
index 9f55a7be7c..5876646b96 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.h
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -32,7 +32,6 @@
#include <netinet/in.h>
#include <boost/function.hpp>
-#include <deque>
namespace Rdma {
@@ -56,8 +55,6 @@ namespace Rdma {
enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DRAINED, SHUTDOWN };
qpid::sys::AtomicValue<State> state;
//qpid::sys::Mutex stateLock;
- std::deque<Buffer*> bufferQueue;
- qpid::sys::Mutex bufferQueueLock;
QueuePair::intrusive_ptr qp;
qpid::sys::DispatchHandleRef dataHandle;
@@ -126,8 +123,17 @@ namespace Rdma {
}
inline bool AsynchIO::bufferAvailable() const {
- return !bufferQueue.empty();
+ return qp->bufferAvailable();
}
+
+ inline Buffer* AsynchIO::getBuffer() {
+ return qp->getBuffer();
+ }
+
+ inline void AsynchIO::returnBuffer(Buffer* b) {
+ qp->returnBuffer(b);
+ }
+
// These are the parameters necessary to start the conversation
// * Each peer HAS to allocate buffers of the size of the maximum receive from its peer
// * Each peer HAS to know the initial "credit" it has for transmitting to its peer
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
index 071d453933..c286782c96 100644
--- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
+++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
@@ -50,33 +50,14 @@ namespace Rdma {
return count;
}
- Buffer::Buffer(::ibv_pd* pd, const int32_t s) :
- bufferSize(s),
- mr(CHECK_NULL(::ibv_reg_mr(
- pd, new char[s], s,
- ::IBV_ACCESS_LOCAL_WRITE)))
- {
- sge.addr = (uintptr_t) mr->addr;
- sge.length = 0;
- sge.lkey = mr->lkey;
- }
-
Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount) :
- bufferSize(byteCount),
- mr(0)
+ bufferSize(byteCount)
{
sge.addr = (uintptr_t) bytes;
sge.length = 0;
sge.lkey = lkey;
}
- Buffer::~Buffer() {
- if (mr) {
- (void) ::ibv_dereg_mr(mr);
- delete [] bytes();
- }
- }
-
QueuePairEvent::QueuePairEvent() :
dir(NONE)
{}
@@ -169,16 +150,48 @@ namespace Rdma {
// Deallocate recv buffer memory
if (rmr) delete [] static_cast<char*>(rmr->addr);
+ // Deallocate recv buffer memory
+ if (smr) delete [] static_cast<char*>(smr->addr);
+
// The buffers ptr_deque automatically deletes all the buffers we've allocated
}
- // Create a buffer to use for writing
- Buffer* QueuePair::createBuffer(int s) {
- Buffer* b = new Buffer(pd.get(), s);
- buffers.push_front(b);
+ // Create buffers to use for writing
+ void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize)
+ {
+ assert(!smr);
+
+ // Round up buffersize to cacheline (64 bytes)
+ bufferSize = (bufferSize+63) & (~63);
+
+ // Allocate memory block for all receive buffers
+ char* mem = new char [sendBufferCount * bufferSize];
+ smr = regMr(pd.get(), mem, sendBufferCount * bufferSize, ::IBV_ACCESS_LOCAL_WRITE);
+ for (int i = 0; i<sendBufferCount; ++i) {
+ // Allocate xmit buffer
+ Buffer* b = new Buffer(smr->lkey, &mem[i*bufferSize], bufferSize);
+ buffers.push_front(b);
+ bufferQueue.push_back(b);
+ }
+ }
+
+ Buffer* QueuePair::getBuffer() {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
+ assert(!bufferQueue.empty());
+ Buffer* b = bufferQueue.back();
+ bufferQueue.pop_back();
return b;
}
+ void QueuePair::returnBuffer(Buffer* b) {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
+ bufferQueue.push_back(b);
+ }
+
+ bool QueuePair::bufferAvailable() const {
+ return !bufferQueue.empty();
+ }
+
void QueuePair::allocateRecvBuffers(int recvBufferCount, int bufferSize)
{
assert(!rmr);
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h
index 73d133a306..f951dcb0af 100644
--- a/cpp/src/qpid/sys/rdma/rdma_wrap.h
+++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h
@@ -25,11 +25,14 @@
#include "qpid/RefCounted.h"
#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/ptr_container/ptr_deque.hpp>
+#include <vector>
+
namespace qpid {
namespace sys {
class SocketAddress;
@@ -53,13 +56,9 @@ namespace Rdma {
int32_t dataCount() const;
void dataCount(int32_t);
- Buffer(::ibv_pd* pd, const int32_t s);
- ~Buffer();
-
private:
Buffer(uint32_t lkey, char* bytes, const int32_t byteCount);
const int32_t bufferSize;
- ::ibv_mr* mr;
::ibv_sge sge;
};
@@ -117,6 +116,7 @@ namespace Rdma {
friend class Connection;
boost::shared_ptr< ::ibv_pd > pd;
+ boost::shared_ptr< ::ibv_mr > smr;
boost::shared_ptr< ::ibv_mr > rmr;
boost::shared_ptr< ::ibv_comp_channel > cchannel;
boost::shared_ptr< ::ibv_cq > scq;
@@ -125,6 +125,8 @@ namespace Rdma {
int outstandingSendEvents;
int outstandingRecvEvents;
boost::ptr_deque<Buffer> buffers;
+ qpid::sys::Mutex bufferQueueLock;
+ std::vector<Buffer*> bufferQueue;
QueuePair(boost::shared_ptr< ::rdma_cm_id > id);
~QueuePair();
@@ -132,8 +134,17 @@ namespace Rdma {
public:
typedef boost::intrusive_ptr<QueuePair> intrusive_ptr;
- // Create a buffer to use for writing
- Buffer* createBuffer(int s);
+ // Create a buffers to use for writing
+ void createSendBuffers(int sendBufferCount, int bufferSize);
+
+ // Get a send buffer
+ Buffer* getBuffer();
+
+ // Return buffer to pool after use
+ void returnBuffer(Buffer* b);
+
+ // Check whether any buffers are available
+ bool bufferAvailable() const;
// Create and post recv buffers
void allocateRecvBuffers(int recvBufferCount, int bufferSize);