diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 87 |
1 files changed, 0 insertions, 87 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 1ab17e9893..3371421bf7 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -24,7 +24,6 @@ #include "qpid/broker/DeliveryRecord.h" #include "qpid/broker/SessionManager.h" #include "qpid/broker/SessionHandler.h" -#include "qpid/broker/RateFlowcontrol.h" #include "qpid/sys/ClusterSafe.h" #include "qpid/sys/Timer.h" #include "qpid/framing/AMQContentBody.h" @@ -62,17 +61,8 @@ SessionState::SessionState( adapter(semanticState), msgBuilder(&broker.getStore()), mgmtObject(0), - rateFlowcontrol(0), asyncCommandCompleter(new AsyncCommandCompleter(this)) { - uint32_t maxRate = broker.getOptions().maxSessionRate; - if (maxRate) { - if (handler->getConnection().getClientThrottling()) { - rateFlowcontrol.reset(new RateFlowcontrol(maxRate)); - } else { - QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support"); - } - } if (!delayManagement) addManagementObject(); attach(h); } @@ -88,8 +78,6 @@ void SessionState::addManagementObject() { mgmtObject->set_attached (0); mgmtObject->set_detachedLifespan (0); mgmtObject->clr_expireTime(); - if (rateFlowcontrol) - mgmtObject->set_maxClientRate(rateFlowcontrol->getRate()); agent->addObject(mgmtObject); } } @@ -100,9 +88,6 @@ SessionState::~SessionState() { semanticState.closed(); if (mgmtObject != 0) mgmtObject->resourceDestroy (); - - if (flowControlTimer) - flowControlTimer->cancel(); } AMQP_ClientProxy& SessionState::getProxy() { @@ -221,30 +206,6 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN } } -struct ScheduledCreditTask : public sys::TimerTask { - sys::Timer& timer; - SessionState& sessionState; - ScheduledCreditTask(const qpid::sys::Duration& d, sys::Timer& t, - SessionState& s) : - TimerTask(d,"ScheduledCredit"), - timer(t), - sessionState(s) - {} - - void fire() { - // This is the best we can currently do to avoid a destruction/fire race - sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this)); - } - - void sendCredit() { - if ( !sessionState.processSendCredit(0) ) { - QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit"); - setupNextFire(); - timer.add(this); - } - } -}; - void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) { if (frame.getBof() && frame.getBos()) //start of frameset @@ -268,43 +229,6 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) IncompleteIngressMsgXfer xfer(this, msg); msg->getIngressCompletion().end(xfer); // allows msg to complete xfer } - - // Handle producer session flow control - if (rateFlowcontrol && frame.getBof() && frame.getBos()) { - if ( !processSendCredit(1) ) { - QPID_LOG(debug, getId() << ": Schedule sending credit"); - sys::Timer& timer = getBroker().getTimer(); - // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms - sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC); - flowControlTimer = new ScheduledCreditTask(d, timer, *this); - timer.add(flowControlTimer); - } - } -} - -bool SessionState::processSendCredit(uint32_t msgs) -{ - qpid::sys::ScopedLock<Mutex> l(rateLock); - // Check for violating flow control - if ( msgs > 0 && rateFlowcontrol->flowStopped() ) { - QPID_LOG(warning, getId() << ": producer throttling violation"); - // TODO: Probably do message.stop("") first time then disconnect - // See comment on getClusterOrderProxy() in .h file - getClusterOrderProxy().getMessage().stop(""); - return true; - } - AbsTime now = AbsTime::now(); - uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, msgs); - if (mgmtObject) mgmtObject->dec_clientCredit(msgs); - if ( sendCredit>0 ) { - QPID_LOG(debug, getId() << ": send producer credit " << sendCredit); - getClusterOrderProxy().getMessage().flow("", 0, sendCredit); - rateFlowcontrol->sentCredit(now, sendCredit); - if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit); - return true; - } else { - return !rateFlowcontrol->flowStopped() ; - } } void SessionState::sendAcceptAndCompletion() @@ -399,17 +323,6 @@ void SessionState::readyToSend() { QPID_LOG(debug, getId() << ": ready to send, activating output."); assert(handler); semanticState.attached(); - if (rateFlowcontrol) { - qpid::sys::ScopedLock<Mutex> l(rateLock); - // Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth - uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U); - QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit); - // See comment on getClusterOrderProxy() in .h file - getClusterOrderProxy().getMessage().setFlowMode("", 0); - getClusterOrderProxy().getMessage().flow("", 0, credit); - rateFlowcontrol->sentCredit(AbsTime::now(), credit); - if (mgmtObject) mgmtObject->inc_clientCredit(credit); - } } Broker& SessionState::getBroker() { return broker; } |