summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVlad Alexandru Ionescu <vlad@rabbitmq.com>2010-08-26 18:46:36 +0100
committerVlad Alexandru Ionescu <vlad@rabbitmq.com>2010-08-26 18:46:36 +0100
commit766295ae1ff8e59b61c93b345624eda3f146e9f4 (patch)
tree4bcaccb401e6e9928a85be849d24e59b83ba72ad
parent9398957b19e81628d057fd7ea2d7a0eb4630ab1f (diff)
downloadrabbitmq-server-766295ae1ff8e59b61c93b345624eda3f146e9f4.tar.gz
implementing topic routing with tries; adding better test for topic routing
-rw-r--r--include/rabbit.hrl6
-rw-r--r--src/rabbit_exchange_type_topic.erl253
-rw-r--r--src/rabbit_mnesia.erl17
-rw-r--r--src/rabbit_router.erl6
-rw-r--r--src/rabbit_tests.erl114
5 files changed, 334 insertions, 62 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index b9abd788..210709b9 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -63,6 +63,12 @@
-record(binding, {exchange_name, key, queue_name, args = []}).
-record(reverse_binding, {queue_name, key, exchange_name, args = []}).
+-record(topic_trie_edge, {trie_edge, node_id}).
+-record(topic_trie_binding, {trie_binding, value = const}).
+
+-record(trie_edge, {exchange_name, node_id, word}).
+-record(trie_binding, {exchange_name, node_id, queue_name}).
+
-record(listener, {node, protocol, host, port}).
-record(basic_message, {exchange_name, routing_key, content, guid,
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index e796acf3..35f25ccb 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -30,6 +30,7 @@
%%
-module(rabbit_exchange_type_topic).
+-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
-behaviour(rabbit_exchange_type).
@@ -46,59 +47,231 @@
{requires, rabbit_exchange_type_registry},
{enables, kernel_ready}]}).
--export([topic_matches/2]).
+-export([which_matches/2]).
-ifdef(use_specs).
--spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
+-spec(which_matches/2 ::
+ (rabbit_exchange:name(), rabbit_router:routing_key()) ->
+ [rabbit_amqqueue:name()]).
-endif.
+%%----------------------------------------------------------------------------
+
description() ->
[{name, <<"topic">>},
{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
-publish(#exchange{name = Name}, Delivery =
- #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:deliver(rabbit_router:match_bindings(
- Name, fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end),
- Delivery).
-
-split_topic_key(Key) ->
- string:tokens(binary_to_list(Key), ".").
-
-topic_matches(PatternKey, RoutingKey) ->
- P = split_topic_key(PatternKey),
- R = split_topic_key(RoutingKey),
- topic_matches1(P, R).
-
-topic_matches1(["#"], _R) ->
- true;
-topic_matches1(["#" | PTail], R) ->
- last_topic_match(PTail, [], lists:reverse(R));
-topic_matches1([], []) ->
- true;
-topic_matches1(["*" | PatRest], [_ | ValRest]) ->
- topic_matches1(PatRest, ValRest);
-topic_matches1([PatElement | PatRest], [ValElement | ValRest])
- when PatElement == ValElement ->
- topic_matches1(PatRest, ValRest);
-topic_matches1(_, _) ->
- false.
-
-last_topic_match(P, R, []) ->
- topic_matches1(P, R);
-last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
- topic_matches1(P, R) or
- last_topic_match(P, [BacktrackNext | R], BacktrackList).
+publish(#exchange{name = X}, Delivery =
+ #delivery{message = #basic_message{routing_key = Key}}) ->
+ rabbit_router:deliver_by_queue_names(which_matches(X, Key), Delivery).
validate(_X) -> ok.
create(_X) -> ok.
recover(_X, _Bs) -> ok.
-delete(_X, _Bs) -> ok.
-add_binding(_X, _B) -> ok.
-remove_bindings(_X, _Bs) -> ok.
+
+delete(#exchange{name = X}, _Bs) ->
+ rabbit_misc:execute_mnesia_transaction(fun() -> trie_remove_all_edges(X),
+ trie_remove_all_bindings(X)
+ end),
+ ok.
+
+add_binding(_Exchange, #binding{exchange_name = X, key = K, queue_name = Q}) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() -> FinalNode = follow_down_create(X, split_topic_key(K)),
+ trie_add_binding(X, FinalNode, Q)
+ end),
+ ok.
+
+remove_bindings(_X, Bs) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() -> lists:foreach(fun remove_binding/1, Bs) end),
+ ok.
+
+remove_binding(#binding{exchange_name = X, key = K, queue_name = Q}) ->
+ Path = follow_down_get_path(X, split_topic_key(K)),
+ {FinalNode, _} = hd(Path),
+ trie_remove_binding(X, FinalNode, Q),
+ remove_path_if_empty(X, Path),
+ ok.
+
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
+
+%% NB: This function may return duplicate results in some situations (that's ok)
+which_matches(X, Key) ->
+ Words = split_topic_key(Key),
+ mnesia:async_dirty(fun trie_match/2, [X, Words]).
+
+%%----------------------------------------------------------------------------
+
+trie_match(X, Words) ->
+ trie_match(X, root, Words).
+trie_match(X, Node, []) ->
+ FinalRes = trie_bindings(X, Node),
+ HashRes = case trie_child(X, Node, "#") of
+ {ok, HashNode} -> trie_match(X, HashNode, []);
+ error -> []
+ end,
+ FinalRes ++ HashRes;
+trie_match(X, Node, [W | RestW] = Words) ->
+ ExactRes = case trie_child(X, Node, W) of
+ {ok, NextNode} -> trie_match(X, NextNode, RestW);
+ error -> []
+ end,
+ StarRes = case trie_child(X, Node, "*") of
+ {ok, StarNode} -> trie_match(X, StarNode, RestW);
+ error -> []
+ end,
+ HashRes = case trie_child(X, Node, "#") of
+ {ok, HashNode} -> trie_match_skip_any(X, HashNode, Words);
+ error -> []
+ end,
+ ExactRes ++ StarRes ++ HashRes.
+
+trie_match_skip_any(X, Node, []) ->
+ trie_match(X, Node, []);
+trie_match_skip_any(X, Node, [_ | RestW] = Words) ->
+ trie_match(X, Node, Words) ++
+ trie_match_skip_any(X, Node, RestW).
+
+follow_down(X, Words) ->
+ follow_down(X, root, Words).
+follow_down(_X, CurNode, []) ->
+ {ok, CurNode};
+follow_down(X, CurNode, [W | RestW]) ->
+ case trie_child(X, CurNode, W) of
+ {ok, NextNode} -> follow_down(X, NextNode, RestW);
+ error -> {error, CurNode, [W | RestW]}
+ end.
+
+follow_down_create(X, Words) ->
+ case follow_down(X, Words) of
+ {ok, FinalNode} ->
+ FinalNode;
+ {error, Node, RestW} ->
+ lists:foldl(
+ fun(W, CurNode) ->
+ NewNode = new_node(),
+ trie_add_edge(X, CurNode, NewNode, W),
+ NewNode
+ end, Node, RestW)
+ end.
+
+follow_down_get_path(X, Words) ->
+ follow_down_get_path(X, root, Words, [{root, none}]).
+follow_down_get_path(_, _, [], PathAcc) ->
+ PathAcc;
+follow_down_get_path(X, CurNode, [W | RestW], PathAcc) ->
+ {ok, NextNode} = trie_child(X, CurNode, W),
+ follow_down_get_path(X, NextNode, RestW, [{NextNode, W} | PathAcc]).
+
+remove_path_if_empty(_, [{root, none}]) ->
+ ok;
+remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
+ case trie_has_any_bindings(X, Node) orelse
+ trie_has_any_children(X, Node) of
+ true -> ok;
+ false -> trie_remove_edge(X, Parent, Node, W),
+ remove_path_if_empty(X, RestPath)
+ end.
+
+trie_child(X, Node, Word) ->
+ Query = qlc:q([NextNode ||
+ #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X1,
+ node_id = Node1,
+ word = Word1},
+ node_id = NextNode}
+ <- mnesia:table(rabbit_topic_trie_edge),
+ X1 == X,
+ Node1 == Node,
+ Word1 == Word]),
+ case qlc:e(Query) of
+ [NextNode] -> {ok, NextNode};
+ [] -> error
+ end.
+
+trie_bindings(X, Node) ->
+ MatchHead = #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ queue_name = '$1'}},
+ mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
+
+trie_add_edge(X, FromNode, ToNode, W) ->
+ trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3).
+trie_remove_edge(X, FromNode, ToNode, W) ->
+ trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3).
+trie_edge_op(X, FromNode, ToNode, W, Op) ->
+ ok = Op(rabbit_topic_trie_edge,
+ #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
+ node_id = FromNode,
+ word = W},
+ node_id = ToNode},
+ write).
+
+trie_add_binding(X, Node, Q) ->
+ trie_binding_op(X, Node, Q, fun mnesia:write/3).
+trie_remove_binding(X, Node, Q) ->
+ trie_binding_op(X, Node, Q, fun mnesia:delete_object/3).
+trie_binding_op(X, Node, Q, Op) ->
+ ok = Op(rabbit_topic_trie_binding,
+ #topic_trie_binding{trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ queue_name = Q}},
+ write).
+
+trie_has_any_children(X, Node) ->
+ MatchHead = #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
+ node_id = Node,
+ word = '$1'},
+ _='_'},
+ Select = mnesia:select(rabbit_topic_trie_edge,
+ [{MatchHead, [], ['$1']}], 1, read),
+ select_while_no_result(Select) /= '$end_of_table'.
+
+trie_has_any_bindings(X, Node) ->
+ MatchHead = #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ queue_name = '$1'},
+ _='_'},
+ Select = mnesia:select(rabbit_topic_trie_binding,
+ [{MatchHead, [], ['$1']}], 1, read),
+ select_while_no_result(Select) /= '$end_of_table'.
+
+select_while_no_result({[], Cont}) ->
+ select_while_no_result(mnesia:select(Cont));
+select_while_no_result(Other) ->
+ Other.
+
+trie_remove_all_edges(X) ->
+ Query = qlc:q([Entry ||
+ Entry = #topic_trie_edge{
+ trie_edge = #trie_edge{exchange_name = X1,
+ _='_'},
+ _='_'}
+ <- mnesia:table(rabbit_topic_trie_edge),
+ X1 == X]),
+ lists:foreach(
+ fun(O) -> mnesia:delete_object(rabbit_topic_trie_edge, O, write) end,
+ qlc:e(Query)).
+
+trie_remove_all_bindings(X) ->
+ Query = qlc:q([Entry ||
+ Entry = #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X1,
+ _='_'},
+ _='_'}
+ <- mnesia:table(rabbit_topic_trie_binding),
+ X1 == X]),
+ lists:foreach(
+ fun(O) -> mnesia:delete_object(rabbit_topic_trie_binding, O, write) end,
+ qlc:e(Query)).
+
+new_node() ->
+ now(). % UUID
+
+split_topic_key(Key) ->
+ string:tokens(binary_to_list(Key), ".").
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 4a5adfae..37708b22 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -194,6 +194,17 @@ table_definitions() ->
{type, ordered_set},
{match, #reverse_route{reverse_binding = reverse_binding_match(),
_='_'}}]},
+ {rabbit_topic_trie_edge,
+ [{record_name, topic_trie_edge},
+ {attributes, record_info(fields, topic_trie_edge)},
+ {type, ordered_set},
+ {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]},
+ {rabbit_topic_trie_binding,
+ [{record_name, topic_trie_binding},
+ {attributes, record_info(fields, topic_trie_binding)},
+ {type, ordered_set},
+ {match, #topic_trie_binding{trie_binding = trie_binding_match(),
+ _='_'}}]},
%% Consider the implications to nodes_of_type/1 before altering
%% the next entry.
{rabbit_durable_exchange,
@@ -223,6 +234,12 @@ reverse_binding_match() ->
#reverse_binding{queue_name = queue_name_match(),
exchange_name = exchange_name_match(),
_='_'}.
+trie_edge_match() ->
+ #trie_edge{exchange_name = exchange_name_match(),
+ _='_'}.
+trie_binding_match() ->
+ #trie_edge{exchange_name = exchange_name_match(),
+ _='_'}.
exchange_name_match() ->
resource_match(exchange).
queue_name_match() ->
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index ec049a1a..d7d6d0ad 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -34,6 +34,7 @@
-include("rabbit.hrl").
-export([deliver/2,
+ deliver_by_queue_names/2,
match_bindings/2,
match_routing_key/2]).
@@ -48,6 +49,8 @@
-spec(deliver/2 ::
([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}).
+-spec(deliver_by_queue_names/2 ::
+ ([binary()], rabbit_types:delivery()) -> {routing_result(), [pid()]}).
-endif.
@@ -77,6 +80,9 @@ deliver(QPids, Delivery) ->
check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
{Routed, Handled}).
+deliver_by_queue_names(Qs, Delivery) ->
+ deliver(lookup_qpids(Qs), Delivery).
+
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same exchange
match_bindings(Name, Match) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 082e7877..1e7f533a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -584,30 +584,100 @@ sequence_with_content(Sequence) ->
rabbit_framing_amqp_0_9_1),
Sequence).
-test_topic_match(P, R) ->
- test_topic_match(P, R, true).
-
-test_topic_match(P, R, Expected) ->
- case rabbit_exchange_type_topic:topic_matches(list_to_binary(P),
- list_to_binary(R)) of
- Expected ->
- passed;
- _ ->
- {topic_match_failure, P, R}
- end.
+test_topic_expect_match(#exchange{name = XName}, List) ->
+ lists:foreach(
+ fun({Key, Expected}) ->
+ Res = rabbit_exchange_type_topic:which_matches(
+ XName, list_to_binary(Key)),
+ ExpectedRes = lists:map(
+ fun(Q) -> #resource{virtual_host = <<"/">>,
+ kind = queue,
+ name = list_to_binary(Q)}
+ end, Expected),
+ true = (lists:usort(ExpectedRes) =:= lists:usort(Res))
+ end, List).
test_topic_matching() ->
- passed = test_topic_match("#", "test.test"),
- passed = test_topic_match("#", ""),
- passed = test_topic_match("#.T.R", "T.T.R"),
- passed = test_topic_match("#.T.R", "T.R.T.R"),
- passed = test_topic_match("#.Y.Z", "X.Y.Z.X.Y.Z"),
- passed = test_topic_match("#.test", "test"),
- passed = test_topic_match("#.test", "test.test"),
- passed = test_topic_match("#.test", "ignored.test"),
- passed = test_topic_match("#.test", "more.ignored.test"),
- passed = test_topic_match("#.test", "notmatched", false),
- passed = test_topic_match("#.z", "one.two.three.four", false),
+ XName = #resource{virtual_host = <<"/">>,
+ kind = exchange,
+ name = <<"test_exchange">>},
+ X = #exchange{name = XName, type = topic, durable = false,
+ auto_delete = false, arguments = []},
+ %% create
+ rabbit_exchange_type_topic:validate(X),
+ rabbit_exchange_type_topic:create(X),
+
+ %% add some bindings
+ Bindings = lists:map(
+ fun({Key, Q}) ->
+ #binding{exchange_name = XName,
+ key = list_to_binary(Key),
+ queue_name = #resource{virtual_host = <<"/">>,
+ kind = queue,
+ name = list_to_binary(Q)}}
+ end, [{"a.b.c", "t1"},
+ {"a.*.c", "t2"},
+ {"a.#.b", "t3"},
+ {"a.b.b.c", "t4"},
+ {"#", "t5"},
+ {"#.#", "t6"},
+ {"#.b", "t7"},
+ {"*.*", "t8"},
+ {"a.*", "t9"},
+ {"*.b.c", "t10"},
+ {"a.#", "t11"},
+ {"a.#.#", "t12"},
+ {"b.b.c", "t13"},
+ {"a.b.b", "t14"},
+ {"a.b", "t15"},
+ {"b.c", "t16"},
+ {"", "t17"},
+ {"*.*.*", "t18"},
+ {"vodka.martini", "t19"},
+ {"a.b.c", "t20"}]),
+ lists:foreach(fun(B) -> rabbit_exchange_type_topic:add_binding(X, B) end,
+ Bindings),
+
+ %% test some matches
+ test_topic_expect_match(X,
+ [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12", "t18", "t20"]},
+ {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11", "t12", "t15"]},
+ {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14", "t18"]},
+ {"", ["t5", "t6", "t17"]},
+ {"b.c.c", ["t5", "t6", "t18"]},
+ {"a.a.a.a.a", ["t5", "t6", "t11", "t12"]},
+ {"vodka.gin", ["t5", "t6", "t8"]},
+ {"vodka.martini", ["t5", "t6", "t8", "t19"]},
+ {"b.b.c", ["t5", "t6", "t10", "t13", "t18"]},
+ {"nothing.here.at.all", ["t5", "t6"]},
+ {"un_der_sc.ore", ["t5", "t6", "t8"]}]),
+
+ %% remove some bindings
+ RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings),
+ lists:nth(11, Bindings)],
+ rabbit_exchange_type_topic:remove_bindings(X, RemovedBindings),
+ RemainingBindings = ordsets:to_list(
+ ordsets:subtract(ordsets:from_list(Bindings),
+ ordsets:from_list(RemovedBindings))),
+
+ %% test some matches
+ test_topic_expect_match(X,
+ [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20"]},
+ {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15"]},
+ {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18"]},
+ {"", ["t6", "t17"]},
+ {"b.c.c", ["t6", "t18"]},
+ {"a.a.a.a.a", ["t6", "t12"]},
+ {"vodka.gin", ["t6", "t8"]},
+ {"vodka.martini", ["t6", "t8", "t19"]},
+ {"b.b.c", ["t6", "t10", "t13", "t18"]},
+ {"nothing.here.at.all", ["t6"]},
+ {"un_der_sc.ore", ["t6", "t8"]}]),
+
+ %% remove the entire exchange
+ rabbit_exchange_type_topic:delete(X, RemainingBindings),
+ %% none should match now
+ test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]),
passed.
test_app_management() ->