diff options
author | Andrew Stitcher <astitcher@apache.org> | 2009-01-27 21:17:47 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2009-01-27 21:17:47 +0000 |
commit | d37870c56f30099fccf8e876ff7ea70829a1458a (patch) | |
tree | 76231e78fd79ee7385139bf1c30c096e4a9ab7b7 /qpid | |
parent | e499bb4d3bcff953fc8dc79cf16a325fe5e25160 (diff) | |
download | qpid-python-d37870c56f30099fccf8e876ff7ea70829a1458a.tar.gz |
Producer side rate throttling:
This uses the Message.Flow command to send credit from
broker to client to ensure that the client doesn't
exceed a rate configured on the broker per session.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@738247 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r-- | qpid/cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/RateFlowcontrol.h | 96 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 110 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.cpp | 101 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.h | 20 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/Semaphore.h | 14 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Makefile.am | 3 | ||||
-rw-r--r-- | qpid/cpp/src/tests/RateFlowcontrolTest.cpp | 64 |
11 files changed, 388 insertions, 36 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index e4e9a98aa7..e07267da37 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -539,6 +539,7 @@ nobase_include_HEADERS = \ qpid/broker/QueuedMessage.h \ qpid/broker/QueuePolicy.h \ qpid/broker/QueueRegistry.h \ + qpid/broker/RateFlowcontrol.h \ qpid/broker/RateTracker.h \ qpid/broker/RecoverableConfig.h \ qpid/broker/RecoverableExchange.h \ diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index e4c5c9b5e9..f692ff72f3 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -90,7 +90,8 @@ Broker::Options::Options(const std::string& name) : replayHardLimit(0), queueLimit(100*1048576/*100M default limit*/), tcpNoDelay(false), - requireEncrypted(false) + requireEncrypted(false), + maxSessionRate(0) { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -119,7 +120,8 @@ Broker::Options::Options(const std::string& name) : ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)") ("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections") ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted") - ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)"); + ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)") + ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)"); } const std::string empty; diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index d97737c707..c50ef46baa 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -105,6 +105,7 @@ class Broker : public sys::Runnable, public Plugin::Target, bool tcpNoDelay; bool requireEncrypted; std::string knownHosts; + uint32_t maxSessionRate; }; private: diff --git a/qpid/cpp/src/qpid/broker/RateFlowcontrol.h b/qpid/cpp/src/qpid/broker/RateFlowcontrol.h new file mode 100644 index 0000000000..3323097eff --- /dev/null +++ b/qpid/cpp/src/qpid/broker/RateFlowcontrol.h @@ -0,0 +1,96 @@ +#ifndef broker_RateFlowcontrol_h +#define broker_RateFlowcontrol_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Time.h" +#include "qpid/sys/IntegerTypes.h" + +#include <algorithm> + +namespace qpid { +namespace broker { + +// Class to keep track of issuing flow control to make sure that the peer doesn't exceed +// a given message rate +// +// Create the object with the target rate +// Then call sendCredit() whenever credit is issued to the peer +// Call receivedMessage() whenever a message is received, it returns the credit to issue. +// +// sentCredit() be sensibly called with a 0 parameter to indicate +// that we sent credit but treat it as if the value was 0 (we may do this at the start of the connection +// to allow our peer to send messages) +// +// receivedMessage() can be called with 0 to indicate that we've not received a message, but +// tell me what credit I can send. +class RateFlowcontrol { + uint32_t rate; // messages per second + uint32_t maxCredit; // max credit issued to client (issued at start) + uint32_t requestedCredit; + qpid::sys::AbsTime creditSent; + +public: + RateFlowcontrol(uint32_t r) : + rate(r), + maxCredit(0), + requestedCredit(0), + creditSent(qpid::sys::FAR_FUTURE) + {} + + uint32_t getRate() const { + return rate; + } + void sentCredit(const qpid::sys::AbsTime& t, uint32_t credit); + uint32_t receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs); + bool flowStopped() const; +}; + +inline void RateFlowcontrol::sentCredit(const qpid::sys::AbsTime& t, uint32_t credit) { + // If the client isn't currently requesting credit (ie it's not sent us anything yet) then + // this credit goes to the max credit held by the client (it can't go to reduce credit + // less than 0) + int32_t nextRequestedCredit = requestedCredit - credit; + if ( nextRequestedCredit<0 ) { + requestedCredit = 0; + maxCredit -= nextRequestedCredit; + } else { + requestedCredit = nextRequestedCredit; + } + creditSent = t; +} + +inline uint32_t RateFlowcontrol::receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs) { + requestedCredit +=msgs; + qpid::sys::Duration d(creditSent, t); + // Could be -ve before first sentCredit + int64_t toSend = std::min(rate * d / qpid::sys::TIME_SEC, static_cast<int64_t>(requestedCredit)); + return toSend > 0 ? toSend : 0; +} + +inline bool RateFlowcontrol::flowStopped() const { + return requestedCredit >= maxCredit; +} + +}} + +#endif // broker_RateFlowcontrol_h diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 0a24a39d38..5039b31874 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,6 +24,8 @@ #include "DeliveryRecord.h" #include "SessionManager.h" #include "SessionHandler.h" +#include "RateFlowcontrol.h" +#include "Timer.h" #include "qpid/framing/AMQContentBody.h" #include "qpid/framing/AMQHeaderBody.h" #include "qpid/framing/AMQMethodBody.h" @@ -31,6 +33,7 @@ #include "qpid/framing/ServerInvoker.h" #include "qpid/log/Statement.h" #include "qpid/management/ManagementBroker.h" +#include "qpid/framing/AMQP_ClientProxy.h" #include <boost/bind.hpp> #include <boost/lexical_cast.hpp> @@ -46,17 +49,19 @@ using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; +using qpid::sys::AbsTime; namespace _qmf = qmf::org::apache::qpid::broker; SessionState::SessionState( - Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config) + Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config) : qpid::SessionState(id, config), broker(b), handler(&h), semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)), - mgmtObject(0) + mgmtObject(0), + rateFlowcontrol(0) { Manageable* parent = broker.GetVhostObject (); if (parent != 0) { @@ -71,12 +76,19 @@ SessionState::SessionState( agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0); } } + uint32_t maxRate = broker.getOptions().maxSessionRate; + if (maxRate) { + rateFlowcontrol = new RateFlowcontrol(maxRate); + } attach(h); } SessionState::~SessionState() { if (mgmtObject != 0) mgmtObject->resourceDestroy (); + + if (flowControlTimer) + flowControlTimer->cancel(); } AMQP_ClientProxy& SessionState::getProxy() { @@ -102,7 +114,7 @@ void SessionState::detach() { mgmtObject->set_attached (0); } -void SessionState::disableOutput() +void SessionState::disableOutput() { semanticState.detached();//prevents further activateOutput calls until reattached getConnection().outputTasks.removeOutputTask(&semanticState); @@ -120,12 +132,12 @@ void SessionState::attach(SessionHandler& h) { } void SessionState::activateOutput() { - if (isAttached()) + if (isAttached()) getConnection().outputTasks.activateOutput(); } void SessionState::giveReadCredit(int32_t credit) { - if (isAttached()) + if (isAttached()) getConnection().outputTasks.giveReadCredit(credit); } @@ -170,18 +182,49 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) { Invoker::Result invocation = invoke(adapter, *method); - receiverCompleted(id); + receiverCompleted(id); if (!invocation.wasHandled()) { throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); } else if (invocation.hasResult()) { getProxy().getExecution().result(id, invocation.getResult()); } - if (method->isSync()) { + if (method->isSync()) { incomplete.process(enqueuedOp, true); sendAcceptAndCompletion(); } } +struct ScheduledCreditTask : public TimerTask { + Timer& timer; + SessionState& sessionState; + RateFlowcontrol& flowControl; + ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t, + SessionState& s, RateFlowcontrol& f) : + TimerTask(d), + timer(t), + sessionState(s), + flowControl(f) + {} + + void fire() { + // This is the best we can currently do to avoid a destruction/fire race + if (!isCancelled()) { + // Send credit + AbsTime now = AbsTime::now(); + uint32_t sendCredit = flowControl.receivedMessage(now, 0); + if ( sendCredit>0 ) { + QPID_LOG(debug, sessionState.getId() << ": send producer credit " << sendCredit); + sessionState.getProxy().getMessage().flow("", 0, sendCredit); + flowControl.sentCredit(now, sendCredit); + } else if ( flowControl.flowStopped() ) { + QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit"); + reset(); + timer.add(this); + } + } + } +}; + void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) { if (frame.getBof() && frame.getBos()) //start of frameset @@ -194,10 +237,10 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) AMQFrame header((AMQHeaderBody())); header.setBof(false); header.setEof(false); - msg->getFrames().append(header); + msg->getFrames().append(header); } msg->setPublisher(&getConnection()); - semanticState.handle(msg); + semanticState.handle(msg); msgBuilder.end(); if (msg->isEnqueueComplete()) { @@ -206,14 +249,39 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) incomplete.add(msg); } - //hold up execution until async enqueue is complete - if (msg->getFrames().getMethod()->isSync()) { + //hold up execution until async enqueue is complete + if (msg->getFrames().getMethod()->isSync()) { incomplete.process(enqueuedOp, true); sendAcceptAndCompletion(); } else { incomplete.process(enqueuedOp, false); } } + + // Handle producer session flow control + if (rateFlowcontrol && frame.getBof() && frame.getBos()) { + // Check for violating flow control + if ( rateFlowcontrol->flowStopped() ) { + QPID_LOG(warning, getId() << ": producer throttling violation"); + // TODO: Probably do message.stop("") first time then disconnect + getProxy().getMessage().stop(""); + } else { + AbsTime now = AbsTime::now(); + uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, 1); + if ( sendCredit>0 ) { + QPID_LOG(debug, getId() << ": send producer credit " << sendCredit); + getProxy().getMessage().flow("", 0, sendCredit); + rateFlowcontrol->sentCredit(now, sendCredit); + } else if ( rateFlowcontrol->flowStopped() ) { + QPID_LOG(debug, getId() << ": Schedule sending credit"); + Timer& timer = getBroker().getTimer(); + // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms + sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC); + flowControlTimer = new ScheduledCreditTask(d, timer, *this, *rateFlowcontrol); + timer.add(flowControlTimer); + } + } + } } void SessionState::sendAcceptAndCompletion() @@ -222,7 +290,7 @@ void SessionState::sendAcceptAndCompletion() getProxy().getMessage().accept(accepted); accepted.clear(); } - sendCompletion(); + sendCompletion(); } void SessionState::enqueued(boost::intrusive_ptr<Message> msg) @@ -240,7 +308,7 @@ void SessionState::handleIn(AMQFrame& frame) { if (m == 0 || m->isContentBearing()) { handleContent(frame, commandId); } else if (frame.getBof() && frame.getEof()) { - handleCommand(frame.getMethod(), commandId); + handleCommand(frame.getMethod(), commandId); } else { throw InternalErrorException("Cannot handle multi-frame command segments yet"); } @@ -265,8 +333,8 @@ void SessionState::deliver(DeliveryRecord& msg, bool sync) } } -void SessionState::sendCompletion() { - handler->sendCompletion(); +void SessionState::sendCompletion() { + handler->sendCompletion(); } void SessionState::senderCompleted(const SequenceSet& commands) { @@ -282,6 +350,14 @@ void SessionState::readyToSend() { sys::AggregateOutput& tasks = handler->getConnection().outputTasks; tasks.addOutputTask(&semanticState); tasks.activateOutput(); + + if (rateFlowcontrol) { + // Issue initial credit - use a heuristic here issue min of 100 messages or 1 secs worth + QPID_LOG(debug, getId() << ": Issuing producer message credit " << std::min(rateFlowcontrol->getRate(), 100U)); + getProxy().getMessage().setFlowMode("", 0); + getProxy().getMessage().flow("", 0, std::min(rateFlowcontrol->getRate(), 100U)); + rateFlowcontrol->sentCredit(AbsTime::now(), std::min(rateFlowcontrol->getRate(), 100U)); + } } Broker& SessionState::getBroker() { return broker; } diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index f5f1bde2a2..29ca2665ea 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -55,6 +55,8 @@ class ConnectionState; class Message; class SessionHandler; class SessionManager; +class RateFlowcontrol; +class TimerTask; /** * Broker-side session state includes session's handler chains, which @@ -132,7 +134,11 @@ class SessionState : public qpid::SessionState, qmf::org::apache::qpid::broker::Session* mgmtObject; qpid::framing::SequenceSet accepted; - friend class SessionManager; + // State used for producer flow control (rate limited) + RateFlowcontrol* rateFlowcontrol; + boost::intrusive_ptr<TimerTask> flowControlTimer; + + friend class SessionManager; }; diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp index 4fadf236f8..7cf68956ea 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -62,7 +62,8 @@ SessionImpl::SessionImpl(const std::string& name, shared_ptr<ConnectionImpl> con ioHandler(*this), proxy(ioHandler), nextIn(0), - nextOut(0) + nextOut(0), + sendMsgCredit(0) { channel.next = connectionShared.get(); } @@ -76,6 +77,7 @@ SessionImpl::~SessionImpl() { handleClosed(); state.waitWaiters(); } + delete sendMsgCredit; } boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock(); if (c) c->erase(channel); @@ -359,7 +361,7 @@ void SessionImpl::sendContent(const MethodContent& content) uint64_t data_length = content.getData().length(); if(data_length > 0){ header.setLastSegment(false); - handleOut(header); + handleContentOut(header); /*Note: end of frame marker included in overhead but not in size*/ const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead(); @@ -388,7 +390,7 @@ void SessionImpl::sendContent(const MethodContent& content) } } } else { - handleOut(header); + handleContentOut(header); } } @@ -414,16 +416,18 @@ bool isContentFrame(AMQFrame& frame) void SessionImpl::handleIn(AMQFrame& frame) // network thread { try { - if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { - if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) { - //make sure the command id sequence and completion - //tracking takes account of execution commands - Lock l(state); - completedIn.add(nextIn++); - } else { - //if not handled by this class, its for the application: - deliver(frame); - } + if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { + ; + } else if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) { + //make sure the command id sequence and completion + //tracking takes account of execution commands + Lock l(state); + completedIn.add(nextIn++); + } else if (invoke(static_cast<MessageHandler&>(*this), *frame.getBody())) { + ; + } else { + //if not handled by this class, its for the application: + deliver(frame); } } catch (const SessionException& e) { @@ -439,6 +443,14 @@ void SessionImpl::handleOut(AMQFrame& frame) // user thread sendFrame(frame, true); } +void SessionImpl::handleContentOut(AMQFrame& frame) // user thread +{ + if (sendMsgCredit) { + sendMsgCredit->acquire(); + } + sendFrame(frame, true); +} + void SessionImpl::proxyOut(AMQFrame& frame) // network thread { //Note: this case is treated slightly differently that command @@ -631,6 +643,69 @@ void SessionImpl::exception(uint16_t errorCode, setTimeout(0); } +// Message methods: +void SessionImpl::accept(const qpid::framing::SequenceSet&) +{ +} + +void SessionImpl::reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&) +{ +} + +void SessionImpl::release(const qpid::framing::SequenceSet&, bool) +{ +} + +MessageResumeResult SessionImpl::resume(const std::string&, const std::string&) +{ + throw NotImplementedException("resuming transfers not yet supported"); +} + +namespace { + const std::string QPID_SESSION_DEST = ""; + const uint8_t FLOW_MODE_CREDIT = 0; + const uint8_t CREDIT_MODE_MSG = 0; +} + +void SessionImpl::setFlowMode(const std::string& dest, uint8_t flowMode) +{ + if ( dest != QPID_SESSION_DEST ) { + QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest); + return; + } + + if ( flowMode != FLOW_MODE_CREDIT ) { + throw NotImplementedException("window flow control mode not supported by producer"); + } + Lock l(state); + sendMsgCredit = new sys::Semaphore(0); +} + +void SessionImpl::flow(const std::string& dest, uint8_t mode, uint32_t credit) +{ + if ( dest != QPID_SESSION_DEST ) { + QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest); + return; + } + + if ( mode != CREDIT_MODE_MSG ) { + return; + } + if (sendMsgCredit) { + sendMsgCredit->release(credit); + } +} + +void SessionImpl::stop(const std::string& dest) +{ + if ( dest != QPID_SESSION_DEST ) { + QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest); + return; + } + if (sendMsgCredit) { + sendMsgCredit->forceLock(); + } +} //private utility methods: diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h index ea7776634a..9d0c4ff796 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/SessionImpl.h @@ -61,7 +61,8 @@ class SessionHandler; class SessionImpl : public framing::FrameHandler::InOutHandler, public Execution, private framing::AMQP_ClientOperations::SessionHandler, - private framing::AMQP_ClientOperations::ExecutionHandler + private framing::AMQP_ClientOperations::ExecutionHandler, + private framing::AMQP_ClientOperations::MessageHandler { public: SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>); @@ -123,6 +124,7 @@ private: }; typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler; typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler; + typedef framing::AMQP_ClientOperations::MessageHandler MessageHandler; typedef sys::StateMonitor<State, DETACHED> StateMonitor; typedef StateMonitor::Set States; @@ -138,6 +140,7 @@ private: void handleIn(framing::AMQFrame& frame); void handleOut(framing::AMQFrame& frame); + void handleContentOut(framing::AMQFrame& frame); /** * Sends session controls. This case is treated slightly * differently than command frames sent by the application via @@ -181,6 +184,18 @@ private: uint8_t fieldIndex, const std::string& description, const framing::FieldTable& errorInfo); + + // Note: Following methods are called by network thread in + // response to message commands from the broker + // EXCEPT Message.Transfer + void accept(const qpid::framing::SequenceSet&); + void reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&); + void release(const qpid::framing::SequenceSet&, bool); + qpid::framing::MessageResumeResult resume(const std::string&, const std::string&); + void setFlowMode(const std::string&, uint8_t); + void flow(const std::string&, uint8_t, uint32_t); + void stop(const std::string&); + sys::ExceptionHolder exceptionHolder; mutable StateMonitor state; @@ -211,6 +226,9 @@ private: SessionState sessionState; + // Only keep track of message credit + sys::Semaphore* sendMsgCredit; + friend class client::SessionHandler; }; diff --git a/qpid/cpp/src/qpid/sys/Semaphore.h b/qpid/cpp/src/qpid/sys/Semaphore.h index 3efb7ce2df..af937b60b8 100644 --- a/qpid/cpp/src/qpid/sys/Semaphore.h +++ b/qpid/cpp/src/qpid/sys/Semaphore.h @@ -51,10 +51,22 @@ public: count--; } + void release(uint n) + { + Monitor::ScopedLock l(monitor); + if (count==0) monitor.notifyAll(); + count+=n; + } + void release() { + release(1); + } + + void forceLock() + { Monitor::ScopedLock l(monitor); - if (!count++) monitor.notifyAll(); + count = 0; } private: diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index d675b587b1..fa9c5f3a01 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -91,7 +91,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ ConsoleTest.cpp \ QueueEvents.cpp \ ProxyTest.cpp \ - RetryList.cpp + RetryList.cpp \ + RateFlowcontrolTest.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp diff --git a/qpid/cpp/src/tests/RateFlowcontrolTest.cpp b/qpid/cpp/src/tests/RateFlowcontrolTest.cpp new file mode 100644 index 0000000000..3e2e2fa777 --- /dev/null +++ b/qpid/cpp/src/tests/RateFlowcontrolTest.cpp @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "unit_test.h" + +#include "qpid/broker/RateFlowcontrol.h" +#include "qpid/sys/Time.h" + +using namespace qpid::broker; +using namespace qpid::sys; + +QPID_AUTO_TEST_SUITE(RateFlowcontrolTestSuite) + +QPID_AUTO_TEST_CASE(RateFlowcontrolTest) +{ + // BOOST_CHECK(predicate); + // BOOST_CHECK_EQUAL(a, b); + + RateFlowcontrol fc(100); + AbsTime n=AbsTime::now(); + + BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 0U ); + + fc.sentCredit(n, 0); + + BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 0U ); + fc.sentCredit(n, 100); + + Duration d=250*TIME_MSEC; + + n = AbsTime(n,d); + BOOST_CHECK_EQUAL( fc.receivedMessage(n, 48), 25U ); + fc.sentCredit(n, 25); + + n = AbsTime(n,d); + BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 23U ); + fc.sentCredit(n, 23); + BOOST_CHECK_EQUAL( fc.receivedMessage(n, 100), 0U); + BOOST_CHECK(fc.flowStopped()); + + n = AbsTime(n,d); + n = AbsTime(n,d); + BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 50U); +} + +QPID_AUTO_TEST_SUITE_END() |