diff options
Diffstat (limited to 'cpp/src/qpid/replication/ReplicatingEventListener.cpp')
-rw-r--r-- | cpp/src/qpid/replication/ReplicatingEventListener.cpp | 200 |
1 files changed, 0 insertions, 200 deletions
diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/cpp/src/qpid/replication/ReplicatingEventListener.cpp deleted file mode 100644 index 9284bda388..0000000000 --- a/cpp/src/qpid/replication/ReplicatingEventListener.cpp +++ /dev/null @@ -1,200 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/replication/ReplicatingEventListener.h" -#include "qpid/replication/constants.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/DeliverableMessage.h" -#include "qpid/broker/QueueEvents.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/log/Statement.h" - -namespace qpid { -namespace replication { - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::replication::constants; - -void ReplicatingEventListener::handle(QueueEvents::Event event) -{ - switch (event.type) { - case QueueEvents::ENQUEUE: - deliverEnqueueMessage(event.msg); - QPID_LOG(debug, "Queuing 'enqueue' event on " << event.msg.queue->getName() << " for replication"); - break; - case QueueEvents::DEQUEUE: - deliverDequeueMessage(event.msg); - QPID_LOG(debug, "Queuing 'dequeue' event from " << event.msg.queue->getName() << " for replication, (from position " - << event.msg.position << ")"); - break; - } -} - -namespace { -const std::string EMPTY; -} - -void ReplicatingEventListener::deliverDequeueMessage(const QueuedMessage& dequeued) -{ - FieldTable headers; - headers.setString(REPLICATION_TARGET_QUEUE, dequeued.queue->getName()); - headers.setInt(REPLICATION_EVENT_TYPE, DEQUEUE); - headers.setInt(DEQUEUED_MESSAGE_POSITION, dequeued.position); - boost::intrusive_ptr<Message> msg(createMessage(headers)); - DeliveryProperties* props = msg->getFrames().getHeaders()->get<DeliveryProperties>(true); - props->setRoutingKey(dequeued.queue->getName()); - route(msg); -} - -void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueued) -{ - boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload)); - msg->insertCustomProperty(REPLICATION_TARGET_QUEUE, enqueued.queue->getName()); - msg->insertCustomProperty(REPLICATION_EVENT_TYPE, ENQUEUE); - msg->insertCustomProperty(QUEUE_MESSAGE_POSITION,enqueued.position); - route(msg); -} - -void ReplicatingEventListener::route(boost::intrusive_ptr<qpid::broker::Message> msg) -{ - try { - if (exchange) { - DeliverableMessage deliverable(msg); - exchange->route(deliverable); - } else if (queue) { - queue->deliver(msg); - } else { - QPID_LOG(error, "Cannot route replication event, neither replication queue nor exchange configured"); - } - } catch (const std::exception& e) { - QPID_LOG(error, "Error enqueing replication event: " << e.what()); - } -} - - -boost::intrusive_ptr<Message> ReplicatingEventListener::createMessage(const FieldTable& headers) -{ - boost::intrusive_ptr<Message> msg(new Message()); - AMQFrame method((MessageTransferBody(ProtocolVersion(), EMPTY, 0, 0))); - AMQFrame header((AMQHeaderBody())); - header.setBof(false); - header.setEof(true); - header.setBos(true); - header.setEos(true); - msg->getFrames().append(method); - msg->getFrames().append(header); - MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); - props->setApplicationHeaders(headers); - return msg; -} - -struct AppendingHandler : FrameHandler -{ - boost::intrusive_ptr<Message> msg; - - AppendingHandler(boost::intrusive_ptr<Message> m) : msg(m) {} - - void handle(AMQFrame& f) - { - msg->getFrames().append(f); - } -}; - -boost::intrusive_ptr<Message> ReplicatingEventListener::cloneMessage(Queue& queue, boost::intrusive_ptr<Message> original) -{ - boost::intrusive_ptr<Message> copy(new Message()); - AMQFrame method((MessageTransferBody(ProtocolVersion(), EMPTY, 0, 0))); - AppendingHandler handler(copy); - handler.handle(method); - - //To avoid modifying original headers, create new frame with - //cloned body: - AMQFrame header(*original->getFrames().getHeaders()); - header.setBof(false); - header.setEof(!original->getFrames().hasContent());//if there are any content frames then the header is not the end of the frameset - header.setBos(true); - header.setEos(true); - handler.handle(header); - - original->sendContent(queue, handler, std::numeric_limits<int16_t>::max()); - return copy; -} - -Options* ReplicatingEventListener::getOptions() -{ - return &options; -} - -void ReplicatingEventListener::initialize(Plugin::Target& target) -{ - Broker* broker = dynamic_cast<broker::Broker*>(&target); - if (broker) { - broker->addFinalizer(boost::bind(&ReplicatingEventListener::shutdown, this)); - if (!options.exchange.empty()) { - if (!options.queue.empty()) { - QPID_LOG(warning, "Replication queue option ignored as replication exchange has been specified"); - } - try { - exchange = broker->getExchanges().declare(options.exchange, options.exchangeType).first; - } catch (const UnknownExchangeTypeException&) { - QPID_LOG(error, "Replication disabled due to invalid type: " << options.exchangeType); - } - } else if (!options.queue.empty()) { - if (options.createQueue) { - queue = broker->getQueues().declare(options.queue).first; - } else { - queue = broker->getQueues().find(options.queue); - } - if (queue) { - queue->insertSequenceNumbers(REPLICATION_EVENT_SEQNO); - } else { - QPID_LOG(error, "Replication queue named '" << options.queue << "' does not exist; replication plugin disabled."); - } - } - if (queue || exchange) { - QueueEvents::EventListener callback = boost::bind(&ReplicatingEventListener::handle, this, _1); - broker->getQueueEvents().registerListener(options.name, callback); - QPID_LOG(info, "Registered replicating queue event listener"); - } - } -} - -void ReplicatingEventListener::earlyInitialize(Target&) {} -void ReplicatingEventListener::shutdown() { queue.reset(); exchange.reset(); } - -ReplicatingEventListener::PluginOptions::PluginOptions() : Options("Queue Replication Options"), - exchangeType("direct"), - name("replicator"), - createQueue(false) -{ - addOptions() - ("replication-exchange-name", optValue(exchange, "EXCHANGE"), "Exchange to which events for other queues are routed") - ("replication-exchange-type", optValue(exchangeType, "direct|topic etc"), "Type of exchange to use") - ("replication-queue", optValue(queue, "QUEUE"), "Queue on which events for other queues are recorded") - ("replication-listener-name", optValue(name, "NAME"), "name by which to register the replicating event listener") - ("create-replication-queue", optValue(createQueue), "if set, the replication will be created if it does not exist"); -} - -static ReplicatingEventListener plugin; - -}} // namespace qpid::replication |