package org.apache.qpid.server.exchange.topic; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.AMQShortStringTokenizer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; /* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ public class TopicParser { private static final byte TOPIC_DELIMITER = (byte)'.'; private final TopicWordDictionary _dictionary = new TopicWordDictionary(); private final AtomicReference _stateMachine = new AtomicReference(); private static class Position { private final TopicWord _word; private final boolean _selfTransition; private final int _position; private final boolean _endState; private boolean _followedByAnyLoop; public Position(final int position, final TopicWord word, final boolean selfTransition, final boolean endState) { _position = position; _word = word; _selfTransition = selfTransition; _endState = endState; } public TopicWord getWord() { return _word; } public boolean isSelfTransition() { return _selfTransition; } public int getPosition() { return _position; } public boolean isEndState() { return _endState; } public boolean isFollowedByAnyLoop() { return _followedByAnyLoop; } public void setFollowedByAnyLoop(boolean followedByAnyLoop) { _followedByAnyLoop = followedByAnyLoop; } } private static final Position ERROR_POSITION = new Position(Integer.MAX_VALUE,null, true, false); private static class SimpleState { private Set _positions; private Map _nextState; } public void addBinding(AMQShortString bindingKey, TopicMatcherResult result) { TopicMatcherDFAState startingStateMachine; TopicMatcherDFAState newStateMachine; do { startingStateMachine = _stateMachine.get(); if(startingStateMachine == null) { newStateMachine = createStateMachine(bindingKey, result); } else { newStateMachine = startingStateMachine.mergeStateMachines(createStateMachine(bindingKey, result)); } } while(!_stateMachine.compareAndSet(startingStateMachine,newStateMachine)); } public Collection parse(AMQShortString routingKey) { TopicMatcherDFAState stateMachine = _stateMachine.get(); if(stateMachine == null) { return Collections.EMPTY_SET; } else { return stateMachine.parse(_dictionary,routingKey); } } TopicMatcherDFAState createStateMachine(AMQShortString bindingKey, TopicMatcherResult result) { List wordList = createTopicWordList(bindingKey); int wildCards = 0; for(TopicWord word : wordList) { if(word == TopicWord.WILDCARD_WORD) { wildCards++; } } if(wildCards == 0) { TopicMatcherDFAState[] states = new TopicMatcherDFAState[wordList.size()+1]; states[states.length-1] = new TopicMatcherDFAState(Collections.EMPTY_MAP, Collections.singleton(result)); for(int i = states.length-2; i >= 0; i--) { states[i] = new TopicMatcherDFAState(Collections.singletonMap(wordList.get(i),states[i+1]),Collections.EMPTY_SET); } return states[0]; } else if(wildCards == wordList.size()) { Map stateMap = new HashMap(); TopicMatcherDFAState state = new TopicMatcherDFAState(stateMap, Collections.singleton(result)); stateMap.put(TopicWord.ANY_WORD, state); return state; } int positionCount = wordList.size() - wildCards; Position[] positions = new Position[positionCount+1]; int lastWord; if(wordList.get(wordList.size()-1)== TopicWord.WILDCARD_WORD) { lastWord = wordList.size()-1; positions[positionCount] = new Position(positionCount, TopicWord.ANY_WORD, true, true); } else { lastWord = wordList.size(); positions[positionCount] = new Position(positionCount, TopicWord.ANY_WORD, false, true); } int pos = 0; int wordPos = 0; while(wordPos < lastWord) { TopicWord word = wordList.get(wordPos++); if(word == TopicWord.WILDCARD_WORD) { int nextWordPos = wordPos++; word = wordList.get(nextWordPos); positions[pos] = new Position(pos++,word,true,false); } else { positions[pos] = new Position(pos++,word,false,false); } } for(int p = 0; p,SimpleState> stateMap = new HashMap,SimpleState>(); SimpleState state = new SimpleState(); state._positions = Collections.singleton( positions[0] ); stateMap.put(state._positions, state); calculateNextStates(state, stateMap, positions); SimpleState[] simpleStates = stateMap.values().toArray(new SimpleState[stateMap.size()]); HashMap[] dfaStateMaps = new HashMap[simpleStates.length]; Map simple2DFAMap = new HashMap(); for(int i = 0; i < simpleStates.length; i++) { Collection results; boolean endState = false; for(Position p : simpleStates[i]._positions) { if(p.isEndState()) { endState = true; break; } } if(endState) { results = Collections.singleton(result); } else { results = Collections.EMPTY_SET; } dfaStateMaps[i] = new HashMap(); simple2DFAMap.put(simpleStates[i], new TopicMatcherDFAState(dfaStateMaps[i],results)); } for(int i = 0; i < simpleStates.length; i++) { SimpleState simpleState = simpleStates[i]; Map nextSimpleStateMap = simpleState._nextState; for(Map.Entry stateMapEntry : nextSimpleStateMap.entrySet()) { dfaStateMaps[i].put(stateMapEntry.getKey(), simple2DFAMap.get(stateMapEntry.getValue())); } } return simple2DFAMap.get(state); } private void calculateNextStates(final SimpleState state, final Map, SimpleState> stateMap, final Position[] positions) { Map> transitions = new HashMap>(); for(Position pos : state._positions) { if(pos.isSelfTransition()) { Set dest = transitions.get(TopicWord.ANY_WORD); if(dest == null) { dest = new HashSet(); transitions.put(TopicWord.ANY_WORD,dest); } dest.add(pos); } final int nextPos = pos.getPosition() + 1; Position nextPosition = nextPos == positions.length ? ERROR_POSITION : positions[nextPos]; Set dest = transitions.get(pos.getWord()); if(dest == null) { dest = new HashSet(); transitions.put(pos.getWord(),dest); } dest.add(nextPosition); } Set anyWordTransitions = transitions.get(TopicWord.ANY_WORD); if(anyWordTransitions != null) { for(Set dest : transitions.values()) { dest.addAll(anyWordTransitions); } } state._nextState = new HashMap(); for(Map.Entry> dest : transitions.entrySet()) { if(dest.getValue().size()>1) { dest.getValue().remove(ERROR_POSITION); } Position loopingTerminal = null; for(Position destPos : dest.getValue()) { if(destPos.isSelfTransition() && destPos.isEndState()) { loopingTerminal = destPos; break; } } if(loopingTerminal!=null) { dest.setValue(Collections.singleton(loopingTerminal)); } else { Position anyLoop = null; for(Position destPos : dest.getValue()) { if(destPos.isFollowedByAnyLoop()) { if(anyLoop == null || anyLoop.getPosition() < destPos.getPosition()) { anyLoop = destPos; } } } if(anyLoop != null) { Collection removals = new ArrayList(); for(Position destPos : dest.getValue()) { if(destPos.getPosition() < anyLoop.getPosition()) { removals.add(destPos); } } dest.getValue().removeAll(removals); } } SimpleState stateForEntry = stateMap.get(dest.getValue()); if(stateForEntry == null) { stateForEntry = new SimpleState(); stateForEntry._positions = dest.getValue(); stateMap.put(dest.getValue(),stateForEntry); calculateNextStates(stateForEntry, stateMap, positions); } state._nextState.put(dest.getKey(),stateForEntry); } // remove redundant transitions SimpleState anyWordState = state._nextState.get(TopicWord.ANY_WORD); if(anyWordState != null) { List removeList = new ArrayList(); for(Map.Entry entry : state._nextState.entrySet()) { if(entry.getValue() == anyWordState && entry.getKey() != TopicWord.ANY_WORD) { removeList.add(entry.getKey()); } } for(TopicWord removeKey : removeList) { state._nextState.remove(removeKey); } } } private List createTopicWordList(final AMQShortString bindingKey) { AMQShortStringTokenizer tokens = bindingKey.tokenize(TOPIC_DELIMITER); TopicWord previousWord = null; List wordList = new ArrayList(); while(tokens.hasMoreTokens()) { TopicWord nextWord = _dictionary.getOrCreateWord(tokens.nextToken()); if(previousWord == TopicWord.WILDCARD_WORD) { if(nextWord == TopicWord.WILDCARD_WORD) { // consecutive wildcards can be merged // i.e. subsequent wildcards can be discarded continue; } else if(nextWord == TopicWord.ANY_WORD) { // wildcard and anyword can be reordered to always put anyword first wordList.set(wordList.size()-1,TopicWord.ANY_WORD); nextWord = TopicWord.WILDCARD_WORD; } } wordList.add(nextWord); previousWord = nextWord; } return wordList; } }