summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-04-28 12:25:59 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-04-28 12:25:59 +0000
commit5f29278bbdc32b348847f866769dfb375761423c (patch)
tree384f45e9e5ce9a73eb295413cc0000f07139faa8 /qpid/cpp/src/qpid
parent70973bf809ce4e94d2486e082bc2ab47290ee88e (diff)
downloadqpid-python-5f29278bbdc32b348847f866769dfb375761423c.tar.gz
QPID-3076: enable flow control for clustered broker configurations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1097432 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h5
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp135
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.h11
-rw-r--r--qpid/cpp/src/qpid/broker/StatefulQueueObserver.h63
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp43
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp23
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.h3
10 files changed, 223 insertions, 71 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 764da735e3..240766c443 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -248,13 +248,7 @@ Broker::Broker(const Broker::Options& conf) :
// Early-Initialize plugins
Plugin::earlyInitAll(*this);
- /** todo KAG - remove once cluster support for flow control done */
- if (isInCluster()) {
- QPID_LOG(info, "Producer Flow Control TBD for clustered brokers - queue flow control disabled by default.");
- QueueFlowLimit::setDefaults(0, 0, 0);
- } else {
- QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
- }
+ QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
// If no plugin store module registered itself, set up the null store.
if (NullMessageStore::isNullStore(store.get()))
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 73d52ec9ca..c4f1bcc07e 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -348,6 +348,11 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bindings.eachBinding(f);
}
+ /** Apply f to each Observer on the queue */
+ template <class F> void eachObserver(F f) {
+ std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
+ }
+
/** Set the position sequence number for the next message on the queue.
* Must be >= the current sequence number.
* Used by cluster to replicate queues.
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 3494288f7b..20679972ff 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -92,7 +92,7 @@ namespace {
QueueFlowLimit::QueueFlowLimit(Queue *_queue,
uint32_t _flowStopCount, uint32_t _flowResumeCount,
uint64_t _flowStopSize, uint64_t _flowResumeSize)
- : queue(_queue), queueName("<unknown>"),
+ : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"),
flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0)
@@ -123,8 +123,6 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
void QueueFlowLimit::enqueued(const QueuedMessage& msg)
{
- if (!msg.payload) return;
-
sys::Mutex::ScopedLock l(indexLock);
++count;
@@ -152,7 +150,9 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg)
}
QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position);
msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes
- index.insert(msg.payload);
+ bool unique;
+ unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second;
+ assert(unique);
}
}
@@ -160,8 +160,6 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg)
void QueueFlowLimit::dequeued(const QueuedMessage& msg)
{
- if (!msg.payload) return;
-
sys::Mutex::ScopedLock l(indexLock);
if (count > 0) {
@@ -189,16 +187,16 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg)
if (!index.empty()) {
if (!flowStopped) {
// flow enabled - release all pending msgs
- while (!index.empty()) {
- std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin();
- (*itr)->getIngressCompletion().finishCompleter();
- index.erase(itr);
- }
+ for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+ itr != index.end(); ++itr)
+ if (itr->second)
+ itr->second->getIngressCompletion().finishCompleter();
+ index.clear();
} else {
// even if flow controlled, we must release this msg as it is being dequeued
- std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload);
+ std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.position);
if (itr != index.end()) { // this msg is flow controlled, release it:
- (*itr)->getIngressCompletion().finishCompleter();
+ msg.payload->getIngressCompletion().finishCompleter();
index.erase(itr);
}
}
@@ -206,34 +204,6 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg)
}
-/** used by clustering: is the given message's completion blocked due to flow
- * control? True if message is blocked. (for the clustering updater: done
- * after msgs have been replicated to the updatee).
- */
-bool QueueFlowLimit::getState(const QueuedMessage& msg) const
-{
- sys::Mutex::ScopedLock l(indexLock);
- return (index.find(msg.payload) != index.end());
-}
-
-
-/** artificially force the flow control state of a given message
- * (for the clustering updatee: done after msgs have been replicated to
- * the updatee's queue)
- */
-void QueueFlowLimit::setState(const QueuedMessage& msg, bool blocked)
-{
- if (blocked && msg.payload) {
-
- sys::Mutex::ScopedLock l(indexLock);
- assert(index.find(msg.payload) == index.end());
-
- QPID_LOG(debug, "Queue \"" << queue->getName() << "\": forcing flow control for msg pos=" << msg.position << " for CLUSTER SYNC");
- index.insert(msg.payload);
- }
-}
-
-
void QueueFlowLimit::encode(Buffer& buffer) const
{
buffer.putLong(flowStopCount);
@@ -281,7 +251,7 @@ void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint
defaultFlowStopRatio = flowStopRatio;
defaultFlowResumeRatio = flowResumeRatio;
- /** @todo Verify valid range on Broker::Options instead of here */
+ /** @todo KAG: Verify valid range on Broker::Options instead of here */
if (flowStopRatio > 100 || flowResumeRatio > 100)
throw InvalidArgumentException(QPID_MSG("Default queue flow ratios must be between 0 and 100, inclusive:"
<< " flowStopRatio=" << flowStopRatio
@@ -320,14 +290,6 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::F
if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control
return 0;
}
- /** @todo KAG - remove once cluster support for flow control done. */
- // TODO aconway 2011-02-16: is queue==0 only in tests?
- // TODO kgiusti 2011-02-19: yes! The unit tests test this class in isolation */
- if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
- QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
- << queue->getName());
- return 0;
- }
return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
}
@@ -335,17 +297,76 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::F
uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize);
uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
+ return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize);
+ }
+ return 0;
+}
- /** todo KAG - remove once cluster support for flow control done. */
- if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
- QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
- << queue->getName());
- return 0;
+/* Cluster replication */
+
+namespace {
+ /** pack a set of sequence number ranges into a framing::Array */
+ void buildSeqRangeArray(qpid::framing::Array *seqs,
+ const qpid::framing::SequenceNumber first,
+ const qpid::framing::SequenceNumber last)
+ {
+ seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(first)));
+ seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(last)));
+ }
+}
+
+/** Runs on UPDATER to snapshot current state */
+void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const
+{
+ sys::Mutex::ScopedLock l(indexLock);
+ state.clear();
+
+ framing::SequenceSet ss;
+ if (!index.empty()) {
+ /* replicate the set of messages pending flow control */
+ for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::const_iterator itr = index.begin();
+ itr != index.end(); ++itr) {
+ ss.add(itr->first);
}
+ framing::Array seqs(TYPE_CODE_UINT32);
+ ss.for_each(boost::bind(&buildSeqRangeArray, &seqs, _1, _2));
+ state.setArray("pendingMsgSeqs", seqs);
+ }
+ QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicating pending msgs, range=" << ss);
+}
- return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize);
+
+/** called on UPDATEE to set state from snapshot */
+void QueueFlowLimit::setState(const qpid::framing::FieldTable& state)
+{
+ sys::Mutex::ScopedLock l(indexLock);
+ index.clear();
+
+ framing::SequenceSet fcmsg;
+ framing::Array seqArray(TYPE_CODE_UINT32);
+ if (state.getArray("pendingMsgSeqs", seqArray)) {
+ assert((seqArray.count() & 0x01) == 0); // must be even since they are sequence ranges
+ framing::Array::const_iterator i = seqArray.begin();
+ while (i != seqArray.end()) {
+ framing::SequenceNumber first((*i)->getIntegerValue<uint32_t, 4>());
+ ++i;
+ framing::SequenceNumber last((*i)->getIntegerValue<uint32_t, 4>());
+ ++i;
+ fcmsg.add(first, last);
+ for (SequenceNumber seq = first; seq <= last; ++seq) {
+ QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked
+ bool unique;
+ unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
+ assert(unique);
+ }
+ }
}
- return 0;
+
+ flowStopped = index.size() != 0;
+ if (queueMgmtObj) {
+ queueMgmtObj->set_flowStopped(isFlowControlActive());
+ }
+ QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicated the pending msgs, range=" << fcmsg)
}
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
index 69d91df45a..5fdae39c29 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
@@ -27,7 +27,7 @@
#include <memory>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/QueuedMessage.h"
-#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Mutex.h"
@@ -53,7 +53,7 @@ class Broker;
* passing _either_ level may turn flow control ON, but _both_ must be
* below level before flow control will be turned OFF.
*/
- class QueueFlowLimit : public QueueObserver
+ class QueueFlowLimit : public StatefulQueueObserver
{
static uint64_t defaultMaxSize;
static uint defaultFlowStopRatio;
@@ -86,9 +86,8 @@ class Broker;
QPID_BROKER_EXTERN void dequeued(const QueuedMessage&);
/** for clustering: */
- /** true if the given message is flow controlled, and cannot be completed. */
- bool getState(const QueuedMessage&) const;
- void setState(const QueuedMessage&, bool blocked);
+ QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const;
+ QPID_BROKER_EXTERN void setState(const qpid::framing::FieldTable&);
uint32_t getFlowStopCount() const { return flowStopCount; }
uint32_t getFlowResumeCount() const { return flowResumeCount; }
@@ -111,7 +110,7 @@ class Broker;
protected:
// msgs waiting for flow to become available.
- std::set< boost::intrusive_ptr<Message> > index;
+ std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> > index;
mutable qpid::sys::Mutex indexLock;
_qmfBroker::Queue *queueMgmtObj;
diff --git a/qpid/cpp/src/qpid/broker/StatefulQueueObserver.h b/qpid/cpp/src/qpid/broker/StatefulQueueObserver.h
new file mode 100644
index 0000000000..c682d460b7
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/StatefulQueueObserver.h
@@ -0,0 +1,63 @@
+#ifndef QPID_BROKER_STATEFULQUEUEOBSERVER_H
+#define QPID_BROKER_STATEFULQUEUEOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/framing/FieldTable.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Specialized type of QueueObserver that maintains internal state that has to
+ * be replicated across clustered brokers.
+ */
+class StatefulQueueObserver : public QueueObserver
+{
+ public:
+ StatefulQueueObserver(std::string _id) : id(_id) {}
+ virtual ~StatefulQueueObserver() {}
+
+ /** This identifier must uniquely identify this particular observer amoung
+ * all observers on a queue. For cluster replication, this id will be used
+ * to identify the peer queue observer for synchronization across
+ * brokers.
+ */
+ const std::string& getId() const { return id; }
+
+ /** This method should return the observer's internal state as an opaque
+ * map.
+ */
+ virtual void getState(qpid::framing::FieldTable& state ) const = 0;
+
+ /** The input map represents the internal state of the peer observer that
+ * this observer should synchonize to.
+ */
+ virtual void setState(const qpid::framing::FieldTable&) = 0;
+
+
+ private:
+ std::string id;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_STATEFULQUEUEOBSERVER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 30d6a6d13f..0daf0c7f5a 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -198,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1058747;
+const uint32_t Cluster::CLUSTER_VERSION = 1097431;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index f2ea466a9b..b9895290e9 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -35,6 +35,7 @@
#include "qpid/broker/Fairshare.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Bridge.h"
+#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
@@ -558,6 +559,48 @@ void Connection::queueFairshareState(const std::string& qname, const uint8_t pri
}
}
+
+namespace {
+ // find a StatefulQueueObserver that matches a given identifier
+ class ObserverFinder {
+ const std::string id;
+ boost::shared_ptr<broker::QueueObserver> target;
+ ObserverFinder(const ObserverFinder&) {}
+ public:
+ ObserverFinder(const std::string& _id) : id(_id) {}
+ broker::StatefulQueueObserver *getObserver()
+ {
+ if (target)
+ return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
+ return 0;
+ }
+ void operator() (boost::shared_ptr<broker::QueueObserver> o)
+ {
+ if (!target) {
+ broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+ if (p && p->getId() == id) {
+ target = o;
+ }
+ }
+ }
+ };
+}
+
+
+void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state)
+{
+ boost::shared_ptr<broker::Queue> queue(findQueue(qname));
+ ObserverFinder finder(observerId); // find this observer
+ queue->eachObserver<ObserverFinder &>(finder);
+ broker::StatefulQueueObserver *so = finder.getObserver();
+ if (so) {
+ so->setState( state );
+ QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ...");
+ return;
+ }
+ QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies.");
+}
+
void Connection::expiryId(uint64_t id) {
cluster.getExpiryPolicy().setId(id);
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index a4436e84a8..04ace724da 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -153,6 +153,7 @@ class Connection :
void queuePosition(const std::string&, const framing::SequenceNumber&);
void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
+ void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&);
void expiryId(uint64_t);
void txStart();
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 8f751add9b..a15c14ff48 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -49,6 +49,7 @@
#include "qpid/broker/TxPublish.h"
#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/broker/RecoveredEnqueue.h"
+#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/ClusterConnectionMembershipBody.h"
#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
@@ -167,6 +168,9 @@ void UpdateClient::update() {
boost::bind(&UpdateClient::updateConnection, this, _1));
session.queueDelete(arg::queue=UPDATE);
+ // some Queue Observers need session state & msgs synced first, so sync observers now
+ b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1));
+
// Update queue listeners: must come after sessions so consumerNumbering is populated
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
@@ -615,4 +619,23 @@ void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge)
ClusterConnectionProxy(session).config(encode(*bridge));
}
+void UpdateClient::updateQueueObservers(const boost::shared_ptr<broker::Queue>& q)
+{
+ q->eachObserver(boost::bind(&UpdateClient::updateObserver, this, q, _1));
+}
+
+void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q,
+ boost::shared_ptr<broker::QueueObserver> o)
+{
+ qpid::framing::FieldTable state;
+ broker::StatefulQueueObserver *so = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+ if (so) {
+ so->getState( state );
+ std::string id(so->getId());
+ QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s observer " << id);
+ ClusterConnectionProxy(session).queueObserverState( q->getName(), id, state );
+ }
+}
+
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index 7520bb82cb..bbf7a948bc 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -51,6 +51,7 @@ class SemanticState;
class Decoder;
class Link;
class Bridge;
+class QueueObserver;
} // namespace broker
@@ -104,6 +105,8 @@ class UpdateClient : public sys::Runnable {
void updateLinks();
void updateLink(const boost::shared_ptr<broker::Link>&);
void updateBridge(const boost::shared_ptr<broker::Bridge>&);
+ void updateQueueObservers(const boost::shared_ptr<broker::Queue>&);
+ void updateObserver(const boost::shared_ptr<broker::Queue>&, boost::shared_ptr<broker::QueueObserver>);
Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering;