package org.apache.qpid.server.exchange.topic; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.AMQShortStringTokenizer; import java.util.*; import java.util.concurrent.atomic.AtomicReference; import java.io.IOException; /* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ public class TopicParser { private static final byte TOPIC_DELIMITER = (byte)'.'; private final TopicWordDictionary _dictionary = new TopicWordDictionary(); private final AtomicReference _stateMachine = new AtomicReference(); private static class Position { private final TopicWord _word; private final boolean _selfTransition; private final int _position; private final boolean _endState; private boolean _followedByAnyLoop; public Position(final int position, final TopicWord word, final boolean selfTransition, final boolean endState) { _position = position; _word = word; _selfTransition = selfTransition; _endState = endState; } } private static final Position ERROR_POSITION = new Position(Integer.MAX_VALUE,null, true, false); private static class SimpleState { Set _positions; Map _nextState; } public void addBinding(AMQShortString bindingKey, TopicMatcherResult result) { TopicMatcherDFAState startingStateMachine; TopicMatcherDFAState newStateMachine; do { startingStateMachine = _stateMachine.get(); if(startingStateMachine == null) { newStateMachine = createStateMachine(bindingKey, result); } else { newStateMachine = startingStateMachine.mergeStateMachines(createStateMachine(bindingKey, result)); } } while(!_stateMachine.compareAndSet(startingStateMachine,newStateMachine)); } public Collection parse(AMQShortString routingKey) { TopicMatcherDFAState stateMachine = _stateMachine.get(); if(stateMachine == null) { return Collections.EMPTY_SET; } else { return stateMachine.parse(_dictionary,routingKey); } } TopicMatcherDFAState createStateMachine(AMQShortString bindingKey, TopicMatcherResult result) { List wordList = createTopicWordList(bindingKey); int wildCards = 0; for(TopicWord word : wordList) { if(word == TopicWord.WILDCARD_WORD) { wildCards++; } } if(wildCards == 0) { TopicMatcherDFAState[] states = new TopicMatcherDFAState[wordList.size()+1]; states[states.length-1] = new TopicMatcherDFAState(Collections.EMPTY_MAP, Collections.singleton(result)); for(int i = states.length-2; i >= 0; i--) { states[i] = new TopicMatcherDFAState(Collections.singletonMap(wordList.get(i),states[i+1]),Collections.EMPTY_SET); } return states[0]; } else if(wildCards == wordList.size()) { Map stateMap = new HashMap(); TopicMatcherDFAState state = new TopicMatcherDFAState(stateMap, Collections.singleton(result)); stateMap.put(TopicWord.ANY_WORD, state); return state; } int positionCount = wordList.size() - wildCards; Position[] positions = new Position[positionCount+1]; int lastWord; if(wordList.get(wordList.size()-1)== TopicWord.WILDCARD_WORD) { lastWord = wordList.size()-1; positions[positionCount] = new Position(positionCount, TopicWord.ANY_WORD, true, true); } else { lastWord = wordList.size(); positions[positionCount] = new Position(positionCount, TopicWord.ANY_WORD, false, true); } int pos = 0; int wordPos = 0; while(wordPos < lastWord) { TopicWord word = wordList.get(wordPos++); if(word == TopicWord.WILDCARD_WORD) { int nextWordPos = wordPos++; word = wordList.get(nextWordPos); positions[pos] = new Position(pos++,word,true,false); } else { positions[pos] = new Position(pos++,word,false,false); } } for(int p = 0; p,SimpleState> stateMap = new HashMap,SimpleState>(); SimpleState state = new SimpleState(); state._positions = Collections.singleton( positions[0] ); stateMap.put(state._positions, state); calculateNextStates(state, stateMap, positions); SimpleState[] simpleStates = stateMap.values().toArray(new SimpleState[stateMap.size()]); HashMap[] dfaStateMaps = new HashMap[simpleStates.length]; Map simple2DFAMap = new HashMap(); for(int i = 0; i < simpleStates.length; i++) { Collection results; boolean endState = false; for(Position p : simpleStates[i]._positions) { if(p._endState) { endState = true; break; } } if(endState) { results = Collections.singleton(result); } else { results = Collections.EMPTY_SET; } dfaStateMaps[i] = new HashMap(); simple2DFAMap.put(simpleStates[i], new TopicMatcherDFAState(dfaStateMaps[i],results)); } for(int i = 0; i < simpleStates.length; i++) { SimpleState simpleState = simpleStates[i]; Map nextSimpleStateMap = simpleState._nextState; for(Map.Entry stateMapEntry : nextSimpleStateMap.entrySet()) { dfaStateMaps[i].put(stateMapEntry.getKey(), simple2DFAMap.get(stateMapEntry.getValue())); } } return simple2DFAMap.get(state); } private void calculateNextStates(final SimpleState state, final Map, SimpleState> stateMap, final Position[] positions) { Map> transitions = new HashMap>(); for(Position pos : state._positions) { if(pos._selfTransition) { Set dest = transitions.get(TopicWord.ANY_WORD); if(dest == null) { dest = new HashSet(); transitions.put(TopicWord.ANY_WORD,dest); } dest.add(pos); } final int nextPos = pos._position + 1; Position nextPosition = nextPos == positions.length ? ERROR_POSITION : positions[nextPos]; Set dest = transitions.get(pos._word); if(dest == null) { dest = new HashSet(); transitions.put(pos._word,dest); } dest.add(nextPosition); } Set anyWordTransitions = transitions.get(TopicWord.ANY_WORD); if(anyWordTransitions != null) { for(Set dest : transitions.values()) { dest.addAll(anyWordTransitions); } } state._nextState = new HashMap(); for(Map.Entry> dest : transitions.entrySet()) { if(dest.getValue().size()>1) { dest.getValue().remove(ERROR_POSITION); } Position loopingTerminal = null; for(Position destPos : dest.getValue()) { if(destPos._selfTransition && destPos._endState) { loopingTerminal = destPos; break; } } if(loopingTerminal!=null) { dest.setValue(Collections.singleton(loopingTerminal)); } else { Position anyLoop = null; for(Position destPos : dest.getValue()) { if(destPos._followedByAnyLoop) { if(anyLoop == null || anyLoop._position removals = new ArrayList(); for(Position destPos : dest.getValue()) { if(destPos._position < anyLoop._position) { removals.add(destPos); } } dest.getValue().removeAll(removals); } } SimpleState stateForEntry = stateMap.get(dest.getValue()); if(stateForEntry == null) { stateForEntry = new SimpleState(); stateForEntry._positions = dest.getValue(); stateMap.put(dest.getValue(),stateForEntry); calculateNextStates(stateForEntry, stateMap, positions); } state._nextState.put(dest.getKey(),stateForEntry); } // remove redundant transitions SimpleState anyWordState = state._nextState.get(TopicWord.ANY_WORD); if(anyWordState != null) { List removeList = new ArrayList(); for(Map.Entry entry : state._nextState.entrySet()) { if(entry.getValue() == anyWordState && entry.getKey() != TopicWord.ANY_WORD) { removeList.add(entry.getKey()); } } for(TopicWord removeKey : removeList) { state._nextState.remove(removeKey); } } } private List createTopicWordList(final AMQShortString bindingKey) { AMQShortStringTokenizer tokens = bindingKey.tokenize(TOPIC_DELIMITER); TopicWord previousWord = null; List wordList = new ArrayList(); while(tokens.hasMoreTokens()) { TopicWord nextWord = _dictionary.getOrCreateWord(tokens.nextToken()); if(previousWord == TopicWord.WILDCARD_WORD) { if(nextWord == TopicWord.WILDCARD_WORD) { // consecutive wildcards can be merged // i.e. subsequent wildcards can be discarded continue; } else if(nextWord == TopicWord.ANY_WORD) { // wildcard and anyword can be reordered to always put anyword first wordList.set(wordList.size()-1,TopicWord.ANY_WORD); nextWord = TopicWord.WILDCARD_WORD; } } wordList.add(nextWord); previousWord = nextWord; } return wordList; } public static void main(String[] args) { printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.*.q.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z"); printMatches(new String[]{ "#.a.#", "#.b.#", "#.c.#", "#.d.#", "#.e.#", "#.f.#", "#.g.#", "#.h.#", "#.i.#", "#.j.#", "#.k.#", "#.l.#", "#.m.#", "#.n.#", "#.o.#", "#.p.#", "#.q.#" }, "a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z"); /* printMatches(new String[]{ "#.a.#", "#.b.#", "#.c.#", "#.d.#", "#.e.#", "#.f.#", "#.g.#", "#.h.#", "#.i.#", "#.j.#", "#.k.#", "#.l.#", "#.m.#", "#.n.#", "#.o.#", "#.p.#", "#.q.#", "#.r.#", "#.s.#", "#.t.#", "#.u.#", "#.v.#", "#.w.#", "#.x.#", "#.y.#", "#.z.#" },"a.b"); printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z"); printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z"); printMatches("a.#.b.#","a.b.b.b.b.b.b.b.c"); */ printMatches("",""); printMatches("a","a"); printMatches("a",""); printMatches("","a"); printMatches("a.b","a.b"); printMatches("a","a.b"); printMatches("a.b","a"); printMatches("*","a"); printMatches("*.b","a.b"); printMatches("*.*","a.b"); printMatches("a.*","a.b"); printMatches("a.*.#","a.b"); printMatches("a.#.b","a.b"); printMatches("#.b","a"); printMatches("#.b","a.b"); printMatches("#.a.b","a.b"); printMatches("#",""); printMatches("#","a"); printMatches("#","a.b"); printMatches("#.#","a.b"); printMatches("#.*","a.b"); printMatches("#.a.b","a.b"); printMatches("a.b.#","a.b"); printMatches("a.#","a.b"); printMatches("#.*.#","a.b"); printMatches("#.*.b.#","a.b"); printMatches("#.a.*.#","a.b"); printMatches("#.a.#.b.#","a.b"); printMatches("#.*.#.*.#","a.b"); printMatches("*.#.*.#","a.b"); printMatches("#.*.#.*","a.b"); printMatches(new String[]{"a.#.b.#","a.*.#.b.#"},"a.b.b.b.b.b.b.b.c"); printMatches(new String[]{"a.b", "a.c"},"a.b"); printMatches(new String[]{"a.#", "a.c", "#.b"},"a.b"); printMatches(new String[]{"a.#", "a.c", "#.b", "#", "*.*"},"a.b"); printMatches(new String[]{"a.b.c.d.e.#", "a.b.c.d.#", "a.b.c.d.*", "a.b.c.#", "#.e", "a.*.c.d.e","#.c.*.#.*.*"},"a.b.c.d.e"); printMatches(new String[]{"a.b.c.d.e.#", "a.b.c.d.#", "a.b.c.d.*", "a.b.c.#", "#.e", "a.*.c.d.e","#.c.*.#.*.*"},"a.b.c.d.f.g"); } private static void printMatches(final String[] bindingKeys, final String routingKey) { TopicMatcherDFAState sm = null; Map resultMap = new HashMap(); TopicParser parser = new TopicParser(); long start = System.currentTimeMillis(); for(int i = 0; i < bindingKeys.length; i++) { System.out.println((System.currentTimeMillis() - start) + ":\t" + bindingKeys[i]); TopicMatcherResult r = new TopicMatcherResult(){}; resultMap.put(r, bindingKeys[i]); AMQShortString bindingKeyShortString = new AMQShortString(bindingKeys[i]); System.err.println("====================================================="); System.err.println("Adding binding key: " + bindingKeyShortString); System.err.println("-----------------------------------------------------"); if(i==0) { sm = parser.createStateMachine(bindingKeyShortString, r); } else { sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeyShortString, r)); } System.err.println(sm.reachableStates()); System.err.println("====================================================="); try { System.in.read(); } catch (IOException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } AMQShortString routingKeyShortString = new AMQShortString(routingKey); Collection results = sm.parse(parser._dictionary, routingKeyShortString); Collection resultStrings = new ArrayList(); for(TopicMatcherResult result : results) { resultStrings.add(resultMap.get(result)); } final ArrayList nonMatches = new ArrayList(Arrays.asList(bindingKeys)); nonMatches.removeAll(resultStrings); System.out.println("\""+routingKeyShortString+"\" matched with " + resultStrings + " DID NOT MATCH with " + nonMatches); } private static void printMatches(String bindingKey, String routingKey) { printMatches(new String[] { bindingKey }, routingKey); } private static boolean matches(String bindingKey, String routingKey) { AMQShortString bindingKeyShortString = new AMQShortString(bindingKey); AMQShortString routingKeyShortString = new AMQShortString(routingKey); TopicParser parser = new TopicParser(); final TopicMatcherResult result = new TopicMatcherResult(){}; TopicMatcherDFAState sm = parser.createStateMachine(bindingKeyShortString, result); return !sm.parse(parser._dictionary,routingKeyShortString).isEmpty(); } }