summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp31
1 files changed, 20 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index dffc7cf6af..7e5f605753 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -66,7 +66,7 @@ SessionState::SessionState(
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
if (handler->getConnection().getClientThrottling()) {
- rateFlowcontrol = new RateFlowcontrol(maxRate);
+ rateFlowcontrol.reset(new RateFlowcontrol(maxRate));
} else {
QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
}
@@ -210,14 +210,17 @@ struct ScheduledCreditTask : public TimerTask {
{}
void fire() {
- QPID_LOG(critical, "ScheduledCreditTask fired"); // FIXME aconway 2009-02-23: REMOVE
// This is the best we can currently do to avoid a destruction/fire race
if (!isCancelled()) {
- if ( !sessionState.processSendCredit(0) ) {
- QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
- reset();
- timer.add(this);
- }
+ sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this));
+ }
+ }
+
+ void sendCredit() {
+ if ( !sessionState.processSendCredit(0) ) {
+ QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
+ reset();
+ timer.add(this);
}
}
};
@@ -275,7 +278,8 @@ bool SessionState::processSendCredit(uint32_t msgs)
if ( msgs > 0 && rateFlowcontrol->flowStopped() ) {
QPID_LOG(warning, getId() << ": producer throttling violation");
// TODO: Probably do message.stop("") first time then disconnect
- getProxy().getMessage().stop("");
+ // See comment on getClusterOrderProxy() in .h file
+ getClusterOrderProxy().getMessage().stop("");
return true;
}
AbsTime now = AbsTime::now();
@@ -283,7 +287,7 @@ bool SessionState::processSendCredit(uint32_t msgs)
if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
if ( sendCredit>0 ) {
QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
- getProxy().getMessage().flow("", 0, sendCredit);
+ getClusterOrderProxy().getMessage().flow("", 0, sendCredit);
rateFlowcontrol->sentCredit(now, sendCredit);
if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
return true;
@@ -364,8 +368,9 @@ void SessionState::readyToSend() {
// Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth
uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U);
QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit);
- getProxy().getMessage().setFlowMode("", 0);
- getProxy().getMessage().flow("", 0, credit);
+ // See comment on getClusterOrderProxy() in .h file
+ getClusterOrderProxy().getMessage().setFlowMode("", 0);
+ getClusterOrderProxy().getMessage().flow("", 0, credit);
rateFlowcontrol->sentCredit(AbsTime::now(), credit);
if (mgmtObject) mgmtObject->inc_clientCredit(credit);
}
@@ -373,4 +378,8 @@ void SessionState::readyToSend() {
Broker& SessionState::getBroker() { return broker; }
+framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() {
+ return handler->getClusterOrderProxy();
+}
+
}} // namespace qpid::broker