summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp87
1 files changed, 0 insertions, 87 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 1ab17e9893..3371421bf7 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -24,7 +24,6 @@
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/SessionManager.h"
#include "qpid/broker/SessionHandler.h"
-#include "qpid/broker/RateFlowcontrol.h"
#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Timer.h"
#include "qpid/framing/AMQContentBody.h"
@@ -62,17 +61,8 @@ SessionState::SessionState(
adapter(semanticState),
msgBuilder(&broker.getStore()),
mgmtObject(0),
- rateFlowcontrol(0),
asyncCommandCompleter(new AsyncCommandCompleter(this))
{
- uint32_t maxRate = broker.getOptions().maxSessionRate;
- if (maxRate) {
- if (handler->getConnection().getClientThrottling()) {
- rateFlowcontrol.reset(new RateFlowcontrol(maxRate));
- } else {
- QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
- }
- }
if (!delayManagement) addManagementObject();
attach(h);
}
@@ -88,8 +78,6 @@ void SessionState::addManagementObject() {
mgmtObject->set_attached (0);
mgmtObject->set_detachedLifespan (0);
mgmtObject->clr_expireTime();
- if (rateFlowcontrol)
- mgmtObject->set_maxClientRate(rateFlowcontrol->getRate());
agent->addObject(mgmtObject);
}
}
@@ -100,9 +88,6 @@ SessionState::~SessionState() {
semanticState.closed();
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
-
- if (flowControlTimer)
- flowControlTimer->cancel();
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -221,30 +206,6 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN
}
}
-struct ScheduledCreditTask : public sys::TimerTask {
- sys::Timer& timer;
- SessionState& sessionState;
- ScheduledCreditTask(const qpid::sys::Duration& d, sys::Timer& t,
- SessionState& s) :
- TimerTask(d,"ScheduledCredit"),
- timer(t),
- sessionState(s)
- {}
-
- void fire() {
- // This is the best we can currently do to avoid a destruction/fire race
- sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this));
- }
-
- void sendCredit() {
- if ( !sessionState.processSendCredit(0) ) {
- QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
- setupNextFire();
- timer.add(this);
- }
- }
-};
-
void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
{
if (frame.getBof() && frame.getBos()) //start of frameset
@@ -268,43 +229,6 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
IncompleteIngressMsgXfer xfer(this, msg);
msg->getIngressCompletion().end(xfer); // allows msg to complete xfer
}
-
- // Handle producer session flow control
- if (rateFlowcontrol && frame.getBof() && frame.getBos()) {
- if ( !processSendCredit(1) ) {
- QPID_LOG(debug, getId() << ": Schedule sending credit");
- sys::Timer& timer = getBroker().getTimer();
- // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
- sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC);
- flowControlTimer = new ScheduledCreditTask(d, timer, *this);
- timer.add(flowControlTimer);
- }
- }
-}
-
-bool SessionState::processSendCredit(uint32_t msgs)
-{
- qpid::sys::ScopedLock<Mutex> l(rateLock);
- // Check for violating flow control
- if ( msgs > 0 && rateFlowcontrol->flowStopped() ) {
- QPID_LOG(warning, getId() << ": producer throttling violation");
- // TODO: Probably do message.stop("") first time then disconnect
- // See comment on getClusterOrderProxy() in .h file
- getClusterOrderProxy().getMessage().stop("");
- return true;
- }
- AbsTime now = AbsTime::now();
- uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, msgs);
- if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
- if ( sendCredit>0 ) {
- QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
- getClusterOrderProxy().getMessage().flow("", 0, sendCredit);
- rateFlowcontrol->sentCredit(now, sendCredit);
- if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
- return true;
- } else {
- return !rateFlowcontrol->flowStopped() ;
- }
}
void SessionState::sendAcceptAndCompletion()
@@ -399,17 +323,6 @@ void SessionState::readyToSend() {
QPID_LOG(debug, getId() << ": ready to send, activating output.");
assert(handler);
semanticState.attached();
- if (rateFlowcontrol) {
- qpid::sys::ScopedLock<Mutex> l(rateLock);
- // Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth
- uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U);
- QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit);
- // See comment on getClusterOrderProxy() in .h file
- getClusterOrderProxy().getMessage().setFlowMode("", 0);
- getClusterOrderProxy().getMessage().flow("", 0, credit);
- rateFlowcontrol->sentCredit(AbsTime::now(), credit);
- if (mgmtObject) mgmtObject->inc_clientCredit(credit);
- }
}
Broker& SessionState::getBroker() { return broker; }