diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 25 |
1 files changed, 23 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index d5df660539..4d9a7f2a37 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -33,6 +33,7 @@ #include "qpid/broker/FifoDistributor.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/Selector.h" //TODO: get rid of this #include "qpid/broker/amqp_0_10/MessageTransfer.h" @@ -209,6 +210,10 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, if ( settings.isBrowseOnly ) { QPID_LOG ( info, "Queue " << name << " is browse-only." ); } + if (settings.filter.size()) { + selector.reset(new Selector(settings.filter)); + QPID_LOG (info, "Queue " << name << " using filter: " << settings.filter); + } } Queue::~Queue() @@ -234,23 +239,35 @@ bool Queue::isExcluded(const Message& msg) return traceExclude.size() && msg.isExcluded(traceExclude); } -void Queue::deliver(Message msg, TxBuffer* txn){ +bool Queue::accept(const Message& msg) +{ //TODO: move some of this out of the queue and into the publishing //'link' for whatever protocol is used; that would let protocol //specific stuff be kept out the queue - if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg, 0); alternateExchange->route(deliverable); } + return false; } else if (isLocal(msg)) { //drop message QPID_LOG(info, "Dropping 'local' message from " << getName()); + return false; } else if (isExcluded(msg)) { //drop message QPID_LOG(info, "Dropping excluded message from " << getName()); + return false; + } else if (selector) { + return selector->filter(msg); } else { + return true; + } +} + +void Queue::deliver(Message msg, TxBuffer* txn) +{ + if (accept(msg)) { if (txn) { TxOp::shared_ptr op(new TxPublish(msg, shared_from_this())); txn->enlist(op); @@ -1178,6 +1195,10 @@ void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::strin << " user:" << userId << " rhost:" << connectionId ); queue->destroyed(); + } else { + QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << queue->getName() + << " user:" << userId + << " rhost:" << connectionId ); } } |
