diff options
Diffstat (limited to 'cpp/src/qpid/cluster/OutputInterceptor.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 73 |
1 files changed, 29 insertions, 44 deletions
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index a7ec82128b..f898957351 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -32,17 +32,17 @@ namespace qpid { namespace cluster { using namespace framing; +using namespace std; NoOpConnectionOutputHandler OutputInterceptor::discardHandler; OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h) - : parent(p), closing(false), next(&h), sent(), - estimate(p.getCluster().getSettings().writeEstimate*1024), - minimum(p.getCluster().getSettings().writeMin*1024), - moreOutput(), doingOutput() + : parent(p), closing(false), next(&h), sendMax(1), sent(0), sentDoOutput(false) {} -LATENCY_TRACK(extern sys::LatencyTracker<const AMQBody*> doOutputTracker;) +#if defined QPID_LATENCY_TRACKER +extern sys::LatencyTracker<const AMQBody*> doOutputTracker; +#endif void OutputInterceptor::send(framing::AMQFrame& f) { LATENCY_TRACK(doOutputTracker.finish(f.getBody())); @@ -53,8 +53,6 @@ void OutputInterceptor::send(framing::AMQFrame& f) { sys::Mutex::ScopedLock l(lock); next->send(f); } - if (!parent.isCatchUp()) - sent += f.encodedSize(); } void OutputInterceptor::activateOutput() { @@ -62,11 +60,8 @@ void OutputInterceptor::activateOutput() { sys::Mutex::ScopedLock l(lock); next->activateOutput(); } - else if (!closing) { // Don't send do ouput after output stopped. - QPID_LOG(trace, parent << " activateOutput - sending doOutput"); - moreOutput = true; - sendDoOutput(estimate); - } + else + sendDoOutput(sendMax); } void OutputInterceptor::giveReadCredit(int32_t credit) { @@ -77,43 +72,33 @@ void OutputInterceptor::giveReadCredit(int32_t credit) { // Called in write thread when the IO layer has no more data to write. // We do nothing in the write thread, we run doOutput only on delivery // of doOutput requests. -bool OutputInterceptor::doOutput() { return false; } - -// Delivery of doOutput allows us to run the real connection doOutput() -// which tranfers frames to the codec for writing. -// -void OutputInterceptor::deliverDoOutput(size_t requested) { - size_t buf = getBuffered(); - if (parent.isLocal()) { // Adjust estimate for next sendDoOutput - sent = sent > buf ? sent - buf : 0; // Buffered data was not sent. - if (buf > 0) // Wrote to capacity, move estimate towards sent. - estimate = (estimate + sent) /2; - else if (sent >= estimate) // Last estimate was too small, increase it. - estimate *= 2; - if (estimate < minimum) estimate = minimum; - } - // Run the real doOutput() till we have added the requested data - // or there's nothing to output. Record how much we send. - sent = 0; - do { - moreOutput = parent.getBrokerConnection().doOutput(); - } while (sent < requested && moreOutput); - sent += buf; // Include data previously in the buffer +bool OutputInterceptor::doOutput() { return false; } +// Send output up to limit, calculate new limit. +void OutputInterceptor::deliverDoOutput(uint32_t limit) { + sentDoOutput = false; + sendMax = limit; + size_t newLimit = limit; if (parent.isLocal()) { - // Send the next doOutput request - doingOutput = false; - sendDoOutput(estimate); // FIXME aconway 2009-04-28: account for data in buffer? + size_t buffered = getBuffered(); + if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit. + newLimit = sendMax*2; + else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit. + newLimit = sent-1; } + sent = 0; + while (sent < limit && parent.getBrokerConnection().doOutput()) + ++sent; + if (sent == limit) sendDoOutput(newLimit); } -// Send a doOutput request if one is not already in flight. -void OutputInterceptor::sendDoOutput(size_t request) { - if (!parent.isLocal() || doingOutput || !moreOutput) return; - doingOutput = true; - parent.getCluster().getMulticast().mcastControl( - ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), estimate), parent.getId()); - QPID_LOG(trace, parent << "Send doOutput request for " << request); +void OutputInterceptor::sendDoOutput(size_t newLimit) { + if (parent.isLocal() && !sentDoOutput && !closing) { + sentDoOutput = true; + parent.getCluster().getMulticast().mcastControl( + ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit), + parent.getId()); + } } void OutputInterceptor::closeOutput() { |