summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/cluster.mk2
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--cpp/src/qpid/cluster/Cluster.h7
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp4
-rw-r--r--cpp/src/qpid/cluster/ClusterSettings.h4
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp4
-rw-r--r--cpp/src/qpid/cluster/Connection.h2
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp46
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.h8
-rw-r--r--cpp/src/qpid/cluster/WriteEstimate.cpp64
-rw-r--r--cpp/src/qpid/cluster/WriteEstimate.h69
11 files changed, 37 insertions, 175 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index fdac229646..b7ee61e180 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -78,8 +78,6 @@ cluster_la_SOURCES = \
qpid/cluster/PollerDispatch.h \
qpid/cluster/ProxyInputHandler.h \
qpid/cluster/Quorum.h \
- qpid/cluster/WriteEstimate.cpp \
- qpid/cluster/WriteEstimate.h \
qpid/cluster/types.h
cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index a17f54078c..221f12990e 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -96,8 +96,6 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
name(settings.name),
myUrl(settings.url.empty() ? Url() : Url(settings.url)),
self(cpg.self()),
- readMax(settings.readMax),
- writeEstimate(settings.writeEstimate),
expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 8a94fc79dd..bd401f3715 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -101,8 +101,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void checkQuorum();
- size_t getReadMax() { return readMax; }
- size_t getWriteEstimate() { return writeEstimate; }
+ const ClusterSettings& getSettings() const { return settings; }
void deliverFrame(const EventFrame&);
@@ -192,7 +191,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void updateOutDone(Lock&);
// Immutable members set on construction, never changed.
- ClusterSettings settings;
+ const ClusterSettings settings;
broker::Broker& broker;
qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle
boost::shared_ptr<sys::Poller> poller;
@@ -200,8 +199,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
const std::string name;
Url myUrl;
const MemberId self;
- const size_t readMax;
- const size_t writeEstimate;
framing::Uuid clusterId;
NoOpConnectionOutputHandler shadowOut;
qpid::management::ManagementAgent* mAgent;
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index adb6621caf..0067de8ec1 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -74,7 +74,9 @@ struct ClusterOptions : public Options {
("cluster-read-max", optValue(settings.readMax,"N"),
"Experimental: Limit per-client-connection queue of read buffers. 0=no limit.")
("cluster-write-estimate", optValue(settings.writeEstimate, "Kb"),
- "Experimental: initial estimate for connection write rate per multicast cycle")
+ "Experimental: initial estimate for write rate per multicast cycle")
+ ("cluster-write-min", optValue(settings.writeMin, "Kb"),
+ "Experimental: minimum estimate for write rate per multicast cycle")
;
}
};
diff --git a/cpp/src/qpid/cluster/ClusterSettings.h b/cpp/src/qpid/cluster/ClusterSettings.h
index 88e8829dfe..6b4a0b531c 100644
--- a/cpp/src/qpid/cluster/ClusterSettings.h
+++ b/cpp/src/qpid/cluster/ClusterSettings.h
@@ -32,10 +32,10 @@ struct ClusterSettings {
std::string name;
std::string url;
bool quorum;
- size_t readMax, writeEstimate;
+ size_t readMax, writeEstimate, writeMin;
std::string username, password, mechanism;
- ClusterSettings() : quorum(false), readMax(10), writeEstimate(64) {}
+ ClusterSettings() : quorum(false), readMax(10), writeEstimate(1), writeMin(1) {}
Url getUrl(uint16_t port) const {
if (url.empty()) return Url::getIpAddressesUrl(port);
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 97cafbabaa..c107552905 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -89,7 +89,7 @@ void Connection::init() {
if (isLocalClient()) {
connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node
cluster.addLocalConnection(this);
- giveReadCredit(cluster.getReadMax());
+ giveReadCredit(cluster.getSettings().readMax);
}
else { // Shadow or catch-up connection
connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
@@ -100,7 +100,7 @@ void Connection::init() {
}
void Connection::giveReadCredit(int credit) {
- if (cluster.getReadMax() && credit)
+ if (cluster.getSettings().readMax && credit)
output.giveReadCredit(credit);
}
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 414e5c935f..499d515d73 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -23,7 +23,6 @@
*/
#include "types.h"
-#include "WriteEstimate.h"
#include "OutputInterceptor.h"
#include "EventFrame.h"
#include "McastFrameHandler.h"
@@ -178,7 +177,6 @@ class Connection :
Cluster& cluster;
ConnectionId self;
bool catchUp;
- WriteEstimate writeEstimate;
OutputInterceptor output;
framing::FrameDecoder localDecoder;
broker::Connection connection;
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 6af114a662..9062edc846 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -36,13 +36,16 @@ NoOpConnectionOutputHandler OutputInterceptor::discardHandler;
OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h)
: parent(p), closing(false), next(&h), sent(),
- writeEstimate(p.getCluster().getWriteEstimate()),
+ estimate(p.getCluster().getSettings().writeEstimate*1024),
+ minimum(p.getCluster().getSettings().writeMin*1024),
moreOutput(), doingOutput()
{}
void OutputInterceptor::send(framing::AMQFrame& f) {
parent.getCluster().checkQuorum();
{
+ // FIXME aconway 2009-04-28: locking around next-> may be redundant
+ // with the fixes to read-credit in the IO layer. Review.
sys::Mutex::ScopedLock l(lock);
next->send(f);
}
@@ -58,7 +61,7 @@ void OutputInterceptor::activateOutput() {
else if (!closing) { // Don't send do ouput after output stopped.
QPID_LOG(trace, parent << " activateOutput - sending doOutput");
moreOutput = true;
- sendDoOutput();
+ sendDoOutput(estimate);
}
}
@@ -77,36 +80,35 @@ bool OutputInterceptor::doOutput() { return false; }
//
void OutputInterceptor::deliverDoOutput(size_t requested) {
size_t buf = getBuffered();
- if (parent.isLocal())
- writeEstimate.delivered(requested, sent, buf); // Update the estimate.
-
- // Run the real doOutput() till we have added the requested data or there's nothing to output.
+ if (parent.isLocal()) { // Adjust estimate for next sendDoOutput
+ sent = sent > buf ? sent - buf : 0; // Buffered data was not sent.
+ if (buf > 0) // Wrote to capacity, move estimate towards sent.
+ estimate = (estimate + sent) /2;
+ else if (sent >= estimate) // Last estimate was too small, increase it.
+ estimate *= 2;
+ if (estimate < minimum) estimate = minimum;
+ }
+ // Run the real doOutput() till we have added the requested data
+ // or there's nothing to output. Record how much we send.
sent = 0;
do {
moreOutput = parent.getBrokerConnection().doOutput();
} while (sent < requested && moreOutput);
- sent += buf; // Include buffered data in the sent total.
- QPID_LOG(trace, parent << " delivereDoOutput: requested=" << requested << " sent=" << sent << " more=" << moreOutput);
- if (parent.isLocal() && moreOutput) {
- QPID_LOG(trace, parent << " deliverDoOutput - sending doOutput, more output available.");
- sendDoOutput();
- }
- else
+ sent += buf; // Include data previously in the buffer
+
+ if (parent.isLocal()) {
+ // Send the next doOutput request
doingOutput = false;
+ sendDoOutput(estimate); // FIXME aconway 2009-04-28: account for data in buffer?
+ }
}
// Send a doOutput request if one is not already in flight.
-void OutputInterceptor::sendDoOutput() {
- if (!parent.isLocal()) return;
+void OutputInterceptor::sendDoOutput(size_t request) {
+ if (!parent.isLocal() || doingOutput || !moreOutput) return;
doingOutput = true;
- size_t request = writeEstimate.sending(getBuffered());
-
- // Note we may send 0 size request if there's more than 2*estimate in the buffer.
- // Send it anyway to keep the doOutput chain going until we are sure there's no more output
- // (in deliverDoOutput)
- //
parent.getCluster().getMulticast().mcastControl(
- ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), request), parent.getId());
+ ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), estimate), parent.getId());
QPID_LOG(trace, parent << "Send doOutput request for " << request);
}
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h
index 61e246bb89..ea603dbe06 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.h
+++ b/cpp/src/qpid/cluster/OutputInterceptor.h
@@ -22,9 +22,9 @@
*
*/
-#include "WriteEstimate.h"
#include "NoOpConnectionOutputHandler.h"
#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/broker/ConnectionFactory.h"
#include <boost/function.hpp>
@@ -60,14 +60,14 @@ class OutputInterceptor : public sys::ConnectionOutputHandler {
private:
typedef sys::Mutex::ScopedLock Locker;
- void sendDoOutput();
+ void sendDoOutput(size_t);
mutable sys::Mutex lock;
bool closing;
sys::ConnectionOutputHandler* next;
size_t sent;
- size_t lastDoOutput;
- WriteEstimate writeEstimate;
+ size_t estimate;
+ size_t minimum;
bool moreOutput;
bool doingOutput;
static NoOpConnectionOutputHandler discardHandler;
diff --git a/cpp/src/qpid/cluster/WriteEstimate.cpp b/cpp/src/qpid/cluster/WriteEstimate.cpp
deleted file mode 100644
index 4d840947f3..0000000000
--- a/cpp/src/qpid/cluster/WriteEstimate.cpp
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "WriteEstimate.h"
-#include "qpid/log/Statement.h"
-#include <boost/current_function.hpp>
-
-namespace qpid {
-namespace cluster {
-
-WriteEstimate::WriteEstimate(size_t initial)
- : growing(true), estimate(initial), lastEstimate(initial) {}
-
-size_t WriteEstimate::sending(size_t buffered) {
- // We want to send a doOutput request for enough data such
- // that if estimate bytes are written before it is self
- // delivered then what is left in the buffer plus the doOutput
- // request will be estimate bytes.
-
- size_t predictLeft = (buffered > estimate) ? buffered - estimate : 0;
- size_t request = (estimate > predictLeft) ? estimate - predictLeft : 0;
- return request;
-}
-
-size_t pad(size_t value) { return value + value/2; }
-
-void WriteEstimate::delivered(size_t last, size_t sent, size_t buffered) {
- lastEstimate = last;
- size_t wrote = sent > buffered ? sent - buffered : 0;
- if (wrote == 0) // No change
- return;
- if (buffered > 0) { // Buffer was over-stocked, we wrote to capacity.
- growing = false;
- estimate = pad(wrote); // Estimate at 1.5 write for padding.
- }
- else if (wrote > estimate) { // Wrote everything, buffer was under-stocked
- if (growing)
- estimate = std::max(estimate*2, pad(wrote)); // Grow quickly if we have not yet seen an over-stock.
- else
- estimate = pad(wrote);
- }
-}
-
-}} // namespace qpid::cluster
-
-
diff --git a/cpp/src/qpid/cluster/WriteEstimate.h b/cpp/src/qpid/cluster/WriteEstimate.h
deleted file mode 100644
index 97b1435fcc..0000000000
--- a/cpp/src/qpid/cluster/WriteEstimate.h
+++ /dev/null
@@ -1,69 +0,0 @@
-#ifndef QPID_CLUSTER_WRITEESTIMATE_H
-#define QPID_CLUSTER_WRITEESTIMATE_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/Mutex.h"
-
-namespace qpid {
-namespace cluster {
-
-/**
- * Estimate the amount of data that a connection can write between sending
- * a doOutput notice and re-receiving it.
- *
- * The goal is to avoid ever write-idling the connection by sending
- * the next doOutput request as soon as we've processed the previous
- * one, such that data generated by the previous request will keep the
- * writer busy till the next one is delivered.
- *
- */
-class WriteEstimate
-{
- public:
- WriteEstimate(size_t initial=4096);
-
- /** About to send a doOutput request.
- * Update estimation state and return size for next request.
- */
- size_t sending(size_t buffered);
-
- /**
- * doOutput request just delivered, not yet executed. Update the estimate.
- * and estimate how much data to request in the next onOutput
- * request. 0 means don't send an onOutput request.
- *
- * @param delivered value in doOutput control.
- */
- void delivered(size_t delivered, size_t sent, size_t buffered);
-
- /** Last estimate delivered, i.e. known to cluster */
- size_t getLastEstimate() const { return estimate; }
-
- private:
- bool growing;
- size_t estimate, lastEstimate;
-};
-
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_WRITEESTIMATE_H*/