diff options
author | Vlad Alexandru Ionescu <vlad@rabbitmq.com> | 2010-08-26 18:46:36 +0100 |
---|---|---|
committer | Vlad Alexandru Ionescu <vlad@rabbitmq.com> | 2010-08-26 18:46:36 +0100 |
commit | 766295ae1ff8e59b61c93b345624eda3f146e9f4 (patch) | |
tree | 4bcaccb401e6e9928a85be849d24e59b83ba72ad | |
parent | 9398957b19e81628d057fd7ea2d7a0eb4630ab1f (diff) | |
download | rabbitmq-server-766295ae1ff8e59b61c93b345624eda3f146e9f4.tar.gz |
implementing topic routing with tries; adding better test for topic routing
-rw-r--r-- | include/rabbit.hrl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 253 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 17 | ||||
-rw-r--r-- | src/rabbit_router.erl | 6 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 114 |
5 files changed, 334 insertions, 62 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index b9abd788..210709b9 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -63,6 +63,12 @@ -record(binding, {exchange_name, key, queue_name, args = []}). -record(reverse_binding, {queue_name, key, exchange_name, args = []}). +-record(topic_trie_edge, {trie_edge, node_id}). +-record(topic_trie_binding, {trie_binding, value = const}). + +-record(trie_edge, {exchange_name, node_id, word}). +-record(trie_binding, {exchange_name, node_id, queue_name}). + -record(listener, {node, protocol, host, port}). -record(basic_message, {exchange_name, routing_key, content, guid, diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index e796acf3..35f25ccb 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -30,6 +30,7 @@ %% -module(rabbit_exchange_type_topic). +-include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). -behaviour(rabbit_exchange_type). @@ -46,59 +47,231 @@ {requires, rabbit_exchange_type_registry}, {enables, kernel_ready}]}). --export([topic_matches/2]). +-export([which_matches/2]). -ifdef(use_specs). --spec(topic_matches/2 :: (binary(), binary()) -> boolean()). +-spec(which_matches/2 :: + (rabbit_exchange:name(), rabbit_router:routing_key()) -> + [rabbit_amqqueue:name()]). -endif. +%%---------------------------------------------------------------------------- + description() -> [{name, <<"topic">>}, {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, Delivery = - #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:deliver(rabbit_router:match_bindings( - Name, fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end), - Delivery). - -split_topic_key(Key) -> - string:tokens(binary_to_list(Key), "."). - -topic_matches(PatternKey, RoutingKey) -> - P = split_topic_key(PatternKey), - R = split_topic_key(RoutingKey), - topic_matches1(P, R). - -topic_matches1(["#"], _R) -> - true; -topic_matches1(["#" | PTail], R) -> - last_topic_match(PTail, [], lists:reverse(R)); -topic_matches1([], []) -> - true; -topic_matches1(["*" | PatRest], [_ | ValRest]) -> - topic_matches1(PatRest, ValRest); -topic_matches1([PatElement | PatRest], [ValElement | ValRest]) - when PatElement == ValElement -> - topic_matches1(PatRest, ValRest); -topic_matches1(_, _) -> - false. - -last_topic_match(P, R, []) -> - topic_matches1(P, R); -last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> - topic_matches1(P, R) or - last_topic_match(P, [BacktrackNext | R], BacktrackList). +publish(#exchange{name = X}, Delivery = + #delivery{message = #basic_message{routing_key = Key}}) -> + rabbit_router:deliver_by_queue_names(which_matches(X, Key), Delivery). validate(_X) -> ok. create(_X) -> ok. recover(_X, _Bs) -> ok. -delete(_X, _Bs) -> ok. -add_binding(_X, _B) -> ok. -remove_bindings(_X, _Bs) -> ok. + +delete(#exchange{name = X}, _Bs) -> + rabbit_misc:execute_mnesia_transaction(fun() -> trie_remove_all_edges(X), + trie_remove_all_bindings(X) + end), + ok. + +add_binding(_Exchange, #binding{exchange_name = X, key = K, queue_name = Q}) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> FinalNode = follow_down_create(X, split_topic_key(K)), + trie_add_binding(X, FinalNode, Q) + end), + ok. + +remove_bindings(_X, Bs) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> lists:foreach(fun remove_binding/1, Bs) end), + ok. + +remove_binding(#binding{exchange_name = X, key = K, queue_name = Q}) -> + Path = follow_down_get_path(X, split_topic_key(K)), + {FinalNode, _} = hd(Path), + trie_remove_binding(X, FinalNode, Q), + remove_path_if_empty(X, Path), + ok. + assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). + +%% NB: This function may return duplicate results in some situations (that's ok) +which_matches(X, Key) -> + Words = split_topic_key(Key), + mnesia:async_dirty(fun trie_match/2, [X, Words]). + +%%---------------------------------------------------------------------------- + +trie_match(X, Words) -> + trie_match(X, root, Words). +trie_match(X, Node, []) -> + FinalRes = trie_bindings(X, Node), + HashRes = case trie_child(X, Node, "#") of + {ok, HashNode} -> trie_match(X, HashNode, []); + error -> [] + end, + FinalRes ++ HashRes; +trie_match(X, Node, [W | RestW] = Words) -> + ExactRes = case trie_child(X, Node, W) of + {ok, NextNode} -> trie_match(X, NextNode, RestW); + error -> [] + end, + StarRes = case trie_child(X, Node, "*") of + {ok, StarNode} -> trie_match(X, StarNode, RestW); + error -> [] + end, + HashRes = case trie_child(X, Node, "#") of + {ok, HashNode} -> trie_match_skip_any(X, HashNode, Words); + error -> [] + end, + ExactRes ++ StarRes ++ HashRes. + +trie_match_skip_any(X, Node, []) -> + trie_match(X, Node, []); +trie_match_skip_any(X, Node, [_ | RestW] = Words) -> + trie_match(X, Node, Words) ++ + trie_match_skip_any(X, Node, RestW). + +follow_down(X, Words) -> + follow_down(X, root, Words). +follow_down(_X, CurNode, []) -> + {ok, CurNode}; +follow_down(X, CurNode, [W | RestW]) -> + case trie_child(X, CurNode, W) of + {ok, NextNode} -> follow_down(X, NextNode, RestW); + error -> {error, CurNode, [W | RestW]} + end. + +follow_down_create(X, Words) -> + case follow_down(X, Words) of + {ok, FinalNode} -> + FinalNode; + {error, Node, RestW} -> + lists:foldl( + fun(W, CurNode) -> + NewNode = new_node(), + trie_add_edge(X, CurNode, NewNode, W), + NewNode + end, Node, RestW) + end. + +follow_down_get_path(X, Words) -> + follow_down_get_path(X, root, Words, [{root, none}]). +follow_down_get_path(_, _, [], PathAcc) -> + PathAcc; +follow_down_get_path(X, CurNode, [W | RestW], PathAcc) -> + {ok, NextNode} = trie_child(X, CurNode, W), + follow_down_get_path(X, NextNode, RestW, [{NextNode, W} | PathAcc]). + +remove_path_if_empty(_, [{root, none}]) -> + ok; +remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) -> + case trie_has_any_bindings(X, Node) orelse + trie_has_any_children(X, Node) of + true -> ok; + false -> trie_remove_edge(X, Parent, Node, W), + remove_path_if_empty(X, RestPath) + end. + +trie_child(X, Node, Word) -> + Query = qlc:q([NextNode || + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X1, + node_id = Node1, + word = Word1}, + node_id = NextNode} + <- mnesia:table(rabbit_topic_trie_edge), + X1 == X, + Node1 == Node, + Word1 == Word]), + case qlc:e(Query) of + [NextNode] -> {ok, NextNode}; + [] -> error + end. + +trie_bindings(X, Node) -> + MatchHead = #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + queue_name = '$1'}}, + mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). + +trie_add_edge(X, FromNode, ToNode, W) -> + trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3). +trie_remove_edge(X, FromNode, ToNode, W) -> + trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3). +trie_edge_op(X, FromNode, ToNode, W, Op) -> + ok = Op(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + node_id = FromNode, + word = W}, + node_id = ToNode}, + write). + +trie_add_binding(X, Node, Q) -> + trie_binding_op(X, Node, Q, fun mnesia:write/3). +trie_remove_binding(X, Node, Q) -> + trie_binding_op(X, Node, Q, fun mnesia:delete_object/3). +trie_binding_op(X, Node, Q, Op) -> + ok = Op(rabbit_topic_trie_binding, + #topic_trie_binding{trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + queue_name = Q}}, + write). + +trie_has_any_children(X, Node) -> + MatchHead = #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + node_id = Node, + word = '$1'}, + _='_'}, + Select = mnesia:select(rabbit_topic_trie_edge, + [{MatchHead, [], ['$1']}], 1, read), + select_while_no_result(Select) /= '$end_of_table'. + +trie_has_any_bindings(X, Node) -> + MatchHead = #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + queue_name = '$1'}, + _='_'}, + Select = mnesia:select(rabbit_topic_trie_binding, + [{MatchHead, [], ['$1']}], 1, read), + select_while_no_result(Select) /= '$end_of_table'. + +select_while_no_result({[], Cont}) -> + select_while_no_result(mnesia:select(Cont)); +select_while_no_result(Other) -> + Other. + +trie_remove_all_edges(X) -> + Query = qlc:q([Entry || + Entry = #topic_trie_edge{ + trie_edge = #trie_edge{exchange_name = X1, + _='_'}, + _='_'} + <- mnesia:table(rabbit_topic_trie_edge), + X1 == X]), + lists:foreach( + fun(O) -> mnesia:delete_object(rabbit_topic_trie_edge, O, write) end, + qlc:e(Query)). + +trie_remove_all_bindings(X) -> + Query = qlc:q([Entry || + Entry = #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X1, + _='_'}, + _='_'} + <- mnesia:table(rabbit_topic_trie_binding), + X1 == X]), + lists:foreach( + fun(O) -> mnesia:delete_object(rabbit_topic_trie_binding, O, write) end, + qlc:e(Query)). + +new_node() -> + now(). % UUID + +split_topic_key(Key) -> + string:tokens(binary_to_list(Key), "."). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 4a5adfae..37708b22 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -194,6 +194,17 @@ table_definitions() -> {type, ordered_set}, {match, #reverse_route{reverse_binding = reverse_binding_match(), _='_'}}]}, + {rabbit_topic_trie_edge, + [{record_name, topic_trie_edge}, + {attributes, record_info(fields, topic_trie_edge)}, + {type, ordered_set}, + {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]}, + {rabbit_topic_trie_binding, + [{record_name, topic_trie_binding}, + {attributes, record_info(fields, topic_trie_binding)}, + {type, ordered_set}, + {match, #topic_trie_binding{trie_binding = trie_binding_match(), + _='_'}}]}, %% Consider the implications to nodes_of_type/1 before altering %% the next entry. {rabbit_durable_exchange, @@ -223,6 +234,12 @@ reverse_binding_match() -> #reverse_binding{queue_name = queue_name_match(), exchange_name = exchange_name_match(), _='_'}. +trie_edge_match() -> + #trie_edge{exchange_name = exchange_name_match(), + _='_'}. +trie_binding_match() -> + #trie_edge{exchange_name = exchange_name_match(), + _='_'}. exchange_name_match() -> resource_match(exchange). queue_name_match() -> diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ec049a1a..d7d6d0ad 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -34,6 +34,7 @@ -include("rabbit.hrl"). -export([deliver/2, + deliver_by_queue_names/2, match_bindings/2, match_routing_key/2]). @@ -48,6 +49,8 @@ -spec(deliver/2 :: ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}). +-spec(deliver_by_queue_names/2 :: + ([binary()], rabbit_types:delivery()) -> {routing_result(), [pid()]}). -endif. @@ -77,6 +80,9 @@ deliver(QPids, Delivery) -> check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, {Routed, Handled}). +deliver_by_queue_names(Qs, Delivery) -> + deliver(lookup_qpids(Qs), Delivery). + %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange match_bindings(Name, Match) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 082e7877..1e7f533a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -584,30 +584,100 @@ sequence_with_content(Sequence) -> rabbit_framing_amqp_0_9_1), Sequence). -test_topic_match(P, R) -> - test_topic_match(P, R, true). - -test_topic_match(P, R, Expected) -> - case rabbit_exchange_type_topic:topic_matches(list_to_binary(P), - list_to_binary(R)) of - Expected -> - passed; - _ -> - {topic_match_failure, P, R} - end. +test_topic_expect_match(#exchange{name = XName}, List) -> + lists:foreach( + fun({Key, Expected}) -> + Res = rabbit_exchange_type_topic:which_matches( + XName, list_to_binary(Key)), + ExpectedRes = lists:map( + fun(Q) -> #resource{virtual_host = <<"/">>, + kind = queue, + name = list_to_binary(Q)} + end, Expected), + true = (lists:usort(ExpectedRes) =:= lists:usort(Res)) + end, List). test_topic_matching() -> - passed = test_topic_match("#", "test.test"), - passed = test_topic_match("#", ""), - passed = test_topic_match("#.T.R", "T.T.R"), - passed = test_topic_match("#.T.R", "T.R.T.R"), - passed = test_topic_match("#.Y.Z", "X.Y.Z.X.Y.Z"), - passed = test_topic_match("#.test", "test"), - passed = test_topic_match("#.test", "test.test"), - passed = test_topic_match("#.test", "ignored.test"), - passed = test_topic_match("#.test", "more.ignored.test"), - passed = test_topic_match("#.test", "notmatched", false), - passed = test_topic_match("#.z", "one.two.three.four", false), + XName = #resource{virtual_host = <<"/">>, + kind = exchange, + name = <<"test_exchange">>}, + X = #exchange{name = XName, type = topic, durable = false, + auto_delete = false, arguments = []}, + %% create + rabbit_exchange_type_topic:validate(X), + rabbit_exchange_type_topic:create(X), + + %% add some bindings + Bindings = lists:map( + fun({Key, Q}) -> + #binding{exchange_name = XName, + key = list_to_binary(Key), + queue_name = #resource{virtual_host = <<"/">>, + kind = queue, + name = list_to_binary(Q)}} + end, [{"a.b.c", "t1"}, + {"a.*.c", "t2"}, + {"a.#.b", "t3"}, + {"a.b.b.c", "t4"}, + {"#", "t5"}, + {"#.#", "t6"}, + {"#.b", "t7"}, + {"*.*", "t8"}, + {"a.*", "t9"}, + {"*.b.c", "t10"}, + {"a.#", "t11"}, + {"a.#.#", "t12"}, + {"b.b.c", "t13"}, + {"a.b.b", "t14"}, + {"a.b", "t15"}, + {"b.c", "t16"}, + {"", "t17"}, + {"*.*.*", "t18"}, + {"vodka.martini", "t19"}, + {"a.b.c", "t20"}]), + lists:foreach(fun(B) -> rabbit_exchange_type_topic:add_binding(X, B) end, + Bindings), + + %% test some matches + test_topic_expect_match(X, + [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12", "t18", "t20"]}, + {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11", "t12", "t15"]}, + {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14", "t18"]}, + {"", ["t5", "t6", "t17"]}, + {"b.c.c", ["t5", "t6", "t18"]}, + {"a.a.a.a.a", ["t5", "t6", "t11", "t12"]}, + {"vodka.gin", ["t5", "t6", "t8"]}, + {"vodka.martini", ["t5", "t6", "t8", "t19"]}, + {"b.b.c", ["t5", "t6", "t10", "t13", "t18"]}, + {"nothing.here.at.all", ["t5", "t6"]}, + {"un_der_sc.ore", ["t5", "t6", "t8"]}]), + + %% remove some bindings + RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings), + lists:nth(11, Bindings)], + rabbit_exchange_type_topic:remove_bindings(X, RemovedBindings), + RemainingBindings = ordsets:to_list( + ordsets:subtract(ordsets:from_list(Bindings), + ordsets:from_list(RemovedBindings))), + + %% test some matches + test_topic_expect_match(X, + [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20"]}, + {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15"]}, + {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18"]}, + {"", ["t6", "t17"]}, + {"b.c.c", ["t6", "t18"]}, + {"a.a.a.a.a", ["t6", "t12"]}, + {"vodka.gin", ["t6", "t8"]}, + {"vodka.martini", ["t6", "t8", "t19"]}, + {"b.b.c", ["t6", "t10", "t13", "t18"]}, + {"nothing.here.at.all", ["t6"]}, + {"un_der_sc.ore", ["t6", "t8"]}]), + + %% remove the entire exchange + rabbit_exchange_type_topic:delete(X, RemainingBindings), + %% none should match now + test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]), passed. test_app_management() -> |