diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp | 31 |
1 files changed, 21 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp index 0c96e9326d..f763841c20 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp @@ -45,25 +45,36 @@ bool QueueHandler::handle(const framing::AMQFrame& frame) { void QueueHandler::subscribe(const std::string& queue) { find(queue)->subscribe(sender()); } -void QueueHandler::unsubscribe(const std::string& queue) { - find(queue)->unsubscribe(sender()); + +void QueueHandler::unsubscribe(const std::string& queue, + bool resubscribe) { + find(queue)->unsubscribe(sender(), resubscribe); } -void QueueHandler::resubscribe(const std::string& queue) { - find(queue)->resubscribe(sender()); + +void QueueHandler::consumed(const std::string& queue, + const framing::SequenceSet& acquired, + const framing::SequenceSet& dequeued) +{ + find(queue)->consumed(sender(), acquired, dequeued); } void QueueHandler::left(const MemberId& member) { // Unsubscribe for members that leave. for (QueueMap::iterator i = queues.begin(); i != queues.end(); ++i) - i->second->unsubscribe(member); + i->second->unsubscribe(member, false); } -void QueueHandler::add(boost::shared_ptr<broker::Queue> q) { +void QueueHandler::add(broker::Queue& q) { // Local queues already have a context, remote queues need one. - if (!QueueContext::get(*q)) - new QueueContext(*q, group, consumeTicks); // Context attaches to the Queue - queues[q->getName()] = boost::intrusive_ptr<QueueReplica>( - new QueueReplica(q, self())); + if (!QueueContext::get(q)) + new QueueContext(q, group, consumeTicks); // Context attaches to the Queue + assert(QueueContext::get(q)); + queues[q.getName()] = boost::intrusive_ptr<QueueReplica>( + new QueueReplica(*QueueContext::get(q), self())); +} + +void QueueHandler::remove(broker::Queue& q) { + queues.erase(q.getName()); } boost::intrusive_ptr<QueueReplica> QueueHandler::find(const std::string& queue) { |