diff options
author | Andrew Stitcher <astitcher@apache.org> | 2010-12-23 17:11:39 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2010-12-23 17:11:39 +0000 |
commit | 0f8775149a61e04a9e514f28a2bd35766b3ca991 (patch) | |
tree | 40f1425467fcf36341719af343571031e6f699c8 /cpp/src | |
parent | 1eb6ea19fdbc9e5b7e20f5b1b21b8687452b4c71 (diff) | |
download | qpid-python-0f8775149a61e04a9e514f28a2bd35766b3ca991.tar.gz |
Implementation for v1 rdma protocol - append sent credit and flags word
after sent frame (instead of sending it in immediate data).
Small change to send buffer management to support this to 0 dataCount
when returning buffers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1052329 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/sys/RdmaIOPlugin.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 93 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_wrap.cpp | 1 |
4 files changed, 77 insertions, 26 deletions
diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp index 984f4daf89..0e92210313 100644 --- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -298,6 +298,9 @@ void RdmaIOProtocolFactory::established(Poller::shared_ptr poller, Rdma::Connect bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp, ConnectionCodec::Factory* f) { try { + if (cp.rdmaProtocolVersion == 0) { + QPID_LOG(warning, "Rdma: connection from protocol version 0 client"); + } RdmaIOHandler* async = new RdmaIOHandler(ci, f); Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index b5cedf3d70..74ceceffad 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -32,8 +32,30 @@ using qpid::sys::ScopedLock; using qpid::sys::Mutex; namespace Rdma { - // Set packing as this is 'on the wire' structure + // Set packing as these are 'on the wire' structures # pragma pack(push, 1) + + // Header structure for each transmitted frame + struct FrameHeader { + const static uint32_t FlagsMask = 0xf0000000; + uint32_t data; // written in network order + + FrameHeader() {} + FrameHeader(uint32_t credit, uint32_t flags = 0) { + data = htonl((credit & ~FlagsMask) | (flags & FlagsMask)); + } + + uint32_t credit() const { + return ntohl(data) & ~FlagsMask; + } + + uint32_t flags() const { + return ntohl(data) & FlagsMask; + } + }; + + const size_t FrameHeaderSize = sizeof(FrameHeader); + // Structure for Connection Parameters on the network // // The original version (now called 0) of these parameters had a couple of mistakes: @@ -116,10 +138,10 @@ namespace Rdma { qp->notifySend(); // Prepost recv buffers before we go any further - qp->allocateRecvBuffers(recvBufferCount, bufferSize); + qp->allocateRecvBuffers(recvBufferCount, bufferSize+FrameHeaderSize); // Create xmit buffers - qp->createSendBuffers(xmitBufferCount, bufferSize); + qp->createSendBuffers(xmitBufferCount, bufferSize+FrameHeaderSize); } AsynchIO::~AsynchIO() { @@ -173,33 +195,58 @@ namespace Rdma { } void AsynchIO::queueBuffer(Buffer* buff, int credit) { - if (!buff) { - Buffer* ob = getBuffer(); - // Have to send something as adapters hate it when you try to transfer 0 bytes - *reinterpret_cast< uint32_t* >(ob->bytes()) = htonl(credit); - ob->dataCount(sizeof(uint32_t)); - qp->postSend(credit | IgnoreData, ob); - } else if (credit > 0) { - qp->postSend(credit, buff); - } else { - qp->postSend(buff); + switch (protocolVersion) { + case 0: + if (!buff) { + Buffer* ob = getBuffer(); + // Have to send something as adapters hate it when you try to transfer 0 bytes + *reinterpret_cast< uint32_t* >(ob->bytes()) = htonl(credit); + ob->dataCount(sizeof(uint32_t)); + qp->postSend(credit | IgnoreData, ob); + } else if (credit > 0) { + qp->postSend(credit, buff); + } else { + qp->postSend(buff); + } + break; + case 1: + Buffer* ob = buff ? buff : getBuffer(); + // Add FrameHeader after frame data + FrameHeader header(credit); + ::memcpy(ob->bytes()+ob->dataCount(), &header, FrameHeaderSize); + ob->dataCount(ob->dataCount()+FrameHeaderSize); + qp->postSend(ob); + break; } } Buffer* AsynchIO::extractBuffer(const QueuePairEvent& e) { - // Get our xmitCredit if it was sent - bool dataPresent = true; - if (e.immPresent() ) { + Buffer* b = e.getBuffer(); + switch (protocolVersion) { + case 0: { + bool dataPresent = true; + // Get our xmitCredit if it was sent + if (e.immPresent() ) { + assert(xmitCredit>=0); + xmitCredit += (e.getImm() & ~FlagsMask); + dataPresent = ((e.getImm() & IgnoreData) == 0); + assert(xmitCredit>0); + } + if (!dataPresent) { + b->dataCount(0); + } + break; + } + case 1: + b->dataCount(b->dataCount()-FrameHeaderSize); + FrameHeader header; + ::memcpy(&header, b->bytes()+b->dataCount(), FrameHeaderSize); assert(xmitCredit>=0); - xmitCredit += (e.getImm() & ~FlagsMask); - dataPresent = ((e.getImm() & IgnoreData) == 0); - assert(xmitCredit>0); + xmitCredit += header.credit(); + assert(xmitCredit>=0); + break; } - Buffer* b = e.getBuffer(); - if (!dataPresent) { - b->dataCount(0); - } return b; } diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h index d5da9ee7fe..adf27542fb 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -98,7 +98,7 @@ namespace Rdma { void returnBuffer(Buffer*); private: - const static int maxSupportedProtocolVersion = 0; + const static int maxSupportedProtocolVersion = 1; // Constants for the peer-peer command messages // These are sent in the high bits if the imm data of an rdma message @@ -154,8 +154,8 @@ namespace Rdma { uint16_t initialXmitCredit; uint16_t rdmaProtocolVersion; - // Default to protocol version 0 - ConnectionParams(uint32_t s, uint16_t c, uint16_t v = 0) : + // Default to protocol version 1 + ConnectionParams(uint32_t s, uint16_t c, uint16_t v = 1) : maxRecvBufferSize(s), initialXmitCredit(c), rdmaProtocolVersion(v) diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp index 8d5545baa8..ec6e6c6b99 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp @@ -187,6 +187,7 @@ namespace Rdma { assert(!freeBuffers.empty()); Buffer* b = &sendBuffers[freeBuffers.back()]; freeBuffers.pop_back(); + b->dataCount(0); return b; } |