diff options
Diffstat (limited to 'cpp/src/qpid/cluster/OutputInterceptor.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 46 |
1 files changed, 24 insertions, 22 deletions
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index 6af114a662..9062edc846 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -36,13 +36,16 @@ NoOpConnectionOutputHandler OutputInterceptor::discardHandler; OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h) : parent(p), closing(false), next(&h), sent(), - writeEstimate(p.getCluster().getWriteEstimate()), + estimate(p.getCluster().getSettings().writeEstimate*1024), + minimum(p.getCluster().getSettings().writeMin*1024), moreOutput(), doingOutput() {} void OutputInterceptor::send(framing::AMQFrame& f) { parent.getCluster().checkQuorum(); { + // FIXME aconway 2009-04-28: locking around next-> may be redundant + // with the fixes to read-credit in the IO layer. Review. sys::Mutex::ScopedLock l(lock); next->send(f); } @@ -58,7 +61,7 @@ void OutputInterceptor::activateOutput() { else if (!closing) { // Don't send do ouput after output stopped. QPID_LOG(trace, parent << " activateOutput - sending doOutput"); moreOutput = true; - sendDoOutput(); + sendDoOutput(estimate); } } @@ -77,36 +80,35 @@ bool OutputInterceptor::doOutput() { return false; } // void OutputInterceptor::deliverDoOutput(size_t requested) { size_t buf = getBuffered(); - if (parent.isLocal()) - writeEstimate.delivered(requested, sent, buf); // Update the estimate. - - // Run the real doOutput() till we have added the requested data or there's nothing to output. + if (parent.isLocal()) { // Adjust estimate for next sendDoOutput + sent = sent > buf ? sent - buf : 0; // Buffered data was not sent. + if (buf > 0) // Wrote to capacity, move estimate towards sent. + estimate = (estimate + sent) /2; + else if (sent >= estimate) // Last estimate was too small, increase it. + estimate *= 2; + if (estimate < minimum) estimate = minimum; + } + // Run the real doOutput() till we have added the requested data + // or there's nothing to output. Record how much we send. sent = 0; do { moreOutput = parent.getBrokerConnection().doOutput(); } while (sent < requested && moreOutput); - sent += buf; // Include buffered data in the sent total. - QPID_LOG(trace, parent << " delivereDoOutput: requested=" << requested << " sent=" << sent << " more=" << moreOutput); - if (parent.isLocal() && moreOutput) { - QPID_LOG(trace, parent << " deliverDoOutput - sending doOutput, more output available."); - sendDoOutput(); - } - else + sent += buf; // Include data previously in the buffer + + if (parent.isLocal()) { + // Send the next doOutput request doingOutput = false; + sendDoOutput(estimate); // FIXME aconway 2009-04-28: account for data in buffer? + } } // Send a doOutput request if one is not already in flight. -void OutputInterceptor::sendDoOutput() { - if (!parent.isLocal()) return; +void OutputInterceptor::sendDoOutput(size_t request) { + if (!parent.isLocal() || doingOutput || !moreOutput) return; doingOutput = true; - size_t request = writeEstimate.sending(getBuffered()); - - // Note we may send 0 size request if there's more than 2*estimate in the buffer. - // Send it anyway to keep the doOutput chain going until we are sure there's no more output - // (in deliverDoOutput) - // parent.getCluster().getMulticast().mcastControl( - ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), request), parent.getId()); + ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), estimate), parent.getId()); QPID_LOG(trace, parent << "Send doOutput request for " << request); } |