summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-07-24 13:33:32 +0000
committerAlan Conway <aconway@apache.org>2012-07-24 13:33:32 +0000
commit5b452196ca1f8fcbc3dd220758db5a6e2aa2e4ad (patch)
treebd6b869881a1f32ceb2bb2e1f97813e6cc9059f6
parent8396d0cde153879411ec9add0da1499f35011805 (diff)
downloadqpid-python-5b452196ca1f8fcbc3dd220758db5a6e2aa2e4ad.tar.gz
QPID-4159: HA Missing messages in failover test.
QueueGuard was taking its snapshot of the initial queue range *before* it registered its QueueObserver. That means it was possible to have unguarded messages between the end of the snapshot and the first position protected by the guard. Fixed race condition in QueueRange constructor: Must call getPosition() *after* getFront() since both may be advancing and we want to end up with a valid range front <= back+1. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1365044 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp7
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.h8
-rw-r--r--qpid/cpp/src/qpid/ha/QueueRange.h22
3 files changed, 27 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index 28ec855c2d..85feadd2ab 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -50,15 +50,15 @@ class QueueGuard::QueueObserver : public broker::QueueObserver
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
- : queue(q), subscription(0), range(q)
+ : queue(q), subscription(0)
{
- // NOTE: The QueueGuard is created before the queue becomes active: either
- // when a backup is promoted, or when a new queue is created on the primary.
std::ostringstream os;
os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
queue.addObserver(observer);
+ // Set range after addObserver so we know that range.back+1 is a guarded position.
+ range = QueueRange(q);
}
QueueGuard::~QueueGuard() { cancel(); }
@@ -112,6 +112,7 @@ void completeBefore(QueueGuard* guard, SequenceNumber position, const QueuedMess
bool QueueGuard::subscriptionStart(SequenceNumber position) {
// Complete any messages before or at the ReplicatingSubscription start position.
+ // Those messages are already on the backup.
if (!delayed.empty() && delayed.front() <= position) {
// FIXME aconway 2012-06-15: queue iteration, only messages in delayed
queue.eachMessage(boost::bind(&completeBefore, this, position, _1));
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h
index 9c6fb55015..8cc2055381 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.h
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.h
@@ -83,12 +83,14 @@ class QueueGuard {
* QueueGuard is created before the queue becomes active: either when a
* backup is promoted, or when a new queue is created on the primary.
*
- * NOTE: The first position protected by the guard is getRange().getBack()+1
+ * NOTE: The first position guaranteed to be protected by the guard is
+ * getRange().getBack()+1. It is possible that the guard has protected
+ * some messages before that point.
*/
const QueueRange& getRange() const { return range; } // range is immutable, no lock needed.
/** Inform the guard of the stating position for the attached subscription.
- * Complete messages that will not be seen by the subscriptino.
+ * Complete messages that will not be seen by the subscription.
*@return true if the subscription has already advanced to a guarded position.
*/
bool subscriptionStart(framing::SequenceNumber position);
@@ -102,7 +104,7 @@ class QueueGuard {
framing::SequenceSet delayed;
ReplicatingSubscription* subscription;
boost::shared_ptr<QueueObserver> observer;
- const QueueRange range;
+ QueueRange range;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueRange.h b/qpid/cpp/src/qpid/ha/QueueRange.h
index 3e83ff795e..d734326910 100644
--- a/qpid/cpp/src/qpid/ha/QueueRange.h
+++ b/qpid/cpp/src/qpid/ha/QueueRange.h
@@ -33,17 +33,31 @@ namespace ha {
/**
* Get the front/back range of a queue or from a ReplicatingSubscription arguments table.
+ *
+ * The *back* of the queue is the position of the latest (most recently pushed)
+ * message on the queue or, if the queue is empty, the back is n-1 where n is
+ * the position that will be assigned to the next message pushed onto the queue.
+ *
+ * The *front* of the queue is the position of the oldest (next to be consumed) message
+ * on the queue or, if the queue is empty, it is the position that will be occupied
+ * by the next message pushed onto the queue.
+ *
+ * This leads to the slightly surprising conclusion that for an empty queue
+ * front = back+1
*/
struct QueueRange {
public:
framing::SequenceNumber front, back;
- QueueRange() { }
+ QueueRange() : front(1), back(0) { } // Empty range.
QueueRange(broker::Queue& q) {
- back = q.getPosition();
- front = back+1; // assume empty
- ReplicatingSubscription::getFront(q, front);
+ if (ReplicatingSubscription::getFront(q, front))
+ back = q.getPosition();
+ else {
+ back = q.getPosition();
+ front = back+1; // empty
+ }
assert(front <= back + 1);
}