summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp48
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h8
-rw-r--r--cpp/src/qpid/client/SslConnector.cpp28
-rw-r--r--cpp/src/qpid/client/TCPConnector.cpp33
-rw-r--r--cpp/src/qpid/client/TCPConnector.h4
5 files changed, 73 insertions, 48 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index b6043518e8..6b352e2e65 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -166,7 +166,8 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti
: Bounds(settings.maxFrameSize * settings.bounds),
handler(settings, v),
version(v),
- nextChannel(1)
+ nextChannel(1),
+ shutdownComplete(false)
{
QPID_LOG(debug, "ConnectionImpl created for " << version.toString());
handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
@@ -181,10 +182,12 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti
const uint16_t ConnectionImpl::NEXT_CHANNEL = std::numeric_limits<uint16_t>::max();
ConnectionImpl::~ConnectionImpl() {
- // Important to close the connector first, to ensure the
- // connector thread does not call on us while the destructor
- // is running.
- if (connector) connector->close();
+ if (connector) {
+ connector->close();
+ //wait until we get the shutdown callback to ensure that the
+ //io threads will not call back on us after deletion
+ waitForShutdownComplete();
+ }
theIO().sub();
}
@@ -244,7 +247,13 @@ void ConnectionImpl::open()
connector.reset(Connector::create(protocol, theIO().poller(), version, handler, this));
connector->setInputHandler(&handler);
connector->setShutdownHandler(this);
- connector->connect(host, port);
+ try {
+ connector->connect(host, port);
+ } catch (const std::exception& e) {
+ QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what());
+ connector.reset();
+ throw;
+ }
connector->init();
// Enable heartbeat if requested
@@ -330,9 +339,22 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) {
closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text));
}
+void ConnectionImpl::shutdown() {
+ //May need to take a temporary reference to ourselves to prevent
+ //our destructor being called until after we have notified of
+ //shutdown completion; the destructor may be called as a result of
+ //the call to failedConnection().
+ boost::shared_ptr<ConnectionImpl> temp;
+ if (!handler.isClosed()) {
+ temp = shared_from_this();
+ failedConnection();
+ }
+ notifyShutdownComplete();
+}
+
static const std::string CONN_CLOSED("Connection closed");
-void ConnectionImpl::shutdown() {
+void ConnectionImpl::failedConnection() {
if ( failureCallback )
failureCallback();
@@ -375,4 +397,16 @@ boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& na
return simpl;
}
+void ConnectionImpl::waitForShutdownComplete()
+{
+ Mutex::ScopedLock l(lock);
+ while(!shutdownComplete) lock.wait();
+}
+void ConnectionImpl::notifyShutdownComplete()
+{
+ Mutex::ScopedLock l(lock);
+ shutdownComplete = true;
+ lock.notifyAll();
+}
+
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index 1d88fcaf99..c2313e72bc 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -26,7 +26,7 @@
#include "qpid/client/ConnectionHandler.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
#include "qpid/sys/ShutdownHandler.h"
#include "qpid/sys/TimeoutHandler.h"
@@ -60,7 +60,8 @@ class ConnectionImpl : public Bounds,
boost::scoped_ptr<Connector> connector;
framing::ProtocolVersion version;
uint16_t nextChannel;
- sys::Mutex lock;
+ sys::Monitor lock;
+ bool shutdownComplete;
boost::intrusive_ptr<qpid::sys::TimerTask> heartbeatTask;
@@ -72,6 +73,9 @@ class ConnectionImpl : public Bounds,
void idleOut();
void idleIn();
void shutdown();
+ void failedConnection();
+ void waitForShutdownComplete();
+ void notifyShutdownComplete();
boost::function<void ()> failureCallback;
diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp
index 0c794145db..990ca90de7 100644
--- a/cpp/src/qpid/client/SslConnector.cpp
+++ b/cpp/src/qpid/client/SslConnector.cpp
@@ -106,9 +106,6 @@ class SslConnector : public Connector
~SslConnector();
- void handleClosed();
- bool closeInternal();
-
void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*);
void writebuff(qpid::sys::ssl::SslIO&);
void writeDataBlock(const framing::AMQDataBlock& data);
@@ -128,6 +125,7 @@ class SslConnector : public Connector
framing::OutputHandler* getOutputHandler();
const std::string& getIdentifier() const;
const SecuritySettings* getSecuritySettings();
+ void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&);
public:
SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion,
@@ -204,7 +202,7 @@ void SslConnector::connect(const std::string& host, int port){
boost::bind(&SslConnector::readbuff, this, _1, _2),
boost::bind(&SslConnector::eof, this, _1),
boost::bind(&SslConnector::eof, this, _1),
- 0, // closed
+ boost::bind(&SslConnector::socketClosed, this, _1, _2),
0, // nobuffs
boost::bind(&SslConnector::writebuff, this, _1));
writer.init(identifier, aio);
@@ -220,19 +218,20 @@ void SslConnector::init(){
aio->start(poller);
}
-bool SslConnector::closeInternal() {
+void SslConnector::close() {
Mutex::ScopedLock l(closedLock);
- bool ret = !closed;
if (!closed) {
closed = true;
- aio->queueForDeletion();
- socket.close();
+ if (aio)
+ aio->queueWriteClose();
}
- return ret;
}
-void SslConnector::close() {
- closeInternal();
+void SslConnector::socketClosed(SslIO&, const SslSocket&) {
+ if (aio)
+ aio->queueForDeletion();
+ if (shutdownHandler)
+ shutdownHandler->shutdown();
}
void SslConnector::setInputHandler(InputHandler* handler){
@@ -259,11 +258,6 @@ void SslConnector::send(AMQFrame& frame) {
writer.handle(frame);
}
-void SslConnector::handleClosed() {
- if (closeInternal() && shutdownHandler)
- shutdownHandler->shutdown();
-}
-
SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
{
}
@@ -365,7 +359,7 @@ void SslConnector::writeDataBlock(const AMQDataBlock& data) {
}
void SslConnector::eof(SslIO&) {
- handleClosed();
+ close();
}
const SecuritySettings* SslConnector::getSecuritySettings()
diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp
index 1a245fe2c8..3ead7e2ce0 100644
--- a/cpp/src/qpid/client/TCPConnector.cpp
+++ b/cpp/src/qpid/client/TCPConnector.cpp
@@ -105,7 +105,7 @@ void TCPConnector::connected(const Socket&) {
boost::bind(&TCPConnector::readbuff, this, _1, _2),
boost::bind(&TCPConnector::eof, this, _1),
boost::bind(&TCPConnector::eof, this, _1),
- 0, // closed
+ boost::bind(&TCPConnector::socketClosed, this, _1, _2),
0, // nobuffs
boost::bind(&TCPConnector::writebuff, this, _1));
start(aio);
@@ -128,28 +128,28 @@ void TCPConnector::initAmqp() {
}
void TCPConnector::connectFailed(const std::string& msg) {
- QPID_LOG(warning, "Connecting failed: " << msg);
+ QPID_LOG(warning, "Connect failed: " << msg);
socket.close();
- if (!closed && shutdownHandler) {
+ if (!closed)
closed = true;
+ if (shutdownHandler)
shutdownHandler->shutdown();
- }
}
-bool TCPConnector::closeInternal() {
+void TCPConnector::close() {
Mutex::ScopedLock l(lock);
- bool ret = !closed;
- closed = true;
- if (ret) {
+ if (!closed) {
+ closed = true;
if (aio)
- aio->queueForDeletion();
- socket.close();
+ aio->queueWriteClose();
}
- return ret;
}
-void TCPConnector::close() {
- closeInternal();
+void TCPConnector::socketClosed(AsynchIO&, const Socket&) {
+ if (aio)
+ aio->queueForDeletion();
+ if (shutdownHandler)
+ shutdownHandler->shutdown();
}
void TCPConnector::abort() {
@@ -214,11 +214,6 @@ void TCPConnector::send(AMQFrame& frame) {
}
}
-void TCPConnector::handleClosed() {
- if (closeInternal() && shutdownHandler)
- shutdownHandler->shutdown();
-}
-
void TCPConnector::writebuff(AsynchIO& /*aio*/)
{
// It's possible to be disconnected and be writable
@@ -315,7 +310,7 @@ void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
}
void TCPConnector::eof(AsynchIO&) {
- handleClosed();
+ close();
}
void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h
index 6d447def2e..04504a5173 100644
--- a/cpp/src/qpid/client/TCPConnector.h
+++ b/cpp/src/qpid/client/TCPConnector.h
@@ -76,9 +76,6 @@ class TCPConnector : public Connector, public sys::Codec
boost::shared_ptr<sys::Poller> poller;
std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
- void handleClosed();
- bool closeInternal();
-
virtual void connected(const sys::Socket&);
void writeDataBlock(const framing::AMQDataBlock& data);
@@ -107,6 +104,7 @@ protected:
bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
void writebuff(qpid::sys::AsynchIO&);
void eof(qpid::sys::AsynchIO&);
+ void socketClosed(qpid::sys::AsynchIO&, const qpid::sys::Socket&);
public:
TCPConnector(boost::shared_ptr<sys::Poller>,