diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java | 48 |
1 files changed, 25 insertions, 23 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index be7a1dc196..71481ec730 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -30,13 +30,13 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortStringTokenizer; -import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.topic.TopicParser; import org.apache.qpid.server.exchange.topic.TopicMatcherResult; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.filter.JMSSelectorFilter; +import org.apache.qpid.server.message.InboundMessage; import javax.management.JMException; import javax.management.MBeanException; @@ -109,7 +109,7 @@ public class TopicExchange extends AbstractExchange private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>(); - private final Map<String, WeakReference<JMSSelectorFilter<RuntimeException>>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter<RuntimeException>>>(); + private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>(); public static class Binding { @@ -160,7 +160,7 @@ public class TopicExchange extends AbstractExchange private final class TopicExchangeResult implements TopicMatcherResult { private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>(); - private final ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>>(); + private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>(); public void addUnfilteredQueue(AMQQueue queue) { @@ -190,12 +190,12 @@ public class TopicExchange extends AbstractExchange } - public void addFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter) + public void addFilteredQueue(AMQQueue queue, MessageFilter filter) { - Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue); + Map<MessageFilter,Integer> filters = _filteredQueues.get(queue); if(filters == null) { - filters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(); + filters = new ConcurrentHashMap<MessageFilter,Integer>(); _filteredQueues.put(queue, filters); } Integer instances = filters.get(filter); @@ -210,9 +210,9 @@ public class TopicExchange extends AbstractExchange } - public void removeFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter) + public void removeFilteredQueue(AMQQueue queue, MessageFilter filter) { - Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue); + Map<MessageFilter,Integer> filters = _filteredQueues.get(queue); if(filters != null) { Integer instances = filters.get(filter); @@ -237,11 +237,11 @@ public class TopicExchange extends AbstractExchange } public void replaceQueueFilter(AMQQueue queue, - MessageFilter<RuntimeException> oldFilter, - MessageFilter<RuntimeException> newFilter) + MessageFilter oldFilter, + MessageFilter newFilter) { - Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue); - Map<MessageFilter<RuntimeException>,Integer> newFilters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(filters); + Map<MessageFilter,Integer> filters = _filteredQueues.get(queue); + Map<MessageFilter,Integer> newFilters = new ConcurrentHashMap<MessageFilter,Integer>(filters); Integer oldFilterInstances = filters.get(oldFilter); if(oldFilterInstances == 1) { @@ -263,7 +263,7 @@ public class TopicExchange extends AbstractExchange _filteredQueues.put(queue,newFilters); } - public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues) + public Collection<AMQQueue> processMessage(InboundMessage msg, Collection<AMQQueue> queues) { if(queues == null) { @@ -284,11 +284,11 @@ public class TopicExchange extends AbstractExchange queues.addAll(_unfilteredQueues.keySet()); if(!_filteredQueues.isEmpty()) { - for(Map.Entry<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>> entry : _filteredQueues.entrySet()) + for(Map.Entry<AMQQueue, Map<MessageFilter, Integer>> entry : _filteredQueues.entrySet()) { if(!queues.contains(entry.getKey())) { - for(MessageFilter<RuntimeException> filter : entry.getValue().keySet()) + for(MessageFilter filter : entry.getValue().keySet()) { if(filter.matches(msg)) { @@ -456,18 +456,18 @@ public class TopicExchange extends AbstractExchange } - private JMSSelectorFilter<RuntimeException> createSelectorFilter(final FieldTable args) + private JMSSelectorFilter createSelectorFilter(final FieldTable args) throws AMQException { final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()); - WeakReference<JMSSelectorFilter<RuntimeException>> selectorRef = _selectorCache.get(selectorString); + WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString); JMSSelectorFilter selector = null; if(selectorRef == null || (selector = selectorRef.get())==null) { - selector = new JMSSelectorFilter<RuntimeException>(selectorString); - _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector)); + selector = new JMSSelectorFilter(selectorString); + _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector)); } return selector; } @@ -528,10 +528,12 @@ public class TopicExchange extends AbstractExchange return normalizedString; } - public void route(IncomingMessage payload) throws AMQException + public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException { - final AMQShortString routingKey = payload.getRoutingKey(); + final AMQShortString routingKey = payload.getRoutingKey() == null + ? AMQShortString.EMPTY_STRING + : new AMQShortString(payload.getRoutingKey()); // The copy here is unfortunate, but not too bad relevant to the amount of // things created and copied in getMatchedQueues @@ -543,7 +545,7 @@ public class TopicExchange extends AbstractExchange _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes."); } - payload.enqueue(queues); + return queues; } @@ -646,7 +648,7 @@ public class TopicExchange extends AbstractExchange } } - private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey) + private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey) { Collection<TopicMatcherResult> results = _parser.parse(routingKey); |