diff options
author | Charles E. Rolke <chug@apache.org> | 2013-11-21 18:24:09 +0000 |
---|---|---|
committer | Charles E. Rolke <chug@apache.org> | 2013-11-21 18:24:09 +0000 |
commit | 187a35a20459cdb9be1bb94309ce4a91c7b38572 (patch) | |
tree | 0fcaa507526523aab01356c9061828933efc071f | |
parent | ca56df8136bb82f074fae29f877e856a834120fc (diff) | |
download | qpid-python-187a35a20459cdb9be1bb94309ce4a91c7b38572.tar.gz |
QPID-5363: Add locks to prevent race condition in Amqp 1.0 transport - merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.26@1544277 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp | 49 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h | 2 |
2 files changed, 35 insertions, 16 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp b/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp index c0a9560c6f..c66f64567a 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp @@ -48,7 +48,7 @@ struct StaticInit } init; } -TcpTransport::TcpTransport(TransportContext& c, boost::shared_ptr<Poller> p) : socket(createSocket()), context(c), connector(0), aio(0), poller(p) {} +TcpTransport::TcpTransport(TransportContext& c, boost::shared_ptr<Poller> p) : socket(createSocket()), context(c), connector(0), aio(0), poller(p), closed(false) {} void TcpTransport::connect(const std::string& host, const std::string& port) { @@ -66,6 +66,7 @@ void TcpTransport::connect(const std::string& host, const std::string& port) void TcpTransport::failed(const std::string& msg) { QPID_LOG(debug, "Failed to connect: " << msg); + closed = true; connector = 0; socket->close(); context.closed(); @@ -118,9 +119,12 @@ void TcpTransport::write(AsynchIO&) void TcpTransport::close() { - QPID_LOG(debug, id << " TcpTransport closing..."); - if (aio) - aio->queueWriteClose(); + qpid::sys::Mutex::ScopedLock l(lock); + if (!closed) { + QPID_LOG(debug, id << " TcpTransport closing..."); + if (aio) + aio->queueWriteClose(); + } } void TcpTransport::eof(AsynchIO&) @@ -136,31 +140,44 @@ void TcpTransport::disconnected(AsynchIO&) void TcpTransport::socketClosed(AsynchIO&, const Socket&) { - if (aio) - aio->queueForDeletion(); - context.closed(); - QPID_LOG(debug, id << " Socket closed"); + bool notify(false); + { + qpid::sys::Mutex::ScopedLock l(lock); + if (!closed) { + closed = true; + if (aio) + aio->queueForDeletion(); + QPID_LOG(debug, id << " Socket closed"); + notify = true; + } //else has already been closed + } + if (notify) context.closed(); } void TcpTransport::abort() { - if (aio) { - // Established connection - aio->requestCallback(boost::bind(&TcpTransport::eof, this, _1)); - } else if (connector) { - // We're still connecting - connector->stop(); - failed("Connection timedout"); + qpid::sys::Mutex::ScopedLock l(lock); + if (!closed) { + if (aio) { + // Established connection + aio->requestCallback(boost::bind(&TcpTransport::eof, this, _1)); + } else if (connector) { + // We're still connecting + connector->stop(); + failed("Connection timedout"); + } } } void TcpTransport::activateOutput() { - if (aio) aio->notifyPendingWrite(); + qpid::sys::Mutex::ScopedLock l(lock); + if (!closed && aio) aio->notifyPendingWrite(); } const qpid::sys::SecuritySettings* TcpTransport::getSecuritySettings() { return 0; } + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h b/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h index 406791417c..cf5d4a0855 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h +++ b/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h @@ -59,6 +59,8 @@ class TcpTransport : public Transport qpid::sys::AsynchIO* aio; boost::shared_ptr<qpid::sys::Poller> poller; std::string id; + bool closed; + qpid::sys::Mutex lock; void connected(const qpid::sys::Socket&); void failed(const std::string& msg); |