summaryrefslogtreecommitdiff
path: root/cpp/src/qmf/AgentSession.cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2011-08-15 16:47:56 +0000
committerTed Ross <tross@apache.org>2011-08-15 16:47:56 +0000
commitde19ad9e7157f1b03442f7e1f8136a4f280c0f2f (patch)
treed89370f6b2526c3966216d40b343e50ccde17476 /cpp/src/qmf/AgentSession.cpp
parent67663fcc5f0d01f1df1a8cc006ec86725031f10a (diff)
downloadqpid-python-de19ad9e7157f1b03442f7e1f8136a4f280c0f2f.tar.gz
QPID-3423 - Timing and Performance Improvements in QMF Libraries
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1157907 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/AgentSession.cpp')
-rw-r--r--cpp/src/qmf/AgentSession.cpp41
1 files changed, 30 insertions, 11 deletions
diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp
index 71d369325f..a88782d107 100644
--- a/cpp/src/qmf/AgentSession.cpp
+++ b/cpp/src/qmf/AgentSession.cpp
@@ -120,6 +120,7 @@ namespace qmf {
bool publicEvents;
bool listenOnDirect;
bool strictSecurity;
+ uint32_t maxThreadWaitTime;
uint64_t schemaUpdateTime;
string directBase;
string topicBase;
@@ -185,7 +186,7 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
- listenOnDirect(true), strictSecurity(false),
+ listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
{
//
@@ -246,7 +247,14 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
iter = optMap.find("strict-security");
if (iter != optMap.end())
strictSecurity = iter->second.asBool();
+
+ iter = optMap.find("max-thread-wait-time");
+ if (iter != optMap.end())
+ maxThreadWaitTime = iter->second.asUint32();
}
+
+ if (maxThreadWaitTime > interval)
+ maxThreadWaitTime = interval;
}
@@ -254,6 +262,11 @@ AgentSessionImpl::~AgentSessionImpl()
{
if (opened)
close();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
}
@@ -262,6 +275,12 @@ void AgentSessionImpl::open()
if (opened)
throw QmfException("The session is already open");
+ // If the thread exists, join and delete it before creating a new one.
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
+
const string addrArgs(";{create:never,node:{type:topic}}");
const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str());
attributes["_direct_subject"] = routableAddr;
@@ -304,13 +323,8 @@ void AgentSessionImpl::close()
if (!opened)
return;
- // Stop and join the receiver thread
+ // Stop the receiver thread. Don't join it until the destructor is called or open() is called.
threadCanceled = true;
- thread->join();
- delete thread;
-
- // Close the AMQP session
- session.close();
opened = false;
}
@@ -320,9 +334,13 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
uint64_t milliseconds = timeout.getMilliseconds();
qpid::sys::Mutex::ScopedLock l(lock);
- if (eventQueue.empty() && milliseconds > 0)
- cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
- qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
+ if (eventQueue.empty() && milliseconds > 0) {
+ int64_t nsecs(qpid::sys::TIME_INFINITE);
+ if ((uint64_t)(nsecs / 1000000) > milliseconds)
+ nsecs = (int64_t) milliseconds * 1000000;
+ qpid::sys::Duration then(nsecs);
+ cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
+ }
if (!eventQueue.empty()) {
event = eventQueue.front();
@@ -1050,7 +1068,7 @@ void AgentSessionImpl::run()
periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / qpid::sys::TIME_SEC);
Receiver rx;
- bool valid = session.nextReceiver(rx, Duration::SECOND);
+ bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
if (threadCanceled)
break;
if (valid) {
@@ -1067,6 +1085,7 @@ void AgentSessionImpl::run()
enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED)));
}
+ session.close();
QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName);
}