summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/QueueReplicator.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-05-28 18:24:31 +0000
committerAlan Conway <aconway@apache.org>2012-05-28 18:24:31 +0000
commit83ae9111c7aecd7e75423b32e225d1070157416c (patch)
tree52c5bb92d9c2deff0b54439129aea4d3e73e4ac6 /cpp/src/qpid/ha/QueueReplicator.cpp
parenta6da1a2f32df0a4cce974c5faf6fc8ddd3841b80 (diff)
downloadqpid-python-83ae9111c7aecd7e75423b32e225d1070157416c.tar.gz
QPID-3603: Failover optimization restored.
A backup broker that fails over to a new primary can avoid downloading messages that it already has from the previous primary. The backup sends its position to the primary as a client-arg and the primary sends back any necessary dequeues and starts replicating after the messages on the backup. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1343350 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp35
1 files changed, 14 insertions, 21 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index 76840ea92e..47fc3afdeb 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -112,25 +112,19 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
framing::FieldTable settings;
-
- // FIXME aconway 2011-12-09: Failover optimization removed.
- // There was code here to re-use messages already on the backup
- // during fail-over. This optimization was removed to simplify
- // the logic till we get the basic replication stable, it
- // can be re-introduced later. Last revision with the optimization:
- // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
-
- // Clear out any old messages, reset the queue to start replicating fresh.
- queue->purge(); // FIXME aconway 2012-05-02: race
- queue->setPosition(0);
-
settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
- // TODO aconway 2011-12-19: optimize.
- settings.setInt(QPID_SYNC_FREQUENCY, 1);
- peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false/*exclusive*/, "", 0, settings);
+ settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
+ settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition());
+ SequenceNumber front;
+ if (ReplicatingSubscription::getFront(*queue, front))
+ settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, front);
+ peer.getMessage().subscribe(
+ args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
+ false/*exclusive*/, "", 0, settings);
+ // FIXME aconway 2012-05-22: use a finite credit window
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
- QPID_LOG(debug, logPrefix << "Activated bridge " << bridgeName);
+ QPID_LOG(debug, logPrefix << "Subscribed bridge: " << bridgeName << " " << settings);
}
namespace {
@@ -174,11 +168,10 @@ void QueueReplicator::route(Deliverable& msg)
SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
<< " to " << position);
- if (queue->getPosition() > position) {
- throw Exception(
- QPID_MSG(logPrefix << "Invalid position update from "
- << queue->getPosition() << " to " << position));
- }
+ // Verify that there are no messages after the new position in the queue.
+ SequenceNumber next;
+ if (ReplicatingSubscription::getNext(*queue, position, next))
+ throw Exception("Invalid position move, preceeds messages");
queue->setPosition(position);
}
// Ignore unknown event keys, may be introduced in later versions.