From ff4ec8aea5752b5672919bcaa39efcb0f8010ea9 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 19 Dec 2008 17:22:52 +0000 Subject: cluster: Increase initial estimate controlling writes. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@728072 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/cluster/Cluster.cpp | 3 ++- qpid/cpp/src/qpid/cluster/Cluster.h | 5 ++++- qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp | 11 +++++++---- qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp | 8 +++++--- qpid/cpp/src/qpid/cluster/OutputInterceptor.h | 1 + qpid/cpp/src/qpid/cluster/WriteEstimate.cpp | 5 +++-- qpid/cpp/src/qpid/cluster/WriteEstimate.h | 9 +++++++-- 7 files changed, 29 insertions(+), 13 deletions(-) diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index aac5bc1dd8..dd9de68bf5 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -83,7 +83,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; -Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_) : +Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_) : broker(b), poller(b.getPoller()), cpg(*this), @@ -91,6 +91,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b myUrl(url_), myId(cpg.self()), readMax(readMax_), + writeEstimate(writeEstimate_), cpgDispatchHandle( cpg, boost::bind(&Cluster::dispatch, this, _1), // read diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index f962f4c72f..b8fe61bf15 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -69,7 +69,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { /** * Join a cluster. */ - Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum, size_t readMax); + Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum, + size_t readMax, size_t writeEstimate); virtual ~Cluster(); @@ -95,6 +96,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void checkQuorum(); // called in connection threads. size_t getReadMax() { return readMax; } + size_t getWriteEstimate() { return writeEstimate; } private: typedef sys::LockPtr LockPtr; @@ -181,6 +183,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { const Url myUrl; const MemberId myId; const size_t readMax; + const size_t writeEstimate; framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp index 320d0bc778..0f4944d392 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -41,10 +41,10 @@ struct ClusterValues { string name; string url; bool quorum; - size_t readMax; + size_t readMax, writeEstimate; // FIXME aconway 2008-12-09: revisit default. - ClusterValues() : quorum(false), readMax(0) {} + ClusterValues() : quorum(false), readMax(0), writeEstimate(64) {} Url getUrl(uint16_t port) const { if (url.empty()) return Url::getIpAddressesUrl(port); @@ -68,7 +68,10 @@ struct ClusterOptions : public Options { #if HAVE_LIBCMAN ("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.") #endif - ("cluster-read-max", optValue(values.readMax,"N"), "Throttle read rate from client connections.") + ("cluster-read-max", optValue(values.readMax,"N"), + "Throttle read rate from client connections.") + ("cluster-write-estimate", optValue(values.writeEstimate, "Kb"), + "Estimate connection write rate per multicast cycle") ; } }; @@ -88,7 +91,7 @@ struct ClusterPlugin : public Plugin { if (values.name.empty()) return; // Only if --cluster-name option was specified. Broker* broker = dynamic_cast(&target); if (!broker) return; - cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum, values.readMax); + cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum, values.readMax, values.writeEstimate*1024); broker->setConnectionFactory( boost::shared_ptr( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp index ae2a040ef3..075023caea 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -32,8 +32,10 @@ namespace cluster { using namespace framing; -OutputInterceptor::OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h) - : parent(p), next(&h), sent(), moreOutput(), doingOutput() +OutputInterceptor::OutputInterceptor( + cluster::Connection& p, sys::ConnectionOutputHandler& h) + : parent(p), next(&h), sent(), writeEstimate(p.getCluster().getWriteEstimate()), + moreOutput(), doingOutput() {} void OutputInterceptor::send(framing::AMQFrame& f) { @@ -69,7 +71,7 @@ bool OutputInterceptor::doOutput() { void OutputInterceptor::deliverDoOutput(size_t requested) { size_t buf = next->getBuffered(); if (parent.isLocal()) - writeEstimate.delivered(sent, buf); // Update the estimate. + writeEstimate.delivered(requested, sent, buf); // Update the estimate. // Run the real doOutput() till we have added the requested data or there's nothing to output. sent = 0; diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h index 783a443228..0ac15e747a 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h @@ -64,6 +64,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler { mutable sys::Mutex lock; sys::ConnectionOutputHandler* next; size_t sent; + size_t lastDoOutput; WriteEstimate writeEstimate; bool moreOutput; bool doingOutput; diff --git a/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp b/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp index 81131be437..4d840947f3 100644 --- a/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp +++ b/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp @@ -27,7 +27,7 @@ namespace qpid { namespace cluster { WriteEstimate::WriteEstimate(size_t initial) - : growing(true), estimate(initial) {} + : growing(true), estimate(initial), lastEstimate(initial) {} size_t WriteEstimate::sending(size_t buffered) { // We want to send a doOutput request for enough data such @@ -42,7 +42,8 @@ size_t WriteEstimate::sending(size_t buffered) { size_t pad(size_t value) { return value + value/2; } -void WriteEstimate::delivered(size_t sent, size_t buffered) { +void WriteEstimate::delivered(size_t last, size_t sent, size_t buffered) { + lastEstimate = last; size_t wrote = sent > buffered ? sent - buffered : 0; if (wrote == 0) // No change return; diff --git a/qpid/cpp/src/qpid/cluster/WriteEstimate.h b/qpid/cpp/src/qpid/cluster/WriteEstimate.h index 01ab2a3e34..97b1435fcc 100644 --- a/qpid/cpp/src/qpid/cluster/WriteEstimate.h +++ b/qpid/cpp/src/qpid/cluster/WriteEstimate.h @@ -51,12 +51,17 @@ class WriteEstimate * doOutput request just delivered, not yet executed. Update the estimate. * and estimate how much data to request in the next onOutput * request. 0 means don't send an onOutput request. + * + * @param delivered value in doOutput control. */ - void delivered(size_t sent, size_t buffered); + void delivered(size_t delivered, size_t sent, size_t buffered); + + /** Last estimate delivered, i.e. known to cluster */ + size_t getLastEstimate() const { return estimate; } private: bool growing; - size_t estimate; + size_t estimate, lastEstimate; }; }} // namespace qpid::cluster -- cgit v1.2.1