diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 60 |
1 files changed, 33 insertions, 27 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 9088be2e54..ed1deb37fc 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -63,6 +63,14 @@ SessionState::SessionState( mgmtObject(0), rateFlowcontrol(0) { + uint32_t maxRate = broker.getOptions().maxSessionRate; + if (maxRate) { + if (handler->getConnection().getClientThrottling()) { + rateFlowcontrol = new RateFlowcontrol(maxRate); + } else { + QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support"); + } + } Manageable* parent = broker.GetVhostObject (); if (parent != 0) { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); @@ -72,25 +80,18 @@ SessionState::SessionState( mgmtObject->set_attached (0); mgmtObject->set_detachedLifespan (0); mgmtObject->clr_expireTime(); + if (rateFlowcontrol) mgmtObject->set_maxClientRate(maxRate); ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent); agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0); } } - uint32_t maxRate = broker.getOptions().maxSessionRate; - if (maxRate) { - if (handler->getConnection().getClientThrottling()) { - rateFlowcontrol = new RateFlowcontrol(maxRate); - } else { - QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support"); - } - } attach(h); } SessionState::~SessionState() { if (mgmtObject != 0) mgmtObject->resourceDestroy (); - + if (flowControlTimer) flowControlTimer->cancel(); } @@ -213,14 +214,7 @@ struct ScheduledCreditTask : public TimerTask { void fire() { // This is the best we can currently do to avoid a destruction/fire race if (!isCancelled()) { - // Send credit - AbsTime now = AbsTime::now(); - uint32_t sendCredit = flowControl.receivedMessage(now, 0); - if ( sendCredit>0 ) { - QPID_LOG(debug, sessionState.getId() << ": send producer credit " << sendCredit); - sessionState.getProxy().getMessage().flow("", 0, sendCredit); - flowControl.sentCredit(now, sendCredit); - } else if ( flowControl.flowStopped() ) { + if ( !sessionState.processSendCredit(0) && flowControl.flowStopped() ) { QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit"); reset(); timer.add(this); @@ -270,13 +264,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) // TODO: Probably do message.stop("") first time then disconnect getProxy().getMessage().stop(""); } else { - AbsTime now = AbsTime::now(); - uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, 1); - if ( sendCredit>0 ) { - QPID_LOG(debug, getId() << ": send producer credit " << sendCredit); - getProxy().getMessage().flow("", 0, sendCredit); - rateFlowcontrol->sentCredit(now, sendCredit); - } else if ( rateFlowcontrol->flowStopped() ) { + if ( !processSendCredit(1) && rateFlowcontrol->flowStopped() ) { QPID_LOG(debug, getId() << ": Schedule sending credit"); Timer& timer = getBroker().getTimer(); // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms @@ -288,6 +276,22 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) } } +bool SessionState::processSendCredit(uint32_t msgs) +{ + AbsTime now = AbsTime::now(); + uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, msgs); + if (mgmtObject) mgmtObject->dec_clientCredit(msgs); + if ( sendCredit>0 ) { + QPID_LOG(debug, getId() << ": send producer credit " << sendCredit); + getProxy().getMessage().flow("", 0, sendCredit); + rateFlowcontrol->sentCredit(now, sendCredit); + if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit); + return true; + } else { + return false; + } +} + void SessionState::sendAcceptAndCompletion() { if (!accepted.empty()) { @@ -357,10 +361,12 @@ void SessionState::readyToSend() { if (rateFlowcontrol) { // Issue initial credit - use a heuristic here issue min of 100 messages or 1 secs worth - QPID_LOG(debug, getId() << ": Issuing producer message credit " << std::min(rateFlowcontrol->getRate(), 100U)); + uint32_t credit = std::min(rateFlowcontrol->getRate(), 100U); + QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit); getProxy().getMessage().setFlowMode("", 0); - getProxy().getMessage().flow("", 0, std::min(rateFlowcontrol->getRate(), 100U)); - rateFlowcontrol->sentCredit(AbsTime::now(), std::min(rateFlowcontrol->getRate(), 100U)); + getProxy().getMessage().flow("", 0, credit); + rateFlowcontrol->sentCredit(AbsTime::now(), credit); + if (mgmtObject) mgmtObject->inc_clientCredit(credit); } } |