diff options
Diffstat (limited to 'qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp | 5 |
1 files changed, 3 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp index 2d8af3b052..e3990a13cc 100644 --- a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp +++ b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp @@ -57,7 +57,6 @@ void ReplicatingEventListener::deliverDequeueMessage(const QueuedMessage& dequeu { FieldTable headers; headers.setString(REPLICATION_TARGET_QUEUE, dequeued.queue->getName()); - headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence); headers.setInt(REPLICATION_EVENT_TYPE, DEQUEUE); headers.setInt(DEQUEUED_MESSAGE_POSITION, dequeued.position); boost::intrusive_ptr<Message> msg(createMessage(headers)); @@ -69,7 +68,6 @@ void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueu boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload)); FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders(); headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName()); - headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence); headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE); queue->deliver(msg); } @@ -131,12 +129,14 @@ void ReplicatingEventListener::initialize(Plugin::Target& target) { Broker* broker = dynamic_cast<broker::Broker*>(&target); if (broker && !options.queue.empty()) { + broker->addFinalizer(boost::bind(&ReplicatingEventListener::shutdown, this)); if (options.createQueue) { queue = broker->getQueues().declare(options.queue).first; } else { queue = broker->getQueues().find(options.queue); } if (queue) { + queue->insertSequenceNumbers(REPLICATION_EVENT_SEQNO); QueueEvents::EventListener callback = boost::bind(&ReplicatingEventListener::handle, this, _1); broker->getQueueEvents().registerListener(options.name, callback); QPID_LOG(info, "Registered replicating queue event listener"); @@ -147,6 +147,7 @@ void ReplicatingEventListener::initialize(Plugin::Target& target) } void ReplicatingEventListener::earlyInitialize(Target&) {} +void ReplicatingEventListener::shutdown() { queue.reset(); } ReplicatingEventListener::PluginOptions::PluginOptions() : Options("Queue Replication Options"), name("replicator"), |