diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/RdmaConnector.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/client/RdmaConnector.cpp | 124 |
1 files changed, 55 insertions, 69 deletions
diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp index 5558e27752..79f86d09c2 100644 --- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp +++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp @@ -64,8 +64,8 @@ class RdmaConnector : public Connector, public sys::Codec framing::ProtocolVersion version; bool initiated; - sys::Mutex pollingLock; - bool polling; + sys::Mutex dataConnectedLock; + bool dataConnected; sys::ShutdownHandler* shutdownHandler; framing::InputHandler* input; @@ -90,7 +90,7 @@ class RdmaConnector : public Connector, public sys::Codec void writeDataBlock(const framing::AMQDataBlock& data); void dataError(Rdma::AsynchIO&); void drained(); - void connectionStopped(Rdma::Connector* acon); + void connectionStopped(Rdma::Connector* acon, Rdma::AsynchIO* aio); void dataStopped(Rdma::AsynchIO* aio); std::string identifier; @@ -144,7 +144,7 @@ RdmaConnector::RdmaConnector(Poller::shared_ptr p, bounds(cimpl), version(ver), initiated(false), - polling(false), + dataConnected(false), shutdownHandler(0), aio(0), acon(0), @@ -171,14 +171,11 @@ RdmaConnector::~RdmaConnector() { if (acon) { acon->stop(deleteConnector); } - if (shutdownHandler) { - shutdownHandler->shutdown(); - } } void RdmaConnector::connect(const std::string& host, int port){ - Mutex::ScopedLock l(pollingLock); - assert(!polling); + Mutex::ScopedLock l(dataConnectedLock); + assert(!dataConnected); acon = new Rdma::Connector( Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES), @@ -187,8 +184,6 @@ void RdmaConnector::connect(const std::string& host, int port){ boost::bind(&RdmaConnector::disconnected, this), boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)); - polling = true; - SocketAddress sa(host, boost::lexical_cast<std::string>(port)); acon->start(poller, sa); } @@ -196,6 +191,8 @@ void RdmaConnector::connect(const std::string& host, int port){ // The following only gets run when connected void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp) { try { + Mutex::ScopedLock l(dataConnectedLock); + assert(!dataConnected); Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair(); aio = new Rdma::AsynchIO(ci->getQueuePair(), @@ -210,67 +207,54 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru writeDataBlock(init); aio->start(poller); + + dataConnected = true; + return; } catch (const Rdma::Exception& e) { QPID_LOG(error, "Rdma: Cannot create new connection (Rdma exception): " << e.what()); } catch (const std::exception& e) { QPID_LOG(error, "Rdma: Cannot create new connection (unknown exception): " << e.what()); } - { - Mutex::ScopedLock l(pollingLock); - assert(polling); - polling = false; - } - connectionStopped(acon); + dataConnected = false; + connectionStopped(acon, aio); } void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType) { QPID_LOG(debug, "Connection Error " << identifier); - { - Mutex::ScopedLock l(pollingLock); - // If we're closed already then we'll get to stopped() anyway - if (!polling) return; - polling = false; - } - connectionStopped(acon); -} - -void RdmaConnector::disconnected() { - QPID_LOG(debug, "Connection disconnected " << identifier); - { - Mutex::ScopedLock l(pollingLock); - // If we're closed already then we'll get to drained() anyway - if (!polling) return; - polling = false; - } - // Make sure that all the disconnected actions take place on the data "thread" - aio->requestCallback(boost::bind(&RdmaConnector::drained, this)); + connectionStopped(acon, aio); } // Bizarrely we seem to get rejected events *after* we've already got a connected event for some peer disconnects // so we need to check whether the data connection is started or not in here void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams& cp) { QPID_LOG(debug, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize); - { - Mutex::ScopedLock l(pollingLock); - // If we're closed already then we'll get to stopped() anyway - if (!polling) return; - polling = false; - } if (dataConnected) { disconnected(); } else { - connectionStopped(acon); + connectionStopped(acon, aio); } } +void RdmaConnector::disconnected() { + QPID_LOG(debug, "Connection disconnected " << identifier); + { + Mutex::ScopedLock l(dataConnectedLock); + // If we're closed already then we'll get to drained() anyway + if (!dataConnected) return; + dataConnected = false; + } + // Make sure that all the disconnected actions take place on the data "thread" + aio->requestCallback(boost::bind(&RdmaConnector::drained, this)); +} + void RdmaConnector::dataError(Rdma::AsynchIO&) { QPID_LOG(debug, "Data Error " << identifier); { - Mutex::ScopedLock l(pollingLock); + Mutex::ScopedLock l(dataConnectedLock); // If we're closed already then we'll get to drained() anyway - if (!polling) return; - polling = false; + if (!dataConnected) return; + dataConnected = false; } drained(); } @@ -278,39 +262,37 @@ void RdmaConnector::dataError(Rdma::AsynchIO&) { void RdmaConnector::close() { QPID_LOG(debug, "RdmaConnector::close " << identifier); { - Mutex::ScopedLock l(pollingLock); - if (!polling) return; - polling = false; + Mutex::ScopedLock l(dataConnectedLock); + if (!dataConnected) return; + dataConnected = false; } - if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this)); + aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this)); } void RdmaConnector::drained() { QPID_LOG(debug, "RdmaConnector::drained " << identifier); - assert(!polling); - if (aio) { - Rdma::AsynchIO* a = aio; - aio = 0; - a->stop(boost::bind(&RdmaConnector::dataStopped, this, a)); - } + assert(!dataConnected); + assert(aio); + Rdma::AsynchIO* a = aio; + aio = 0; + a->stop(boost::bind(&RdmaConnector::dataStopped, this, a)); } void RdmaConnector::dataStopped(Rdma::AsynchIO* a) { QPID_LOG(debug, "RdmaConnector::dataStopped " << identifier); - assert(!polling); - aio = 0; - delete a; - if (acon) { - Rdma::Connector* c = acon; - acon = 0; - c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c)); - } + assert(!dataConnected); + assert(acon); + Rdma::Connector* c = acon; + acon = 0; + c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c, a)); } -void RdmaConnector::connectionStopped(Rdma::Connector* c) { +void RdmaConnector::connectionStopped(Rdma::Connector* c, Rdma::AsynchIO* a) { QPID_LOG(debug, "RdmaConnector::connectionStopped " << identifier); - assert(!polling); + assert(!dataConnected); + aio = 0; acon = 0; + delete a; delete c; if (shutdownHandler) { ShutdownHandler* s = shutdownHandler; @@ -340,6 +322,10 @@ const std::string& RdmaConnector::getIdentifier() const { } void RdmaConnector::send(AMQFrame& frame) { + // It is possible that we are called to write after we are already shutting down + Mutex::ScopedLock l(dataConnectedLock); + if (!dataConnected) return; + bool notifyWrite = false; { Mutex::ScopedLock l(lock); @@ -354,15 +340,15 @@ void RdmaConnector::send(AMQFrame& frame) { notifyWrite = (currentSize >= maxFrameSize); } } - if (notifyWrite && polling) aio->notifyPendingWrite(); + if (notifyWrite) aio->notifyPendingWrite(); } // Called in IO thread. (write idle routine) // This is NOT only called in response to previously calling notifyPendingWrite void RdmaConnector::writebuff(Rdma::AsynchIO&) { // It's possible to be disconnected and be writable - Mutex::ScopedLock l(pollingLock); - if (!polling) + Mutex::ScopedLock l(dataConnectedLock); + if (!dataConnected) return; Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; |