summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp17
1 files changed, 11 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index dffc7cf6af..b64fc20787 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -66,7 +66,7 @@ SessionState::SessionState(
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
if (handler->getConnection().getClientThrottling()) {
- rateFlowcontrol = new RateFlowcontrol(maxRate);
+ rateFlowcontrol.reset(new RateFlowcontrol(maxRate));
} else {
QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
}
@@ -210,7 +210,6 @@ struct ScheduledCreditTask : public TimerTask {
{}
void fire() {
- QPID_LOG(critical, "ScheduledCreditTask fired"); // FIXME aconway 2009-02-23: REMOVE
// This is the best we can currently do to avoid a destruction/fire race
if (!isCancelled()) {
if ( !sessionState.processSendCredit(0) ) {
@@ -275,7 +274,8 @@ bool SessionState::processSendCredit(uint32_t msgs)
if ( msgs > 0 && rateFlowcontrol->flowStopped() ) {
QPID_LOG(warning, getId() << ": producer throttling violation");
// TODO: Probably do message.stop("") first time then disconnect
- getProxy().getMessage().stop("");
+ // See comment on getClusterOrderProxy() in .h file
+ getClusterOrderProxy().getMessage().stop("");
return true;
}
AbsTime now = AbsTime::now();
@@ -283,7 +283,7 @@ bool SessionState::processSendCredit(uint32_t msgs)
if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
if ( sendCredit>0 ) {
QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
- getProxy().getMessage().flow("", 0, sendCredit);
+ getClusterOrderProxy().getMessage().flow("", 0, sendCredit);
rateFlowcontrol->sentCredit(now, sendCredit);
if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
return true;
@@ -364,8 +364,9 @@ void SessionState::readyToSend() {
// Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth
uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U);
QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit);
- getProxy().getMessage().setFlowMode("", 0);
- getProxy().getMessage().flow("", 0, credit);
+ // See comment on getClusterOrderProxy() in .h file
+ getClusterOrderProxy().getMessage().setFlowMode("", 0);
+ getClusterOrderProxy().getMessage().flow("", 0, credit);
rateFlowcontrol->sentCredit(AbsTime::now(), credit);
if (mgmtObject) mgmtObject->inc_clientCredit(credit);
}
@@ -373,4 +374,8 @@ void SessionState::readyToSend() {
Broker& SessionState::getBroker() { return broker; }
+framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() {
+ return handler->getClusterOrderProxy();
+}
+
}} // namespace qpid::broker