diff options
author | Gordon Sim <gsim@apache.org> | 2013-06-12 17:56:12 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2013-06-12 17:56:12 +0000 |
commit | 4d4276904ea9fa45d36ea8b44b08b60c1816f3d7 (patch) | |
tree | c08a3bc48717eb85d8a9e9c572a4f53278eef404 | |
parent | ec18c64e929a9fb5521a9a5bb0070084237fbd28 (diff) | |
download | qpid-python-4d4276904ea9fa45d36ea8b44b08b60c1816f3d7.tar.gz |
QPID-4766: Added generic filter support to address handling in qpid::messaging. Added support for legacy-headers-binding and newly defined xquery filters.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1492310 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/amqp/MapReader.cpp | 72 | ||||
-rw-r--r-- | cpp/src/qpid/amqp/MapReader.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/amqp/descriptors.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Filter.cpp | 305 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Filter.h | 75 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Session.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 147 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/AddressHelper.h | 19 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/ReceiverContext.cpp | 47 | ||||
-rw-r--r-- | specs/apache-filters.xml | 18 |
10 files changed, 566 insertions, 150 deletions
diff --git a/cpp/src/qpid/amqp/MapReader.cpp b/cpp/src/qpid/amqp/MapReader.cpp index 2bace74d34..aff885c5d3 100644 --- a/cpp/src/qpid/amqp/MapReader.cpp +++ b/cpp/src/qpid/amqp/MapReader.cpp @@ -30,7 +30,7 @@ void MapReader::onNull(const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onNullValue(key, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } @@ -40,7 +40,7 @@ void MapReader::onBoolean(bool v, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onBooleanValue(key, v, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } @@ -51,7 +51,7 @@ void MapReader::onUByte(uint8_t v, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onUByteValue(key, v, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } @@ -62,7 +62,7 @@ void MapReader::onUShort(uint16_t v, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onUShortValue(key, v, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } @@ -73,7 +73,7 @@ void MapReader::onUInt(uint32_t v, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onUIntValue(key, v, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } @@ -84,7 +84,7 @@ void MapReader::onULong(uint64_t v, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onULongValue(key, v, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } @@ -95,7 +95,7 @@ void MapReader::onByte(int8_t v, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onByteValue(key, v, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } @@ -106,7 +106,7 @@ void MapReader::onShort(int16_t v, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onShortValue(key, v, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } @@ -117,7 +117,7 @@ void MapReader::onInt(int32_t v, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onIntValue(key, v, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } @@ -128,7 +128,7 @@ void MapReader::onLong(int64_t v, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onLongValue(key, v, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } @@ -139,7 +139,7 @@ void MapReader::onFloat(float v, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onFloatValue(key, v, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } @@ -150,7 +150,7 @@ void MapReader::onDouble(double v, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onDoubleValue(key, v, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } @@ -161,7 +161,7 @@ void MapReader::onUuid(const CharSequence& v, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onUuidValue(key, v, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } @@ -172,7 +172,7 @@ void MapReader::onTimestamp(int64_t v, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onTimestampValue(key, v, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } @@ -183,7 +183,7 @@ void MapReader::onBinary(const CharSequence& v, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onBinaryValue(key, v, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } @@ -194,9 +194,13 @@ void MapReader::onString(const CharSequence& v, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onStringValue(key, v, d); - key.data = 0; key.size = 0; + clearKey(); } else { - throw qpid::Exception(QPID_MSG("Expecting symbol as key, got string " << v.str())); + if (keyType & STRING_KEY) { + key = v; + } else { + throw qpid::Exception(QPID_MSG("Expecting symbol as key, got string " << v.str())); + } } } @@ -205,9 +209,13 @@ void MapReader::onSymbol(const CharSequence& v, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onSymbolValue(key, v, d); - key.data = 0; key.size = 0; + clearKey(); } else { - key = v; + if (keyType & SYMBOL_KEY) { + key = v; + } else { + throw qpid::Exception(QPID_MSG("Expecting string as key, got symbol " << v.str())); + } } } @@ -216,7 +224,7 @@ bool MapReader::onStartList(uint32_t count, const CharSequence&, const Descripto if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { bool step = onStartListValue(key, count, d); - key.data = 0; key.size = 0; + clearKey(); return step; } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); @@ -229,7 +237,7 @@ bool MapReader::onStartMap(uint32_t count, const CharSequence&, const Descriptor if (level++) { if (key) { bool step = onStartMapValue(key, count, d); - key.data = 0; key.size = 0; + clearKey(); return step; } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); @@ -243,7 +251,7 @@ bool MapReader::onStartArray(uint32_t count, const CharSequence&, const Construc if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { bool step = onStartArrayValue(key, count, c, d); - key.data = 0; key.size = 0; + clearKey(); return step; } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); @@ -256,7 +264,7 @@ void MapReader::onEndList(uint32_t count, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onEndListValue(key, count, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } @@ -266,7 +274,7 @@ void MapReader::onEndMap(uint32_t count, const Descriptor* d) { if (--level) { onEndMapValue(key, count, d); - key.data = 0; key.size = 0; + clearKey(); } } @@ -275,15 +283,27 @@ void MapReader::onEndArray(uint32_t count, const Descriptor* d) if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum")); if (key) { onEndArrayValue(key, count, d); - key.data = 0; key.size = 0; + clearKey(); } else { throw qpid::Exception(QPID_MSG("Expecting symbol as key")); } } -MapReader::MapReader() : level(0) +MapReader::MapReader() : level(0), keyType(SYMBOL_KEY) +{ + clearKey(); +} + +void MapReader::setAllowedKeyType(int t) +{ + keyType = t; +} + +void MapReader::clearKey() { key.data = 0; key.size = 0; } +const int MapReader::SYMBOL_KEY(1); +const int MapReader::STRING_KEY(2); }} // namespace qpid::amqp diff --git a/cpp/src/qpid/amqp/MapReader.h b/cpp/src/qpid/amqp/MapReader.h index fe8f65b30c..cb977e1326 100644 --- a/cpp/src/qpid/amqp/MapReader.h +++ b/cpp/src/qpid/amqp/MapReader.h @@ -95,9 +95,15 @@ class MapReader : public Reader void onEndArray(uint32_t /*count*/, const Descriptor*); MapReader(); + static const int SYMBOL_KEY; + static const int STRING_KEY; + void setAllowedKeyType(int); private: CharSequence key; size_t level; + int keyType; + + void clearKey(); }; }} // namespace qpid::amqp diff --git a/cpp/src/qpid/amqp/descriptors.h b/cpp/src/qpid/amqp/descriptors.h index 11bb7df6bb..6545433947 100644 --- a/cpp/src/qpid/amqp/descriptors.h +++ b/cpp/src/qpid/amqp/descriptors.h @@ -78,11 +78,15 @@ const Descriptor SASL_OUTCOME(SASL_OUTCOME_CODE); namespace filters { const std::string LEGACY_DIRECT_FILTER_SYMBOL("apache.org:legacy-amqp-direct-binding:string"); const std::string LEGACY_TOPIC_FILTER_SYMBOL("apache.org:legacy-amqp-topic-binding:string"); +const std::string LEGACY_HEADERS_FILTER_SYMBOL("apache.org:legacy-amqp-headers-binding:map"); const std::string SELECTOR_FILTER_SYMBOL("apache.org:selector-filter:string"); +const std::string XQUERY_FILTER_SYMBOL("apache.org:xquery-filter:string"); const uint64_t LEGACY_DIRECT_FILTER_CODE(0x0000468C00000000ULL); const uint64_t LEGACY_TOPIC_FILTER_CODE(0x0000468C00000001ULL); +const uint64_t LEGACY_HEADERS_FILTER_CODE(0x0000468C00000002ULL); const uint64_t SELECTOR_FILTER_CODE(0x0000468C00000004ULL); +const uint64_t XQUERY_FILTER_CODE(0x0000468C00000005ULL); } }} // namespace qpid::amqp diff --git a/cpp/src/qpid/broker/amqp/Filter.cpp b/cpp/src/qpid/broker/amqp/Filter.cpp index bbe3330939..48d9334387 100644 --- a/cpp/src/qpid/broker/amqp/Filter.cpp +++ b/cpp/src/qpid/broker/amqp/Filter.cpp @@ -20,9 +20,12 @@ */ #include "qpid/broker/amqp/Filter.h" #include "qpid/broker/amqp/DataReader.h" +#include "qpid/broker/amqp/Outgoing.h" #include "qpid/broker/DirectExchange.h" +#include "qpid/broker/HeadersExchange.h" #include "qpid/broker/TopicExchange.h" #include "qpid/amqp/descriptors.h" +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/log/Statement.h" extern "C" { #include <proton/engine.h> @@ -31,6 +34,18 @@ extern "C" { namespace qpid { namespace broker { namespace amqp { +namespace { +const std::string XQUERY("xquery"); +const std::string XML("xml"); +const std::string DEFAULT_SUBJECT_FILTER("default-subject-filter"); +const std::string DEFAULT_HEADERS_FILTER("default-headers-filter"); +const std::string XMATCH("x-match"); +const std::string ALL("all"); +const std::string DEFAULT_XQUERY_FILTER("default-xquery-filter"); +const std::string DEFAULT_XQUERY_VALUE("true()"); +const std::string WILDCARD("#"); +} +Filter::Filter() : inHeadersMap(false) {} void Filter::read(pn_data_t* data) { @@ -44,52 +59,141 @@ void Filter::read(pn_data_t* data) void Filter::write(pn_data_t* data) { - pn_data_put_map(data); - pn_data_enter(data); - subjectFilter.write(data); - pn_data_exit(data); + if (!active.empty()) { + pn_data_put_map(data); + pn_data_enter(data); + for (std::vector<FilterBase*>::const_iterator i = active.begin(); i != active.end(); ++i) { + (*i)->write(data); + } + pn_data_exit(data); + } } void Filter::onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::Descriptor* descriptor) { - StringFilter filter; - filter.key = std::string(key.data, key.size); - filter.value = std::string(value.data, value.size); - if (descriptor) { - filter.described = true; - filter.descriptor = *descriptor; - if (descriptor->match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE) - || descriptor->match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) { - setSubjectFilter(filter); - } else if (descriptor->match(qpid::amqp::filters::SELECTOR_FILTER_SYMBOL, qpid::amqp::filters::SELECTOR_FILTER_CODE)) { - setSelectorFilter(filter); + if (inHeadersMap) { + headersFilter.value[std::string(key.data, key.size)] = std::string(value.data, value.size); + } else { + StringFilter filter; + filter.key = std::string(key.data, key.size); + filter.value = std::string(value.data, value.size); + if (descriptor) { + filter.described = true; + filter.descriptor = *descriptor; + if (descriptor->match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE) + || descriptor->match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) { + setFilter(subjectFilter, filter); + } else if (descriptor->match(qpid::amqp::filters::SELECTOR_FILTER_SYMBOL, qpid::amqp::filters::SELECTOR_FILTER_CODE)) { + setFilter(selectorFilter, filter); + } else if (descriptor->match(qpid::amqp::filters::XQUERY_FILTER_SYMBOL, qpid::amqp::filters::XQUERY_FILTER_CODE)) { + setFilter(xqueryFilter, filter); + } else { + QPID_LOG(notice, "Skipping unrecognised string filter with key " << filter.key << " and descriptor " << filter.descriptor); + } } else { - QPID_LOG(notice, "Skipping unrecognised string filter with key " << filter.key << " and descriptor " << filter.descriptor); + setFilter(subjectFilter, filter); } - } else { - setSubjectFilter(filter); } } +void Filter::onNullValue(const qpid::amqp::CharSequence& key, const qpid::amqp::Descriptor*) +{ + headersFilter.value[std::string(key.data, key.size)] = qpid::types::Variant(); +} +void Filter::onBooleanValue(const qpid::amqp::CharSequence& key, bool value, const qpid::amqp::Descriptor*) +{ + headersFilter.value[std::string(key.data, key.size)] = value; +} -bool Filter::hasSubjectFilter() const +void Filter::onUByteValue(const qpid::amqp::CharSequence& key, uint8_t value, const qpid::amqp::Descriptor*) { - return !subjectFilter.value.empty(); + headersFilter.value[std::string(key.data, key.size)] = value; } -std::string Filter::getSubjectFilter() const +void Filter::onUShortValue(const qpid::amqp::CharSequence& key, uint16_t value, const qpid::amqp::Descriptor*) { - return subjectFilter.value; + headersFilter.value[std::string(key.data, key.size)] = value; } +void Filter::onUIntValue(const qpid::amqp::CharSequence& key, uint32_t value, const qpid::amqp::Descriptor*) +{ + headersFilter.value[std::string(key.data, key.size)] = value; +} -void Filter::setSubjectFilter(const StringFilter& filter) +void Filter::onULongValue(const qpid::amqp::CharSequence& key, uint64_t value, const qpid::amqp::Descriptor*) { - if (hasSubjectFilter()) { - QPID_LOG(notice, "Skipping filter with key " << filter.key << "; subject filter already set"); + headersFilter.value[std::string(key.data, key.size)] = value; +} + +void Filter::onByteValue(const qpid::amqp::CharSequence& key, int8_t value, const qpid::amqp::Descriptor*) +{ + headersFilter.value[std::string(key.data, key.size)] = value; +} + +void Filter::onShortValue(const qpid::amqp::CharSequence& key, int16_t value, const qpid::amqp::Descriptor*) +{ + headersFilter.value[std::string(key.data, key.size)] = value; +} + +void Filter::onIntValue(const qpid::amqp::CharSequence& key, int32_t value, const qpid::amqp::Descriptor*) +{ + headersFilter.value[std::string(key.data, key.size)] = value; +} + +void Filter::onLongValue(const qpid::amqp::CharSequence& key, int64_t value, const qpid::amqp::Descriptor*) +{ + headersFilter.value[std::string(key.data, key.size)] = value; +} + +void Filter::onFloatValue(const qpid::amqp::CharSequence& key, float value, const qpid::amqp::Descriptor*) +{ + headersFilter.value[std::string(key.data, key.size)] = value; +} + +void Filter::onDoubleValue(const qpid::amqp::CharSequence& key, double value, const qpid::amqp::Descriptor*) +{ + headersFilter.value[std::string(key.data, key.size)] = value; +} + +bool Filter::onStartMapValue(const qpid::amqp::CharSequence& key, uint32_t /*count*/, const qpid::amqp::Descriptor* descriptor) +{ + if (inHeadersMap) { + QPID_LOG(warning, "Skipping illegal nested map data in headers filter"); + return false; + } else if (descriptor && descriptor->match(qpid::amqp::filters::LEGACY_HEADERS_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_HEADERS_FILTER_CODE)) { + inHeadersMap = true; + setAllowedKeyType(STRING_KEY); + headersFilter.requested = true; + headersFilter.described = true; + headersFilter.descriptor = *descriptor; + headersFilter.key = std::string(key.data, key.size); + return true; } else { - subjectFilter = filter; + if (descriptor) { + QPID_LOG(info, "Skipping unrecognised map data in filter: " << *descriptor); + } else { + QPID_LOG(info, "Skipping undescribed map data in filter"); + } + return false; } } +void Filter::onEndMapValue(const qpid::amqp::CharSequence&, uint32_t, const qpid::amqp::Descriptor*) +{ + if (inHeadersMap) { + inHeadersMap = false; + setAllowedKeyType(SYMBOL_KEY); + } +} + +bool Filter::hasSubjectFilter() const +{ + return !subjectFilter.value.empty(); +} + +std::string Filter::getSubjectFilter() const +{ + return subjectFilter.value; +} + bool Filter::hasSelectorFilter() const { @@ -101,22 +205,106 @@ std::string Filter::getSelectorFilter() const return selectorFilter.value; } +void Filter::setFilter(StringFilter& lhs, const StringFilter& rhs) +{ + if (!lhs.value.empty()) { + QPID_LOG(notice, "Skipping filter with key " << rhs.key << "; value provided for " << lhs.key << " already"); + } else { + lhs = rhs; + lhs.requested = true; + } +} -void Filter::setSelectorFilter(const StringFilter& filter) +void Filter::apply(boost::shared_ptr<Outgoing> queue) { + if (hasSubjectFilter()) { + queue->setSubjectFilter(getSubjectFilter()); + active.push_back(&subjectFilter); + } if (hasSelectorFilter()) { - QPID_LOG(notice, "Skipping filter with key " << filter.key << "; selector filter already set"); - } else { - selectorFilter = filter; + queue->setSelectorFilter(getSelectorFilter()); + active.push_back(&selectorFilter); + } +} + +void Filter::configure(QueueSettings& settings) +{ + if (hasSelectorFilter()) { + settings.filter = getSelectorFilter(); + active.push_back(&selectorFilter); } } void Filter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue) { - subjectFilter.bind(exchange, queue); + qpid::framing::FieldTable bindingArgs; + if (exchange->getType() == TopicExchange::typeName) { + setDefaultSubjectFilter(true); + active.push_back(&subjectFilter); + } else if (exchange->getType() == DirectExchange::typeName) { + if (!setDefaultSubjectFilter() && adjustDirectFilter()) { + QPID_LOG(info, "Using legacy topic filter as a direct matching filter for " << exchange->getName()); + } + active.push_back(&subjectFilter); + } else if (exchange->getType() == HeadersExchange::typeName) { + setDefaultHeadersFilter(); + qpid::amqp_0_10::translate(headersFilter.value, bindingArgs); + active.push_back(&headersFilter); + } else if (exchange->getType() == XML) { + setDefaultXQueryFilter(); + setDefaultSubjectFilter(); + bindingArgs.setString(XQUERY, xqueryFilter.value); + active.push_back(&subjectFilter); + active.push_back(&xqueryFilter); + } + exchange->bind(queue, subjectFilter.value, &bindingArgs); +} + +void Filter::setDefaultXQueryFilter() +{ + if (!xqueryFilter.requested) { + xqueryFilter.key = DEFAULT_XQUERY_FILTER; + xqueryFilter.value = DEFAULT_XQUERY_VALUE; + xqueryFilter.setDescriptor(qpid::amqp::Descriptor(qpid::amqp::filters::XQUERY_FILTER_CODE)); + } +} +void Filter::setDefaultHeadersFilter() +{ + if (!headersFilter.requested) { + headersFilter.key = DEFAULT_HEADERS_FILTER; + headersFilter.value[XMATCH] = ALL; + headersFilter.setDescriptor(qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_HEADERS_FILTER_CODE)); + } } -Filter::StringFilter::StringFilter() : described(false), descriptor(0) {} +bool Filter::setDefaultSubjectFilter(const qpid::amqp::Descriptor& d, const std::string& value) +{ + if (!subjectFilter.requested) { + subjectFilter.key = DEFAULT_SUBJECT_FILTER; + subjectFilter.value = value; + subjectFilter.setDescriptor(d); + return true; + } else { + return false; + } +} + +bool Filter::setDefaultSubjectFilter(bool wildcards) +{ + if (wildcards) { + return setDefaultSubjectFilter(qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE), WILDCARD); + } else { + return setDefaultSubjectFilter(qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)); + } +} + +Filter::FilterBase::FilterBase() : described(false), requested(false), descriptor(0) {} +Filter::FilterBase::~FilterBase() {} +void Filter::FilterBase::setDescriptor(const qpid::amqp::Descriptor& d) +{ + described = true; + descriptor = d; +} namespace { pn_bytes_t convert(const std::string& in) { @@ -132,8 +320,30 @@ pn_bytes_t convert(const qpid::amqp::CharSequence& in) out.size = in.size; return out; } +qpid::amqp::Descriptor symbolicDescriptor(const std::string& symbol) +{ + qpid::amqp::CharSequence cs; + cs.data = symbol.data(); + cs.size = symbol.size(); + return qpid::amqp::Descriptor(cs); +} } -void Filter::StringFilter::write(pn_data_t* data) + +bool Filter::adjustDirectFilter() +{ + if (subjectFilter.descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)) { + if (subjectFilter.descriptor.type == qpid::amqp::Descriptor::NUMERIC) { + subjectFilter.descriptor = qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE); + } else { + subjectFilter.descriptor = symbolicDescriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL); + } + return true; + } else { + return false; + } +} + +void Filter::FilterBase::write(pn_data_t* data) { pn_data_put_symbol(data, convert(key)); if (described) { @@ -147,26 +357,27 @@ void Filter::StringFilter::write(pn_data_t* data) pn_data_put_symbol(data, convert(descriptor.value.symbol)); break; } + writeValue(data); + pn_data_exit(data); + } else { + writeValue(data); } +} +void Filter::StringFilter::writeValue(pn_data_t* data) +{ pn_data_put_string(data, convert(value)); - if (described) pn_data_exit(data); } -void Filter::StringFilter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue) +void Filter::MapFilter::writeValue(pn_data_t* data) { - if (described && exchange->getType() == DirectExchange::typeName - && descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)) { - QPID_LOG(info, "Using legacy topic filter as a direct matching filter for " << exchange->getName()); - if (descriptor.type == qpid::amqp::Descriptor::NUMERIC) { - descriptor = qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE); - } else { - qpid::amqp::CharSequence symbol; - symbol.data = qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL.data(); - symbol.size = qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL.size(); - descriptor = qpid::amqp::Descriptor(symbol); - } + pn_data_put_map(data); + pn_data_enter(data); + for (ValueType::const_iterator i = value.begin(); i != value.end(); ++i) { + pn_data_put_string(data, convert(i->first)); + pn_data_put_string(data, convert(i->second));//TODO: other types? } - exchange->bind(queue, value, 0); + pn_data_exit(data); } + }}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/Filter.h b/cpp/src/qpid/broker/amqp/Filter.h index 72f92b57a5..a76928eb01 100644 --- a/cpp/src/qpid/broker/amqp/Filter.h +++ b/cpp/src/qpid/broker/amqp/Filter.h @@ -23,6 +23,9 @@ */ #include "qpid/amqp/MapReader.h" #include "qpid/amqp/Descriptor.h" +#include "qpid/types/Variant.h" +#include <map> +#include <vector> #include <boost/shared_ptr.hpp> struct pn_data_t; @@ -30,37 +33,87 @@ namespace qpid { namespace broker { class Exchange; class Queue; +class QueueSettings; namespace amqp { - +class Outgoing; class Filter : qpid::amqp::MapReader { public: + Filter(); void read(pn_data_t*); void write(pn_data_t*); - bool hasSubjectFilter() const; - std::string getSubjectFilter() const; - bool hasSelectorFilter() const; - std::string getSelectorFilter() const; + + /** + * Apply filters where source is a queue + */ + void apply(boost::shared_ptr<Outgoing> queue); + + /** + * Configure subscription queue for case where source is an exchange + */ + void configure(QueueSettings&); + /** + * Bind subscription queue for case where source is an exchange + */ void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue); private: - struct StringFilter + struct FilterBase { bool described; + bool requested; qpid::amqp::Descriptor descriptor; std::string key; - std::string value; - StringFilter(); + FilterBase(); + virtual ~FilterBase(); void write(pn_data_t*); - void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue); + virtual void writeValue(pn_data_t*) = 0; + void setDescriptor(const qpid::amqp::Descriptor&); + }; + struct StringFilter : FilterBase + { + std::string value; + void writeValue(pn_data_t*); + }; + struct MapFilter : FilterBase + { + typedef std::map<std::string, qpid::types::Variant> ValueType; + ValueType value; + void writeValue(pn_data_t*); }; void onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::Descriptor* descriptor); - void setSubjectFilter(const StringFilter&); - void setSelectorFilter(const StringFilter&); + void onNullValue(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*); + void onBooleanValue(const qpid::amqp::CharSequence&, bool, const qpid::amqp::Descriptor*); + void onUByteValue(const qpid::amqp::CharSequence&, uint8_t, const qpid::amqp::Descriptor*); + void onUShortValue(const qpid::amqp::CharSequence&, uint16_t, const qpid::amqp::Descriptor*); + void onUIntValue(const qpid::amqp::CharSequence&, uint32_t, const qpid::amqp::Descriptor*); + void onULongValue(const qpid::amqp::CharSequence&, uint64_t, const qpid::amqp::Descriptor*); + void onByteValue(const qpid::amqp::CharSequence&, int8_t, const qpid::amqp::Descriptor*); + void onShortValue(const qpid::amqp::CharSequence&, int16_t, const qpid::amqp::Descriptor*); + void onIntValue(const qpid::amqp::CharSequence&, int32_t, const qpid::amqp::Descriptor*); + void onLongValue(const qpid::amqp::CharSequence&, int64_t, const qpid::amqp::Descriptor*); + void onFloatValue(const qpid::amqp::CharSequence&, float, const qpid::amqp::Descriptor*); + void onDoubleValue(const qpid::amqp::CharSequence&, double, const qpid::amqp::Descriptor*); + bool onStartMapValue(const qpid::amqp::CharSequence& key, uint32_t count, const qpid::amqp::Descriptor* descriptor); + void onEndMapValue(const qpid::amqp::CharSequence& key, uint32_t count, const qpid::amqp::Descriptor* descriptor); + void setFilter(StringFilter&, const StringFilter&); + bool hasSubjectFilter() const; + std::string getSubjectFilter() const; + bool hasSelectorFilter() const; + std::string getSelectorFilter() const; + bool setDefaultSubjectFilter(bool wildcards=false); + bool setDefaultSubjectFilter(const qpid::amqp::Descriptor& descriptor, const std::string& value=std::string()); + bool adjustDirectFilter(); + void setDefaultHeadersFilter(); + void setDefaultXQueryFilter(); StringFilter subjectFilter; StringFilter selectorFilter; + StringFilter xqueryFilter; + MapFilter headersFilter; + std::vector<FilterBase*> active; + bool inHeadersMap; }; }}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp index adde339fa5..8d07a23499 100644 --- a/cpp/src/qpid/broker/amqp/Session.cpp +++ b/cpp/src/qpid/broker/amqp/Session.cpp @@ -278,34 +278,20 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s if (node.queue) { boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, node.queue, link, *this, out, false)); q->init(); - if (filter.hasSubjectFilter()) { - q->setSubjectFilter(filter.getSubjectFilter()); - } - if (filter.hasSelectorFilter()) { - q->setSelectorFilter(filter.getSelectorFilter()); - } + filter.apply(q); outgoing[link] = q; } else if (node.exchange) { bool durable = pn_terminus_get_durability(source); QueueSettings settings(durable, !durable); - if (filter.hasSelectorFilter()) { - settings.filter = filter.getSelectorFilter(); - QPID_LOG(debug, "Selector specified for outgoing link from exchange " << node.exchange->getName() << ": " << settings.filter); - } + filter.configure(settings); //TODO: populate settings from source details when available from engine std::stringstream queueName;//combination of container id and link name is unique queueName << connection.getContainerId() << "_" << pn_link_name(link); boost::shared_ptr<qpid::broker::Queue> queue = broker.createQueue(queueName.str(), settings, this, "", connection.getUserid(), connection.getId()).first; queue->setExclusiveOwner(this); - if (filter.hasSubjectFilter()) { - filter.bind(node.exchange, queue); - filter.write(pn_terminus_filter(pn_link_source(link))); - } else if (node.exchange->getType() == TopicExchange::typeName) { - node.exchange->bind(queue, "#", 0); - } else { - node.exchange->bind(queue, std::string(), 0); - } + + filter.bind(node.exchange, queue); boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, true)); outgoing[link] = q; q->init(); @@ -317,6 +303,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED); throw qpid::Exception("Node not found: " + name);/*not-found*/ } + filter.write(pn_terminus_filter(pn_link_source(link))); QPID_LOG(debug, "Outgoing link attached"); } diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 258b4b84bd..f3a42b201b 100644 --- a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -21,6 +21,7 @@ #include "qpid/messaging/amqp/AddressHelper.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/AddressImpl.h" +#include "qpid/amqp/descriptors.h" #include "qpid/log/Statement.h" #include <vector> #include <set> @@ -64,6 +65,10 @@ const std::string DURABLE("durable"); const std::string NAME("name"); const std::string RELIABILITY("reliability"); const std::string SELECTOR("selector"); +const std::string FILTER("filter"); +const std::string DESCRIPTOR("descriptor"); +const std::string VALUE("value"); +const std::string SUBJECT_FILTER("subject-filter"); //distribution modes: const std::string MOVE("move"); @@ -100,7 +105,15 @@ pn_bytes_t convert(const std::string& s) result.size = s.size(); return result; } +bool hasWildcards(const std::string& key) +{ + return key.find('*') != std::string::npos || key.find('#') != std::string::npos; +} +uint64_t getFilterDescriptor(const std::string& key) +{ + return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE; +} bool contains(const Variant::List& list, const std::string& item) { for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { @@ -183,6 +196,58 @@ void flatten(Variant::Map& base, const std::string& nested) base.erase(i); } } +void write(pn_data_t* data, const Variant& value); + +void write(pn_data_t* data, const Variant::Map& map) +{ + pn_data_put_map(data); + pn_data_enter(data); + for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) { + pn_data_put_string(data, convert(i->first)); + write(data, i->second); + } + pn_data_exit(data); +} +void write(pn_data_t* data, const Variant::List& list) +{ + pn_data_put_list(data); + pn_data_enter(data); + for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { + write(data, *i); + } + pn_data_exit(data); +} +void write(pn_data_t* data, const Variant& value) +{ + switch (value.getType()) { + case qpid::types::VAR_VOID: + pn_data_put_null(data); + break; + case qpid::types::VAR_BOOL: + pn_data_put_bool(data, value.asBool()); + break; + case qpid::types::VAR_UINT64: + pn_data_put_ulong(data, value.asUint64()); + break; + case qpid::types::VAR_INT64: + pn_data_put_long(data, value.asInt64()); + break; + case qpid::types::VAR_DOUBLE: + pn_data_put_double(data, value.asDouble()); + break; + case qpid::types::VAR_STRING: + pn_data_put_string(data, convert(value.asString())); + break; + case qpid::types::VAR_MAP: + write(data, value.asMap()); + break; + case qpid::types::VAR_LIST: + write(data, value.asList()); + break; + default: + break; + } +} } AddressHelper::AddressHelper(const Address& address) : @@ -241,6 +306,67 @@ AddressHelper::AddressHelper(const Address& address) : if (properties.size() && !(isTemporary || createPolicy.size())) { QPID_LOG(warning, "Properties will be ignored! " << address); } + + qpid::types::Variant::Map::const_iterator selector = link.find(SELECTOR); + if (selector != link.end()) { + addFilter(SELECTOR, qpid::amqp::filters::SELECTOR_FILTER_CODE, selector->second); + } + if (!address.getSubject().empty()) { + addFilter(SUBJECT_FILTER, getFilterDescriptor(address.getSubject()), address.getSubject()); + } + qpid::types::Variant::Map::const_iterator filter = link.find(FILTER); + if (filter != link.end()) { + if (filter->second.getType() == qpid::types::VAR_MAP) { + addFilter(filter->second.asMap()); + } else if (filter->second.getType() == qpid::types::VAR_LIST) { + addFilters(filter->second.asList()); + } else { + throw qpid::messaging::AddressError("Filter must be a map or a list of maps, each containing name, descriptor and value."); + } + } +} + +void AddressHelper::addFilters(const qpid::types::Variant::List& f) +{ + for (qpid::types::Variant::List::const_iterator i = f.begin(); i != f.end(); ++i) { + addFilter(i->asMap()); + } +} + +void AddressHelper::addFilter(const qpid::types::Variant::Map& f) +{ + qpid::types::Variant::Map::const_iterator name = f.find(NAME); + qpid::types::Variant::Map::const_iterator descriptor = f.find(DESCRIPTOR); + qpid::types::Variant::Map::const_iterator value = f.find(VALUE); + //all fields are required at present (may relax this at a later stage): + if (name == f.end()) { + throw qpid::messaging::AddressError("Filter entry must specify name"); + } + if (descriptor == f.end()) { + throw qpid::messaging::AddressError("Filter entry must specify descriptor"); + } + if (value == f.end()) { + throw qpid::messaging::AddressError("Filter entry must specify value"); + } + try { + addFilter(name->second.asString(), descriptor->second.asUint64(), value->second); + } catch (const qpid::types::InvalidConversion&) { + addFilter(name->second.asString(), descriptor->second.asString(), value->second); + } + +} + +AddressHelper::Filter::Filter() : descriptorCode(0){} +AddressHelper::Filter::Filter(const std::string& n, uint64_t d, const qpid::types::Variant& v) : name(n), descriptorCode(d), value(v) {} +AddressHelper::Filter::Filter(const std::string& n, const std::string& d, const qpid::types::Variant& v) : name(n), descriptorSymbol(d), descriptorCode(0), value(v) {} + +void AddressHelper::addFilter(const std::string& name, uint64_t descriptor, const qpid::types::Variant& value) +{ + filters.push_back(Filter(name, descriptor, value)); +} +void AddressHelper::addFilter(const std::string& name, const std::string& descriptor, const qpid::types::Variant& value) +{ + filters.push_back(Filter(name, descriptor, value)); } void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode) @@ -331,6 +457,26 @@ void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode) if (mode == FOR_RECEIVER && browse) { //when PROTON-139 is resolved, set the required delivery-mode } + //set filter(s): + if (mode == FOR_RECEIVER && !filters.empty()) { + pn_data_t* filter = pn_terminus_filter(terminus); + pn_data_put_map(filter); + pn_data_enter(filter); + for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) { + pn_data_put_symbol(filter, convert(i->name)); + pn_data_put_described(filter); + pn_data_enter(filter); + if (i->descriptorSymbol.size()) { + pn_data_put_symbol(filter, convert(i->descriptorSymbol)); + } else { + pn_data_put_ulong(filter, i->descriptorCode); + } + write(filter, i->value); + pn_data_exit(filter); + } + pn_data_exit(filter); + } + } void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create) @@ -401,6 +547,7 @@ Verifier::Verifier() link[X_DECLARE] = true; link[X_BINDINGS] = true; link[SELECTOR] = true; + link[FILTER] = true; defined[LINK] = link; } void Verifier::verify(const Address& address) const diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.h b/cpp/src/qpid/messaging/amqp/AddressHelper.h index d03370b597..2562adfa1e 100644 --- a/cpp/src/qpid/messaging/amqp/AddressHelper.h +++ b/cpp/src/qpid/messaging/amqp/AddressHelper.h @@ -22,6 +22,7 @@ * */ #include "qpid/types/Variant.h" +#include <vector> struct pn_terminus_t; @@ -29,7 +30,6 @@ namespace qpid { namespace messaging { class Address; namespace amqp { - class AddressHelper { public: @@ -43,6 +43,18 @@ class AddressHelper const qpid::types::Variant::Map& getLinkProperties() const; static std::string getLinkName(const Address& address); private: + struct Filter + { + std::string name; + std::string descriptorSymbol; + uint64_t descriptorCode; + qpid::types::Variant value; + + Filter(); + Filter(const std::string& name, uint64_t descriptor, const qpid::types::Variant& value); + Filter(const std::string& name, const std::string& descriptor, const qpid::types::Variant& value); + }; + bool isTemporary; std::string createPolicy; std::string assertPolicy; @@ -56,12 +68,17 @@ class AddressHelper bool durableNode; bool durableLink; bool browse; + std::vector<Filter> filters; bool enabled(const std::string& policy, CheckMode mode) const; bool createEnabled(CheckMode mode) const; bool assertEnabled(CheckMode mode) const; void setCapabilities(pn_terminus_t* terminus, bool create); void setNodeProperties(pn_terminus_t* terminus); + void addFilter(const qpid::types::Variant::Map&); + void addFilter(const std::string& name, uint64_t descriptor, const qpid::types::Variant& value); + void addFilter(const std::string& name, const std::string& descriptor, const qpid::types::Variant& value); + void addFilters(const qpid::types::Variant::List&); }; }}} // namespace qpid::messaging::amqp diff --git a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index a495bc25c6..081a0ae78b 100644 --- a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -22,7 +22,6 @@ #include "qpid/messaging/AddressImpl.h" #include "qpid/messaging/Duration.h" #include "qpid/messaging/Message.h" -#include "qpid/amqp/descriptors.h" #include "qpid/log/Statement.h" extern "C" { #include <proton/engine.h> @@ -90,24 +89,6 @@ const std::string& ReceiverContext::getSource() const { return address.getName(); } -namespace { -pn_bytes_t convert(const std::string& s) -{ - pn_bytes_t result; - result.start = const_cast<char*>(s.data()); - result.size = s.size(); - return result; -} -bool hasWildcards(const std::string& key) -{ - return key.find('*') != std::string::npos || key.find('#') != std::string::npos; -} - -uint64_t getFilterDescriptor(const std::string& key) -{ - return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE; -} -} void ReceiverContext::verify(pn_terminus_t* source) { helper.checkAssertion(source, AddressHelper::FOR_RECEIVER); @@ -119,34 +100,6 @@ void ReceiverContext::configure() void ReceiverContext::configure(pn_terminus_t* source) { helper.configure(source, AddressHelper::FOR_RECEIVER); - - // Look specifically for qpid.selector link property and add a filter for it - qpid::types::Variant::Map::const_iterator i = helper.getLinkProperties().find("selector"); - if (i!=helper.getLinkProperties().end()) { - pn_data_t* filter = pn_terminus_filter(source); - pn_data_put_map(filter); - pn_data_enter(filter); - pn_data_put_symbol(filter, convert("selector")); - pn_data_put_described(filter); - pn_data_enter(filter); - pn_data_put_ulong(filter, qpid::amqp::filters::SELECTOR_FILTER_CODE); - pn_data_put_string(filter, convert(i->second)); - pn_data_exit(filter); - pn_data_exit(filter); - } - if (!address.getSubject().empty()) { - //filter: - pn_data_t* filter = pn_terminus_filter(source); - pn_data_put_map(filter); - pn_data_enter(filter); - pn_data_put_symbol(filter, convert("subject")); - pn_data_put_described(filter); - pn_data_enter(filter); - pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject())); - pn_data_put_string(filter, convert(address.getSubject())); - pn_data_exit(filter); - pn_data_exit(filter); - } } Address ReceiverContext::getAddress() const diff --git a/specs/apache-filters.xml b/specs/apache-filters.xml index a4e40aa93c..644c796893 100644 --- a/specs/apache-filters.xml +++ b/specs/apache-filters.xml @@ -224,4 +224,22 @@ symbol | String </doc> </type> </section> + <section name="xquery" title="An xquery based filter"> + <type class="restricted" name="xquery" source="string" provides="filter"> + <descriptor name="apache.org:xquery-filter:string" code="0x0000468C:0x00000005"/> + <doc> + <p> + The xquery filter consists of a described string value + containing a valid xquery string against which messages + are matched. + </p> + <p> + Containers which support the filter defined in this + section should advertise the capability + <term>APACHE.ORG:XQUERY</term> in their connection + capabilities when sending the open performative. + </p> + </doc> + </type> + </section> </amqp> |