diff options
author | Alan Conway <aconway@apache.org> | 2009-06-16 21:21:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-06-16 21:21:09 +0000 |
commit | 80d65b38008d9b7f31c825508819f9600d63b63c (patch) | |
tree | 316862bff35f1cae6f0d1152dcf4a6e3b0f967ed /cpp/src/qpid/cluster/UpdateClient.cpp | |
parent | f5e98a6dfb8c4defe22755340f440e6f16c2559a (diff) | |
download | qpid-python-80d65b38008d9b7f31c825508819f9600d63b63c.tar.gz |
Performance improvements in AggregateOutput and SemanticState.
Replaced AggregateOutput hierarchy with a flat list per connection
holding only the OutputTasks that are potentially active. Tasks are
droped from the list as soon as they return false, and added back when
they may have output.
Inlined frequently-used SequenceNumber functions.
Replace std::list in QueueListeners with std::vector.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@785408 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 38 |
1 files changed, 28 insertions, 10 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 332e74c512..7c305a2e92 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -54,6 +54,7 @@ #include "qpid/log/Statement.h" #include "qpid/Url.h" #include <boost/bind.hpp> +#include <boost/cast.hpp> #include <algorithm> namespace qpid { @@ -64,6 +65,8 @@ using broker::Exchange; using broker::Queue; using broker::QueueBinding; using broker::Message; +using broker::SemanticState; + using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; @@ -125,7 +128,8 @@ void UpdateClient::update() { Broker& b = updaterBroker; b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1)); - // Update queue is used to transfer acquired messages that are no longer on their original queue. + // Update queue is used to transfer acquired messages that are no + // longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); @@ -256,6 +260,16 @@ void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& que s.exchangeBind(queue, binding.exchange, binding.key, binding.args); } +void UpdateClient::updateOutputTask(const sys::OutputTask* task) { + const SemanticState::ConsumerImpl* cci = + boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task); + SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci); + uint16_t channel = ci->getParent().getSession().getChannel(); + ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName()); + QPID_LOG(debug, updaterId << " updating output task " << ci->getName() + << " channel=" << channel); +} + void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) { QPID_LOG(debug, updaterId << " updating connection " << *updateConnection); shadowConnection = catchUpConnection(); @@ -266,6 +280,8 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1)); // Safe to use decoder here because we are stalled for update. std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment(); + bc.getOutputTasks().eachOutput( + boost::bind(&UpdateClient::updateOutputTask, this, _1)); ClusterConnectionProxy(shadowConnection).shadowReady( updateConnection->getId().getMember(), updateConnection->getId().getNumber(), @@ -294,9 +310,9 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, updaterId << " updating exclusive queues."); ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1)); - // Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33. QPID_LOG(debug, updaterId << " updating consumers."); - ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this)); + ss->getSemanticState().eachConsumer( + boost::bind(&UpdateClient::updateConsumer, this, _1)); QPID_LOG(debug, updaterId << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); @@ -304,7 +320,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { updateTxState(ss->getSemanticState()); // Tx transaction state. - // Adjust for command counter for message in progress, will be sent after state update. + // Adjust command counter for message in progress, will be sent after state update. boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); SequenceNumber received = ss->receiverGetReceived().command; if (inProgress) @@ -328,8 +344,11 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId()); } -void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) { - QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " << shadowSession.getId()); +void UpdateClient::updateConsumer( + const broker::SemanticState::ConsumerImpl::shared_ptr& ci) +{ + QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " + << shadowSession.getId()); using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), @@ -344,13 +363,12 @@ void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); - ClusterConnectionConsumerStateBody state( - ProtocolVersion(), + ClusterConnectionProxy(shadowSession).consumerState( ci->getName(), ci->isBlocked(), - ci->isNotifyEnabled() + ci->isNotifyEnabled(), + ci->getQueue()->getListeners().contains(ci) ); - client::SessionBase_0_10Access(shadowSession).get()->send(state); QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); } |