summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp22
1 files changed, 18 insertions, 4 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 1a3f7c4ef7..9c2b4f1638 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -62,7 +62,8 @@ NoOpConnectionOutputHandler Connection::discardHandler;
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
: cluster(c), self(myId), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false)
+ connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false),
+ mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
// Local connections
@@ -70,15 +71,20 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0),
- expectProtocolHeader(isLink)
+ expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
void Connection::init() {
QPID_LOG(debug, cluster << " new connection: " << *this);
- if (isLocalClient()) {
+ if (isLocalClient()) {
+ connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node
cluster.addLocalConnection(this);
giveReadCredit(cluster.getReadMax());
}
+ else { // Shadow or catch-up connection
+ connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
+ connection.setClientThrottling(false); // Disable client throttling, done by active node.
+ }
}
void Connection::giveReadCredit(int credit) {
@@ -143,7 +149,15 @@ void Connection::deliveredFrame(const EventFrame& f) {
if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
&& !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
{
- connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker connection.
+ // FIXME aconway 2009-02-24: Using the DATA/CONTROL
+ // distinction to distinguish incoming vs. outgoing frames is
+ // very unclear.
+ if (f.type == DATA) // incoming data frames to broker::Connection
+ connection.received(const_cast<AMQFrame&>(f.frame));
+ else { // outgoing data frame, send via SessionState
+ broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
+ if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
+ }
}
giveReadCredit(f.readCredit);
}