summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/UpdateClient.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-06-16 21:21:09 +0000
committerAlan Conway <aconway@apache.org>2009-06-16 21:21:09 +0000
commit80d65b38008d9b7f31c825508819f9600d63b63c (patch)
tree316862bff35f1cae6f0d1152dcf4a6e3b0f967ed /cpp/src/qpid/cluster/UpdateClient.cpp
parentf5e98a6dfb8c4defe22755340f440e6f16c2559a (diff)
downloadqpid-python-80d65b38008d9b7f31c825508819f9600d63b63c.tar.gz
Performance improvements in AggregateOutput and SemanticState.
Replaced AggregateOutput hierarchy with a flat list per connection holding only the OutputTasks that are potentially active. Tasks are droped from the list as soon as they return false, and added back when they may have output. Inlined frequently-used SequenceNumber functions. Replace std::list in QueueListeners with std::vector. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@785408 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp38
1 files changed, 28 insertions, 10 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 332e74c512..7c305a2e92 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -54,6 +54,7 @@
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
#include <boost/bind.hpp>
+#include <boost/cast.hpp>
#include <algorithm>
namespace qpid {
@@ -64,6 +65,8 @@ using broker::Exchange;
using broker::Queue;
using broker::QueueBinding;
using broker::Message;
+using broker::SemanticState;
+
using namespace framing;
namespace arg=client::arg;
using client::SessionBase_0_10Access;
@@ -125,7 +128,8 @@ void UpdateClient::update() {
Broker& b = updaterBroker;
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1));
- // Update queue is used to transfer acquired messages that are no longer on their original queue.
+ // Update queue is used to transfer acquired messages that are no
+ // longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
@@ -256,6 +260,16 @@ void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& que
s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
}
+void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
+ const SemanticState::ConsumerImpl* cci =
+ boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task);
+ SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
+ uint16_t channel = ci->getParent().getSession().getChannel();
+ ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName());
+ QPID_LOG(debug, updaterId << " updating output task " << ci->getName()
+ << " channel=" << channel);
+}
+
void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
shadowConnection = catchUpConnection();
@@ -266,6 +280,8 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
// Safe to use decoder here because we are stalled for update.
std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment();
+ bc.getOutputTasks().eachOutput(
+ boost::bind(&UpdateClient::updateOutputTask, this, _1));
ClusterConnectionProxy(shadowConnection).shadowReady(
updateConnection->getId().getMember(),
updateConnection->getId().getNumber(),
@@ -294,9 +310,9 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, updaterId << " updating exclusive queues.");
ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1));
- // Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
QPID_LOG(debug, updaterId << " updating consumers.");
- ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this));
+ ss->getSemanticState().eachConsumer(
+ boost::bind(&UpdateClient::updateConsumer, this, _1));
QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
@@ -304,7 +320,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
updateTxState(ss->getSemanticState()); // Tx transaction state.
- // Adjust for command counter for message in progress, will be sent after state update.
+ // Adjust command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
SequenceNumber received = ss->receiverGetReceived().command;
if (inProgress)
@@ -328,8 +344,11 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
}
-void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) {
- QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " << shadowSession.getId());
+void UpdateClient::updateConsumer(
+ const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
+{
+ QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on "
+ << shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
@@ -344,13 +363,12 @@ void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci)
shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
- ClusterConnectionConsumerStateBody state(
- ProtocolVersion(),
+ ClusterConnectionProxy(shadowSession).consumerState(
ci->getName(),
ci->isBlocked(),
- ci->isNotifyEnabled()
+ ci->isNotifyEnabled(),
+ ci->getQueue()->getListeners().contains(ci)
);
- client::SessionBase_0_10Access(shadowSession).get()->send(state);
QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId());
}