summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java582
1 files changed, 317 insertions, 265 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index 3ade1ee7f0..8d3110ef18 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -31,7 +32,12 @@ import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.topic.TopicParser;
+import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
+import org.apache.qpid.server.filter.MessageFilter;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -43,6 +49,9 @@ import javax.management.openmbean.TabularDataSupport;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.lang.ref.WeakReference;
public class TopicExchange extends AbstractExchange
{
@@ -80,22 +89,204 @@ public class TopicExchange extends AbstractExchange
private static final Logger _logger = Logger.getLogger(TopicExchange.class);
+/*
private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _bindingKey2queues =
new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _simpleBindingKey2queues =
new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _wildCardBindingKey2queues =
new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+*/
// private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private static final byte TOPIC_SEPARATOR = (byte)'.';
private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString(".");
private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*");
private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#");
- private ConcurrentHashMap<AMQShortString, AMQShortString[]> _bindingKey2Tokenized =
- new ConcurrentHashMap<AMQShortString, AMQShortString[]>();
+
private static final byte HASH_BYTE = (byte)'#';
private static final byte STAR_BYTE = (byte)'*';
+ private final TopicParser _parser = new TopicParser();
+
+ private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
+ new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
+
+ private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
+
+ private final Map<String, WeakReference<JMSSelectorFilter<RuntimeException>>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter<RuntimeException>>>();
+
+ public static class Binding
+ {
+ private final AMQShortString _bindingKey;
+ private final AMQQueue _queue;
+
+ public Binding(AMQShortString bindingKey, AMQQueue queue)
+ {
+ _bindingKey = bindingKey;
+ _queue = queue;
+ }
+
+ public AMQShortString getBindingKey()
+ {
+ return _bindingKey;
+ }
+
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ public int hashCode()
+ {
+ return (_bindingKey == null ? 1 : _bindingKey.hashCode())*31 + _queue.hashCode();
+ }
+
+ public boolean equals(Object o)
+ {
+ if(this == o)
+ {
+ return true;
+ }
+ if(o instanceof Binding)
+ {
+ Binding other = (Binding) o;
+ return (_queue == other._queue)
+ && ((_bindingKey == null) ? other._bindingKey == null : _bindingKey.equals(other._bindingKey));
+ }
+ return false;
+ }
+ }
+
+
+
+ private final class TopicExchangeResult implements TopicMatcherResult
+ {
+ private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
+ private final ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>>();
+
+ public void addUnfilteredQueue(AMQQueue queue)
+ {
+ Integer instances = _unfilteredQueues.get(queue);
+ if(instances == null)
+ {
+ _unfilteredQueues.put(queue, 1);
+ }
+ else
+ {
+ _unfilteredQueues.put(queue, instances + 1);
+ }
+ }
+
+ public void removeUnfilteredQueue(AMQQueue queue)
+ {
+ Integer instances = _unfilteredQueues.get(queue);
+ if(instances == 1)
+ {
+ _unfilteredQueues.remove(queue);
+ }
+ else
+ {
+ _unfilteredQueues.put(queue,instances - 1);
+ }
+
+ }
+
+
+ public void addFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
+ {
+ Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+ if(filters == null)
+ {
+ filters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>();
+ _filteredQueues.put(queue, filters);
+ }
+ Integer instances = filters.get(filter);
+ if(instances == null)
+ {
+ filters.put(filter,1);
+ }
+ else
+ {
+ filters.put(filter, instances + 1);
+ }
+
+ }
+
+ public void removeFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
+ {
+ Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+ if(filters != null)
+ {
+ Integer instances = filters.get(filter);
+ if(instances == 1)
+ {
+ filters.remove(filter);
+ if(filters.isEmpty())
+ {
+ _filteredQueues.remove(queue);
+ }
+ }
+ else if(instances != null)
+ {
+ filters.put(filter, instances - 1);
+ }
+
+ }
+
+ }
+
+ public void replaceQueueFilter(AMQQueue queue,
+ MessageFilter<RuntimeException> oldFilter,
+ MessageFilter<RuntimeException> newFilter)
+ {
+ Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+ Map<MessageFilter<RuntimeException>,Integer> newFilters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(filters);
+ Integer oldFilterInstances = filters.get(oldFilter);
+ if(oldFilterInstances == 1)
+ {
+ newFilters.remove(oldFilter);
+ }
+ else
+ {
+ newFilters.put(oldFilter, oldFilterInstances-1);
+ }
+ Integer newFilterInstances = filters.get(newFilter);
+ if(newFilterInstances == null)
+ {
+ newFilters.put(newFilter, 1);
+ }
+ else
+ {
+ newFilters.put(newFilter, newFilterInstances+1);
+ }
+ _filteredQueues.put(queue,newFilters);
+ }
+
+ public Set<AMQQueue> processMessage(IncomingMessage msg, Set<AMQQueue> queues)
+ {
+ queues.addAll(_unfilteredQueues.keySet());
+ if(!_filteredQueues.isEmpty())
+ {
+ for(Map.Entry<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>> entry : _filteredQueues.entrySet())
+ {
+ if(!queues.contains(entry.getKey()))
+ {
+ for(MessageFilter<RuntimeException> filter : entry.getValue().keySet())
+ {
+ if(filter.matches(msg))
+ {
+ queues.add(entry.getKey());
+ }
+ }
+ }
+ }
+ }
+ return queues;
+ }
+
+ }
+
+
/** TopicExchangeMBean class implements the management interface for the Topic exchanges. */
@MBeanDescription("Management Bean for Topic Exchange")
private final class TopicExchangeMBean extends ExchangeMBean
@@ -112,20 +303,24 @@ public class TopicExchange extends AbstractExchange
public TabularData bindings() throws OpenDataException
{
_bindingList = new TabularDataSupport(_bindinglistDataType);
- for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _bindingKey2queues.entrySet())
+ Map<String, List<String>> bindingData = new HashMap<String, List<String>>();
+ for (Binding binding : _bindings.keySet())
{
- AMQShortString key = entry.getKey();
- List<String> queueList = new ArrayList<String>();
-
- List<AMQQueue> queues = getMatchedQueues(key);
- for (AMQQueue q : queues)
+ String key = binding.getBindingKey().toString();
+ List<String> queueNames = bindingData.get(key);
+ if(queueNames == null)
{
- queueList.add(q.getName().toString());
+ queueNames = new ArrayList<String>();
+ bindingData.put(key, queueNames);
}
+ queueNames.add(binding.getQueue().getName().toString());
- Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[queueList.size()])};
- CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
- _bindingList.put(bindingData);
+ }
+ for(Map.Entry<String, List<String>> entry : bindingData.entrySet())
+ {
+ Object[] bindingItemValues = {entry.getKey(), entry.getValue().toArray(new String[entry.getValue().size()]) };
+ CompositeData bindingCompositeData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
+ _bindingList.put(bindingCompositeData);
}
return _bindingList;
@@ -163,73 +358,106 @@ public class TopicExchange extends AbstractExchange
_logger.debug("Registering queue " + queue.getName() + " with routing key " + rKey);
- // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
- List<AMQQueue> queueList = _bindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>());
-
-
+ AMQShortString routingKey;
-
-
-
- // if we got null back, no previous value was associated with the specified routing key hence
- // we need to read back the new value just put into the map
- if (queueList == null)
+ if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
{
- queueList = _bindingKey2queues.get(rKey);
+ routingKey = normalize(rKey);
+ }
+ else
+ {
+ routingKey = rKey;
}
+ Binding binding = new Binding(rKey, queue);
-
- if (!queueList.contains(queue))
+ if(_bindings.containsKey(binding))
{
- queueList.add(queue);
+ FieldTable oldArgs = _bindings.get(binding);
+ TopicExchangeResult result = _topicExchangeResults.get(routingKey);
-
- if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
+ if(argumentsContainSelector(args))
{
- AMQShortString routingKey = normalize(rKey);
- List<AMQQueue> queueList2 = _wildCardBindingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
-
- if(queueList2 == null)
+ if(argumentsContainSelector(oldArgs))
{
- queueList2 = _wildCardBindingKey2queues.get(routingKey);
- AMQShortStringTokenizer keyTok = routingKey.tokenize(TOPIC_SEPARATOR);
-
- ArrayList<AMQShortString> keyTokList = new ArrayList<AMQShortString>(keyTok.countTokens());
-
- while (keyTok.hasMoreTokens())
- {
- keyTokList.add(keyTok.nextToken());
- }
-
- _bindingKey2Tokenized.put(routingKey, keyTokList.toArray(new AMQShortString[keyTokList.size()]));
+ result.replaceQueueFilter(queue,createSelectorFilter(oldArgs), createSelectorFilter(args));
+ }
+ else
+ {
+ result.addFilteredQueue(queue,createSelectorFilter(args));
+ result.removeUnfilteredQueue(queue);
}
- queueList2.add(queue);
-
}
else
{
- List<AMQQueue> queueList2 = _simpleBindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>());
- if(queueList2 == null)
+ if(argumentsContainSelector(oldArgs))
+ {
+ result.addUnfilteredQueue(queue);
+ result.removeFilteredQueue(queue, createSelectorFilter(oldArgs));
+ }
+ else
{
- queueList2 = _simpleBindingKey2queues.get(rKey);
+ // TODO - fix control flow
+ return;
}
- queueList2.add(queue);
+ }
+ }
+ else
+ {
+
+ TopicExchangeResult result = _topicExchangeResults.get(routingKey);
+ if(result == null)
+ {
+ result = new TopicExchangeResult();
+ if(argumentsContainSelector(args))
+ {
+ result.addFilteredQueue(queue, createSelectorFilter(args));
+ }
+ else
+ {
+ result.addUnfilteredQueue(queue);
+ }
+ _parser.addBinding(routingKey, result);
+ _topicExchangeResults.put(routingKey,result);
}
+ else
+ {
+ if(argumentsContainSelector(args))
+ {
+ result.addFilteredQueue(queue, createSelectorFilter(args));
+ }
+ else
+ {
+ result.addUnfilteredQueue(queue);
+ }
+ }
+ _bindings.put(binding, args);
+ }
+ }
+ private JMSSelectorFilter<RuntimeException> createSelectorFilter(final FieldTable args)
+ throws AMQException
+ {
- }
- else if (_logger.isDebugEnabled())
+ final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
+ WeakReference<JMSSelectorFilter<RuntimeException>> selectorRef = _selectorCache.get(selectorString);
+ JMSSelectorFilter selector = null;
+
+ if(selectorRef == null || (selector = selectorRef.get())==null)
{
- _logger.debug("Queue " + queue + " is already registered with routing key " + rKey);
+ selector = new JMSSelectorFilter<RuntimeException>(selectorString);
+ _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector));
}
+ return selector;
+ }
-
-
+ private static boolean argumentsContainSelector(final FieldTable args)
+ {
+ return args != null && args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0;
}
private AMQShortString normalize(AMQShortString routingKey)
@@ -279,16 +507,6 @@ public class TopicExchange extends AbstractExchange
AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING);
-/*
- StringBuilder sb = new StringBuilder();
- for (AMQShortString s : subscriptionList)
- {
- sb.append(s);
- sb.append(TOPIC_SEPARATOR);
- }
-
- sb.deleteCharAt(sb.length() - 1);
-*/
return normalizedString;
}
@@ -298,11 +516,11 @@ public class TopicExchange extends AbstractExchange
final AMQShortString routingKey = payload.getRoutingKey();
- List<AMQQueue> queues = getMatchedQueues(routingKey);
+ Collection<AMQQueue> queues = getMatchedQueues(payload, routingKey);
if(queues == null || queues.isEmpty())
{
- _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes - " + _bindingKey2queues);
+ _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes.");
}
payload.enqueue(queues);
@@ -316,23 +534,29 @@ public class TopicExchange extends AbstractExchange
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
- List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey));
+ Binding binding = new Binding(routingKey, queue);
- return (queues != null) && queues.contains(queue);
+ return _bindings.containsKey(binding);
}
public boolean isBound(AMQShortString routingKey)
{
- List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey));
+ for(Binding b : _bindings.keySet())
+ {
+ if(b.getBindingKey().equals(routingKey))
+ {
+ return true;
+ }
+ }
- return (queues != null) && !queues.isEmpty();
+ return false;
}
public boolean isBound(AMQQueue queue)
{
- for (List<AMQQueue> queues : _bindingKey2queues.values())
+ for(Binding b : _bindings.keySet())
{
- if (queues.contains(queue))
+ if(b.getQueue().equals(queue))
{
return true;
}
@@ -343,7 +567,7 @@ public class TopicExchange extends AbstractExchange
public boolean hasBindings()
{
- return !_bindingKey2queues.isEmpty();
+ return !_bindings.isEmpty();
}
public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
@@ -351,52 +575,27 @@ public class TopicExchange extends AbstractExchange
assert queue != null;
assert rKey != null;
- List<AMQQueue> queues = _bindingKey2queues.get(rKey);
- if (queues == null)
- {
- throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
- + " with routing key " + rKey + ". No queue was registered with that _routing key");
+ Binding binding = new Binding(rKey, queue);
- }
- boolean removedQ = queues.remove(queue);
- if (!removedQ)
+ if (!_bindings.containsKey(binding))
{
- throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
- + " with routing key " + rKey);
+ throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue.getName() + " was not registered with exchange " + this.getName()
+ + " with routing key " + rKey + ".");
}
-
- if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
+ FieldTable bindingArgs = _bindings.remove(binding);
+ AMQShortString bindingKey = normalize(rKey);
+ TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
+ if(argumentsContainSelector(bindingArgs))
{
- AMQShortString bindingKey = normalize(rKey);
- List<AMQQueue> queues2 = _wildCardBindingKey2queues.get(bindingKey);
- queues2.remove(queue);
- if(queues2.isEmpty())
- {
- _wildCardBindingKey2queues.remove(bindingKey);
- _bindingKey2Tokenized.remove(bindingKey);
- }
-
+ result.removeFilteredQueue(queue, createSelectorFilter(bindingArgs));
}
else
{
- List<AMQQueue> queues2 = _simpleBindingKey2queues.get(rKey);
- queues2.remove(queue);
- if(queues2.isEmpty())
- {
- _simpleBindingKey2queues.remove(rKey);
- }
-
+ result.removeUnfilteredQueue(queue);
}
-
-
-
- if (queues.isEmpty())
- {
- _bindingKey2queues.remove(rKey);
- }
}
protected ExchangeMBean createMBean() throws AMQException
@@ -412,172 +611,25 @@ public class TopicExchange extends AbstractExchange
}
}
- public Map<AMQShortString, List<AMQQueue>> getBindings()
+ private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey)
{
- return _bindingKey2queues;
- }
-
- private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
- {
-
- List<AMQQueue> list = null;
- if(!_wildCardBindingKey2queues.isEmpty())
+ Collection<TopicMatcherResult> results = _parser.parse(routingKey);
+ if(results.isEmpty())
{
-
-
- AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
-
- final int routingTokensCount = routingTokens.countTokens();
-
-
- AMQShortString[] routingkeyTokens = new AMQShortString[routingTokensCount];
-
- if(routingTokensCount == 1)
- {
- routingkeyTokens[0] =routingKey;
- }
- else
- {
-
-
- int token = 0;
- while (routingTokens.hasMoreTokens())
- {
-
- AMQShortString next = routingTokens.nextToken();
- /* if (next.equals(AMQP_HASH) && routingkeyTokens.get(routingkeyTokens.size() - 1).equals(AMQP_HASH))
- {
- continue;
- }
- */
-
- routingkeyTokens[token++] = next;
- }
- }
-
- _logger.info("Routing key tokens: " + Arrays.asList(routingkeyTokens));
-
- for (AMQShortString bindingKey : _wildCardBindingKey2queues.keySet())
- {
-
- AMQShortString[] bindingKeyTokens = _bindingKey2Tokenized.get(bindingKey);
-
-
- boolean matching = true;
- boolean done = false;
-
- int depthPlusRoutingSkip = 0;
- int depthPlusQueueSkip = 0;
-
- final int bindingKeyTokensCount = bindingKeyTokens.length;
-
- while (matching && !done)
- {
-
- if ((bindingKeyTokensCount == depthPlusQueueSkip) || (routingTokensCount == depthPlusRoutingSkip))
- {
- done = true;
-
- // if it was the routing key that ran out of digits
- if (routingTokensCount == depthPlusRoutingSkip)
- {
- if (bindingKeyTokensCount > depthPlusQueueSkip)
- { // a hash and it is the last entry
- matching =
- bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN)
- && (bindingKeyTokensCount == (depthPlusQueueSkip + 1));
- }
- }
- else if (routingTokensCount > depthPlusRoutingSkip)
- {
- // There is still more routing key to check
- matching = false;
- }
-
- continue;
- }
-
- // if the values on the two topics don't match
- if (!bindingKeyTokens[depthPlusQueueSkip].equals(routingkeyTokens[depthPlusRoutingSkip]))
- {
- if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_STAR_TOKEN))
- {
- depthPlusQueueSkip++;
- depthPlusRoutingSkip++;
-
- continue;
- }
- else if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN))
- {
- // Is this a # at the end
- if (bindingKeyTokensCount == (depthPlusQueueSkip + 1))
- {
- done = true;
-
- continue;
- }
-
- // otherwise # in the middle
- while (routingTokensCount > depthPlusRoutingSkip)
- {
- if (routingkeyTokens[depthPlusRoutingSkip].equals(bindingKeyTokens[depthPlusQueueSkip + 1]))
- {
- depthPlusQueueSkip += 2;
- depthPlusRoutingSkip++;
-
- break;
- }
-
- depthPlusRoutingSkip++;
- }
-
- continue;
- }
-
- matching = false;
- }
-
- depthPlusQueueSkip++;
- depthPlusRoutingSkip++;
- }
-
- if (matching)
- {
- if(list == null)
- {
- list = new ArrayList<AMQQueue>(_wildCardBindingKey2queues.get(bindingKey));
- }
- else
- {
- list.addAll(_wildCardBindingKey2queues.get(bindingKey));
- }
- }
- }
-
+ return Collections.EMPTY_SET;
}
- if(!_simpleBindingKey2queues.isEmpty())
+ else
{
- List<AMQQueue> queues = _simpleBindingKey2queues.get(routingKey);
- if(list == null)
- {
- if(queues == null)
- {
- list = Collections.EMPTY_LIST;
- }
- else
- {
- list = new ArrayList<AMQQueue>(queues);
- }
- }
- else if(queues != null)
+ Set<AMQQueue> queues = new HashSet<AMQQueue>();
+ for(TopicMatcherResult result : results)
{
- list.addAll(queues);
- }
+ ((TopicExchangeResult)result).processMessage(message, queues);
+ }
+ return queues;
}
- return list;
}
}