summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2010-06-11 08:42:37 +0000
committerGordon Sim <gsim@apache.org>2010-06-11 08:42:37 +0000
commit4467d34858ea0ab6d11fbc90f81162e642fe3133 (patch)
tree55725d96eb38456f755e59410253ad6ccb737cf1
parent85e7cf723b05620e1ed44d50a9533811e823585f (diff)
downloadqpid-python-4467d34858ea0ab6d11fbc90f81162e642fe3133.tar.gz
Ensure that AsynchConnector is disassociated from Poller when aborting connection attempt due to a heartbeat timeout
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@953610 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/client/TCPConnector.cpp12
-rw-r--r--cpp/src/qpid/client/TCPConnector.h1
-rw-r--r--cpp/src/qpid/sys/AsynchIO.h2
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp6
4 files changed, 16 insertions, 5 deletions
diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp
index d0a12c8522..e284d57bec 100644
--- a/cpp/src/qpid/client/TCPConnector.cpp
+++ b/cpp/src/qpid/client/TCPConnector.cpp
@@ -76,10 +76,11 @@ TCPConnector::TCPConnector(Poller::shared_ptr p,
initiated(false),
closed(true),
shutdownHandler(0),
+ connector(0),
aio(0),
poller(p)
{
- QPID_LOG(debug, "TCPConnector created for " << version.toString());
+ QPID_LOG(debug, "TCPConnector created for " << version);
settings.configureSocket(socket);
}
@@ -90,17 +91,18 @@ TCPConnector::~TCPConnector() {
void TCPConnector::connect(const std::string& host, int port) {
Mutex::ScopedLock l(lock);
assert(closed);
- AsynchConnector* c = AsynchConnector::create(
+ connector = AsynchConnector::create(
socket,
host, port,
boost::bind(&TCPConnector::connected, this, _1),
boost::bind(&TCPConnector::connectFailed, this, _3));
closed = false;
- c->start(poller);
+ connector->start(poller);
}
void TCPConnector::connected(const Socket&) {
+ connector = 0;
aio = AsynchIO::create(socket,
boost::bind(&TCPConnector::readbuff, this, _1, _2),
boost::bind(&TCPConnector::eof, this, _1),
@@ -128,6 +130,7 @@ void TCPConnector::initAmqp() {
}
void TCPConnector::connectFailed(const std::string& msg) {
+ connector = 0;
QPID_LOG(warning, "Connect failed: " << msg);
socket.close();
if (!closed)
@@ -158,8 +161,9 @@ void TCPConnector::abort() {
if (aio) {
// Established connection
aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
- } else {
+ } else if (connector) {
// We're still connecting
+ connector->stop();
connectFailed("Connection timedout");
}
}
diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h
index bce5f593c6..c756469182 100644
--- a/cpp/src/qpid/client/TCPConnector.h
+++ b/cpp/src/qpid/client/TCPConnector.h
@@ -71,6 +71,7 @@ class TCPConnector : public Connector, public sys::Codec
sys::Socket socket;
+ sys::AsynchConnector* connector;
sys::AsynchIO* aio;
std::string identifier;
boost::shared_ptr<sys::Poller> poller;
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h
index f1841639ed..50da8fa4fc 100644
--- a/cpp/src/qpid/sys/AsynchIO.h
+++ b/cpp/src/qpid/sys/AsynchIO.h
@@ -69,7 +69,7 @@ public:
ConnectedCallback connCb,
FailedCallback failCb);
virtual void start(boost::shared_ptr<Poller> poller) = 0;
-
+ virtual void stop() {};
protected:
AsynchConnector() {}
virtual ~AsynchConnector() {}
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index cef9f1fcef..7d85b4325b 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -157,6 +157,7 @@ public:
ConnectedCallback connCb,
FailedCallback failCb);
void start(Poller::shared_ptr poller);
+ void stop();
};
AsynchConnector::AsynchConnector(const Socket& s,
@@ -183,6 +184,11 @@ void AsynchConnector::start(Poller::shared_ptr poller)
startWatch(poller);
}
+void AsynchConnector::stop()
+{
+ stopWatch();
+}
+
void AsynchConnector::connComplete(DispatchHandle& h)
{
h.stopWatch();