summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-18 20:43:19 +0000
committerAlan Conway <aconway@apache.org>2012-06-18 20:43:19 +0000
commitc45ee73853cb7c84bb2a7dd0c7f9fdecd7aa9286 (patch)
treef31c2dc75fdcb8ec16ea9732a84a4907c5306850
parentbd64faa59c1a90d41e389115a8445f9917a804df (diff)
downloadqpid-python-c45ee73853cb7c84bb2a7dd0c7f9fdecd7aa9286.tar.gz
QPID-4072: HA use backup messages in failover.
ReplicatingSubscription syncs the primary and backup queues, and does not re-send messages that are already on the backup. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1351481 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/CMakeLists.txt1
-rw-r--r--qpid/cpp/src/ha.mk1
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp31
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.h18
-rw-r--r--qpid/cpp/src/qpid/ha/QueueRange.h71
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp134
-rw-r--r--qpid/cpp/src/tests/brokertest.py5
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py5
8 files changed, 152 insertions, 114 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 9791c391fe..cb037573a1 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -648,6 +648,7 @@ if (BUILD_HA)
qpid/ha/Primary.cpp
qpid/ha/Primary.h
qpid/ha/PrimaryConnectionMonitor.h
+ qpid/ha/QueueRange.h
qpid/ha/QueueReplicator.cpp
qpid/ha/QueueReplicator.h
qpid/ha/ReplicatingSubscription.cpp
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index da8e35c90d..bf1bd10256 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -43,6 +43,7 @@ ha_la_SOURCES = \
qpid/ha/PrimaryConnectionMonitor.h \
qpid/ha/QueueGuard.cpp \
qpid/ha/QueueGuard.h \
+ qpid/ha/QueueRange.h \
qpid/ha/QueueReplicator.cpp \
qpid/ha/QueueReplicator.h \
qpid/ha/ReplicatingSubscription.cpp \
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index 6293f640e1..1f4ff4e48b 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -50,15 +50,15 @@ class QueueGuard::QueueObserver : public broker::QueueObserver
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
- : queue(q), subscription(0)
+ : queue(q), subscription(0), range(q)
{
+ // NOTE: The QueueGuard is created before the queue becomes active: either
+ // when a backup is promoted, or when a new queue is created on the primary.
std::ostringstream os;
os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
queue.addObserver(observer);
- // Set after addObserver to ensure we dont miss an enqueue.
- firstSafe = queue.getPosition() + 1; // Next message will be protected by the guard.
}
QueueGuard::~QueueGuard() { cancel(); }
@@ -99,24 +99,24 @@ void QueueGuard::cancel() {
queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
}
+void QueueGuard::attach(ReplicatingSubscription& rs) {
+ Mutex::ScopedLock l(lock);
+ subscription = &rs;
+}
+
namespace {
void completeBefore(QueueGuard* guard, SequenceNumber position, const QueuedMessage& qm) {
if (qm.position <= position) guard->complete(qm);
}
}
-void QueueGuard::attach(ReplicatingSubscription& rs) {
- // NOTE: attach is called before the ReplicatingSubscription is active so
- // it's position is not changing.
- assert(firstSafe >= rs.getPosition());
- // Complete any messages before or at the ReplicatingSubscription position.
- if (!delayed.empty() && delayed.front() <= rs.getPosition()) {
+bool QueueGuard::subscriptionStart(SequenceNumber position) {
+ // Complete any messages before or at the ReplicatingSubscription start position.
+ if (!delayed.empty() && delayed.front() <= position) {
// FIXME aconway 2012-06-15: queue iteration, only messages in delayed
- queue.eachMessage(boost::bind(&completeBefore, this, rs.getPosition(), _1));
+ queue.eachMessage(boost::bind(&completeBefore, this, position, _1));
}
- Mutex::ScopedLock l(lock);
- // FIXME aconway 2012-06-15: complete messages before rs.getPosition
- subscription = &rs;
+ return position >= range.back;
}
void QueueGuard::complete(const QueuedMessage& qm) {
@@ -135,11 +135,6 @@ void QueueGuard::complete(const QueuedMessage& qm) {
qm.payload->getIngressCompletion().finishCompleter();
}
-framing::SequenceNumber QueueGuard::getFirstSafe() {
- // No lock, firstSafe is immutable.
- return firstSafe;
-}
-
// FIXME aconway 2012-06-04: TODO support for timeout.
}} // namespaces qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h
index 9a68222467..9c6fb55015 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.h
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.h
@@ -23,6 +23,7 @@
*/
#include "types.h"
+#include "QueueRange.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/types/Uuid.h"
@@ -78,10 +79,19 @@ class QueueGuard {
void attach(ReplicatingSubscription&);
/**
- * The first sequence number to be protected by this guard. All messages at
- * or after this position are protected.
+ * Return the queue range at the time the QueueGuard was created. The
+ * QueueGuard is created before the queue becomes active: either when a
+ * backup is promoted, or when a new queue is created on the primary.
+ *
+ * NOTE: The first position protected by the guard is getRange().getBack()+1
*/
- framing::SequenceNumber getFirstSafe();
+ const QueueRange& getRange() const { return range; } // range is immutable, no lock needed.
+
+ /** Inform the guard of the stating position for the attached subscription.
+ * Complete messages that will not be seen by the subscriptino.
+ *@return true if the subscription has already advanced to a guarded position.
+ */
+ bool subscriptionStart(framing::SequenceNumber position);
private:
class QueueObserver;
@@ -92,7 +102,7 @@ class QueueGuard {
framing::SequenceSet delayed;
ReplicatingSubscription* subscription;
boost::shared_ptr<QueueObserver> observer;
- framing::SequenceNumber firstSafe; // Immutable
+ const QueueRange range;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueRange.h b/qpid/cpp/src/qpid/ha/QueueRange.h
new file mode 100644
index 0000000000..3ca034e411
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/QueueRange.h
@@ -0,0 +1,71 @@
+#ifndef QPID_HA_QUEUERANGE_H
+#define QPID_HA_QUEUERANGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReplicatingSubscription.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/SequenceNumber.h"
+#include <iostream>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Get the front/back range of a queue or from a ReplicatingSubscription arguments table.
+ */
+struct QueueRange {
+ public:
+ framing::SequenceNumber front, back;
+
+ QueueRange() { }
+
+ QueueRange(broker::Queue& q) {
+ back = q.getPosition();
+ front = back+1; // assume empty
+ ReplicatingSubscription::getFront(q, front);
+ assert(front <= back + 1);
+ }
+
+ QueueRange(const framing::FieldTable& args) {
+ back = args.getAsInt(ReplicatingSubscription::QPID_BACK);
+ front = back+1;
+ if (args.isSet(ReplicatingSubscription::QPID_FRONT))
+ front = args.getAsInt(ReplicatingSubscription::QPID_FRONT);
+ if (back+1 < front)
+ throw Exception(QPID_MSG("Invalid range [" << front << "," << back <<"]"));
+ }
+
+ bool empty() const { return front == back+1; }
+};
+
+
+inline std::ostream& operator<<(std::ostream& o, const QueueRange& qr) {
+ if (qr.front > qr.back) return o << "[-" << qr.back << "]";
+ else return o << "[" << qr.front << "," << qr.back << "]";
+}
+
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_QUEUERANGE_H*/
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 0d6cbb7ddc..629014b215 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -20,6 +20,7 @@
*/
#include "QueueGuard.h"
+#include "QueueRange.h"
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "Primary.h"
@@ -56,31 +57,31 @@ class DequeueScanner
public:
DequeueScanner(
ReplicatingSubscription& rs,
- const SequenceNumber& first_,
- const SequenceNumber& last_ // Inclusive
- ) : subscription(rs), first(first_), last(last_)
+ SequenceNumber front_,
+ SequenceNumber back_ // Inclusive
+ ) : subscription(rs), front(front_), back(back_)
{
- assert(first <= last);
- // INVARIANT no deques are needed for positions <= at.
- at = first - 1;
+ assert(front <= back);
+ // INVARIANT deques have been added for positions <= at.
+ at = front - 1;
}
void operator()(const QueuedMessage& qm) {
- if (qm.position >= first && qm.position <= last) {
- if (qm.position > at+1)
- subscription.dequeued(at+1, qm.position-1);
+ if (qm.position >= front && qm.position <= back) {
+ if (qm.position > at+1) subscription.dequeued(at+1, qm.position-1);
at = qm.position;
}
}
+
// Must call after scanning the queue.
void finish() {
- if (at < last) subscription.dequeued(at+1, last);
+ if (at < back) subscription.dequeued(at+1, back);
}
private:
ReplicatingSubscription& subscription;
- SequenceNumber first;
- SequenceNumber last;
+ SequenceNumber front;
+ SequenceNumber back;
SequenceNumber at;
};
@@ -146,38 +147,6 @@ ReplicatingSubscription::Factory::create(
return rs;
}
-struct QueueRange {
- bool empty;
- SequenceNumber front;
- SequenceNumber back;
-
- QueueRange() { }
-
- QueueRange(broker::Queue& q) {
- back = q.getPosition();
- front = back+1; // Assume empty
- empty = !ReplicatingSubscription::getFront(q, front);
- assert(empty || front <= back);
- }
-
- QueueRange(const framing::FieldTable args) {
- back = args.getAsInt(ReplicatingSubscription::QPID_BACK);
- front = back+1; // Assume empty
- empty = !args.isSet(ReplicatingSubscription::QPID_FRONT);
- if (!empty) {
- front = args.getAsInt(ReplicatingSubscription::QPID_FRONT);
- if (back < front)
- throw InvalidArgumentException(
- QPID_MSG("Invalid range [" << front << "," << back <<"]"));
- }
- }
-};
-
-ostream& operator<<(ostream& o, const QueueRange& qr) {
- if (qr.front > qr.back) return o << "[-" << qr.back << "]";
- else return o << "[" << qr.front << "," << qr.back << "]";
-}
-
ReplicatingSubscription::ReplicatingSubscription(
SemanticState* parent,
const string& name,
@@ -205,16 +174,9 @@ ReplicatingSubscription::ReplicatingSubscription(
os << "Primary " << queue->getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
- // FIXME aconway 2012-06-10: unsafe to rely in queue front or position they are changing?
-
- QueueRange primary(*queue); // The local primary queue.
- QueueRange backup(arguments); // The remote backup state.
- backupPosition = backup.back;
-
// NOTE: Once the guard is attached we can have concurrent
- // calles to dequeued so we need to lock use of this->deques.
+ // calls to dequeued so we need to lock use of this->deques.
//
-
// However we must attach the guard _before_ we scan for
// initial dequeues to be sure we don't miss any dequeues
// between the scan and attaching the guard.
@@ -222,43 +184,44 @@ ReplicatingSubscription::ReplicatingSubscription(
if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo()));
guard->attach(*this);
- // We can re-use some backup messages if backup and primary queues
- // overlap and the backup is not missing messages at the front of the queue.
+ QueueRange backup(arguments); // The remote backup state.
+ QueueRange primary(guard->getRange()); // The local state at the time the guard was set.
+ backupPosition = backup.back;
+
+ // Sync backup and primary queues, don't send messages already on the backup
- /* if (!primary.empty && // Primary not empty
- !backup.empty && // Backup not empty
- primary.front >= backup.front && // Not missing messages at the front
- primary.front <= backup.back // Overlap
- )
+ if (backup.back < primary.front || backup.front > primary.back
+ || primary.empty() || backup.empty())
{
- // Scan primary queue for gaps that should be dequeued on the backup.
- // NOTE: this runs concurrently with the guard calling dequeued()
- // FIXME aconway 2012-05-22: optimize queue iteration
+ // No overlap - erase backup and start from the beginning
+ if (!backup.empty()) dequeued(backup.front, backup.back);
+ position = primary.front-1;
+ }
+ else { // backup and primary do overlap.
+ // Remove messages from backup that are not in primary.
+ if (primary.back < backup.back) {
+ dequeued(primary.back+1, backup.back); // Trim excess messages at back
+ backup.back = primary.back;
+ }
+ if (backup.front < primary.front) {
+ dequeued(backup.front, primary.front-1); // Trim excess messages at front
+ backup.front = primary.front;
+ }
DequeueScanner scan(*this, backup.front, backup.back);
- queue->eachMessage(scan);
+ // FIXME aconway 2012-06-15: Optimize queue traversal, only in range.
+ queue->eachMessage(boost::ref(scan)); // Remove missing messages in between.
scan.finish();
- // If the backup was ahead it has been pruned back to the primary.
- position = std::min(guard->getFirstSafe(), backup.back);
+ position = backup.back;
}
- else */ {
- // Clear the backup queue and reset to start browsing at the
- // front of the primary queue.
- if (!backup.empty) dequeued(backup.front, backup.back);
- position = primary.front - 1; // Start consuming from front.
- }
- QPID_LOG(debug, logPrefix << "Subscribed: "
+
+ QPID_LOG(debug, logPrefix << "Subscribed:"
<< " backup:" << backup
<< " primary:" << primary
- << " position:" << position
- << " safe position: " << guard->getFirstSafe()
- );
-
- // Are we ready yet?
- if (position+1 >= guard->getFirstSafe()) // Next message will be safe.
- setReady();
- else
- QPID_LOG(debug, logPrefix << "Catching up from "
- << position << " to " << guard->getFirstSafe());
+ << " catch-up: " << position << "-" << primary.back
+ << "(" << primary.back-position << ")");
+
+ // Check if we are ready yet.
+ if (guard->subscriptionStart(position)) setReady();
}
catch (const std::exception& e) {
throw InvalidArgumentException(QPID_MSG(logPrefix << e.what()
@@ -307,11 +270,8 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
// Backup will automatically advance by 1 on delivery of message.
backupPosition = qm.position;
}
- // Deliver the message
- return ConsumerImpl::deliver(qm);
}
- else
- return ConsumerImpl::deliver(qm); // Message is for internal event queue.
+ return ConsumerImpl::deliver(qm);
} catch (const std::exception& e) {
QPID_LOG(critical, logPrefix << "Error replicating " << qm
<< ": " << e.what());
@@ -344,7 +304,7 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) {
QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
guard->complete(qm);
// If next message is protected by the guard then we are ready
- if (qm.position+1 >= guard->getFirstSafe()) setReady();
+ if (qm.position >= guard->getRange().back) setReady();
}
ConsumerImpl::acknowledged(qm);
}
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index b8b642c504..aea4460e5a 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -572,7 +572,7 @@ class NumberedSender(Thread):
"""
Thread.__init__(self)
cmd = ["qpid-send",
- "--broker", url or broker.host_port(),
+ "--broker", url or broker.host_port(),
"--address", "%s;{create:always}"%queue,
"--connection-options", "{%s}"%(connection_options),
"--content-stdin"
@@ -647,6 +647,7 @@ class NumberedReceiver(Thread):
self.error = None
self.sender = sender
self.received = 0
+ self.queue = queue
def read_message(self):
n = int(self.receiver.stdout.readline())
@@ -657,7 +658,7 @@ class NumberedReceiver(Thread):
m = self.read_message()
while m != -1:
self.receiver.assert_running()
- assert m <= self.received, "%s missing message %s>%s"%(queue, m, self.received)
+ assert m <= self.received, "%s missing message %s>%s"%(self.queue, m, self.received)
if (m == self.received): # Ignore duplicates
self.received += 1
if self.sender:
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index e5a204d03c..7338136bfd 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -729,7 +729,7 @@ class LongTests(BrokerTest):
brokers = HaCluster(self, 3)
# Start sender and receiver threads
- n = 10;
+ n = 10
senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False,
queue="test%s"%(i)) for i in xrange(n)]
receivers = [NumberedReceiver(brokers[0], sender=senders[i],
@@ -760,8 +760,7 @@ class LongTests(BrokerTest):
def enough(): # Verify we're still running
receivers[0].check() # Verify no exceptions
return receivers[0].received > n + 100
- # FIXME aconway 2012-05-17: client reconnect sometimes takes > 1 sec.
- assert retry(enough, 10), "Stalled: %s < %s+100"%(receivers[0].received, n)
+ assert retry(enough), "Stalled: %s < %s+100"%(receivers[0].received, n)
except:
traceback.print_exc()
raise