summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp25
1 files changed, 23 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index d5df660539..4d9a7f2a37 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -33,6 +33,7 @@
#include "qpid/broker/FifoDistributor.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Selector.h"
//TODO: get rid of this
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
@@ -209,6 +210,10 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
if ( settings.isBrowseOnly ) {
QPID_LOG ( info, "Queue " << name << " is browse-only." );
}
+ if (settings.filter.size()) {
+ selector.reset(new Selector(settings.filter));
+ QPID_LOG (info, "Queue " << name << " using filter: " << settings.filter);
+ }
}
Queue::~Queue()
@@ -234,23 +239,35 @@ bool Queue::isExcluded(const Message& msg)
return traceExclude.size() && msg.isExcluded(traceExclude);
}
-void Queue::deliver(Message msg, TxBuffer* txn){
+bool Queue::accept(const Message& msg)
+{
//TODO: move some of this out of the queue and into the publishing
//'link' for whatever protocol is used; that would let protocol
//specific stuff be kept out the queue
-
if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg, 0);
alternateExchange->route(deliverable);
}
+ return false;
} else if (isLocal(msg)) {
//drop message
QPID_LOG(info, "Dropping 'local' message from " << getName());
+ return false;
} else if (isExcluded(msg)) {
//drop message
QPID_LOG(info, "Dropping excluded message from " << getName());
+ return false;
+ } else if (selector) {
+ return selector->filter(msg);
} else {
+ return true;
+ }
+}
+
+void Queue::deliver(Message msg, TxBuffer* txn)
+{
+ if (accept(msg)) {
if (txn) {
TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
txn->enlist(op);
@@ -1178,6 +1195,10 @@ void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::strin
<< " user:" << userId
<< " rhost:" << connectionId );
queue->destroyed();
+ } else {
+ QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << queue->getName()
+ << " user:" << userId
+ << " rhost:" << connectionId );
}
}