diff options
Diffstat (limited to 'java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java')
-rw-r--r-- | java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java | 18 |
1 files changed, 13 insertions, 5 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java index 9870551313..ea0faf132e 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.filter; import java.lang.ref.WeakReference; +import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.WeakHashMap; @@ -29,6 +30,7 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -120,19 +122,25 @@ public class FilterSupport public static final class NoLocalFilter implements MessageFilter { - private final MessageSource _queue; + private final MessageSource<?,?> _queue; - public NoLocalFilter(MessageSource queue) + private NoLocalFilter(MessageSource queue) { _queue = queue; } public boolean matches(Filterable message) { - final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession(); - return exclusiveOwningSession == null || - exclusiveOwningSession.getConnectionReference() != message.getConnectionReference(); + final Collection<? extends Consumer> consumers = _queue.getConsumers(); + for(Consumer c : consumers) + { + if(c.getSessionModel().getConnectionReference() == message.getConnectionReference()) + { + return false; + } + } + return !consumers.isEmpty(); } @Override |