diff options
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 17 |
1 files changed, 3 insertions, 14 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 2db7845067..635d5047bd 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -22,7 +22,7 @@ #include "Event.h" #include "IdSetter.h" #include "QueueGuard.h" -#include "QueueSnapshots.h" +#include "QueueSnapshot.h" #include "ReplicatingSubscription.h" #include "TxReplicatingSubscription.h" #include "Primary.h" @@ -129,17 +129,6 @@ void ReplicatingSubscription::initialize() { info.printId(os) << ": "; logPrefix = os.str(); - // If this is a non-cluster standalone replication then we need to - // set up an IdSetter if there is not already one. - boost::shared_ptr<IdSetter> idSetter; - queue->getMessageInterceptors().each( - boost::bind(©If, _1, boost::ref(idSetter))); - if (!idSetter) { - QPID_LOG(debug, logPrefix << "Standalone replication"); - queue->getMessageInterceptors().add( - boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), 1))); - } - // If there's already a guard (we are in failover) use it, else create one. if (primary) guard = primary->getGuard(queue, info); if (!guard) guard.reset(new QueueGuard(*queue, info)); @@ -152,14 +141,14 @@ void ReplicatingSubscription::initialize() { // between the snapshot and attaching the observer. queue->getObservers().add( boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this())); - boost::shared_ptr<QueueSnapshot> snapshot = haBroker.getQueueSnapshots()->get(queue); + boost::shared_ptr<QueueSnapshot> snapshot = queue->getObservers().findType<QueueSnapshot>(); // There may be no snapshot if the queue is being deleted concurrently. if (!snapshot) { queue->getObservers().remove( boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this())); throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted"); } - ReplicationIdSet primaryIds = snapshot->snapshot(); + ReplicationIdSet primaryIds = snapshot->getSnapshot(); std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET); ReplicationIdSet backupIds; if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr); |
