From 80d65b38008d9b7f31c825508819f9600d63b63c Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 16 Jun 2009 21:21:09 +0000 Subject: Performance improvements in AggregateOutput and SemanticState. Replaced AggregateOutput hierarchy with a flat list per connection holding only the OutputTasks that are potentially active. Tasks are droped from the list as soon as they return false, and added back when they may have output. Inlined frequently-used SequenceNumber functions. Replace std::list in QueueListeners with std::vector. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@785408 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/UpdateClient.cpp | 38 ++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 10 deletions(-) (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp') diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 332e74c512..7c305a2e92 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -54,6 +54,7 @@ #include "qpid/log/Statement.h" #include "qpid/Url.h" #include +#include #include namespace qpid { @@ -64,6 +65,8 @@ using broker::Exchange; using broker::Queue; using broker::QueueBinding; using broker::Message; +using broker::SemanticState; + using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; @@ -125,7 +128,8 @@ void UpdateClient::update() { Broker& b = updaterBroker; b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1)); - // Update queue is used to transfer acquired messages that are no longer on their original queue. + // Update queue is used to transfer acquired messages that are no + // longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); @@ -256,6 +260,16 @@ void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& que s.exchangeBind(queue, binding.exchange, binding.key, binding.args); } +void UpdateClient::updateOutputTask(const sys::OutputTask* task) { + const SemanticState::ConsumerImpl* cci = + boost::polymorphic_downcast (task); + SemanticState::ConsumerImpl* ci = const_cast(cci); + uint16_t channel = ci->getParent().getSession().getChannel(); + ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName()); + QPID_LOG(debug, updaterId << " updating output task " << ci->getName() + << " channel=" << channel); +} + void UpdateClient::updateConnection(const boost::intrusive_ptr& updateConnection) { QPID_LOG(debug, updaterId << " updating connection " << *updateConnection); shadowConnection = catchUpConnection(); @@ -266,6 +280,8 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr& upda bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1)); // Safe to use decoder here because we are stalled for update. std::pair fragment = decoder.get(updateConnection->getId()).getFragment(); + bc.getOutputTasks().eachOutput( + boost::bind(&UpdateClient::updateOutputTask, this, _1)); ClusterConnectionProxy(shadowConnection).shadowReady( updateConnection->getId().getMember(), updateConnection->getId().getNumber(), @@ -294,9 +310,9 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, updaterId << " updating exclusive queues."); ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1)); - // Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33. QPID_LOG(debug, updaterId << " updating consumers."); - ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this)); + ss->getSemanticState().eachConsumer( + boost::bind(&UpdateClient::updateConsumer, this, _1)); QPID_LOG(debug, updaterId << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); @@ -304,7 +320,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { updateTxState(ss->getSemanticState()); // Tx transaction state. - // Adjust for command counter for message in progress, will be sent after state update. + // Adjust command counter for message in progress, will be sent after state update. boost::intrusive_ptr inProgress = ss->getMessageInProgress(); SequenceNumber received = ss->receiverGetReceived().command; if (inProgress) @@ -328,8 +344,11 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId()); } -void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) { - QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " << shadowSession.getId()); +void UpdateClient::updateConsumer( + const broker::SemanticState::ConsumerImpl::shared_ptr& ci) +{ + QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " + << shadowSession.getId()); using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), @@ -344,13 +363,12 @@ void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); - ClusterConnectionConsumerStateBody state( - ProtocolVersion(), + ClusterConnectionProxy(shadowSession).consumerState( ci->getName(), ci->isBlocked(), - ci->isNotifyEnabled() + ci->isNotifyEnabled(), + ci->getQueue()->getListeners().contains(ci) ); - client::SessionBase_0_10Access(shadowSession).get()->send(state); QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); } -- cgit v1.2.1