summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2010-12-23 17:11:09 +0000
committerAndrew Stitcher <astitcher@apache.org>2010-12-23 17:11:09 +0000
commit632183f9ca6cf81a5720d0205fa4d410a7350a8c (patch)
treea6709d1545f26ad68523b8e14b021df2fe25c5e8 /cpp/src
parent6580b2f89fe3e2b5aae21bab83ffaefa38d04eff (diff)
downloadqpid-python-632183f9ca6cf81a5720d0205fa4d410a7350a8c.tar.gz
Factored rdma sending/receiving code out to make manipulating
credit isolated git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1052325 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.cpp61
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.h5
2 files changed, 42 insertions, 24 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index b47165a302..e082fdc416 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -172,18 +172,45 @@ namespace Rdma {
notifyCallback = nc;
}
+ void AsynchIO::queueBuffer(Buffer* buff, int credit) {
+ if (!buff) {
+ Buffer* ob = getBuffer();
+ // Have to send something as adapters hate it when you try to transfer 0 bytes
+ *reinterpret_cast< uint32_t* >(ob->bytes()) = htonl(credit);
+ ob->dataCount(sizeof(uint32_t));
+ qp->postSend(credit | IgnoreData, ob);
+ } else if (credit > 0) {
+ qp->postSend(credit, buff);
+ } else {
+ qp->postSend(buff);
+ }
+ }
+
+ Buffer* AsynchIO::extractBuffer(const QueuePairEvent& e) {
+ // Get our xmitCredit if it was sent
+ bool dataPresent = true;
+ if (e.immPresent() ) {
+ assert(xmitCredit>=0);
+ xmitCredit += (e.getImm() & ~FlagsMask);
+ dataPresent = ((e.getImm() & IgnoreData) == 0);
+ assert(xmitCredit>0);
+ }
+
+ Buffer* b = e.getBuffer();
+ if (!dataPresent) {
+ b->dataCount(0);
+ }
+ return b;
+ }
+
void AsynchIO::queueWrite(Buffer* buff) {
// Make sure we don't overrun our available buffers
// either at our end or the known available at the peers end
if (writable()) {
// TODO: We might want to batch up sending credit
- if (recvCredit > 0) {
- int creditSent = recvCredit & ~FlagsMask;
- qp->postSend(creditSent, buff);
- recvCredit -= creditSent;
- } else {
- qp->postSend(buff);
- }
+ int creditSent = recvCredit & ~FlagsMask;
+ queueBuffer(buff, creditSent);
+ recvCredit -= creditSent;
++outstandingWrites;
--xmitCredit;
assert(xmitCredit>=0);
@@ -315,22 +342,14 @@ namespace Rdma {
// Test if recv (or recv with imm)
//::ibv_wc_opcode eventType = e.getEventType();
- Buffer* b = e.getBuffer();
QueueDirection dir = e.getDirection();
if (dir == RECV) {
++recvEvents;
- // Get our xmitCredit if it was sent
- bool dataPresent = true;
- if (e.immPresent() ) {
- assert(xmitCredit>=0);
- xmitCredit += (e.getImm() & ~FlagsMask);
- dataPresent = ((e.getImm() & IgnoreData) == 0);
- assert(xmitCredit>0);
- }
+ Buffer* b = extractBuffer(e);
// if there was no data sent then the message was only to update our credit
- if ( dataPresent ) {
+ if ( b->dataCount() > 0 ) {
readCallback(*this, b);
}
@@ -347,13 +366,8 @@ namespace Rdma {
// but this is a little unlikely, as to get in this state we have to have received messages without sending any
// for a while so its likely we've received an credit update from the far side.
if (writable()) {
- Buffer* ob = getBuffer();
- // Have to send something as adapters hate it when you try to transfer 0 bytes
- *reinterpret_cast< uint32_t* >(ob->bytes()) = htonl(recvCredit);
- ob->dataCount(sizeof(uint32_t));
-
int creditSent = recvCredit & ~FlagsMask;
- qp->postSend(creditSent | IgnoreData, ob);
+ queueBuffer(0, creditSent);
recvCredit -= creditSent;
++outstandingWrites;
--xmitCredit;
@@ -363,6 +377,7 @@ namespace Rdma {
}
}
} else {
+ Buffer* b = e.getBuffer();
++sendEvents;
returnBuffer(b);
--outstandingWrites;
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h
index 3a543e8ad6..d5da9ee7fe 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.h
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -103,7 +103,7 @@ namespace Rdma {
// Constants for the peer-peer command messages
// These are sent in the high bits if the imm data of an rdma message
// The low bits are used to send the credit
- const static int FlagsMask = 0x10000000; // Mask for all flag bits - be sure to update this if you add more command bits
+ const static int FlagsMask = 0xF0000000; // Mask for all flag bits - be sure to update this if you add more command bits
const static int IgnoreData = 0x10000000; // Message contains no application data
void dataEvent();
@@ -112,6 +112,9 @@ namespace Rdma {
void doWriteCallback();
void checkDrained();
void doStoppedCallback();
+
+ void queueBuffer(Buffer* buff, int credit);
+ Buffer* extractBuffer(const QueuePairEvent& e);
};
// We're only writable if: