diff options
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 60 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.h | 1 | ||||
-rw-r--r-- | qpid/specs/management-schema.xml | 3 |
5 files changed, 49 insertions, 43 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 9088be2e54..ed1deb37fc 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -63,6 +63,14 @@ SessionState::SessionState( mgmtObject(0), rateFlowcontrol(0) { + uint32_t maxRate = broker.getOptions().maxSessionRate; + if (maxRate) { + if (handler->getConnection().getClientThrottling()) { + rateFlowcontrol = new RateFlowcontrol(maxRate); + } else { + QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support"); + } + } Manageable* parent = broker.GetVhostObject (); if (parent != 0) { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); @@ -72,25 +80,18 @@ SessionState::SessionState( mgmtObject->set_attached (0); mgmtObject->set_detachedLifespan (0); mgmtObject->clr_expireTime(); + if (rateFlowcontrol) mgmtObject->set_maxClientRate(maxRate); ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent); agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0); } } - uint32_t maxRate = broker.getOptions().maxSessionRate; - if (maxRate) { - if (handler->getConnection().getClientThrottling()) { - rateFlowcontrol = new RateFlowcontrol(maxRate); - } else { - QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support"); - } - } attach(h); } SessionState::~SessionState() { if (mgmtObject != 0) mgmtObject->resourceDestroy (); - + if (flowControlTimer) flowControlTimer->cancel(); } @@ -213,14 +214,7 @@ struct ScheduledCreditTask : public TimerTask { void fire() { // This is the best we can currently do to avoid a destruction/fire race if (!isCancelled()) { - // Send credit - AbsTime now = AbsTime::now(); - uint32_t sendCredit = flowControl.receivedMessage(now, 0); - if ( sendCredit>0 ) { - QPID_LOG(debug, sessionState.getId() << ": send producer credit " << sendCredit); - sessionState.getProxy().getMessage().flow("", 0, sendCredit); - flowControl.sentCredit(now, sendCredit); - } else if ( flowControl.flowStopped() ) { + if ( !sessionState.processSendCredit(0) && flowControl.flowStopped() ) { QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit"); reset(); timer.add(this); @@ -270,13 +264,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) // TODO: Probably do message.stop("") first time then disconnect getProxy().getMessage().stop(""); } else { - AbsTime now = AbsTime::now(); - uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, 1); - if ( sendCredit>0 ) { - QPID_LOG(debug, getId() << ": send producer credit " << sendCredit); - getProxy().getMessage().flow("", 0, sendCredit); - rateFlowcontrol->sentCredit(now, sendCredit); - } else if ( rateFlowcontrol->flowStopped() ) { + if ( !processSendCredit(1) && rateFlowcontrol->flowStopped() ) { QPID_LOG(debug, getId() << ": Schedule sending credit"); Timer& timer = getBroker().getTimer(); // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms @@ -288,6 +276,22 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) } } +bool SessionState::processSendCredit(uint32_t msgs) +{ + AbsTime now = AbsTime::now(); + uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, msgs); + if (mgmtObject) mgmtObject->dec_clientCredit(msgs); + if ( sendCredit>0 ) { + QPID_LOG(debug, getId() << ": send producer credit " << sendCredit); + getProxy().getMessage().flow("", 0, sendCredit); + rateFlowcontrol->sentCredit(now, sendCredit); + if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit); + return true; + } else { + return false; + } +} + void SessionState::sendAcceptAndCompletion() { if (!accepted.empty()) { @@ -357,10 +361,12 @@ void SessionState::readyToSend() { if (rateFlowcontrol) { // Issue initial credit - use a heuristic here issue min of 100 messages or 1 secs worth - QPID_LOG(debug, getId() << ": Issuing producer message credit " << std::min(rateFlowcontrol->getRate(), 100U)); + uint32_t credit = std::min(rateFlowcontrol->getRate(), 100U); + QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit); getProxy().getMessage().setFlowMode("", 0); - getProxy().getMessage().flow("", 0, std::min(rateFlowcontrol->getRate(), 100U)); - rateFlowcontrol->sentCredit(AbsTime::now(), std::min(rateFlowcontrol->getRate(), 100U)); + getProxy().getMessage().flow("", 0, credit); + rateFlowcontrol->sentCredit(AbsTime::now(), credit); + if (mgmtObject) mgmtObject->inc_clientCredit(credit); } } diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 29ca2665ea..b12cc15e1f 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -62,7 +62,7 @@ class TimerTask; * Broker-side session state includes session's handler chains, which * may themselves have state. */ -class SessionState : public qpid::SessionState, +class SessionState : public qpid::SessionState, public SessionContext, public DeliveryAdapter, public management::Manageable, @@ -79,7 +79,7 @@ class SessionState : public qpid::SessionState, /** @pre isAttached() */ framing::AMQP_ClientProxy& getProxy(); - + /** @pre isAttached() */ ConnectionState& getConnection(); bool isLocal(const ConnectionToken* t) const; @@ -91,7 +91,7 @@ class SessionState : public qpid::SessionState, void giveReadCredit(int32_t); void senderCompleted(const framing::SequenceSet& ranges); - + void sendCompletion(); //delivery adapter methods: @@ -108,6 +108,8 @@ class SessionState : public qpid::SessionState, SemanticState& getSemanticState() { return semanticState; } boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); } + bool processSendCredit(uint32_t msgs); + private: void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); @@ -124,7 +126,7 @@ class SessionState : public qpid::SessionState, void sendAcceptAndCompletion(); Broker& broker; - SessionHandler* handler; + SessionHandler* handler; sys::AbsTime expiry; // Used by SessionManager. SemanticState semanticState; SessionAdapter adapter; @@ -133,7 +135,7 @@ class SessionState : public qpid::SessionState, IncompleteMessageList::CompletionListener enqueuedOp; qmf::org::apache::qpid::broker::Session* mgmtObject; qpid::framing::SequenceSet accepted; - + // State used for producer flow control (rate limited) RateFlowcontrol* rateFlowcontrol; boost::intrusive_ptr<TimerTask> flowControlTimer; diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp index c179a31853..ee542a9cf8 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -329,6 +329,10 @@ void SessionImpl::sendRawFrame(AMQFrame& frame) { Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content) { + // Only message transfers have content + if (content && sendMsgCredit) { + sendMsgCredit->acquire(); + } Acquire a(sendLock); SequenceNumber id = nextOut++; { @@ -366,7 +370,7 @@ void SessionImpl::sendContent(const MethodContent& content) uint64_t data_length = content.getData().length(); if(data_length > 0){ header.setLastSegment(false); - handleContentOut(header); + handleOut(header); /*Note: end of frame marker included in overhead but not in size*/ const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead(); @@ -395,7 +399,7 @@ void SessionImpl::sendContent(const MethodContent& content) } } } else { - handleContentOut(header); + handleOut(header); } } @@ -448,14 +452,6 @@ void SessionImpl::handleOut(AMQFrame& frame) // user thread sendFrame(frame, true); } -void SessionImpl::handleContentOut(AMQFrame& frame) // user thread -{ - if (sendMsgCredit) { - sendMsgCredit->acquire(); - } - sendFrame(frame, true); -} - void SessionImpl::proxyOut(AMQFrame& frame) // network thread { //Note: this case is treated slightly differently that command diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h index d826b759ae..851bd2ec47 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/SessionImpl.h @@ -146,7 +146,6 @@ private: void handleIn(framing::AMQFrame& frame); void handleOut(framing::AMQFrame& frame); - void handleContentOut(framing::AMQFrame& frame); /** * Sends session controls. This case is treated slightly * differently than command frames sent by the application via diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index 3baa1fea4a..307ced1245 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -285,6 +285,7 @@ <property name="detachedLifespan" type="uint32" access="RO" unit="second"/> <property name="attached" type="bool" access="RO"/> <property name="expireTime" type="absTime" access="RO" optional="y"/> + <property name="maxClientRate" type="uint32" access="RO" unit="msgs/sec" optional="y"/> <statistic name="framesOutstanding" type="count32"/> @@ -293,6 +294,8 @@ <statistic name="TxnRejects" type="count64" unit="transaction" desc="Total transactions rejected"/> <statistic name="TxnCount" type="count32" unit="transaction" desc="Current pending transactions"/> + <statistic name="clientCredit" type="count32" unit="message" desc="Client message credit"/> + <method name="solicitAck"/> <method name="detach"/> <method name="resetLifespan"/> |