diff options
author | Andrew Stitcher <astitcher@apache.org> | 2009-02-04 22:57:34 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2009-02-04 22:57:34 +0000 |
commit | f3c08d2ea46e5fbb94e6117f67a874a0e3328fd2 (patch) | |
tree | 94428b4048504851c1867874bd6a533daebddde6 /cpp | |
parent | 3de263a28e2421bb916e1da0cf0f09881801785c (diff) | |
download | qpid-python-f3c08d2ea46e5fbb94e6117f67a874a0e3328fd2.tar.gz |
Fixes to producer flow control to reduce the number
of flow messages sent to the client and to eliminate
a concurrency issue updating the broker flow control
state.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@740933 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/RateFlowcontrol.h | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 44 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 1 | ||||
-rw-r--r-- | cpp/src/tests/RateFlowcontrolTest.cpp | 14 |
4 files changed, 42 insertions, 30 deletions
diff --git a/cpp/src/qpid/broker/RateFlowcontrol.h b/cpp/src/qpid/broker/RateFlowcontrol.h index 3323097eff..99f9d2c0c4 100644 --- a/cpp/src/qpid/broker/RateFlowcontrol.h +++ b/cpp/src/qpid/broker/RateFlowcontrol.h @@ -62,6 +62,7 @@ public: } void sentCredit(const qpid::sys::AbsTime& t, uint32_t credit); uint32_t receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs); + uint32_t availableCredit(const qpid::sys::AbsTime& t); bool flowStopped() const; }; @@ -79,14 +80,22 @@ inline void RateFlowcontrol::sentCredit(const qpid::sys::AbsTime& t, uint32_t cr creditSent = t; } -inline uint32_t RateFlowcontrol::receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs) { - requestedCredit +=msgs; +inline uint32_t RateFlowcontrol::availableCredit(const qpid::sys::AbsTime& t) { qpid::sys::Duration d(creditSent, t); // Could be -ve before first sentCredit int64_t toSend = std::min(rate * d / qpid::sys::TIME_SEC, static_cast<int64_t>(requestedCredit)); return toSend > 0 ? toSend : 0; } +inline uint32_t RateFlowcontrol::receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs) { + requestedCredit +=msgs; + // Don't send credit for every message, only send if more than 0.5s since last credit or + // we've got less than .25 of the max left (heuristic) + return requestedCredit*4 >= maxCredit*3 || qpid::sys::Duration(creditSent, t) >= 500*qpid::sys::TIME_MSEC + ? availableCredit(t) + : 0; +} + inline bool RateFlowcontrol::flowStopped() const { return requestedCredit >= maxCredit; } diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index ed1deb37fc..82ffede3f9 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -202,19 +202,17 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN struct ScheduledCreditTask : public TimerTask { Timer& timer; SessionState& sessionState; - RateFlowcontrol& flowControl; ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t, - SessionState& s, RateFlowcontrol& f) : + SessionState& s) : TimerTask(d), timer(t), - sessionState(s), - flowControl(f) + sessionState(s) {} void fire() { // This is the best we can currently do to avoid a destruction/fire race if (!isCancelled()) { - if ( !sessionState.processSendCredit(0) && flowControl.flowStopped() ) { + if ( !sessionState.processSendCredit(0) ) { QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit"); reset(); timer.add(this); @@ -258,26 +256,27 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) // Handle producer session flow control if (rateFlowcontrol && frame.getBof() && frame.getBos()) { - // Check for violating flow control - if ( rateFlowcontrol->flowStopped() ) { - QPID_LOG(warning, getId() << ": producer throttling violation"); - // TODO: Probably do message.stop("") first time then disconnect - getProxy().getMessage().stop(""); - } else { - if ( !processSendCredit(1) && rateFlowcontrol->flowStopped() ) { - QPID_LOG(debug, getId() << ": Schedule sending credit"); - Timer& timer = getBroker().getTimer(); - // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms - sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC); - flowControlTimer = new ScheduledCreditTask(d, timer, *this, *rateFlowcontrol); - timer.add(flowControlTimer); - } + if ( !processSendCredit(1) ) { + QPID_LOG(debug, getId() << ": Schedule sending credit"); + Timer& timer = getBroker().getTimer(); + // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms + sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC); + flowControlTimer = new ScheduledCreditTask(d, timer, *this); + timer.add(flowControlTimer); } } } bool SessionState::processSendCredit(uint32_t msgs) { + qpid::sys::ScopedLock<Mutex> l(rateLock); + // Check for violating flow control + if ( msgs > 0 && rateFlowcontrol->flowStopped() ) { + QPID_LOG(warning, getId() << ": producer throttling violation"); + // TODO: Probably do message.stop("") first time then disconnect + getProxy().getMessage().stop(""); + return true; + } AbsTime now = AbsTime::now(); uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, msgs); if (mgmtObject) mgmtObject->dec_clientCredit(msgs); @@ -288,7 +287,7 @@ bool SessionState::processSendCredit(uint32_t msgs) if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit); return true; } else { - return false; + return !rateFlowcontrol->flowStopped() ; } } @@ -360,8 +359,9 @@ void SessionState::readyToSend() { tasks.activateOutput(); if (rateFlowcontrol) { - // Issue initial credit - use a heuristic here issue min of 100 messages or 1 secs worth - uint32_t credit = std::min(rateFlowcontrol->getRate(), 100U); + qpid::sys::ScopedLock<Mutex> l(rateLock); + // Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth + uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U); QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit); getProxy().getMessage().setFlowMode("", 0); getProxy().getMessage().flow("", 0, credit); diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index b12cc15e1f..c435a741f8 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -137,6 +137,7 @@ class SessionState : public qpid::SessionState, qpid::framing::SequenceSet accepted; // State used for producer flow control (rate limited) + qpid::sys::Mutex rateLock; RateFlowcontrol* rateFlowcontrol; boost::intrusive_ptr<TimerTask> flowControlTimer; diff --git a/cpp/src/tests/RateFlowcontrolTest.cpp b/cpp/src/tests/RateFlowcontrolTest.cpp index 3e2e2fa777..b8fda09f61 100644 --- a/cpp/src/tests/RateFlowcontrolTest.cpp +++ b/cpp/src/tests/RateFlowcontrolTest.cpp @@ -42,21 +42,23 @@ QPID_AUTO_TEST_CASE(RateFlowcontrolTest) fc.sentCredit(n, 0); BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 0U ); - fc.sentCredit(n, 100); + fc.sentCredit(n, 50); Duration d=250*TIME_MSEC; n = AbsTime(n,d); - BOOST_CHECK_EQUAL( fc.receivedMessage(n, 48), 25U ); - fc.sentCredit(n, 25); + BOOST_CHECK_EQUAL( fc.receivedMessage(n, 25), 0U ); + BOOST_CHECK_EQUAL( fc.availableCredit(n), 25U ); n = AbsTime(n,d); - BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 23U ); - fc.sentCredit(n, 23); - BOOST_CHECK_EQUAL( fc.receivedMessage(n, 100), 0U); + BOOST_CHECK_EQUAL( fc.receivedMessage(n, 23), 48U ); + BOOST_CHECK_EQUAL( fc.availableCredit(n), 48U ); + fc.sentCredit(n, 48); + BOOST_CHECK_EQUAL( fc.receivedMessage(n, 50), 0U); BOOST_CHECK(fc.flowStopped()); n = AbsTime(n,d); + BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 25U); n = AbsTime(n,d); BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 50U); } |