summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-12-19 17:22:52 +0000
committerAlan Conway <aconway@apache.org>2008-12-19 17:22:52 +0000
commitff4ec8aea5752b5672919bcaa39efcb0f8010ea9 (patch)
treec3200de90517f0f1d20f3400c0d112225696d0ad
parent7695fc5636fd86b6c3eeffe64e9f4949db61d129 (diff)
downloadqpid-python-ff4ec8aea5752b5672919bcaa39efcb0f8010ea9.tar.gz
cluster: Increase initial estimate controlling writes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@728072 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp3
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h5
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp11
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/WriteEstimate.cpp5
-rw-r--r--qpid/cpp/src/qpid/cluster/WriteEstimate.h9
7 files changed, 29 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index aac5bc1dd8..dd9de68bf5 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -83,7 +83,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_) :
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_) :
broker(b),
poller(b.getPoller()),
cpg(*this),
@@ -91,6 +91,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
myUrl(url_),
myId(cpg.self()),
readMax(readMax_),
+ writeEstimate(writeEstimate_),
cpgDispatchHandle(
cpg,
boost::bind(&Cluster::dispatch, this, _1), // read
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index f962f4c72f..b8fe61bf15 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -69,7 +69,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
/**
* Join a cluster.
*/
- Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum, size_t readMax);
+ Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum,
+ size_t readMax, size_t writeEstimate);
virtual ~Cluster();
@@ -95,6 +96,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void checkQuorum(); // called in connection threads.
size_t getReadMax() { return readMax; }
+ size_t getWriteEstimate() { return writeEstimate; }
private:
typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr;
@@ -181,6 +183,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
const Url myUrl;
const MemberId myId;
const size_t readMax;
+ const size_t writeEstimate;
framing::Uuid clusterId;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 320d0bc778..0f4944d392 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -41,10 +41,10 @@ struct ClusterValues {
string name;
string url;
bool quorum;
- size_t readMax;
+ size_t readMax, writeEstimate;
// FIXME aconway 2008-12-09: revisit default.
- ClusterValues() : quorum(false), readMax(0) {}
+ ClusterValues() : quorum(false), readMax(0), writeEstimate(64) {}
Url getUrl(uint16_t port) const {
if (url.empty()) return Url::getIpAddressesUrl(port);
@@ -68,7 +68,10 @@ struct ClusterOptions : public Options {
#if HAVE_LIBCMAN
("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
- ("cluster-read-max", optValue(values.readMax,"N"), "Throttle read rate from client connections.")
+ ("cluster-read-max", optValue(values.readMax,"N"),
+ "Throttle read rate from client connections.")
+ ("cluster-write-estimate", optValue(values.writeEstimate, "Kb"),
+ "Estimate connection write rate per multicast cycle")
;
}
};
@@ -88,7 +91,7 @@ struct ClusterPlugin : public Plugin {
if (values.name.empty()) return; // Only if --cluster-name option was specified.
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
- cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum, values.readMax);
+ cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum, values.readMax, values.writeEstimate*1024);
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
index ae2a040ef3..075023caea 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -32,8 +32,10 @@ namespace cluster {
using namespace framing;
-OutputInterceptor::OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h)
- : parent(p), next(&h), sent(), moreOutput(), doingOutput()
+OutputInterceptor::OutputInterceptor(
+ cluster::Connection& p, sys::ConnectionOutputHandler& h)
+ : parent(p), next(&h), sent(), writeEstimate(p.getCluster().getWriteEstimate()),
+ moreOutput(), doingOutput()
{}
void OutputInterceptor::send(framing::AMQFrame& f) {
@@ -69,7 +71,7 @@ bool OutputInterceptor::doOutput() {
void OutputInterceptor::deliverDoOutput(size_t requested) {
size_t buf = next->getBuffered();
if (parent.isLocal())
- writeEstimate.delivered(sent, buf); // Update the estimate.
+ writeEstimate.delivered(requested, sent, buf); // Update the estimate.
// Run the real doOutput() till we have added the requested data or there's nothing to output.
sent = 0;
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
index 783a443228..0ac15e747a 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
@@ -64,6 +64,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler {
mutable sys::Mutex lock;
sys::ConnectionOutputHandler* next;
size_t sent;
+ size_t lastDoOutput;
WriteEstimate writeEstimate;
bool moreOutput;
bool doingOutput;
diff --git a/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp b/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp
index 81131be437..4d840947f3 100644
--- a/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp
+++ b/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp
@@ -27,7 +27,7 @@ namespace qpid {
namespace cluster {
WriteEstimate::WriteEstimate(size_t initial)
- : growing(true), estimate(initial) {}
+ : growing(true), estimate(initial), lastEstimate(initial) {}
size_t WriteEstimate::sending(size_t buffered) {
// We want to send a doOutput request for enough data such
@@ -42,7 +42,8 @@ size_t WriteEstimate::sending(size_t buffered) {
size_t pad(size_t value) { return value + value/2; }
-void WriteEstimate::delivered(size_t sent, size_t buffered) {
+void WriteEstimate::delivered(size_t last, size_t sent, size_t buffered) {
+ lastEstimate = last;
size_t wrote = sent > buffered ? sent - buffered : 0;
if (wrote == 0) // No change
return;
diff --git a/qpid/cpp/src/qpid/cluster/WriteEstimate.h b/qpid/cpp/src/qpid/cluster/WriteEstimate.h
index 01ab2a3e34..97b1435fcc 100644
--- a/qpid/cpp/src/qpid/cluster/WriteEstimate.h
+++ b/qpid/cpp/src/qpid/cluster/WriteEstimate.h
@@ -51,12 +51,17 @@ class WriteEstimate
* doOutput request just delivered, not yet executed. Update the estimate.
* and estimate how much data to request in the next onOutput
* request. 0 means don't send an onOutput request.
+ *
+ * @param delivered value in doOutput control.
*/
- void delivered(size_t sent, size_t buffered);
+ void delivered(size_t delivered, size_t sent, size_t buffered);
+
+ /** Last estimate delivered, i.e. known to cluster */
+ size_t getLastEstimate() const { return estimate; }
private:
bool growing;
- size_t estimate;
+ size_t estimate, lastEstimate;
};
}} // namespace qpid::cluster