summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2012-11-17 17:08:14 +0000
committerGordon Sim <gsim@apache.org>2012-11-17 17:08:14 +0000
commitaefe511f1c39025c8ddd8b83d3c48787b96cba50 (patch)
tree85acb3bd6f909e97b48126c967fd564b7f7c8897
parent2b87d88b857e666ef81d31bcde3db0b17bb72ef3 (diff)
downloadqpid-python-aefe511f1c39025c8ddd8b83d3c48787b96cba50.tar.gz
QPID-4368: Added support for subject filtering on queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1410750 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Filter.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Filter.h1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp52
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.h3
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp7
5 files changed, 66 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Filter.cpp b/qpid/cpp/src/qpid/broker/amqp/Filter.cpp
index 61e377c72f..38baba0df1 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Filter.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Filter.cpp
@@ -74,6 +74,11 @@ bool Filter::hasSubjectFilter() const
return !subjectFilter.value.empty();
}
+std::string Filter::getSubjectFilter() const
+{
+ return subjectFilter.value;
+}
+
void Filter::setSubjectFilter(const StringFilter& filter)
{
diff --git a/qpid/cpp/src/qpid/broker/amqp/Filter.h b/qpid/cpp/src/qpid/broker/amqp/Filter.h
index 5e2dee4d6e..20cceb372a 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Filter.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Filter.h
@@ -39,6 +39,7 @@ class Filter : qpid::amqp::MapReader
void read(pn_data_t*);
void write(pn_data_t*);
bool hasSubjectFilter() const;
+ std::string getSubjectFilter() const;
void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue);
private:
struct StringFilter
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 70c6b9ebd5..665bf2def4 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -22,6 +22,7 @@
#include "qpid/broker/amqp/Header.h"
#include "qpid/broker/amqp/Translation.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/TopicKeyNode.h"
#include "qpid/sys/OutputControl.h"
#include "qpid/amqp/MessageEncoder.h"
#include "qpid/log/Statement.h"
@@ -163,6 +164,57 @@ bool Outgoing::accept(const qpid::broker::Message&)
return canDeliver();
}
+void Outgoing::setSubjectFilter(const std::string& f)
+{
+ subjectFilter = f;
+}
+
+namespace {
+
+bool match(TokenIterator& filter, TokenIterator& target)
+{
+ bool wild = false;
+ while (!filter.finished())
+ {
+ if (filter.match1('*')) {
+ if (target.finished()) return false;
+ //else move to next word in filter target
+ filter.next();
+ target.next();
+ } else if (filter.match1('#')) {
+ // i.e. filter word is '#' which can match a variable number of words in the target
+ filter.next();
+ if (filter.finished()) return true;
+ else if (target.finished()) return false;
+ wild = true;
+ } else {
+ //filter word needs to match target exactly
+ if (target.finished()) return false;
+ std::string word;
+ target.pop(word);
+ if (filter.match(word)) {
+ wild = false;
+ filter.next();
+ } else if (!wild) {
+ return false;
+ }
+ }
+ }
+ return target.finished();
+}
+bool match(const std::string& filter, const std::string& target)
+{
+ TokenIterator lhs(filter);
+ TokenIterator rhs(target);
+ return match(lhs, rhs);
+}
+}
+
+bool Outgoing::filter(const qpid::broker::Message& m)
+{
+ return subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey());
+}
+
void Outgoing::cancel() {}
void Outgoing::acknowledged(const qpid::broker::DeliveryRecord&) {}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
index 91670bcd79..a8450a48cf 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -60,6 +60,7 @@ class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from
{
public:
Outgoing(Broker&,boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession&, qpid::sys::OutputControl& o, bool topic);
+ void setSubjectFilter(const std::string&);
void init();
bool dispatch();
void write(const char* data, size_t size);
@@ -71,6 +72,7 @@ class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from
bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg);
void notify();
bool accept(const qpid::broker::Message&);
+ bool filter(const qpid::broker::Message&);
void cancel();
void acknowledged(const qpid::broker::DeliveryRecord&);
qpid::broker::OwnershipToken* getSession();
@@ -99,6 +101,7 @@ class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from
size_t current;
int outstanding;
std::vector<char> buffer;
+ std::string subjectFilter;
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 93b8a747dc..760fa2d902 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -122,18 +122,21 @@ void Session::attach(pn_link_t* link)
pn_terminus_set_address(pn_link_source(link), name.c_str());
ResolvedNode node = resolve(name, source);
+ Filter filter;
+ filter.read(pn_terminus_filter(source));
if (node.queue) {
boost::shared_ptr<Outgoing> q(new Outgoing(broker, node.queue, link, *this, out, false));
q->init();
+ if (filter.hasSubjectFilter()) {
+ q->setSubjectFilter(filter.getSubjectFilter());
+ }
senders[link] = q;
} else if (node.exchange) {
QueueSettings settings(false, true);
//TODO: populate settings from source details when available from engine
boost::shared_ptr<qpid::broker::Queue> queue
= broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;
- Filter filter;
- filter.read(pn_terminus_filter(source));
if (filter.hasSubjectFilter()) {
filter.bind(node.exchange, queue);
filter.write(pn_terminus_filter(pn_link_source(link)));