diff options
author | Andrew Stitcher <astitcher@apache.org> | 2010-10-12 16:04:47 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2010-10-12 16:04:47 +0000 |
commit | c6621dcb50cb819ee370311ff252134bbcf96c60 (patch) | |
tree | 9028cd5fc9c74a12661d116fa3a322368c2913e6 | |
parent | fa5145ec6ac34efdc3009b4a09e945aa446c16fb (diff) | |
download | qpid-python-c6621dcb50cb819ee370311ff252134bbcf96c60.tar.gz |
Serialise close into the data callbacks:
Rejig Rdma::ConnectionManager to have a stop function with a callback and
use this to ensure that the Rdma::Connector used by qpid::sys::RdmaConnector
is correctly deleted only after it has been actually stopped
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1021819 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/client/RdmaConnector.cpp | 84 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/rdma/RdmaIO.h | 11 |
3 files changed, 72 insertions, 38 deletions
diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp index 208d42f672..026952bd99 100644 --- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp +++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp @@ -73,7 +73,7 @@ class RdmaConnector : public Connector, public sys::Codec framing::OutputHandler* output; Rdma::AsynchIO* aio; - std::auto_ptr<Rdma::Connector> acon; + Rdma::Connector* acon; sys::Poller::shared_ptr poller; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; @@ -82,7 +82,6 @@ class RdmaConnector : public Connector, public sys::Codec // Callbacks void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&); void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType); - void disconnectAction(); void disconnected(); void rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&); @@ -91,7 +90,8 @@ class RdmaConnector : public Connector, public sys::Codec void writeDataBlock(const framing::AMQDataBlock& data); void dataError(Rdma::AsynchIO&); void drained(); - void stopped(Rdma::AsynchIO* aio=0); + void connectionStopped(Rdma::Connector* acon); + void dataStopped(Rdma::AsynchIO* aio); std::string identifier; @@ -147,6 +147,7 @@ RdmaConnector::RdmaConnector(Poller::shared_ptr p, polling(false), shutdownHandler(0), aio(0), + acon(0), poller(p) { QPID_LOG(debug, "RdmaConnector created for " << version); @@ -156,14 +157,20 @@ namespace { void deleteAsynchIO(Rdma::AsynchIO& aio) { delete &aio; } + + void deleteConnector(Rdma::ConnectionManager& con) { + delete &con; + } } RdmaConnector::~RdmaConnector() { QPID_LOG(debug, "~RdmaConnector " << identifier); - close(); if (aio) { aio->stop(deleteAsynchIO); } + if (acon) { + acon->stop(deleteConnector); + } if (shutdownHandler) { shutdownHandler->shutdown(); } @@ -173,12 +180,12 @@ void RdmaConnector::connect(const std::string& host, int port){ Mutex::ScopedLock l(pollingLock); assert(!polling); - acon.reset(new Rdma::Connector( + acon = new Rdma::Connector( Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&RdmaConnector::connected, this, poller, _1, _2), boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2), boost::bind(&RdmaConnector::disconnected, this), - boost::bind(&RdmaConnector::rejected, this, poller, _1, _2))); + boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)); polling = true; @@ -211,11 +218,10 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru } { Mutex::ScopedLock l(pollingLock); - // If we're closed already then we'll get to stopped() anyway - if (!polling) return; + assert(polling); polling = false; } - stopped(); + connectionStopped(acon); } void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType) { @@ -226,10 +232,10 @@ void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::i if (!polling) return; polling = false; } - stopped(); + connectionStopped(acon); } -void RdmaConnector::disconnectAction() { +void RdmaConnector::disconnected() { QPID_LOG(debug, "Connection disconnected " << identifier); { Mutex::ScopedLock l(pollingLock); @@ -237,11 +243,8 @@ void RdmaConnector::disconnectAction() { if (!polling) return; polling = false; } - drained(); -} - -void RdmaConnector::disconnected() { - aio->requestCallback(boost::bind(&RdmaConnector::disconnectAction, this)); + // Make sure that all the disconnected actions take place on the data "thread" + aio->requestCallback(boost::bind(&RdmaConnector::drained, this)); } void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams& cp) { @@ -252,7 +255,7 @@ void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusiv if (!polling) return; polling = false; } - stopped(); + connectionStopped(acon); } void RdmaConnector::dataError(Rdma::AsynchIO&) { @@ -266,35 +269,48 @@ void RdmaConnector::dataError(Rdma::AsynchIO&) { drained(); } -void RdmaConnector::stopped(Rdma::AsynchIO* a) { - QPID_LOG(debug, "RdmaConnector::stopped " << identifier); - assert(!polling); - aio = 0; - delete a; - if (shutdownHandler) { - ShutdownHandler* s = shutdownHandler; - shutdownHandler = 0; - s->shutdown(); +void RdmaConnector::close() { + QPID_LOG(debug, "RdmaConnector::close " << identifier); + { + Mutex::ScopedLock l(pollingLock); + if (!polling) return; + polling = false; } + if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this)); } void RdmaConnector::drained() { QPID_LOG(debug, "RdmaConnector::drained " << identifier); assert(!polling); - acon->stop(); if (aio) { Rdma::AsynchIO* a = aio; aio = 0; - a->stop(boost::bind(&RdmaConnector::stopped, this, a)); + a->stop(boost::bind(&RdmaConnector::dataStopped, this, a)); } } -void RdmaConnector::close() { - QPID_LOG(debug, "RdmaConnector::close " << identifier); - Mutex::ScopedLock l(pollingLock); - if (!polling) return; - polling = false; - if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this)); +void RdmaConnector::dataStopped(Rdma::AsynchIO* a) { + QPID_LOG(debug, "RdmaConnector::dataStopped " << identifier); + assert(!polling); + aio = 0; + delete a; + if (acon) { + Rdma::Connector* c = acon; + acon = 0; + c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c)); + } +} + +void RdmaConnector::connectionStopped(Rdma::Connector* c) { + QPID_LOG(debug, "RdmaConnector::connectionStopped " << identifier); + assert(!polling); + acon = 0; + delete c; + if (shutdownHandler) { + ShutdownHandler* s = shutdownHandler; + shutdownHandler = 0; + s->shutdown(); + } } void RdmaConnector::setInputHandler(InputHandler* handler){ diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp index fe7062d3ea..23660a0b9f 100644 --- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -518,6 +518,7 @@ namespace Rdma { ErrorCallback errc, DisconnectedCallback dc ) : + state(IDLE), ci(Connection::make()), handle(*ci, boost::bind(&ConnectionManager::event, this, _1), 0, 0), errorCallback(errc), @@ -537,11 +538,23 @@ namespace Rdma { handle.startWatch(poller); } - void ConnectionManager::stop() { + void ConnectionManager::doStoppedCallback() { + // Ensure we can't get any more callbacks (except for the stopped callback) handle.stopWatch(); + + NotifyCallback nc; + nc.swap(notifyCallback); + nc(*this); + } + + void ConnectionManager::stop(NotifyCallback nc) { + state = STOPPED; + notifyCallback = nc; + handle.call(boost::bind(&ConnectionManager::doStoppedCallback, this)); } void ConnectionManager::event(DispatchHandle&) { + if (state.get() == STOPPED) return; connectionEvent(ci); } diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h index 55174ea8a1..00eba28716 100644 --- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -26,7 +26,6 @@ #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/DispatchHandle.h" -#include "qpid/sys/Mutex.h" #include "qpid/sys/SocketAddress.h" #include <netinet/in.h> @@ -163,14 +162,19 @@ namespace Rdma { typedef boost::function1<void, Rdma::Connection::intrusive_ptr> DisconnectedCallback; class ConnectionManager { + typedef boost::function1<void, ConnectionManager&> NotifyCallback; + + enum State {IDLE, STOPPED}; + qpid::sys::AtomicValue<State> state; Connection::intrusive_ptr ci; qpid::sys::DispatchHandleRef handle; + NotifyCallback notifyCallback; protected: ErrorCallback errorCallback; DisconnectedCallback disconnectedCallback; - public: + public: ConnectionManager( ErrorCallback errc, DisconnectedCallback dc @@ -179,10 +183,11 @@ namespace Rdma { virtual ~ConnectionManager(); void start(qpid::sys::Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr); - void stop(); + void stop(NotifyCallback); private: void event(qpid::sys::DispatchHandle& handle); + void doStoppedCallback(); virtual void startConnection(Connection::intrusive_ptr ci, const qpid::sys::SocketAddress& addr) = 0; virtual void connectionEvent(Connection::intrusive_ptr ci) = 0; |