diff options
author | Andrew Stitcher <astitcher@apache.org> | 2010-09-08 16:48:58 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2010-09-08 16:48:58 +0000 |
commit | 9048ed46bb240aa3839f74d8b6daf837592186be (patch) | |
tree | bfd8c6e2d704a93b36ab575395bf4af6297a9b60 /cpp/src | |
parent | 8db918da6cb8d883c2f6c506823293c9029f1b18 (diff) | |
download | qpid-python-9048ed46bb240aa3839f74d8b6daf837592186be.tar.gz |
Refactored Rdma write buffers to be controlled by the rdma_wrapper layer
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@995131 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 25 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.h | 14 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_wrap.cpp | 61 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_wrap.h | 23 |
4 files changed, 67 insertions, 56 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 3fb4395660..a72ed12af7 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -63,11 +63,8 @@ namespace Rdma { // Prepost recv buffers before we go any further qp->allocateRecvBuffers(recvBufferCount, bufferSize); - for (int i = 0; i<xmitBufferCount; ++i) { - // Allocate xmit buffer - Buffer* b = qp->createBuffer(bufferSize); - bufferQueue.push_front(b); - } + // Create xmit buffers + qp->createSendBuffers(xmitBufferCount, bufferSize); } AsynchIO::~AsynchIO() { @@ -427,10 +424,7 @@ namespace Rdma { } } else { ++sendEvents; - { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); - bufferQueue.push_front(b); - } + returnBuffer(b); --outstandingWrites; } } while (true); @@ -480,19 +474,6 @@ namespace Rdma { nc(*this); } - Buffer* AsynchIO::getBuffer() { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); - assert(!bufferQueue.empty()); - Buffer* b = bufferQueue.front(); - bufferQueue.pop_front(); - return b; - } - - void AsynchIO::returnBuffer(Buffer* b) { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); - bufferQueue.push_front(b); - } - ConnectionManager::ConnectionManager( ErrorCallback errc, DisconnectedCallback dc diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h index 9f55a7be7c..5876646b96 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -32,7 +32,6 @@ #include <netinet/in.h> #include <boost/function.hpp> -#include <deque> namespace Rdma { @@ -56,8 +55,6 @@ namespace Rdma { enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DRAINED, SHUTDOWN }; qpid::sys::AtomicValue<State> state; //qpid::sys::Mutex stateLock; - std::deque<Buffer*> bufferQueue; - qpid::sys::Mutex bufferQueueLock; QueuePair::intrusive_ptr qp; qpid::sys::DispatchHandleRef dataHandle; @@ -126,8 +123,17 @@ namespace Rdma { } inline bool AsynchIO::bufferAvailable() const { - return !bufferQueue.empty(); + return qp->bufferAvailable(); } + + inline Buffer* AsynchIO::getBuffer() { + return qp->getBuffer(); + } + + inline void AsynchIO::returnBuffer(Buffer* b) { + qp->returnBuffer(b); + } + // These are the parameters necessary to start the conversation // * Each peer HAS to allocate buffers of the size of the maximum receive from its peer // * Each peer HAS to know the initial "credit" it has for transmitting to its peer diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp index 071d453933..c286782c96 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp @@ -50,33 +50,14 @@ namespace Rdma { return count; } - Buffer::Buffer(::ibv_pd* pd, const int32_t s) : - bufferSize(s), - mr(CHECK_NULL(::ibv_reg_mr( - pd, new char[s], s, - ::IBV_ACCESS_LOCAL_WRITE))) - { - sge.addr = (uintptr_t) mr->addr; - sge.length = 0; - sge.lkey = mr->lkey; - } - Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount) : - bufferSize(byteCount), - mr(0) + bufferSize(byteCount) { sge.addr = (uintptr_t) bytes; sge.length = 0; sge.lkey = lkey; } - Buffer::~Buffer() { - if (mr) { - (void) ::ibv_dereg_mr(mr); - delete [] bytes(); - } - } - QueuePairEvent::QueuePairEvent() : dir(NONE) {} @@ -169,16 +150,48 @@ namespace Rdma { // Deallocate recv buffer memory if (rmr) delete [] static_cast<char*>(rmr->addr); + // Deallocate recv buffer memory + if (smr) delete [] static_cast<char*>(smr->addr); + // The buffers ptr_deque automatically deletes all the buffers we've allocated } - // Create a buffer to use for writing - Buffer* QueuePair::createBuffer(int s) { - Buffer* b = new Buffer(pd.get(), s); - buffers.push_front(b); + // Create buffers to use for writing + void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize) + { + assert(!smr); + + // Round up buffersize to cacheline (64 bytes) + bufferSize = (bufferSize+63) & (~63); + + // Allocate memory block for all receive buffers + char* mem = new char [sendBufferCount * bufferSize]; + smr = regMr(pd.get(), mem, sendBufferCount * bufferSize, ::IBV_ACCESS_LOCAL_WRITE); + for (int i = 0; i<sendBufferCount; ++i) { + // Allocate xmit buffer + Buffer* b = new Buffer(smr->lkey, &mem[i*bufferSize], bufferSize); + buffers.push_front(b); + bufferQueue.push_back(b); + } + } + + Buffer* QueuePair::getBuffer() { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); + assert(!bufferQueue.empty()); + Buffer* b = bufferQueue.back(); + bufferQueue.pop_back(); return b; } + void QueuePair::returnBuffer(Buffer* b) { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); + bufferQueue.push_back(b); + } + + bool QueuePair::bufferAvailable() const { + return !bufferQueue.empty(); + } + void QueuePair::allocateRecvBuffers(int recvBufferCount, int bufferSize) { assert(!rmr); diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h index 73d133a306..f951dcb0af 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.h +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h @@ -25,11 +25,14 @@ #include "qpid/RefCounted.h" #include "qpid/sys/IOHandle.h" +#include "qpid/sys/Mutex.h" #include <boost/shared_ptr.hpp> #include <boost/intrusive_ptr.hpp> #include <boost/ptr_container/ptr_deque.hpp> +#include <vector> + namespace qpid { namespace sys { class SocketAddress; @@ -53,13 +56,9 @@ namespace Rdma { int32_t dataCount() const; void dataCount(int32_t); - Buffer(::ibv_pd* pd, const int32_t s); - ~Buffer(); - private: Buffer(uint32_t lkey, char* bytes, const int32_t byteCount); const int32_t bufferSize; - ::ibv_mr* mr; ::ibv_sge sge; }; @@ -117,6 +116,7 @@ namespace Rdma { friend class Connection; boost::shared_ptr< ::ibv_pd > pd; + boost::shared_ptr< ::ibv_mr > smr; boost::shared_ptr< ::ibv_mr > rmr; boost::shared_ptr< ::ibv_comp_channel > cchannel; boost::shared_ptr< ::ibv_cq > scq; @@ -125,6 +125,8 @@ namespace Rdma { int outstandingSendEvents; int outstandingRecvEvents; boost::ptr_deque<Buffer> buffers; + qpid::sys::Mutex bufferQueueLock; + std::vector<Buffer*> bufferQueue; QueuePair(boost::shared_ptr< ::rdma_cm_id > id); ~QueuePair(); @@ -132,8 +134,17 @@ namespace Rdma { public: typedef boost::intrusive_ptr<QueuePair> intrusive_ptr; - // Create a buffer to use for writing - Buffer* createBuffer(int s); + // Create a buffers to use for writing + void createSendBuffers(int sendBufferCount, int bufferSize); + + // Get a send buffer + Buffer* getBuffer(); + + // Return buffer to pool after use + void returnBuffer(Buffer* b); + + // Check whether any buffers are available + bool bufferAvailable() const; // Create and post recv buffers void allocateRecvBuffers(int recvBufferCount, int bufferSize); |