summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp18
1 files changed, 15 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 165830151d..436431fce1 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -59,6 +59,7 @@ Queue::Queue(const string& _name, bool _autodelete,
owner(_owner),
consumerCount(0),
exclusive(false),
+ noLocal(false),
persistenceId(0)
{
if (parent != 0)
@@ -90,6 +91,10 @@ void Queue::notifyDurableIOComplete()
notify();
}
+bool Queue::isLocal(boost::intrusive_ptr<Message>& msg)
+{
+ return noLocal && owner && owner->isLocal(msg->getPublisher());
+}
void Queue::deliver(boost::intrusive_ptr<Message>& msg){
if (msg->isImmediate() && getConsumerCount() == 0) {
@@ -97,9 +102,10 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
DeliverableMessage deliverable(msg);
alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders());
}
+ } else if (isLocal(msg)) {
+ //drop message
+ QPID_LOG(debug, "Dropping 'local' message from " << getName());
} else {
-
-
// if no store then mark as enqueued
if (!enqueue(0, msg)){
push(msg);
@@ -468,6 +474,7 @@ namespace
{
const std::string qpidMaxSize("qpid.max_size");
const std::string qpidMaxCount("qpid.max_count");
+ const std::string qpidNoLocal("no-local");
}
void Queue::create(const FieldTable& _settings)
@@ -484,8 +491,13 @@ void Queue::create(const FieldTable& _settings)
void Queue::configure(const FieldTable& _settings)
{
std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(_settings));
- if (_policy->getMaxCount() || _policy->getMaxSize())
+ if (_policy->getMaxCount() || _policy->getMaxSize()) {
setPolicy(_policy);
+ }
+ if (owner) {
+ noLocal = _settings.get(qpidNoLocal);
+ QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+ }
}
void Queue::destroy()