diff options
author | Gordon Sim <gsim@apache.org> | 2012-11-16 21:32:11 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2012-11-16 21:32:11 +0000 |
commit | c2e7cfde679e83a51598fc65ebe5b45299d59a94 (patch) | |
tree | eb35a55e3978d28f0bc73bb10db01dcb9c19ff27 /cpp/src | |
parent | 969e6bcd810f82dec73333d5bf4df4d96b68adc5 (diff) | |
download | qpid-python-c2e7cfde679e83a51598fc65ebe5b45299d59a94.tar.gz |
QPID-4368: Small improvements to setting and checking filter descriptors
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1410576 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/amqp/descriptors.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Session.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/ReceiverContext.cpp | 12 |
3 files changed, 45 insertions, 5 deletions
diff --git a/cpp/src/qpid/amqp/descriptors.h b/cpp/src/qpid/amqp/descriptors.h index b0249f6a9f..b2616ed93a 100644 --- a/cpp/src/qpid/amqp/descriptors.h +++ b/cpp/src/qpid/amqp/descriptors.h @@ -73,7 +73,14 @@ const Descriptor SASL_INIT(SASL_INIT_CODE); const Descriptor SASL_CHALLENGE(SASL_CHALLENGE_CODE); const Descriptor SASL_RESPONSE(SASL_RESPONSE_CODE); const Descriptor SASL_OUTCOME(SASL_OUTCOME_CODE); +} + +namespace filters { +const std::string LEGACY_DIRECT_FILTER_SYMBOL("apache.org:legacy-amqp-direct-binding:string"); +const std::string LEGACY_TOPIC_FILTER_SYMBOL("apache.org:legacy-amqp-direct-binding:string"); +const uint64_t LEGACY_DIRECT_FILTER_CODE(0x0000468C00000000); +const uint64_t LEGACY_TOPIC_FILTER_CODE(0x0000468C00000001); } }} // namespace qpid::amqp diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp index 5dd6ace943..5a6812edad 100644 --- a/cpp/src/qpid/broker/amqp/Session.cpp +++ b/cpp/src/qpid/broker/amqp/Session.cpp @@ -24,10 +24,13 @@ #include "ManagedConnection.h" #include "qpid/amqp/CharSequence.h" #include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/descriptors.h" #include "qpid/broker/AsyncCompletion.h" #include "qpid/broker/Broker.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/DirectExchange.h" +#include "qpid/broker/TopicExchange.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/Message.h" #include "qpid/broker/Queue.h" @@ -134,13 +137,33 @@ void Session::attach(pn_link_t* link) c.size = d.size; descriptor = qpid::amqp::Descriptor(c); } else { - QPID_LOG(notice, "Ignoring filter with descriptor with key " << std::string(fname.start, fname.size) << " and type " << pn_data_type(filter)); + QPID_LOG(notice, "Ignoring filter " << std::string(fname.start, fname.size) << " with descriptor of type " << pn_data_type(filter)); + continue; + } + if (descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)) { + if (exchange->getType() == qpid::broker::DirectExchange::typeName) { + QPID_LOG(info, "Interpreting legacy topic filter as direct binding key for " << exchange->getName()); + } else if (exchange->getType() == qpid::broker::FanOutExchange::typeName) { + QPID_LOG(info, "Ignoring legacy topic filter on fanout exchange " << exchange->getName()); + for (int i = 0; i < 3; ++i) pn_data_next(filter);//move off descriptor, then skip key and value + continue; + } + } else if (descriptor.match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) { + if (exchange->getType() == qpid::broker::TopicExchange::typeName) { + QPID_LOG(info, "Interpreting legacy direct filter as topic binding key for " << exchange->getName()); + } else if (exchange->getType() == qpid::broker::FanOutExchange::typeName) { + QPID_LOG(info, "Ignoring legacy direct filter on fanout exchange " << exchange->getName()); + for (int i = 0; i < 3; ++i) pn_data_next(filter);//move off descriptor, then skip key and value + continue; + } + } else { + QPID_LOG(notice, "Ignoring filter with unsupported descriptor " << descriptor); + for (int i = 0; i < 3; ++i) pn_data_next(filter);//move off descriptor, then skip key and value continue; } - QPID_LOG(debug, "Got filter with descriptor " << descriptor); pn_data_next(filter); } else { - QPID_LOG(debug, "Got undescribed filter of type " << pn_data_type(filter)); + QPID_LOG(info, "Got undescribed filter of type " << pn_data_type(filter)); } if (pn_data_type(filter) == PN_STRING) { pn_bytes_t value = pn_data_get_string(filter); @@ -180,7 +203,7 @@ void Session::attach(pn_link_t* link) } else if (exchange->getType() == TopicExchange::typeName) { exchange->bind(queue, "#", 0); } else { - throw qpid::Exception("Exchange type not yet supported over 1.0: " + exchange->getType());/*not-supported?*/ + throw qpid::Exception("Exchange type requires a filter: " + exchange->getType());/*not-supported?*/ } boost::shared_ptr<Outgoing> q(new Outgoing(broker, queue, link, *this, out, true)); senders[link] = q; diff --git a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index 33ddabfc75..0a8f139839 100644 --- a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -21,6 +21,7 @@ #include "ReceiverContext.h" #include "qpid/messaging/Duration.h" #include "qpid/messaging/Message.h" +#include "qpid/amqp/descriptors.h" extern "C" { #include <proton/engine.h> } @@ -94,6 +95,15 @@ pn_bytes_t convert(const std::string& s) result.size = s.size(); return result; } +bool hasWildcards(const std::string& key) +{ + return key.find('*') != std::string::npos || key.find('#') != std::string::npos; +} + +uint64_t getFilterDescriptor(const std::string& key) +{ + return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE; +} } void ReceiverContext::configure() const @@ -110,7 +120,7 @@ void ReceiverContext::configure(pn_terminus_t* source) const pn_data_put_symbol(filter, convert("subject")); pn_data_put_described(filter); pn_data_enter(filter); - pn_data_put_ulong(filter, 0x0000468C00000001/*LEGACY_TOPIC_FILTER*/); + pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject())); pn_data_put_string(filter, convert(address.getSubject())); pn_data_exit(filter); pn_data_exit(filter); |