diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/RdmaConnector.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/sys/RdmaIOPlugin.cpp | 47 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 28 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.h | 7 |
4 files changed, 54 insertions, 31 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp index c0775ab9cd..1a85cbb4bc 100644 --- a/cpp/src/qpid/client/RdmaConnector.cpp +++ b/cpp/src/qpid/client/RdmaConnector.cpp @@ -305,6 +305,7 @@ void RdmaConnector::Writer::init(std::string id, Rdma::AsynchIO* a) { Mutex::ScopedLock l(lock); identifier = id; aio = a; + assert(aio->bufferAvailable()); newBuffer(); } void RdmaConnector::Writer::handle(framing::AMQFrame& frame) { @@ -346,7 +347,7 @@ void RdmaConnector::Writer::write(Rdma::AsynchIO&) { if (lastEof==0) return; size_t bytesWritten = 0; - while (aio->writable() && !frames.empty()) { + while (aio->writable() && aio->bufferAvailable() && !frames.empty()) { const AMQFrame* frame = &frames.front(); uint32_t size = frame->size(); while (size <= encode.available()) { diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp index 80475e662d..fc9174c493 100644 --- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -96,7 +96,7 @@ RdmaIOHandler::~RdmaIOHandler() { void RdmaIOHandler::write(const framing::ProtocolInitiation& data) { - QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")"); + QPID_LOG(debug, "Rdma: SENT [" << identifier << "] INIT(" << data << ")"); Rdma::Buffer* buff = aio->getBuffer(); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); @@ -113,7 +113,9 @@ void RdmaIOHandler::activateOutput() { } void RdmaIOHandler::idle(Rdma::AsynchIO&) { - if (!aio->writable()) { + // TODO: Shouldn't need this test as idle() should only ever be called when + // the connection is writable anyway + if ( !(aio->writable() && aio->bufferAvailable()) ) { return; } if (isClient && codec == 0) { @@ -138,7 +140,7 @@ void RdmaIOHandler::error(Rdma::AsynchIO&) { } void RdmaIOHandler::full(Rdma::AsynchIO&) { - QPID_LOG(debug, "buffer full [" << identifier << "]"); + QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]"); } // The logic here is subtly different from TCP as RDMA is message oriented @@ -163,7 +165,7 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { decoded = in.getPosition(); - QPID_LOG(debug, "RECV [" << identifier << "] INIT(" << protocolInit << ")"); + QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")"); try { codec = factory->create(protocolInit.getVersion(), *this, identifier); if (!codec) { @@ -231,19 +233,28 @@ void RdmaIOProtocolFactory::established(Poller::shared_ptr poller, Rdma::Connect bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp, ConnectionCodec::Factory* f) { - RdmaIOHandler* async = new RdmaIOHandler(ci, f); - Rdma::AsynchIO* aio = - new Rdma::AsynchIO(ci->getQueuePair(), - cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES, - boost::bind(&RdmaIOHandler::readbuff, async, _1, _2), - boost::bind(&RdmaIOHandler::idle, async, _1), - 0, // boost::bind(&RdmaIOHandler::full, async, _1), - boost::bind(&RdmaIOHandler::error, async, _1)); - async->init(aio); - - // Record aio so we can get it back from a connection - ci->addContext(async); - return true; + try { + RdmaIOHandler* async = new RdmaIOHandler(ci, f); + Rdma::AsynchIO* aio = + new Rdma::AsynchIO(ci->getQueuePair(), + cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES, + boost::bind(&RdmaIOHandler::readbuff, async, _1, _2), + boost::bind(&RdmaIOHandler::idle, async, _1), + 0, // boost::bind(&RdmaIOHandler::full, async, _1), + boost::bind(&RdmaIOHandler::error, async, _1)); + async->init(aio); + + // Record aio so we can get it back from a connection + ci->addContext(async); + return true; + } catch (const Rdma::Exception& e) { + QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma excepion): " << e.what()); + } catch (const std::exception& e) { + QPID_LOG(error, "Rdma: Cannot accept new connection (unknown exception): " << e.what()); + } + + // If we get here we caught an exception so reject connection + return false; } void RdmaIOProtocolFactory::connectionError(Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) { @@ -312,7 +323,7 @@ void RdmaIOProtocolFactory::connect( string port = ss.str(); int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res); if (n<0) { - throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n))); + throw Exception(QPID_MSG("Rdma: Cannot resolve " << host << ": " << ::gai_strerror(n))); } Rdma::Connector c( diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index dd4fbefcaf..e3dc0cbf8f 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -62,11 +62,21 @@ namespace Rdma { // Prepost some recv buffers before we go any further for (int i = 0; i<recvBufferCount; ++i) { + // Allocate recv buffer Buffer* b = qp->createBuffer(bufferSize); buffers.push_front(b); b->dataCount = b->byteCount; qp->postRecv(b); } + + for (int i = 0; i<xmitBufferCount; ++i) { + // Allocate xmit buffer + Buffer* b = qp->createBuffer(bufferSize); + buffers.push_front(b); + bufferQueue.push_front(b); + b->dataCount = 0; + b->dataStart = 0; + } } AsynchIO::~AsynchIO() { @@ -378,18 +388,12 @@ namespace Rdma { Buffer* AsynchIO::getBuffer() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); - if (bufferQueue.empty()) { - Buffer* b = qp->createBuffer(bufferSize); - buffers.push_front(b); - return b; - } else { - Buffer* b = bufferQueue.front(); - bufferQueue.pop_front(); - b->dataCount = 0; - b->dataStart = 0; - return b; - } - + assert(!bufferQueue.empty()); + Buffer* b = bufferQueue.front(); + bufferQueue.pop_front(); + b->dataCount = 0; + b->dataStart = 0; + return b; } void AsynchIO::returnBuffer(Buffer* b) { diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h index 8b1422a1af..29132b8967 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -65,6 +65,9 @@ namespace Rdma { ErrorCallback errorCallback; public: + // TODO: Instead of specifying a buffer size specify the amount of memory the AsynchIO class can use + // for buffers both read and write (allocate half to each up front) and fail if we cannot allocate that much + // locked memory AsynchIO( QueuePair::intrusive_ptr q, int size, @@ -78,6 +81,7 @@ namespace Rdma { void start(qpid::sys::Poller::shared_ptr poller); bool writable() const; + bool bufferAvailable() const; void queueWrite(Buffer* buff); void notifyPendingWrite(); void queueWriteClose(); @@ -109,6 +113,9 @@ namespace Rdma { return outstandingWrites; } + inline bool AsynchIO::bufferAvailable() const { + return !bufferQueue.empty(); + } // These are the parameters necessary to start the conversation // * Each peer HAS to allocate buffers of the size of the maximum receive from its peer // * Each peer HAS to know the initial "credit" it has for transmitting to its peer |