summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp60
1 files changed, 33 insertions, 27 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 9088be2e54..ed1deb37fc 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -63,6 +63,14 @@ SessionState::SessionState(
mgmtObject(0),
rateFlowcontrol(0)
{
+ uint32_t maxRate = broker.getOptions().maxSessionRate;
+ if (maxRate) {
+ if (handler->getConnection().getClientThrottling()) {
+ rateFlowcontrol = new RateFlowcontrol(maxRate);
+ } else {
+ QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
+ }
+ }
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
@@ -72,25 +80,18 @@ SessionState::SessionState(
mgmtObject->set_attached (0);
mgmtObject->set_detachedLifespan (0);
mgmtObject->clr_expireTime();
+ if (rateFlowcontrol) mgmtObject->set_maxClientRate(maxRate);
ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0);
}
}
- uint32_t maxRate = broker.getOptions().maxSessionRate;
- if (maxRate) {
- if (handler->getConnection().getClientThrottling()) {
- rateFlowcontrol = new RateFlowcontrol(maxRate);
- } else {
- QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
- }
- }
attach(h);
}
SessionState::~SessionState() {
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
-
+
if (flowControlTimer)
flowControlTimer->cancel();
}
@@ -213,14 +214,7 @@ struct ScheduledCreditTask : public TimerTask {
void fire() {
// This is the best we can currently do to avoid a destruction/fire race
if (!isCancelled()) {
- // Send credit
- AbsTime now = AbsTime::now();
- uint32_t sendCredit = flowControl.receivedMessage(now, 0);
- if ( sendCredit>0 ) {
- QPID_LOG(debug, sessionState.getId() << ": send producer credit " << sendCredit);
- sessionState.getProxy().getMessage().flow("", 0, sendCredit);
- flowControl.sentCredit(now, sendCredit);
- } else if ( flowControl.flowStopped() ) {
+ if ( !sessionState.processSendCredit(0) && flowControl.flowStopped() ) {
QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
reset();
timer.add(this);
@@ -270,13 +264,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
// TODO: Probably do message.stop("") first time then disconnect
getProxy().getMessage().stop("");
} else {
- AbsTime now = AbsTime::now();
- uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, 1);
- if ( sendCredit>0 ) {
- QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
- getProxy().getMessage().flow("", 0, sendCredit);
- rateFlowcontrol->sentCredit(now, sendCredit);
- } else if ( rateFlowcontrol->flowStopped() ) {
+ if ( !processSendCredit(1) && rateFlowcontrol->flowStopped() ) {
QPID_LOG(debug, getId() << ": Schedule sending credit");
Timer& timer = getBroker().getTimer();
// Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
@@ -288,6 +276,22 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
}
}
+bool SessionState::processSendCredit(uint32_t msgs)
+{
+ AbsTime now = AbsTime::now();
+ uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, msgs);
+ if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
+ if ( sendCredit>0 ) {
+ QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
+ getProxy().getMessage().flow("", 0, sendCredit);
+ rateFlowcontrol->sentCredit(now, sendCredit);
+ if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
+ return true;
+ } else {
+ return false;
+ }
+}
+
void SessionState::sendAcceptAndCompletion()
{
if (!accepted.empty()) {
@@ -357,10 +361,12 @@ void SessionState::readyToSend() {
if (rateFlowcontrol) {
// Issue initial credit - use a heuristic here issue min of 100 messages or 1 secs worth
- QPID_LOG(debug, getId() << ": Issuing producer message credit " << std::min(rateFlowcontrol->getRate(), 100U));
+ uint32_t credit = std::min(rateFlowcontrol->getRate(), 100U);
+ QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit);
getProxy().getMessage().setFlowMode("", 0);
- getProxy().getMessage().flow("", 0, std::min(rateFlowcontrol->getRate(), 100U));
- rateFlowcontrol->sentCredit(AbsTime::now(), std::min(rateFlowcontrol->getRate(), 100U));
+ getProxy().getMessage().flow("", 0, credit);
+ rateFlowcontrol->sentCredit(AbsTime::now(), credit);
+ if (mgmtObject) mgmtObject->inc_clientCredit(credit);
}
}