summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h1
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionState.h7
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp8
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionHandler.cpp4
5 files changed, 21 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 80d828584d..71e03d76d4 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -115,7 +115,6 @@ class Connection : public sys::ConnectionInputHandler,
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
ChannelMap channels;
- //framing::AMQP_ClientProxy::Connection* client;
ConnectionHandler adapter;
const bool isLink;
bool mgmtClosing;
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 86123d346f..f136d61462 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -45,6 +45,8 @@ const std::string PLAIN = "PLAIN";
const std::string en_US = "en_US";
const std::string QPID_FED_LINK = "qpid.fed_link";
const std::string QPID_FED_TAG = "qpid.federation_tag";
+const std::string SESSION_FLOW_CONTROL("qpid.session_flow");
+const int SESSION_FLOW_CONTROL_VER = 1;
}
void ConnectionHandler::close(connection::CloseCode code, const string& text)
@@ -139,6 +141,9 @@ void ConnectionHandler::Handler::startOk(const framing::FieldTable& clientProper
}
QPID_LOG(info, "Connection is a federation link");
}
+ if ( clientProperties.getAsInt(SESSION_FLOW_CONTROL) == SESSION_FLOW_CONTROL_VER ) {
+ connection.setClientThrottling();
+ }
}
void ConnectionHandler::Handler::secureOk(const string& response)
diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.h b/qpid/cpp/src/qpid/broker/ConnectionState.h
index 09a92dfd4f..0e9d211b56 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionState.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionState.h
@@ -47,7 +47,8 @@ class ConnectionState : public ConnectionToken, public management::Manageable
heartbeat(0),
heartbeatmax(120),
stagingThreshold(broker.getStagingThreshold()),
- federationLink(true)
+ federationLink(true),
+ clientSupportsThrottling(false)
{}
virtual ~ConnectionState () {}
@@ -73,6 +74,9 @@ class ConnectionState : public ConnectionToken, public management::Manageable
void setFederationPeerTag(const string& tag) { federationPeerTag = string(tag); }
const string& getFederationPeerTag() const { return federationPeerTag; }
std::vector<Url>& getKnownHosts() { return knownHosts; }
+
+ void setClientThrottling() { clientSupportsThrottling = true; }
+ bool getClientThrottling() const { return clientSupportsThrottling; }
Broker& getBroker() { return broker; }
@@ -98,6 +102,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
bool federationLink;
string federationPeerTag;
std::vector<Url> knownHosts;
+ bool clientSupportsThrottling;
};
}}
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 5039b31874..9088be2e54 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -77,8 +77,12 @@ SessionState::SessionState(
}
}
uint32_t maxRate = broker.getOptions().maxSessionRate;
- if (maxRate) {
- rateFlowcontrol = new RateFlowcontrol(maxRate);
+ if (maxRate) {
+ if (handler->getConnection().getClientThrottling()) {
+ rateFlowcontrol = new RateFlowcontrol(maxRate);
+ } else {
+ QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
+ }
}
attach(h);
}
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
index d6d024cf3f..377b84c019 100644
--- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -51,6 +51,8 @@ const std::string INVALID_STATE_TUNE("tune received in invalid state");
const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state");
const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state");
+const std::string SESSION_FLOW_CONTROL("qpid.session_flow");
+const int SESSION_FLOW_CONTROL_VER = 1;
}
CloseCode ConnectionHandler::convert(uint16_t replyCode)
@@ -76,6 +78,8 @@ ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersio
FINISHED.insert(FAILED);
FINISHED.insert(CLOSED);
+
+ properties.setInt(SESSION_FLOW_CONTROL, SESSION_FLOW_CONTROL_VER);
}
void ConnectionHandler::incoming(AMQFrame& frame)