summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-10-08 08:55:44 +0000
committerGordon Sim <gsim@apache.org>2009-10-08 08:55:44 +0000
commit4bd11cf3d3ccce1b70f58d85d1ba9f1270e4aa63 (patch)
tree13fa5633e9500d69dad10948a6db95d061e7231c
parente5af89ce4c567828691bf379996bdf0f2ca0d98a (diff)
downloadqpid-python-4bd11cf3d3ccce1b70f58d85d1ba9f1270e4aa63.tar.gz
QPID-2132: Applied patch from Ken Giusti
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@823094 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp43
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h6
2 files changed, 30 insertions, 19 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 093e9cea32..a537127119 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -78,7 +78,7 @@ ManagementAgent* ManagementAgent::Singleton::getInstance()
const string ManagementAgentImpl::storeMagicNumber("MA02");
ManagementAgentImpl::ManagementAgentImpl() :
- interval(10), extThread(false),
+ interval(10), extThread(false), pipeHandle(0),
initialized(false), connected(false), lastFailure("never connected"),
clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
@@ -89,13 +89,11 @@ ManagementAgentImpl::ManagementAgentImpl() :
ManagementAgentImpl::~ManagementAgentImpl()
{
+ // shutdown the connection thread
connThreadBody.close();
+ connThread.join();
- // If the thread is doing work on the connection, we must wait for it to
- // complete before shutting down.
- if (!connThreadBody.isSleeping()) {
- connThread.join();
- }
+ // @todo need to shutdown pubThread?
// Release the memory associated with stored management objects.
{
@@ -777,6 +775,7 @@ void ManagementAgentImpl::ConnectionThread::run()
static const int delayFactor(2);
int delay(delayMin);
string dest("qmfagent");
+ ConnectionThread::shared_ptr tmp;
sessionId.generate();
queueName << "qmfagent-" << sessionId;
@@ -787,7 +786,7 @@ void ManagementAgentImpl::ConnectionThread::run()
QPID_LOG(debug, "QMF Agent attempting to connect to the broker...");
connection.open(agent.connectionSettings);
session = connection.newSession(queueName.str());
- subscriptions = new client::SubscriptionManager(session);
+ subscriptions.reset(new client::SubscriptionManager(session));
session.queueDeclare(arg::queue=queueName.str(), arg::autoDelete=true,
arg::exclusive=true);
@@ -811,11 +810,12 @@ void ManagementAgentImpl::ConnectionThread::run()
operational = false;
agent.connected = false;
+ tmp = subscriptions;
+ subscriptions.reset();
}
+ tmp.reset(); // frees the subscription outside the lock
delay = delayMin;
connection.close();
- delete subscriptions;
- subscriptions = 0;
}
} catch (exception &e) {
if (delay < delayMax)
@@ -824,14 +824,19 @@ void ManagementAgentImpl::ConnectionThread::run()
}
{
+ // sleep for "delay" seconds, but peridically check if the
+ // agent is shutting down so we don't hang for up to delayMax
+ // seconds during agent shutdown
Mutex::ScopedLock _lock(connLock);
if (shutdown)
return;
sleeping = true;
- {
- Mutex::ScopedUnlock _unlock(connLock);
- ::sleep(delay);
- }
+ int totalSleep = 0;
+ do {
+ Mutex::ScopedUnlock _unlock(connLock);
+ ::sleep(delayMin);
+ totalSleep += delayMin;
+ } while (totalSleep < delay && !shutdown);
sleeping = false;
if (shutdown)
return;
@@ -848,10 +853,12 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
const string& exchange,
const string& routingKey)
{
+ ConnectionThread::shared_ptr s;
{
Mutex::ScopedLock _lock(connLock);
if (!operational)
return;
+ s = subscriptions;
}
Message msg;
@@ -866,8 +873,8 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
} catch(exception& e) {
QPID_LOG(error, "Exception caught in sendBuffer: " << e.what());
// Bounce the connection
- if (subscriptions)
- subscriptions->stop();
+ if (s)
+ s->stop();
}
}
@@ -881,12 +888,14 @@ void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint
void ManagementAgentImpl::ConnectionThread::close()
{
+ ConnectionThread::shared_ptr s;
{
Mutex::ScopedLock _lock(connLock);
shutdown = true;
+ s = subscriptions;
}
- if (subscriptions)
- subscriptions->stop();
+ if (s)
+ s->stop();
}
bool ManagementAgentImpl::ConnectionThread::isSleeping() const
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
index f9cad9ebf5..63366823fe 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -163,12 +163,14 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
friend class ConnectionThread;
class ConnectionThread : public sys::Runnable
{
+ typedef boost::shared_ptr<client::SubscriptionManager> shared_ptr;
+
bool operational;
ManagementAgentImpl& agent;
framing::Uuid sessionId;
client::Connection connection;
client::Session session;
- client::SubscriptionManager* subscriptions;
+ ConnectionThread::shared_ptr subscriptions;
std::stringstream queueName;
mutable sys::Mutex connLock;
bool shutdown;
@@ -176,7 +178,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void run();
public:
ConnectionThread(ManagementAgentImpl& _agent) :
- operational(false), agent(_agent), subscriptions(0),
+ operational(false), agent(_agent),
shutdown(false), sleeping(false) {}
~ConnectionThread();
void sendBuffer(qpid::framing::Buffer& buf,