summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp')
-rw-r--r--qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp5
1 files changed, 3 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
index 2d8af3b052..e3990a13cc 100644
--- a/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
+++ b/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
@@ -57,7 +57,6 @@ void ReplicatingEventListener::deliverDequeueMessage(const QueuedMessage& dequeu
{
FieldTable headers;
headers.setString(REPLICATION_TARGET_QUEUE, dequeued.queue->getName());
- headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence);
headers.setInt(REPLICATION_EVENT_TYPE, DEQUEUE);
headers.setInt(DEQUEUED_MESSAGE_POSITION, dequeued.position);
boost::intrusive_ptr<Message> msg(createMessage(headers));
@@ -69,7 +68,6 @@ void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueu
boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload));
FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
- headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence);
headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
queue->deliver(msg);
}
@@ -131,12 +129,14 @@ void ReplicatingEventListener::initialize(Plugin::Target& target)
{
Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (broker && !options.queue.empty()) {
+ broker->addFinalizer(boost::bind(&ReplicatingEventListener::shutdown, this));
if (options.createQueue) {
queue = broker->getQueues().declare(options.queue).first;
} else {
queue = broker->getQueues().find(options.queue);
}
if (queue) {
+ queue->insertSequenceNumbers(REPLICATION_EVENT_SEQNO);
QueueEvents::EventListener callback = boost::bind(&ReplicatingEventListener::handle, this, _1);
broker->getQueueEvents().registerListener(options.name, callback);
QPID_LOG(info, "Registered replicating queue event listener");
@@ -147,6 +147,7 @@ void ReplicatingEventListener::initialize(Plugin::Target& target)
}
void ReplicatingEventListener::earlyInitialize(Target&) {}
+void ReplicatingEventListener::shutdown() { queue.reset(); }
ReplicatingEventListener::PluginOptions::PluginOptions() : Options("Queue Replication Options"),
name("replicator"),