diff options
author | Gordon Sim <gsim@apache.org> | 2013-05-08 11:55:56 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2013-05-08 11:55:56 +0000 |
commit | 4edff64bd50163999a1389a30a0d204b6a78b06c (patch) | |
tree | bd1272b47925b062968d9b9b96d2bed31a813bdd /qpid/cpp | |
parent | dd56c78da224817eab19215b0b57073aa1f59be3 (diff) | |
download | qpid-python-4edff64bd50163999a1389a30a0d204b6a78b06c.tar.gz |
QPID-4706: allow selectors to be used on links from an exchange
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1480239 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 25 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueSettings.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueSettings.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 3 |
6 files changed, 40 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index d5df660539..4d9a7f2a37 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -33,6 +33,7 @@ #include "qpid/broker/FifoDistributor.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/Selector.h" //TODO: get rid of this #include "qpid/broker/amqp_0_10/MessageTransfer.h" @@ -209,6 +210,10 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, if ( settings.isBrowseOnly ) { QPID_LOG ( info, "Queue " << name << " is browse-only." ); } + if (settings.filter.size()) { + selector.reset(new Selector(settings.filter)); + QPID_LOG (info, "Queue " << name << " using filter: " << settings.filter); + } } Queue::~Queue() @@ -234,23 +239,35 @@ bool Queue::isExcluded(const Message& msg) return traceExclude.size() && msg.isExcluded(traceExclude); } -void Queue::deliver(Message msg, TxBuffer* txn){ +bool Queue::accept(const Message& msg) +{ //TODO: move some of this out of the queue and into the publishing //'link' for whatever protocol is used; that would let protocol //specific stuff be kept out the queue - if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg, 0); alternateExchange->route(deliverable); } + return false; } else if (isLocal(msg)) { //drop message QPID_LOG(info, "Dropping 'local' message from " << getName()); + return false; } else if (isExcluded(msg)) { //drop message QPID_LOG(info, "Dropping excluded message from " << getName()); + return false; + } else if (selector) { + return selector->filter(msg); } else { + return true; + } +} + +void Queue::deliver(Message msg, TxBuffer* txn) +{ + if (accept(msg)) { if (txn) { TxOp::shared_ptr op(new TxPublish(msg, shared_from_this())); txn->enlist(op); @@ -1178,6 +1195,10 @@ void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::strin << " user:" << userId << " rhost:" << connectionId ); queue->destroyed(); + } else { + QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << queue->getName() + << " user:" << userId + << " rhost:" << connectionId ); } } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index ef4d956826..ee9c54df29 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -43,6 +43,7 @@ #include "qmf/org/apache/qpid/broker/Broker.h" #include "qpid/framing/amqp_types.h" +#include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> #include <boost/intrusive_ptr.hpp> #include <boost/enable_shared_from_this.hpp> @@ -66,6 +67,7 @@ class QueueDepth; class QueueEvents; class QueueRegistry; class QueueFactory; +class Selector; class TransactionContext; class TxBuffer; class MessageDistributor; @@ -165,8 +167,10 @@ class Queue : public boost::enable_shared_from_this<Queue>, UsageBarrier barrier; boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; boost::shared_ptr<MessageDistributor> allocator; + boost::scoped_ptr<Selector> selector; virtual void push(Message& msg, bool isRecovery=false); + bool accept(const Message&); void process(Message& msg); bool enqueue(TransactionContext* ctxt, Message& msg); bool getNextMessage(Message& msg, Consumer::shared_ptr& c); diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.cpp b/qpid/cpp/src/qpid/broker/QueueSettings.cpp index d28fa38fde..93b832733c 100644 --- a/qpid/cpp/src/qpid/broker/QueueSettings.cpp +++ b/qpid/cpp/src/qpid/broker/QueueSettings.cpp @@ -57,6 +57,7 @@ const std::string FAIRSHARE_ALIAS("x-qpid-fairshare"); const std::string PAGING("qpid.paging"); const std::string MAX_PAGES("qpid.max_pages_loaded"); const std::string PAGE_FACTOR("qpid.page_factor"); +const std::string FILTER("qpid.filter"); const std::string LVQ_LEGACY("qpid.last_value_queue"); const std::string LVQ_LEGACY_KEY("qpid.LVQ_key"); @@ -202,6 +203,9 @@ bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& v } else if (key == PAGE_FACTOR) { pageFactor = value; return true; + } else if (key == FILTER) { + filter = value.asString(); + return true; } else { return false; } diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.h b/qpid/cpp/src/qpid/broker/QueueSettings.h index 3f42b53f23..cf430db76d 100644 --- a/qpid/cpp/src/qpid/broker/QueueSettings.h +++ b/qpid/cpp/src/qpid/broker/QueueSettings.h @@ -83,6 +83,8 @@ struct QueueSettings uint64_t maxFileSize; uint64_t maxFileCount; + std::string filter; + //yuck, yuck qpid::framing::FieldTable storeSettings; std::map<std::string, qpid::types::Variant> original; diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index dd0faae4e7..7360edc764 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -272,6 +272,10 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s outgoing[link] = q; } else if (node.exchange) { QueueSettings settings(false, true); + if (filter.hasSelectorFilter()) { + settings.filter = filter.getSelectorFilter(); + QPID_LOG(debug, "Selector specified for outgoing link from exchange " << node.exchange->getName() << ": " << settings.filter); + } //TODO: populate settings from source details when available from engine boost::shared_ptr<qpid::broker::Queue> queue = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index a9f2758605..ac3253c572 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -95,6 +95,7 @@ const std::string X_SUBSCRIBE("x-subscribe"); const std::string X_BINDINGS("x-bindings"); const std::string SELECTOR("selector"); const std::string APACHE_SELECTOR("x-apache-selector"); +const std::string QPID_FILTER("qpid.filter"); const std::string EXCHANGE("exchange"); const std::string QUEUE("queue"); const std::string KEY("key"); @@ -523,6 +524,8 @@ Subscription::Subscription(const Address& address, const std::string& type) { (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions); (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions); + std::string selector = Opt(address)/LINK/SELECTOR; + if (!selector.empty()) queueOptions.setString(QPID_FILTER, selector); if (!address.getSubject().empty()) bindSubject(address.getSubject()); else if (linkBindings.empty()) bindAll(); |