summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java171
1 files changed, 168 insertions, 3 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
index f564bbb565..3e9facf412 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
@@ -4,6 +4,8 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.AMQShortStringTokenizer;
import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.io.IOException;
/*
*
@@ -30,6 +32,7 @@ public class TopicParser
private static final byte TOPIC_DELIMITER = (byte)'.';
private final TopicWordDictionary _dictionary = new TopicWordDictionary();
+ private final AtomicReference<TopicMatcherDFAState> _stateMachine = new AtomicReference<TopicMatcherDFAState>();
private static class Position
{
@@ -37,6 +40,7 @@ public class TopicParser
private final boolean _selfTransition;
private final int _position;
private final boolean _endState;
+ private boolean _followedByAnyLoop;
public Position(final int position, final TopicWord word, final boolean selfTransition, final boolean endState)
@@ -59,6 +63,43 @@ public class TopicParser
}
+ public void addBinding(AMQShortString bindingKey, TopicMatcherResult result)
+ {
+
+ TopicMatcherDFAState startingStateMachine;
+ TopicMatcherDFAState newStateMachine;
+
+ do
+ {
+ startingStateMachine = _stateMachine.get();
+ if(startingStateMachine == null)
+ {
+ newStateMachine = createStateMachine(bindingKey, result);
+ }
+ else
+ {
+ newStateMachine = startingStateMachine.mergeStateMachines(createStateMachine(bindingKey, result));
+ }
+
+ }
+ while(!_stateMachine.compareAndSet(startingStateMachine,newStateMachine));
+
+ }
+
+ public Collection<TopicMatcherResult> parse(AMQShortString routingKey)
+ {
+ TopicMatcherDFAState stateMachine = _stateMachine.get();
+ if(stateMachine == null)
+ {
+ return Collections.EMPTY_SET;
+ }
+ else
+ {
+ return stateMachine.parse(_dictionary,routingKey);
+ }
+ }
+
+
TopicMatcherDFAState createStateMachine(AMQShortString bindingKey, TopicMatcherResult result)
{
List<TopicWord> wordList = createTopicWordList(bindingKey);
@@ -108,7 +149,6 @@ public class TopicParser
}
-
int pos = 0;
int wordPos = 0;
@@ -131,6 +171,31 @@ public class TopicParser
}
+
+ for(int p = 0; p<positionCount; p++)
+ {
+ boolean followedByWildcards = true;
+
+ int n = p;
+ while(followedByWildcards && n<(positionCount+1))
+ {
+
+ if(positions[n]._selfTransition)
+ {
+ break;
+ }
+ else if(positions[n]._word!=TopicWord.ANY_WORD)
+ {
+ followedByWildcards = false;
+ }
+ n++;
+ }
+
+
+ positions[p]._followedByAnyLoop = followedByWildcards && (n!= positionCount+1);
+ }
+
+
// from each position you transition to a set of other positions.
// we approach this by examining steps of increasing length - so we
// look how far we can go from the start position in 1 word, 2 words, etc...
@@ -258,6 +323,32 @@ public class TopicParser
{
dest.setValue(Collections.singleton(loopingTerminal));
}
+ else
+ {
+ Position anyLoop = null;
+ for(Position destPos : dest.getValue())
+ {
+ if(destPos._followedByAnyLoop)
+ {
+ if(anyLoop == null || anyLoop._position<destPos._position)
+ {
+ anyLoop = destPos;
+ }
+ }
+ }
+ if(anyLoop != null)
+ {
+ Collection<Position> removals = new ArrayList<Position>();
+ for(Position destPos : dest.getValue())
+ {
+ if(destPos._position < anyLoop._position)
+ {
+ removals.add(destPos);
+ }
+ }
+ dest.getValue().removeAll(removals);
+ }
+ }
SimpleState stateForEntry = stateMap.get(dest.getValue());
if(stateForEntry == null)
@@ -332,8 +423,65 @@ public class TopicParser
public static void main(String[] args)
{
+
+ printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.*.q.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
+ printMatches(new String[]{
+ "#.a.#",
+ "#.b.#",
+ "#.c.#",
+ "#.d.#",
+ "#.e.#",
+ "#.f.#",
+ "#.g.#",
+ "#.h.#",
+ "#.i.#",
+ "#.j.#",
+ "#.k.#",
+ "#.l.#",
+ "#.m.#",
+ "#.n.#",
+ "#.o.#",
+ "#.p.#",
+ "#.q.#"
+
+ }, "a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
+/*
+ printMatches(new String[]{
+ "#.a.#",
+ "#.b.#",
+ "#.c.#",
+ "#.d.#",
+ "#.e.#",
+ "#.f.#",
+ "#.g.#",
+ "#.h.#",
+ "#.i.#",
+ "#.j.#",
+ "#.k.#",
+ "#.l.#",
+ "#.m.#",
+ "#.n.#",
+ "#.o.#",
+ "#.p.#",
+ "#.q.#",
+ "#.r.#",
+ "#.s.#",
+ "#.t.#",
+ "#.u.#",
+ "#.v.#",
+ "#.w.#",
+ "#.x.#",
+ "#.y.#",
+ "#.z.#"
+
+
+ },"a.b");
+
+ printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
+ printMatches("#.b.*.*.*.*.*.h.#.j.*.*.*.*.*.p.#.r.*.*.*.*.*.*.*.*","a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z");
printMatches("a.#.b.#","a.b.b.b.b.b.b.b.c");
+*/
printMatches("","");
printMatches("a","a");
@@ -394,12 +542,19 @@ public class TopicParser
TopicParser parser = new TopicParser();
+ long start = System.currentTimeMillis();
for(int i = 0; i < bindingKeys.length; i++)
{
- TopicMatcherResult r = new TopicMatcherResult();
+ System.out.println((System.currentTimeMillis() - start) + ":\t" + bindingKeys[i]);
+ TopicMatcherResult r = new TopicMatcherResult(){};
resultMap.put(r, bindingKeys[i]);
AMQShortString bindingKeyShortString = new AMQShortString(bindingKeys[i]);
+ System.err.println("=====================================================");
+ System.err.println("Adding binding key: " + bindingKeyShortString);
+ System.err.println("-----------------------------------------------------");
+
+
if(i==0)
{
sm = parser.createStateMachine(bindingKeyShortString, r);
@@ -408,6 +563,16 @@ public class TopicParser
{
sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeyShortString, r));
}
+ System.err.println(sm.reachableStates());
+ System.err.println("=====================================================");
+ try
+ {
+ System.in.read();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
}
AMQShortString routingKeyShortString = new AMQShortString(routingKey);
@@ -438,7 +603,7 @@ public class TopicParser
AMQShortString routingKeyShortString = new AMQShortString(routingKey);
TopicParser parser = new TopicParser();
- final TopicMatcherResult result = new TopicMatcherResult();
+ final TopicMatcherResult result = new TopicMatcherResult(){};
TopicMatcherDFAState sm = parser.createStateMachine(bindingKeyShortString, result);
return !sm.parse(parser._dictionary,routingKeyShortString).isEmpty();