diff options
Diffstat (limited to 'cpp/src/qpid/broker/amqp/Session.cpp')
-rw-r--r-- | cpp/src/qpid/broker/amqp/Session.cpp | 74 |
1 files changed, 72 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp index ae5660448c..5dd6ace943 100644 --- a/cpp/src/qpid/broker/amqp/Session.cpp +++ b/cpp/src/qpid/broker/amqp/Session.cpp @@ -22,6 +22,8 @@ #include "Outgoing.h" #include "Message.h" #include "ManagedConnection.h" +#include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/Descriptor.h" #include "qpid/broker/AsyncCompletion.h" #include "qpid/broker/Broker.h" #include "qpid/broker/DeliverableMessage.h" @@ -104,8 +106,76 @@ void Session::attach(pn_link_t* link) QueueSettings settings(false, true); //TODO: populate settings from source details when available from engine queue = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first; - //TODO: bind based on filter when that is exposed by engine - if (exchange->getType() == FanOutExchange::typeName) { + pn_data_t* filter = pn_terminus_filter(source); + pn_data_next(filter); + if (filter && !pn_data_is_null(filter)) { + if (pn_data_type(filter) == PN_MAP) { + pn_data_t* echo = pn_terminus_filter(pn_link_source(link)); + pn_data_put_map(echo); + pn_data_enter(echo); + size_t count = pn_data_get_map(filter)/2; + QPID_LOG(debug, "Got filter map with " << count << " entries"); + pn_data_enter(filter); + for (size_t i = 0; i < count; i++) { + pn_bytes_t fname = pn_data_get_symbol(filter); + pn_data_next(filter); + bool isDescribed = pn_data_is_described(filter); + qpid::amqp::Descriptor descriptor(0); + if (isDescribed) { + pn_data_enter(filter); + pn_data_next(filter); + //TODO: use or at least verify descriptor + if (pn_data_type(filter) == PN_ULONG) { + descriptor = qpid::amqp::Descriptor(pn_data_get_ulong(filter)); + } else if (pn_data_type(filter) == PN_SYMBOL) { + pn_bytes_t d = pn_data_get_symbol(filter); + qpid::amqp::CharSequence c; + c.data = d.start; + c.size = d.size; + descriptor = qpid::amqp::Descriptor(c); + } else { + QPID_LOG(notice, "Ignoring filter with descriptor with key " << std::string(fname.start, fname.size) << " and type " << pn_data_type(filter)); + continue; + } + QPID_LOG(debug, "Got filter with descriptor " << descriptor); + pn_data_next(filter); + } else { + QPID_LOG(debug, "Got undescribed filter of type " << pn_data_type(filter)); + } + if (pn_data_type(filter) == PN_STRING) { + pn_bytes_t value = pn_data_get_string(filter); + pn_data_next(filter); + exchange->bind(queue, std::string(value.start, value.size), 0); + pn_data_put_symbol(echo, fname); + if (isDescribed) { + pn_data_put_described(echo); + pn_data_enter(echo); + pn_bytes_t symbol; + switch (descriptor.type) { + case qpid::amqp::Descriptor::NUMERIC: + pn_data_put_ulong(echo, descriptor.value.code); + break; + case qpid::amqp::Descriptor::SYMBOLIC: + symbol.start = const_cast<char*>(descriptor.value.symbol.data); + symbol.size = descriptor.value.symbol.size; + pn_data_put_symbol(echo, symbol); + break; + } + } + pn_data_put_string(echo, value); + if (isDescribed) pn_data_exit(echo); + + QPID_LOG(debug, "Binding using filter " << std::string(fname.start, fname.size) << ":" << std::string(value.start, value.size)); + } else { + //TODO: handle headers exchange filters + QPID_LOG(warning, "Ignoring unsupported filter type with key " << std::string(fname.start, fname.size) << " and type " << pn_data_type(filter)); + } + } + pn_data_exit(echo); + } else { + QPID_LOG(warning, "Filter should be map, got type: " << pn_data_type(filter)); + } + } else if (exchange->getType() == FanOutExchange::typeName) { exchange->bind(queue, std::string(), 0); } else if (exchange->getType() == TopicExchange::typeName) { exchange->bind(queue, "#", 0); |