summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2008-04-28 04:41:46 +0000
committerAndrew Stitcher <astitcher@apache.org>2008-04-28 04:41:46 +0000
commit9f153bc328112ed2ee25a801eff1f6a277c7bb19 (patch)
treeacd13eebcfe1a3ee196ab229741ce6a20e9eb27c /cpp/src
parenta301f95243dd1cd367a0a8d041c1168b8adc1e86 (diff)
downloadqpid-python-9f153bc328112ed2ee25a801eff1f6a277c7bb19.tar.gz
Work In Progress:
Added initial rdma code including test server and client. Turn off rdma support by default, but autoconf should now detect whether the necessary rdma/ibverbs libs and headers are present. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652053 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am33
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaClient.cpp178
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.cpp351
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.h128
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaServer.cpp142
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_exception.h45
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_factories.cpp39
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_factories.h48
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_wrap.h550
9 files changed, 1514 insertions, 0 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index a64f70abd8..1e31ac60fd 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -91,6 +91,39 @@ noinst_LTLIBRARIES=libLogger.la # libqpidamqp_0_10.la
libLogger_la_SOURCES=qpid/log/Logger.cpp qpid/log/Logger.h
libLogger_la_CXXFLAGS=$(AM_CXXFLAGS) -Wno-unused-parameter
+if RDMA
+
+# RDMA (Infiniband) protocol code.
+# Everything between "if RDMA" and "endif" is only built when the RDMA
+# automake conditional is enabled.
+# Every header the library needs is listed so that it is distributed with
+# the sources (rdma_factories.h was previously missing from this list).
+libqpidrdma_la_SOURCES = \
+  qpid/sys/rdma/rdma_exception.h \
+  qpid/sys/rdma/rdma_factories.cpp \
+  qpid/sys/rdma/rdma_factories.h \
+  qpid/sys/rdma/RdmaIO.cpp \
+  qpid/sys/rdma/RdmaIO.h \
+  qpid/sys/rdma/rdma_wrap.h
+libqpidrdma_la_LIBADD = \
+  -lrdmacm \
+  -libverbs
+libqpidrdma_la_CXXFLAGS = \
+  $(AM_CXXFLAGS) -Wno-missing-field-initializers
+noinst_LTLIBRARIES += \
+  libqpidrdma.la
+qpidd_LDADD += \
+  libqpidrdma.la
+
+# Test server and client programs (built but not installed)
+noinst_PROGRAMS += RdmaServer RdmaClient
+RdmaServer_SOURCES = qpid/sys/rdma/RdmaServer.cpp
+RdmaServer_CXXFLAGS = \
+  $(AM_CXXFLAGS) -Wno-missing-field-initializers
+RdmaServer_LDADD = \
+  libqpidrdma.la libqpidcommon.la
+RdmaClient_SOURCES = qpid/sys/rdma/RdmaClient.cpp
+RdmaClient_CXXFLAGS = \
+  $(AM_CXXFLAGS) -Wno-missing-field-initializers
+RdmaClient_LDADD = \
+  libqpidrdma.la libqpidcommon.la
+
+endif
+
# New 0-10 codec, to be integrated in future.
# libqpidamqp_0_10_la_SOURCES=
EXTRA_DIST+=\
diff --git a/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/cpp/src/qpid/sys/rdma/RdmaClient.cpp
new file mode 100644
index 0000000000..7c2d9de505
--- /dev/null
+++ b/cpp/src/qpid/sys/rdma/RdmaClient.cpp
@@ -0,0 +1,178 @@
+#include "RdmaIO.h"
+#include "qpid/sys/Time.h"
+
+#include <netdb.h>
+#include <arpa/inet.h>
+
+#include <vector>
+#include <string>
+#include <iostream>
+#include <algorithm>
+#include <cmath>
+#include <boost/bind.hpp>
+
+using std::vector;
+using std::string;
+using std::cout;
+using std::cerr;
+using std::copy;
+using std::rand;
+
+using qpid::sys::Poller;
+using qpid::sys::Dispatcher;
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::TIME_SEC;
+using qpid::sys::TIME_INFINITE;
+
+// count of messages
+// smsgs/sbytes: messages and bytes queued for sending so far (updated in write()).
+// rmsgs/rbytes: messages and bytes received so far (updated in data()).
+int64_t smsgs = 0;
+int64_t sbytes = 0;
+int64_t rmsgs = 0;
+int64_t rbytes = 0;
+
+// Writes queued but not yet reported complete via idle()
+int outstandingwrites = 0;
+
+// Number of messages to send and receive before the test finishes
+int target = 1000000;
+// Size in bytes of each message; may be overridden by the third command line argument
+int msgsize = 200;
+// Set when the connection is established, just before the first write
+AbsTime startTime;
+// Elapsed time until all messages were sent / all messages were received.
+// Both start at TIME_INFINITE and are lowered with std::min in idle()/data().
+Duration sendingDuration(TIME_INFINITE);
+Duration fullTestDuration(TIME_INFINITE);
+
+// Payload copied into every outgoing buffer; filled in by main()
+vector<char> testString;
+
+// Queue test messages until either the overall target has been reached or
+// three quarters of the work request entries are in flight.
+void write(Rdma::AsynchIO& aio) {
+    const int writeLimit = 3*Rdma::DEFAULT_WR_ENTRIES/4;
+    for (;;) {
+        if (smsgs >= target || outstandingwrites >= writeLimit)
+            break;
+
+        // Fill a fresh buffer with the test payload and hand it to the queue pair
+        Rdma::Buffer* buf = aio.getBuffer();
+        std::copy(testString.begin(), testString.end(), buf->bytes);
+        buf->dataCount = msgsize;
+        aio.queueWrite(buf);
+
+        // Book-keeping for the statistics printed by main()
+        sbytes += buf->byteCount;
+        ++smsgs;
+        ++outstandingwrites;
+    }
+}
+
+// Error callback for the data channel: only reports that an error happened.
+void dataError(Rdma::AsynchIO&) {
+    std::cout << "Data error:\n";
+}
+
+// Read callback: account for one received message, then either keep the
+// pipeline full or, once everything has been received, record the total
+// test time and shut the poller down when no writes remain in flight.
+void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) {
+    rbytes += b->byteCount;
+    ++rmsgs;
+
+    if (rmsgs >= target) {
+        // All messages have been received
+        const Duration elapsed(startTime, AbsTime::now());
+        fullTestDuration = std::min(fullTestDuration, elapsed);
+        if (outstandingwrites == 0)
+            p->shutdown();
+    } else {
+        write(aio);
+    }
+}
+
+// Idle callback: one queued write has completed. Queue more messages while
+// there are still some to send; otherwise record the sending time and stop
+// the poller once everything has been sent, received and completed.
+void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) {
+    --outstandingwrites;
+
+    if (smsgs >= target) {
+        const Duration elapsed(startTime, AbsTime::now());
+        sendingDuration = std::min(sendingDuration, elapsed);
+
+        const bool finished =
+            smsgs >= target && rmsgs >= target && outstandingwrites == 0;
+        if (finished)
+            p->shutdown();
+    } else {
+        write(aio);
+    }
+}
+
+// Connected callback: build the asynchronous IO object for the new
+// connection, queue the first batch of messages and start watching for
+// completion events.
+//
+// poller - poller that will dispatch the data events
+// ci     - the connection that has just been established
+void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
+    cout << "Connected\n";
+    // Fetch the queue pair once and reuse it (the local was previously unused)
+    Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair();
+
+    // NOTE: this AsynchIO is not deleted anywhere in this test client
+    Rdma::AsynchIO* aio = new Rdma::AsynchIO(q, msgsize,
+        boost::bind(&data, poller, _1, _2),
+        boost::bind(&idle, poller, _1),
+        dataError);
+
+    // Time the test from the first queued write
+    startTime = AbsTime::now();
+    write(*aio);
+
+    aio->start(poller);
+}
+
+// Disconnected callback: report it and stop the poller.
+void disconnected(boost::shared_ptr<Poller> poller, Rdma::Connection::intrusive_ptr&) {
+    std::cout << "Disconnected\n";
+    poller->shutdown();
+}
+
+// Connection error callback: report it and stop the poller.
+void connectionError(boost::shared_ptr<Poller> poller, Rdma::Connection::intrusive_ptr&) {
+    std::cout << "Connection error\n";
+    poller->shutdown();
+}
+
+// Rejected callback: the peer refused the connection; report it and stop the poller.
+void rejected(boost::shared_ptr<Poller> poller, Rdma::Connection::intrusive_ptr&) {
+    std::cout << "Connection rejected\n";
+    poller->shutdown();
+}
+
+// Test client entry point.
+//
+// Usage: RdmaClient <host> [<port> [<msgsize>]]
+//   host    - name or address of the server (mandatory)
+//   port    - service/port to connect to (default "20079")
+//   msgsize - size in bytes of each test message (default 200)
+//
+// Connects to the server, sends `target` messages, waits for them to be
+// echoed back and prints throughput statistics.
+// Returns 1 on a usage or address lookup error, 0 otherwise.
+int main(int argc, char* argv[]) {
+    vector<string> args(&argv[0], &argv[argc]);
+
+    // The host argument is mandatory: indexing args[1] without it is out of bounds
+    if (args.size() < 2) {
+        cerr << "Usage: RdmaClient <host> [<port> [<msgsize>]]\n";
+        return 1;
+    }
+
+    ::addrinfo *res = 0;
+    ::addrinfo hints = {};
+    hints.ai_family = AF_INET;
+    hints.ai_socktype = SOCK_STREAM;
+    string port = (args.size() < 3) ? "20079" : args[2];
+    // getaddrinfo reports failure with any non-zero value, not only negative ones
+    int n = ::getaddrinfo(args[1].c_str(), port.c_str(), &hints, &res);
+    if (n != 0) {
+        cerr << "Can't find information for: " << args[1]
+             << " (" << ::gai_strerror(n) << ")\n";
+        return 1;
+    }
+    cout << "Connecting to: " << args[1] << ":" << port <<"\n";
+
+    if (args.size() > 3)
+        msgsize = atoi(args[3].c_str());
+    if (msgsize <= 0) {
+        cerr << "Invalid message size: " << msgsize << "\n";
+        ::freeaddrinfo(res);
+        return 1;
+    }
+    cout << "Message size: " << msgsize << "\n";
+
+    // Make a random message of that size.
+    // The parentheses matter: '+' binds tighter than '&', so without them the
+    // result was masked to 0..63 instead of being the printable range 32..95.
+    testString.resize(msgsize);
+    for (int i = 0; i < msgsize; ++i) {
+        testString[i] = 32 + (rand() & 0x3f);
+    }
+
+    try {
+        boost::shared_ptr<Poller> p(new Poller());
+        Dispatcher d(p);
+
+        Rdma::Connector c(
+            *res->ai_addr,
+            boost::bind(&connected, p, _1),
+            boost::bind(&connectionError, p, _1),
+            boost::bind(&disconnected, p, _1),
+            boost::bind(&rejected, p, _1));
+
+        c.start(p);
+        d.run();
+    } catch (Rdma::Exception& e) {
+        int err = e.getError();
+        cerr << "Error: " << e.what() << "(" << err << ")\n";
+    }
+
+    // The connector has gone out of scope, so the address list can be released
+    ::freeaddrinfo(res);
+
+    cout
+        << "Sent: " << smsgs
+        << "msgs (" << sbytes
+        << "bytes) in: " << double(sendingDuration)/TIME_SEC
+        << "s: " << double(smsgs)*TIME_SEC/sendingDuration
+        << "msgs/s(" << double(sbytes)*TIME_SEC/sendingDuration
+        << "bytes/s)\n";
+    cout
+        << "Recd: " << rmsgs
+        << "msgs (" << rbytes
+        << "bytes) in: " << double(fullTestDuration)/TIME_SEC
+        << "s: " << double(rmsgs)*TIME_SEC/fullTestDuration
+        << "msgs/s(" << double(rbytes)*TIME_SEC/fullTestDuration
+        << "bytes/s)\n";
+
+}
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
new file mode 100644
index 0000000000..31d109ea4d
--- /dev/null
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -0,0 +1,351 @@
+#include "RdmaIO.h"
+
+#include <iostream>
+#include <boost/bind.hpp>
+
+namespace Rdma {
+    // Set up asynchronous IO on a queue pair.
+    //
+    // q  - queue pair to do IO on
+    // s  - size in bytes of every buffer created for this object
+    // rc - called for every received buffer
+    // ic - called every time a send completes
+    // ec - called when a completion reports an error status
+    AsynchIO::AsynchIO(
+        QueuePair::intrusive_ptr q,
+        int s,
+        ReadCallback rc,
+        IdleCallback ic,
+        ErrorCallback ec
+    ) :
+        qp(q),
+        dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0),
+        bufferSize(s),
+        recvBufferCount(DEFAULT_WR_ENTRIES),
+        readCallback(rc),
+        idleCallback(ic),
+        errorCallback(ec)
+    {
+        qp->nonblocking();
+        qp->notifyRecv();
+        qp->notifySend();
+
+        // Post the receive buffers up front, before any data can arrive
+        for (int remaining = recvBufferCount; remaining > 0; --remaining) {
+            Buffer* recvBuf = qp->createBuffer(bufferSize);
+            // The buffers container owns every buffer we create
+            buffers.push_front(recvBuf);
+            recvBuf->dataCount = recvBuf->byteCount;
+            qp->postRecv(recvBuf);
+        }
+    }
+
+ // Destructor: nothing to do explicitly, member destructors release everything.
+ AsynchIO::~AsynchIO() {
+ // The buffers ptr_deque automatically deletes all the buffers we've allocated
+ }
+
+    // Begin watching the queue pair for events using the given poller;
+    // events are delivered to dataEvent().
+    void AsynchIO::start(Poller::shared_ptr poller)
+    {
+        dataHandle.startWatch(poller);
+    }
+
+ // Currently a no-op: the argument is ignored and nothing is queued.
+ void AsynchIO::queueReadBuffer(Buffer*) {
+ }
+
+    // Post the buffer straight to the queue pair's send queue.
+    // No flow control is applied here; callers limit outstanding writes themselves.
+    void AsynchIO::queueWrite(Buffer* buff)
+    {
+        qp->postSend(buff);
+    }
+
+ // Currently a no-op.
+ void AsynchIO::notifyPendingWrite() {
+ }
+
+ // Currently a no-op.
+ void AsynchIO::queueWriteClose() {
+ }
+
+    // Return an empty buffer for the caller to fill and pass to queueWrite().
+    // A buffer recycled from a completed send is reused when one is available;
+    // otherwise a new buffer of bufferSize bytes is created (and owned by the
+    // buffers container).
+    Buffer* AsynchIO::getBuffer() {
+        Buffer* result;
+        if (!bufferQueue.empty()) {
+            // Recycle the most recently returned buffer
+            result = bufferQueue.front();
+            bufferQueue.pop_front();
+            result->dataStart = 0;
+        } else {
+            result = qp->createBuffer(bufferSize);
+            buffers.push_front(result);
+        }
+        result->dataCount = 0;
+        return result;
+    }
+
+ // Handler bound to dataHandle in the constructor: drains all pending
+ // completion events from the queue pair and dispatches them to the
+ // read / idle / error callbacks.
+ void AsynchIO::dataEvent(DispatchHandle&) {
+ QueuePair::intrusive_ptr q = qp->getNextChannelEvent();
+
+ // If no event do nothing
+ if (!q)
+ return;
+
+ // The event must be for the queue pair this object was created with
+ assert(q == qp);
+
+ // Re-enable notification for queue
+ qp->notifySend();
+ qp->notifyRecv();
+
+ // Repeat until no more events
+ do {
+ QueuePairEvent e(qp->getNextEvent());
+ if (!e)
+ return;
+
+ // Any unsuccessful completion is reported once and stops processing
+ ::ibv_wc_status status = e.getEventStatus();
+ if (status != IBV_WC_SUCCESS) {
+ errorCallback(*this);
+ return;
+ }
+
+ // Test if recv (or recv with imm)
+ //::ibv_wc_opcode eventType = e.getEventType();
+ Buffer* b = e.getBuffer();
+ QueueDirection dir = e.getDirection();
+ if (dir == RECV) {
+ readCallback(*this, b);
+ // At this point the buffer has been consumed so put it back on the recv queue
+ // NOTE(review): the constructor sets dataCount = byteCount before postRecv,
+ // but here dataCount is left as it was after the callback - confirm that
+ // postRecv does not depend on dataCount.
+ qp->postRecv(b);
+ } else {
+ // Send completed: make the buffer available to getBuffer() again,
+ // then tell the client a write has finished
+ bufferQueue.push_front(b);
+ idleCallback(*this);
+ }
+ } while (true);
+ }
+
+ // Create a listener for incoming connections.
+ //
+ // src  - local address to bind to when start() is called
+ // cc   - called when a connection has been established
+ // errc - called on a connection error
+ // dc   - called when a connection is disconnected
+ // crc  - called for each connection request; returns whether to accept it
+ //
+ // Creates its own connection object and puts it into nonblocking mode;
+ // nothing is bound or listened on until start().
+ Listener::Listener(
+ const sockaddr& src,
+ ConnectedCallback cc,
+ ErrorCallback errc,
+ DisconnectedCallback dc,
+ ConnectionRequestCallback crc
+ ) :
+ src_addr(src),
+ ci(Connection::make()),
+ handle(*ci, boost::bind(&Listener::connectionEvent, this, _1), 0, 0),
+ connectedCallback(cc),
+ errorCallback(errc),
+ disconnectedCallback(dc),
+ connectionRequestCallback(crc),
+ state(IDLE)
+ {
+ ci->nonblocking();
+ }
+
+    // Bind to the source address, start listening and begin watching for
+    // connection events with the given poller.
+    void Listener::start(Poller::shared_ptr poller)
+    {
+        ci->bind(src_addr);
+        ci->listen();
+        state = LISTENING;
+
+        handle.startWatch(poller);
+    }
+
+ // Handler bound to handle in the constructor: fetches the next connection
+ // manager event and dispatches it to the appropriate callback.
+ void Listener::connectionEvent(DispatchHandle&) {
+ ConnectionEvent e(ci->getNextEvent());
+
+ // If (for whatever reason) there was no event do nothing
+ if (!e)
+ return;
+
+ // Important documentation omission: the new rdma_cm_id
+ // you get from CONNECT_REQUEST has the same context info
+ // as its parent listening rdma_cm_id
+ ::rdma_cm_event_type eventType = e.getEventType();
+ Rdma::Connection::intrusive_ptr id = e.getConnection();
+
+ switch (eventType) {
+ case RDMA_CM_EVENT_CONNECT_REQUEST: {
+ // Accept by default when no connection request callback was supplied
+ bool accept = true;
+ // Extract connection parameters and private data from event
+ ::rdma_conn_param conn_param = e.getConnectionParam();
+
+ if (connectionRequestCallback)
+ //TODO: pass private data to callback (and accept new private data for accept somehow)
+ accept = connectionRequestCallback(id);
+ if (accept) {
+ // Accept connection
+ id->accept(conn_param);
+ } else {
+ //Reject connection
+ id->reject();
+ }
+
+ break;
+ }
+ case RDMA_CM_EVENT_ESTABLISHED:
+ connectedCallback(id);
+ break;
+ case RDMA_CM_EVENT_DISCONNECTED:
+ disconnectedCallback(id);
+ break;
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ errorCallback(id);
+ break;
+ default:
+ // Any other event type is only logged
+ std::cerr << "Warning: unexpected response to listen - " << eventType << "\n";
+ }
+ }
+
+ // Create a connector for an outgoing connection.
+ //
+ // dst  - address to connect to when start() is called
+ // cc   - called when the connection has been established
+ // errc - called on address, route or connection errors
+ // dc   - called when the connection is disconnected
+ // rc   - called when the peer rejects the connection
+ //
+ // Creates its own connection object and puts it into nonblocking mode;
+ // no connection attempt is made until start().
+ Connector::Connector(
+ const sockaddr& dst,
+ ConnectedCallback cc,
+ ErrorCallback errc,
+ DisconnectedCallback dc,
+ RejectedCallback rc
+ ) :
+ dst_addr(dst),
+ ci(Connection::make()),
+ handle(*ci, boost::bind(&Connector::connectionEvent, this, _1), 0, 0),
+ connectedCallback(cc),
+ errorCallback(errc),
+ disconnectedCallback(dc),
+ rejectedCallback(rc),
+ state(IDLE)
+ {
+ ci->nonblocking();
+ }
+
+    // Kick off the connection sequence by resolving the destination address,
+    // then watch for the resulting connection events with the given poller.
+    void Connector::start(Poller::shared_ptr poller)
+    {
+        ci->resolve_addr(dst_addr);
+        state = RESOLVE_ADDR;
+
+        handle.startWatch(poller);
+    }
+
+    // Handler bound to handle in the constructor: fetches the next connection
+    // manager event and advances the connection sequence
+    // (resolve address -> resolve route -> connect -> established),
+    // invoking the user callbacks as appropriate.
+    //
+    // The rejected callback is optional (the constructor defaults it to an
+    // empty function), so it is only invoked when it has been set; calling an
+    // empty boost::function would throw boost::bad_function_call.
+    void Connector::connectionEvent(DispatchHandle&) {
+        ConnectionEvent e(ci->getNextEvent());
+
+        // If (for whatever reason) there was no event do nothing
+        if (!e)
+            return;
+
+        ::rdma_cm_event_type eventType = e.getEventType();
+#if 1
+        // Dispatch purely on the event type; the comment on each case names
+        // the state in which the event is expected
+        switch (eventType) {
+        case RDMA_CM_EVENT_ADDR_RESOLVED:
+            // RESOLVE_ADDR
+            state = RESOLVE_ROUTE;
+            ci->resolve_route();
+            break;
+        case RDMA_CM_EVENT_ROUTE_RESOLVED:
+            // RESOLVE_ROUTE:
+            state = CONNECTING;
+            ci->connect();
+            break;
+        case RDMA_CM_EVENT_ADDR_ERROR:      // RESOLVE_ADDR
+        case RDMA_CM_EVENT_ROUTE_ERROR:     // RESOLVE_ROUTE
+        case RDMA_CM_EVENT_CONNECT_ERROR:   // CONNECTING
+        case RDMA_CM_EVENT_UNREACHABLE:     // CONNECTING
+            // All failures are reported the same way
+            state = ERROR;
+            errorCallback(ci);
+            break;
+        case RDMA_CM_EVENT_REJECTED:
+            // CONNECTING
+            state = REJECTED;
+            if (rejectedCallback)
+                rejectedCallback(ci);
+            break;
+        case RDMA_CM_EVENT_ESTABLISHED:
+            // CONNECTING
+            state = ESTABLISHED;
+            connectedCallback(ci);
+            break;
+        case RDMA_CM_EVENT_DISCONNECTED:
+            // ESTABLISHED
+            state = DISCONNECTED;
+            disconnectedCallback(ci);
+            break;
+        default:
+            std::cerr << "Warning: unexpected event in " << state << " state - " << eventType << "\n";
+            state = ERROR;
+        }
+#else
+        // Alternative (currently disabled) implementation that dispatches on
+        // the current state first and validates the event against it
+        switch (state) {
+        case IDLE:
+            std::cerr << "Warning: event in IDLE state\n";
+            break;
+        case RESOLVE_ADDR:
+            switch (eventType) {
+            case RDMA_CM_EVENT_ADDR_RESOLVED:
+                state = RESOLVE_ROUTE;
+                ci->resolve_route();
+                break;
+            case RDMA_CM_EVENT_ADDR_ERROR:
+                state = ERROR;
+                errorCallback(ci);
+                break;
+            default:
+                state = ERROR;
+                std::cerr << "Warning: unexpected response to resolve_addr - " << eventType << "\n";
+            }
+            break;
+        case RESOLVE_ROUTE:
+            switch (eventType) {
+            case RDMA_CM_EVENT_ROUTE_RESOLVED:
+                state = CONNECTING;
+                ci->connect();
+                break;
+            case RDMA_CM_EVENT_ROUTE_ERROR:
+                state = ERROR;
+                errorCallback(ci);
+                break;
+            default:
+                state = ERROR;
+                std::cerr << "Warning: unexpected response to resolve_route - " << eventType << "\n";
+            }
+            break;
+        case CONNECTING:
+            switch (eventType) {
+            case RDMA_CM_EVENT_CONNECT_RESPONSE:
+                std::cerr << "connect_response\n";
+                break;
+            case RDMA_CM_EVENT_CONNECT_ERROR:
+                state = ERROR;
+                errorCallback(ci);
+                break;
+            case RDMA_CM_EVENT_UNREACHABLE:
+                state = ERROR;
+                errorCallback(ci);
+                break;
+            case RDMA_CM_EVENT_REJECTED:
+                state = REJECTED;
+                if (rejectedCallback)
+                    rejectedCallback(ci);
+                break;
+            case RDMA_CM_EVENT_ESTABLISHED:
+                state = ESTABLISHED;
+                connectedCallback(ci);
+                break;
+            default:
+                state = ERROR;
+                std::cerr << "Warning: unexpected response to connect - " << eventType << "\n";
+            }
+            break;
+        case ESTABLISHED:
+            switch (eventType) {
+            case RDMA_CM_EVENT_DISCONNECTED:
+                disconnectedCallback(ci);
+                break;
+            default:
+                std::cerr << "Warning: unexpected event in ESTABLISHED state - " << eventType << "\n";
+            }
+            break;
+        case REJECTED:
+            std::cerr << "Warning: event in REJECTED state - " << eventType << "\n";
+            break;
+        case ERROR:
+            std::cerr << "Warning: event in ERROR state - " << eventType << "\n";
+            break;
+        case LISTENING:
+        case ACCEPTING:
+            std::cerr << "Warning: in an illegal state (and received event!) - " << eventType << "\n";
+            break;
+        }
+#endif
+    }
+}
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h
new file mode 100644
index 0000000000..efa0ee7097
--- /dev/null
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -0,0 +1,128 @@
+#ifndef Rdma_Acceptor_h
+#define Rdma_Acceptor_h
+
+#include "rdma_wrap.h"
+
+#include "qpid/sys/Dispatcher.h"
+
+#include <netinet/in.h>
+
+#include <boost/function.hpp>
+#include <boost/ptr_container/ptr_deque.hpp>
+#include <deque>
+
+using qpid::sys::DispatchHandle;
+using qpid::sys::Poller;
+
+namespace Rdma {
+
+ class Connection;
+ // States recorded by Listener and Connector as a connection progresses.
+ enum ConnectionState {
+ IDLE,           // initial state set by the Listener/Connector constructors
+ RESOLVE_ADDR,   // Connector: address resolution requested
+ RESOLVE_ROUTE,  // Connector: address resolved, route resolution requested
+ LISTENING,      // Listener: bound and listening
+ CONNECTING,     // Connector: route resolved, connect requested
+ ACCEPTING,
+ ESTABLISHED,    // Connector: connection established
+ REJECTED,       // Connector: peer rejected the connection
+ DISCONNECTED,   // Connector: connection was disconnected
+ ERROR           // Connector: an error or unexpected event occurred
+ };
+
+ // Callback types used by Listener and Connector. Each receives the
+ // connection the event refers to; only ConnectionRequestCallback returns a
+ // value (true to accept the incoming connection, false to reject it).
+ typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ConnectedCallback;
+ typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ErrorCallback;
+ typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> DisconnectedCallback;
+ typedef boost::function1<bool, Rdma::Connection::intrusive_ptr&> ConnectionRequestCallback;
+ typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> RejectedCallback;
+
+ // Asynchronous, callback driven IO on a single queue pair.
+ // Receive buffers are posted by the constructor; completions are handled
+ // by dataEvent() once start() has been called.
+ class AsynchIO
+ {
+ // Callback signatures (these shadow the namespace level ErrorCallback)
+ typedef boost::function1<void, AsynchIO&> ErrorCallback;
+ typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
+ typedef boost::function1<void, AsynchIO&> IdleCallback;
+
+ QueuePair::intrusive_ptr qp;
+ DispatchHandle dataHandle;
+ // Size in bytes of each buffer created by this object
+ int bufferSize;
+ // Number of receive buffers posted by the constructor
+ int recvBufferCount;
+ // Buffers from completed sends, available for reuse by getBuffer()
+ std::deque<Buffer*> bufferQueue;
+ // Owns every buffer created by this object
+ boost::ptr_deque<Buffer> buffers;
+
+ ReadCallback readCallback;
+ IdleCallback idleCallback;
+ ErrorCallback errorCallback;
+
+ public:
+ // q: queue pair to use; s: buffer size in bytes;
+ // rc/ic/ec: receive, send-complete and error callbacks
+ AsynchIO(
+ QueuePair::intrusive_ptr q,
+ int s,
+ ReadCallback rc,
+ IdleCallback ic,
+ ErrorCallback ec
+ );
+ ~AsynchIO();
+
+ // Start watching the queue pair for events with the given poller
+ void start(Poller::shared_ptr poller);
+ void queueReadBuffer(Buffer* buff);
+ // Post a buffer (obtained from getBuffer()) for sending
+ void queueWrite(Buffer* buff);
+ void notifyPendingWrite();
+ void queueWriteClose();
+ // Get an empty buffer to fill and pass to queueWrite()
+ Buffer* getBuffer();
+
+ private:
+ // Event handler registered with dataHandle
+ void dataEvent(DispatchHandle& handle);
+ };
+
+ // Listens for incoming connections on a local address and reports
+ // connection events through the supplied callbacks.
+ class Listener
+ {
+ // Copy of the address passed to the constructor; bound in start()
+ sockaddr src_addr;
+ Connection::intrusive_ptr ci;
+ DispatchHandle handle;
+ ConnectedCallback connectedCallback;
+ ErrorCallback errorCallback;
+ DisconnectedCallback disconnectedCallback;
+ ConnectionRequestCallback connectionRequestCallback;
+ ConnectionState state;
+
+ public:
+ // crc is optional: when it is empty every connection request is accepted
+ Listener(
+ const sockaddr& src,
+ ConnectedCallback cc,
+ ErrorCallback errc,
+ DisconnectedCallback dc,
+ ConnectionRequestCallback crc = 0
+ );
+ // Bind, listen and start watching for connection events
+ void start(Poller::shared_ptr poller);
+
+ private:
+ // Event handler registered with handle
+ void connectionEvent(DispatchHandle& handle);
+ };
+
+ // Makes an outgoing connection to a remote address and reports
+ // connection events through the supplied callbacks.
+ class Connector
+ {
+ // Copy of the address passed to the constructor; resolved in start()
+ sockaddr dst_addr;
+ Connection::intrusive_ptr ci;
+ DispatchHandle handle;
+ ConnectedCallback connectedCallback;
+ ErrorCallback errorCallback;
+ DisconnectedCallback disconnectedCallback;
+ RejectedCallback rejectedCallback;
+ ConnectionState state;
+
+ public:
+ // rc defaults to an empty callback
+ Connector(
+ const sockaddr& dst,
+ ConnectedCallback cc,
+ ErrorCallback errc,
+ DisconnectedCallback dc,
+ RejectedCallback rc = 0
+ );
+ // Begin address resolution and start watching for connection events
+ void start(Poller::shared_ptr poller);
+
+ private:
+ // Event handler registered with handle
+ void connectionEvent(DispatchHandle& handle);
+ };
+}
+
+#endif // Rdma_Acceptor_h
diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp
new file mode 100644
index 0000000000..488fe28658
--- /dev/null
+++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp
@@ -0,0 +1,142 @@
+#include "RdmaIO.h"
+
+#include <arpa/inet.h>
+
+#include <vector>
+#include <queue>
+#include <string>
+#include <iostream>
+
+#include <boost/bind.hpp>
+
+using std::vector;
+using std::queue;
+using std::string;
+using std::cout;
+using std::cerr;
+
+using qpid::sys::Poller;
+using qpid::sys::Dispatcher;
+
+// All the accepted connections
+// Per-connection record kept as the connection's context for every
+// accepted connection.
+struct ConRec {
+    Rdma::Connection::intrusive_ptr connection;
+    // IO object for the connection; set by connectionRequest() and deleted
+    // together with the record
+    Rdma::AsynchIO* data;
+    // Echo writes posted but not yet reported complete via idle()
+    int outstandingWrites;
+    // Echo buffers waiting for room in the send queue
+    queue<Rdma::Buffer*> queuedWrites;
+
+    // data starts out null so that deleting it is always safe, even if the
+    // record is torn down before an AsynchIO has been attached
+    ConRec(Rdma::Connection::intrusive_ptr c) :
+        connection(c),
+        data(0),
+        outstandingWrites(0)
+    {}
+};
+
+// Error callback for the data channel: only reports that an error happened.
+void dataError(Rdma::AsynchIO&) {
+    std::cout << "Data error:\n";
+}
+
+// Read callback: echo the received data back to the sender.
+// The reply is posted immediately while fewer than three quarters of the
+// work request entries are in flight; otherwise it is parked until idle()
+// finds room for it.
+void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) {
+    // Copy the received payload into a fresh send buffer
+    Rdma::Buffer* reply = a.getBuffer();
+    const char* first = b->bytes + b->dataStart;
+    std::copy(first, first + b->dataCount, reply->bytes);
+    reply->dataCount = b->dataCount;
+
+    const bool windowFull =
+        cr->outstandingWrites >= 3*Rdma::DEFAULT_WR_ENTRIES/4;
+    if (windowFull) {
+        cr->queuedWrites.push(reply);
+        return;
+    }
+
+    a.queueWrite(reply);
+    ++(cr->outstandingWrites);
+}
+
+// Idle callback: one write has completed, so post as many parked replies
+// as now fit under the outstanding write limit.
+void idle(ConRec* cr, Rdma::AsynchIO& a) {
+    --(cr->outstandingWrites);
+
+    const int writeLimit = 3*Rdma::DEFAULT_WR_ENTRIES/4;
+    for (;;) {
+        if (cr->queuedWrites.empty() || cr->outstandingWrites >= writeLimit)
+            break;
+
+        Rdma::Buffer* pending = cr->queuedWrites.front();
+        cr->queuedWrites.pop();
+        a.queueWrite(pending);
+        ++(cr->outstandingWrites);
+    }
+}
+
+// Disconnected callback: tear down the record attached to the connection.
+//
+// The record pointer is logged before it is deleted (previously it was
+// printed after the delete), and a connection without a record is ignored
+// rather than dereferencing a null pointer.
+void disconnected(Rdma::Connection::intrusive_ptr& ci) {
+    ConRec* cr = ci->getContext<ConRec>();
+    cout << "Disconnected: " << cr << "\n";
+    if (!cr)
+        return;
+
+    cr->connection->disconnect();
+    delete cr->data;
+    delete cr;
+}
+
+// Connection error callback: tear down the record attached to the
+// connection, if there is one.
+//
+// The null check now happens before the record is first dereferenced
+// (previously disconnect() was called through the pointer before it was
+// tested), and the pointer is logged before it is deleted.
+void connectionError(Rdma::Connection::intrusive_ptr& ci) {
+    ConRec* cr = ci->getContext<ConRec>();
+    cout << "Connection error: " << cr << "\n";
+    if (!cr)
+        return;
+
+    cr->connection->disconnect();
+    delete cr->data;
+    delete cr;
+}
+
+// Connection request callback: decides whether to accept an incoming
+// connection. For test purposes every second request is rejected, starting
+// by accepting the first one.
+// Returns true to accept the connection, false to reject it.
+bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) {
+    cout << "Incoming connection: ";
+
+    // Toggles on every call so that alternate attempts are rejected
+    static bool acceptThisOne = false;
+    acceptThisOne = !acceptThisOne;
+
+    if (!acceptThisOne) {
+        cout << "Reject\n";
+        return acceptThisOne;
+    }
+
+    // The aio must be created here so that its receive buffers are posted
+    // *before* the connection is accepted
+    ConRec* cr = new ConRec(ci);
+    Rdma::AsynchIO* aio =
+        new Rdma::AsynchIO(ci->getQueuePair(), 8000,
+            boost::bind(data, cr, _1, _2),
+            boost::bind(idle, cr, _1),
+            dataError);
+    ci->addContext(cr);
+    cr->data = aio;
+    cout << "Accept=>" << cr << "\n";
+
+    return acceptThisOne;
+}
+
+// Connected callback: the connection is established, so start processing
+// data events for it. Also prints a running count of connections.
+void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
+    static int connectionCount = 0;
+    ++connectionCount;
+
+    ConRec* record = ci->getContext<ConRec>();
+    cout << "Connected: " << record << "(" << connectionCount << ")\n";
+
+    record->data->start(poller);
+}
+
+// Test server entry point.
+//
+// Usage: RdmaServer [<port>]
+//   port - port to listen on (default 20079)
+//
+// Listens on all local addresses and echoes back everything it receives.
+int main(int argc, char* argv[]) {
+    vector<string> args(&argv[0], &argv[argc]);
+
+    int port = (args.size() < 2) ? 20079 : atoi(args[1].c_str());
+    cout << "Listening on port: " << port << "\n";
+
+    // Zero the whole structure first so that no uninitialized padding bytes
+    // are passed on when binding
+    ::sockaddr_in sin = {};
+    sin.sin_family = AF_INET;
+    sin.sin_port = htons(port);
+    sin.sin_addr.s_addr = htonl(INADDR_ANY);
+
+    try {
+        boost::shared_ptr<Poller> p(new Poller());
+        Dispatcher d(p);
+
+        Rdma::Listener a(reinterpret_cast<const sockaddr&>(sin),
+            boost::bind(connected, p, _1),
+            connectionError,
+            disconnected,
+            connectionRequest);
+
+
+        a.start(p);
+        d.run();
+    } catch (Rdma::Exception& e) {
+        int err = e.getError();
+        cerr << "Error: " << e.what() << "(" << err << ")\n";
+    }
+}
diff --git a/cpp/src/qpid/sys/rdma/rdma_exception.h b/cpp/src/qpid/sys/rdma/rdma_exception.h
new file mode 100644
index 0000000000..2773f25917
--- /dev/null
+++ b/cpp/src/qpid/sys/rdma/rdma_exception.h
@@ -0,0 +1,45 @@
+#ifndef RDMA_EXCEPTION_H
+#define RDMA_EXCEPTION_H
+
+#include <exception>
+
+#include <errno.h>
+#include <string.h>
+
+namespace Rdma {
+    // Per-thread scratch buffer that Exception::what() passes to strerror_r
+    static __thread char s[50];
+
+    // Exception carrying an errno style error code.
+    class Exception : public std::exception {
+        int err;
+
+    public:
+        Exception(int e) : err(e) {}
+
+        // The error code this exception was created with.
+        // const so that it can be called through a const reference.
+        int getError() const { return err; }
+
+        // Text describing the error code.
+        // NOTE: returning the result of strerror_r directly relies on the
+        // variant of strerror_r that returns a char*.
+        const char* what() const throw() {
+            return ::strerror_r(err, s, sizeof(s));
+        }
+    };
+
+    // Throw an Rdma::Exception for the current value of errno.
+    inline void THROW_ERRNO()
+    {
+        const int code = errno;
+        throw Rdma::Exception(code);
+    }
+
+    // Throw an Rdma::Exception unless rc is zero.
+    // A return code of -1 means the error is in errno; any other value is
+    // taken to be the error code itself, negated if necessary.
+    inline void CHECK(int rc) {
+        if (rc == 0)
+            return;
+
+        int code;
+        if (rc == -1)
+            code = errno;
+        else if (rc > 0)
+            code = rc;
+        else
+            code = -rc;
+        throw Rdma::Exception(code);
+    }
+
+    // Throw an Rdma::Exception carrying rc itself unless rc is zero.
+    inline void CHECK_IBV(int rc) {
+        if (rc == 0)
+            return;
+        throw Rdma::Exception(rc);
+    }
+
+    // Pass a pointer through unchanged, throwing an Rdma::Exception for the
+    // current errno if it is null.
+    template <typename T>
+    inline
+    T* CHECK_NULL(T* rc) {
+        if (rc != 0)
+            return rc;
+        THROW_ERRNO();
+        return rc;
+    }
+}
+
+#endif // RDMA_EXCEPTION_H
diff --git a/cpp/src/qpid/sys/rdma/rdma_factories.cpp b/cpp/src/qpid/sys/rdma/rdma_factories.cpp
new file mode 100644
index 0000000000..53dc4f8935
--- /dev/null
+++ b/cpp/src/qpid/sys/rdma/rdma_factories.cpp
@@ -0,0 +1,39 @@
+#include "rdma_factories.h"
+
+namespace Rdma {
+    // Deleter for connection manager events: acknowledges the event.
+    void acker(::rdma_cm_event* e) throw () {
+        if (!e)
+            return;
+        // The return value is deliberately dropped - nothing can be done about a failure here
+        (void) ::rdma_ack_cm_event(e);
+    }
+
+    // Deleter for event channels.
+    void destroyEChannel(::rdma_event_channel* c) throw () {
+        if (!c)
+            return;
+        // The return value is deliberately dropped - nothing can be done about a failure here
+        (void) ::rdma_destroy_event_channel(c);
+    }
+
+    // Deleter for connection manager ids.
+    void destroyId(::rdma_cm_id* i) throw () {
+        if (!i)
+            return;
+        // The return value is deliberately dropped - nothing can be done about a failure here
+        (void) ::rdma_destroy_id(i);
+    }
+
+    // Deleter for protection domains.
+    void deallocPd(::ibv_pd* p) throw () {
+        if (!p)
+            return;
+        // The return value is deliberately dropped - nothing can be done about a failure here
+        (void) ::ibv_dealloc_pd(p);
+    }
+
+    // Deleter for completion channels.
+    void destroyCChannel(::ibv_comp_channel* c) throw () {
+        if (!c)
+            return;
+        // The return value is deliberately dropped - nothing can be done about a failure here
+        (void) ::ibv_destroy_comp_channel(c);
+    }
+
+    // Deleter for completion queues.
+    void destroyCq(::ibv_cq* cq) throw () {
+        if (!cq)
+            return;
+        // The return value is deliberately dropped - nothing can be done about a failure here
+        (void) ::ibv_destroy_cq(cq);
+    }
+
+}
diff --git a/cpp/src/qpid/sys/rdma/rdma_factories.h b/cpp/src/qpid/sys/rdma/rdma_factories.h
new file mode 100644
index 0000000000..26a93bb494
--- /dev/null
+++ b/cpp/src/qpid/sys/rdma/rdma_factories.h
@@ -0,0 +1,48 @@
+#ifndef RDMA_FACTORIES_H
+#define RDMA_FACTORIES_H
+
+#include "rdma_exception.h"
+
+#include <rdma/rdma_cma.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace Rdma {
+ // These allow us to use simple shared_ptrs to do ref counting
+ // Each one releases the corresponding rdma/ibverbs object and is passed as
+ // the custom deleter of the shared_ptrs created by the factories below.
+ // A null pointer is ignored and failures are not reported.
+ void acker(::rdma_cm_event* e) throw ();
+ void destroyEChannel(::rdma_event_channel* c) throw ();
+ void destroyId(::rdma_cm_id* i) throw ();
+ void deallocPd(::ibv_pd* p) throw ();
+ void destroyCChannel(::ibv_comp_channel* c) throw ();
+ void destroyCq(::ibv_cq* cq) throw ();
+
+    // Create an event channel that is destroyed when the last reference to
+    // it goes away.
+    // Throws Rdma::Exception (with the current errno) if the channel cannot
+    // be created, like the other factories, instead of returning a shared_ptr
+    // that holds a null pointer.
+    inline boost::shared_ptr< ::rdma_event_channel > mkEChannel() {
+        ::rdma_event_channel* ec = CHECK_NULL(::rdma_create_event_channel());
+        return boost::shared_ptr< ::rdma_event_channel >(ec, destroyEChannel);
+    }
+
+    // Create a connection manager id on the given event channel that is
+    // destroyed when the last reference to it goes away.
+    // Throws Rdma::Exception if the id cannot be created.
+    inline boost::shared_ptr< ::rdma_cm_id >
+    mkId(::rdma_event_channel* ec, void* context, ::rdma_port_space ps) {
+        ::rdma_cm_id* rawId;
+        const int rc = ::rdma_create_id(ec, &rawId, context, ps);
+        CHECK(rc);
+        return boost::shared_ptr< ::rdma_cm_id >(rawId, destroyId);
+    }
+
+    // Allocate a protection domain that is deallocated when the last
+    // reference to it goes away.
+    // Throws Rdma::Exception if the allocation fails.
+    inline boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c) {
+        return boost::shared_ptr< ::ibv_pd >(
+            CHECK_NULL(ibv_alloc_pd(c)),
+            deallocPd);
+    }
+
+    // Create a completion channel that is destroyed when the last reference
+    // to it goes away.
+    // Throws Rdma::Exception if the channel cannot be created.
+    inline boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c) {
+        return boost::shared_ptr< ::ibv_comp_channel >(
+            CHECK_NULL(::ibv_create_comp_channel(c)),
+            destroyCChannel);
+    }
+
+    // Create a completion queue with cqe entries on the given completion
+    // channel; it is destroyed when the last reference to it goes away.
+    // Throws Rdma::Exception if the queue cannot be created.
+    inline boost::shared_ptr< ::ibv_cq >
+    mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc) {
+        return boost::shared_ptr< ::ibv_cq >(
+            CHECK_NULL(ibv_create_cq(c, cqe, context, cc, 0)),
+            destroyCq);
+    }
+}
+
+#endif // RDMA_FACTORIES_H
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h
new file mode 100644
index 0000000000..421c8fc41b
--- /dev/null
+++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h
@@ -0,0 +1,550 @@
+#ifndef RDMA_WRAP_H
+#define RDMA_WRAP_H
+
+#include "rdma_factories.h"
+
+#include <rdma/rdma_cma.h>
+
+#include "qpid/RefCounted.h"
+#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/posix/PrivatePosix.h"
+
+#include <fcntl.h>
+
+#include <vector>
+#include <algorithm>
+#include <iostream>
+#include <stdexcept>
+#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+namespace Rdma {
+ // Default tuning values used by the wrappers in this header.
+ //
+ // DEFAULT_TIMEOUT is the default for the timeout_ms arguments of
+ // Connection::resolve_addr() and Connection::resolve_route().
+ const int DEFAULT_TIMEOUT = 2000; // 2 secs
+ // Default backlog argument of Connection::listen().
+ const int DEFAULT_BACKLOG = 100;
+ // Size requested for each completion queue created by QueuePair; half of
+ // this value is also the threshold at which QueuePair batch-acknowledges
+ // completion channel events.
+ const int DEFAULT_CQ_ENTRIES = 256;
+ // Used for both max_send_wr and max_recv_wr when QueuePair creates its qp.
+ const int DEFAULT_WR_ENTRIES = 64;
+ // Connection parameters used by Connection::connect(). The initialiser is
+ // positional, so the values must stay in the field order noted beside them.
+ const ::rdma_conn_param DEFAULT_CONNECT_PARAM = {
+ 0, // .private_data
+ 0, // .private_data_len
+ 4, // .responder_resources
+ 4, // .initiator_depth
+ 0, // .flow_control
+ 5, // .retry_count
+ 7 // .rnr_retry_count
+ };
+
+ // A block of memory registered with a protection domain so that it can be
+ // posted to a QueuePair.
+ //
+ // Ownership: the Buffer takes ownership of the array b handed to the
+ // constructor and releases it with delete [], so b must have been
+ // allocated with new char[]. This holds even when the constructor fails:
+ // the array is released before the exception propagates, because the
+ // destructor of a partially constructed object never runs.
+ //
+ // Buffers are not copyable: a copy would deregister the memory region and
+ // delete the array a second time.
+ struct Buffer {
+     friend class QueuePair;
+
+     char* const bytes;          // start of the owned storage
+     const int32_t byteCount;    // size of the storage, as registered
+     int32_t dataStart;          // offset into bytes used when posting
+     int32_t dataCount;          // length used when posting
+
+     // Register s bytes starting at b with protection domain pd for local
+     // write access. Throws whatever CHECK_NULL throws if the registration
+     // fails.
+     Buffer(::ibv_pd* pd, char* const b, const int32_t s) :
+         bytes(b),
+         byteCount(s),
+         dataStart(0),
+         dataCount(0),
+         mr(0)
+     {
+         try {
+             mr = CHECK_NULL(::ibv_reg_mr(
+                 pd, bytes, byteCount,
+                 ::IBV_ACCESS_LOCAL_WRITE));
+         } catch (...) {
+             // We already own the array, and ~Buffer() will not be called
+             delete [] bytes;
+             throw;
+         }
+     }
+
+     ~Buffer() {
+         (void) ::ibv_dereg_mr(mr);
+         delete [] bytes;
+     }
+
+ private:
+     // Declared but deliberately not defined, to prevent copying
+     Buffer(const Buffer&);
+     Buffer& operator=(const Buffer&);
+
+     ::ibv_mr* mr;
+ };
+
+ class Connection;
+
+ // Tags a QueuePairEvent with the completion queue it was polled from:
+ // SEND for the send queue, RECV for the receive queue. NONE is the value
+ // carried by a default constructed (empty) QueuePairEvent.
+ enum QueueDirection {
+ NONE,
+ SEND,
+ RECV
+ };
+
+ // A single work completion polled from one of a QueuePair's completion
+ // queues, or an empty event (direction NONE) when nothing was available.
+ // Only QueuePair can create these; test an event with operator bool before
+ // using the other accessors.
+ class QueuePairEvent {
+     boost::shared_ptr< ::ibv_cq > cq;
+     ::ibv_wc wc;
+     QueueDirection dir;
+
+     friend class QueuePair;
+
+     // Empty event. wc is value-initialised so that copying the event (it
+     // is returned by value) and calling the accessors never reads
+     // indeterminate memory.
+     QueuePairEvent() :
+         wc(),
+         dir(NONE)
+     {}
+
+     QueuePairEvent(
+         const ::ibv_wc& w,
+         boost::shared_ptr< ::ibv_cq > c,
+         QueueDirection d) :
+         cq(c),
+         wc(w),
+         dir(d)
+     {
+         assert(dir != NONE);
+     }
+
+ public:
+     // True when this event carries a completion
+     operator bool() const {
+         return dir != NONE;
+     }
+
+     QueueDirection getDirection() const {
+         return dir;
+     }
+
+     ::ibv_wc_opcode getEventType() const {
+         return wc.opcode;
+     }
+
+     ::ibv_wc_status getEventStatus() const {
+         return wc.status;
+     }
+
+     // Return the Buffer whose address was stored in the work request id,
+     // after updating its dataCount from the completion's byte_len.
+     // Returns 0 for an empty event instead of dereferencing a null buffer.
+     // NOTE(review): byte_len is copied for every completion, including
+     // send completions - confirm it is meaningful in that case.
+     Buffer* getBuffer() const {
+         Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id);
+         if (b)
+             b->dataCount = wc.byte_len;
+         return b;
+     }
+ };
+
+ // Wrapper for a queue pair - this has the functionality for
+ // putting buffers on the receive queue and for sending buffers
+ // to the other end of the connection.
+ //
+ // Currently QueuePairs are contained inside Connections and have no
+ // separate lifetime
+ //
+ // The IOHandle file descriptor is that of the completion channel shared
+ // by the send and receive completion queues.
+ class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted {
+     boost::shared_ptr< ::ibv_pd > pd;
+     boost::shared_ptr< ::ibv_comp_channel > cchannel;
+     boost::shared_ptr< ::ibv_cq > scq;
+     boost::shared_ptr< ::ibv_cq > rcq;
+     boost::shared_ptr< ::rdma_cm_id > id;
+     // Channel events received but not yet acknowledged, per queue
+     int outstandingSendEvents;
+     int outstandingRecvEvents;
+
+     friend class Connection;
+
+     QueuePair(boost::shared_ptr< ::rdma_cm_id > id);
+     ~QueuePair();
+
+ public:
+     typedef boost::intrusive_ptr<QueuePair> intrusive_ptr;
+
+     // Create a buffer to use for writing
+     // The returned Buffer owns the s byte array allocated here and is
+     // registered with this queue pair's protection domain; the caller
+     // owns the Buffer itself.
+     Buffer* createBuffer(int s) {
+         return new Buffer(pd.get(), new char[s], s);
+     }
+
+     // Make channel non-blocking by making
+     // associated fd nonblocking
+     //
+     // O_NONBLOCK is added to the descriptor's current status flags
+     // rather than replacing them, and a failing fcntl() is reported
+     // through CHECK instead of being silently ignored.
+     void nonblocking() {
+         const int flags = ::fcntl(cchannel->fd, F_GETFL);
+         if (flags == -1 ||
+             ::fcntl(cchannel->fd, F_SETFL, flags | O_NONBLOCK) == -1)
+             CHECK(-1);
+     }
+
+     // If we get EAGAIN because the channel has been set non blocking
+     // and we'd have to wait then return an empty event
+     //
+     // Otherwise returns the QueuePair stored as the context of the
+     // completion queue that raised the event.
+     QueuePair::intrusive_ptr getNextChannelEvent() {
+         // First find out which cq has the event
+         ::ibv_cq* cq;
+         void* ctx;
+         int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx);
+         if (rc == -1 && errno == EAGAIN)
+             return 0;
+         CHECK(rc);
+
+         // Batch acknowledge the event
+         // (events are only counted here; they are acknowledged in one
+         // call once more than half a queue's worth have accumulated)
+         if (cq == scq.get()) {
+             if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) {
+                 ::ibv_ack_cq_events(cq, outstandingSendEvents);
+                 outstandingSendEvents = 0;
+             }
+         } else if (cq == rcq.get()) {
+             if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) {
+                 ::ibv_ack_cq_events(cq, outstandingRecvEvents);
+                 outstandingRecvEvents = 0;
+             }
+         }
+
+         return static_cast<QueuePair*>(ctx);
+     }
+
+     // Poll for one work completion, trying the send queue before the
+     // receive queue. Returns an empty event if neither poll yields
+     // exactly one completion.
+     // NOTE(review): an error return from ibv_poll_cq is therefore
+     // reported as "no event" - confirm this is intended.
+     QueuePairEvent getNextEvent() {
+         ::ibv_wc w;
+         if (::ibv_poll_cq(scq.get(), 1, &w) == 1)
+             return QueuePairEvent(w, scq, SEND);
+         else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1)
+             return QueuePairEvent(w, rcq, RECV);
+         else
+             return QueuePairEvent();
+     }
+
+     void postRecv(Buffer* buf);
+     void postSend(Buffer* buf);
+     void notifyRecv();
+     void notifySend();
+ };
+
+ // One event read from a connection's event channel, bundled with the
+ // Connection objects found for its id and listen_id. A default
+ // constructed instance holds no event and tests false.
+ class ConnectionEvent {
+     friend class Connection;
+
+     // Keep the members in this order: they are destroyed in reverse
+     // order of declaration, so the event is released (and thereby
+     // acknowledged) before the references to the ids are dropped
+     boost::intrusive_ptr<Connection> id;
+     boost::intrusive_ptr<Connection> listen_id;
+     boost::shared_ptr< ::rdma_cm_event > event;
+
+     ConnectionEvent() {}
+     ConnectionEvent(::rdma_cm_event* e);
+
+     // The compiler generated copy constructor, assignment operator and
+     // destructor are sufficient
+ public:
+     // True when an event is held
+     operator bool() const {
+         return event.get() != 0;
+     }
+
+     // Requires an event to be held
+     ::rdma_cm_event_type getEventType() const {
+         return event->event;
+     }
+
+     // The connection parameters carried by a connect request event; for
+     // any other event type an all-zero parameter block is returned.
+     // Requires an event to be held
+     ::rdma_conn_param getConnectionParam() const {
+         if (event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
+             ::rdma_conn_param none = {};
+             return none;
+         }
+         return event->param.conn;
+     }
+
+     boost::intrusive_ptr<Connection> getConnection () const {
+         return id;
+     }
+
+     boost::intrusive_ptr<Connection> getListenId() const {
+         return listen_id;
+     }
+ };
+
+ // For the moment this is a fairly simple wrapper for rdma_cm_id.
+ //
+ // NB: It allocates a protection domain (pd) per connection which means that
+ // registered buffers can't be shared between different connections
+ // (this can only happen between connections on the same controller in any case,
+ // so needs careful management if used)
+ //
+ // The rdma_cm_id's context pointer is set to the owning Connection so that
+ // find() can map an id reported in an event back to its Connection. The
+ // IOHandle file descriptor is that of the event channel the id uses.
+ class Connection : public qpid::sys::IOHandle, public qpid::RefCounted {
+     boost::shared_ptr< ::rdma_event_channel > channel;
+     boost::shared_ptr< ::rdma_cm_id > id;
+     QueuePair::intrusive_ptr qp;
+
+     // Opaque user pointer, see addContext()/getContext()
+     void* context;
+
+     friend class ConnectionEvent;
+     friend class QueuePair;
+
+     // Wrap the passed in rdma_cm_id with a Connection
+     // this basically happens only on connection request
+     //
+     // No event channel is created here; the fd is taken from the
+     // channel the id is already attached to.
+     Connection(::rdma_cm_id* i) :
+         qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+         id(i, destroyId),
+         context(0)
+     {
+         // Every dereference of i is guarded: previously the fd was
+         // read through the id before the null check was made
+         if (i) {
+             impl->fd = i->channel->fd;
+
+             // Just overwrite the previous context as it will
+             // have come from the listening connection
+             i->context = this;
+         }
+     }
+
+     // Create a fresh event channel and an id on it (RDMA_PS_TCP port
+     // space) whose context is this Connection
+     Connection() :
+         qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+         channel(mkEChannel()),
+         id(mkId(channel.get(), this, RDMA_PS_TCP)),
+         context(0)
+     {
+         impl->fd = channel->fd;
+     }
+
+     // Default destructor fine
+
+     void ensureQueuePair() {
+         assert(id.get());
+
+         // Only allocate a queue pair if there isn't one already
+         if (qp)
+             return;
+
+         qp = new QueuePair(id);
+     }
+
+ public:
+     typedef boost::intrusive_ptr<Connection> intrusive_ptr;
+
+     static intrusive_ptr make() {
+         return new Connection();
+     }
+
+     // Map an rdma_cm_id back to the Connection stored in its context.
+     // Returns an empty pointer for a null id and throws
+     // std::logic_error if the id has no context set.
+     static intrusive_ptr find(::rdma_cm_id* i) {
+         if (!i)
+             return 0;
+         Connection* id = static_cast< Connection* >(i->context);
+         if (!id)
+             throw std::logic_error("Couldn't find existing Connection");
+         return id;
+     }
+
+     // Attach a user context pointer. Only the first non-null context is
+     // kept; later calls are silently ignored.
+     template <typename T>
+     void addContext(T* c) {
+         // Don't allow replacing context
+         if (!context)
+             context = c;
+     }
+
+     // Retrieve the user context; T must be the type that was passed to
+     // addContext() as no check is made
+     template <typename T>
+     T* getContext() {
+         return static_cast<T*>(context);
+     }
+
+     // Make channel non-blocking by making
+     // associated fd nonblocking
+     //
+     // O_NONBLOCK is added to the descriptor's current status flags
+     // rather than replacing them, and a failing fcntl() is reported
+     // through CHECK instead of being silently ignored.
+     void nonblocking() {
+         assert(id.get());
+         const int fd = id->channel->fd;
+         const int flags = ::fcntl(fd, F_GETFL);
+         if (flags == -1 || ::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
+             CHECK(-1);
+     }
+
+     // If we get EAGAIN because the channel has been set non blocking
+     // and we'd have to wait then return an empty event
+     ConnectionEvent getNextEvent() {
+         assert(id.get());
+         ::rdma_cm_event* e;
+         int rc = ::rdma_get_cm_event(id->channel, &e);
+         if (rc == -1 && errno == EAGAIN)
+             return ConnectionEvent();
+         CHECK(rc);
+         return ConnectionEvent(e);
+     }
+
+     // The following are thin wrappers: each passes the result of the
+     // librdmacm call of the same name to CHECK
+
+     void bind(sockaddr& src_addr) const {
+         assert(id.get());
+         CHECK(::rdma_bind_addr(id.get(), &src_addr));
+     }
+
+     void listen(int backlog = DEFAULT_BACKLOG) const {
+         assert(id.get());
+         CHECK(::rdma_listen(id.get(), backlog));
+     }
+
+     void resolve_addr(
+         sockaddr& dst_addr,
+         sockaddr* src_addr = 0,
+         int timeout_ms = DEFAULT_TIMEOUT) const
+     {
+         assert(id.get());
+         CHECK(::rdma_resolve_addr(id.get(), src_addr, &dst_addr, timeout_ms));
+     }
+
+     void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const {
+         assert(id.get());
+         CHECK(::rdma_resolve_route(id.get(), timeout_ms));
+     }
+
+     void disconnect() const {
+         assert(id.get());
+         CHECK(::rdma_disconnect(id.get()));
+     }
+
+     // TODO: Currently you can only connect with the default connection parameters
+     void connect() {
+         assert(id.get());
+
+         // Need to have a queue pair before we can connect
+         ensureQueuePair();
+
+         ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
+         CHECK(::rdma_connect(id.get(), &p));
+     }
+
+     // Connect with the default parameters, sending *data (sizeof(T)
+     // bytes) as the connection's private data
+     template <typename T>
+     void connect(const T* data) {
+         assert(id.get());
+         // Need to have a queue pair before we can connect
+         ensureQueuePair();
+
+         ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
+         p.private_data = data;
+         p.private_data_len = sizeof(T);
+         CHECK(::rdma_connect(id.get(), &p));
+     }
+
+     // TODO: Not sure how to default accept params - they come from the connection request
+     // event
+     //
+     // Accept using a copy of param whose private data is replaced by
+     // *data (sizeof(T) bytes)
+     template <typename T>
+     void accept(const ::rdma_conn_param& param, const T* data) {
+         assert(id.get());
+         // Need to have a queue pair before we can accept
+         ensureQueuePair();
+
+         ::rdma_conn_param p = param;
+         p.private_data = data;
+         p.private_data_len = sizeof(T);
+         CHECK(::rdma_accept(id.get(), &p));
+     }
+
+     // Accept using a copy of param with its private data cleared
+     void accept(const ::rdma_conn_param& param) {
+         assert(id.get());
+         // Need to have a queue pair before we can accept
+         ensureQueuePair();
+
+         ::rdma_conn_param p = param;
+         p.private_data = 0;
+         p.private_data_len = 0;
+         CHECK(::rdma_accept(id.get(), &p));
+     }
+
+     // Reject, sending *data (sizeof(T) bytes) as private data
+     template <typename T>
+     void reject(const T* data) const {
+         assert(id.get());
+         CHECK(::rdma_reject(id.get(), data, sizeof(T)));
+     }
+
+     void reject() const {
+         assert(id.get());
+         CHECK(::rdma_reject(id.get(), 0, 0));
+     }
+
+     // Return the connection's queue pair, creating it on first use
+     QueuePair::intrusive_ptr getQueuePair() {
+         assert(id.get());
+
+         ensureQueuePair();
+
+         return qp;
+     }
+ };
+
+ // Build everything a queue pair needs on the device behind i: a protection
+ // domain, one completion channel, a send and a receive completion queue
+ // (both attached to that channel) and finally the qp itself, which is
+ // created on the rdma_cm_id.
+ inline QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) :
+     qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+     pd(allocPd(i->verbs)),
+     cchannel(mkCChannel(i->verbs)),
+     scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
+     rcq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
+     id(i),
+     outstandingSendEvents(0),
+     outstandingRecvEvents(0)
+ {
+     // Both completion queues point back at this object, which is how
+     // getNextChannelEvent() finds the QueuePair for an event
+     rcq->cq_context = this;
+     scq->cq_context = this;
+
+     // The handle's fd is the completion channel's
+     impl->fd = cchannel->fd;
+
+     // TODO: make a default struct for this
+     ::ibv_qp_init_attr attr = {};
+     attr.qp_type = IBV_QPT_RC;
+     attr.send_cq = scq.get();
+     attr.recv_cq = rcq.get();
+     attr.cap.max_send_wr = DEFAULT_WR_ENTRIES;
+     attr.cap.max_recv_wr = DEFAULT_WR_ENTRIES;
+     attr.cap.max_send_sge = 4;
+     attr.cap.max_recv_sge = 4;
+
+     const int rc = ::rdma_create_qp(id.get(), pd.get(), &attr);
+     CHECK(rc);
+
+     // The qp also carries a pointer back to us
+     id->qp->qp_context = this;
+ }
+
+ // Acknowledge any channel events that were counted by
+ // getNextChannelEvent() but not yet acknowledged by its batching, then
+ // destroy the qp held by the rdma_cm_id. The remaining resources are
+ // released by the shared_ptr members after this body has run.
+ inline QueuePair::~QueuePair() {
+     if (outstandingRecvEvents > 0) {
+         ::ibv_ack_cq_events(rcq.get(), outstandingRecvEvents);
+     }
+     if (outstandingSendEvents > 0) {
+         ::ibv_ack_cq_events(scq.get(), outstandingSendEvents);
+     }
+
+     ::rdma_destroy_qp(id.get());
+ }
+
+ // Request a completion notification on the receive completion queue
+ // (second argument 0); the result is passed to CHECK_IBV.
+ inline void QueuePair::notifyRecv() {
+     const int rc = ibv_req_notify_cq(rcq.get(), 0);
+     CHECK_IBV(rc);
+ }
+
+ // Request a completion notification on the send completion queue
+ // (second argument 0); the result is passed to CHECK_IBV.
+ inline void QueuePair::notifySend() {
+     const int rc = ibv_req_notify_cq(scq.get(), 0);
+     CHECK_IBV(rc);
+ }
+
+ // Post buf to the receive queue as a single scatter/gather element that
+ // covers dataCount bytes starting dataStart bytes into the buffer. The
+ // Buffer's address is used as the work request id so the completion can be
+ // mapped back to it.
+ // NOTE(review): the length posted is dataCount, not byteCount, so callers
+ // presumably have to set dataCount to the space available beforehand -
+ // confirm against the callers.
+ inline void QueuePair::postRecv(Buffer* buf) {
+     ::ibv_sge segment;
+     segment.lkey = buf->mr->lkey;
+     segment.length = buf->dataCount;
+     segment.addr = reinterpret_cast<uintptr_t>(buf->bytes) + buf->dataStart;
+
+     ::ibv_recv_wr request = {};
+     request.num_sge = 1;
+     request.sg_list = &segment;
+     request.wr_id = reinterpret_cast<uint64_t>(buf);
+
+     ::ibv_recv_wr* rejected = 0;
+     const int rc = ::ibv_post_recv(id->qp, &request, &rejected);
+     CHECK_IBV(rc);
+     if (rejected)
+         throw std::logic_error("ibv_post_recv(): Bad rwr");
+ }
+
+ // Post buf to the send queue as a signalled IBV_WR_SEND with a single
+ // scatter/gather element covering dataCount bytes starting dataStart bytes
+ // into the buffer. The Buffer's address is used as the work request id so
+ // the completion can be mapped back to it.
+ inline void QueuePair::postSend(Buffer* buf) {
+     ::ibv_sge segment;
+     segment.lkey = buf->mr->lkey;
+     segment.length = buf->dataCount;
+     segment.addr = reinterpret_cast<uintptr_t>(buf->bytes) + buf->dataStart;
+
+     ::ibv_send_wr request = {};
+     request.num_sge = 1;
+     request.sg_list = &segment;
+     request.send_flags = IBV_SEND_SIGNALED;
+     request.opcode = IBV_WR_SEND;
+     request.wr_id = reinterpret_cast<uint64_t>(buf);
+
+     ::ibv_send_wr* rejected = 0;
+     const int rc = ::ibv_post_send(id->qp, &request, &rejected);
+     CHECK_IBV(rc);
+     if (rejected)
+         throw std::logic_error("ibv_post_send(): Bad swr");
+ }
+
+ // Take over event e; acker is installed as the deleter of the shared_ptr
+ // that holds it.
+ // A connect request carries an id that has no Connection yet, so a new one
+ // is wrapped around it; for every other event type the existing Connection
+ // is looked up from the id. The listen id, when present, is always looked
+ // up.
+ inline ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) :
+     id((e->event == RDMA_CM_EVENT_CONNECT_REQUEST) ?
+         new Connection(e->id) : Connection::find(e->id)),
+     listen_id(Connection::find(e->listen_id)),
+     event(e, acker)
+ {}
+}
+
+// Write the symbolic name of an rdma connection manager event type to o,
+// e.g. RDMA_CM_EVENT_ESTABLISHED. A value that has no case below (for
+// instance one added by a newer librdmacm) is written as
+// UNKNOWN_EVENT(<numeric value>) rather than producing no output at all.
+// Returns o to allow chaining.
+inline std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) {
+# define CHECK_TYPE(t) case t: o << #t; break;
+    switch(t) {
+    CHECK_TYPE(RDMA_CM_EVENT_ADDR_RESOLVED)
+    CHECK_TYPE(RDMA_CM_EVENT_ADDR_ERROR)
+    CHECK_TYPE(RDMA_CM_EVENT_ROUTE_RESOLVED)
+    CHECK_TYPE(RDMA_CM_EVENT_ROUTE_ERROR)
+    CHECK_TYPE(RDMA_CM_EVENT_CONNECT_REQUEST)
+    CHECK_TYPE(RDMA_CM_EVENT_CONNECT_RESPONSE)
+    CHECK_TYPE(RDMA_CM_EVENT_CONNECT_ERROR)
+    CHECK_TYPE(RDMA_CM_EVENT_UNREACHABLE)
+    CHECK_TYPE(RDMA_CM_EVENT_REJECTED)
+    CHECK_TYPE(RDMA_CM_EVENT_ESTABLISHED)
+    CHECK_TYPE(RDMA_CM_EVENT_DISCONNECTED)
+    CHECK_TYPE(RDMA_CM_EVENT_DEVICE_REMOVAL)
+    CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_JOIN)
+    CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_ERROR)
+    default:
+        o << "UNKNOWN_EVENT(" << static_cast<int>(t) << ")";
+        break;
+    }
+# undef CHECK_TYPE
+    return o;
+}
+
+#endif // RDMA_WRAP_H