diff options
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 34 |
1 files changed, 24 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index fbd7dd3361..7879989bf0 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -32,6 +32,7 @@ #include "qpid/broker/TopicExchange.h" #include "qpid/broker/Link.h" #include "qpid/broker/ExpiryPolicy.h" +#include "qpid/broker/QueueFlowLimit.h" #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h" @@ -118,7 +119,9 @@ Broker::Options::Options(const std::string& name) : maxSessionRate(0), asyncQueueEvents(false), // Must be false in a cluster. qmf2Support(true), - qmf1Support(true) + qmf1Support(true), + queueFlowStopRatio(80), + queueFlowResumeRatio(70) { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -151,7 +154,9 @@ Broker::Options::Options(const std::string& name) : ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)") ("sasl-config", optValue(saslConfigPath, "FILE"), "gets sasl config from nonstandard location") ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)") - ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication"); + ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication") + ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "%MESSAGES"), "Queue capacity level at which flow control is activated.") + ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "%MESSAGES"), "Queue capacity level at which flow control is de-activated."); } const std::string empty; @@ -182,8 +187,9 @@ Broker::Broker(const Broker::Options& conf) : conf.replayHardLimit*1024), *this), queueCleaner(queues, timer), - queueEvents(poller,!conf.asyncQueueEvents), + queueEvents(poller,!conf.asyncQueueEvents), recovery(true), + inCluster(false), clusterUpdatee(false), expiryPolicy(new ExpiryPolicy), connectionCounter(conf.maxConnections), @@ -240,8 +246,16 @@ Broker::Broker(const Broker::Options& conf) : // Early-Initialize plugins Plugin::earlyInitAll(*this); + /** todo KAG - remove once cluster support for flow control done */ + if (isInCluster()) { + QPID_LOG(info, "Producer Flow Control TBD for clustered brokers - queue flow control disabled by default."); + QueueFlowLimit::setDefaults(0, 0, 0); + } else { + QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio); + } + // If no plugin store module registered itself, set up the null store. - if (NullMessageStore::isNullStore(store.get())) + if (NullMessageStore::isNullStore(store.get())) setStore(); exchanges.declare(empty, DirectExchange::typeName); // Default exchange. @@ -360,14 +374,14 @@ void Broker::run() { Dispatcher d(poller); int numIOThreads = config.workerThreads; std::vector<Thread> t(numIOThreads-1); - + // Run n-1 io threads for (int i=0; i<numIOThreads-1; ++i) t[i] = Thread(d); - + // Run final thread d.run(); - + // Now wait for n-1 io threads to exit for (int i=0; i<numIOThreads-1; ++i) { t[i].join(); @@ -414,9 +428,9 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, { case _qmf::Broker::METHOD_ECHO : QPID_LOG (debug, "Broker::echo(" - << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence - << ", " - << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body + << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence + << ", " + << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body << ")"); status = Manageable::STATUS_OK; break; |