summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp28
1 files changed, 14 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
index 11a7496582..66a7a81f33 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
@@ -28,9 +28,8 @@
namespace qpid {
namespace cluster {
-QueueReplica::QueueReplica(boost::shared_ptr<broker::Queue> q,
- const MemberId& self_)
- : queue(q), self(self_), context(QueueContext::get(*q))
+QueueReplica::QueueReplica(QueueContext& qc, const MemberId& self_)
+ : self(self_), context(qc)
{}
struct PrintSubscribers {
@@ -56,27 +55,28 @@ void QueueReplica::subscribe(const MemberId& member) {
update(before);
}
-// FIXME aconway 2011-09-20: need to requeue.
-void QueueReplica::unsubscribe(const MemberId& member) {
+void QueueReplica::unsubscribe(const MemberId& member, bool resubscribe)
+{
+ assert(!resubscribe || member == subscribers.front());
QueueOwnership before = getState();
MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member);
- if (i != subscribers.end()) subscribers.erase(i, subscribers.end());
+ subscribers.erase(i, subscribers.end());
+ if (resubscribe) subscribers.push_back(member);
update(before);
}
-void QueueReplica::resubscribe(const MemberId& member) {
- assert (member == subscribers.front());
- QueueOwnership before = getState();
- subscribers.pop_front();
- subscribers.push_back(member);
- update(before);
+void QueueReplica::consumed(const MemberId& member,
+ const framing::SequenceSet& acquired,
+ const framing::SequenceSet& dequeued)
+{
+ context.consumed(member, acquired, dequeued);
}
void QueueReplica::update(QueueOwnership before) {
QueueOwnership after = getState();
- QPID_LOG(trace, "cluster: queue replica: " << queue->getName() << ": "
+ QPID_LOG(trace, "cluster: queue replica: " << context.getQueue().getName() << ": "
<< before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]");
- context->replicaState(before, after);
+ context.replicaState(before, after);
}
QueueOwnership QueueReplica::getState() const {