From c36cdc3923d757f57582cb00ef4396471ff7fdc0 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 13 Dec 2013 19:30:12 +0000 Subject: QPID-5421: HA replication error in stand-alone replication There were replication errors because with stand-alone replication an IdSetter was not set on the original queue until queue replication was set up. Any messages on the queue *before* replication was setup had 0 replication IDs. When one of those messages was dequeued on the source queue, an incorrect message was dequeued on the replica queue. The fix is to add an IdSetter to every queue when replication is enabled. The unit test ha_tests.ReplicationTests.test_standalone_queue_replica has been updated to test for this issue. This commit also has some general tidy-up work around IdSetter and QueueSnapshot. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1550819 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/ha/ReplicatingSubscription.cpp | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp') diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 2db7845067..635d5047bd 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -22,7 +22,7 @@ #include "Event.h" #include "IdSetter.h" #include "QueueGuard.h" -#include "QueueSnapshots.h" +#include "QueueSnapshot.h" #include "ReplicatingSubscription.h" #include "TxReplicatingSubscription.h" #include "Primary.h" @@ -129,17 +129,6 @@ void ReplicatingSubscription::initialize() { info.printId(os) << ": "; logPrefix = os.str(); - // If this is a non-cluster standalone replication then we need to - // set up an IdSetter if there is not already one. - boost::shared_ptr idSetter; - queue->getMessageInterceptors().each( - boost::bind(©If, _1, boost::ref(idSetter))); - if (!idSetter) { - QPID_LOG(debug, logPrefix << "Standalone replication"); - queue->getMessageInterceptors().add( - boost::shared_ptr(new IdSetter(queue->getName(), 1))); - } - // If there's already a guard (we are in failover) use it, else create one. if (primary) guard = primary->getGuard(queue, info); if (!guard) guard.reset(new QueueGuard(*queue, info)); @@ -152,14 +141,14 @@ void ReplicatingSubscription::initialize() { // between the snapshot and attaching the observer. queue->getObservers().add( boost::dynamic_pointer_cast(shared_from_this())); - boost::shared_ptr snapshot = haBroker.getQueueSnapshots()->get(queue); + boost::shared_ptr snapshot = queue->getObservers().findType(); // There may be no snapshot if the queue is being deleted concurrently. if (!snapshot) { queue->getObservers().remove( boost::dynamic_pointer_cast(shared_from_this())); throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted"); } - ReplicationIdSet primaryIds = snapshot->snapshot(); + ReplicationIdSet primaryIds = snapshot->getSnapshot(); std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET); ReplicationIdSet backupIds; if (!backupStr.empty()) backupIds = decodeStr(backupStr); -- cgit v1.2.1