summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-19 23:05:52 +0000
committerAlan Conway <aconway@apache.org>2012-01-19 23:05:52 +0000
commitc7e4eead0c3010e28b6008f493174734c17ce787 (patch)
tree68ac7f7debc23307397054b18cafe6f5b4ede597
parent6e55ec37dfab608e3df77dcbc19df746035d2183 (diff)
downloadqpid-python-c7e4eead0c3010e28b6008f493174734c17ce787.tar.gz
QPID-3603: Failover optimization removed.
There was an optimization to re-use messages already on the backup after fail-over. This optimization was removed to simplify the logic while we basic replication working. It can be re-introduced later. Last revision with the optimization was: r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233661 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp17
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp86
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h2
3 files changed, 28 insertions, 77 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index ccdc4dd0b1..515c3f4185 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -49,7 +49,6 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L
: Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management?
queue(q), link(l), current(queue->getPosition())
{
- // FIXME aconway 2011-11-24: consistent logging.
QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings());
// Declare the replicator bridge.
queue->getBroker()->getLinks().declare(
@@ -77,12 +76,20 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
framing::FieldTable settings;
+
+ // FIXME aconway 2011-12-09: Failover optimization removed.
+ // There was code here to re-use messages already on the backup
+ // during fail-over. This optimization was removed to simplify
+ // the logic till we get the basic replication stable, it
+ // can be re-introduced later. Last revision with the optimization:
+ // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
+
+ // Clear out any old messages, reset the queue to start replicating fresh.
+ queue->purge();
+ queue->setPosition(0);
+
settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
- settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition());
settings.setInt(QPID_SYNC_FREQUENCY, 1);
- qpid::framing::SequenceNumber oldest;
- if (queue->getOldest(oldest))
- settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, oldest);
peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false, "", 0, settings);
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 5f7fe611cf..50d5fc55c7 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -34,44 +34,12 @@ using namespace broker;
using namespace std;
const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
-const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number");
-const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number");
namespace {
const string DOLLAR("$");
const string INTERNAL("-internal");
} // namespace
-class ReplicationStateInitialiser
-{
- public:
- ReplicationStateInitialiser(
- qpid::framing::SequenceSet& r,
- const qpid::framing::SequenceNumber& s,
- const qpid::framing::SequenceNumber& e) : dequeues(r), start(s), end(e)
- {
- dequeues.add(start, end);
- }
-
- void operator()(const QueuedMessage& message) {
- if (message.position < start) {
- //replica does not have a message that should still be on the queue
- QPID_LOG(warning, "HA: Replica missing message " << QueuePos(message));
- // FIXME aconway 2011-12-09: we want the replica to dump
- // its messages and start from scratch in this case.
- } else if (message.position >= start && message.position <= end) {
- //i.e. message is within the intial range and has not been dequeued,
- //so remove it from the dequeues
- dequeues.remove(message.position);
- } //else message has not been seen by replica yet so can be ignored here
- }
-
- private:
- qpid::framing::SequenceSet& dequeues;
- const qpid::framing::SequenceNumber start;
- const qpid::framing::SequenceNumber end;
-};
-
string mask(const string& in)
{
return DOLLAR + in + INTERNAL;
@@ -97,7 +65,6 @@ ReplicatingSubscription::Factory::create(
rs.reset(new ReplicatingSubscription(
parent, name, queue, ack, false, exclusive, tag,
resumeId, resumeTtl, arguments));
- // FIXME aconway 2011-12-08: need to removeObserver also.
queue->addObserver(rs);
}
return rs;
@@ -119,44 +86,23 @@ ReplicatingSubscription::ReplicatingSubscription(
events(new Queue(mask(name))),
consumer(new DelegatingConsumer(*this))
{
- // FIXME aconway 2011-12-09: Here we take advantage of existing
- // messages on the backup queue to reduce replication
- // effort. However if the backup queue is inconsistent with being
- // a backup of the primary queue, then we want to issue a warning
- // and tell the backup to dump its messages and start replicating
- // from scratch.
+ // FIXME aconway 2011-12-09: Failover optimization removed.
+ // There was code here to re-use messages already on the backup
+ // during fail-over. This optimization was removed to simplify
+ // the logic till we get the basic replication stable, it
+ // can be re-introduced later. Last revision with the optimization:
+ // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
+
QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName());
- if (arguments.isSet(QPID_HIGH_SEQUENCE_NUMBER)) {
- qpid::framing::SequenceNumber hwm = arguments.getAsInt(QPID_HIGH_SEQUENCE_NUMBER);
- qpid::framing::SequenceNumber lwm;
- if (arguments.isSet(QPID_LOW_SEQUENCE_NUMBER)) {
- lwm = arguments.getAsInt(QPID_LOW_SEQUENCE_NUMBER);
- } else {
- lwm = hwm;
- }
- qpid::framing::SequenceNumber oldest;
- if (queue->getOldest(oldest)) {
- if (oldest >= hwm) {
- dequeues.add(lwm, --oldest);
- } else if (oldest >= lwm) {
- ReplicationStateInitialiser initialiser(dequeues, lwm, hwm);
- queue->eachMessage(initialiser);
- } else { //i.e. older message on master than is reported to exist on replica
- // FIXME aconway 2011-12-09: dump and start from scratch?
- QPID_LOG(warning, "HA: Replica missing message on primary");
- }
- } else {
- //local queue (i.e. master) is empty
- dequeues.add(lwm, queue->getPosition());
- // FIXME aconway 2011-12-09: if hwm >
- // queue->getPosition(), dump and start from scratch?
- }
- QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << ": "
- << dequeues << " (lwm=" << lwm << ", hwm=" << hwm
- << ", current=" << queue->getPosition() << ")");
- //set position of 'cursor'
- position = hwm;
- }
+ qpid::framing::SequenceNumber oldest;
+ if (queue->getOldest(oldest))
+ dequeues.add(0, --oldest);
+ else //local queue (i.e. master) is empty
+ dequeues.add(0, queue->getPosition());
+
+ QPID_LOG(debug, "HA: Initial dequeues for " << queue->getName() << ": " << dequeues);
+ // Set 'cursor' on backup queue. Will be updated by dequeue event sent above.
+ position = 0;
}
bool ReplicatingSubscription::deliver(QueuedMessage& m)
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index c946b7b993..6d75d6fb73 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -61,8 +61,6 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
// Argument names for consume command.
static const std::string QPID_REPLICATING_SUBSCRIPTION;
- static const std::string QPID_HIGH_SEQUENCE_NUMBER;
- static const std::string QPID_LOW_SEQUENCE_NUMBER;
ReplicatingSubscription(broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,