diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 44 |
1 files changed, 22 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index ed1deb37fc..82ffede3f9 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -202,19 +202,17 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN struct ScheduledCreditTask : public TimerTask { Timer& timer; SessionState& sessionState; - RateFlowcontrol& flowControl; ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t, - SessionState& s, RateFlowcontrol& f) : + SessionState& s) : TimerTask(d), timer(t), - sessionState(s), - flowControl(f) + sessionState(s) {} void fire() { // This is the best we can currently do to avoid a destruction/fire race if (!isCancelled()) { - if ( !sessionState.processSendCredit(0) && flowControl.flowStopped() ) { + if ( !sessionState.processSendCredit(0) ) { QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit"); reset(); timer.add(this); @@ -258,26 +256,27 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) // Handle producer session flow control if (rateFlowcontrol && frame.getBof() && frame.getBos()) { - // Check for violating flow control - if ( rateFlowcontrol->flowStopped() ) { - QPID_LOG(warning, getId() << ": producer throttling violation"); - // TODO: Probably do message.stop("") first time then disconnect - getProxy().getMessage().stop(""); - } else { - if ( !processSendCredit(1) && rateFlowcontrol->flowStopped() ) { - QPID_LOG(debug, getId() << ": Schedule sending credit"); - Timer& timer = getBroker().getTimer(); - // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms - sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC); - flowControlTimer = new ScheduledCreditTask(d, timer, *this, *rateFlowcontrol); - timer.add(flowControlTimer); - } + if ( !processSendCredit(1) ) { + QPID_LOG(debug, getId() << ": Schedule sending credit"); + Timer& timer = getBroker().getTimer(); + // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms + sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC); + flowControlTimer = new ScheduledCreditTask(d, timer, *this); + timer.add(flowControlTimer); } } } bool SessionState::processSendCredit(uint32_t msgs) { + qpid::sys::ScopedLock<Mutex> l(rateLock); + // Check for violating flow control + if ( msgs > 0 && rateFlowcontrol->flowStopped() ) { + QPID_LOG(warning, getId() << ": producer throttling violation"); + // TODO: Probably do message.stop("") first time then disconnect + getProxy().getMessage().stop(""); + return true; + } AbsTime now = AbsTime::now(); uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, msgs); if (mgmtObject) mgmtObject->dec_clientCredit(msgs); @@ -288,7 +287,7 @@ bool SessionState::processSendCredit(uint32_t msgs) if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit); return true; } else { - return false; + return !rateFlowcontrol->flowStopped() ; } } @@ -360,8 +359,9 @@ void SessionState::readyToSend() { tasks.activateOutput(); if (rateFlowcontrol) { - // Issue initial credit - use a heuristic here issue min of 100 messages or 1 secs worth - uint32_t credit = std::min(rateFlowcontrol->getRate(), 100U); + qpid::sys::ScopedLock<Mutex> l(rateLock); + // Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth + uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U); QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit); getProxy().getMessage().setFlowMode("", 0); getProxy().getMessage().flow("", 0, credit); |