summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp')
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp720
1 files changed, 720 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
new file mode 100644
index 0000000000..78bcdec68e
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -0,0 +1,720 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/rdma/RdmaIO.h"
+
+#include "qpid/log/Statement.h"
+
+#include <string>
+#include <boost/bind.hpp>
+
+using qpid::sys::SocketAddress;
+using qpid::sys::DispatchHandle;
+using qpid::sys::Poller;
+using qpid::sys::ScopedLock;
+using qpid::sys::Mutex;
+
+namespace Rdma {
+ // Set packing as these are 'on the wire' structures
+# pragma pack(push, 1)
+
+ // Header structure for each transmitted frame
+ struct FrameHeader {
+ const static uint32_t FlagsMask = 0xf0000000;
+ uint32_t data; // written in network order
+
+ FrameHeader() {}
+ FrameHeader(uint32_t credit, uint32_t flags = 0) {
+ data = htonl((credit & ~FlagsMask) | (flags & FlagsMask));
+ }
+
+ uint32_t credit() const {
+ return ntohl(data) & ~FlagsMask;
+ }
+
+ uint32_t flags() const {
+ return ntohl(data) & FlagsMask;
+ }
+ };
+
+ const size_t FrameHeaderSize = sizeof(FrameHeader);
+
+ // Structure for Connection Parameters on the network
+ //
+ // The original version (now called 0) of these parameters had a couple of mistakes:
+ // * No way to version the protocol (need to introduce a new protocol for iWarp)
+ // * Used host order int32 (but only deployed on LE archs as far as we know)
+ // so effectively was LE on the wire which is the opposite of network order.
+ //
+ // Fortunately the values sent were sufficiently restricted that a 16 bit short could
+ // be carved out to indicate the protocol version as these bits were always sent as 0.
+ //
+ // So the current version of parameters uses the last 2 bytes to indicate the protocol
+ // version, if this is 0 then we interpret the rest of the struct without byte swapping
+ // to remain compatible with the previous protocol.
+ struct NConnectionParams {
+ uint32_t maxRecvBufferSize;
+ uint16_t initialXmitCredit;
+ uint16_t rdmaProtocolVersion;
+
+ NConnectionParams(const ConnectionParams& c) :
+ maxRecvBufferSize(c.rdmaProtocolVersion ? htonl(c.maxRecvBufferSize) : c.maxRecvBufferSize),
+ initialXmitCredit(c.rdmaProtocolVersion ? htons(c.initialXmitCredit) : c.initialXmitCredit),
+ // 0 is the same with/without byteswapping!
+ rdmaProtocolVersion(htons(c.rdmaProtocolVersion))
+ {}
+
+ operator ConnectionParams() const {
+ return
+ ConnectionParams(
+ rdmaProtocolVersion ? ntohl(maxRecvBufferSize) : maxRecvBufferSize,
+ rdmaProtocolVersion ? ntohs(initialXmitCredit) : initialXmitCredit,
+ ntohs(rdmaProtocolVersion));
+ }
+ };
+# pragma pack(pop)
+
+ class IOException : public std::exception {
+ std::string s;
+
+ public:
+ IOException(std::string s0): s(s0) {}
+ ~IOException() throw() {}
+
+ const char* what() const throw() {
+ return s.c_str();
+ }
+ };
+
+ AsynchIO::AsynchIO(
+ QueuePair::intrusive_ptr q,
+ int version,
+ int size,
+ int xCredit,
+ int rCount,
+ ReadCallback rc,
+ IdleCallback ic,
+ FullCallback fc,
+ ErrorCallback ec
+ ) :
+ protocolVersion(version),
+ bufferSize(size),
+ recvCredit(0),
+ xmitCredit(xCredit),
+ recvBufferCount(rCount),
+ xmitBufferCount(xCredit),
+ outstandingWrites(0),
+ draining(false),
+ state(IDLE),
+ qp(q),
+ dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this), 0, 0),
+ readCallback(rc),
+ idleCallback(ic),
+ fullCallback(fc),
+ errorCallback(ec),
+ pendingWriteAction(boost::bind(&AsynchIO::writeEvent, this))
+ {
+ if (protocolVersion > maxSupportedProtocolVersion)
+ throw IOException("Unsupported Rdma Protocol");
+ qp->nonblocking();
+ qp->notifyRecv();
+ qp->notifySend();
+
+ // Prepost recv buffers before we go any further
+ qp->allocateRecvBuffers(recvBufferCount, bufferSize+FrameHeaderSize);
+
+ // Create xmit buffers, reserve space for frame header.
+ qp->createSendBuffers(xmitBufferCount, bufferSize, FrameHeaderSize);
+ }
+
+ AsynchIO::~AsynchIO() {
+ // Warn if we are deleting whilst there are still unreclaimed write buffers
+ if ( outstandingWrites>0 )
+ QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished");
+
+ // Turn off callbacks if necessary (before doing the deletes)
+ if (state != STOPPED) {
+ QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown");
+ dataHandle.stopWatch();
+ }
+ // TODO: It might turn out to be more efficient in high connection loads to reuse the
+ // buffers rather than having to reregister them all the time (this would be straightforward if all
+ // connections haver the same buffer size and harder otherwise)
+ }
+
+ void AsynchIO::start(Poller::shared_ptr poller) {
+ dataHandle.startWatch(poller);
+ }
+
+ // State constraints
+ // On entry: None
+ // On exit: STOPPED
+ // Mark for deletion/Delete this object when we have no outstanding writes
+ void AsynchIO::stop(NotifyCallback nc) {
+ ScopedLock<Mutex> l(stateLock);
+ state = STOPPED;
+ notifyCallback = nc;
+ dataHandle.call(boost::bind(&AsynchIO::doStoppedCallback, this));
+ }
+
+ namespace {
+ void requestedCall(AsynchIO* aio, AsynchIO::RequestCallback callback) {
+ assert(callback);
+ callback(*aio);
+ }
+ }
+
+ void AsynchIO::requestCallback(RequestCallback callback) {
+ // TODO creating a function object every time isn't all that
+ // efficient - if this becomes heavily used do something better (what?)
+ assert(callback);
+ dataHandle.call(boost::bind(&requestedCall, this, callback));
+ }
+
+ // Mark writing closed (so we don't accept any more writes or make any idle callbacks)
+ void AsynchIO::drainWriteQueue(NotifyCallback nc) {
+ draining = true;
+ notifyCallback = nc;
+ }
+
+ void AsynchIO::queueBuffer(Buffer* buff, int credit) {
+ switch (protocolVersion) {
+ case 0:
+ if (!buff) {
+ Buffer* ob = getSendBuffer();
+ // Have to send something as adapters hate it when you try to transfer 0 bytes
+ *reinterpret_cast< uint32_t* >(ob->bytes()) = htonl(credit);
+ ob->dataCount(sizeof(uint32_t));
+ qp->postSend(credit | IgnoreData, ob);
+ } else if (credit > 0) {
+ qp->postSend(credit, buff);
+ } else {
+ qp->postSend(buff);
+ }
+ break;
+ case 1:
+ if (!buff)
+ buff = getSendBuffer();
+ // Add FrameHeader after frame data
+ FrameHeader header(credit);
+ assert(buff->dataCount() <= buff->byteCount()); // ensure app data doesn't impinge on reserved space.
+ ::memcpy(buff->bytes()+buff->dataCount(), &header, FrameHeaderSize);
+ buff->dataCount(buff->dataCount()+FrameHeaderSize);
+ qp->postSend(buff);
+ break;
+ }
+ }
+
+ Buffer* AsynchIO::extractBuffer(const QueuePairEvent& e) {
+ Buffer* b = e.getBuffer();
+ switch (protocolVersion) {
+ case 0: {
+ bool dataPresent = true;
+ // Get our xmitCredit if it was sent
+ if (e.immPresent() ) {
+ assert(xmitCredit>=0);
+ xmitCredit += (e.getImm() & ~FlagsMask);
+ dataPresent = ((e.getImm() & IgnoreData) == 0);
+ assert(xmitCredit>0);
+ }
+ if (!dataPresent) {
+ b->dataCount(0);
+ }
+ break;
+ }
+ case 1:
+ b->dataCount(b->dataCount()-FrameHeaderSize);
+ FrameHeader header;
+ ::memcpy(&header, b->bytes()+b->dataCount(), FrameHeaderSize);
+ assert(xmitCredit>=0);
+ xmitCredit += header.credit();
+ assert(xmitCredit>=0);
+ break;
+ }
+
+ return b;
+ }
+
+ void AsynchIO::queueWrite(Buffer* buff) {
+ // Make sure we don't overrun our available buffers
+ // either at our end or the known available at the peers end
+ if (writable()) {
+ // TODO: We might want to batch up sending credit
+ int creditSent = recvCredit & ~FlagsMask;
+ queueBuffer(buff, creditSent);
+ recvCredit -= creditSent;
+ ++outstandingWrites;
+ --xmitCredit;
+ assert(xmitCredit>=0);
+ } else {
+ if (fullCallback) {
+ fullCallback(*this, buff);
+ } else {
+ QPID_LOG(error, "RDMA: qp=" << qp << ": Write queue full, but no callback, throwing buffer away");
+ returnSendBuffer(buff);
+ }
+ }
+ }
+
+ // State constraints
+ // On entry: None
+ // On exit: NOTIFY_PENDING || STOPPED
+ void AsynchIO::notifyPendingWrite() {
+ ScopedLock<Mutex> l(stateLock);
+ switch (state) {
+ case IDLE:
+ dataHandle.call(pendingWriteAction);
+ // Fall Thru
+ case NOTIFY:
+ state = NOTIFY_PENDING;
+ break;
+ case NOTIFY_PENDING:
+ case STOPPED:
+ break;
+ }
+ }
+
+ // State constraints
+ // On entry: IDLE || STOPPED
+ // On exit: IDLE || STOPPED
+ void AsynchIO::dataEvent() {
+ {
+ ScopedLock<Mutex> l(stateLock);
+
+ if (state == STOPPED) return;
+
+ state = NOTIFY_PENDING;
+ }
+ processCompletions();
+
+ writeEvent();
+ }
+
+ // State constraints
+ // On entry: NOTIFY_PENDING || STOPPED
+ // On exit: IDLE || STOPPED
+ void AsynchIO::writeEvent() {
+ State newState;
+ do {
+ {
+ ScopedLock<Mutex> l(stateLock);
+
+ switch (state) {
+ case STOPPED:
+ return;
+ default:
+ state = NOTIFY;
+ }
+ }
+
+ doWriteCallback();
+
+ {
+ ScopedLock<Mutex> l(stateLock);
+
+ newState = state;
+ switch (newState) {
+ case NOTIFY_PENDING:
+ case STOPPED:
+ break;
+ default:
+ state = IDLE;
+ }
+ }
+ } while (newState == NOTIFY_PENDING);
+ }
+
+ void AsynchIO::processCompletions() {
+ QueuePair::intrusive_ptr q = qp->getNextChannelEvent();
+
+ // Re-enable notification for queue:
+ // This needs to happen before we could do anything that could generate more work completion
+ // events (ie the callbacks etc. in the following).
+ // This can't make us reenter this code as the handle attached to the completion queue will still be
+ // disabled by the poller until we leave this code
+ qp->notifyRecv();
+ qp->notifySend();
+
+ int recvEvents = 0;
+ int sendEvents = 0;
+
+ // If no event do nothing
+ if (!q)
+ return;
+
+ assert(q == qp);
+
+ // Repeat until no more events
+ do {
+ QueuePairEvent e(qp->getNextEvent());
+ if (!e)
+ break;
+
+ ::ibv_wc_status status = e.getEventStatus();
+ if (status != IBV_WC_SUCCESS) {
+ // Need special check for IBV_WC_WR_FLUSH_ERR here
+ // we will get this for every send/recv queue entry that was pending
+ // when disconnected, these aren't real errors and mostly need to be ignored
+ if (status == IBV_WC_WR_FLUSH_ERR) {
+ QueueDirection dir = e.getDirection();
+ if (dir == SEND) {
+ Buffer* b = e.getBuffer();
+ ++sendEvents;
+ returnSendBuffer(b);
+ --outstandingWrites;
+ } else {
+ ++recvEvents;
+ }
+ continue;
+ }
+ errorCallback(*this);
+ // TODO: Probably need to flush queues at this point
+ return;
+ }
+
+ // Test if recv (or recv with imm)
+ //::ibv_wc_opcode eventType = e.getEventType();
+ QueueDirection dir = e.getDirection();
+ if (dir == RECV) {
+ ++recvEvents;
+
+ Buffer* b = extractBuffer(e);
+
+ // if there was no data sent then the message was only to update our credit
+ if ( b->dataCount() > 0 ) {
+ readCallback(*this, b);
+ }
+
+ // At this point the buffer has been consumed so put it back on the recv queue
+ // TODO: Is this safe to do if the connection is disconnected already?
+ qp->postRecv(b);
+
+ // Received another message
+ ++recvCredit;
+
+ // Send recvCredit if it is large enough (it will have got this large because we've not sent anything recently)
+ if (recvCredit > recvBufferCount/2) {
+ // TODO: This should use RDMA write with imm as there might not ever be a buffer to receive this message
+ // but this is a little unlikely, as to get in this state we have to have received messages without sending any
+ // for a while so its likely we've received an credit update from the far side.
+ if (writable()) {
+ int creditSent = recvCredit & ~FlagsMask;
+ queueBuffer(0, creditSent);
+ recvCredit -= creditSent;
+ ++outstandingWrites;
+ --xmitCredit;
+ assert(xmitCredit>=0);
+ } else {
+ QPID_LOG(warning, "RDMA: qp=" << qp << ": Unable to send unsolicited credit");
+ }
+ }
+ } else {
+ Buffer* b = e.getBuffer();
+ ++sendEvents;
+ returnSendBuffer(b);
+ --outstandingWrites;
+ }
+ } while (true);
+
+ // Not sure if this is expected or not
+ if (recvEvents == 0 && sendEvents == 0) {
+ QPID_LOG(debug, "RDMA: qp=" << qp << ": Got channel event with no recv/send completions");
+ }
+ }
+
+ void AsynchIO::doWriteCallback() {
+ // TODO: maybe don't call idle unless we're low on write buffers
+ // Keep on calling the idle routine as long as we are writable and we got something to write last call
+
+ // Do callback even if there are no available free buffers as the application itself might be
+ // holding onto buffers
+ while (writable()) {
+ int xc = xmitCredit;
+ idleCallback(*this);
+ // Check whether we actually wrote anything
+ if (xmitCredit == xc) {
+ QPID_LOG(debug, "RDMA: qp=" << qp << ": Called for data, but got none: xmitCredit=" << xmitCredit);
+ return;
+ }
+ }
+
+ checkDrained();
+ }
+
+ void AsynchIO::checkDrained() {
+ // If we've got all the write confirmations and we're draining
+ // We might get deleted in the drained callback so return immediately
+ if (draining) {
+ if (outstandingWrites == 0) {
+ draining = false;
+ NotifyCallback nc;
+ nc.swap(notifyCallback);
+ nc(*this);
+ }
+ return;
+ }
+ }
+
+ void AsynchIO::doStoppedCallback() {
+ // Ensure we can't get any more callbacks (except for the stopped callback)
+ dataHandle.stopWatch();
+
+ NotifyCallback nc;
+ nc.swap(notifyCallback);
+ nc(*this);
+ }
+
+ ConnectionManager::ConnectionManager(
+ ErrorCallback errc,
+ DisconnectedCallback dc
+ ) :
+ state(IDLE),
+ ci(Connection::make()),
+ handle(*ci, boost::bind(&ConnectionManager::event, this, _1), 0, 0),
+ errorCallback(errc),
+ disconnectedCallback(dc)
+ {
+ QPID_LOG(debug, "RDMA: ci=" << ci << ": Creating ConnectionManager");
+ ci->nonblocking();
+ }
+
+ ConnectionManager::~ConnectionManager()
+ {
+ QPID_LOG(debug, "RDMA: ci=" << ci << ": Deleting ConnectionManager");
+ }
+
+ void ConnectionManager::start(Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr) {
+ startConnection(ci, addr);
+ handle.startWatch(poller);
+ }
+
+ void ConnectionManager::doStoppedCallback() {
+ // Ensure we can't get any more callbacks (except for the stopped callback)
+ handle.stopWatch();
+
+ NotifyCallback nc;
+ nc.swap(notifyCallback);
+ nc(*this);
+ }
+
+ void ConnectionManager::stop(NotifyCallback nc) {
+ state = STOPPED;
+ notifyCallback = nc;
+ handle.call(boost::bind(&ConnectionManager::doStoppedCallback, this));
+ }
+
+ void ConnectionManager::event(DispatchHandle&) {
+ if (state.get() == STOPPED) return;
+ connectionEvent(ci);
+ }
+
+ Listener::Listener(
+ const ConnectionParams& cp,
+ EstablishedCallback ec,
+ ErrorCallback errc,
+ DisconnectedCallback dc,
+ ConnectionRequestCallback crc
+ ) :
+ ConnectionManager(errc, dc),
+ checkConnectionParams(cp),
+ connectionRequestCallback(crc),
+ establishedCallback(ec)
+ {
+ }
+
+ void Listener::startConnection(Connection::intrusive_ptr ci, const qpid::sys::SocketAddress& addr) {
+ ci->bind(addr);
+ ci->listen();
+ }
+
+ namespace {
+ const int64_t PoisonContext = -1;
+ }
+
+ void Listener::connectionEvent(Connection::intrusive_ptr ci) {
+ ConnectionEvent e(ci->getNextEvent());
+
+ // If (for whatever reason) there was no event do nothing
+ if (!e)
+ return;
+
+ // Important documentation ommision the new rdma_cm_id
+ // you get from CONNECT_REQUEST has the same context info
+ // as its parent listening rdma_cm_id
+ ::rdma_cm_event_type eventType = e.getEventType();
+ ::rdma_conn_param conn_param = e.getConnectionParam();
+ Rdma::Connection::intrusive_ptr id = e.getConnection();
+
+ // Check for previous disconnection (it appears that you actually can get connection
+ // request events after a disconnect event in rare circumstances)
+ if (reinterpret_cast<int64_t>(id->getContext<void*>())==PoisonContext)
+ return;
+
+ switch (eventType) {
+ case RDMA_CM_EVENT_CONNECT_REQUEST: {
+ // Make sure peer has sent params we can use
+ if (!conn_param.private_data || conn_param.private_data_len < sizeof(NConnectionParams)) {
+ QPID_LOG(warning, "Rdma: rejecting connection attempt: unusable connection parameters");
+ id->reject();
+ break;
+ }
+
+ const NConnectionParams* rcp = static_cast<const NConnectionParams*>(conn_param.private_data);
+ ConnectionParams cp = *rcp;
+
+ // Reject if requested msg size is bigger than we allow
+ if (
+ cp.maxRecvBufferSize > checkConnectionParams.maxRecvBufferSize ||
+ cp.initialXmitCredit > checkConnectionParams.initialXmitCredit
+ ) {
+ QPID_LOG(warning, "Rdma: rejecting connection attempt: connection parameters out of range: ("
+ << cp.maxRecvBufferSize << ">" << checkConnectionParams.maxRecvBufferSize << " || "
+ << cp.initialXmitCredit << ">" << checkConnectionParams.initialXmitCredit
+ << ")");
+ id->reject(&checkConnectionParams);
+ break;
+ }
+
+ bool accept = true;
+ if (connectionRequestCallback)
+ accept = connectionRequestCallback(id, cp);
+
+ if (accept) {
+ // Accept connection
+ cp.initialXmitCredit = checkConnectionParams.initialXmitCredit;
+ id->accept(conn_param, rcp);
+ } else {
+ // Reject connection
+ QPID_LOG(warning, "Rdma: rejecting connection attempt: application policy");
+ id->reject();
+ }
+ break;
+ }
+ case RDMA_CM_EVENT_ESTABLISHED:
+ establishedCallback(id);
+ break;
+ case RDMA_CM_EVENT_DISCONNECTED:
+ disconnectedCallback(id);
+ // Poison the id context so that we do no more callbacks on it
+ id->removeContext();
+ id->addContext(reinterpret_cast<void*>(PoisonContext));
+ break;
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ errorCallback(id, CONNECT_ERROR);
+ break;
+ default:
+ // Unexpected response
+ errorCallback(id, UNKNOWN);
+ //std::cerr << "Warning: unexpected response to listen - " << eventType << "\n";
+ }
+ }
+
+ Connector::Connector(
+ const ConnectionParams& cp,
+ ConnectedCallback cc,
+ ErrorCallback errc,
+ DisconnectedCallback dc,
+ RejectedCallback rc
+ ) :
+ ConnectionManager(errc, dc),
+ connectionParams(cp),
+ rejectedCallback(rc),
+ connectedCallback(cc)
+ {
+ }
+
+ void Connector::startConnection(Connection::intrusive_ptr ci, const qpid::sys::SocketAddress& addr) {
+ ci->resolve_addr(addr);
+ }
+
+ void Connector::connectionEvent(Connection::intrusive_ptr ci) {
+ ConnectionEvent e(ci->getNextEvent());
+
+ // If (for whatever reason) there was no event do nothing
+ if (!e)
+ return;
+
+ ::rdma_cm_event_type eventType = e.getEventType();
+ ::rdma_conn_param conn_param = e.getConnectionParam();
+ Rdma::Connection::intrusive_ptr id = e.getConnection();
+ switch (eventType) {
+ case RDMA_CM_EVENT_ADDR_RESOLVED:
+ // RESOLVE_ADDR
+ ci->resolve_route();
+ break;
+ case RDMA_CM_EVENT_ADDR_ERROR:
+ // RESOLVE_ADDR
+ errorCallback(ci, ADDR_ERROR);
+ break;
+ case RDMA_CM_EVENT_ROUTE_RESOLVED: {
+ // RESOLVE_ROUTE:
+ NConnectionParams rcp(connectionParams);
+ ci->connect(&rcp);
+ break;
+ }
+ case RDMA_CM_EVENT_ROUTE_ERROR:
+ // RESOLVE_ROUTE:
+ errorCallback(ci, ROUTE_ERROR);
+ break;
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ // CONNECTING
+ errorCallback(ci, CONNECT_ERROR);
+ break;
+ case RDMA_CM_EVENT_UNREACHABLE:
+ // CONNECTING
+ errorCallback(ci, UNREACHABLE);
+ break;
+ case RDMA_CM_EVENT_REJECTED: {
+ // CONNECTING
+
+ // We can get this event if our peer is not running on the other side
+ // in this case we could get nearly anything in the private data:
+ // From private_data == 0 && private_data_len == 0 (Chelsio iWarp)
+ // to 148 bytes of zeros (Mellanox IB)
+ //
+ // So assume that if the the private data is absent or not the size of
+ // the connection parameters it isn't valid
+ ConnectionParams cp(0, 0, 0);
+ if (conn_param.private_data && conn_param.private_data_len == sizeof(NConnectionParams)) {
+ // Extract private data from event
+ const NConnectionParams* rcp = static_cast<const NConnectionParams*>(conn_param.private_data);
+ cp = *rcp;
+ }
+ rejectedCallback(ci, cp);
+ break;
+ }
+ case RDMA_CM_EVENT_ESTABLISHED: {
+ // CONNECTING
+ // Extract private data from event
+ assert(conn_param.private_data && conn_param.private_data_len >= sizeof(NConnectionParams));
+ const NConnectionParams* rcp = static_cast<const NConnectionParams*>(conn_param.private_data);
+ ConnectionParams cp = *rcp;
+ connectedCallback(ci, cp);
+ break;
+ }
+ case RDMA_CM_EVENT_DISCONNECTED:
+ // ESTABLISHED
+ disconnectedCallback(ci);
+ break;
+ default:
+ QPID_LOG(warning, "RDMA: Unexpected event in connect: " << eventType);
+ }
+ }
+}