diff options
author | Alan Conway <aconway@apache.org> | 2009-02-24 19:48:54 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-02-24 19:48:54 +0000 |
commit | 5996f46bccf1c0fa6bda145566d11b01064ef6dd (patch) | |
tree | 61cee350c55444ffb2ab02262c50fb2699037e7f /cpp/src/qpid/broker/ConnectionState.h | |
parent | 338297ff8c2c65a4226f3bc3fdd4da49269cfc9a (diff) | |
download | qpid-python-5996f46bccf1c0fa6bda145566d11b01064ef6dd.tar.gz |
Fixed issue with producer flow control in a cluster.
Producer flow control uses a Timer and other clock-based calculations to send flow control commands.
These commands are not predictably ordered from the clusters point of view.
Added getClusterOrderProxy() to SessionState. In a cluster it returns
a proxy that defers sending a command to the client until it is
multicast to the cluster. In a stand alone broker it is just the
normal proxy. Updated producer flow control to use this proxy.
Cluster flow control is turned off in shadow connections. Only the
directly connected node does flow control calculations and multicasts
the commands to send. All nodes sending of the commands thru SessionState
to ensure consistent session state (e.g. command numbering.)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@747528 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/ConnectionState.h')
-rw-r--r-- | cpp/src/qpid/broker/ConnectionState.h | 21 |
1 files changed, 16 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h index 0e9d211b56..0d7fbc5b3b 100644 --- a/cpp/src/qpid/broker/ConnectionState.h +++ b/cpp/src/qpid/broker/ConnectionState.h @@ -48,8 +48,9 @@ class ConnectionState : public ConnectionToken, public management::Manageable heartbeatmax(120), stagingThreshold(broker.getStagingThreshold()), federationLink(true), - clientSupportsThrottling(false) - {} + clientSupportsThrottling(false), + clusterOrderOut(0) + {} virtual ~ConnectionState () {} @@ -75,7 +76,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable const string& getFederationPeerTag() const { return federationPeerTag; } std::vector<Url>& getKnownHosts() { return knownHosts; } - void setClientThrottling() { clientSupportsThrottling = true; } + void setClientThrottling(bool set=true) { clientSupportsThrottling = set; } bool getClientThrottling() const { return clientSupportsThrottling; } Broker& getBroker() { return broker; } @@ -86,11 +87,20 @@ class ConnectionState : public ConnectionToken, public management::Manageable //contained output tasks sys::AggregateOutput outputTasks; - sys::ConnectionOutputHandlerPtr& getOutput() { return out; } + sys::ConnectionOutputHandler& getOutput() { return out; } framing::ProtocolVersion getVersion() const { return version; } - void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out.set(o); } + /** + * If the broker is part of a cluster, this is a handler provided + * by cluster code. It ensures consistent ordering of commands + * that are sent based on criteria that are not predictably + * ordered cluster-wide, e.g. a timer firing. + */ + framing::FrameHandler* getClusterOrderOutput() { return clusterOrderOut; } + void setClusterOrderOutput(framing::FrameHandler& fh) { clusterOrderOut = &fh; } + + protected: framing::ProtocolVersion version; uint32_t framemax; @@ -103,6 +113,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable string federationPeerTag; std::vector<Url> knownHosts; bool clientSupportsThrottling; + framing::FrameHandler* clusterOrderOut; }; }} |