summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp45
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h11
2 files changed, 49 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 1c737155a7..257d6f6999 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -87,6 +87,17 @@ ManagementAgentImpl::ManagementAgentImpl() :
// TODO: Establish system ID
}
+ManagementAgentImpl::~ManagementAgentImpl()
+{
+ connThreadBody.close();
+
+ // If the thread is doing work on the connection, we must wait for it to
+ // complete before shutting down.
+ if (!connThreadBody.isSleeping()) {
+ connThread.join();
+ }
+}
+
void ManagementAgentImpl::init(string brokerHost,
uint16_t brokerPort,
uint16_t intervalSeconds,
@@ -725,21 +736,31 @@ void ManagementAgentImpl::ConnectionThread::run()
delete subscriptions;
subscriptions = 0;
session.close();
+ connection.close();
}
} catch (std::exception &e) {
if (delay < delayMax)
delay *= delayFactor;
}
- ::sleep(delay);
+ {
+ Mutex::ScopedLock _lock(connLock);
+ if (shutdown)
+ return;
+ sleeping = true;
+ {
+ Mutex::ScopedUnlock _unlock(connLock);
+ ::sleep(delay);
+ }
+ sleeping = false;
+ if (shutdown)
+ return;
+ }
}
}
ManagementAgentImpl::ConnectionThread::~ConnectionThread()
{
- if (subscriptions != 0) {
- delete subscriptions;
- }
}
void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
@@ -773,6 +794,22 @@ void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint
arg::bindingKey=key.str());
}
+void ManagementAgentImpl::ConnectionThread::close()
+{
+ {
+ Mutex::ScopedLock _lock(connLock);
+ shutdown = true;
+ }
+ if (subscriptions)
+ subscriptions->stop();
+}
+
+bool ManagementAgentImpl::ConnectionThread::isSleeping() const
+{
+ Mutex::ScopedLock _lock(connLock);
+ return sleeping;
+}
+
void ManagementAgentImpl::PublishThread::run()
{
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
index d8ede8a80d..28fbb72f6f 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -43,7 +43,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
public:
ManagementAgentImpl();
- virtual ~ManagementAgentImpl() {};
+ virtual ~ManagementAgentImpl();
//
// Methods from ManagementAgent
@@ -156,17 +156,22 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
client::Session session;
client::SubscriptionManager* subscriptions;
std::stringstream queueName;
- sys::Mutex connLock;
+ mutable sys::Mutex connLock;
+ bool shutdown;
+ bool sleeping;
void run();
public:
ConnectionThread(ManagementAgentImpl& _agent) :
- operational(false), agent(_agent), subscriptions(0) {}
+ operational(false), agent(_agent), subscriptions(0),
+ shutdown(false), sleeping(false) {}
~ConnectionThread();
void sendBuffer(qpid::framing::Buffer& buf,
uint32_t length,
const std::string& exchange,
const std::string& routingKey);
void bindToBank(uint32_t brokerBank, uint32_t agentBank);
+ void close();
+ bool isSleeping() const;
};
class PublishThread : public sys::Runnable