diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/RdmaConnector.cpp | 95 | ||||
-rw-r--r-- | cpp/src/qpid/sys/RdmaIOPlugin.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaClient.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 140 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.h | 27 | ||||
-rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaServer.cpp | 8 |
6 files changed, 209 insertions, 85 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp index a782caef6b..9be9e7127c 100644 --- a/cpp/src/qpid/client/RdmaConnector.cpp +++ b/cpp/src/qpid/client/RdmaConnector.cpp @@ -75,14 +75,12 @@ class RdmaConnector : public Connector, public sys::Codec framing::OutputHandler* output; Rdma::AsynchIO* aio; + std::auto_ptr<Rdma::Connector> acon; sys::Poller::shared_ptr poller; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; ~RdmaConnector(); - void handleClosed(); - bool closeInternal(); - // Callbacks void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&); void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType); @@ -92,7 +90,9 @@ class RdmaConnector : public Connector, public sys::Codec void readbuff(Rdma::AsynchIO&, Rdma::Buffer*); void writebuff(Rdma::AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); - void eof(Rdma::AsynchIO&); + void dataError(Rdma::AsynchIO&); + void drained(); + void stopped(Rdma::AsynchIO* aio=0); std::string identifier; @@ -153,26 +153,33 @@ RdmaConnector::RdmaConnector(Poller::shared_ptr p, QPID_LOG(debug, "RdmaConnector created for " << version); } +namespace { + void deleteAsynchIO(Rdma::AsynchIO& aio) { + delete &aio; + } +} + RdmaConnector::~RdmaConnector() { + QPID_LOG(debug, "~RdmaConnector " << identifier); close(); - if (aio) aio->deferDelete(); + if (aio) aio->stop(deleteAsynchIO); } void RdmaConnector::connect(const std::string& host, int port){ Mutex::ScopedLock l(pollingLock); assert(!polling); - Rdma::Connector* c = new Rdma::Connector( + acon.reset(new Rdma::Connector( Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&RdmaConnector::connected, this, poller, _1, _2), boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2), boost::bind(&RdmaConnector::disconnected, this, poller, _1), - boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)); + boost::bind(&RdmaConnector::rejected, this, poller, _1, _2))); polling = true; SocketAddress sa(host, boost::lexical_cast<std::string>(port)); - c->start(poller, sa); + acon->start(poller, sa); } // The following only gets run when connected @@ -184,7 +191,7 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru boost::bind(&RdmaConnector::readbuff, this, _1, _2), boost::bind(&RdmaConnector::writebuff, this, _1), 0, // write buffers full - boost::bind(&RdmaConnector::eof, this, _1)); // data error - just close connection + boost::bind(&RdmaConnector::dataError, this, _1)); identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName()); ProtocolInitiation init(version); @@ -194,31 +201,70 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru } void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) { - QPID_LOG(trace, "Connection Error " << identifier); - eof(*aio); + QPID_LOG(debug, "Connection Error " << identifier); + { + Mutex::ScopedLock l(pollingLock); + // If we're closed already then we'll get to drain() anyway + if (!polling) return; + polling = false; + } + stopped(); } void RdmaConnector::disconnected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&) { - eof(*aio); + QPID_LOG(debug, "Connection disconnected " << identifier); + { + Mutex::ScopedLock l(pollingLock); + // If we're closed already then we'll get to drain() anyway + if (!polling) return; + polling = false; + } + drained(); } void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams& cp) { - QPID_LOG(trace, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize); - eof(*aio); + QPID_LOG(debug, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize); + { + Mutex::ScopedLock l(pollingLock); + // If we're closed already then we'll get to drain() anyway + if (!polling) return; + polling = false; + } + stopped(); } -bool RdmaConnector::closeInternal() { +void RdmaConnector::dataError(Rdma::AsynchIO&) { + QPID_LOG(debug, "Data Error " << identifier); + { Mutex::ScopedLock l(pollingLock); - bool ret = polling; + // If we're closed already then we'll get to drain() anyway + if (!polling) return; polling = false; - if (ret) { - if (aio) aio->queueWriteClose(); } - return ret; + drained(); +} + +void RdmaConnector::stopped(Rdma::AsynchIO* aio) { + delete aio; + if (shutdownHandler) { + shutdownHandler->shutdown(); + } +} + +void RdmaConnector::drained() { + QPID_LOG(debug, "RdmaConnector::drained " << identifier); + if (aio) { + aio->stop(boost::bind(&RdmaConnector::stopped, this, aio)); + aio = 0; + } } void RdmaConnector::close() { - closeInternal(); + QPID_LOG(debug, "RdmaConnector::close " << identifier); + Mutex::ScopedLock l(pollingLock); + if (!polling) return; + polling = false; + if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this)); } void RdmaConnector::setInputHandler(InputHandler* handler){ @@ -259,11 +305,6 @@ void RdmaConnector::send(AMQFrame& frame) { if (notifyWrite && polling) aio->notifyPendingWrite(); } -void RdmaConnector::handleClosed() { - if (closeInternal() && shutdownHandler) - shutdownHandler->shutdown(); -} - // Called in IO thread. (write idle routine) // This is NOT only called in response to previously calling notifyPendingWrite void RdmaConnector::writebuff(Rdma::AsynchIO&) { @@ -340,10 +381,6 @@ void RdmaConnector::writeDataBlock(const AMQDataBlock& data) { aio->queueWrite(buff); } -void RdmaConnector::eof(Rdma::AsynchIO&) { - handleClosed(); -} - void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) { securityLayer = sl; diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp index 58317838bc..e3498fad47 100644 --- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -74,6 +74,7 @@ class RdmaIOHandler : public OutputControl { void full(Rdma::AsynchIO& aio); void idle(Rdma::AsynchIO& aio); void error(Rdma::AsynchIO& aio); + void drained(Rdma::AsynchIO& aio); }; RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr& c, qpid::sys::ConnectionCodec::Factory* f) : @@ -89,12 +90,18 @@ void RdmaIOHandler::init(Rdma::AsynchIO* a) { aio = a; } +namespace { + void deleteAsynchIO(Rdma::AsynchIO& aio) { + delete &aio; + } +} + RdmaIOHandler::~RdmaIOHandler() { if (codec) codec->closed(); delete codec; - aio->deferDelete(); + aio->stop(deleteAsynchIO); } void RdmaIOHandler::write(const framing::ProtocolInitiation& data) @@ -108,7 +115,7 @@ void RdmaIOHandler::write(const framing::ProtocolInitiation& data) } void RdmaIOHandler::close() { - aio->queueWriteClose(); + aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1)); } // TODO: Dummy implementation, need to fill this in for heartbeat timeout to work @@ -133,7 +140,7 @@ void RdmaIOHandler::idle(Rdma::AsynchIO&) { aio->queueWrite(buff); } if (codec->isClosed()) - aio->queueWriteClose(); + aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1)); } void RdmaIOHandler::initProtocolOut() { @@ -149,6 +156,9 @@ void RdmaIOHandler::error(Rdma::AsynchIO&) { close(); } +void RdmaIOHandler::drained(Rdma::AsynchIO&) { +} + void RdmaIOHandler::full(Rdma::AsynchIO&) { QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]"); } @@ -176,7 +186,7 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { }catch(const std::exception& e){ QPID_LOG(error, e.what()); readError = true; - aio->queueWriteClose(); + aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1)); } } @@ -195,7 +205,7 @@ void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) { // send valid version header & close connection. write(framing::ProtocolInitiation(framing::highestProtocolVersion)); readError = true; - aio->queueWriteClose(); + aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1)); } } } diff --git a/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/cpp/src/qpid/sys/rdma/RdmaClient.cpp index 4f51df8498..a5f54e839d 100644 --- a/cpp/src/qpid/sys/rdma/RdmaClient.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaClient.cpp @@ -117,6 +117,10 @@ void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) { } } +void drained(Rdma::AsynchIO&) { + cout << "Drained:\n"; +} + void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) { cout << "Connected\n"; Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair(); diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 588d459b65..9244343ff8 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -49,8 +49,7 @@ namespace Rdma { recvBufferCount(rCount), xmitBufferCount(xCredit), outstandingWrites(0), - closed(false), - deleting(false), + draining(false), state(IDLE), readCallback(rc), idleCallback(ic), @@ -85,8 +84,11 @@ namespace Rdma { if ( outstandingWrites>0 ) QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished"); - // Turn off callbacks (before doing the deletes) - dataHandle.stopWatch(); + // Turn off callbacks if necessary (before doing the deletes) + if (state.get() != SHUTDOWN) { + QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown"); + dataHandle.stopWatch(); + } // The buffers ptr_deque automatically deletes all the buffers we've allocated // TODO: It might turn out to be more efficient in high connection loads to reuse the @@ -99,27 +101,58 @@ namespace Rdma { } // Mark for deletion/Delete this object when we have no outstanding writes - void AsynchIO::deferDelete() { + void AsynchIO::stop(NotifyCallback nc) { + State oldState; + State newState; + bool doReturn; + //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + do { + newState = oldState = state.get(); + doReturn = false; + if (outstandingWrites > 0 || (oldState != IDLE && oldState != DRAINED)) { + doReturn = true; + break; + } + + newState = SHUTDOWN; + + } while (!state.boolCompareAndSwap(oldState, newState)); + if (doReturn) { + notifyCallback = nc; + return; + } + dataHandle.stopWatch(); + // Callback, but don't store it - SHUTDOWN state means callback has been called + // we *are* allowed to delete the AsynchIO in this callback, so we have to return immediately + // after the callback + nc(*this); + } + + // Mark writing closed (so we don't accept any more writes or make any idle callbacks) + void AsynchIO::drainWriteQueue(NotifyCallback nc) { + draining = true; + State oldState; State newState; bool doReturn; //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); - // It is safe to assign to deleting here as we either delete ourselves - // before leaving this function or deleting is set on exit do { newState = oldState = state.get(); doReturn = false; - if (outstandingWrites > 0 || oldState != IDLE) { - deleting = true; + if (oldState != IDLE) { doReturn = true; - } else{ - newState = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor + break; + } + + if (outstandingWrites == 0) { + newState = DRAINED; } } while (!state.boolCompareAndSwap(oldState, newState)); if (doReturn) { + notifyCallback = nc; return; } - delete this; + nc(*this); } void AsynchIO::queueWrite(Buffer* buff) { @@ -146,12 +179,6 @@ namespace Rdma { } } - // Mark now closed (so we don't accept any more writes or make any idle callbacks) - void AsynchIO::queueWriteClose() { - // Don't think we actually need to lock here as transition is 1 way only to closed - closed = true; - } - void AsynchIO::notifyPendingWrite() { // As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not. // If we are then we just return as we know that we will eventually do the idle callback anyway. @@ -182,8 +209,13 @@ namespace Rdma { case IDLE: newState = NOTIFY_WRITE; break; - case DELETED: - assert(oldState!=DELETED); + case SHUTDOWN: + // This is not allowed - we can't make any more writes as we shut the connection down. + assert(oldState!=SHUTDOWN); + doReturn = true; + case DRAINED: + // This is not allowed - we can't make any more writes as we're draining the write queue. + assert(oldState!=DRAINED); doReturn = true; }; } while (!state.boolCompareAndSwap(oldState, newState)); @@ -220,7 +252,7 @@ namespace Rdma { action = NOTIFY; break; default: - assert(oldState!=IDLE && oldState!=DATA && oldState!=DELETED); + assert(oldState!=IDLE && oldState!=DATA && oldState!=SHUTDOWN); action = RETURN; } } while (!state.boolCompareAndSwap(oldState, newState)); @@ -238,8 +270,8 @@ namespace Rdma { return; case EXIT: // If we just processed completions we might need to delete ourselves - if (deleting && outstandingWrites == 0) { - delete this; + if (notifyCallback && outstandingWrites == 0) { + doStoppedCallback(); } return; } @@ -260,6 +292,8 @@ namespace Rdma { case IDLE: newState = DATA; break; + case DRAINED: + break; default: // Can't get here in DATA state as that would violate the serialisation rules assert( oldState!=DATA ); @@ -276,35 +310,45 @@ namespace Rdma { //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); do { newState = oldState = state.get(); - assert( oldState==DATA ); - newState = NOTIFY_WRITE; + switch (oldState) { + case DATA: + newState = NOTIFY_WRITE; + break; + case DRAINED: + break; + default: + assert( oldState==DATA || oldState==DRAINED); + } } while (!state.boolCompareAndSwap(oldState, newState)); - do { + while (newState==NOTIFY_WRITE) { doWriteCallback(); // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); - bool doBreak; do { newState = oldState = state.get(); - doBreak = false; if ( oldState==NOTIFY_WRITE ) { newState = IDLE; - doBreak = true; } else { - // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered + // Can't get DATA/PENDING_DATA/DRAINED here as dataEvent cannot be reentered assert( oldState==PENDING_NOTIFY ); newState = NOTIFY_WRITE; } } while (!state.boolCompareAndSwap(oldState, newState)); - if (doBreak) { - break; + } + + // If we've got all the write confirmations and we're draining + if (draining) { + if (outstandingWrites == 0) { + doDrainedCallback(); + draining = false; } - } while (true); + return; + } // We might need to delete ourselves - if (deleting && outstandingWrites == 0) { - delete this; + if (notifyCallback && outstandingWrites == 0) { + doStoppedCallback(); } } @@ -418,6 +462,29 @@ namespace Rdma { } } + void AsynchIO::doDrainedCallback() { + NotifyCallback nc; + nc.swap(notifyCallback); + // Transition unconditionally to DRAINED + State oldState; + do { + oldState = state.get(); + } while (!state.boolCompareAndSwap(oldState, DRAINED)); + nc(*this); + } + + void AsynchIO::doStoppedCallback() { + dataHandle.stopWatch(); + NotifyCallback nc; + nc.swap(notifyCallback); + // Transition unconditionally to SHUTDOWN + State oldState; + do { + oldState = state.get(); + } while (!state.boolCompareAndSwap(oldState, SHUTDOWN)); + nc(*this); + } + Buffer* AsynchIO::getBuffer() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); assert(!bufferQueue.empty()); @@ -444,12 +511,13 @@ namespace Rdma { errorCallback(errc), disconnectedCallback(dc) { + QPID_LOG(debug, "RDMA: ci=" << ci << ": Creating ConnectionManager"); ci->nonblocking(); } ConnectionManager::~ConnectionManager() { - handle.stopWatch(); + QPID_LOG(debug, "RDMA: ci=" << ci << ": Deleting ConnectionManager"); } void ConnectionManager::start(Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr) { diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h index 711685031c..0b86461465 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -43,8 +43,9 @@ namespace Rdma { { typedef boost::function1<void, AsynchIO&> ErrorCallback; typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback; - typedef boost::function1<void, AsynchIO&> IdleCallback; - typedef boost::function2<void, AsynchIO&, Buffer*> FullCallback; + typedef boost::function1<void, AsynchIO&> IdleCallback; + typedef boost::function2<void, AsynchIO&, Buffer*> FullCallback; + typedef boost::function1<void, AsynchIO&> NotifyCallback; QueuePair::intrusive_ptr qp; qpid::sys::DispatchHandleRef dataHandle; @@ -54,9 +55,8 @@ namespace Rdma { int recvBufferCount; int xmitBufferCount; int outstandingWrites; - bool closed; // TODO: Perhaps (probably) this state can be merged with the following... - bool deleting; // TODO: Perhaps (probably) this state can be merged with the following... - enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED }; + bool draining; // TODO: Perhaps (probably) this state can be merged with the following... + enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DRAINED, SHUTDOWN }; qpid::sys::AtomicValue<State> state; //qpid::sys::Mutex stateLock; std::deque<Buffer*> bufferQueue; @@ -67,6 +67,7 @@ namespace Rdma { IdleCallback idleCallback; FullCallback fullCallback; ErrorCallback errorCallback; + NotifyCallback notifyCallback; public: // TODO: Instead of specifying a buffer size specify the amount of memory the AsynchIO class can use @@ -82,22 +83,20 @@ namespace Rdma { FullCallback fc, ErrorCallback ec ); + ~AsynchIO(); void start(qpid::sys::Poller::shared_ptr poller); bool writable() const; bool bufferAvailable() const; void queueWrite(Buffer* buff); void notifyPendingWrite(); - void queueWriteClose(); - void deferDelete(); + void drainWriteQueue(NotifyCallback); + void stop(NotifyCallback); int incompletedWrites() const; Buffer* getBuffer(); void returnBuffer(Buffer*); private: - // Don't let anyone else delete us to make sure there can't be outstanding callbacks - ~AsynchIO(); - // Constants for the peer-peer command messages // These are sent in the high bits if the imm data of an rdma message // The low bits are used to send the credit @@ -107,10 +106,12 @@ namespace Rdma { void dataEvent(qpid::sys::DispatchHandle& handle); void processCompletions(); void doWriteCallback(); + void doStoppedCallback(); + void doDrainedCallback(); }; inline bool AsynchIO::writable() const { - return (!closed && outstandingWrites < xmitBufferCount && xmitCredit > 0); + return (!draining && outstandingWrites < xmitBufferCount && xmitCredit > 0); } inline int AsynchIO::incompletedWrites() const { @@ -146,7 +147,7 @@ namespace Rdma { class ConnectionManager { Connection::intrusive_ptr ci; - qpid::sys::DispatchHandle handle; + qpid::sys::DispatchHandleRef handle; protected: ErrorCallback errorCallback; @@ -160,7 +161,7 @@ namespace Rdma { virtual ~ConnectionManager(); - void start(qpid::sys::Poller::shared_ptr polle, const qpid::sys::SocketAddress& addrr); + void start(qpid::sys::Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr); private: void event(qpid::sys::DispatchHandle& handle); diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp index 4fcd551bba..d42784fbaa 100644 --- a/cpp/src/qpid/sys/rdma/RdmaServer.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp @@ -86,10 +86,14 @@ void full(ConRec* cr, Rdma::AsynchIO&, Rdma::Buffer* buf) { cr->queuedWrites.push(buf); } +void drained(Rdma::AsynchIO&) { + cout << "Drained:\n"; +} + void disconnected(Rdma::Connection::intrusive_ptr& ci) { ConRec* cr = ci->getContext<ConRec>(); cr->connection->disconnect(); - cr->data->queueWriteClose(); + cr->data->drainWriteQueue(drained); delete cr; cout << "Disconnected: " << cr << "\n"; } @@ -98,7 +102,7 @@ void connectionError(Rdma::Connection::intrusive_ptr& ci, Rdma::ErrorType) { ConRec* cr = ci->getContext<ConRec>(); cr->connection->disconnect(); if (cr) { - cr->data->queueWriteClose(); + cr->data->drainWriteQueue(drained); delete cr; } cout << "Connection error: " << cr << "\n"; |