summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp44
1 files changed, 22 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index ed1deb37fc..82ffede3f9 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -202,19 +202,17 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN
struct ScheduledCreditTask : public TimerTask {
Timer& timer;
SessionState& sessionState;
- RateFlowcontrol& flowControl;
ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t,
- SessionState& s, RateFlowcontrol& f) :
+ SessionState& s) :
TimerTask(d),
timer(t),
- sessionState(s),
- flowControl(f)
+ sessionState(s)
{}
void fire() {
// This is the best we can currently do to avoid a destruction/fire race
if (!isCancelled()) {
- if ( !sessionState.processSendCredit(0) && flowControl.flowStopped() ) {
+ if ( !sessionState.processSendCredit(0) ) {
QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
reset();
timer.add(this);
@@ -258,26 +256,27 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
// Handle producer session flow control
if (rateFlowcontrol && frame.getBof() && frame.getBos()) {
- // Check for violating flow control
- if ( rateFlowcontrol->flowStopped() ) {
- QPID_LOG(warning, getId() << ": producer throttling violation");
- // TODO: Probably do message.stop("") first time then disconnect
- getProxy().getMessage().stop("");
- } else {
- if ( !processSendCredit(1) && rateFlowcontrol->flowStopped() ) {
- QPID_LOG(debug, getId() << ": Schedule sending credit");
- Timer& timer = getBroker().getTimer();
- // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
- sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC);
- flowControlTimer = new ScheduledCreditTask(d, timer, *this, *rateFlowcontrol);
- timer.add(flowControlTimer);
- }
+ if ( !processSendCredit(1) ) {
+ QPID_LOG(debug, getId() << ": Schedule sending credit");
+ Timer& timer = getBroker().getTimer();
+ // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
+ sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC);
+ flowControlTimer = new ScheduledCreditTask(d, timer, *this);
+ timer.add(flowControlTimer);
}
}
}
bool SessionState::processSendCredit(uint32_t msgs)
{
+ qpid::sys::ScopedLock<Mutex> l(rateLock);
+ // Check for violating flow control
+ if ( msgs > 0 && rateFlowcontrol->flowStopped() ) {
+ QPID_LOG(warning, getId() << ": producer throttling violation");
+ // TODO: Probably do message.stop("") first time then disconnect
+ getProxy().getMessage().stop("");
+ return true;
+ }
AbsTime now = AbsTime::now();
uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, msgs);
if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
@@ -288,7 +287,7 @@ bool SessionState::processSendCredit(uint32_t msgs)
if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
return true;
} else {
- return false;
+ return !rateFlowcontrol->flowStopped() ;
}
}
@@ -360,8 +359,9 @@ void SessionState::readyToSend() {
tasks.activateOutput();
if (rateFlowcontrol) {
- // Issue initial credit - use a heuristic here issue min of 100 messages or 1 secs worth
- uint32_t credit = std::min(rateFlowcontrol->getRate(), 100U);
+ qpid::sys::ScopedLock<Mutex> l(rateLock);
+ // Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth
+ uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U);
QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit);
getProxy().getMessage().setFlowMode("", 0);
getProxy().getMessage().flow("", 0, credit);