summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-11-10 21:58:04 +0000
committerGordon Sim <gsim@apache.org>2011-11-10 21:58:04 +0000
commita9dfa76b664106f875a04b09f9dc9a7ee2d62cf3 (patch)
tree856a80358272fe019597dd0fb0c94ebcfaf7cf6b
parentc24a572078bede3f82300a48eddd12be82576886 (diff)
downloadqpid-python-a9dfa76b664106f875a04b09f9dc9a7ee2d62cf3.tar.gz
QPID-3603: Checked in files left out by accident in last commit to this branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603@1200589 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/NodeClone.cpp219
-rw-r--r--qpid/cpp/src/qpid/broker/NodeClone.h54
2 files changed, 273 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/NodeClone.cpp b/qpid/cpp/src/qpid/broker/NodeClone.cpp
new file mode 100644
index 0000000000..e8fc227884
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/NodeClone.cpp
@@ -0,0 +1,219 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "NodeClone.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/log/Statement.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+
+using qmf::org::apache::qpid::broker::EventQueueDeclare;
+using qmf::org::apache::qpid::broker::EventQueueDelete;
+using qmf::org::apache::qpid::broker::EventExchangeDeclare;
+using qmf::org::apache::qpid::broker::EventExchangeDelete;
+
+namespace qpid {
+namespace broker {
+
+namespace{
+bool isQMFv2(const Message& message)
+{
+ const qpid::framing::MessageProperties* props = message.getProperties<qpid::framing::MessageProperties>();
+ return props && props->getAppId() == "qmf2";
+}
+
+template <class T> bool match(qpid::types::Variant::Map& schema)
+{
+ return T::match(schema["_class_name"], schema["_package_name"]);
+}
+
+}
+
+NodeClone::NodeClone(const std::string& name, Broker& b) : Exchange(name), broker(b) {}
+
+NodeClone::~NodeClone() {}
+
+void NodeClone::route(Deliverable& msg, const std::string& /*key*/, const qpid::framing::FieldTable* headers)
+{
+ if (isQMFv2(msg.getMessage()) && headers) {
+ if (headers->getAsString("qmf.content") == "_event") {
+ //decode as list
+ std::string content = msg.getMessage().getFrames().getContent();
+ qpid::types::Variant::List list;
+ qpid::amqp_0_10::ListCodec::decode(content, list);
+ if (list.empty()) {
+ QPID_LOG(error, "Error parsing QMF event, expected non-empty list");
+ } else {
+ try {
+ qpid::types::Variant::Map& map = list.front().asMap();
+ qpid::types::Variant::Map& schema = map["_schema_id"].asMap();
+ qpid::types::Variant::Map& values = map["_values"].asMap();
+ if (match<EventQueueDeclare>(schema)) {
+ if (values["disp"] == "created" && values["args"].asMap()["qpid.propagate"]) {
+ qpid::framing::FieldTable args;
+ qpid::amqp_0_10::translate(values["args"].asMap(), args);
+ if (!broker.createQueue(
+ values["qName"].asString(),
+ values["durable"].asBool(),
+ values["autoDel"].asBool(),
+ 0 /*i.e. no owner regardless of exclusivity on master*/,
+ values["altEx"].asString(),
+ args,
+ values["user"].asString(),
+ values["rhost"].asString()).second) {
+ QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists");
+ }
+ }
+ } else if (match<EventQueueDelete>(schema)) {
+ std::string name = values["qName"].asString();
+ QPID_LOG(debug, "Notified of deletion of queue " << name);
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
+ if (queue && queue->getSettings().isSet("qpid.propagate")/*TODO: check value*/) {
+ broker.deleteQueue(
+ name,
+ values["user"].asString(),
+ values["rhost"].asString());
+ } else {
+ if (queue) {
+ QPID_LOG(debug, "Ignoring deletion notification for non-propagated queue " << name);
+ } else {
+ QPID_LOG(debug, "No such queue " << name);
+ }
+ }
+ } else if (match<EventExchangeDeclare>(schema)) {
+ if (values["disp"] == "created" && values["args"].asMap()["qpid.propagate"]) {
+ qpid::framing::FieldTable args;
+ qpid::amqp_0_10::translate(values["args"].asMap(), args);
+ if (!broker.createExchange(
+ values["exName"].asString(),
+ values["exType"].asString(),
+ values["durable"].asBool(),
+ values["altEx"].asString(),
+ args,
+ values["user"].asString(),
+ values["rhost"].asString()).second) {
+ QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists");
+ }
+ }
+ } else if (match<EventExchangeDelete>(schema)) {
+ std::string name = values["exName"].asString();
+ QPID_LOG(debug, "Notified of deletion of exchange " << name);
+ try {
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
+ if (exchange && exchange->getArgs().isSet("qpid.propagate")/*TODO: check value*/) {
+ broker.deleteExchange(
+ name,
+ values["user"].asString(),
+ values["rhost"].asString());
+ } else {
+ if (exchange) {
+ QPID_LOG(debug, "Ignoring deletion notification for non-propagated exchange " << name);
+ } else {
+ QPID_LOG(debug, "No such exchange " << name);
+ }
+ }
+ } catch (const qpid::framing::NotFoundException&) {}
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Error propagating configuration: " << e.what());
+ }
+ }
+ } else if (headers->getAsString("qmf.opcode") == "_query_response") {
+ //decode as list
+ std::string content = msg.getMessage().getFrames().getContent();
+ qpid::types::Variant::List list;
+ qpid::amqp_0_10::ListCodec::decode(content, list);
+ QPID_LOG(debug, "Got query response (" << list.size() << ")");
+ for (qpid::types::Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
+ std::string type = i->asMap()["_schema_id"].asMap()["_class_name"];
+ qpid::types::Variant::Map& values = i->asMap()["_values"].asMap();
+ QPID_LOG(debug, "class: " << type << ", values: " << values);
+ if (values["arguments"].asMap()["qpid.propagate"]) {
+ qpid::framing::FieldTable args;
+ qpid::amqp_0_10::translate(values["arguments"].asMap(), args);
+ if (type == "queue") {
+ if (!broker.createQueue(
+ values["name"].asString(),
+ values["durable"].asBool(),
+ values["autoDelete"].asBool(),
+ 0 /*i.e. no owner regardless of exclusivity on master*/,
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection id?*/).second) {
+ QPID_LOG(warning, "Propagatable queue " << values["name"] << " already exists");
+ }
+ } else if (type == "exchange") {
+ if (!broker.createExchange(
+ values["name"].asString(),
+ values["type"].asString(),
+ values["durable"].asBool(),
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection id?*/).second) {
+ QPID_LOG(warning, "Propagatable queue " << values["qName"] << " already exists");
+ }
+ } else {
+ QPID_LOG(warning, "Ignoring unknow object class: " << type);
+ }
+ }
+ }
+ } else {
+ QPID_LOG(debug, "Dropping QMFv2 message with headers: " << *headers);
+ }
+ } else {
+ QPID_LOG(warning, "Ignoring message which is not a valid QMFv2 event or query response");
+ }
+}
+
+bool NodeClone::isNodeCloneDestination(const std::string& target)
+{
+ return target == "qpid.node-cloner";
+}
+
+boost::shared_ptr<Exchange> NodeClone::create(const std::string& target, Broker& broker)
+{
+ boost::shared_ptr<Exchange> exchange;
+ if (isNodeCloneDestination(target)) {
+ //TODO: need to cache the exchange
+ QPID_LOG(info, "Creating node cloner");
+ exchange.reset(new NodeClone(target, broker));
+ }
+ return exchange;
+}
+
+bool NodeClone::bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
+bool NodeClone::unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*) { return false; }
+bool NodeClone::isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const) { return false; }
+
+const std::string NodeClone::typeName("node-cloner");
+
+std::string NodeClone::getType() const
+{
+ return typeName;
+}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/NodeClone.h b/qpid/cpp/src/qpid/broker/NodeClone.h
new file mode 100644
index 0000000000..71cac619ad
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/NodeClone.h
@@ -0,0 +1,54 @@
+#ifndef QPID_BROKER_NODEPROPAGATOR_H
+#define QPID_BROKER_NODEPROPAGATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Exchange.h"
+
+namespace qpid {
+namespace broker {
+
+class Broker;
+
+/**
+ * Pseudo-exchange for recreating local queues and/or exchanges on
+ * receipt of QMF events indicating their creation on another node
+ */
+class NodeClone : public Exchange
+{
+ public:
+ NodeClone(const std::string&, Broker&);
+ ~NodeClone();
+ std::string getType() const;
+ bool bind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
+ bool unbind(boost::shared_ptr<Queue>, const std::string&, const qpid::framing::FieldTable*);
+ void route(Deliverable&, const std::string&, const qpid::framing::FieldTable*);
+ bool isBound(boost::shared_ptr<Queue>, const std::string* const, const qpid::framing::FieldTable* const);
+
+ static bool isNodeCloneDestination(const std::string&);
+ static boost::shared_ptr<Exchange> create(const std::string&, Broker&);
+ static const std::string typeName;
+ private:
+ Broker& broker;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_NODEPROPAGATOR_H*/