/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "qpid/sys/rdma/RdmaIO.h"

#include "qpid/log/Statement.h"

// NOTE(review): the names of the two system headers were missing from the text
// under review. <boost/bind.hpp> is required by the boost::bind calls below;
// <iostream> is presumed - confirm against version control.
#include <iostream>
#include <boost/bind.hpp>

using qpid::sys::SocketAddress;
using qpid::sys::DispatchHandle;
using qpid::sys::Poller;

namespace Rdma {

/**
 * Set up asynchronous IO on a queue pair.
 *
 * @param q       queue pair to drive
 * @param size    size passed to createBuffer() for every buffer allocated here
 * @param xCredit initial transmit credit; also the number of transmit buffers allocated
 * @param rCount  number of receive buffers allocated and posted
 * @param rc      called from processCompletions() for each received message carrying data
 * @param ic      called from doWriteCallback() while we are writable
 * @param fc      called from queueWrite() when a write cannot be posted
 * @param ec      called from processCompletions() when a completion reports failure
 *
 * The queue pair is made non-blocking and send/recv notification is requested
 * before any buffer is posted.
 */
AsynchIO::AsynchIO(
        QueuePair::intrusive_ptr q,
        int size,
        int xCredit,
        int rCount,
        ReadCallback rc,
        IdleCallback ic,
        FullCallback fc,
        ErrorCallback ec
) :
    qp(q),
    dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0),
    bufferSize(size),
    recvCredit(0),
    xmitCredit(xCredit),
    recvBufferCount(rCount),
    xmitBufferCount(xCredit),
    outstandingWrites(0),
    closed(false),
    deleting(false),
    state(IDLE),
    readCallback(rc),
    idleCallback(ic),
    fullCallback(fc),
    errorCallback(ec)
{
    qp->nonblocking();
    qp->notifyRecv();
    qp->notifySend();

    // Prepost some recv buffers before we go any further
    for (int i = 0; i<recvBufferCount; ++i) {
        Buffer* b = qp->createBuffer(bufferSize);
        buffers.push_front(b);
        b->dataCount = b->byteCount;
        qp->postRecv(b);
    }

    // Allocate the transmit buffers and make them available through getBuffer()
    for (int i = 0; i<xmitBufferCount; ++i) {
        Buffer* b = qp->createBuffer(bufferSize);
        buffers.push_front(b);
        bufferQueue.push_front(b);
        b->dataCount = 0;
        b->dataStart = 0;
    }
}

/**
 * Logs an error if writes are still outstanding, then stops watching the
 * data handle.
 */
AsynchIO::~AsynchIO() {
    // Warn if we are deleting whilst there are still unreclaimed write buffers
    if ( outstandingWrites>0 )
        QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished");

    // Turn off callbacks (before doing the deletes)
    dataHandle.stopWatch();

    // The buffers ptr_deque automatically deletes all the buffers we've allocated
    // TODO: It might turn out to be more efficient in high connection loads to reuse the
    // buffers rather than having to reregister them all the time (this would be straightforward if all
    // connections have the same buffer size and harder otherwise)
}

/** Start watching the data handle on the given poller. */
void AsynchIO::start(Poller::shared_ptr poller) {
    dataHandle.startWatch(poller);
}

/**
 * Mark for deletion/Delete this object when we have no outstanding writes.
 *
 * If there are outstanding writes or the state is not IDLE, only the
 * `deleting` flag is set and the function returns. Otherwise the state is
 * moved to DELETED and the object deletes itself.
 */
void AsynchIO::deferDelete() {
    State oldState;
    State newState;
    bool doReturn;
    //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
    // It is safe to assign to deleting here as we either delete ourselves
    // before leaving this function or deleting is set on exit
    do {
        newState = oldState = state.get();
        doReturn = false;
        if (outstandingWrites > 0 || oldState != IDLE) {
            deleting = true;
            doReturn = true;
        } else {
            // Stop any read callback before the dataHandle.stopWatch() in the destructor
            newState = DELETED;
        }
    } while (!state.boolCompareAndSwap(oldState, newState));
    if (doReturn) {
        return;
    }
    delete this;
}

/**
 * Post a buffer for sending if we are writable.
 *
 * Any accumulated receive credit is sent along with the buffer. If we are
 * not writable the buffer is handed to fullCallback, or, when no such
 * callback is set, logged and returned to the buffer queue.
 */
void AsynchIO::queueWrite(Buffer* buff) {
    // Make sure we don't overrun our available buffers
    // either at our end or the known available at the peers end
    if (writable()) {
        // TODO: We might want to batch up sending credit
        if (recvCredit > 0) {
            int creditSent = recvCredit & ~FlagsMask;
            qp->postSend(creditSent, buff);
            recvCredit -= creditSent;
        } else {
            qp->postSend(buff);
        }
        ++outstandingWrites;
        --xmitCredit;
    } else {
        if (fullCallback) {
            fullCallback(*this, buff);
        } else {
            QPID_LOG(error, "RDMA: qp=" << qp << ": Write queue full, but no callback, throwing buffer away");
            returnBuffer(buff);
        }
    }
}

/** Mark now closed (so we don't accept any more writes or make any idle callbacks). */
void AsynchIO::queueWriteClose() {
    // Don't think we actually need to lock here as transition is 1 way only to closed
    closed = true;
}

/**
 * Request that the idle (write) callback be run.
 *
 * As notifyPendingWrite can be called on an arbitrary thread it must check
 * whether we are processing or not. If we are then we just return as we know
 * that we will eventually do the idle callback anyway.
 */
void AsynchIO::notifyPendingWrite() {
    // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
    // We can get here in any state (as the caller could be in any thread)
    State oldState;
    State newState;
    bool doReturn;
    do {
        newState = oldState = state.get();
        doReturn = false;
        switch (oldState) {
        case NOTIFY_WRITE:
        case PENDING_NOTIFY:
            // We only need to note a pending notify if we're already doing a notify as data processing
            // is always followed by write notification processing
            newState = PENDING_NOTIFY;
            doReturn = true;
            break;
        case PENDING_DATA:
            doReturn = true;
            break;
        case DATA:
            // Only need to return here as data processing will do the idleCallback itself anyway
            doReturn = true;
            break;
        case IDLE:
            newState = NOTIFY_WRITE;
            break;
        case DELETED:
            assert(oldState!=DELETED);
            doReturn = true;
        };
    } while (!state.boolCompareAndSwap(oldState, newState));
    if (doReturn) {
        return;
    }

    doWriteCallback();

    // Keep track of what we need to do so that we can release the lock
    enum {COMPLETION, NOTIFY, RETURN, EXIT} action;

    // Set once this call has run processCompletions(). The choice between EXIT
    // and RETURN used to test `action` itself, but `action` is reset to RETURN
    // at the top of every pass, so EXIT (and with it the deferred delete below)
    // could never be selected.
    bool completionsProcessed = false;

    // If there was pending data whilst we were doing this, process it now
    //
    // Using NOTIFY_WRITE for both NOTIFY & COMPLETION is a bit strange, but we're making sure we get the
    // correct result if we reenter notifyPendingWrite(), in which case we want to
    // end up in PENDING_NOTIFY (entering dataEvent doesn't matter as it only checks
    // not IDLE)
    do {
        //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
        do {
            newState = oldState = state.get();
            action = RETURN;
            switch (oldState) {
            case NOTIFY_WRITE:
                newState = IDLE;
                action = completionsProcessed ? EXIT : RETURN;
                break;
            case PENDING_DATA:
                newState = NOTIFY_WRITE;
                action = COMPLETION;
                break;
            case PENDING_NOTIFY:
                newState = NOTIFY_WRITE;
                action = NOTIFY;
                break;
            default:
                assert(oldState!=IDLE && oldState!=DATA && oldState!=DELETED);
                action = RETURN;
            }
        } while (!state.boolCompareAndSwap(oldState, newState));

        // COMPLETION and NOTIFY mean we were in the PENDING_DATA or PENDING_NOTIFY state
        // so that we do need to process completions or notifications now
        switch (action) {
        case COMPLETION:
            processCompletions();
            completionsProcessed = true;
            // Fall through
        case NOTIFY:
            doWriteCallback();
            break;
        case RETURN:
            return;
        case EXIT:
            // If we processed completions we might need to delete ourselves
            if (deleting && outstandingWrites == 0) {
                delete this;
            }
            return;
        }
    } while (true);
}

/**
 * Handler bound to dataHandle in the constructor.
 *
 * If the state is IDLE it becomes DATA and completions are processed,
 * followed by write callbacks until no notification is pending. In any other
 * state the event is recorded as PENDING_DATA and the function returns.
 */
void AsynchIO::dataEvent(qpid::sys::DispatchHandle&) {
    // Keep track of writable notifications
    // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
    State oldState;
    State newState;
    bool doReturn;
    do {
        newState = oldState = state.get();
        doReturn = false;
        // We're already processing a notification
        switch (oldState) {
        case IDLE:
            newState = DATA;
            break;
        default:
            // Can't get here in DATA state as that would violate the serialisation rules
            assert( oldState!=DATA );
            newState = PENDING_DATA;
            doReturn = true;
        }
    } while (!state.boolCompareAndSwap(oldState, newState));
    if (doReturn) {
        return;
    }

    processCompletions();

    //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
    do {
        newState = oldState = state.get();
        assert( oldState==DATA );
        newState = NOTIFY_WRITE;
    } while (!state.boolCompareAndSwap(oldState, newState));

    do {
        doWriteCallback();

        // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
        bool doBreak;
        do {
            newState = oldState = state.get();
            doBreak = false;
            if ( oldState==NOTIFY_WRITE ) {
                newState = IDLE;
                doBreak = true;
            } else {
                // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered
                assert( oldState==PENDING_NOTIFY );
                newState = NOTIFY_WRITE;
            }
        } while (!state.boolCompareAndSwap(oldState, newState));
        if (doBreak) {
            break;
        }
    } while (true);

    // We might need to delete ourselves
    if (deleting && outstandingWrites == 0) {
        delete this;
    }
}

/**
 * Drain the completion events currently available on the queue pair.
 *
 * Receive completions update xmitCredit from any immediate data, invoke
 * readCallback when data is present, repost the buffer and may send an
 * unsolicited credit update. Send completions return the buffer to
 * bufferQueue and decrement outstandingWrites. A completion with a status
 * other than IBV_WC_SUCCESS invokes errorCallback and stops processing.
 */
void AsynchIO::processCompletions() {
    QueuePair::intrusive_ptr q = qp->getNextChannelEvent();

    // Re-enable notification for queue:
    // This needs to happen before we could do anything that could generate more work completion
    // events (ie the callbacks etc. in the following).
    // This can't make us reenter this code as the handle attached to the completion queue will still be
    // disabled by the poller until we leave this code
    qp->notifyRecv();
    qp->notifySend();

    int recvEvents = 0;
    int sendEvents = 0;

    // If no event do nothing
    if (!q)
        return;

    assert(q == qp);

    // Repeat until no more events
    do {
        QueuePairEvent e(qp->getNextEvent());
        if (!e)
            break;

        ::ibv_wc_status status = e.getEventStatus();
        if (status != IBV_WC_SUCCESS) {
            errorCallback(*this);
            // TODO: Probably need to flush queues at this point
            return;
        }

        // Test if recv (or recv with imm)
        //::ibv_wc_opcode eventType = e.getEventType();
        Buffer* b = e.getBuffer();
        QueueDirection dir = e.getDirection();
        if (dir == RECV) {
            ++recvEvents;

            // Get our xmitCredit if it was sent
            bool dataPresent = true;
            if (e.immPresent() ) {
                xmitCredit += (e.getImm() & ~FlagsMask);
                dataPresent = ((e.getImm() & IgnoreData) == 0);
            }

            // if there was no data sent then the message was only to update our credit
            if ( dataPresent ) {
                readCallback(*this, b);
            }

            // At this point the buffer has been consumed so put it back on the recv queue
            b->dataStart = 0;
            b->dataCount = 0;
            qp->postRecv(b);

            // Received another message
            ++recvCredit;

            // Send recvCredit if it is large enough (it will have got this large because we've not sent anything recently)
            if (recvCredit > recvBufferCount/2) {
                // TODO: This should use RDMA write with imm as there might not ever be a buffer to receive this message
                // but this is a little unlikely, as to get in this state we have to have received messages without sending any
                // for a while so it's likely we've received a credit update from the far side.
                if (writable()) {
                    Buffer* ob = getBuffer();
                    // Have to send something as adapters hate it when you try to transfer 0 bytes
                    *reinterpret_cast< uint32_t* >(ob->bytes) = htonl(recvCredit);
                    ob->dataCount = sizeof(uint32_t);

                    int creditSent = recvCredit & ~FlagsMask;
                    qp->postSend(creditSent | IgnoreData, ob);
                    recvCredit -= creditSent;
                    ++outstandingWrites;
                    --xmitCredit;
                } else {
                    QPID_LOG(warning, "RDMA: qp=" << qp << ": Unable to send unsolicited credit");
                }
            }
        } else {
            ++sendEvents;
            {
                // NOTE(review): the lock's template argument was missing from the text
                // under review; qpid::sys::Mutex is presumed - confirm against the
                // declaration of bufferQueueLock.
                qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
                bufferQueue.push_front(b);
            }
            --outstandingWrites;
        }
    } while (true);

    // Not sure if this is expected or not
    if (recvEvents == 0 && sendEvents == 0) {
        QPID_LOG(debug, "RDMA: qp=" << qp << ": Got channel event with no recv/send completions");
    }
}

/**
 * Call idleCallback repeatedly while we are writable, stopping as soon as a
 * call leaves xmitCredit unchanged (i.e. nothing was written).
 */
void AsynchIO::doWriteCallback() {
    // TODO: maybe don't call idle unless we're low on write buffers
    // Keep on calling the idle routine as long as we are writable and we got something to write last call
    while (writable()) {
        int xc = xmitCredit;
        idleCallback(*this);
        // Check whether we actually wrote anything
        if (xmitCredit == xc) {
            QPID_LOG(debug, "RDMA: qp=" << qp << ": Called for data, but got none: xmitCredit=" << xmitCredit);
            return;
        }
    }
}

/**
 * Take a buffer from the front of bufferQueue, with dataCount and dataStart
 * reset to 0. The queue must not be empty (asserted).
 */
Buffer* AsynchIO::getBuffer() {
    // NOTE(review): template argument reconstructed - see processCompletions()
    qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
    assert(!bufferQueue.empty());
    Buffer* b = bufferQueue.front();
    bufferQueue.pop_front();
    b->dataCount = 0;
    b->dataStart = 0;
    return b;
}

/** Put a buffer back on the front of bufferQueue and reset dataCount and dataStart to 0. */
void AsynchIO::returnBuffer(Buffer* b) {
    // NOTE(review): template argument reconstructed - see processCompletions()
    qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
    bufferQueue.push_front(b);
    b->dataCount = 0;
    b->dataStart = 0;
}

/**
 * Create a connection and a dispatch handle that routes its events to
 * event(); the connection is made non-blocking.
 *
 * @param errc stored as errorCallback
 * @param dc   stored as disconnectedCallback
 */
ConnectionManager::ConnectionManager(
    ErrorCallback errc,
    DisconnectedCallback dc
) :
    ci(Connection::make()),
    handle(*ci, boost::bind(&ConnectionManager::event, this, _1), 0, 0),
    errorCallback(errc),
    disconnectedCallback(dc)
{
    ci->nonblocking();
}

/** Stop watching the connection handle. */
ConnectionManager::~ConnectionManager()
{
    handle.stopWatch();
}

/** Run startConnection() on the connection, then start watching its handle on the poller. */
void ConnectionManager::start(Poller::shared_ptr poller) {
    startConnection(ci);
    handle.startWatch(poller);
}
void ConnectionManager::event(DispatchHandle&) { connectionEvent(ci); } Listener::Listener( const SocketAddress& src, const ConnectionParams& cp, EstablishedCallback ec, ErrorCallback errc, DisconnectedCallback dc, ConnectionRequestCallback crc ) : ConnectionManager(errc, dc), src_addr(src), checkConnectionParams(cp), connectionRequestCallback(crc), establishedCallback(ec) { } void Listener::startConnection(Connection::intrusive_ptr ci) { ci->bind(src_addr); ci->listen(); } void Listener::connectionEvent(Connection::intrusive_ptr ci) { ConnectionEvent e(ci->getNextEvent()); // If (for whatever reason) there was no event do nothing if (!e) return; // Important documentation ommision the new rdma_cm_id // you get from CONNECT_REQUEST has the same context info // as its parent listening rdma_cm_id ::rdma_cm_event_type eventType = e.getEventType(); ::rdma_conn_param conn_param = e.getConnectionParam(); Rdma::Connection::intrusive_ptr id = e.getConnection(); switch (eventType) { case RDMA_CM_EVENT_CONNECT_REQUEST: { // Make sure peer has sent params we can use if (!conn_param.private_data || conn_param.private_data_len < sizeof(ConnectionParams)) { id->reject(); break; } ConnectionParams cp = *static_cast(conn_param.private_data); // Reject if requested msg size is bigger than we allow if (cp.maxRecvBufferSize > checkConnectionParams.maxRecvBufferSize) { id->reject(&checkConnectionParams); break; } bool accept = true; if (connectionRequestCallback) accept = connectionRequestCallback(id, cp); if (accept) { // Accept connection cp.initialXmitCredit = checkConnectionParams.initialXmitCredit; id->accept(conn_param, &cp); } else { // Reject connection id->reject(); } break; } case RDMA_CM_EVENT_ESTABLISHED: establishedCallback(id); break; case RDMA_CM_EVENT_DISCONNECTED: disconnectedCallback(id); break; case RDMA_CM_EVENT_CONNECT_ERROR: errorCallback(id, CONNECT_ERROR); break; default: // Unexpected response errorCallback(id, UNKNOWN); //std::cerr << "Warning: unexpected 
response to listen - " << eventType << "\n"; } } Connector::Connector( const SocketAddress& dst, const ConnectionParams& cp, ConnectedCallback cc, ErrorCallback errc, DisconnectedCallback dc, RejectedCallback rc ) : ConnectionManager(errc, dc), dst_addr(dst), connectionParams(cp), rejectedCallback(rc), connectedCallback(cc) { } void Connector::startConnection(Connection::intrusive_ptr ci) { ci->resolve_addr(dst_addr); } void Connector::connectionEvent(Connection::intrusive_ptr ci) { ConnectionEvent e(ci->getNextEvent()); // If (for whatever reason) there was no event do nothing if (!e) return; ::rdma_cm_event_type eventType = e.getEventType(); ::rdma_conn_param conn_param = e.getConnectionParam(); Rdma::Connection::intrusive_ptr id = e.getConnection(); switch (eventType) { case RDMA_CM_EVENT_ADDR_RESOLVED: // RESOLVE_ADDR ci->resolve_route(); break; case RDMA_CM_EVENT_ADDR_ERROR: // RESOLVE_ADDR errorCallback(ci, ADDR_ERROR); break; case RDMA_CM_EVENT_ROUTE_RESOLVED: // RESOLVE_ROUTE: ci->connect(&connectionParams); break; case RDMA_CM_EVENT_ROUTE_ERROR: // RESOLVE_ROUTE: errorCallback(ci, ROUTE_ERROR); break; case RDMA_CM_EVENT_CONNECT_ERROR: // CONNECTING errorCallback(ci, CONNECT_ERROR); break; case RDMA_CM_EVENT_UNREACHABLE: // CONNECTING errorCallback(ci, UNREACHABLE); break; case RDMA_CM_EVENT_REJECTED: { // CONNECTING // Extract private data from event assert(conn_param.private_data && conn_param.private_data_len >= sizeof(ConnectionParams)); ConnectionParams cp = *static_cast(conn_param.private_data); rejectedCallback(ci, cp); break; } case RDMA_CM_EVENT_ESTABLISHED: { // CONNECTING // Extract private data from event assert(conn_param.private_data && conn_param.private_data_len >= sizeof(ConnectionParams)); ConnectionParams cp = *static_cast(conn_param.private_data); connectedCallback(ci, cp); break; } case RDMA_CM_EVENT_DISCONNECTED: // ESTABLISHED disconnectedCallback(ci); break; default: QPID_LOG(warning, "RDMA: Unexpected event in connect: " << 
eventType); } } }