diff options
author | Andrew Stitcher <astitcher@apache.org> | 2010-05-13 02:26:21 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2010-05-13 02:26:21 +0000 |
commit | ad2ba4e72e4cc2fafd5b110d04d8eb258bc207d3 (patch) | |
tree | e46b2b30f56e91d4787de393eea8aac1e85b0561 /cpp/src | |
parent | 31070fa710b1a06e468cc03c486b9c7fbe39e462 (diff) | |
download | qpid-python-ad2ba4e72e4cc2fafd5b110d04d8eb258bc207d3.tar.gz |
Rearrange RDMA wrapper class code so that the interface and implementation
are better separated.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@943771 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/sys/RdmaIOPlugin.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaClient.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaServer.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_factories.cpp | 70 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_factories.h | 46 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_wrap.cpp | 364 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_wrap.h | 364 |
7 files changed, 463 insertions, 384 deletions
diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp index 5a5c10401c..58317838bc 100644 --- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -26,6 +26,7 @@ #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/log/Statement.h" #include "qpid/sys/rdma/RdmaIO.h" +#include "qpid/sys/rdma/rdma_exception.h" #include "qpid/sys/OutputControl.h" #include "qpid/sys/SecuritySettings.h" diff --git a/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/cpp/src/qpid/sys/rdma/RdmaClient.cpp index 78068cbd53..4f51df8498 100644 --- a/cpp/src/qpid/sys/rdma/RdmaClient.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaClient.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/sys/rdma/RdmaIO.h" +#include "qpid/sys/rdma/rdma_exception.h" #include "qpid/sys/Time.h" #include <netdb.h> diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp index 137a1ef587..4fcd551bba 100644 --- a/cpp/src/qpid/sys/rdma/RdmaServer.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/sys/rdma/RdmaIO.h" +#include "qpid/sys/rdma/rdma_exception.h" #include <arpa/inet.h> diff --git a/cpp/src/qpid/sys/rdma/rdma_factories.cpp b/cpp/src/qpid/sys/rdma/rdma_factories.cpp index a5e9ebd23c..69741438fa 100644 --- a/cpp/src/qpid/sys/rdma/rdma_factories.cpp +++ b/cpp/src/qpid/sys/rdma/rdma_factories.cpp @@ -20,45 +20,77 @@ */ #include "qpid/sys/rdma/rdma_factories.h" +#include "qpid/sys/rdma/rdma_exception.h" + + namespace Rdma { + // Intentionally ignore return values for these functions + // - we can't do anything about them anyway void acker(::rdma_cm_event* e) throw () { - if (e) - // Intentionally ignore return value - we can't do anything about it here - (void) ::rdma_ack_cm_event(e); + if (e) (void) ::rdma_ack_cm_event(e); } void destroyEChannel(::rdma_event_channel* c) throw () { - if (c) - // Intentionally ignore return value - we can't do anything about it here - (void) ::rdma_destroy_event_channel(c); + if (c) 
(void) ::rdma_destroy_event_channel(c); } void destroyId(::rdma_cm_id* i) throw () { - if (i) - // Intentionally ignore return value - we can't do anything about it here - (void) ::rdma_destroy_id(i); + if (i) (void) ::rdma_destroy_id(i); } void deallocPd(::ibv_pd* p) throw () { - if (p) - // Intentionally ignore return value - we can't do anything about it here - (void) ::ibv_dealloc_pd(p); + if (p) (void) ::ibv_dealloc_pd(p); } void destroyCChannel(::ibv_comp_channel* c) throw () { - if (c) - // Intentionally ignore return value - we can't do anything about it here - (void) ::ibv_destroy_comp_channel(c); + if (c) (void) ::ibv_destroy_comp_channel(c); } void destroyCq(::ibv_cq* cq) throw () { - if (cq) - (void) ::ibv_destroy_cq(cq); + if (cq) (void) ::ibv_destroy_cq(cq); } void destroyQp(::ibv_qp* qp) throw () { - if (qp) - (void) ::ibv_destroy_qp(qp); + if (qp) (void) ::ibv_destroy_qp(qp); + } + + boost::shared_ptr< ::rdma_cm_id > mkId(::rdma_cm_id* i) { + return boost::shared_ptr< ::rdma_cm_id >(i, destroyId); + } + + boost::shared_ptr< ::rdma_cm_event > mkEvent(::rdma_cm_event* e) { + return boost::shared_ptr< ::rdma_cm_event >(e, acker); + } + + boost::shared_ptr< ::ibv_qp > mkQp(::ibv_qp* qp) { + return boost::shared_ptr< ::ibv_qp > (qp, destroyQp); + } + + boost::shared_ptr< ::rdma_event_channel > mkEChannel() { + ::rdma_event_channel* c = CHECK_NULL(::rdma_create_event_channel()); + return boost::shared_ptr< ::rdma_event_channel >(c, destroyEChannel); } + boost::shared_ptr< ::rdma_cm_id > + mkId(::rdma_event_channel* ec, void* context, ::rdma_port_space ps) { + ::rdma_cm_id* i; + CHECK(::rdma_create_id(ec, &i, context, ps)); + return mkId(i); + } + + boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c) { + ::ibv_pd* pd = CHECK_NULL(ibv_alloc_pd(c)); + return boost::shared_ptr< ::ibv_pd >(pd, deallocPd); + } + + boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c) { + ::ibv_comp_channel* cc = CHECK_NULL(::ibv_create_comp_channel(c)); + 
return boost::shared_ptr< ::ibv_comp_channel >(cc, destroyCChannel); + } + + boost::shared_ptr< ::ibv_cq > + mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc) { + ::ibv_cq* cq = CHECK_NULL(ibv_create_cq(c, cqe, context, cc, 0)); + return boost::shared_ptr< ::ibv_cq >(cq, destroyCq); + } } diff --git a/cpp/src/qpid/sys/rdma/rdma_factories.h b/cpp/src/qpid/sys/rdma/rdma_factories.h index 783181cf1b..3432baf08c 100644 --- a/cpp/src/qpid/sys/rdma/rdma_factories.h +++ b/cpp/src/qpid/sys/rdma/rdma_factories.h @@ -21,49 +21,19 @@ #ifndef RDMA_FACTORIES_H #define RDMA_FACTORIES_H -#include "qpid/sys/rdma/rdma_exception.h" - #include <rdma/rdma_cma.h> #include <boost/shared_ptr.hpp> namespace Rdma { - // These allow us to use simple shared_ptrs to do ref counting - void acker(::rdma_cm_event* e) throw (); - void destroyEChannel(::rdma_event_channel* c) throw (); - void destroyId(::rdma_cm_id* i) throw (); - void deallocPd(::ibv_pd* p) throw (); - void destroyCChannel(::ibv_comp_channel* c) throw (); - void destroyCq(::ibv_cq* cq) throw (); - void destroyQp(::ibv_qp* qp) throw (); - - inline boost::shared_ptr< ::rdma_event_channel > mkEChannel() { - ::rdma_event_channel* c = CHECK_NULL(::rdma_create_event_channel()); - return boost::shared_ptr< ::rdma_event_channel >(c, destroyEChannel); - } - - inline boost::shared_ptr< ::rdma_cm_id > - mkId(::rdma_event_channel* ec, void* context, ::rdma_port_space ps) { - ::rdma_cm_id* i; - CHECK(::rdma_create_id(ec, &i, context, ps)); - return boost::shared_ptr< ::rdma_cm_id >(i, destroyId); - } - - inline boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c) { - ::ibv_pd* pd = CHECK_NULL(ibv_alloc_pd(c)); - return boost::shared_ptr< ::ibv_pd >(pd, deallocPd); - } - - inline boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c) { - ::ibv_comp_channel* cc = CHECK_NULL(::ibv_create_comp_channel(c)); - return boost::shared_ptr< ::ibv_comp_channel >(cc, destroyCChannel); - } - - inline boost::shared_ptr< 
::ibv_cq > - mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc) { - ::ibv_cq* cq = CHECK_NULL(ibv_create_cq(c, cqe, context, cc, 0)); - return boost::shared_ptr< ::ibv_cq >(cq, destroyCq); - } + boost::shared_ptr< ::rdma_event_channel > mkEChannel(); + boost::shared_ptr< ::rdma_cm_id > mkId(::rdma_event_channel* ec, void* context, ::rdma_port_space ps); + boost::shared_ptr< ::rdma_cm_id > mkId(::rdma_cm_id* i); + boost::shared_ptr< ::rdma_cm_event > mkEvent(::rdma_cm_event* e); + boost::shared_ptr< ::ibv_qp > mkQp(::ibv_qp* qp); + boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c); + boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c); + boost::shared_ptr< ::ibv_cq > mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc); } #endif // RDMA_FACTORIES_H diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp index 53e31ca766..8944be2034 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp @@ -21,6 +21,17 @@ #include "qpid/sys/rdma/rdma_wrap.h" +#include "qpid/sys/rdma/rdma_factories.h" +#include "qpid/sys/rdma/rdma_exception.h" + +#include "qpid/sys/posix/PrivatePosix.h" + +#include <fcntl.h> +#include <netdb.h> + +#include <iostream> +#include <stdexcept> + namespace Rdma { const ::rdma_conn_param DEFAULT_CONNECT_PARAM = { 0, // .private_data @@ -39,20 +50,64 @@ namespace Rdma { return count; } - ::rdma_conn_param ConnectionEvent::getConnectionParam() const { - // It's badly documented, but it seems from the librdma source code that all the following - // event types have a valid param.conn - switch (event->event) { - case RDMA_CM_EVENT_CONNECT_REQUEST: - case RDMA_CM_EVENT_ESTABLISHED: - case RDMA_CM_EVENT_REJECTED: - case RDMA_CM_EVENT_DISCONNECTED: - case RDMA_CM_EVENT_CONNECT_ERROR: - return event->param.conn; - default: - ::rdma_conn_param p = {}; - return p; - } + Buffer::Buffer(::ibv_pd* pd, char* const b, const int32_t s) : + 
bytes(b), + byteCount(s), + dataStart(0), + dataCount(0), + mr(CHECK_NULL(::ibv_reg_mr( + pd, bytes, byteCount, + ::IBV_ACCESS_LOCAL_WRITE))) + {} + + Buffer::~Buffer() { + (void) ::ibv_dereg_mr(mr); + delete [] bytes; + } + + QueuePairEvent::QueuePairEvent() : + dir(NONE) + {} + + QueuePairEvent::QueuePairEvent( + const ::ibv_wc& w, + boost::shared_ptr< ::ibv_cq > c, + QueueDirection d) : + cq(c), + wc(w), + dir(d) + { + assert(dir != NONE); + } + + QueuePairEvent::operator bool() const { + return dir != NONE; + } + + bool QueuePairEvent::immPresent() const { + return wc.wc_flags & IBV_WC_WITH_IMM; + } + + uint32_t QueuePairEvent::getImm() const { + return ntohl(wc.imm_data); + } + + QueueDirection QueuePairEvent::getDirection() const { + return dir; + } + + ::ibv_wc_opcode QueuePairEvent::getEventType() const { + return wc.opcode; + } + + ::ibv_wc_status QueuePairEvent::getEventStatus() const { + return wc.status; + } + + Buffer* QueuePairEvent::getBuffer() const { + Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id); + b->dataCount = wc.byte_len; + return b; } QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) : @@ -84,7 +139,7 @@ namespace Rdma { qp_attr.qp_type = IBV_QPT_RC; CHECK(::rdma_create_qp(i.get(), pd.get(), &qp_attr)); - qp = boost::shared_ptr< ::ibv_qp >(i->qp, destroyQp); + qp = mkQp(i->qp); // Set the qp context to this so we can find ourselves again qp->qp_context = this; @@ -100,6 +155,62 @@ namespace Rdma { qp->qp_context = 0; } + // Create a buffer to use for writing + Buffer* QueuePair::createBuffer(int s) { + return new Buffer(pd.get(), new char[s], s); + } + + // Make channel non-blocking by making + // associated fd nonblocking + void QueuePair::nonblocking() { + ::fcntl(cchannel->fd, F_SETFL, O_NONBLOCK); + } + + // If we get EAGAIN because the channel has been set non blocking + // and we'd have to wait then return an empty event + QueuePair::intrusive_ptr QueuePair::getNextChannelEvent() { + // First find out which cq has the event + 
::ibv_cq* cq; + void* ctx; + int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx); + if (rc == -1 && errno == EAGAIN) + return 0; + CHECK(rc); + + // Batch acknowledge the event + if (cq == scq.get()) { + if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) { + ::ibv_ack_cq_events(cq, outstandingSendEvents); + outstandingSendEvents = 0; + } + } else if (cq == rcq.get()) { + if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) { + ::ibv_ack_cq_events(cq, outstandingRecvEvents); + outstandingRecvEvents = 0; + } + } + + return static_cast<QueuePair*>(ctx); + } + + QueuePairEvent QueuePair::getNextEvent() { + ::ibv_wc w; + if (::ibv_poll_cq(scq.get(), 1, &w) == 1) + return QueuePairEvent(w, scq, SEND); + else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1) + return QueuePairEvent(w, rcq, RECV); + else + return QueuePairEvent(); + } + + void QueuePair::notifyRecv() { + CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0)); + } + + void QueuePair::notifySend() { + CHECK_IBV(ibv_req_notify_cq(scq.get(), 0)); + } + void QueuePair::postRecv(Buffer* buf) { ::ibv_recv_wr rwr = {}; ::ibv_sge sge; @@ -158,6 +269,229 @@ namespace Rdma { if (badswr) throw std::logic_error("ibv_post_send(): Bad swr"); } + + ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) : + id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ? 
+ Connection::find(e->id) : new Connection(e->id)), + listen_id(Connection::find(e->listen_id)), + event(mkEvent(e)) + {} + + ConnectionEvent::operator bool() const { + return event; + } + + ::rdma_cm_event_type ConnectionEvent::getEventType() const { + return event->event; + } + + ::rdma_conn_param ConnectionEvent::getConnectionParam() const { + // It's badly documented, but it seems from the librdma source code that all the following + // event types have a valid param.conn + switch (event->event) { + case RDMA_CM_EVENT_CONNECT_REQUEST: + case RDMA_CM_EVENT_ESTABLISHED: + case RDMA_CM_EVENT_REJECTED: + case RDMA_CM_EVENT_DISCONNECTED: + case RDMA_CM_EVENT_CONNECT_ERROR: + return event->param.conn; + default: + ::rdma_conn_param p = {}; + return p; + } + } + + boost::intrusive_ptr<Connection> ConnectionEvent::getConnection () const { + return id; + } + + boost::intrusive_ptr<Connection> ConnectionEvent::getListenId() const { + return listen_id; + } + + // Wrap the passed in rdma_cm_id with a Connection + // this basically happens only on connection request + Connection::Connection(::rdma_cm_id* i) : + qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + id(mkId(i)), + context(0) + { + impl->fd = id->channel->fd; + + // Just overwrite the previous context as it will + // have come from the listening connection + if (i) + i->context = this; + } + + Connection::Connection() : + qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), + channel(mkEChannel()), + id(mkId(channel.get(), this, RDMA_PS_TCP)), + context(0) + { + impl->fd = channel->fd; + } + + Connection::~Connection() { + // Reset the id context in case someone else has it + id->context = 0; + } + + void Connection::ensureQueuePair() { + assert(id.get()); + + // Only allocate a queue pair if there isn't one already + if (qp) + return; + + qp = new QueuePair(id); + } + + Connection::intrusive_ptr Connection::make() { + return new Connection(); + } + + Connection::intrusive_ptr Connection::find(::rdma_cm_id* 
i) { + if (!i) + return 0; + Connection* id = static_cast< Connection* >(i->context); + if (!id) + throw std::logic_error("Couldn't find existing Connection"); + return id; + } + + // Make channel non-blocking by making + // associated fd nonblocking + void Connection::nonblocking() { + assert(id.get()); + ::fcntl(id->channel->fd, F_SETFL, O_NONBLOCK); + } + + // If we get EAGAIN because the channel has been set non blocking + // and we'd have to wait then return an empty event + ConnectionEvent Connection::getNextEvent() { + assert(id.get()); + ::rdma_cm_event* e; + int rc = ::rdma_get_cm_event(id->channel, &e); + if (GETERR(rc) == EAGAIN) + return ConnectionEvent(); + CHECK(rc); + return ConnectionEvent(e); + } + + void Connection::bind(const qpid::sys::SocketAddress& src_addr) const { + assert(id.get()); + CHECK(::rdma_bind_addr(id.get(), getAddrInfo(src_addr).ai_addr)); + } + + void Connection::listen(int backlog) const { + assert(id.get()); + CHECK(::rdma_listen(id.get(), backlog)); + } + + void Connection::resolve_addr( + const qpid::sys::SocketAddress& dst_addr, + int timeout_ms) const + { + assert(id.get()); + CHECK(::rdma_resolve_addr(id.get(), 0, getAddrInfo(dst_addr).ai_addr, timeout_ms)); + } + + void Connection::resolve_route(int timeout_ms) const { + assert(id.get()); + CHECK(::rdma_resolve_route(id.get(), timeout_ms)); + } + + void Connection::disconnect() const { + assert(id.get()); + int rc = ::rdma_disconnect(id.get()); + // iWarp doesn't let you disconnect a disconnected connection + // but Infiniband can do so it's okay to call rdma_disconnect() + // in response to a disconnect event, but we may get an error + if (GETERR(rc) == EINVAL) + return; + CHECK(rc); + } + + // TODO: Currently you can only connect with the default connection parameters + void Connection::connect(const void* data, size_t len) { + assert(id.get()); + // Need to have a queue pair before we can connect + ensureQueuePair(); + + ::rdma_conn_param p = DEFAULT_CONNECT_PARAM; + 
p.private_data = data; + p.private_data_len = len; + CHECK(::rdma_connect(id.get(), &p)); + } + + void Connection::connect() { + connect(0, 0); + } + + void Connection::accept(const ::rdma_conn_param& param, const void* data, size_t len) { + assert(id.get()); + // Need to have a queue pair before we can accept + ensureQueuePair(); + + ::rdma_conn_param p = param; + p.private_data = data; + p.private_data_len = len; + CHECK(::rdma_accept(id.get(), &p)); + } + + void Connection::accept(const ::rdma_conn_param& param) { + accept(param, 0, 0); + } + + void Connection::reject(const void* data, size_t len) const { + assert(id.get()); + CHECK(::rdma_reject(id.get(), data, len)); + } + + void Connection::reject() const { + assert(id.get()); + CHECK(::rdma_reject(id.get(), 0, 0)); + } + + QueuePair::intrusive_ptr Connection::getQueuePair() { + assert(id.get()); + + ensureQueuePair(); + + return qp; + } + + std::string Connection::getLocalName() const { + ::sockaddr* addr = ::rdma_get_local_addr(id.get()); + char hostName[NI_MAXHOST]; + char portName[NI_MAXSERV]; + CHECK_IBV(::getnameinfo( + addr, sizeof(::sockaddr_storage), + hostName, sizeof(hostName), + portName, sizeof(portName), + NI_NUMERICHOST | NI_NUMERICSERV)); + std::string r(hostName); + r += ":"; + r += portName; + return r; + } + + std::string Connection::getPeerName() const { + ::sockaddr* addr = ::rdma_get_peer_addr(id.get()); + char hostName[NI_MAXHOST]; + char portName[NI_MAXSERV]; + CHECK_IBV(::getnameinfo( + addr, sizeof(::sockaddr_storage), + hostName, sizeof(hostName), + portName, sizeof(portName), + NI_NUMERICHOST | NI_NUMERICSERV)); + std::string r(hostName); + r += ":"; + r += portName; + return r; + } } std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) { diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h index bea5a5d979..5803ae5545 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.h +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h @@ -21,25 +21,19 @@ #ifndef 
RDMA_WRAP_H #define RDMA_WRAP_H -#include "qpid/sys/rdma/rdma_factories.h" - #include <rdma/rdma_cma.h> #include "qpid/RefCounted.h" #include "qpid/sys/IOHandle.h" -#include "qpid/sys/posix/PrivatePosix.h" - -#include <fcntl.h> - -#include <netdb.h> -#include <vector> -#include <algorithm> -#include <iostream> -#include <stdexcept> #include <boost/shared_ptr.hpp> #include <boost/intrusive_ptr.hpp> +namespace qpid { +namespace sys { + class SocketAddress; +}} + namespace Rdma { const int DEFAULT_TIMEOUT = 2000; // 2 secs const int DEFAULT_BACKLOG = 100; @@ -57,20 +51,8 @@ namespace Rdma { int32_t dataStart; int32_t dataCount; - Buffer(::ibv_pd* pd, char* const b, const int32_t s) : - bytes(b), - byteCount(s), - dataStart(0), - dataCount(0), - mr(CHECK_NULL(::ibv_reg_mr( - pd, bytes, byteCount, - ::IBV_ACCESS_LOCAL_WRITE))) - {} - - ~Buffer() { - (void) ::ibv_dereg_mr(mr); - delete [] bytes; - } + Buffer(::ibv_pd* pd, char* const b, const int32_t s); + ~Buffer(); private: ::ibv_mr* mr; @@ -91,57 +73,28 @@ namespace Rdma { friend class QueuePair; - QueuePairEvent() : - dir(NONE) - {} - + QueuePairEvent(); QueuePairEvent( const ::ibv_wc& w, boost::shared_ptr< ::ibv_cq > c, - QueueDirection d) : - cq(c), - wc(w), - dir(d) - { - assert(dir != NONE); - } + QueueDirection d); public: - operator bool() const { - return dir != NONE; - } - - bool immPresent() const { - return wc.wc_flags & IBV_WC_WITH_IMM; - } - - uint32_t getImm() const { - return ntohl(wc.imm_data); - } - - QueueDirection getDirection() const { - return dir; - } - - ::ibv_wc_opcode getEventType() const { - return wc.opcode; - } - - ::ibv_wc_status getEventStatus() const { - return wc.status; - } - - Buffer* getBuffer() const { - Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id); - b->dataCount = wc.byte_len; - return b; - } + operator bool() const; + bool immPresent() const; + uint32_t getImm() const; + QueueDirection getDirection() const; + ::ibv_wc_opcode getEventType() const; + ::ibv_wc_status 
getEventStatus() const; + Buffer* getBuffer() const; }; // Wrapper for a queue pair - this has the functionality for // putting buffers on the receive queue and for sending buffers // to the other end of the connection. class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted { + friend class Connection; + boost::shared_ptr< ::ibv_pd > pd; boost::shared_ptr< ::ibv_comp_channel > cchannel; boost::shared_ptr< ::ibv_cq > scq; @@ -150,8 +103,6 @@ namespace Rdma { int outstandingSendEvents; int outstandingRecvEvents; - friend class Connection; - QueuePair(boost::shared_ptr< ::rdma_cm_id > id); ~QueuePair(); @@ -159,52 +110,17 @@ namespace Rdma { typedef boost::intrusive_ptr<QueuePair> intrusive_ptr; // Create a buffer to use for writing - Buffer* createBuffer(int s) { - return new Buffer(pd.get(), new char[s], s); - } + Buffer* createBuffer(int s); // Make channel non-blocking by making // associated fd nonblocking - void nonblocking() { - ::fcntl(cchannel->fd, F_SETFL, O_NONBLOCK); - } + void nonblocking(); // If we get EAGAIN because the channel has been set non blocking // and we'd have to wait then return an empty event - QueuePair::intrusive_ptr getNextChannelEvent() { - // First find out which cq has the event - ::ibv_cq* cq; - void* ctx; - int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx); - if (rc == -1 && errno == EAGAIN) - return 0; - CHECK(rc); - - // Batch acknowledge the event - if (cq == scq.get()) { - if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) { - ::ibv_ack_cq_events(cq, outstandingSendEvents); - outstandingSendEvents = 0; - } - } else if (cq == rcq.get()) { - if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) { - ::ibv_ack_cq_events(cq, outstandingRecvEvents); - outstandingRecvEvents = 0; - } - } - - return static_cast<QueuePair*>(ctx); - } + QueuePair::intrusive_ptr getNextChannelEvent(); - QueuePairEvent getNextEvent() { - ::ibv_wc w; - if (::ibv_poll_cq(scq.get(), 1, &w) == 1) - return QueuePairEvent(w, scq, SEND); - 
else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1) - return QueuePairEvent(w, rcq, RECV); - else - return QueuePairEvent(); - } + QueuePairEvent getNextEvent(); void postRecv(Buffer* buf); void postSend(Buffer* buf); @@ -227,23 +143,11 @@ namespace Rdma { // Default copy, assignment and destructor ok public: - operator bool() const { - return event; - } - - ::rdma_cm_event_type getEventType() const { - return event->event; - } - + operator bool() const; + ::rdma_cm_event_type getEventType() const; ::rdma_conn_param getConnectionParam() const; - - boost::intrusive_ptr<Connection> getConnection () const { - return id; - } - - boost::intrusive_ptr<Connection> getListenId() const { - return listen_id; - } + boost::intrusive_ptr<Connection> getConnection () const; + boost::intrusive_ptr<Connection> getListenId() const; }; // For the moment this is a fairly simple wrapper for rdma_cm_id. @@ -264,60 +168,17 @@ namespace Rdma { // Wrap the passed in rdma_cm_id with a Connection // this basically happens only on connection request - Connection(::rdma_cm_id* i) : - qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), - id(i, destroyId), - context(0) - { - impl->fd = id->channel->fd; - - // Just overwrite the previous context as it will - // have come from the listening connection - if (i) - i->context = this; - } - - Connection() : - qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), - channel(mkEChannel()), - id(mkId(channel.get(), this, RDMA_PS_TCP)), - context(0) - { - impl->fd = channel->fd; - } + Connection(::rdma_cm_id* i); + Connection(); + ~Connection(); - ~Connection() { - // Reset the id context in case someone else has it - id->context = 0; - } - - // Default destructor fine - - void ensureQueuePair() { - assert(id.get()); - - // Only allocate a queue pair if there isn't one already - if (qp) - return; - - qp = new QueuePair(id); - } + void ensureQueuePair(); public: typedef boost::intrusive_ptr<Connection> intrusive_ptr; - static intrusive_ptr make() { - return new 
Connection(); - } - - static intrusive_ptr find(::rdma_cm_id* i) { - if (!i) - return 0; - Connection* id = static_cast< Connection* >(i->context); - if (!id) - throw std::logic_error("Couldn't find existing Connection"); - return id; - } + static intrusive_ptr make(); + static intrusive_ptr find(::rdma_cm_id* i); template <typename T> void addContext(T* c) { @@ -333,169 +194,48 @@ namespace Rdma { // Make channel non-blocking by making // associated fd nonblocking - void nonblocking() { - assert(id.get()); - ::fcntl(id->channel->fd, F_SETFL, O_NONBLOCK); - } + void nonblocking(); // If we get EAGAIN because the channel has been set non blocking // and we'd have to wait then return an empty event - ConnectionEvent getNextEvent() { - assert(id.get()); - ::rdma_cm_event* e; - int rc = ::rdma_get_cm_event(id->channel, &e); - if (GETERR(rc) == EAGAIN) - return ConnectionEvent(); - CHECK(rc); - return ConnectionEvent(e); - } - - void bind(const qpid::sys::SocketAddress& src_addr) const { - assert(id.get()); - CHECK(::rdma_bind_addr(id.get(), getAddrInfo(src_addr).ai_addr)); - } - - void listen(int backlog = DEFAULT_BACKLOG) const { - assert(id.get()); - CHECK(::rdma_listen(id.get(), backlog)); - } + ConnectionEvent getNextEvent(); + void bind(const qpid::sys::SocketAddress& src_addr) const; + void listen(int backlog = DEFAULT_BACKLOG) const; void resolve_addr( const qpid::sys::SocketAddress& dst_addr, - int timeout_ms = DEFAULT_TIMEOUT) const - { - assert(id.get()); - CHECK(::rdma_resolve_addr(id.get(), 0, getAddrInfo(dst_addr).ai_addr, timeout_ms)); - } - - void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const { - assert(id.get()); - CHECK(::rdma_resolve_route(id.get(), timeout_ms)); - } - - void disconnect() const { - assert(id.get()); - int rc = ::rdma_disconnect(id.get()); - // iWarp doesn't let you disconnect a disconnected connection - // but Infiniband can do so it's okay to call rdma_disconnect() - // in response to a disconnect event, but we may get an 
error - if (GETERR(rc) == EINVAL) - return; - CHECK(rc); - } + int timeout_ms = DEFAULT_TIMEOUT) const; + void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const; + void disconnect() const; // TODO: Currently you can only connect with the default connection parameters - void connect() { - assert(id.get()); - - // Need to have a queue pair before we can connect - ensureQueuePair(); - - ::rdma_conn_param p = DEFAULT_CONNECT_PARAM; - CHECK(::rdma_connect(id.get(), &p)); - } - + void connect(const void* data, size_t len); + void connect(); template <typename T> void connect(const T* data) { - assert(id.get()); - // Need to have a queue pair before we can connect - ensureQueuePair(); - - ::rdma_conn_param p = DEFAULT_CONNECT_PARAM; - p.private_data = data; - p.private_data_len = sizeof(T); - CHECK(::rdma_connect(id.get(), &p)); - } + connect(data, sizeof(T)); + } // TODO: Not sure how to default accept params - they come from the connection request // event + void accept(const ::rdma_conn_param& param, const void* data, size_t len); + void accept(const ::rdma_conn_param& param); template <typename T> void accept(const ::rdma_conn_param& param, const T* data) { - assert(id.get()); - // Need to have a queue pair before we can accept - ensureQueuePair(); - - ::rdma_conn_param p = param; - p.private_data = data; - p.private_data_len = sizeof(T); - CHECK(::rdma_accept(id.get(), &p)); - } - - void accept(const ::rdma_conn_param& param) { - assert(id.get()); - // Need to have a queue pair before we can accept - ensureQueuePair(); - - ::rdma_conn_param p = param; - p.private_data = 0; - p.private_data_len = 0; - CHECK(::rdma_accept(id.get(), &p)); + accept(param, data, sizeof(T)); } + void reject(const void* data, size_t len) const; + void reject() const; template <typename T> void reject(const T* data) const { - assert(id.get()); - CHECK(::rdma_reject(id.get(), data, sizeof(T))); + reject(data, sizeof(T)); } - void reject() const { - assert(id.get()); - 
CHECK(::rdma_reject(id.get(), 0, 0)); - } - - QueuePair::intrusive_ptr getQueuePair() { - assert(id.get()); - - ensureQueuePair(); - - return qp; - } - - std::string getLocalName() const { - ::sockaddr* addr = ::rdma_get_local_addr(id.get()); - char hostName[NI_MAXHOST]; - char portName[NI_MAXSERV]; - CHECK_IBV(::getnameinfo( - addr, sizeof(::sockaddr_storage), - hostName, sizeof(hostName), - portName, sizeof(portName), - NI_NUMERICHOST | NI_NUMERICSERV)); - std::string r(hostName); - r += ":"; - r += portName; - return r; - } - - std::string getPeerName() const { - ::sockaddr* addr = ::rdma_get_peer_addr(id.get()); - char hostName[NI_MAXHOST]; - char portName[NI_MAXSERV]; - CHECK_IBV(::getnameinfo( - addr, sizeof(::sockaddr_storage), - hostName, sizeof(hostName), - portName, sizeof(portName), - NI_NUMERICHOST | NI_NUMERICSERV)); - std::string r(hostName); - r += ":"; - r += portName; - return r; - } + QueuePair::intrusive_ptr getQueuePair(); + std::string getLocalName() const; + std::string getPeerName() const; }; - - inline void QueuePair::notifyRecv() { - CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0)); - } - - inline void QueuePair::notifySend() { - CHECK_IBV(ibv_req_notify_cq(scq.get(), 0)); - } - - inline ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) : - id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ? - Connection::find(e->id) : new Connection(e->id)), - listen_id(Connection::find(e->listen_id)), - event(e, acker) - {} } std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t); |