summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp60
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h12
-rw-r--r--qpid/cpp/src/qpid/client/SessionImpl.cpp16
-rw-r--r--qpid/cpp/src/qpid/client/SessionImpl.h1
-rw-r--r--qpid/specs/management-schema.xml3
5 files changed, 49 insertions, 43 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 9088be2e54..ed1deb37fc 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -63,6 +63,14 @@ SessionState::SessionState(
mgmtObject(0),
rateFlowcontrol(0)
{
+ uint32_t maxRate = broker.getOptions().maxSessionRate;
+ if (maxRate) {
+ if (handler->getConnection().getClientThrottling()) {
+ rateFlowcontrol = new RateFlowcontrol(maxRate);
+ } else {
+ QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
+ }
+ }
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
@@ -72,25 +80,18 @@ SessionState::SessionState(
mgmtObject->set_attached (0);
mgmtObject->set_detachedLifespan (0);
mgmtObject->clr_expireTime();
+ if (rateFlowcontrol) mgmtObject->set_maxClientRate(maxRate);
ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0);
}
}
- uint32_t maxRate = broker.getOptions().maxSessionRate;
- if (maxRate) {
- if (handler->getConnection().getClientThrottling()) {
- rateFlowcontrol = new RateFlowcontrol(maxRate);
- } else {
- QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
- }
- }
attach(h);
}
SessionState::~SessionState() {
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
-
+
if (flowControlTimer)
flowControlTimer->cancel();
}
@@ -213,14 +214,7 @@ struct ScheduledCreditTask : public TimerTask {
void fire() {
// This is the best we can currently do to avoid a destruction/fire race
if (!isCancelled()) {
- // Send credit
- AbsTime now = AbsTime::now();
- uint32_t sendCredit = flowControl.receivedMessage(now, 0);
- if ( sendCredit>0 ) {
- QPID_LOG(debug, sessionState.getId() << ": send producer credit " << sendCredit);
- sessionState.getProxy().getMessage().flow("", 0, sendCredit);
- flowControl.sentCredit(now, sendCredit);
- } else if ( flowControl.flowStopped() ) {
+ if ( !sessionState.processSendCredit(0) && flowControl.flowStopped() ) {
QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
reset();
timer.add(this);
@@ -270,13 +264,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
// TODO: Probably do message.stop("") first time then disconnect
getProxy().getMessage().stop("");
} else {
- AbsTime now = AbsTime::now();
- uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, 1);
- if ( sendCredit>0 ) {
- QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
- getProxy().getMessage().flow("", 0, sendCredit);
- rateFlowcontrol->sentCredit(now, sendCredit);
- } else if ( rateFlowcontrol->flowStopped() ) {
+ if ( !processSendCredit(1) && rateFlowcontrol->flowStopped() ) {
QPID_LOG(debug, getId() << ": Schedule sending credit");
Timer& timer = getBroker().getTimer();
// Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
@@ -288,6 +276,22 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
}
}
+bool SessionState::processSendCredit(uint32_t msgs)
+{
+ AbsTime now = AbsTime::now();
+ uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, msgs);
+ if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
+ if ( sendCredit>0 ) {
+ QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
+ getProxy().getMessage().flow("", 0, sendCredit);
+ rateFlowcontrol->sentCredit(now, sendCredit);
+ if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
+ return true;
+ } else {
+ return false;
+ }
+}
+
void SessionState::sendAcceptAndCompletion()
{
if (!accepted.empty()) {
@@ -357,10 +361,12 @@ void SessionState::readyToSend() {
if (rateFlowcontrol) {
// Issue initial credit - use a heuristic here issue min of 100 messages or 1 secs worth
- QPID_LOG(debug, getId() << ": Issuing producer message credit " << std::min(rateFlowcontrol->getRate(), 100U));
+ uint32_t credit = std::min(rateFlowcontrol->getRate(), 100U);
+ QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit);
getProxy().getMessage().setFlowMode("", 0);
- getProxy().getMessage().flow("", 0, std::min(rateFlowcontrol->getRate(), 100U));
- rateFlowcontrol->sentCredit(AbsTime::now(), std::min(rateFlowcontrol->getRate(), 100U));
+ getProxy().getMessage().flow("", 0, credit);
+ rateFlowcontrol->sentCredit(AbsTime::now(), credit);
+ if (mgmtObject) mgmtObject->inc_clientCredit(credit);
}
}
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 29ca2665ea..b12cc15e1f 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -62,7 +62,7 @@ class TimerTask;
* Broker-side session state includes session's handler chains, which
* may themselves have state.
*/
-class SessionState : public qpid::SessionState,
+class SessionState : public qpid::SessionState,
public SessionContext,
public DeliveryAdapter,
public management::Manageable,
@@ -79,7 +79,7 @@ class SessionState : public qpid::SessionState,
/** @pre isAttached() */
framing::AMQP_ClientProxy& getProxy();
-
+
/** @pre isAttached() */
ConnectionState& getConnection();
bool isLocal(const ConnectionToken* t) const;
@@ -91,7 +91,7 @@ class SessionState : public qpid::SessionState,
void giveReadCredit(int32_t);
void senderCompleted(const framing::SequenceSet& ranges);
-
+
void sendCompletion();
//delivery adapter methods:
@@ -108,6 +108,8 @@ class SessionState : public qpid::SessionState,
SemanticState& getSemanticState() { return semanticState; }
boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
+ bool processSendCredit(uint32_t msgs);
+
private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
@@ -124,7 +126,7 @@ class SessionState : public qpid::SessionState,
void sendAcceptAndCompletion();
Broker& broker;
- SessionHandler* handler;
+ SessionHandler* handler;
sys::AbsTime expiry; // Used by SessionManager.
SemanticState semanticState;
SessionAdapter adapter;
@@ -133,7 +135,7 @@ class SessionState : public qpid::SessionState,
IncompleteMessageList::CompletionListener enqueuedOp;
qmf::org::apache::qpid::broker::Session* mgmtObject;
qpid::framing::SequenceSet accepted;
-
+
// State used for producer flow control (rate limited)
RateFlowcontrol* rateFlowcontrol;
boost::intrusive_ptr<TimerTask> flowControlTimer;
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp
index c179a31853..ee542a9cf8 100644
--- a/qpid/cpp/src/qpid/client/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp
@@ -329,6 +329,10 @@ void SessionImpl::sendRawFrame(AMQFrame& frame) {
Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content)
{
+ // Only message transfers have content
+ if (content && sendMsgCredit) {
+ sendMsgCredit->acquire();
+ }
Acquire a(sendLock);
SequenceNumber id = nextOut++;
{
@@ -366,7 +370,7 @@ void SessionImpl::sendContent(const MethodContent& content)
uint64_t data_length = content.getData().length();
if(data_length > 0){
header.setLastSegment(false);
- handleContentOut(header);
+ handleOut(header);
/*Note: end of frame marker included in overhead but not in size*/
const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead();
@@ -395,7 +399,7 @@ void SessionImpl::sendContent(const MethodContent& content)
}
}
} else {
- handleContentOut(header);
+ handleOut(header);
}
}
@@ -448,14 +452,6 @@ void SessionImpl::handleOut(AMQFrame& frame) // user thread
sendFrame(frame, true);
}
-void SessionImpl::handleContentOut(AMQFrame& frame) // user thread
-{
- if (sendMsgCredit) {
- sendMsgCredit->acquire();
- }
- sendFrame(frame, true);
-}
-
void SessionImpl::proxyOut(AMQFrame& frame) // network thread
{
//Note: this case is treated slightly differently that command
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h
index d826b759ae..851bd2ec47 100644
--- a/qpid/cpp/src/qpid/client/SessionImpl.h
+++ b/qpid/cpp/src/qpid/client/SessionImpl.h
@@ -146,7 +146,6 @@ private:
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
- void handleContentOut(framing::AMQFrame& frame);
/**
* Sends session controls. This case is treated slightly
* differently than command frames sent by the application via
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index 3baa1fea4a..307ced1245 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -285,6 +285,7 @@
<property name="detachedLifespan" type="uint32" access="RO" unit="second"/>
<property name="attached" type="bool" access="RO"/>
<property name="expireTime" type="absTime" access="RO" optional="y"/>
+ <property name="maxClientRate" type="uint32" access="RO" unit="msgs/sec" optional="y"/>
<statistic name="framesOutstanding" type="count32"/>
@@ -293,6 +294,8 @@
<statistic name="TxnRejects" type="count64" unit="transaction" desc="Total transactions rejected"/>
<statistic name="TxnCount" type="count32" unit="transaction" desc="Current pending transactions"/>
+ <statistic name="clientCredit" type="count32" unit="message" desc="Client message credit"/>
+
<method name="solicitAck"/>
<method name="detach"/>
<method name="resetLifespan"/>