diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-04-28 12:25:59 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-04-28 12:25:59 +0000 |
commit | 9b7442210d74846fac84e5e86236f0f2fc21886c (patch) | |
tree | 6269e80bae30d0bf18f2ad72b8943f14f3bcaf6a /cpp/src/qpid/cluster/UpdateClient.cpp | |
parent | 55c1e336b7ba8f30a9c673f59150eb75ff62505e (diff) | |
download | qpid-python-9b7442210d74846fac84e5e86236f0f2fc21886c.tar.gz |
QPID-3076: enable flow control for clustered broker configurations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1097432 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 23 |
1 files changed, 23 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 8f751add9b..a15c14ff48 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -49,6 +49,7 @@ #include "qpid/broker/TxPublish.h" #include "qpid/broker/RecoveredDequeue.h" #include "qpid/broker/RecoveredEnqueue.h" +#include "qpid/broker/StatefulQueueObserver.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/ClusterConnectionMembershipBody.h" #include "qpid/framing/ClusterConnectionShadowReadyBody.h" @@ -167,6 +168,9 @@ void UpdateClient::update() { boost::bind(&UpdateClient::updateConnection, this, _1)); session.queueDelete(arg::queue=UPDATE); + // some Queue Observers need session state & msgs synced first, so sync observers now + b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1)); + // Update queue listeners: must come after sessions so consumerNumbering is populated b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); @@ -615,4 +619,23 @@ void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) ClusterConnectionProxy(session).config(encode(*bridge)); } +void UpdateClient::updateQueueObservers(const boost::shared_ptr<broker::Queue>& q) +{ + q->eachObserver(boost::bind(&UpdateClient::updateObserver, this, q, _1)); +} + +void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q, + boost::shared_ptr<broker::QueueObserver> o) +{ + qpid::framing::FieldTable state; + broker::StatefulQueueObserver *so = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); + if (so) { + so->getState( state ); + std::string id(so->getId()); + QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s observer " << id); + ClusterConnectionProxy(session).queueObserverState( q->getName(), id, state ); + } +} + + }} // namespace qpid::cluster |