summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/replication
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
commit66765100f4257159622cefe57bed50125a5ad017 (patch)
treea88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /cpp/src/qpid/replication
parent1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff)
parent88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff)
downloadqpid-python-rajith_jms_client.tar.gz
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/replication')
-rw-r--r--cpp/src/qpid/replication/ReplicatingEventListener.cpp201
-rw-r--r--cpp/src/qpid/replication/ReplicatingEventListener.h78
-rw-r--r--cpp/src/qpid/replication/ReplicationExchange.cpp234
-rw-r--r--cpp/src/qpid/replication/ReplicationExchange.h72
-rw-r--r--cpp/src/qpid/replication/constants.h34
5 files changed, 0 insertions, 619 deletions
diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/cpp/src/qpid/replication/ReplicatingEventListener.cpp
deleted file mode 100644
index b7d52372f4..0000000000
--- a/cpp/src/qpid/replication/ReplicatingEventListener.cpp
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/replication/ReplicatingEventListener.h"
-#include "qpid/replication/constants.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/DeliverableMessage.h"
-#include "qpid/broker/QueueEvents.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/log/Statement.h"
-
-namespace qpid {
-namespace replication {
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::replication::constants;
-
-void ReplicatingEventListener::handle(QueueEvents::Event event)
-{
- switch (event.type) {
- case QueueEvents::ENQUEUE:
- deliverEnqueueMessage(event.msg);
- QPID_LOG(debug, "Queuing 'enqueue' event on " << event.msg.queue->getName() << " for replication");
- break;
- case QueueEvents::DEQUEUE:
- deliverDequeueMessage(event.msg);
- QPID_LOG(debug, "Queuing 'dequeue' event from " << event.msg.queue->getName() << " for replication, (from position "
- << event.msg.position << ")");
- break;
- }
-}
-
-namespace {
-const std::string EMPTY;
-}
-
-void ReplicatingEventListener::deliverDequeueMessage(const QueuedMessage& dequeued)
-{
- FieldTable headers;
- headers.setString(REPLICATION_TARGET_QUEUE, dequeued.queue->getName());
- headers.setInt(REPLICATION_EVENT_TYPE, DEQUEUE);
- headers.setInt(DEQUEUED_MESSAGE_POSITION, dequeued.position);
- boost::intrusive_ptr<Message> msg(createMessage(headers));
- DeliveryProperties* props = msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
- props->setRoutingKey(dequeued.queue->getName());
- route(msg);
-}
-
-void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueued)
-{
- boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload));
- FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
- headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
- headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
- headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position);
- route(msg);
-}
-
-void ReplicatingEventListener::route(boost::intrusive_ptr<qpid::broker::Message> msg)
-{
- try {
- if (exchange) {
- DeliverableMessage deliverable(msg);
- exchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders());
- } else if (queue) {
- queue->deliver(msg);
- } else {
- QPID_LOG(error, "Cannot route replication event, neither replication queue nor exchange configured");
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, "Error enqueing replication event: " << e.what());
- }
-}
-
-
-boost::intrusive_ptr<Message> ReplicatingEventListener::createMessage(const FieldTable& headers)
-{
- boost::intrusive_ptr<Message> msg(new Message());
- AMQFrame method((MessageTransferBody(ProtocolVersion(), EMPTY, 0, 0)));
- AMQFrame header((AMQHeaderBody()));
- header.setBof(false);
- header.setEof(true);
- header.setBos(true);
- header.setEos(true);
- msg->getFrames().append(method);
- msg->getFrames().append(header);
- MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
- props->setApplicationHeaders(headers);
- return msg;
-}
-
-struct AppendingHandler : FrameHandler
-{
- boost::intrusive_ptr<Message> msg;
-
- AppendingHandler(boost::intrusive_ptr<Message> m) : msg(m) {}
-
- void handle(AMQFrame& f)
- {
- msg->getFrames().append(f);
- }
-};
-
-boost::intrusive_ptr<Message> ReplicatingEventListener::cloneMessage(Queue& queue, boost::intrusive_ptr<Message> original)
-{
- boost::intrusive_ptr<Message> copy(new Message());
- AMQFrame method((MessageTransferBody(ProtocolVersion(), EMPTY, 0, 0)));
- AppendingHandler handler(copy);
- handler.handle(method);
-
- //To avoid modifying original headers, create new frame with
- //cloned body:
- AMQFrame header(*original->getFrames().getHeaders());
- header.setBof(false);
- header.setEof(!original->getFrames().getContentSize());//if there is any content then the header is not the end of the frameset
- header.setBos(true);
- header.setEos(true);
- handler.handle(header);
-
- original->sendContent(queue, handler, std::numeric_limits<int16_t>::max());
- return copy;
-}
-
-Options* ReplicatingEventListener::getOptions()
-{
- return &options;
-}
-
-void ReplicatingEventListener::initialize(Plugin::Target& target)
-{
- Broker* broker = dynamic_cast<broker::Broker*>(&target);
- if (broker) {
- broker->addFinalizer(boost::bind(&ReplicatingEventListener::shutdown, this));
- if (!options.exchange.empty()) {
- if (!options.queue.empty()) {
- QPID_LOG(warning, "Replication queue option ignored as replication exchange has been specified");
- }
- try {
- exchange = broker->getExchanges().declare(options.exchange, options.exchangeType).first;
- } catch (const UnknownExchangeTypeException&) {
- QPID_LOG(error, "Replication disabled due to invalid type: " << options.exchangeType);
- }
- } else if (!options.queue.empty()) {
- if (options.createQueue) {
- queue = broker->getQueues().declare(options.queue).first;
- } else {
- queue = broker->getQueues().find(options.queue);
- }
- if (queue) {
- queue->insertSequenceNumbers(REPLICATION_EVENT_SEQNO);
- } else {
- QPID_LOG(error, "Replication queue named '" << options.queue << "' does not exist; replication plugin disabled.");
- }
- }
- if (queue || exchange) {
- QueueEvents::EventListener callback = boost::bind(&ReplicatingEventListener::handle, this, _1);
- broker->getQueueEvents().registerListener(options.name, callback);
- QPID_LOG(info, "Registered replicating queue event listener");
- }
- }
-}
-
-void ReplicatingEventListener::earlyInitialize(Target&) {}
-void ReplicatingEventListener::shutdown() { queue.reset(); exchange.reset(); }
-
-ReplicatingEventListener::PluginOptions::PluginOptions() : Options("Queue Replication Options"),
- exchangeType("direct"),
- name("replicator"),
- createQueue(false)
-{
- addOptions()
- ("replication-exchange-name", optValue(exchange, "EXCHANGE"), "Exchange to which events for other queues are routed")
- ("replication-exchange-type", optValue(exchangeType, "direct|topic etc"), "Type of exchange to use")
- ("replication-queue", optValue(queue, "QUEUE"), "Queue on which events for other queues are recorded")
- ("replication-listener-name", optValue(name, "NAME"), "name by which to register the replicating event listener")
- ("create-replication-queue", optValue(createQueue), "if set, the replication will be created if it does not exist");
-}
-
-static ReplicatingEventListener plugin;
-
-}} // namespace qpid::replication
diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.h b/cpp/src/qpid/replication/ReplicatingEventListener.h
deleted file mode 100644
index 74418d00e6..0000000000
--- a/cpp/src/qpid/replication/ReplicatingEventListener.h
+++ /dev/null
@@ -1,78 +0,0 @@
-#ifndef QPID_REPLICATION_REPLICATINGEVENTLISTENER_H
-#define QPID_REPLICATION_REPLICATINGEVENTLISTENER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/Plugin.h"
-#include "qpid/Options.h"
-#include "qpid/broker/Exchange.h"
-#include "qpid/broker/Message.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueEvents.h"
-#include "qpid/framing/FieldTable.h"
-#include "qpid/framing/SequenceNumber.h"
-
-namespace qpid {
-namespace replication {
-
-/**
- * An event listener plugin that records queue events as messages on a
- * replication queue, from where they can be consumed (e.g. by an
- * inter-broker link to the corresponding QueueReplicationExchange
- * plugin.
- */
-class ReplicatingEventListener : public Plugin
-{
- public:
- Options* getOptions();
- void earlyInitialize(Plugin::Target& target);
- void initialize(Plugin::Target& target);
- void handle(qpid::broker::QueueEvents::Event);
- private:
- struct PluginOptions : public Options
- {
- std::string queue;
- std::string exchange;
- std::string exchangeType;
- std::string name;
- bool createQueue;
-
- PluginOptions();
- };
-
- PluginOptions options;
- qpid::broker::Queue::shared_ptr queue;
- qpid::broker::Exchange::shared_ptr exchange;
-
- void deliverDequeueMessage(const qpid::broker::QueuedMessage& enqueued);
- void deliverEnqueueMessage(const qpid::broker::QueuedMessage& enqueued);
- void route(boost::intrusive_ptr<qpid::broker::Message>);
- void shutdown();
-
- boost::intrusive_ptr<qpid::broker::Message> createMessage(const qpid::framing::FieldTable& headers);
- boost::intrusive_ptr<qpid::broker::Message> cloneMessage(qpid::broker::Queue& queue,
- boost::intrusive_ptr<qpid::broker::Message> original);
-};
-
-}} // namespace qpid::replication
-
-#endif /*!QPID_REPLICATION_REPLICATINGEVENTLISTENER_H*/
diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp
deleted file mode 100644
index 4b6d25ac7d..0000000000
--- a/cpp/src/qpid/replication/ReplicationExchange.cpp
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/replication/ReplicationExchange.h"
-#include "qpid/replication/constants.h"
-#include "qpid/Plugin.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/log/Statement.h"
-#include <boost/bind.hpp>
-
-namespace qpid {
-namespace replication {
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::replication::constants;
-
-const std::string SEQUENCE_VALUE("qpid.replication-event.sequence");
-ReplicationExchange::ReplicationExchange(const std::string& name, bool durable,
- const FieldTable& _args,
- QueueRegistry& qr,
- Manageable* parent, Broker* broker)
- : Exchange(name, durable, _args, parent, broker), queues(qr), sequence(args.getAsInt64(SEQUENCE_VALUE)), init(false)
-{
- args.setInt64(SEQUENCE_VALUE, sequence);
- if (mgmtExchange != 0)
- mgmtExchange->set_type(typeName);
-}
-
-std::string ReplicationExchange::getType() const { return typeName; }
-
-void ReplicationExchange::route(Deliverable& msg, const std::string& /*routingKey*/, const FieldTable* args)
-{
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives();
- mgmtExchange->inc_byteReceives(msg.contentSize());
- }
- if (args) {
- int eventType = args->getAsInt(REPLICATION_EVENT_TYPE);
- if (eventType) {
- if (isDuplicate(args)) return;
- switch (eventType) {
- case ENQUEUE:
- handleEnqueueEvent(args, msg);
- return;
- case DEQUEUE:
- handleDequeueEvent(args, msg);
- return;
- default:
- throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType));
- }
- }
- } else {
- QPID_LOG(warning, "Dropping unexpected message with no headers");
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgDrops();
- mgmtExchange->inc_byteDrops(msg.contentSize());
- }
- }
-}
-
-void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable& msg)
-{
- std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
- Queue::shared_ptr queue = queues.find(queueName);
- if (queue) {
-
- SequenceNumber seqno1(args->getAsInt(QUEUE_MESSAGE_POSITION));
-
- // note that queue will ++ before enqueue.
- if (queue->getPosition() > --seqno1) // test queue.pos < seqnumber
- {
- QPID_LOG(error, "Cannot enqueue replicated message. Destination Queue " << queueName << " ahead of source queue");
- mgmtExchange->inc_msgDrops();
- mgmtExchange->inc_byteDrops(msg.contentSize());
- } else {
- queue->setPosition(seqno1);
-
- FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
- headers.erase(REPLICATION_TARGET_QUEUE);
- headers.erase(REPLICATION_EVENT_SEQNO);
- headers.erase(REPLICATION_EVENT_TYPE);
- headers.erase(QUEUE_MESSAGE_POSITION);
- msg.deliverTo(queue);
- QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgRoutes();
- mgmtExchange->inc_byteRoutes( msg.contentSize());
- }
- }
- } else {
- QPID_LOG(error, "Cannot enqueue replicated message. Queue " << queueName << " does not exist");
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgDrops();
- mgmtExchange->inc_byteDrops(msg.contentSize());
- }
- }
-}
-
-void ReplicationExchange::handleDequeueEvent(const FieldTable* args, Deliverable& msg)
-{
- std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
- Queue::shared_ptr queue = queues.find(queueName);
- if (queue) {
- SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION));
- QueuedMessage dequeued;
- if (queue->acquireMessageAt(position, dequeued)) {
- queue->dequeue(0, dequeued);
- QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position);
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgRoutes();
- mgmtExchange->inc_byteRoutes(msg.contentSize());
- }
- } else {
- QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName);
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgDrops();
- mgmtExchange->inc_byteDrops(msg.contentSize());
- }
- }
- } else {
- QPID_LOG(error, "Cannot process replicated 'dequeue' event. Queue " << queueName << " does not exist");
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgDrops();
- mgmtExchange->inc_byteDrops(msg.contentSize());
- }
- }
-}
-
-bool ReplicationExchange::isDuplicate(const FieldTable* args)
-{
- if (!args->get(REPLICATION_EVENT_SEQNO)) return false;
- SequenceNumber seqno(args->getAsInt(REPLICATION_EVENT_SEQNO));
- if (!init) {
- init = true;
- sequence = seqno;
- return false;
- } else if (seqno > sequence) {
- if (seqno - sequence > 1) {
- QPID_LOG(error, "Gap in replication event sequence between: " << sequence << " and " << seqno);
- }
- sequence = seqno;
- return false;
- } else {
- QPID_LOG(info, "Duplicate detected: seqno=" << seqno << " (last seqno=" << sequence << ")");
- return true;
- }
-}
-
-bool ReplicationExchange::bind(Queue::shared_ptr /*queue*/, const std::string& /*routingKey*/, const FieldTable* /*args*/)
-{
- throw NotImplementedException("Replication exchange does not support bind operation");
-}
-
-bool ReplicationExchange::unbind(Queue::shared_ptr /*queue*/, const std::string& /*routingKey*/, const FieldTable* /*args*/)
-{
- throw NotImplementedException("Replication exchange does not support unbind operation");
-}
-
-bool ReplicationExchange::isBound(Queue::shared_ptr /*queue*/, const string* const /*routingKey*/, const FieldTable* const /*args*/)
-{
- return false;
-}
-
-const std::string ReplicationExchange::typeName("replication");
-
-
-void ReplicationExchange::encode(Buffer& buffer) const
-{
- args.setInt64(std::string(SEQUENCE_VALUE), sequence);
- Exchange::encode(buffer);
-}
-
-
-struct ReplicationExchangePlugin : Plugin
-{
- Broker* broker;
-
- ReplicationExchangePlugin();
- void earlyInitialize(Plugin::Target& target);
- void initialize(Plugin::Target& target);
- Exchange::shared_ptr create(const std::string& name, bool durable,
- const framing::FieldTable& args,
- management::Manageable* parent,
- qpid::broker::Broker* broker);
-};
-
-ReplicationExchangePlugin::ReplicationExchangePlugin() : broker(0) {}
-
-Exchange::shared_ptr ReplicationExchangePlugin::create(const std::string& name, bool durable,
- const framing::FieldTable& args,
- management::Manageable* parent, qpid::broker::Broker* broker)
-{
- Exchange::shared_ptr e(new ReplicationExchange(name, durable, args, broker->getQueues(), parent, broker));
- return e;
-}
-
-
-void ReplicationExchangePlugin::earlyInitialize(Plugin::Target& target)
-{
- broker = dynamic_cast<broker::Broker*>(&target);
- if (broker) {
- ExchangeRegistry::FactoryFunction f = boost::bind(&ReplicationExchangePlugin::create, this, _1, _2, _3, _4, _5);
- broker->getExchanges().registerType(ReplicationExchange::typeName, f);
- QPID_LOG(info, "Registered replication exchange");
- }
-}
-
-void ReplicationExchangePlugin::initialize(Target&) {}
-
-static ReplicationExchangePlugin exchangePlugin;
-
-}} // namespace qpid::replication
diff --git a/cpp/src/qpid/replication/ReplicationExchange.h b/cpp/src/qpid/replication/ReplicationExchange.h
deleted file mode 100644
index 4b34e0df13..0000000000
--- a/cpp/src/qpid/replication/ReplicationExchange.h
+++ /dev/null
@@ -1,72 +0,0 @@
-#ifndef QPID_REPLICATION_REPLICATIONEXCHANGE_H
-#define QPID_REPLICATION_REPLICATIONEXCHANGE_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/Exchange.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/SequenceNumber.h"
-
-namespace qpid {
-
-namespace broker {
-class QueueRegistry;
-}
-
-namespace replication {
-
-/**
- * A custom exchange plugin that processes incoming messages
- * representing enqueue or dequeue events for particular queues and
- * carries out the corresponding action to replicate that on the local
- * broker.
- */
-class ReplicationExchange : public qpid::broker::Exchange
-{
- public:
- static const std::string typeName;
-
- ReplicationExchange(const std::string& name, bool durable,
- const qpid::framing::FieldTable& args,
- qpid::broker::QueueRegistry& queues,
- qpid::management::Manageable* parent = 0,
- qpid::broker::Broker* broker = 0);
-
- std::string getType() const;
-
- void route(qpid::broker::Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
-
- bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args);
- private:
- qpid::broker::QueueRegistry& queues;
- qpid::framing::SequenceNumber sequence;
- bool init;
-
- bool isDuplicate(const qpid::framing::FieldTable* args);
- void handleEnqueueEvent(const qpid::framing::FieldTable* args, qpid::broker::Deliverable& msg);
- void handleDequeueEvent(const qpid::framing::FieldTable* args, qpid::broker::Deliverable& msg);
- void encode(framing::Buffer& buffer) const;
-};
-}} // namespace qpid::replication
-
-#endif /*!QPID_REPLICATION_REPLICATIONEXCHANGE_H*/
diff --git a/cpp/src/qpid/replication/constants.h b/cpp/src/qpid/replication/constants.h
deleted file mode 100644
index c5ba7d3d6a..0000000000
--- a/cpp/src/qpid/replication/constants.h
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-namespace qpid {
-namespace replication {
-namespace constants {
-
-const std::string REPLICATION_EVENT_TYPE("qpid.replication.type");
-const std::string REPLICATION_EVENT_SEQNO("qpid.replication.seqno");
-const std::string REPLICATION_TARGET_QUEUE("qpid.replication.target_queue");
-const std::string DEQUEUED_MESSAGE_POSITION("qpid.replication.message");
-const std::string QUEUE_MESSAGE_POSITION("qpid.replication.queue.position");
-
-const int ENQUEUE(1);
-const int DEQUEUE(2);
-
-}}}