summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/client/ClientConnection.cpp39
-rw-r--r--cpp/src/qpid/client/Connection.h6
-rw-r--r--cpp/src/qpid/client/Connector.cpp23
-rw-r--r--cpp/src/qpid/client/Connector.h2
4 files changed, 53 insertions, 17 deletions
diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp
index 48616cf3d9..177c9c4b73 100644
--- a/cpp/src/qpid/client/ClientConnection.cpp
+++ b/cpp/src/qpid/client/ClientConnection.cpp
@@ -75,30 +75,49 @@ void Connection::open(
}
void Connection::shutdown() {
- close();
+ //this indicates that the socket to the server has closed we do
+ //not want to send a close request (or any other requests)
+ if(markClosed()) {
+ std::cout << "Connection to peer closed!" << std::endl;
+ closeChannels();
+ }
}
void Connection::close(
ReplyCode code, const string& msg, ClassId classId, MethodId methodId
)
{
- if(isOpen) {
+ if(markClosed()) {
// TODO aconway 2007-01-29: Exception handling - could end up
// partly closed with threads left unjoined.
- isOpen = false;
channel0.sendAndReceive<ConnectionCloseOkBody>(
make_shared_ptr(new ConnectionCloseBody(
- getVersion(), code, msg, classId, methodId)));
-
- using boost::bind;
- for_each(channels.begin(), channels.end(),
- bind(&Channel::closeInternal,
- bind(&ChannelMap::value_type::second, _1)));
- channels.clear();
+ getVersion(), code, msg, classId, methodId)));
+ closeChannels();
connector->close();
}
}
+bool Connection::markClosed()
+{
+ Mutex::ScopedLock locker(shutdownLock);
+ if (isOpen) {
+ isOpen = false;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void Connection::closeChannels()
+{
+ using boost::bind;
+ for_each(channels.begin(), channels.end(),
+ bind(&Channel::closeInternal,
+ bind(&ChannelMap::value_type::second, _1)));
+ channels.clear();
+}
+
void Connection::openChannel(Channel& channel) {
ChannelId id = ++channelIdCounter;
assert (channels.find(id) == channels.end());
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index 071a1d9446..c3b9aa33d0 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -26,6 +26,7 @@
#include "qpid/QpidError.h"
#include "ClientChannel.h"
#include "Connector.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/sys/ShutdownHandler.h"
#include "qpid/sys/TimeoutHandler.h"
@@ -81,13 +82,16 @@ class Connection : public ConnectionForChannel
Connector defaultConnector;
Connector* connector;
framing::OutputHandler* out;
- volatile bool isOpen;
+ bool isOpen;
+ sys::Mutex shutdownLock;
Channel channel0;
bool debug;
void erase(framing::ChannelId);
void channelException(
Channel&, framing::AMQMethodBody*, const QpidError&);
+ void closeChannels();
+ bool markClosed();
// TODO aconway 2007-01-26: too many friendships, untagle these classes.
friend class Channel;
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 9230050ff7..8e2ff9a09c 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -61,9 +61,10 @@ void Connector::init(){
}
void Connector::close(){
- closed = true;
- socket.close();
- receiver.join();
+ if (markClosed()) {
+ socket.close();
+ receiver.join();
+ }
}
void Connector::setInputHandler(InputHandler* handler){
@@ -106,9 +107,19 @@ void Connector::writeToSocket(char* data, size_t available){
}
void Connector::handleClosed(){
- closed = true;
- socket.close();
- if(shutdownHandler) shutdownHandler->shutdown();
+ if (markClosed()) {
+ socket.close();
+ if(shutdownHandler) shutdownHandler->shutdown();
+ }
+}
+
+bool Connector::markClosed(){
+ if (closed) {
+ return false;
+ } else {
+ closed = true;
+ return true;
+ }
}
void Connector::checkIdle(ssize_t status){
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
index c63a1ce8ac..10bde1b8ea 100644
--- a/cpp/src/qpid/client/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -46,6 +46,7 @@ class Connector : public framing::OutputHandler,
framing::ProtocolVersion version;
bool closed;
+ sys::Mutex closedLock;
int64_t lastIn;
int64_t lastOut;
@@ -74,6 +75,7 @@ class Connector : public framing::OutputHandler,
void run();
void handleClosed();
+ bool markClosed();
friend class Channel;
public: