summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharles E. Rolke <chug@apache.org>2013-11-21 18:24:09 +0000
committerCharles E. Rolke <chug@apache.org>2013-11-21 18:24:09 +0000
commit187a35a20459cdb9be1bb94309ce4a91c7b38572 (patch)
tree0fcaa507526523aab01356c9061828933efc071f
parentca56df8136bb82f074fae29f877e856a834120fc (diff)
downloadqpid-python-187a35a20459cdb9be1bb94309ce4a91c7b38572.tar.gz
QPID-5363: Add locks to prevent race condition in Amqp 1.0 transport - merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.26@1544277 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp49
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h2
2 files changed, 35 insertions, 16 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp b/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp
index c0a9560c6f..c66f64567a 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp
@@ -48,7 +48,7 @@ struct StaticInit
} init;
}
-TcpTransport::TcpTransport(TransportContext& c, boost::shared_ptr<Poller> p) : socket(createSocket()), context(c), connector(0), aio(0), poller(p) {}
+TcpTransport::TcpTransport(TransportContext& c, boost::shared_ptr<Poller> p) : socket(createSocket()), context(c), connector(0), aio(0), poller(p), closed(false) {}
void TcpTransport::connect(const std::string& host, const std::string& port)
{
@@ -66,6 +66,7 @@ void TcpTransport::connect(const std::string& host, const std::string& port)
void TcpTransport::failed(const std::string& msg)
{
QPID_LOG(debug, "Failed to connect: " << msg);
+ closed = true;
connector = 0;
socket->close();
context.closed();
@@ -118,9 +119,12 @@ void TcpTransport::write(AsynchIO&)
void TcpTransport::close()
{
- QPID_LOG(debug, id << " TcpTransport closing...");
- if (aio)
- aio->queueWriteClose();
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (!closed) {
+ QPID_LOG(debug, id << " TcpTransport closing...");
+ if (aio)
+ aio->queueWriteClose();
+ }
}
void TcpTransport::eof(AsynchIO&)
@@ -136,31 +140,44 @@ void TcpTransport::disconnected(AsynchIO&)
void TcpTransport::socketClosed(AsynchIO&, const Socket&)
{
- if (aio)
- aio->queueForDeletion();
- context.closed();
- QPID_LOG(debug, id << " Socket closed");
+ bool notify(false);
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (!closed) {
+ closed = true;
+ if (aio)
+ aio->queueForDeletion();
+ QPID_LOG(debug, id << " Socket closed");
+ notify = true;
+ } //else has already been closed
+ }
+ if (notify) context.closed();
}
void TcpTransport::abort()
{
- if (aio) {
- // Established connection
- aio->requestCallback(boost::bind(&TcpTransport::eof, this, _1));
- } else if (connector) {
- // We're still connecting
- connector->stop();
- failed("Connection timedout");
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (!closed) {
+ if (aio) {
+ // Established connection
+ aio->requestCallback(boost::bind(&TcpTransport::eof, this, _1));
+ } else if (connector) {
+ // We're still connecting
+ connector->stop();
+ failed("Connection timedout");
+ }
}
}
void TcpTransport::activateOutput()
{
- if (aio) aio->notifyPendingWrite();
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (!closed && aio) aio->notifyPendingWrite();
}
const qpid::sys::SecuritySettings* TcpTransport::getSecuritySettings()
{
return 0;
}
+
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h b/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
index 406791417c..cf5d4a0855 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
@@ -59,6 +59,8 @@ class TcpTransport : public Transport
qpid::sys::AsynchIO* aio;
boost::shared_ptr<qpid::sys::Poller> poller;
std::string id;
+ bool closed;
+ qpid::sys::Mutex lock;
void connected(const qpid::sys::Socket&);
void failed(const std::string& msg);