summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-05-08 11:55:56 +0000
committerGordon Sim <gsim@apache.org>2013-05-08 11:55:56 +0000
commit4edff64bd50163999a1389a30a0d204b6a78b06c (patch)
treebd1272b47925b062968d9b9b96d2bed31a813bdd /qpid/cpp
parentdd56c78da224817eab19215b0b57073aa1f59be3 (diff)
downloadqpid-python-4edff64bd50163999a1389a30a0d204b6a78b06c.tar.gz
QPID-4706: allow selectors to be used on links from an exchange
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1480239 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp25
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h4
-rw-r--r--qpid/cpp/src/qpid/broker/QueueSettings.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/QueueSettings.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp4
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp3
6 files changed, 40 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index d5df660539..4d9a7f2a37 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -33,6 +33,7 @@
#include "qpid/broker/FifoDistributor.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Selector.h"
//TODO: get rid of this
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
@@ -209,6 +210,10 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
if ( settings.isBrowseOnly ) {
QPID_LOG ( info, "Queue " << name << " is browse-only." );
}
+ if (settings.filter.size()) {
+ selector.reset(new Selector(settings.filter));
+ QPID_LOG (info, "Queue " << name << " using filter: " << settings.filter);
+ }
}
Queue::~Queue()
@@ -234,23 +239,35 @@ bool Queue::isExcluded(const Message& msg)
return traceExclude.size() && msg.isExcluded(traceExclude);
}
-void Queue::deliver(Message msg, TxBuffer* txn){
+bool Queue::accept(const Message& msg)
+{
//TODO: move some of this out of the queue and into the publishing
//'link' for whatever protocol is used; that would let protocol
//specific stuff be kept out the queue
-
if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg, 0);
alternateExchange->route(deliverable);
}
+ return false;
} else if (isLocal(msg)) {
//drop message
QPID_LOG(info, "Dropping 'local' message from " << getName());
+ return false;
} else if (isExcluded(msg)) {
//drop message
QPID_LOG(info, "Dropping excluded message from " << getName());
+ return false;
+ } else if (selector) {
+ return selector->filter(msg);
} else {
+ return true;
+ }
+}
+
+void Queue::deliver(Message msg, TxBuffer* txn)
+{
+ if (accept(msg)) {
if (txn) {
TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
txn->enlist(op);
@@ -1178,6 +1195,10 @@ void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::strin
<< " user:" << userId
<< " rhost:" << connectionId );
queue->destroyed();
+ } else {
+ QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << queue->getName()
+ << " user:" << userId
+ << " rhost:" << connectionId );
}
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index ef4d956826..ee9c54df29 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -43,6 +43,7 @@
#include "qmf/org/apache/qpid/broker/Broker.h"
#include "qpid/framing/amqp_types.h"
+#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
@@ -66,6 +67,7 @@ class QueueDepth;
class QueueEvents;
class QueueRegistry;
class QueueFactory;
+class Selector;
class TransactionContext;
class TxBuffer;
class MessageDistributor;
@@ -165,8 +167,10 @@ class Queue : public boost::enable_shared_from_this<Queue>,
UsageBarrier barrier;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
boost::shared_ptr<MessageDistributor> allocator;
+ boost::scoped_ptr<Selector> selector;
virtual void push(Message& msg, bool isRecovery=false);
+ bool accept(const Message&);
void process(Message& msg);
bool enqueue(TransactionContext* ctxt, Message& msg);
bool getNextMessage(Message& msg, Consumer::shared_ptr& c);
diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.cpp b/qpid/cpp/src/qpid/broker/QueueSettings.cpp
index d28fa38fde..93b832733c 100644
--- a/qpid/cpp/src/qpid/broker/QueueSettings.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueSettings.cpp
@@ -57,6 +57,7 @@ const std::string FAIRSHARE_ALIAS("x-qpid-fairshare");
const std::string PAGING("qpid.paging");
const std::string MAX_PAGES("qpid.max_pages_loaded");
const std::string PAGE_FACTOR("qpid.page_factor");
+const std::string FILTER("qpid.filter");
const std::string LVQ_LEGACY("qpid.last_value_queue");
const std::string LVQ_LEGACY_KEY("qpid.LVQ_key");
@@ -202,6 +203,9 @@ bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& v
} else if (key == PAGE_FACTOR) {
pageFactor = value;
return true;
+ } else if (key == FILTER) {
+ filter = value.asString();
+ return true;
} else {
return false;
}
diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.h b/qpid/cpp/src/qpid/broker/QueueSettings.h
index 3f42b53f23..cf430db76d 100644
--- a/qpid/cpp/src/qpid/broker/QueueSettings.h
+++ b/qpid/cpp/src/qpid/broker/QueueSettings.h
@@ -83,6 +83,8 @@ struct QueueSettings
uint64_t maxFileSize;
uint64_t maxFileCount;
+ std::string filter;
+
//yuck, yuck
qpid::framing::FieldTable storeSettings;
std::map<std::string, qpid::types::Variant> original;
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index dd0faae4e7..7360edc764 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -272,6 +272,10 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
outgoing[link] = q;
} else if (node.exchange) {
QueueSettings settings(false, true);
+ if (filter.hasSelectorFilter()) {
+ settings.filter = filter.getSelectorFilter();
+ QPID_LOG(debug, "Selector specified for outgoing link from exchange " << node.exchange->getName() << ": " << settings.filter);
+ }
//TODO: populate settings from source details when available from engine
boost::shared_ptr<qpid::broker::Queue> queue
= broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index a9f2758605..ac3253c572 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -95,6 +95,7 @@ const std::string X_SUBSCRIBE("x-subscribe");
const std::string X_BINDINGS("x-bindings");
const std::string SELECTOR("selector");
const std::string APACHE_SELECTOR("x-apache-selector");
+const std::string QPID_FILTER("qpid.filter");
const std::string EXCHANGE("exchange");
const std::string QUEUE("queue");
const std::string KEY("key");
@@ -523,6 +524,8 @@ Subscription::Subscription(const Address& address, const std::string& type)
{
(Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
(Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
+ std::string selector = Opt(address)/LINK/SELECTOR;
+ if (!selector.empty()) queueOptions.setString(QPID_FILTER, selector);
if (!address.getSubject().empty()) bindSubject(address.getSubject());
else if (linkBindings.empty()) bindAll();