diff options
author | Gordon Sim <gsim@apache.org> | 2013-09-24 11:26:07 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2013-09-24 11:26:07 +0000 |
commit | 1ee5f62da3dc1e19c5308ab75b81bb2d443f88f0 (patch) | |
tree | 646bcfb0553d14c8bce320d7ff032637fc04c680 /cpp | |
parent | 941ffcd4717571f6d2cb6d92c7896a6b96685565 (diff) | |
download | qpid-python-1ee5f62da3dc1e19c5308ab75b81bb2d443f88f0.tar.gz |
QPID-5156: check node properties on assert
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1525858 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/amqp/NodeProperties.cpp | 94 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/NodeProperties.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Session.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Session.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 200 |
5 files changed, 304 insertions, 30 deletions
diff --git a/cpp/src/qpid/broker/amqp/NodeProperties.cpp b/cpp/src/qpid/broker/amqp/NodeProperties.cpp index cc17d6e1a2..e30ae5a68d 100644 --- a/cpp/src/qpid/broker/amqp/NodeProperties.cpp +++ b/cpp/src/qpid/broker/amqp/NodeProperties.cpp @@ -20,6 +20,8 @@ */ #include "qpid/broker/amqp/NodeProperties.h" #include "qpid/broker/amqp/DataReader.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Queue.h" #include "qpid/broker/QueueSettings.h" #include "qpid/amqp/CharSequence.h" #include "qpid/amqp/Descriptor.h" @@ -100,7 +102,7 @@ bool getLifetimeDescriptorSymbol(QueueSettings::LifetimePolicy policy, pn_bytes_ } -NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic"), lifetime(QueueSettings::DELETE_IF_UNUSED) {} +NodeProperties::NodeProperties() : received(false), queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic"), lifetime(QueueSettings::DELETE_IF_UNUSED) {} void NodeProperties::read(pn_data_t* data) { @@ -108,26 +110,92 @@ void NodeProperties::read(pn_data_t* data) reader.read(data); } -void NodeProperties::write(pn_data_t* data) +void NodeProperties::write(pn_data_t* data, boost::shared_ptr<Queue> node) { - pn_data_put_map(data); - pn_data_enter(data); - pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES)); - pn_data_put_string(data, convert(queue ? MOVE : COPY)); - pn_bytes_t symbol; - if (autoDelete && getLifetimeDescriptorSymbol(lifetime, symbol)) { - pn_data_put_symbol(data, convert(LIFETIME_POLICY)); - pn_data_put_described(data); + if (received) { + pn_data_put_map(data); pn_data_enter(data); - pn_data_put_symbol(data, symbol); - pn_data_put_list(data); + pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES)); + pn_data_put_string(data, convert(MOVE));//TODO: should really add COPY as well, since queues can be browsed + pn_bytes_t symbol; + if (autoDelete && node->isAutoDelete() && getLifetimeDescriptorSymbol(node->getSettings().lifetime, symbol)) { + pn_data_put_symbol(data, convert(LIFETIME_POLICY)); + pn_data_put_described(data); + pn_data_enter(data); + pn_data_put_symbol(data, symbol); + pn_data_put_list(data); + pn_data_exit(data); + } + if (durable && node->isDurable()) { + pn_data_put_symbol(data, convert(DURABLE)); + pn_data_put_bool(data, true); + } + if (exclusive && node->hasExclusiveOwner()) { + pn_data_put_symbol(data, convert(EXCLUSIVE)); + pn_data_put_bool(data, true); + } + if (!alternateExchange.empty() && node->getAlternateExchange()) { + pn_data_put_symbol(data, convert(ALTERNATE_EXCHANGE)); + pn_data_put_string(data, convert(node->getAlternateExchange()->getName())); + } + + qpid::types::Variant::Map actual = node->getSettings().asMap(); + qpid::types::Variant::Map unrecognised; + QueueSettings dummy; + dummy.populate(actual, unrecognised); + for (qpid::types::Variant::Map::const_iterator i = unrecognised.begin(); i != unrecognised.end(); ++i) { + actual.erase(i->first); + } + for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + qpid::types::Variant::Map::const_iterator j = actual.find(i->first); + if (j != actual.end()) { + pn_data_put_symbol(data, convert(j->first)); + pn_data_put_string(data, convert(j->second.asString())); + } + } + + pn_data_exit(data); + } +} +namespace { +const std::string QPID_MSG_SEQUENCE("qpid.msg_sequence"); +const std::string QPID_IVE("qpid.ive"); +} +void NodeProperties::write(pn_data_t* data, boost::shared_ptr<Exchange> node) +{ + if (received) { + pn_data_put_map(data); + pn_data_enter(data); + pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES)); + pn_data_put_string(data, convert(COPY)); + if (durable && node->isDurable()) { + pn_data_put_symbol(data, convert(DURABLE)); + pn_data_put_bool(data, true); + } + if (!exchangeType.empty()) { + pn_data_put_symbol(data, convert(EXCHANGE_TYPE)); + pn_data_put_string(data, convert(node->getType())); + } + if (!alternateExchange.empty() && node->getAlternate()) { + pn_data_put_symbol(data, convert(ALTERNATE_EXCHANGE)); + pn_data_put_string(data, convert(node->getAlternate()->getName())); + } + + for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + if ((i->first == QPID_MSG_SEQUENCE || i->first == QPID_IVE) && node->getArgs().isSet(i->first)) { + pn_data_put_symbol(data, convert(i->first)); + pn_data_put_bool(data, true); + } + } + pn_data_exit(data); } - pn_data_exit(data); } + void NodeProperties::process(const std::string& key, const qpid::types::Variant& value, const Descriptor* d) { + received = true; QPID_LOG(debug, "Processing node property " << key << " = " << value); if (key == SUPPORTED_DIST_MODES) { if (value == MOVE) queue = true; diff --git a/cpp/src/qpid/broker/amqp/NodeProperties.h b/cpp/src/qpid/broker/amqp/NodeProperties.h index 8a759062c0..df96d5a023 100644 --- a/cpp/src/qpid/broker/amqp/NodeProperties.h +++ b/cpp/src/qpid/broker/amqp/NodeProperties.h @@ -24,10 +24,13 @@ #include "qpid/amqp/MapReader.h" #include "qpid/types/Variant.h" #include "qpid/broker/QueueSettings.h" +#include <boost/shared_ptr.hpp> struct pn_data_t; namespace qpid { namespace broker { +class Exchange; +class Queue; struct QueueSettings; namespace amqp { @@ -36,7 +39,8 @@ class NodeProperties : public qpid::amqp::MapReader public: NodeProperties(); void read(pn_data_t*); - void write(pn_data_t*); + void write(pn_data_t*,boost::shared_ptr<Queue>); + void write(pn_data_t*,boost::shared_ptr<Exchange>); void onNullValue(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*); void onBooleanValue(const qpid::amqp::CharSequence&, bool, const qpid::amqp::Descriptor*); void onUByteValue(const qpid::amqp::CharSequence&, uint8_t, const qpid::amqp::Descriptor*); @@ -63,6 +67,7 @@ class NodeProperties : public qpid::amqp::MapReader bool trackControllingLink() const; const qpid::types::Variant::Map& getProperties() const; private: + bool received; bool queue; bool durable; bool autoDelete; diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp index 1f8e01772c..0c8f9a3b06 100644 --- a/cpp/src/qpid/broker/amqp/Session.cpp +++ b/cpp/src/qpid/broker/amqp/Session.cpp @@ -203,17 +203,24 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te node.exchange = connection.getBroker().getExchanges().find(name); node.queue = connection.getBroker().getQueues().find(name); node.topic = connection.getTopics().get(name); + bool createOnDemand = is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus)); + //Strictly speaking, properties should only be specified when the + //terminus is dynamic. However we will not enforce that here. If + //properties are set on the attach request, we will set them on + //our reply. This allows the 'create' and 'assert' options in the + //qpid messaging API to be implemented over 1.0. + node.properties.read(pn_terminus_properties(terminus)); + if (node.topic) node.exchange = node.topic->getExchange(); - if (node.exchange && !node.queue && is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) { - node.properties.read(pn_terminus_properties(terminus)); + if (node.exchange && !node.queue && createOnDemand) { if (!node.properties.getExchangeType().empty() && node.properties.getExchangeType() != node.exchange->getType()) { + //emulate 0-10 exchange-declare behaviour throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "Exchange of different type already exists"); } } if (!node.queue && !node.exchange) { - if (pn_terminus_is_dynamic(terminus) || is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) { + if (pn_terminus_is_dynamic(terminus) || createOnDemand) { //is it a queue or an exchange? - node.properties.read(pn_terminus_properties(terminus)); if (node.properties.isQueue()) { node.queue = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first; } else { @@ -222,7 +229,6 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(), args, connection.getUserId(), connection.getId()).first; } - node.created = true; } else { size_t i = name.find('@'); if (i != std::string::npos && (i+1) < name.length()) { @@ -324,12 +330,11 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s if (node.queue) { setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.queue); authorise.incoming(node.queue); + node.properties.write(pn_terminus_properties(pn_link_target(link)), node.queue); } else if (node.exchange) { setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange); authorise.incoming(node.exchange); - } - if (node.created) { - node.properties.write(pn_terminus_properties(pn_link_target(link))); + node.properties.write(pn_terminus_properties(pn_link_target(link)), node.exchange); } const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link)); @@ -361,10 +366,12 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::string& name) { ResolvedNode node = resolve(name, source, false); - if (node.queue) setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.queue); - else if (node.exchange) setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.exchange); - if (node.created) { - node.properties.write(pn_terminus_properties(pn_link_source(link))); + if (node.queue) { + setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.queue); + node.properties.write(pn_terminus_properties(pn_link_source(link)), node.queue); + } else if (node.exchange) { + setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.exchange); + node.properties.write(pn_terminus_properties(pn_link_source(link)), node.exchange); } Filter filter; diff --git a/cpp/src/qpid/broker/amqp/Session.h b/cpp/src/qpid/broker/amqp/Session.h index b94d3c226d..a991ac9e3e 100644 --- a/cpp/src/qpid/broker/amqp/Session.h +++ b/cpp/src/qpid/broker/amqp/Session.h @@ -100,8 +100,6 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses boost::shared_ptr<qpid::broker::amqp::Topic> topic; boost::shared_ptr<Relay> relay; NodeProperties properties; - bool created; - ResolvedNode() : created(false) {} }; ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming); diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 2a35442ed7..a8fa39e70a 100644 --- a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -22,6 +22,7 @@ #include "qpid/messaging/Address.h" #include "qpid/messaging/AddressImpl.h" #include "qpid/amqp/descriptors.h" +#include "qpid/types/encodings.h" #include "qpid/log/Statement.h" #include <vector> #include <set> @@ -290,6 +291,127 @@ void write(pn_data_t* data, const Variant& value) break; } } +bool read(pn_data_t* data, pn_type_t type, qpid::types::Variant& value); +bool read(pn_data_t* data, qpid::types::Variant& value) +{ + return read(data, pn_data_type(data), value); +} +void readList(pn_data_t* data, qpid::types::Variant::List& value) +{ + size_t count = pn_data_get_list(data); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + qpid::types::Variant e; + if (read(data, e)) value.push_back(e); + } + pn_data_exit(data); +} +void readMap(pn_data_t* data, qpid::types::Variant::Map& value) +{ + size_t count = pn_data_get_list(data); + pn_data_enter(data); + for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) { + std::string key = convert(pn_data_get_symbol(data)); + pn_data_next(data); + qpid::types::Variant e; + if (read(data, e)) value[key]= e; + } + pn_data_exit(data); +} +void readArray(pn_data_t* data, qpid::types::Variant::List& value) +{ + size_t count = pn_data_get_array(data); + pn_type_t type = pn_data_get_array_type(data); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + qpid::types::Variant e; + if (read(data, type, e)) value.push_back(e); + } + pn_data_exit(data); +} +bool read(pn_data_t* data, pn_type_t type, qpid::types::Variant& value) +{ + switch (type) { + case PN_NULL: + if (value.getType() != qpid::types::VAR_VOID) value = qpid::types::Variant(); + return true; + case PN_BOOL: + value = pn_data_get_bool(data); + return true; + case PN_UBYTE: + value = pn_data_get_ubyte(data); + return true; + case PN_BYTE: + value = pn_data_get_byte(data); + return true; + case PN_USHORT: + value = pn_data_get_ushort(data); + return true; + case PN_SHORT: + value = pn_data_get_short(data); + return true; + case PN_UINT: + value = pn_data_get_uint(data); + return true; + case PN_INT: + value = pn_data_get_int(data); + return true; + case PN_CHAR: + value = pn_data_get_char(data); + return true; + case PN_ULONG: + value = pn_data_get_ulong(data); + return true; + case PN_LONG: + value = pn_data_get_long(data); + return true; + case PN_TIMESTAMP: + value = pn_data_get_timestamp(data); + return true; + case PN_FLOAT: + value = pn_data_get_float(data); + return true; + case PN_DOUBLE: + value = pn_data_get_double(data); + return true; + case PN_UUID: + value = qpid::types::Uuid(pn_data_get_uuid(data).bytes); + return true; + case PN_BINARY: + value = convert(pn_data_get_binary(data)); + value.setEncoding(qpid::types::encodings::BINARY); + return true; + case PN_STRING: + value = convert(pn_data_get_string(data)); + value.setEncoding(qpid::types::encodings::UTF8); + return true; + case PN_SYMBOL: + value = convert(pn_data_get_string(data)); + value.setEncoding(qpid::types::encodings::ASCII); + return true; + case PN_LIST: + value = qpid::types::Variant::List(); + readList(data, value.asList()); + return true; + break; + case PN_MAP: + value = qpid::types::Variant::Map(); + readMap(data, value.asMap()); + return true; + case PN_ARRAY: + value = qpid::types::Variant::List(); + readArray(data, value.asList()); + return true; + case PN_DESCRIBED: + case PN_DECIMAL32: + case PN_DECIMAL64: + case PN_DECIMAL128: + default: + return false; + } + +} + const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes const uint32_t DEFAULT_TIMEOUT(0); } @@ -354,7 +476,7 @@ AddressHelper::AddressHelper(const Address& address) : properties[LIFETIME_POLICY] = DELETE_ON_CLOSE; } - if (properties.size() && !(isTemporary || createPolicy.size())) { + if (properties.size() && !(isTemporary || !createPolicy.empty() || !assertPolicy.empty())) { QPID_LOG(warning, "Properties will be ignored! " << address); } @@ -420,10 +542,48 @@ void AddressHelper::addFilter(const std::string& name, const std::string& descri filters.push_back(Filter(name, descriptor, value)); } +namespace { +bool checkLifetimePolicy(const std::string& requested, const std::string& actual) +{ + if (actual == qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL && requested == DELETE_ON_CLOSE) return true; + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL && requested == DELETE_IF_UNUSED) return true; + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL && requested == DELETE_IF_EMPTY) return true; + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL && requested == DELETE_IF_UNUSED_AND_EMPTY) return true; + else return actual == requested; +} +bool checkLifetimePolicy(const std::string& requested, uint64_t actual) +{ + if (actual == qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_CODE) + return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL); + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_CODE) + return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL); + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_CODE) + return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL); + else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE) + return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL); + else + return false; +} +bool checkLifetimePolicy(const std::string& requested, pn_data_t* actual) +{ + bool result(false); + if (pn_data_is_described(actual)) { + pn_data_enter(actual); + pn_data_next(actual); + if (pn_data_type(actual) == PN_ULONG) { + result = checkLifetimePolicy(requested, pn_data_get_ulong(actual)); + } else if (pn_data_type(actual) == PN_SYMBOL) { + result = checkLifetimePolicy(requested, convert(pn_data_get_symbol(actual))); + } + pn_data_exit(actual); + } + return result; +} +} void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode) { if (assertEnabled(mode)) { - QPID_LOG(debug, "checking assertions: " << capabilities); + QPID_LOG(debug, "checking capabilities: " << capabilities); //ensure all desired capabilities have been offered std::set<std::string> desired; for (Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) { @@ -493,6 +653,40 @@ void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode) } } if (!first) throw qpid::messaging::AssertionFailed(missing.str()); + + //assert on properties (Note: this violates the AMQP 1.0 + //specification - as does the create option - by sending + //node-properties even if the dynamic option is not + //set. However this can be avoided by not specifying any node + //properties when asserting) + if (!type.empty() || durableNode || !properties.empty()) { + qpid::types::Variant::Map requested = properties; + if (!type.empty()) requested[SUPPORTED_DIST_MODES] = type == TOPIC ? COPY : MOVE; + if (durableNode) requested[DURABLE] = true; + + data = pn_terminus_properties(terminus); + if (pn_data_next(data)) { + size_t count = pn_data_get_map(data); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + std::string key = convert(pn_data_get_symbol(data)); + pn_data_next(data); + qpid::types::Variant::Map::const_iterator j = requested.find(key); + qpid::types::Variant v; + if (j != requested.end() && + ((key == LIFETIME_POLICY && checkLifetimePolicy(j->second.asString(), data)) || + (read(data, v) && v.asString() == j->second.asString()))) { + requested.erase(j->first); + } + } + pn_data_exit(data); + if (!requested.empty()) { + std::stringstream missing; + missing << "Requested node properties not met: " << requested; + throw qpid::messaging::AssertionFailed(missing.str()); + } + } + } } } @@ -582,6 +776,8 @@ void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMod //application expects name of node to be as specified setNodeProperties(terminus); createOnDemand = true; + } else if (assertEnabled(mode)) { + setNodeProperties(terminus); } } |