summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2012-11-16 21:32:11 +0000
committerGordon Sim <gsim@apache.org>2012-11-16 21:32:11 +0000
commitc2e7cfde679e83a51598fc65ebe5b45299d59a94 (patch)
treeeb35a55e3978d28f0bc73bb10db01dcb9c19ff27 /cpp/src
parent969e6bcd810f82dec73333d5bf4df4d96b68adc5 (diff)
downloadqpid-python-c2e7cfde679e83a51598fc65ebe5b45299d59a94.tar.gz
QPID-4368: Small improvements to setting and checking filter descriptors
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1410576 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/amqp/descriptors.h7
-rw-r--r--cpp/src/qpid/broker/amqp/Session.cpp31
-rw-r--r--cpp/src/qpid/messaging/amqp/ReceiverContext.cpp12
3 files changed, 45 insertions, 5 deletions
diff --git a/cpp/src/qpid/amqp/descriptors.h b/cpp/src/qpid/amqp/descriptors.h
index b0249f6a9f..b2616ed93a 100644
--- a/cpp/src/qpid/amqp/descriptors.h
+++ b/cpp/src/qpid/amqp/descriptors.h
@@ -73,7 +73,14 @@ const Descriptor SASL_INIT(SASL_INIT_CODE);
const Descriptor SASL_CHALLENGE(SASL_CHALLENGE_CODE);
const Descriptor SASL_RESPONSE(SASL_RESPONSE_CODE);
const Descriptor SASL_OUTCOME(SASL_OUTCOME_CODE);
+}
+
+namespace filters {
+const std::string LEGACY_DIRECT_FILTER_SYMBOL("apache.org:legacy-amqp-direct-binding:string");
+const std::string LEGACY_TOPIC_FILTER_SYMBOL("apache.org:legacy-amqp-direct-binding:string");
+const uint64_t LEGACY_DIRECT_FILTER_CODE(0x0000468C00000000);
+const uint64_t LEGACY_TOPIC_FILTER_CODE(0x0000468C00000001);
}
}} // namespace qpid::amqp
diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp
index 5dd6ace943..5a6812edad 100644
--- a/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/cpp/src/qpid/broker/amqp/Session.cpp
@@ -24,10 +24,13 @@
#include "ManagedConnection.h"
#include "qpid/amqp/CharSequence.h"
#include "qpid/amqp/Descriptor.h"
+#include "qpid/amqp/descriptors.h"
#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
@@ -134,13 +137,33 @@ void Session::attach(pn_link_t* link)
c.size = d.size;
descriptor = qpid::amqp::Descriptor(c);
} else {
- QPID_LOG(notice, "Ignoring filter with descriptor with key " << std::string(fname.start, fname.size) << " and type " << pn_data_type(filter));
+ QPID_LOG(notice, "Ignoring filter " << std::string(fname.start, fname.size) << " with descriptor of type " << pn_data_type(filter));
+ continue;
+ }
+ if (descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)) {
+ if (exchange->getType() == qpid::broker::DirectExchange::typeName) {
+ QPID_LOG(info, "Interpreting legacy topic filter as direct binding key for " << exchange->getName());
+ } else if (exchange->getType() == qpid::broker::FanOutExchange::typeName) {
+ QPID_LOG(info, "Ignoring legacy topic filter on fanout exchange " << exchange->getName());
+ for (int i = 0; i < 3; ++i) pn_data_next(filter);//move off descriptor, then skip key and value
+ continue;
+ }
+ } else if (descriptor.match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) {
+ if (exchange->getType() == qpid::broker::TopicExchange::typeName) {
+ QPID_LOG(info, "Interpreting legacy direct filter as topic binding key for " << exchange->getName());
+ } else if (exchange->getType() == qpid::broker::FanOutExchange::typeName) {
+ QPID_LOG(info, "Ignoring legacy direct filter on fanout exchange " << exchange->getName());
+ for (int i = 0; i < 3; ++i) pn_data_next(filter);//move off descriptor, then skip key and value
+ continue;
+ }
+ } else {
+ QPID_LOG(notice, "Ignoring filter with unsupported descriptor " << descriptor);
+ for (int i = 0; i < 3; ++i) pn_data_next(filter);//move off descriptor, then skip key and value
continue;
}
- QPID_LOG(debug, "Got filter with descriptor " << descriptor);
pn_data_next(filter);
} else {
- QPID_LOG(debug, "Got undescribed filter of type " << pn_data_type(filter));
+ QPID_LOG(info, "Got undescribed filter of type " << pn_data_type(filter));
}
if (pn_data_type(filter) == PN_STRING) {
pn_bytes_t value = pn_data_get_string(filter);
@@ -180,7 +203,7 @@ void Session::attach(pn_link_t* link)
} else if (exchange->getType() == TopicExchange::typeName) {
exchange->bind(queue, "#", 0);
} else {
- throw qpid::Exception("Exchange type not yet supported over 1.0: " + exchange->getType());/*not-supported?*/
+ throw qpid::Exception("Exchange type requires a filter: " + exchange->getType());/*not-supported?*/
}
boost::shared_ptr<Outgoing> q(new Outgoing(broker, queue, link, *this, out, true));
senders[link] = q;
diff --git a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index 33ddabfc75..0a8f139839 100644
--- a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -21,6 +21,7 @@
#include "ReceiverContext.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Message.h"
+#include "qpid/amqp/descriptors.h"
extern "C" {
#include <proton/engine.h>
}
@@ -94,6 +95,15 @@ pn_bytes_t convert(const std::string& s)
result.size = s.size();
return result;
}
+bool hasWildcards(const std::string& key)
+{
+ return key.find('*') != std::string::npos || key.find('#') != std::string::npos;
+}
+
+uint64_t getFilterDescriptor(const std::string& key)
+{
+ return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE;
+}
}
void ReceiverContext::configure() const
@@ -110,7 +120,7 @@ void ReceiverContext::configure(pn_terminus_t* source) const
pn_data_put_symbol(filter, convert("subject"));
pn_data_put_described(filter);
pn_data_enter(filter);
- pn_data_put_ulong(filter, 0x0000468C00000001/*LEGACY_TOPIC_FILTER*/);
+ pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject()));
pn_data_put_string(filter, convert(address.getSubject()));
pn_data_exit(filter);
pn_data_exit(filter);