diff options
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionHandler.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionState.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionHandler.cpp | 4 |
5 files changed, 21 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 80d828584d..71e03d76d4 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -115,7 +115,6 @@ class Connection : public sys::ConnectionInputHandler, typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; ChannelMap channels; - //framing::AMQP_ClientProxy::Connection* client; ConnectionHandler adapter; const bool isLink; bool mgmtClosing; diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 86123d346f..f136d61462 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -45,6 +45,8 @@ const std::string PLAIN = "PLAIN"; const std::string en_US = "en_US"; const std::string QPID_FED_LINK = "qpid.fed_link"; const std::string QPID_FED_TAG = "qpid.federation_tag"; +const std::string SESSION_FLOW_CONTROL("qpid.session_flow"); +const int SESSION_FLOW_CONTROL_VER = 1; } void ConnectionHandler::close(connection::CloseCode code, const string& text) @@ -139,6 +141,9 @@ void ConnectionHandler::Handler::startOk(const framing::FieldTable& clientProper } QPID_LOG(info, "Connection is a federation link"); } + if ( clientProperties.getAsInt(SESSION_FLOW_CONTROL) == SESSION_FLOW_CONTROL_VER ) { + connection.setClientThrottling(); + } } void ConnectionHandler::Handler::secureOk(const string& response) diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.h b/qpid/cpp/src/qpid/broker/ConnectionState.h index 09a92dfd4f..0e9d211b56 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionState.h +++ b/qpid/cpp/src/qpid/broker/ConnectionState.h @@ -47,7 +47,8 @@ class ConnectionState : public ConnectionToken, public management::Manageable heartbeat(0), heartbeatmax(120), stagingThreshold(broker.getStagingThreshold()), - federationLink(true) + federationLink(true), + clientSupportsThrottling(false) {} virtual ~ConnectionState () {} @@ -73,6 +74,9 @@ class ConnectionState : public ConnectionToken, public management::Manageable void setFederationPeerTag(const string& tag) { federationPeerTag = string(tag); } const string& getFederationPeerTag() const { return federationPeerTag; } std::vector<Url>& getKnownHosts() { return knownHosts; } + + void setClientThrottling() { clientSupportsThrottling = true; } + bool getClientThrottling() const { return clientSupportsThrottling; } Broker& getBroker() { return broker; } @@ -98,6 +102,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable bool federationLink; string federationPeerTag; std::vector<Url> knownHosts; + bool clientSupportsThrottling; }; }} diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 5039b31874..9088be2e54 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -77,8 +77,12 @@ SessionState::SessionState( } } uint32_t maxRate = broker.getOptions().maxSessionRate; - if (maxRate) { - rateFlowcontrol = new RateFlowcontrol(maxRate); + if (maxRate) { + if (handler->getConnection().getClientThrottling()) { + rateFlowcontrol = new RateFlowcontrol(maxRate); + } else { + QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support"); + } } attach(h); } diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp index d6d024cf3f..377b84c019 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp @@ -51,6 +51,8 @@ const std::string INVALID_STATE_TUNE("tune received in invalid state"); const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state"); const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state"); +const std::string SESSION_FLOW_CONTROL("qpid.session_flow"); +const int SESSION_FLOW_CONTROL_VER = 1; } CloseCode ConnectionHandler::convert(uint16_t replyCode) @@ -76,6 +78,8 @@ ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersio FINISHED.insert(FAILED); FINISHED.insert(CLOSED); + + properties.setInt(SESSION_FLOW_CONTROL, SESSION_FLOW_CONTROL_VER); } void ConnectionHandler::incoming(AMQFrame& frame) |