summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2009-02-04 22:57:34 +0000
committerAndrew Stitcher <astitcher@apache.org>2009-02-04 22:57:34 +0000
commitf3c08d2ea46e5fbb94e6117f67a874a0e3328fd2 (patch)
tree94428b4048504851c1867874bd6a533daebddde6 /cpp
parent3de263a28e2421bb916e1da0cf0f09881801785c (diff)
downloadqpid-python-f3c08d2ea46e5fbb94e6117f67a874a0e3328fd2.tar.gz
Fixes to producer flow control to reduce the number
of flow messages sent to the client and to eliminate a concurrency issue updating the broker flow control state. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@740933 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/RateFlowcontrol.h13
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp44
-rw-r--r--cpp/src/qpid/broker/SessionState.h1
-rw-r--r--cpp/src/tests/RateFlowcontrolTest.cpp14
4 files changed, 42 insertions, 30 deletions
diff --git a/cpp/src/qpid/broker/RateFlowcontrol.h b/cpp/src/qpid/broker/RateFlowcontrol.h
index 3323097eff..99f9d2c0c4 100644
--- a/cpp/src/qpid/broker/RateFlowcontrol.h
+++ b/cpp/src/qpid/broker/RateFlowcontrol.h
@@ -62,6 +62,7 @@ public:
}
void sentCredit(const qpid::sys::AbsTime& t, uint32_t credit);
uint32_t receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs);
+ uint32_t availableCredit(const qpid::sys::AbsTime& t);
bool flowStopped() const;
};
@@ -79,14 +80,22 @@ inline void RateFlowcontrol::sentCredit(const qpid::sys::AbsTime& t, uint32_t cr
creditSent = t;
}
-inline uint32_t RateFlowcontrol::receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs) {
- requestedCredit +=msgs;
+inline uint32_t RateFlowcontrol::availableCredit(const qpid::sys::AbsTime& t) {
qpid::sys::Duration d(creditSent, t);
// Could be -ve before first sentCredit
int64_t toSend = std::min(rate * d / qpid::sys::TIME_SEC, static_cast<int64_t>(requestedCredit));
return toSend > 0 ? toSend : 0;
}
+inline uint32_t RateFlowcontrol::receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs) {
+ requestedCredit +=msgs;
+ // Don't send credit for every message, only send if more than 0.5s since last credit or
+ // we've got less than .25 of the max left (heuristic)
+ return requestedCredit*4 >= maxCredit*3 || qpid::sys::Duration(creditSent, t) >= 500*qpid::sys::TIME_MSEC
+ ? availableCredit(t)
+ : 0;
+}
+
inline bool RateFlowcontrol::flowStopped() const {
return requestedCredit >= maxCredit;
}
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index ed1deb37fc..82ffede3f9 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -202,19 +202,17 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN
struct ScheduledCreditTask : public TimerTask {
Timer& timer;
SessionState& sessionState;
- RateFlowcontrol& flowControl;
ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t,
- SessionState& s, RateFlowcontrol& f) :
+ SessionState& s) :
TimerTask(d),
timer(t),
- sessionState(s),
- flowControl(f)
+ sessionState(s)
{}
void fire() {
// This is the best we can currently do to avoid a destruction/fire race
if (!isCancelled()) {
- if ( !sessionState.processSendCredit(0) && flowControl.flowStopped() ) {
+ if ( !sessionState.processSendCredit(0) ) {
QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
reset();
timer.add(this);
@@ -258,26 +256,27 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
// Handle producer session flow control
if (rateFlowcontrol && frame.getBof() && frame.getBos()) {
- // Check for violating flow control
- if ( rateFlowcontrol->flowStopped() ) {
- QPID_LOG(warning, getId() << ": producer throttling violation");
- // TODO: Probably do message.stop("") first time then disconnect
- getProxy().getMessage().stop("");
- } else {
- if ( !processSendCredit(1) && rateFlowcontrol->flowStopped() ) {
- QPID_LOG(debug, getId() << ": Schedule sending credit");
- Timer& timer = getBroker().getTimer();
- // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
- sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC);
- flowControlTimer = new ScheduledCreditTask(d, timer, *this, *rateFlowcontrol);
- timer.add(flowControlTimer);
- }
+ if ( !processSendCredit(1) ) {
+ QPID_LOG(debug, getId() << ": Schedule sending credit");
+ Timer& timer = getBroker().getTimer();
+ // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
+ sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC);
+ flowControlTimer = new ScheduledCreditTask(d, timer, *this);
+ timer.add(flowControlTimer);
}
}
}
bool SessionState::processSendCredit(uint32_t msgs)
{
+ qpid::sys::ScopedLock<Mutex> l(rateLock);
+ // Check for violating flow control
+ if ( msgs > 0 && rateFlowcontrol->flowStopped() ) {
+ QPID_LOG(warning, getId() << ": producer throttling violation");
+ // TODO: Probably do message.stop("") first time then disconnect
+ getProxy().getMessage().stop("");
+ return true;
+ }
AbsTime now = AbsTime::now();
uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, msgs);
if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
@@ -288,7 +287,7 @@ bool SessionState::processSendCredit(uint32_t msgs)
if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
return true;
} else {
- return false;
+ return !rateFlowcontrol->flowStopped() ;
}
}
@@ -360,8 +359,9 @@ void SessionState::readyToSend() {
tasks.activateOutput();
if (rateFlowcontrol) {
- // Issue initial credit - use a heuristic here issue min of 100 messages or 1 secs worth
- uint32_t credit = std::min(rateFlowcontrol->getRate(), 100U);
+ qpid::sys::ScopedLock<Mutex> l(rateLock);
+ // Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth
+ uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U);
QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit);
getProxy().getMessage().setFlowMode("", 0);
getProxy().getMessage().flow("", 0, credit);
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index b12cc15e1f..c435a741f8 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -137,6 +137,7 @@ class SessionState : public qpid::SessionState,
qpid::framing::SequenceSet accepted;
// State used for producer flow control (rate limited)
+ qpid::sys::Mutex rateLock;
RateFlowcontrol* rateFlowcontrol;
boost::intrusive_ptr<TimerTask> flowControlTimer;
diff --git a/cpp/src/tests/RateFlowcontrolTest.cpp b/cpp/src/tests/RateFlowcontrolTest.cpp
index 3e2e2fa777..b8fda09f61 100644
--- a/cpp/src/tests/RateFlowcontrolTest.cpp
+++ b/cpp/src/tests/RateFlowcontrolTest.cpp
@@ -42,21 +42,23 @@ QPID_AUTO_TEST_CASE(RateFlowcontrolTest)
fc.sentCredit(n, 0);
BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 0U );
- fc.sentCredit(n, 100);
+ fc.sentCredit(n, 50);
Duration d=250*TIME_MSEC;
n = AbsTime(n,d);
- BOOST_CHECK_EQUAL( fc.receivedMessage(n, 48), 25U );
- fc.sentCredit(n, 25);
+ BOOST_CHECK_EQUAL( fc.receivedMessage(n, 25), 0U );
+ BOOST_CHECK_EQUAL( fc.availableCredit(n), 25U );
n = AbsTime(n,d);
- BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 23U );
- fc.sentCredit(n, 23);
- BOOST_CHECK_EQUAL( fc.receivedMessage(n, 100), 0U);
+ BOOST_CHECK_EQUAL( fc.receivedMessage(n, 23), 48U );
+ BOOST_CHECK_EQUAL( fc.availableCredit(n), 48U );
+ fc.sentCredit(n, 48);
+ BOOST_CHECK_EQUAL( fc.receivedMessage(n, 50), 0U);
BOOST_CHECK(fc.flowStopped());
n = AbsTime(n,d);
+ BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 25U);
n = AbsTime(n,d);
BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 50U);
}