diff options
Diffstat (limited to 'cpp/src/qpid/cluster/ConnectionInterceptor.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionInterceptor.cpp | 36 |
1 files changed, 15 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp index c13651eccb..efcab1b731 100644 --- a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp +++ b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp @@ -22,6 +22,8 @@ #include "qpid/framing/ClusterConnectionCloseBody.h" #include "qpid/framing/ClusterConnectionDoOutputBody.h" #include "qpid/framing/AMQFrame.h" +#include <boost/current_function.hpp> + namespace qpid { namespace cluster { @@ -32,24 +34,27 @@ template <class T, class U, class V> void shift(T& a, U& b, const V& c) { a = b; ConnectionInterceptor::ConnectionInterceptor( broker::Connection& conn, Cluster& clust, Cluster::ShadowConnectionId shadowId_) - : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_) + : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_), output(*this, *conn.getOutput().get()) { connection->addFinalizer(boost::bind(operator delete, this)); + connection->setOutputHandler(&output), // Attach my functions to Connection extension points. shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1)); shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this)); - shift(doOutputNext, connection->doOutputFn, boost::bind(&ConnectionInterceptor::doOutput, this)); + shift(output.doOutputNext, connection->doOutputFn, boost::bind(&OutputInterceptor::doOutput, &output)); } ConnectionInterceptor::~ConnectionInterceptor() { assert(connection == 0); } +// Forward all received frames to the cluster, continue handling on delivery. void ConnectionInterceptor::received(framing::AMQFrame& f) { if (isClosed) return; cluster.send(f, this); } +// Continue normal handling of delivered frames. void ConnectionInterceptor::deliver(framing::AMQFrame& f) { receivedNext(f); } @@ -81,28 +86,17 @@ void ConnectionInterceptor::deliverClosed() { } void ConnectionInterceptor::dirtyClose() { - // Not closed via cluster self-delivery but closed locally. - // Used for dirty cluster shutdown where active connections - // must be cleaned up. + // Not closed via cluster self-delivery but closed locally. Used + // when local broker is shut down without a clean cluster shutdown. + // Release the connection, it will delete this. connection = 0; } -bool ConnectionInterceptor::doOutput() { - // FIXME aconway 2008-08-15: this is not correct. - // Run in write threads so order of execution of doOutput is not determinate. - // Will only work reliably for in single-consumer tests. - - if (connection->hasOutput()) { - cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this); - return doOutputNext(); - } - return false; -} - -void ConnectionInterceptor::deliverDoOutput() { - // FIXME aconway 2008-08-15: see comment in doOutput. - if (isShadow()) - doOutputNext(); +// Delivery of doOutput allows us to run the real connection doOutput() +// which stocks up the write buffers with data. +// +void ConnectionInterceptor::deliverDoOutput(size_t requested) { + output.deliverDoOutput(requested); } }} // namespace qpid::cluster |