diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java | 582 |
1 files changed, 317 insertions, 265 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 3ade1ee7f0..8d3110ef18 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; @@ -31,7 +32,12 @@ import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.topic.TopicParser; +import org.apache.qpid.server.exchange.topic.TopicMatcherResult; +import org.apache.qpid.server.filter.MessageFilter; +import org.apache.qpid.server.filter.JMSSelectorFilter; import javax.management.JMException; import javax.management.MBeanException; @@ -43,6 +49,9 @@ import javax.management.openmbean.TabularDataSupport; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicInteger; +import java.lang.ref.WeakReference; public class TopicExchange extends AbstractExchange { @@ -80,22 +89,204 @@ public class TopicExchange extends AbstractExchange private static final Logger _logger = Logger.getLogger(TopicExchange.class); +/* private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _bindingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _simpleBindingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _wildCardBindingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); +*/ // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>(); private static final byte TOPIC_SEPARATOR = (byte)'.'; private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString("."); private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*"); private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#"); - private ConcurrentHashMap<AMQShortString, AMQShortString[]> _bindingKey2Tokenized = - new ConcurrentHashMap<AMQShortString, AMQShortString[]>(); + private static final byte HASH_BYTE = (byte)'#'; private static final byte STAR_BYTE = (byte)'*'; + private final TopicParser _parser = new TopicParser(); + + private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults = + new ConcurrentHashMap<AMQShortString, TopicExchangeResult>(); + + private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>(); + + private final Map<String, WeakReference<JMSSelectorFilter<RuntimeException>>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter<RuntimeException>>>(); + + public static class Binding + { + private final AMQShortString _bindingKey; + private final AMQQueue _queue; + + public Binding(AMQShortString bindingKey, AMQQueue queue) + { + _bindingKey = bindingKey; + _queue = queue; + } + + public AMQShortString getBindingKey() + { + return _bindingKey; + } + + public AMQQueue getQueue() + { + return _queue; + } + + public int hashCode() + { + return (_bindingKey == null ? 1 : _bindingKey.hashCode())*31 + _queue.hashCode(); + } + + public boolean equals(Object o) + { + if(this == o) + { + return true; + } + if(o instanceof Binding) + { + Binding other = (Binding) o; + return (_queue == other._queue) + && ((_bindingKey == null) ? other._bindingKey == null : _bindingKey.equals(other._bindingKey)); + } + return false; + } + } + + + + private final class TopicExchangeResult implements TopicMatcherResult + { + private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>(); + private final ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>>(); + + public void addUnfilteredQueue(AMQQueue queue) + { + Integer instances = _unfilteredQueues.get(queue); + if(instances == null) + { + _unfilteredQueues.put(queue, 1); + } + else + { + _unfilteredQueues.put(queue, instances + 1); + } + } + + public void removeUnfilteredQueue(AMQQueue queue) + { + Integer instances = _unfilteredQueues.get(queue); + if(instances == 1) + { + _unfilteredQueues.remove(queue); + } + else + { + _unfilteredQueues.put(queue,instances - 1); + } + + } + + + public void addFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter) + { + Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue); + if(filters == null) + { + filters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(); + _filteredQueues.put(queue, filters); + } + Integer instances = filters.get(filter); + if(instances == null) + { + filters.put(filter,1); + } + else + { + filters.put(filter, instances + 1); + } + + } + + public void removeFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter) + { + Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue); + if(filters != null) + { + Integer instances = filters.get(filter); + if(instances == 1) + { + filters.remove(filter); + if(filters.isEmpty()) + { + _filteredQueues.remove(queue); + } + } + else if(instances != null) + { + filters.put(filter, instances - 1); + } + + } + + } + + public void replaceQueueFilter(AMQQueue queue, + MessageFilter<RuntimeException> oldFilter, + MessageFilter<RuntimeException> newFilter) + { + Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue); + Map<MessageFilter<RuntimeException>,Integer> newFilters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(filters); + Integer oldFilterInstances = filters.get(oldFilter); + if(oldFilterInstances == 1) + { + newFilters.remove(oldFilter); + } + else + { + newFilters.put(oldFilter, oldFilterInstances-1); + } + Integer newFilterInstances = filters.get(newFilter); + if(newFilterInstances == null) + { + newFilters.put(newFilter, 1); + } + else + { + newFilters.put(newFilter, newFilterInstances+1); + } + _filteredQueues.put(queue,newFilters); + } + + public Set<AMQQueue> processMessage(IncomingMessage msg, Set<AMQQueue> queues) + { + queues.addAll(_unfilteredQueues.keySet()); + if(!_filteredQueues.isEmpty()) + { + for(Map.Entry<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>> entry : _filteredQueues.entrySet()) + { + if(!queues.contains(entry.getKey())) + { + for(MessageFilter<RuntimeException> filter : entry.getValue().keySet()) + { + if(filter.matches(msg)) + { + queues.add(entry.getKey()); + } + } + } + } + } + return queues; + } + + } + + /** TopicExchangeMBean class implements the management interface for the Topic exchanges. */ @MBeanDescription("Management Bean for Topic Exchange") private final class TopicExchangeMBean extends ExchangeMBean @@ -112,20 +303,24 @@ public class TopicExchange extends AbstractExchange public TabularData bindings() throws OpenDataException { _bindingList = new TabularDataSupport(_bindinglistDataType); - for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _bindingKey2queues.entrySet()) + Map<String, List<String>> bindingData = new HashMap<String, List<String>>(); + for (Binding binding : _bindings.keySet()) { - AMQShortString key = entry.getKey(); - List<String> queueList = new ArrayList<String>(); - - List<AMQQueue> queues = getMatchedQueues(key); - for (AMQQueue q : queues) + String key = binding.getBindingKey().toString(); + List<String> queueNames = bindingData.get(key); + if(queueNames == null) { - queueList.add(q.getName().toString()); + queueNames = new ArrayList<String>(); + bindingData.put(key, queueNames); } + queueNames.add(binding.getQueue().getName().toString()); - Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[queueList.size()])}; - CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); - _bindingList.put(bindingData); + } + for(Map.Entry<String, List<String>> entry : bindingData.entrySet()) + { + Object[] bindingItemValues = {entry.getKey(), entry.getValue().toArray(new String[entry.getValue().size()]) }; + CompositeData bindingCompositeData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); + _bindingList.put(bindingCompositeData); } return _bindingList; @@ -163,73 +358,106 @@ public class TopicExchange extends AbstractExchange _logger.debug("Registering queue " + queue.getName() + " with routing key " + rKey); - // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition - List<AMQQueue> queueList = _bindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>()); - - + AMQShortString routingKey; - - - - // if we got null back, no previous value was associated with the specified routing key hence - // we need to read back the new value just put into the map - if (queueList == null) + if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE)) { - queueList = _bindingKey2queues.get(rKey); + routingKey = normalize(rKey); + } + else + { + routingKey = rKey; } + Binding binding = new Binding(rKey, queue); - - if (!queueList.contains(queue)) + if(_bindings.containsKey(binding)) { - queueList.add(queue); + FieldTable oldArgs = _bindings.get(binding); + TopicExchangeResult result = _topicExchangeResults.get(routingKey); - - if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE)) + if(argumentsContainSelector(args)) { - AMQShortString routingKey = normalize(rKey); - List<AMQQueue> queueList2 = _wildCardBindingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>()); - - if(queueList2 == null) + if(argumentsContainSelector(oldArgs)) { - queueList2 = _wildCardBindingKey2queues.get(routingKey); - AMQShortStringTokenizer keyTok = routingKey.tokenize(TOPIC_SEPARATOR); - - ArrayList<AMQShortString> keyTokList = new ArrayList<AMQShortString>(keyTok.countTokens()); - - while (keyTok.hasMoreTokens()) - { - keyTokList.add(keyTok.nextToken()); - } - - _bindingKey2Tokenized.put(routingKey, keyTokList.toArray(new AMQShortString[keyTokList.size()])); + result.replaceQueueFilter(queue,createSelectorFilter(oldArgs), createSelectorFilter(args)); + } + else + { + result.addFilteredQueue(queue,createSelectorFilter(args)); + result.removeUnfilteredQueue(queue); } - queueList2.add(queue); - } else { - List<AMQQueue> queueList2 = _simpleBindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList<AMQQueue>()); - if(queueList2 == null) + if(argumentsContainSelector(oldArgs)) + { + result.addUnfilteredQueue(queue); + result.removeFilteredQueue(queue, createSelectorFilter(oldArgs)); + } + else { - queueList2 = _simpleBindingKey2queues.get(rKey); + // TODO - fix control flow + return; } - queueList2.add(queue); + } + } + else + { + + TopicExchangeResult result = _topicExchangeResults.get(routingKey); + if(result == null) + { + result = new TopicExchangeResult(); + if(argumentsContainSelector(args)) + { + result.addFilteredQueue(queue, createSelectorFilter(args)); + } + else + { + result.addUnfilteredQueue(queue); + } + _parser.addBinding(routingKey, result); + _topicExchangeResults.put(routingKey,result); } + else + { + if(argumentsContainSelector(args)) + { + result.addFilteredQueue(queue, createSelectorFilter(args)); + } + else + { + result.addUnfilteredQueue(queue); + } + } + _bindings.put(binding, args); + } + } + private JMSSelectorFilter<RuntimeException> createSelectorFilter(final FieldTable args) + throws AMQException + { - } - else if (_logger.isDebugEnabled()) + final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()); + WeakReference<JMSSelectorFilter<RuntimeException>> selectorRef = _selectorCache.get(selectorString); + JMSSelectorFilter selector = null; + + if(selectorRef == null || (selector = selectorRef.get())==null) { - _logger.debug("Queue " + queue + " is already registered with routing key " + rKey); + selector = new JMSSelectorFilter<RuntimeException>(selectorString); + _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector)); } + return selector; + } - - + private static boolean argumentsContainSelector(final FieldTable args) + { + return args != null && args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0; } private AMQShortString normalize(AMQShortString routingKey) @@ -279,16 +507,6 @@ public class TopicExchange extends AbstractExchange AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING); -/* - StringBuilder sb = new StringBuilder(); - for (AMQShortString s : subscriptionList) - { - sb.append(s); - sb.append(TOPIC_SEPARATOR); - } - - sb.deleteCharAt(sb.length() - 1); -*/ return normalizedString; } @@ -298,11 +516,11 @@ public class TopicExchange extends AbstractExchange final AMQShortString routingKey = payload.getRoutingKey(); - List<AMQQueue> queues = getMatchedQueues(routingKey); + Collection<AMQQueue> queues = getMatchedQueues(payload, routingKey); if(queues == null || queues.isEmpty()) { - _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes - " + _bindingKey2queues); + _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes."); } payload.enqueue(queues); @@ -316,23 +534,29 @@ public class TopicExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, AMQQueue queue) { - List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey)); + Binding binding = new Binding(routingKey, queue); - return (queues != null) && queues.contains(queue); + return _bindings.containsKey(binding); } public boolean isBound(AMQShortString routingKey) { - List<AMQQueue> queues = _bindingKey2queues.get(normalize(routingKey)); + for(Binding b : _bindings.keySet()) + { + if(b.getBindingKey().equals(routingKey)) + { + return true; + } + } - return (queues != null) && !queues.isEmpty(); + return false; } public boolean isBound(AMQQueue queue) { - for (List<AMQQueue> queues : _bindingKey2queues.values()) + for(Binding b : _bindings.keySet()) { - if (queues.contains(queue)) + if(b.getQueue().equals(queue)) { return true; } @@ -343,7 +567,7 @@ public class TopicExchange extends AbstractExchange public boolean hasBindings() { - return !_bindingKey2queues.isEmpty(); + return !_bindings.isEmpty(); } public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException @@ -351,52 +575,27 @@ public class TopicExchange extends AbstractExchange assert queue != null; assert rKey != null; - List<AMQQueue> queues = _bindingKey2queues.get(rKey); - if (queues == null) - { - throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() - + " with routing key " + rKey + ". No queue was registered with that _routing key"); + Binding binding = new Binding(rKey, queue); - } - boolean removedQ = queues.remove(queue); - if (!removedQ) + if (!_bindings.containsKey(binding)) { - throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() - + " with routing key " + rKey); + throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue.getName() + " was not registered with exchange " + this.getName() + + " with routing key " + rKey + "."); } - - if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE)) + FieldTable bindingArgs = _bindings.remove(binding); + AMQShortString bindingKey = normalize(rKey); + TopicExchangeResult result = _topicExchangeResults.get(bindingKey); + if(argumentsContainSelector(bindingArgs)) { - AMQShortString bindingKey = normalize(rKey); - List<AMQQueue> queues2 = _wildCardBindingKey2queues.get(bindingKey); - queues2.remove(queue); - if(queues2.isEmpty()) - { - _wildCardBindingKey2queues.remove(bindingKey); - _bindingKey2Tokenized.remove(bindingKey); - } - + result.removeFilteredQueue(queue, createSelectorFilter(bindingArgs)); } else { - List<AMQQueue> queues2 = _simpleBindingKey2queues.get(rKey); - queues2.remove(queue); - if(queues2.isEmpty()) - { - _simpleBindingKey2queues.remove(rKey); - } - + result.removeUnfilteredQueue(queue); } - - - - if (queues.isEmpty()) - { - _bindingKey2queues.remove(rKey); - } } protected ExchangeMBean createMBean() throws AMQException @@ -412,172 +611,25 @@ public class TopicExchange extends AbstractExchange } } - public Map<AMQShortString, List<AMQQueue>> getBindings() + private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey) { - return _bindingKey2queues; - } - - private List<AMQQueue> getMatchedQueues(AMQShortString routingKey) - { - - List<AMQQueue> list = null; - if(!_wildCardBindingKey2queues.isEmpty()) + Collection<TopicMatcherResult> results = _parser.parse(routingKey); + if(results.isEmpty()) { - - - AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR); - - final int routingTokensCount = routingTokens.countTokens(); - - - AMQShortString[] routingkeyTokens = new AMQShortString[routingTokensCount]; - - if(routingTokensCount == 1) - { - routingkeyTokens[0] =routingKey; - } - else - { - - - int token = 0; - while (routingTokens.hasMoreTokens()) - { - - AMQShortString next = routingTokens.nextToken(); - /* if (next.equals(AMQP_HASH) && routingkeyTokens.get(routingkeyTokens.size() - 1).equals(AMQP_HASH)) - { - continue; - } - */ - - routingkeyTokens[token++] = next; - } - } - - _logger.info("Routing key tokens: " + Arrays.asList(routingkeyTokens)); - - for (AMQShortString bindingKey : _wildCardBindingKey2queues.keySet()) - { - - AMQShortString[] bindingKeyTokens = _bindingKey2Tokenized.get(bindingKey); - - - boolean matching = true; - boolean done = false; - - int depthPlusRoutingSkip = 0; - int depthPlusQueueSkip = 0; - - final int bindingKeyTokensCount = bindingKeyTokens.length; - - while (matching && !done) - { - - if ((bindingKeyTokensCount == depthPlusQueueSkip) || (routingTokensCount == depthPlusRoutingSkip)) - { - done = true; - - // if it was the routing key that ran out of digits - if (routingTokensCount == depthPlusRoutingSkip) - { - if (bindingKeyTokensCount > depthPlusQueueSkip) - { // a hash and it is the last entry - matching = - bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN) - && (bindingKeyTokensCount == (depthPlusQueueSkip + 1)); - } - } - else if (routingTokensCount > depthPlusRoutingSkip) - { - // There is still more routing key to check - matching = false; - } - - continue; - } - - // if the values on the two topics don't match - if (!bindingKeyTokens[depthPlusQueueSkip].equals(routingkeyTokens[depthPlusRoutingSkip])) - { - if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_STAR_TOKEN)) - { - depthPlusQueueSkip++; - depthPlusRoutingSkip++; - - continue; - } - else if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN)) - { - // Is this a # at the end - if (bindingKeyTokensCount == (depthPlusQueueSkip + 1)) - { - done = true; - - continue; - } - - // otherwise # in the middle - while (routingTokensCount > depthPlusRoutingSkip) - { - if (routingkeyTokens[depthPlusRoutingSkip].equals(bindingKeyTokens[depthPlusQueueSkip + 1])) - { - depthPlusQueueSkip += 2; - depthPlusRoutingSkip++; - - break; - } - - depthPlusRoutingSkip++; - } - - continue; - } - - matching = false; - } - - depthPlusQueueSkip++; - depthPlusRoutingSkip++; - } - - if (matching) - { - if(list == null) - { - list = new ArrayList<AMQQueue>(_wildCardBindingKey2queues.get(bindingKey)); - } - else - { - list.addAll(_wildCardBindingKey2queues.get(bindingKey)); - } - } - } - + return Collections.EMPTY_SET; } - if(!_simpleBindingKey2queues.isEmpty()) + else { - List<AMQQueue> queues = _simpleBindingKey2queues.get(routingKey); - if(list == null) - { - if(queues == null) - { - list = Collections.EMPTY_LIST; - } - else - { - list = new ArrayList<AMQQueue>(queues); - } - } - else if(queues != null) + Set<AMQQueue> queues = new HashSet<AMQQueue>(); + for(TopicMatcherResult result : results) { - list.addAll(queues); - } + ((TopicExchangeResult)result).processMessage(message, queues); + } + return queues; } - return list; } } |