summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/OutputInterceptor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/OutputInterceptor.cpp')
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp46
1 files changed, 24 insertions, 22 deletions
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 6af114a662..9062edc846 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -36,13 +36,16 @@ NoOpConnectionOutputHandler OutputInterceptor::discardHandler;
OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h)
: parent(p), closing(false), next(&h), sent(),
- writeEstimate(p.getCluster().getWriteEstimate()),
+ estimate(p.getCluster().getSettings().writeEstimate*1024),
+ minimum(p.getCluster().getSettings().writeMin*1024),
moreOutput(), doingOutput()
{}
void OutputInterceptor::send(framing::AMQFrame& f) {
parent.getCluster().checkQuorum();
{
+ // FIXME aconway 2009-04-28: locking around next-> may be redundant
+ // with the fixes to read-credit in the IO layer. Review.
sys::Mutex::ScopedLock l(lock);
next->send(f);
}
@@ -58,7 +61,7 @@ void OutputInterceptor::activateOutput() {
else if (!closing) { // Don't send do ouput after output stopped.
QPID_LOG(trace, parent << " activateOutput - sending doOutput");
moreOutput = true;
- sendDoOutput();
+ sendDoOutput(estimate);
}
}
@@ -77,36 +80,35 @@ bool OutputInterceptor::doOutput() { return false; }
//
void OutputInterceptor::deliverDoOutput(size_t requested) {
size_t buf = getBuffered();
- if (parent.isLocal())
- writeEstimate.delivered(requested, sent, buf); // Update the estimate.
-
- // Run the real doOutput() till we have added the requested data or there's nothing to output.
+ if (parent.isLocal()) { // Adjust estimate for next sendDoOutput
+ sent = sent > buf ? sent - buf : 0; // Buffered data was not sent.
+ if (buf > 0) // Wrote to capacity, move estimate towards sent.
+ estimate = (estimate + sent) /2;
+ else if (sent >= estimate) // Last estimate was too small, increase it.
+ estimate *= 2;
+ if (estimate < minimum) estimate = minimum;
+ }
+ // Run the real doOutput() till we have added the requested data
+ // or there's nothing to output. Record how much we send.
sent = 0;
do {
moreOutput = parent.getBrokerConnection().doOutput();
} while (sent < requested && moreOutput);
- sent += buf; // Include buffered data in the sent total.
- QPID_LOG(trace, parent << " delivereDoOutput: requested=" << requested << " sent=" << sent << " more=" << moreOutput);
- if (parent.isLocal() && moreOutput) {
- QPID_LOG(trace, parent << " deliverDoOutput - sending doOutput, more output available.");
- sendDoOutput();
- }
- else
+ sent += buf; // Include data previously in the buffer
+
+ if (parent.isLocal()) {
+ // Send the next doOutput request
doingOutput = false;
+ sendDoOutput(estimate); // FIXME aconway 2009-04-28: account for data in buffer?
+ }
}
// Send a doOutput request if one is not already in flight.
-void OutputInterceptor::sendDoOutput() {
- if (!parent.isLocal()) return;
+void OutputInterceptor::sendDoOutput(size_t request) {
+ if (!parent.isLocal() || doingOutput || !moreOutput) return;
doingOutput = true;
- size_t request = writeEstimate.sending(getBuffered());
-
- // Note we may send 0 size request if there's more than 2*estimate in the buffer.
- // Send it anyway to keep the doOutput chain going until we are sure there's no more output
- // (in deliverDoOutput)
- //
parent.getCluster().getMulticast().mcastControl(
- ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), request), parent.getId());
+ ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), estimate), parent.getId());
QPID_LOG(trace, parent << "Send doOutput request for " << request);
}