summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp7
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.h8
-rw-r--r--qpid/cpp/src/qpid/ha/QueueRange.h22
3 files changed, 27 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index 28ec855c2d..85feadd2ab 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -50,15 +50,15 @@ class QueueGuard::QueueObserver : public broker::QueueObserver
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
- : queue(q), subscription(0), range(q)
+ : queue(q), subscription(0)
{
- // NOTE: The QueueGuard is created before the queue becomes active: either
- // when a backup is promoted, or when a new queue is created on the primary.
std::ostringstream os;
os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
queue.addObserver(observer);
+ // Set range after addObserver so we know that range.back+1 is a guarded position.
+ range = QueueRange(q);
}
QueueGuard::~QueueGuard() { cancel(); }
@@ -112,6 +112,7 @@ void completeBefore(QueueGuard* guard, SequenceNumber position, const QueuedMess
bool QueueGuard::subscriptionStart(SequenceNumber position) {
// Complete any messages before or at the ReplicatingSubscription start position.
+ // Those messages are already on the backup.
if (!delayed.empty() && delayed.front() <= position) {
// FIXME aconway 2012-06-15: queue iteration, only messages in delayed
queue.eachMessage(boost::bind(&completeBefore, this, position, _1));
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h
index 9c6fb55015..8cc2055381 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.h
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.h
@@ -83,12 +83,14 @@ class QueueGuard {
* QueueGuard is created before the queue becomes active: either when a
* backup is promoted, or when a new queue is created on the primary.
*
- * NOTE: The first position protected by the guard is getRange().getBack()+1
+ * NOTE: The first position guaranteed to be protected by the guard is
+ * getRange().getBack()+1. It is possible that the guard has protected
+ * some messages before that point.
*/
const QueueRange& getRange() const { return range; } // range is immutable, no lock needed.
/** Inform the guard of the stating position for the attached subscription.
- * Complete messages that will not be seen by the subscriptino.
+ * Complete messages that will not be seen by the subscription.
*@return true if the subscription has already advanced to a guarded position.
*/
bool subscriptionStart(framing::SequenceNumber position);
@@ -102,7 +104,7 @@ class QueueGuard {
framing::SequenceSet delayed;
ReplicatingSubscription* subscription;
boost::shared_ptr<QueueObserver> observer;
- const QueueRange range;
+ QueueRange range;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueRange.h b/qpid/cpp/src/qpid/ha/QueueRange.h
index 3e83ff795e..d734326910 100644
--- a/qpid/cpp/src/qpid/ha/QueueRange.h
+++ b/qpid/cpp/src/qpid/ha/QueueRange.h
@@ -33,17 +33,31 @@ namespace ha {
/**
* Get the front/back range of a queue or from a ReplicatingSubscription arguments table.
+ *
+ * The *back* of the queue is the position of the latest (most recently pushed)
+ * message on the queue or, if the queue is empty, the back is n-1 where n is
+ * the position that will be assigned to the next message pushed onto the queue.
+ *
+ * The *front* of the queue is the position of the oldest (next to be consumed) message
+ * on the queue or, if the queue is empty, it is the position that will be occupied
+ * by the next message pushed onto the queue.
+ *
+ * This leads to the slightly surprising conclusion that for an empty queue
+ * front = back+1
*/
struct QueueRange {
public:
framing::SequenceNumber front, back;
- QueueRange() { }
+ QueueRange() : front(1), back(0) { } // Empty range.
QueueRange(broker::Queue& q) {
- back = q.getPosition();
- front = back+1; // assume empty
- ReplicatingSubscription::getFront(q, front);
+ if (ReplicatingSubscription::getFront(q, front))
+ back = q.getPosition();
+ else {
+ back = q.getPosition();
+ front = back+1; // empty
+ }
assert(front <= back + 1);
}