diff options
author | Alan Conway <aconway@apache.org> | 2012-01-19 23:05:52 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-01-19 23:05:52 +0000 |
commit | c7e4eead0c3010e28b6008f493174734c17ce787 (patch) | |
tree | 68ac7f7debc23307397054b18cafe6f5b4ede597 | |
parent | 6e55ec37dfab608e3df77dcbc19df746035d2183 (diff) | |
download | qpid-python-c7e4eead0c3010e28b6008f493174734c17ce787.tar.gz |
QPID-3603: Failover optimization removed.
There was an optimization to re-use messages already on the backup
after fail-over. This optimization was removed to simplify the logic
while we basic replication working. It can be re-introduced
later. Last revision with the optimization was:
r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233661 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 86 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 2 |
3 files changed, 28 insertions, 77 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index ccdc4dd0b1..515c3f4185 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -49,7 +49,6 @@ QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<L : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management? queue(q), link(l), current(queue->getPosition()) { - // FIXME aconway 2011-11-24: consistent logging. QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings()); // Declare the replicator bridge. queue->getBroker()->getLinks().declare( @@ -77,12 +76,20 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); framing::FieldTable settings; + + // FIXME aconway 2011-12-09: Failover optimization removed. + // There was code here to re-use messages already on the backup + // during fail-over. This optimization was removed to simplify + // the logic till we get the basic replication stable, it + // can be re-introduced later. Last revision with the optimization: + // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. + + // Clear out any old messages, reset the queue to start replicating fresh. + queue->purge(); + queue->setPosition(0); + settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); - settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition()); settings.setInt(QPID_SYNC_FREQUENCY, 1); - qpid::framing::SequenceNumber oldest; - if (queue->getOldest(oldest)) - settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, oldest); peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false, "", 0, settings); peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 5f7fe611cf..50d5fc55c7 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -34,44 +34,12 @@ using namespace broker; using namespace std; const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription"); -const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number"); -const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number"); namespace { const string DOLLAR("$"); const string INTERNAL("-internal"); } // namespace -class ReplicationStateInitialiser -{ - public: - ReplicationStateInitialiser( - qpid::framing::SequenceSet& r, - const qpid::framing::SequenceNumber& s, - const qpid::framing::SequenceNumber& e) : dequeues(r), start(s), end(e) - { - dequeues.add(start, end); - } - - void operator()(const QueuedMessage& message) { - if (message.position < start) { - //replica does not have a message that should still be on the queue - QPID_LOG(warning, "HA: Replica missing message " << QueuePos(message)); - // FIXME aconway 2011-12-09: we want the replica to dump - // its messages and start from scratch in this case. - } else if (message.position >= start && message.position <= end) { - //i.e. message is within the intial range and has not been dequeued, - //so remove it from the dequeues - dequeues.remove(message.position); - } //else message has not been seen by replica yet so can be ignored here - } - - private: - qpid::framing::SequenceSet& dequeues; - const qpid::framing::SequenceNumber start; - const qpid::framing::SequenceNumber end; -}; - string mask(const string& in) { return DOLLAR + in + INTERNAL; @@ -97,7 +65,6 @@ ReplicatingSubscription::Factory::create( rs.reset(new ReplicatingSubscription( parent, name, queue, ack, false, exclusive, tag, resumeId, resumeTtl, arguments)); - // FIXME aconway 2011-12-08: need to removeObserver also. queue->addObserver(rs); } return rs; @@ -119,44 +86,23 @@ ReplicatingSubscription::ReplicatingSubscription( events(new Queue(mask(name))), consumer(new DelegatingConsumer(*this)) { - // FIXME aconway 2011-12-09: Here we take advantage of existing - // messages on the backup queue to reduce replication - // effort. However if the backup queue is inconsistent with being - // a backup of the primary queue, then we want to issue a warning - // and tell the backup to dump its messages and start replicating - // from scratch. + // FIXME aconway 2011-12-09: Failover optimization removed. + // There was code here to re-use messages already on the backup + // during fail-over. This optimization was removed to simplify + // the logic till we get the basic replication stable, it + // can be re-introduced later. Last revision with the optimization: + // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. + QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName()); - if (arguments.isSet(QPID_HIGH_SEQUENCE_NUMBER)) { - qpid::framing::SequenceNumber hwm = arguments.getAsInt(QPID_HIGH_SEQUENCE_NUMBER); - qpid::framing::SequenceNumber lwm; - if (arguments.isSet(QPID_LOW_SEQUENCE_NUMBER)) { - lwm = arguments.getAsInt(QPID_LOW_SEQUENCE_NUMBER); - } else { - lwm = hwm; - } - qpid::framing::SequenceNumber oldest; - if (queue->getOldest(oldest)) { - if (oldest >= hwm) { - dequeues.add(lwm, --oldest); - } else if (oldest >= lwm) { - ReplicationStateInitialiser initialiser(dequeues, lwm, hwm); - queue->eachMessage(initialiser); - } else { //i.e. older message on master than is reported to exist on replica - // FIXME aconway 2011-12-09: dump and start from scratch? - QPID_LOG(warning, "HA: Replica missing message on primary"); - } - } else { - //local queue (i.e. master) is empty - dequeues.add(lwm, queue->getPosition()); - // FIXME aconway 2011-12-09: if hwm > - // queue->getPosition(), dump and start from scratch? - } - QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << ": " - << dequeues << " (lwm=" << lwm << ", hwm=" << hwm - << ", current=" << queue->getPosition() << ")"); - //set position of 'cursor' - position = hwm; - } + qpid::framing::SequenceNumber oldest; + if (queue->getOldest(oldest)) + dequeues.add(0, --oldest); + else //local queue (i.e. master) is empty + dequeues.add(0, queue->getPosition()); + + QPID_LOG(debug, "HA: Initial dequeues for " << queue->getName() << ": " << dequeues); + // Set 'cursor' on backup queue. Will be updated by dequeue event sent above. + position = 0; } bool ReplicatingSubscription::deliver(QueuedMessage& m) diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index c946b7b993..6d75d6fb73 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -61,8 +61,6 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, // Argument names for consume command. static const std::string QPID_REPLICATING_SUBSCRIPTION; - static const std::string QPID_HIGH_SEQUENCE_NUMBER; - static const std::string QPID_LOW_SEQUENCE_NUMBER; ReplicatingSubscription(broker::SemanticState* parent, const std::string& name, boost::shared_ptr<broker::Queue> , |