diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/client/ClientConnection.cpp | 39 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connection.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.h | 2 |
4 files changed, 53 insertions, 17 deletions
diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp index 48616cf3d9..177c9c4b73 100644 --- a/cpp/src/qpid/client/ClientConnection.cpp +++ b/cpp/src/qpid/client/ClientConnection.cpp @@ -75,30 +75,49 @@ void Connection::open( } void Connection::shutdown() { - close(); + //this indicates that the socket to the server has closed we do + //not want to send a close request (or any other requests) + if(markClosed()) { + std::cout << "Connection to peer closed!" << std::endl; + closeChannels(); + } } void Connection::close( ReplyCode code, const string& msg, ClassId classId, MethodId methodId ) { - if(isOpen) { + if(markClosed()) { // TODO aconway 2007-01-29: Exception handling - could end up // partly closed with threads left unjoined. - isOpen = false; channel0.sendAndReceive<ConnectionCloseOkBody>( make_shared_ptr(new ConnectionCloseBody( - getVersion(), code, msg, classId, methodId))); - - using boost::bind; - for_each(channels.begin(), channels.end(), - bind(&Channel::closeInternal, - bind(&ChannelMap::value_type::second, _1))); - channels.clear(); + getVersion(), code, msg, classId, methodId))); + closeChannels(); connector->close(); } } +bool Connection::markClosed() +{ + Mutex::ScopedLock locker(shutdownLock); + if (isOpen) { + isOpen = false; + return true; + } else { + return false; + } +} + +void Connection::closeChannels() +{ + using boost::bind; + for_each(channels.begin(), channels.end(), + bind(&Channel::closeInternal, + bind(&ChannelMap::value_type::second, _1))); + channels.clear(); +} + void Connection::openChannel(Channel& channel) { ChannelId id = ++channelIdCounter; assert (channels.find(id) == channels.end()); diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index 071a1d9446..c3b9aa33d0 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -26,6 +26,7 @@ #include "qpid/QpidError.h" #include "ClientChannel.h" #include "Connector.h" +#include "qpid/sys/Mutex.h" #include "qpid/sys/ShutdownHandler.h" #include "qpid/sys/TimeoutHandler.h" @@ -81,13 +82,16 @@ class Connection : public ConnectionForChannel Connector defaultConnector; Connector* connector; framing::OutputHandler* out; - volatile bool isOpen; + bool isOpen; + sys::Mutex shutdownLock; Channel channel0; bool debug; void erase(framing::ChannelId); void channelException( Channel&, framing::AMQMethodBody*, const QpidError&); + void closeChannels(); + bool markClosed(); // TODO aconway 2007-01-26: too many friendships, untagle these classes. friend class Channel; diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 9230050ff7..8e2ff9a09c 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -61,9 +61,10 @@ void Connector::init(){ } void Connector::close(){ - closed = true; - socket.close(); - receiver.join(); + if (markClosed()) { + socket.close(); + receiver.join(); + } } void Connector::setInputHandler(InputHandler* handler){ @@ -106,9 +107,19 @@ void Connector::writeToSocket(char* data, size_t available){ } void Connector::handleClosed(){ - closed = true; - socket.close(); - if(shutdownHandler) shutdownHandler->shutdown(); + if (markClosed()) { + socket.close(); + if(shutdownHandler) shutdownHandler->shutdown(); + } +} + +bool Connector::markClosed(){ + if (closed) { + return false; + } else { + closed = true; + return true; + } } void Connector::checkIdle(ssize_t status){ diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index c63a1ce8ac..10bde1b8ea 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -46,6 +46,7 @@ class Connector : public framing::OutputHandler, framing::ProtocolVersion version; bool closed; + sys::Mutex closedLock; int64_t lastIn; int64_t lastOut; @@ -74,6 +75,7 @@ class Connector : public framing::OutputHandler, void run(); void handleClosed(); + bool markClosed(); friend class Channel; public: |