diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java | 171 |
1 files changed, 168 insertions, 3 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java index f564bbb565..3e9facf412 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java @@ -4,6 +4,8 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.AMQShortStringTokenizer; import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.io.IOException; /* * @@ -30,6 +32,7 @@ public class TopicParser private static final byte TOPIC_DELIMITER = (byte)'.'; private final TopicWordDictionary _dictionary = new TopicWordDictionary(); + private final AtomicReference<TopicMatcherDFAState> _stateMachine = new AtomicReference<TopicMatcherDFAState>(); private static class Position { @@ -37,6 +40,7 @@ public class TopicParser private final boolean _selfTransition; private final int _position; private final boolean _endState; + private boolean _followedByAnyLoop; public Position(final int position, final TopicWord word, final boolean selfTransition, final boolean endState) @@ -59,6 +63,43 @@ public class TopicParser } + public void addBinding(AMQShortString bindingKey, TopicMatcherResult result) + { + + TopicMatcherDFAState startingStateMachine; + TopicMatcherDFAState newStateMachine; + + do + { + startingStateMachine = _stateMachine.get(); + if(startingStateMachine == null) + { + newStateMachine = createStateMachine(bindingKey, result); + } + else + { + newStateMachine = startingStateMachine.mergeStateMachines(createStateMachine(bindingKey, result)); + } + + } + while(!_stateMachine.compareAndSet(startingStateMachine,newStateMachine)); + + } + + public Collection<TopicMatcherResult> parse(AMQShortString routingKey) + { + TopicMatcherDFAState stateMachine = _stateMachine.get(); + if(stateMachine == null) + { + return Collections.EMPTY_SET; + } + else + { + return stateMachine.parse(_dictionary,routingKey); + } + } + + TopicMatcherDFAState createStateMachine(AMQShortString bindingKey, TopicMatcherResult result) { List<TopicWord> wordList = createTopicWordList(bindingKey); @@ -108,7 +149,6 @@ public class TopicParser } - int pos = 0; int wordPos = 0; @@ -131,6 +171,31 @@ public class TopicParser } + + for(int p = 0; p<positionCount; p++) + { + boolean followedByWildcards = true; + + int n = p; + while(followedByWildcards && n<(positionCount+1)) + { + + if(positions[n]._selfTransition) + { + break; + } + else if(positions[n]._word!=TopicWord.ANY_WORD) + { + followedByWildcards = false; + } + n++; + } + + + positions[p]._followedByAnyLoop = followedByWildcards && (n!= positionCount+1); + } + + // from each position you transition to a set of other positions. // we approach this by examining steps of increasing length - so we // look how far we can go from the start position in 1 word, 2 words, etc... @@ -258,6 +323,32 @@ public class TopicParser { dest.setValue(Collections.singleton(loopingTerminal)); } + else + { + Position anyLoop = null; + for(Position destPos : dest.getValue()) + { + if(destPos._followedByAnyLoop) + { + if(anyLoop == null || anyLoop._position<destPos._position) + { + anyLoop = destPos; + } + } + } + if(anyLoop != null) + { + Collection<Position> removals = new ArrayList<Position>(); + for(Position destPos : dest.getValue()) + { + if(destPos._position < anyLoop._position) + { + removals.add(destPos); + } + } + dest.getValue().removeAll(removals); + } + } SimpleState stateForEntry = stateMap.get(dest.getValue()); if(stateForEntry == null) @@ -332,8 +423,65 @@ public class TopicParser public static void main(String[] args) { + + printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.*.q.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z"); + printMatches(new String[]{ + "#.a.#", + "#.b.#", + "#.c.#", + "#.d.#", + "#.e.#", + "#.f.#", + "#.g.#", + "#.h.#", + "#.i.#", + "#.j.#", + "#.k.#", + "#.l.#", + "#.m.#", + "#.n.#", + "#.o.#", + "#.p.#", + "#.q.#" + + }, "a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z"); +/* + printMatches(new String[]{ + "#.a.#", + "#.b.#", + "#.c.#", + "#.d.#", + "#.e.#", + "#.f.#", + "#.g.#", + "#.h.#", + "#.i.#", + "#.j.#", + "#.k.#", + "#.l.#", + "#.m.#", + "#.n.#", + "#.o.#", + "#.p.#", + "#.q.#", + "#.r.#", + "#.s.#", + "#.t.#", + "#.u.#", + "#.v.#", + "#.w.#", + "#.x.#", + "#.y.#", + "#.z.#" + + + },"a.b"); + + printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z"); + printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z"); printMatches("a.#.b.#","a.b.b.b.b.b.b.b.c"); +*/ printMatches("",""); printMatches("a","a"); @@ -394,12 +542,19 @@ public class TopicParser TopicParser parser = new TopicParser(); + long start = System.currentTimeMillis(); for(int i = 0; i < bindingKeys.length; i++) { - TopicMatcherResult r = new TopicMatcherResult(); + System.out.println((System.currentTimeMillis() - start) + ":\t" + bindingKeys[i]); + TopicMatcherResult r = new TopicMatcherResult(){}; resultMap.put(r, bindingKeys[i]); AMQShortString bindingKeyShortString = new AMQShortString(bindingKeys[i]); + System.err.println("====================================================="); + System.err.println("Adding binding key: " + bindingKeyShortString); + System.err.println("-----------------------------------------------------"); + + if(i==0) { sm = parser.createStateMachine(bindingKeyShortString, r); @@ -408,6 +563,16 @@ public class TopicParser { sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeyShortString, r)); } + System.err.println(sm.reachableStates()); + System.err.println("====================================================="); + try + { + System.in.read(); + } + catch (IOException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } } AMQShortString routingKeyShortString = new AMQShortString(routingKey); @@ -438,7 +603,7 @@ public class TopicParser AMQShortString routingKeyShortString = new AMQShortString(routingKey); TopicParser parser = new TopicParser(); - final TopicMatcherResult result = new TopicMatcherResult(); + final TopicMatcherResult result = new TopicMatcherResult(){}; TopicMatcherDFAState sm = parser.createStateMachine(bindingKeyShortString, result); return !sm.parse(parser._dictionary,routingKeyShortString).isEmpty(); |