diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/cpp/src/qpid/sys/rdma | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-66765100f4257159622cefe57bed50125a5ad017.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/sys/rdma')
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp | 247 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp | 720 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/RdmaIO.h | 250 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp | 210 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/rdma_exception.h | 69 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp | 105 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/rdma_factories.h | 40 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp | 566 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h | 287 |
9 files changed, 2494 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp new file mode 100644 index 0000000000..38e9b59541 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp @@ -0,0 +1,247 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/rdma/RdmaIO.h" +#include "qpid/sys/rdma/rdma_exception.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/Thread.h" + +#include <netdb.h> +#include <arpa/inet.h> + +#include <vector> +#include <string> +#include <iostream> +#include <algorithm> +#include <cmath> +#include <boost/bind.hpp> + +using std::vector; +using std::string; +using std::cout; +using std::cerr; +using std::copy; +using std::rand; + +using qpid::sys::Thread; +using qpid::sys::Poller; +using qpid::sys::Dispatcher; +using qpid::sys::SocketAddress; +using qpid::sys::AbsTime; +using qpid::sys::Duration; +using qpid::sys::TIME_SEC; +using qpid::sys::TIME_INFINITE; + +namespace qpid { +namespace tests { + +// count of messages +int64_t smsgs = 0; +int64_t sbytes = 0; +int64_t rmsgs = 0; +int64_t rbytes = 0; + +int target = 1000000; +int msgsize = 200; +AbsTime startTime; +Duration sendingDuration(TIME_INFINITE); +Duration fullTestDuration(TIME_INFINITE); + +// Random generator +// This is an RNG designed by George Marsaglia see http://en.wikipedia.org/wiki/Xorshift +class Xor128Generator { + uint32_t x; + uint32_t y; + uint32_t z; + uint32_t w; + +public: + Xor128Generator() : + x(123456789),y(362436069),z(521288629),w(88675123) + {++(*this);} + + Xor128Generator& operator++() { + uint32_t t = x ^ (x << 11); + x = y; y = z; z = w; + w = w ^ (w >> 19) ^ t ^ (t >> 8); + return *this; + } + + uint32_t operator*() { + return w; + } +}; + +Xor128Generator output; +Xor128Generator input; + +void write(Rdma::AsynchIO& aio) { + while (aio.writable() && smsgs < target) { + Rdma::Buffer* b = aio.getSendBuffer(); + if (!b) break; + b->dataCount(msgsize); + uint32_t* ip = reinterpret_cast<uint32_t*>(b->bytes()); + uint32_t* lip = ip + b->dataCount() / sizeof(uint32_t); + while (ip != lip) {*ip++ = *output; ++output;} + aio.queueWrite(b); + ++smsgs; + sbytes += msgsize; + } +} + +void dataError(Rdma::AsynchIO&) { + cout << "Data error:\n"; +} + +void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) { + ++rmsgs; + rbytes += b->dataCount(); + + // Check message is unaltered + bool match = true; + uint32_t* ip = reinterpret_cast<uint32_t*>(b->bytes()); + uint32_t* lip = ip + b->dataCount() / sizeof(uint32_t); + while (ip != lip) { if (*ip++ != *input) {match = false; break;} ++input;} + if (!match) { + cout << "Data doesn't match: at msg " << rmsgs << " byte " << rbytes-b->dataCount() << " (ish)\n"; + exit(1); + } + + // When all messages have been recvd stop + if (rmsgs < target) { + write(aio); + } else { + fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now())); + if (aio.incompletedWrites() == 0) + p->shutdown(); + } +} + +void full(Rdma::AsynchIO& a, Rdma::Buffer* b) { + // Warn as we shouldn't get here anymore + cerr << "!"; + + // Don't need to keep buffer just adjust the counts + --smsgs; + sbytes -= b->dataCount(); + + // Give buffer back + a.returnSendBuffer(b); +} + +void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) { + if (smsgs < target) { + write(aio); + } else { + sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now())); + if (rmsgs >= target && aio.incompletedWrites() == 0) + p->shutdown(); + } +} + +void drained(Rdma::AsynchIO&) { + cout << "Drained:\n"; +} + +void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) { + cout << "Connected\n"; + Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair(); + + Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), + cp.rdmaProtocolVersion, + cp.maxRecvBufferSize, cp.initialXmitCredit , Rdma::DEFAULT_WR_ENTRIES, + boost::bind(&data, poller, _1, _2), + boost::bind(&idle, poller, _1), + &full, + dataError); + + startTime = AbsTime::now(); + write(*aio); + + aio->start(poller); +} + +void disconnected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) { + cout << "Disconnected\n"; + p->shutdown(); +} + +void connectionError(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ErrorType) { + cout << "Connection error\n"; + p->shutdown(); +} + +void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&) { + cout << "Connection rejected\n"; + p->shutdown(); +} + +}} // namespace qpid::tests + +using namespace qpid::tests; + +int main(int argc, char* argv[]) { + vector<string> args(&argv[0], &argv[argc]); + + string host = args[1]; + string port = (args.size() < 3) ? "20079" : args[2]; + + if (args.size() > 3) + msgsize = atoi(args[3].c_str()); + cout << "Message size: " << msgsize << "\n"; + + try { + boost::shared_ptr<Poller> p(new Poller()); + + Rdma::Connector c( + Rdma::ConnectionParams(msgsize, Rdma::DEFAULT_WR_ENTRIES), + boost::bind(&connected, p, _1, _2), + boost::bind(&connectionError, p, _1, _2), + boost::bind(&disconnected, p, _1), + boost::bind(&rejected, p, _1, _2)); + + SocketAddress sa(host, port); + cout << "Connecting to: " << sa.asString() <<"\n"; + c.start(p, sa); + + // The poller loop blocks all signals so run in its own thread + Thread t(*p); + t.join(); + } catch (Rdma::Exception& e) { + int err = e.getError(); + cerr << "Error: " << e.what() << "(" << err << ")\n"; + } + + cout + << "Sent: " << smsgs + << "msgs (" << sbytes + << "bytes) in: " << double(sendingDuration)/TIME_SEC + << "s: " << double(smsgs)*TIME_SEC/sendingDuration + << "msgs/s(" << double(sbytes)*TIME_SEC/sendingDuration + << "bytes/s)\n"; + cout + << "Recd: " << rmsgs + << "msgs (" << rbytes + << "bytes) in: " << double(fullTestDuration)/TIME_SEC + << "s: " << double(rmsgs)*TIME_SEC/fullTestDuration + << "msgs/s(" << double(rbytes)*TIME_SEC/fullTestDuration + << "bytes/s)\n"; + +} diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp new file mode 100644 index 0000000000..78bcdec68e --- /dev/null +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -0,0 +1,720 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/rdma/RdmaIO.h" + +#include "qpid/log/Statement.h" + +#include <string> +#include <boost/bind.hpp> + +using qpid::sys::SocketAddress; +using qpid::sys::DispatchHandle; +using qpid::sys::Poller; +using qpid::sys::ScopedLock; +using qpid::sys::Mutex; + +namespace Rdma { + // Set packing as these are 'on the wire' structures +# pragma pack(push, 1) + + // Header structure for each transmitted frame + struct FrameHeader { + const static uint32_t FlagsMask = 0xf0000000; + uint32_t data; // written in network order + + FrameHeader() {} + FrameHeader(uint32_t credit, uint32_t flags = 0) { + data = htonl((credit & ~FlagsMask) | (flags & FlagsMask)); + } + + uint32_t credit() const { + return ntohl(data) & ~FlagsMask; + } + + uint32_t flags() const { + return ntohl(data) & FlagsMask; + } + }; + + const size_t FrameHeaderSize = sizeof(FrameHeader); + + // Structure for Connection Parameters on the network + // + // The original version (now called 0) of these parameters had a couple of mistakes: + // * No way to version the protocol (need to introduce a new protocol for iWarp) + // * Used host order int32 (but only deployed on LE archs as far as we know) + // so effectively was LE on the wire which is the opposite of network order. + // + // Fortunately the values sent were sufficiently restricted that a 16 bit short could + // be carved out to indicate the protocol version as these bits were always sent as 0. + // + // So the current version of parameters uses the last 2 bytes to indicate the protocol + // version, if this is 0 then we interpret the rest of the struct without byte swapping + // to remain compatible with the previous protocol. + struct NConnectionParams { + uint32_t maxRecvBufferSize; + uint16_t initialXmitCredit; + uint16_t rdmaProtocolVersion; + + NConnectionParams(const ConnectionParams& c) : + maxRecvBufferSize(c.rdmaProtocolVersion ? htonl(c.maxRecvBufferSize) : c.maxRecvBufferSize), + initialXmitCredit(c.rdmaProtocolVersion ? htons(c.initialXmitCredit) : c.initialXmitCredit), + // 0 is the same with/without byteswapping! + rdmaProtocolVersion(htons(c.rdmaProtocolVersion)) + {} + + operator ConnectionParams() const { + return + ConnectionParams( + rdmaProtocolVersion ? ntohl(maxRecvBufferSize) : maxRecvBufferSize, + rdmaProtocolVersion ? ntohs(initialXmitCredit) : initialXmitCredit, + ntohs(rdmaProtocolVersion)); + } + }; +# pragma pack(pop) + + class IOException : public std::exception { + std::string s; + + public: + IOException(std::string s0): s(s0) {} + ~IOException() throw() {} + + const char* what() const throw() { + return s.c_str(); + } + }; + + AsynchIO::AsynchIO( + QueuePair::intrusive_ptr q, + int version, + int size, + int xCredit, + int rCount, + ReadCallback rc, + IdleCallback ic, + FullCallback fc, + ErrorCallback ec + ) : + protocolVersion(version), + bufferSize(size), + recvCredit(0), + xmitCredit(xCredit), + recvBufferCount(rCount), + xmitBufferCount(xCredit), + outstandingWrites(0), + draining(false), + state(IDLE), + qp(q), + dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this), 0, 0), + readCallback(rc), + idleCallback(ic), + fullCallback(fc), + errorCallback(ec), + pendingWriteAction(boost::bind(&AsynchIO::writeEvent, this)) + { + if (protocolVersion > maxSupportedProtocolVersion) + throw IOException("Unsupported Rdma Protocol"); + qp->nonblocking(); + qp->notifyRecv(); + qp->notifySend(); + + // Prepost recv buffers before we go any further + qp->allocateRecvBuffers(recvBufferCount, bufferSize+FrameHeaderSize); + + // Create xmit buffers, reserve space for frame header. + qp->createSendBuffers(xmitBufferCount, bufferSize, FrameHeaderSize); + } + + AsynchIO::~AsynchIO() { + // Warn if we are deleting whilst there are still unreclaimed write buffers + if ( outstandingWrites>0 ) + QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished"); + + // Turn off callbacks if necessary (before doing the deletes) + if (state != STOPPED) { + QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown"); + dataHandle.stopWatch(); + } + // TODO: It might turn out to be more efficient in high connection loads to reuse the + // buffers rather than having to reregister them all the time (this would be straightforward if all + // connections haver the same buffer size and harder otherwise) + } + + void AsynchIO::start(Poller::shared_ptr poller) { + dataHandle.startWatch(poller); + } + + // State constraints + // On entry: None + // On exit: STOPPED + // Mark for deletion/Delete this object when we have no outstanding writes + void AsynchIO::stop(NotifyCallback nc) { + ScopedLock<Mutex> l(stateLock); + state = STOPPED; + notifyCallback = nc; + dataHandle.call(boost::bind(&AsynchIO::doStoppedCallback, this)); + } + + namespace { + void requestedCall(AsynchIO* aio, AsynchIO::RequestCallback callback) { + assert(callback); + callback(*aio); + } + } + + void AsynchIO::requestCallback(RequestCallback callback) { + // TODO creating a function object every time isn't all that + // efficient - if this becomes heavily used do something better (what?) + assert(callback); + dataHandle.call(boost::bind(&requestedCall, this, callback)); + } + + // Mark writing closed (so we don't accept any more writes or make any idle callbacks) + void AsynchIO::drainWriteQueue(NotifyCallback nc) { + draining = true; + notifyCallback = nc; + } + + void AsynchIO::queueBuffer(Buffer* buff, int credit) { + switch (protocolVersion) { + case 0: + if (!buff) { + Buffer* ob = getSendBuffer(); + // Have to send something as adapters hate it when you try to transfer 0 bytes + *reinterpret_cast< uint32_t* >(ob->bytes()) = htonl(credit); + ob->dataCount(sizeof(uint32_t)); + qp->postSend(credit | IgnoreData, ob); + } else if (credit > 0) { + qp->postSend(credit, buff); + } else { + qp->postSend(buff); + } + break; + case 1: + if (!buff) + buff = getSendBuffer(); + // Add FrameHeader after frame data + FrameHeader header(credit); + assert(buff->dataCount() <= buff->byteCount()); // ensure app data doesn't impinge on reserved space. + ::memcpy(buff->bytes()+buff->dataCount(), &header, FrameHeaderSize); + buff->dataCount(buff->dataCount()+FrameHeaderSize); + qp->postSend(buff); + break; + } + } + + Buffer* AsynchIO::extractBuffer(const QueuePairEvent& e) { + Buffer* b = e.getBuffer(); + switch (protocolVersion) { + case 0: { + bool dataPresent = true; + // Get our xmitCredit if it was sent + if (e.immPresent() ) { + assert(xmitCredit>=0); + xmitCredit += (e.getImm() & ~FlagsMask); + dataPresent = ((e.getImm() & IgnoreData) == 0); + assert(xmitCredit>0); + } + if (!dataPresent) { + b->dataCount(0); + } + break; + } + case 1: + b->dataCount(b->dataCount()-FrameHeaderSize); + FrameHeader header; + ::memcpy(&header, b->bytes()+b->dataCount(), FrameHeaderSize); + assert(xmitCredit>=0); + xmitCredit += header.credit(); + assert(xmitCredit>=0); + break; + } + + return b; + } + + void AsynchIO::queueWrite(Buffer* buff) { + // Make sure we don't overrun our available buffers + // either at our end or the known available at the peers end + if (writable()) { + // TODO: We might want to batch up sending credit + int creditSent = recvCredit & ~FlagsMask; + queueBuffer(buff, creditSent); + recvCredit -= creditSent; + ++outstandingWrites; + --xmitCredit; + assert(xmitCredit>=0); + } else { + if (fullCallback) { + fullCallback(*this, buff); + } else { + QPID_LOG(error, "RDMA: qp=" << qp << ": Write queue full, but no callback, throwing buffer away"); + returnSendBuffer(buff); + } + } + } + + // State constraints + // On entry: None + // On exit: NOTIFY_PENDING || STOPPED + void AsynchIO::notifyPendingWrite() { + ScopedLock<Mutex> l(stateLock); + switch (state) { + case IDLE: + dataHandle.call(pendingWriteAction); + // Fall Thru + case NOTIFY: + state = NOTIFY_PENDING; + break; + case NOTIFY_PENDING: + case STOPPED: + break; + } + } + + // State constraints + // On entry: IDLE || STOPPED + // On exit: IDLE || STOPPED + void AsynchIO::dataEvent() { + { + ScopedLock<Mutex> l(stateLock); + + if (state == STOPPED) return; + + state = NOTIFY_PENDING; + } + processCompletions(); + + writeEvent(); + } + + // State constraints + // On entry: NOTIFY_PENDING || STOPPED + // On exit: IDLE || STOPPED + void AsynchIO::writeEvent() { + State newState; + do { + { + ScopedLock<Mutex> l(stateLock); + + switch (state) { + case STOPPED: + return; + default: + state = NOTIFY; + } + } + + doWriteCallback(); + + { + ScopedLock<Mutex> l(stateLock); + + newState = state; + switch (newState) { + case NOTIFY_PENDING: + case STOPPED: + break; + default: + state = IDLE; + } + } + } while (newState == NOTIFY_PENDING); + } + + void AsynchIO::processCompletions() { + QueuePair::intrusive_ptr q = qp->getNextChannelEvent(); + + // Re-enable notification for queue: + // This needs to happen before we could do anything that could generate more work completion + // events (ie the callbacks etc. in the following). + // This can't make us reenter this code as the handle attached to the completion queue will still be + // disabled by the poller until we leave this code + qp->notifyRecv(); + qp->notifySend(); + + int recvEvents = 0; + int sendEvents = 0; + + // If no event do nothing + if (!q) + return; + + assert(q == qp); + + // Repeat until no more events + do { + QueuePairEvent e(qp->getNextEvent()); + if (!e) + break; + + ::ibv_wc_status status = e.getEventStatus(); + if (status != IBV_WC_SUCCESS) { + // Need special check for IBV_WC_WR_FLUSH_ERR here + // we will get this for every send/recv queue entry that was pending + // when disconnected, these aren't real errors and mostly need to be ignored + if (status == IBV_WC_WR_FLUSH_ERR) { + QueueDirection dir = e.getDirection(); + if (dir == SEND) { + Buffer* b = e.getBuffer(); + ++sendEvents; + returnSendBuffer(b); + --outstandingWrites; + } else { + ++recvEvents; + } + continue; + } + errorCallback(*this); + // TODO: Probably need to flush queues at this point + return; + } + + // Test if recv (or recv with imm) + //::ibv_wc_opcode eventType = e.getEventType(); + QueueDirection dir = e.getDirection(); + if (dir == RECV) { + ++recvEvents; + + Buffer* b = extractBuffer(e); + + // if there was no data sent then the message was only to update our credit + if ( b->dataCount() > 0 ) { + readCallback(*this, b); + } + + // At this point the buffer has been consumed so put it back on the recv queue + // TODO: Is this safe to do if the connection is disconnected already? + qp->postRecv(b); + + // Received another message + ++recvCredit; + + // Send recvCredit if it is large enough (it will have got this large because we've not sent anything recently) + if (recvCredit > recvBufferCount/2) { + // TODO: This should use RDMA write with imm as there might not ever be a buffer to receive this message + // but this is a little unlikely, as to get in this state we have to have received messages without sending any + // for a while so its likely we've received an credit update from the far side. + if (writable()) { + int creditSent = recvCredit & ~FlagsMask; + queueBuffer(0, creditSent); + recvCredit -= creditSent; + ++outstandingWrites; + --xmitCredit; + assert(xmitCredit>=0); + } else { + QPID_LOG(warning, "RDMA: qp=" << qp << ": Unable to send unsolicited credit"); + } + } + } else { + Buffer* b = e.getBuffer(); + ++sendEvents; + returnSendBuffer(b); + --outstandingWrites; + } + } while (true); + + // Not sure if this is expected or not + if (recvEvents == 0 && sendEvents == 0) { + QPID_LOG(debug, "RDMA: qp=" << qp << ": Got channel event with no recv/send completions"); + } + } + + void AsynchIO::doWriteCallback() { + // TODO: maybe don't call idle unless we're low on write buffers + // Keep on calling the idle routine as long as we are writable and we got something to write last call + + // Do callback even if there are no available free buffers as the application itself might be + // holding onto buffers + while (writable()) { + int xc = xmitCredit; + idleCallback(*this); + // Check whether we actually wrote anything + if (xmitCredit == xc) { + QPID_LOG(debug, "RDMA: qp=" << qp << ": Called for data, but got none: xmitCredit=" << xmitCredit); + return; + } + } + + checkDrained(); + } + + void AsynchIO::checkDrained() { + // If we've got all the write confirmations and we're draining + // We might get deleted in the drained callback so return immediately + if (draining) { + if (outstandingWrites == 0) { + draining = false; + NotifyCallback nc; + nc.swap(notifyCallback); + nc(*this); + } + return; + } + } + + void AsynchIO::doStoppedCallback() { + // Ensure we can't get any more callbacks (except for the stopped callback) + dataHandle.stopWatch(); + + NotifyCallback nc; + nc.swap(notifyCallback); + nc(*this); + } + + ConnectionManager::ConnectionManager( + ErrorCallback errc, + DisconnectedCallback dc + ) : + state(IDLE), + ci(Connection::make()), + handle(*ci, boost::bind(&ConnectionManager::event, this, _1), 0, 0), + errorCallback(errc), + disconnectedCallback(dc) + { + QPID_LOG(debug, "RDMA: ci=" << ci << ": Creating ConnectionManager"); + ci->nonblocking(); + } + + ConnectionManager::~ConnectionManager() + { + QPID_LOG(debug, "RDMA: ci=" << ci << ": Deleting ConnectionManager"); + } + + void ConnectionManager::start(Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr) { + startConnection(ci, addr); + handle.startWatch(poller); + } + + void ConnectionManager::doStoppedCallback() { + // Ensure we can't get any more callbacks (except for the stopped callback) + handle.stopWatch(); + + NotifyCallback nc; + nc.swap(notifyCallback); + nc(*this); + } + + void ConnectionManager::stop(NotifyCallback nc) { + state = STOPPED; + notifyCallback = nc; + handle.call(boost::bind(&ConnectionManager::doStoppedCallback, this)); + } + + void ConnectionManager::event(DispatchHandle&) { + if (state.get() == STOPPED) return; + connectionEvent(ci); + } + + Listener::Listener( + const ConnectionParams& cp, + EstablishedCallback ec, + ErrorCallback errc, + DisconnectedCallback dc, + ConnectionRequestCallback crc + ) : + ConnectionManager(errc, dc), + checkConnectionParams(cp), + connectionRequestCallback(crc), + establishedCallback(ec) + { + } + + void Listener::startConnection(Connection::intrusive_ptr ci, const qpid::sys::SocketAddress& addr) { + ci->bind(addr); + ci->listen(); + } + + namespace { + const int64_t PoisonContext = -1; + } + + void Listener::connectionEvent(Connection::intrusive_ptr ci) { + ConnectionEvent e(ci->getNextEvent()); + + // If (for whatever reason) there was no event do nothing + if (!e) + return; + + // Important documentation ommision the new rdma_cm_id + // you get from CONNECT_REQUEST has the same context info + // as its parent listening rdma_cm_id + ::rdma_cm_event_type eventType = e.getEventType(); + ::rdma_conn_param conn_param = e.getConnectionParam(); + Rdma::Connection::intrusive_ptr id = e.getConnection(); + + // Check for previous disconnection (it appears that you actually can get connection + // request events after a disconnect event in rare circumstances) + if (reinterpret_cast<int64_t>(id->getContext<void*>())==PoisonContext) + return; + + switch (eventType) { + case RDMA_CM_EVENT_CONNECT_REQUEST: { + // Make sure peer has sent params we can use + if (!conn_param.private_data || conn_param.private_data_len < sizeof(NConnectionParams)) { + QPID_LOG(warning, "Rdma: rejecting connection attempt: unusable connection parameters"); + id->reject(); + break; + } + + const NConnectionParams* rcp = static_cast<const NConnectionParams*>(conn_param.private_data); + ConnectionParams cp = *rcp; + + // Reject if requested msg size is bigger than we allow + if ( + cp.maxRecvBufferSize > checkConnectionParams.maxRecvBufferSize || + cp.initialXmitCredit > checkConnectionParams.initialXmitCredit + ) { + QPID_LOG(warning, "Rdma: rejecting connection attempt: connection parameters out of range: (" + << cp.maxRecvBufferSize << ">" << checkConnectionParams.maxRecvBufferSize << " || " + << cp.initialXmitCredit << ">" << checkConnectionParams.initialXmitCredit + << ")"); + id->reject(&checkConnectionParams); + break; + } + + bool accept = true; + if (connectionRequestCallback) + accept = connectionRequestCallback(id, cp); + + if (accept) { + // Accept connection + cp.initialXmitCredit = checkConnectionParams.initialXmitCredit; + id->accept(conn_param, rcp); + } else { + // Reject connection + QPID_LOG(warning, "Rdma: rejecting connection attempt: application policy"); + id->reject(); + } + break; + } + case RDMA_CM_EVENT_ESTABLISHED: + establishedCallback(id); + break; + case RDMA_CM_EVENT_DISCONNECTED: + disconnectedCallback(id); + // Poison the id context so that we do no more callbacks on it + id->removeContext(); + id->addContext(reinterpret_cast<void*>(PoisonContext)); + break; + case RDMA_CM_EVENT_CONNECT_ERROR: + errorCallback(id, CONNECT_ERROR); + break; + default: + // Unexpected response + errorCallback(id, UNKNOWN); + //std::cerr << "Warning: unexpected response to listen - " << eventType << "\n"; + } + } + + Connector::Connector( + const ConnectionParams& cp, + ConnectedCallback cc, + ErrorCallback errc, + DisconnectedCallback dc, + RejectedCallback rc + ) : + ConnectionManager(errc, dc), + connectionParams(cp), + rejectedCallback(rc), + connectedCallback(cc) + { + } + + void Connector::startConnection(Connection::intrusive_ptr ci, const qpid::sys::SocketAddress& addr) { + ci->resolve_addr(addr); + } + + void Connector::connectionEvent(Connection::intrusive_ptr ci) { + ConnectionEvent e(ci->getNextEvent()); + + // If (for whatever reason) there was no event do nothing + if (!e) + return; + + ::rdma_cm_event_type eventType = e.getEventType(); + ::rdma_conn_param conn_param = e.getConnectionParam(); + Rdma::Connection::intrusive_ptr id = e.getConnection(); + switch (eventType) { + case RDMA_CM_EVENT_ADDR_RESOLVED: + // RESOLVE_ADDR + ci->resolve_route(); + break; + case RDMA_CM_EVENT_ADDR_ERROR: + // RESOLVE_ADDR + errorCallback(ci, ADDR_ERROR); + break; + case RDMA_CM_EVENT_ROUTE_RESOLVED: { + // RESOLVE_ROUTE: + NConnectionParams rcp(connectionParams); + ci->connect(&rcp); + break; + } + case RDMA_CM_EVENT_ROUTE_ERROR: + // RESOLVE_ROUTE: + errorCallback(ci, ROUTE_ERROR); + break; + case RDMA_CM_EVENT_CONNECT_ERROR: + // CONNECTING + errorCallback(ci, CONNECT_ERROR); + break; + case RDMA_CM_EVENT_UNREACHABLE: + // CONNECTING + errorCallback(ci, UNREACHABLE); + break; + case RDMA_CM_EVENT_REJECTED: { + // CONNECTING + + // We can get this event if our peer is not running on the other side + // in this case we could get nearly anything in the private data: + // From private_data == 0 && private_data_len == 0 (Chelsio iWarp) + // to 148 bytes of zeros (Mellanox IB) + // + // So assume that if the the private data is absent or not the size of + // the connection parameters it isn't valid + ConnectionParams cp(0, 0, 0); + if (conn_param.private_data && conn_param.private_data_len == sizeof(NConnectionParams)) { + // Extract private data from event + const NConnectionParams* rcp = static_cast<const NConnectionParams*>(conn_param.private_data); + cp = *rcp; + } + rejectedCallback(ci, cp); + break; + } + case RDMA_CM_EVENT_ESTABLISHED: { + // CONNECTING + // Extract private data from event + assert(conn_param.private_data && conn_param.private_data_len >= sizeof(NConnectionParams)); + const NConnectionParams* rcp = static_cast<const NConnectionParams*>(conn_param.private_data); + ConnectionParams cp = *rcp; + connectedCallback(ci, cp); + break; + } + case RDMA_CM_EVENT_DISCONNECTED: + // ESTABLISHED + disconnectedCallback(ci); + break; + default: + QPID_LOG(warning, "RDMA: Unexpected event in connect: " << eventType); + } + } +} diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h new file mode 100644 index 0000000000..ec9caaf08d --- /dev/null +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -0,0 +1,250 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef Rdma_Acceptor_h +#define Rdma_Acceptor_h + +#include "qpid/sys/rdma/rdma_wrap.h" + +#include "qpid/sys/AtomicValue.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/DispatchHandle.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/SocketAddress.h" + +#include <netinet/in.h> + +#include <boost/function.hpp> + +namespace Rdma { + + class Connection; + + class AsynchIO + { + typedef boost::function1<void, AsynchIO&> ErrorCallback; + typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback; + typedef boost::function1<void, AsynchIO&> IdleCallback; + typedef boost::function2<void, AsynchIO&, Buffer*> FullCallback; + typedef boost::function1<void, AsynchIO&> NotifyCallback; + + int protocolVersion; + int bufferSize; + int recvCredit; + int xmitCredit; + int recvBufferCount; + int xmitBufferCount; + int outstandingWrites; + bool draining; + enum State {IDLE, NOTIFY, NOTIFY_PENDING, STOPPED}; + State state; + qpid::sys::Mutex stateLock; + QueuePair::intrusive_ptr qp; + qpid::sys::DispatchHandleRef dataHandle; + + ReadCallback readCallback; + IdleCallback idleCallback; + FullCallback fullCallback; + ErrorCallback errorCallback; + NotifyCallback notifyCallback; + qpid::sys::DispatchHandle::Callback pendingWriteAction; + + public: + typedef boost::function1<void, AsynchIO&> RequestCallback; + + // TODO: Instead of specifying a buffer size specify the amount of memory the AsynchIO class can use + // for buffers both read and write (allocate half to each up front) and fail if we cannot allocate that much + // locked memory + AsynchIO( + QueuePair::intrusive_ptr q, + int version, + int size, + int xCredit, + int rCount, + ReadCallback rc, + IdleCallback ic, + FullCallback fc, + ErrorCallback ec + ); + ~AsynchIO(); + + void start(qpid::sys::Poller::shared_ptr poller); + bool writable() const; + void queueWrite(Buffer* buff); + void notifyPendingWrite(); + void drainWriteQueue(NotifyCallback); + void stop(NotifyCallback); + void requestCallback(RequestCallback); + int incompletedWrites() const; + Buffer* getSendBuffer(); + void returnSendBuffer(Buffer*); + + private: + const static int maxSupportedProtocolVersion = 1; + + // Constants for the peer-peer command messages + // These are sent in the high bits if the imm data of an rdma message + // The low bits are used to send the credit + const static int FlagsMask = 0xF0000000; // Mask for all flag bits - be sure to update this if you add more command bits + const static int IgnoreData = 0x10000000; // Message contains no application data + + void dataEvent(); + void writeEvent(); + void processCompletions(); + void doWriteCallback(); + void checkDrained(); + void doStoppedCallback(); + + void queueBuffer(Buffer* buff, int credit); + Buffer* extractBuffer(const QueuePairEvent& e); + }; + + // We're only writable if: + // * not draining write queue + // * we've got space in the transmit queue + // * we've got credit to transmit + // * if there's only 1 transmit credit we must send some credit + inline bool AsynchIO::writable() const { + assert(xmitCredit>=0); + return !draining && + outstandingWrites < xmitBufferCount && + xmitCredit > 0 && + ( xmitCredit > 1 || recvCredit > 0); + } + + inline int AsynchIO::incompletedWrites() const { + return outstandingWrites; + } + + inline Buffer* AsynchIO::getSendBuffer() { + return qp->getSendBuffer(); + } + + inline void AsynchIO::returnSendBuffer(Buffer* b) { + qp->returnSendBuffer(b); + } + + // These are the parameters necessary to start the conversation + // * Each peer HAS to allocate buffers of the size of the maximum receive from its peer + // * Each peer HAS to know the initial "credit" it has for transmitting to its peer + struct ConnectionParams { + uint32_t maxRecvBufferSize; + uint16_t initialXmitCredit; + uint16_t rdmaProtocolVersion; + + // Default to protocol version 1 + ConnectionParams(uint32_t s, uint16_t c, uint16_t v = 1) : + maxRecvBufferSize(s), + initialXmitCredit(c), + rdmaProtocolVersion(v) + {} + }; + + enum ErrorType { + ADDR_ERROR, + ROUTE_ERROR, + CONNECT_ERROR, + UNREACHABLE, + UNKNOWN + }; + + typedef boost::function2<void, Rdma::Connection::intrusive_ptr, ErrorType> ErrorCallback; + typedef boost::function1<void, Rdma::Connection::intrusive_ptr> DisconnectedCallback; + + class ConnectionManager { + typedef boost::function1<void, ConnectionManager&> NotifyCallback; + + enum State {IDLE, STOPPED}; + qpid::sys::AtomicValue<State> state; + Connection::intrusive_ptr ci; + qpid::sys::DispatchHandleRef handle; + NotifyCallback notifyCallback; + + protected: + ErrorCallback errorCallback; + DisconnectedCallback disconnectedCallback; + + public: + ConnectionManager( + ErrorCallback errc, + DisconnectedCallback dc + ); + + virtual ~ConnectionManager(); + + void start(qpid::sys::Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr); + void stop(NotifyCallback); + + private: + void event(qpid::sys::DispatchHandle& handle); + void doStoppedCallback(); + + virtual void startConnection(Connection::intrusive_ptr ci, const qpid::sys::SocketAddress& addr) = 0; + virtual void connectionEvent(Connection::intrusive_ptr ci) = 0; + }; + + typedef boost::function2<bool, Rdma::Connection::intrusive_ptr, const ConnectionParams&> ConnectionRequestCallback; + typedef boost::function1<void, Rdma::Connection::intrusive_ptr> EstablishedCallback; + + class Listener : public ConnectionManager + { + ConnectionParams checkConnectionParams; + ConnectionRequestCallback connectionRequestCallback; + EstablishedCallback establishedCallback; + + public: + Listener( + const ConnectionParams& cp, + EstablishedCallback ec, + ErrorCallback errc, + DisconnectedCallback dc, + ConnectionRequestCallback crc = 0 + ); + + private: + void startConnection(Connection::intrusive_ptr ci, const qpid::sys::SocketAddress& addr); + void connectionEvent(Connection::intrusive_ptr ci); + }; + + typedef boost::function2<void, Rdma::Connection::intrusive_ptr, const ConnectionParams&> RejectedCallback; + typedef boost::function2<void, Rdma::Connection::intrusive_ptr, const ConnectionParams&> ConnectedCallback; + + class Connector : public ConnectionManager + { + ConnectionParams connectionParams; + RejectedCallback rejectedCallback; + ConnectedCallback connectedCallback; + + public: + Connector( + const ConnectionParams& cp, + ConnectedCallback cc, + ErrorCallback errc, + DisconnectedCallback dc, + RejectedCallback rc = 0 + ); + + private: + void startConnection(Connection::intrusive_ptr ci, const qpid::sys::SocketAddress& addr); + void connectionEvent(Connection::intrusive_ptr ci); + }; +} + +#endif // Rdma_Acceptor_h diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp new file mode 100644 index 0000000000..9b0710fd8f --- /dev/null +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp @@ -0,0 +1,210 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/Thread.h" +#include "qpid/sys/rdma/RdmaIO.h" +#include "qpid/sys/rdma/rdma_exception.h" + +#include <arpa/inet.h> + +#include <vector> +#include <queue> +#include <string> +#include <iostream> + +#include <boost/bind.hpp> + +using std::vector; +using std::queue; +using std::string; +using std::cout; +using std::cerr; + +using qpid::sys::Thread; +using qpid::sys::SocketAddress; +using qpid::sys::Poller; + +// All the accepted connections +namespace qpid { +namespace tests { + +struct Buffer { + char* bytes() const {return bytes_;} + int32_t byteCount() const {return size;} + + Buffer(const int32_t s): + bytes_(new char[s]), + size(s) + { + } + + ~Buffer() { + delete [] bytes_; + } +private: + char* bytes_; + int32_t size; +}; + +struct ConRec { + Rdma::Connection::intrusive_ptr connection; + Rdma::AsynchIO* data; + queue<Buffer*> queuedWrites; + + ConRec(Rdma::Connection::intrusive_ptr c) : + connection(c) + {} +}; + +void dataError(Rdma::AsynchIO&) { + cout << "Data error:\n"; +} + +void idle(ConRec* cr, Rdma::AsynchIO& a) { + // Need to make sure full is not called as it would reorder messages + while (!cr->queuedWrites.empty() && a.writable()) { + Rdma::Buffer* rbuf = a.getSendBuffer(); + if (!rbuf) break; + Buffer* buf = cr->queuedWrites.front(); + cr->queuedWrites.pop(); + std::copy(buf->bytes(), buf->bytes()+buf->byteCount(), rbuf->bytes()); + rbuf->dataCount(buf->byteCount()); + delete buf; + a.queueWrite(rbuf); + } +} + +void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) { + // Echo data back + Rdma::Buffer* buf = 0; + if (cr->queuedWrites.empty() && a.writable()) { + buf = a.getSendBuffer(); + } + if (buf) { + std::copy(b->bytes(), b->bytes()+b->dataCount(), buf->bytes()); + buf->dataCount(b->dataCount()); + a.queueWrite(buf); + } else { + Buffer* buf = new Buffer(b->dataCount()); + std::copy(b->bytes(), b->bytes()+b->dataCount(), buf->bytes()); + cr->queuedWrites.push(buf); + // Try to empty queue + idle(cr, a); + } +} + +void full(ConRec*, Rdma::AsynchIO&, Rdma::Buffer*) { + // Shouldn't ever be called + cout << "!"; +} + +void drained(Rdma::AsynchIO&) { + cout << "Drained:\n"; +} + +void disconnected(Rdma::Connection::intrusive_ptr& ci) { + ConRec* cr = ci->getContext<ConRec>(); + cr->connection->disconnect(); + cr->data->drainWriteQueue(drained); + delete cr; + cout << "Disconnected: " << cr << "\n"; +} + +void connectionError(Rdma::Connection::intrusive_ptr& ci, Rdma::ErrorType) { + ConRec* cr = ci->getContext<ConRec>(); + cr->connection->disconnect(); + if (cr) { + cr->data->drainWriteQueue(drained); + delete cr; + } + cout << "Connection error: " << cr << "\n"; +} + +bool connectionRequest(Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) { + cout << "Incoming connection: "; + + // For fun reject alternate connection attempts + static bool x = false; + x = true; + + // Must create aio here so as to prepost buffers *before* we accept connection + if (x) { + ConRec* cr = new ConRec(ci); + Rdma::AsynchIO* aio = + new Rdma::AsynchIO(ci->getQueuePair(), + cp.rdmaProtocolVersion, + cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES, + boost::bind(data, cr, _1, _2), + boost::bind(idle, cr, _1), + boost::bind(full, cr, _1, _2), + dataError); + ci->addContext(cr); + cr->data = aio; + cout << "Accept=>" << cr << "\n"; + } else { + cout << "Reject\n"; + } + + return x; +} + +void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) { + static int cnt = 0; + ConRec* cr = ci->getContext<ConRec>(); + cout << "Connected: " << cr << "(" << ++cnt << ")\n"; + + cr->data->start(poller); +} + +}} // namespace qpid::tests + +using namespace qpid::tests; + +int main(int argc, char* argv[]) { + vector<string> args(&argv[0], &argv[argc]); + + std::string port = (args.size() < 2) ? "20079" : args[1]; + cout << "Listening on port: " << port << "\n"; + + try { + boost::shared_ptr<Poller> p(new Poller()); + + Rdma::Listener a( + Rdma::ConnectionParams(16384, Rdma::DEFAULT_WR_ENTRIES), + boost::bind(connected, p, _1), + connectionError, + disconnected, + connectionRequest); + + + SocketAddress sa("", port); + a.start(p, sa); + + // The poller loop blocks all signals so run in its own thread + Thread t(*p); + + ::pause(); + p->shutdown(); + t.join(); + } catch (Rdma::Exception& e) { + int err = e.getError(); + cerr << "Error: " << e.what() << "(" << err << ")\n"; + } +} diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_exception.h b/qpid/cpp/src/qpid/sys/rdma/rdma_exception.h new file mode 100644 index 0000000000..a3a289e38a --- /dev/null +++ b/qpid/cpp/src/qpid/sys/rdma/rdma_exception.h @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef RDMA_EXCEPTION_H +#define RDMA_EXCEPTION_H + +#include <exception> + +#include <errno.h> +#include <string.h> + +namespace Rdma { + static __thread char s[50]; + class Exception : public std::exception { + int err; + + public: + Exception(int e) : err(e) {} + int getError() { return err; } + const char* what() const throw() { + return ::strerror_r(err, s, 50); + } + }; + + inline void THROW_ERRNO() { + throw Rdma::Exception(errno); + } + + inline void CHECK(int rc) { + if (rc != 0) + throw Rdma::Exception((rc == -1) ? errno : rc >0 ? rc : -rc); + } + + inline int GETERR(int rc) { + return (rc == -1) ? errno : rc > 0 ? rc : -rc; + } + + inline void CHECK_IBV(int rc) { + if (rc != 0) + throw Rdma::Exception(rc); + } + + template <typename T> + inline + T* CHECK_NULL(T* rc) { + if (rc == 0) + THROW_ERRNO(); + return rc; + } +} + +#endif // RDMA_EXCEPTION_H diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp b/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp new file mode 100644 index 0000000000..a66f5b4035 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/rdma/rdma_factories.h" + +#include "qpid/sys/rdma/rdma_exception.h" + + +namespace Rdma { + // Intentionally ignore return values for these functions + // - we can't do anything about then anyway + void acker(::rdma_cm_event* e) throw () { + if (e) (void) ::rdma_ack_cm_event(e); + } + + void destroyEChannel(::rdma_event_channel* c) throw () { + if (c) (void) ::rdma_destroy_event_channel(c); + } + + void destroyId(::rdma_cm_id* i) throw () { + if (i) (void) ::rdma_destroy_id(i); + } + + void deallocPd(::ibv_pd* p) throw () { + if (p) (void) ::ibv_dealloc_pd(p); + } + + void deregMr(::ibv_mr* mr) throw () { + if (mr) (void) ::ibv_dereg_mr(mr); + } + + void destroyCChannel(::ibv_comp_channel* c) throw () { + if (c) (void) ::ibv_destroy_comp_channel(c); + } + + void destroyCq(::ibv_cq* cq) throw () { + if (cq) (void) ::ibv_destroy_cq(cq); + } + + void destroyQp(::ibv_qp* qp) throw () { + if (qp) (void) ::ibv_destroy_qp(qp); + } + + boost::shared_ptr< ::rdma_cm_id > mkId(::rdma_cm_id* i) { + return boost::shared_ptr< ::rdma_cm_id >(i, destroyId); + } + + boost::shared_ptr< ::rdma_cm_event > mkEvent(::rdma_cm_event* e) { + return boost::shared_ptr< ::rdma_cm_event >(e, acker); + } + + boost::shared_ptr< ::ibv_qp > mkQp(::ibv_qp* qp) { + return boost::shared_ptr< ::ibv_qp > (qp, destroyQp); + } + + boost::shared_ptr< ::rdma_event_channel > mkEChannel() { + ::rdma_event_channel* c = CHECK_NULL(::rdma_create_event_channel()); + return boost::shared_ptr< ::rdma_event_channel >(c, destroyEChannel); + } + + boost::shared_ptr< ::rdma_cm_id > + mkId(::rdma_event_channel* ec, void* context, ::rdma_port_space ps) { + ::rdma_cm_id* i; + CHECK(::rdma_create_id(ec, &i, context, ps)); + return mkId(i); + } + + boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c) { + ::ibv_pd* pd = CHECK_NULL(::ibv_alloc_pd(c)); + return boost::shared_ptr< ::ibv_pd >(pd, deallocPd); + } + + boost::shared_ptr< ::ibv_mr > regMr(::ibv_pd* pd, void* addr, size_t length, ::ibv_access_flags access) { + ::ibv_mr* mr = CHECK_NULL(::ibv_reg_mr(pd, addr, length, access)); + return boost::shared_ptr< ::ibv_mr >(mr, deregMr); + } + + boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c) { + ::ibv_comp_channel* cc = CHECK_NULL(::ibv_create_comp_channel(c)); + return boost::shared_ptr< ::ibv_comp_channel >(cc, destroyCChannel); + } + + boost::shared_ptr< ::ibv_cq > + mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc) { + ::ibv_cq* cq = CHECK_NULL(::ibv_create_cq(c, cqe, context, cc, 0)); + return boost::shared_ptr< ::ibv_cq >(cq, destroyCq); + } +} diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h b/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h new file mode 100644 index 0000000000..bfca71fc7e --- /dev/null +++ b/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef RDMA_FACTORIES_H +#define RDMA_FACTORIES_H + +#include <rdma/rdma_cma.h> + +#include <boost/shared_ptr.hpp> + +namespace Rdma { + boost::shared_ptr< ::rdma_event_channel > mkEChannel(); + boost::shared_ptr< ::rdma_cm_id > mkId(::rdma_event_channel* ec, void* context, ::rdma_port_space ps); + boost::shared_ptr< ::rdma_cm_id > mkId(::rdma_cm_id* i); + boost::shared_ptr< ::rdma_cm_event > mkEvent(::rdma_cm_event* e); + boost::shared_ptr< ::ibv_qp > mkQp(::ibv_qp* qp); + boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c); + boost::shared_ptr< ::ibv_mr > regMr(::ibv_pd* pd, void* addr, size_t length, ::ibv_access_flags access); + boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c); + boost::shared_ptr< ::ibv_cq > mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc); +} + +#endif // RDMA_FACTORIES_H diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp new file mode 100644 index 0000000000..efe454c5be --- /dev/null +++ b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp @@ -0,0 +1,566 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/rdma/rdma_wrap.h" + +#include "qpid/sys/rdma/rdma_factories.h" +#include "qpid/sys/rdma/rdma_exception.h" + +#include "qpid/sys/posix/PrivatePosix.h" + +#include <fcntl.h> +#include <netdb.h> + +#include <iostream> +#include <stdexcept> + +namespace Rdma { + const ::rdma_conn_param DEFAULT_CONNECT_PARAM = { + 0, // .private_data + 0, // .private_data_len + 4, // .responder_resources + 4, // .initiator_depth + 0, // .flow_control + 5, // .retry_count + 7 // .rnr_retry_count + }; + + // This is moderately inefficient so don't use in a critical path + int deviceCount() { + int count; + ::ibv_free_device_list(::ibv_get_device_list(&count)); + return count; + } + + Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount, + const int32_t reserve) : + bufferSize(byteCount + reserve), reserved(reserve) + { + sge.addr = (uintptr_t) bytes; + sge.length = 0; + sge.lkey = lkey; + } + + QueuePairEvent::QueuePairEvent() : + dir(NONE) + {} + + QueuePairEvent::QueuePairEvent( + const ::ibv_wc& w, + boost::shared_ptr< ::ibv_cq > c, + QueueDirection d) : + cq(c), + wc(w), + dir(d) + { + assert(dir != NONE); + } + + QueuePairEvent::operator bool() const { + return dir != NONE; + } + + bool QueuePairEvent::immPresent() const { + return wc.wc_flags & IBV_WC_WITH_IMM; + } + + uint32_t QueuePairEvent::getImm() const { + return ntohl(wc.imm_data); + } + + QueueDirection QueuePairEvent::getDirection() const { + return dir; + } + + ::ibv_wc_opcode QueuePairEvent::getEventType() const { + return wc.opcode; + } + + ::ibv_wc_status QueuePairEvent::getEventStatus() const { + return wc.status; + } + + Buffer* QueuePairEvent::getBuffer() const { + Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id); + b->dataCount(wc.byte_len); + return b; + } + + QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) : + qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + pd(allocPd(i->verbs)), + cchannel(mkCChannel(i->verbs)), + scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())), + rcq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())), + outstandingSendEvents(0), + outstandingRecvEvents(0) + { + impl->fd = cchannel->fd; + + // Set cq context to this QueuePair object so we can find + // ourselves again + scq->cq_context = this; + rcq->cq_context = this; + + ::ibv_device_attr dev_attr; + CHECK(::ibv_query_device(i->verbs, &dev_attr)); + + ::ibv_qp_init_attr qp_attr = {}; + + // TODO: make a default struct for this + qp_attr.cap.max_send_wr = DEFAULT_WR_ENTRIES; + qp_attr.cap.max_send_sge = 1; + qp_attr.cap.max_recv_wr = DEFAULT_WR_ENTRIES; + qp_attr.cap.max_recv_sge = 1; + + qp_attr.send_cq = scq.get(); + qp_attr.recv_cq = rcq.get(); + qp_attr.qp_type = IBV_QPT_RC; + + CHECK(::rdma_create_qp(i.get(), pd.get(), &qp_attr)); + qp = mkQp(i->qp); + + // Set the qp context to this so we can find ourselves again + qp->qp_context = this; + } + + QueuePair::~QueuePair() { + // Reset back pointer in case someone else has the qp + qp->qp_context = 0; + + // Dispose queue pair before we ack events + qp.reset(); + + if (outstandingSendEvents > 0) + ::ibv_ack_cq_events(scq.get(), outstandingSendEvents); + if (outstandingRecvEvents > 0) + ::ibv_ack_cq_events(rcq.get(), outstandingRecvEvents); + + // Deallocate recv buffer memory + if (rmr) delete [] static_cast<char*>(rmr->addr); + + // Deallocate recv buffer memory + if (smr) delete [] static_cast<char*>(smr->addr); + + // The buffers vectors automatically deletes all the buffers we've allocated + } + + // Create buffers to use for writing + void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize, int reserved) + { + assert(!smr); + + // Round up buffersize to cacheline (64 bytes) + int dataLength = (bufferSize+reserved+63) & (~63); + + // Allocate memory block for all receive buffers + char* mem = new char [sendBufferCount * dataLength]; + smr = regMr(pd.get(), mem, sendBufferCount * dataLength, ::IBV_ACCESS_LOCAL_WRITE); + sendBuffers.reserve(sendBufferCount); + freeBuffers.reserve(sendBufferCount); + for (int i = 0; i<sendBufferCount; ++i) { + // Allocate xmit buffer + sendBuffers.push_back(Buffer(smr->lkey, &mem[i*dataLength], bufferSize, reserved)); + freeBuffers.push_back(i); + } + } + + Buffer* QueuePair::getSendBuffer() { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferLock); + if (freeBuffers.empty()) + return 0; + int i = freeBuffers.back(); + freeBuffers.pop_back(); + assert(i >= 0 && i < int(sendBuffers.size())); + Buffer* b = &sendBuffers[i]; + b->dataCount(0); + return b; + } + + void QueuePair::returnSendBuffer(Buffer* b) { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferLock); + int i = b - &sendBuffers[0]; + assert(i >= 0 && i < int(sendBuffers.size())); + freeBuffers.push_back(i); + } + + void QueuePair::allocateRecvBuffers(int recvBufferCount, int bufferSize) + { + assert(!rmr); + + // Round up buffersize to cacheline (64 bytes) + bufferSize = (bufferSize+63) & (~63); + + // Allocate memory block for all receive buffers + char* mem = new char [recvBufferCount * bufferSize]; + rmr = regMr(pd.get(), mem, recvBufferCount * bufferSize, ::IBV_ACCESS_LOCAL_WRITE); + recvBuffers.reserve(recvBufferCount); + for (int i = 0; i<recvBufferCount; ++i) { + // Allocate recv buffer + recvBuffers.push_back(Buffer(rmr->lkey, &mem[i*bufferSize], bufferSize)); + postRecv(&recvBuffers[i]); + } + } + + // Make channel non-blocking by making + // associated fd nonblocking + void QueuePair::nonblocking() { + ::fcntl(cchannel->fd, F_SETFL, O_NONBLOCK); + } + + // If we get EAGAIN because the channel has been set non blocking + // and we'd have to wait then return an empty event + QueuePair::intrusive_ptr QueuePair::getNextChannelEvent() { + // First find out which cq has the event + ::ibv_cq* cq; + void* ctx; + int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx); + if (rc == -1 && errno == EAGAIN) + return 0; + CHECK(rc); + + // Batch acknowledge the event + if (cq == scq.get()) { + if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) { + ::ibv_ack_cq_events(cq, outstandingSendEvents); + outstandingSendEvents = 0; + } + } else if (cq == rcq.get()) { + if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) { + ::ibv_ack_cq_events(cq, outstandingRecvEvents); + outstandingRecvEvents = 0; + } + } + + return static_cast<QueuePair*>(ctx); + } + + QueuePairEvent QueuePair::getNextEvent() { + ::ibv_wc w; + if (::ibv_poll_cq(scq.get(), 1, &w) == 1) + return QueuePairEvent(w, scq, SEND); + else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1) + return QueuePairEvent(w, rcq, RECV); + else + return QueuePairEvent(); + } + + void QueuePair::notifyRecv() { + CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0)); + } + + void QueuePair::notifySend() { + CHECK_IBV(ibv_req_notify_cq(scq.get(), 0)); + } + + void QueuePair::postRecv(Buffer* buf) { + ::ibv_recv_wr rwr = {}; + + rwr.wr_id = reinterpret_cast<uint64_t>(buf); + // We are given the whole buffer + buf->dataCount(buf->byteCount()); + rwr.sg_list = &buf->sge; + rwr.num_sge = 1; + + ::ibv_recv_wr* badrwr = 0; + CHECK(::ibv_post_recv(qp.get(), &rwr, &badrwr)); + if (badrwr) + throw std::logic_error("ibv_post_recv(): Bad rwr"); + } + + void QueuePair::postSend(Buffer* buf) { + ::ibv_send_wr swr = {}; + + swr.wr_id = reinterpret_cast<uint64_t>(buf); + swr.opcode = IBV_WR_SEND; + swr.send_flags = IBV_SEND_SIGNALED; + swr.sg_list = &buf->sge; + swr.num_sge = 1; + + ::ibv_send_wr* badswr = 0; + CHECK(::ibv_post_send(qp.get(), &swr, &badswr)); + if (badswr) + throw std::logic_error("ibv_post_send(): Bad swr"); + } + + void QueuePair::postSend(uint32_t imm, Buffer* buf) { + ::ibv_send_wr swr = {}; + + swr.wr_id = reinterpret_cast<uint64_t>(buf); + swr.imm_data = htonl(imm); + swr.opcode = IBV_WR_SEND_WITH_IMM; + swr.send_flags = IBV_SEND_SIGNALED; + swr.sg_list = &buf->sge; + swr.num_sge = 1; + + ::ibv_send_wr* badswr = 0; + CHECK(::ibv_post_send(qp.get(), &swr, &badswr)); + if (badswr) + throw std::logic_error("ibv_post_send(): Bad swr"); + } + + ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) : + id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ? + Connection::find(e->id) : new Connection(e->id)), + listen_id(Connection::find(e->listen_id)), + event(mkEvent(e)) + {} + + ConnectionEvent::operator bool() const { + return event; + } + + ::rdma_cm_event_type ConnectionEvent::getEventType() const { + return event->event; + } + + ::rdma_conn_param ConnectionEvent::getConnectionParam() const { + // It's badly documented, but it seems from the librdma source code that all the following + // event types have a valid param.conn + switch (event->event) { + case RDMA_CM_EVENT_CONNECT_REQUEST: + case RDMA_CM_EVENT_ESTABLISHED: + case RDMA_CM_EVENT_REJECTED: + case RDMA_CM_EVENT_DISCONNECTED: + case RDMA_CM_EVENT_CONNECT_ERROR: + return event->param.conn; + default: + ::rdma_conn_param p = {}; + return p; + } + } + + boost::intrusive_ptr<Connection> ConnectionEvent::getConnection () const { + return id; + } + + boost::intrusive_ptr<Connection> ConnectionEvent::getListenId() const { + return listen_id; + } + + // Wrap the passed in rdma_cm_id with a Connection + // this basically happens only on connection request + Connection::Connection(::rdma_cm_id* i) : + qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + id(mkId(i)), + context(0) + { + impl->fd = id->channel->fd; + + // Just overwrite the previous context as it will + // have come from the listening connection + if (i) + i->context = this; + } + + Connection::Connection() : + qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + channel(mkEChannel()), + id(mkId(channel.get(), this, RDMA_PS_TCP)), + context(0) + { + impl->fd = channel->fd; + } + + Connection::~Connection() { + // Reset the id context in case someone else has it + id->context = 0; + } + + void Connection::ensureQueuePair() { + assert(id.get()); + + // Only allocate a queue pair if there isn't one already + if (qp) + return; + + qp = new QueuePair(id); + } + + Connection::intrusive_ptr Connection::make() { + return new Connection(); + } + + Connection::intrusive_ptr Connection::find(::rdma_cm_id* i) { + if (!i) + return 0; + Connection* id = static_cast< Connection* >(i->context); + if (!id) + throw std::logic_error("Couldn't find existing Connection"); + return id; + } + + // Make channel non-blocking by making + // associated fd nonblocking + void Connection::nonblocking() { + assert(id.get()); + ::fcntl(id->channel->fd, F_SETFL, O_NONBLOCK); + } + + // If we get EAGAIN because the channel has been set non blocking + // and we'd have to wait then return an empty event + ConnectionEvent Connection::getNextEvent() { + assert(id.get()); + ::rdma_cm_event* e; + int rc = ::rdma_get_cm_event(id->channel, &e); + if (GETERR(rc) == EAGAIN) + return ConnectionEvent(); + CHECK(rc); + return ConnectionEvent(e); + } + + void Connection::bind(const qpid::sys::SocketAddress& src_addr) const { + assert(id.get()); + CHECK(::rdma_bind_addr(id.get(), getAddrInfo(src_addr).ai_addr)); + } + + void Connection::listen(int backlog) const { + assert(id.get()); + CHECK(::rdma_listen(id.get(), backlog)); + } + + void Connection::resolve_addr( + const qpid::sys::SocketAddress& dst_addr, + int timeout_ms) const + { + assert(id.get()); + CHECK(::rdma_resolve_addr(id.get(), 0, getAddrInfo(dst_addr).ai_addr, timeout_ms)); + } + + void Connection::resolve_route(int timeout_ms) const { + assert(id.get()); + CHECK(::rdma_resolve_route(id.get(), timeout_ms)); + } + + void Connection::disconnect() const { + assert(id.get()); + int rc = ::rdma_disconnect(id.get()); + // iWarp doesn't let you disconnect a disconnected connection + // but Infiniband can do so it's okay to call rdma_disconnect() + // in response to a disconnect event, but we may get an error + if (GETERR(rc) == EINVAL) + return; + CHECK(rc); + } + + // TODO: Currently you can only connect with the default connection parameters + void Connection::connect(const void* data, size_t len) { + assert(id.get()); + // Need to have a queue pair before we can connect + ensureQueuePair(); + + ::rdma_conn_param p = DEFAULT_CONNECT_PARAM; + p.private_data = data; + p.private_data_len = len; + CHECK(::rdma_connect(id.get(), &p)); + } + + void Connection::connect() { + connect(0, 0); + } + + void Connection::accept(const ::rdma_conn_param& param, const void* data, size_t len) { + assert(id.get()); + // Need to have a queue pair before we can accept + ensureQueuePair(); + + ::rdma_conn_param p = param; + p.private_data = data; + p.private_data_len = len; + CHECK(::rdma_accept(id.get(), &p)); + } + + void Connection::accept(const ::rdma_conn_param& param) { + accept(param, 0, 0); + } + + void Connection::reject(const void* data, size_t len) const { + assert(id.get()); + CHECK(::rdma_reject(id.get(), data, len)); + } + + void Connection::reject() const { + assert(id.get()); + CHECK(::rdma_reject(id.get(), 0, 0)); + } + + QueuePair::intrusive_ptr Connection::getQueuePair() { + assert(id.get()); + + ensureQueuePair(); + + return qp; + } + + std::string Connection::getLocalName() const { + ::sockaddr* addr = ::rdma_get_local_addr(id.get()); + char hostName[NI_MAXHOST]; + char portName[NI_MAXSERV]; + CHECK_IBV(::getnameinfo( + addr, sizeof(::sockaddr_storage), + hostName, sizeof(hostName), + portName, sizeof(portName), + NI_NUMERICHOST | NI_NUMERICSERV)); + std::string r(hostName); + r += ":"; + r += portName; + return r; + } + + std::string Connection::getPeerName() const { + ::sockaddr* addr = ::rdma_get_peer_addr(id.get()); + char hostName[NI_MAXHOST]; + char portName[NI_MAXSERV]; + CHECK_IBV(::getnameinfo( + addr, sizeof(::sockaddr_storage), + hostName, sizeof(hostName), + portName, sizeof(portName), + NI_NUMERICHOST | NI_NUMERICSERV)); + std::string r(hostName); + r += ":"; + r += portName; + return r; + } +} + +std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) { +# define CHECK_TYPE(t) case t: o << #t; break; + switch(t) { + CHECK_TYPE(RDMA_CM_EVENT_ADDR_RESOLVED) + CHECK_TYPE(RDMA_CM_EVENT_ADDR_ERROR) + CHECK_TYPE(RDMA_CM_EVENT_ROUTE_RESOLVED) + CHECK_TYPE(RDMA_CM_EVENT_ROUTE_ERROR) + CHECK_TYPE(RDMA_CM_EVENT_CONNECT_REQUEST) + CHECK_TYPE(RDMA_CM_EVENT_CONNECT_RESPONSE) + CHECK_TYPE(RDMA_CM_EVENT_CONNECT_ERROR) + CHECK_TYPE(RDMA_CM_EVENT_UNREACHABLE) + CHECK_TYPE(RDMA_CM_EVENT_REJECTED) + CHECK_TYPE(RDMA_CM_EVENT_ESTABLISHED) + CHECK_TYPE(RDMA_CM_EVENT_DISCONNECTED) + CHECK_TYPE(RDMA_CM_EVENT_DEVICE_REMOVAL) + CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_JOIN) + CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_ERROR) + default: + o << "UNKNOWN_EVENT"; + } +# undef CHECK_TYPE + return o; +} diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h new file mode 100644 index 0000000000..8e3429027b --- /dev/null +++ b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h @@ -0,0 +1,287 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef RDMA_WRAP_H +#define RDMA_WRAP_H + +#include <rdma/rdma_cma.h> + +#include "qpid/RefCounted.h" +#include "qpid/sys/IOHandle.h" +#include "qpid/sys/Mutex.h" + +#include <boost/shared_ptr.hpp> +#include <boost/intrusive_ptr.hpp> +#include <boost/ptr_container/ptr_deque.hpp> + +#include <vector> + +namespace qpid { +namespace sys { + class SocketAddress; +}} + +namespace Rdma { + const int DEFAULT_TIMEOUT = 2000; // 2 secs + const int DEFAULT_BACKLOG = 100; + const int DEFAULT_CQ_ENTRIES = 256; + const int DEFAULT_WR_ENTRIES = 64; + extern const ::rdma_conn_param DEFAULT_CONNECT_PARAM; + + int deviceCount(); + + struct Buffer { + friend class QueuePair; + friend class QueuePairEvent; + + char* bytes() const; + int32_t byteCount() const; + int32_t dataCount() const; + void dataCount(int32_t); + + private: + Buffer(uint32_t lkey, char* bytes, const int32_t byteCount, const int32_t reserve=0); + int32_t bufferSize; + int32_t reserved; // for framing header + ::ibv_sge sge; + }; + + inline char* Buffer::bytes() const { + return (char*) sge.addr; + } + + /** return the number of bytes available for application data */ + inline int32_t Buffer::byteCount() const { + return bufferSize - reserved; + } + + inline int32_t Buffer::dataCount() const { + return sge.length; + } + + inline void Buffer::dataCount(int32_t s) { + // catch any attempt to overflow a buffer + assert(s <= bufferSize + reserved); + sge.length = s; + } + + class Connection; + + enum QueueDirection { + NONE, + SEND, + RECV + }; + + class QueuePairEvent { + boost::shared_ptr< ::ibv_cq > cq; + ::ibv_wc wc; + QueueDirection dir; + + friend class QueuePair; + + QueuePairEvent(); + QueuePairEvent( + const ::ibv_wc& w, + boost::shared_ptr< ::ibv_cq > c, + QueueDirection d); + + public: + operator bool() const; + bool immPresent() const; + uint32_t getImm() const; + QueueDirection getDirection() const; + ::ibv_wc_opcode getEventType() const; + ::ibv_wc_status getEventStatus() const; + Buffer* getBuffer() const; + }; + + // Wrapper for a queue pair - this has the functionality for + // putting buffers on the receive queue and for sending buffers + // to the other end of the connection. + class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted { + friend class Connection; + + boost::shared_ptr< ::ibv_pd > pd; + boost::shared_ptr< ::ibv_mr > smr; + boost::shared_ptr< ::ibv_mr > rmr; + boost::shared_ptr< ::ibv_comp_channel > cchannel; + boost::shared_ptr< ::ibv_cq > scq; + boost::shared_ptr< ::ibv_cq > rcq; + boost::shared_ptr< ::ibv_qp > qp; + int outstandingSendEvents; + int outstandingRecvEvents; + std::vector<Buffer> sendBuffers; + std::vector<Buffer> recvBuffers; + qpid::sys::Mutex bufferLock; + std::vector<int> freeBuffers; + + QueuePair(boost::shared_ptr< ::rdma_cm_id > id); + ~QueuePair(); + + public: + typedef boost::intrusive_ptr<QueuePair> intrusive_ptr; + + // Create a buffers to use for writing + void createSendBuffers(int sendBufferCount, int dataSize, int headerSize); + + // Get a send buffer + Buffer* getSendBuffer(); + + // Return buffer to pool after use + void returnSendBuffer(Buffer* b); + + // Create and post recv buffers + void allocateRecvBuffers(int recvBufferCount, int bufferSize); + + // Make channel non-blocking by making + // associated fd nonblocking + void nonblocking(); + + // If we get EAGAIN because the channel has been set non blocking + // and we'd have to wait then return an empty event + QueuePair::intrusive_ptr getNextChannelEvent(); + + QueuePairEvent getNextEvent(); + + void postRecv(Buffer* buf); + void postSend(Buffer* buf); + void postSend(uint32_t imm, Buffer* buf); + void notifyRecv(); + void notifySend(); + }; + + class ConnectionEvent { + friend class Connection; + + // The order of the members is important as we have to acknowledge + // the event before destroying the ids on destruction + boost::intrusive_ptr<Connection> id; + boost::intrusive_ptr<Connection> listen_id; + boost::shared_ptr< ::rdma_cm_event > event; + + ConnectionEvent() {} + ConnectionEvent(::rdma_cm_event* e); + + // Default copy, assignment and destructor ok + public: + operator bool() const; + ::rdma_cm_event_type getEventType() const; + ::rdma_conn_param getConnectionParam() const; + boost::intrusive_ptr<Connection> getConnection () const; + boost::intrusive_ptr<Connection> getListenId() const; + }; + + // For the moment this is a fairly simple wrapper for rdma_cm_id. + // + // NB: It allocates a protection domain (pd) per connection which means that + // registered buffers can't be shared between different connections + // (this can only happen between connections on the same controller in any case, + // so needs careful management if used) + class Connection : public qpid::sys::IOHandle, public qpid::RefCounted { + boost::shared_ptr< ::rdma_event_channel > channel; + boost::shared_ptr< ::rdma_cm_id > id; + QueuePair::intrusive_ptr qp; + + void* context; + + friend class ConnectionEvent; + friend class QueuePair; + + // Wrap the passed in rdma_cm_id with a Connection + // this basically happens only on connection request + Connection(::rdma_cm_id* i); + Connection(); + ~Connection(); + + void ensureQueuePair(); + + public: + typedef boost::intrusive_ptr<Connection> intrusive_ptr; + + static intrusive_ptr make(); + static intrusive_ptr find(::rdma_cm_id* i); + + template <typename T> + void addContext(T* c) { + // Don't allow replacing context + if (!context) + context = c; + } + + void removeContext() { + context = 0; + } + + template <typename T> + T* getContext() { + return static_cast<T*>(context); + } + + // Make channel non-blocking by making + // associated fd nonblocking + void nonblocking(); + + // If we get EAGAIN because the channel has been set non blocking + // and we'd have to wait then return an empty event + ConnectionEvent getNextEvent(); + + void bind(const qpid::sys::SocketAddress& src_addr) const; + void listen(int backlog = DEFAULT_BACKLOG) const; + void resolve_addr( + const qpid::sys::SocketAddress& dst_addr, + int timeout_ms = DEFAULT_TIMEOUT) const; + void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const; + void disconnect() const; + + // TODO: Currently you can only connect with the default connection parameters + void connect(const void* data, size_t len); + void connect(); + template <typename T> + void connect(const T* data) { + connect(data, sizeof(T)); + } + + // TODO: Not sure how to default accept params - they come from the connection request + // event + void accept(const ::rdma_conn_param& param, const void* data, size_t len); + void accept(const ::rdma_conn_param& param); + template <typename T> + void accept(const ::rdma_conn_param& param, const T* data) { + accept(param, data, sizeof(T)); + } + + void reject(const void* data, size_t len) const; + void reject() const; + template <typename T> + void reject(const T* data) const { + reject(data, sizeof(T)); + } + + QueuePair::intrusive_ptr getQueuePair(); + std::string getLocalName() const; + std::string getPeerName() const; + std::string getFullName() const { return getLocalName()+"-"+getPeerName(); } + }; +} + +std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t); + +#endif // RDMA_WRAP_H |