summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/sys/rdma
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/sys/rdma')
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp247
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp720
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaIO.h250
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp210
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/rdma_exception.h69
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp105
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/rdma_factories.h40
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp566
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h287
9 files changed, 2494 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
new file mode 100644
index 0000000000..38e9b59541
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
@@ -0,0 +1,247 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/rdma/RdmaIO.h"
+#include "qpid/sys/rdma/rdma_exception.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Thread.h"
+
+#include <netdb.h>
+#include <arpa/inet.h>
+
+#include <vector>
+#include <string>
+#include <iostream>
+#include <algorithm>
+#include <cmath>
+#include <boost/bind.hpp>
+
+using std::vector;
+using std::string;
+using std::cout;
+using std::cerr;
+using std::copy;
+using std::rand;
+
+using qpid::sys::Thread;
+using qpid::sys::Poller;
+using qpid::sys::Dispatcher;
+using qpid::sys::SocketAddress;
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::TIME_SEC;
+using qpid::sys::TIME_INFINITE;
+
+namespace qpid {
+namespace tests {
+
+// count of messages
+int64_t smsgs = 0;
+int64_t sbytes = 0;
+int64_t rmsgs = 0;
+int64_t rbytes = 0;
+
+int target = 1000000;
+int msgsize = 200;
+AbsTime startTime;
+Duration sendingDuration(TIME_INFINITE);
+Duration fullTestDuration(TIME_INFINITE);
+
+// Random generator
+// This is an RNG designed by George Marsaglia see http://en.wikipedia.org/wiki/Xorshift
+class Xor128Generator {
+ uint32_t x;
+ uint32_t y;
+ uint32_t z;
+ uint32_t w;
+
+public:
+ Xor128Generator() :
+ x(123456789),y(362436069),z(521288629),w(88675123)
+ {++(*this);}
+
+ Xor128Generator& operator++() {
+ uint32_t t = x ^ (x << 11);
+ x = y; y = z; z = w;
+ w = w ^ (w >> 19) ^ t ^ (t >> 8);
+ return *this;
+ }
+
+ uint32_t operator*() {
+ return w;
+ }
+};
+
+Xor128Generator output;
+Xor128Generator input;
+
+void write(Rdma::AsynchIO& aio) {
+ while (aio.writable() && smsgs < target) {
+ Rdma::Buffer* b = aio.getSendBuffer();
+ if (!b) break;
+ b->dataCount(msgsize);
+ uint32_t* ip = reinterpret_cast<uint32_t*>(b->bytes());
+ uint32_t* lip = ip + b->dataCount() / sizeof(uint32_t);
+ while (ip != lip) {*ip++ = *output; ++output;}
+ aio.queueWrite(b);
+ ++smsgs;
+ sbytes += msgsize;
+ }
+}
+
+void dataError(Rdma::AsynchIO&) {
+ cout << "Data error:\n";
+}
+
+void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) {
+ ++rmsgs;
+ rbytes += b->dataCount();
+
+ // Check message is unaltered
+ bool match = true;
+ uint32_t* ip = reinterpret_cast<uint32_t*>(b->bytes());
+ uint32_t* lip = ip + b->dataCount() / sizeof(uint32_t);
+ while (ip != lip) { if (*ip++ != *input) {match = false; break;} ++input;}
+ if (!match) {
+ cout << "Data doesn't match: at msg " << rmsgs << " byte " << rbytes-b->dataCount() << " (ish)\n";
+ exit(1);
+ }
+
+ // When all messages have been recvd stop
+ if (rmsgs < target) {
+ write(aio);
+ } else {
+ fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now()));
+ if (aio.incompletedWrites() == 0)
+ p->shutdown();
+ }
+}
+
+void full(Rdma::AsynchIO& a, Rdma::Buffer* b) {
+ // Warn as we shouldn't get here anymore
+ cerr << "!";
+
+ // Don't need to keep buffer just adjust the counts
+ --smsgs;
+ sbytes -= b->dataCount();
+
+ // Give buffer back
+ a.returnSendBuffer(b);
+}
+
+void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) {
+ if (smsgs < target) {
+ write(aio);
+ } else {
+ sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now()));
+ if (rmsgs >= target && aio.incompletedWrites() == 0)
+ p->shutdown();
+ }
+}
+
+void drained(Rdma::AsynchIO&) {
+ cout << "Drained:\n";
+}
+
+void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) {
+ cout << "Connected\n";
+ Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair();
+
+ Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(),
+ cp.rdmaProtocolVersion,
+ cp.maxRecvBufferSize, cp.initialXmitCredit , Rdma::DEFAULT_WR_ENTRIES,
+ boost::bind(&data, poller, _1, _2),
+ boost::bind(&idle, poller, _1),
+ &full,
+ dataError);
+
+ startTime = AbsTime::now();
+ write(*aio);
+
+ aio->start(poller);
+}
+
+void disconnected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) {
+ cout << "Disconnected\n";
+ p->shutdown();
+}
+
+void connectionError(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ErrorType) {
+ cout << "Connection error\n";
+ p->shutdown();
+}
+
+void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&) {
+ cout << "Connection rejected\n";
+ p->shutdown();
+}
+
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
+int main(int argc, char* argv[]) {
+ vector<string> args(&argv[0], &argv[argc]);
+
+ string host = args[1];
+ string port = (args.size() < 3) ? "20079" : args[2];
+
+ if (args.size() > 3)
+ msgsize = atoi(args[3].c_str());
+ cout << "Message size: " << msgsize << "\n";
+
+ try {
+ boost::shared_ptr<Poller> p(new Poller());
+
+ Rdma::Connector c(
+ Rdma::ConnectionParams(msgsize, Rdma::DEFAULT_WR_ENTRIES),
+ boost::bind(&connected, p, _1, _2),
+ boost::bind(&connectionError, p, _1, _2),
+ boost::bind(&disconnected, p, _1),
+ boost::bind(&rejected, p, _1, _2));
+
+ SocketAddress sa(host, port);
+ cout << "Connecting to: " << sa.asString() <<"\n";
+ c.start(p, sa);
+
+ // The poller loop blocks all signals so run in its own thread
+ Thread t(*p);
+ t.join();
+ } catch (Rdma::Exception& e) {
+ int err = e.getError();
+ cerr << "Error: " << e.what() << "(" << err << ")\n";
+ }
+
+ cout
+ << "Sent: " << smsgs
+ << "msgs (" << sbytes
+ << "bytes) in: " << double(sendingDuration)/TIME_SEC
+ << "s: " << double(smsgs)*TIME_SEC/sendingDuration
+ << "msgs/s(" << double(sbytes)*TIME_SEC/sendingDuration
+ << "bytes/s)\n";
+ cout
+ << "Recd: " << rmsgs
+ << "msgs (" << rbytes
+ << "bytes) in: " << double(fullTestDuration)/TIME_SEC
+ << "s: " << double(rmsgs)*TIME_SEC/fullTestDuration
+ << "msgs/s(" << double(rbytes)*TIME_SEC/fullTestDuration
+ << "bytes/s)\n";
+
+}
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
new file mode 100644
index 0000000000..78bcdec68e
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -0,0 +1,720 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/rdma/RdmaIO.h"
+
+#include "qpid/log/Statement.h"
+
+#include <string>
+#include <boost/bind.hpp>
+
+using qpid::sys::SocketAddress;
+using qpid::sys::DispatchHandle;
+using qpid::sys::Poller;
+using qpid::sys::ScopedLock;
+using qpid::sys::Mutex;
+
+namespace Rdma {
+ // Set packing as these are 'on the wire' structures
+# pragma pack(push, 1)
+
+ // Header structure for each transmitted frame
+ struct FrameHeader {
+ const static uint32_t FlagsMask = 0xf0000000;
+ uint32_t data; // written in network order
+
+ FrameHeader() {}
+ FrameHeader(uint32_t credit, uint32_t flags = 0) {
+ data = htonl((credit & ~FlagsMask) | (flags & FlagsMask));
+ }
+
+ uint32_t credit() const {
+ return ntohl(data) & ~FlagsMask;
+ }
+
+ uint32_t flags() const {
+ return ntohl(data) & FlagsMask;
+ }
+ };
+
+ const size_t FrameHeaderSize = sizeof(FrameHeader);
+
+ // Structure for Connection Parameters on the network
+ //
+ // The original version (now called 0) of these parameters had a couple of mistakes:
+ // * No way to version the protocol (need to introduce a new protocol for iWarp)
+ // * Used host order int32 (but only deployed on LE archs as far as we know)
+ // so effectively was LE on the wire which is the opposite of network order.
+ //
+ // Fortunately the values sent were sufficiently restricted that a 16 bit short could
+ // be carved out to indicate the protocol version as these bits were always sent as 0.
+ //
+ // So the current version of parameters uses the last 2 bytes to indicate the protocol
+ // version, if this is 0 then we interpret the rest of the struct without byte swapping
+ // to remain compatible with the previous protocol.
+ struct NConnectionParams {
+ uint32_t maxRecvBufferSize;
+ uint16_t initialXmitCredit;
+ uint16_t rdmaProtocolVersion;
+
+ NConnectionParams(const ConnectionParams& c) :
+ maxRecvBufferSize(c.rdmaProtocolVersion ? htonl(c.maxRecvBufferSize) : c.maxRecvBufferSize),
+ initialXmitCredit(c.rdmaProtocolVersion ? htons(c.initialXmitCredit) : c.initialXmitCredit),
+ // 0 is the same with/without byteswapping!
+ rdmaProtocolVersion(htons(c.rdmaProtocolVersion))
+ {}
+
+ operator ConnectionParams() const {
+ return
+ ConnectionParams(
+ rdmaProtocolVersion ? ntohl(maxRecvBufferSize) : maxRecvBufferSize,
+ rdmaProtocolVersion ? ntohs(initialXmitCredit) : initialXmitCredit,
+ ntohs(rdmaProtocolVersion));
+ }
+ };
+# pragma pack(pop)
+
+ class IOException : public std::exception {
+ std::string s;
+
+ public:
+ IOException(std::string s0): s(s0) {}
+ ~IOException() throw() {}
+
+ const char* what() const throw() {
+ return s.c_str();
+ }
+ };
+
+ AsynchIO::AsynchIO(
+ QueuePair::intrusive_ptr q,
+ int version,
+ int size,
+ int xCredit,
+ int rCount,
+ ReadCallback rc,
+ IdleCallback ic,
+ FullCallback fc,
+ ErrorCallback ec
+ ) :
+ protocolVersion(version),
+ bufferSize(size),
+ recvCredit(0),
+ xmitCredit(xCredit),
+ recvBufferCount(rCount),
+ xmitBufferCount(xCredit),
+ outstandingWrites(0),
+ draining(false),
+ state(IDLE),
+ qp(q),
+ dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this), 0, 0),
+ readCallback(rc),
+ idleCallback(ic),
+ fullCallback(fc),
+ errorCallback(ec),
+ pendingWriteAction(boost::bind(&AsynchIO::writeEvent, this))
+ {
+ if (protocolVersion > maxSupportedProtocolVersion)
+ throw IOException("Unsupported Rdma Protocol");
+ qp->nonblocking();
+ qp->notifyRecv();
+ qp->notifySend();
+
+ // Prepost recv buffers before we go any further
+ qp->allocateRecvBuffers(recvBufferCount, bufferSize+FrameHeaderSize);
+
+ // Create xmit buffers, reserve space for frame header.
+ qp->createSendBuffers(xmitBufferCount, bufferSize, FrameHeaderSize);
+ }
+
+ AsynchIO::~AsynchIO() {
+ // Warn if we are deleting whilst there are still unreclaimed write buffers
+ if ( outstandingWrites>0 )
+ QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished");
+
+ // Turn off callbacks if necessary (before doing the deletes)
+ if (state != STOPPED) {
+ QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown");
+ dataHandle.stopWatch();
+ }
+ // TODO: It might turn out to be more efficient in high connection loads to reuse the
+ // buffers rather than having to reregister them all the time (this would be straightforward if all
+ // connections haver the same buffer size and harder otherwise)
+ }
+
+ void AsynchIO::start(Poller::shared_ptr poller) {
+ dataHandle.startWatch(poller);
+ }
+
+ // State constraints
+ // On entry: None
+ // On exit: STOPPED
+ // Mark for deletion/Delete this object when we have no outstanding writes
+ void AsynchIO::stop(NotifyCallback nc) {
+ ScopedLock<Mutex> l(stateLock);
+ state = STOPPED;
+ notifyCallback = nc;
+ dataHandle.call(boost::bind(&AsynchIO::doStoppedCallback, this));
+ }
+
+ namespace {
+ void requestedCall(AsynchIO* aio, AsynchIO::RequestCallback callback) {
+ assert(callback);
+ callback(*aio);
+ }
+ }
+
+ void AsynchIO::requestCallback(RequestCallback callback) {
+ // TODO creating a function object every time isn't all that
+ // efficient - if this becomes heavily used do something better (what?)
+ assert(callback);
+ dataHandle.call(boost::bind(&requestedCall, this, callback));
+ }
+
+ // Mark writing closed (so we don't accept any more writes or make any idle callbacks)
+ void AsynchIO::drainWriteQueue(NotifyCallback nc) {
+ draining = true;
+ notifyCallback = nc;
+ }
+
+ void AsynchIO::queueBuffer(Buffer* buff, int credit) {
+ switch (protocolVersion) {
+ case 0:
+ if (!buff) {
+ Buffer* ob = getSendBuffer();
+ // Have to send something as adapters hate it when you try to transfer 0 bytes
+ *reinterpret_cast< uint32_t* >(ob->bytes()) = htonl(credit);
+ ob->dataCount(sizeof(uint32_t));
+ qp->postSend(credit | IgnoreData, ob);
+ } else if (credit > 0) {
+ qp->postSend(credit, buff);
+ } else {
+ qp->postSend(buff);
+ }
+ break;
+ case 1:
+ if (!buff)
+ buff = getSendBuffer();
+ // Add FrameHeader after frame data
+ FrameHeader header(credit);
+ assert(buff->dataCount() <= buff->byteCount()); // ensure app data doesn't impinge on reserved space.
+ ::memcpy(buff->bytes()+buff->dataCount(), &header, FrameHeaderSize);
+ buff->dataCount(buff->dataCount()+FrameHeaderSize);
+ qp->postSend(buff);
+ break;
+ }
+ }
+
+ Buffer* AsynchIO::extractBuffer(const QueuePairEvent& e) {
+ Buffer* b = e.getBuffer();
+ switch (protocolVersion) {
+ case 0: {
+ bool dataPresent = true;
+ // Get our xmitCredit if it was sent
+ if (e.immPresent() ) {
+ assert(xmitCredit>=0);
+ xmitCredit += (e.getImm() & ~FlagsMask);
+ dataPresent = ((e.getImm() & IgnoreData) == 0);
+ assert(xmitCredit>0);
+ }
+ if (!dataPresent) {
+ b->dataCount(0);
+ }
+ break;
+ }
+ case 1:
+ b->dataCount(b->dataCount()-FrameHeaderSize);
+ FrameHeader header;
+ ::memcpy(&header, b->bytes()+b->dataCount(), FrameHeaderSize);
+ assert(xmitCredit>=0);
+ xmitCredit += header.credit();
+ assert(xmitCredit>=0);
+ break;
+ }
+
+ return b;
+ }
+
+ void AsynchIO::queueWrite(Buffer* buff) {
+ // Make sure we don't overrun our available buffers
+ // either at our end or the known available at the peers end
+ if (writable()) {
+ // TODO: We might want to batch up sending credit
+ int creditSent = recvCredit & ~FlagsMask;
+ queueBuffer(buff, creditSent);
+ recvCredit -= creditSent;
+ ++outstandingWrites;
+ --xmitCredit;
+ assert(xmitCredit>=0);
+ } else {
+ if (fullCallback) {
+ fullCallback(*this, buff);
+ } else {
+ QPID_LOG(error, "RDMA: qp=" << qp << ": Write queue full, but no callback, throwing buffer away");
+ returnSendBuffer(buff);
+ }
+ }
+ }
+
+ // State constraints
+ // On entry: None
+ // On exit: NOTIFY_PENDING || STOPPED
+ void AsynchIO::notifyPendingWrite() {
+ ScopedLock<Mutex> l(stateLock);
+ switch (state) {
+ case IDLE:
+ dataHandle.call(pendingWriteAction);
+ // Fall Thru
+ case NOTIFY:
+ state = NOTIFY_PENDING;
+ break;
+ case NOTIFY_PENDING:
+ case STOPPED:
+ break;
+ }
+ }
+
+ // State constraints
+ // On entry: IDLE || STOPPED
+ // On exit: IDLE || STOPPED
+ void AsynchIO::dataEvent() {
+ {
+ ScopedLock<Mutex> l(stateLock);
+
+ if (state == STOPPED) return;
+
+ state = NOTIFY_PENDING;
+ }
+ processCompletions();
+
+ writeEvent();
+ }
+
+ // State constraints
+ // On entry: NOTIFY_PENDING || STOPPED
+ // On exit: IDLE || STOPPED
+ void AsynchIO::writeEvent() {
+ State newState;
+ do {
+ {
+ ScopedLock<Mutex> l(stateLock);
+
+ switch (state) {
+ case STOPPED:
+ return;
+ default:
+ state = NOTIFY;
+ }
+ }
+
+ doWriteCallback();
+
+ {
+ ScopedLock<Mutex> l(stateLock);
+
+ newState = state;
+ switch (newState) {
+ case NOTIFY_PENDING:
+ case STOPPED:
+ break;
+ default:
+ state = IDLE;
+ }
+ }
+ } while (newState == NOTIFY_PENDING);
+ }
+
+ void AsynchIO::processCompletions() {
+ QueuePair::intrusive_ptr q = qp->getNextChannelEvent();
+
+ // Re-enable notification for queue:
+ // This needs to happen before we could do anything that could generate more work completion
+ // events (ie the callbacks etc. in the following).
+ // This can't make us reenter this code as the handle attached to the completion queue will still be
+ // disabled by the poller until we leave this code
+ qp->notifyRecv();
+ qp->notifySend();
+
+ int recvEvents = 0;
+ int sendEvents = 0;
+
+ // If no event do nothing
+ if (!q)
+ return;
+
+ assert(q == qp);
+
+ // Repeat until no more events
+ do {
+ QueuePairEvent e(qp->getNextEvent());
+ if (!e)
+ break;
+
+ ::ibv_wc_status status = e.getEventStatus();
+ if (status != IBV_WC_SUCCESS) {
+ // Need special check for IBV_WC_WR_FLUSH_ERR here
+ // we will get this for every send/recv queue entry that was pending
+ // when disconnected, these aren't real errors and mostly need to be ignored
+ if (status == IBV_WC_WR_FLUSH_ERR) {
+ QueueDirection dir = e.getDirection();
+ if (dir == SEND) {
+ Buffer* b = e.getBuffer();
+ ++sendEvents;
+ returnSendBuffer(b);
+ --outstandingWrites;
+ } else {
+ ++recvEvents;
+ }
+ continue;
+ }
+ errorCallback(*this);
+ // TODO: Probably need to flush queues at this point
+ return;
+ }
+
+ // Test if recv (or recv with imm)
+ //::ibv_wc_opcode eventType = e.getEventType();
+ QueueDirection dir = e.getDirection();
+ if (dir == RECV) {
+ ++recvEvents;
+
+ Buffer* b = extractBuffer(e);
+
+ // if there was no data sent then the message was only to update our credit
+ if ( b->dataCount() > 0 ) {
+ readCallback(*this, b);
+ }
+
+ // At this point the buffer has been consumed so put it back on the recv queue
+ // TODO: Is this safe to do if the connection is disconnected already?
+ qp->postRecv(b);
+
+ // Received another message
+ ++recvCredit;
+
+ // Send recvCredit if it is large enough (it will have got this large because we've not sent anything recently)
+ if (recvCredit > recvBufferCount/2) {
+ // TODO: This should use RDMA write with imm as there might not ever be a buffer to receive this message
+ // but this is a little unlikely, as to get in this state we have to have received messages without sending any
+ // for a while so its likely we've received an credit update from the far side.
+ if (writable()) {
+ int creditSent = recvCredit & ~FlagsMask;
+ queueBuffer(0, creditSent);
+ recvCredit -= creditSent;
+ ++outstandingWrites;
+ --xmitCredit;
+ assert(xmitCredit>=0);
+ } else {
+ QPID_LOG(warning, "RDMA: qp=" << qp << ": Unable to send unsolicited credit");
+ }
+ }
+ } else {
+ Buffer* b = e.getBuffer();
+ ++sendEvents;
+ returnSendBuffer(b);
+ --outstandingWrites;
+ }
+ } while (true);
+
+ // Not sure if this is expected or not
+ if (recvEvents == 0 && sendEvents == 0) {
+ QPID_LOG(debug, "RDMA: qp=" << qp << ": Got channel event with no recv/send completions");
+ }
+ }
+
+ void AsynchIO::doWriteCallback() {
+ // TODO: maybe don't call idle unless we're low on write buffers
+ // Keep on calling the idle routine as long as we are writable and we got something to write last call
+
+ // Do callback even if there are no available free buffers as the application itself might be
+ // holding onto buffers
+ while (writable()) {
+ int xc = xmitCredit;
+ idleCallback(*this);
+ // Check whether we actually wrote anything
+ if (xmitCredit == xc) {
+ QPID_LOG(debug, "RDMA: qp=" << qp << ": Called for data, but got none: xmitCredit=" << xmitCredit);
+ return;
+ }
+ }
+
+ checkDrained();
+ }
+
+ void AsynchIO::checkDrained() {
+ // If we've got all the write confirmations and we're draining
+ // We might get deleted in the drained callback so return immediately
+ if (draining) {
+ if (outstandingWrites == 0) {
+ draining = false;
+ NotifyCallback nc;
+ nc.swap(notifyCallback);
+ nc(*this);
+ }
+ return;
+ }
+ }
+
+ void AsynchIO::doStoppedCallback() {
+ // Ensure we can't get any more callbacks (except for the stopped callback)
+ dataHandle.stopWatch();
+
+ NotifyCallback nc;
+ nc.swap(notifyCallback);
+ nc(*this);
+ }
+
+ ConnectionManager::ConnectionManager(
+ ErrorCallback errc,
+ DisconnectedCallback dc
+ ) :
+ state(IDLE),
+ ci(Connection::make()),
+ handle(*ci, boost::bind(&ConnectionManager::event, this, _1), 0, 0),
+ errorCallback(errc),
+ disconnectedCallback(dc)
+ {
+ QPID_LOG(debug, "RDMA: ci=" << ci << ": Creating ConnectionManager");
+ ci->nonblocking();
+ }
+
+ ConnectionManager::~ConnectionManager()
+ {
+ QPID_LOG(debug, "RDMA: ci=" << ci << ": Deleting ConnectionManager");
+ }
+
+ void ConnectionManager::start(Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr) {
+ startConnection(ci, addr);
+ handle.startWatch(poller);
+ }
+
+ void ConnectionManager::doStoppedCallback() {
+ // Ensure we can't get any more callbacks (except for the stopped callback)
+ handle.stopWatch();
+
+ NotifyCallback nc;
+ nc.swap(notifyCallback);
+ nc(*this);
+ }
+
+ void ConnectionManager::stop(NotifyCallback nc) {
+ state = STOPPED;
+ notifyCallback = nc;
+ handle.call(boost::bind(&ConnectionManager::doStoppedCallback, this));
+ }
+
+ void ConnectionManager::event(DispatchHandle&) {
+ if (state.get() == STOPPED) return;
+ connectionEvent(ci);
+ }
+
+ Listener::Listener(
+ const ConnectionParams& cp,
+ EstablishedCallback ec,
+ ErrorCallback errc,
+ DisconnectedCallback dc,
+ ConnectionRequestCallback crc
+ ) :
+ ConnectionManager(errc, dc),
+ checkConnectionParams(cp),
+ connectionRequestCallback(crc),
+ establishedCallback(ec)
+ {
+ }
+
+ void Listener::startConnection(Connection::intrusive_ptr ci, const qpid::sys::SocketAddress& addr) {
+ ci->bind(addr);
+ ci->listen();
+ }
+
+ namespace {
+ const int64_t PoisonContext = -1;
+ }
+
+ void Listener::connectionEvent(Connection::intrusive_ptr ci) {
+ ConnectionEvent e(ci->getNextEvent());
+
+ // If (for whatever reason) there was no event do nothing
+ if (!e)
+ return;
+
+ // Important documentation ommision the new rdma_cm_id
+ // you get from CONNECT_REQUEST has the same context info
+ // as its parent listening rdma_cm_id
+ ::rdma_cm_event_type eventType = e.getEventType();
+ ::rdma_conn_param conn_param = e.getConnectionParam();
+ Rdma::Connection::intrusive_ptr id = e.getConnection();
+
+ // Check for previous disconnection (it appears that you actually can get connection
+ // request events after a disconnect event in rare circumstances)
+ if (reinterpret_cast<int64_t>(id->getContext<void*>())==PoisonContext)
+ return;
+
+ switch (eventType) {
+ case RDMA_CM_EVENT_CONNECT_REQUEST: {
+ // Make sure peer has sent params we can use
+ if (!conn_param.private_data || conn_param.private_data_len < sizeof(NConnectionParams)) {
+ QPID_LOG(warning, "Rdma: rejecting connection attempt: unusable connection parameters");
+ id->reject();
+ break;
+ }
+
+ const NConnectionParams* rcp = static_cast<const NConnectionParams*>(conn_param.private_data);
+ ConnectionParams cp = *rcp;
+
+ // Reject if requested msg size is bigger than we allow
+ if (
+ cp.maxRecvBufferSize > checkConnectionParams.maxRecvBufferSize ||
+ cp.initialXmitCredit > checkConnectionParams.initialXmitCredit
+ ) {
+ QPID_LOG(warning, "Rdma: rejecting connection attempt: connection parameters out of range: ("
+ << cp.maxRecvBufferSize << ">" << checkConnectionParams.maxRecvBufferSize << " || "
+ << cp.initialXmitCredit << ">" << checkConnectionParams.initialXmitCredit
+ << ")");
+ id->reject(&checkConnectionParams);
+ break;
+ }
+
+ bool accept = true;
+ if (connectionRequestCallback)
+ accept = connectionRequestCallback(id, cp);
+
+ if (accept) {
+ // Accept connection
+ cp.initialXmitCredit = checkConnectionParams.initialXmitCredit;
+ id->accept(conn_param, rcp);
+ } else {
+ // Reject connection
+ QPID_LOG(warning, "Rdma: rejecting connection attempt: application policy");
+ id->reject();
+ }
+ break;
+ }
+ case RDMA_CM_EVENT_ESTABLISHED:
+ establishedCallback(id);
+ break;
+ case RDMA_CM_EVENT_DISCONNECTED:
+ disconnectedCallback(id);
+ // Poison the id context so that we do no more callbacks on it
+ id->removeContext();
+ id->addContext(reinterpret_cast<void*>(PoisonContext));
+ break;
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ errorCallback(id, CONNECT_ERROR);
+ break;
+ default:
+ // Unexpected response
+ errorCallback(id, UNKNOWN);
+ //std::cerr << "Warning: unexpected response to listen - " << eventType << "\n";
+ }
+ }
+
+ Connector::Connector(
+ const ConnectionParams& cp,
+ ConnectedCallback cc,
+ ErrorCallback errc,
+ DisconnectedCallback dc,
+ RejectedCallback rc
+ ) :
+ ConnectionManager(errc, dc),
+ connectionParams(cp),
+ rejectedCallback(rc),
+ connectedCallback(cc)
+ {
+ }
+
+ void Connector::startConnection(Connection::intrusive_ptr ci, const qpid::sys::SocketAddress& addr) {
+ ci->resolve_addr(addr);
+ }
+
+ void Connector::connectionEvent(Connection::intrusive_ptr ci) {
+ ConnectionEvent e(ci->getNextEvent());
+
+ // If (for whatever reason) there was no event do nothing
+ if (!e)
+ return;
+
+ ::rdma_cm_event_type eventType = e.getEventType();
+ ::rdma_conn_param conn_param = e.getConnectionParam();
+ Rdma::Connection::intrusive_ptr id = e.getConnection();
+ switch (eventType) {
+ case RDMA_CM_EVENT_ADDR_RESOLVED:
+ // RESOLVE_ADDR
+ ci->resolve_route();
+ break;
+ case RDMA_CM_EVENT_ADDR_ERROR:
+ // RESOLVE_ADDR
+ errorCallback(ci, ADDR_ERROR);
+ break;
+ case RDMA_CM_EVENT_ROUTE_RESOLVED: {
+ // RESOLVE_ROUTE:
+ NConnectionParams rcp(connectionParams);
+ ci->connect(&rcp);
+ break;
+ }
+ case RDMA_CM_EVENT_ROUTE_ERROR:
+ // RESOLVE_ROUTE:
+ errorCallback(ci, ROUTE_ERROR);
+ break;
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ // CONNECTING
+ errorCallback(ci, CONNECT_ERROR);
+ break;
+ case RDMA_CM_EVENT_UNREACHABLE:
+ // CONNECTING
+ errorCallback(ci, UNREACHABLE);
+ break;
+ case RDMA_CM_EVENT_REJECTED: {
+ // CONNECTING
+
+ // We can get this event if our peer is not running on the other side
+ // in this case we could get nearly anything in the private data:
+ // From private_data == 0 && private_data_len == 0 (Chelsio iWarp)
+ // to 148 bytes of zeros (Mellanox IB)
+ //
+ // So assume that if the the private data is absent or not the size of
+ // the connection parameters it isn't valid
+ ConnectionParams cp(0, 0, 0);
+ if (conn_param.private_data && conn_param.private_data_len == sizeof(NConnectionParams)) {
+ // Extract private data from event
+ const NConnectionParams* rcp = static_cast<const NConnectionParams*>(conn_param.private_data);
+ cp = *rcp;
+ }
+ rejectedCallback(ci, cp);
+ break;
+ }
+ case RDMA_CM_EVENT_ESTABLISHED: {
+ // CONNECTING
+ // Extract private data from event
+ assert(conn_param.private_data && conn_param.private_data_len >= sizeof(NConnectionParams));
+ const NConnectionParams* rcp = static_cast<const NConnectionParams*>(conn_param.private_data);
+ ConnectionParams cp = *rcp;
+ connectedCallback(ci, cp);
+ break;
+ }
+ case RDMA_CM_EVENT_DISCONNECTED:
+ // ESTABLISHED
+ disconnectedCallback(ci);
+ break;
+ default:
+ QPID_LOG(warning, "RDMA: Unexpected event in connect: " << eventType);
+ }
+ }
+}
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
new file mode 100644
index 0000000000..ec9caaf08d
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -0,0 +1,250 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef Rdma_Acceptor_h
+#define Rdma_Acceptor_h
+
+#include "qpid/sys/rdma/rdma_wrap.h"
+
+#include "qpid/sys/AtomicValue.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/DispatchHandle.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/SocketAddress.h"
+
+#include <netinet/in.h>
+
+#include <boost/function.hpp>
+
+namespace Rdma {
+
+ class Connection;
+
+ class AsynchIO
+ {
+ typedef boost::function1<void, AsynchIO&> ErrorCallback;
+ typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
+ typedef boost::function1<void, AsynchIO&> IdleCallback;
+ typedef boost::function2<void, AsynchIO&, Buffer*> FullCallback;
+ typedef boost::function1<void, AsynchIO&> NotifyCallback;
+
+ int protocolVersion;
+ int bufferSize;
+ int recvCredit;
+ int xmitCredit;
+ int recvBufferCount;
+ int xmitBufferCount;
+ int outstandingWrites;
+ bool draining;
+ enum State {IDLE, NOTIFY, NOTIFY_PENDING, STOPPED};
+ State state;
+ qpid::sys::Mutex stateLock;
+ QueuePair::intrusive_ptr qp;
+ qpid::sys::DispatchHandleRef dataHandle;
+
+ ReadCallback readCallback;
+ IdleCallback idleCallback;
+ FullCallback fullCallback;
+ ErrorCallback errorCallback;
+ NotifyCallback notifyCallback;
+ qpid::sys::DispatchHandle::Callback pendingWriteAction;
+
+ public:
+ typedef boost::function1<void, AsynchIO&> RequestCallback;
+
+ // TODO: Instead of specifying a buffer size specify the amount of memory the AsynchIO class can use
+ // for buffers both read and write (allocate half to each up front) and fail if we cannot allocate that much
+ // locked memory
+ AsynchIO(
+ QueuePair::intrusive_ptr q,
+ int version,
+ int size,
+ int xCredit,
+ int rCount,
+ ReadCallback rc,
+ IdleCallback ic,
+ FullCallback fc,
+ ErrorCallback ec
+ );
+ ~AsynchIO();
+
+ void start(qpid::sys::Poller::shared_ptr poller);
+ bool writable() const;
+ void queueWrite(Buffer* buff);
+ void notifyPendingWrite();
+ void drainWriteQueue(NotifyCallback);
+ void stop(NotifyCallback);
+ void requestCallback(RequestCallback);
+ int incompletedWrites() const;
+ Buffer* getSendBuffer();
+ void returnSendBuffer(Buffer*);
+
+ private:
+ const static int maxSupportedProtocolVersion = 1;
+
+ // Constants for the peer-peer command messages
+ // These are sent in the high bits if the imm data of an rdma message
+ // The low bits are used to send the credit
+ const static int FlagsMask = 0xF0000000; // Mask for all flag bits - be sure to update this if you add more command bits
+ const static int IgnoreData = 0x10000000; // Message contains no application data
+
+ void dataEvent();
+ void writeEvent();
+ void processCompletions();
+ void doWriteCallback();
+ void checkDrained();
+ void doStoppedCallback();
+
+ void queueBuffer(Buffer* buff, int credit);
+ Buffer* extractBuffer(const QueuePairEvent& e);
+ };
+
+ // We're only writable if:
+ // * not draining write queue
+ // * we've got space in the transmit queue
+ // * we've got credit to transmit
+ // * if there's only 1 transmit credit we must send some credit
+ inline bool AsynchIO::writable() const {
+ assert(xmitCredit>=0);
+ return !draining &&
+ outstandingWrites < xmitBufferCount &&
+ xmitCredit > 0 &&
+ ( xmitCredit > 1 || recvCredit > 0);
+ }
+
+ inline int AsynchIO::incompletedWrites() const {
+ return outstandingWrites;
+ }
+
+ inline Buffer* AsynchIO::getSendBuffer() {
+ return qp->getSendBuffer();
+ }
+
+ inline void AsynchIO::returnSendBuffer(Buffer* b) {
+ qp->returnSendBuffer(b);
+ }
+
+ // These are the parameters necessary to start the conversation
+ // * Each peer HAS to allocate buffers of the size of the maximum receive from its peer
+ // * Each peer HAS to know the initial "credit" it has for transmitting to its peer
+ struct ConnectionParams {
+ uint32_t maxRecvBufferSize;
+ uint16_t initialXmitCredit;
+ uint16_t rdmaProtocolVersion;
+
+ // Default to protocol version 1
+ ConnectionParams(uint32_t s, uint16_t c, uint16_t v = 1) :
+ maxRecvBufferSize(s),
+ initialXmitCredit(c),
+ rdmaProtocolVersion(v)
+ {}
+ };
+
+ enum ErrorType {
+ ADDR_ERROR,
+ ROUTE_ERROR,
+ CONNECT_ERROR,
+ UNREACHABLE,
+ UNKNOWN
+ };
+
+ typedef boost::function2<void, Rdma::Connection::intrusive_ptr, ErrorType> ErrorCallback;
+ typedef boost::function1<void, Rdma::Connection::intrusive_ptr> DisconnectedCallback;
+
+ class ConnectionManager {
+ typedef boost::function1<void, ConnectionManager&> NotifyCallback;
+
+ enum State {IDLE, STOPPED};
+ qpid::sys::AtomicValue<State> state;
+ Connection::intrusive_ptr ci;
+ qpid::sys::DispatchHandleRef handle;
+ NotifyCallback notifyCallback;
+
+ protected:
+ ErrorCallback errorCallback;
+ DisconnectedCallback disconnectedCallback;
+
+ public:
+ ConnectionManager(
+ ErrorCallback errc,
+ DisconnectedCallback dc
+ );
+
+ virtual ~ConnectionManager();
+
+ void start(qpid::sys::Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr);
+ void stop(NotifyCallback);
+
+ private:
+ void event(qpid::sys::DispatchHandle& handle);
+ void doStoppedCallback();
+
+ virtual void startConnection(Connection::intrusive_ptr ci, const qpid::sys::SocketAddress& addr) = 0;
+ virtual void connectionEvent(Connection::intrusive_ptr ci) = 0;
+ };
+
+ typedef boost::function2<bool, Rdma::Connection::intrusive_ptr, const ConnectionParams&> ConnectionRequestCallback;
+ typedef boost::function1<void, Rdma::Connection::intrusive_ptr> EstablishedCallback;
+
+ class Listener : public ConnectionManager
+ {
+ ConnectionParams checkConnectionParams;
+ ConnectionRequestCallback connectionRequestCallback;
+ EstablishedCallback establishedCallback;
+
+ public:
+ Listener(
+ const ConnectionParams& cp,
+ EstablishedCallback ec,
+ ErrorCallback errc,
+ DisconnectedCallback dc,
+ ConnectionRequestCallback crc = 0
+ );
+
+ private:
+ void startConnection(Connection::intrusive_ptr ci, const qpid::sys::SocketAddress& addr);
+ void connectionEvent(Connection::intrusive_ptr ci);
+ };
+
+ typedef boost::function2<void, Rdma::Connection::intrusive_ptr, const ConnectionParams&> RejectedCallback;
+ typedef boost::function2<void, Rdma::Connection::intrusive_ptr, const ConnectionParams&> ConnectedCallback;
+
+ class Connector : public ConnectionManager
+ {
+ ConnectionParams connectionParams;
+ RejectedCallback rejectedCallback;
+ ConnectedCallback connectedCallback;
+
+ public:
+ Connector(
+ const ConnectionParams& cp,
+ ConnectedCallback cc,
+ ErrorCallback errc,
+ DisconnectedCallback dc,
+ RejectedCallback rc = 0
+ );
+
+ private:
+ void startConnection(Connection::intrusive_ptr ci, const qpid::sys::SocketAddress& addr);
+ void connectionEvent(Connection::intrusive_ptr ci);
+ };
+}
+
+#endif // Rdma_Acceptor_h
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
new file mode 100644
index 0000000000..9b0710fd8f
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
@@ -0,0 +1,210 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/rdma/RdmaIO.h"
+#include "qpid/sys/rdma/rdma_exception.h"
+
+#include <arpa/inet.h>
+
+#include <vector>
+#include <queue>
+#include <string>
+#include <iostream>
+
+#include <boost/bind.hpp>
+
+using std::vector;
+using std::queue;
+using std::string;
+using std::cout;
+using std::cerr;
+
+using qpid::sys::Thread;
+using qpid::sys::SocketAddress;
+using qpid::sys::Poller;
+
+// All the accepted connections
+namespace qpid {
+namespace tests {
+
+struct Buffer {
+ char* bytes() const {return bytes_;}
+ int32_t byteCount() const {return size;}
+
+ Buffer(const int32_t s):
+ bytes_(new char[s]),
+ size(s)
+ {
+ }
+
+ ~Buffer() {
+ delete [] bytes_;
+ }
+private:
+ char* bytes_;
+ int32_t size;
+};
+
+struct ConRec {
+ Rdma::Connection::intrusive_ptr connection;
+ Rdma::AsynchIO* data;
+ queue<Buffer*> queuedWrites;
+
+ ConRec(Rdma::Connection::intrusive_ptr c) :
+ connection(c)
+ {}
+};
+
+void dataError(Rdma::AsynchIO&) {
+ cout << "Data error:\n";
+}
+
+void idle(ConRec* cr, Rdma::AsynchIO& a) {
+ // Need to make sure full is not called as it would reorder messages
+ while (!cr->queuedWrites.empty() && a.writable()) {
+ Rdma::Buffer* rbuf = a.getSendBuffer();
+ if (!rbuf) break;
+ Buffer* buf = cr->queuedWrites.front();
+ cr->queuedWrites.pop();
+ std::copy(buf->bytes(), buf->bytes()+buf->byteCount(), rbuf->bytes());
+ rbuf->dataCount(buf->byteCount());
+ delete buf;
+ a.queueWrite(rbuf);
+ }
+}
+
+void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) {
+ // Echo data back
+ Rdma::Buffer* buf = 0;
+ if (cr->queuedWrites.empty() && a.writable()) {
+ buf = a.getSendBuffer();
+ }
+ if (buf) {
+ std::copy(b->bytes(), b->bytes()+b->dataCount(), buf->bytes());
+ buf->dataCount(b->dataCount());
+ a.queueWrite(buf);
+ } else {
+ Buffer* buf = new Buffer(b->dataCount());
+ std::copy(b->bytes(), b->bytes()+b->dataCount(), buf->bytes());
+ cr->queuedWrites.push(buf);
+ // Try to empty queue
+ idle(cr, a);
+ }
+}
+
+void full(ConRec*, Rdma::AsynchIO&, Rdma::Buffer*) {
+ // Shouldn't ever be called
+ cout << "!";
+}
+
+void drained(Rdma::AsynchIO&) {
+ cout << "Drained:\n";
+}
+
+void disconnected(Rdma::Connection::intrusive_ptr& ci) {
+ ConRec* cr = ci->getContext<ConRec>();
+ cr->connection->disconnect();
+ cr->data->drainWriteQueue(drained);
+ delete cr;
+ cout << "Disconnected: " << cr << "\n";
+}
+
+void connectionError(Rdma::Connection::intrusive_ptr& ci, Rdma::ErrorType) {
+ ConRec* cr = ci->getContext<ConRec>();
+ cr->connection->disconnect();
+ if (cr) {
+ cr->data->drainWriteQueue(drained);
+ delete cr;
+ }
+ cout << "Connection error: " << cr << "\n";
+}
+
+bool connectionRequest(Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) {
+ cout << "Incoming connection: ";
+
+ // For fun reject alternate connection attempts
+ static bool x = false;
+ x = true;
+
+ // Must create aio here so as to prepost buffers *before* we accept connection
+ if (x) {
+ ConRec* cr = new ConRec(ci);
+ Rdma::AsynchIO* aio =
+ new Rdma::AsynchIO(ci->getQueuePair(),
+ cp.rdmaProtocolVersion,
+ cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES,
+ boost::bind(data, cr, _1, _2),
+ boost::bind(idle, cr, _1),
+ boost::bind(full, cr, _1, _2),
+ dataError);
+ ci->addContext(cr);
+ cr->data = aio;
+ cout << "Accept=>" << cr << "\n";
+ } else {
+ cout << "Reject\n";
+ }
+
+ return x;
+}
+
+void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
+ static int cnt = 0;
+ ConRec* cr = ci->getContext<ConRec>();
+ cout << "Connected: " << cr << "(" << ++cnt << ")\n";
+
+ cr->data->start(poller);
+}
+
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
+int main(int argc, char* argv[]) {
+ vector<string> args(&argv[0], &argv[argc]);
+
+ std::string port = (args.size() < 2) ? "20079" : args[1];
+ cout << "Listening on port: " << port << "\n";
+
+ try {
+ boost::shared_ptr<Poller> p(new Poller());
+
+ Rdma::Listener a(
+ Rdma::ConnectionParams(16384, Rdma::DEFAULT_WR_ENTRIES),
+ boost::bind(connected, p, _1),
+ connectionError,
+ disconnected,
+ connectionRequest);
+
+
+ SocketAddress sa("", port);
+ a.start(p, sa);
+
+ // The poller loop blocks all signals so run in its own thread
+ Thread t(*p);
+
+ ::pause();
+ p->shutdown();
+ t.join();
+ } catch (Rdma::Exception& e) {
+ int err = e.getError();
+ cerr << "Error: " << e.what() << "(" << err << ")\n";
+ }
+}
diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_exception.h b/qpid/cpp/src/qpid/sys/rdma/rdma_exception.h
new file mode 100644
index 0000000000..a3a289e38a
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/rdma/rdma_exception.h
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef RDMA_EXCEPTION_H
+#define RDMA_EXCEPTION_H
+
+#include <exception>
+
+#include <errno.h>
+#include <string.h>
+
+namespace Rdma {
+ static __thread char s[50];
+ class Exception : public std::exception {
+ int err;
+
+ public:
+ Exception(int e) : err(e) {}
+ int getError() { return err; }
+ const char* what() const throw() {
+ return ::strerror_r(err, s, 50);
+ }
+ };
+
+ inline void THROW_ERRNO() {
+ throw Rdma::Exception(errno);
+ }
+
+ inline void CHECK(int rc) {
+ if (rc != 0)
+ throw Rdma::Exception((rc == -1) ? errno : rc >0 ? rc : -rc);
+ }
+
+ inline int GETERR(int rc) {
+ return (rc == -1) ? errno : rc > 0 ? rc : -rc;
+ }
+
+ inline void CHECK_IBV(int rc) {
+ if (rc != 0)
+ throw Rdma::Exception(rc);
+ }
+
+ template <typename T>
+ inline
+ T* CHECK_NULL(T* rc) {
+ if (rc == 0)
+ THROW_ERRNO();
+ return rc;
+ }
+}
+
+#endif // RDMA_EXCEPTION_H
diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp b/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp
new file mode 100644
index 0000000000..a66f5b4035
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/rdma/rdma_factories.h"
+
+#include "qpid/sys/rdma/rdma_exception.h"
+
+
+namespace Rdma {
+ // Intentionally ignore return values for these functions
+ // - we can't do anything about then anyway
+ void acker(::rdma_cm_event* e) throw () {
+ if (e) (void) ::rdma_ack_cm_event(e);
+ }
+
+ void destroyEChannel(::rdma_event_channel* c) throw () {
+ if (c) (void) ::rdma_destroy_event_channel(c);
+ }
+
+ void destroyId(::rdma_cm_id* i) throw () {
+ if (i) (void) ::rdma_destroy_id(i);
+ }
+
+ void deallocPd(::ibv_pd* p) throw () {
+ if (p) (void) ::ibv_dealloc_pd(p);
+ }
+
+ void deregMr(::ibv_mr* mr) throw () {
+ if (mr) (void) ::ibv_dereg_mr(mr);
+ }
+
+ void destroyCChannel(::ibv_comp_channel* c) throw () {
+ if (c) (void) ::ibv_destroy_comp_channel(c);
+ }
+
+ void destroyCq(::ibv_cq* cq) throw () {
+ if (cq) (void) ::ibv_destroy_cq(cq);
+ }
+
+ void destroyQp(::ibv_qp* qp) throw () {
+ if (qp) (void) ::ibv_destroy_qp(qp);
+ }
+
+ boost::shared_ptr< ::rdma_cm_id > mkId(::rdma_cm_id* i) {
+ return boost::shared_ptr< ::rdma_cm_id >(i, destroyId);
+ }
+
+ boost::shared_ptr< ::rdma_cm_event > mkEvent(::rdma_cm_event* e) {
+ return boost::shared_ptr< ::rdma_cm_event >(e, acker);
+ }
+
+ boost::shared_ptr< ::ibv_qp > mkQp(::ibv_qp* qp) {
+ return boost::shared_ptr< ::ibv_qp > (qp, destroyQp);
+ }
+
+ boost::shared_ptr< ::rdma_event_channel > mkEChannel() {
+ ::rdma_event_channel* c = CHECK_NULL(::rdma_create_event_channel());
+ return boost::shared_ptr< ::rdma_event_channel >(c, destroyEChannel);
+ }
+
+ boost::shared_ptr< ::rdma_cm_id >
+ mkId(::rdma_event_channel* ec, void* context, ::rdma_port_space ps) {
+ ::rdma_cm_id* i;
+ CHECK(::rdma_create_id(ec, &i, context, ps));
+ return mkId(i);
+ }
+
+ boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c) {
+ ::ibv_pd* pd = CHECK_NULL(::ibv_alloc_pd(c));
+ return boost::shared_ptr< ::ibv_pd >(pd, deallocPd);
+ }
+
+ boost::shared_ptr< ::ibv_mr > regMr(::ibv_pd* pd, void* addr, size_t length, ::ibv_access_flags access) {
+ ::ibv_mr* mr = CHECK_NULL(::ibv_reg_mr(pd, addr, length, access));
+ return boost::shared_ptr< ::ibv_mr >(mr, deregMr);
+ }
+
+ boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c) {
+ ::ibv_comp_channel* cc = CHECK_NULL(::ibv_create_comp_channel(c));
+ return boost::shared_ptr< ::ibv_comp_channel >(cc, destroyCChannel);
+ }
+
+ boost::shared_ptr< ::ibv_cq >
+ mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc) {
+ ::ibv_cq* cq = CHECK_NULL(::ibv_create_cq(c, cqe, context, cc, 0));
+ return boost::shared_ptr< ::ibv_cq >(cq, destroyCq);
+ }
+}
diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h b/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h
new file mode 100644
index 0000000000..bfca71fc7e
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef RDMA_FACTORIES_H
+#define RDMA_FACTORIES_H
+
+#include <rdma/rdma_cma.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace Rdma {
+ boost::shared_ptr< ::rdma_event_channel > mkEChannel();
+ boost::shared_ptr< ::rdma_cm_id > mkId(::rdma_event_channel* ec, void* context, ::rdma_port_space ps);
+ boost::shared_ptr< ::rdma_cm_id > mkId(::rdma_cm_id* i);
+ boost::shared_ptr< ::rdma_cm_event > mkEvent(::rdma_cm_event* e);
+ boost::shared_ptr< ::ibv_qp > mkQp(::ibv_qp* qp);
+ boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c);
+ boost::shared_ptr< ::ibv_mr > regMr(::ibv_pd* pd, void* addr, size_t length, ::ibv_access_flags access);
+ boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c);
+ boost::shared_ptr< ::ibv_cq > mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc);
+}
+
+#endif // RDMA_FACTORIES_H
diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
new file mode 100644
index 0000000000..efe454c5be
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
@@ -0,0 +1,566 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/rdma/rdma_wrap.h"
+
+#include "qpid/sys/rdma/rdma_factories.h"
+#include "qpid/sys/rdma/rdma_exception.h"
+
+#include "qpid/sys/posix/PrivatePosix.h"
+
+#include <fcntl.h>
+#include <netdb.h>
+
+#include <iostream>
+#include <stdexcept>
+
+namespace Rdma {
+ const ::rdma_conn_param DEFAULT_CONNECT_PARAM = {
+ 0, // .private_data
+ 0, // .private_data_len
+ 4, // .responder_resources
+ 4, // .initiator_depth
+ 0, // .flow_control
+ 5, // .retry_count
+ 7 // .rnr_retry_count
+ };
+
+ // This is moderately inefficient so don't use in a critical path
+ int deviceCount() {
+ int count;
+ ::ibv_free_device_list(::ibv_get_device_list(&count));
+ return count;
+ }
+
+ Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount,
+ const int32_t reserve) :
+ bufferSize(byteCount + reserve), reserved(reserve)
+ {
+ sge.addr = (uintptr_t) bytes;
+ sge.length = 0;
+ sge.lkey = lkey;
+ }
+
+ QueuePairEvent::QueuePairEvent() :
+ dir(NONE)
+ {}
+
+ QueuePairEvent::QueuePairEvent(
+ const ::ibv_wc& w,
+ boost::shared_ptr< ::ibv_cq > c,
+ QueueDirection d) :
+ cq(c),
+ wc(w),
+ dir(d)
+ {
+ assert(dir != NONE);
+ }
+
+ QueuePairEvent::operator bool() const {
+ return dir != NONE;
+ }
+
+ bool QueuePairEvent::immPresent() const {
+ return wc.wc_flags & IBV_WC_WITH_IMM;
+ }
+
+ uint32_t QueuePairEvent::getImm() const {
+ return ntohl(wc.imm_data);
+ }
+
+ QueueDirection QueuePairEvent::getDirection() const {
+ return dir;
+ }
+
+ ::ibv_wc_opcode QueuePairEvent::getEventType() const {
+ return wc.opcode;
+ }
+
+ ::ibv_wc_status QueuePairEvent::getEventStatus() const {
+ return wc.status;
+ }
+
+ Buffer* QueuePairEvent::getBuffer() const {
+ Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id);
+ b->dataCount(wc.byte_len);
+ return b;
+ }
+
+ QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) :
+ qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+ pd(allocPd(i->verbs)),
+ cchannel(mkCChannel(i->verbs)),
+ scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
+ rcq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
+ outstandingSendEvents(0),
+ outstandingRecvEvents(0)
+ {
+ impl->fd = cchannel->fd;
+
+ // Set cq context to this QueuePair object so we can find
+ // ourselves again
+ scq->cq_context = this;
+ rcq->cq_context = this;
+
+ ::ibv_device_attr dev_attr;
+ CHECK(::ibv_query_device(i->verbs, &dev_attr));
+
+ ::ibv_qp_init_attr qp_attr = {};
+
+ // TODO: make a default struct for this
+ qp_attr.cap.max_send_wr = DEFAULT_WR_ENTRIES;
+ qp_attr.cap.max_send_sge = 1;
+ qp_attr.cap.max_recv_wr = DEFAULT_WR_ENTRIES;
+ qp_attr.cap.max_recv_sge = 1;
+
+ qp_attr.send_cq = scq.get();
+ qp_attr.recv_cq = rcq.get();
+ qp_attr.qp_type = IBV_QPT_RC;
+
+ CHECK(::rdma_create_qp(i.get(), pd.get(), &qp_attr));
+ qp = mkQp(i->qp);
+
+ // Set the qp context to this so we can find ourselves again
+ qp->qp_context = this;
+ }
+
+ QueuePair::~QueuePair() {
+ // Reset back pointer in case someone else has the qp
+ qp->qp_context = 0;
+
+ // Dispose queue pair before we ack events
+ qp.reset();
+
+ if (outstandingSendEvents > 0)
+ ::ibv_ack_cq_events(scq.get(), outstandingSendEvents);
+ if (outstandingRecvEvents > 0)
+ ::ibv_ack_cq_events(rcq.get(), outstandingRecvEvents);
+
+ // Deallocate recv buffer memory
+ if (rmr) delete [] static_cast<char*>(rmr->addr);
+
+ // Deallocate recv buffer memory
+ if (smr) delete [] static_cast<char*>(smr->addr);
+
+ // The buffers vectors automatically deletes all the buffers we've allocated
+ }
+
+ // Create buffers to use for writing
+ void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize, int reserved)
+ {
+ assert(!smr);
+
+ // Round up buffersize to cacheline (64 bytes)
+ int dataLength = (bufferSize+reserved+63) & (~63);
+
+ // Allocate memory block for all receive buffers
+ char* mem = new char [sendBufferCount * dataLength];
+ smr = regMr(pd.get(), mem, sendBufferCount * dataLength, ::IBV_ACCESS_LOCAL_WRITE);
+ sendBuffers.reserve(sendBufferCount);
+ freeBuffers.reserve(sendBufferCount);
+ for (int i = 0; i<sendBufferCount; ++i) {
+ // Allocate xmit buffer
+ sendBuffers.push_back(Buffer(smr->lkey, &mem[i*dataLength], bufferSize, reserved));
+ freeBuffers.push_back(i);
+ }
+ }
+
+ Buffer* QueuePair::getSendBuffer() {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferLock);
+ if (freeBuffers.empty())
+ return 0;
+ int i = freeBuffers.back();
+ freeBuffers.pop_back();
+ assert(i >= 0 && i < int(sendBuffers.size()));
+ Buffer* b = &sendBuffers[i];
+ b->dataCount(0);
+ return b;
+ }
+
+ void QueuePair::returnSendBuffer(Buffer* b) {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferLock);
+ int i = b - &sendBuffers[0];
+ assert(i >= 0 && i < int(sendBuffers.size()));
+ freeBuffers.push_back(i);
+ }
+
+ void QueuePair::allocateRecvBuffers(int recvBufferCount, int bufferSize)
+ {
+ assert(!rmr);
+
+ // Round up buffersize to cacheline (64 bytes)
+ bufferSize = (bufferSize+63) & (~63);
+
+ // Allocate memory block for all receive buffers
+ char* mem = new char [recvBufferCount * bufferSize];
+ rmr = regMr(pd.get(), mem, recvBufferCount * bufferSize, ::IBV_ACCESS_LOCAL_WRITE);
+ recvBuffers.reserve(recvBufferCount);
+ for (int i = 0; i<recvBufferCount; ++i) {
+ // Allocate recv buffer
+ recvBuffers.push_back(Buffer(rmr->lkey, &mem[i*bufferSize], bufferSize));
+ postRecv(&recvBuffers[i]);
+ }
+ }
+
+ // Make channel non-blocking by making
+ // associated fd nonblocking
+ void QueuePair::nonblocking() {
+ ::fcntl(cchannel->fd, F_SETFL, O_NONBLOCK);
+ }
+
+ // If we get EAGAIN because the channel has been set non blocking
+ // and we'd have to wait then return an empty event
+ QueuePair::intrusive_ptr QueuePair::getNextChannelEvent() {
+ // First find out which cq has the event
+ ::ibv_cq* cq;
+ void* ctx;
+ int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx);
+ if (rc == -1 && errno == EAGAIN)
+ return 0;
+ CHECK(rc);
+
+ // Batch acknowledge the event
+ if (cq == scq.get()) {
+ if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) {
+ ::ibv_ack_cq_events(cq, outstandingSendEvents);
+ outstandingSendEvents = 0;
+ }
+ } else if (cq == rcq.get()) {
+ if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) {
+ ::ibv_ack_cq_events(cq, outstandingRecvEvents);
+ outstandingRecvEvents = 0;
+ }
+ }
+
+ return static_cast<QueuePair*>(ctx);
+ }
+
+ QueuePairEvent QueuePair::getNextEvent() {
+ ::ibv_wc w;
+ if (::ibv_poll_cq(scq.get(), 1, &w) == 1)
+ return QueuePairEvent(w, scq, SEND);
+ else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1)
+ return QueuePairEvent(w, rcq, RECV);
+ else
+ return QueuePairEvent();
+ }
+
+ void QueuePair::notifyRecv() {
+ CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0));
+ }
+
+ void QueuePair::notifySend() {
+ CHECK_IBV(ibv_req_notify_cq(scq.get(), 0));
+ }
+
+ void QueuePair::postRecv(Buffer* buf) {
+ ::ibv_recv_wr rwr = {};
+
+ rwr.wr_id = reinterpret_cast<uint64_t>(buf);
+ // We are given the whole buffer
+ buf->dataCount(buf->byteCount());
+ rwr.sg_list = &buf->sge;
+ rwr.num_sge = 1;
+
+ ::ibv_recv_wr* badrwr = 0;
+ CHECK(::ibv_post_recv(qp.get(), &rwr, &badrwr));
+ if (badrwr)
+ throw std::logic_error("ibv_post_recv(): Bad rwr");
+ }
+
+ void QueuePair::postSend(Buffer* buf) {
+ ::ibv_send_wr swr = {};
+
+ swr.wr_id = reinterpret_cast<uint64_t>(buf);
+ swr.opcode = IBV_WR_SEND;
+ swr.send_flags = IBV_SEND_SIGNALED;
+ swr.sg_list = &buf->sge;
+ swr.num_sge = 1;
+
+ ::ibv_send_wr* badswr = 0;
+ CHECK(::ibv_post_send(qp.get(), &swr, &badswr));
+ if (badswr)
+ throw std::logic_error("ibv_post_send(): Bad swr");
+ }
+
+ void QueuePair::postSend(uint32_t imm, Buffer* buf) {
+ ::ibv_send_wr swr = {};
+
+ swr.wr_id = reinterpret_cast<uint64_t>(buf);
+ swr.imm_data = htonl(imm);
+ swr.opcode = IBV_WR_SEND_WITH_IMM;
+ swr.send_flags = IBV_SEND_SIGNALED;
+ swr.sg_list = &buf->sge;
+ swr.num_sge = 1;
+
+ ::ibv_send_wr* badswr = 0;
+ CHECK(::ibv_post_send(qp.get(), &swr, &badswr));
+ if (badswr)
+ throw std::logic_error("ibv_post_send(): Bad swr");
+ }
+
+ ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) :
+ id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ?
+ Connection::find(e->id) : new Connection(e->id)),
+ listen_id(Connection::find(e->listen_id)),
+ event(mkEvent(e))
+ {}
+
+ ConnectionEvent::operator bool() const {
+ return event;
+ }
+
+ ::rdma_cm_event_type ConnectionEvent::getEventType() const {
+ return event->event;
+ }
+
+ ::rdma_conn_param ConnectionEvent::getConnectionParam() const {
+ // It's badly documented, but it seems from the librdma source code that all the following
+ // event types have a valid param.conn
+ switch (event->event) {
+ case RDMA_CM_EVENT_CONNECT_REQUEST:
+ case RDMA_CM_EVENT_ESTABLISHED:
+ case RDMA_CM_EVENT_REJECTED:
+ case RDMA_CM_EVENT_DISCONNECTED:
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ return event->param.conn;
+ default:
+ ::rdma_conn_param p = {};
+ return p;
+ }
+ }
+
+ boost::intrusive_ptr<Connection> ConnectionEvent::getConnection () const {
+ return id;
+ }
+
+ boost::intrusive_ptr<Connection> ConnectionEvent::getListenId() const {
+ return listen_id;
+ }
+
+ // Wrap the passed in rdma_cm_id with a Connection
+ // this basically happens only on connection request
+ Connection::Connection(::rdma_cm_id* i) :
+ qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+ id(mkId(i)),
+ context(0)
+ {
+ impl->fd = id->channel->fd;
+
+ // Just overwrite the previous context as it will
+ // have come from the listening connection
+ if (i)
+ i->context = this;
+ }
+
+ Connection::Connection() :
+ qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+ channel(mkEChannel()),
+ id(mkId(channel.get(), this, RDMA_PS_TCP)),
+ context(0)
+ {
+ impl->fd = channel->fd;
+ }
+
+ Connection::~Connection() {
+ // Reset the id context in case someone else has it
+ id->context = 0;
+ }
+
+ void Connection::ensureQueuePair() {
+ assert(id.get());
+
+ // Only allocate a queue pair if there isn't one already
+ if (qp)
+ return;
+
+ qp = new QueuePair(id);
+ }
+
+ Connection::intrusive_ptr Connection::make() {
+ return new Connection();
+ }
+
+ Connection::intrusive_ptr Connection::find(::rdma_cm_id* i) {
+ if (!i)
+ return 0;
+ Connection* id = static_cast< Connection* >(i->context);
+ if (!id)
+ throw std::logic_error("Couldn't find existing Connection");
+ return id;
+ }
+
+ // Make channel non-blocking by making
+ // associated fd nonblocking
+ void Connection::nonblocking() {
+ assert(id.get());
+ ::fcntl(id->channel->fd, F_SETFL, O_NONBLOCK);
+ }
+
+ // If we get EAGAIN because the channel has been set non blocking
+ // and we'd have to wait then return an empty event
+ ConnectionEvent Connection::getNextEvent() {
+ assert(id.get());
+ ::rdma_cm_event* e;
+ int rc = ::rdma_get_cm_event(id->channel, &e);
+ if (GETERR(rc) == EAGAIN)
+ return ConnectionEvent();
+ CHECK(rc);
+ return ConnectionEvent(e);
+ }
+
+ void Connection::bind(const qpid::sys::SocketAddress& src_addr) const {
+ assert(id.get());
+ CHECK(::rdma_bind_addr(id.get(), getAddrInfo(src_addr).ai_addr));
+ }
+
+ void Connection::listen(int backlog) const {
+ assert(id.get());
+ CHECK(::rdma_listen(id.get(), backlog));
+ }
+
+ void Connection::resolve_addr(
+ const qpid::sys::SocketAddress& dst_addr,
+ int timeout_ms) const
+ {
+ assert(id.get());
+ CHECK(::rdma_resolve_addr(id.get(), 0, getAddrInfo(dst_addr).ai_addr, timeout_ms));
+ }
+
+ void Connection::resolve_route(int timeout_ms) const {
+ assert(id.get());
+ CHECK(::rdma_resolve_route(id.get(), timeout_ms));
+ }
+
+ void Connection::disconnect() const {
+ assert(id.get());
+ int rc = ::rdma_disconnect(id.get());
+ // iWarp doesn't let you disconnect a disconnected connection
+ // but Infiniband can do so it's okay to call rdma_disconnect()
+ // in response to a disconnect event, but we may get an error
+ if (GETERR(rc) == EINVAL)
+ return;
+ CHECK(rc);
+ }
+
+ // TODO: Currently you can only connect with the default connection parameters
+ void Connection::connect(const void* data, size_t len) {
+ assert(id.get());
+ // Need to have a queue pair before we can connect
+ ensureQueuePair();
+
+ ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
+ p.private_data = data;
+ p.private_data_len = len;
+ CHECK(::rdma_connect(id.get(), &p));
+ }
+
+ void Connection::connect() {
+ connect(0, 0);
+ }
+
+ void Connection::accept(const ::rdma_conn_param& param, const void* data, size_t len) {
+ assert(id.get());
+ // Need to have a queue pair before we can accept
+ ensureQueuePair();
+
+ ::rdma_conn_param p = param;
+ p.private_data = data;
+ p.private_data_len = len;
+ CHECK(::rdma_accept(id.get(), &p));
+ }
+
+ void Connection::accept(const ::rdma_conn_param& param) {
+ accept(param, 0, 0);
+ }
+
+ void Connection::reject(const void* data, size_t len) const {
+ assert(id.get());
+ CHECK(::rdma_reject(id.get(), data, len));
+ }
+
+ void Connection::reject() const {
+ assert(id.get());
+ CHECK(::rdma_reject(id.get(), 0, 0));
+ }
+
+ QueuePair::intrusive_ptr Connection::getQueuePair() {
+ assert(id.get());
+
+ ensureQueuePair();
+
+ return qp;
+ }
+
+ std::string Connection::getLocalName() const {
+ ::sockaddr* addr = ::rdma_get_local_addr(id.get());
+ char hostName[NI_MAXHOST];
+ char portName[NI_MAXSERV];
+ CHECK_IBV(::getnameinfo(
+ addr, sizeof(::sockaddr_storage),
+ hostName, sizeof(hostName),
+ portName, sizeof(portName),
+ NI_NUMERICHOST | NI_NUMERICSERV));
+ std::string r(hostName);
+ r += ":";
+ r += portName;
+ return r;
+ }
+
+ std::string Connection::getPeerName() const {
+ ::sockaddr* addr = ::rdma_get_peer_addr(id.get());
+ char hostName[NI_MAXHOST];
+ char portName[NI_MAXSERV];
+ CHECK_IBV(::getnameinfo(
+ addr, sizeof(::sockaddr_storage),
+ hostName, sizeof(hostName),
+ portName, sizeof(portName),
+ NI_NUMERICHOST | NI_NUMERICSERV));
+ std::string r(hostName);
+ r += ":";
+ r += portName;
+ return r;
+ }
+}
+
+std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) {
+# define CHECK_TYPE(t) case t: o << #t; break;
+ switch(t) {
+ CHECK_TYPE(RDMA_CM_EVENT_ADDR_RESOLVED)
+ CHECK_TYPE(RDMA_CM_EVENT_ADDR_ERROR)
+ CHECK_TYPE(RDMA_CM_EVENT_ROUTE_RESOLVED)
+ CHECK_TYPE(RDMA_CM_EVENT_ROUTE_ERROR)
+ CHECK_TYPE(RDMA_CM_EVENT_CONNECT_REQUEST)
+ CHECK_TYPE(RDMA_CM_EVENT_CONNECT_RESPONSE)
+ CHECK_TYPE(RDMA_CM_EVENT_CONNECT_ERROR)
+ CHECK_TYPE(RDMA_CM_EVENT_UNREACHABLE)
+ CHECK_TYPE(RDMA_CM_EVENT_REJECTED)
+ CHECK_TYPE(RDMA_CM_EVENT_ESTABLISHED)
+ CHECK_TYPE(RDMA_CM_EVENT_DISCONNECTED)
+ CHECK_TYPE(RDMA_CM_EVENT_DEVICE_REMOVAL)
+ CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_JOIN)
+ CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_ERROR)
+ default:
+ o << "UNKNOWN_EVENT";
+ }
+# undef CHECK_TYPE
+ return o;
+}
diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h
new file mode 100644
index 0000000000..8e3429027b
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h
@@ -0,0 +1,287 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef RDMA_WRAP_H
+#define RDMA_WRAP_H
+
+#include <rdma/rdma_cma.h>
+
+#include "qpid/RefCounted.h"
+#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/Mutex.h"
+
+#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/ptr_container/ptr_deque.hpp>
+
+#include <vector>
+
+namespace qpid {
+namespace sys {
+ class SocketAddress;
+}}
+
+namespace Rdma {
+ const int DEFAULT_TIMEOUT = 2000; // 2 secs
+ const int DEFAULT_BACKLOG = 100;
+ const int DEFAULT_CQ_ENTRIES = 256;
+ const int DEFAULT_WR_ENTRIES = 64;
+ extern const ::rdma_conn_param DEFAULT_CONNECT_PARAM;
+
+ int deviceCount();
+
+ struct Buffer {
+ friend class QueuePair;
+ friend class QueuePairEvent;
+
+ char* bytes() const;
+ int32_t byteCount() const;
+ int32_t dataCount() const;
+ void dataCount(int32_t);
+
+ private:
+ Buffer(uint32_t lkey, char* bytes, const int32_t byteCount, const int32_t reserve=0);
+ int32_t bufferSize;
+ int32_t reserved; // for framing header
+ ::ibv_sge sge;
+ };
+
+ inline char* Buffer::bytes() const {
+ return (char*) sge.addr;
+ }
+
+ /** return the number of bytes available for application data */
+ inline int32_t Buffer::byteCount() const {
+ return bufferSize - reserved;
+ }
+
+ inline int32_t Buffer::dataCount() const {
+ return sge.length;
+ }
+
+ inline void Buffer::dataCount(int32_t s) {
+ // catch any attempt to overflow a buffer
+ assert(s <= bufferSize + reserved);
+ sge.length = s;
+ }
+
+ class Connection;
+
+ enum QueueDirection {
+ NONE,
+ SEND,
+ RECV
+ };
+
+ class QueuePairEvent {
+ boost::shared_ptr< ::ibv_cq > cq;
+ ::ibv_wc wc;
+ QueueDirection dir;
+
+ friend class QueuePair;
+
+ QueuePairEvent();
+ QueuePairEvent(
+ const ::ibv_wc& w,
+ boost::shared_ptr< ::ibv_cq > c,
+ QueueDirection d);
+
+ public:
+ operator bool() const;
+ bool immPresent() const;
+ uint32_t getImm() const;
+ QueueDirection getDirection() const;
+ ::ibv_wc_opcode getEventType() const;
+ ::ibv_wc_status getEventStatus() const;
+ Buffer* getBuffer() const;
+ };
+
+ // Wrapper for a queue pair - this has the functionality for
+ // putting buffers on the receive queue and for sending buffers
+ // to the other end of the connection.
+ class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted {
+ friend class Connection;
+
+ boost::shared_ptr< ::ibv_pd > pd;
+ boost::shared_ptr< ::ibv_mr > smr;
+ boost::shared_ptr< ::ibv_mr > rmr;
+ boost::shared_ptr< ::ibv_comp_channel > cchannel;
+ boost::shared_ptr< ::ibv_cq > scq;
+ boost::shared_ptr< ::ibv_cq > rcq;
+ boost::shared_ptr< ::ibv_qp > qp;
+ int outstandingSendEvents;
+ int outstandingRecvEvents;
+ std::vector<Buffer> sendBuffers;
+ std::vector<Buffer> recvBuffers;
+ qpid::sys::Mutex bufferLock;
+ std::vector<int> freeBuffers;
+
+ QueuePair(boost::shared_ptr< ::rdma_cm_id > id);
+ ~QueuePair();
+
+ public:
+ typedef boost::intrusive_ptr<QueuePair> intrusive_ptr;
+
+ // Create a buffers to use for writing
+ void createSendBuffers(int sendBufferCount, int dataSize, int headerSize);
+
+ // Get a send buffer
+ Buffer* getSendBuffer();
+
+ // Return buffer to pool after use
+ void returnSendBuffer(Buffer* b);
+
+ // Create and post recv buffers
+ void allocateRecvBuffers(int recvBufferCount, int bufferSize);
+
+ // Make channel non-blocking by making
+ // associated fd nonblocking
+ void nonblocking();
+
+ // If we get EAGAIN because the channel has been set non blocking
+ // and we'd have to wait then return an empty event
+ QueuePair::intrusive_ptr getNextChannelEvent();
+
+ QueuePairEvent getNextEvent();
+
+ void postRecv(Buffer* buf);
+ void postSend(Buffer* buf);
+ void postSend(uint32_t imm, Buffer* buf);
+ void notifyRecv();
+ void notifySend();
+ };
+
+ class ConnectionEvent {
+ friend class Connection;
+
+ // The order of the members is important as we have to acknowledge
+ // the event before destroying the ids on destruction
+ boost::intrusive_ptr<Connection> id;
+ boost::intrusive_ptr<Connection> listen_id;
+ boost::shared_ptr< ::rdma_cm_event > event;
+
+ ConnectionEvent() {}
+ ConnectionEvent(::rdma_cm_event* e);
+
+ // Default copy, assignment and destructor ok
+ public:
+ operator bool() const;
+ ::rdma_cm_event_type getEventType() const;
+ ::rdma_conn_param getConnectionParam() const;
+ boost::intrusive_ptr<Connection> getConnection () const;
+ boost::intrusive_ptr<Connection> getListenId() const;
+ };
+
+ // For the moment this is a fairly simple wrapper for rdma_cm_id.
+ //
+ // NB: It allocates a protection domain (pd) per connection which means that
+ // registered buffers can't be shared between different connections
+ // (this can only happen between connections on the same controller in any case,
+ // so needs careful management if used)
+ class Connection : public qpid::sys::IOHandle, public qpid::RefCounted {
+ boost::shared_ptr< ::rdma_event_channel > channel;
+ boost::shared_ptr< ::rdma_cm_id > id;
+ QueuePair::intrusive_ptr qp;
+
+ void* context;
+
+ friend class ConnectionEvent;
+ friend class QueuePair;
+
+ // Wrap the passed in rdma_cm_id with a Connection
+ // this basically happens only on connection request
+ Connection(::rdma_cm_id* i);
+ Connection();
+ ~Connection();
+
+ void ensureQueuePair();
+
+ public:
+ typedef boost::intrusive_ptr<Connection> intrusive_ptr;
+
+ static intrusive_ptr make();
+ static intrusive_ptr find(::rdma_cm_id* i);
+
+ template <typename T>
+ void addContext(T* c) {
+ // Don't allow replacing context
+ if (!context)
+ context = c;
+ }
+
+ void removeContext() {
+ context = 0;
+ }
+
+ template <typename T>
+ T* getContext() {
+ return static_cast<T*>(context);
+ }
+
+ // Make channel non-blocking by making
+ // associated fd nonblocking
+ void nonblocking();
+
+ // If we get EAGAIN because the channel has been set non blocking
+ // and we'd have to wait then return an empty event
+ ConnectionEvent getNextEvent();
+
+ void bind(const qpid::sys::SocketAddress& src_addr) const;
+ void listen(int backlog = DEFAULT_BACKLOG) const;
+ void resolve_addr(
+ const qpid::sys::SocketAddress& dst_addr,
+ int timeout_ms = DEFAULT_TIMEOUT) const;
+ void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const;
+ void disconnect() const;
+
+ // TODO: Currently you can only connect with the default connection parameters
+ void connect(const void* data, size_t len);
+ void connect();
+ template <typename T>
+ void connect(const T* data) {
+ connect(data, sizeof(T));
+ }
+
+ // TODO: Not sure how to default accept params - they come from the connection request
+ // event
+ void accept(const ::rdma_conn_param& param, const void* data, size_t len);
+ void accept(const ::rdma_conn_param& param);
+ template <typename T>
+ void accept(const ::rdma_conn_param& param, const T* data) {
+ accept(param, data, sizeof(T));
+ }
+
+ void reject(const void* data, size_t len) const;
+ void reject() const;
+ template <typename T>
+ void reject(const T* data) const {
+ reject(data, sizeof(T));
+ }
+
+ QueuePair::intrusive_ptr getQueuePair();
+ std::string getLocalName() const;
+ std::string getPeerName() const;
+ std::string getFullName() const { return getLocalName()+"-"+getPeerName(); }
+ };
+}
+
+std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t);
+
+#endif // RDMA_WRAP_H