diff options
author | Gordon Sim <gsim@apache.org> | 2012-11-17 17:08:14 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2012-11-17 17:08:14 +0000 |
commit | aefe511f1c39025c8ddd8b83d3c48787b96cba50 (patch) | |
tree | 85acb3bd6f909e97b48126c967fd564b7f7c8897 | |
parent | 2b87d88b857e666ef81d31bcde3db0b17bb72ef3 (diff) | |
download | qpid-python-aefe511f1c39025c8ddd8b83d3c48787b96cba50.tar.gz |
QPID-4368: Added support for subject filtering on queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1410750 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Filter.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Filter.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 52 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 7 |
5 files changed, 66 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Filter.cpp b/qpid/cpp/src/qpid/broker/amqp/Filter.cpp index 61e377c72f..38baba0df1 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Filter.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Filter.cpp @@ -74,6 +74,11 @@ bool Filter::hasSubjectFilter() const return !subjectFilter.value.empty(); } +std::string Filter::getSubjectFilter() const +{ + return subjectFilter.value; +} + void Filter::setSubjectFilter(const StringFilter& filter) { diff --git a/qpid/cpp/src/qpid/broker/amqp/Filter.h b/qpid/cpp/src/qpid/broker/amqp/Filter.h index 5e2dee4d6e..20cceb372a 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Filter.h +++ b/qpid/cpp/src/qpid/broker/amqp/Filter.h @@ -39,6 +39,7 @@ class Filter : qpid::amqp::MapReader void read(pn_data_t*); void write(pn_data_t*); bool hasSubjectFilter() const; + std::string getSubjectFilter() const; void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue); private: struct StringFilter diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 70c6b9ebd5..665bf2def4 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -22,6 +22,7 @@ #include "qpid/broker/amqp/Header.h" #include "qpid/broker/amqp/Translation.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/TopicKeyNode.h" #include "qpid/sys/OutputControl.h" #include "qpid/amqp/MessageEncoder.h" #include "qpid/log/Statement.h" @@ -163,6 +164,57 @@ bool Outgoing::accept(const qpid::broker::Message&) return canDeliver(); } +void Outgoing::setSubjectFilter(const std::string& f) +{ + subjectFilter = f; +} + +namespace { + +bool match(TokenIterator& filter, TokenIterator& target) +{ + bool wild = false; + while (!filter.finished()) + { + if (filter.match1('*')) { + if (target.finished()) return false; + //else move to next word in filter target + filter.next(); + target.next(); + } else if (filter.match1('#')) { + // i.e. filter word is '#' which can match a variable number of words in the target + filter.next(); + if (filter.finished()) return true; + else if (target.finished()) return false; + wild = true; + } else { + //filter word needs to match target exactly + if (target.finished()) return false; + std::string word; + target.pop(word); + if (filter.match(word)) { + wild = false; + filter.next(); + } else if (!wild) { + return false; + } + } + } + return target.finished(); +} +bool match(const std::string& filter, const std::string& target) +{ + TokenIterator lhs(filter); + TokenIterator rhs(target); + return match(lhs, rhs); +} +} + +bool Outgoing::filter(const qpid::broker::Message& m) +{ + return subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey()); +} + void Outgoing::cancel() {} void Outgoing::acknowledged(const qpid::broker::DeliveryRecord&) {} diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index 91670bcd79..a8450a48cf 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -60,6 +60,7 @@ class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from { public: Outgoing(Broker&,boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession&, qpid::sys::OutputControl& o, bool topic); + void setSubjectFilter(const std::string&); void init(); bool dispatch(); void write(const char* data, size_t size); @@ -71,6 +72,7 @@ class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg); void notify(); bool accept(const qpid::broker::Message&); + bool filter(const qpid::broker::Message&); void cancel(); void acknowledged(const qpid::broker::DeliveryRecord&); qpid::broker::OwnershipToken* getSession(); @@ -99,6 +101,7 @@ class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from size_t current; int outstanding; std::vector<char> buffer; + std::string subjectFilter; }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 93b8a747dc..760fa2d902 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -122,18 +122,21 @@ void Session::attach(pn_link_t* link) pn_terminus_set_address(pn_link_source(link), name.c_str()); ResolvedNode node = resolve(name, source); + Filter filter; + filter.read(pn_terminus_filter(source)); if (node.queue) { boost::shared_ptr<Outgoing> q(new Outgoing(broker, node.queue, link, *this, out, false)); q->init(); + if (filter.hasSubjectFilter()) { + q->setSubjectFilter(filter.getSubjectFilter()); + } senders[link] = q; } else if (node.exchange) { QueueSettings settings(false, true); //TODO: populate settings from source details when available from engine boost::shared_ptr<qpid::broker::Queue> queue = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first; - Filter filter; - filter.read(pn_terminus_filter(source)); if (filter.hasSubjectFilter()) { filter.bind(node.exchange, queue); filter.write(pn_terminus_filter(pn_link_source(link))); |