From 2b97a69197fb986c209339c48ed98bb45203e107 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 21 Aug 2008 18:04:18 +0000 Subject: Pre-buffering output strategy for cluster. Additional hooks in broker code, should not affect standalone broker. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@687813 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/ConnectionInterceptor.cpp | 36 +++++++++++--------------- 1 file changed, 15 insertions(+), 21 deletions(-) (limited to 'cpp/src/qpid/cluster/ConnectionInterceptor.cpp') diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp index c13651eccb..efcab1b731 100644 --- a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp +++ b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp @@ -22,6 +22,8 @@ #include "qpid/framing/ClusterConnectionCloseBody.h" #include "qpid/framing/ClusterConnectionDoOutputBody.h" #include "qpid/framing/AMQFrame.h" +#include + namespace qpid { namespace cluster { @@ -32,24 +34,27 @@ template void shift(T& a, U& b, const V& c) { a = b; ConnectionInterceptor::ConnectionInterceptor( broker::Connection& conn, Cluster& clust, Cluster::ShadowConnectionId shadowId_) - : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_) + : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_), output(*this, *conn.getOutput().get()) { connection->addFinalizer(boost::bind(operator delete, this)); + connection->setOutputHandler(&output), // Attach my functions to Connection extension points. shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1)); shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this)); - shift(doOutputNext, connection->doOutputFn, boost::bind(&ConnectionInterceptor::doOutput, this)); + shift(output.doOutputNext, connection->doOutputFn, boost::bind(&OutputInterceptor::doOutput, &output)); } ConnectionInterceptor::~ConnectionInterceptor() { assert(connection == 0); } +// Forward all received frames to the cluster, continue handling on delivery. void ConnectionInterceptor::received(framing::AMQFrame& f) { if (isClosed) return; cluster.send(f, this); } +// Continue normal handling of delivered frames. void ConnectionInterceptor::deliver(framing::AMQFrame& f) { receivedNext(f); } @@ -81,28 +86,17 @@ void ConnectionInterceptor::deliverClosed() { } void ConnectionInterceptor::dirtyClose() { - // Not closed via cluster self-delivery but closed locally. - // Used for dirty cluster shutdown where active connections - // must be cleaned up. + // Not closed via cluster self-delivery but closed locally. Used + // when local broker is shut down without a clean cluster shutdown. + // Release the connection, it will delete this. connection = 0; } -bool ConnectionInterceptor::doOutput() { - // FIXME aconway 2008-08-15: this is not correct. - // Run in write threads so order of execution of doOutput is not determinate. - // Will only work reliably for in single-consumer tests. - - if (connection->hasOutput()) { - cluster.send(AMQFrame(in_place()), this); - return doOutputNext(); - } - return false; -} - -void ConnectionInterceptor::deliverDoOutput() { - // FIXME aconway 2008-08-15: see comment in doOutput. - if (isShadow()) - doOutputNext(); +// Delivery of doOutput allows us to run the real connection doOutput() +// which stocks up the write buffers with data. +// +void ConnectionInterceptor::deliverDoOutput(size_t requested) { + output.deliverDoOutput(requested); } }} // namespace qpid::cluster -- cgit v1.2.1