diff options
Diffstat (limited to 'cpp/src/qpid/ha/QueueReplicator.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 40 |
1 files changed, 24 insertions, 16 deletions
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index b43658365c..eda3f96180 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -21,8 +21,9 @@ #include "Event.h" #include "HaBroker.h" +#include "IdSetter.h" #include "QueueReplicator.h" -#include "QueueSnapshots.h" +#include "QueueSnapshot.h" #include "ReplicatingSubscription.h" #include "Settings.h" #include "types.h" @@ -122,6 +123,11 @@ QueueReplicator::QueueReplicator(HaBroker& hb, settings(hb.getSettings()), nextId(0), maxId(0) { + // The QueueReplicator will take over setting replication IDs. + boost::shared_ptr<IdSetter> setter = + q->getMessageInterceptors().findType<IdSetter>(); + if (setter) q->getMessageInterceptors().remove(setter); + args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); @@ -212,8 +218,9 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType()); arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize? arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable()); - arguments.setString(ReplicatingSubscription::QPID_ID_SET, - encodeStr(haBroker.getQueueSnapshots()->get(queue)->snapshot())); + boost::shared_ptr<QueueSnapshot> qs = queue->getObservers().findType<QueueSnapshot>(); + if (qs) arguments.setString(ReplicatingSubscription::QPID_ID_SET, encodeStr(qs->getSnapshot())); + try { peer.getMessage().subscribe( args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, @@ -254,6 +261,7 @@ void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) { } // Called in connection thread of the queues bridge to primary. + void QueueReplicator::route(Deliverable& deliverable) { try { @@ -293,11 +301,6 @@ void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) { nextId = decodeStr<IdEvent>(data).id; } -ReplicationId QueueReplicator::getMaxId() { - Mutex::ScopedLock l(lock); - return maxId; -} - void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) { if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) { // If the queue is destroyed at the same time we are subscribing, we may @@ -320,14 +323,19 @@ bool QueueReplicator::hasBindings() { return false; } std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; } void QueueReplicator::promoted() { - // Promoted to primary, deal with auto-delete now. - if (queue && queue->isAutoDelete() && subscribed) { - // Make a temporary shared_ptr to prevent premature deletion of queue. - // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue - // which could delete the queue while it's still running it's destroyed logic. - boost::shared_ptr<Queue> q(queue); - q->releaseFromUse(); - q->scheduleAutoDelete(); + if (queue) { + // On primary QueueReplicator no longer sets IDs, start an IdSetter. + queue->getMessageInterceptors().add( + boost::shared_ptr<IdSetter>(new IdSetter(maxId+1))); + // Process auto-deletes + if (queue->isAutoDelete() && subscribed) { + // Make a temporary shared_ptr to prevent premature deletion of queue. + // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue + // which could delete the queue while it's still running it's destroyed logic. + boost::shared_ptr<Queue> q(queue); + q->releaseFromUse(); + q->scheduleAutoDelete(); + } } } |
