summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/messaging/amqp/AddressHelper.cpp')
-rw-r--r--cpp/src/qpid/messaging/amqp/AddressHelper.cpp147
1 files changed, 147 insertions, 0 deletions
diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index 258b4b84bd..f3a42b201b 100644
--- a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -21,6 +21,7 @@
#include "qpid/messaging/amqp/AddressHelper.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/AddressImpl.h"
+#include "qpid/amqp/descriptors.h"
#include "qpid/log/Statement.h"
#include <vector>
#include <set>
@@ -64,6 +65,10 @@ const std::string DURABLE("durable");
const std::string NAME("name");
const std::string RELIABILITY("reliability");
const std::string SELECTOR("selector");
+const std::string FILTER("filter");
+const std::string DESCRIPTOR("descriptor");
+const std::string VALUE("value");
+const std::string SUBJECT_FILTER("subject-filter");
//distribution modes:
const std::string MOVE("move");
@@ -100,7 +105,15 @@ pn_bytes_t convert(const std::string& s)
result.size = s.size();
return result;
}
+bool hasWildcards(const std::string& key)
+{
+ return key.find('*') != std::string::npos || key.find('#') != std::string::npos;
+}
+uint64_t getFilterDescriptor(const std::string& key)
+{
+ return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE;
+}
bool contains(const Variant::List& list, const std::string& item)
{
for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
@@ -183,6 +196,58 @@ void flatten(Variant::Map& base, const std::string& nested)
base.erase(i);
}
}
+void write(pn_data_t* data, const Variant& value);
+
+void write(pn_data_t* data, const Variant::Map& map)
+{
+ pn_data_put_map(data);
+ pn_data_enter(data);
+ for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
+ pn_data_put_string(data, convert(i->first));
+ write(data, i->second);
+ }
+ pn_data_exit(data);
+}
+void write(pn_data_t* data, const Variant::List& list)
+{
+ pn_data_put_list(data);
+ pn_data_enter(data);
+ for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ write(data, *i);
+ }
+ pn_data_exit(data);
+}
+void write(pn_data_t* data, const Variant& value)
+{
+ switch (value.getType()) {
+ case qpid::types::VAR_VOID:
+ pn_data_put_null(data);
+ break;
+ case qpid::types::VAR_BOOL:
+ pn_data_put_bool(data, value.asBool());
+ break;
+ case qpid::types::VAR_UINT64:
+ pn_data_put_ulong(data, value.asUint64());
+ break;
+ case qpid::types::VAR_INT64:
+ pn_data_put_long(data, value.asInt64());
+ break;
+ case qpid::types::VAR_DOUBLE:
+ pn_data_put_double(data, value.asDouble());
+ break;
+ case qpid::types::VAR_STRING:
+ pn_data_put_string(data, convert(value.asString()));
+ break;
+ case qpid::types::VAR_MAP:
+ write(data, value.asMap());
+ break;
+ case qpid::types::VAR_LIST:
+ write(data, value.asList());
+ break;
+ default:
+ break;
+ }
+}
}
AddressHelper::AddressHelper(const Address& address) :
@@ -241,6 +306,67 @@ AddressHelper::AddressHelper(const Address& address) :
if (properties.size() && !(isTemporary || createPolicy.size())) {
QPID_LOG(warning, "Properties will be ignored! " << address);
}
+
+ qpid::types::Variant::Map::const_iterator selector = link.find(SELECTOR);
+ if (selector != link.end()) {
+ addFilter(SELECTOR, qpid::amqp::filters::SELECTOR_FILTER_CODE, selector->second);
+ }
+ if (!address.getSubject().empty()) {
+ addFilter(SUBJECT_FILTER, getFilterDescriptor(address.getSubject()), address.getSubject());
+ }
+ qpid::types::Variant::Map::const_iterator filter = link.find(FILTER);
+ if (filter != link.end()) {
+ if (filter->second.getType() == qpid::types::VAR_MAP) {
+ addFilter(filter->second.asMap());
+ } else if (filter->second.getType() == qpid::types::VAR_LIST) {
+ addFilters(filter->second.asList());
+ } else {
+ throw qpid::messaging::AddressError("Filter must be a map or a list of maps, each containing name, descriptor and value.");
+ }
+ }
+}
+
+void AddressHelper::addFilters(const qpid::types::Variant::List& f)
+{
+ for (qpid::types::Variant::List::const_iterator i = f.begin(); i != f.end(); ++i) {
+ addFilter(i->asMap());
+ }
+}
+
+void AddressHelper::addFilter(const qpid::types::Variant::Map& f)
+{
+ qpid::types::Variant::Map::const_iterator name = f.find(NAME);
+ qpid::types::Variant::Map::const_iterator descriptor = f.find(DESCRIPTOR);
+ qpid::types::Variant::Map::const_iterator value = f.find(VALUE);
+ //all fields are required at present (may relax this at a later stage):
+ if (name == f.end()) {
+ throw qpid::messaging::AddressError("Filter entry must specify name");
+ }
+ if (descriptor == f.end()) {
+ throw qpid::messaging::AddressError("Filter entry must specify descriptor");
+ }
+ if (value == f.end()) {
+ throw qpid::messaging::AddressError("Filter entry must specify value");
+ }
+ try {
+ addFilter(name->second.asString(), descriptor->second.asUint64(), value->second);
+ } catch (const qpid::types::InvalidConversion&) {
+ addFilter(name->second.asString(), descriptor->second.asString(), value->second);
+ }
+
+}
+
+AddressHelper::Filter::Filter() : descriptorCode(0){}
+AddressHelper::Filter::Filter(const std::string& n, uint64_t d, const qpid::types::Variant& v) : name(n), descriptorCode(d), value(v) {}
+AddressHelper::Filter::Filter(const std::string& n, const std::string& d, const qpid::types::Variant& v) : name(n), descriptorSymbol(d), descriptorCode(0), value(v) {}
+
+void AddressHelper::addFilter(const std::string& name, uint64_t descriptor, const qpid::types::Variant& value)
+{
+ filters.push_back(Filter(name, descriptor, value));
+}
+void AddressHelper::addFilter(const std::string& name, const std::string& descriptor, const qpid::types::Variant& value)
+{
+ filters.push_back(Filter(name, descriptor, value));
}
void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode)
@@ -331,6 +457,26 @@ void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode)
if (mode == FOR_RECEIVER && browse) {
//when PROTON-139 is resolved, set the required delivery-mode
}
+ //set filter(s):
+ if (mode == FOR_RECEIVER && !filters.empty()) {
+ pn_data_t* filter = pn_terminus_filter(terminus);
+ pn_data_put_map(filter);
+ pn_data_enter(filter);
+ for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) {
+ pn_data_put_symbol(filter, convert(i->name));
+ pn_data_put_described(filter);
+ pn_data_enter(filter);
+ if (i->descriptorSymbol.size()) {
+ pn_data_put_symbol(filter, convert(i->descriptorSymbol));
+ } else {
+ pn_data_put_ulong(filter, i->descriptorCode);
+ }
+ write(filter, i->value);
+ pn_data_exit(filter);
+ }
+ pn_data_exit(filter);
+ }
+
}
void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create)
@@ -401,6 +547,7 @@ Verifier::Verifier()
link[X_DECLARE] = true;
link[X_BINDINGS] = true;
link[SELECTOR] = true;
+ link[FILTER] = true;
defined[LINK] = link;
}
void Verifier::verify(const Address& address) const