summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/exchange/topic
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/exchange/topic')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java201
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java295
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicNormalizer.java96
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java613
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java54
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java63
7 files changed, 0 insertions, 1347 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
deleted file mode 100644
index 41dc0d749a..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange.topic;
-
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.message.InboundMessage;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-public final class TopicExchangeResult implements TopicMatcherResult
-{
- private final List<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
- private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
- private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>();
-
- public void addUnfilteredQueue(AMQQueue queue)
- {
- Integer instances = _unfilteredQueues.get(queue);
- if(instances == null)
- {
- _unfilteredQueues.put(queue, 1);
- }
- else
- {
- _unfilteredQueues.put(queue, instances + 1);
- }
- }
-
- public void removeUnfilteredQueue(AMQQueue queue)
- {
- Integer instances = _unfilteredQueues.get(queue);
- if(instances == 1)
- {
- _unfilteredQueues.remove(queue);
- }
- else
- {
- _unfilteredQueues.put(queue,instances - 1);
- }
-
- }
-
- public Collection<AMQQueue> getUnfilteredQueues()
- {
- return _unfilteredQueues.keySet();
- }
-
- public void addBinding(Binding binding)
- {
- _bindings.add(binding);
- }
-
- public void removeBinding(Binding binding)
- {
- _bindings.remove(binding);
- }
-
- public List<Binding> getBindings()
- {
- return new ArrayList<Binding>(_bindings);
- }
-
- public void addFilteredQueue(AMQQueue queue, MessageFilter filter)
- {
- Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
- if(filters == null)
- {
- filters = new ConcurrentHashMap<MessageFilter,Integer>();
- _filteredQueues.put(queue, filters);
- }
- Integer instances = filters.get(filter);
- if(instances == null)
- {
- filters.put(filter,1);
- }
- else
- {
- filters.put(filter, instances + 1);
- }
-
- }
-
- public void removeFilteredQueue(AMQQueue queue, MessageFilter filter)
- {
- Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
- if(filters != null)
- {
- Integer instances = filters.get(filter);
- if(instances != null)
- {
- if(instances == 1)
- {
- filters.remove(filter);
- if(filters.isEmpty())
- {
- _filteredQueues.remove(queue);
- }
- }
- else
- {
- filters.put(filter, instances - 1);
- }
- }
-
- }
-
- }
-
- public void replaceQueueFilter(AMQQueue queue,
- MessageFilter oldFilter,
- MessageFilter newFilter)
- {
- Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
- Map<MessageFilter,Integer> newFilters = new ConcurrentHashMap<MessageFilter,Integer>(filters);
- Integer oldFilterInstances = filters.get(oldFilter);
- if(oldFilterInstances == 1)
- {
- newFilters.remove(oldFilter);
- }
- else
- {
- newFilters.put(oldFilter, oldFilterInstances-1);
- }
- Integer newFilterInstances = filters.get(newFilter);
- if(newFilterInstances == null)
- {
- newFilters.put(newFilter, 1);
- }
- else
- {
- newFilters.put(newFilter, newFilterInstances+1);
- }
- _filteredQueues.put(queue,newFilters);
- }
-
- public Collection<AMQQueue> processMessage(InboundMessage msg, Collection<AMQQueue> queues)
- {
- if(queues == null)
- {
- if(_filteredQueues.isEmpty())
- {
- return new ArrayList<AMQQueue>(_unfilteredQueues.keySet());
- }
- else
- {
- queues = new HashSet<AMQQueue>();
- }
- }
- else if(!(queues instanceof Set))
- {
- queues = new HashSet<AMQQueue>(queues);
- }
-
- queues.addAll(_unfilteredQueues.keySet());
- if(!_filteredQueues.isEmpty())
- {
- for(Map.Entry<AMQQueue, Map<MessageFilter, Integer>> entry : _filteredQueues.entrySet())
- {
- if(!queues.contains(entry.getKey()))
- {
- for(MessageFilter filter : entry.getValue().keySet())
- {
- if(filter.matches(msg))
- {
- queues.add(entry.getKey());
- }
- }
- }
- }
- }
- return queues;
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java
deleted file mode 100644
index 36076cf75b..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java
+++ /dev/null
@@ -1,295 +0,0 @@
-package org.apache.qpid.server.exchange.topic;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.AMQShortStringTokenizer;
-
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-public class TopicMatcherDFAState
-{
- private static final AtomicInteger stateId = new AtomicInteger();
-
- private final int _id = stateId.incrementAndGet();
-
- private final Collection<TopicMatcherResult> _results;
- private final Map<TopicWord, TopicMatcherDFAState> _nextStateMap;
- private static final byte TOPIC_DELIMITTER = (byte)'.';
-
-
- public TopicMatcherDFAState(Map<TopicWord, TopicMatcherDFAState> nextStateMap,
- Collection<TopicMatcherResult> results )
- {
- _nextStateMap = nextStateMap;
- _results = results;
- }
-
-
- public TopicMatcherDFAState nextState(TopicWord word)
- {
- final TopicMatcherDFAState nextState = _nextStateMap.get(word);
- return nextState == null ? _nextStateMap.get(TopicWord.ANY_WORD) : nextState;
- }
-
- public Collection<TopicMatcherResult> terminate()
- {
- return _results;
- }
-
-
- public Collection<TopicMatcherResult> parse(TopicWordDictionary dictionary, AMQShortString routingKey)
- {
- return parse(dictionary, routingKey.tokenize(TOPIC_DELIMITTER));
- }
-
- private Collection<TopicMatcherResult> parse(final TopicWordDictionary dictionary,
- final AMQShortStringTokenizer tokens)
- {
- if(!tokens.hasMoreTokens())
- {
- return _results;
- }
- TopicWord word = dictionary.getWord(tokens.nextToken());
- TopicMatcherDFAState nextState = _nextStateMap.get(word);
- if(nextState == null && word != TopicWord.ANY_WORD)
- {
- nextState = _nextStateMap.get(TopicWord.ANY_WORD);
- }
- if(nextState == null)
- {
- return Collections.EMPTY_SET;
- }
- // Shortcut if we are at a looping terminal state
- if((nextState == this) && (_nextStateMap.size() == 1) && _nextStateMap.containsKey(TopicWord.ANY_WORD))
- {
- return _results;
- }
-
- return nextState.parse(dictionary, tokens);
-
- }
-
-
- public TopicMatcherDFAState mergeStateMachines(TopicMatcherDFAState otherStateMachine)
- {
- Map<Set<TopicMatcherDFAState>, TopicMatcherDFAState> newStateMap= new HashMap<Set<TopicMatcherDFAState>, TopicMatcherDFAState>();
-
- Collection<TopicMatcherResult> results;
-
- if(_results.isEmpty())
- {
- results = otherStateMachine._results;
- }
- else if(otherStateMachine._results.isEmpty())
- {
- results = _results;
- }
- else
- {
- results = new HashSet<TopicMatcherResult>(_results);
- results.addAll(otherStateMachine._results);
- }
-
-
- final Map<TopicWord, TopicMatcherDFAState> newNextStateMap = new HashMap<TopicWord, TopicMatcherDFAState>();
-
- TopicMatcherDFAState newState = new TopicMatcherDFAState(newNextStateMap, results);
-
-
- Set<TopicMatcherDFAState> oldStates = new HashSet<TopicMatcherDFAState>();
- oldStates.add(this);
- oldStates.add(otherStateMachine);
-
- newStateMap.put(oldStates, newState);
-
- mergeStateMachines(oldStates, newNextStateMap, newStateMap);
-
- return newState;
-
- }
-
- private static void mergeStateMachines(
- final Set<TopicMatcherDFAState> oldStates,
- final Map<TopicWord, TopicMatcherDFAState> newNextStateMap,
- final Map<Set<TopicMatcherDFAState>, TopicMatcherDFAState> newStateMap)
- {
- Map<TopicWord, Set<TopicMatcherDFAState>> nfaMap = new HashMap<TopicWord, Set<TopicMatcherDFAState>>();
-
- for(TopicMatcherDFAState state : oldStates)
- {
- Map<TopicWord, TopicMatcherDFAState> map = state._nextStateMap;
- for(Map.Entry<TopicWord, TopicMatcherDFAState> entry : map.entrySet())
- {
- Set<TopicMatcherDFAState> states = nfaMap.get(entry.getKey());
- if(states == null)
- {
- states = new HashSet<TopicMatcherDFAState>();
- nfaMap.put(entry.getKey(), states);
- }
- states.add(entry.getValue());
- }
- }
-
- Set<TopicMatcherDFAState> anyWordStates = nfaMap.get(TopicWord.ANY_WORD);
-
- for(Map.Entry<TopicWord, Set<TopicMatcherDFAState>> transition : nfaMap.entrySet())
- {
- Set<TopicMatcherDFAState> destinations = transition.getValue();
-
- if(anyWordStates != null)
- {
- destinations.addAll(anyWordStates);
- }
-
- TopicMatcherDFAState nextState = newStateMap.get(destinations);
- if(nextState == null)
- {
-
- if(destinations.size() == 1)
- {
- nextState = destinations.iterator().next();
- newStateMap.put(destinations, nextState);
- }
- else
- {
- Collection<TopicMatcherResult> results;
-
- Set<Collection<TopicMatcherResult>> resultSets = new HashSet<Collection<TopicMatcherResult>>();
- for(TopicMatcherDFAState destination : destinations)
- {
- resultSets.add(destination._results);
- }
- resultSets.remove(Collections.EMPTY_SET);
- if(resultSets.size() == 0)
- {
- results = Collections.EMPTY_SET;
- }
- else if(resultSets.size() == 1)
- {
- results = resultSets.iterator().next();
- }
- else
- {
- results = new HashSet<TopicMatcherResult>();
- for(Collection<TopicMatcherResult> oldResult : resultSets)
- {
- results.addAll(oldResult);
- }
- }
-
- final Map<TopicWord, TopicMatcherDFAState> nextStateMap = new HashMap<TopicWord, TopicMatcherDFAState>();
-
- nextState = new TopicMatcherDFAState(nextStateMap, results);
- newStateMap.put(destinations, nextState);
-
- mergeStateMachines(
- destinations,
- nextStateMap,
- newStateMap);
-
-
- }
-
-
- }
- newNextStateMap.put(transition.getKey(),nextState);
- }
-
- // Remove redundant transitions where defined tokenWord has same action as ANY_WORD
- TopicMatcherDFAState anyWordState = newNextStateMap.get(TopicWord.ANY_WORD);
- if(anyWordState != null)
- {
- List<TopicWord> removeList = new ArrayList<TopicWord>();
- for(Map.Entry<TopicWord,TopicMatcherDFAState> entry : newNextStateMap.entrySet())
- {
- if(entry.getValue() == anyWordState && entry.getKey() != TopicWord.ANY_WORD)
- {
- removeList.add(entry.getKey());
- }
- }
- for(TopicWord removeKey : removeList)
- {
- newNextStateMap.remove(removeKey);
- }
- }
-
-
-
- }
-
-
- public String toString()
- {
- StringBuilder transitions = new StringBuilder();
- for(Map.Entry<TopicWord, TopicMatcherDFAState> entry : _nextStateMap.entrySet())
- {
- transitions.append("[ ");
- transitions.append(entry.getKey());
- transitions.append("\t ->\t ");
- transitions.append(entry.getValue()._id);
- transitions.append(" ]\n");
- }
-
-
- return "[ State " + _id + " ]\n" + transitions + "\n";
-
- }
-
- public String reachableStates()
- {
- StringBuilder result = new StringBuilder("Start state: " + _id + "\n");
-
- SortedSet<TopicMatcherDFAState> reachableStates =
- new TreeSet<TopicMatcherDFAState>(new Comparator<TopicMatcherDFAState>()
- {
- public int compare(final TopicMatcherDFAState o1, final TopicMatcherDFAState o2)
- {
- return o1._id - o2._id;
- }
- });
- reachableStates.add(this);
-
- int count;
-
- do
- {
- count = reachableStates.size();
- Collection<TopicMatcherDFAState> originalStates = new ArrayList<TopicMatcherDFAState>(reachableStates);
- for(TopicMatcherDFAState state : originalStates)
- {
- reachableStates.addAll(state._nextStateMap.values());
- }
- }
- while(reachableStates.size() != count);
-
-
-
- for(TopicMatcherDFAState state : reachableStates)
- {
- result.append(state.toString());
- }
-
- return result.toString();
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java
deleted file mode 100644
index 71d30adfac..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherResult.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.qpid.server.exchange.topic;
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-public interface TopicMatcherResult
-{
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicNormalizer.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicNormalizer.java
deleted file mode 100644
index 7e7cb6c0ae..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicNormalizer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange.topic;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.AMQShortStringTokenizer;
-import org.apache.qpid.server.exchange.TopicExchange;
-
-import java.util.List;
-import java.util.ArrayList;
-
-public class TopicNormalizer
-{
- private static final byte TOPIC_SEPARATOR = (byte)'.';
- private static final byte HASH_BYTE = (byte)'#';
- private static final byte STAR_BYTE = (byte)'*';
-
- private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString(".");
- private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*");
- private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#");
-
- public static AMQShortString normalize(AMQShortString routingKey)
- {
- if(routingKey == null)
- {
- return AMQShortString.EMPTY_STRING;
- }
- else if(!(routingKey.contains(HASH_BYTE) || routingKey.contains(STAR_BYTE)))
- {
- return routingKey;
- }
- else
- {
- AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
-
- List<AMQShortString> subscriptionList = new ArrayList<AMQShortString>();
-
- while (routingTokens.hasMoreTokens())
- {
- subscriptionList.add(routingTokens.nextToken());
- }
-
- int size = subscriptionList.size();
-
- for (int index = 0; index < size; index++)
- {
- // if there are more levels
- if ((index + 1) < size)
- {
- if (subscriptionList.get(index).equals(AMQP_HASH_TOKEN))
- {
- if (subscriptionList.get(index + 1).equals(AMQP_HASH_TOKEN))
- {
- // we don't need #.# delete this one
- subscriptionList.remove(index);
- size--;
- // redo this normalisation
- index--;
- }
-
- if (subscriptionList.get(index + 1).equals(AMQP_STAR_TOKEN))
- {
- // we don't want #.* swap to *.#
- // remove it and put it in at index + 1
- subscriptionList.add(index + 1, subscriptionList.remove(index));
- }
- }
- } // if we have more levels
- }
-
-
-
- AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING);
-
- return normalizedString;
- }
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
deleted file mode 100644
index 3e9facf412..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
+++ /dev/null
@@ -1,613 +0,0 @@
-package org.apache.qpid.server.exchange.topic;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.AMQShortStringTokenizer;
-
-import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
-import java.io.IOException;
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-public class TopicParser
-{
- private static final byte TOPIC_DELIMITER = (byte)'.';
-
- private final TopicWordDictionary _dictionary = new TopicWordDictionary();
- private final AtomicReference<TopicMatcherDFAState> _stateMachine = new AtomicReference<TopicMatcherDFAState>();
-
- private static class Position
- {
- private final TopicWord _word;
- private final boolean _selfTransition;
- private final int _position;
- private final boolean _endState;
- private boolean _followedByAnyLoop;
-
-
- public Position(final int position, final TopicWord word, final boolean selfTransition, final boolean endState)
- {
- _position = position;
- _word = word;
- _selfTransition = selfTransition;
- _endState = endState;
- }
-
-
- }
-
- private static final Position ERROR_POSITION = new Position(Integer.MAX_VALUE,null, true, false);
-
- private static class SimpleState
- {
- Set<Position> _positions;
- Map<TopicWord, SimpleState> _nextState;
- }
-
-
- public void addBinding(AMQShortString bindingKey, TopicMatcherResult result)
- {
-
- TopicMatcherDFAState startingStateMachine;
- TopicMatcherDFAState newStateMachine;
-
- do
- {
- startingStateMachine = _stateMachine.get();
- if(startingStateMachine == null)
- {
- newStateMachine = createStateMachine(bindingKey, result);
- }
- else
- {
- newStateMachine = startingStateMachine.mergeStateMachines(createStateMachine(bindingKey, result));
- }
-
- }
- while(!_stateMachine.compareAndSet(startingStateMachine,newStateMachine));
-
- }
-
- public Collection<TopicMatcherResult> parse(AMQShortString routingKey)
- {
- TopicMatcherDFAState stateMachine = _stateMachine.get();
- if(stateMachine == null)
- {
- return Collections.EMPTY_SET;
- }
- else
- {
- return stateMachine.parse(_dictionary,routingKey);
- }
- }
-
-
- TopicMatcherDFAState createStateMachine(AMQShortString bindingKey, TopicMatcherResult result)
- {
- List<TopicWord> wordList = createTopicWordList(bindingKey);
- int wildCards = 0;
- for(TopicWord word : wordList)
- {
- if(word == TopicWord.WILDCARD_WORD)
- {
- wildCards++;
- }
- }
- if(wildCards == 0)
- {
- TopicMatcherDFAState[] states = new TopicMatcherDFAState[wordList.size()+1];
- states[states.length-1] = new TopicMatcherDFAState(Collections.EMPTY_MAP, Collections.singleton(result));
- for(int i = states.length-2; i >= 0; i--)
- {
- states[i] = new TopicMatcherDFAState(Collections.singletonMap(wordList.get(i),states[i+1]),Collections.EMPTY_SET);
-
- }
- return states[0];
- }
- else if(wildCards == wordList.size())
- {
- Map<TopicWord,TopicMatcherDFAState> stateMap = new HashMap<TopicWord,TopicMatcherDFAState>();
- TopicMatcherDFAState state = new TopicMatcherDFAState(stateMap, Collections.singleton(result));
- stateMap.put(TopicWord.ANY_WORD, state);
- return state;
- }
-
-
- int positionCount = wordList.size() - wildCards;
-
- Position[] positions = new Position[positionCount+1];
-
- int lastWord;
-
- if(wordList.get(wordList.size()-1)== TopicWord.WILDCARD_WORD)
- {
- lastWord = wordList.size()-1;
- positions[positionCount] = new Position(positionCount, TopicWord.ANY_WORD, true, true);
- }
- else
- {
- lastWord = wordList.size();
- positions[positionCount] = new Position(positionCount, TopicWord.ANY_WORD, false, true);
- }
-
-
- int pos = 0;
- int wordPos = 0;
-
-
- while(wordPos < lastWord)
- {
- TopicWord word = wordList.get(wordPos++);
-
- if(word == TopicWord.WILDCARD_WORD)
- {
- int nextWordPos = wordPos++;
- word = wordList.get(nextWordPos);
-
- positions[pos] = new Position(pos++,word,true,false);
- }
- else
- {
- positions[pos] = new Position(pos++,word,false,false);
- }
-
- }
-
-
- for(int p = 0; p<positionCount; p++)
- {
- boolean followedByWildcards = true;
-
- int n = p;
- while(followedByWildcards && n<(positionCount+1))
- {
-
- if(positions[n]._selfTransition)
- {
- break;
- }
- else if(positions[n]._word!=TopicWord.ANY_WORD)
- {
- followedByWildcards = false;
- }
- n++;
- }
-
-
- positions[p]._followedByAnyLoop = followedByWildcards && (n!= positionCount+1);
- }
-
-
- // from each position you transition to a set of other positions.
- // we approach this by examining steps of increasing length - so we
- // look how far we can go from the start position in 1 word, 2 words, etc...
-
- Map<Set<Position>,SimpleState> stateMap = new HashMap<Set<Position>,SimpleState>();
-
-
- SimpleState state = new SimpleState();
- state._positions = Collections.singleton( positions[0] );
- stateMap.put(state._positions, state);
-
- calculateNextStates(state, stateMap, positions);
-
- SimpleState[] simpleStates = stateMap.values().toArray(new SimpleState[stateMap.size()]);
- HashMap<TopicWord, TopicMatcherDFAState>[] dfaStateMaps = new HashMap[simpleStates.length];
- Map<SimpleState, TopicMatcherDFAState> simple2DFAMap = new HashMap<SimpleState, TopicMatcherDFAState>();
-
- for(int i = 0; i < simpleStates.length; i++)
- {
-
- Collection<TopicMatcherResult> results;
- boolean endState = false;
-
- for(Position p : simpleStates[i]._positions)
- {
- if(p._endState)
- {
- endState = true;
- break;
- }
- }
-
- if(endState)
- {
- results = Collections.singleton(result);
- }
- else
- {
- results = Collections.EMPTY_SET;
- }
-
- dfaStateMaps[i] = new HashMap<TopicWord, TopicMatcherDFAState>();
- simple2DFAMap.put(simpleStates[i], new TopicMatcherDFAState(dfaStateMaps[i],results));
-
- }
- for(int i = 0; i < simpleStates.length; i++)
- {
- SimpleState simpleState = simpleStates[i];
-
- Map<TopicWord, SimpleState> nextSimpleStateMap = simpleState._nextState;
- for(Map.Entry<TopicWord, SimpleState> stateMapEntry : nextSimpleStateMap.entrySet())
- {
- dfaStateMaps[i].put(stateMapEntry.getKey(), simple2DFAMap.get(stateMapEntry.getValue()));
- }
-
- }
-
- return simple2DFAMap.get(state);
-
- }
-
-
-
- private void calculateNextStates(final SimpleState state,
- final Map<Set<Position>, SimpleState> stateMap,
- final Position[] positions)
- {
- Map<TopicWord, Set<Position>> transitions = new HashMap<TopicWord,Set<Position>>();
-
- for(Position pos : state._positions)
- {
- if(pos._selfTransition)
- {
- Set<Position> dest = transitions.get(TopicWord.ANY_WORD);
- if(dest == null)
- {
- dest = new HashSet<Position>();
- transitions.put(TopicWord.ANY_WORD,dest);
- }
- dest.add(pos);
- }
-
- final int nextPos = pos._position + 1;
- Position nextPosition = nextPos == positions.length ? ERROR_POSITION : positions[nextPos];
-
- Set<Position> dest = transitions.get(pos._word);
- if(dest == null)
- {
- dest = new HashSet<Position>();
- transitions.put(pos._word,dest);
- }
- dest.add(nextPosition);
-
- }
-
- Set<Position> anyWordTransitions = transitions.get(TopicWord.ANY_WORD);
- if(anyWordTransitions != null)
- {
- for(Set<Position> dest : transitions.values())
- {
- dest.addAll(anyWordTransitions);
- }
- }
-
- state._nextState = new HashMap<TopicWord, SimpleState>();
-
- for(Map.Entry<TopicWord,Set<Position>> dest : transitions.entrySet())
- {
-
- if(dest.getValue().size()>1)
- {
- dest.getValue().remove(ERROR_POSITION);
- }
- Position loopingTerminal = null;
- for(Position destPos : dest.getValue())
- {
- if(destPos._selfTransition && destPos._endState)
- {
- loopingTerminal = destPos;
- break;
- }
- }
-
- if(loopingTerminal!=null)
- {
- dest.setValue(Collections.singleton(loopingTerminal));
- }
- else
- {
- Position anyLoop = null;
- for(Position destPos : dest.getValue())
- {
- if(destPos._followedByAnyLoop)
- {
- if(anyLoop == null || anyLoop._position<destPos._position)
- {
- anyLoop = destPos;
- }
- }
- }
- if(anyLoop != null)
- {
- Collection<Position> removals = new ArrayList<Position>();
- for(Position destPos : dest.getValue())
- {
- if(destPos._position < anyLoop._position)
- {
- removals.add(destPos);
- }
- }
- dest.getValue().removeAll(removals);
- }
- }
-
- SimpleState stateForEntry = stateMap.get(dest.getValue());
- if(stateForEntry == null)
- {
- stateForEntry = new SimpleState();
- stateForEntry._positions = dest.getValue();
- stateMap.put(dest.getValue(),stateForEntry);
- calculateNextStates(stateForEntry,
- stateMap,
- positions);
- }
- state._nextState.put(dest.getKey(),stateForEntry);
-
-
-
- }
-
- // remove redundant transitions
- SimpleState anyWordState = state._nextState.get(TopicWord.ANY_WORD);
- if(anyWordState != null)
- {
- List<TopicWord> removeList = new ArrayList<TopicWord>();
- for(Map.Entry<TopicWord,SimpleState> entry : state._nextState.entrySet())
- {
- if(entry.getValue() == anyWordState && entry.getKey() != TopicWord.ANY_WORD)
- {
- removeList.add(entry.getKey());
- }
- }
- for(TopicWord removeKey : removeList)
- {
- state._nextState.remove(removeKey);
- }
- }
-
-
- }
-
- private List<TopicWord> createTopicWordList(final AMQShortString bindingKey)
- {
- AMQShortStringTokenizer tokens = bindingKey.tokenize(TOPIC_DELIMITER);
- TopicWord previousWord = null;
-
- List<TopicWord> wordList = new ArrayList<TopicWord>();
-
- while(tokens.hasMoreTokens())
- {
- TopicWord nextWord = _dictionary.getOrCreateWord(tokens.nextToken());
- if(previousWord == TopicWord.WILDCARD_WORD)
- {
-
- if(nextWord == TopicWord.WILDCARD_WORD)
- {
- // consecutive wildcards can be merged
- // i.e. subsequent wildcards can be discarded
- continue;
- }
- else if(nextWord == TopicWord.ANY_WORD)
- {
- // wildcard and anyword can be reordered to always put anyword first
- wordList.set(wordList.size()-1,TopicWord.ANY_WORD);
- nextWord = TopicWord.WILDCARD_WORD;
- }
- }
- wordList.add(nextWord);
- previousWord = nextWord;
-
- }
- return wordList;
- }
-
-
- public static void main(String[] args)
- {
-
- printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.*.q.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
- printMatches(new String[]{
- "#.a.#",
- "#.b.#",
- "#.c.#",
- "#.d.#",
- "#.e.#",
- "#.f.#",
- "#.g.#",
- "#.h.#",
- "#.i.#",
- "#.j.#",
- "#.k.#",
- "#.l.#",
- "#.m.#",
- "#.n.#",
- "#.o.#",
- "#.p.#",
- "#.q.#"
-
- }, "a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
-/*
- printMatches(new String[]{
- "#.a.#",
- "#.b.#",
- "#.c.#",
- "#.d.#",
- "#.e.#",
- "#.f.#",
- "#.g.#",
- "#.h.#",
- "#.i.#",
- "#.j.#",
- "#.k.#",
- "#.l.#",
- "#.m.#",
- "#.n.#",
- "#.o.#",
- "#.p.#",
- "#.q.#",
- "#.r.#",
- "#.s.#",
- "#.t.#",
- "#.u.#",
- "#.v.#",
- "#.w.#",
- "#.x.#",
- "#.y.#",
- "#.z.#"
-
-
- },"a.b");
-
- printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
- printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
- printMatches("a.#.b.#","a.b.b.b.b.b.b.b.c");
-
-*/
-
- printMatches("","");
- printMatches("a","a");
- printMatches("a","");
- printMatches("","a");
- printMatches("a.b","a.b");
- printMatches("a","a.b");
- printMatches("a.b","a");
- printMatches("*","a");
- printMatches("*.b","a.b");
- printMatches("*.*","a.b");
- printMatches("a.*","a.b");
- printMatches("a.*.#","a.b");
- printMatches("a.#.b","a.b");
-
- printMatches("#.b","a");
- printMatches("#.b","a.b");
- printMatches("#.a.b","a.b");
-
-
- printMatches("#","");
- printMatches("#","a");
- printMatches("#","a.b");
- printMatches("#.#","a.b");
- printMatches("#.*","a.b");
-
- printMatches("#.a.b","a.b");
- printMatches("a.b.#","a.b");
- printMatches("a.#","a.b");
- printMatches("#.*.#","a.b");
- printMatches("#.*.b.#","a.b");
- printMatches("#.a.*.#","a.b");
- printMatches("#.a.#.b.#","a.b");
- printMatches("#.*.#.*.#","a.b");
- printMatches("*.#.*.#","a.b");
- printMatches("#.*.#.*","a.b");
-
-
- printMatches(new String[]{"a.#.b.#","a.*.#.b.#"},"a.b.b.b.b.b.b.b.c");
-
-
- printMatches(new String[]{"a.b", "a.c"},"a.b");
- printMatches(new String[]{"a.#", "a.c", "#.b"},"a.b");
- printMatches(new String[]{"a.#", "a.c", "#.b", "#", "*.*"},"a.b");
-
- printMatches(new String[]{"a.b.c.d.e.#", "a.b.c.d.#", "a.b.c.d.*", "a.b.c.#", "#.e", "a.*.c.d.e","#.c.*.#.*.*"},"a.b.c.d.e");
- printMatches(new String[]{"a.b.c.d.e.#", "a.b.c.d.#", "a.b.c.d.*", "a.b.c.#", "#.e", "a.*.c.d.e","#.c.*.#.*.*"},"a.b.c.d.f.g");
-
-
-
-
- }
-
- private static void printMatches(final String[] bindingKeys, final String routingKey)
- {
- TopicMatcherDFAState sm = null;
- Map<TopicMatcherResult, String> resultMap = new HashMap<TopicMatcherResult, String>();
-
- TopicParser parser = new TopicParser();
-
- long start = System.currentTimeMillis();
- for(int i = 0; i < bindingKeys.length; i++)
- {
- System.out.println((System.currentTimeMillis() - start) + ":\t" + bindingKeys[i]);
- TopicMatcherResult r = new TopicMatcherResult(){};
- resultMap.put(r, bindingKeys[i]);
- AMQShortString bindingKeyShortString = new AMQShortString(bindingKeys[i]);
-
- System.err.println("=====================================================");
- System.err.println("Adding binding key: " + bindingKeyShortString);
- System.err.println("-----------------------------------------------------");
-
-
- if(i==0)
- {
- sm = parser.createStateMachine(bindingKeyShortString, r);
- }
- else
- {
- sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeyShortString, r));
- }
- System.err.println(sm.reachableStates());
- System.err.println("=====================================================");
- try
- {
- System.in.read();
- }
- catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- AMQShortString routingKeyShortString = new AMQShortString(routingKey);
-
- Collection<TopicMatcherResult> results = sm.parse(parser._dictionary, routingKeyShortString);
- Collection<String> resultStrings = new ArrayList<String>();
-
- for(TopicMatcherResult result : results)
- {
- resultStrings.add(resultMap.get(result));
- }
-
- final ArrayList<String> nonMatches = new ArrayList<String>(Arrays.asList(bindingKeys));
- nonMatches.removeAll(resultStrings);
- System.out.println("\""+routingKeyShortString+"\" matched with " + resultStrings + " DID NOT MATCH with " + nonMatches);
-
-
- }
-
- private static void printMatches(String bindingKey, String routingKey)
- {
- printMatches(new String[] { bindingKey }, routingKey);
- }
-
-
- private static boolean matches(String bindingKey, String routingKey)
- {
- AMQShortString bindingKeyShortString = new AMQShortString(bindingKey);
- AMQShortString routingKeyShortString = new AMQShortString(routingKey);
- TopicParser parser = new TopicParser();
-
- final TopicMatcherResult result = new TopicMatcherResult(){};
-
- TopicMatcherDFAState sm = parser.createStateMachine(bindingKeyShortString, result);
- return !sm.parse(parser._dictionary,routingKeyShortString).isEmpty();
-
- }
-
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java
deleted file mode 100644
index f14d70f8a1..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWord.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.qpid.server.exchange.topic;
-
-import org.apache.qpid.framing.AMQShortString;
-
-import java.util.Map;
-import java.util.HashMap;
-import java.util.concurrent.ConcurrentHashMap;
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-public final class TopicWord
-{
- public static final TopicWord ANY_WORD = new TopicWord("*");
- public static final TopicWord WILDCARD_WORD = new TopicWord("#");
- private String _word;
-
- public TopicWord()
- {
-
- }
-
- public TopicWord(String s)
- {
- _word = s;
- }
-
- public TopicWord(final AMQShortString name)
- {
- _word = name.toString();
- }
-
- public String toString()
- {
- return _word;
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java
deleted file mode 100644
index 65a0cd3107..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicWordDictionary.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package org.apache.qpid.server.exchange.topic;
-
-import org.apache.qpid.framing.AMQShortString;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-public class TopicWordDictionary
-{
- private final ConcurrentHashMap<AMQShortString,TopicWord> _dictionary =
- new ConcurrentHashMap<AMQShortString,TopicWord>();
-
-
-
- public TopicWordDictionary()
- {
- _dictionary.put(new AMQShortString("*"), TopicWord.ANY_WORD);
- _dictionary.put(new AMQShortString("#"), TopicWord.WILDCARD_WORD);
- }
-
-
-
-
- public TopicWord getOrCreateWord(AMQShortString name)
- {
- TopicWord word = _dictionary.putIfAbsent(name, new TopicWord(name));
- if(word == null)
- {
- word = _dictionary.get(name);
- }
- return word;
- }
-
-
- public TopicWord getWord(AMQShortString name)
- {
- TopicWord word = _dictionary.get(name);
- if(word == null)
- {
- word = TopicWord.ANY_WORD;
- }
- return word;
- }
-}