diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-04-28 12:25:59 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-04-28 12:25:59 +0000 |
commit | 9b7442210d74846fac84e5e86236f0f2fc21886c (patch) | |
tree | 6269e80bae30d0bf18f2ad72b8943f14f3bcaf6a /cpp/src/qpid/cluster | |
parent | 55c1e336b7ba8f30a9c673f59150eb75ff62505e (diff) | |
download | qpid-python-9b7442210d74846fac84e5e86236f0f2fc21886c.tar.gz |
QPID-3076: enable flow control for clustered broker configurations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1097432 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 43 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.h | 3 |
5 files changed, 71 insertions, 1 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 30d6a6d13f..0daf0c7f5a 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -198,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1058747; +const uint32_t Cluster::CLUSTER_VERSION = 1097431; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index f2ea466a9b..b9895290e9 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -35,6 +35,7 @@ #include "qpid/broker/Fairshare.h" #include "qpid/broker/Link.h" #include "qpid/broker/Bridge.h" +#include "qpid/broker/StatefulQueueObserver.h" #include "qpid/broker/Queue.h" #include "qpid/framing/enum.h" #include "qpid/framing/AMQFrame.h" @@ -558,6 +559,48 @@ void Connection::queueFairshareState(const std::string& qname, const uint8_t pri } } + +namespace { + // find a StatefulQueueObserver that matches a given identifier + class ObserverFinder { + const std::string id; + boost::shared_ptr<broker::QueueObserver> target; + ObserverFinder(const ObserverFinder&) {} + public: + ObserverFinder(const std::string& _id) : id(_id) {} + broker::StatefulQueueObserver *getObserver() + { + if (target) + return dynamic_cast<broker::StatefulQueueObserver *>(target.get()); + return 0; + } + void operator() (boost::shared_ptr<broker::QueueObserver> o) + { + if (!target) { + broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); + if (p && p->getId() == id) { + target = o; + } + } + } + }; +} + + +void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state) +{ + boost::shared_ptr<broker::Queue> queue(findQueue(qname)); + ObserverFinder finder(observerId); // find this observer + queue->eachObserver<ObserverFinder &>(finder); + broker::StatefulQueueObserver *so = finder.getObserver(); + if (so) { + so->setState( state ); + QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ..."); + return; + } + QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies."); +} + void Connection::expiryId(uint64_t id) { cluster.getExpiryPolicy().setId(id); } diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index a4436e84a8..04ace724da 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -153,6 +153,7 @@ class Connection : void queuePosition(const std::string&, const framing::SequenceNumber&); void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count); + void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&); void expiryId(uint64_t); void txStart(); diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 8f751add9b..a15c14ff48 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -49,6 +49,7 @@ #include "qpid/broker/TxPublish.h" #include "qpid/broker/RecoveredDequeue.h" #include "qpid/broker/RecoveredEnqueue.h" +#include "qpid/broker/StatefulQueueObserver.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/ClusterConnectionMembershipBody.h" #include "qpid/framing/ClusterConnectionShadowReadyBody.h" @@ -167,6 +168,9 @@ void UpdateClient::update() { boost::bind(&UpdateClient::updateConnection, this, _1)); session.queueDelete(arg::queue=UPDATE); + // some Queue Observers need session state & msgs synced first, so sync observers now + b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1)); + // Update queue listeners: must come after sessions so consumerNumbering is populated b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); @@ -615,4 +619,23 @@ void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) ClusterConnectionProxy(session).config(encode(*bridge)); } +void UpdateClient::updateQueueObservers(const boost::shared_ptr<broker::Queue>& q) +{ + q->eachObserver(boost::bind(&UpdateClient::updateObserver, this, q, _1)); +} + +void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q, + boost::shared_ptr<broker::QueueObserver> o) +{ + qpid::framing::FieldTable state; + broker::StatefulQueueObserver *so = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); + if (so) { + so->getState( state ); + std::string id(so->getId()); + QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s observer " << id); + ClusterConnectionProxy(session).queueObserverState( q->getName(), id, state ); + } +} + + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index 7520bb82cb..bbf7a948bc 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -51,6 +51,7 @@ class SemanticState; class Decoder; class Link; class Bridge; +class QueueObserver; } // namespace broker @@ -104,6 +105,8 @@ class UpdateClient : public sys::Runnable { void updateLinks(); void updateLink(const boost::shared_ptr<broker::Link>&); void updateBridge(const boost::shared_ptr<broker::Bridge>&); + void updateQueueObservers(const boost::shared_ptr<broker::Queue>&); + void updateObserver(const boost::shared_ptr<broker::Queue>&, boost::shared_ptr<broker::QueueObserver>); Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering; |