summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2009-01-27 21:17:47 +0000
committerAndrew Stitcher <astitcher@apache.org>2009-01-27 21:17:47 +0000
commitd37870c56f30099fccf8e876ff7ea70829a1458a (patch)
tree76231e78fd79ee7385139bf1c30c096e4a9ab7b7
parente499bb4d3bcff953fc8dc79cf16a325fe5e25160 (diff)
downloadqpid-python-d37870c56f30099fccf8e876ff7ea70829a1458a.tar.gz
Producer side rate throttling:
This uses the Message.Flow command to send credit from broker to client to ensure that the client doesn't exceed a rate configured on the broker per session. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@738247 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am1
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h1
-rw-r--r--qpid/cpp/src/qpid/broker/RateFlowcontrol.h96
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp110
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h8
-rw-r--r--qpid/cpp/src/qpid/client/SessionImpl.cpp101
-rw-r--r--qpid/cpp/src/qpid/client/SessionImpl.h20
-rw-r--r--qpid/cpp/src/qpid/sys/Semaphore.h14
-rw-r--r--qpid/cpp/src/tests/Makefile.am3
-rw-r--r--qpid/cpp/src/tests/RateFlowcontrolTest.cpp64
11 files changed, 388 insertions, 36 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index e4e9a98aa7..e07267da37 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -539,6 +539,7 @@ nobase_include_HEADERS = \
qpid/broker/QueuedMessage.h \
qpid/broker/QueuePolicy.h \
qpid/broker/QueueRegistry.h \
+ qpid/broker/RateFlowcontrol.h \
qpid/broker/RateTracker.h \
qpid/broker/RecoverableConfig.h \
qpid/broker/RecoverableExchange.h \
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index e4c5c9b5e9..f692ff72f3 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -90,7 +90,8 @@ Broker::Options::Options(const std::string& name) :
replayHardLimit(0),
queueLimit(100*1048576/*100M default limit*/),
tcpNoDelay(false),
- requireEncrypted(false)
+ requireEncrypted(false),
+ maxSessionRate(0)
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -119,7 +120,8 @@ Broker::Options::Options(const std::string& name) :
("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)")
("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections")
("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted")
- ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)");
+ ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)")
+ ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)");
}
const std::string empty;
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index d97737c707..c50ef46baa 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -105,6 +105,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
bool tcpNoDelay;
bool requireEncrypted;
std::string knownHosts;
+ uint32_t maxSessionRate;
};
private:
diff --git a/qpid/cpp/src/qpid/broker/RateFlowcontrol.h b/qpid/cpp/src/qpid/broker/RateFlowcontrol.h
new file mode 100644
index 0000000000..3323097eff
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/RateFlowcontrol.h
@@ -0,0 +1,96 @@
+#ifndef broker_RateFlowcontrol_h
+#define broker_RateFlowcontrol_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Time.h"
+#include "qpid/sys/IntegerTypes.h"
+
+#include <algorithm>
+
+namespace qpid {
+namespace broker {
+
+// Class to keep track of issuing flow control to make sure that the peer doesn't exceed
+// a given message rate
+//
+// Create the object with the target rate
+// Then call sendCredit() whenever credit is issued to the peer
+// Call receivedMessage() whenever a message is received, it returns the credit to issue.
+//
+// sentCredit() be sensibly called with a 0 parameter to indicate
+// that we sent credit but treat it as if the value was 0 (we may do this at the start of the connection
+// to allow our peer to send messages)
+//
+// receivedMessage() can be called with 0 to indicate that we've not received a message, but
+// tell me what credit I can send.
+class RateFlowcontrol {
+ uint32_t rate; // messages per second
+ uint32_t maxCredit; // max credit issued to client (issued at start)
+ uint32_t requestedCredit;
+ qpid::sys::AbsTime creditSent;
+
+public:
+ RateFlowcontrol(uint32_t r) :
+ rate(r),
+ maxCredit(0),
+ requestedCredit(0),
+ creditSent(qpid::sys::FAR_FUTURE)
+ {}
+
+ uint32_t getRate() const {
+ return rate;
+ }
+ void sentCredit(const qpid::sys::AbsTime& t, uint32_t credit);
+ uint32_t receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs);
+ bool flowStopped() const;
+};
+
+inline void RateFlowcontrol::sentCredit(const qpid::sys::AbsTime& t, uint32_t credit) {
+ // If the client isn't currently requesting credit (ie it's not sent us anything yet) then
+ // this credit goes to the max credit held by the client (it can't go to reduce credit
+ // less than 0)
+ int32_t nextRequestedCredit = requestedCredit - credit;
+ if ( nextRequestedCredit<0 ) {
+ requestedCredit = 0;
+ maxCredit -= nextRequestedCredit;
+ } else {
+ requestedCredit = nextRequestedCredit;
+ }
+ creditSent = t;
+}
+
+inline uint32_t RateFlowcontrol::receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs) {
+ requestedCredit +=msgs;
+ qpid::sys::Duration d(creditSent, t);
+ // Could be -ve before first sentCredit
+ int64_t toSend = std::min(rate * d / qpid::sys::TIME_SEC, static_cast<int64_t>(requestedCredit));
+ return toSend > 0 ? toSend : 0;
+}
+
+inline bool RateFlowcontrol::flowStopped() const {
+ return requestedCredit >= maxCredit;
+}
+
+}}
+
+#endif // broker_RateFlowcontrol_h
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 0a24a39d38..5039b31874 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,8 @@
#include "DeliveryRecord.h"
#include "SessionManager.h"
#include "SessionHandler.h"
+#include "RateFlowcontrol.h"
+#include "Timer.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/AMQMethodBody.h"
@@ -31,6 +33,7 @@
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementBroker.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
@@ -46,17 +49,19 @@ using qpid::management::ManagementBroker;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
+using qpid::sys::AbsTime;
namespace _qmf = qmf::org::apache::qpid::broker;
SessionState::SessionState(
- Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config)
+ Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config)
: qpid::SessionState(id, config),
broker(b), handler(&h),
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
- mgmtObject(0)
+ mgmtObject(0),
+ rateFlowcontrol(0)
{
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
@@ -71,12 +76,19 @@ SessionState::SessionState(
agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0);
}
}
+ uint32_t maxRate = broker.getOptions().maxSessionRate;
+ if (maxRate) {
+ rateFlowcontrol = new RateFlowcontrol(maxRate);
+ }
attach(h);
}
SessionState::~SessionState() {
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
+
+ if (flowControlTimer)
+ flowControlTimer->cancel();
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -102,7 +114,7 @@ void SessionState::detach() {
mgmtObject->set_attached (0);
}
-void SessionState::disableOutput()
+void SessionState::disableOutput()
{
semanticState.detached();//prevents further activateOutput calls until reattached
getConnection().outputTasks.removeOutputTask(&semanticState);
@@ -120,12 +132,12 @@ void SessionState::attach(SessionHandler& h) {
}
void SessionState::activateOutput() {
- if (isAttached())
+ if (isAttached())
getConnection().outputTasks.activateOutput();
}
void SessionState::giveReadCredit(int32_t credit) {
- if (isAttached())
+ if (isAttached())
getConnection().outputTasks.giveReadCredit(credit);
}
@@ -170,18 +182,49 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
Invoker::Result invocation = invoke(adapter, *method);
- receiverCompleted(id);
+ receiverCompleted(id);
if (!invocation.wasHandled()) {
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
getProxy().getExecution().result(id, invocation.getResult());
}
- if (method->isSync()) {
+ if (method->isSync()) {
incomplete.process(enqueuedOp, true);
sendAcceptAndCompletion();
}
}
+struct ScheduledCreditTask : public TimerTask {
+ Timer& timer;
+ SessionState& sessionState;
+ RateFlowcontrol& flowControl;
+ ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t,
+ SessionState& s, RateFlowcontrol& f) :
+ TimerTask(d),
+ timer(t),
+ sessionState(s),
+ flowControl(f)
+ {}
+
+ void fire() {
+ // This is the best we can currently do to avoid a destruction/fire race
+ if (!isCancelled()) {
+ // Send credit
+ AbsTime now = AbsTime::now();
+ uint32_t sendCredit = flowControl.receivedMessage(now, 0);
+ if ( sendCredit>0 ) {
+ QPID_LOG(debug, sessionState.getId() << ": send producer credit " << sendCredit);
+ sessionState.getProxy().getMessage().flow("", 0, sendCredit);
+ flowControl.sentCredit(now, sendCredit);
+ } else if ( flowControl.flowStopped() ) {
+ QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
+ reset();
+ timer.add(this);
+ }
+ }
+ }
+};
+
void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
{
if (frame.getBof() && frame.getBos()) //start of frameset
@@ -194,10 +237,10 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
AMQFrame header((AMQHeaderBody()));
header.setBof(false);
header.setEof(false);
- msg->getFrames().append(header);
+ msg->getFrames().append(header);
}
msg->setPublisher(&getConnection());
- semanticState.handle(msg);
+ semanticState.handle(msg);
msgBuilder.end();
if (msg->isEnqueueComplete()) {
@@ -206,14 +249,39 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
incomplete.add(msg);
}
- //hold up execution until async enqueue is complete
- if (msg->getFrames().getMethod()->isSync()) {
+ //hold up execution until async enqueue is complete
+ if (msg->getFrames().getMethod()->isSync()) {
incomplete.process(enqueuedOp, true);
sendAcceptAndCompletion();
} else {
incomplete.process(enqueuedOp, false);
}
}
+
+ // Handle producer session flow control
+ if (rateFlowcontrol && frame.getBof() && frame.getBos()) {
+ // Check for violating flow control
+ if ( rateFlowcontrol->flowStopped() ) {
+ QPID_LOG(warning, getId() << ": producer throttling violation");
+ // TODO: Probably do message.stop("") first time then disconnect
+ getProxy().getMessage().stop("");
+ } else {
+ AbsTime now = AbsTime::now();
+ uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, 1);
+ if ( sendCredit>0 ) {
+ QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
+ getProxy().getMessage().flow("", 0, sendCredit);
+ rateFlowcontrol->sentCredit(now, sendCredit);
+ } else if ( rateFlowcontrol->flowStopped() ) {
+ QPID_LOG(debug, getId() << ": Schedule sending credit");
+ Timer& timer = getBroker().getTimer();
+ // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
+ sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC);
+ flowControlTimer = new ScheduledCreditTask(d, timer, *this, *rateFlowcontrol);
+ timer.add(flowControlTimer);
+ }
+ }
+ }
}
void SessionState::sendAcceptAndCompletion()
@@ -222,7 +290,7 @@ void SessionState::sendAcceptAndCompletion()
getProxy().getMessage().accept(accepted);
accepted.clear();
}
- sendCompletion();
+ sendCompletion();
}
void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
@@ -240,7 +308,7 @@ void SessionState::handleIn(AMQFrame& frame) {
if (m == 0 || m->isContentBearing()) {
handleContent(frame, commandId);
} else if (frame.getBof() && frame.getEof()) {
- handleCommand(frame.getMethod(), commandId);
+ handleCommand(frame.getMethod(), commandId);
} else {
throw InternalErrorException("Cannot handle multi-frame command segments yet");
}
@@ -265,8 +333,8 @@ void SessionState::deliver(DeliveryRecord& msg, bool sync)
}
}
-void SessionState::sendCompletion() {
- handler->sendCompletion();
+void SessionState::sendCompletion() {
+ handler->sendCompletion();
}
void SessionState::senderCompleted(const SequenceSet& commands) {
@@ -282,6 +350,14 @@ void SessionState::readyToSend() {
sys::AggregateOutput& tasks = handler->getConnection().outputTasks;
tasks.addOutputTask(&semanticState);
tasks.activateOutput();
+
+ if (rateFlowcontrol) {
+ // Issue initial credit - use a heuristic here issue min of 100 messages or 1 secs worth
+ QPID_LOG(debug, getId() << ": Issuing producer message credit " << std::min(rateFlowcontrol->getRate(), 100U));
+ getProxy().getMessage().setFlowMode("", 0);
+ getProxy().getMessage().flow("", 0, std::min(rateFlowcontrol->getRate(), 100U));
+ rateFlowcontrol->sentCredit(AbsTime::now(), std::min(rateFlowcontrol->getRate(), 100U));
+ }
}
Broker& SessionState::getBroker() { return broker; }
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index f5f1bde2a2..29ca2665ea 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -55,6 +55,8 @@ class ConnectionState;
class Message;
class SessionHandler;
class SessionManager;
+class RateFlowcontrol;
+class TimerTask;
/**
* Broker-side session state includes session's handler chains, which
@@ -132,7 +134,11 @@ class SessionState : public qpid::SessionState,
qmf::org::apache::qpid::broker::Session* mgmtObject;
qpid::framing::SequenceSet accepted;
- friend class SessionManager;
+ // State used for producer flow control (rate limited)
+ RateFlowcontrol* rateFlowcontrol;
+ boost::intrusive_ptr<TimerTask> flowControlTimer;
+
+ friend class SessionManager;
};
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp
index 4fadf236f8..7cf68956ea 100644
--- a/qpid/cpp/src/qpid/client/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp
@@ -62,7 +62,8 @@ SessionImpl::SessionImpl(const std::string& name, shared_ptr<ConnectionImpl> con
ioHandler(*this),
proxy(ioHandler),
nextIn(0),
- nextOut(0)
+ nextOut(0),
+ sendMsgCredit(0)
{
channel.next = connectionShared.get();
}
@@ -76,6 +77,7 @@ SessionImpl::~SessionImpl() {
handleClosed();
state.waitWaiters();
}
+ delete sendMsgCredit;
}
boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock();
if (c) c->erase(channel);
@@ -359,7 +361,7 @@ void SessionImpl::sendContent(const MethodContent& content)
uint64_t data_length = content.getData().length();
if(data_length > 0){
header.setLastSegment(false);
- handleOut(header);
+ handleContentOut(header);
/*Note: end of frame marker included in overhead but not in size*/
const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead();
@@ -388,7 +390,7 @@ void SessionImpl::sendContent(const MethodContent& content)
}
}
} else {
- handleOut(header);
+ handleContentOut(header);
}
}
@@ -414,16 +416,18 @@ bool isContentFrame(AMQFrame& frame)
void SessionImpl::handleIn(AMQFrame& frame) // network thread
{
try {
- if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
- if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) {
- //make sure the command id sequence and completion
- //tracking takes account of execution commands
- Lock l(state);
- completedIn.add(nextIn++);
- } else {
- //if not handled by this class, its for the application:
- deliver(frame);
- }
+ if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
+ ;
+ } else if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) {
+ //make sure the command id sequence and completion
+ //tracking takes account of execution commands
+ Lock l(state);
+ completedIn.add(nextIn++);
+ } else if (invoke(static_cast<MessageHandler&>(*this), *frame.getBody())) {
+ ;
+ } else {
+ //if not handled by this class, its for the application:
+ deliver(frame);
}
}
catch (const SessionException& e) {
@@ -439,6 +443,14 @@ void SessionImpl::handleOut(AMQFrame& frame) // user thread
sendFrame(frame, true);
}
+void SessionImpl::handleContentOut(AMQFrame& frame) // user thread
+{
+ if (sendMsgCredit) {
+ sendMsgCredit->acquire();
+ }
+ sendFrame(frame, true);
+}
+
void SessionImpl::proxyOut(AMQFrame& frame) // network thread
{
//Note: this case is treated slightly differently that command
@@ -631,6 +643,69 @@ void SessionImpl::exception(uint16_t errorCode,
setTimeout(0);
}
+// Message methods:
+void SessionImpl::accept(const qpid::framing::SequenceSet&)
+{
+}
+
+void SessionImpl::reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&)
+{
+}
+
+void SessionImpl::release(const qpid::framing::SequenceSet&, bool)
+{
+}
+
+MessageResumeResult SessionImpl::resume(const std::string&, const std::string&)
+{
+ throw NotImplementedException("resuming transfers not yet supported");
+}
+
+namespace {
+ const std::string QPID_SESSION_DEST = "";
+ const uint8_t FLOW_MODE_CREDIT = 0;
+ const uint8_t CREDIT_MODE_MSG = 0;
+}
+
+void SessionImpl::setFlowMode(const std::string& dest, uint8_t flowMode)
+{
+ if ( dest != QPID_SESSION_DEST ) {
+ QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest);
+ return;
+ }
+
+ if ( flowMode != FLOW_MODE_CREDIT ) {
+ throw NotImplementedException("window flow control mode not supported by producer");
+ }
+ Lock l(state);
+ sendMsgCredit = new sys::Semaphore(0);
+}
+
+void SessionImpl::flow(const std::string& dest, uint8_t mode, uint32_t credit)
+{
+ if ( dest != QPID_SESSION_DEST ) {
+ QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest);
+ return;
+ }
+
+ if ( mode != CREDIT_MODE_MSG ) {
+ return;
+ }
+ if (sendMsgCredit) {
+ sendMsgCredit->release(credit);
+ }
+}
+
+void SessionImpl::stop(const std::string& dest)
+{
+ if ( dest != QPID_SESSION_DEST ) {
+ QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest);
+ return;
+ }
+ if (sendMsgCredit) {
+ sendMsgCredit->forceLock();
+ }
+}
//private utility methods:
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h
index ea7776634a..9d0c4ff796 100644
--- a/qpid/cpp/src/qpid/client/SessionImpl.h
+++ b/qpid/cpp/src/qpid/client/SessionImpl.h
@@ -61,7 +61,8 @@ class SessionHandler;
class SessionImpl : public framing::FrameHandler::InOutHandler,
public Execution,
private framing::AMQP_ClientOperations::SessionHandler,
- private framing::AMQP_ClientOperations::ExecutionHandler
+ private framing::AMQP_ClientOperations::ExecutionHandler,
+ private framing::AMQP_ClientOperations::MessageHandler
{
public:
SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>);
@@ -123,6 +124,7 @@ private:
};
typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler;
+ typedef framing::AMQP_ClientOperations::MessageHandler MessageHandler;
typedef sys::StateMonitor<State, DETACHED> StateMonitor;
typedef StateMonitor::Set States;
@@ -138,6 +140,7 @@ private:
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
+ void handleContentOut(framing::AMQFrame& frame);
/**
* Sends session controls. This case is treated slightly
* differently than command frames sent by the application via
@@ -181,6 +184,18 @@ private:
uint8_t fieldIndex,
const std::string& description,
const framing::FieldTable& errorInfo);
+
+ // Note: Following methods are called by network thread in
+ // response to message commands from the broker
+ // EXCEPT Message.Transfer
+ void accept(const qpid::framing::SequenceSet&);
+ void reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&);
+ void release(const qpid::framing::SequenceSet&, bool);
+ qpid::framing::MessageResumeResult resume(const std::string&, const std::string&);
+ void setFlowMode(const std::string&, uint8_t);
+ void flow(const std::string&, uint8_t, uint32_t);
+ void stop(const std::string&);
+
sys::ExceptionHolder exceptionHolder;
mutable StateMonitor state;
@@ -211,6 +226,9 @@ private:
SessionState sessionState;
+ // Only keep track of message credit
+ sys::Semaphore* sendMsgCredit;
+
friend class client::SessionHandler;
};
diff --git a/qpid/cpp/src/qpid/sys/Semaphore.h b/qpid/cpp/src/qpid/sys/Semaphore.h
index 3efb7ce2df..af937b60b8 100644
--- a/qpid/cpp/src/qpid/sys/Semaphore.h
+++ b/qpid/cpp/src/qpid/sys/Semaphore.h
@@ -51,10 +51,22 @@ public:
count--;
}
+ void release(uint n)
+ {
+ Monitor::ScopedLock l(monitor);
+ if (count==0) monitor.notifyAll();
+ count+=n;
+ }
+
void release()
{
+ release(1);
+ }
+
+ void forceLock()
+ {
Monitor::ScopedLock l(monitor);
- if (!count++) monitor.notifyAll();
+ count = 0;
}
private:
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index d675b587b1..fa9c5f3a01 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -91,7 +91,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
ConsoleTest.cpp \
QueueEvents.cpp \
ProxyTest.cpp \
- RetryList.cpp
+ RetryList.cpp \
+ RateFlowcontrolTest.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
diff --git a/qpid/cpp/src/tests/RateFlowcontrolTest.cpp b/qpid/cpp/src/tests/RateFlowcontrolTest.cpp
new file mode 100644
index 0000000000..3e2e2fa777
--- /dev/null
+++ b/qpid/cpp/src/tests/RateFlowcontrolTest.cpp
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "unit_test.h"
+
+#include "qpid/broker/RateFlowcontrol.h"
+#include "qpid/sys/Time.h"
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+QPID_AUTO_TEST_SUITE(RateFlowcontrolTestSuite)
+
+QPID_AUTO_TEST_CASE(RateFlowcontrolTest)
+{
+ // BOOST_CHECK(predicate);
+ // BOOST_CHECK_EQUAL(a, b);
+
+ RateFlowcontrol fc(100);
+ AbsTime n=AbsTime::now();
+
+ BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 0U );
+
+ fc.sentCredit(n, 0);
+
+ BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 0U );
+ fc.sentCredit(n, 100);
+
+ Duration d=250*TIME_MSEC;
+
+ n = AbsTime(n,d);
+ BOOST_CHECK_EQUAL( fc.receivedMessage(n, 48), 25U );
+ fc.sentCredit(n, 25);
+
+ n = AbsTime(n,d);
+ BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 23U );
+ fc.sentCredit(n, 23);
+ BOOST_CHECK_EQUAL( fc.receivedMessage(n, 100), 0U);
+ BOOST_CHECK(fc.flowStopped());
+
+ n = AbsTime(n,d);
+ n = AbsTime(n,d);
+ BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 50U);
+}
+
+QPID_AUTO_TEST_SUITE_END()