summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-04-28 12:25:59 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-04-28 12:25:59 +0000
commit9b7442210d74846fac84e5e86236f0f2fc21886c (patch)
tree6269e80bae30d0bf18f2ad72b8943f14f3bcaf6a /cpp/src/qpid/cluster/Connection.cpp
parent55c1e336b7ba8f30a9c673f59150eb75ff62505e (diff)
downloadqpid-python-9b7442210d74846fac84e5e86236f0f2fc21886c.tar.gz
QPID-3076: enable flow control for clustered broker configurations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1097432 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp43
1 files changed, 43 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index f2ea466a9b..b9895290e9 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -35,6 +35,7 @@
#include "qpid/broker/Fairshare.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Bridge.h"
+#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
@@ -558,6 +559,48 @@ void Connection::queueFairshareState(const std::string& qname, const uint8_t pri
}
}
+
+namespace {
+ // find a StatefulQueueObserver that matches a given identifier
+ class ObserverFinder {
+ const std::string id;
+ boost::shared_ptr<broker::QueueObserver> target;
+ ObserverFinder(const ObserverFinder&) {}
+ public:
+ ObserverFinder(const std::string& _id) : id(_id) {}
+ broker::StatefulQueueObserver *getObserver()
+ {
+ if (target)
+ return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
+ return 0;
+ }
+ void operator() (boost::shared_ptr<broker::QueueObserver> o)
+ {
+ if (!target) {
+ broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+ if (p && p->getId() == id) {
+ target = o;
+ }
+ }
+ }
+ };
+}
+
+
+void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state)
+{
+ boost::shared_ptr<broker::Queue> queue(findQueue(qname));
+ ObserverFinder finder(observerId); // find this observer
+ queue->eachObserver<ObserverFinder &>(finder);
+ broker::StatefulQueueObserver *so = finder.getObserver();
+ if (so) {
+ so->setState( state );
+ QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ...");
+ return;
+ }
+ QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies.");
+}
+
void Connection::expiryId(uint64_t id) {
cluster.getExpiryPolicy().setId(id);
}