summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/rdma/RdmaIO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaIO.cpp')
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.cpp93
1 files changed, 70 insertions, 23 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index b5cedf3d70..74ceceffad 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -32,8 +32,30 @@ using qpid::sys::ScopedLock;
using qpid::sys::Mutex;
namespace Rdma {
- // Set packing as this is 'on the wire' structure
+ // Set packing as these are 'on the wire' structures
# pragma pack(push, 1)
+
+ // Header structure for each transmitted frame
+ struct FrameHeader {
+ const static uint32_t FlagsMask = 0xf0000000;
+ uint32_t data; // written in network order
+
+ FrameHeader() {}
+ FrameHeader(uint32_t credit, uint32_t flags = 0) {
+ data = htonl((credit & ~FlagsMask) | (flags & FlagsMask));
+ }
+
+ uint32_t credit() const {
+ return ntohl(data) & ~FlagsMask;
+ }
+
+ uint32_t flags() const {
+ return ntohl(data) & FlagsMask;
+ }
+ };
+
+ const size_t FrameHeaderSize = sizeof(FrameHeader);
+
// Structure for Connection Parameters on the network
//
// The original version (now called 0) of these parameters had a couple of mistakes:
@@ -116,10 +138,10 @@ namespace Rdma {
qp->notifySend();
// Prepost recv buffers before we go any further
- qp->allocateRecvBuffers(recvBufferCount, bufferSize);
+ qp->allocateRecvBuffers(recvBufferCount, bufferSize+FrameHeaderSize);
// Create xmit buffers
- qp->createSendBuffers(xmitBufferCount, bufferSize);
+ qp->createSendBuffers(xmitBufferCount, bufferSize+FrameHeaderSize);
}
AsynchIO::~AsynchIO() {
@@ -173,33 +195,58 @@ namespace Rdma {
}
void AsynchIO::queueBuffer(Buffer* buff, int credit) {
- if (!buff) {
- Buffer* ob = getBuffer();
- // Have to send something as adapters hate it when you try to transfer 0 bytes
- *reinterpret_cast< uint32_t* >(ob->bytes()) = htonl(credit);
- ob->dataCount(sizeof(uint32_t));
- qp->postSend(credit | IgnoreData, ob);
- } else if (credit > 0) {
- qp->postSend(credit, buff);
- } else {
- qp->postSend(buff);
+ switch (protocolVersion) {
+ case 0:
+ if (!buff) {
+ Buffer* ob = getBuffer();
+ // Have to send something as adapters hate it when you try to transfer 0 bytes
+ *reinterpret_cast< uint32_t* >(ob->bytes()) = htonl(credit);
+ ob->dataCount(sizeof(uint32_t));
+ qp->postSend(credit | IgnoreData, ob);
+ } else if (credit > 0) {
+ qp->postSend(credit, buff);
+ } else {
+ qp->postSend(buff);
+ }
+ break;
+ case 1:
+ Buffer* ob = buff ? buff : getBuffer();
+ // Add FrameHeader after frame data
+ FrameHeader header(credit);
+ ::memcpy(ob->bytes()+ob->dataCount(), &header, FrameHeaderSize);
+ ob->dataCount(ob->dataCount()+FrameHeaderSize);
+ qp->postSend(ob);
+ break;
}
}
Buffer* AsynchIO::extractBuffer(const QueuePairEvent& e) {
- // Get our xmitCredit if it was sent
- bool dataPresent = true;
- if (e.immPresent() ) {
+ Buffer* b = e.getBuffer();
+ switch (protocolVersion) {
+ case 0: {
+ bool dataPresent = true;
+ // Get our xmitCredit if it was sent
+ if (e.immPresent() ) {
+ assert(xmitCredit>=0);
+ xmitCredit += (e.getImm() & ~FlagsMask);
+ dataPresent = ((e.getImm() & IgnoreData) == 0);
+ assert(xmitCredit>0);
+ }
+ if (!dataPresent) {
+ b->dataCount(0);
+ }
+ break;
+ }
+ case 1:
+ b->dataCount(b->dataCount()-FrameHeaderSize);
+ FrameHeader header;
+ ::memcpy(&header, b->bytes()+b->dataCount(), FrameHeaderSize);
assert(xmitCredit>=0);
- xmitCredit += (e.getImm() & ~FlagsMask);
- dataPresent = ((e.getImm() & IgnoreData) == 0);
- assert(xmitCredit>0);
+ xmitCredit += header.credit();
+ assert(xmitCredit>=0);
+ break;
}
- Buffer* b = e.getBuffer();
- if (!dataPresent) {
- b->dataCount(0);
- }
return b;
}