diff options
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionState.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RateFlowcontrol.h | 105 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 87 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/CMakeLists.txt | 1 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 1 | ||||
-rw-r--r-- | cpp/src/tests/RateFlowcontrolTest.cpp | 71 |
11 files changed, 0 insertions, 294 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index a2ffcf8545..89532ae256 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -120,7 +120,6 @@ Broker::Options::Options(const std::string& name) : queueLimit(100*1048576/*100M default limit*/), tcpNoDelay(false), requireEncrypted(false), - maxSessionRate(0), asyncQueueEvents(false), // Must be false in a cluster. qmf2Support(true), qmf1Support(true), @@ -160,7 +159,6 @@ Broker::Options::Options(const std::string& name) : ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted") ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)") ("sasl-config", optValue(saslConfigPath, "DIR"), "gets sasl config info from nonstandard location") - ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)") ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication") ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.") ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.") @@ -339,12 +337,6 @@ Broker::Broker(const Broker::Options& conf) : knownBrokers.push_back(Url(conf.knownHosts)); } - // check for and warn if deprecated features have been configured - if (conf.maxSessionRate) { - QPID_LOG(warning, "The 'max-session-rate' feature will be removed in a future release of QPID." - " Queue-based flow control should be used instead."); - } - } catch (const std::exception& /*e*/) { finalize(); throw; diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index b3b751be98..840d47ac38 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -114,7 +114,6 @@ public: bool requireEncrypted; std::string knownHosts; std::string saslConfigPath; - uint32_t maxSessionRate; bool asyncQueueEvents; bool qmf2Support; bool qmf1Support; diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index 7cd91ae539..6048a46f79 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -46,11 +46,9 @@ const std::string PLAIN = "PLAIN"; const std::string en_US = "en_US"; const std::string QPID_FED_LINK = "qpid.fed_link"; const std::string QPID_FED_TAG = "qpid.federation_tag"; -const std::string SESSION_FLOW_CONTROL("qpid.session_flow"); const std::string CLIENT_PROCESS_NAME("qpid.client_process"); const std::string CLIENT_PID("qpid.client_pid"); const std::string CLIENT_PPID("qpid.client_ppid"); -const int SESSION_FLOW_CONTROL_VER = 1; const std::string SPACE(" "); } @@ -173,9 +171,6 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body) } QPID_LOG(info, "Connection is a federation link"); } - if (clientProperties.getAsInt(SESSION_FLOW_CONTROL) == SESSION_FLOW_CONTROL_VER) { - connection.setClientThrottling(); - } if (connection.getMgmtObject() != 0) { string procName = clientProperties.getAsString(CLIENT_PROCESS_NAME); diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h index 13205e3a3d..4dfd86fd8e 100644 --- a/cpp/src/qpid/broker/ConnectionState.h +++ b/cpp/src/qpid/broker/ConnectionState.h @@ -48,7 +48,6 @@ class ConnectionState : public ConnectionToken, public management::Manageable heartbeatmax(120), userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links & clustering) federationLink(true), - clientSupportsThrottling(false), clusterOrderOut(0), isDefaultRealm(false) {} @@ -92,9 +91,6 @@ class ConnectionState : public ConnectionToken, public management::Manageable return (id == userId || (isDefaultRealm && id == userName)); } - void setClientThrottling(bool set=true) { clientSupportsThrottling = set; } - bool getClientThrottling() const { return clientSupportsThrottling; } - Broker& getBroker() { return broker; } Broker& broker; @@ -128,7 +124,6 @@ class ConnectionState : public ConnectionToken, public management::Manageable bool federationLink; std::string federationPeerTag; std::vector<Url> knownHosts; - bool clientSupportsThrottling; framing::FrameHandler* clusterOrderOut; std::string userName; bool isDefaultRealm; diff --git a/cpp/src/qpid/broker/RateFlowcontrol.h b/cpp/src/qpid/broker/RateFlowcontrol.h deleted file mode 100644 index 99f9d2c0c4..0000000000 --- a/cpp/src/qpid/broker/RateFlowcontrol.h +++ /dev/null @@ -1,105 +0,0 @@ -#ifndef broker_RateFlowcontrol_h -#define broker_RateFlowcontrol_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Time.h" -#include "qpid/sys/IntegerTypes.h" - -#include <algorithm> - -namespace qpid { -namespace broker { - -// Class to keep track of issuing flow control to make sure that the peer doesn't exceed -// a given message rate -// -// Create the object with the target rate -// Then call sendCredit() whenever credit is issued to the peer -// Call receivedMessage() whenever a message is received, it returns the credit to issue. -// -// sentCredit() be sensibly called with a 0 parameter to indicate -// that we sent credit but treat it as if the value was 0 (we may do this at the start of the connection -// to allow our peer to send messages) -// -// receivedMessage() can be called with 0 to indicate that we've not received a message, but -// tell me what credit I can send. -class RateFlowcontrol { - uint32_t rate; // messages per second - uint32_t maxCredit; // max credit issued to client (issued at start) - uint32_t requestedCredit; - qpid::sys::AbsTime creditSent; - -public: - RateFlowcontrol(uint32_t r) : - rate(r), - maxCredit(0), - requestedCredit(0), - creditSent(qpid::sys::FAR_FUTURE) - {} - - uint32_t getRate() const { - return rate; - } - void sentCredit(const qpid::sys::AbsTime& t, uint32_t credit); - uint32_t receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs); - uint32_t availableCredit(const qpid::sys::AbsTime& t); - bool flowStopped() const; -}; - -inline void RateFlowcontrol::sentCredit(const qpid::sys::AbsTime& t, uint32_t credit) { - // If the client isn't currently requesting credit (ie it's not sent us anything yet) then - // this credit goes to the max credit held by the client (it can't go to reduce credit - // less than 0) - int32_t nextRequestedCredit = requestedCredit - credit; - if ( nextRequestedCredit<0 ) { - requestedCredit = 0; - maxCredit -= nextRequestedCredit; - } else { - requestedCredit = nextRequestedCredit; - } - creditSent = t; -} - -inline uint32_t RateFlowcontrol::availableCredit(const qpid::sys::AbsTime& t) { - qpid::sys::Duration d(creditSent, t); - // Could be -ve before first sentCredit - int64_t toSend = std::min(rate * d / qpid::sys::TIME_SEC, static_cast<int64_t>(requestedCredit)); - return toSend > 0 ? toSend : 0; -} - -inline uint32_t RateFlowcontrol::receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs) { - requestedCredit +=msgs; - // Don't send credit for every message, only send if more than 0.5s since last credit or - // we've got less than .25 of the max left (heuristic) - return requestedCredit*4 >= maxCredit*3 || qpid::sys::Duration(creditSent, t) >= 500*qpid::sys::TIME_MSEC - ? availableCredit(t) - : 0; -} - -inline bool RateFlowcontrol::flowStopped() const { - return requestedCredit >= maxCredit; -} - -}} - -#endif // broker_RateFlowcontrol_h diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 1ab17e9893..3371421bf7 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -24,7 +24,6 @@ #include "qpid/broker/DeliveryRecord.h" #include "qpid/broker/SessionManager.h" #include "qpid/broker/SessionHandler.h" -#include "qpid/broker/RateFlowcontrol.h" #include "qpid/sys/ClusterSafe.h" #include "qpid/sys/Timer.h" #include "qpid/framing/AMQContentBody.h" @@ -62,17 +61,8 @@ SessionState::SessionState( adapter(semanticState), msgBuilder(&broker.getStore()), mgmtObject(0), - rateFlowcontrol(0), asyncCommandCompleter(new AsyncCommandCompleter(this)) { - uint32_t maxRate = broker.getOptions().maxSessionRate; - if (maxRate) { - if (handler->getConnection().getClientThrottling()) { - rateFlowcontrol.reset(new RateFlowcontrol(maxRate)); - } else { - QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support"); - } - } if (!delayManagement) addManagementObject(); attach(h); } @@ -88,8 +78,6 @@ void SessionState::addManagementObject() { mgmtObject->set_attached (0); mgmtObject->set_detachedLifespan (0); mgmtObject->clr_expireTime(); - if (rateFlowcontrol) - mgmtObject->set_maxClientRate(rateFlowcontrol->getRate()); agent->addObject(mgmtObject); } } @@ -100,9 +88,6 @@ SessionState::~SessionState() { semanticState.closed(); if (mgmtObject != 0) mgmtObject->resourceDestroy (); - - if (flowControlTimer) - flowControlTimer->cancel(); } AMQP_ClientProxy& SessionState::getProxy() { @@ -221,30 +206,6 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN } } -struct ScheduledCreditTask : public sys::TimerTask { - sys::Timer& timer; - SessionState& sessionState; - ScheduledCreditTask(const qpid::sys::Duration& d, sys::Timer& t, - SessionState& s) : - TimerTask(d,"ScheduledCredit"), - timer(t), - sessionState(s) - {} - - void fire() { - // This is the best we can currently do to avoid a destruction/fire race - sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this)); - } - - void sendCredit() { - if ( !sessionState.processSendCredit(0) ) { - QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit"); - setupNextFire(); - timer.add(this); - } - } -}; - void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) { if (frame.getBof() && frame.getBos()) //start of frameset @@ -268,43 +229,6 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) IncompleteIngressMsgXfer xfer(this, msg); msg->getIngressCompletion().end(xfer); // allows msg to complete xfer } - - // Handle producer session flow control - if (rateFlowcontrol && frame.getBof() && frame.getBos()) { - if ( !processSendCredit(1) ) { - QPID_LOG(debug, getId() << ": Schedule sending credit"); - sys::Timer& timer = getBroker().getTimer(); - // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms - sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC); - flowControlTimer = new ScheduledCreditTask(d, timer, *this); - timer.add(flowControlTimer); - } - } -} - -bool SessionState::processSendCredit(uint32_t msgs) -{ - qpid::sys::ScopedLock<Mutex> l(rateLock); - // Check for violating flow control - if ( msgs > 0 && rateFlowcontrol->flowStopped() ) { - QPID_LOG(warning, getId() << ": producer throttling violation"); - // TODO: Probably do message.stop("") first time then disconnect - // See comment on getClusterOrderProxy() in .h file - getClusterOrderProxy().getMessage().stop(""); - return true; - } - AbsTime now = AbsTime::now(); - uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, msgs); - if (mgmtObject) mgmtObject->dec_clientCredit(msgs); - if ( sendCredit>0 ) { - QPID_LOG(debug, getId() << ": send producer credit " << sendCredit); - getClusterOrderProxy().getMessage().flow("", 0, sendCredit); - rateFlowcontrol->sentCredit(now, sendCredit); - if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit); - return true; - } else { - return !rateFlowcontrol->flowStopped() ; - } } void SessionState::sendAcceptAndCompletion() @@ -399,17 +323,6 @@ void SessionState::readyToSend() { QPID_LOG(debug, getId() << ": ready to send, activating output."); assert(handler); semanticState.attached(); - if (rateFlowcontrol) { - qpid::sys::ScopedLock<Mutex> l(rateLock); - // Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth - uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U); - QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit); - // See comment on getClusterOrderProxy() in .h file - getClusterOrderProxy().getMessage().setFlowMode("", 0); - getClusterOrderProxy().getMessage().flow("", 0, credit); - rateFlowcontrol->sentCredit(AbsTime::now(), credit); - if (mgmtObject) mgmtObject->inc_clientCredit(credit); - } } Broker& SessionState::getBroker() { return broker; } diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 506af85c47..8db232a2d6 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -61,7 +61,6 @@ class ConnectionState; class Message; class SessionHandler; class SessionManager; -class RateFlowcontrol; /** * Broker-side session state includes session's handler chains, which @@ -121,8 +120,6 @@ class SessionState : public qpid::SessionState, boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); } SessionAdapter& getSessionAdapter() { return adapter; } - bool processSendCredit(uint32_t msgs); - const SessionId& getSessionId() const { return getId(); } // Used by ExecutionHandler sync command processing. Notifies @@ -168,11 +165,6 @@ class SessionState : public qpid::SessionState, qmf::org::apache::qpid::broker::Session* mgmtObject; qpid::framing::SequenceSet accepted; - // State used for producer flow control (rate limited) - qpid::sys::Mutex rateLock; - boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol; - boost::intrusive_ptr<sys::TimerTask> flowControlTimer; - // sequence numbers for pending received Execution.Sync commands std::queue<SequenceNumber> pendingExecutionSyncs; bool currentCommandComplete; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index f77b6fc748..d81fbd0494 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -138,8 +138,6 @@ void Connection::init() { else { // Shadow or catch-up connection // Passive, discard cluster-order frames connection->setClusterOrderOutput(nullFrameHandler); - // Disable client throttling, done by active node. - connection->setClientThrottling(false); } if (!isCatchUp()) connection->setErrorListener(this); diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index 872e1e3a70..16ae500140 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -133,7 +133,6 @@ set(unit_tests_to_build QueueEvents ProxyTest RetryList - RateFlowcontrolTest FrameDecoder ReplicationTest ClientMessageTest diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 2b8890eb26..dcad71fee6 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -116,7 +116,6 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ QueueEvents.cpp \ ProxyTest.cpp \ RetryList.cpp \ - RateFlowcontrolTest.cpp \ FrameDecoder.cpp \ ReplicationTest.cpp \ ClientMessageTest.cpp \ diff --git a/cpp/src/tests/RateFlowcontrolTest.cpp b/cpp/src/tests/RateFlowcontrolTest.cpp deleted file mode 100644 index 80ad06af8c..0000000000 --- a/cpp/src/tests/RateFlowcontrolTest.cpp +++ /dev/null @@ -1,71 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "unit_test.h" - -#include "qpid/broker/RateFlowcontrol.h" -#include "qpid/sys/Time.h" - -using namespace qpid::broker; -using namespace qpid::sys; - -namespace qpid { -namespace tests { - -QPID_AUTO_TEST_SUITE(RateFlowcontrolTestSuite) - -QPID_AUTO_TEST_CASE(RateFlowcontrolTest) -{ - // BOOST_CHECK(predicate); - // BOOST_CHECK_EQUAL(a, b); - - RateFlowcontrol fc(100); - AbsTime n=AbsTime::now(); - - BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 0U ); - - fc.sentCredit(n, 0); - - BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 0U ); - fc.sentCredit(n, 50); - - Duration d=250*TIME_MSEC; - - n = AbsTime(n,d); - BOOST_CHECK_EQUAL( fc.receivedMessage(n, 25), 0U ); - BOOST_CHECK_EQUAL( fc.availableCredit(n), 25U ); - - n = AbsTime(n,d); - BOOST_CHECK_EQUAL( fc.receivedMessage(n, 23), 48U ); - BOOST_CHECK_EQUAL( fc.availableCredit(n), 48U ); - fc.sentCredit(n, 48); - BOOST_CHECK_EQUAL( fc.receivedMessage(n, 50), 0U); - BOOST_CHECK(fc.flowStopped()); - - n = AbsTime(n,d); - BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 25U); - n = AbsTime(n,d); - BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 50U); -} - -QPID_AUTO_TEST_SUITE_END() - -}} // namespace qpid::tests |