summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-04-28 12:25:59 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-04-28 12:25:59 +0000
commit9b7442210d74846fac84e5e86236f0f2fc21886c (patch)
tree6269e80bae30d0bf18f2ad72b8943f14f3bcaf6a /cpp/src/qpid/cluster
parent55c1e336b7ba8f30a9c673f59150eb75ff62505e (diff)
downloadqpid-python-9b7442210d74846fac84e5e86236f0f2fc21886c.tar.gz
QPID-3076: enable flow control for clustered broker configurations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1097432 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp43
-rw-r--r--cpp/src/qpid/cluster/Connection.h1
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp23
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h3
5 files changed, 71 insertions, 1 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 30d6a6d13f..0daf0c7f5a 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -198,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1058747;
+const uint32_t Cluster::CLUSTER_VERSION = 1097431;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index f2ea466a9b..b9895290e9 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -35,6 +35,7 @@
#include "qpid/broker/Fairshare.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Bridge.h"
+#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
@@ -558,6 +559,48 @@ void Connection::queueFairshareState(const std::string& qname, const uint8_t pri
}
}
+
+namespace {
+ // find a StatefulQueueObserver that matches a given identifier
+ class ObserverFinder {
+ const std::string id;
+ boost::shared_ptr<broker::QueueObserver> target;
+ ObserverFinder(const ObserverFinder&) {}
+ public:
+ ObserverFinder(const std::string& _id) : id(_id) {}
+ broker::StatefulQueueObserver *getObserver()
+ {
+ if (target)
+ return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
+ return 0;
+ }
+ void operator() (boost::shared_ptr<broker::QueueObserver> o)
+ {
+ if (!target) {
+ broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+ if (p && p->getId() == id) {
+ target = o;
+ }
+ }
+ }
+ };
+}
+
+
+void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state)
+{
+ boost::shared_ptr<broker::Queue> queue(findQueue(qname));
+ ObserverFinder finder(observerId); // find this observer
+ queue->eachObserver<ObserverFinder &>(finder);
+ broker::StatefulQueueObserver *so = finder.getObserver();
+ if (so) {
+ so->setState( state );
+ QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ...");
+ return;
+ }
+ QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies.");
+}
+
void Connection::expiryId(uint64_t id) {
cluster.getExpiryPolicy().setId(id);
}
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index a4436e84a8..04ace724da 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -153,6 +153,7 @@ class Connection :
void queuePosition(const std::string&, const framing::SequenceNumber&);
void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
+ void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&);
void expiryId(uint64_t);
void txStart();
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 8f751add9b..a15c14ff48 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -49,6 +49,7 @@
#include "qpid/broker/TxPublish.h"
#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/broker/RecoveredEnqueue.h"
+#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/ClusterConnectionMembershipBody.h"
#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
@@ -167,6 +168,9 @@ void UpdateClient::update() {
boost::bind(&UpdateClient::updateConnection, this, _1));
session.queueDelete(arg::queue=UPDATE);
+ // some Queue Observers need session state & msgs synced first, so sync observers now
+ b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1));
+
// Update queue listeners: must come after sessions so consumerNumbering is populated
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
@@ -615,4 +619,23 @@ void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge)
ClusterConnectionProxy(session).config(encode(*bridge));
}
+void UpdateClient::updateQueueObservers(const boost::shared_ptr<broker::Queue>& q)
+{
+ q->eachObserver(boost::bind(&UpdateClient::updateObserver, this, q, _1));
+}
+
+void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q,
+ boost::shared_ptr<broker::QueueObserver> o)
+{
+ qpid::framing::FieldTable state;
+ broker::StatefulQueueObserver *so = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+ if (so) {
+ so->getState( state );
+ std::string id(so->getId());
+ QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s observer " << id);
+ ClusterConnectionProxy(session).queueObserverState( q->getName(), id, state );
+ }
+}
+
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index 7520bb82cb..bbf7a948bc 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -51,6 +51,7 @@ class SemanticState;
class Decoder;
class Link;
class Bridge;
+class QueueObserver;
} // namespace broker
@@ -104,6 +105,8 @@ class UpdateClient : public sys::Runnable {
void updateLinks();
void updateLink(const boost::shared_ptr<broker::Link>&);
void updateBridge(const boost::shared_ptr<broker::Bridge>&);
+ void updateQueueObservers(const boost::shared_ptr<broker::Queue>&);
+ void updateObserver(const boost::shared_ptr<broker::Queue>&, boost::shared_ptr<broker::QueueObserver>);
Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering;