summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-02-24 19:48:54 +0000
committerAlan Conway <aconway@apache.org>2009-02-24 19:48:54 +0000
commit5996f46bccf1c0fa6bda145566d11b01064ef6dd (patch)
tree61cee350c55444ffb2ab02262c50fb2699037e7f /cpp/src/qpid/cluster/Connection.cpp
parent338297ff8c2c65a4226f3bc3fdd4da49269cfc9a (diff)
downloadqpid-python-5996f46bccf1c0fa6bda145566d11b01064ef6dd.tar.gz
Fixed issue with producer flow control in a cluster.
Producer flow control uses a Timer and other clock-based calculations to send flow control commands. These commands are not predictably ordered from the clusters point of view. Added getClusterOrderProxy() to SessionState. In a cluster it returns a proxy that defers sending a command to the client until it is multicast to the cluster. In a stand alone broker it is just the normal proxy. Updated producer flow control to use this proxy. Cluster flow control is turned off in shadow connections. Only the directly connected node does flow control calculations and multicasts the commands to send. All nodes sending of the commands thru SessionState to ensure consistent session state (e.g. command numbering.) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@747528 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp22
1 files changed, 18 insertions, 4 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 1a3f7c4ef7..9c2b4f1638 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -62,7 +62,8 @@ NoOpConnectionOutputHandler Connection::discardHandler;
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
: cluster(c), self(myId), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false)
+ connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false),
+ mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
// Local connections
@@ -70,15 +71,20 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0),
- expectProtocolHeader(isLink)
+ expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
void Connection::init() {
QPID_LOG(debug, cluster << " new connection: " << *this);
- if (isLocalClient()) {
+ if (isLocalClient()) {
+ connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node
cluster.addLocalConnection(this);
giveReadCredit(cluster.getReadMax());
}
+ else { // Shadow or catch-up connection
+ connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
+ connection.setClientThrottling(false); // Disable client throttling, done by active node.
+ }
}
void Connection::giveReadCredit(int credit) {
@@ -143,7 +149,15 @@ void Connection::deliveredFrame(const EventFrame& f) {
if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
&& !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
{
- connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker connection.
+ // FIXME aconway 2009-02-24: Using the DATA/CONTROL
+ // distinction to distinguish incoming vs. outgoing frames is
+ // very unclear.
+ if (f.type == DATA) // incoming data frames to broker::Connection
+ connection.received(const_cast<AMQFrame&>(f.frame));
+ else { // outgoing data frame, send via SessionState
+ broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
+ if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
+ }
}
giveReadCredit(f.readCredit);
}