summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-06-12 17:56:12 +0000
committerGordon Sim <gsim@apache.org>2013-06-12 17:56:12 +0000
commit4d4276904ea9fa45d36ea8b44b08b60c1816f3d7 (patch)
treec08a3bc48717eb85d8a9e9c572a4f53278eef404
parentec18c64e929a9fb5521a9a5bb0070084237fbd28 (diff)
downloadqpid-python-4d4276904ea9fa45d36ea8b44b08b60c1816f3d7.tar.gz
QPID-4766: Added generic filter support to address handling in qpid::messaging. Added support for legacy-headers-binding and newly defined xquery filters.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1492310 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/amqp/MapReader.cpp72
-rw-r--r--cpp/src/qpid/amqp/MapReader.h6
-rw-r--r--cpp/src/qpid/amqp/descriptors.h4
-rw-r--r--cpp/src/qpid/broker/amqp/Filter.cpp305
-rw-r--r--cpp/src/qpid/broker/amqp/Filter.h75
-rw-r--r--cpp/src/qpid/broker/amqp/Session.cpp23
-rw-r--r--cpp/src/qpid/messaging/amqp/AddressHelper.cpp147
-rw-r--r--cpp/src/qpid/messaging/amqp/AddressHelper.h19
-rw-r--r--cpp/src/qpid/messaging/amqp/ReceiverContext.cpp47
-rw-r--r--specs/apache-filters.xml18
10 files changed, 566 insertions, 150 deletions
diff --git a/cpp/src/qpid/amqp/MapReader.cpp b/cpp/src/qpid/amqp/MapReader.cpp
index 2bace74d34..aff885c5d3 100644
--- a/cpp/src/qpid/amqp/MapReader.cpp
+++ b/cpp/src/qpid/amqp/MapReader.cpp
@@ -30,7 +30,7 @@ void MapReader::onNull(const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onNullValue(key, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
@@ -40,7 +40,7 @@ void MapReader::onBoolean(bool v, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onBooleanValue(key, v, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
@@ -51,7 +51,7 @@ void MapReader::onUByte(uint8_t v, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onUByteValue(key, v, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
@@ -62,7 +62,7 @@ void MapReader::onUShort(uint16_t v, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onUShortValue(key, v, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
@@ -73,7 +73,7 @@ void MapReader::onUInt(uint32_t v, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onUIntValue(key, v, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
@@ -84,7 +84,7 @@ void MapReader::onULong(uint64_t v, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onULongValue(key, v, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
@@ -95,7 +95,7 @@ void MapReader::onByte(int8_t v, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onByteValue(key, v, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
@@ -106,7 +106,7 @@ void MapReader::onShort(int16_t v, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onShortValue(key, v, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
@@ -117,7 +117,7 @@ void MapReader::onInt(int32_t v, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onIntValue(key, v, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
@@ -128,7 +128,7 @@ void MapReader::onLong(int64_t v, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onLongValue(key, v, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
@@ -139,7 +139,7 @@ void MapReader::onFloat(float v, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onFloatValue(key, v, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
@@ -150,7 +150,7 @@ void MapReader::onDouble(double v, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onDoubleValue(key, v, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
@@ -161,7 +161,7 @@ void MapReader::onUuid(const CharSequence& v, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onUuidValue(key, v, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
@@ -172,7 +172,7 @@ void MapReader::onTimestamp(int64_t v, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onTimestampValue(key, v, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
@@ -183,7 +183,7 @@ void MapReader::onBinary(const CharSequence& v, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onBinaryValue(key, v, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
@@ -194,9 +194,13 @@ void MapReader::onString(const CharSequence& v, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onStringValue(key, v, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
- throw qpid::Exception(QPID_MSG("Expecting symbol as key, got string " << v.str()));
+ if (keyType & STRING_KEY) {
+ key = v;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting symbol as key, got string " << v.str()));
+ }
}
}
@@ -205,9 +209,13 @@ void MapReader::onSymbol(const CharSequence& v, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onSymbolValue(key, v, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
- key = v;
+ if (keyType & SYMBOL_KEY) {
+ key = v;
+ } else {
+ throw qpid::Exception(QPID_MSG("Expecting string as key, got symbol " << v.str()));
+ }
}
}
@@ -216,7 +224,7 @@ bool MapReader::onStartList(uint32_t count, const CharSequence&, const Descripto
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
bool step = onStartListValue(key, count, d);
- key.data = 0; key.size = 0;
+ clearKey();
return step;
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
@@ -229,7 +237,7 @@ bool MapReader::onStartMap(uint32_t count, const CharSequence&, const Descriptor
if (level++) {
if (key) {
bool step = onStartMapValue(key, count, d);
- key.data = 0; key.size = 0;
+ clearKey();
return step;
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
@@ -243,7 +251,7 @@ bool MapReader::onStartArray(uint32_t count, const CharSequence&, const Construc
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
bool step = onStartArrayValue(key, count, c, d);
- key.data = 0; key.size = 0;
+ clearKey();
return step;
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
@@ -256,7 +264,7 @@ void MapReader::onEndList(uint32_t count, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onEndListValue(key, count, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
@@ -266,7 +274,7 @@ void MapReader::onEndMap(uint32_t count, const Descriptor* d)
{
if (--level) {
onEndMapValue(key, count, d);
- key.data = 0; key.size = 0;
+ clearKey();
}
}
@@ -275,15 +283,27 @@ void MapReader::onEndArray(uint32_t count, const Descriptor* d)
if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
if (key) {
onEndArrayValue(key, count, d);
- key.data = 0; key.size = 0;
+ clearKey();
} else {
throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
}
}
-MapReader::MapReader() : level(0)
+MapReader::MapReader() : level(0), keyType(SYMBOL_KEY)
+{
+ clearKey();
+}
+
+void MapReader::setAllowedKeyType(int t)
+{
+ keyType = t;
+}
+
+void MapReader::clearKey()
{
key.data = 0; key.size = 0;
}
+const int MapReader::SYMBOL_KEY(1);
+const int MapReader::STRING_KEY(2);
}} // namespace qpid::amqp
diff --git a/cpp/src/qpid/amqp/MapReader.h b/cpp/src/qpid/amqp/MapReader.h
index fe8f65b30c..cb977e1326 100644
--- a/cpp/src/qpid/amqp/MapReader.h
+++ b/cpp/src/qpid/amqp/MapReader.h
@@ -95,9 +95,15 @@ class MapReader : public Reader
void onEndArray(uint32_t /*count*/, const Descriptor*);
MapReader();
+ static const int SYMBOL_KEY;
+ static const int STRING_KEY;
+ void setAllowedKeyType(int);
private:
CharSequence key;
size_t level;
+ int keyType;
+
+ void clearKey();
};
}} // namespace qpid::amqp
diff --git a/cpp/src/qpid/amqp/descriptors.h b/cpp/src/qpid/amqp/descriptors.h
index 11bb7df6bb..6545433947 100644
--- a/cpp/src/qpid/amqp/descriptors.h
+++ b/cpp/src/qpid/amqp/descriptors.h
@@ -78,11 +78,15 @@ const Descriptor SASL_OUTCOME(SASL_OUTCOME_CODE);
namespace filters {
const std::string LEGACY_DIRECT_FILTER_SYMBOL("apache.org:legacy-amqp-direct-binding:string");
const std::string LEGACY_TOPIC_FILTER_SYMBOL("apache.org:legacy-amqp-topic-binding:string");
+const std::string LEGACY_HEADERS_FILTER_SYMBOL("apache.org:legacy-amqp-headers-binding:map");
const std::string SELECTOR_FILTER_SYMBOL("apache.org:selector-filter:string");
+const std::string XQUERY_FILTER_SYMBOL("apache.org:xquery-filter:string");
const uint64_t LEGACY_DIRECT_FILTER_CODE(0x0000468C00000000ULL);
const uint64_t LEGACY_TOPIC_FILTER_CODE(0x0000468C00000001ULL);
+const uint64_t LEGACY_HEADERS_FILTER_CODE(0x0000468C00000002ULL);
const uint64_t SELECTOR_FILTER_CODE(0x0000468C00000004ULL);
+const uint64_t XQUERY_FILTER_CODE(0x0000468C00000005ULL);
}
}} // namespace qpid::amqp
diff --git a/cpp/src/qpid/broker/amqp/Filter.cpp b/cpp/src/qpid/broker/amqp/Filter.cpp
index bbe3330939..48d9334387 100644
--- a/cpp/src/qpid/broker/amqp/Filter.cpp
+++ b/cpp/src/qpid/broker/amqp/Filter.cpp
@@ -20,9 +20,12 @@
*/
#include "qpid/broker/amqp/Filter.h"
#include "qpid/broker/amqp/DataReader.h"
+#include "qpid/broker/amqp/Outgoing.h"
#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/TopicExchange.h"
#include "qpid/amqp/descriptors.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/log/Statement.h"
extern "C" {
#include <proton/engine.h>
@@ -31,6 +34,18 @@ extern "C" {
namespace qpid {
namespace broker {
namespace amqp {
+namespace {
+const std::string XQUERY("xquery");
+const std::string XML("xml");
+const std::string DEFAULT_SUBJECT_FILTER("default-subject-filter");
+const std::string DEFAULT_HEADERS_FILTER("default-headers-filter");
+const std::string XMATCH("x-match");
+const std::string ALL("all");
+const std::string DEFAULT_XQUERY_FILTER("default-xquery-filter");
+const std::string DEFAULT_XQUERY_VALUE("true()");
+const std::string WILDCARD("#");
+}
+Filter::Filter() : inHeadersMap(false) {}
void Filter::read(pn_data_t* data)
{
@@ -44,52 +59,141 @@ void Filter::read(pn_data_t* data)
void Filter::write(pn_data_t* data)
{
- pn_data_put_map(data);
- pn_data_enter(data);
- subjectFilter.write(data);
- pn_data_exit(data);
+ if (!active.empty()) {
+ pn_data_put_map(data);
+ pn_data_enter(data);
+ for (std::vector<FilterBase*>::const_iterator i = active.begin(); i != active.end(); ++i) {
+ (*i)->write(data);
+ }
+ pn_data_exit(data);
+ }
}
void Filter::onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::Descriptor* descriptor)
{
- StringFilter filter;
- filter.key = std::string(key.data, key.size);
- filter.value = std::string(value.data, value.size);
- if (descriptor) {
- filter.described = true;
- filter.descriptor = *descriptor;
- if (descriptor->match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)
- || descriptor->match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) {
- setSubjectFilter(filter);
- } else if (descriptor->match(qpid::amqp::filters::SELECTOR_FILTER_SYMBOL, qpid::amqp::filters::SELECTOR_FILTER_CODE)) {
- setSelectorFilter(filter);
+ if (inHeadersMap) {
+ headersFilter.value[std::string(key.data, key.size)] = std::string(value.data, value.size);
+ } else {
+ StringFilter filter;
+ filter.key = std::string(key.data, key.size);
+ filter.value = std::string(value.data, value.size);
+ if (descriptor) {
+ filter.described = true;
+ filter.descriptor = *descriptor;
+ if (descriptor->match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)
+ || descriptor->match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) {
+ setFilter(subjectFilter, filter);
+ } else if (descriptor->match(qpid::amqp::filters::SELECTOR_FILTER_SYMBOL, qpid::amqp::filters::SELECTOR_FILTER_CODE)) {
+ setFilter(selectorFilter, filter);
+ } else if (descriptor->match(qpid::amqp::filters::XQUERY_FILTER_SYMBOL, qpid::amqp::filters::XQUERY_FILTER_CODE)) {
+ setFilter(xqueryFilter, filter);
+ } else {
+ QPID_LOG(notice, "Skipping unrecognised string filter with key " << filter.key << " and descriptor " << filter.descriptor);
+ }
} else {
- QPID_LOG(notice, "Skipping unrecognised string filter with key " << filter.key << " and descriptor " << filter.descriptor);
+ setFilter(subjectFilter, filter);
}
- } else {
- setSubjectFilter(filter);
}
}
+void Filter::onNullValue(const qpid::amqp::CharSequence& key, const qpid::amqp::Descriptor*)
+{
+ headersFilter.value[std::string(key.data, key.size)] = qpid::types::Variant();
+}
+void Filter::onBooleanValue(const qpid::amqp::CharSequence& key, bool value, const qpid::amqp::Descriptor*)
+{
+ headersFilter.value[std::string(key.data, key.size)] = value;
+}
-bool Filter::hasSubjectFilter() const
+void Filter::onUByteValue(const qpid::amqp::CharSequence& key, uint8_t value, const qpid::amqp::Descriptor*)
{
- return !subjectFilter.value.empty();
+ headersFilter.value[std::string(key.data, key.size)] = value;
}
-std::string Filter::getSubjectFilter() const
+void Filter::onUShortValue(const qpid::amqp::CharSequence& key, uint16_t value, const qpid::amqp::Descriptor*)
{
- return subjectFilter.value;
+ headersFilter.value[std::string(key.data, key.size)] = value;
}
+void Filter::onUIntValue(const qpid::amqp::CharSequence& key, uint32_t value, const qpid::amqp::Descriptor*)
+{
+ headersFilter.value[std::string(key.data, key.size)] = value;
+}
-void Filter::setSubjectFilter(const StringFilter& filter)
+void Filter::onULongValue(const qpid::amqp::CharSequence& key, uint64_t value, const qpid::amqp::Descriptor*)
{
- if (hasSubjectFilter()) {
- QPID_LOG(notice, "Skipping filter with key " << filter.key << "; subject filter already set");
+ headersFilter.value[std::string(key.data, key.size)] = value;
+}
+
+void Filter::onByteValue(const qpid::amqp::CharSequence& key, int8_t value, const qpid::amqp::Descriptor*)
+{
+ headersFilter.value[std::string(key.data, key.size)] = value;
+}
+
+void Filter::onShortValue(const qpid::amqp::CharSequence& key, int16_t value, const qpid::amqp::Descriptor*)
+{
+ headersFilter.value[std::string(key.data, key.size)] = value;
+}
+
+void Filter::onIntValue(const qpid::amqp::CharSequence& key, int32_t value, const qpid::amqp::Descriptor*)
+{
+ headersFilter.value[std::string(key.data, key.size)] = value;
+}
+
+void Filter::onLongValue(const qpid::amqp::CharSequence& key, int64_t value, const qpid::amqp::Descriptor*)
+{
+ headersFilter.value[std::string(key.data, key.size)] = value;
+}
+
+void Filter::onFloatValue(const qpid::amqp::CharSequence& key, float value, const qpid::amqp::Descriptor*)
+{
+ headersFilter.value[std::string(key.data, key.size)] = value;
+}
+
+void Filter::onDoubleValue(const qpid::amqp::CharSequence& key, double value, const qpid::amqp::Descriptor*)
+{
+ headersFilter.value[std::string(key.data, key.size)] = value;
+}
+
+bool Filter::onStartMapValue(const qpid::amqp::CharSequence& key, uint32_t /*count*/, const qpid::amqp::Descriptor* descriptor)
+{
+ if (inHeadersMap) {
+ QPID_LOG(warning, "Skipping illegal nested map data in headers filter");
+ return false;
+ } else if (descriptor && descriptor->match(qpid::amqp::filters::LEGACY_HEADERS_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_HEADERS_FILTER_CODE)) {
+ inHeadersMap = true;
+ setAllowedKeyType(STRING_KEY);
+ headersFilter.requested = true;
+ headersFilter.described = true;
+ headersFilter.descriptor = *descriptor;
+ headersFilter.key = std::string(key.data, key.size);
+ return true;
} else {
- subjectFilter = filter;
+ if (descriptor) {
+ QPID_LOG(info, "Skipping unrecognised map data in filter: " << *descriptor);
+ } else {
+ QPID_LOG(info, "Skipping undescribed map data in filter");
+ }
+ return false;
}
}
+void Filter::onEndMapValue(const qpid::amqp::CharSequence&, uint32_t, const qpid::amqp::Descriptor*)
+{
+ if (inHeadersMap) {
+ inHeadersMap = false;
+ setAllowedKeyType(SYMBOL_KEY);
+ }
+}
+
+bool Filter::hasSubjectFilter() const
+{
+ return !subjectFilter.value.empty();
+}
+
+std::string Filter::getSubjectFilter() const
+{
+ return subjectFilter.value;
+}
+
bool Filter::hasSelectorFilter() const
{
@@ -101,22 +205,106 @@ std::string Filter::getSelectorFilter() const
return selectorFilter.value;
}
+void Filter::setFilter(StringFilter& lhs, const StringFilter& rhs)
+{
+ if (!lhs.value.empty()) {
+ QPID_LOG(notice, "Skipping filter with key " << rhs.key << "; value provided for " << lhs.key << " already");
+ } else {
+ lhs = rhs;
+ lhs.requested = true;
+ }
+}
-void Filter::setSelectorFilter(const StringFilter& filter)
+void Filter::apply(boost::shared_ptr<Outgoing> queue)
{
+ if (hasSubjectFilter()) {
+ queue->setSubjectFilter(getSubjectFilter());
+ active.push_back(&subjectFilter);
+ }
if (hasSelectorFilter()) {
- QPID_LOG(notice, "Skipping filter with key " << filter.key << "; selector filter already set");
- } else {
- selectorFilter = filter;
+ queue->setSelectorFilter(getSelectorFilter());
+ active.push_back(&selectorFilter);
+ }
+}
+
+void Filter::configure(QueueSettings& settings)
+{
+ if (hasSelectorFilter()) {
+ settings.filter = getSelectorFilter();
+ active.push_back(&selectorFilter);
}
}
void Filter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue)
{
- subjectFilter.bind(exchange, queue);
+ qpid::framing::FieldTable bindingArgs;
+ if (exchange->getType() == TopicExchange::typeName) {
+ setDefaultSubjectFilter(true);
+ active.push_back(&subjectFilter);
+ } else if (exchange->getType() == DirectExchange::typeName) {
+ if (!setDefaultSubjectFilter() && adjustDirectFilter()) {
+ QPID_LOG(info, "Using legacy topic filter as a direct matching filter for " << exchange->getName());
+ }
+ active.push_back(&subjectFilter);
+ } else if (exchange->getType() == HeadersExchange::typeName) {
+ setDefaultHeadersFilter();
+ qpid::amqp_0_10::translate(headersFilter.value, bindingArgs);
+ active.push_back(&headersFilter);
+ } else if (exchange->getType() == XML) {
+ setDefaultXQueryFilter();
+ setDefaultSubjectFilter();
+ bindingArgs.setString(XQUERY, xqueryFilter.value);
+ active.push_back(&subjectFilter);
+ active.push_back(&xqueryFilter);
+ }
+ exchange->bind(queue, subjectFilter.value, &bindingArgs);
+}
+
+void Filter::setDefaultXQueryFilter()
+{
+ if (!xqueryFilter.requested) {
+ xqueryFilter.key = DEFAULT_XQUERY_FILTER;
+ xqueryFilter.value = DEFAULT_XQUERY_VALUE;
+ xqueryFilter.setDescriptor(qpid::amqp::Descriptor(qpid::amqp::filters::XQUERY_FILTER_CODE));
+ }
+}
+void Filter::setDefaultHeadersFilter()
+{
+ if (!headersFilter.requested) {
+ headersFilter.key = DEFAULT_HEADERS_FILTER;
+ headersFilter.value[XMATCH] = ALL;
+ headersFilter.setDescriptor(qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_HEADERS_FILTER_CODE));
+ }
}
-Filter::StringFilter::StringFilter() : described(false), descriptor(0) {}
+bool Filter::setDefaultSubjectFilter(const qpid::amqp::Descriptor& d, const std::string& value)
+{
+ if (!subjectFilter.requested) {
+ subjectFilter.key = DEFAULT_SUBJECT_FILTER;
+ subjectFilter.value = value;
+ subjectFilter.setDescriptor(d);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool Filter::setDefaultSubjectFilter(bool wildcards)
+{
+ if (wildcards) {
+ return setDefaultSubjectFilter(qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE), WILDCARD);
+ } else {
+ return setDefaultSubjectFilter(qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE));
+ }
+}
+
+Filter::FilterBase::FilterBase() : described(false), requested(false), descriptor(0) {}
+Filter::FilterBase::~FilterBase() {}
+void Filter::FilterBase::setDescriptor(const qpid::amqp::Descriptor& d)
+{
+ described = true;
+ descriptor = d;
+}
namespace {
pn_bytes_t convert(const std::string& in)
{
@@ -132,8 +320,30 @@ pn_bytes_t convert(const qpid::amqp::CharSequence& in)
out.size = in.size;
return out;
}
+qpid::amqp::Descriptor symbolicDescriptor(const std::string& symbol)
+{
+ qpid::amqp::CharSequence cs;
+ cs.data = symbol.data();
+ cs.size = symbol.size();
+ return qpid::amqp::Descriptor(cs);
+}
}
-void Filter::StringFilter::write(pn_data_t* data)
+
+bool Filter::adjustDirectFilter()
+{
+ if (subjectFilter.descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)) {
+ if (subjectFilter.descriptor.type == qpid::amqp::Descriptor::NUMERIC) {
+ subjectFilter.descriptor = qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE);
+ } else {
+ subjectFilter.descriptor = symbolicDescriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL);
+ }
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void Filter::FilterBase::write(pn_data_t* data)
{
pn_data_put_symbol(data, convert(key));
if (described) {
@@ -147,26 +357,27 @@ void Filter::StringFilter::write(pn_data_t* data)
pn_data_put_symbol(data, convert(descriptor.value.symbol));
break;
}
+ writeValue(data);
+ pn_data_exit(data);
+ } else {
+ writeValue(data);
}
+}
+void Filter::StringFilter::writeValue(pn_data_t* data)
+{
pn_data_put_string(data, convert(value));
- if (described) pn_data_exit(data);
}
-void Filter::StringFilter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue)
+void Filter::MapFilter::writeValue(pn_data_t* data)
{
- if (described && exchange->getType() == DirectExchange::typeName
- && descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)) {
- QPID_LOG(info, "Using legacy topic filter as a direct matching filter for " << exchange->getName());
- if (descriptor.type == qpid::amqp::Descriptor::NUMERIC) {
- descriptor = qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE);
- } else {
- qpid::amqp::CharSequence symbol;
- symbol.data = qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL.data();
- symbol.size = qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL.size();
- descriptor = qpid::amqp::Descriptor(symbol);
- }
+ pn_data_put_map(data);
+ pn_data_enter(data);
+ for (ValueType::const_iterator i = value.begin(); i != value.end(); ++i) {
+ pn_data_put_string(data, convert(i->first));
+ pn_data_put_string(data, convert(i->second));//TODO: other types?
}
- exchange->bind(queue, value, 0);
+ pn_data_exit(data);
}
+
}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/Filter.h b/cpp/src/qpid/broker/amqp/Filter.h
index 72f92b57a5..a76928eb01 100644
--- a/cpp/src/qpid/broker/amqp/Filter.h
+++ b/cpp/src/qpid/broker/amqp/Filter.h
@@ -23,6 +23,9 @@
*/
#include "qpid/amqp/MapReader.h"
#include "qpid/amqp/Descriptor.h"
+#include "qpid/types/Variant.h"
+#include <map>
+#include <vector>
#include <boost/shared_ptr.hpp>
struct pn_data_t;
@@ -30,37 +33,87 @@ namespace qpid {
namespace broker {
class Exchange;
class Queue;
+class QueueSettings;
namespace amqp {
-
+class Outgoing;
class Filter : qpid::amqp::MapReader
{
public:
+ Filter();
void read(pn_data_t*);
void write(pn_data_t*);
- bool hasSubjectFilter() const;
- std::string getSubjectFilter() const;
- bool hasSelectorFilter() const;
- std::string getSelectorFilter() const;
+
+ /**
+ * Apply filters where source is a queue
+ */
+ void apply(boost::shared_ptr<Outgoing> queue);
+
+ /**
+ * Configure subscription queue for case where source is an exchange
+ */
+ void configure(QueueSettings&);
+ /**
+ * Bind subscription queue for case where source is an exchange
+ */
void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue);
private:
- struct StringFilter
+ struct FilterBase
{
bool described;
+ bool requested;
qpid::amqp::Descriptor descriptor;
std::string key;
- std::string value;
- StringFilter();
+ FilterBase();
+ virtual ~FilterBase();
void write(pn_data_t*);
- void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue);
+ virtual void writeValue(pn_data_t*) = 0;
+ void setDescriptor(const qpid::amqp::Descriptor&);
+ };
+ struct StringFilter : FilterBase
+ {
+ std::string value;
+ void writeValue(pn_data_t*);
+ };
+ struct MapFilter : FilterBase
+ {
+ typedef std::map<std::string, qpid::types::Variant> ValueType;
+ ValueType value;
+ void writeValue(pn_data_t*);
};
void onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::Descriptor* descriptor);
- void setSubjectFilter(const StringFilter&);
- void setSelectorFilter(const StringFilter&);
+ void onNullValue(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*);
+ void onBooleanValue(const qpid::amqp::CharSequence&, bool, const qpid::amqp::Descriptor*);
+ void onUByteValue(const qpid::amqp::CharSequence&, uint8_t, const qpid::amqp::Descriptor*);
+ void onUShortValue(const qpid::amqp::CharSequence&, uint16_t, const qpid::amqp::Descriptor*);
+ void onUIntValue(const qpid::amqp::CharSequence&, uint32_t, const qpid::amqp::Descriptor*);
+ void onULongValue(const qpid::amqp::CharSequence&, uint64_t, const qpid::amqp::Descriptor*);
+ void onByteValue(const qpid::amqp::CharSequence&, int8_t, const qpid::amqp::Descriptor*);
+ void onShortValue(const qpid::amqp::CharSequence&, int16_t, const qpid::amqp::Descriptor*);
+ void onIntValue(const qpid::amqp::CharSequence&, int32_t, const qpid::amqp::Descriptor*);
+ void onLongValue(const qpid::amqp::CharSequence&, int64_t, const qpid::amqp::Descriptor*);
+ void onFloatValue(const qpid::amqp::CharSequence&, float, const qpid::amqp::Descriptor*);
+ void onDoubleValue(const qpid::amqp::CharSequence&, double, const qpid::amqp::Descriptor*);
+ bool onStartMapValue(const qpid::amqp::CharSequence& key, uint32_t count, const qpid::amqp::Descriptor* descriptor);
+ void onEndMapValue(const qpid::amqp::CharSequence& key, uint32_t count, const qpid::amqp::Descriptor* descriptor);
+ void setFilter(StringFilter&, const StringFilter&);
+ bool hasSubjectFilter() const;
+ std::string getSubjectFilter() const;
+ bool hasSelectorFilter() const;
+ std::string getSelectorFilter() const;
+ bool setDefaultSubjectFilter(bool wildcards=false);
+ bool setDefaultSubjectFilter(const qpid::amqp::Descriptor& descriptor, const std::string& value=std::string());
+ bool adjustDirectFilter();
+ void setDefaultHeadersFilter();
+ void setDefaultXQueryFilter();
StringFilter subjectFilter;
StringFilter selectorFilter;
+ StringFilter xqueryFilter;
+ MapFilter headersFilter;
+ std::vector<FilterBase*> active;
+ bool inHeadersMap;
};
}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp
index adde339fa5..8d07a23499 100644
--- a/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/cpp/src/qpid/broker/amqp/Session.cpp
@@ -278,34 +278,20 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
if (node.queue) {
boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, node.queue, link, *this, out, false));
q->init();
- if (filter.hasSubjectFilter()) {
- q->setSubjectFilter(filter.getSubjectFilter());
- }
- if (filter.hasSelectorFilter()) {
- q->setSelectorFilter(filter.getSelectorFilter());
- }
+ filter.apply(q);
outgoing[link] = q;
} else if (node.exchange) {
bool durable = pn_terminus_get_durability(source);
QueueSettings settings(durable, !durable);
- if (filter.hasSelectorFilter()) {
- settings.filter = filter.getSelectorFilter();
- QPID_LOG(debug, "Selector specified for outgoing link from exchange " << node.exchange->getName() << ": " << settings.filter);
- }
+ filter.configure(settings);
//TODO: populate settings from source details when available from engine
std::stringstream queueName;//combination of container id and link name is unique
queueName << connection.getContainerId() << "_" << pn_link_name(link);
boost::shared_ptr<qpid::broker::Queue> queue
= broker.createQueue(queueName.str(), settings, this, "", connection.getUserid(), connection.getId()).first;
queue->setExclusiveOwner(this);
- if (filter.hasSubjectFilter()) {
- filter.bind(node.exchange, queue);
- filter.write(pn_terminus_filter(pn_link_source(link)));
- } else if (node.exchange->getType() == TopicExchange::typeName) {
- node.exchange->bind(queue, "#", 0);
- } else {
- node.exchange->bind(queue, std::string(), 0);
- }
+
+ filter.bind(node.exchange, queue);
boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, true));
outgoing[link] = q;
q->init();
@@ -317,6 +303,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED);
throw qpid::Exception("Node not found: " + name);/*not-found*/
}
+ filter.write(pn_terminus_filter(pn_link_source(link)));
QPID_LOG(debug, "Outgoing link attached");
}
diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index 258b4b84bd..f3a42b201b 100644
--- a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -21,6 +21,7 @@
#include "qpid/messaging/amqp/AddressHelper.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/AddressImpl.h"
+#include "qpid/amqp/descriptors.h"
#include "qpid/log/Statement.h"
#include <vector>
#include <set>
@@ -64,6 +65,10 @@ const std::string DURABLE("durable");
const std::string NAME("name");
const std::string RELIABILITY("reliability");
const std::string SELECTOR("selector");
+const std::string FILTER("filter");
+const std::string DESCRIPTOR("descriptor");
+const std::string VALUE("value");
+const std::string SUBJECT_FILTER("subject-filter");
//distribution modes:
const std::string MOVE("move");
@@ -100,7 +105,15 @@ pn_bytes_t convert(const std::string& s)
result.size = s.size();
return result;
}
+bool hasWildcards(const std::string& key)
+{
+ return key.find('*') != std::string::npos || key.find('#') != std::string::npos;
+}
+uint64_t getFilterDescriptor(const std::string& key)
+{
+ return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE;
+}
bool contains(const Variant::List& list, const std::string& item)
{
for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
@@ -183,6 +196,58 @@ void flatten(Variant::Map& base, const std::string& nested)
base.erase(i);
}
}
+void write(pn_data_t* data, const Variant& value);
+
+void write(pn_data_t* data, const Variant::Map& map)
+{
+ pn_data_put_map(data);
+ pn_data_enter(data);
+ for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
+ pn_data_put_string(data, convert(i->first));
+ write(data, i->second);
+ }
+ pn_data_exit(data);
+}
+void write(pn_data_t* data, const Variant::List& list)
+{
+ pn_data_put_list(data);
+ pn_data_enter(data);
+ for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ write(data, *i);
+ }
+ pn_data_exit(data);
+}
+void write(pn_data_t* data, const Variant& value)
+{
+ switch (value.getType()) {
+ case qpid::types::VAR_VOID:
+ pn_data_put_null(data);
+ break;
+ case qpid::types::VAR_BOOL:
+ pn_data_put_bool(data, value.asBool());
+ break;
+ case qpid::types::VAR_UINT64:
+ pn_data_put_ulong(data, value.asUint64());
+ break;
+ case qpid::types::VAR_INT64:
+ pn_data_put_long(data, value.asInt64());
+ break;
+ case qpid::types::VAR_DOUBLE:
+ pn_data_put_double(data, value.asDouble());
+ break;
+ case qpid::types::VAR_STRING:
+ pn_data_put_string(data, convert(value.asString()));
+ break;
+ case qpid::types::VAR_MAP:
+ write(data, value.asMap());
+ break;
+ case qpid::types::VAR_LIST:
+ write(data, value.asList());
+ break;
+ default:
+ break;
+ }
+}
}
AddressHelper::AddressHelper(const Address& address) :
@@ -241,6 +306,67 @@ AddressHelper::AddressHelper(const Address& address) :
if (properties.size() && !(isTemporary || createPolicy.size())) {
QPID_LOG(warning, "Properties will be ignored! " << address);
}
+
+ qpid::types::Variant::Map::const_iterator selector = link.find(SELECTOR);
+ if (selector != link.end()) {
+ addFilter(SELECTOR, qpid::amqp::filters::SELECTOR_FILTER_CODE, selector->second);
+ }
+ if (!address.getSubject().empty()) {
+ addFilter(SUBJECT_FILTER, getFilterDescriptor(address.getSubject()), address.getSubject());
+ }
+ qpid::types::Variant::Map::const_iterator filter = link.find(FILTER);
+ if (filter != link.end()) {
+ if (filter->second.getType() == qpid::types::VAR_MAP) {
+ addFilter(filter->second.asMap());
+ } else if (filter->second.getType() == qpid::types::VAR_LIST) {
+ addFilters(filter->second.asList());
+ } else {
+ throw qpid::messaging::AddressError("Filter must be a map or a list of maps, each containing name, descriptor and value.");
+ }
+ }
+}
+
+void AddressHelper::addFilters(const qpid::types::Variant::List& f)
+{
+ for (qpid::types::Variant::List::const_iterator i = f.begin(); i != f.end(); ++i) {
+ addFilter(i->asMap());
+ }
+}
+
+void AddressHelper::addFilter(const qpid::types::Variant::Map& f)
+{
+ qpid::types::Variant::Map::const_iterator name = f.find(NAME);
+ qpid::types::Variant::Map::const_iterator descriptor = f.find(DESCRIPTOR);
+ qpid::types::Variant::Map::const_iterator value = f.find(VALUE);
+ //all fields are required at present (may relax this at a later stage):
+ if (name == f.end()) {
+ throw qpid::messaging::AddressError("Filter entry must specify name");
+ }
+ if (descriptor == f.end()) {
+ throw qpid::messaging::AddressError("Filter entry must specify descriptor");
+ }
+ if (value == f.end()) {
+ throw qpid::messaging::AddressError("Filter entry must specify value");
+ }
+ try {
+ addFilter(name->second.asString(), descriptor->second.asUint64(), value->second);
+ } catch (const qpid::types::InvalidConversion&) {
+ addFilter(name->second.asString(), descriptor->second.asString(), value->second);
+ }
+
+}
+
+AddressHelper::Filter::Filter() : descriptorCode(0){}
+AddressHelper::Filter::Filter(const std::string& n, uint64_t d, const qpid::types::Variant& v) : name(n), descriptorCode(d), value(v) {}
+AddressHelper::Filter::Filter(const std::string& n, const std::string& d, const qpid::types::Variant& v) : name(n), descriptorSymbol(d), descriptorCode(0), value(v) {}
+
+void AddressHelper::addFilter(const std::string& name, uint64_t descriptor, const qpid::types::Variant& value)
+{
+ filters.push_back(Filter(name, descriptor, value));
+}
+void AddressHelper::addFilter(const std::string& name, const std::string& descriptor, const qpid::types::Variant& value)
+{
+ filters.push_back(Filter(name, descriptor, value));
}
void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode)
@@ -331,6 +457,26 @@ void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode)
if (mode == FOR_RECEIVER && browse) {
//when PROTON-139 is resolved, set the required delivery-mode
}
+ //set filter(s):
+ if (mode == FOR_RECEIVER && !filters.empty()) {
+ pn_data_t* filter = pn_terminus_filter(terminus);
+ pn_data_put_map(filter);
+ pn_data_enter(filter);
+ for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) {
+ pn_data_put_symbol(filter, convert(i->name));
+ pn_data_put_described(filter);
+ pn_data_enter(filter);
+ if (i->descriptorSymbol.size()) {
+ pn_data_put_symbol(filter, convert(i->descriptorSymbol));
+ } else {
+ pn_data_put_ulong(filter, i->descriptorCode);
+ }
+ write(filter, i->value);
+ pn_data_exit(filter);
+ }
+ pn_data_exit(filter);
+ }
+
}
void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create)
@@ -401,6 +547,7 @@ Verifier::Verifier()
link[X_DECLARE] = true;
link[X_BINDINGS] = true;
link[SELECTOR] = true;
+ link[FILTER] = true;
defined[LINK] = link;
}
void Verifier::verify(const Address& address) const
diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.h b/cpp/src/qpid/messaging/amqp/AddressHelper.h
index d03370b597..2562adfa1e 100644
--- a/cpp/src/qpid/messaging/amqp/AddressHelper.h
+++ b/cpp/src/qpid/messaging/amqp/AddressHelper.h
@@ -22,6 +22,7 @@
*
*/
#include "qpid/types/Variant.h"
+#include <vector>
struct pn_terminus_t;
@@ -29,7 +30,6 @@ namespace qpid {
namespace messaging {
class Address;
namespace amqp {
-
class AddressHelper
{
public:
@@ -43,6 +43,18 @@ class AddressHelper
const qpid::types::Variant::Map& getLinkProperties() const;
static std::string getLinkName(const Address& address);
private:
+ struct Filter
+ {
+ std::string name;
+ std::string descriptorSymbol;
+ uint64_t descriptorCode;
+ qpid::types::Variant value;
+
+ Filter();
+ Filter(const std::string& name, uint64_t descriptor, const qpid::types::Variant& value);
+ Filter(const std::string& name, const std::string& descriptor, const qpid::types::Variant& value);
+ };
+
bool isTemporary;
std::string createPolicy;
std::string assertPolicy;
@@ -56,12 +68,17 @@ class AddressHelper
bool durableNode;
bool durableLink;
bool browse;
+ std::vector<Filter> filters;
bool enabled(const std::string& policy, CheckMode mode) const;
bool createEnabled(CheckMode mode) const;
bool assertEnabled(CheckMode mode) const;
void setCapabilities(pn_terminus_t* terminus, bool create);
void setNodeProperties(pn_terminus_t* terminus);
+ void addFilter(const qpid::types::Variant::Map&);
+ void addFilter(const std::string& name, uint64_t descriptor, const qpid::types::Variant& value);
+ void addFilter(const std::string& name, const std::string& descriptor, const qpid::types::Variant& value);
+ void addFilters(const qpid::types::Variant::List&);
};
}}} // namespace qpid::messaging::amqp
diff --git a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index a495bc25c6..081a0ae78b 100644
--- a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -22,7 +22,6 @@
#include "qpid/messaging/AddressImpl.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Message.h"
-#include "qpid/amqp/descriptors.h"
#include "qpid/log/Statement.h"
extern "C" {
#include <proton/engine.h>
@@ -90,24 +89,6 @@ const std::string& ReceiverContext::getSource() const
{
return address.getName();
}
-namespace {
-pn_bytes_t convert(const std::string& s)
-{
- pn_bytes_t result;
- result.start = const_cast<char*>(s.data());
- result.size = s.size();
- return result;
-}
-bool hasWildcards(const std::string& key)
-{
- return key.find('*') != std::string::npos || key.find('#') != std::string::npos;
-}
-
-uint64_t getFilterDescriptor(const std::string& key)
-{
- return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE;
-}
-}
void ReceiverContext::verify(pn_terminus_t* source)
{
helper.checkAssertion(source, AddressHelper::FOR_RECEIVER);
@@ -119,34 +100,6 @@ void ReceiverContext::configure()
void ReceiverContext::configure(pn_terminus_t* source)
{
helper.configure(source, AddressHelper::FOR_RECEIVER);
-
- // Look specifically for qpid.selector link property and add a filter for it
- qpid::types::Variant::Map::const_iterator i = helper.getLinkProperties().find("selector");
- if (i!=helper.getLinkProperties().end()) {
- pn_data_t* filter = pn_terminus_filter(source);
- pn_data_put_map(filter);
- pn_data_enter(filter);
- pn_data_put_symbol(filter, convert("selector"));
- pn_data_put_described(filter);
- pn_data_enter(filter);
- pn_data_put_ulong(filter, qpid::amqp::filters::SELECTOR_FILTER_CODE);
- pn_data_put_string(filter, convert(i->second));
- pn_data_exit(filter);
- pn_data_exit(filter);
- }
- if (!address.getSubject().empty()) {
- //filter:
- pn_data_t* filter = pn_terminus_filter(source);
- pn_data_put_map(filter);
- pn_data_enter(filter);
- pn_data_put_symbol(filter, convert("subject"));
- pn_data_put_described(filter);
- pn_data_enter(filter);
- pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject()));
- pn_data_put_string(filter, convert(address.getSubject()));
- pn_data_exit(filter);
- pn_data_exit(filter);
- }
}
Address ReceiverContext::getAddress() const
diff --git a/specs/apache-filters.xml b/specs/apache-filters.xml
index a4e40aa93c..644c796893 100644
--- a/specs/apache-filters.xml
+++ b/specs/apache-filters.xml
@@ -224,4 +224,22 @@ symbol | String
</doc>
</type>
</section>
+ <section name="xquery" title="An xquery based filter">
+ <type class="restricted" name="xquery" source="string" provides="filter">
+ <descriptor name="apache.org:xquery-filter:string" code="0x0000468C:0x00000005"/>
+ <doc>
+ <p>
+ The xquery filter consists of a described string value
+ containing a valid xquery string against which messages
+ are matched.
+ </p>
+ <p>
+ Containers which support the filter defined in this
+ section should advertise the capability
+ <term>APACHE.ORG:XQUERY</term> in their connection
+ capabilities when sending the open performative.
+ </p>
+ </doc>
+ </type>
+ </section>
</amqp>