diff options
author | Andrew Stitcher <astitcher@apache.org> | 2010-10-12 16:05:05 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2010-10-12 16:05:05 +0000 |
commit | fe9fca8fadffbb8658a001884b33c39fcd29f2c4 (patch) | |
tree | 893f0a17b612c43a8a3304483fdf95156963052a | |
parent | 18fe1fa364b7c0fa84d8e5eebdbe246e4c3fadee (diff) | |
download | qpid-python-fe9fca8fadffbb8658a001884b33c39fcd29f2c4.tar.gz |
Delay deleting the Rdma::AsynchIO associated with a Connection to just before
the callback to the ConnectionImpl shutdown function so that we make the
possibility of race between a write coming down and deleting it as small as
possible.
Rearranged scope of polling boolean to indicate that the data channel is
connected (or not) and changed name to better describe its function
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1021821 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/client/RdmaConnector.cpp | 124 |
1 files changed, 55 insertions, 69 deletions
diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp index 5558e27752..79f86d09c2 100644 --- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp +++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp @@ -64,8 +64,8 @@ class RdmaConnector : public Connector, public sys::Codec framing::ProtocolVersion version; bool initiated; - sys::Mutex pollingLock; - bool polling; + sys::Mutex dataConnectedLock; + bool dataConnected; sys::ShutdownHandler* shutdownHandler; framing::InputHandler* input; @@ -90,7 +90,7 @@ class RdmaConnector : public Connector, public sys::Codec void writeDataBlock(const framing::AMQDataBlock& data); void dataError(Rdma::AsynchIO&); void drained(); - void connectionStopped(Rdma::Connector* acon); + void connectionStopped(Rdma::Connector* acon, Rdma::AsynchIO* aio); void dataStopped(Rdma::AsynchIO* aio); std::string identifier; @@ -144,7 +144,7 @@ RdmaConnector::RdmaConnector(Poller::shared_ptr p, bounds(cimpl), version(ver), initiated(false), - polling(false), + dataConnected(false), shutdownHandler(0), aio(0), acon(0), @@ -171,14 +171,11 @@ RdmaConnector::~RdmaConnector() { if (acon) { acon->stop(deleteConnector); } - if (shutdownHandler) { - shutdownHandler->shutdown(); - } } void RdmaConnector::connect(const std::string& host, int port){ - Mutex::ScopedLock l(pollingLock); - assert(!polling); + Mutex::ScopedLock l(dataConnectedLock); + assert(!dataConnected); acon = new Rdma::Connector( Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES), @@ -187,8 +184,6 @@ void RdmaConnector::connect(const std::string& host, int port){ boost::bind(&RdmaConnector::disconnected, this), boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)); - polling = true; - SocketAddress sa(host, boost::lexical_cast<std::string>(port)); acon->start(poller, sa); } @@ -196,6 +191,8 @@ void RdmaConnector::connect(const std::string& host, int port){ // The following only gets run when connected void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp) { try { + Mutex::ScopedLock l(dataConnectedLock); + assert(!dataConnected); Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair(); aio = new Rdma::AsynchIO(ci->getQueuePair(), @@ -210,67 +207,54 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru writeDataBlock(init); aio->start(poller); + + dataConnected = true; + return; } catch (const Rdma::Exception& e) { QPID_LOG(error, "Rdma: Cannot create new connection (Rdma exception): " << e.what()); } catch (const std::exception& e) { QPID_LOG(error, "Rdma: Cannot create new connection (unknown exception): " << e.what()); } - { - Mutex::ScopedLock l(pollingLock); - assert(polling); - polling = false; - } - connectionStopped(acon); + dataConnected = false; + connectionStopped(acon, aio); } void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType) { QPID_LOG(debug, "Connection Error " << identifier); - { - Mutex::ScopedLock l(pollingLock); - // If we're closed already then we'll get to stopped() anyway - if (!polling) return; - polling = false; - } - connectionStopped(acon); -} - -void RdmaConnector::disconnected() { - QPID_LOG(debug, "Connection disconnected " << identifier); - { - Mutex::ScopedLock l(pollingLock); - // If we're closed already then we'll get to drained() anyway - if (!polling) return; - polling = false; - } - // Make sure that all the disconnected actions take place on the data "thread" - aio->requestCallback(boost::bind(&RdmaConnector::drained, this)); + connectionStopped(acon, aio); } // Bizarrely we seem to get rejected events *after* we've already got a connected event for some peer disconnects // so we need to check whether the data connection is started or not in here void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams& cp) { QPID_LOG(debug, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize); - { - Mutex::ScopedLock l(pollingLock); - // If we're closed already then we'll get to stopped() anyway - if (!polling) return; - polling = false; - } if (dataConnected) { disconnected(); } else { - connectionStopped(acon); + connectionStopped(acon, aio); } } +void RdmaConnector::disconnected() { + QPID_LOG(debug, "Connection disconnected " << identifier); + { + Mutex::ScopedLock l(dataConnectedLock); + // If we're closed already then we'll get to drained() anyway + if (!dataConnected) return; + dataConnected = false; + } + // Make sure that all the disconnected actions take place on the data "thread" + aio->requestCallback(boost::bind(&RdmaConnector::drained, this)); +} + void RdmaConnector::dataError(Rdma::AsynchIO&) { QPID_LOG(debug, "Data Error " << identifier); { - Mutex::ScopedLock l(pollingLock); + Mutex::ScopedLock l(dataConnectedLock); // If we're closed already then we'll get to drained() anyway - if (!polling) return; - polling = false; + if (!dataConnected) return; + dataConnected = false; } drained(); } @@ -278,39 +262,37 @@ void RdmaConnector::dataError(Rdma::AsynchIO&) { void RdmaConnector::close() { QPID_LOG(debug, "RdmaConnector::close " << identifier); { - Mutex::ScopedLock l(pollingLock); - if (!polling) return; - polling = false; + Mutex::ScopedLock l(dataConnectedLock); + if (!dataConnected) return; + dataConnected = false; } - if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this)); + aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this)); } void RdmaConnector::drained() { QPID_LOG(debug, "RdmaConnector::drained " << identifier); - assert(!polling); - if (aio) { - Rdma::AsynchIO* a = aio; - aio = 0; - a->stop(boost::bind(&RdmaConnector::dataStopped, this, a)); - } + assert(!dataConnected); + assert(aio); + Rdma::AsynchIO* a = aio; + aio = 0; + a->stop(boost::bind(&RdmaConnector::dataStopped, this, a)); } void RdmaConnector::dataStopped(Rdma::AsynchIO* a) { QPID_LOG(debug, "RdmaConnector::dataStopped " << identifier); - assert(!polling); - aio = 0; - delete a; - if (acon) { - Rdma::Connector* c = acon; - acon = 0; - c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c)); - } + assert(!dataConnected); + assert(acon); + Rdma::Connector* c = acon; + acon = 0; + c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c, a)); } -void RdmaConnector::connectionStopped(Rdma::Connector* c) { +void RdmaConnector::connectionStopped(Rdma::Connector* c, Rdma::AsynchIO* a) { QPID_LOG(debug, "RdmaConnector::connectionStopped " << identifier); - assert(!polling); + assert(!dataConnected); + aio = 0; acon = 0; + delete a; delete c; if (shutdownHandler) { ShutdownHandler* s = shutdownHandler; @@ -340,6 +322,10 @@ const std::string& RdmaConnector::getIdentifier() const { } void RdmaConnector::send(AMQFrame& frame) { + // It is possible that we are called to write after we are already shutting down + Mutex::ScopedLock l(dataConnectedLock); + if (!dataConnected) return; + bool notifyWrite = false; { Mutex::ScopedLock l(lock); @@ -354,15 +340,15 @@ void RdmaConnector::send(AMQFrame& frame) { notifyWrite = (currentSize >= maxFrameSize); } } - if (notifyWrite && polling) aio->notifyPendingWrite(); + if (notifyWrite) aio->notifyPendingWrite(); } // Called in IO thread. (write idle routine) // This is NOT only called in response to previously calling notifyPendingWrite void RdmaConnector::writebuff(Rdma::AsynchIO&) { // It's possible to be disconnected and be writable - Mutex::ScopedLock l(pollingLock); - if (!polling) + Mutex::ScopedLock l(dataConnectedLock); + if (!dataConnected) return; Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; |