diff options
author | Andrew Stitcher <astitcher@apache.org> | 2010-12-23 17:11:48 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2010-12-23 17:11:48 +0000 |
commit | ff152807775cf3ad146742a59bbe44146cbb9a34 (patch) | |
tree | 19f87a8d23341e1d14ff5138d4f968867db174cd | |
parent | 0f8775149a61e04a9e514f28a2bd35766b3ca991 (diff) | |
download | qpid-python-ff152807775cf3ad146742a59bbe44146cbb9a34.tar.gz |
Changes due to review comments from Doug Ledford:
- Removed lock unsafe operation Rdma::QueuePair::bufferAvailable()
and replaced the unavailable case with failing getBuffer().
- Improved asserts in the Rdma::QueuePair::getBuffer() code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1052330 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/client/RdmaConnector.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/sys/RdmaIOPlugin.cpp | 17 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaClient.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaServer.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_wrap.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_wrap.h | 3 |
7 files changed, 34 insertions, 31 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp index 313a99df68..e08ced0abc 100644 --- a/cpp/src/qpid/client/RdmaConnector.cpp +++ b/cpp/src/qpid/client/RdmaConnector.cpp @@ -349,14 +349,16 @@ void RdmaConnector::send(AMQFrame& frame) { void RdmaConnector::writebuff(Rdma::AsynchIO&) { // It's possible to be disconnected and be writable Mutex::ScopedLock l(dataConnectedLock); - if (!dataConnected) + if (!dataConnected) { return; - + } Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; - if (codec->canEncode()) { - Rdma::Buffer* buffer = aio->getBuffer(); + if (!codec->canEncode()) { + return; + } + Rdma::Buffer* buffer = aio->getBuffer(); + if (buffer) { size_t encoded = codec->encode(buffer->bytes(), buffer->byteCount()); - buffer->dataCount(encoded); aio->queueWrite(buffer); } @@ -366,7 +368,7 @@ bool RdmaConnector::canEncode() { Mutex::ScopedLock l(lock); //have at least one full frameset or a whole buffers worth of data - return aio->writable() && aio->bufferAvailable() && (lastEof || currentSize >= maxFrameSize); + return aio->writable() && (lastEof || currentSize >= maxFrameSize); } size_t RdmaConnector::encode(const char* buffer, size_t size) diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp index 0e92210313..c2ea815d31 100644 --- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -117,6 +117,7 @@ void RdmaIOHandler::write(const framing::ProtocolInitiation& data) { QPID_LOG(debug, "Rdma: SENT [" << identifier << "] INIT(" << data << ")"); Rdma::Buffer* buff = aio->getBuffer(); + assert(buff); framing::Buffer out(buff->bytes(), buff->byteCount()); data.encode(out); buff->dataCount(data.encodedSize()); @@ -138,25 +139,29 @@ void RdmaIOHandler::activateOutput() { void RdmaIOHandler::idle(Rdma::AsynchIO&) { // TODO: Shouldn't need this test as idle() should only ever be called when // the connection is writable anyway - if ( !(aio->writable() && aio->bufferAvailable()) ) { + if ( !aio->writable() ) { return; } if (codec == 0) return; - if (codec->canEncode()) { - Rdma::Buffer* buff = aio->getBuffer(); + if (!codec->canEncode()) { + return; + } + Rdma::Buffer* buff = aio->getBuffer(); + if (buff) { size_t encoded=codec->encode(buff->bytes(), buff->byteCount()); buff->dataCount(encoded); aio->queueWrite(buff); + if (codec->isClosed()) { + close(); + } } - if (codec->isClosed()) - close(); } void RdmaIOHandler::initProtocolOut() { // We mustn't have already started the conversation // but we must be able to send assert( codec == 0 ); - assert( aio->writable() && aio->bufferAvailable() ); + assert( aio->writable() ); codec = factory->create(*this, identifier, SecuritySettings()); write(framing::ProtocolInitiation(codec->getVersion())); } diff --git a/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/cpp/src/qpid/sys/rdma/RdmaClient.cpp index 67c672f857..e53ebb0520 100644 --- a/cpp/src/qpid/sys/rdma/RdmaClient.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaClient.cpp @@ -93,8 +93,9 @@ Xor128Generator output; Xor128Generator input; void write(Rdma::AsynchIO& aio) { - while (aio.writable() && aio.bufferAvailable() && smsgs < target) { + while (aio.writable() && smsgs < target) { Rdma::Buffer* b = aio.getBuffer(); + if (!b) break; b->dataCount(msgsize); uint32_t* ip = reinterpret_cast<uint32_t*>(b->bytes()); uint32_t* lip = ip + b->dataCount() / sizeof(uint32_t); diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h index adf27542fb..330c2395bd 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -87,7 +87,6 @@ namespace Rdma { void start(qpid::sys::Poller::shared_ptr poller); bool writable() const; - bool bufferAvailable() const; void queueWrite(Buffer* buff); void notifyPendingWrite(); void drainWriteQueue(NotifyCallback); @@ -134,10 +133,6 @@ namespace Rdma { return outstandingWrites; } - inline bool AsynchIO::bufferAvailable() const { - return qp->bufferAvailable(); - } - inline Buffer* AsynchIO::getBuffer() { return qp->getBuffer(); } diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp index d924c388ec..33bb8247a1 100644 --- a/cpp/src/qpid/sys/rdma/RdmaServer.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp @@ -79,10 +79,11 @@ void dataError(Rdma::AsynchIO&) { void idle(ConRec* cr, Rdma::AsynchIO& a) { // Need to make sure full is not called as it would reorder messages - while (!cr->queuedWrites.empty() && a.writable() && a.bufferAvailable()) { + while (!cr->queuedWrites.empty() && a.writable()) { + Rdma::Buffer* rbuf = a.getBuffer(); + if (!rbuf) break; Buffer* buf = cr->queuedWrites.front(); cr->queuedWrites.pop(); - Rdma::Buffer* rbuf = a.getBuffer(); std::copy(buf->bytes(), buf->bytes()+buf->byteCount(), rbuf->bytes()); rbuf->dataCount(buf->byteCount()); delete buf; @@ -92,8 +93,11 @@ void idle(ConRec* cr, Rdma::AsynchIO& a) { void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) { // Echo data back - if (cr->queuedWrites.empty() && a.writable() && a.bufferAvailable()) { - Rdma::Buffer* buf = a.getBuffer(); + Rdma::Buffer* buf = 0; + if (cr->queuedWrites.empty() && a.writable()) { + buf = a.getBuffer(); + } + if (buf) { std::copy(b->bytes(), b->bytes()+b->dataCount(), buf->bytes()); buf->dataCount(b->dataCount()); a.queueWrite(buf); diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp index ec6e6c6b99..a51244a7dc 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp @@ -184,9 +184,12 @@ namespace Rdma { Buffer* QueuePair::getBuffer() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferLock); - assert(!freeBuffers.empty()); - Buffer* b = &sendBuffers[freeBuffers.back()]; + if (freeBuffers.empty()) + return 0; + int i = freeBuffers.back(); freeBuffers.pop_back(); + assert(i >= 0 && i < int(sendBuffers.size())); + Buffer* b = &sendBuffers[i]; b->dataCount(0); return b; } @@ -198,10 +201,6 @@ namespace Rdma { freeBuffers.push_back(i); } - bool QueuePair::bufferAvailable() const { - return !freeBuffers.empty(); - } - void QueuePair::allocateRecvBuffers(int recvBufferCount, int bufferSize) { assert(!rmr); diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h index 1d72abcd03..a3cd584102 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.h +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h @@ -144,9 +144,6 @@ namespace Rdma { // Return buffer to pool after use void returnBuffer(Buffer* b); - // Check whether any buffers are available - bool bufferAvailable() const; - // Create and post recv buffers void allocateRecvBuffers(int recvBufferCount, int bufferSize); |