summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp31
1 files changed, 21 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
index 0c96e9326d..f763841c20 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
@@ -45,25 +45,36 @@ bool QueueHandler::handle(const framing::AMQFrame& frame) {
void QueueHandler::subscribe(const std::string& queue) {
find(queue)->subscribe(sender());
}
-void QueueHandler::unsubscribe(const std::string& queue) {
- find(queue)->unsubscribe(sender());
+
+void QueueHandler::unsubscribe(const std::string& queue,
+ bool resubscribe) {
+ find(queue)->unsubscribe(sender(), resubscribe);
}
-void QueueHandler::resubscribe(const std::string& queue) {
- find(queue)->resubscribe(sender());
+
+void QueueHandler::consumed(const std::string& queue,
+ const framing::SequenceSet& acquired,
+ const framing::SequenceSet& dequeued)
+{
+ find(queue)->consumed(sender(), acquired, dequeued);
}
void QueueHandler::left(const MemberId& member) {
// Unsubscribe for members that leave.
for (QueueMap::iterator i = queues.begin(); i != queues.end(); ++i)
- i->second->unsubscribe(member);
+ i->second->unsubscribe(member, false);
}
-void QueueHandler::add(boost::shared_ptr<broker::Queue> q) {
+void QueueHandler::add(broker::Queue& q) {
// Local queues already have a context, remote queues need one.
- if (!QueueContext::get(*q))
- new QueueContext(*q, group, consumeTicks); // Context attaches to the Queue
- queues[q->getName()] = boost::intrusive_ptr<QueueReplica>(
- new QueueReplica(q, self()));
+ if (!QueueContext::get(q))
+ new QueueContext(q, group, consumeTicks); // Context attaches to the Queue
+ assert(QueueContext::get(q));
+ queues[q.getName()] = boost::intrusive_ptr<QueueReplica>(
+ new QueueReplica(*QueueContext::get(q), self()));
+}
+
+void QueueHandler::remove(broker::Queue& q) {
+ queues.erase(q.getName());
}
boost::intrusive_ptr<QueueReplica> QueueHandler::find(const std::string& queue) {