diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp | 566 |
1 files changed, 566 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp new file mode 100644 index 0000000000..efe454c5be --- /dev/null +++ b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp @@ -0,0 +1,566 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/rdma/rdma_wrap.h" + +#include "qpid/sys/rdma/rdma_factories.h" +#include "qpid/sys/rdma/rdma_exception.h" + +#include "qpid/sys/posix/PrivatePosix.h" + +#include <fcntl.h> +#include <netdb.h> + +#include <iostream> +#include <stdexcept> + +namespace Rdma { + const ::rdma_conn_param DEFAULT_CONNECT_PARAM = { + 0, // .private_data + 0, // .private_data_len + 4, // .responder_resources + 4, // .initiator_depth + 0, // .flow_control + 5, // .retry_count + 7 // .rnr_retry_count + }; + + // This is moderately inefficient so don't use in a critical path + int deviceCount() { + int count; + ::ibv_free_device_list(::ibv_get_device_list(&count)); + return count; + } + + Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount, + const int32_t reserve) : + bufferSize(byteCount + reserve), reserved(reserve) + { + sge.addr = (uintptr_t) bytes; + sge.length = 0; + sge.lkey = lkey; + } + + QueuePairEvent::QueuePairEvent() : + dir(NONE) + {} + + QueuePairEvent::QueuePairEvent( + const ::ibv_wc& w, + boost::shared_ptr< ::ibv_cq > c, + QueueDirection d) : + cq(c), + wc(w), + dir(d) + { + assert(dir != NONE); + } + + QueuePairEvent::operator bool() const { + return dir != NONE; + } + + bool QueuePairEvent::immPresent() const { + return wc.wc_flags & IBV_WC_WITH_IMM; + } + + uint32_t QueuePairEvent::getImm() const { + return ntohl(wc.imm_data); + } + + QueueDirection QueuePairEvent::getDirection() const { + return dir; + } + + ::ibv_wc_opcode QueuePairEvent::getEventType() const { + return wc.opcode; + } + + ::ibv_wc_status QueuePairEvent::getEventStatus() const { + return wc.status; + } + + Buffer* QueuePairEvent::getBuffer() const { + Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id); + b->dataCount(wc.byte_len); + return b; + } + + QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) : + qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + pd(allocPd(i->verbs)), + cchannel(mkCChannel(i->verbs)), + scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())), + rcq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())), + outstandingSendEvents(0), + outstandingRecvEvents(0) + { + impl->fd = cchannel->fd; + + // Set cq context to this QueuePair object so we can find + // ourselves again + scq->cq_context = this; + rcq->cq_context = this; + + ::ibv_device_attr dev_attr; + CHECK(::ibv_query_device(i->verbs, &dev_attr)); + + ::ibv_qp_init_attr qp_attr = {}; + + // TODO: make a default struct for this + qp_attr.cap.max_send_wr = DEFAULT_WR_ENTRIES; + qp_attr.cap.max_send_sge = 1; + qp_attr.cap.max_recv_wr = DEFAULT_WR_ENTRIES; + qp_attr.cap.max_recv_sge = 1; + + qp_attr.send_cq = scq.get(); + qp_attr.recv_cq = rcq.get(); + qp_attr.qp_type = IBV_QPT_RC; + + CHECK(::rdma_create_qp(i.get(), pd.get(), &qp_attr)); + qp = mkQp(i->qp); + + // Set the qp context to this so we can find ourselves again + qp->qp_context = this; + } + + QueuePair::~QueuePair() { + // Reset back pointer in case someone else has the qp + qp->qp_context = 0; + + // Dispose queue pair before we ack events + qp.reset(); + + if (outstandingSendEvents > 0) + ::ibv_ack_cq_events(scq.get(), outstandingSendEvents); + if (outstandingRecvEvents > 0) + ::ibv_ack_cq_events(rcq.get(), outstandingRecvEvents); + + // Deallocate recv buffer memory + if (rmr) delete [] static_cast<char*>(rmr->addr); + + // Deallocate recv buffer memory + if (smr) delete [] static_cast<char*>(smr->addr); + + // The buffers vectors automatically deletes all the buffers we've allocated + } + + // Create buffers to use for writing + void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize, int reserved) + { + assert(!smr); + + // Round up buffersize to cacheline (64 bytes) + int dataLength = (bufferSize+reserved+63) & (~63); + + // Allocate memory block for all receive buffers + char* mem = new char [sendBufferCount * dataLength]; + smr = regMr(pd.get(), mem, sendBufferCount * dataLength, ::IBV_ACCESS_LOCAL_WRITE); + sendBuffers.reserve(sendBufferCount); + freeBuffers.reserve(sendBufferCount); + for (int i = 0; i<sendBufferCount; ++i) { + // Allocate xmit buffer + sendBuffers.push_back(Buffer(smr->lkey, &mem[i*dataLength], bufferSize, reserved)); + freeBuffers.push_back(i); + } + } + + Buffer* QueuePair::getSendBuffer() { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferLock); + if (freeBuffers.empty()) + return 0; + int i = freeBuffers.back(); + freeBuffers.pop_back(); + assert(i >= 0 && i < int(sendBuffers.size())); + Buffer* b = &sendBuffers[i]; + b->dataCount(0); + return b; + } + + void QueuePair::returnSendBuffer(Buffer* b) { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferLock); + int i = b - &sendBuffers[0]; + assert(i >= 0 && i < int(sendBuffers.size())); + freeBuffers.push_back(i); + } + + void QueuePair::allocateRecvBuffers(int recvBufferCount, int bufferSize) + { + assert(!rmr); + + // Round up buffersize to cacheline (64 bytes) + bufferSize = (bufferSize+63) & (~63); + + // Allocate memory block for all receive buffers + char* mem = new char [recvBufferCount * bufferSize]; + rmr = regMr(pd.get(), mem, recvBufferCount * bufferSize, ::IBV_ACCESS_LOCAL_WRITE); + recvBuffers.reserve(recvBufferCount); + for (int i = 0; i<recvBufferCount; ++i) { + // Allocate recv buffer + recvBuffers.push_back(Buffer(rmr->lkey, &mem[i*bufferSize], bufferSize)); + postRecv(&recvBuffers[i]); + } + } + + // Make channel non-blocking by making + // associated fd nonblocking + void QueuePair::nonblocking() { + ::fcntl(cchannel->fd, F_SETFL, O_NONBLOCK); + } + + // If we get EAGAIN because the channel has been set non blocking + // and we'd have to wait then return an empty event + QueuePair::intrusive_ptr QueuePair::getNextChannelEvent() { + // First find out which cq has the event + ::ibv_cq* cq; + void* ctx; + int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx); + if (rc == -1 && errno == EAGAIN) + return 0; + CHECK(rc); + + // Batch acknowledge the event + if (cq == scq.get()) { + if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) { + ::ibv_ack_cq_events(cq, outstandingSendEvents); + outstandingSendEvents = 0; + } + } else if (cq == rcq.get()) { + if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) { + ::ibv_ack_cq_events(cq, outstandingRecvEvents); + outstandingRecvEvents = 0; + } + } + + return static_cast<QueuePair*>(ctx); + } + + QueuePairEvent QueuePair::getNextEvent() { + ::ibv_wc w; + if (::ibv_poll_cq(scq.get(), 1, &w) == 1) + return QueuePairEvent(w, scq, SEND); + else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1) + return QueuePairEvent(w, rcq, RECV); + else + return QueuePairEvent(); + } + + void QueuePair::notifyRecv() { + CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0)); + } + + void QueuePair::notifySend() { + CHECK_IBV(ibv_req_notify_cq(scq.get(), 0)); + } + + void QueuePair::postRecv(Buffer* buf) { + ::ibv_recv_wr rwr = {}; + + rwr.wr_id = reinterpret_cast<uint64_t>(buf); + // We are given the whole buffer + buf->dataCount(buf->byteCount()); + rwr.sg_list = &buf->sge; + rwr.num_sge = 1; + + ::ibv_recv_wr* badrwr = 0; + CHECK(::ibv_post_recv(qp.get(), &rwr, &badrwr)); + if (badrwr) + throw std::logic_error("ibv_post_recv(): Bad rwr"); + } + + void QueuePair::postSend(Buffer* buf) { + ::ibv_send_wr swr = {}; + + swr.wr_id = reinterpret_cast<uint64_t>(buf); + swr.opcode = IBV_WR_SEND; + swr.send_flags = IBV_SEND_SIGNALED; + swr.sg_list = &buf->sge; + swr.num_sge = 1; + + ::ibv_send_wr* badswr = 0; + CHECK(::ibv_post_send(qp.get(), &swr, &badswr)); + if (badswr) + throw std::logic_error("ibv_post_send(): Bad swr"); + } + + void QueuePair::postSend(uint32_t imm, Buffer* buf) { + ::ibv_send_wr swr = {}; + + swr.wr_id = reinterpret_cast<uint64_t>(buf); + swr.imm_data = htonl(imm); + swr.opcode = IBV_WR_SEND_WITH_IMM; + swr.send_flags = IBV_SEND_SIGNALED; + swr.sg_list = &buf->sge; + swr.num_sge = 1; + + ::ibv_send_wr* badswr = 0; + CHECK(::ibv_post_send(qp.get(), &swr, &badswr)); + if (badswr) + throw std::logic_error("ibv_post_send(): Bad swr"); + } + + ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) : + id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ? + Connection::find(e->id) : new Connection(e->id)), + listen_id(Connection::find(e->listen_id)), + event(mkEvent(e)) + {} + + ConnectionEvent::operator bool() const { + return event; + } + + ::rdma_cm_event_type ConnectionEvent::getEventType() const { + return event->event; + } + + ::rdma_conn_param ConnectionEvent::getConnectionParam() const { + // It's badly documented, but it seems from the librdma source code that all the following + // event types have a valid param.conn + switch (event->event) { + case RDMA_CM_EVENT_CONNECT_REQUEST: + case RDMA_CM_EVENT_ESTABLISHED: + case RDMA_CM_EVENT_REJECTED: + case RDMA_CM_EVENT_DISCONNECTED: + case RDMA_CM_EVENT_CONNECT_ERROR: + return event->param.conn; + default: + ::rdma_conn_param p = {}; + return p; + } + } + + boost::intrusive_ptr<Connection> ConnectionEvent::getConnection () const { + return id; + } + + boost::intrusive_ptr<Connection> ConnectionEvent::getListenId() const { + return listen_id; + } + + // Wrap the passed in rdma_cm_id with a Connection + // this basically happens only on connection request + Connection::Connection(::rdma_cm_id* i) : + qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + id(mkId(i)), + context(0) + { + impl->fd = id->channel->fd; + + // Just overwrite the previous context as it will + // have come from the listening connection + if (i) + i->context = this; + } + + Connection::Connection() : + qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + channel(mkEChannel()), + id(mkId(channel.get(), this, RDMA_PS_TCP)), + context(0) + { + impl->fd = channel->fd; + } + + Connection::~Connection() { + // Reset the id context in case someone else has it + id->context = 0; + } + + void Connection::ensureQueuePair() { + assert(id.get()); + + // Only allocate a queue pair if there isn't one already + if (qp) + return; + + qp = new QueuePair(id); + } + + Connection::intrusive_ptr Connection::make() { + return new Connection(); + } + + Connection::intrusive_ptr Connection::find(::rdma_cm_id* i) { + if (!i) + return 0; + Connection* id = static_cast< Connection* >(i->context); + if (!id) + throw std::logic_error("Couldn't find existing Connection"); + return id; + } + + // Make channel non-blocking by making + // associated fd nonblocking + void Connection::nonblocking() { + assert(id.get()); + ::fcntl(id->channel->fd, F_SETFL, O_NONBLOCK); + } + + // If we get EAGAIN because the channel has been set non blocking + // and we'd have to wait then return an empty event + ConnectionEvent Connection::getNextEvent() { + assert(id.get()); + ::rdma_cm_event* e; + int rc = ::rdma_get_cm_event(id->channel, &e); + if (GETERR(rc) == EAGAIN) + return ConnectionEvent(); + CHECK(rc); + return ConnectionEvent(e); + } + + void Connection::bind(const qpid::sys::SocketAddress& src_addr) const { + assert(id.get()); + CHECK(::rdma_bind_addr(id.get(), getAddrInfo(src_addr).ai_addr)); + } + + void Connection::listen(int backlog) const { + assert(id.get()); + CHECK(::rdma_listen(id.get(), backlog)); + } + + void Connection::resolve_addr( + const qpid::sys::SocketAddress& dst_addr, + int timeout_ms) const + { + assert(id.get()); + CHECK(::rdma_resolve_addr(id.get(), 0, getAddrInfo(dst_addr).ai_addr, timeout_ms)); + } + + void Connection::resolve_route(int timeout_ms) const { + assert(id.get()); + CHECK(::rdma_resolve_route(id.get(), timeout_ms)); + } + + void Connection::disconnect() const { + assert(id.get()); + int rc = ::rdma_disconnect(id.get()); + // iWarp doesn't let you disconnect a disconnected connection + // but Infiniband can do so it's okay to call rdma_disconnect() + // in response to a disconnect event, but we may get an error + if (GETERR(rc) == EINVAL) + return; + CHECK(rc); + } + + // TODO: Currently you can only connect with the default connection parameters + void Connection::connect(const void* data, size_t len) { + assert(id.get()); + // Need to have a queue pair before we can connect + ensureQueuePair(); + + ::rdma_conn_param p = DEFAULT_CONNECT_PARAM; + p.private_data = data; + p.private_data_len = len; + CHECK(::rdma_connect(id.get(), &p)); + } + + void Connection::connect() { + connect(0, 0); + } + + void Connection::accept(const ::rdma_conn_param& param, const void* data, size_t len) { + assert(id.get()); + // Need to have a queue pair before we can accept + ensureQueuePair(); + + ::rdma_conn_param p = param; + p.private_data = data; + p.private_data_len = len; + CHECK(::rdma_accept(id.get(), &p)); + } + + void Connection::accept(const ::rdma_conn_param& param) { + accept(param, 0, 0); + } + + void Connection::reject(const void* data, size_t len) const { + assert(id.get()); + CHECK(::rdma_reject(id.get(), data, len)); + } + + void Connection::reject() const { + assert(id.get()); + CHECK(::rdma_reject(id.get(), 0, 0)); + } + + QueuePair::intrusive_ptr Connection::getQueuePair() { + assert(id.get()); + + ensureQueuePair(); + + return qp; + } + + std::string Connection::getLocalName() const { + ::sockaddr* addr = ::rdma_get_local_addr(id.get()); + char hostName[NI_MAXHOST]; + char portName[NI_MAXSERV]; + CHECK_IBV(::getnameinfo( + addr, sizeof(::sockaddr_storage), + hostName, sizeof(hostName), + portName, sizeof(portName), + NI_NUMERICHOST | NI_NUMERICSERV)); + std::string r(hostName); + r += ":"; + r += portName; + return r; + } + + std::string Connection::getPeerName() const { + ::sockaddr* addr = ::rdma_get_peer_addr(id.get()); + char hostName[NI_MAXHOST]; + char portName[NI_MAXSERV]; + CHECK_IBV(::getnameinfo( + addr, sizeof(::sockaddr_storage), + hostName, sizeof(hostName), + portName, sizeof(portName), + NI_NUMERICHOST | NI_NUMERICSERV)); + std::string r(hostName); + r += ":"; + r += portName; + return r; + } +} + +std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) { +# define CHECK_TYPE(t) case t: o << #t; break; + switch(t) { + CHECK_TYPE(RDMA_CM_EVENT_ADDR_RESOLVED) + CHECK_TYPE(RDMA_CM_EVENT_ADDR_ERROR) + CHECK_TYPE(RDMA_CM_EVENT_ROUTE_RESOLVED) + CHECK_TYPE(RDMA_CM_EVENT_ROUTE_ERROR) + CHECK_TYPE(RDMA_CM_EVENT_CONNECT_REQUEST) + CHECK_TYPE(RDMA_CM_EVENT_CONNECT_RESPONSE) + CHECK_TYPE(RDMA_CM_EVENT_CONNECT_ERROR) + CHECK_TYPE(RDMA_CM_EVENT_UNREACHABLE) + CHECK_TYPE(RDMA_CM_EVENT_REJECTED) + CHECK_TYPE(RDMA_CM_EVENT_ESTABLISHED) + CHECK_TYPE(RDMA_CM_EVENT_DISCONNECTED) + CHECK_TYPE(RDMA_CM_EVENT_DEVICE_REMOVAL) + CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_JOIN) + CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_ERROR) + default: + o << "UNKNOWN_EVENT"; + } +# undef CHECK_TYPE + return o; +} |