summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/amqp/Filter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/amqp/Filter.cpp')
-rw-r--r--cpp/src/qpid/broker/amqp/Filter.cpp150
1 files changed, 150 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/amqp/Filter.cpp b/cpp/src/qpid/broker/amqp/Filter.cpp
new file mode 100644
index 0000000000..38baba0df1
--- /dev/null
+++ b/cpp/src/qpid/broker/amqp/Filter.cpp
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/Filter.h"
+#include "qpid/broker/amqp/DataReader.h"
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/TopicExchange.h"
+#include "qpid/amqp/descriptors.h"
+#include "qpid/log/Statement.h"
+extern "C" {
+#include <proton/engine.h>
+}
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+void Filter::read(pn_data_t* data)
+{
+ try {
+ DataReader reader(*this);
+ reader.read(data);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Error parsing filter: " << e.what());
+ }
+}
+
+void Filter::write(pn_data_t* data)
+{
+ pn_data_put_map(data);
+ pn_data_enter(data);
+ subjectFilter.write(data);
+ pn_data_exit(data);
+}
+
+void Filter::onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::Descriptor* descriptor)
+{
+ StringFilter filter;
+ filter.key = std::string(key.data, key.size);
+ filter.value = std::string(value.data, value.size);
+ if (descriptor) {
+ filter.described = true;
+ filter.descriptor = *descriptor;
+ if (descriptor->match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)
+ || descriptor->match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) {
+ setSubjectFilter(filter);
+ } else {
+ QPID_LOG(notice, "Skipping unrecognised string filter with key " << filter.key << " and descriptor " << filter.descriptor);
+ }
+ } else {
+ setSubjectFilter(filter);
+ }
+}
+
+bool Filter::hasSubjectFilter() const
+{
+ return !subjectFilter.value.empty();
+}
+
+std::string Filter::getSubjectFilter() const
+{
+ return subjectFilter.value;
+}
+
+
+void Filter::setSubjectFilter(const StringFilter& filter)
+{
+ if (hasSubjectFilter()) {
+ QPID_LOG(notice, "Skipping filter with key " << filter.key << "; subject filter already set");
+ } else {
+ subjectFilter = filter;
+ }
+}
+
+void Filter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue)
+{
+ subjectFilter.bind(exchange, queue);
+}
+
+Filter::StringFilter::StringFilter() : described(false), descriptor(0) {}
+namespace {
+pn_bytes_t convert(const std::string& in)
+{
+ pn_bytes_t out;
+ out.start = const_cast<char*>(in.data());
+ out.size = in.size();
+ return out;
+}
+pn_bytes_t convert(const qpid::amqp::CharSequence& in)
+{
+ pn_bytes_t out;
+ out.start = const_cast<char*>(in.data);
+ out.size = in.size;
+ return out;
+}
+}
+void Filter::StringFilter::write(pn_data_t* data)
+{
+ pn_data_put_symbol(data, convert(key));
+ if (described) {
+ pn_data_put_described(data);
+ pn_data_enter(data);
+ switch (descriptor.type) {
+ case qpid::amqp::Descriptor::NUMERIC:
+ pn_data_put_ulong(data, descriptor.value.code);
+ break;
+ case qpid::amqp::Descriptor::SYMBOLIC:
+ pn_data_put_symbol(data, convert(descriptor.value.symbol));
+ break;
+ }
+ }
+ pn_data_put_string(data, convert(value));
+ if (described) pn_data_exit(data);
+}
+
+void Filter::StringFilter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue)
+{
+ if (described && exchange->getType() == DirectExchange::typeName
+ && descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)) {
+ QPID_LOG(info, "Using legacy topic filter as a direct matching filter for " << exchange->getName());
+ if (descriptor.type == qpid::amqp::Descriptor::NUMERIC) {
+ descriptor = qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE);
+ } else {
+ qpid::amqp::CharSequence symbol;
+ symbol.data = qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL.data();
+ symbol.size = qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL.size();
+ descriptor = qpid::amqp::Descriptor(symbol);
+ }
+ }
+ exchange->bind(queue, value, 0);
+}
+
+}}} // namespace qpid::broker::amqp