summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/RdmaConnector.cpp3
-rw-r--r--cpp/src/qpid/sys/RdmaIOPlugin.cpp47
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.cpp28
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.h7
4 files changed, 54 insertions, 31 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp
index c0775ab9cd..1a85cbb4bc 100644
--- a/cpp/src/qpid/client/RdmaConnector.cpp
+++ b/cpp/src/qpid/client/RdmaConnector.cpp
@@ -305,6 +305,7 @@ void RdmaConnector::Writer::init(std::string id, Rdma::AsynchIO* a) {
Mutex::ScopedLock l(lock);
identifier = id;
aio = a;
+ assert(aio->bufferAvailable());
newBuffer();
}
void RdmaConnector::Writer::handle(framing::AMQFrame& frame) {
@@ -346,7 +347,7 @@ void RdmaConnector::Writer::write(Rdma::AsynchIO&) {
if (lastEof==0)
return;
size_t bytesWritten = 0;
- while (aio->writable() && !frames.empty()) {
+ while (aio->writable() && aio->bufferAvailable() && !frames.empty()) {
const AMQFrame* frame = &frames.front();
uint32_t size = frame->size();
while (size <= encode.available()) {
diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
index 80475e662d..fc9174c493 100644
--- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp
+++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
@@ -96,7 +96,7 @@ RdmaIOHandler::~RdmaIOHandler() {
void RdmaIOHandler::write(const framing::ProtocolInitiation& data)
{
- QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")");
+ QPID_LOG(debug, "Rdma: SENT [" << identifier << "] INIT(" << data << ")");
Rdma::Buffer* buff = aio->getBuffer();
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
@@ -113,7 +113,9 @@ void RdmaIOHandler::activateOutput() {
}
void RdmaIOHandler::idle(Rdma::AsynchIO&) {
- if (!aio->writable()) {
+ // TODO: Shouldn't need this test as idle() should only ever be called when
+ // the connection is writable anyway
+ if ( !(aio->writable() && aio->bufferAvailable()) ) {
return;
}
if (isClient && codec == 0) {
@@ -138,7 +140,7 @@ void RdmaIOHandler::error(Rdma::AsynchIO&) {
}
void RdmaIOHandler::full(Rdma::AsynchIO&) {
- QPID_LOG(debug, "buffer full [" << identifier << "]");
+ QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]");
}
// The logic here is subtly different from TCP as RDMA is message oriented
@@ -163,7 +165,7 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
decoded = in.getPosition();
- QPID_LOG(debug, "RECV [" << identifier << "] INIT(" << protocolInit << ")");
+ QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
try {
codec = factory->create(protocolInit.getVersion(), *this, identifier);
if (!codec) {
@@ -231,19 +233,28 @@ void RdmaIOProtocolFactory::established(Poller::shared_ptr poller, Rdma::Connect
bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp,
ConnectionCodec::Factory* f) {
- RdmaIOHandler* async = new RdmaIOHandler(ci, f);
- Rdma::AsynchIO* aio =
- new Rdma::AsynchIO(ci->getQueuePair(),
- cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES,
- boost::bind(&RdmaIOHandler::readbuff, async, _1, _2),
- boost::bind(&RdmaIOHandler::idle, async, _1),
- 0, // boost::bind(&RdmaIOHandler::full, async, _1),
- boost::bind(&RdmaIOHandler::error, async, _1));
- async->init(aio);
-
- // Record aio so we can get it back from a connection
- ci->addContext(async);
- return true;
+ try {
+ RdmaIOHandler* async = new RdmaIOHandler(ci, f);
+ Rdma::AsynchIO* aio =
+ new Rdma::AsynchIO(ci->getQueuePair(),
+ cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES,
+ boost::bind(&RdmaIOHandler::readbuff, async, _1, _2),
+ boost::bind(&RdmaIOHandler::idle, async, _1),
+ 0, // boost::bind(&RdmaIOHandler::full, async, _1),
+ boost::bind(&RdmaIOHandler::error, async, _1));
+ async->init(aio);
+
+ // Record aio so we can get it back from a connection
+ ci->addContext(async);
+ return true;
+ } catch (const Rdma::Exception& e) {
+ QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma excepion): " << e.what());
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Rdma: Cannot accept new connection (unknown exception): " << e.what());
+ }
+
+ // If we get here we caught an exception so reject connection
+ return false;
}
void RdmaIOProtocolFactory::connectionError(Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) {
@@ -312,7 +323,7 @@ void RdmaIOProtocolFactory::connect(
string port = ss.str();
int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res);
if (n<0) {
- throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+ throw Exception(QPID_MSG("Rdma: Cannot resolve " << host << ": " << ::gai_strerror(n)));
}
Rdma::Connector c(
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index dd4fbefcaf..e3dc0cbf8f 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -62,11 +62,21 @@ namespace Rdma {
// Prepost some recv buffers before we go any further
for (int i = 0; i<recvBufferCount; ++i) {
+ // Allocate recv buffer
Buffer* b = qp->createBuffer(bufferSize);
buffers.push_front(b);
b->dataCount = b->byteCount;
qp->postRecv(b);
}
+
+ for (int i = 0; i<xmitBufferCount; ++i) {
+ // Allocate xmit buffer
+ Buffer* b = qp->createBuffer(bufferSize);
+ buffers.push_front(b);
+ bufferQueue.push_front(b);
+ b->dataCount = 0;
+ b->dataStart = 0;
+ }
}
AsynchIO::~AsynchIO() {
@@ -378,18 +388,12 @@ namespace Rdma {
Buffer* AsynchIO::getBuffer() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
- if (bufferQueue.empty()) {
- Buffer* b = qp->createBuffer(bufferSize);
- buffers.push_front(b);
- return b;
- } else {
- Buffer* b = bufferQueue.front();
- bufferQueue.pop_front();
- b->dataCount = 0;
- b->dataStart = 0;
- return b;
- }
-
+ assert(!bufferQueue.empty());
+ Buffer* b = bufferQueue.front();
+ bufferQueue.pop_front();
+ b->dataCount = 0;
+ b->dataStart = 0;
+ return b;
}
void AsynchIO::returnBuffer(Buffer* b) {
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h
index 8b1422a1af..29132b8967 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.h
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -65,6 +65,9 @@ namespace Rdma {
ErrorCallback errorCallback;
public:
+ // TODO: Instead of specifying a buffer size specify the amount of memory the AsynchIO class can use
+ // for buffers both read and write (allocate half to each up front) and fail if we cannot allocate that much
+ // locked memory
AsynchIO(
QueuePair::intrusive_ptr q,
int size,
@@ -78,6 +81,7 @@ namespace Rdma {
void start(qpid::sys::Poller::shared_ptr poller);
bool writable() const;
+ bool bufferAvailable() const;
void queueWrite(Buffer* buff);
void notifyPendingWrite();
void queueWriteClose();
@@ -109,6 +113,9 @@ namespace Rdma {
return outstandingWrites;
}
+ inline bool AsynchIO::bufferAvailable() const {
+ return !bufferQueue.empty();
+ }
// These are the parameters necessary to start the conversation
// * Each peer HAS to allocate buffers of the size of the maximum receive from its peer
// * Each peer HAS to know the initial "credit" it has for transmitting to its peer