diff options
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaIO.cpp')
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 516 |
1 files changed, 434 insertions, 82 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 755d6f17c4..8d06fccba1 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -1,12 +1,41 @@ -#include "RdmaIO.h" +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/rdma/RdmaIO.h" + +#include "qpid/log/Statement.h" + #include <iostream> #include <boost/bind.hpp> +using qpid::sys::SocketAddress; +using qpid::sys::DispatchHandle; +using qpid::sys::Poller; + namespace Rdma { AsynchIO::AsynchIO( QueuePair::intrusive_ptr q, - int s, + int size, + int xCredit, + int rCount, ReadCallback rc, IdleCallback ic, FullCallback fc, @@ -14,10 +43,15 @@ namespace Rdma { ) : qp(q), dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0), - bufferSize(s), - recvBufferCount(DEFAULT_WR_ENTRIES), - xmitBufferCount(DEFAULT_WR_ENTRIES), + bufferSize(size), + recvCredit(0), + xmitCredit(xCredit), + recvBufferCount(rCount), + xmitBufferCount(xCredit), outstandingWrites(0), + closed(false), + deleting(false), + state(IDLE), readCallback(rc), idleCallback(ic), fullCallback(fc), @@ -29,80 +63,281 @@ namespace Rdma { // Prepost some recv buffers before we go any further for (int i = 0; i<recvBufferCount; ++i) { + // Allocate recv buffer Buffer* b = qp->createBuffer(bufferSize); buffers.push_front(b); b->dataCount = b->byteCount; qp->postRecv(b); } + + for (int i = 0; i<xmitBufferCount; ++i) { + // Allocate xmit buffer + Buffer* b = qp->createBuffer(bufferSize); + buffers.push_front(b); + bufferQueue.push_front(b); + b->dataCount = 0; + b->dataStart = 0; + } } AsynchIO::~AsynchIO() { + // Warn if we are deleting whilst there are still unreclaimed write buffers + if ( outstandingWrites>0 ) + QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished"); + + // Turn off callbacks (before doing the deletes) + dataHandle.stopWatch(); + // The buffers ptr_deque automatically deletes all the buffers we've allocated + // TODO: It might turn out to be more efficient in high connection loads to reuse the + // buffers rather than having to reregister them all the time (this would be straightforward if all + // connections haver the same buffer size and harder otherwise) } void AsynchIO::start(Poller::shared_ptr poller) { dataHandle.startWatch(poller); } - // TODO: Currently we don't prevent write buffer overrun we just advise - // when to stop writing. - void AsynchIO::queueWrite(Buffer* buff) { - qp->postSend(buff); - ++outstandingWrites; - if (outstandingWrites >= xmitBufferCount) { - fullCallback(*this); + // Mark for deletion/Delete this object when we have no outstanding writes + void AsynchIO::deferDelete() { + State oldState; + State newState; + bool doReturn; + //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + // It is safe to assign to deleting here as we either delete ourselves + // before leaving this function or deleting is set on exit + do { + newState = oldState = state.get(); + doReturn = false; + if (outstandingWrites > 0 || oldState != IDLE) { + deleting = true; + doReturn = true; + } else{ + newState = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor + } + } while (!state.boolCompareAndSwap(oldState, newState)); + if (doReturn) { + return; } + delete this; } - void AsynchIO::notifyPendingWrite() { - // Just perform the idle callback (if possible) - if (outstandingWrites < xmitBufferCount) { - idleCallback(*this); + void AsynchIO::queueWrite(Buffer* buff) { + // Make sure we don't overrun our available buffers + // either at our end or the known available at the peers end + if (writable()) { + // TODO: We might want to batch up sending credit + if (recvCredit > 0) { + int creditSent = recvCredit & ~FlagsMask; + qp->postSend(creditSent, buff); + recvCredit -= creditSent; + } else { + qp->postSend(buff); + } + ++outstandingWrites; + --xmitCredit; + } else { + if (fullCallback) { + fullCallback(*this, buff); + } else { + QPID_LOG(error, "RDMA: qp=" << qp << ": Write queue full, but no callback, throwing buffer away"); + returnBuffer(buff); + } } } + // Mark now closed (so we don't accept any more writes or make any idle callbacks) void AsynchIO::queueWriteClose() { + // Don't think we actually need to lock here as transition is 1 way only to closed + closed = true; } - Buffer* AsynchIO::getBuffer() { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); - if (bufferQueue.empty()) { - Buffer* b = qp->createBuffer(bufferSize); - buffers.push_front(b); - b->dataCount = 0; - return b; - } else { - Buffer* b = bufferQueue.front(); - bufferQueue.pop_front(); - b->dataCount = 0; - b->dataStart = 0; - return b; + void AsynchIO::notifyPendingWrite() { + // As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not. + // If we are then we just return as we know that we will eventually do the idle callback anyway. + // + // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + // We can get here in any state (as the caller could be in any thread) + State oldState; + State newState; + bool doReturn; + do { + newState = oldState = state.get(); + doReturn = false; + switch (oldState) { + case NOTIFY_WRITE: + case PENDING_NOTIFY: + // We only need to note a pending notify if we're already doing a notify as data processing + // is always followed by write notification processing + newState = PENDING_NOTIFY; + doReturn = true; + break; + case PENDING_DATA: + doReturn = true; + break; + case DATA: + // Only need to return here as data processing will do the idleCallback itself anyway + doReturn = true; + break; + case IDLE: + newState = NOTIFY_WRITE; + break; + case DELETED: + assert(oldState!=DELETED); + doReturn = true; + }; + } while (!state.boolCompareAndSwap(oldState, newState)); + if (doReturn) { + return; + } + + doWriteCallback(); + + // Keep track of what we need to do so that we can release the lock + enum {COMPLETION, NOTIFY, RETURN, EXIT} action; + // If there was pending data whilst we were doing this, process it now + // + // Using NOTIFY_WRITE for both NOTIFY & COMPLETION is a bit strange, but we're making sure we get the + // correct result if we reenter notifyPendingWrite(), in which case we want to + // end up in PENDING_NOTIFY (entering dataEvent doesn't matter as it only checks + // not IDLE) + do { + //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + do { + newState = oldState = state.get(); + action = RETURN; // Anything but COMPLETION + switch (oldState) { + case NOTIFY_WRITE: + newState = IDLE; + action = (action == COMPLETION) ? EXIT : RETURN; + break; + case PENDING_DATA: + newState = NOTIFY_WRITE; + action = COMPLETION; + break; + case PENDING_NOTIFY: + newState = NOTIFY_WRITE; + action = NOTIFY; + break; + default: + assert(oldState!=IDLE && oldState!=DATA && oldState!=DELETED); + action = RETURN; + } + } while (!state.boolCompareAndSwap(oldState, newState)); + + // Note we only get here if we were in the PENDING_DATA or PENDING_NOTIFY state + // so that we do need to process completions or notifications now + switch (action) { + case COMPLETION: + processCompletions(); + // Fall through + case NOTIFY: + doWriteCallback(); + break; + case RETURN: + return; + case EXIT: + // If we just processed completions we might need to delete ourselves + if (deleting && outstandingWrites == 0) { + delete this; + } + return; + } + } while (true); + } + + void AsynchIO::dataEvent(qpid::sys::DispatchHandle&) { + // Keep track of writable notifications + // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + State oldState; + State newState; + bool doReturn; + do { + newState = oldState = state.get(); + doReturn = false; + // We're already processing a notification + switch (oldState) { + case IDLE: + newState = DATA; + break; + default: + // Can't get here in DATA state as that would violate the serialisation rules + assert( oldState!=DATA ); + newState = PENDING_DATA; + doReturn = true; + } + } while (!state.boolCompareAndSwap(oldState, newState)); + if (doReturn) { + return; } + processCompletions(); + + //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + do { + newState = oldState = state.get(); + assert( oldState==DATA ); + newState = NOTIFY_WRITE; + } while (!state.boolCompareAndSwap(oldState, newState)); + + do { + doWriteCallback(); + + // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + bool doBreak; + do { + newState = oldState = state.get(); + doBreak = false; + if ( oldState==NOTIFY_WRITE ) { + newState = IDLE; + doBreak = true; + } else { + // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered + assert( oldState==PENDING_NOTIFY ); + newState = NOTIFY_WRITE; + } + } while (!state.boolCompareAndSwap(oldState, newState)); + if (doBreak) { + break; + } + } while (true); + + // We might need to delete ourselves + if (deleting && outstandingWrites == 0) { + delete this; + } } - void AsynchIO::dataEvent(DispatchHandle&) { + void AsynchIO::processCompletions() { QueuePair::intrusive_ptr q = qp->getNextChannelEvent(); + // Re-enable notification for queue: + // This needs to happen before we could do anything that could generate more work completion + // events (ie the callbacks etc. in the following). + // This can't make us reenter this code as the handle attached to the completion queue will still be + // disabled by the poller until we leave this code + qp->notifyRecv(); + qp->notifySend(); + + int recvEvents = 0; + int sendEvents = 0; + // If no event do nothing if (!q) return; assert(q == qp); - // Re-enable notification for queue - qp->notifySend(); - qp->notifyRecv(); - // Repeat until no more events do { QueuePairEvent e(qp->getNextEvent()); if (!e) - return; + break; ::ibv_wc_status status = e.getEventStatus(); if (status != IBV_WC_SUCCESS) { errorCallback(*this); + // TODO: Probably need to flush queues at this point return; } @@ -111,46 +346,143 @@ namespace Rdma { Buffer* b = e.getBuffer(); QueueDirection dir = e.getDirection(); if (dir == RECV) { - readCallback(*this, b); + ++recvEvents; + + // Get our xmitCredit if it was sent + bool dataPresent = true; + if (e.immPresent() ) { + xmitCredit += (e.getImm() & ~FlagsMask); + dataPresent = ((e.getImm() & IgnoreData) == 0); + } + + // if there was no data sent then the message was only to update our credit + if ( dataPresent ) { + readCallback(*this, b); + } + // At this point the buffer has been consumed so put it back on the recv queue + b->dataStart = 0; + b->dataCount = 0; qp->postRecv(b); + + // Received another message + ++recvCredit; + + // Send recvCredit if it is large enough (it will have got this large because we've not sent anything recently) + if (recvCredit > recvBufferCount/2) { + // TODO: This should use RDMA write with imm as there might not ever be a buffer to receive this message + // but this is a little unlikely, as to get in this state we have to have received messages without sending any + // for a while so its likely we've received an credit update from the far side. + if (writable()) { + Buffer* ob = getBuffer(); + // Have to send something as adapters hate it when you try to transfer 0 bytes + *reinterpret_cast< uint32_t* >(ob->bytes) = htonl(recvCredit); + ob->dataCount = sizeof(uint32_t); + + int creditSent = recvCredit & ~FlagsMask; + qp->postSend(creditSent | IgnoreData, ob); + recvCredit -= creditSent; + ++outstandingWrites; + --xmitCredit; + } else { + QPID_LOG(warning, "RDMA: qp=" << qp << ": Unable to send unsolicited credit"); + } + } } else { + ++sendEvents; { qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); bufferQueue.push_front(b); } --outstandingWrites; - // TODO: maybe don't call idle unless we're low on write buffers - idleCallback(*this); } } while (true); + + // Not sure if this is expected or not + if (recvEvents == 0 && sendEvents == 0) { + QPID_LOG(debug, "RDMA: qp=" << qp << ": Got channel event with no recv/send completions"); + } + } + + void AsynchIO::doWriteCallback() { + // TODO: maybe don't call idle unless we're low on write buffers + // Keep on calling the idle routine as long as we are writable and we got something to write last call + while (writable()) { + int xc = xmitCredit; + idleCallback(*this); + // Check whether we actually wrote anything + if (xmitCredit == xc) { + QPID_LOG(debug, "RDMA: qp=" << qp << ": Called for data, but got none: xmitCredit=" << xmitCredit); + return; + } + } + } + + Buffer* AsynchIO::getBuffer() { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); + assert(!bufferQueue.empty()); + Buffer* b = bufferQueue.front(); + bufferQueue.pop_front(); + b->dataCount = 0; + b->dataStart = 0; + return b; + } + + void AsynchIO::returnBuffer(Buffer* b) { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); + bufferQueue.push_front(b); + b->dataCount = 0; + b->dataStart = 0; + } + + ConnectionManager::ConnectionManager( + ErrorCallback errc, + DisconnectedCallback dc + ) : + ci(Connection::make()), + handle(*ci, boost::bind(&ConnectionManager::event, this, _1), 0, 0), + errorCallback(errc), + disconnectedCallback(dc) + { + ci->nonblocking(); + } + + ConnectionManager::~ConnectionManager() + { + handle.stopWatch(); + } + + void ConnectionManager::start(Poller::shared_ptr poller) { + startConnection(ci); + handle.startWatch(poller); + } + + void ConnectionManager::event(DispatchHandle&) { + connectionEvent(ci); } Listener::Listener( - const sockaddr& src, - ConnectedCallback cc, + const SocketAddress& src, + const ConnectionParams& cp, + EstablishedCallback ec, ErrorCallback errc, DisconnectedCallback dc, ConnectionRequestCallback crc ) : + ConnectionManager(errc, dc), src_addr(src), - ci(Connection::make()), - handle(*ci, boost::bind(&Listener::connectionEvent, this, _1), 0, 0), - connectedCallback(cc), - errorCallback(errc), - disconnectedCallback(dc), - connectionRequestCallback(crc) + checkConnectionParams(cp), + connectionRequestCallback(crc), + establishedCallback(ec) { - ci->nonblocking(); } - void Listener::start(Poller::shared_ptr poller) { + void Listener::startConnection(Connection::intrusive_ptr ci) { ci->bind(src_addr); ci->listen(); - handle.startWatch(poller); } - void Listener::connectionEvent(DispatchHandle&) { + void Listener::connectionEvent(Connection::intrusive_ptr ci) { ConnectionEvent e(ci->getNextEvent()); // If (for whatever reason) there was no event do nothing @@ -161,65 +493,75 @@ namespace Rdma { // you get from CONNECT_REQUEST has the same context info // as its parent listening rdma_cm_id ::rdma_cm_event_type eventType = e.getEventType(); + ::rdma_conn_param conn_param = e.getConnectionParam(); Rdma::Connection::intrusive_ptr id = e.getConnection(); switch (eventType) { case RDMA_CM_EVENT_CONNECT_REQUEST: { - bool accept = true; - // Extract connection parameters and private data from event - ::rdma_conn_param conn_param = e.getConnectionParam(); + // Make sure peer has sent params we can use + if (!conn_param.private_data || conn_param.private_data_len < sizeof(ConnectionParams)) { + id->reject(); + break; + } + ConnectionParams cp = *static_cast<const ConnectionParams*>(conn_param.private_data); + + // Reject if requested msg size is bigger than we allow + if (cp.maxRecvBufferSize > checkConnectionParams.maxRecvBufferSize) { + id->reject(&checkConnectionParams); + break; + } + bool accept = true; if (connectionRequestCallback) - //TODO: pass private data to callback (and accept new private data for accept somehow) - accept = connectionRequestCallback(id); + accept = connectionRequestCallback(id, cp); + if (accept) { // Accept connection - id->accept(conn_param); + cp.initialXmitCredit = checkConnectionParams.initialXmitCredit; + id->accept(conn_param, &cp); } else { - //Reject connection + // Reject connection id->reject(); } - break; } case RDMA_CM_EVENT_ESTABLISHED: - connectedCallback(id); + establishedCallback(id); break; case RDMA_CM_EVENT_DISCONNECTED: disconnectedCallback(id); break; case RDMA_CM_EVENT_CONNECT_ERROR: - errorCallback(id); + errorCallback(id, CONNECT_ERROR); break; default: - std::cerr << "Warning: unexpected response to listen - " << eventType << "\n"; + // Unexpected response + errorCallback(id, UNKNOWN); + //std::cerr << "Warning: unexpected response to listen - " << eventType << "\n"; } } Connector::Connector( - const sockaddr& dst, + const SocketAddress& dst, + const ConnectionParams& cp, ConnectedCallback cc, ErrorCallback errc, DisconnectedCallback dc, RejectedCallback rc ) : + ConnectionManager(errc, dc), dst_addr(dst), - ci(Connection::make()), - handle(*ci, boost::bind(&Connector::connectionEvent, this, _1), 0, 0), - connectedCallback(cc), - errorCallback(errc), - disconnectedCallback(dc), - rejectedCallback(rc) + connectionParams(cp), + rejectedCallback(rc), + connectedCallback(cc) { - ci->nonblocking(); } - void Connector::start(Poller::shared_ptr poller) { + void Connector::startConnection(Connection::intrusive_ptr ci) { ci->resolve_addr(dst_addr); - handle.startWatch(poller); } - void Connector::connectionEvent(DispatchHandle&) { + void Connector::connectionEvent(Connection::intrusive_ptr ci) { ConnectionEvent e(ci->getNextEvent()); // If (for whatever reason) there was no event do nothing @@ -227,6 +569,8 @@ namespace Rdma { return; ::rdma_cm_event_type eventType = e.getEventType(); + ::rdma_conn_param conn_param = e.getConnectionParam(); + Rdma::Connection::intrusive_ptr id = e.getConnection(); switch (eventType) { case RDMA_CM_EVENT_ADDR_RESOLVED: // RESOLVE_ADDR @@ -234,38 +578,46 @@ namespace Rdma { break; case RDMA_CM_EVENT_ADDR_ERROR: // RESOLVE_ADDR - errorCallback(ci); + errorCallback(ci, ADDR_ERROR); break; case RDMA_CM_EVENT_ROUTE_RESOLVED: // RESOLVE_ROUTE: - ci->connect(); + ci->connect(&connectionParams); break; case RDMA_CM_EVENT_ROUTE_ERROR: // RESOLVE_ROUTE: - errorCallback(ci); + errorCallback(ci, ROUTE_ERROR); break; case RDMA_CM_EVENT_CONNECT_ERROR: // CONNECTING - errorCallback(ci); + errorCallback(ci, CONNECT_ERROR); break; case RDMA_CM_EVENT_UNREACHABLE: // CONNECTING - errorCallback(ci); + errorCallback(ci, UNREACHABLE); break; - case RDMA_CM_EVENT_REJECTED: + case RDMA_CM_EVENT_REJECTED: { // CONNECTING - rejectedCallback(ci); + // Extract private data from event + assert(conn_param.private_data && conn_param.private_data_len >= sizeof(ConnectionParams)); + ConnectionParams cp = *static_cast<const ConnectionParams*>(conn_param.private_data); + rejectedCallback(ci, cp); break; - case RDMA_CM_EVENT_ESTABLISHED: + } + case RDMA_CM_EVENT_ESTABLISHED: { // CONNECTING - connectedCallback(ci); + // Extract private data from event + assert(conn_param.private_data && conn_param.private_data_len >= sizeof(ConnectionParams)); + ConnectionParams cp = *static_cast<const ConnectionParams*>(conn_param.private_data); + connectedCallback(ci, cp); break; + } case RDMA_CM_EVENT_DISCONNECTED: // ESTABLISHED disconnectedCallback(ci); break; default: - std::cerr << "Warning: unexpected event in connect: " << eventType << "\n"; + QPID_LOG(warning, "RDMA: Unexpected event in connect: " << eventType); } } } |