summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/amqp/NodeProperties.cpp94
-rw-r--r--cpp/src/qpid/broker/amqp/NodeProperties.h7
-rw-r--r--cpp/src/qpid/broker/amqp/Session.cpp31
-rw-r--r--cpp/src/qpid/broker/amqp/Session.h2
-rw-r--r--cpp/src/qpid/messaging/amqp/AddressHelper.cpp200
5 files changed, 304 insertions, 30 deletions
diff --git a/cpp/src/qpid/broker/amqp/NodeProperties.cpp b/cpp/src/qpid/broker/amqp/NodeProperties.cpp
index cc17d6e1a2..e30ae5a68d 100644
--- a/cpp/src/qpid/broker/amqp/NodeProperties.cpp
+++ b/cpp/src/qpid/broker/amqp/NodeProperties.cpp
@@ -20,6 +20,8 @@
*/
#include "qpid/broker/amqp/NodeProperties.h"
#include "qpid/broker/amqp/DataReader.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/amqp/CharSequence.h"
#include "qpid/amqp/Descriptor.h"
@@ -100,7 +102,7 @@ bool getLifetimeDescriptorSymbol(QueueSettings::LifetimePolicy policy, pn_bytes_
}
-NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic"), lifetime(QueueSettings::DELETE_IF_UNUSED) {}
+NodeProperties::NodeProperties() : received(false), queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic"), lifetime(QueueSettings::DELETE_IF_UNUSED) {}
void NodeProperties::read(pn_data_t* data)
{
@@ -108,26 +110,92 @@ void NodeProperties::read(pn_data_t* data)
reader.read(data);
}
-void NodeProperties::write(pn_data_t* data)
+void NodeProperties::write(pn_data_t* data, boost::shared_ptr<Queue> node)
{
- pn_data_put_map(data);
- pn_data_enter(data);
- pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
- pn_data_put_string(data, convert(queue ? MOVE : COPY));
- pn_bytes_t symbol;
- if (autoDelete && getLifetimeDescriptorSymbol(lifetime, symbol)) {
- pn_data_put_symbol(data, convert(LIFETIME_POLICY));
- pn_data_put_described(data);
+ if (received) {
+ pn_data_put_map(data);
pn_data_enter(data);
- pn_data_put_symbol(data, symbol);
- pn_data_put_list(data);
+ pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
+ pn_data_put_string(data, convert(MOVE));//TODO: should really add COPY as well, since queues can be browsed
+ pn_bytes_t symbol;
+ if (autoDelete && node->isAutoDelete() && getLifetimeDescriptorSymbol(node->getSettings().lifetime, symbol)) {
+ pn_data_put_symbol(data, convert(LIFETIME_POLICY));
+ pn_data_put_described(data);
+ pn_data_enter(data);
+ pn_data_put_symbol(data, symbol);
+ pn_data_put_list(data);
+ pn_data_exit(data);
+ }
+ if (durable && node->isDurable()) {
+ pn_data_put_symbol(data, convert(DURABLE));
+ pn_data_put_bool(data, true);
+ }
+ if (exclusive && node->hasExclusiveOwner()) {
+ pn_data_put_symbol(data, convert(EXCLUSIVE));
+ pn_data_put_bool(data, true);
+ }
+ if (!alternateExchange.empty() && node->getAlternateExchange()) {
+ pn_data_put_symbol(data, convert(ALTERNATE_EXCHANGE));
+ pn_data_put_string(data, convert(node->getAlternateExchange()->getName()));
+ }
+
+ qpid::types::Variant::Map actual = node->getSettings().asMap();
+ qpid::types::Variant::Map unrecognised;
+ QueueSettings dummy;
+ dummy.populate(actual, unrecognised);
+ for (qpid::types::Variant::Map::const_iterator i = unrecognised.begin(); i != unrecognised.end(); ++i) {
+ actual.erase(i->first);
+ }
+ for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ qpid::types::Variant::Map::const_iterator j = actual.find(i->first);
+ if (j != actual.end()) {
+ pn_data_put_symbol(data, convert(j->first));
+ pn_data_put_string(data, convert(j->second.asString()));
+ }
+ }
+
+ pn_data_exit(data);
+ }
+}
+namespace {
+const std::string QPID_MSG_SEQUENCE("qpid.msg_sequence");
+const std::string QPID_IVE("qpid.ive");
+}
+void NodeProperties::write(pn_data_t* data, boost::shared_ptr<Exchange> node)
+{
+ if (received) {
+ pn_data_put_map(data);
+ pn_data_enter(data);
+ pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
+ pn_data_put_string(data, convert(COPY));
+ if (durable && node->isDurable()) {
+ pn_data_put_symbol(data, convert(DURABLE));
+ pn_data_put_bool(data, true);
+ }
+ if (!exchangeType.empty()) {
+ pn_data_put_symbol(data, convert(EXCHANGE_TYPE));
+ pn_data_put_string(data, convert(node->getType()));
+ }
+ if (!alternateExchange.empty() && node->getAlternate()) {
+ pn_data_put_symbol(data, convert(ALTERNATE_EXCHANGE));
+ pn_data_put_string(data, convert(node->getAlternate()->getName()));
+ }
+
+ for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ if ((i->first == QPID_MSG_SEQUENCE || i->first == QPID_IVE) && node->getArgs().isSet(i->first)) {
+ pn_data_put_symbol(data, convert(i->first));
+ pn_data_put_bool(data, true);
+ }
+ }
+
pn_data_exit(data);
}
- pn_data_exit(data);
}
+
void NodeProperties::process(const std::string& key, const qpid::types::Variant& value, const Descriptor* d)
{
+ received = true;
QPID_LOG(debug, "Processing node property " << key << " = " << value);
if (key == SUPPORTED_DIST_MODES) {
if (value == MOVE) queue = true;
diff --git a/cpp/src/qpid/broker/amqp/NodeProperties.h b/cpp/src/qpid/broker/amqp/NodeProperties.h
index 8a759062c0..df96d5a023 100644
--- a/cpp/src/qpid/broker/amqp/NodeProperties.h
+++ b/cpp/src/qpid/broker/amqp/NodeProperties.h
@@ -24,10 +24,13 @@
#include "qpid/amqp/MapReader.h"
#include "qpid/types/Variant.h"
#include "qpid/broker/QueueSettings.h"
+#include <boost/shared_ptr.hpp>
struct pn_data_t;
namespace qpid {
namespace broker {
+class Exchange;
+class Queue;
struct QueueSettings;
namespace amqp {
@@ -36,7 +39,8 @@ class NodeProperties : public qpid::amqp::MapReader
public:
NodeProperties();
void read(pn_data_t*);
- void write(pn_data_t*);
+ void write(pn_data_t*,boost::shared_ptr<Queue>);
+ void write(pn_data_t*,boost::shared_ptr<Exchange>);
void onNullValue(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*);
void onBooleanValue(const qpid::amqp::CharSequence&, bool, const qpid::amqp::Descriptor*);
void onUByteValue(const qpid::amqp::CharSequence&, uint8_t, const qpid::amqp::Descriptor*);
@@ -63,6 +67,7 @@ class NodeProperties : public qpid::amqp::MapReader
bool trackControllingLink() const;
const qpid::types::Variant::Map& getProperties() const;
private:
+ bool received;
bool queue;
bool durable;
bool autoDelete;
diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp
index 1f8e01772c..0c8f9a3b06 100644
--- a/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/cpp/src/qpid/broker/amqp/Session.cpp
@@ -203,17 +203,24 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te
node.exchange = connection.getBroker().getExchanges().find(name);
node.queue = connection.getBroker().getQueues().find(name);
node.topic = connection.getTopics().get(name);
+ bool createOnDemand = is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus));
+ //Strictly speaking, properties should only be specified when the
+ //terminus is dynamic. However we will not enforce that here. If
+ //properties are set on the attach request, we will set them on
+ //our reply. This allows the 'create' and 'assert' options in the
+ //qpid messaging API to be implemented over 1.0.
+ node.properties.read(pn_terminus_properties(terminus));
+
if (node.topic) node.exchange = node.topic->getExchange();
- if (node.exchange && !node.queue && is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
- node.properties.read(pn_terminus_properties(terminus));
+ if (node.exchange && !node.queue && createOnDemand) {
if (!node.properties.getExchangeType().empty() && node.properties.getExchangeType() != node.exchange->getType()) {
+ //emulate 0-10 exchange-declare behaviour
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "Exchange of different type already exists");
}
}
if (!node.queue && !node.exchange) {
- if (pn_terminus_is_dynamic(terminus) || is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
+ if (pn_terminus_is_dynamic(terminus) || createOnDemand) {
//is it a queue or an exchange?
- node.properties.read(pn_terminus_properties(terminus));
if (node.properties.isQueue()) {
node.queue = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first;
} else {
@@ -222,7 +229,6 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te
node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(),
args, connection.getUserId(), connection.getId()).first;
}
- node.created = true;
} else {
size_t i = name.find('@');
if (i != std::string::npos && (i+1) < name.length()) {
@@ -324,12 +330,11 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s
if (node.queue) {
setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.queue);
authorise.incoming(node.queue);
+ node.properties.write(pn_terminus_properties(pn_link_target(link)), node.queue);
} else if (node.exchange) {
setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange);
authorise.incoming(node.exchange);
- }
- if (node.created) {
- node.properties.write(pn_terminus_properties(pn_link_target(link)));
+ node.properties.write(pn_terminus_properties(pn_link_target(link)), node.exchange);
}
const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link));
@@ -361,10 +366,12 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s
void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::string& name)
{
ResolvedNode node = resolve(name, source, false);
- if (node.queue) setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.queue);
- else if (node.exchange) setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.exchange);
- if (node.created) {
- node.properties.write(pn_terminus_properties(pn_link_source(link)));
+ if (node.queue) {
+ setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.queue);
+ node.properties.write(pn_terminus_properties(pn_link_source(link)), node.queue);
+ } else if (node.exchange) {
+ setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.exchange);
+ node.properties.write(pn_terminus_properties(pn_link_source(link)), node.exchange);
}
Filter filter;
diff --git a/cpp/src/qpid/broker/amqp/Session.h b/cpp/src/qpid/broker/amqp/Session.h
index b94d3c226d..a991ac9e3e 100644
--- a/cpp/src/qpid/broker/amqp/Session.h
+++ b/cpp/src/qpid/broker/amqp/Session.h
@@ -100,8 +100,6 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
boost::shared_ptr<qpid::broker::amqp::Topic> topic;
boost::shared_ptr<Relay> relay;
NodeProperties properties;
- bool created;
- ResolvedNode() : created(false) {}
};
ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming);
diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index 2a35442ed7..a8fa39e70a 100644
--- a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -22,6 +22,7 @@
#include "qpid/messaging/Address.h"
#include "qpid/messaging/AddressImpl.h"
#include "qpid/amqp/descriptors.h"
+#include "qpid/types/encodings.h"
#include "qpid/log/Statement.h"
#include <vector>
#include <set>
@@ -290,6 +291,127 @@ void write(pn_data_t* data, const Variant& value)
break;
}
}
+bool read(pn_data_t* data, pn_type_t type, qpid::types::Variant& value);
+bool read(pn_data_t* data, qpid::types::Variant& value)
+{
+ return read(data, pn_data_type(data), value);
+}
+void readList(pn_data_t* data, qpid::types::Variant::List& value)
+{
+ size_t count = pn_data_get_list(data);
+ pn_data_enter(data);
+ for (size_t i = 0; i < count && pn_data_next(data); ++i) {
+ qpid::types::Variant e;
+ if (read(data, e)) value.push_back(e);
+ }
+ pn_data_exit(data);
+}
+void readMap(pn_data_t* data, qpid::types::Variant::Map& value)
+{
+ size_t count = pn_data_get_list(data);
+ pn_data_enter(data);
+ for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) {
+ std::string key = convert(pn_data_get_symbol(data));
+ pn_data_next(data);
+ qpid::types::Variant e;
+ if (read(data, e)) value[key]= e;
+ }
+ pn_data_exit(data);
+}
+void readArray(pn_data_t* data, qpid::types::Variant::List& value)
+{
+ size_t count = pn_data_get_array(data);
+ pn_type_t type = pn_data_get_array_type(data);
+ pn_data_enter(data);
+ for (size_t i = 0; i < count && pn_data_next(data); ++i) {
+ qpid::types::Variant e;
+ if (read(data, type, e)) value.push_back(e);
+ }
+ pn_data_exit(data);
+}
+bool read(pn_data_t* data, pn_type_t type, qpid::types::Variant& value)
+{
+ switch (type) {
+ case PN_NULL:
+ if (value.getType() != qpid::types::VAR_VOID) value = qpid::types::Variant();
+ return true;
+ case PN_BOOL:
+ value = pn_data_get_bool(data);
+ return true;
+ case PN_UBYTE:
+ value = pn_data_get_ubyte(data);
+ return true;
+ case PN_BYTE:
+ value = pn_data_get_byte(data);
+ return true;
+ case PN_USHORT:
+ value = pn_data_get_ushort(data);
+ return true;
+ case PN_SHORT:
+ value = pn_data_get_short(data);
+ return true;
+ case PN_UINT:
+ value = pn_data_get_uint(data);
+ return true;
+ case PN_INT:
+ value = pn_data_get_int(data);
+ return true;
+ case PN_CHAR:
+ value = pn_data_get_char(data);
+ return true;
+ case PN_ULONG:
+ value = pn_data_get_ulong(data);
+ return true;
+ case PN_LONG:
+ value = pn_data_get_long(data);
+ return true;
+ case PN_TIMESTAMP:
+ value = pn_data_get_timestamp(data);
+ return true;
+ case PN_FLOAT:
+ value = pn_data_get_float(data);
+ return true;
+ case PN_DOUBLE:
+ value = pn_data_get_double(data);
+ return true;
+ case PN_UUID:
+ value = qpid::types::Uuid(pn_data_get_uuid(data).bytes);
+ return true;
+ case PN_BINARY:
+ value = convert(pn_data_get_binary(data));
+ value.setEncoding(qpid::types::encodings::BINARY);
+ return true;
+ case PN_STRING:
+ value = convert(pn_data_get_string(data));
+ value.setEncoding(qpid::types::encodings::UTF8);
+ return true;
+ case PN_SYMBOL:
+ value = convert(pn_data_get_string(data));
+ value.setEncoding(qpid::types::encodings::ASCII);
+ return true;
+ case PN_LIST:
+ value = qpid::types::Variant::List();
+ readList(data, value.asList());
+ return true;
+ break;
+ case PN_MAP:
+ value = qpid::types::Variant::Map();
+ readMap(data, value.asMap());
+ return true;
+ case PN_ARRAY:
+ value = qpid::types::Variant::List();
+ readArray(data, value.asList());
+ return true;
+ case PN_DESCRIBED:
+ case PN_DECIMAL32:
+ case PN_DECIMAL64:
+ case PN_DECIMAL128:
+ default:
+ return false;
+ }
+
+}
+
const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes
const uint32_t DEFAULT_TIMEOUT(0);
}
@@ -354,7 +476,7 @@ AddressHelper::AddressHelper(const Address& address) :
properties[LIFETIME_POLICY] = DELETE_ON_CLOSE;
}
- if (properties.size() && !(isTemporary || createPolicy.size())) {
+ if (properties.size() && !(isTemporary || !createPolicy.empty() || !assertPolicy.empty())) {
QPID_LOG(warning, "Properties will be ignored! " << address);
}
@@ -420,10 +542,48 @@ void AddressHelper::addFilter(const std::string& name, const std::string& descri
filters.push_back(Filter(name, descriptor, value));
}
+namespace {
+bool checkLifetimePolicy(const std::string& requested, const std::string& actual)
+{
+ if (actual == qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL && requested == DELETE_ON_CLOSE) return true;
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL && requested == DELETE_IF_UNUSED) return true;
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL && requested == DELETE_IF_EMPTY) return true;
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL && requested == DELETE_IF_UNUSED_AND_EMPTY) return true;
+ else return actual == requested;
+}
+bool checkLifetimePolicy(const std::string& requested, uint64_t actual)
+{
+ if (actual == qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_CODE)
+ return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL);
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_CODE)
+ return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL);
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_CODE)
+ return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL);
+ else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE)
+ return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL);
+ else
+ return false;
+}
+bool checkLifetimePolicy(const std::string& requested, pn_data_t* actual)
+{
+ bool result(false);
+ if (pn_data_is_described(actual)) {
+ pn_data_enter(actual);
+ pn_data_next(actual);
+ if (pn_data_type(actual) == PN_ULONG) {
+ result = checkLifetimePolicy(requested, pn_data_get_ulong(actual));
+ } else if (pn_data_type(actual) == PN_SYMBOL) {
+ result = checkLifetimePolicy(requested, convert(pn_data_get_symbol(actual)));
+ }
+ pn_data_exit(actual);
+ }
+ return result;
+}
+}
void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode)
{
if (assertEnabled(mode)) {
- QPID_LOG(debug, "checking assertions: " << capabilities);
+ QPID_LOG(debug, "checking capabilities: " << capabilities);
//ensure all desired capabilities have been offered
std::set<std::string> desired;
for (Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
@@ -493,6 +653,40 @@ void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode)
}
}
if (!first) throw qpid::messaging::AssertionFailed(missing.str());
+
+ //assert on properties (Note: this violates the AMQP 1.0
+ //specification - as does the create option - by sending
+ //node-properties even if the dynamic option is not
+ //set. However this can be avoided by not specifying any node
+ //properties when asserting)
+ if (!type.empty() || durableNode || !properties.empty()) {
+ qpid::types::Variant::Map requested = properties;
+ if (!type.empty()) requested[SUPPORTED_DIST_MODES] = type == TOPIC ? COPY : MOVE;
+ if (durableNode) requested[DURABLE] = true;
+
+ data = pn_terminus_properties(terminus);
+ if (pn_data_next(data)) {
+ size_t count = pn_data_get_map(data);
+ pn_data_enter(data);
+ for (size_t i = 0; i < count && pn_data_next(data); ++i) {
+ std::string key = convert(pn_data_get_symbol(data));
+ pn_data_next(data);
+ qpid::types::Variant::Map::const_iterator j = requested.find(key);
+ qpid::types::Variant v;
+ if (j != requested.end() &&
+ ((key == LIFETIME_POLICY && checkLifetimePolicy(j->second.asString(), data)) ||
+ (read(data, v) && v.asString() == j->second.asString()))) {
+ requested.erase(j->first);
+ }
+ }
+ pn_data_exit(data);
+ if (!requested.empty()) {
+ std::stringstream missing;
+ missing << "Requested node properties not met: " << requested;
+ throw qpid::messaging::AssertionFailed(missing.str());
+ }
+ }
+ }
}
}
@@ -582,6 +776,8 @@ void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMod
//application expects name of node to be as specified
setNodeProperties(terminus);
createOnDemand = true;
+ } else if (assertEnabled(mode)) {
+ setNodeProperties(terminus);
}
}