summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Broker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp34
1 files changed, 24 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index fbd7dd3361..7879989bf0 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -32,6 +32,7 @@
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/QueueFlowLimit.h"
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
@@ -118,7 +119,9 @@ Broker::Options::Options(const std::string& name) :
maxSessionRate(0),
asyncQueueEvents(false), // Must be false in a cluster.
qmf2Support(true),
- qmf1Support(true)
+ qmf1Support(true),
+ queueFlowStopRatio(80),
+ queueFlowResumeRatio(70)
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -151,7 +154,9 @@ Broker::Options::Options(const std::string& name) :
("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)")
("sasl-config", optValue(saslConfigPath, "FILE"), "gets sasl config from nonstandard location")
("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)")
- ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication");
+ ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication")
+ ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "%MESSAGES"), "Queue capacity level at which flow control is activated.")
+ ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "%MESSAGES"), "Queue capacity level at which flow control is de-activated.");
}
const std::string empty;
@@ -182,8 +187,9 @@ Broker::Broker(const Broker::Options& conf) :
conf.replayHardLimit*1024),
*this),
queueCleaner(queues, timer),
- queueEvents(poller,!conf.asyncQueueEvents),
+ queueEvents(poller,!conf.asyncQueueEvents),
recovery(true),
+ inCluster(false),
clusterUpdatee(false),
expiryPolicy(new ExpiryPolicy),
connectionCounter(conf.maxConnections),
@@ -240,8 +246,16 @@ Broker::Broker(const Broker::Options& conf) :
// Early-Initialize plugins
Plugin::earlyInitAll(*this);
+ /** todo KAG - remove once cluster support for flow control done */
+ if (isInCluster()) {
+ QPID_LOG(info, "Producer Flow Control TBD for clustered brokers - queue flow control disabled by default.");
+ QueueFlowLimit::setDefaults(0, 0, 0);
+ } else {
+ QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
+ }
+
// If no plugin store module registered itself, set up the null store.
- if (NullMessageStore::isNullStore(store.get()))
+ if (NullMessageStore::isNullStore(store.get()))
setStore();
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
@@ -360,14 +374,14 @@ void Broker::run() {
Dispatcher d(poller);
int numIOThreads = config.workerThreads;
std::vector<Thread> t(numIOThreads-1);
-
+
// Run n-1 io threads
for (int i=0; i<numIOThreads-1; ++i)
t[i] = Thread(d);
-
+
// Run final thread
d.run();
-
+
// Now wait for n-1 io threads to exit
for (int i=0; i<numIOThreads-1; ++i) {
t[i].join();
@@ -414,9 +428,9 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
{
case _qmf::Broker::METHOD_ECHO :
QPID_LOG (debug, "Broker::echo("
- << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence
- << ", "
- << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body
+ << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence
+ << ", "
+ << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body
<< ")");
status = Manageable::STATUS_OK;
break;