summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/amqp/Session.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/amqp/Session.cpp')
-rw-r--r--cpp/src/qpid/broker/amqp/Session.cpp74
1 files changed, 72 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp
index ae5660448c..5dd6ace943 100644
--- a/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/cpp/src/qpid/broker/amqp/Session.cpp
@@ -22,6 +22,8 @@
#include "Outgoing.h"
#include "Message.h"
#include "ManagedConnection.h"
+#include "qpid/amqp/CharSequence.h"
+#include "qpid/amqp/Descriptor.h"
#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/DeliverableMessage.h"
@@ -104,8 +106,76 @@ void Session::attach(pn_link_t* link)
QueueSettings settings(false, true);
//TODO: populate settings from source details when available from engine
queue = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;
- //TODO: bind based on filter when that is exposed by engine
- if (exchange->getType() == FanOutExchange::typeName) {
+ pn_data_t* filter = pn_terminus_filter(source);
+ pn_data_next(filter);
+ if (filter && !pn_data_is_null(filter)) {
+ if (pn_data_type(filter) == PN_MAP) {
+ pn_data_t* echo = pn_terminus_filter(pn_link_source(link));
+ pn_data_put_map(echo);
+ pn_data_enter(echo);
+ size_t count = pn_data_get_map(filter)/2;
+ QPID_LOG(debug, "Got filter map with " << count << " entries");
+ pn_data_enter(filter);
+ for (size_t i = 0; i < count; i++) {
+ pn_bytes_t fname = pn_data_get_symbol(filter);
+ pn_data_next(filter);
+ bool isDescribed = pn_data_is_described(filter);
+ qpid::amqp::Descriptor descriptor(0);
+ if (isDescribed) {
+ pn_data_enter(filter);
+ pn_data_next(filter);
+ //TODO: use or at least verify descriptor
+ if (pn_data_type(filter) == PN_ULONG) {
+ descriptor = qpid::amqp::Descriptor(pn_data_get_ulong(filter));
+ } else if (pn_data_type(filter) == PN_SYMBOL) {
+ pn_bytes_t d = pn_data_get_symbol(filter);
+ qpid::amqp::CharSequence c;
+ c.data = d.start;
+ c.size = d.size;
+ descriptor = qpid::amqp::Descriptor(c);
+ } else {
+ QPID_LOG(notice, "Ignoring filter with descriptor with key " << std::string(fname.start, fname.size) << " and type " << pn_data_type(filter));
+ continue;
+ }
+ QPID_LOG(debug, "Got filter with descriptor " << descriptor);
+ pn_data_next(filter);
+ } else {
+ QPID_LOG(debug, "Got undescribed filter of type " << pn_data_type(filter));
+ }
+ if (pn_data_type(filter) == PN_STRING) {
+ pn_bytes_t value = pn_data_get_string(filter);
+ pn_data_next(filter);
+ exchange->bind(queue, std::string(value.start, value.size), 0);
+ pn_data_put_symbol(echo, fname);
+ if (isDescribed) {
+ pn_data_put_described(echo);
+ pn_data_enter(echo);
+ pn_bytes_t symbol;
+ switch (descriptor.type) {
+ case qpid::amqp::Descriptor::NUMERIC:
+ pn_data_put_ulong(echo, descriptor.value.code);
+ break;
+ case qpid::amqp::Descriptor::SYMBOLIC:
+ symbol.start = const_cast<char*>(descriptor.value.symbol.data);
+ symbol.size = descriptor.value.symbol.size;
+ pn_data_put_symbol(echo, symbol);
+ break;
+ }
+ }
+ pn_data_put_string(echo, value);
+ if (isDescribed) pn_data_exit(echo);
+
+ QPID_LOG(debug, "Binding using filter " << std::string(fname.start, fname.size) << ":" << std::string(value.start, value.size));
+ } else {
+ //TODO: handle headers exchange filters
+ QPID_LOG(warning, "Ignoring unsupported filter type with key " << std::string(fname.start, fname.size) << " and type " << pn_data_type(filter));
+ }
+ }
+ pn_data_exit(echo);
+ } else {
+ QPID_LOG(warning, "Filter should be map, got type: " << pn_data_type(filter));
+ }
+ } else if (exchange->getType() == FanOutExchange::typeName) {
exchange->bind(queue, std::string(), 0);
} else if (exchange->getType() == TopicExchange::typeName) {
exchange->bind(queue, "#", 0);