diff options
author | Andrew Stitcher <astitcher@apache.org> | 2008-04-28 04:41:46 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2008-04-28 04:41:46 +0000 |
commit | 9f153bc328112ed2ee25a801eff1f6a277c7bb19 (patch) | |
tree | acd13eebcfe1a3ee196ab229741ce6a20e9eb27c /cpp/src | |
parent | a301f95243dd1cd367a0a8d041c1168b8adc1e86 (diff) | |
download | qpid-python-9f153bc328112ed2ee25a801eff1f6a277c7bb19.tar.gz |
Work In Progress:
Added initial rdma code including test server and client
Turn off rdma support by default but autoconf should now detect whether
necessary rdma/ibverbs libs and headers are present
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652053 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 33 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaClient.cpp | 178 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 351 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.h | 128 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaServer.cpp | 142 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_exception.h | 45 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_factories.cpp | 39 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_factories.h | 48 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_wrap.h | 550 |
9 files changed, 1514 insertions, 0 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index a64f70abd8..1e31ac60fd 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -91,6 +91,39 @@ noinst_LTLIBRARIES=libLogger.la # libqpidamqp_0_10.la libLogger_la_SOURCES=qpid/log/Logger.cpp qpid/log/Logger.h libLogger_la_CXXFLAGS=$(AM_CXXFLAGS) -Wno-unused-parameter +if RDMA + +# RDMA (Infiniband) protocol code +libqpidrdma_la_SOURCES = \ + qpid/sys/rdma/rdma_exception.h \ + qpid/sys/rdma/rdma_factories.cpp \ + qpid/sys/rdma/RdmaIO.cpp \ + qpid/sys/rdma/RdmaIO.h \ + qpid/sys/rdma/rdma_wrap.h +libqpidrdma_la_LIBADD = \ + -lrdmacm \ + -libverbs +libqpidrdma_la_CXXFLAGS = \ + $(AM_CXXFLAGS) -Wno-missing-field-initializers +noinst_LTLIBRARIES += \ + libqpidrdma.la +qpidd_LDADD += \ + libqpidrdma.la + +noinst_PROGRAMS += RdmaServer RdmaClient +RdmaServer_SOURCES = qpid/sys/rdma/RdmaServer.cpp +RdmaServer_CXXFLAGS = \ + $(AM_CXXFLAGS) -Wno-missing-field-initializers +RdmaServer_LDADD = \ + libqpidrdma.la libqpidcommon.la +RdmaClient_SOURCES = qpid/sys/rdma/RdmaClient.cpp +RdmaClient_CXXFLAGS = \ + $(AM_CXXFLAGS) -Wno-missing-field-initializers +RdmaClient_LDADD = \ + libqpidrdma.la libqpidcommon.la + +endif + # New 0-10 codec, to be integrated in future. # libqpidamqp_0_10_la_SOURCES= EXTRA_DIST+=\ diff --git a/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/cpp/src/qpid/sys/rdma/RdmaClient.cpp new file mode 100644 index 0000000000..7c2d9de505 --- /dev/null +++ b/cpp/src/qpid/sys/rdma/RdmaClient.cpp @@ -0,0 +1,178 @@ +#include "RdmaIO.h" +#include "qpid/sys/Time.h" + +#include <netdb.h> +#include <arpa/inet.h> + +#include <vector> +#include <string> +#include <iostream> +#include <algorithm> +#include <cmath> +#include <boost/bind.hpp> + +using std::vector; +using std::string; +using std::cout; +using std::cerr; +using std::copy; +using std::rand; + +using qpid::sys::Poller; +using qpid::sys::Dispatcher; +using qpid::sys::AbsTime; +using qpid::sys::Duration; +using qpid::sys::TIME_SEC; +using qpid::sys::TIME_INFINITE; + +// count of messages +int64_t smsgs = 0; +int64_t sbytes = 0; +int64_t rmsgs = 0; +int64_t rbytes = 0; + +int outstandingwrites = 0; + +int target = 1000000; +int msgsize = 200; +AbsTime startTime; +Duration sendingDuration(TIME_INFINITE); +Duration fullTestDuration(TIME_INFINITE); + +vector<char> testString; + +void write(Rdma::AsynchIO& aio) { + //if ((smsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) { + while (smsgs < target && outstandingwrites < (3*Rdma::DEFAULT_WR_ENTRIES/4)) { + Rdma::Buffer* b = aio.getBuffer(); + std::copy(testString.begin(), testString.end(), b->bytes); + b->dataCount = msgsize; + aio.queueWrite(b); + ++outstandingwrites; + ++smsgs; + sbytes += b->byteCount; + } + //} +} + +void dataError(Rdma::AsynchIO&) { + cout << "Data error:\n"; +} + +void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) { + ++rmsgs; + rbytes += b->byteCount; + + // When all messages have been recvd stop + if (rmsgs < target) { + write(aio); + return; + } + + fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now())); + if (outstandingwrites == 0) + p->shutdown(); +} + +void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) { + --outstandingwrites; + if (smsgs < target) { + write(aio); + return; + } + + sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now())); + if (smsgs >= target && rmsgs >= target && outstandingwrites == 0) + p->shutdown(); +} + +void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) { + cout << "Connected\n"; + Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair(); + + Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), msgsize, + boost::bind(&data, poller, _1, _2), + boost::bind(&idle, poller, _1), + dataError); + + startTime = AbsTime::now(); + write(*aio); + + aio->start(poller); +} + +void disconnected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) { + cout << "Disconnected\n"; + p->shutdown(); +} + +void connectionError(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) { + cout << "Connection error\n"; + p->shutdown(); +} + +void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) { + cout << "Connection rejected\n"; + p->shutdown(); +} + +int main(int argc, char* argv[]) { + vector<string> args(&argv[0], &argv[argc]); + + ::addrinfo *res; + ::addrinfo hints = {}; + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + string port = (args.size() < 3) ? "20079" : args[2]; + int n = ::getaddrinfo(args[1].c_str(), port.c_str(), &hints, &res); + if (n<0) { + cerr << "Can't find information for: " << args[1] << "\n"; + return 1; + } else { + cout << "Connecting to: " << args[1] << ":" << port <<"\n"; + } + + if (args.size() > 3) + msgsize = atoi(args[3].c_str()); + cout << "Message size: " << msgsize << "\n"; + + // Make a random message of that size + testString.resize(msgsize); + for (int i = 0; i < msgsize; ++i) { + testString[i] = 32 + rand() & 0x3f; + } + + try { + boost::shared_ptr<Poller> p(new Poller()); + Dispatcher d(p); + + Rdma::Connector c( + *res->ai_addr, + boost::bind(&connected, p, _1), + boost::bind(&connectionError, p, _1), + boost::bind(&disconnected, p, _1), + boost::bind(&rejected, p, _1)); + + c.start(p); + d.run(); + } catch (Rdma::Exception& e) { + int err = e.getError(); + cerr << "Error: " << e.what() << "(" << err << ")\n"; + } + + cout + << "Sent: " << smsgs + << "msgs (" << sbytes + << "bytes) in: " << double(sendingDuration)/TIME_SEC + << "s: " << double(smsgs)*TIME_SEC/sendingDuration + << "msgs/s(" << double(sbytes)*TIME_SEC/sendingDuration + << "bytes/s)\n"; + cout + << "Recd: " << rmsgs + << "msgs (" << rbytes + << "bytes) in: " << double(fullTestDuration)/TIME_SEC + << "s: " << double(rmsgs)*TIME_SEC/fullTestDuration + << "msgs/s(" << double(rbytes)*TIME_SEC/fullTestDuration + << "bytes/s)\n"; + +} diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp new file mode 100644 index 0000000000..31d109ea4d --- /dev/null +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -0,0 +1,351 @@ +#include "RdmaIO.h" + +#include <iostream> +#include <boost/bind.hpp> + +namespace Rdma { + AsynchIO::AsynchIO( + QueuePair::intrusive_ptr q, + int s, + ReadCallback rc, + IdleCallback ic, + ErrorCallback ec + ) : + qp(q), + dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0), + bufferSize(s), + recvBufferCount(DEFAULT_WR_ENTRIES), + readCallback(rc), + idleCallback(ic), + errorCallback(ec) + { + qp->nonblocking(); + qp->notifyRecv(); + qp->notifySend(); + + // Prepost some recv buffers before we go any further + for (int i = 0; i<recvBufferCount; ++i) { + Buffer* b = qp->createBuffer(bufferSize); + buffers.push_front(b); + b->dataCount = b->byteCount; + qp->postRecv(b); + } + } + + AsynchIO::~AsynchIO() { + // The buffers ptr_deque automatically deletes all the buffers we've allocated + } + + void AsynchIO::start(Poller::shared_ptr poller) { + dataHandle.startWatch(poller); + } + + void AsynchIO::queueReadBuffer(Buffer*) { + } + + void AsynchIO::queueWrite(Buffer* buff) { + qp->postSend(buff); + } + + void AsynchIO::notifyPendingWrite() { + } + + void AsynchIO::queueWriteClose() { + } + + Buffer* AsynchIO::getBuffer() { + if (bufferQueue.empty()) { + Buffer* b = qp->createBuffer(bufferSize); + buffers.push_front(b); + b->dataCount = 0; + return b; + } else { + Buffer* b = bufferQueue.front(); + bufferQueue.pop_front(); + b->dataCount = 0; + b->dataStart = 0; + return b; + } + + } + + void AsynchIO::dataEvent(DispatchHandle&) { + QueuePair::intrusive_ptr q = qp->getNextChannelEvent(); + + // If no event do nothing + if (!q) + return; + + assert(q == qp); + + // Re-enable notification for queue + qp->notifySend(); + qp->notifyRecv(); + + // Repeat until no more events + do { + QueuePairEvent e(qp->getNextEvent()); + if (!e) + return; + + ::ibv_wc_status status = e.getEventStatus(); + if (status != IBV_WC_SUCCESS) { + errorCallback(*this); + return; + } + + // Test if recv (or recv with imm) + //::ibv_wc_opcode eventType = e.getEventType(); + Buffer* b = e.getBuffer(); + QueueDirection dir = e.getDirection(); + if (dir == RECV) { + readCallback(*this, b); + // At this point the buffer has been consumed so put it back on the recv queue + qp->postRecv(b); + } else { + bufferQueue.push_front(b); + idleCallback(*this); + } + } while (true); + } + + Listener::Listener( + const sockaddr& src, + ConnectedCallback cc, + ErrorCallback errc, + DisconnectedCallback dc, + ConnectionRequestCallback crc + ) : + src_addr(src), + ci(Connection::make()), + handle(*ci, boost::bind(&Listener::connectionEvent, this, _1), 0, 0), + connectedCallback(cc), + errorCallback(errc), + disconnectedCallback(dc), + connectionRequestCallback(crc), + state(IDLE) + { + ci->nonblocking(); + } + + void Listener::start(Poller::shared_ptr poller) { + ci->bind(src_addr); + ci->listen(); + state = LISTENING; + handle.startWatch(poller); + } + + void Listener::connectionEvent(DispatchHandle&) { + ConnectionEvent e(ci->getNextEvent()); + + // If (for whatever reason) there was no event do nothing + if (!e) + return; + + // Important documentation ommision the new rdma_cm_id + // you get from CONNECT_REQUEST has the same context info + // as its parent listening rdma_cm_id + ::rdma_cm_event_type eventType = e.getEventType(); + Rdma::Connection::intrusive_ptr id = e.getConnection(); + + switch (eventType) { + case RDMA_CM_EVENT_CONNECT_REQUEST: { + bool accept = true; + // Extract connection parameters and private data from event + ::rdma_conn_param conn_param = e.getConnectionParam(); + + if (connectionRequestCallback) + //TODO: pass private data to callback (and accept new private data for accept somehow) + accept = connectionRequestCallback(id); + if (accept) { + // Accept connection + id->accept(conn_param); + } else { + //Reject connection + id->reject(); + } + + break; + } + case RDMA_CM_EVENT_ESTABLISHED: + connectedCallback(id); + break; + case RDMA_CM_EVENT_DISCONNECTED: + disconnectedCallback(id); + break; + case RDMA_CM_EVENT_CONNECT_ERROR: + errorCallback(id); + break; + default: + std::cerr << "Warning: unexpected response to listen - " << eventType << "\n"; + } + } + + Connector::Connector( + const sockaddr& dst, + ConnectedCallback cc, + ErrorCallback errc, + DisconnectedCallback dc, + RejectedCallback rc + ) : + dst_addr(dst), + ci(Connection::make()), + handle(*ci, boost::bind(&Connector::connectionEvent, this, _1), 0, 0), + connectedCallback(cc), + errorCallback(errc), + disconnectedCallback(dc), + rejectedCallback(rc), + state(IDLE) + { + ci->nonblocking(); + } + + void Connector::start(Poller::shared_ptr poller) { + ci->resolve_addr(dst_addr); + state = RESOLVE_ADDR; + handle.startWatch(poller); + } + + void Connector::connectionEvent(DispatchHandle&) { + ConnectionEvent e(ci->getNextEvent()); + + // If (for whatever reason) there was no event do nothing + if (!e) + return; + + ::rdma_cm_event_type eventType = e.getEventType(); +#if 1 + switch (eventType) { + case RDMA_CM_EVENT_ADDR_RESOLVED: + // RESOLVE_ADDR + state = RESOLVE_ROUTE; + ci->resolve_route(); + break; + case RDMA_CM_EVENT_ADDR_ERROR: + // RESOLVE_ADDR + state = ERROR; + errorCallback(ci); + break; + case RDMA_CM_EVENT_ROUTE_RESOLVED: + // RESOLVE_ROUTE: + state = CONNECTING; + ci->connect(); + break; + case RDMA_CM_EVENT_ROUTE_ERROR: + // RESOLVE_ROUTE: + state = ERROR; + errorCallback(ci); + break; + case RDMA_CM_EVENT_CONNECT_ERROR: + // CONNECTING + state = ERROR; + errorCallback(ci); + break; + case RDMA_CM_EVENT_UNREACHABLE: + // CONNECTING + state = ERROR; + errorCallback(ci); + break; + case RDMA_CM_EVENT_REJECTED: + // CONNECTING + state = REJECTED; + rejectedCallback(ci); + break; + case RDMA_CM_EVENT_ESTABLISHED: + // CONNECTING + state = ESTABLISHED; + connectedCallback(ci); + break; + case RDMA_CM_EVENT_DISCONNECTED: + // ESTABLISHED + state = DISCONNECTED; + disconnectedCallback(ci); + break; + default: + std::cerr << "Warning: unexpected event in " << state << " state - " << eventType << "\n"; + state = ERROR; + } +#else + switch (state) { + case IDLE: + std::cerr << "Warning: event in IDLE state\n"; + break; + case RESOLVE_ADDR: + switch (eventType) { + case RDMA_CM_EVENT_ADDR_RESOLVED: + state = RESOLVE_ROUTE; + ci->resolve_route(); + break; + case RDMA_CM_EVENT_ADDR_ERROR: + state = ERROR; + errorCallback(ci); + break; + default: + state = ERROR; + std::cerr << "Warning: unexpected response to resolve_addr - " << eventType << "\n"; + } + break; + case RESOLVE_ROUTE: + switch (eventType) { + case RDMA_CM_EVENT_ROUTE_RESOLVED: + state = CONNECTING; + ci->connect(); + break; + case RDMA_CM_EVENT_ROUTE_ERROR: + state = ERROR; + errorCallback(ci); + break; + default: + state = ERROR; + std::cerr << "Warning: unexpected response to resolve_route - " << eventType << "\n"; + } + break; + case CONNECTING: + switch (eventType) { + case RDMA_CM_EVENT_CONNECT_RESPONSE: + std::cerr << "connect_response\n"; + break; + case RDMA_CM_EVENT_CONNECT_ERROR: + state = ERROR; + errorCallback(ci); + break; + case RDMA_CM_EVENT_UNREACHABLE: + state = ERROR; + errorCallback(ci); + break; + case RDMA_CM_EVENT_REJECTED: + state = REJECTED; + rejectedCallback(ci); + break; + case RDMA_CM_EVENT_ESTABLISHED: + state = ESTABLISHED; + connectedCallback(ci); + break; + default: + state = ERROR; + std::cerr << "Warning: unexpected response to connect - " << eventType << "\n"; + } + break; + case ESTABLISHED: + switch (eventType) { + case RDMA_CM_EVENT_DISCONNECTED: + disconnectedCallback(ci); + break; + default: + std::cerr << "Warning: unexpected event in ESTABLISHED state - " << eventType << "\n"; + } + break; + case REJECTED: + std::cerr << "Warning: event in REJECTED state - " << eventType << "\n"; + break; + case ERROR: + std::cerr << "Warning: event in ERROR state - " << eventType << "\n"; + break; + case LISTENING: + case ACCEPTING: + std::cerr << "Warning: in an illegal state (and received event!) - " << eventType << "\n"; + break; + } +#endif + } +} diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h new file mode 100644 index 0000000000..efa0ee7097 --- /dev/null +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -0,0 +1,128 @@ +#ifndef Rdma_Acceptor_h +#define Rdma_Acceptor_h + +#include "rdma_wrap.h" + +#include "qpid/sys/Dispatcher.h" + +#include <netinet/in.h> + +#include <boost/function.hpp> +#include <boost/ptr_container/ptr_deque.hpp> +#include <deque> + +using qpid::sys::DispatchHandle; +using qpid::sys::Poller; + +namespace Rdma { + + class Connection; + enum ConnectionState { + IDLE, + RESOLVE_ADDR, + RESOLVE_ROUTE, + LISTENING, + CONNECTING, + ACCEPTING, + ESTABLISHED, + REJECTED, + DISCONNECTED, + ERROR + }; + + typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ConnectedCallback; + typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ErrorCallback; + typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> DisconnectedCallback; + typedef boost::function1<bool, Rdma::Connection::intrusive_ptr&> ConnectionRequestCallback; + typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> RejectedCallback; + + class AsynchIO + { + typedef boost::function1<void, AsynchIO&> ErrorCallback; + typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback; + typedef boost::function1<void, AsynchIO&> IdleCallback; + + QueuePair::intrusive_ptr qp; + DispatchHandle dataHandle; + int bufferSize; + int recvBufferCount; + std::deque<Buffer*> bufferQueue; + boost::ptr_deque<Buffer> buffers; + + ReadCallback readCallback; + IdleCallback idleCallback; + ErrorCallback errorCallback; + + public: + AsynchIO( + QueuePair::intrusive_ptr q, + int s, + ReadCallback rc, + IdleCallback ic, + ErrorCallback ec + ); + ~AsynchIO(); + + void start(Poller::shared_ptr poller); + void queueReadBuffer(Buffer* buff); + void queueWrite(Buffer* buff); + void notifyPendingWrite(); + void queueWriteClose(); + Buffer* getBuffer(); + + private: + void dataEvent(DispatchHandle& handle); + }; + + class Listener + { + sockaddr src_addr; + Connection::intrusive_ptr ci; + DispatchHandle handle; + ConnectedCallback connectedCallback; + ErrorCallback errorCallback; + DisconnectedCallback disconnectedCallback; + ConnectionRequestCallback connectionRequestCallback; + ConnectionState state; + + public: + Listener( + const sockaddr& src, + ConnectedCallback cc, + ErrorCallback errc, + DisconnectedCallback dc, + ConnectionRequestCallback crc = 0 + ); + void start(Poller::shared_ptr poller); + + private: + void connectionEvent(DispatchHandle& handle); + }; + + class Connector + { + sockaddr dst_addr; + Connection::intrusive_ptr ci; + DispatchHandle handle; + ConnectedCallback connectedCallback; + ErrorCallback errorCallback; + DisconnectedCallback disconnectedCallback; + RejectedCallback rejectedCallback; + ConnectionState state; + + public: + Connector( + const sockaddr& dst, + ConnectedCallback cc, + ErrorCallback errc, + DisconnectedCallback dc, + RejectedCallback rc = 0 + ); + void start(Poller::shared_ptr poller); + + private: + void connectionEvent(DispatchHandle& handle); + }; +} + +#endif // Rdma_Acceptor_h diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp new file mode 100644 index 0000000000..488fe28658 --- /dev/null +++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp @@ -0,0 +1,142 @@ +#include "RdmaIO.h" + +#include <arpa/inet.h> + +#include <vector> +#include <queue> +#include <string> +#include <iostream> + +#include <boost/bind.hpp> + +using std::vector; +using std::queue; +using std::string; +using std::cout; +using std::cerr; + +using qpid::sys::Poller; +using qpid::sys::Dispatcher; + +// All the accepted connections +struct ConRec { + Rdma::Connection::intrusive_ptr connection; + Rdma::AsynchIO* data; + int outstandingWrites; + queue<Rdma::Buffer*> queuedWrites; + + ConRec(Rdma::Connection::intrusive_ptr c) : + connection(c), + outstandingWrites(0) + {} +}; + +void dataError(Rdma::AsynchIO&) { + cout << "Data error:\n"; +} + +void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) { + // Echo data back + Rdma::Buffer* buf = a.getBuffer(); + std::copy(b->bytes+b->dataStart, b->bytes+b->dataStart+b->dataCount, buf->bytes); + buf->dataCount = b->dataCount; + if (cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) { + a.queueWrite(buf); + ++(cr->outstandingWrites); + } else { + cr->queuedWrites.push(buf); + } +} + +void idle(ConRec* cr, Rdma::AsynchIO& a) { + --(cr->outstandingWrites); + //if (cr->outstandingWrites < Rdma::DEFAULT_WR_ENTRIES/4) + while (!cr->queuedWrites.empty() && cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) { + Rdma::Buffer* buf = cr->queuedWrites.front(); + cr->queuedWrites.pop(); + a.queueWrite(buf); + ++(cr->outstandingWrites); + } +} + +void disconnected(Rdma::Connection::intrusive_ptr& ci) { + ConRec* cr = ci->getContext<ConRec>(); + cr->connection->disconnect(); + delete cr->data; + delete cr; + cout << "Disconnected: " << cr << "\n"; +} + +void connectionError(Rdma::Connection::intrusive_ptr& ci) { + ConRec* cr = ci->getContext<ConRec>(); + cr->connection->disconnect(); + if (cr) { + delete cr->data; + delete cr; + } + cout << "Connection error: " << cr << "\n"; +} + +bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) { + cout << "Incoming connection: "; + + // For fun reject alternate connection attempts + static bool x = false; + x ^= 1; + + // Must create aio here so as to prepost buffers *before* we accept connection + if (x) { + ConRec* cr = new ConRec(ci); + Rdma::AsynchIO* aio = + new Rdma::AsynchIO(ci->getQueuePair(), 8000, + boost::bind(data, cr, _1, _2), + boost::bind(idle, cr, _1), + dataError); + ci->addContext(cr); + cr->data = aio; + cout << "Accept=>" << cr << "\n"; + } else { + cout << "Reject\n"; + } + + return x; +} + +void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) { + static int cnt = 0; + ConRec* cr = ci->getContext<ConRec>(); + cout << "Connected: " << cr << "(" << ++cnt << ")\n"; + + cr->data->start(poller); +} + +int main(int argc, char* argv[]) { + vector<string> args(&argv[0], &argv[argc]); + + ::sockaddr_in sin; + + int port = (args.size() < 2) ? 20079 : atoi(args[1].c_str()); + cout << "Listening on port: " << port << "\n"; + + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + sin.sin_addr.s_addr = INADDR_ANY; + + try { + boost::shared_ptr<Poller> p(new Poller()); + Dispatcher d(p); + + Rdma::Listener a((const sockaddr&)(sin), + boost::bind(connected, p, _1), + connectionError, + disconnected, + connectionRequest); + + + a.start(p); + d.run(); + } catch (Rdma::Exception& e) { + int err = e.getError(); + cerr << "Error: " << e.what() << "(" << err << ")\n"; + } +} diff --git a/cpp/src/qpid/sys/rdma/rdma_exception.h b/cpp/src/qpid/sys/rdma/rdma_exception.h new file mode 100644 index 0000000000..2773f25917 --- /dev/null +++ b/cpp/src/qpid/sys/rdma/rdma_exception.h @@ -0,0 +1,45 @@ +#ifndef RDMA_EXCEPTION_H +#define RDMA_EXCEPTION_H + +#include <exception> + +#include <errno.h> +#include <string.h> + +namespace Rdma { + static __thread char s[50]; + class Exception : public std::exception { + int err; + + public: + Exception(int e) : err(e) {} + int getError() { return err; } + const char* what() const throw() { + return ::strerror_r(err, s, 50); + } + }; + + inline void THROW_ERRNO() { + throw Rdma::Exception(errno); + } + + inline void CHECK(int rc) { + if (rc != 0) + throw Rdma::Exception((rc == -1) ? errno : rc >0 ? rc : -rc); + } + + inline void CHECK_IBV(int rc) { + if (rc != 0) + throw Rdma::Exception(rc); + } + + template <typename T> + inline + T* CHECK_NULL(T* rc) { + if (rc == 0) + THROW_ERRNO(); + return rc; + } +} + +#endif // RDMA_EXCEPTION_H diff --git a/cpp/src/qpid/sys/rdma/rdma_factories.cpp b/cpp/src/qpid/sys/rdma/rdma_factories.cpp new file mode 100644 index 0000000000..53dc4f8935 --- /dev/null +++ b/cpp/src/qpid/sys/rdma/rdma_factories.cpp @@ -0,0 +1,39 @@ +#include "rdma_factories.h" + +namespace Rdma { + void acker(::rdma_cm_event* e) throw () { + if (e) + // Intentionally ignore return value - we can't do anything about it here + (void) ::rdma_ack_cm_event(e); + } + + void destroyEChannel(::rdma_event_channel* c) throw () { + if (c) + // Intentionally ignore return value - we can't do anything about it here + (void) ::rdma_destroy_event_channel(c); + } + + void destroyId(::rdma_cm_id* i) throw () { + if (i) + // Intentionally ignore return value - we can't do anything about it here + (void) ::rdma_destroy_id(i); + } + + void deallocPd(::ibv_pd* p) throw () { + if (p) + // Intentionally ignore return value - we can't do anything about it here + (void) ::ibv_dealloc_pd(p); + } + + void destroyCChannel(::ibv_comp_channel* c) throw () { + if (c) + // Intentionally ignore return value - we can't do anything about it here + (void) ::ibv_destroy_comp_channel(c); + } + + void destroyCq(::ibv_cq* cq) throw () { + if (cq) + (void) ::ibv_destroy_cq(cq); + } + +} diff --git a/cpp/src/qpid/sys/rdma/rdma_factories.h b/cpp/src/qpid/sys/rdma/rdma_factories.h new file mode 100644 index 0000000000..26a93bb494 --- /dev/null +++ b/cpp/src/qpid/sys/rdma/rdma_factories.h @@ -0,0 +1,48 @@ +#ifndef RDMA_FACTORIES_H +#define RDMA_FACTORIES_H + +#include "rdma_exception.h" + +#include <rdma/rdma_cma.h> + +#include <boost/shared_ptr.hpp> + +namespace Rdma { + // These allow us to use simple shared_ptrs to do ref counting + void acker(::rdma_cm_event* e) throw (); + void destroyEChannel(::rdma_event_channel* c) throw (); + void destroyId(::rdma_cm_id* i) throw (); + void deallocPd(::ibv_pd* p) throw (); + void destroyCChannel(::ibv_comp_channel* c) throw (); + void destroyCq(::ibv_cq* cq) throw (); + + inline boost::shared_ptr< ::rdma_event_channel > mkEChannel() { + return + boost::shared_ptr< ::rdma_event_channel >(::rdma_create_event_channel(), destroyEChannel); + } + + inline boost::shared_ptr< ::rdma_cm_id > + mkId(::rdma_event_channel* ec, void* context, ::rdma_port_space ps) { + ::rdma_cm_id* i; + CHECK(::rdma_create_id(ec, &i, context, ps)); + return boost::shared_ptr< ::rdma_cm_id >(i, destroyId); + } + + inline boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c) { + ::ibv_pd* pd = CHECK_NULL(ibv_alloc_pd(c)); + return boost::shared_ptr< ::ibv_pd >(pd, deallocPd); + } + + inline boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c) { + ::ibv_comp_channel* cc = CHECK_NULL(::ibv_create_comp_channel(c)); + return boost::shared_ptr< ::ibv_comp_channel >(cc, destroyCChannel); + } + + inline boost::shared_ptr< ::ibv_cq > + mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc) { + ::ibv_cq* cq = CHECK_NULL(ibv_create_cq(c, cqe, context, cc, 0)); + return boost::shared_ptr< ::ibv_cq >(cq, destroyCq); + } +} + +#endif // RDMA_FACTORIES_H diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h new file mode 100644 index 0000000000..421c8fc41b --- /dev/null +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h @@ -0,0 +1,550 @@ +#ifndef RDMA_WRAP_H +#define RDMA_WRAP_H + +#include "rdma_factories.h" + +#include <rdma/rdma_cma.h> + +#include "qpid/RefCounted.h" +#include "qpid/sys/IOHandle.h" +#include "qpid/sys/posix/PrivatePosix.h" + +#include <fcntl.h> + +#include <vector> +#include <algorithm> +#include <iostream> +#include <stdexcept> +#include <boost/shared_ptr.hpp> +#include <boost/intrusive_ptr.hpp> + +namespace Rdma { + const int DEFAULT_TIMEOUT = 2000; // 2 secs + const int DEFAULT_BACKLOG = 100; + const int DEFAULT_CQ_ENTRIES = 256; + const int DEFAULT_WR_ENTRIES = 64; + const ::rdma_conn_param DEFAULT_CONNECT_PARAM = { + 0, // .private_data + 0, // .private_data_len + 4, // .responder_resources + 4, // .initiator_depth + 0, // .flow_control + 5, // .retry_count + 7 // .rnr_retry_count + }; + + struct Buffer { + friend class QueuePair; + + char* const bytes; + const int32_t byteCount; + int32_t dataStart; + int32_t dataCount; + + Buffer(::ibv_pd* pd, char* const b, const int32_t s) : + bytes(b), + byteCount(s), + dataStart(0), + dataCount(0), + mr(CHECK_NULL(::ibv_reg_mr( + pd, bytes, byteCount, + ::IBV_ACCESS_LOCAL_WRITE))) + {} + + ~Buffer() { + (void) ::ibv_dereg_mr(mr); + delete [] bytes; + } + + private: + ::ibv_mr* mr; + }; + + class Connection; + + enum QueueDirection { + NONE, + SEND, + RECV + }; + + class QueuePairEvent { + boost::shared_ptr< ::ibv_cq > cq; + ::ibv_wc wc; + QueueDirection dir; + + friend class QueuePair; + + QueuePairEvent() : + dir(NONE) + {} + + QueuePairEvent( + const ::ibv_wc& w, + boost::shared_ptr< ::ibv_cq > c, + QueueDirection d) : + cq(c), + wc(w), + dir(d) + { + assert(dir != NONE); + } + + public: + operator bool() const { + return dir != NONE; + } + + QueueDirection getDirection() const { + return dir; + } + + ::ibv_wc_opcode getEventType() const { + return wc.opcode; + } + + ::ibv_wc_status getEventStatus() const { + return wc.status; + } + + Buffer* getBuffer() const { + Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id); + b->dataCount = wc.byte_len; + return b; + } + }; + + // Wrapper for a queue pair - this has the functionality for + // putting buffers on the receive queue and for sending buffers + // to the other end of the connection. + // + // Currently QueuePairs are contained inside Connections and have no + // separate lifetime + class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted { + boost::shared_ptr< ::ibv_pd > pd; + boost::shared_ptr< ::ibv_comp_channel > cchannel; + boost::shared_ptr< ::ibv_cq > scq; + boost::shared_ptr< ::ibv_cq > rcq; + boost::shared_ptr< ::rdma_cm_id > id; + int outstandingSendEvents; + int outstandingRecvEvents; + + friend class Connection; + + QueuePair(boost::shared_ptr< ::rdma_cm_id > id); + ~QueuePair(); + + public: + typedef boost::intrusive_ptr<QueuePair> intrusive_ptr; + + // Create a buffer to use for writing + Buffer* createBuffer(int s) { + return new Buffer(pd.get(), new char[s], s); + } + + // Make channel non-blocking by making + // associated fd nonblocking + void nonblocking() { + ::fcntl(cchannel->fd, F_SETFL, O_NONBLOCK); + } + + // If we get EAGAIN because the channel has been set non blocking + // and we'd have to wait then return an empty event + QueuePair::intrusive_ptr getNextChannelEvent() { + // First find out which cq has the event + ::ibv_cq* cq; + void* ctx; + int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx); + if (rc == -1 && errno == EAGAIN) + return 0; + CHECK(rc); + + // Batch acknowledge the event + if (cq == scq.get()) { + if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) { + ::ibv_ack_cq_events(cq, outstandingSendEvents); + outstandingSendEvents = 0; + } + } else if (cq == rcq.get()) { + if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) { + ::ibv_ack_cq_events(cq, outstandingRecvEvents); + outstandingRecvEvents = 0; + } + } + + return static_cast<QueuePair*>(ctx); + } + + QueuePairEvent getNextEvent() { + ::ibv_wc w; + if (::ibv_poll_cq(scq.get(), 1, &w) == 1) + return QueuePairEvent(w, scq, SEND); + else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1) + return QueuePairEvent(w, rcq, RECV); + else + return QueuePairEvent(); + } + + void postRecv(Buffer* buf); + void postSend(Buffer* buf); + void notifyRecv(); + void notifySend(); + }; + + class ConnectionEvent { + friend class Connection; + + // The order of the members is important as we have to acknowledge + // the event before destroying the ids on destruction + boost::intrusive_ptr<Connection> id; + boost::intrusive_ptr<Connection> listen_id; + boost::shared_ptr< ::rdma_cm_event > event; + + ConnectionEvent() {} + ConnectionEvent(::rdma_cm_event* e); + + // Default copy, assignment and destructor ok + public: + operator bool() const { + return event; + } + + ::rdma_cm_event_type getEventType() const { + return event->event; + } + + ::rdma_conn_param getConnectionParam() const { + if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST) { + return event->param.conn; + } else { + ::rdma_conn_param p = {}; + return p; + } + } + + boost::intrusive_ptr<Connection> getConnection () const { + return id; + } + + boost::intrusive_ptr<Connection> getListenId() const { + return listen_id; + } + }; + + // For the moment this is a fairly simple wrapper for rdma_cm_id. + // + // NB: It allocates a protection domain (pd) per connection which means that + // registered buffers can't be shared between different connections + // (this can only happen between connections on the same controller in any case, + // so needs careful management if used) + class Connection : public qpid::sys::IOHandle, public qpid::RefCounted { + boost::shared_ptr< ::rdma_event_channel > channel; + boost::shared_ptr< ::rdma_cm_id > id; + QueuePair::intrusive_ptr qp; + + void* context; + + friend class ConnectionEvent; + friend class QueuePair; + + // Wrap the passed in rdma_cm_id with a Connection + // this basically happens only on connection request + Connection(::rdma_cm_id* i) : + qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + id(i, destroyId), + context(0) + { + impl->fd = id->channel->fd; + + // Just overwrite the previous context as it will + // have come from the listening connection + if (i) + i->context = this; + } + + Connection() : + qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + channel(mkEChannel()), + id(mkId(channel.get(), this, RDMA_PS_TCP)), + context(0) + { + impl->fd = channel->fd; + } + + // Default destructor fine + + void ensureQueuePair() { + assert(id.get()); + + // Only allocate a queue pair if there isn't one already + if (qp) + return; + + qp = new QueuePair(id); + } + + public: + typedef boost::intrusive_ptr<Connection> intrusive_ptr; + + static intrusive_ptr make() { + return new Connection(); + } + + static intrusive_ptr find(::rdma_cm_id* i) { + if (!i) + return 0; + Connection* id = static_cast< Connection* >(i->context); + if (!id) + throw std::logic_error("Couldn't find existing Connection"); + return id; + } + + template <typename T> + void addContext(T* c) { + // Don't allow replacing context + if (!context) + context = c; + } + + template <typename T> + T* getContext() { + return static_cast<T*>(context); + } + + // Make channel non-blocking by making + // associated fd nonblocking + void nonblocking() { + assert(id.get()); + ::fcntl(id->channel->fd, F_SETFL, O_NONBLOCK); + } + + // If we get EAGAIN because the channel has been set non blocking + // and we'd have to wait then return an empty event + ConnectionEvent getNextEvent() { + assert(id.get()); + ::rdma_cm_event* e; + int rc = ::rdma_get_cm_event(id->channel, &e); + if (rc == -1 && errno == EAGAIN) + return ConnectionEvent(); + CHECK(rc); + return ConnectionEvent(e); + } + + void bind(sockaddr& src_addr) const { + assert(id.get()); + CHECK(::rdma_bind_addr(id.get(), &src_addr)); + } + + void listen(int backlog = DEFAULT_BACKLOG) const { + assert(id.get()); + CHECK(::rdma_listen(id.get(), backlog)); + } + + void resolve_addr( + sockaddr& dst_addr, + sockaddr* src_addr = 0, + int timeout_ms = DEFAULT_TIMEOUT) const + { + assert(id.get()); + CHECK(::rdma_resolve_addr(id.get(), src_addr, &dst_addr, timeout_ms)); + } + + void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const { + assert(id.get()); + CHECK(::rdma_resolve_route(id.get(), timeout_ms)); + } + + void disconnect() const { + assert(id.get()); + CHECK(::rdma_disconnect(id.get())); + } + + // TODO: Currently you can only connect with the default connection parameters + void connect() { + assert(id.get()); + + // Need to have a queue pair before we can connect + ensureQueuePair(); + + ::rdma_conn_param p = DEFAULT_CONNECT_PARAM; + CHECK(::rdma_connect(id.get(), &p)); + } + + template <typename T> + void connect(const T* data) { + assert(id.get()); + // Need to have a queue pair before we can connect + ensureQueuePair(); + + ::rdma_conn_param p = DEFAULT_CONNECT_PARAM; + p.private_data = data; + p.private_data_len = sizeof(T); + CHECK(::rdma_connect(id.get(), &p)); + } + + // TODO: Not sure how to default accept params - they come from the connection request + // event + template <typename T> + void accept(const ::rdma_conn_param& param, const T* data) { + assert(id.get()); + // Need to have a queue pair before we can accept + ensureQueuePair(); + + ::rdma_conn_param p = param; + p.private_data = data; + p.private_data_len = sizeof(T); + CHECK(::rdma_accept(id.get(), &p)); + } + + void accept(const ::rdma_conn_param& param) { + assert(id.get()); + // Need to have a queue pair before we can accept + ensureQueuePair(); + + ::rdma_conn_param p = param; + p.private_data = 0; + p.private_data_len = 0; + CHECK(::rdma_accept(id.get(), &p)); + } + + template <typename T> + void reject(const T* data) const { + assert(id.get()); + CHECK(::rdma_reject(id.get(), data, sizeof(T))); + } + + void reject() const { + assert(id.get()); + CHECK(::rdma_reject(id.get(), 0, 0)); + } + + QueuePair::intrusive_ptr getQueuePair() { + assert(id.get()); + + ensureQueuePair(); + + return qp; + } + }; + + inline QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) : + qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + pd(allocPd(i->verbs)), + cchannel(mkCChannel(i->verbs)), + scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())), + rcq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())), + id(i), + outstandingSendEvents(0), + outstandingRecvEvents(0) + { + impl->fd = cchannel->fd; + + // Set cq context to this QueuePair object so we can find + // ourselves again + scq->cq_context = this; + rcq->cq_context = this; + + ::ibv_qp_init_attr qp_attr = {}; + + // TODO: make a default struct for this + qp_attr.cap.max_send_wr = DEFAULT_WR_ENTRIES; + qp_attr.cap.max_send_sge = 4; + qp_attr.cap.max_recv_wr = DEFAULT_WR_ENTRIES; + qp_attr.cap.max_recv_sge = 4; + + qp_attr.send_cq = scq.get(); + qp_attr.recv_cq = rcq.get(); + qp_attr.qp_type = IBV_QPT_RC; + + CHECK(::rdma_create_qp(id.get(), pd.get(), &qp_attr)); + + // Set the qp context to this so we can find ourselves again + id->qp->qp_context = this; + } + + inline QueuePair::~QueuePair() { + if (outstandingSendEvents > 0) + ::ibv_ack_cq_events(scq.get(), outstandingSendEvents); + if (outstandingRecvEvents > 0) + ::ibv_ack_cq_events(rcq.get(), outstandingRecvEvents); + + ::rdma_destroy_qp(id.get()); + } + + inline void QueuePair::notifyRecv() { + CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0)); + } + + inline void QueuePair::notifySend() { + CHECK_IBV(ibv_req_notify_cq(scq.get(), 0)); + } + + inline void QueuePair::postRecv(Buffer* buf) { + ::ibv_recv_wr rwr = {}; + ::ibv_sge sge; + + sge.addr = (uintptr_t) buf->bytes+buf->dataStart; + sge.length = buf->dataCount; + sge.lkey = buf->mr->lkey; + + rwr.wr_id = reinterpret_cast<uint64_t>(buf); + rwr.sg_list = &sge; + rwr.num_sge = 1; + + ::ibv_recv_wr* badrwr = 0; + CHECK_IBV(::ibv_post_recv(id->qp, &rwr, &badrwr)); + if (badrwr) + throw std::logic_error("ibv_post_recv(): Bad rwr"); + } + + inline void QueuePair::postSend(Buffer* buf) { + ::ibv_send_wr swr = {}; + ::ibv_sge sge; + + sge.addr = (uintptr_t) buf->bytes+buf->dataStart; + sge.length = buf->dataCount; + sge.lkey = buf->mr->lkey; + + swr.wr_id = reinterpret_cast<uint64_t>(buf); + swr.opcode = IBV_WR_SEND; + swr.send_flags = IBV_SEND_SIGNALED; + swr.sg_list = &sge; + swr.num_sge = 1; + + ::ibv_send_wr* badswr = 0; + CHECK_IBV(::ibv_post_send(id->qp, &swr, &badswr)); + if (badswr) + throw std::logic_error("ibv_post_send(): Bad swr"); + } + + inline ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) : + id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ? + Connection::find(e->id) : new Connection(e->id)), + listen_id(Connection::find(e->listen_id)), + event(e, acker) + {} +} + +inline std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) { +# define CHECK_TYPE(t) case t: o << #t; break; + switch(t) { + CHECK_TYPE(RDMA_CM_EVENT_ADDR_RESOLVED) + CHECK_TYPE(RDMA_CM_EVENT_ADDR_ERROR) + CHECK_TYPE(RDMA_CM_EVENT_ROUTE_RESOLVED) + CHECK_TYPE(RDMA_CM_EVENT_ROUTE_ERROR) + CHECK_TYPE(RDMA_CM_EVENT_CONNECT_REQUEST) + CHECK_TYPE(RDMA_CM_EVENT_CONNECT_RESPONSE) + CHECK_TYPE(RDMA_CM_EVENT_CONNECT_ERROR) + CHECK_TYPE(RDMA_CM_EVENT_UNREACHABLE) + CHECK_TYPE(RDMA_CM_EVENT_REJECTED) + CHECK_TYPE(RDMA_CM_EVENT_ESTABLISHED) + CHECK_TYPE(RDMA_CM_EVENT_DISCONNECTED) + CHECK_TYPE(RDMA_CM_EVENT_DEVICE_REMOVAL) + CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_JOIN) + CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_ERROR) + } +# undef CHECK_TYPE + return o; +} + +#endif // RDMA_WRAP_H |