diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp | 164 |
1 files changed, 129 insertions, 35 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp index a937c1171e..eb30c78128 100644 --- a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp @@ -20,10 +20,16 @@ */ #include "qpid/broker/amqp/NodeProperties.h" #include "qpid/broker/amqp/DataReader.h" +#include "qpid/broker/QueueSettings.h" #include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/descriptors.h" #include "qpid/types/Variant.h" #include "qpid/broker/QueueSettings.h" #include "qpid/log/Statement.h" +extern "C" { +#include <proton/engine.h> +} using qpid::amqp::CharSequence; using qpid::amqp::Descriptor; @@ -36,6 +42,7 @@ namespace { const std::string MOVE("move"); const std::string COPY("copy"); const std::string SUPPORTED_DIST_MODES("supported-dist-modes"); +const std::string LIFETIME_POLICY("lifetime-policy"); //AMQP 0-10 standard parameters: const std::string DURABLE("durable"); @@ -43,9 +50,57 @@ const std::string EXCLUSIVE("exclusive"); const std::string AUTO_DELETE("auto-delete"); const std::string ALTERNATE_EXCHANGE("alternate-exchange"); const std::string EXCHANGE_TYPE("exchange-type"); + +pn_bytes_t convert(const std::string& s) +{ + pn_bytes_t result; + result.start = const_cast<char*>(s.data()); + result.size = s.size(); + return result; } -NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic") {} +bool getLifetimePolicy(const Descriptor& d, QueueSettings::LifetimePolicy& policy) +{ + if (d.match(qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL, qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_CODE)) { + policy = QueueSettings::DELETE_ON_CLOSE; + return true; + } else if (d.match(qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_CODE)) { + policy = QueueSettings::DELETE_IF_UNUSED; + return true; + } else if (d.match(qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL, qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_CODE)) { + policy = QueueSettings::DELETE_IF_EMPTY; + return true; + } else if (d.match(qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE)) { + policy = QueueSettings::DELETE_IF_UNUSED_AND_EMPTY; + return true; + } else { + return false; + } +} + +bool getLifetimeDescriptorSymbol(QueueSettings::LifetimePolicy policy, pn_bytes_t& out) +{ + switch (policy) { + case QueueSettings::DELETE_ON_CLOSE: + out = convert(qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL); + return true; + case QueueSettings::DELETE_IF_UNUSED: + out = convert(qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL); + return true; + case QueueSettings::DELETE_IF_EMPTY: + out = convert(qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL); + return true; + case QueueSettings::DELETE_IF_UNUSED_AND_EMPTY: + out = convert(qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL); + return true; + default: + return false; + } +} + +} + +NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic"), lifetime(QueueSettings::DELETE_IF_UNUSED) {} void NodeProperties::read(pn_data_t* data) { @@ -53,12 +108,38 @@ void NodeProperties::read(pn_data_t* data) reader.read(data); } -void NodeProperties::process(const std::string& key, const qpid::types::Variant& value) +void NodeProperties::write(pn_data_t* data) +{ + pn_data_put_map(data); + pn_data_enter(data); + pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES)); + pn_data_put_string(data, convert(queue ? MOVE : COPY)); + pn_bytes_t symbol; + if (autoDelete && getLifetimeDescriptorSymbol(lifetime, symbol)) { + pn_data_put_symbol(data, convert(LIFETIME_POLICY)); + pn_data_put_described(data); + pn_data_enter(data); + pn_data_put_symbol(data, symbol); + pn_data_put_list(data); + pn_data_exit(data); + } + pn_data_exit(data); +} + +void NodeProperties::process(const std::string& key, const qpid::types::Variant& value, const Descriptor* d) { - QPID_LOG(notice, "Processing node property " << key << " = " << value); + QPID_LOG(debug, "Processing node property " << key << " = " << value); if (key == SUPPORTED_DIST_MODES) { if (value == MOVE) queue = true; else if (value == COPY) queue = false; + } else if (key == LIFETIME_POLICY) { + if (d) { + if (getLifetimePolicy(*d, lifetime)) { + autoDelete = true; + } else { + QPID_LOG(warning, "Unrecognised lifetime policy: " << *d); + } + } } else if (key == DURABLE) { durable = value; } else if (key == EXCLUSIVE) { @@ -74,84 +155,91 @@ void NodeProperties::process(const std::string& key, const qpid::types::Variant& } } -void NodeProperties::onNullValue(const CharSequence& key, const Descriptor*) +bool NodeProperties::onStartListValue(const qpid::amqp::CharSequence& key, uint32_t count, const qpid::amqp::Descriptor* d) { - process(key.str(), qpid::types::Variant()); + QPID_LOG(debug, "NodeProperties::onStartListValue(" << std::string(key.data, key.size) << ", " << count << ", " << d); + process(key.str(), qpid::types::Variant(), d); + return true; } -void NodeProperties::onBooleanValue(const CharSequence& key, bool value, const Descriptor*) +void NodeProperties::onNullValue(const CharSequence& key, const Descriptor* d) { - process(key.str(), value); + process(key.str(), qpid::types::Variant(), d); } -void NodeProperties::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor*) +void NodeProperties::onBooleanValue(const CharSequence& key, bool value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor*) +void NodeProperties::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor*) +void NodeProperties::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onULongValue(const CharSequence& key, uint64_t value, const Descriptor*) +void NodeProperties::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onByteValue(const CharSequence& key, int8_t value, const Descriptor*) +void NodeProperties::onULongValue(const CharSequence& key, uint64_t value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onShortValue(const CharSequence& key, int16_t value, const Descriptor*) +void NodeProperties::onByteValue(const CharSequence& key, int8_t value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onIntValue(const CharSequence& key, int32_t value, const Descriptor*) +void NodeProperties::onShortValue(const CharSequence& key, int16_t value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onLongValue(const CharSequence& key, int64_t value, const Descriptor*) +void NodeProperties::onIntValue(const CharSequence& key, int32_t value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onFloatValue(const CharSequence& key, float value, const Descriptor*) +void NodeProperties::onLongValue(const CharSequence& key, int64_t value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onDoubleValue(const CharSequence& key, double value, const Descriptor*) +void NodeProperties::onFloatValue(const CharSequence& key, float value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value, d); } -void NodeProperties::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor*) +void NodeProperties::onDoubleValue(const CharSequence& key, double value, const Descriptor* d) { - process(key.str(), value.str()); + process(key.str(), value, d); } -void NodeProperties::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor*) +void NodeProperties::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor* d) { - process(key.str(), value); + process(key.str(), value.str(), d); } -void NodeProperties::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor*) +void NodeProperties::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor* d) { - process(key.str(), value.str()); + process(key.str(), value, d); } -void NodeProperties::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor*) +void NodeProperties::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor* d) { - process(key.str(), value.str()); + process(key.str(), value.str(), d); +} + +void NodeProperties::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor* d) +{ + process(key.str(), value.str(), d); } QueueSettings NodeProperties::getQueueSettings() @@ -159,6 +247,7 @@ QueueSettings NodeProperties::getQueueSettings() QueueSettings settings(durable, autoDelete); qpid::types::Variant::Map unused; settings.populate(properties, unused); + settings.lifetime = lifetime; return settings; } @@ -183,4 +272,9 @@ std::string NodeProperties::getAlternateExchange() const return alternateExchange; } +bool NodeProperties::trackControllingLink() const +{ + return lifetime == QueueSettings::DELETE_ON_CLOSE || lifetime == QueueSettings::DELETE_IF_EMPTY; +} + }}} // namespace qpid::broker::amqp |