diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/exchange/topic')
7 files changed, 0 insertions, 1347 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java deleted file mode 100644 index 41dc0d749a..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange.topic; - -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.filter.MessageFilter; -import org.apache.qpid.server.message.InboundMessage; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; - -public final class TopicExchangeResult implements TopicMatcherResult -{ - private final List<Binding> _bindings = new CopyOnWriteArrayList<Binding>(); - private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>(); - private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>(); - - public void addUnfilteredQueue(AMQQueue queue) - { - Integer instances = _unfilteredQueues.get(queue); - if(instances == null) - { - _unfilteredQueues.put(queue, 1); - } - else - { - _unfilteredQueues.put(queue, instances + 1); - } - } - - public void removeUnfilteredQueue(AMQQueue queue) - { - Integer instances = _unfilteredQueues.get(queue); - if(instances == 1) - { - _unfilteredQueues.remove(queue); - } - else - { - _unfilteredQueues.put(queue,instances - 1); - } - - } - - public Collection<AMQQueue> getUnfilteredQueues() - { - return _unfilteredQueues.keySet(); - } - - public void addBinding(Binding binding) - { - _bindings.add(binding); - } - - public void removeBinding(Binding binding) - { - _bindings.remove(binding); - } - - public List<Binding> getBindings() - { - return new ArrayList<Binding>(_bindings); - } - - public void addFilteredQueue(AMQQueue queue, MessageFilter filter) - { - Map<MessageFilter,Integer> filters = _filteredQueues.get(queue); - if(filters == null) - { - filters = new ConcurrentHashMap<MessageFilter,Integer>(); - _filteredQueues.put(queue, filters); - } - Integer instances = filters.get(filter); - if(instances == null) - { - filters.put(filter,1); - } - else - { - filters.put(filter, instances + 1); - } - - } - - public void removeFilteredQueue(AMQQueue queue, MessageFilter filter) - { - Map<MessageFilter,Integer> filters = _filteredQueues.get(queue); - if(filters != null) - { - Integer instances = filters.get(filter); - if(instances != null) - { - if(instances == 1) - { - filters.remove(filter); - if(filters.isEmpty()) - { - _filteredQueues.remove(queue); - } - } - else - { - filters.put(filter, instances - 1); - } - } - - } - - } - - public void replaceQueueFilter(AMQQueue queue, - MessageFilter oldFilter, - MessageFilter newFilter) - { - Map<MessageFilter,Integer> filters = _filteredQueues.get(queue); - Map<MessageFilter,Integer> newFilters = new ConcurrentHashMap<MessageFilter,Integer>(filters); - Integer oldFilterInstances = filters.get(oldFilter); - if(oldFilterInstances == 1) - { - newFilters.remove(oldFilter); - } - else - { - newFilters.put(oldFilter, oldFilterInstances-1); - } - Integer newFilterInstances = filters.get(newFilter); - if(newFilterInstances == null) - { - newFilters.put(newFilter, 1); - } - else - { - newFilters.put(newFilter, newFilterInstances+1); - } - _filteredQueues.put(queue,newFilters); - } - - public Collection<AMQQueue> processMessage(InboundMessage msg, Collection<AMQQueue> queues) - { - if(queues == null) - { - if(_filteredQueues.isEmpty()) - { - return new ArrayList<AMQQueue>(_unfilteredQueues.keySet()); - } - else - { - queues = new HashSet<AMQQueue>(); - } - } - else if(!(queues instanceof Set)) - { - queues = new HashSet<AMQQueue>(queues); - } - - queues.addAll(_unfilteredQueues.keySet()); - if(!_filteredQueues.isEmpty()) - { - for(Map.Entry<AMQQueue, Map<MessageFilter, Integer>> entry : _filteredQueues.entrySet()) - { - if(!queues.contains(entry.getKey())) - { - for(MessageFilter filter : entry.getValue().keySet()) - { - if(filter.matches(msg)) - { - queues.add(entry.getKey()); - } - } - } - } - } - return queues; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java deleted file mode 100644 index 36076cf75b..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java +++ /dev/null @@ -1,295 +0,0 @@ -package org.apache.qpid.server.exchange.topic; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.AMQShortStringTokenizer; - -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public class TopicMatcherDFAState -{ - private static final AtomicInteger stateId = new AtomicInteger(); - - private final int _id = stateId.incrementAndGet(); - - private final Collection<TopicMatcherResult> _results; - private final Map<TopicWord, TopicMatcherDFAState> _nextStateMap; - private static final byte TOPIC_DELIMITTER = (byte)'.'; - - - public TopicMatcherDFAState(Map<TopicWord, TopicMatcherDFAState> nextStateMap, - Collection<TopicMatcherResult> results ) - { - _nextStateMap = nextStateMap; - _results = results; - } - - - public TopicMatcherDFAState nextState(TopicWord word) - { - final TopicMatcherDFAState nextState = _nextStateMap.get(word); - return nextState == null ? _nextStateMap.get(TopicWord.ANY_WORD) : nextState; - } - - public Collection<TopicMatcherResult> terminate() - { - return _results; - } - - - public Collection<TopicMatcherResult> parse(TopicWordDictionary dictionary, AMQShortString routingKey) - { - return parse(dictionary, routingKey.tokenize(TOPIC_DELIMITTER)); - } - - private Collection<TopicMatcherResult> parse(final TopicWordDictionary dictionary, - final AMQShortStringTokenizer tokens) - { - if(!tokens.hasMoreTokens()) - { - return _results; - } - TopicWord word = dictionary.getWord(tokens.nextToken()); - TopicMatcherDFAState nextState = _nextStateMap.get(word); - if(nextState == null && word != TopicWord.ANY_WORD) - { - nextState = _nextStateMap.get(TopicWord.ANY_WORD); - } - if(nextState == null) - { - return Collections.EMPTY_SET; - } - // Shortcut if we are at a looping terminal state - if((nextState == this) && (_nextStateMap.size() == 1) && _nextStateMap.containsKey(TopicWord.ANY_WORD)) - { - return _results; - } - - return nextState.parse(dictionary, tokens); - - } - - - public TopicMatcherDFAState mergeStateMachines(TopicMatcherDFAState otherStateMachine) - { - Map<Set<TopicMatcherDFAState>, TopicMatcherDFAState> newStateMap= new HashMap<Set<TopicMatcherDFAState>, TopicMatcherDFAState>(); - - Collection<TopicMatcherResult> results; - - if(_results.isEmpty()) - { - results = otherStateMachine._results; - } - else if(otherStateMachine._results.isEmpty()) - { - results = _results; - } - else - { - results = new HashSet<TopicMatcherResult>(_results); - results.addAll(otherStateMachine._results); - } - - - final Map<TopicWord, TopicMatcherDFAState> newNextStateMap = new HashMap<TopicWord, TopicMatcherDFAState>(); - - TopicMatcherDFAState newState = new TopicMatcherDFAState(newNextStateMap, results); - - - Set<TopicMatcherDFAState> oldStates = new HashSet<TopicMatcherDFAState>(); - oldStates.add(this); - oldStates.add(otherStateMachine); - - newStateMap.put(oldStates, newState); - - mergeStateMachines(oldStates, newNextStateMap, newStateMap); - - return newState; - - } - - private static void mergeStateMachines( - final Set<TopicMatcherDFAState> oldStates, - final Map<TopicWord, TopicMatcherDFAState> newNextStateMap, - final Map<Set<TopicMatcherDFAState>, TopicMatcherDFAState> newStateMap) - { - Map<TopicWord, Set<TopicMatcherDFAState>> nfaMap = new HashMap<TopicWord, Set<TopicMatcherDFAState>>(); - - for(TopicMatcherDFAState state : oldStates) - { - Map<TopicWord, TopicMatcherDFAState> map = state._nextStateMap; - for(Map.Entry<TopicWord, TopicMatcherDFAState> entry : map.entrySet()) - { - Set<TopicMatcherDFAState> states = nfaMap.get(entry.getKey()); - if(states == null) - { - states = new HashSet<TopicMatcherDFAState>(); - nfaMap.put(entry.getKey(), states); - } - states.add(entry.getValue()); - } - } - - Set<TopicMatcherDFAState> anyWordStates = nfaMap.get(TopicWord.ANY_WORD); - - for(Map.Entry<TopicWord, Set<TopicMatcherDFAState>> transition : nfaMap.entrySet()) - { - Set<TopicMatcherDFAState> destinations = transition.getValue(); - - if(anyWordStates != null) - { - destinations.addAll(anyWordStates); - } - - TopicMatcherDFAState nextState = newStateMap.get(destinations); - if(nextState == null) - { - - if(destinations.size() == 1) - { - nextState = destinations.iterator().next(); - newStateMap.put(destinations, nextState); - } - else - { - Collection<TopicMatcherResult> results; - - Set<Collection<TopicMatcherResult>> resultSets = new HashSet<Collection<TopicMatcherResult>>(); - for(TopicMatcherDFAState destination : destinations) - { - resultSets.add(destination._results); - } - resultSets.remove(Collections.EMPTY_SET); - if(resultSets.size() == 0) - { - results = Collections.EMPTY_SET; - } - else if(resultSets.size() == 1) - { - results = resultSets.iterator().next(); - } - else - { - results = new HashSet<TopicMatcherResult>(); - for(Collection<TopicMatcherResult> oldResult : resultSets) - { - results.addAll(oldResult); - } - } - - final Map<TopicWord, TopicMatcherDFAState> nextStateMap = new HashMap<TopicWord, TopicMatcherDFAState>(); - - nextState = new TopicMatcherDFAState(nextStateMap, results); - newStateMap.put(destinations, nextState); - - mergeStateMachines( - destinations, - nextStateMap, - newStateMap); - - - } - - - } - newNextStateMap.put(transition.getKey(),nextState); - } - - // Remove redundant transitions where defined tokenWord has same action as ANY_WORD - TopicMatcherDFAState anyWordState = newNextStateMap.get(TopicWord.ANY_WORD); - if(anyWordState != null) - { - List<TopicWord> removeList = new ArrayList<TopicWord>(); - for(Map.Entry<TopicWord,TopicMatcherDFAState> entry : newNextStateMap.entrySet()) - { - if(entry.getValue() == anyWordState && entry.getKey() != TopicWord.ANY_WORD) - { - removeList.add(entry.getKey()); - } - } - for(TopicWord removeKey : removeList) - { - newNextStateMap.remove(removeKey); - } - } - - - - } - - - public String toString() - { - StringBuilder transitions = new StringBuilder(); - for(Map.Entry<TopicWord, TopicMatcherDFAState> entry : _nextStateMap.entrySet()) - { - transitions.append("[ "); - transitions.append(entry.getKey()); - transitions.append("\t ->\t "); - transitions.append(entry.getValue()._id); - transitions.append(" ]\n"); - } - - - return "[ State " + _id + " ]\n" + transitions + "\n"; - - } - - public String reachableStates() - { - StringBuilder result = new StringBuilder("Start state: " + _id + "\n"); - - SortedSet<TopicMatcherDFAState> reachableStates = - new TreeSet<TopicMatcherDFAState>(new Comparator<TopicMatcherDFAState>() - { - public int compare(final TopicMatcherDFAState o1, final TopicMatcherDFAState o2) - { - return o1._id - o2._id; - } - }); - reachableStates.add(this); - - int count; - - do - { - count = reachableStates.size(); - Collection<TopicMatcherDFAState> originalStates = new ArrayList<TopicMatcherDFAState>(reachableStates); - for(TopicMatcherDFAState state : originalStates) - { - reachableStates.addAll(state._nextStateMap.values()); - } - } - while(reachableStates.size() != count); - - - - for(TopicMatcherDFAState state : reachableStates) - { - result.append(state.toString()); - } - - return result.toString(); - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java deleted file mode 100644 index 71d30adfac..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java +++ /dev/null @@ -1,25 +0,0 @@ -package org.apache.qpid.server.exchange.topic; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public interface TopicMatcherResult -{ -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicNormalizer.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicNormalizer.java deleted file mode 100644 index 7e7cb6c0ae..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicNormalizer.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange.topic; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.AMQShortStringTokenizer; -import org.apache.qpid.server.exchange.TopicExchange; - -import java.util.List; -import java.util.ArrayList; - -public class TopicNormalizer -{ - private static final byte TOPIC_SEPARATOR = (byte)'.'; - private static final byte HASH_BYTE = (byte)'#'; - private static final byte STAR_BYTE = (byte)'*'; - - private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString("."); - private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*"); - private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#"); - - public static AMQShortString normalize(AMQShortString routingKey) - { - if(routingKey == null) - { - return AMQShortString.EMPTY_STRING; - } - else if(!(routingKey.contains(HASH_BYTE) || routingKey.contains(STAR_BYTE))) - { - return routingKey; - } - else - { - AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR); - - List<AMQShortString> subscriptionList = new ArrayList<AMQShortString>(); - - while (routingTokens.hasMoreTokens()) - { - subscriptionList.add(routingTokens.nextToken()); - } - - int size = subscriptionList.size(); - - for (int index = 0; index < size; index++) - { - // if there are more levels - if ((index + 1) < size) - { - if (subscriptionList.get(index).equals(AMQP_HASH_TOKEN)) - { - if (subscriptionList.get(index + 1).equals(AMQP_HASH_TOKEN)) - { - // we don't need #.# delete this one - subscriptionList.remove(index); - size--; - // redo this normalisation - index--; - } - - if (subscriptionList.get(index + 1).equals(AMQP_STAR_TOKEN)) - { - // we don't want #.* swap to *.# - // remove it and put it in at index + 1 - subscriptionList.add(index + 1, subscriptionList.remove(index)); - } - } - } // if we have more levels - } - - - - AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING); - - return normalizedString; - } - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java deleted file mode 100644 index 3e9facf412..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java +++ /dev/null @@ -1,613 +0,0 @@ -package org.apache.qpid.server.exchange.topic; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.AMQShortStringTokenizer; - -import java.util.*; -import java.util.concurrent.atomic.AtomicReference; -import java.io.IOException; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public class TopicParser -{ - private static final byte TOPIC_DELIMITER = (byte)'.'; - - private final TopicWordDictionary _dictionary = new TopicWordDictionary(); - private final AtomicReference<TopicMatcherDFAState> _stateMachine = new AtomicReference<TopicMatcherDFAState>(); - - private static class Position - { - private final TopicWord _word; - private final boolean _selfTransition; - private final int _position; - private final boolean _endState; - private boolean _followedByAnyLoop; - - - public Position(final int position, final TopicWord word, final boolean selfTransition, final boolean endState) - { - _position = position; - _word = word; - _selfTransition = selfTransition; - _endState = endState; - } - - - } - - private static final Position ERROR_POSITION = new Position(Integer.MAX_VALUE,null, true, false); - - private static class SimpleState - { - Set<Position> _positions; - Map<TopicWord, SimpleState> _nextState; - } - - - public void addBinding(AMQShortString bindingKey, TopicMatcherResult result) - { - - TopicMatcherDFAState startingStateMachine; - TopicMatcherDFAState newStateMachine; - - do - { - startingStateMachine = _stateMachine.get(); - if(startingStateMachine == null) - { - newStateMachine = createStateMachine(bindingKey, result); - } - else - { - newStateMachine = startingStateMachine.mergeStateMachines(createStateMachine(bindingKey, result)); - } - - } - while(!_stateMachine.compareAndSet(startingStateMachine,newStateMachine)); - - } - - public Collection<TopicMatcherResult> parse(AMQShortString routingKey) - { - TopicMatcherDFAState stateMachine = _stateMachine.get(); - if(stateMachine == null) - { - return Collections.EMPTY_SET; - } - else - { - return stateMachine.parse(_dictionary,routingKey); - } - } - - - TopicMatcherDFAState createStateMachine(AMQShortString bindingKey, TopicMatcherResult result) - { - List<TopicWord> wordList = createTopicWordList(bindingKey); - int wildCards = 0; - for(TopicWord word : wordList) - { - if(word == TopicWord.WILDCARD_WORD) - { - wildCards++; - } - } - if(wildCards == 0) - { - TopicMatcherDFAState[] states = new TopicMatcherDFAState[wordList.size()+1]; - states[states.length-1] = new TopicMatcherDFAState(Collections.EMPTY_MAP, Collections.singleton(result)); - for(int i = states.length-2; i >= 0; i--) - { - states[i] = new TopicMatcherDFAState(Collections.singletonMap(wordList.get(i),states[i+1]),Collections.EMPTY_SET); - - } - return states[0]; - } - else if(wildCards == wordList.size()) - { - Map<TopicWord,TopicMatcherDFAState> stateMap = new HashMap<TopicWord,TopicMatcherDFAState>(); - TopicMatcherDFAState state = new TopicMatcherDFAState(stateMap, Collections.singleton(result)); - stateMap.put(TopicWord.ANY_WORD, state); - return state; - } - - - int positionCount = wordList.size() - wildCards; - - Position[] positions = new Position[positionCount+1]; - - int lastWord; - - if(wordList.get(wordList.size()-1)== TopicWord.WILDCARD_WORD) - { - lastWord = wordList.size()-1; - positions[positionCount] = new Position(positionCount, TopicWord.ANY_WORD, true, true); - } - else - { - lastWord = wordList.size(); - positions[positionCount] = new Position(positionCount, TopicWord.ANY_WORD, false, true); - } - - - int pos = 0; - int wordPos = 0; - - - while(wordPos < lastWord) - { - TopicWord word = wordList.get(wordPos++); - - if(word == TopicWord.WILDCARD_WORD) - { - int nextWordPos = wordPos++; - word = wordList.get(nextWordPos); - - positions[pos] = new Position(pos++,word,true,false); - } - else - { - positions[pos] = new Position(pos++,word,false,false); - } - - } - - - for(int p = 0; p<positionCount; p++) - { - boolean followedByWildcards = true; - - int n = p; - while(followedByWildcards && n<(positionCount+1)) - { - - if(positions[n]._selfTransition) - { - break; - } - else if(positions[n]._word!=TopicWord.ANY_WORD) - { - followedByWildcards = false; - } - n++; - } - - - positions[p]._followedByAnyLoop = followedByWildcards && (n!= positionCount+1); - } - - - // from each position you transition to a set of other positions. - // we approach this by examining steps of increasing length - so we - // look how far we can go from the start position in 1 word, 2 words, etc... - - Map<Set<Position>,SimpleState> stateMap = new HashMap<Set<Position>,SimpleState>(); - - - SimpleState state = new SimpleState(); - state._positions = Collections.singleton( positions[0] ); - stateMap.put(state._positions, state); - - calculateNextStates(state, stateMap, positions); - - SimpleState[] simpleStates = stateMap.values().toArray(new SimpleState[stateMap.size()]); - HashMap<TopicWord, TopicMatcherDFAState>[] dfaStateMaps = new HashMap[simpleStates.length]; - Map<SimpleState, TopicMatcherDFAState> simple2DFAMap = new HashMap<SimpleState, TopicMatcherDFAState>(); - - for(int i = 0; i < simpleStates.length; i++) - { - - Collection<TopicMatcherResult> results; - boolean endState = false; - - for(Position p : simpleStates[i]._positions) - { - if(p._endState) - { - endState = true; - break; - } - } - - if(endState) - { - results = Collections.singleton(result); - } - else - { - results = Collections.EMPTY_SET; - } - - dfaStateMaps[i] = new HashMap<TopicWord, TopicMatcherDFAState>(); - simple2DFAMap.put(simpleStates[i], new TopicMatcherDFAState(dfaStateMaps[i],results)); - - } - for(int i = 0; i < simpleStates.length; i++) - { - SimpleState simpleState = simpleStates[i]; - - Map<TopicWord, SimpleState> nextSimpleStateMap = simpleState._nextState; - for(Map.Entry<TopicWord, SimpleState> stateMapEntry : nextSimpleStateMap.entrySet()) - { - dfaStateMaps[i].put(stateMapEntry.getKey(), simple2DFAMap.get(stateMapEntry.getValue())); - } - - } - - return simple2DFAMap.get(state); - - } - - - - private void calculateNextStates(final SimpleState state, - final Map<Set<Position>, SimpleState> stateMap, - final Position[] positions) - { - Map<TopicWord, Set<Position>> transitions = new HashMap<TopicWord,Set<Position>>(); - - for(Position pos : state._positions) - { - if(pos._selfTransition) - { - Set<Position> dest = transitions.get(TopicWord.ANY_WORD); - if(dest == null) - { - dest = new HashSet<Position>(); - transitions.put(TopicWord.ANY_WORD,dest); - } - dest.add(pos); - } - - final int nextPos = pos._position + 1; - Position nextPosition = nextPos == positions.length ? ERROR_POSITION : positions[nextPos]; - - Set<Position> dest = transitions.get(pos._word); - if(dest == null) - { - dest = new HashSet<Position>(); - transitions.put(pos._word,dest); - } - dest.add(nextPosition); - - } - - Set<Position> anyWordTransitions = transitions.get(TopicWord.ANY_WORD); - if(anyWordTransitions != null) - { - for(Set<Position> dest : transitions.values()) - { - dest.addAll(anyWordTransitions); - } - } - - state._nextState = new HashMap<TopicWord, SimpleState>(); - - for(Map.Entry<TopicWord,Set<Position>> dest : transitions.entrySet()) - { - - if(dest.getValue().size()>1) - { - dest.getValue().remove(ERROR_POSITION); - } - Position loopingTerminal = null; - for(Position destPos : dest.getValue()) - { - if(destPos._selfTransition && destPos._endState) - { - loopingTerminal = destPos; - break; - } - } - - if(loopingTerminal!=null) - { - dest.setValue(Collections.singleton(loopingTerminal)); - } - else - { - Position anyLoop = null; - for(Position destPos : dest.getValue()) - { - if(destPos._followedByAnyLoop) - { - if(anyLoop == null || anyLoop._position<destPos._position) - { - anyLoop = destPos; - } - } - } - if(anyLoop != null) - { - Collection<Position> removals = new ArrayList<Position>(); - for(Position destPos : dest.getValue()) - { - if(destPos._position < anyLoop._position) - { - removals.add(destPos); - } - } - dest.getValue().removeAll(removals); - } - } - - SimpleState stateForEntry = stateMap.get(dest.getValue()); - if(stateForEntry == null) - { - stateForEntry = new SimpleState(); - stateForEntry._positions = dest.getValue(); - stateMap.put(dest.getValue(),stateForEntry); - calculateNextStates(stateForEntry, - stateMap, - positions); - } - state._nextState.put(dest.getKey(),stateForEntry); - - - - } - - // remove redundant transitions - SimpleState anyWordState = state._nextState.get(TopicWord.ANY_WORD); - if(anyWordState != null) - { - List<TopicWord> removeList = new ArrayList<TopicWord>(); - for(Map.Entry<TopicWord,SimpleState> entry : state._nextState.entrySet()) - { - if(entry.getValue() == anyWordState && entry.getKey() != TopicWord.ANY_WORD) - { - removeList.add(entry.getKey()); - } - } - for(TopicWord removeKey : removeList) - { - state._nextState.remove(removeKey); - } - } - - - } - - private List<TopicWord> createTopicWordList(final AMQShortString bindingKey) - { - AMQShortStringTokenizer tokens = bindingKey.tokenize(TOPIC_DELIMITER); - TopicWord previousWord = null; - - List<TopicWord> wordList = new ArrayList<TopicWord>(); - - while(tokens.hasMoreTokens()) - { - TopicWord nextWord = _dictionary.getOrCreateWord(tokens.nextToken()); - if(previousWord == TopicWord.WILDCARD_WORD) - { - - if(nextWord == TopicWord.WILDCARD_WORD) - { - // consecutive wildcards can be merged - // i.e. subsequent wildcards can be discarded - continue; - } - else if(nextWord == TopicWord.ANY_WORD) - { - // wildcard and anyword can be reordered to always put anyword first - wordList.set(wordList.size()-1,TopicWord.ANY_WORD); - nextWord = TopicWord.WILDCARD_WORD; - } - } - wordList.add(nextWord); - previousWord = nextWord; - - } - return wordList; - } - - - public static void main(String[] args) - { - - printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.*.q.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z"); - printMatches(new String[]{ - "#.a.#", - "#.b.#", - "#.c.#", - "#.d.#", - "#.e.#", - "#.f.#", - "#.g.#", - "#.h.#", - "#.i.#", - "#.j.#", - "#.k.#", - "#.l.#", - "#.m.#", - "#.n.#", - "#.o.#", - "#.p.#", - "#.q.#" - - }, "a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z"); -/* - printMatches(new String[]{ - "#.a.#", - "#.b.#", - "#.c.#", - "#.d.#", - "#.e.#", - "#.f.#", - "#.g.#", - "#.h.#", - "#.i.#", - "#.j.#", - "#.k.#", - "#.l.#", - "#.m.#", - "#.n.#", - "#.o.#", - "#.p.#", - "#.q.#", - "#.r.#", - "#.s.#", - "#.t.#", - "#.u.#", - "#.v.#", - "#.w.#", - "#.x.#", - "#.y.#", - "#.z.#" - - - },"a.b"); - - printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z"); - printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z"); - printMatches("a.#.b.#","a.b.b.b.b.b.b.b.c"); - -*/ - - printMatches("",""); - printMatches("a","a"); - printMatches("a",""); - printMatches("","a"); - printMatches("a.b","a.b"); - printMatches("a","a.b"); - printMatches("a.b","a"); - printMatches("*","a"); - printMatches("*.b","a.b"); - printMatches("*.*","a.b"); - printMatches("a.*","a.b"); - printMatches("a.*.#","a.b"); - printMatches("a.#.b","a.b"); - - printMatches("#.b","a"); - printMatches("#.b","a.b"); - printMatches("#.a.b","a.b"); - - - printMatches("#",""); - printMatches("#","a"); - printMatches("#","a.b"); - printMatches("#.#","a.b"); - printMatches("#.*","a.b"); - - printMatches("#.a.b","a.b"); - printMatches("a.b.#","a.b"); - printMatches("a.#","a.b"); - printMatches("#.*.#","a.b"); - printMatches("#.*.b.#","a.b"); - printMatches("#.a.*.#","a.b"); - printMatches("#.a.#.b.#","a.b"); - printMatches("#.*.#.*.#","a.b"); - printMatches("*.#.*.#","a.b"); - printMatches("#.*.#.*","a.b"); - - - printMatches(new String[]{"a.#.b.#","a.*.#.b.#"},"a.b.b.b.b.b.b.b.c"); - - - printMatches(new String[]{"a.b", "a.c"},"a.b"); - printMatches(new String[]{"a.#", "a.c", "#.b"},"a.b"); - printMatches(new String[]{"a.#", "a.c", "#.b", "#", "*.*"},"a.b"); - - printMatches(new String[]{"a.b.c.d.e.#", "a.b.c.d.#", "a.b.c.d.*", "a.b.c.#", "#.e", "a.*.c.d.e","#.c.*.#.*.*"},"a.b.c.d.e"); - printMatches(new String[]{"a.b.c.d.e.#", "a.b.c.d.#", "a.b.c.d.*", "a.b.c.#", "#.e", "a.*.c.d.e","#.c.*.#.*.*"},"a.b.c.d.f.g"); - - - - - } - - private static void printMatches(final String[] bindingKeys, final String routingKey) - { - TopicMatcherDFAState sm = null; - Map<TopicMatcherResult, String> resultMap = new HashMap<TopicMatcherResult, String>(); - - TopicParser parser = new TopicParser(); - - long start = System.currentTimeMillis(); - for(int i = 0; i < bindingKeys.length; i++) - { - System.out.println((System.currentTimeMillis() - start) + ":\t" + bindingKeys[i]); - TopicMatcherResult r = new TopicMatcherResult(){}; - resultMap.put(r, bindingKeys[i]); - AMQShortString bindingKeyShortString = new AMQShortString(bindingKeys[i]); - - System.err.println("====================================================="); - System.err.println("Adding binding key: " + bindingKeyShortString); - System.err.println("-----------------------------------------------------"); - - - if(i==0) - { - sm = parser.createStateMachine(bindingKeyShortString, r); - } - else - { - sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeyShortString, r)); - } - System.err.println(sm.reachableStates()); - System.err.println("====================================================="); - try - { - System.in.read(); - } - catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - AMQShortString routingKeyShortString = new AMQShortString(routingKey); - - Collection<TopicMatcherResult> results = sm.parse(parser._dictionary, routingKeyShortString); - Collection<String> resultStrings = new ArrayList<String>(); - - for(TopicMatcherResult result : results) - { - resultStrings.add(resultMap.get(result)); - } - - final ArrayList<String> nonMatches = new ArrayList<String>(Arrays.asList(bindingKeys)); - nonMatches.removeAll(resultStrings); - System.out.println("\""+routingKeyShortString+"\" matched with " + resultStrings + " DID NOT MATCH with " + nonMatches); - - - } - - private static void printMatches(String bindingKey, String routingKey) - { - printMatches(new String[] { bindingKey }, routingKey); - } - - - private static boolean matches(String bindingKey, String routingKey) - { - AMQShortString bindingKeyShortString = new AMQShortString(bindingKey); - AMQShortString routingKeyShortString = new AMQShortString(routingKey); - TopicParser parser = new TopicParser(); - - final TopicMatcherResult result = new TopicMatcherResult(){}; - - TopicMatcherDFAState sm = parser.createStateMachine(bindingKeyShortString, result); - return !sm.parse(parser._dictionary,routingKeyShortString).isEmpty(); - - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java deleted file mode 100644 index f14d70f8a1..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java +++ /dev/null @@ -1,54 +0,0 @@ -package org.apache.qpid.server.exchange.topic; - -import org.apache.qpid.framing.AMQShortString; - -import java.util.Map; -import java.util.HashMap; -import java.util.concurrent.ConcurrentHashMap; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public final class TopicWord -{ - public static final TopicWord ANY_WORD = new TopicWord("*"); - public static final TopicWord WILDCARD_WORD = new TopicWord("#"); - private String _word; - - public TopicWord() - { - - } - - public TopicWord(String s) - { - _word = s; - } - - public TopicWord(final AMQShortString name) - { - _word = name.toString(); - } - - public String toString() - { - return _word; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java deleted file mode 100644 index 65a0cd3107..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java +++ /dev/null @@ -1,63 +0,0 @@ -package org.apache.qpid.server.exchange.topic; - -import org.apache.qpid.framing.AMQShortString; - -import java.util.concurrent.ConcurrentHashMap; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -public class TopicWordDictionary -{ - private final ConcurrentHashMap<AMQShortString,TopicWord> _dictionary = - new ConcurrentHashMap<AMQShortString,TopicWord>(); - - - - public TopicWordDictionary() - { - _dictionary.put(new AMQShortString("*"), TopicWord.ANY_WORD); - _dictionary.put(new AMQShortString("#"), TopicWord.WILDCARD_WORD); - } - - - - - public TopicWord getOrCreateWord(AMQShortString name) - { - TopicWord word = _dictionary.putIfAbsent(name, new TopicWord(name)); - if(word == null) - { - word = _dictionary.get(name); - } - return word; - } - - - public TopicWord getWord(AMQShortString name) - { - TopicWord word = _dictionary.get(name); - if(word == null) - { - word = TopicWord.ANY_WORD; - } - return word; - } -} |