diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/client/TCPConnector.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/client/TCPConnector.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIO.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 6 |
4 files changed, 16 insertions, 5 deletions
diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp index d0a12c8522..e284d57bec 100644 --- a/cpp/src/qpid/client/TCPConnector.cpp +++ b/cpp/src/qpid/client/TCPConnector.cpp @@ -76,10 +76,11 @@ TCPConnector::TCPConnector(Poller::shared_ptr p, initiated(false), closed(true), shutdownHandler(0), + connector(0), aio(0), poller(p) { - QPID_LOG(debug, "TCPConnector created for " << version.toString()); + QPID_LOG(debug, "TCPConnector created for " << version); settings.configureSocket(socket); } @@ -90,17 +91,18 @@ TCPConnector::~TCPConnector() { void TCPConnector::connect(const std::string& host, int port) { Mutex::ScopedLock l(lock); assert(closed); - AsynchConnector* c = AsynchConnector::create( + connector = AsynchConnector::create( socket, host, port, boost::bind(&TCPConnector::connected, this, _1), boost::bind(&TCPConnector::connectFailed, this, _3)); closed = false; - c->start(poller); + connector->start(poller); } void TCPConnector::connected(const Socket&) { + connector = 0; aio = AsynchIO::create(socket, boost::bind(&TCPConnector::readbuff, this, _1, _2), boost::bind(&TCPConnector::eof, this, _1), @@ -128,6 +130,7 @@ void TCPConnector::initAmqp() { } void TCPConnector::connectFailed(const std::string& msg) { + connector = 0; QPID_LOG(warning, "Connect failed: " << msg); socket.close(); if (!closed) @@ -158,8 +161,9 @@ void TCPConnector::abort() { if (aio) { // Established connection aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1)); - } else { + } else if (connector) { // We're still connecting + connector->stop(); connectFailed("Connection timedout"); } } diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h index bce5f593c6..c756469182 100644 --- a/cpp/src/qpid/client/TCPConnector.h +++ b/cpp/src/qpid/client/TCPConnector.h @@ -71,6 +71,7 @@ class TCPConnector : public Connector, public sys::Codec sys::Socket socket; + sys::AsynchConnector* connector; sys::AsynchIO* aio; std::string identifier; boost::shared_ptr<sys::Poller> poller; diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h index f1841639ed..50da8fa4fc 100644 --- a/cpp/src/qpid/sys/AsynchIO.h +++ b/cpp/src/qpid/sys/AsynchIO.h @@ -69,7 +69,7 @@ public: ConnectedCallback connCb, FailedCallback failCb); virtual void start(boost::shared_ptr<Poller> poller) = 0; - + virtual void stop() {}; protected: AsynchConnector() {} virtual ~AsynchConnector() {} diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index cef9f1fcef..7d85b4325b 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -157,6 +157,7 @@ public: ConnectedCallback connCb, FailedCallback failCb); void start(Poller::shared_ptr poller); + void stop(); }; AsynchConnector::AsynchConnector(const Socket& s, @@ -183,6 +184,11 @@ void AsynchConnector::start(Poller::shared_ptr poller) startWatch(poller); } +void AsynchConnector::stop() +{ + stopWatch(); +} + void AsynchConnector::connComplete(DispatchHandle& h) { h.stopWatch(); |