summary | refs | log | tree | commit | diff
path: root/cpp/src
diff options
context:
space:
mode:
author: Andrew Stitcher <astitcher@apache.org> 2008-09-19 14:15:54 +0000
committer: Andrew Stitcher <astitcher@apache.org> 2008-09-19 14:15:54 +0000
commit: 7afcca6c7f08224ad5c7b44d8215f4c68c08dd65 (patch)
tree: a5e74c27844ce87319f7f2c482ce4beddc7bb0c5 /cpp/src
parent: efa73f05b8b5e300c46ff6ab78c334f5d8b7fa2b (diff)
download: qpid-python-7afcca6c7f08224ad5c7b44d8215f4c68c08dd65.tar.gz
RDMA bugfixes:
- Changed Rdma connection creation to allocate all necessary buffer memory immediately. As a result, no buffer allocations (which can fail) happen later, so a connection that has been accepted won't subsequently fail because of a lack of locked memory.
- Fixed connection logic so that we reject a new connection if we can't create the necessary handlers (this includes not having enough locked memory) rather than killing the entire broker.

git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697101 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- cpp/src/qpid/client/RdmaConnector.cpp | 3
-rw-r--r-- cpp/src/qpid/sys/RdmaIOPlugin.cpp | 47
-rw-r--r-- cpp/src/qpid/sys/rdma/RdmaIO.cpp | 28
-rw-r--r-- cpp/src/qpid/sys/rdma/RdmaIO.h | 7
4 files changed, 54 insertions, 31 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp
index c0775ab9cd..1a85cbb4bc 100644
--- a/cpp/src/qpid/client/RdmaConnector.cpp
+++ b/cpp/src/qpid/client/RdmaConnector.cpp
@@ -305,6 +305,7 @@ void RdmaConnector::Writer::init(std::string id, Rdma::AsynchIO* a) {
Mutex::ScopedLock l(lock);
identifier = id;
aio = a;
+ assert(aio->bufferAvailable());
newBuffer();
}
void RdmaConnector::Writer::handle(framing::AMQFrame& frame) {
@@ -346,7 +347,7 @@ void RdmaConnector::Writer::write(Rdma::AsynchIO&) {
if (lastEof==0)
return;
size_t bytesWritten = 0;
- while (aio->writable() && !frames.empty()) {
+ while (aio->writable() && aio->bufferAvailable() && !frames.empty()) {
const AMQFrame* frame = &frames.front();
uint32_t size = frame->size();
while (size <= encode.available()) {
diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
index 80475e662d..fc9174c493 100644
--- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp
+++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
@@ -96,7 +96,7 @@ RdmaIOHandler::~RdmaIOHandler() {
void RdmaIOHandler::write(const framing::ProtocolInitiation& data)
{
- QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")");
+ QPID_LOG(debug, "Rdma: SENT [" << identifier << "] INIT(" << data << ")");
Rdma::Buffer* buff = aio->getBuffer();
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
@@ -113,7 +113,9 @@ void RdmaIOHandler::activateOutput() {
}
void RdmaIOHandler::idle(Rdma::AsynchIO&) {
- if (!aio->writable()) {
+ // TODO: Shouldn't need this test as idle() should only ever be called when
+ // the connection is writable anyway
+ if ( !(aio->writable() && aio->bufferAvailable()) ) {
return;
}
if (isClient && codec == 0) {
@@ -138,7 +140,7 @@ void RdmaIOHandler::error(Rdma::AsynchIO&) {
}
void RdmaIOHandler::full(Rdma::AsynchIO&) {
- QPID_LOG(debug, "buffer full [" << identifier << "]");
+ QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]");
}
// The logic here is subtly different from TCP as RDMA is message oriented
@@ -163,7 +165,7 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
decoded = in.getPosition();
- QPID_LOG(debug, "RECV [" << identifier << "] INIT(" << protocolInit << ")");
+ QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
try {
codec = factory->create(protocolInit.getVersion(), *this, identifier);
if (!codec) {
@@ -231,19 +233,28 @@ void RdmaIOProtocolFactory::established(Poller::shared_ptr poller, Rdma::Connect
bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp,
ConnectionCodec::Factory* f) {
- RdmaIOHandler* async = new RdmaIOHandler(ci, f);
- Rdma::AsynchIO* aio =
- new Rdma::AsynchIO(ci->getQueuePair(),
- cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES,
- boost::bind(&RdmaIOHandler::readbuff, async, _1, _2),
- boost::bind(&RdmaIOHandler::idle, async, _1),
- 0, // boost::bind(&RdmaIOHandler::full, async, _1),
- boost::bind(&RdmaIOHandler::error, async, _1));
- async->init(aio);
-
- // Record aio so we can get it back from a connection
- ci->addContext(async);
- return true;
+ try {
+ RdmaIOHandler* async = new RdmaIOHandler(ci, f);
+ Rdma::AsynchIO* aio =
+ new Rdma::AsynchIO(ci->getQueuePair(),
+ cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES,
+ boost::bind(&RdmaIOHandler::readbuff, async, _1, _2),
+ boost::bind(&RdmaIOHandler::idle, async, _1),
+ 0, // boost::bind(&RdmaIOHandler::full, async, _1),
+ boost::bind(&RdmaIOHandler::error, async, _1));
+ async->init(aio);
+
+ // Record aio so we can get it back from a connection
+ ci->addContext(async);
+ return true;
+ } catch (const Rdma::Exception& e) {
+ QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma excepion): " << e.what());
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Rdma: Cannot accept new connection (unknown exception): " << e.what());
+ }
+
+ // If we get here we caught an exception so reject connection
+ return false;
}
void RdmaIOProtocolFactory::connectionError(Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) {
@@ -312,7 +323,7 @@ void RdmaIOProtocolFactory::connect(
string port = ss.str();
int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res);
if (n<0) {
- throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+ throw Exception(QPID_MSG("Rdma: Cannot resolve " << host << ": " << ::gai_strerror(n)));
}
Rdma::Connector c(
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index dd4fbefcaf..e3dc0cbf8f 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -62,11 +62,21 @@ namespace Rdma {
// Prepost some recv buffers before we go any further
for (int i = 0; i<recvBufferCount; ++i) {
+ // Allocate recv buffer
Buffer* b = qp->createBuffer(bufferSize);
buffers.push_front(b);
b->dataCount = b->byteCount;
qp->postRecv(b);
}
+
+ for (int i = 0; i<xmitBufferCount; ++i) {
+ // Allocate xmit buffer
+ Buffer* b = qp->createBuffer(bufferSize);
+ buffers.push_front(b);
+ bufferQueue.push_front(b);
+ b->dataCount = 0;
+ b->dataStart = 0;
+ }
}
AsynchIO::~AsynchIO() {
@@ -378,18 +388,12 @@ namespace Rdma {
Buffer* AsynchIO::getBuffer() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
- if (bufferQueue.empty()) {
- Buffer* b = qp->createBuffer(bufferSize);
- buffers.push_front(b);
- return b;
- } else {
- Buffer* b = bufferQueue.front();
- bufferQueue.pop_front();
- b->dataCount = 0;
- b->dataStart = 0;
- return b;
- }
-
+ assert(!bufferQueue.empty());
+ Buffer* b = bufferQueue.front();
+ bufferQueue.pop_front();
+ b->dataCount = 0;
+ b->dataStart = 0;
+ return b;
}
void AsynchIO::returnBuffer(Buffer* b) {
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h
index 8b1422a1af..29132b8967 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.h
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -65,6 +65,9 @@ namespace Rdma {
ErrorCallback errorCallback;
public:
+ // TODO: Instead of specifying a buffer size specify the amount of memory the AsynchIO class can use
+ // for buffers both read and write (allocate half to each up front) and fail if we cannot allocate that much
+ // locked memory
AsynchIO(
QueuePair::intrusive_ptr q,
int size,
@@ -78,6 +81,7 @@ namespace Rdma {
void start(qpid::sys::Poller::shared_ptr poller);
bool writable() const;
+ bool bufferAvailable() const;
void queueWrite(Buffer* buff);
void notifyPendingWrite();
void queueWriteClose();
@@ -109,6 +113,9 @@ namespace Rdma {
return outstandingWrites;
}
+ inline bool AsynchIO::bufferAvailable() const {
+ return !bufferQueue.empty();
+ }
// These are the parameters necessary to start the conversation
// * Each peer HAS to allocate buffers of the size of the maximum receive from its peer
// * Each peer HAS to know the initial "credit" it has for transmitting to its peer