summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp13
1 files changed, 3 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index 465a5de021..fa247ae8f5 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -109,7 +109,6 @@ void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {
void BrokerContext::acquire(const broker::QueuedMessage& qm) {
if (tssNoReplicate) return;
- QueueContext::get(*qm.queue)->acquire();
core.mcast(ClusterMessageAcquireBody(
ProtocolVersion(), qm.queue->getName(), qm.position));
}
@@ -177,15 +176,12 @@ void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex,
// n is the number of consumers including the one just added.
// FIXME aconway 2011-06-27: rename, conflicting terms.
void BrokerContext::consume(broker::Queue& q, size_t n) {
- if (n == 1) {
- // FIXME aconway 2011-06-27: should be on QueueContext for symmetry?
- core.mcast(ClusterQueueSubscribeBody(ProtocolVersion(), q.getName()));
- }
+ QueueContext::get(q)->consume(n);
}
// n is the number of consumers after the cancel.
void BrokerContext::cancel(broker::Queue& q, size_t n) {
- if (n == 0) QueueContext::get(q)->unsubscribed();
+ QueueContext::get(q)->cancel(n);
}
void BrokerContext::empty(broker::Queue& ) {
@@ -196,10 +192,7 @@ void BrokerContext::stopped(broker::Queue& q) {
boost::intrusive_ptr<QueueContext> qc = QueueContext::get(q);
// Don't forward the stopped call if the queue does not yet have a cluster context
// this when the queue is first created locally.
- if (qc){
- QPID_LOG(critical, "FIXME BrokerContext::stopped " << q.getName());
- qc->stopped();
- }
+ if (qc) qc->stopped();
}
}} // namespace qpid::cluster