summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2010-12-23 17:11:39 +0000
committerAndrew Stitcher <astitcher@apache.org>2010-12-23 17:11:39 +0000
commit0f8775149a61e04a9e514f28a2bd35766b3ca991 (patch)
tree40f1425467fcf36341719af343571031e6f699c8 /cpp/src
parent1eb6ea19fdbc9e5b7e20f5b1b21b8687452b4c71 (diff)
downloadqpid-python-0f8775149a61e04a9e514f28a2bd35766b3ca991.tar.gz
Implementation for v1 rdma protocol - append sent credit and flags word
after sent frame (instead of sending it in immediate data). Small change to send buffer management to support this to 0 dataCount when returning buffers. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1052329 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/sys/RdmaIOPlugin.cpp3
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.cpp93
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.h6
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_wrap.cpp1
4 files changed, 77 insertions, 26 deletions
diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
index 984f4daf89..0e92210313 100644
--- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp
+++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
@@ -298,6 +298,9 @@ void RdmaIOProtocolFactory::established(Poller::shared_ptr poller, Rdma::Connect
bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp,
ConnectionCodec::Factory* f) {
try {
+ if (cp.rdmaProtocolVersion == 0) {
+ QPID_LOG(warning, "Rdma: connection from protocol version 0 client");
+ }
RdmaIOHandler* async = new RdmaIOHandler(ci, f);
Rdma::AsynchIO* aio =
new Rdma::AsynchIO(ci->getQueuePair(),
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index b5cedf3d70..74ceceffad 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -32,8 +32,30 @@ using qpid::sys::ScopedLock;
using qpid::sys::Mutex;
namespace Rdma {
- // Set packing as this is 'on the wire' structure
+ // Set packing as these are 'on the wire' structures
# pragma pack(push, 1)
+
+ // Header structure for each transmitted frame
+ struct FrameHeader {
+ const static uint32_t FlagsMask = 0xf0000000;
+ uint32_t data; // written in network order
+
+ FrameHeader() {}
+ FrameHeader(uint32_t credit, uint32_t flags = 0) {
+ data = htonl((credit & ~FlagsMask) | (flags & FlagsMask));
+ }
+
+ uint32_t credit() const {
+ return ntohl(data) & ~FlagsMask;
+ }
+
+ uint32_t flags() const {
+ return ntohl(data) & FlagsMask;
+ }
+ };
+
+ const size_t FrameHeaderSize = sizeof(FrameHeader);
+
// Structure for Connection Parameters on the network
//
// The original version (now called 0) of these parameters had a couple of mistakes:
@@ -116,10 +138,10 @@ namespace Rdma {
qp->notifySend();
// Prepost recv buffers before we go any further
- qp->allocateRecvBuffers(recvBufferCount, bufferSize);
+ qp->allocateRecvBuffers(recvBufferCount, bufferSize+FrameHeaderSize);
// Create xmit buffers
- qp->createSendBuffers(xmitBufferCount, bufferSize);
+ qp->createSendBuffers(xmitBufferCount, bufferSize+FrameHeaderSize);
}
AsynchIO::~AsynchIO() {
@@ -173,33 +195,58 @@ namespace Rdma {
}
void AsynchIO::queueBuffer(Buffer* buff, int credit) {
- if (!buff) {
- Buffer* ob = getBuffer();
- // Have to send something as adapters hate it when you try to transfer 0 bytes
- *reinterpret_cast< uint32_t* >(ob->bytes()) = htonl(credit);
- ob->dataCount(sizeof(uint32_t));
- qp->postSend(credit | IgnoreData, ob);
- } else if (credit > 0) {
- qp->postSend(credit, buff);
- } else {
- qp->postSend(buff);
+ switch (protocolVersion) {
+ case 0:
+ if (!buff) {
+ Buffer* ob = getBuffer();
+ // Have to send something as adapters hate it when you try to transfer 0 bytes
+ *reinterpret_cast< uint32_t* >(ob->bytes()) = htonl(credit);
+ ob->dataCount(sizeof(uint32_t));
+ qp->postSend(credit | IgnoreData, ob);
+ } else if (credit > 0) {
+ qp->postSend(credit, buff);
+ } else {
+ qp->postSend(buff);
+ }
+ break;
+ case 1:
+ Buffer* ob = buff ? buff : getBuffer();
+ // Add FrameHeader after frame data
+ FrameHeader header(credit);
+ ::memcpy(ob->bytes()+ob->dataCount(), &header, FrameHeaderSize);
+ ob->dataCount(ob->dataCount()+FrameHeaderSize);
+ qp->postSend(ob);
+ break;
}
}
Buffer* AsynchIO::extractBuffer(const QueuePairEvent& e) {
- // Get our xmitCredit if it was sent
- bool dataPresent = true;
- if (e.immPresent() ) {
+ Buffer* b = e.getBuffer();
+ switch (protocolVersion) {
+ case 0: {
+ bool dataPresent = true;
+ // Get our xmitCredit if it was sent
+ if (e.immPresent() ) {
+ assert(xmitCredit>=0);
+ xmitCredit += (e.getImm() & ~FlagsMask);
+ dataPresent = ((e.getImm() & IgnoreData) == 0);
+ assert(xmitCredit>0);
+ }
+ if (!dataPresent) {
+ b->dataCount(0);
+ }
+ break;
+ }
+ case 1:
+ b->dataCount(b->dataCount()-FrameHeaderSize);
+ FrameHeader header;
+ ::memcpy(&header, b->bytes()+b->dataCount(), FrameHeaderSize);
assert(xmitCredit>=0);
- xmitCredit += (e.getImm() & ~FlagsMask);
- dataPresent = ((e.getImm() & IgnoreData) == 0);
- assert(xmitCredit>0);
+ xmitCredit += header.credit();
+ assert(xmitCredit>=0);
+ break;
}
- Buffer* b = e.getBuffer();
- if (!dataPresent) {
- b->dataCount(0);
- }
return b;
}
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h
index d5da9ee7fe..adf27542fb 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.h
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -98,7 +98,7 @@ namespace Rdma {
void returnBuffer(Buffer*);
private:
- const static int maxSupportedProtocolVersion = 0;
+ const static int maxSupportedProtocolVersion = 1;
// Constants for the peer-peer command messages
// These are sent in the high bits if the imm data of an rdma message
@@ -154,8 +154,8 @@ namespace Rdma {
uint16_t initialXmitCredit;
uint16_t rdmaProtocolVersion;
- // Default to protocol version 0
- ConnectionParams(uint32_t s, uint16_t c, uint16_t v = 0) :
+ // Default to protocol version 1
+ ConnectionParams(uint32_t s, uint16_t c, uint16_t v = 1) :
maxRecvBufferSize(s),
initialXmitCredit(c),
rdmaProtocolVersion(v)
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
index 8d5545baa8..ec6e6c6b99 100644
--- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
+++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
@@ -187,6 +187,7 @@ namespace Rdma {
assert(!freeBuffers.empty());
Buffer* b = &sendBuffers[freeBuffers.back()];
freeBuffers.pop_back();
+ b->dataCount(0);
return b;
}