diff options
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueRange.h | 22 |
3 files changed, 27 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp index 28ec855c2d..85feadd2ab 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -50,15 +50,15 @@ class QueueGuard::QueueObserver : public broker::QueueObserver QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) - : queue(q), subscription(0), range(q) + : queue(q), subscription(0) { - // NOTE: The QueueGuard is created before the queue becomes active: either - // when a backup is promoted, or when a new queue is created on the primary. std::ostringstream os; os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": "; logPrefix = os.str(); observer.reset(new QueueObserver(*this)); queue.addObserver(observer); + // Set range after addObserver so we know that range.back+1 is a guarded position. + range = QueueRange(q); } QueueGuard::~QueueGuard() { cancel(); } @@ -112,6 +112,7 @@ void completeBefore(QueueGuard* guard, SequenceNumber position, const QueuedMess bool QueueGuard::subscriptionStart(SequenceNumber position) { // Complete any messages before or at the ReplicatingSubscription start position. + // Those messages are already on the backup. if (!delayed.empty() && delayed.front() <= position) { // FIXME aconway 2012-06-15: queue iteration, only messages in delayed queue.eachMessage(boost::bind(&completeBefore, this, position, _1)); diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h index 9c6fb55015..8cc2055381 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.h +++ b/qpid/cpp/src/qpid/ha/QueueGuard.h @@ -83,12 +83,14 @@ class QueueGuard { * QueueGuard is created before the queue becomes active: either when a * backup is promoted, or when a new queue is created on the primary. * - * NOTE: The first position protected by the guard is getRange().getBack()+1 + * NOTE: The first position guaranteed to be protected by the guard is + * getRange().getBack()+1. It is possible that the guard has protected + * some messages before that point. */ const QueueRange& getRange() const { return range; } // range is immutable, no lock needed. /** Inform the guard of the stating position for the attached subscription. - * Complete messages that will not be seen by the subscriptino. + * Complete messages that will not be seen by the subscription. *@return true if the subscription has already advanced to a guarded position. */ bool subscriptionStart(framing::SequenceNumber position); @@ -102,7 +104,7 @@ class QueueGuard { framing::SequenceSet delayed; ReplicatingSubscription* subscription; boost::shared_ptr<QueueObserver> observer; - const QueueRange range; + QueueRange range; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/QueueRange.h b/qpid/cpp/src/qpid/ha/QueueRange.h index 3e83ff795e..d734326910 100644 --- a/qpid/cpp/src/qpid/ha/QueueRange.h +++ b/qpid/cpp/src/qpid/ha/QueueRange.h @@ -33,17 +33,31 @@ namespace ha { /** * Get the front/back range of a queue or from a ReplicatingSubscription arguments table. + * + * The *back* of the queue is the position of the latest (most recently pushed) + * message on the queue or, if the queue is empty, the back is n-1 where n is + * the position that will be assigned to the next message pushed onto the queue. + * + * The *front* of the queue is the position of the oldest (next to be consumed) message + * on the queue or, if the queue is empty, it is the position that will be occupied + * by the next message pushed onto the queue. + * + * This leads to the slightly surprising conclusion that for an empty queue + * front = back+1 */ struct QueueRange { public: framing::SequenceNumber front, back; - QueueRange() { } + QueueRange() : front(1), back(0) { } // Empty range. QueueRange(broker::Queue& q) { - back = q.getPosition(); - front = back+1; // assume empty - ReplicatingSubscription::getFront(q, front); + if (ReplicatingSubscription::getFront(q, front)) + back = q.getPosition(); + else { + back = q.getPosition(); + front = back+1; // empty + } assert(front <= back + 1); } |