From 9b7442210d74846fac84e5e86236f0f2fc21886c Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Thu, 28 Apr 2011 12:25:59 +0000 Subject: QPID-3076: enable flow control for clustered broker configurations. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1097432 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/Connection.cpp | 43 +++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) (limited to 'cpp/src/qpid/cluster/Connection.cpp') diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index f2ea466a9b..b9895290e9 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -35,6 +35,7 @@ #include "qpid/broker/Fairshare.h" #include "qpid/broker/Link.h" #include "qpid/broker/Bridge.h" +#include "qpid/broker/StatefulQueueObserver.h" #include "qpid/broker/Queue.h" #include "qpid/framing/enum.h" #include "qpid/framing/AMQFrame.h" @@ -558,6 +559,48 @@ void Connection::queueFairshareState(const std::string& qname, const uint8_t pri } } + +namespace { + // find a StatefulQueueObserver that matches a given identifier + class ObserverFinder { + const std::string id; + boost::shared_ptr target; + ObserverFinder(const ObserverFinder&) {} + public: + ObserverFinder(const std::string& _id) : id(_id) {} + broker::StatefulQueueObserver *getObserver() + { + if (target) + return dynamic_cast(target.get()); + return 0; + } + void operator() (boost::shared_ptr o) + { + if (!target) { + broker::StatefulQueueObserver *p = dynamic_cast(o.get()); + if (p && p->getId() == id) { + target = o; + } + } + } + }; +} + + +void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state) +{ + boost::shared_ptr queue(findQueue(qname)); + ObserverFinder finder(observerId); // find this observer + queue->eachObserver(finder); + broker::StatefulQueueObserver *so = finder.getObserver(); + if (so) { + so->setState( state ); + QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ..."); + return; + } + QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies."); +} + void Connection::expiryId(uint64_t id) { cluster.getExpiryPolicy().setId(id); } -- cgit v1.2.1