summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java48
1 files changed, 25 insertions, 23 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index be7a1dc196..71481ec730 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -30,13 +30,13 @@ import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortStringTokenizer;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.topic.TopicParser;
import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.message.InboundMessage;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -109,7 +109,7 @@ public class TopicExchange extends AbstractExchange
private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
- private final Map<String, WeakReference<JMSSelectorFilter<RuntimeException>>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter<RuntimeException>>>();
+ private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>();
public static class Binding
{
@@ -160,7 +160,7 @@ public class TopicExchange extends AbstractExchange
private final class TopicExchangeResult implements TopicMatcherResult
{
private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
- private final ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>>();
+ private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>();
public void addUnfilteredQueue(AMQQueue queue)
{
@@ -190,12 +190,12 @@ public class TopicExchange extends AbstractExchange
}
- public void addFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
+ public void addFilteredQueue(AMQQueue queue, MessageFilter filter)
{
- Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+ Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
if(filters == null)
{
- filters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>();
+ filters = new ConcurrentHashMap<MessageFilter,Integer>();
_filteredQueues.put(queue, filters);
}
Integer instances = filters.get(filter);
@@ -210,9 +210,9 @@ public class TopicExchange extends AbstractExchange
}
- public void removeFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
+ public void removeFilteredQueue(AMQQueue queue, MessageFilter filter)
{
- Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+ Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
if(filters != null)
{
Integer instances = filters.get(filter);
@@ -237,11 +237,11 @@ public class TopicExchange extends AbstractExchange
}
public void replaceQueueFilter(AMQQueue queue,
- MessageFilter<RuntimeException> oldFilter,
- MessageFilter<RuntimeException> newFilter)
+ MessageFilter oldFilter,
+ MessageFilter newFilter)
{
- Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
- Map<MessageFilter<RuntimeException>,Integer> newFilters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(filters);
+ Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
+ Map<MessageFilter,Integer> newFilters = new ConcurrentHashMap<MessageFilter,Integer>(filters);
Integer oldFilterInstances = filters.get(oldFilter);
if(oldFilterInstances == 1)
{
@@ -263,7 +263,7 @@ public class TopicExchange extends AbstractExchange
_filteredQueues.put(queue,newFilters);
}
- public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues)
+ public Collection<AMQQueue> processMessage(InboundMessage msg, Collection<AMQQueue> queues)
{
if(queues == null)
{
@@ -284,11 +284,11 @@ public class TopicExchange extends AbstractExchange
queues.addAll(_unfilteredQueues.keySet());
if(!_filteredQueues.isEmpty())
{
- for(Map.Entry<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>> entry : _filteredQueues.entrySet())
+ for(Map.Entry<AMQQueue, Map<MessageFilter, Integer>> entry : _filteredQueues.entrySet())
{
if(!queues.contains(entry.getKey()))
{
- for(MessageFilter<RuntimeException> filter : entry.getValue().keySet())
+ for(MessageFilter filter : entry.getValue().keySet())
{
if(filter.matches(msg))
{
@@ -456,18 +456,18 @@ public class TopicExchange extends AbstractExchange
}
- private JMSSelectorFilter<RuntimeException> createSelectorFilter(final FieldTable args)
+ private JMSSelectorFilter createSelectorFilter(final FieldTable args)
throws AMQException
{
final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
- WeakReference<JMSSelectorFilter<RuntimeException>> selectorRef = _selectorCache.get(selectorString);
+ WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString);
JMSSelectorFilter selector = null;
if(selectorRef == null || (selector = selectorRef.get())==null)
{
- selector = new JMSSelectorFilter<RuntimeException>(selectorString);
- _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector));
+ selector = new JMSSelectorFilter(selectorString);
+ _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector));
}
return selector;
}
@@ -528,10 +528,12 @@ public class TopicExchange extends AbstractExchange
return normalizedString;
}
- public void route(IncomingMessage payload) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
{
- final AMQShortString routingKey = payload.getRoutingKey();
+ final AMQShortString routingKey = payload.getRoutingKey() == null
+ ? AMQShortString.EMPTY_STRING
+ : new AMQShortString(payload.getRoutingKey());
// The copy here is unfortunate, but not too bad relevant to the amount of
// things created and copied in getMatchedQueues
@@ -543,7 +545,7 @@ public class TopicExchange extends AbstractExchange
_logger.info("Message routing key: " + payload.getRoutingKey() + " No routes.");
}
- payload.enqueue(queues);
+ return queues;
}
@@ -646,7 +648,7 @@ public class TopicExchange extends AbstractExchange
}
}
- private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey)
+ private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey)
{
Collection<TopicMatcherResult> results = _parser.parse(routingKey);