summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp28
1 files changed, 9 insertions, 19 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
index 551477a920..7bbd6e1422 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
@@ -44,7 +44,7 @@ std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps) {
return o;
}
-std::ostream& operator<<(std::ostream& o, QueueReplica::State s) {
+std::ostream& operator<<(std::ostream& o, QueueOwnership s) {
static char* tags[] = { "UNSUBSCRIBED", "SUBSCRIBED", "SOLE_OWNER", "SHARED_OWNER" };
return o << tags[s];
}
@@ -58,13 +58,13 @@ std::ostream& operator<<(std::ostream& o, const QueueReplica& qr) {
// FIXME aconway 2011-05-17: error handling for asserts.
void QueueReplica::subscribe(const MemberId& member) {
- State before = getState();
+ QueueOwnership before = getState();
subscribers.push_back(member);
update(before);
}
void QueueReplica::unsubscribe(const MemberId& member) {
- State before = getState();
+ QueueOwnership before = getState();
MemberQueue::iterator i = std::remove(subscribers.begin(), subscribers.end(), member);
if (i != subscribers.end()) {
subscribers.erase(i, subscribers.end());
@@ -74,30 +74,20 @@ void QueueReplica::unsubscribe(const MemberId& member) {
void QueueReplica::resubscribe(const MemberId& member) {
assert (member == subscribers.front()); // FIXME aconway 2011-06-27: error handling
- State before = getState();
+ QueueOwnership before = getState();
subscribers.pop_front();
subscribers.push_back(member);
update(before);
}
-void QueueReplica::update(State before) {
- const int acquireLimit = 10; // FIXME aconway 2011-06-23: configurable
- State after = getState();
- if (before == after) return;
+void QueueReplica::update(QueueOwnership before) {
QPID_LOG(trace, "QueueReplica " << *this << " (was " << before << ")");
- switch (after) {
- case UNSUBSCRIBED: break;
- case SUBSCRIBED: break;
- case SOLE_OWNER:
- context->soleOwner();
- break;
- case SHARED_OWNER:
- context->sharedOwner(acquireLimit);
- break;
- }
+ QueueOwnership after = getState();
+ if (before == after) return;
+ context->replicaState(after);
}
-QueueReplica::State QueueReplica::getState() const {
+QueueOwnership QueueReplica::getState() const {
if (isOwner())
return (subscribers.size() > 1) ? SHARED_OWNER : SOLE_OWNER;
return (isSubscriber(self)) ? SUBSCRIBED : UNSUBSCRIBED;