diff options
author | Gordon Sim <gsim@apache.org> | 2013-11-12 13:42:50 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2013-11-12 13:42:50 +0000 |
commit | 785bfe3e9a9e6afe5494e48d02be2665dc599bb8 (patch) | |
tree | e42e33549654a7f633d9c38d2f7cbdbba90287a0 | |
parent | b955a41e69f69a1ada69c780d9fb7260c0bfc3f2 (diff) | |
download | qpid-python-785bfe3e9a9e6afe5494e48d02be2665dc599bb8.tar.gz |
QPID-5251: allow policies to be specified that will create topics or queues on demand if they match the specified pattern
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1541059 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/amqp.cmake | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PersistableObject.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PersistableObject.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueSettings.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/BrokerContext.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/NodePolicy.cpp | 325 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/NodePolicy.h | 117 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp | 31 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 44 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Topic.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Topic.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/management-schema.xml | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/regex.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/policies.py | 209 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/swig_python_tests | 2 |
16 files changed, 752 insertions, 31 deletions
diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake index 52316d22b3..ba6e552cab 100644 --- a/qpid/cpp/src/amqp.cmake +++ b/qpid/cpp/src/amqp.cmake @@ -104,6 +104,8 @@ if (BUILD_AMQP) qpid/broker/amqp/ManagedOutgoingLink.cpp qpid/broker/amqp/Message.h qpid/broker/amqp/Message.cpp + qpid/broker/amqp/NodePolicy.h + qpid/broker/amqp/NodePolicy.cpp qpid/broker/amqp/NodeProperties.h qpid/broker/amqp/NodeProperties.cpp qpid/broker/amqp/Outgoing.h diff --git a/qpid/cpp/src/qpid/broker/PersistableObject.cpp b/qpid/cpp/src/qpid/broker/PersistableObject.cpp index 822f795954..575ef09270 100644 --- a/qpid/cpp/src/qpid/broker/PersistableObject.cpp +++ b/qpid/cpp/src/qpid/broker/PersistableObject.cpp @@ -33,6 +33,7 @@ PersistableObject::PersistableObject(const std::string& n, const std::string& t, PersistableObject::PersistableObject() : id(0) {} PersistableObject::~PersistableObject() {} const std::string& PersistableObject::getName() const { return name; } +const std::string& PersistableObject::getType() const { return type; } void PersistableObject::setPersistenceId(uint64_t i) const { id = i; } uint64_t PersistableObject::getPersistenceId() const { return id; } void PersistableObject::encode(framing::Buffer& buffer) const diff --git a/qpid/cpp/src/qpid/broker/PersistableObject.h b/qpid/cpp/src/qpid/broker/PersistableObject.h index 4d7e5e4498..da4bd44601 100644 --- a/qpid/cpp/src/qpid/broker/PersistableObject.h +++ b/qpid/cpp/src/qpid/broker/PersistableObject.h @@ -41,6 +41,7 @@ class PersistableObject : public PersistableConfig QPID_BROKER_EXTERN PersistableObject(const std::string& name, const std::string& type, const qpid::types::Variant::Map properties); QPID_BROKER_EXTERN virtual ~PersistableObject(); QPID_BROKER_EXTERN const std::string& getName() const; + QPID_BROKER_EXTERN const std::string& getType() const; QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const; QPID_BROKER_EXTERN uint64_t getPersistenceId() const; QPID_BROKER_EXTERN void encode(framing::Buffer& buffer) const; diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.cpp b/qpid/cpp/src/qpid/broker/QueueSettings.cpp index 53194cf064..8de8539579 100644 --- a/qpid/cpp/src/qpid/broker/QueueSettings.cpp +++ b/qpid/cpp/src/qpid/broker/QueueSettings.cpp @@ -62,6 +62,7 @@ const std::string FILTER("qpid.filter"); const std::string LIFETIME_POLICY("qpid.lifetime-policy"); const std::string DELETE_IF_UNUSED_KEY("delete-if-unused"); const std::string DELETE_IF_UNUSED_AND_EMPTY_KEY("delete-if-unused-and-empty"); +const std::string MANUAL("manual"); const std::string LVQ_LEGACY("qpid.last_value_queue"); const std::string LVQ_LEGACY_KEY("qpid.LVQ_key"); @@ -227,8 +228,12 @@ bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& v } else if (key == LIFETIME_POLICY) { if (value.asString() == DELETE_IF_UNUSED_KEY) { lifetime = DELETE_IF_UNUSED; + autodelete = true; } else if (value.asString() == DELETE_IF_UNUSED_AND_EMPTY_KEY) { lifetime = DELETE_IF_UNUSED_AND_EMPTY; + autodelete = true; + } else if (value.asString() == MANUAL) { + autodelete = false; } else { QPID_LOG(warning, "Invalid value for " << LIFETIME_POLICY << ": " << value); } diff --git a/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp b/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp index b109da961e..9f7ae17293 100644 --- a/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp @@ -23,10 +23,11 @@ namespace qpid { namespace broker { namespace amqp { -BrokerContext::BrokerContext(Broker& b, Interconnects& i, TopicRegistry& t, const std::string& d) : broker(b), interconnects(i), topics(t), domain(d) {} -BrokerContext::BrokerContext(BrokerContext& c) : broker(c.broker), interconnects(c.interconnects), topics(c.topics), domain(c.domain) {} +BrokerContext::BrokerContext(Broker& b, Interconnects& i, TopicRegistry& t, NodePolicyRegistry& np, const std::string& d) : broker(b), interconnects(i), topics(t), nodePolicies(np), domain(d) {} +BrokerContext::BrokerContext(BrokerContext& c) : broker(c.broker), interconnects(c.interconnects), topics(c.topics), nodePolicies(c.nodePolicies), domain(c.domain) {} Broker& BrokerContext::getBroker() { return broker; } Interconnects& BrokerContext::getInterconnects() { return interconnects; } TopicRegistry& BrokerContext::getTopics() { return topics; } +NodePolicyRegistry& BrokerContext::getNodePolicies() { return nodePolicies; } std::string BrokerContext::getDomain() { return domain; } }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h b/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h index 81c449c68d..feb35e39c4 100644 --- a/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h +++ b/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h @@ -29,22 +29,25 @@ class Broker; namespace amqp { class Interconnects; class TopicRegistry; +class NodePolicyRegistry; /** * Context providing access to broker scoped entities. */ class BrokerContext { public: - BrokerContext(Broker&, Interconnects&, TopicRegistry&, const std::string&); + BrokerContext(Broker&, Interconnects&, TopicRegistry&, NodePolicyRegistry&, const std::string&); BrokerContext(BrokerContext&); Broker& getBroker(); Interconnects& getInterconnects(); TopicRegistry& getTopics(); + NodePolicyRegistry& getNodePolicies(); std::string getDomain(); private: Broker& broker; Interconnects& interconnects; TopicRegistry& topics; + NodePolicyRegistry& nodePolicies; std::string domain; }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/NodePolicy.cpp b/qpid/cpp/src/qpid/broker/amqp/NodePolicy.cpp new file mode 100644 index 0000000000..6cefe36f67 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/NodePolicy.cpp @@ -0,0 +1,325 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/amqp/NodePolicy.h" +#include "qpid/broker/amqp/Connection.h" +#include "qpid/broker/amqp/Topic.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Exchange.h" +#include "qpid/types/Exception.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" + +namespace _qmf = qmf::org::apache::qpid::broker; + +namespace qpid { +namespace broker { +namespace amqp { +namespace { +const std::string DURABLE("durable"); +const std::string AUTO_DELETE("auto-delete"); +const std::string LIFETIME_POLICY("qpid.lifetime-policy"); +const std::string MANUAL("manual"); +const std::string UNUSED("delete-if-unused"); +const std::string UNUSED_AND_EMPTY("delete-if-unused-and-empty"); +const std::string QUEUE_POLICY("QueuePolicy"); +const std::string TOPIC_POLICY("TopicPolicy"); +const std::string QUEUE("queue"); +const std::string TOPIC("topic"); +const std::string ALTERNATE_EXCHANGE("alternate-exchange"); +const std::string EXCHANGE_TYPE("exchange-type"); +const std::string QPID_MSG_SEQUENCE("qpid.msg_sequence"); +const std::string QPID_IVE("qpid.ive"); +const std::string EMPTY; + +template <typename T> +T get(const std::string& k, const qpid::types::Variant::Map& m, T defaultValue) +{ + qpid::types::Variant::Map::const_iterator i = m.find(k); + if (i == m.end()) return defaultValue; + else return i->second; +} + +std::string getProperty(const std::string& k, const qpid::types::Variant::Map& m) +{ + return get(k, m, EMPTY); +} + +bool testProperty(const std::string& k, const qpid::types::Variant::Map& m) +{ + return get(k, m, false); +} + +qpid::types::Variant::Map filterForQueue(const qpid::types::Variant::Map& properties) +{ + qpid::types::Variant::Map filtered = properties; + filtered.erase(DURABLE); + filtered.erase(AUTO_DELETE); + filtered.erase(ALTERNATE_EXCHANGE); + return filtered; +} +qpid::types::Variant::Map filterForTopic(const qpid::types::Variant::Map& properties) +{ + qpid::types::Variant::Map filtered = properties; + filtered.erase(DURABLE); + filtered.erase(EXCHANGE_TYPE); + filtered.erase(AUTO_DELETE); + filtered.erase(QPID_IVE); + filtered.erase(QPID_MSG_SEQUENCE); + return filtered; +} +void copy(const std::string& key, const qpid::types::Variant::Map& from, qpid::types::Variant::Map& to) +{ + qpid::types::Variant::Map::const_iterator i = from.find(key); + if (i != from.end()) to.insert(*i); +} + +} +NodePolicy::NodePolicy(const std::string& type, const std::string& ptrn, const qpid::types::Variant::Map& props) + : PersistableObject(ptrn, type, props), pattern(ptrn), + durable(testProperty(DURABLE, props)), + alternateExchange(getProperty(ALTERNATE_EXCHANGE, props)), + compiled(pattern) {} + +NodePolicy::~NodePolicy() {} + +const std::string& NodePolicy::getPattern() const +{ + return pattern; +} + +bool NodePolicy::isDurable() const +{ + return durable; +} + +bool NodePolicy::match(const std::string& name) const +{ + return qpid::sys::regex_match(name, compiled); +} + +QueuePolicy::QueuePolicy(Broker& broker, const std::string& pattern, const qpid::types::Variant::Map& props) + : NodePolicy(QUEUE_POLICY, pattern, props), + queueSettings(durable, testProperty(AUTO_DELETE, props)) +{ + qpid::types::Variant::Map unused; + qpid::types::Variant::Map filtered = filterForQueue(props); + //if queue is not durable and neither lifetime policy nor + //autodelete were explicitly specified, clean it up when not + //needed by default: + if (!queueSettings.durable && props.find(LIFETIME_POLICY) == props.end() && props.find(AUTO_DELETE) == props.end()) { + filtered[LIFETIME_POLICY] = UNUSED_AND_EMPTY; + } + queueSettings.populate(filtered, unused); + qpid::amqp_0_10::translate(filtered, queueSettings.storeSettings); + + qpid::management::ManagementAgent* agent = broker.getManagementAgent(); + if (agent != 0) { + policy = _qmf::QueuePolicy::shared_ptr(new _qmf::QueuePolicy(agent, this, pattern)); + policy->set_properties(props); + agent->addObject(policy); + } +} +QueuePolicy::~QueuePolicy() +{ + if (policy != 0) policy->resourceDestroy(); +} + + +std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > QueuePolicy::create(const std::string& name, Connection& connection) +{ + std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > result; + result.first = connection.getBroker().createQueue(name, queueSettings, 0/*not exclusive*/, alternateExchange, connection.getUserId(), connection.getId()).first; + return result; +} + +boost::shared_ptr<qpid::management::ManagementObject> QueuePolicy::GetManagementObject() const +{ + return policy; +} + +TopicPolicy::TopicPolicy(Broker& broker, const std::string& pattern, const qpid::types::Variant::Map& props) + : NodePolicy(TOPIC_POLICY, pattern, props), exchangeType(getProperty(EXCHANGE_TYPE, props)), + autodelete(get(AUTO_DELETE, props, !durable)) +{ + qpid::types::Variant::Map::const_iterator i = props.find(LIFETIME_POLICY); + if (i != props.end()) { + if (i->second == MANUAL) { + autodelete = false; + } else if (i->second == UNUSED || i->second == UNUSED_AND_EMPTY/*though empty doesn't mean much for an exchange*/) { + autodelete = true; + } else { + QPID_LOG(warning, "Did not recognise lifetime policy " << i->second << " in topic policy for " << pattern); + } + } + topicSettings = filterForTopic(props); + copy(QPID_IVE, props, exchangeSettings); + copy(QPID_MSG_SEQUENCE, props, exchangeSettings); + if (exchangeType.empty()) exchangeType = TOPIC; + + qpid::management::ManagementAgent* agent = broker.getManagementAgent(); + if (agent != 0) { + policy = _qmf::TopicPolicy::shared_ptr(new _qmf::TopicPolicy(agent, this, pattern)); + policy->set_properties(props); + agent->addObject(policy); + } +} + +TopicPolicy::~TopicPolicy() +{ + if (policy != 0) policy->resourceDestroy(); +} + +std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > TopicPolicy::create(const std::string& name, Connection& connection) +{ + std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > result; + qpid::framing::FieldTable args; + qpid::amqp_0_10::translate(exchangeSettings, args); + boost::shared_ptr<Exchange> exchange = connection.getBroker().createExchange(name, exchangeType, isDurable(), autodelete, alternateExchange, + args, connection.getUserId(), connection.getId()).first; + result.second = connection.getTopics().createTopic(connection.getBroker(), name, exchange, topicSettings); + return result; +} + +boost::shared_ptr<qpid::management::ManagementObject> TopicPolicy::GetManagementObject() const +{ + return policy; +} + +boost::shared_ptr<NodePolicy> NodePolicyRegistry::createQueuePolicy(Broker& broker, const std::string& name, const qpid::types::Variant::Map& properties) +{ + boost::shared_ptr<NodePolicy> nodePolicy(new QueuePolicy(broker, name, properties)); + add(nodePolicy); + return nodePolicy; +} + +boost::shared_ptr<NodePolicy> NodePolicyRegistry::createTopicPolicy(Broker& broker, const std::string& name, const qpid::types::Variant::Map& properties) +{ + boost::shared_ptr<NodePolicy> nodePolicy(new TopicPolicy(broker, name, properties)); + add(nodePolicy); + return nodePolicy; +} + +boost::shared_ptr<NodePolicy> NodePolicyRegistry::createNodePolicy(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties) +{ + if (type == QUEUE_POLICY) { + return createQueuePolicy(broker, name, properties); + } else if (type == TOPIC_POLICY) { + return createTopicPolicy(broker, name, properties); + } else { + return boost::shared_ptr<NodePolicy>(); + } +} + +bool NodePolicyRegistry::createObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& /*userId*/, const std::string& /*connectionId*/) +{ + boost::shared_ptr<NodePolicy> nodePolicy = createNodePolicy(broker, type, name, properties); + if (nodePolicy) { + if (nodePolicy->isDurable()) broker.getStore().create(*nodePolicy); + return true; + } else { + return false; + } +} +bool NodePolicyRegistry::deleteObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map&, + const std::string& /*userId*/, const std::string& /*connectionId*/) +{ + if (type == QUEUE_POLICY || type == TOPIC_POLICY) { + boost::shared_ptr<NodePolicy> nodePolicy = remove(name, type); + if (nodePolicy) { + if (nodePolicy->isDurable()) broker.getStore().destroy(*nodePolicy); + return true; + } else { + return false; + } + } else { + return false; + } +} +bool NodePolicyRegistry::recoverObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + uint64_t persistenceId) +{ + + boost::shared_ptr<NodePolicy> nodePolicy = createNodePolicy(broker, type, name, properties); + if (nodePolicy) { + nodePolicy->setPersistenceId(persistenceId); + return true; + } else { + return false; + } +} + +void NodePolicyRegistry::add(boost::shared_ptr<NodePolicy> nodePolicy) +{ + qpid::sys::Mutex::ScopedLock l(lock); + NodePolicies::const_iterator i = nodePolicies.find(nodePolicy->getName()); + if (i == nodePolicies.end()) { + nodePolicies.insert(NodePolicies::value_type(nodePolicy->getName(), nodePolicy)); + } else { + if (i->second->getType() != nodePolicy->getType()) { + throw qpid::types::Exception(QPID_MSG("Cannot create object of type " << nodePolicy->getType() << " with key " + << nodePolicy->getName() << " as an object of type " << i->second->getType() << " already exists with the same key")); + } else { + throw qpid::types::Exception(QPID_MSG("An object of type " << nodePolicy->getType() << " with key " << nodePolicy->getName() << " already exists")); + } + } +} +boost::shared_ptr<NodePolicy> NodePolicyRegistry::remove(const std::string& pattern, const std::string& type) +{ + boost::shared_ptr<NodePolicy> result; + qpid::sys::Mutex::ScopedLock l(lock); + NodePolicies::iterator i = nodePolicies.find(pattern); + if (i != nodePolicies.end()) { + if (i->second->getType() != type) { + throw qpid::types::Exception(QPID_MSG("Object with key " << i->first << " is of type " << i->second->getType() << " not " << type)); + } + result = i->second; + nodePolicies.erase(i); + } + return result; +} +boost::shared_ptr<NodePolicy> NodePolicyRegistry::get(const std::string& pattern) +{ + qpid::sys::Mutex::ScopedLock l(lock); + NodePolicies::const_iterator i = nodePolicies.find(pattern); + if (i == nodePolicies.end()) { + return boost::shared_ptr<NodePolicy>(); + } else { + return i->second; + } +} + +boost::shared_ptr<NodePolicy> NodePolicyRegistry::match(const std::string& name) +{ + qpid::sys::Mutex::ScopedLock l(lock); + boost::shared_ptr<NodePolicy> best; + for (NodePolicies::const_iterator i = nodePolicies.begin(); i != nodePolicies.end(); ++i) { + //where multiple policies match, pick the one with the longest + //pattern as a crude guesstimate of the more specific one + if (i->second->match(name) && (!best || i->first.size() > best->getPattern().size())) { + best = i->second; + } + } + return best; +} + +}}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/NodePolicy.h b/qpid/cpp/src/qpid/broker/amqp/NodePolicy.h new file mode 100644 index 0000000000..d6e987d85f --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/NodePolicy.h @@ -0,0 +1,117 @@ +#ifndef QPID_BROKER_AMQP_NODEPOLICY_H +#define QPID_BROKER_AMQP_NODEPOLICY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/ObjectFactory.h" +#include "qpid/broker/PersistableObject.h" +#include "qpid/broker/QueueSettings.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/regex.h" +#include "qpid/types/Variant.h" +#include "qpid/management/Manageable.h" +#include "qmf/org/apache/qpid/broker/QueuePolicy.h" +#include "qmf/org/apache/qpid/broker/TopicPolicy.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { +class Broker; +class Queue; +namespace amqp { +class Connection; +class Topic; + +/** + * Policy for creation of nodes 'on-demand' + */ +class NodePolicy : public PersistableObject, public management::Manageable +{ + public: + NodePolicy(const std::string& type, const std::string& ptrn, const qpid::types::Variant::Map& props); + virtual ~NodePolicy(); + const std::string& getPattern() const; + bool match(const std::string&) const; + bool isDurable() const; + virtual std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > create(const std::string&, Connection&) = 0; + virtual boost::shared_ptr<qpid::management::ManagementObject> GetManagementObject() const = 0; + protected: + NodePolicy(Broker&, const std::string& type, const std::string& pattern, const qpid::types::Variant::Map& properties); + const std::string pattern; + bool durable; + std::string alternateExchange; + qpid::sys::regex compiled; +}; + +class QueuePolicy : public NodePolicy +{ + public: + QueuePolicy(Broker&, const std::string& pattern, const qpid::types::Variant::Map& properties); + ~QueuePolicy(); + std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > create(const std::string&, Connection&); + boost::shared_ptr<qpid::management::ManagementObject> GetManagementObject() const; + private: + qpid::broker::QueueSettings queueSettings; + qmf::org::apache::qpid::broker::QueuePolicy::shared_ptr policy; +}; + +class TopicPolicy : public NodePolicy +{ + public: + TopicPolicy(Broker&, const std::string& pattern, const qpid::types::Variant::Map& properties); + ~TopicPolicy(); + std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > create(const std::string&, Connection&); + boost::shared_ptr<qpid::management::ManagementObject> GetManagementObject() const; + private: + qpid::types::Variant::Map topicSettings; + std::string exchangeType; + bool autodelete; + qpid::types::Variant::Map exchangeSettings; + qmf::org::apache::qpid::broker::TopicPolicy::shared_ptr policy; +}; + +class NodePolicyRegistry : public ObjectFactory +{ + public: + bool createObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& userId, const std::string& connectionId); + bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& userId, const std::string& connectionId); + bool recoverObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + uint64_t persistenceId); + + boost::shared_ptr<NodePolicy> match(const std::string& name); + boost::shared_ptr<NodePolicy> createQueuePolicy(Broker&, const std::string& name, const qpid::types::Variant::Map& properties); + boost::shared_ptr<NodePolicy> createTopicPolicy(Broker&, const std::string& name, const qpid::types::Variant::Map& properties); + private: + typedef std::map<std::string, boost::shared_ptr<NodePolicy> > NodePolicies; + qpid::sys::Mutex lock; + NodePolicies nodePolicies; + + boost::shared_ptr<NodePolicy> createNodePolicy(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties); + void add(boost::shared_ptr<NodePolicy> nodePolicy); + boost::shared_ptr<NodePolicy> remove(const std::string& pattern, const std::string& type); + boost::shared_ptr<NodePolicy> get(const std::string& pattern); +}; + +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_NODEPOLICY_H*/ diff --git a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp index d0311c34d2..cd31ef7788 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp @@ -29,6 +29,7 @@ #include "qpid/broker/amqp/Connection.h" #include "qpid/broker/amqp/Interconnects.h" #include "qpid/broker/amqp/Message.h" +#include "qpid/broker/amqp/NodePolicy.h" #include "qpid/broker/amqp/Sasl.h" #include "qpid/broker/amqp/Topic.h" #include "qpid/broker/amqp/Translation.h" @@ -44,22 +45,27 @@ namespace amqp { struct Options : public qpid::Options { std::string domain; + std::vector<std::string> queuePatterns; + std::vector<std::string> topicPatterns; Options() : qpid::Options("AMQP 1.0 Options") { addOptions() - ("domain", optValue(domain, "DOMAIN"), "Domain of this broker"); + ("domain", optValue(domain, "DOMAIN"), "Domain of this broker") + ("queue-patterns", optValue(queuePatterns, "PATTERN"), "Pattern for on-demand queues") + ("topic-patterns", optValue(topicPatterns, "PATTERN"), "Pattern for on-demand topics"); } }; class ProtocolImpl : public BrokerContext, public Protocol { public: - ProtocolImpl(Interconnects* interconnects, TopicRegistry* topics, Broker& broker, const std::string& domain) - : BrokerContext(broker, *interconnects, *topics, domain) + ProtocolImpl(Interconnects* interconnects, TopicRegistry* topics, NodePolicyRegistry* policies, Broker& broker, const std::string& domain) + : BrokerContext(broker, *interconnects, *topics, *policies, domain) { interconnects->setContext(*this); broker.getObjectFactoryRegistry().add(interconnects);//registry deletes on shutdown broker.getObjectFactoryRegistry().add(topics);//registry deletes on shutdown + broker.getObjectFactoryRegistry().add(policies);//registry deletes on shutdown } qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const qpid::broker::Message&); @@ -71,18 +77,33 @@ struct ProtocolPlugin : public Plugin { Options options; Options* getOptions() { return &options; } + NodePolicyRegistry* policies; + + ProtocolPlugin() : policies(0) {} void earlyInitialize(Plugin::Target& target) { //need to register protocol before recovery from store broker::Broker* broker = dynamic_cast<qpid::broker::Broker*>(&target); if (broker) { - ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), new TopicRegistry(), *broker, options.domain); + policies = new NodePolicyRegistry(); + ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), new TopicRegistry(), policies, *broker, options.domain); broker->getProtocolRegistry().add("AMQP 1.0", impl);//registry deletes on shutdown } } - void initialize(Plugin::Target&) {} + void initialize(Plugin::Target& target) + { + broker::Broker* broker = dynamic_cast<qpid::broker::Broker*>(&target); + if (broker) { + for (std::vector<std::string>::const_iterator i = options.queuePatterns.begin(); i != options.queuePatterns.end(); ++i) { + policies->createQueuePolicy(*broker, *i, qpid::types::Variant::Map()); + } + for (std::vector<std::string>::const_iterator i = options.topicPatterns.begin(); i != options.topicPatterns.end(); ++i) { + policies->createTopicPolicy(*broker, *i, qpid::types::Variant::Map()); + } + } + } }; ProtocolPlugin instance; // Static initialization diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 7170da0797..ab677faac3 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -26,6 +26,7 @@ #include "Domain.h" #include "Exception.h" #include "Interconnects.h" +#include "NodePolicy.h" #include "Relay.h" #include "Topic.h" #include "qpid/amqp/descriptors.h" @@ -260,19 +261,36 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te node.queue = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first; } } else { - size_t i = name.find('@'); - if (i != std::string::npos && (i+1) < name.length()) { - std::string domain = name.substr(i+1); - std::string local = name.substr(0, i); - std::string id = (boost::format("%1%-%2%") % name % qpid::types::Uuid(true).str()).str(); - //does this domain exist? - boost::shared_ptr<Domain> d = connection.getInterconnects().findDomain(domain); - if (d) { - node.relay = boost::shared_ptr<Relay>(new Relay(1000)); - if (incoming) { - d->connect(false, id, name, local, connection, node.relay); - } else { - d->connect(true, id, local, name, connection, node.relay); + boost::shared_ptr<NodePolicy> nodePolicy = connection.getNodePolicies().match(name); + if (nodePolicy) { + std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > result = nodePolicy->create(name, connection); + node.queue = result.first; + node.topic = result.second; + if (node.topic) node.exchange = node.topic->getExchange(); + + if (node.queue) { + QPID_LOG(info, "Created queue " << name << " from policy with pattern " << nodePolicy->getPattern()); + } else if (node.topic) { + QPID_LOG(info, "Created topic " << name << " from policy with pattern " << nodePolicy->getPattern()); + } else { + QPID_LOG(debug, "Created neither a topic nor a queue for " << name << " from policy with pattern " << nodePolicy->getPattern()); + } + + } else { + size_t i = name.find('@'); + if (i != std::string::npos && (i+1) < name.length()) { + std::string domain = name.substr(i+1); + std::string local = name.substr(0, i); + std::string id = (boost::format("%1%-%2%") % name % qpid::types::Uuid(true).str()).str(); + //does this domain exist? + boost::shared_ptr<Domain> d = connection.getInterconnects().findDomain(domain); + if (d) { + node.relay = boost::shared_ptr<Relay>(new Relay(1000)); + if (incoming) { + d->connect(false, id, name, local, connection, node.relay); + } else { + d->connect(true, id, local, name, connection, node.relay); + } } } } diff --git a/qpid/cpp/src/qpid/broker/amqp/Topic.cpp b/qpid/cpp/src/qpid/broker/amqp/Topic.cpp index c04f62b3d1..4e3de21c74 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Topic.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Topic.cpp @@ -58,8 +58,8 @@ qpid::types::Variant::Map filter(const qpid::types::Variant::Map& properties) } } -Topic::Topic(Broker& broker, const std::string& n, const qpid::types::Variant::Map& properties) - : PersistableObject(n, TOPIC, properties), name(n), durable(testProperty(DURABLE, properties)), exchange(broker.getExchanges().get(getProperty(EXCHANGE, properties))), +Topic::Topic(Broker& broker, const std::string& n, boost::shared_ptr<Exchange> e, const qpid::types::Variant::Map& properties) + : PersistableObject(n, TOPIC, properties), name(n), durable(testProperty(DURABLE, properties)), exchange(e), alternateExchange(getProperty(ALTERNATE_EXCHANGE, properties)) { if (exchange->getName().empty()) throw qpid::Exception("Exchange must be specified."); @@ -107,9 +107,9 @@ const std::string& Topic::getAlternateExchange() const { return alternateExchange; } -boost::shared_ptr<Topic> TopicRegistry::createTopic(Broker& broker, const std::string& name, const qpid::types::Variant::Map& properties) +boost::shared_ptr<Topic> TopicRegistry::createTopic(Broker& broker, const std::string& name, boost::shared_ptr<Exchange> exchange, const qpid::types::Variant::Map& properties) { - boost::shared_ptr<Topic> topic(new Topic(broker, name, properties)); + boost::shared_ptr<Topic> topic(new Topic(broker, name, exchange, properties)); add(topic); topic->getExchange()->setDeletionListener(name, boost::bind(&TopicRegistry::remove, this, name)); return topic; @@ -119,7 +119,7 @@ bool TopicRegistry::createObject(Broker& broker, const std::string& type, const const std::string& /*userId*/, const std::string& /*connectionId*/) { if (type == TOPIC) { - boost::shared_ptr<Topic> topic = createTopic(broker, name, props); + boost::shared_ptr<Topic> topic = createTopic(broker, name, broker.getExchanges().get(getProperty(EXCHANGE, props)), props); if (topic->isDurable()) broker.getStore().create(*topic); return true; } else { @@ -147,7 +147,7 @@ bool TopicRegistry::recoverObject(Broker& broker, const std::string& type, const uint64_t persistenceId) { if (type == TOPIC) { - boost::shared_ptr<Topic> topic = createTopic(broker, name, properties); + boost::shared_ptr<Topic> topic = createTopic(broker, name, broker.getExchanges().get(getProperty(EXCHANGE, properties)), properties); topic->setPersistenceId(persistenceId); return true; } else { diff --git a/qpid/cpp/src/qpid/broker/amqp/Topic.h b/qpid/cpp/src/qpid/broker/amqp/Topic.h index e08830ba0f..df16f4a738 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Topic.h +++ b/qpid/cpp/src/qpid/broker/amqp/Topic.h @@ -47,7 +47,7 @@ namespace amqp { class Topic : public PersistableObject, public management::Manageable { public: - Topic(Broker&, const std::string& name, const qpid::types::Variant::Map& properties); + Topic(Broker&, const std::string& name, boost::shared_ptr<Exchange>, const qpid::types::Variant::Map& properties); ~Topic(); const std::string& getName() const; const QueueSettings& getPolicy() const; @@ -77,12 +77,12 @@ class TopicRegistry : public ObjectFactory bool add(boost::shared_ptr<Topic> topic); boost::shared_ptr<Topic> remove(const std::string& name); boost::shared_ptr<Topic> get(const std::string& name); + boost::shared_ptr<Topic> createTopic(Broker&, const std::string& name, boost::shared_ptr<Exchange> exchange, const qpid::types::Variant::Map& properties); private: typedef std::map<std::string, boost::shared_ptr<Topic> > Topics; qpid::sys::Mutex lock; Topics topics; - boost::shared_ptr<Topic> createTopic(Broker&, const std::string& name, const qpid::types::Variant::Map& properties); }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/management-schema.xml b/qpid/cpp/src/qpid/broker/management-schema.xml index 4d66b72318..bf6514b855 100644 --- a/qpid/cpp/src/qpid/broker/management-schema.xml +++ b/qpid/cpp/src/qpid/broker/management-schema.xml @@ -437,6 +437,24 @@ <property name="durable" type="bool" access="RC"/> <property name="properties" type="map" access="RO"/> </class> + <!-- + =============================================================== + AMQP 1.0 QueuePolicy + =============================================================== + --> + <class name="QueuePolicy"> + <property name="name" type="sstr" access="RC" index="y"/> + <property name="properties" type="map" access="RO"/> + </class> + <!-- + =============================================================== + AMQP 1.0 TopicPolicy + =============================================================== + --> + <class name="TopicPolicy"> + <property name="name" type="sstr" access="RC" index="y"/> + <property name="properties" type="map" access="RO"/> + </class> <!-- diff --git a/qpid/cpp/src/qpid/sys/regex.h b/qpid/cpp/src/qpid/sys/regex.h index c183991eb7..77de6a7f5c 100644 --- a/qpid/cpp/src/qpid/sys/regex.h +++ b/qpid/cpp/src/qpid/sys/regex.h @@ -56,7 +56,7 @@ public: friend bool regex_match(const std::string& s, const regex& re); }; -bool regex_match(const std::string& s, const regex& re) { +inline bool regex_match(const std::string& s, const regex& re) { return ::regexec(&(re.re), s.c_str(), 0, 0, 0)==0; } diff --git a/qpid/cpp/src/tests/policies.py b/qpid/cpp/src/tests/policies.py new file mode 100644 index 0000000000..ec0191f91e --- /dev/null +++ b/qpid/cpp/src/tests/policies.py @@ -0,0 +1,209 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.tests.messaging.implementation import * +from qpid.tests.messaging import VersionTest + +class Mgmt: + """ + Simple QMF management utility (qpidtoollibs uses + qpid.messaging.Message rather than swigged version) + """ + def __init__(self, conn): + self.conn = conn + self.sess = self.conn.session() + self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \ + str(uuid4()) + self.reply_rx = self.sess.receiver(self.reply_to) + self.reply_rx.capacity = 10 + self.tx = self.sess.sender("qmf.default.direct/broker") + self.next_correlator = 1 + + def list(self, class_name): + props = {'method' : 'request', + 'qmf.opcode' : '_query_request', + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + + content = {'_what' : 'OBJECT', + '_schema_id' : {'_class_name' : class_name.lower()}} + + message = Message(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + + + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item['_values']) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + self.sess.acknowledge() + return items + + def do_qmf_method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10): + props = {'method' : 'request', + 'qmf.opcode' : '_method_request', + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + + content = {'_object_id' : {'_object_name' : addr}, + '_method_name' : method, + '_arguments' : arguments} + + message = Message(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + response = self.reply_rx.fetch(timeout) + self.sess.acknowledge() + if response.properties['qmf.opcode'] == '_exception': + raise Exception("Exception from Agent: %r" % response.content['_values']) + if response.properties['qmf.opcode'] != '_method_response': + raise Exception("bad response: %r" % response.properties) + return response.content['_arguments'] + + def create(self, _type, name, properties={}): + return self.do_qmf_method('create', {'type': _type, 'name': name, 'properties': properties}) + + def delete(self, _type, name): + return self.do_qmf_method('delete', {'type': _type, 'name': name}) + + +class PoliciesTests (VersionTest): + """ + Tests for node policies with qpidd + """ + + def do_simple_queue_test(self, pattern, name, properties={}, autodeleted=True): + mgmt = self.create_connection("amqp0-10", True) + agent = Mgmt(mgmt) + agent.create('QueuePolicy', pattern, properties) + try: + snd = self.ssn.sender(name) + msgs = [Message(content=s, subject = s) for s in ['a','b','c','d']] + for m in msgs: snd.send(m) + snd.close() + + for expected in msgs: + rcv = self.ssn.receiver(name) + msg = rcv.fetch(0) + assert msg.content == expected.content, (msg.content, expected.content) + self.ssn.acknowledge() + rcv.close() #close after each message to ensure queue isn't deleted with messages in it + self.ssn.close() + self.conn.close() + + matched = [q for q in agent.list("Queue") if q['name'] == name] + if autodeleted: + # ensure that queue is no longer there (as empty and unused) + assert len(matched) == 0, (matched) + else: + # ensure that queue is still there though empty and unused + assert len(matched) == 1, (matched) + finally: + agent.delete('QueuePolicy', pattern) + mgmt.close() + + def test_queue(self): + self.do_simple_queue_test("queue-*", "queue-1") + + def test_queue_not_autodeleted(self): + self.do_simple_queue_test("permanent-queue-*", "permanent-queue-1", {'auto-delete':False}, False) + + def test_queue_manual_delete(self): + self.do_simple_queue_test("permanent-queue-*", "permanent-queue-1", {'qpid.lifetime-policy':'manual'}, False) + + def test_queue_delete_if_unused_and_empty(self): + self.do_simple_queue_test("queue-*", "queue-1", {'qpid.lifetime-policy':'delete-if-unused-and-empty'}, True) + + def do_simple_topic_test(self, pattern, name, properties={}, autodeleted=True): + mgmt = self.create_connection("amqp0-10", True) + agent = Mgmt(mgmt) + agent.create('TopicPolicy', pattern, properties) + try: + snd = self.ssn.sender(name) + rcv1 = self.ssn.receiver(name) + rcv2 = self.ssn.receiver(name) + + msgs = [Message(content=s, subject = s) for s in ['a','b','c','d']] + for m in msgs: snd.send(m) + + for rcv in [rcv1, rcv2]: + for expected in msgs: + msg = rcv.fetch(0) + assert msg.content == expected.content, (msg.content, expected.content) + self.ssn.acknowledge() + rcv1.close() + rcv2.close() + snd.close() + + matched = [e for e in agent.list("Exchange") if e['name'] == name] + if autodeleted: + # ensure that exchange is no longer there (as it is now unused) + assert len(matched) == 0, (matched) + else: + # ensure that exchange has not been autodeleted in spite of being unused + assert len(matched) == 1, (matched) + finally: + agent.delete('TopicPolicy', pattern) + mgmt.close() + + def test_topic(self): + self.do_simple_topic_test('fanout-*', 'fanout-1', {'exchange-type':'fanout'}) + + def test_topic_not_autodelete(self): + self.do_simple_topic_test('permanent-fanout-*', 'permanent-fanout-1', {'exchange-type':'fanout', 'auto-delete':False}, False) + + def test_topic_manual_delete(self): + self.do_simple_topic_test('permanent-fanout-*', 'permanent-fanout-1', {'exchange-type':'fanout', 'qpid.lifetime-policy':'manual'}, False) + + def test_topic_delete_if_unused(self): + self.do_simple_topic_test('fanout-*', 'fanout-1', {'exchange-type':'fanout', 'qpid.lifetime-policy':'delete-if-unused'}, True) + + def test_mgmt(self): + mgmt = self.create_connection("amqp0-10", True) + agent = Mgmt(mgmt) + agent.create('QueuePolicy', 'queue-*') + agent.create('QueuePolicy', 'alt.queue.*') + agent.create('TopicPolicy', 'topic-*') + try: + queues = [q['name'] for q in agent.list("QueuePolicy")] + topics = [t['name'] for t in agent.list("TopicPolicy")] + assert 'queue-*' in queues, (queues) + assert 'alt.queue.*' in queues, (queues) + + try: + agent.delete('TopicPolicy', 'queue-*') + assert False, ('Deletion of policy using wrong type should fail') + except: None + + finally: + agent.delete('QueuePolicy', 'queue-*') + agent.delete('QueuePolicy', 'alt.queue.*') + agent.delete('TopicPolicy', 'topic-*') + mgmt.close() diff --git a/qpid/cpp/src/tests/swig_python_tests b/qpid/cpp/src/tests/swig_python_tests index dd793373d2..2eb8ce67b3 100755 --- a/qpid/cpp/src/tests/swig_python_tests +++ b/qpid/cpp/src/tests/swig_python_tests @@ -53,7 +53,7 @@ export PYTHONPATH=$PYTHONPATH:$PYTHONPATH_SWIG export QPID_USE_SWIG_CLIENT=1 $QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests || FAILED=1 if [[ -a $AMQP_LIB ]] ; then - $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1 + $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -m policies -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1 fi stop_broker if [[ $FAILED -eq 1 ]]; then |