summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/OutputInterceptor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/OutputInterceptor.cpp')
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp73
1 files changed, 29 insertions, 44 deletions
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index a7ec82128b..f898957351 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -32,17 +32,17 @@ namespace qpid {
namespace cluster {
using namespace framing;
+using namespace std;
NoOpConnectionOutputHandler OutputInterceptor::discardHandler;
OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h)
- : parent(p), closing(false), next(&h), sent(),
- estimate(p.getCluster().getSettings().writeEstimate*1024),
- minimum(p.getCluster().getSettings().writeMin*1024),
- moreOutput(), doingOutput()
+ : parent(p), closing(false), next(&h), sendMax(1), sent(0), sentDoOutput(false)
{}
-LATENCY_TRACK(extern sys::LatencyTracker<const AMQBody*> doOutputTracker;)
+#if defined QPID_LATENCY_TRACKER
+extern sys::LatencyTracker<const AMQBody*> doOutputTracker;
+#endif
void OutputInterceptor::send(framing::AMQFrame& f) {
LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
@@ -53,8 +53,6 @@ void OutputInterceptor::send(framing::AMQFrame& f) {
sys::Mutex::ScopedLock l(lock);
next->send(f);
}
- if (!parent.isCatchUp())
- sent += f.encodedSize();
}
void OutputInterceptor::activateOutput() {
@@ -62,11 +60,8 @@ void OutputInterceptor::activateOutput() {
sys::Mutex::ScopedLock l(lock);
next->activateOutput();
}
- else if (!closing) { // Don't send do ouput after output stopped.
- QPID_LOG(trace, parent << " activateOutput - sending doOutput");
- moreOutput = true;
- sendDoOutput(estimate);
- }
+ else
+ sendDoOutput(sendMax);
}
void OutputInterceptor::giveReadCredit(int32_t credit) {
@@ -77,43 +72,33 @@ void OutputInterceptor::giveReadCredit(int32_t credit) {
// Called in write thread when the IO layer has no more data to write.
// We do nothing in the write thread, we run doOutput only on delivery
// of doOutput requests.
-bool OutputInterceptor::doOutput() { return false; }
-
-// Delivery of doOutput allows us to run the real connection doOutput()
-// which tranfers frames to the codec for writing.
-//
-void OutputInterceptor::deliverDoOutput(size_t requested) {
- size_t buf = getBuffered();
- if (parent.isLocal()) { // Adjust estimate for next sendDoOutput
- sent = sent > buf ? sent - buf : 0; // Buffered data was not sent.
- if (buf > 0) // Wrote to capacity, move estimate towards sent.
- estimate = (estimate + sent) /2;
- else if (sent >= estimate) // Last estimate was too small, increase it.
- estimate *= 2;
- if (estimate < minimum) estimate = minimum;
- }
- // Run the real doOutput() till we have added the requested data
- // or there's nothing to output. Record how much we send.
- sent = 0;
- do {
- moreOutput = parent.getBrokerConnection().doOutput();
- } while (sent < requested && moreOutput);
- sent += buf; // Include data previously in the buffer
+bool OutputInterceptor::doOutput() { return false; }
+// Send output up to limit, calculate new limit.
+void OutputInterceptor::deliverDoOutput(uint32_t limit) {
+ sentDoOutput = false;
+ sendMax = limit;
+ size_t newLimit = limit;
if (parent.isLocal()) {
- // Send the next doOutput request
- doingOutput = false;
- sendDoOutput(estimate); // FIXME aconway 2009-04-28: account for data in buffer?
+ size_t buffered = getBuffered();
+ if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit.
+ newLimit = sendMax*2;
+ else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit.
+ newLimit = sent-1;
}
+ sent = 0;
+ while (sent < limit && parent.getBrokerConnection().doOutput())
+ ++sent;
+ if (sent == limit) sendDoOutput(newLimit);
}
-// Send a doOutput request if one is not already in flight.
-void OutputInterceptor::sendDoOutput(size_t request) {
- if (!parent.isLocal() || doingOutput || !moreOutput) return;
- doingOutput = true;
- parent.getCluster().getMulticast().mcastControl(
- ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), estimate), parent.getId());
- QPID_LOG(trace, parent << "Send doOutput request for " << request);
+void OutputInterceptor::sendDoOutput(size_t newLimit) {
+ if (parent.isLocal() && !sentDoOutput && !closing) {
+ sentDoOutput = true;
+ parent.getCluster().getMulticast().mcastControl(
+ ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit),
+ parent.getId());
+ }
}
void OutputInterceptor::closeOutput() {