diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp index 11a7496582..66a7a81f33 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp @@ -28,9 +28,8 @@ namespace qpid { namespace cluster { -QueueReplica::QueueReplica(boost::shared_ptr<broker::Queue> q, - const MemberId& self_) - : queue(q), self(self_), context(QueueContext::get(*q)) +QueueReplica::QueueReplica(QueueContext& qc, const MemberId& self_) + : self(self_), context(qc) {} struct PrintSubscribers { @@ -56,27 +55,28 @@ void QueueReplica::subscribe(const MemberId& member) { update(before); } -// FIXME aconway 2011-09-20: need to requeue. -void QueueReplica::unsubscribe(const MemberId& member) { +void QueueReplica::unsubscribe(const MemberId& member, bool resubscribe) +{ + assert(!resubscribe || member == subscribers.front()); QueueOwnership before = getState(); MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member); - if (i != subscribers.end()) subscribers.erase(i, subscribers.end()); + subscribers.erase(i, subscribers.end()); + if (resubscribe) subscribers.push_back(member); update(before); } -void QueueReplica::resubscribe(const MemberId& member) { - assert (member == subscribers.front()); - QueueOwnership before = getState(); - subscribers.pop_front(); - subscribers.push_back(member); - update(before); +void QueueReplica::consumed(const MemberId& member, + const framing::SequenceSet& acquired, + const framing::SequenceSet& dequeued) +{ + context.consumed(member, acquired, dequeued); } void QueueReplica::update(QueueOwnership before) { QueueOwnership after = getState(); - QPID_LOG(trace, "cluster: queue replica: " << queue->getName() << ": " + QPID_LOG(trace, "cluster: queue replica: " << context.getQueue().getName() << ": " << before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]"); - context->replicaState(before, after); + context.replicaState(before, after); } QueueOwnership QueueReplica::getState() const { |