summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-01-13 18:22:40 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-01-13 18:22:40 +0000
commit2e958af105d5cb992fc69c3969b9ccccd330ea40 (patch)
tree2a66675fe1ada6a345048ce21969edfc3cb0a210
parent39f068763c649f63f2c42ab19e76d4f76d612ff5 (diff)
downloadqpid-python-2e958af105d5cb992fc69c3969b9ccccd330ea40.tar.gz
QPID-3630: remove deprecated rate limiting feature
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1231221 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/Broker.cpp8
-rw-r--r--cpp/src/qpid/broker/Broker.h1
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp5
-rw-r--r--cpp/src/qpid/broker/ConnectionState.h5
-rw-r--r--cpp/src/qpid/broker/RateFlowcontrol.h105
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp87
-rw-r--r--cpp/src/qpid/broker/SessionState.h8
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp2
-rw-r--r--cpp/src/tests/CMakeLists.txt1
-rw-r--r--cpp/src/tests/Makefile.am1
-rw-r--r--cpp/src/tests/RateFlowcontrolTest.cpp71
11 files changed, 0 insertions, 294 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index a2ffcf8545..89532ae256 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -120,7 +120,6 @@ Broker::Options::Options(const std::string& name) :
queueLimit(100*1048576/*100M default limit*/),
tcpNoDelay(false),
requireEncrypted(false),
- maxSessionRate(0),
asyncQueueEvents(false), // Must be false in a cluster.
qmf2Support(true),
qmf1Support(true),
@@ -160,7 +159,6 @@ Broker::Options::Options(const std::string& name) :
("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted")
("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)")
("sasl-config", optValue(saslConfigPath, "DIR"), "gets sasl config info from nonstandard location")
- ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)")
("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication")
("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.")
("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
@@ -339,12 +337,6 @@ Broker::Broker(const Broker::Options& conf) :
knownBrokers.push_back(Url(conf.knownHosts));
}
- // check for and warn if deprecated features have been configured
- if (conf.maxSessionRate) {
- QPID_LOG(warning, "The 'max-session-rate' feature will be removed in a future release of QPID."
- " Queue-based flow control should be used instead.");
- }
-
} catch (const std::exception& /*e*/) {
finalize();
throw;
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index b3b751be98..840d47ac38 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -114,7 +114,6 @@ public:
bool requireEncrypted;
std::string knownHosts;
std::string saslConfigPath;
- uint32_t maxSessionRate;
bool asyncQueueEvents;
bool qmf2Support;
bool qmf1Support;
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index 7cd91ae539..6048a46f79 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -46,11 +46,9 @@ const std::string PLAIN = "PLAIN";
const std::string en_US = "en_US";
const std::string QPID_FED_LINK = "qpid.fed_link";
const std::string QPID_FED_TAG = "qpid.federation_tag";
-const std::string SESSION_FLOW_CONTROL("qpid.session_flow");
const std::string CLIENT_PROCESS_NAME("qpid.client_process");
const std::string CLIENT_PID("qpid.client_pid");
const std::string CLIENT_PPID("qpid.client_ppid");
-const int SESSION_FLOW_CONTROL_VER = 1;
const std::string SPACE(" ");
}
@@ -173,9 +171,6 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body)
}
QPID_LOG(info, "Connection is a federation link");
}
- if (clientProperties.getAsInt(SESSION_FLOW_CONTROL) == SESSION_FLOW_CONTROL_VER) {
- connection.setClientThrottling();
- }
if (connection.getMgmtObject() != 0) {
string procName = clientProperties.getAsString(CLIENT_PROCESS_NAME);
diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h
index 13205e3a3d..4dfd86fd8e 100644
--- a/cpp/src/qpid/broker/ConnectionState.h
+++ b/cpp/src/qpid/broker/ConnectionState.h
@@ -48,7 +48,6 @@ class ConnectionState : public ConnectionToken, public management::Manageable
heartbeatmax(120),
userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links & clustering)
federationLink(true),
- clientSupportsThrottling(false),
clusterOrderOut(0),
isDefaultRealm(false)
{}
@@ -92,9 +91,6 @@ class ConnectionState : public ConnectionToken, public management::Manageable
return (id == userId || (isDefaultRealm && id == userName));
}
- void setClientThrottling(bool set=true) { clientSupportsThrottling = set; }
- bool getClientThrottling() const { return clientSupportsThrottling; }
-
Broker& getBroker() { return broker; }
Broker& broker;
@@ -128,7 +124,6 @@ class ConnectionState : public ConnectionToken, public management::Manageable
bool federationLink;
std::string federationPeerTag;
std::vector<Url> knownHosts;
- bool clientSupportsThrottling;
framing::FrameHandler* clusterOrderOut;
std::string userName;
bool isDefaultRealm;
diff --git a/cpp/src/qpid/broker/RateFlowcontrol.h b/cpp/src/qpid/broker/RateFlowcontrol.h
deleted file mode 100644
index 99f9d2c0c4..0000000000
--- a/cpp/src/qpid/broker/RateFlowcontrol.h
+++ /dev/null
@@ -1,105 +0,0 @@
-#ifndef broker_RateFlowcontrol_h
-#define broker_RateFlowcontrol_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/Time.h"
-#include "qpid/sys/IntegerTypes.h"
-
-#include <algorithm>
-
-namespace qpid {
-namespace broker {
-
-// Class to keep track of issuing flow control to make sure that the peer doesn't exceed
-// a given message rate
-//
-// Create the object with the target rate
-// Then call sendCredit() whenever credit is issued to the peer
-// Call receivedMessage() whenever a message is received, it returns the credit to issue.
-//
-// sentCredit() be sensibly called with a 0 parameter to indicate
-// that we sent credit but treat it as if the value was 0 (we may do this at the start of the connection
-// to allow our peer to send messages)
-//
-// receivedMessage() can be called with 0 to indicate that we've not received a message, but
-// tell me what credit I can send.
-class RateFlowcontrol {
- uint32_t rate; // messages per second
- uint32_t maxCredit; // max credit issued to client (issued at start)
- uint32_t requestedCredit;
- qpid::sys::AbsTime creditSent;
-
-public:
- RateFlowcontrol(uint32_t r) :
- rate(r),
- maxCredit(0),
- requestedCredit(0),
- creditSent(qpid::sys::FAR_FUTURE)
- {}
-
- uint32_t getRate() const {
- return rate;
- }
- void sentCredit(const qpid::sys::AbsTime& t, uint32_t credit);
- uint32_t receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs);
- uint32_t availableCredit(const qpid::sys::AbsTime& t);
- bool flowStopped() const;
-};
-
-inline void RateFlowcontrol::sentCredit(const qpid::sys::AbsTime& t, uint32_t credit) {
- // If the client isn't currently requesting credit (ie it's not sent us anything yet) then
- // this credit goes to the max credit held by the client (it can't go to reduce credit
- // less than 0)
- int32_t nextRequestedCredit = requestedCredit - credit;
- if ( nextRequestedCredit<0 ) {
- requestedCredit = 0;
- maxCredit -= nextRequestedCredit;
- } else {
- requestedCredit = nextRequestedCredit;
- }
- creditSent = t;
-}
-
-inline uint32_t RateFlowcontrol::availableCredit(const qpid::sys::AbsTime& t) {
- qpid::sys::Duration d(creditSent, t);
- // Could be -ve before first sentCredit
- int64_t toSend = std::min(rate * d / qpid::sys::TIME_SEC, static_cast<int64_t>(requestedCredit));
- return toSend > 0 ? toSend : 0;
-}
-
-inline uint32_t RateFlowcontrol::receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs) {
- requestedCredit +=msgs;
- // Don't send credit for every message, only send if more than 0.5s since last credit or
- // we've got less than .25 of the max left (heuristic)
- return requestedCredit*4 >= maxCredit*3 || qpid::sys::Duration(creditSent, t) >= 500*qpid::sys::TIME_MSEC
- ? availableCredit(t)
- : 0;
-}
-
-inline bool RateFlowcontrol::flowStopped() const {
- return requestedCredit >= maxCredit;
-}
-
-}}
-
-#endif // broker_RateFlowcontrol_h
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 1ab17e9893..3371421bf7 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -24,7 +24,6 @@
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/SessionManager.h"
#include "qpid/broker/SessionHandler.h"
-#include "qpid/broker/RateFlowcontrol.h"
#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Timer.h"
#include "qpid/framing/AMQContentBody.h"
@@ -62,17 +61,8 @@ SessionState::SessionState(
adapter(semanticState),
msgBuilder(&broker.getStore()),
mgmtObject(0),
- rateFlowcontrol(0),
asyncCommandCompleter(new AsyncCommandCompleter(this))
{
- uint32_t maxRate = broker.getOptions().maxSessionRate;
- if (maxRate) {
- if (handler->getConnection().getClientThrottling()) {
- rateFlowcontrol.reset(new RateFlowcontrol(maxRate));
- } else {
- QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
- }
- }
if (!delayManagement) addManagementObject();
attach(h);
}
@@ -88,8 +78,6 @@ void SessionState::addManagementObject() {
mgmtObject->set_attached (0);
mgmtObject->set_detachedLifespan (0);
mgmtObject->clr_expireTime();
- if (rateFlowcontrol)
- mgmtObject->set_maxClientRate(rateFlowcontrol->getRate());
agent->addObject(mgmtObject);
}
}
@@ -100,9 +88,6 @@ SessionState::~SessionState() {
semanticState.closed();
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
-
- if (flowControlTimer)
- flowControlTimer->cancel();
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -221,30 +206,6 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN
}
}
-struct ScheduledCreditTask : public sys::TimerTask {
- sys::Timer& timer;
- SessionState& sessionState;
- ScheduledCreditTask(const qpid::sys::Duration& d, sys::Timer& t,
- SessionState& s) :
- TimerTask(d,"ScheduledCredit"),
- timer(t),
- sessionState(s)
- {}
-
- void fire() {
- // This is the best we can currently do to avoid a destruction/fire race
- sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this));
- }
-
- void sendCredit() {
- if ( !sessionState.processSendCredit(0) ) {
- QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
- setupNextFire();
- timer.add(this);
- }
- }
-};
-
void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
{
if (frame.getBof() && frame.getBos()) //start of frameset
@@ -268,43 +229,6 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
IncompleteIngressMsgXfer xfer(this, msg);
msg->getIngressCompletion().end(xfer); // allows msg to complete xfer
}
-
- // Handle producer session flow control
- if (rateFlowcontrol && frame.getBof() && frame.getBos()) {
- if ( !processSendCredit(1) ) {
- QPID_LOG(debug, getId() << ": Schedule sending credit");
- sys::Timer& timer = getBroker().getTimer();
- // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
- sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC);
- flowControlTimer = new ScheduledCreditTask(d, timer, *this);
- timer.add(flowControlTimer);
- }
- }
-}
-
-bool SessionState::processSendCredit(uint32_t msgs)
-{
- qpid::sys::ScopedLock<Mutex> l(rateLock);
- // Check for violating flow control
- if ( msgs > 0 && rateFlowcontrol->flowStopped() ) {
- QPID_LOG(warning, getId() << ": producer throttling violation");
- // TODO: Probably do message.stop("") first time then disconnect
- // See comment on getClusterOrderProxy() in .h file
- getClusterOrderProxy().getMessage().stop("");
- return true;
- }
- AbsTime now = AbsTime::now();
- uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, msgs);
- if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
- if ( sendCredit>0 ) {
- QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
- getClusterOrderProxy().getMessage().flow("", 0, sendCredit);
- rateFlowcontrol->sentCredit(now, sendCredit);
- if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
- return true;
- } else {
- return !rateFlowcontrol->flowStopped() ;
- }
}
void SessionState::sendAcceptAndCompletion()
@@ -399,17 +323,6 @@ void SessionState::readyToSend() {
QPID_LOG(debug, getId() << ": ready to send, activating output.");
assert(handler);
semanticState.attached();
- if (rateFlowcontrol) {
- qpid::sys::ScopedLock<Mutex> l(rateLock);
- // Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth
- uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U);
- QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit);
- // See comment on getClusterOrderProxy() in .h file
- getClusterOrderProxy().getMessage().setFlowMode("", 0);
- getClusterOrderProxy().getMessage().flow("", 0, credit);
- rateFlowcontrol->sentCredit(AbsTime::now(), credit);
- if (mgmtObject) mgmtObject->inc_clientCredit(credit);
- }
}
Broker& SessionState::getBroker() { return broker; }
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 506af85c47..8db232a2d6 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -61,7 +61,6 @@ class ConnectionState;
class Message;
class SessionHandler;
class SessionManager;
-class RateFlowcontrol;
/**
* Broker-side session state includes session's handler chains, which
@@ -121,8 +120,6 @@ class SessionState : public qpid::SessionState,
boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
SessionAdapter& getSessionAdapter() { return adapter; }
- bool processSendCredit(uint32_t msgs);
-
const SessionId& getSessionId() const { return getId(); }
// Used by ExecutionHandler sync command processing. Notifies
@@ -168,11 +165,6 @@ class SessionState : public qpid::SessionState,
qmf::org::apache::qpid::broker::Session* mgmtObject;
qpid::framing::SequenceSet accepted;
- // State used for producer flow control (rate limited)
- qpid::sys::Mutex rateLock;
- boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
- boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
-
// sequence numbers for pending received Execution.Sync commands
std::queue<SequenceNumber> pendingExecutionSyncs;
bool currentCommandComplete;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index f77b6fc748..d81fbd0494 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -138,8 +138,6 @@ void Connection::init() {
else { // Shadow or catch-up connection
// Passive, discard cluster-order frames
connection->setClusterOrderOutput(nullFrameHandler);
- // Disable client throttling, done by active node.
- connection->setClientThrottling(false);
}
if (!isCatchUp())
connection->setErrorListener(this);
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt
index 872e1e3a70..16ae500140 100644
--- a/cpp/src/tests/CMakeLists.txt
+++ b/cpp/src/tests/CMakeLists.txt
@@ -133,7 +133,6 @@ set(unit_tests_to_build
QueueEvents
ProxyTest
RetryList
- RateFlowcontrolTest
FrameDecoder
ReplicationTest
ClientMessageTest
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 2b8890eb26..dcad71fee6 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -116,7 +116,6 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
QueueEvents.cpp \
ProxyTest.cpp \
RetryList.cpp \
- RateFlowcontrolTest.cpp \
FrameDecoder.cpp \
ReplicationTest.cpp \
ClientMessageTest.cpp \
diff --git a/cpp/src/tests/RateFlowcontrolTest.cpp b/cpp/src/tests/RateFlowcontrolTest.cpp
deleted file mode 100644
index 80ad06af8c..0000000000
--- a/cpp/src/tests/RateFlowcontrolTest.cpp
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "unit_test.h"
-
-#include "qpid/broker/RateFlowcontrol.h"
-#include "qpid/sys/Time.h"
-
-using namespace qpid::broker;
-using namespace qpid::sys;
-
-namespace qpid {
-namespace tests {
-
-QPID_AUTO_TEST_SUITE(RateFlowcontrolTestSuite)
-
-QPID_AUTO_TEST_CASE(RateFlowcontrolTest)
-{
- // BOOST_CHECK(predicate);
- // BOOST_CHECK_EQUAL(a, b);
-
- RateFlowcontrol fc(100);
- AbsTime n=AbsTime::now();
-
- BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 0U );
-
- fc.sentCredit(n, 0);
-
- BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 0U );
- fc.sentCredit(n, 50);
-
- Duration d=250*TIME_MSEC;
-
- n = AbsTime(n,d);
- BOOST_CHECK_EQUAL( fc.receivedMessage(n, 25), 0U );
- BOOST_CHECK_EQUAL( fc.availableCredit(n), 25U );
-
- n = AbsTime(n,d);
- BOOST_CHECK_EQUAL( fc.receivedMessage(n, 23), 48U );
- BOOST_CHECK_EQUAL( fc.availableCredit(n), 48U );
- fc.sentCredit(n, 48);
- BOOST_CHECK_EQUAL( fc.receivedMessage(n, 50), 0U);
- BOOST_CHECK(fc.flowStopped());
-
- n = AbsTime(n,d);
- BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 25U);
- n = AbsTime(n,d);
- BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 50U);
-}
-
-QPID_AUTO_TEST_SUITE_END()
-
-}} // namespace qpid::tests