diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java | 232 |
1 files changed, 203 insertions, 29 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 3a8a86e654..27166e4384 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -20,29 +20,39 @@ */ package org.apache.qpid.server.exchange; +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentHashMap; +import javax.management.JMException; import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.filter.SelectorParsingException; +import org.apache.qpid.filter.selector.ParseException; +import org.apache.qpid.filter.selector.TokenMgrError; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.exchange.topic.*; +import org.apache.qpid.server.exchange.topic.TopicExchangeResult; +import org.apache.qpid.server.exchange.topic.TopicMatcherResult; +import org.apache.qpid.server.exchange.topic.TopicNormalizer; +import org.apache.qpid.server.exchange.topic.TopicParser; import org.apache.qpid.server.filter.JMSSelectorFilter; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InboundMessage; - -import javax.management.JMException; -import java.sql.Array; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - -import java.lang.ref.WeakReference; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.Filterable; +import org.apache.qpid.server.virtualhost.VirtualHost; public class TopicExchange extends AbstractExchange { @@ -113,24 +123,26 @@ public class TopicExchange extends AbstractExchange FieldTable oldArgs = _bindings.get(binding); TopicExchangeResult result = _topicExchangeResults.get(routingKey); - if(argumentsContainSelector(args)) + if(argumentsContainFilter(args)) { - if(argumentsContainSelector(oldArgs)) + if(argumentsContainFilter(oldArgs)) { - result.replaceQueueFilter(queue,createSelectorFilter(oldArgs), createSelectorFilter(args)); + result.replaceQueueFilter(queue, + createMessageFilter(oldArgs, queue), + createMessageFilter(args, queue)); } else { - result.addFilteredQueue(queue,createSelectorFilter(args)); + result.addFilteredQueue(queue, createMessageFilter(args, queue)); result.removeUnfilteredQueue(queue); } } else { - if(argumentsContainSelector(oldArgs)) + if(argumentsContainFilter(oldArgs)) { result.addUnfilteredQueue(queue); - result.removeFilteredQueue(queue, createSelectorFilter(oldArgs)); + result.removeFilteredQueue(queue, createMessageFilter(oldArgs, queue)); } else { @@ -149,9 +161,9 @@ public class TopicExchange extends AbstractExchange if(result == null) { result = new TopicExchangeResult(); - if(argumentsContainSelector(args)) + if(argumentsContainFilter(args)) { - result.addFilteredQueue(queue, createSelectorFilter(args)); + result.addFilteredQueue(queue, createMessageFilter(args, queue)); } else { @@ -162,9 +174,9 @@ public class TopicExchange extends AbstractExchange } else { - if(argumentsContainSelector(args)) + if(argumentsContainFilter(args)) { - result.addFilteredQueue(queue, createSelectorFilter(args)); + result.addFilteredQueue(queue, createMessageFilter(args, queue)); } else { @@ -178,26 +190,74 @@ public class TopicExchange extends AbstractExchange } - private JMSSelectorFilter createSelectorFilter(final FieldTable args) throws AMQInvalidArgumentException + private MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException { + if(argumentsContainNoLocal(args)) + { + MessageFilter filter = new NoLocalFilter(queue); + + if(argumentsContainJMSSelector(args)) + { + filter = new CompoundFilter(filter, createJMSSelectorFilter(args)); + } + return filter; + } + else + { + return createJMSSelectorFilter(args); + } + + } + + private MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException + { final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()); WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString); JMSSelectorFilter selector = null; if(selectorRef == null || (selector = selectorRef.get())==null) { - selector = new JMSSelectorFilter(selectorString); + try + { + selector = new JMSSelectorFilter(selectorString); + } + catch (ParseException e) + { + throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); + } + catch (SelectorParsingException e) + { + throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); + } + catch (TokenMgrError e) + { + throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); + } _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector)); } return selector; } - private static boolean argumentsContainSelector(final FieldTable args) + private static boolean argumentsContainFilter(final FieldTable args) { - return args != null && args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0; + return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args); } + private static boolean argumentsContainNoLocal(final FieldTable args) + { + return args != null + && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue()) + && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue())); + } + + private static boolean argumentsContainJMSSelector(final FieldTable args) + { + return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) + && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0); + } + + public ArrayList<BaseQueue> doRoute(InboundMessage payload) { @@ -251,6 +311,28 @@ public class TopicExchange extends AbstractExchange } } + public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue) + { + Binding binding = new Binding(null, bindingKey, queue, this, arguments); + if (arguments == null) + { + return _bindings.containsKey(binding); + } + else + { + FieldTable o = _bindings.get(binding); + if (o != null) + { + return arguments.equals(FieldTable.convertToMap(o)); + } + else + { + return false; + } + } + + } + public boolean isBound(AMQShortString routingKey, AMQQueue queue) { return isBound(routingKey, null, queue); @@ -297,11 +379,11 @@ public class TopicExchange extends AbstractExchange result.removeBinding(binding); - if(argumentsContainSelector(bindingArgs)) + if(argumentsContainFilter(bindingArgs)) { try { - result.removeFilteredQueue(binding.getQueue(), createSelectorFilter(bindingArgs)); + result.removeFilteredQueue(binding.getQueue(), createMessageFilter(bindingArgs, binding.getQueue())); } catch (AMQInvalidArgumentException e) { @@ -378,4 +460,96 @@ public class TopicExchange extends AbstractExchange deregisterQueue(binding); } + private static final class NoLocalFilter implements MessageFilter + { + private final AMQQueue _queue; + + public NoLocalFilter(AMQQueue queue) + { + _queue = queue; + } + + public boolean matches(Filterable message) + { + InboundMessage inbound = (InboundMessage) message; + final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession(); + return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound); + + } + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + + if (o == null || getClass() != o.getClass()) + { + return false; + } + + NoLocalFilter that = (NoLocalFilter) o; + + return _queue == null ? that._queue == null : _queue.equals(that._queue); + } + + @Override + public int hashCode() + { + return _queue != null ? _queue.hashCode() : 0; + } + } + + private static final class CompoundFilter implements MessageFilter + { + private MessageFilter _noLocalFilter; + private MessageFilter _jmsSelectorFilter; + + public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter) + { + _noLocalFilter = filter; + _jmsSelectorFilter = jmsSelectorFilter; + } + + public boolean matches(Filterable message) + { + return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + CompoundFilter that = (CompoundFilter) o; + + if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null) + { + return false; + } + if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0; + result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0); + return result; + } + } } |