diff options
| author | Gordon Sim <gsim@apache.org> | 2010-04-15 18:08:04 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2010-04-15 18:08:04 +0000 |
| commit | 339c98598ac775e21e456c60bdbbdb84655f7a40 (patch) | |
| tree | 2a3e1bc9a33aad9d3f881a7b025136d59305a367 /cpp/src | |
| parent | 1da2769ba27a7b40ad88b6acc526311a45fed3ee (diff) | |
| download | qpid-python-339c98598ac775e21e456c60bdbbdb84655f7a40.tar.gz | |
QPID-2511: Altered shutdown sequence in connectors; after connect() has returned there will always now be a shutdown callback made and only at that point is it guaranteed that there will be no further io callbacks. ConnectionImpl consequently waits for this before allowing itself to be deleted.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@934503 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 48 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SslConnector.cpp | 28 | ||||
| -rw-r--r-- | cpp/src/qpid/client/TCPConnector.cpp | 33 | ||||
| -rw-r--r-- | cpp/src/qpid/client/TCPConnector.h | 4 |
5 files changed, 73 insertions, 48 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index b6043518e8..6b352e2e65 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -166,7 +166,8 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), version(v), - nextChannel(1) + nextChannel(1), + shutdownComplete(false) { QPID_LOG(debug, "ConnectionImpl created for " << version.toString()); handler.in = boost::bind(&ConnectionImpl::incoming, this, _1); @@ -181,10 +182,12 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti const uint16_t ConnectionImpl::NEXT_CHANNEL = std::numeric_limits<uint16_t>::max(); ConnectionImpl::~ConnectionImpl() { - // Important to close the connector first, to ensure the - // connector thread does not call on us while the destructor - // is running. - if (connector) connector->close(); + if (connector) { + connector->close(); + //wait until we get the shutdown callback to ensure that the + //io threads will not call back on us after deletion + waitForShutdownComplete(); + } theIO().sub(); } @@ -244,7 +247,13 @@ void ConnectionImpl::open() connector.reset(Connector::create(protocol, theIO().poller(), version, handler, this)); connector->setInputHandler(&handler); connector->setShutdownHandler(this); - connector->connect(host, port); + try { + connector->connect(host, port); + } catch (const std::exception& e) { + QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what()); + connector.reset(); + throw; + } connector->init(); // Enable heartbeat if requested @@ -330,9 +339,22 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) { closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text)); } +void ConnectionImpl::shutdown() { + //May need to take a temporary reference to ourselves to prevent + //our destructor being called until after we have notified of + //shutdown completion; the destructor may be called as a result of + //the call to failedConnection(). + boost::shared_ptr<ConnectionImpl> temp; + if (!handler.isClosed()) { + temp = shared_from_this(); + failedConnection(); + } + notifyShutdownComplete(); +} + static const std::string CONN_CLOSED("Connection closed"); -void ConnectionImpl::shutdown() { +void ConnectionImpl::failedConnection() { if ( failureCallback ) failureCallback(); @@ -375,4 +397,16 @@ boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& na return simpl; } +void ConnectionImpl::waitForShutdownComplete() +{ + Mutex::ScopedLock l(lock); + while(!shutdownComplete) lock.wait(); +} +void ConnectionImpl::notifyShutdownComplete() +{ + Mutex::ScopedLock l(lock); + shutdownComplete = true; + lock.notifyAll(); +} + }} // namespace qpid::client diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index 1d88fcaf99..c2313e72bc 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -26,7 +26,7 @@ #include "qpid/client/ConnectionHandler.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/sys/Mutex.h" +#include "qpid/sys/Monitor.h" #include "qpid/sys/ShutdownHandler.h" #include "qpid/sys/TimeoutHandler.h" @@ -60,7 +60,8 @@ class ConnectionImpl : public Bounds, boost::scoped_ptr<Connector> connector; framing::ProtocolVersion version; uint16_t nextChannel; - sys::Mutex lock; + sys::Monitor lock; + bool shutdownComplete; boost::intrusive_ptr<qpid::sys::TimerTask> heartbeatTask; @@ -72,6 +73,9 @@ class ConnectionImpl : public Bounds, void idleOut(); void idleIn(); void shutdown(); + void failedConnection(); + void waitForShutdownComplete(); + void notifyShutdownComplete(); boost::function<void ()> failureCallback; diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp index 0c794145db..990ca90de7 100644 --- a/cpp/src/qpid/client/SslConnector.cpp +++ b/cpp/src/qpid/client/SslConnector.cpp @@ -106,9 +106,6 @@ class SslConnector : public Connector ~SslConnector(); - void handleClosed(); - bool closeInternal(); - void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*); void writebuff(qpid::sys::ssl::SslIO&); void writeDataBlock(const framing::AMQDataBlock& data); @@ -128,6 +125,7 @@ class SslConnector : public Connector framing::OutputHandler* getOutputHandler(); const std::string& getIdentifier() const; const SecuritySettings* getSecuritySettings(); + void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&); public: SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion, @@ -204,7 +202,7 @@ void SslConnector::connect(const std::string& host, int port){ boost::bind(&SslConnector::readbuff, this, _1, _2), boost::bind(&SslConnector::eof, this, _1), boost::bind(&SslConnector::eof, this, _1), - 0, // closed + boost::bind(&SslConnector::socketClosed, this, _1, _2), 0, // nobuffs boost::bind(&SslConnector::writebuff, this, _1)); writer.init(identifier, aio); @@ -220,19 +218,20 @@ void SslConnector::init(){ aio->start(poller); } -bool SslConnector::closeInternal() { +void SslConnector::close() { Mutex::ScopedLock l(closedLock); - bool ret = !closed; if (!closed) { closed = true; - aio->queueForDeletion(); - socket.close(); + if (aio) + aio->queueWriteClose(); } - return ret; } -void SslConnector::close() { - closeInternal(); +void SslConnector::socketClosed(SslIO&, const SslSocket&) { + if (aio) + aio->queueForDeletion(); + if (shutdownHandler) + shutdownHandler->shutdown(); } void SslConnector::setInputHandler(InputHandler* handler){ @@ -259,11 +258,6 @@ void SslConnector::send(AMQFrame& frame) { writer.handle(frame); } -void SslConnector::handleClosed() { - if (closeInternal() && shutdownHandler) - shutdownHandler->shutdown(); -} - SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) { } @@ -365,7 +359,7 @@ void SslConnector::writeDataBlock(const AMQDataBlock& data) { } void SslConnector::eof(SslIO&) { - handleClosed(); + close(); } const SecuritySettings* SslConnector::getSecuritySettings() diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp index 1a245fe2c8..3ead7e2ce0 100644 --- a/cpp/src/qpid/client/TCPConnector.cpp +++ b/cpp/src/qpid/client/TCPConnector.cpp @@ -105,7 +105,7 @@ void TCPConnector::connected(const Socket&) { boost::bind(&TCPConnector::readbuff, this, _1, _2), boost::bind(&TCPConnector::eof, this, _1), boost::bind(&TCPConnector::eof, this, _1), - 0, // closed + boost::bind(&TCPConnector::socketClosed, this, _1, _2), 0, // nobuffs boost::bind(&TCPConnector::writebuff, this, _1)); start(aio); @@ -128,28 +128,28 @@ void TCPConnector::initAmqp() { } void TCPConnector::connectFailed(const std::string& msg) { - QPID_LOG(warning, "Connecting failed: " << msg); + QPID_LOG(warning, "Connect failed: " << msg); socket.close(); - if (!closed && shutdownHandler) { + if (!closed) closed = true; + if (shutdownHandler) shutdownHandler->shutdown(); - } } -bool TCPConnector::closeInternal() { +void TCPConnector::close() { Mutex::ScopedLock l(lock); - bool ret = !closed; - closed = true; - if (ret) { + if (!closed) { + closed = true; if (aio) - aio->queueForDeletion(); - socket.close(); + aio->queueWriteClose(); } - return ret; } -void TCPConnector::close() { - closeInternal(); +void TCPConnector::socketClosed(AsynchIO&, const Socket&) { + if (aio) + aio->queueForDeletion(); + if (shutdownHandler) + shutdownHandler->shutdown(); } void TCPConnector::abort() { @@ -214,11 +214,6 @@ void TCPConnector::send(AMQFrame& frame) { } } -void TCPConnector::handleClosed() { - if (closeInternal() && shutdownHandler) - shutdownHandler->shutdown(); -} - void TCPConnector::writebuff(AsynchIO& /*aio*/) { // It's possible to be disconnected and be writable @@ -315,7 +310,7 @@ void TCPConnector::writeDataBlock(const AMQDataBlock& data) { } void TCPConnector::eof(AsynchIO&) { - handleClosed(); + close(); } void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h index 6d447def2e..04504a5173 100644 --- a/cpp/src/qpid/client/TCPConnector.h +++ b/cpp/src/qpid/client/TCPConnector.h @@ -76,9 +76,6 @@ class TCPConnector : public Connector, public sys::Codec boost::shared_ptr<sys::Poller> poller; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; - void handleClosed(); - bool closeInternal(); - virtual void connected(const sys::Socket&); void writeDataBlock(const framing::AMQDataBlock& data); @@ -107,6 +104,7 @@ protected: bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); void writebuff(qpid::sys::AsynchIO&); void eof(qpid::sys::AsynchIO&); + void socketClosed(qpid::sys::AsynchIO&, const qpid::sys::Socket&); public: TCPConnector(boost::shared_ptr<sys::Poller>, |
