summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp')
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp566
1 files changed, 566 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
new file mode 100644
index 0000000000..efe454c5be
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
@@ -0,0 +1,566 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/rdma/rdma_wrap.h"
+
+#include "qpid/sys/rdma/rdma_factories.h"
+#include "qpid/sys/rdma/rdma_exception.h"
+
+#include "qpid/sys/posix/PrivatePosix.h"
+
+#include <fcntl.h>
+#include <netdb.h>
+
+#include <iostream>
+#include <stdexcept>
+
+namespace Rdma {
+ const ::rdma_conn_param DEFAULT_CONNECT_PARAM = {
+ 0, // .private_data
+ 0, // .private_data_len
+ 4, // .responder_resources
+ 4, // .initiator_depth
+ 0, // .flow_control
+ 5, // .retry_count
+ 7 // .rnr_retry_count
+ };
+
+ // This is moderately inefficient so don't use in a critical path
+ int deviceCount() {
+ int count;
+ ::ibv_free_device_list(::ibv_get_device_list(&count));
+ return count;
+ }
+
+ Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount,
+ const int32_t reserve) :
+ bufferSize(byteCount + reserve), reserved(reserve)
+ {
+ sge.addr = (uintptr_t) bytes;
+ sge.length = 0;
+ sge.lkey = lkey;
+ }
+
+ QueuePairEvent::QueuePairEvent() :
+ dir(NONE)
+ {}
+
+ QueuePairEvent::QueuePairEvent(
+ const ::ibv_wc& w,
+ boost::shared_ptr< ::ibv_cq > c,
+ QueueDirection d) :
+ cq(c),
+ wc(w),
+ dir(d)
+ {
+ assert(dir != NONE);
+ }
+
+ QueuePairEvent::operator bool() const {
+ return dir != NONE;
+ }
+
+ bool QueuePairEvent::immPresent() const {
+ return wc.wc_flags & IBV_WC_WITH_IMM;
+ }
+
+ uint32_t QueuePairEvent::getImm() const {
+ return ntohl(wc.imm_data);
+ }
+
+ QueueDirection QueuePairEvent::getDirection() const {
+ return dir;
+ }
+
+ ::ibv_wc_opcode QueuePairEvent::getEventType() const {
+ return wc.opcode;
+ }
+
+ ::ibv_wc_status QueuePairEvent::getEventStatus() const {
+ return wc.status;
+ }
+
+ Buffer* QueuePairEvent::getBuffer() const {
+ Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id);
+ b->dataCount(wc.byte_len);
+ return b;
+ }
+
+ QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) :
+ qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+ pd(allocPd(i->verbs)),
+ cchannel(mkCChannel(i->verbs)),
+ scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
+ rcq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
+ outstandingSendEvents(0),
+ outstandingRecvEvents(0)
+ {
+ impl->fd = cchannel->fd;
+
+ // Set cq context to this QueuePair object so we can find
+ // ourselves again
+ scq->cq_context = this;
+ rcq->cq_context = this;
+
+ ::ibv_device_attr dev_attr;
+ CHECK(::ibv_query_device(i->verbs, &dev_attr));
+
+ ::ibv_qp_init_attr qp_attr = {};
+
+ // TODO: make a default struct for this
+ qp_attr.cap.max_send_wr = DEFAULT_WR_ENTRIES;
+ qp_attr.cap.max_send_sge = 1;
+ qp_attr.cap.max_recv_wr = DEFAULT_WR_ENTRIES;
+ qp_attr.cap.max_recv_sge = 1;
+
+ qp_attr.send_cq = scq.get();
+ qp_attr.recv_cq = rcq.get();
+ qp_attr.qp_type = IBV_QPT_RC;
+
+ CHECK(::rdma_create_qp(i.get(), pd.get(), &qp_attr));
+ qp = mkQp(i->qp);
+
+ // Set the qp context to this so we can find ourselves again
+ qp->qp_context = this;
+ }
+
+ QueuePair::~QueuePair() {
+ // Reset back pointer in case someone else has the qp
+ qp->qp_context = 0;
+
+ // Dispose queue pair before we ack events
+ qp.reset();
+
+ if (outstandingSendEvents > 0)
+ ::ibv_ack_cq_events(scq.get(), outstandingSendEvents);
+ if (outstandingRecvEvents > 0)
+ ::ibv_ack_cq_events(rcq.get(), outstandingRecvEvents);
+
+ // Deallocate recv buffer memory
+ if (rmr) delete [] static_cast<char*>(rmr->addr);
+
+ // Deallocate recv buffer memory
+ if (smr) delete [] static_cast<char*>(smr->addr);
+
+ // The buffers vectors automatically deletes all the buffers we've allocated
+ }
+
+ // Create buffers to use for writing
+ void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize, int reserved)
+ {
+ assert(!smr);
+
+ // Round up buffersize to cacheline (64 bytes)
+ int dataLength = (bufferSize+reserved+63) & (~63);
+
+ // Allocate memory block for all receive buffers
+ char* mem = new char [sendBufferCount * dataLength];
+ smr = regMr(pd.get(), mem, sendBufferCount * dataLength, ::IBV_ACCESS_LOCAL_WRITE);
+ sendBuffers.reserve(sendBufferCount);
+ freeBuffers.reserve(sendBufferCount);
+ for (int i = 0; i<sendBufferCount; ++i) {
+ // Allocate xmit buffer
+ sendBuffers.push_back(Buffer(smr->lkey, &mem[i*dataLength], bufferSize, reserved));
+ freeBuffers.push_back(i);
+ }
+ }
+
+ Buffer* QueuePair::getSendBuffer() {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferLock);
+ if (freeBuffers.empty())
+ return 0;
+ int i = freeBuffers.back();
+ freeBuffers.pop_back();
+ assert(i >= 0 && i < int(sendBuffers.size()));
+ Buffer* b = &sendBuffers[i];
+ b->dataCount(0);
+ return b;
+ }
+
+ void QueuePair::returnSendBuffer(Buffer* b) {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferLock);
+ int i = b - &sendBuffers[0];
+ assert(i >= 0 && i < int(sendBuffers.size()));
+ freeBuffers.push_back(i);
+ }
+
+ void QueuePair::allocateRecvBuffers(int recvBufferCount, int bufferSize)
+ {
+ assert(!rmr);
+
+ // Round up buffersize to cacheline (64 bytes)
+ bufferSize = (bufferSize+63) & (~63);
+
+ // Allocate memory block for all receive buffers
+ char* mem = new char [recvBufferCount * bufferSize];
+ rmr = regMr(pd.get(), mem, recvBufferCount * bufferSize, ::IBV_ACCESS_LOCAL_WRITE);
+ recvBuffers.reserve(recvBufferCount);
+ for (int i = 0; i<recvBufferCount; ++i) {
+ // Allocate recv buffer
+ recvBuffers.push_back(Buffer(rmr->lkey, &mem[i*bufferSize], bufferSize));
+ postRecv(&recvBuffers[i]);
+ }
+ }
+
+ // Make channel non-blocking by making
+ // associated fd nonblocking
+ void QueuePair::nonblocking() {
+ ::fcntl(cchannel->fd, F_SETFL, O_NONBLOCK);
+ }
+
+ // If we get EAGAIN because the channel has been set non blocking
+ // and we'd have to wait then return an empty event
+ QueuePair::intrusive_ptr QueuePair::getNextChannelEvent() {
+ // First find out which cq has the event
+ ::ibv_cq* cq;
+ void* ctx;
+ int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx);
+ if (rc == -1 && errno == EAGAIN)
+ return 0;
+ CHECK(rc);
+
+ // Batch acknowledge the event
+ if (cq == scq.get()) {
+ if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) {
+ ::ibv_ack_cq_events(cq, outstandingSendEvents);
+ outstandingSendEvents = 0;
+ }
+ } else if (cq == rcq.get()) {
+ if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) {
+ ::ibv_ack_cq_events(cq, outstandingRecvEvents);
+ outstandingRecvEvents = 0;
+ }
+ }
+
+ return static_cast<QueuePair*>(ctx);
+ }
+
+ QueuePairEvent QueuePair::getNextEvent() {
+ ::ibv_wc w;
+ if (::ibv_poll_cq(scq.get(), 1, &w) == 1)
+ return QueuePairEvent(w, scq, SEND);
+ else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1)
+ return QueuePairEvent(w, rcq, RECV);
+ else
+ return QueuePairEvent();
+ }
+
+ void QueuePair::notifyRecv() {
+ CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0));
+ }
+
+ void QueuePair::notifySend() {
+ CHECK_IBV(ibv_req_notify_cq(scq.get(), 0));
+ }
+
+ void QueuePair::postRecv(Buffer* buf) {
+ ::ibv_recv_wr rwr = {};
+
+ rwr.wr_id = reinterpret_cast<uint64_t>(buf);
+ // We are given the whole buffer
+ buf->dataCount(buf->byteCount());
+ rwr.sg_list = &buf->sge;
+ rwr.num_sge = 1;
+
+ ::ibv_recv_wr* badrwr = 0;
+ CHECK(::ibv_post_recv(qp.get(), &rwr, &badrwr));
+ if (badrwr)
+ throw std::logic_error("ibv_post_recv(): Bad rwr");
+ }
+
+ void QueuePair::postSend(Buffer* buf) {
+ ::ibv_send_wr swr = {};
+
+ swr.wr_id = reinterpret_cast<uint64_t>(buf);
+ swr.opcode = IBV_WR_SEND;
+ swr.send_flags = IBV_SEND_SIGNALED;
+ swr.sg_list = &buf->sge;
+ swr.num_sge = 1;
+
+ ::ibv_send_wr* badswr = 0;
+ CHECK(::ibv_post_send(qp.get(), &swr, &badswr));
+ if (badswr)
+ throw std::logic_error("ibv_post_send(): Bad swr");
+ }
+
+ void QueuePair::postSend(uint32_t imm, Buffer* buf) {
+ ::ibv_send_wr swr = {};
+
+ swr.wr_id = reinterpret_cast<uint64_t>(buf);
+ swr.imm_data = htonl(imm);
+ swr.opcode = IBV_WR_SEND_WITH_IMM;
+ swr.send_flags = IBV_SEND_SIGNALED;
+ swr.sg_list = &buf->sge;
+ swr.num_sge = 1;
+
+ ::ibv_send_wr* badswr = 0;
+ CHECK(::ibv_post_send(qp.get(), &swr, &badswr));
+ if (badswr)
+ throw std::logic_error("ibv_post_send(): Bad swr");
+ }
+
+ ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) :
+ id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ?
+ Connection::find(e->id) : new Connection(e->id)),
+ listen_id(Connection::find(e->listen_id)),
+ event(mkEvent(e))
+ {}
+
+ ConnectionEvent::operator bool() const {
+ return event;
+ }
+
+ ::rdma_cm_event_type ConnectionEvent::getEventType() const {
+ return event->event;
+ }
+
+ ::rdma_conn_param ConnectionEvent::getConnectionParam() const {
+ // It's badly documented, but it seems from the librdma source code that all the following
+ // event types have a valid param.conn
+ switch (event->event) {
+ case RDMA_CM_EVENT_CONNECT_REQUEST:
+ case RDMA_CM_EVENT_ESTABLISHED:
+ case RDMA_CM_EVENT_REJECTED:
+ case RDMA_CM_EVENT_DISCONNECTED:
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ return event->param.conn;
+ default:
+ ::rdma_conn_param p = {};
+ return p;
+ }
+ }
+
+ boost::intrusive_ptr<Connection> ConnectionEvent::getConnection () const {
+ return id;
+ }
+
+ boost::intrusive_ptr<Connection> ConnectionEvent::getListenId() const {
+ return listen_id;
+ }
+
+ // Wrap the passed in rdma_cm_id with a Connection
+ // this basically happens only on connection request
+ Connection::Connection(::rdma_cm_id* i) :
+ qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+ id(mkId(i)),
+ context(0)
+ {
+ impl->fd = id->channel->fd;
+
+ // Just overwrite the previous context as it will
+ // have come from the listening connection
+ if (i)
+ i->context = this;
+ }
+
+ Connection::Connection() :
+ qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+ channel(mkEChannel()),
+ id(mkId(channel.get(), this, RDMA_PS_TCP)),
+ context(0)
+ {
+ impl->fd = channel->fd;
+ }
+
+ Connection::~Connection() {
+ // Reset the id context in case someone else has it
+ id->context = 0;
+ }
+
+ void Connection::ensureQueuePair() {
+ assert(id.get());
+
+ // Only allocate a queue pair if there isn't one already
+ if (qp)
+ return;
+
+ qp = new QueuePair(id);
+ }
+
+ Connection::intrusive_ptr Connection::make() {
+ return new Connection();
+ }
+
+ Connection::intrusive_ptr Connection::find(::rdma_cm_id* i) {
+ if (!i)
+ return 0;
+ Connection* id = static_cast< Connection* >(i->context);
+ if (!id)
+ throw std::logic_error("Couldn't find existing Connection");
+ return id;
+ }
+
+ // Make channel non-blocking by making
+ // associated fd nonblocking
+ void Connection::nonblocking() {
+ assert(id.get());
+ ::fcntl(id->channel->fd, F_SETFL, O_NONBLOCK);
+ }
+
+ // If we get EAGAIN because the channel has been set non blocking
+ // and we'd have to wait then return an empty event
+ ConnectionEvent Connection::getNextEvent() {
+ assert(id.get());
+ ::rdma_cm_event* e;
+ int rc = ::rdma_get_cm_event(id->channel, &e);
+ if (GETERR(rc) == EAGAIN)
+ return ConnectionEvent();
+ CHECK(rc);
+ return ConnectionEvent(e);
+ }
+
+ void Connection::bind(const qpid::sys::SocketAddress& src_addr) const {
+ assert(id.get());
+ CHECK(::rdma_bind_addr(id.get(), getAddrInfo(src_addr).ai_addr));
+ }
+
+ void Connection::listen(int backlog) const {
+ assert(id.get());
+ CHECK(::rdma_listen(id.get(), backlog));
+ }
+
+ void Connection::resolve_addr(
+ const qpid::sys::SocketAddress& dst_addr,
+ int timeout_ms) const
+ {
+ assert(id.get());
+ CHECK(::rdma_resolve_addr(id.get(), 0, getAddrInfo(dst_addr).ai_addr, timeout_ms));
+ }
+
+ void Connection::resolve_route(int timeout_ms) const {
+ assert(id.get());
+ CHECK(::rdma_resolve_route(id.get(), timeout_ms));
+ }
+
+ void Connection::disconnect() const {
+ assert(id.get());
+ int rc = ::rdma_disconnect(id.get());
+ // iWarp doesn't let you disconnect a disconnected connection
+ // but Infiniband can do so it's okay to call rdma_disconnect()
+ // in response to a disconnect event, but we may get an error
+ if (GETERR(rc) == EINVAL)
+ return;
+ CHECK(rc);
+ }
+
+ // TODO: Currently you can only connect with the default connection parameters
+ void Connection::connect(const void* data, size_t len) {
+ assert(id.get());
+ // Need to have a queue pair before we can connect
+ ensureQueuePair();
+
+ ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
+ p.private_data = data;
+ p.private_data_len = len;
+ CHECK(::rdma_connect(id.get(), &p));
+ }
+
+ void Connection::connect() {
+ connect(0, 0);
+ }
+
+ void Connection::accept(const ::rdma_conn_param& param, const void* data, size_t len) {
+ assert(id.get());
+ // Need to have a queue pair before we can accept
+ ensureQueuePair();
+
+ ::rdma_conn_param p = param;
+ p.private_data = data;
+ p.private_data_len = len;
+ CHECK(::rdma_accept(id.get(), &p));
+ }
+
+ void Connection::accept(const ::rdma_conn_param& param) {
+ accept(param, 0, 0);
+ }
+
+ void Connection::reject(const void* data, size_t len) const {
+ assert(id.get());
+ CHECK(::rdma_reject(id.get(), data, len));
+ }
+
+ void Connection::reject() const {
+ assert(id.get());
+ CHECK(::rdma_reject(id.get(), 0, 0));
+ }
+
+ QueuePair::intrusive_ptr Connection::getQueuePair() {
+ assert(id.get());
+
+ ensureQueuePair();
+
+ return qp;
+ }
+
+ std::string Connection::getLocalName() const {
+ ::sockaddr* addr = ::rdma_get_local_addr(id.get());
+ char hostName[NI_MAXHOST];
+ char portName[NI_MAXSERV];
+ CHECK_IBV(::getnameinfo(
+ addr, sizeof(::sockaddr_storage),
+ hostName, sizeof(hostName),
+ portName, sizeof(portName),
+ NI_NUMERICHOST | NI_NUMERICSERV));
+ std::string r(hostName);
+ r += ":";
+ r += portName;
+ return r;
+ }
+
+ std::string Connection::getPeerName() const {
+ ::sockaddr* addr = ::rdma_get_peer_addr(id.get());
+ char hostName[NI_MAXHOST];
+ char portName[NI_MAXSERV];
+ CHECK_IBV(::getnameinfo(
+ addr, sizeof(::sockaddr_storage),
+ hostName, sizeof(hostName),
+ portName, sizeof(portName),
+ NI_NUMERICHOST | NI_NUMERICSERV));
+ std::string r(hostName);
+ r += ":";
+ r += portName;
+ return r;
+ }
+}
+
+std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) {
+# define CHECK_TYPE(t) case t: o << #t; break;
+ switch(t) {
+ CHECK_TYPE(RDMA_CM_EVENT_ADDR_RESOLVED)
+ CHECK_TYPE(RDMA_CM_EVENT_ADDR_ERROR)
+ CHECK_TYPE(RDMA_CM_EVENT_ROUTE_RESOLVED)
+ CHECK_TYPE(RDMA_CM_EVENT_ROUTE_ERROR)
+ CHECK_TYPE(RDMA_CM_EVENT_CONNECT_REQUEST)
+ CHECK_TYPE(RDMA_CM_EVENT_CONNECT_RESPONSE)
+ CHECK_TYPE(RDMA_CM_EVENT_CONNECT_ERROR)
+ CHECK_TYPE(RDMA_CM_EVENT_UNREACHABLE)
+ CHECK_TYPE(RDMA_CM_EVENT_REJECTED)
+ CHECK_TYPE(RDMA_CM_EVENT_ESTABLISHED)
+ CHECK_TYPE(RDMA_CM_EVENT_DISCONNECTED)
+ CHECK_TYPE(RDMA_CM_EVENT_DEVICE_REMOVAL)
+ CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_JOIN)
+ CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_ERROR)
+ default:
+ o << "UNKNOWN_EVENT";
+ }
+# undef CHECK_TYPE
+ return o;
+}