summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp7
-rw-r--r--cpp/src/qpid/cluster/Cluster.h9
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp7
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp6
-rw-r--r--cpp/src/tests/latencytest.cpp2
5 files changed, 18 insertions, 13 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 602933b88b..8d9b5a1864 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -85,7 +85,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool useQuorum) :
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_) :
broker(b),
poller(b.getPoller()),
cpg(*this),
@@ -104,7 +104,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
mgmtObject(0),
state(INIT),
lastSize(0),
- lastBroker(false)
+ lastBroker(false),
+ readMax(readMax_)
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
@@ -119,7 +120,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
deliverQueue.start();
mcastQueue.start();
QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
- if (useQuorum) quorum.init();
+ if (quorum_) quorum.init();
cpg.join(name);
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety.
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 2ab2da6fa8..e172a0f180 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -64,11 +64,9 @@ class Cluster : private Cpg::Handler, public management::Manageable {
typedef std::vector<ConnectionPtr> Connections;
/**
- * Join a cluster.
- * @param name of the cluster.
- * @param url of this broker, sent to the cluster.
+ * Join a cluster.
*/
- Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum);
+ Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum, size_t readMax);
virtual ~Cluster();
@@ -95,6 +93,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
boost::function<bool ()> isQuorate;
void checkQuorum();
+
+ size_t getReadMax() { return readMax; }
private:
typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr;
@@ -215,6 +215,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
boost::shared_ptr<FailoverExchange> failoverExchange;
Quorum quorum;
+ size_t readMax;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 02e6fffb71..5e6a5049a5 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -41,8 +41,10 @@ struct ClusterValues {
string name;
string url;
bool quorum;
+ size_t readMax;
- ClusterValues() : quorum(false) {}
+ // FIXME aconway 2008-12-09: revisit default.
+ ClusterValues() : quorum(false), readMax(4) {}
Url getUrl(uint16_t port) const {
if (url.empty()) return Url::getIpAddressesUrl(port);
@@ -66,6 +68,7 @@ struct ClusterOptions : public Options {
#if HAVE_LIBCMAN
("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
+ ("cluster-read-max", optValue(values.readMax,"N"), "Max un-delivered reads per client connection, 0 means unlimited.")
;
}
};
@@ -85,7 +88,7 @@ struct ClusterPlugin : public Plugin {
if (values.name.empty()) return; // Only if --cluster-name option was specified.
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
- cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum);
+ cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum, values.readMax);
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index fae81acf00..a422164c81 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -74,8 +74,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
void Connection::init() {
QPID_LOG(debug, cluster << " new connection: " << *this);
if (isLocal() && !isCatchUp()) {
- // FIXME aconway 2008-12-05: configurable credit limit
- output.giveReadCredit(10);
+ output.giveReadCredit(cluster.getReadMax());
}
}
@@ -204,7 +203,8 @@ void Connection::deliverBuffer(Buffer& buf) {
++deliverSeq;
while (mcastDecoder.decode(buf))
delivered(mcastDecoder.frame);
- output.giveReadCredit(1);
+ if (cluster.getReadMax())
+ output.giveReadCredit(1);
}
broker::SessionState& Connection::sessionState() {
diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp
index 25de361d11..6895964133 100644
--- a/cpp/src/tests/latencytest.cpp
+++ b/cpp/src/tests/latencytest.cpp
@@ -56,7 +56,7 @@ struct Args : public qpid::TestOptions {
bool durable;
string base;
- Args() : size(256), count(1000), rate(0), reportFrequency(100),
+ Args() : size(256), count(1000), rate(0), reportFrequency(1000),
timeLimit(0), queues(1),
prefetch(100), ack(0),
durable(false), base("latency-test")