summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-05-26 21:41:52 +0000
committerAlan Conway <aconway@apache.org>2009-05-26 21:41:52 +0000
commitb9ce6e056f3ec10fc0efc710a29e2b9d60657c27 (patch)
tree7402007454c505975f11f9b889631e5d92db9cf5 /qpid/cpp/src
parente6e47a46b03da1a275cf6646614139f5d9abf513 (diff)
downloadqpid-python-b9ce6e056f3ec10fc0efc710a29e2b9d60657c27.tar.gz
Improved doOutput algorithm.
Simpler & more robust algorithm based on message count rather than byte size. Self-tuning, removes 2 hard-to-explain cluster options. Similar or marginally better performance. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@778896 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp9
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterSettings.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp11
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h7
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp73
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.h14
-rw-r--r--qpid/cpp/src/qpid/cluster/PollableQueue.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp3
8 files changed, 50 insertions, 74 deletions
diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
index daed608eb8..c2c07c052a 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -70,13 +70,8 @@ struct ClusterOptions : public Options {
#if HAVE_LIBCMAN_H
("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
- ("cluster-read-max", optValue(settings.readMax,"N"),
- "Experimental: Limit per-client-connection queue of read buffers. 0=no limit.")
- ("cluster-write-estimate", optValue(settings.writeEstimate, "Kb"),
- "Experimental: initial estimate for write rate per multicast cycle")
- ("cluster-write-min", optValue(settings.writeMin, "Kb"),
- "Experimental: minimum estimate for write rate per multicast cycle")
- // FIXME aconway 2009-05-20: temporary
+ ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.")
+ // FIXME aconway 2009-05-20: temporary
("cluster-check-errors", optValue(settings.checkErrors, "yes|no"), "Enable/disable cluster error checks. Normally should be enabled.")
;
}
diff --git a/qpid/cpp/src/qpid/cluster/ClusterSettings.h b/qpid/cpp/src/qpid/cluster/ClusterSettings.h
index 8ae77cd807..c82be9d227 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterSettings.h
+++ b/qpid/cpp/src/qpid/cluster/ClusterSettings.h
@@ -32,11 +32,11 @@ struct ClusterSettings {
std::string name;
std::string url;
bool quorum;
- size_t readMax, writeEstimate, writeMin;
+ size_t readMax;
std::string username, password, mechanism;
bool checkErrors;
- ClusterSettings() : quorum(false), readMax(10), writeEstimate(1), writeMin(1),
+ ClusterSettings() : quorum(false), readMax(10),
checkErrors(true) // FIXME aconway 2009-05-20: temporary
{}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index cc0af77029..dda4b5435b 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -113,14 +113,6 @@ bool Connection::doOutput() {
return output.doOutput();
}
-// Delivery of doOutput allows us to run the real connection doOutput()
-// which stocks up the write buffers with data.
-//
-void Connection::deliverDoOutput(uint32_t requested) {
- assert(!catchUp);
- output.deliverDoOutput(requested);
-}
-
// Received from a directly connected client.
void Connection::received(framing::AMQFrame& f) {
QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
@@ -279,7 +271,7 @@ void Connection::sessionState(
QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
}
-void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment) {
+void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) {
ConnectionId shadowId = ConnectionId(memberId, connectionId);
QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
self = shadowId;
@@ -287,6 +279,7 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str
// OK to use decoder here because cluster is stalled for update.
cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
connection.setErrorListener(this);
+ output.setSendMax(sendMax);
}
void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) {
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 969d191bd7..a0be2203e4 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -115,7 +115,7 @@ class Connection :
const framing::SequenceNumber& received,
const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
- void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
+ void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax);
void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
@@ -150,6 +150,8 @@ class Connection :
void deliverClose();
+ OutputInterceptor& getOutput() { return output; }
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
@@ -164,8 +166,7 @@ class Connection :
void init();
bool checkUnsupported(const framing::AMQBody& body);
- void deliverDoOutput(uint32_t requested);
- void sendDoOutput();
+ void deliverDoOutput(uint32_t limit) { output.deliverDoOutput(limit); }
boost::shared_ptr<broker::Queue> findQueue(const std::string& qname);
broker::SessionState& sessionState();
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
index a7ec82128b..f898957351 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -32,17 +32,17 @@ namespace qpid {
namespace cluster {
using namespace framing;
+using namespace std;
NoOpConnectionOutputHandler OutputInterceptor::discardHandler;
OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h)
- : parent(p), closing(false), next(&h), sent(),
- estimate(p.getCluster().getSettings().writeEstimate*1024),
- minimum(p.getCluster().getSettings().writeMin*1024),
- moreOutput(), doingOutput()
+ : parent(p), closing(false), next(&h), sendMax(1), sent(0), sentDoOutput(false)
{}
-LATENCY_TRACK(extern sys::LatencyTracker<const AMQBody*> doOutputTracker;)
+#if defined QPID_LATENCY_TRACKER
+extern sys::LatencyTracker<const AMQBody*> doOutputTracker;
+#endif
void OutputInterceptor::send(framing::AMQFrame& f) {
LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
@@ -53,8 +53,6 @@ void OutputInterceptor::send(framing::AMQFrame& f) {
sys::Mutex::ScopedLock l(lock);
next->send(f);
}
- if (!parent.isCatchUp())
- sent += f.encodedSize();
}
void OutputInterceptor::activateOutput() {
@@ -62,11 +60,8 @@ void OutputInterceptor::activateOutput() {
sys::Mutex::ScopedLock l(lock);
next->activateOutput();
}
- else if (!closing) { // Don't send do ouput after output stopped.
- QPID_LOG(trace, parent << " activateOutput - sending doOutput");
- moreOutput = true;
- sendDoOutput(estimate);
- }
+ else
+ sendDoOutput(sendMax);
}
void OutputInterceptor::giveReadCredit(int32_t credit) {
@@ -77,43 +72,33 @@ void OutputInterceptor::giveReadCredit(int32_t credit) {
// Called in write thread when the IO layer has no more data to write.
// We do nothing in the write thread, we run doOutput only on delivery
// of doOutput requests.
-bool OutputInterceptor::doOutput() { return false; }
-
-// Delivery of doOutput allows us to run the real connection doOutput()
-// which tranfers frames to the codec for writing.
-//
-void OutputInterceptor::deliverDoOutput(size_t requested) {
- size_t buf = getBuffered();
- if (parent.isLocal()) { // Adjust estimate for next sendDoOutput
- sent = sent > buf ? sent - buf : 0; // Buffered data was not sent.
- if (buf > 0) // Wrote to capacity, move estimate towards sent.
- estimate = (estimate + sent) /2;
- else if (sent >= estimate) // Last estimate was too small, increase it.
- estimate *= 2;
- if (estimate < minimum) estimate = minimum;
- }
- // Run the real doOutput() till we have added the requested data
- // or there's nothing to output. Record how much we send.
- sent = 0;
- do {
- moreOutput = parent.getBrokerConnection().doOutput();
- } while (sent < requested && moreOutput);
- sent += buf; // Include data previously in the buffer
+bool OutputInterceptor::doOutput() { return false; }
+// Send output up to limit, calculate new limit.
+void OutputInterceptor::deliverDoOutput(uint32_t limit) {
+ sentDoOutput = false;
+ sendMax = limit;
+ size_t newLimit = limit;
if (parent.isLocal()) {
- // Send the next doOutput request
- doingOutput = false;
- sendDoOutput(estimate); // FIXME aconway 2009-04-28: account for data in buffer?
+ size_t buffered = getBuffered();
+ if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit.
+ newLimit = sendMax*2;
+ else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit.
+ newLimit = sent-1;
}
+ sent = 0;
+ while (sent < limit && parent.getBrokerConnection().doOutput())
+ ++sent;
+ if (sent == limit) sendDoOutput(newLimit);
}
-// Send a doOutput request if one is not already in flight.
-void OutputInterceptor::sendDoOutput(size_t request) {
- if (!parent.isLocal() || doingOutput || !moreOutput) return;
- doingOutput = true;
- parent.getCluster().getMulticast().mcastControl(
- ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), estimate), parent.getId());
- QPID_LOG(trace, parent << "Send doOutput request for " << request);
+void OutputInterceptor::sendDoOutput(size_t newLimit) {
+ if (parent.isLocal() && !sentDoOutput && !closing) {
+ sentDoOutput = true;
+ parent.getCluster().getMulticast().mcastControl(
+ ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit),
+ parent.getId());
+ }
}
void OutputInterceptor::closeOutput() {
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
index ea603dbe06..7d6c718b82 100644
--- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
@@ -49,28 +49,28 @@ class OutputInterceptor : public sys::ConnectionOutputHandler {
size_t getBuffered() const;
// Delivery point for doOutput requests.
- void deliverDoOutput(size_t requested);
+ void deliverDoOutput(uint32_t limit);
// Intercept doOutput requests on Connection.
bool doOutput();
void closeOutput();
+ uint32_t getSendMax() const { return sendMax; }
+ void setSendMax(uint32_t sendMax_) { sendMax=sendMax_; }
+
cluster::Connection& parent;
private:
typedef sys::Mutex::ScopedLock Locker;
- void sendDoOutput(size_t);
+ void sendDoOutput(size_t newLimit);
mutable sys::Mutex lock;
bool closing;
sys::ConnectionOutputHandler* next;
- size_t sent;
- size_t estimate;
- size_t minimum;
- bool moreOutput;
- bool doingOutput;
static NoOpConnectionOutputHandler discardHandler;
+ uint32_t sendMax, sent;
+ bool sentDoOutput;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/PollableQueue.h b/qpid/cpp/src/qpid/cluster/PollableQueue.h
index db98faa9da..65f615d8b6 100644
--- a/qpid/cpp/src/qpid/cluster/PollableQueue.h
+++ b/qpid/cpp/src/qpid/cluster/PollableQueue.h
@@ -41,7 +41,8 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> {
const boost::shared_ptr<sys::Poller>& poller)
: sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1),
poller),
- callback(f), error(err), message(msg) {}
+ callback(f), error(err), message(msg)
+ {}
typename sys::PollableQueue<T>::Batch::const_iterator
handleBatch(const typename sys::PollableQueue<T>::Batch& values) {
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index bb4df8890a..edd83463d2 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -253,7 +253,8 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
updateConnection->getId().getMember(),
updateConnection->getId().getNumber(),
bc.getUserId(),
- string(fragment.first, fragment.second)
+ string(fragment.first, fragment.second),
+ updateConnection->getOutput().getSendMax()
);
shadowConnection.close();
QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);