diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java | 41 |
1 files changed, 35 insertions, 6 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index f9cbfeb78b..295a7191e7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -21,23 +21,24 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.binding.Binding; import javax.management.JMException; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; /** @@ -145,6 +146,33 @@ public class HeadersExchange extends AbstractExchange return new ArrayList<BaseQueue>(queues); } + + public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue) + { + CopyOnWriteArraySet<Binding> bindings; + if(bindingKey == null) + { + bindings = new CopyOnWriteArraySet<Binding>(getBindings()); + } + else + { + bindings = _bindingsByKey.get(bindingKey); + } + + if(bindings != null) + { + for(Binding binding : bindings) + { + if(queue == null || binding.getQueue().equals(queue)) + { + return arguments == null ? binding.getArguments() == null : binding.getArguments().equals(arguments); + } + } + } + + return false; + } + public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) { //fixme isBound here should take the arguements in to consideration. @@ -250,10 +278,11 @@ public class HeadersExchange extends AbstractExchange { bindings.remove(binding); } - + + boolean removedBinding = _bindingHeaderMatchers.remove(new HeadersBinding(binding)); if(_logger.isDebugEnabled()) { - _logger.debug("Removing Binding: " + _bindingHeaderMatchers.remove(new HeadersBinding(binding))); + _logger.debug("Removing Binding: " + removedBinding); } } |