diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java | 670 |
1 files changed, 670 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java new file mode 100644 index 0000000000..bc303a219d --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -0,0 +1,670 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortStringTokenizer; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.queue.IncomingMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.topic.TopicParser; +import org.apache.qpid.server.exchange.topic.TopicMatcherResult; +import org.apache.qpid.server.filter.MessageFilter; +import org.apache.qpid.server.filter.JMSSelectorFilter; + +import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.lang.ref.WeakReference; + +public class TopicExchange extends AbstractExchange +{ + + public static final ExchangeType<TopicExchange> TYPE = new ExchangeType<TopicExchange>() + { + + public AMQShortString getName() + { + return ExchangeDefaults.TOPIC_EXCHANGE_CLASS; + } + + public Class<TopicExchange> getExchangeClass() + { + return TopicExchange.class; + } + + public TopicExchange newInstance(VirtualHost host, + AMQShortString name, + boolean durable, + int ticket, + boolean autoDelete) throws AMQException + { + TopicExchange exch = new TopicExchange(); + exch.initialise(host, name, durable, ticket, autoDelete); + return exch; + } + + public AMQShortString getDefaultExchangeName() + { + return ExchangeDefaults.TOPIC_EXCHANGE_NAME; + } + }; + + + private static final Logger _logger = Logger.getLogger(TopicExchange.class); + +/* + private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _bindingKey2queues = + new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); + private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _simpleBindingKey2queues = + new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); + private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _wildCardBindingKey2queues = + new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); +*/ + // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>(); + private static final byte TOPIC_SEPARATOR = (byte)'.'; + private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString("."); + private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*"); + private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#"); + + private static final byte HASH_BYTE = (byte)'#'; + private static final byte STAR_BYTE = (byte)'*'; + + private final TopicParser _parser = new TopicParser(); + + private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults = + new ConcurrentHashMap<AMQShortString, TopicExchangeResult>(); + + private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>(); + + private final Map<String, WeakReference<JMSSelectorFilter<RuntimeException>>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter<RuntimeException>>>(); + + public static class Binding + { + private final AMQShortString _bindingKey; + private final AMQQueue _queue; + private final FieldTable _args; + + public Binding(AMQShortString bindingKey, AMQQueue queue, FieldTable args) + { + _bindingKey = bindingKey; + _queue = queue; + _args = args; + } + + public AMQShortString getBindingKey() + { + return _bindingKey; + } + + public AMQQueue getQueue() + { + return _queue; + } + + public int hashCode() + { + return (_bindingKey == null ? 1 : _bindingKey.hashCode())*31 +_queue.hashCode(); + } + + public boolean equals(Object o) + { + if(this == o) + { + return true; + } + if(o instanceof Binding) + { + Binding other = (Binding) o; + return (_queue == other._queue) + && ((_bindingKey == null) ? other._bindingKey == null : _bindingKey.equals(other._bindingKey)); + } + return false; + } + } + + + + private final class TopicExchangeResult implements TopicMatcherResult + { + private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>(); + private final ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>>(); + + public void addUnfilteredQueue(AMQQueue queue) + { + Integer instances = _unfilteredQueues.get(queue); + if(instances == null) + { + _unfilteredQueues.put(queue, 1); + } + else + { + _unfilteredQueues.put(queue, instances + 1); + } + } + + public void removeUnfilteredQueue(AMQQueue queue) + { + Integer instances = _unfilteredQueues.get(queue); + if(instances == 1) + { + _unfilteredQueues.remove(queue); + } + else + { + _unfilteredQueues.put(queue,instances - 1); + } + + } + + + public void addFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter) + { + Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue); + if(filters == null) + { + filters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(); + _filteredQueues.put(queue, filters); + } + Integer instances = filters.get(filter); + if(instances == null) + { + filters.put(filter,1); + } + else + { + filters.put(filter, instances + 1); + } + + } + + public void removeFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter) + { + Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue); + if(filters != null) + { + Integer instances = filters.get(filter); + if(instances != null) + { + if(instances == 1) + { + filters.remove(filter); + if(filters.isEmpty()) + { + _filteredQueues.remove(queue); + } + } + else + { + filters.put(filter, instances - 1); + } + } + + } + + } + + public void replaceQueueFilter(AMQQueue queue, + MessageFilter<RuntimeException> oldFilter, + MessageFilter<RuntimeException> newFilter) + { + Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue); + Map<MessageFilter<RuntimeException>,Integer> newFilters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(filters); + Integer oldFilterInstances = filters.get(oldFilter); + if(oldFilterInstances == 1) + { + newFilters.remove(oldFilter); + } + else + { + newFilters.put(oldFilter, oldFilterInstances-1); + } + Integer newFilterInstances = filters.get(newFilter); + if(newFilterInstances == null) + { + newFilters.put(newFilter, 1); + } + else + { + newFilters.put(newFilter, newFilterInstances+1); + } + _filteredQueues.put(queue,newFilters); + } + + public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues) + { + if(queues == null) + { + if(_filteredQueues.isEmpty()) + { + return new ArrayList<AMQQueue>(_unfilteredQueues.keySet()); + } + else + { + queues = new HashSet<AMQQueue>(); + } + } + else if(!(queues instanceof Set)) + { + queues = new HashSet<AMQQueue>(queues); + } + + queues.addAll(_unfilteredQueues.keySet()); + if(!_filteredQueues.isEmpty()) + { + for(Map.Entry<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>> entry : _filteredQueues.entrySet()) + { + if(!queues.contains(entry.getKey())) + { + for(MessageFilter<RuntimeException> filter : entry.getValue().keySet()) + { + if(filter.matches(msg)) + { + queues.add(entry.getKey()); + } + } + } + } + } + return queues; + } + + } + + + /** TopicExchangeMBean class implements the management interface for the Topic exchanges. */ + @MBeanDescription("Management Bean for Topic Exchange") + private final class TopicExchangeMBean extends ExchangeMBean + { + @MBeanConstructor("Creates an MBean for AMQ topic exchange") + public TopicExchangeMBean() throws JMException + { + super(); + _exchangeType = "topic"; + init(); + } + + /** returns exchange bindings in tabular form */ + public TabularData bindings() throws OpenDataException + { + _bindingList = new TabularDataSupport(_bindinglistDataType); + Map<String, List<String>> bindingData = new HashMap<String, List<String>>(); + for (Binding binding : _bindings.keySet()) + { + String key = binding.getBindingKey().toString(); + List<String> queueNames = bindingData.get(key); + if(queueNames == null) + { + queueNames = new ArrayList<String>(); + bindingData.put(key, queueNames); + } + queueNames.add(binding.getQueue().getName().toString()); + + } + for(Map.Entry<String, List<String>> entry : bindingData.entrySet()) + { + Object[] bindingItemValues = {entry.getKey(), entry.getValue().toArray(new String[entry.getValue().size()]) }; + CompositeData bindingCompositeData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); + _bindingList.put(bindingCompositeData); + } + + return _bindingList; + } + + public void createNewBinding(String queueName, String binding) throws JMException + { + AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName)); + if (queue == null) + { + throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); + } + + try + { + queue.bind(TopicExchange.this, new AMQShortString(binding), null); + } + catch (AMQException ex) + { + throw new MBeanException(ex); + } + } + + } // End of MBean class + + public AMQShortString getType() + { + return ExchangeDefaults.TOPIC_EXCHANGE_CLASS; + } + + public synchronized void registerQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException + { + assert queue != null; + assert rKey != null; + + _logger.debug("Registering queue " + queue.getName() + " with routing key " + rKey); + + + AMQShortString routingKey; + + if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE)) + { + routingKey = normalize(rKey); + } + else + { + routingKey = rKey; + } + + Binding binding = new Binding(rKey, queue, args); + + if(_bindings.containsKey(binding)) + { + FieldTable oldArgs = _bindings.get(binding); + TopicExchangeResult result = _topicExchangeResults.get(routingKey); + + if(argumentsContainSelector(args)) + { + if(argumentsContainSelector(oldArgs)) + { + result.replaceQueueFilter(queue,createSelectorFilter(oldArgs), createSelectorFilter(args)); + } + else + { + result.addFilteredQueue(queue,createSelectorFilter(args)); + result.removeUnfilteredQueue(queue); + } + } + else + { + if(argumentsContainSelector(oldArgs)) + { + result.addUnfilteredQueue(queue); + result.removeFilteredQueue(queue, createSelectorFilter(oldArgs)); + } + else + { + // TODO - fix control flow + return; + } + } + + } + else + { + + TopicExchangeResult result = _topicExchangeResults.get(routingKey); + if(result == null) + { + result = new TopicExchangeResult(); + if(argumentsContainSelector(args)) + { + result.addFilteredQueue(queue, createSelectorFilter(args)); + } + else + { + result.addUnfilteredQueue(queue); + } + _parser.addBinding(routingKey, result); + _topicExchangeResults.put(routingKey,result); + } + else + { + if(argumentsContainSelector(args)) + { + result.addFilteredQueue(queue, createSelectorFilter(args)); + } + else + { + result.addUnfilteredQueue(queue); + } + } + _bindings.put(binding, args); + } + + + } + + private JMSSelectorFilter<RuntimeException> createSelectorFilter(final FieldTable args) + throws AMQException + { + + final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()); + WeakReference<JMSSelectorFilter<RuntimeException>> selectorRef = _selectorCache.get(selectorString); + JMSSelectorFilter selector = null; + + if(selectorRef == null || (selector = selectorRef.get())==null) + { + selector = new JMSSelectorFilter<RuntimeException>(selectorString); + _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector)); + } + return selector; + } + + private static boolean argumentsContainSelector(final FieldTable args) + { + return args != null && args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0; + } + + private AMQShortString normalize(AMQShortString routingKey) + { + if(routingKey == null) + { + routingKey = AMQShortString.EMPTY_STRING; + } + + AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR); + + List<AMQShortString> subscriptionList = new ArrayList<AMQShortString>(); + + while (routingTokens.hasMoreTokens()) + { + subscriptionList.add(routingTokens.nextToken()); + } + + int size = subscriptionList.size(); + + for (int index = 0; index < size; index++) + { + // if there are more levels + if ((index + 1) < size) + { + if (subscriptionList.get(index).equals(AMQP_HASH_TOKEN)) + { + if (subscriptionList.get(index + 1).equals(AMQP_HASH_TOKEN)) + { + // we don't need #.# delete this one + subscriptionList.remove(index); + size--; + // redo this normalisation + index--; + } + + if (subscriptionList.get(index + 1).equals(AMQP_STAR_TOKEN)) + { + // we don't want #.* swap to *.# + // remove it and put it in at index + 1 + subscriptionList.add(index + 1, subscriptionList.remove(index)); + } + } + } // if we have more levels + } + + + + AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING); + + return normalizedString; + } + + public void route(IncomingMessage payload) throws AMQException + { + + final AMQShortString routingKey = payload.getRoutingKey(); + + // The copy here is unfortunate, but not too bad relevant to the amount of + // things created and copied in getMatchedQueues + ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>(); + queues.addAll(getMatchedQueues(payload, routingKey)); + + if(queues == null || queues.isEmpty()) + { + _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes."); + } + + payload.enqueue(queues); + + } + + public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) + { + Binding binding = new Binding(routingKey, queue, arguments); + if (arguments == null) + { + return _bindings.containsKey(binding); + } + else + { + FieldTable o = _bindings.get(binding); + if (o != null) + { + return o.equals(arguments); + } + else + { + return false; + } + + } + } + + public boolean isBound(AMQShortString routingKey, AMQQueue queue) + { + return isBound(routingKey, null, queue); + } + + public boolean isBound(AMQShortString routingKey) + { + for(Binding b : _bindings.keySet()) + { + if(b.getBindingKey().equals(routingKey)) + { + return true; + } + } + + return false; + } + + public boolean isBound(AMQQueue queue) + { + for(Binding b : _bindings.keySet()) + { + if(b.getQueue().equals(queue)) + { + return true; + } + } + + return false; + } + + public boolean hasBindings() + { + return !_bindings.isEmpty(); + } + + public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException + { + assert queue != null; + assert rKey != null; + + Binding binding = new Binding(rKey, queue, args); + + + if (!_bindings.containsKey(binding)) + { + throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue.getName() + " was not registered with exchange " + this.getName() + + " with routing key " + rKey + "."); + } + + FieldTable bindingArgs = _bindings.remove(binding); + AMQShortString bindingKey = normalize(rKey); + TopicExchangeResult result = _topicExchangeResults.get(bindingKey); + if(argumentsContainSelector(bindingArgs)) + { + result.removeFilteredQueue(queue, createSelectorFilter(bindingArgs)); + } + else + { + result.removeUnfilteredQueue(queue); + } + + } + + protected ExchangeMBean createMBean() throws AMQException + { + try + { + return new TopicExchangeMBean(); + } + catch (JMException ex) + { + _logger.error("Exception occured in creating the topic exchenge mbean", ex); + throw new AMQException("Exception occured in creating the topic exchenge mbean", ex); + } + } + + private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey) + { + + Collection<TopicMatcherResult> results = _parser.parse(routingKey); + if(results.isEmpty()) + { + return Collections.EMPTY_SET; + } + else + { + Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>(); + for(TopicMatcherResult result : results) + { + + queues = ((TopicExchangeResult)result).processMessage(message, queues); + } + return queues; + } + + + } +} |