diff options
-rw-r--r-- | cpp/src/cluster.mk | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterSettings.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 46 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/WriteEstimate.cpp | 64 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/WriteEstimate.h | 69 |
11 files changed, 37 insertions, 175 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index fdac229646..b7ee61e180 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -78,8 +78,6 @@ cluster_la_SOURCES = \ qpid/cluster/PollerDispatch.h \ qpid/cluster/ProxyInputHandler.h \ qpid/cluster/Quorum.h \ - qpid/cluster/WriteEstimate.cpp \ - qpid/cluster/WriteEstimate.h \ qpid/cluster/types.h cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index a17f54078c..221f12990e 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -96,8 +96,6 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : name(settings.name), myUrl(settings.url.empty() ? Url() : Url(settings.url)), self(cpg.self()), - readMax(settings.readMax), - writeEstimate(settings.writeEstimate), expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 8a94fc79dd..bd401f3715 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -101,8 +101,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void checkQuorum(); - size_t getReadMax() { return readMax; } - size_t getWriteEstimate() { return writeEstimate; } + const ClusterSettings& getSettings() const { return settings; } void deliverFrame(const EventFrame&); @@ -192,7 +191,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void updateOutDone(Lock&); // Immutable members set on construction, never changed. - ClusterSettings settings; + const ClusterSettings settings; broker::Broker& broker; qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle boost::shared_ptr<sys::Poller> poller; @@ -200,8 +199,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { const std::string name; Url myUrl; const MemberId self; - const size_t readMax; - const size_t writeEstimate; framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; qpid::management::ManagementAgent* mAgent; diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index adb6621caf..0067de8ec1 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -74,7 +74,9 @@ struct ClusterOptions : public Options { ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: Limit per-client-connection queue of read buffers. 0=no limit.") ("cluster-write-estimate", optValue(settings.writeEstimate, "Kb"), - "Experimental: initial estimate for connection write rate per multicast cycle") + "Experimental: initial estimate for write rate per multicast cycle") + ("cluster-write-min", optValue(settings.writeMin, "Kb"), + "Experimental: minimum estimate for write rate per multicast cycle") ; } }; diff --git a/cpp/src/qpid/cluster/ClusterSettings.h b/cpp/src/qpid/cluster/ClusterSettings.h index 88e8829dfe..6b4a0b531c 100644 --- a/cpp/src/qpid/cluster/ClusterSettings.h +++ b/cpp/src/qpid/cluster/ClusterSettings.h @@ -32,10 +32,10 @@ struct ClusterSettings { std::string name; std::string url; bool quorum; - size_t readMax, writeEstimate; + size_t readMax, writeEstimate, writeMin; std::string username, password, mechanism; - ClusterSettings() : quorum(false), readMax(10), writeEstimate(64) {} + ClusterSettings() : quorum(false), readMax(10), writeEstimate(1), writeMin(1) {} Url getUrl(uint16_t port) const { if (url.empty()) return Url::getIpAddressesUrl(port); diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 97cafbabaa..c107552905 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -89,7 +89,7 @@ void Connection::init() { if (isLocalClient()) { connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node cluster.addLocalConnection(this); - giveReadCredit(cluster.getReadMax()); + giveReadCredit(cluster.getSettings().readMax); } else { // Shadow or catch-up connection connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames @@ -100,7 +100,7 @@ void Connection::init() { } void Connection::giveReadCredit(int credit) { - if (cluster.getReadMax() && credit) + if (cluster.getSettings().readMax && credit) output.giveReadCredit(credit); } diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 414e5c935f..499d515d73 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -23,7 +23,6 @@ */ #include "types.h" -#include "WriteEstimate.h" #include "OutputInterceptor.h" #include "EventFrame.h" #include "McastFrameHandler.h" @@ -178,7 +177,6 @@ class Connection : Cluster& cluster; ConnectionId self; bool catchUp; - WriteEstimate writeEstimate; OutputInterceptor output; framing::FrameDecoder localDecoder; broker::Connection connection; diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index 6af114a662..9062edc846 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -36,13 +36,16 @@ NoOpConnectionOutputHandler OutputInterceptor::discardHandler; OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h) : parent(p), closing(false), next(&h), sent(), - writeEstimate(p.getCluster().getWriteEstimate()), + estimate(p.getCluster().getSettings().writeEstimate*1024), + minimum(p.getCluster().getSettings().writeMin*1024), moreOutput(), doingOutput() {} void OutputInterceptor::send(framing::AMQFrame& f) { parent.getCluster().checkQuorum(); { + // FIXME aconway 2009-04-28: locking around next-> may be redundant + // with the fixes to read-credit in the IO layer. Review. sys::Mutex::ScopedLock l(lock); next->send(f); } @@ -58,7 +61,7 @@ void OutputInterceptor::activateOutput() { else if (!closing) { // Don't send do ouput after output stopped. QPID_LOG(trace, parent << " activateOutput - sending doOutput"); moreOutput = true; - sendDoOutput(); + sendDoOutput(estimate); } } @@ -77,36 +80,35 @@ bool OutputInterceptor::doOutput() { return false; } // void OutputInterceptor::deliverDoOutput(size_t requested) { size_t buf = getBuffered(); - if (parent.isLocal()) - writeEstimate.delivered(requested, sent, buf); // Update the estimate. - - // Run the real doOutput() till we have added the requested data or there's nothing to output. + if (parent.isLocal()) { // Adjust estimate for next sendDoOutput + sent = sent > buf ? sent - buf : 0; // Buffered data was not sent. + if (buf > 0) // Wrote to capacity, move estimate towards sent. + estimate = (estimate + sent) /2; + else if (sent >= estimate) // Last estimate was too small, increase it. + estimate *= 2; + if (estimate < minimum) estimate = minimum; + } + // Run the real doOutput() till we have added the requested data + // or there's nothing to output. Record how much we send. sent = 0; do { moreOutput = parent.getBrokerConnection().doOutput(); } while (sent < requested && moreOutput); - sent += buf; // Include buffered data in the sent total. - QPID_LOG(trace, parent << " delivereDoOutput: requested=" << requested << " sent=" << sent << " more=" << moreOutput); - if (parent.isLocal() && moreOutput) { - QPID_LOG(trace, parent << " deliverDoOutput - sending doOutput, more output available."); - sendDoOutput(); - } - else + sent += buf; // Include data previously in the buffer + + if (parent.isLocal()) { + // Send the next doOutput request doingOutput = false; + sendDoOutput(estimate); // FIXME aconway 2009-04-28: account for data in buffer? + } } // Send a doOutput request if one is not already in flight. -void OutputInterceptor::sendDoOutput() { - if (!parent.isLocal()) return; +void OutputInterceptor::sendDoOutput(size_t request) { + if (!parent.isLocal() || doingOutput || !moreOutput) return; doingOutput = true; - size_t request = writeEstimate.sending(getBuffered()); - - // Note we may send 0 size request if there's more than 2*estimate in the buffer. - // Send it anyway to keep the doOutput chain going until we are sure there's no more output - // (in deliverDoOutput) - // parent.getCluster().getMulticast().mcastControl( - ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), request), parent.getId()); + ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), estimate), parent.getId()); QPID_LOG(trace, parent << "Send doOutput request for " << request); } diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h index 61e246bb89..ea603dbe06 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.h +++ b/cpp/src/qpid/cluster/OutputInterceptor.h @@ -22,9 +22,9 @@ * */ -#include "WriteEstimate.h" #include "NoOpConnectionOutputHandler.h" #include "qpid/sys/ConnectionOutputHandler.h" +#include "qpid/sys/Mutex.h" #include "qpid/broker/ConnectionFactory.h" #include <boost/function.hpp> @@ -60,14 +60,14 @@ class OutputInterceptor : public sys::ConnectionOutputHandler { private: typedef sys::Mutex::ScopedLock Locker; - void sendDoOutput(); + void sendDoOutput(size_t); mutable sys::Mutex lock; bool closing; sys::ConnectionOutputHandler* next; size_t sent; - size_t lastDoOutput; - WriteEstimate writeEstimate; + size_t estimate; + size_t minimum; bool moreOutput; bool doingOutput; static NoOpConnectionOutputHandler discardHandler; diff --git a/cpp/src/qpid/cluster/WriteEstimate.cpp b/cpp/src/qpid/cluster/WriteEstimate.cpp deleted file mode 100644 index 4d840947f3..0000000000 --- a/cpp/src/qpid/cluster/WriteEstimate.cpp +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "WriteEstimate.h" -#include "qpid/log/Statement.h" -#include <boost/current_function.hpp> - -namespace qpid { -namespace cluster { - -WriteEstimate::WriteEstimate(size_t initial) - : growing(true), estimate(initial), lastEstimate(initial) {} - -size_t WriteEstimate::sending(size_t buffered) { - // We want to send a doOutput request for enough data such - // that if estimate bytes are written before it is self - // delivered then what is left in the buffer plus the doOutput - // request will be estimate bytes. - - size_t predictLeft = (buffered > estimate) ? buffered - estimate : 0; - size_t request = (estimate > predictLeft) ? estimate - predictLeft : 0; - return request; -} - -size_t pad(size_t value) { return value + value/2; } - -void WriteEstimate::delivered(size_t last, size_t sent, size_t buffered) { - lastEstimate = last; - size_t wrote = sent > buffered ? sent - buffered : 0; - if (wrote == 0) // No change - return; - if (buffered > 0) { // Buffer was over-stocked, we wrote to capacity. - growing = false; - estimate = pad(wrote); // Estimate at 1.5 write for padding. - } - else if (wrote > estimate) { // Wrote everything, buffer was under-stocked - if (growing) - estimate = std::max(estimate*2, pad(wrote)); // Grow quickly if we have not yet seen an over-stock. - else - estimate = pad(wrote); - } -} - -}} // namespace qpid::cluster - - diff --git a/cpp/src/qpid/cluster/WriteEstimate.h b/cpp/src/qpid/cluster/WriteEstimate.h deleted file mode 100644 index 97b1435fcc..0000000000 --- a/cpp/src/qpid/cluster/WriteEstimate.h +++ /dev/null @@ -1,69 +0,0 @@ -#ifndef QPID_CLUSTER_WRITEESTIMATE_H -#define QPID_CLUSTER_WRITEESTIMATE_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Mutex.h" - -namespace qpid { -namespace cluster { - -/** - * Estimate the amount of data that a connection can write between sending - * a doOutput notice and re-receiving it. - * - * The goal is to avoid ever write-idling the connection by sending - * the next doOutput request as soon as we've processed the previous - * one, such that data generated by the previous request will keep the - * writer busy till the next one is delivered. - * - */ -class WriteEstimate -{ - public: - WriteEstimate(size_t initial=4096); - - /** About to send a doOutput request. - * Update estimation state and return size for next request. - */ - size_t sending(size_t buffered); - - /** - * doOutput request just delivered, not yet executed. Update the estimate. - * and estimate how much data to request in the next onOutput - * request. 0 means don't send an onOutput request. - * - * @param delivered value in doOutput control. - */ - void delivered(size_t delivered, size_t sent, size_t buffered); - - /** Last estimate delivered, i.e. known to cluster */ - size_t getLastEstimate() const { return estimate; } - - private: - bool growing; - size_t estimate, lastEstimate; -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_WRITEESTIMATE_H*/ |