diff options
author | Ted Ross <tross@apache.org> | 2011-08-15 16:47:56 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2011-08-15 16:47:56 +0000 |
commit | de19ad9e7157f1b03442f7e1f8136a4f280c0f2f (patch) | |
tree | d89370f6b2526c3966216d40b343e50ccde17476 | |
parent | 67663fcc5f0d01f1df1a8cc006ec86725031f10a (diff) | |
download | qpid-python-de19ad9e7157f1b03442f7e1f8136a4f280c0f2f.tar.gz |
QPID-3423 - Timing and Performance Improvements in QMF Libraries
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1157907 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/include/qmf/AgentSession.h | 5 | ||||
-rw-r--r-- | cpp/include/qmf/ConsoleSession.h | 4 | ||||
-rw-r--r-- | cpp/src/qmf/AgentSession.cpp | 41 | ||||
-rw-r--r-- | cpp/src/qmf/ConsoleSession.cpp | 43 | ||||
-rw-r--r-- | cpp/src/qmf/ConsoleSessionImpl.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 19 |
6 files changed, 86 insertions, 27 deletions
diff --git a/cpp/include/qmf/AgentSession.h b/cpp/include/qmf/AgentSession.h index 1eeb252143..5ecfb0412c 100644 --- a/cpp/include/qmf/AgentSession.h +++ b/cpp/include/qmf/AgentSession.h @@ -71,6 +71,11 @@ namespace qmf { * If False: Listen only on the routable direct address * strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network * - If False: Operate more flexibly with regard to use of messaging facilities [default] + * max-thread-wait-time:N - Time (in seconds) the session thread will wait for messages from the network between + * periodic background processing passes. [default: 5] + * Must not be greater than 'interval'. Larger numbers will cause fewer wake-ups but will + * increase the time it takes to shut down the process. This setting will not affect the + * agent's response time for queries or method invocation. */ QMF_EXTERN AgentSession(qpid::messaging::Connection& conn, const std::string& options=""); diff --git a/cpp/include/qmf/ConsoleSession.h b/cpp/include/qmf/ConsoleSession.h index 6008036eec..5e3a091e5d 100644 --- a/cpp/include/qmf/ConsoleSession.h +++ b/cpp/include/qmf/ConsoleSession.h @@ -61,6 +61,10 @@ namespace qmf { * If False: Listen only on the routable direct address * strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network * - If False: Operate more flexibly with regard to use of messaging facilities [default] + * max-thread-wait-time:N - Time (in seconds) the session thread will wait for messages from the network between + * periodic background processing passes. + * Must not be greater than 60. Larger numbers will cause fewer wake-ups but will + * increase the time it takes to shut down the process. [default: 5] */ QMF_EXTERN ConsoleSession(qpid::messaging::Connection& conn, const std::string& options=""); diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp index 71d369325f..a88782d107 100644 --- a/cpp/src/qmf/AgentSession.cpp +++ b/cpp/src/qmf/AgentSession.cpp @@ -120,6 +120,7 @@ namespace qmf { bool publicEvents; bool listenOnDirect; bool strictSecurity; + uint32_t maxThreadWaitTime; uint64_t schemaUpdateTime; string directBase; string topicBase; @@ -185,7 +186,7 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false), externalStorage(false), autoAllowQueries(true), autoAllowMethods(true), maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true), - listenOnDirect(true), strictSecurity(false), + listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5), schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()))) { // @@ -246,7 +247,14 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : iter = optMap.find("strict-security"); if (iter != optMap.end()) strictSecurity = iter->second.asBool(); + + iter = optMap.find("max-thread-wait-time"); + if (iter != optMap.end()) + maxThreadWaitTime = iter->second.asUint32(); } + + if (maxThreadWaitTime > interval) + maxThreadWaitTime = interval; } @@ -254,6 +262,11 @@ AgentSessionImpl::~AgentSessionImpl() { if (opened) close(); + + if (thread) { + thread->join(); + delete thread; + } } @@ -262,6 +275,12 @@ void AgentSessionImpl::open() if (opened) throw QmfException("The session is already open"); + // If the thread exists, join and delete it before creating a new one. + if (thread) { + thread->join(); + delete thread; + } + const string addrArgs(";{create:never,node:{type:topic}}"); const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str()); attributes["_direct_subject"] = routableAddr; @@ -304,13 +323,8 @@ void AgentSessionImpl::close() if (!opened) return; - // Stop and join the receiver thread + // Stop the receiver thread. Don't join it until the destructor is called or open() is called. threadCanceled = true; - thread->join(); - delete thread; - - // Close the AMQP session - session.close(); opened = false; } @@ -320,9 +334,13 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout) uint64_t milliseconds = timeout.getMilliseconds(); qpid::sys::Mutex::ScopedLock l(lock); - if (eventQueue.empty() && milliseconds > 0) - cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), - qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); + if (eventQueue.empty() && milliseconds > 0) { + int64_t nsecs(qpid::sys::TIME_INFINITE); + if ((uint64_t)(nsecs / 1000000) > milliseconds) + nsecs = (int64_t) milliseconds * 1000000; + qpid::sys::Duration then(nsecs); + cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then)); + } if (!eventQueue.empty()) { event = eventQueue.front(); @@ -1050,7 +1068,7 @@ void AgentSessionImpl::run() periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / qpid::sys::TIME_SEC); Receiver rx; - bool valid = session.nextReceiver(rx, Duration::SECOND); + bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime); if (threadCanceled) break; if (valid) { @@ -1067,6 +1085,7 @@ void AgentSessionImpl::run() enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED))); } + session.close(); QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName); } diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp index 7b51d80032..af835959cf 100644 --- a/cpp/src/qmf/ConsoleSession.cpp +++ b/cpp/src/qmf/ConsoleSession.cpp @@ -66,7 +66,7 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s //======================================================================================== ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : - connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), + connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5), opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1) { @@ -92,7 +92,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : iter = optMap.find("strict-security"); if (iter != optMap.end()) strictSecurity = iter->second.asBool(); + + iter = optMap.find("max-thread-wait-time"); + if (iter != optMap.end()) + maxThreadWaitTime = iter->second.asUint32(); } + + if (maxThreadWaitTime > 60) + maxThreadWaitTime = 60; } @@ -100,6 +107,11 @@ ConsoleSessionImpl::~ConsoleSessionImpl() { if (opened) close(); + + if (thread) { + thread->join(); + delete thread; + } } @@ -154,6 +166,12 @@ void ConsoleSessionImpl::open() if (opened) throw QmfException("The session is already open"); + // If the thread exists, join and delete it before creating a new one. + if (thread) { + thread->join(); + delete thread; + } + // Establish messaging addresses directBase = "qmf." + domain + ".direct"; topicBase = "qmf." + domain + ".topic"; @@ -182,14 +200,13 @@ void ConsoleSessionImpl::open() // Start the receiver thread threadCanceled = false; + opened = true; thread = new qpid::sys::Thread(*this); // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent. sendBrokerLocate(); if (agentQuery) sendAgentLocate(); - - opened = true; } @@ -198,13 +215,8 @@ void ConsoleSessionImpl::close() if (!opened) throw QmfException("The session is already closed"); - // Stop and join the receiver thread + // Stop the receiver thread. Don't join it until the destructor is called or open() is called. threadCanceled = true; - thread->join(); - delete thread; - - // Close the AMQP session - session.close(); opened = false; } @@ -214,9 +226,13 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout) uint64_t milliseconds = timeout.getMilliseconds(); qpid::sys::Mutex::ScopedLock l(lock); - if (eventQueue.empty() && milliseconds > 0) - cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), - qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); + if (eventQueue.empty() && milliseconds > 0) { + int64_t nsecs(qpid::sys::TIME_INFINITE); + if ((uint64_t)(nsecs / 1000000) > milliseconds) + nsecs = (int64_t) milliseconds * 1000000; + qpid::sys::Duration then(nsecs); + cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then)); + } if (!eventQueue.empty()) { event = eventQueue.front(); @@ -596,7 +612,7 @@ void ConsoleSessionImpl::run() qpid::sys::TIME_SEC); Receiver rx; - bool valid = session.nextReceiver(rx, Duration::SECOND); + bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime); if (threadCanceled) break; if (valid) { @@ -613,6 +629,7 @@ void ConsoleSessionImpl::run() enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED))); } + session.close(); QPID_LOG(debug, "ConsoleSession thread exiting"); } diff --git a/cpp/src/qmf/ConsoleSessionImpl.h b/cpp/src/qmf/ConsoleSessionImpl.h index 429dfc4881..478d24e56b 100644 --- a/cpp/src/qmf/ConsoleSessionImpl.h +++ b/cpp/src/qmf/ConsoleSessionImpl.h @@ -76,6 +76,7 @@ namespace qmf { uint32_t maxAgentAgeMinutes; bool listenOnDirect; bool strictSecurity; + uint32_t maxThreadWaitTime; Query agentQuery; bool opened; std::queue<ConsoleEvent> eventQueue; diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 633401ef5b..f183ff8e0c 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -1378,13 +1378,26 @@ bool ManagementAgentImpl::ConnectionThread::isSleeping() const void ManagementAgentImpl::PublishThread::run() { - uint16_t totalSleep; + uint16_t totalSleep; + uint16_t sleepTime; while (!shutdown) { agent.periodicProcessing(); totalSleep = 0; - while (totalSleep++ < agent.getInterval() && !shutdown) { - ::sleep(1); + + // + // Calculate a sleep time that is no greater than 5 seconds and + // no less than 1 second. + // + sleepTime = agent.getInterval(); + if (sleepTime > 5) + sleepTime = 5; + else if (sleepTime == 0) + sleepTime = 1; + + while (totalSleep < agent.getInterval() && !shutdown) { + ::sleep(sleepTime); + totalSleep += sleepTime; } } } |