summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-08-21 18:04:18 +0000
committerAlan Conway <aconway@apache.org>2008-08-21 18:04:18 +0000
commit2b97a69197fb986c209339c48ed98bb45203e107 (patch)
tree8bd157cc9d19757b6d9c00c5ab2c353ca336f8bf /cpp/src/qpid/cluster/ConnectionInterceptor.cpp
parentc6c237e2450250a6ef18c5af93e2a733aba10932 (diff)
downloadqpid-python-2b97a69197fb986c209339c48ed98bb45203e107.tar.gz
Pre-buffering output strategy for cluster.
Additional hooks in broker code, should not affect standalone broker. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@687813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/ConnectionInterceptor.cpp')
-rw-r--r--cpp/src/qpid/cluster/ConnectionInterceptor.cpp36
1 files changed, 15 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
index c13651eccb..efcab1b731 100644
--- a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
+++ b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
@@ -22,6 +22,8 @@
#include "qpid/framing/ClusterConnectionCloseBody.h"
#include "qpid/framing/ClusterConnectionDoOutputBody.h"
#include "qpid/framing/AMQFrame.h"
+#include <boost/current_function.hpp>
+
namespace qpid {
namespace cluster {
@@ -32,24 +34,27 @@ template <class T, class U, class V> void shift(T& a, U& b, const V& c) { a = b;
ConnectionInterceptor::ConnectionInterceptor(
broker::Connection& conn, Cluster& clust, Cluster::ShadowConnectionId shadowId_)
- : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_)
+ : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_), output(*this, *conn.getOutput().get())
{
connection->addFinalizer(boost::bind(operator delete, this));
+ connection->setOutputHandler(&output),
// Attach my functions to Connection extension points.
shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1));
shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this));
- shift(doOutputNext, connection->doOutputFn, boost::bind(&ConnectionInterceptor::doOutput, this));
+ shift(output.doOutputNext, connection->doOutputFn, boost::bind(&OutputInterceptor::doOutput, &output));
}
ConnectionInterceptor::~ConnectionInterceptor() {
assert(connection == 0);
}
+// Forward all received frames to the cluster, continue handling on delivery.
void ConnectionInterceptor::received(framing::AMQFrame& f) {
if (isClosed) return;
cluster.send(f, this);
}
+// Continue normal handling of delivered frames.
void ConnectionInterceptor::deliver(framing::AMQFrame& f) {
receivedNext(f);
}
@@ -81,28 +86,17 @@ void ConnectionInterceptor::deliverClosed() {
}
void ConnectionInterceptor::dirtyClose() {
- // Not closed via cluster self-delivery but closed locally.
- // Used for dirty cluster shutdown where active connections
- // must be cleaned up.
+ // Not closed via cluster self-delivery but closed locally. Used
+ // when local broker is shut down without a clean cluster shutdown.
+ // Release the connection, it will delete this.
connection = 0;
}
-bool ConnectionInterceptor::doOutput() {
- // FIXME aconway 2008-08-15: this is not correct.
- // Run in write threads so order of execution of doOutput is not determinate.
- // Will only work reliably for in single-consumer tests.
-
- if (connection->hasOutput()) {
- cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this);
- return doOutputNext();
- }
- return false;
-}
-
-void ConnectionInterceptor::deliverDoOutput() {
- // FIXME aconway 2008-08-15: see comment in doOutput.
- if (isShadow())
- doOutputNext();
+// Delivery of doOutput allows us to run the real connection doOutput()
+// which stocks up the write buffers with data.
+//
+void ConnectionInterceptor::deliverDoOutput(size_t requested) {
+ output.deliverDoOutput(requested);
}
}} // namespace qpid::cluster