summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-12-13 19:30:12 +0000
committerAlan Conway <aconway@apache.org>2013-12-13 19:30:12 +0000
commitc36cdc3923d757f57582cb00ef4396471ff7fdc0 (patch)
tree6704e06e45c7657138201fbfe9bb808b677e4a23 /cpp/src/qpid/ha/ReplicatingSubscription.cpp
parent50593c783fbcf9e85ae11152f159a7545beb0647 (diff)
downloadqpid-python-c36cdc3923d757f57582cb00ef4396471ff7fdc0.tar.gz
QPID-5421: HA replication error in stand-alone replication
There were replication errors because with stand-alone replication an IdSetter was not set on the original queue until queue replication was set up. Any messages on the queue *before* replication was setup had 0 replication IDs. When one of those messages was dequeued on the source queue, an incorrect message was dequeued on the replica queue. The fix is to add an IdSetter to every queue when replication is enabled. The unit test ha_tests.ReplicationTests.test_standalone_queue_replica has been updated to test for this issue. This commit also has some general tidy-up work around IdSetter and QueueSnapshot. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1550819 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp17
1 files changed, 3 insertions, 14 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 2db7845067..635d5047bd 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -22,7 +22,7 @@
#include "Event.h"
#include "IdSetter.h"
#include "QueueGuard.h"
-#include "QueueSnapshots.h"
+#include "QueueSnapshot.h"
#include "ReplicatingSubscription.h"
#include "TxReplicatingSubscription.h"
#include "Primary.h"
@@ -129,17 +129,6 @@ void ReplicatingSubscription::initialize() {
info.printId(os) << ": ";
logPrefix = os.str();
- // If this is a non-cluster standalone replication then we need to
- // set up an IdSetter if there is not already one.
- boost::shared_ptr<IdSetter> idSetter;
- queue->getMessageInterceptors().each(
- boost::bind(&copyIf, _1, boost::ref(idSetter)));
- if (!idSetter) {
- QPID_LOG(debug, logPrefix << "Standalone replication");
- queue->getMessageInterceptors().add(
- boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), 1)));
- }
-
// If there's already a guard (we are in failover) use it, else create one.
if (primary) guard = primary->getGuard(queue, info);
if (!guard) guard.reset(new QueueGuard(*queue, info));
@@ -152,14 +141,14 @@ void ReplicatingSubscription::initialize() {
// between the snapshot and attaching the observer.
queue->getObservers().add(
boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
- boost::shared_ptr<QueueSnapshot> snapshot = haBroker.getQueueSnapshots()->get(queue);
+ boost::shared_ptr<QueueSnapshot> snapshot = queue->getObservers().findType<QueueSnapshot>();
// There may be no snapshot if the queue is being deleted concurrently.
if (!snapshot) {
queue->getObservers().remove(
boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted");
}
- ReplicationIdSet primaryIds = snapshot->snapshot();
+ ReplicationIdSet primaryIds = snapshot->getSnapshot();
std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET);
ReplicationIdSet backupIds;
if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr);