summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2009-02-02 22:28:17 +0000
committerAndrew Stitcher <astitcher@apache.org>2009-02-02 22:28:17 +0000
commit3d0a65cddbb5579e1a4c3bc916b09986d3df54f9 (patch)
treea14d1c5a10963edee2913a627de5a73d58b34818
parent6f817e7c800c5d814b9cffd3399ad5db4dcb44da (diff)
downloadqpid-python-3d0a65cddbb5579e1a4c3bc916b09986d3df54f9.tar.gz
Send client property indicating that client supports
producer throttling in the Connection.OpenOK message. Broker only tries to apply flow control to client if it has received the property in the Connection.OpenOK message. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@740135 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h1
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionState.h7
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp8
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionHandler.cpp4
5 files changed, 21 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 80d828584d..71e03d76d4 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -115,7 +115,6 @@ class Connection : public sys::ConnectionInputHandler,
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
ChannelMap channels;
- //framing::AMQP_ClientProxy::Connection* client;
ConnectionHandler adapter;
const bool isLink;
bool mgmtClosing;
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 86123d346f..f136d61462 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -45,6 +45,8 @@ const std::string PLAIN = "PLAIN";
const std::string en_US = "en_US";
const std::string QPID_FED_LINK = "qpid.fed_link";
const std::string QPID_FED_TAG = "qpid.federation_tag";
+const std::string SESSION_FLOW_CONTROL("qpid.session_flow");
+const int SESSION_FLOW_CONTROL_VER = 1;
}
void ConnectionHandler::close(connection::CloseCode code, const string& text)
@@ -139,6 +141,9 @@ void ConnectionHandler::Handler::startOk(const framing::FieldTable& clientProper
}
QPID_LOG(info, "Connection is a federation link");
}
+ if ( clientProperties.getAsInt(SESSION_FLOW_CONTROL) == SESSION_FLOW_CONTROL_VER ) {
+ connection.setClientThrottling();
+ }
}
void ConnectionHandler::Handler::secureOk(const string& response)
diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.h b/qpid/cpp/src/qpid/broker/ConnectionState.h
index 09a92dfd4f..0e9d211b56 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionState.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionState.h
@@ -47,7 +47,8 @@ class ConnectionState : public ConnectionToken, public management::Manageable
heartbeat(0),
heartbeatmax(120),
stagingThreshold(broker.getStagingThreshold()),
- federationLink(true)
+ federationLink(true),
+ clientSupportsThrottling(false)
{}
virtual ~ConnectionState () {}
@@ -73,6 +74,9 @@ class ConnectionState : public ConnectionToken, public management::Manageable
void setFederationPeerTag(const string& tag) { federationPeerTag = string(tag); }
const string& getFederationPeerTag() const { return federationPeerTag; }
std::vector<Url>& getKnownHosts() { return knownHosts; }
+
+ void setClientThrottling() { clientSupportsThrottling = true; }
+ bool getClientThrottling() const { return clientSupportsThrottling; }
Broker& getBroker() { return broker; }
@@ -98,6 +102,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
bool federationLink;
string federationPeerTag;
std::vector<Url> knownHosts;
+ bool clientSupportsThrottling;
};
}}
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 5039b31874..9088be2e54 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -77,8 +77,12 @@ SessionState::SessionState(
}
}
uint32_t maxRate = broker.getOptions().maxSessionRate;
- if (maxRate) {
- rateFlowcontrol = new RateFlowcontrol(maxRate);
+ if (maxRate) {
+ if (handler->getConnection().getClientThrottling()) {
+ rateFlowcontrol = new RateFlowcontrol(maxRate);
+ } else {
+ QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
+ }
}
attach(h);
}
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
index d6d024cf3f..377b84c019 100644
--- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -51,6 +51,8 @@ const std::string INVALID_STATE_TUNE("tune received in invalid state");
const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state");
const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state");
+const std::string SESSION_FLOW_CONTROL("qpid.session_flow");
+const int SESSION_FLOW_CONTROL_VER = 1;
}
CloseCode ConnectionHandler::convert(uint16_t replyCode)
@@ -76,6 +78,8 @@ ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersio
FINISHED.insert(FAILED);
FINISHED.insert(CLOSED);
+
+ properties.setInt(SESSION_FLOW_CONTROL, SESSION_FLOW_CONTROL_VER);
}
void ConnectionHandler::incoming(AMQFrame& frame)