summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/NodeClone.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/NodeClone.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/NodeClone.cpp219
1 files changed, 0 insertions, 219 deletions
diff --git a/qpid/cpp/src/qpid/broker/NodeClone.cpp b/qpid/cpp/src/qpid/broker/NodeClone.cpp
deleted file mode 100644
index e8fc227884..0000000000
--- a/qpid/cpp/src/qpid/broker/NodeClone.cpp
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "NodeClone.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/log/Statement.h"
-#include "qpid/amqp_0_10/Codecs.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
-#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
-
-using qmf::org::apache::qpid::broker::EventQueueDeclare;
-using qmf::org::apache::qpid::broker::EventQueueDelete;
-using qmf::org::apache::qpid::broker::EventExchangeDeclare;
-using qmf::org::apache::qpid::broker::EventExchangeDelete;
-
-namespace qpid {
-namespace broker {
-
-namespace{
-bool isQMFv2(const Message& message)
-{
- const qpid::framing::MessageProperties* props = message.getProperties<qpid::framing::MessageProperties>();
- return props && props->getAppId() == "qmf2";
-}
-
-template <class T> bool match(qpid::types::Variant::Map& schema)
-{
- return T::match(schema["_class_name"], schema["_package_name"]);
-}
-
-}
-
-NodeClone::NodeClone(const std::string& name, Broker& b) : Exchange(name), broker(b) {}
-
-NodeClone::~NodeClone() {}
-
-void NodeClone::route(Deliverable& msg, const std::string& /*key*/, const qpid::framing::FieldTable* headers)
-{
- if (isQMFv2(msg.getMessage()) && headers) {
- if (headers->getAsString("qmf.content") == "_event") {
- //decode as list
- std::string content = msg.getMessage().getFrames().getContent();
- qpid::types::Variant::List list;
- qpid::amqp_0_10::ListCodec::decode(content, list);
- if (list.empty()) {
- QPID_LOG(error, "Error parsing QMF event, expected non-empty list");
- } else {
- try {
- qpid::types::Variant::Map& map = list.front().asMap();
- qpid::types::Variant::Map& schema = map["_schema_id"].asMap();
- qpid::types::Variant::Map& values = map["_values"].asMap();
- if (match<EventQueueDeclare>(schema)) {
- if (values["disp"] == "created" && values["args"].asMap()["qpid.propagate"]) {
- qpid::framing::FieldTable args;
- qpid::amqp_0_10::translate(values["args"].asMap(), args);
- if (!broker.createQueue(
- values["qName"].asString(),
- values["durable"].asBool(),
- values["autoDel"].asBool(),
- 0 /*i.e. no owner regardless of exclusivity on master*/,
- values["altEx"].asString(),
- args,
- values["user"].asString(),
- values["rhost"].asString()).second) {
- QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists");
- }
- }
- } else if (match<EventQueueDelete>(schema)) {
- std::string name = values["qName"].asString();
- QPID_LOG(debug, "Notified of deletion of queue " << name);
- boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
- if (queue && queue->getSettings().isSet("qpid.propagate")/*TODO: check value*/) {
- broker.deleteQueue(
- name,
- values["user"].asString(),
- values["rhost"].asString());
- } else {
- if (queue) {
- QPID_LOG(debug, "Ignoring deletion notification for non-propagated queue " << name);
- } else {
- QPID_LOG(debug, "No such queue " << name);
- }
- }
- } else if (match<EventExchangeDeclare>(schema)) {
- if (values["disp"] == "created" && values["args"].asMap()["qpid.propagate"]) {
- qpid::framing::FieldTable args;
- qpid::amqp_0_10::translate(values["args"].asMap(), args);
- if (!broker.createExchange(
- values["exName"].asString(),
- values["exType"].asString(),
- values["durable"].asBool(),
- values["altEx"].asString(),
- args,
- values["user"].asString(),
- values["rhost"].asString()).second) {
- QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists");
- }
- }
- } else if (match<EventExchangeDelete>(schema)) {
- std::string name = values["exName"].asString();
- QPID_LOG(debug, "Notified of deletion of exchange " << name);
- try {
- boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
- if (exchange && exchange->getArgs().isSet("qpid.propagate")/*TODO: check value*/) {
- broker.deleteExchange(
- name,
- values["user"].asString(),
- values["rhost"].asString());
- } else {
- if (exchange) {
- QPID_LOG(debug, "Ignoring deletion notification for non-propagated exchange " << name);
- } else {
- QPID_LOG(debug, "No such exchange " << name);
- }
- }
- } catch (const qpid::framing::NotFoundException&) {}
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, "Error propagating configuration: " << e.what());
- }
- }
- } else if (headers->getAsString("qmf.opcode") == "_query_response") {
- //decode as list
- std::string content = msg.getMessage().getFrames().getContent();
- qpid::types::Variant::List list;
- qpid::amqp_0_10::ListCodec::decode(content, list);
- QPID_LOG(debug, "Got query response (" << list.size() << ")");
- for (qpid::types::Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
- std::string type = i->asMap()["_schema_id"].asMap()["_class_name"];
- qpid::types::Variant::Map& values = i->asMap()["_values"].asMap();
- QPID_LOG(debug, "class: " << type << ", values: " << values);
- if (values["arguments"].asMap()["qpid.propagate"]) {
- qpid::framing::FieldTable args;
- qpid::amqp_0_10::translate(values["arguments"].asMap(), args);
- if (type == "queue") {
- if (!broker.createQueue(
- values["name"].asString(),
- values["durable"].asBool(),
- values["autoDelete"].asBool(),
- 0 /*i.e. no owner regardless of exclusivity on master*/,
- ""/*TODO: need to include alternate-exchange*/,
- args,
- ""/*TODO: who is the user?*/,
- ""/*TODO: what should we use as connection id?*/).second) {
- QPID_LOG(warning, "Propagatable queue " << values["name"] << " already exists");
- }
- } else if (type == "exchange") {
- if (!broker.createExchange(
- values["name"].asString(),
- values["type"].asString(),
- values["durable"].asBool(),
- ""/*TODO: need to include alternate-exchange*/,
- args,
- ""/*TODO: who is the user?*/,
- ""/*TODO: what should we use as connection id?*/).second) {
- QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists");
- }
- } else {
- QPID_LOG(warning, "Ignoring unknow object class: " << type);
- }
- }
- }
- } else {
- QPID_LOG(debug, "Dropping QMFv2 message with headers: " << *headers);
- }
- } else {
- QPID_LOG(warning, "Ignoring message which is not a valid QMFv2 event or query response");
- }
-}
-
-bool NodeClone::isNodeCloneDestination(const std::string& target)
-{
- return target == "qpid.node-cloner";
-}
-
-boost::shared_ptr<Exchange> NodeClone::create(const std::string& target, Broker& broker)
-{
- boost::shared_ptr<Exchange> exchange;
- if (isNodeCloneDestination(target)) {
- //TODO: need to cache the exchange
- QPID_LOG(info, "Creating node cloner");
- exchange.reset(new NodeClone(target, broker));
- }
- return exchange;
-}
-
-bool NodeClone::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
-bool NodeClone::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
-bool NodeClone::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; }
-
-const std::string NodeClone::typeName("node-cloner");
-
-std::string NodeClone::getType() const
-{
- return typeName;
-}
-
-}} // namespace qpid::broker