summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp164
1 files changed, 129 insertions, 35 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp
index a937c1171e..eb30c78128 100644
--- a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp
@@ -20,10 +20,16 @@
*/
#include "qpid/broker/amqp/NodeProperties.h"
#include "qpid/broker/amqp/DataReader.h"
+#include "qpid/broker/QueueSettings.h"
#include "qpid/amqp/CharSequence.h"
+#include "qpid/amqp/Descriptor.h"
+#include "qpid/amqp/descriptors.h"
#include "qpid/types/Variant.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/log/Statement.h"
+extern "C" {
+#include <proton/engine.h>
+}
using qpid::amqp::CharSequence;
using qpid::amqp::Descriptor;
@@ -36,6 +42,7 @@ namespace {
const std::string MOVE("move");
const std::string COPY("copy");
const std::string SUPPORTED_DIST_MODES("supported-dist-modes");
+const std::string LIFETIME_POLICY("lifetime-policy");
//AMQP 0-10 standard parameters:
const std::string DURABLE("durable");
@@ -43,9 +50,57 @@ const std::string EXCLUSIVE("exclusive");
const std::string AUTO_DELETE("auto-delete");
const std::string ALTERNATE_EXCHANGE("alternate-exchange");
const std::string EXCHANGE_TYPE("exchange-type");
+
+pn_bytes_t convert(const std::string& s)
+{
+ pn_bytes_t result;
+ result.start = const_cast<char*>(s.data());
+ result.size = s.size();
+ return result;
}
-NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic") {}
+bool getLifetimePolicy(const Descriptor& d, QueueSettings::LifetimePolicy& policy)
+{
+ if (d.match(qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL, qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_CODE)) {
+ policy = QueueSettings::DELETE_ON_CLOSE;
+ return true;
+ } else if (d.match(qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_CODE)) {
+ policy = QueueSettings::DELETE_IF_UNUSED;
+ return true;
+ } else if (d.match(qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL, qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_CODE)) {
+ policy = QueueSettings::DELETE_IF_EMPTY;
+ return true;
+ } else if (d.match(qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE)) {
+ policy = QueueSettings::DELETE_IF_UNUSED_AND_EMPTY;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool getLifetimeDescriptorSymbol(QueueSettings::LifetimePolicy policy, pn_bytes_t& out)
+{
+ switch (policy) {
+ case QueueSettings::DELETE_ON_CLOSE:
+ out = convert(qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL);
+ return true;
+ case QueueSettings::DELETE_IF_UNUSED:
+ out = convert(qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL);
+ return true;
+ case QueueSettings::DELETE_IF_EMPTY:
+ out = convert(qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL);
+ return true;
+ case QueueSettings::DELETE_IF_UNUSED_AND_EMPTY:
+ out = convert(qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL);
+ return true;
+ default:
+ return false;
+ }
+}
+
+}
+
+NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic"), lifetime(QueueSettings::DELETE_IF_UNUSED) {}
void NodeProperties::read(pn_data_t* data)
{
@@ -53,12 +108,38 @@ void NodeProperties::read(pn_data_t* data)
reader.read(data);
}
-void NodeProperties::process(const std::string& key, const qpid::types::Variant& value)
+void NodeProperties::write(pn_data_t* data)
+{
+ pn_data_put_map(data);
+ pn_data_enter(data);
+ pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
+ pn_data_put_string(data, convert(queue ? MOVE : COPY));
+ pn_bytes_t symbol;
+ if (autoDelete && getLifetimeDescriptorSymbol(lifetime, symbol)) {
+ pn_data_put_symbol(data, convert(LIFETIME_POLICY));
+ pn_data_put_described(data);
+ pn_data_enter(data);
+ pn_data_put_symbol(data, symbol);
+ pn_data_put_list(data);
+ pn_data_exit(data);
+ }
+ pn_data_exit(data);
+}
+
+void NodeProperties::process(const std::string& key, const qpid::types::Variant& value, const Descriptor* d)
{
- QPID_LOG(notice, "Processing node property " << key << " = " << value);
+ QPID_LOG(debug, "Processing node property " << key << " = " << value);
if (key == SUPPORTED_DIST_MODES) {
if (value == MOVE) queue = true;
else if (value == COPY) queue = false;
+ } else if (key == LIFETIME_POLICY) {
+ if (d) {
+ if (getLifetimePolicy(*d, lifetime)) {
+ autoDelete = true;
+ } else {
+ QPID_LOG(warning, "Unrecognised lifetime policy: " << *d);
+ }
+ }
} else if (key == DURABLE) {
durable = value;
} else if (key == EXCLUSIVE) {
@@ -74,84 +155,91 @@ void NodeProperties::process(const std::string& key, const qpid::types::Variant&
}
}
-void NodeProperties::onNullValue(const CharSequence& key, const Descriptor*)
+bool NodeProperties::onStartListValue(const qpid::amqp::CharSequence& key, uint32_t count, const qpid::amqp::Descriptor* d)
{
- process(key.str(), qpid::types::Variant());
+ QPID_LOG(debug, "NodeProperties::onStartListValue(" << std::string(key.data, key.size) << ", " << count << ", " << d);
+ process(key.str(), qpid::types::Variant(), d);
+ return true;
}
-void NodeProperties::onBooleanValue(const CharSequence& key, bool value, const Descriptor*)
+void NodeProperties::onNullValue(const CharSequence& key, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), qpid::types::Variant(), d);
}
-void NodeProperties::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor*)
+void NodeProperties::onBooleanValue(const CharSequence& key, bool value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor*)
+void NodeProperties::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor*)
+void NodeProperties::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onULongValue(const CharSequence& key, uint64_t value, const Descriptor*)
+void NodeProperties::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onByteValue(const CharSequence& key, int8_t value, const Descriptor*)
+void NodeProperties::onULongValue(const CharSequence& key, uint64_t value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onShortValue(const CharSequence& key, int16_t value, const Descriptor*)
+void NodeProperties::onByteValue(const CharSequence& key, int8_t value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onIntValue(const CharSequence& key, int32_t value, const Descriptor*)
+void NodeProperties::onShortValue(const CharSequence& key, int16_t value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onLongValue(const CharSequence& key, int64_t value, const Descriptor*)
+void NodeProperties::onIntValue(const CharSequence& key, int32_t value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onFloatValue(const CharSequence& key, float value, const Descriptor*)
+void NodeProperties::onLongValue(const CharSequence& key, int64_t value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onDoubleValue(const CharSequence& key, double value, const Descriptor*)
+void NodeProperties::onFloatValue(const CharSequence& key, float value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value, d);
}
-void NodeProperties::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+void NodeProperties::onDoubleValue(const CharSequence& key, double value, const Descriptor* d)
{
- process(key.str(), value.str());
+ process(key.str(), value, d);
}
-void NodeProperties::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor*)
+void NodeProperties::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor* d)
{
- process(key.str(), value);
+ process(key.str(), value.str(), d);
}
-void NodeProperties::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+void NodeProperties::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor* d)
{
- process(key.str(), value.str());
+ process(key.str(), value, d);
}
-void NodeProperties::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+void NodeProperties::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor* d)
{
- process(key.str(), value.str());
+ process(key.str(), value.str(), d);
+}
+
+void NodeProperties::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor* d)
+{
+ process(key.str(), value.str(), d);
}
QueueSettings NodeProperties::getQueueSettings()
@@ -159,6 +247,7 @@ QueueSettings NodeProperties::getQueueSettings()
QueueSettings settings(durable, autoDelete);
qpid::types::Variant::Map unused;
settings.populate(properties, unused);
+ settings.lifetime = lifetime;
return settings;
}
@@ -183,4 +272,9 @@ std::string NodeProperties::getAlternateExchange() const
return alternateExchange;
}
+bool NodeProperties::trackControllingLink() const
+{
+ return lifetime == QueueSettings::DELETE_ON_CLOSE || lifetime == QueueSettings::DELETE_IF_EMPTY;
+}
+
}}} // namespace qpid::broker::amqp