diff options
| author | Alan Conway <aconway@apache.org> | 2012-05-28 18:24:31 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-05-28 18:24:31 +0000 |
| commit | 83ae9111c7aecd7e75423b32e225d1070157416c (patch) | |
| tree | 52c5bb92d9c2deff0b54439129aea4d3e73e4ac6 /cpp/src/qpid/ha/QueueReplicator.cpp | |
| parent | a6da1a2f32df0a4cce974c5faf6fc8ddd3841b80 (diff) | |
| download | qpid-python-83ae9111c7aecd7e75423b32e225d1070157416c.tar.gz | |
QPID-3603: Failover optimization restored.
A backup broker that fails over to a new primary can avoid downloading messages
that it already has from the previous primary. The backup sends its position to
the primary as a client-arg and the primary sends back any necessary dequeues
and starts replicating after the messages on the backup.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1343350 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 35 |
1 files changed, 14 insertions, 21 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index 76840ea92e..47fc3afdeb 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -112,25 +112,19 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); framing::FieldTable settings; - - // FIXME aconway 2011-12-09: Failover optimization removed. - // There was code here to re-use messages already on the backup - // during fail-over. This optimization was removed to simplify - // the logic till we get the basic replication stable, it - // can be re-introduced later. Last revision with the optimization: - // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. - - // Clear out any old messages, reset the queue to start replicating fresh. - queue->purge(); // FIXME aconway 2012-05-02: race - queue->setPosition(0); - settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); - // TODO aconway 2011-12-19: optimize. - settings.setInt(QPID_SYNC_FREQUENCY, 1); - peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false/*exclusive*/, "", 0, settings); + settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize? + settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition()); + SequenceNumber front; + if (ReplicatingSubscription::getFront(*queue, front)) + settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, front); + peer.getMessage().subscribe( + args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, + false/*exclusive*/, "", 0, settings); + // FIXME aconway 2012-05-22: use a finite credit window peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); - QPID_LOG(debug, logPrefix << "Activated bridge " << bridgeName); + QPID_LOG(debug, logPrefix << "Subscribed bridge: " << bridgeName << " " << settings); } namespace { @@ -174,11 +168,10 @@ void QueueReplicator::route(Deliverable& msg) SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() << " to " << position); - if (queue->getPosition() > position) { - throw Exception( - QPID_MSG(logPrefix << "Invalid position update from " - << queue->getPosition() << " to " << position)); - } + // Verify that there are no messages after the new position in the queue. + SequenceNumber next; + if (ReplicatingSubscription::getNext(*queue, position, next)) + throw Exception("Invalid position move, preceeds messages"); queue->setPosition(position); } // Ignore unknown event keys, may be introduced in later versions. |
