diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 48 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SslConnector.cpp | 28 | ||||
| -rw-r--r-- | cpp/src/qpid/client/TCPConnector.cpp | 33 | ||||
| -rw-r--r-- | cpp/src/qpid/client/TCPConnector.h | 4 |
5 files changed, 73 insertions, 48 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index b6043518e8..6b352e2e65 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -166,7 +166,8 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), version(v), - nextChannel(1) + nextChannel(1), + shutdownComplete(false) { QPID_LOG(debug, "ConnectionImpl created for " << version.toString()); handler.in = boost::bind(&ConnectionImpl::incoming, this, _1); @@ -181,10 +182,12 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti const uint16_t ConnectionImpl::NEXT_CHANNEL = std::numeric_limits<uint16_t>::max(); ConnectionImpl::~ConnectionImpl() { - // Important to close the connector first, to ensure the - // connector thread does not call on us while the destructor - // is running. - if (connector) connector->close(); + if (connector) { + connector->close(); + //wait until we get the shutdown callback to ensure that the + //io threads will not call back on us after deletion + waitForShutdownComplete(); + } theIO().sub(); } @@ -244,7 +247,13 @@ void ConnectionImpl::open() connector.reset(Connector::create(protocol, theIO().poller(), version, handler, this)); connector->setInputHandler(&handler); connector->setShutdownHandler(this); - connector->connect(host, port); + try { + connector->connect(host, port); + } catch (const std::exception& e) { + QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what()); + connector.reset(); + throw; + } connector->init(); // Enable heartbeat if requested @@ -330,9 +339,22 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) { closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text)); } +void ConnectionImpl::shutdown() { + //May need to take a temporary reference to ourselves to prevent + //our destructor being called until after we have notified of + //shutdown completion; the destructor may be called as a result of + //the call to failedConnection(). + boost::shared_ptr<ConnectionImpl> temp; + if (!handler.isClosed()) { + temp = shared_from_this(); + failedConnection(); + } + notifyShutdownComplete(); +} + static const std::string CONN_CLOSED("Connection closed"); -void ConnectionImpl::shutdown() { +void ConnectionImpl::failedConnection() { if ( failureCallback ) failureCallback(); @@ -375,4 +397,16 @@ boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& na return simpl; } +void ConnectionImpl::waitForShutdownComplete() +{ + Mutex::ScopedLock l(lock); + while(!shutdownComplete) lock.wait(); +} +void ConnectionImpl::notifyShutdownComplete() +{ + Mutex::ScopedLock l(lock); + shutdownComplete = true; + lock.notifyAll(); +} + }} // namespace qpid::client diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index 1d88fcaf99..c2313e72bc 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -26,7 +26,7 @@ #include "qpid/client/ConnectionHandler.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/sys/Mutex.h" +#include "qpid/sys/Monitor.h" #include "qpid/sys/ShutdownHandler.h" #include "qpid/sys/TimeoutHandler.h" @@ -60,7 +60,8 @@ class ConnectionImpl : public Bounds, boost::scoped_ptr<Connector> connector; framing::ProtocolVersion version; uint16_t nextChannel; - sys::Mutex lock; + sys::Monitor lock; + bool shutdownComplete; boost::intrusive_ptr<qpid::sys::TimerTask> heartbeatTask; @@ -72,6 +73,9 @@ class ConnectionImpl : public Bounds, void idleOut(); void idleIn(); void shutdown(); + void failedConnection(); + void waitForShutdownComplete(); + void notifyShutdownComplete(); boost::function<void ()> failureCallback; diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp index 0c794145db..990ca90de7 100644 --- a/cpp/src/qpid/client/SslConnector.cpp +++ b/cpp/src/qpid/client/SslConnector.cpp @@ -106,9 +106,6 @@ class SslConnector : public Connector ~SslConnector(); - void handleClosed(); - bool closeInternal(); - void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*); void writebuff(qpid::sys::ssl::SslIO&); void writeDataBlock(const framing::AMQDataBlock& data); @@ -128,6 +125,7 @@ class SslConnector : public Connector framing::OutputHandler* getOutputHandler(); const std::string& getIdentifier() const; const SecuritySettings* getSecuritySettings(); + void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&); public: SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion, @@ -204,7 +202,7 @@ void SslConnector::connect(const std::string& host, int port){ boost::bind(&SslConnector::readbuff, this, _1, _2), boost::bind(&SslConnector::eof, this, _1), boost::bind(&SslConnector::eof, this, _1), - 0, // closed + boost::bind(&SslConnector::socketClosed, this, _1, _2), 0, // nobuffs boost::bind(&SslConnector::writebuff, this, _1)); writer.init(identifier, aio); @@ -220,19 +218,20 @@ void SslConnector::init(){ aio->start(poller); } -bool SslConnector::closeInternal() { +void SslConnector::close() { Mutex::ScopedLock l(closedLock); - bool ret = !closed; if (!closed) { closed = true; - aio->queueForDeletion(); - socket.close(); + if (aio) + aio->queueWriteClose(); } - return ret; } -void SslConnector::close() { - closeInternal(); +void SslConnector::socketClosed(SslIO&, const SslSocket&) { + if (aio) + aio->queueForDeletion(); + if (shutdownHandler) + shutdownHandler->shutdown(); } void SslConnector::setInputHandler(InputHandler* handler){ @@ -259,11 +258,6 @@ void SslConnector::send(AMQFrame& frame) { writer.handle(frame); } -void SslConnector::handleClosed() { - if (closeInternal() && shutdownHandler) - shutdownHandler->shutdown(); -} - SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) { } @@ -365,7 +359,7 @@ void SslConnector::writeDataBlock(const AMQDataBlock& data) { } void SslConnector::eof(SslIO&) { - handleClosed(); + close(); } const SecuritySettings* SslConnector::getSecuritySettings() diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp index 1a245fe2c8..3ead7e2ce0 100644 --- a/cpp/src/qpid/client/TCPConnector.cpp +++ b/cpp/src/qpid/client/TCPConnector.cpp @@ -105,7 +105,7 @@ void TCPConnector::connected(const Socket&) { boost::bind(&TCPConnector::readbuff, this, _1, _2), boost::bind(&TCPConnector::eof, this, _1), boost::bind(&TCPConnector::eof, this, _1), - 0, // closed + boost::bind(&TCPConnector::socketClosed, this, _1, _2), 0, // nobuffs boost::bind(&TCPConnector::writebuff, this, _1)); start(aio); @@ -128,28 +128,28 @@ void TCPConnector::initAmqp() { } void TCPConnector::connectFailed(const std::string& msg) { - QPID_LOG(warning, "Connecting failed: " << msg); + QPID_LOG(warning, "Connect failed: " << msg); socket.close(); - if (!closed && shutdownHandler) { + if (!closed) closed = true; + if (shutdownHandler) shutdownHandler->shutdown(); - } } -bool TCPConnector::closeInternal() { +void TCPConnector::close() { Mutex::ScopedLock l(lock); - bool ret = !closed; - closed = true; - if (ret) { + if (!closed) { + closed = true; if (aio) - aio->queueForDeletion(); - socket.close(); + aio->queueWriteClose(); } - return ret; } -void TCPConnector::close() { - closeInternal(); +void TCPConnector::socketClosed(AsynchIO&, const Socket&) { + if (aio) + aio->queueForDeletion(); + if (shutdownHandler) + shutdownHandler->shutdown(); } void TCPConnector::abort() { @@ -214,11 +214,6 @@ void TCPConnector::send(AMQFrame& frame) { } } -void TCPConnector::handleClosed() { - if (closeInternal() && shutdownHandler) - shutdownHandler->shutdown(); -} - void TCPConnector::writebuff(AsynchIO& /*aio*/) { // It's possible to be disconnected and be writable @@ -315,7 +310,7 @@ void TCPConnector::writeDataBlock(const AMQDataBlock& data) { } void TCPConnector::eof(AsynchIO&) { - handleClosed(); + close(); } void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h index 6d447def2e..04504a5173 100644 --- a/cpp/src/qpid/client/TCPConnector.h +++ b/cpp/src/qpid/client/TCPConnector.h @@ -76,9 +76,6 @@ class TCPConnector : public Connector, public sys::Codec boost::shared_ptr<sys::Poller> poller; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; - void handleClosed(); - bool closeInternal(); - virtual void connected(const sys::Socket&); void writeDataBlock(const framing::AMQDataBlock& data); @@ -107,6 +104,7 @@ protected: bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); void writebuff(qpid::sys::AsynchIO&); void eof(qpid::sys::AsynchIO&); + void socketClosed(qpid::sys::AsynchIO&, const qpid::sys::Socket&); public: TCPConnector(boost::shared_ptr<sys::Poller>, |
