summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVlad Alexandru Ionescu <vlad@rabbitmq.com>2011-01-19 15:55:06 +0000
committerVlad Alexandru Ionescu <vlad@rabbitmq.com>2011-01-19 15:55:06 +0000
commit9b99266e92bbf03f9cbe4cd7f062e0c3e3c2904a (patch)
tree1a7c8e99b82d8bbf6838b087755d874d301e468a
parenta3ff54bc80fbe7cef61edc4a53142204f99f8f38 (diff)
parent45a97a0535d43f2556cb8caa39af0ff00538ea5c (diff)
downloadrabbitmq-server-9b99266e92bbf03f9cbe4cd7f062e0c3e3c2904a.tar.gz
merging in from default
-rw-r--r--include/rabbit.hrl6
-rw-r--r--src/rabbit_exchange_type_topic.erl246
-rw-r--r--src/rabbit_mnesia.erl17
-rw-r--r--src/rabbit_tests.erl133
4 files changed, 335 insertions, 67 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 15f5d7c5..a4b80b09 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -54,6 +54,12 @@
-record(binding, {source, key, destination, args = []}).
-record(reverse_binding, {destination, key, source, args = []}).
+-record(topic_trie_edge, {trie_edge, node_id}).
+-record(topic_trie_binding, {trie_binding, value = const}).
+
+-record(trie_edge, {exchange_name, node_id, word}).
+-record(trie_binding, {exchange_name, node_id, destination}).
+
-record(listener, {node, protocol, host, ip_address, port}).
-record(basic_message, {exchange_name, routing_key, content, guid,
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 9cbf8100..2da3f3ee 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -15,6 +15,7 @@
%%
-module(rabbit_exchange_type_topic).
+
-include("rabbit.hrl").
-behaviour(rabbit_exchange_type).
@@ -31,58 +32,215 @@
{requires, rabbit_registry},
{enables, kernel_ready}]}).
--export([topic_matches/2]).
-
--ifdef(use_specs).
-
--spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
-
--endif.
+%%----------------------------------------------------------------------------
description() ->
[{name, <<"topic">>},
{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
-route(#exchange{name = Name},
- #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:match_bindings(Name,
- fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end).
-
-split_topic_key(Key) ->
- string:tokens(binary_to_list(Key), ".").
-
-topic_matches(PatternKey, RoutingKey) ->
- P = split_topic_key(PatternKey),
- R = split_topic_key(RoutingKey),
- topic_matches1(P, R).
-
-topic_matches1(["#"], _R) ->
- true;
-topic_matches1(["#" | PTail], R) ->
- last_topic_match(PTail, [], lists:reverse(R));
-topic_matches1([], []) ->
- true;
-topic_matches1(["*" | PatRest], [_ | ValRest]) ->
- topic_matches1(PatRest, ValRest);
-topic_matches1([PatElement | PatRest], [ValElement | ValRest])
- when PatElement == ValElement ->
- topic_matches1(PatRest, ValRest);
-topic_matches1(_, _) ->
- false.
-
-last_topic_match(P, R, []) ->
- topic_matches1(P, R);
-last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
- topic_matches1(P, R) or
- last_topic_match(P, [BacktrackNext | R], BacktrackList).
+%% NB: This may return duplicate results in some situations (that's ok)
+route(#exchange{name = X},
+ #delivery{message = #basic_message{routing_key = Key}}) ->
+ Words = split_topic_key(Key),
+ mnesia:async_dirty(fun trie_match/2, [X, Words]).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
-delete(_Tx, _X, _Bs) -> ok.
-add_binding(_Tx, _X, _B) -> ok.
-remove_bindings(_Tx, _X, _Bs) -> ok.
+
+delete(_Tx, #exchange{name = X}, _Bs) ->
+ rabbit_misc:execute_mnesia_transaction(fun () -> trie_remove_all_edges(X),
+ trie_remove_all_bindings(X)
+ end),
+ ok.
+
+add_binding(_Tx, _Exchange, #binding{source = X, key = K, destination = D}) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> FinalNode = follow_down_create(X, split_topic_key(K)),
+ trie_add_binding(X, FinalNode, D)
+ end),
+ ok.
+
+remove_bindings(_Tx, _X, Bs) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> lists:foreach(fun remove_binding/1, Bs) end),
+ ok.
+
+remove_binding(#binding{source = X, key = K, destination = D}) ->
+ Path = follow_down_get_path(X, split_topic_key(K)),
+ {FinalNode, _} = hd(Path),
+ trie_remove_binding(X, FinalNode, D),
+ remove_path_if_empty(X, Path),
+ ok.
+
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
+
+%%----------------------------------------------------------------------------
+
+trie_match(X, Words) ->
+ trie_match(X, root, Words).
+
+trie_match(X, Node, []) ->
+ FinalRes = trie_bindings(X, Node),
+ HashRes = case trie_child(X, Node, "#") of
+ {ok, HashNode} -> trie_match(X, HashNode, []);
+ error -> []
+ end,
+ FinalRes ++ HashRes;
+trie_match(X, Node, [W | RestW] = Words) ->
+ ExactRes = case trie_child(X, Node, W) of
+ {ok, NextNode} -> trie_match(X, NextNode, RestW);
+ error -> []
+ end,
+ StarRes = case trie_child(X, Node, "*") of
+ {ok, StarNode} -> trie_match(X, StarNode, RestW);
+ error -> []
+ end,
+ HashRes = case trie_child(X, Node, "#") of
+ {ok, HashNode} -> trie_match_skip_any(X, HashNode, Words);
+ error -> []
+ end,
+ ExactRes ++ StarRes ++ HashRes.
+
+trie_match_skip_any(X, Node, []) ->
+ trie_match(X, Node, []);
+trie_match_skip_any(X, Node, [_ | RestW] = Words) ->
+ trie_match(X, Node, Words) ++ trie_match_skip_any(X, Node, RestW).
+
+follow_down(X, Words) ->
+ follow_down(X, root, Words).
+
+follow_down(_X, CurNode, []) ->
+ {ok, CurNode};
+follow_down(X, CurNode, [W | RestW]) ->
+ case trie_child(X, CurNode, W) of
+ {ok, NextNode} -> follow_down(X, NextNode, RestW);
+ error -> {error, CurNode, [W | RestW]}
+ end.
+
+follow_down_create(X, Words) ->
+ case follow_down(X, Words) of
+ {ok, FinalNode} -> FinalNode;
+ {error, Node, RestW} -> lists:foldl(
+ fun (W, CurNode) ->
+ NewNode = new_node_id(),
+ trie_add_edge(X, CurNode, NewNode, W),
+ NewNode
+ end, Node, RestW)
+ end.
+
+follow_down_get_path(X, Words) ->
+ follow_down_get_path(X, root, Words, [{root, none}]).
+
+follow_down_get_path(_, _, [], PathAcc) ->
+ PathAcc;
+follow_down_get_path(X, CurNode, [W | RestW], PathAcc) ->
+ {ok, NextNode} = trie_child(X, CurNode, W),
+ follow_down_get_path(X, NextNode, RestW, [{NextNode, W} | PathAcc]).
+
+remove_path_if_empty(_, [{root, none}]) ->
+ ok;
+remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
+ case trie_has_any_bindings(X, Node) orelse trie_has_any_children(X, Node) of
+ true -> ok;
+ false -> trie_remove_edge(X, Parent, Node, W),
+ remove_path_if_empty(X, RestPath)
+ end.
+
+trie_child(X, Node, Word) ->
+ case mnesia:read(rabbit_topic_trie_edge, #trie_edge{exchange_name = X,
+ node_id = Node,
+ word = Word}) of
+ [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode};
+ [] -> error
+ end.
+
+trie_bindings(X, Node) ->
+ MatchHead = #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ destination = '$1'}},
+ mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
+
+trie_add_edge(X, FromNode, ToNode, W) ->
+ trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3).
+
+trie_remove_edge(X, FromNode, ToNode, W) ->
+ trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3).
+
+trie_edge_op(X, FromNode, ToNode, W, Op) ->
+ ok = Op(rabbit_topic_trie_edge,
+ #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
+ node_id = FromNode,
+ word = W},
+ node_id = ToNode},
+ write).
+
+trie_add_binding(X, Node, D) ->
+ trie_binding_op(X, Node, D, fun mnesia:write/3).
+
+trie_remove_binding(X, Node, D) ->
+ trie_binding_op(X, Node, D, fun mnesia:delete_object/3).
+
+trie_binding_op(X, Node, D, Op) ->
+ ok = Op(rabbit_topic_trie_binding,
+ #topic_trie_binding{trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ destination = D}},
+ write).
+
+trie_has_any_children(X, Node) ->
+ MatchHead = #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
+ node_id = Node,
+ _='_'},
+ _='_'},
+ Select = mnesia:select(rabbit_topic_trie_edge,
+ [{MatchHead, [], ['$_']}], 1, read),
+ select_while_no_result(Select) /= '$end_of_table'.
+
+trie_has_any_bindings(X, Node) ->
+ MatchHead = #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ _='_'},
+ _='_'},
+ Select = mnesia:select(rabbit_topic_trie_binding,
+ [{MatchHead, [], ['$_']}], 1, read),
+ select_while_no_result(Select) /= '$end_of_table'.
+
+select_while_no_result({[], Cont}) ->
+ select_while_no_result(mnesia:select(Cont));
+select_while_no_result(Other) ->
+ Other.
+
+trie_remove_all_edges(X) ->
+ Pattern = #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
+ _='_'},
+ _='_'},
+ lists:foreach(
+ fun (R) -> mnesia:delete_object(rabbit_topic_trie_edge, R, write) end,
+ mnesia:match_object(rabbit_topic_trie_edge, Pattern, write)).
+
+trie_remove_all_bindings(X) ->
+ Pattern = #topic_trie_binding{trie_binding = #trie_binding{exchange_name =X,
+ _='_'},
+ _='_'},
+ lists:foreach(
+ fun (R) -> mnesia:delete_object(rabbit_topic_trie_binding, R, write) end,
+ mnesia:match_object(rabbit_topic_trie_binding, Pattern, write)).
+
+new_node_id() ->
+ rabbit_guid:guid().
+
+split_topic_key(Key) ->
+ split_topic_key(Key, [], []).
+
+split_topic_key(<<>>, [], []) ->
+ [];
+split_topic_key(<<>>, RevWordAcc, RevResAcc) ->
+ lists:reverse([lists:reverse(RevWordAcc) | RevResAcc]);
+split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) ->
+ split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]);
+split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) ->
+ split_topic_key(Rest, [C | RevWordAcc], RevResAcc).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index a9b4e177..9bebae4b 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -185,6 +185,17 @@ table_definitions() ->
{type, ordered_set},
{match, #reverse_route{reverse_binding = reverse_binding_match(),
_='_'}}]},
+ {rabbit_topic_trie_edge,
+ [{record_name, topic_trie_edge},
+ {attributes, record_info(fields, topic_trie_edge)},
+ {type, ordered_set},
+ {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]},
+ {rabbit_topic_trie_binding,
+ [{record_name, topic_trie_binding},
+ {attributes, record_info(fields, topic_trie_binding)},
+ {type, ordered_set},
+ {match, #topic_trie_binding{trie_binding = trie_binding_match(),
+ _='_'}}]},
%% Consider the implications to nodes_of_type/1 before altering
%% the next entry.
{rabbit_durable_exchange,
@@ -216,6 +227,12 @@ reverse_binding_match() ->
_='_'}.
binding_destination_match() ->
resource_match('_').
+trie_edge_match() ->
+ #trie_edge{exchange_name = exchange_name_match(),
+ _='_'}.
+trie_binding_match() ->
+ #trie_edge{exchange_name = exchange_name_match(),
+ _='_'}.
exchange_name_match() ->
resource_match(exchange).
queue_name_match() ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 49b09508..b80f3692 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -580,32 +580,119 @@ sequence_with_content(Sequence) ->
rabbit_framing_amqp_0_9_1),
Sequence).
-test_topic_match(P, R) ->
- test_topic_match(P, R, true).
-
-test_topic_match(P, R, Expected) ->
- case rabbit_exchange_type_topic:topic_matches(list_to_binary(P),
- list_to_binary(R)) of
- Expected ->
- passed;
- _ ->
- {topic_match_failure, P, R}
- end.
-
test_topic_matching() ->
- passed = test_topic_match("#", "test.test"),
- passed = test_topic_match("#", ""),
- passed = test_topic_match("#.T.R", "T.T.R"),
- passed = test_topic_match("#.T.R", "T.R.T.R"),
- passed = test_topic_match("#.Y.Z", "X.Y.Z.X.Y.Z"),
- passed = test_topic_match("#.test", "test"),
- passed = test_topic_match("#.test", "test.test"),
- passed = test_topic_match("#.test", "ignored.test"),
- passed = test_topic_match("#.test", "more.ignored.test"),
- passed = test_topic_match("#.test", "notmatched", false),
- passed = test_topic_match("#.z", "one.two.three.four", false),
+ XName = #resource{virtual_host = <<"/">>,
+ kind = exchange,
+ name = <<"test_exchange">>},
+ X = #exchange{name = XName, type = topic, durable = false,
+ auto_delete = false, arguments = []},
+ %% create
+ rabbit_exchange_type_topic:validate(X),
+ rabbit_exchange_type_topic:create(X),
+
+ %% add some bindings
+ Bindings = lists:map(
+ fun ({Key, Q}) ->
+ #binding{source = XName,
+ key = list_to_binary(Key),
+ destination = #resource{virtual_host = <<"/">>,
+ kind = queue,
+ name = list_to_binary(Q)}}
+ end, [{"a.b.c", "t1"},
+ {"a.*.c", "t2"},
+ {"a.#.b", "t3"},
+ {"a.b.b.c", "t4"},
+ {"#", "t5"},
+ {"#.#", "t6"},
+ {"#.b", "t7"},
+ {"*.*", "t8"},
+ {"a.*", "t9"},
+ {"*.b.c", "t10"},
+ {"a.#", "t11"},
+ {"a.#.#", "t12"},
+ {"b.b.c", "t13"},
+ {"a.b.b", "t14"},
+ {"a.b", "t15"},
+ {"b.c", "t16"},
+ {"", "t17"},
+ {"*.*.*", "t18"},
+ {"vodka.martini", "t19"},
+ {"a.b.c", "t20"},
+ {"*.#", "t21"},
+ {"#.*.#", "t22"},
+ {"*.#.#", "t23"},
+ {"#.#.#", "t24"},
+ {"*", "t25"},
+ {"#.b.#", "t26"}]),
+ lists:foreach(fun (B) -> rabbit_exchange_type_topic:add_binding(X, B) end,
+ Bindings),
+
+ %% test some matches
+ test_topic_expect_match(X,
+ [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12", "t18", "t20",
+ "t21", "t22", "t23", "t24", "t26"]},
+ {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11", "t12", "t15",
+ "t21", "t22", "t23", "t24", "t26"]},
+ {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14", "t18", "t21",
+ "t22", "t23", "t24", "t26"]},
+ {"", ["t5", "t6", "t17", "t24"]},
+ {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23", "t24", "t26"]},
+ {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22", "t23", "t24"]},
+ {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23", "t24"]},
+ {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23",
+ "t24"]},
+ {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21", "t22", "t23",
+ "t24", "t26"]},
+ {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]},
+ {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24", "t25"]}]),
+
+ %% remove some bindings
+ RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings),
+ lists:nth(11, Bindings), lists:nth(19, Bindings),
+ lists:nth(21, Bindings)],
+ rabbit_exchange_type_topic:remove_bindings(X, RemovedBindings),
+ RemainingBindings = ordsets:to_list(
+ ordsets:subtract(ordsets:from_list(Bindings),
+ ordsets:from_list(RemovedBindings))),
+
+ %% test some matches
+ test_topic_expect_match(X,
+ [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22", "t23",
+ "t24", "t26"]},
+ {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15", "t22", "t23",
+ "t24", "t26"]},
+ {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22", "t23",
+ "t24", "t26"]},
+ {"", ["t6", "t17", "t24"]},
+ {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]},
+ {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]},
+ {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]},
+ {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]},
+ {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23", "t24", "t26"]},
+ {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]},
+ {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]),
+
+ %% remove the entire exchange
+ rabbit_exchange_type_topic:delete(X, RemainingBindings),
+ %% none should match now
+ test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]),
passed.
+test_topic_expect_match(X, List) ->
+ lists:foreach(
+ fun ({Key, Expected}) ->
+ BinKey = list_to_binary(Key),
+ Res = rabbit_exchange_type_topic:route(
+ X, #delivery{message = #basic_message{routing_key =
+ BinKey}}),
+ ExpectedRes = lists:map(
+ fun (Q) -> #resource{virtual_host = <<"/">>,
+ kind = queue,
+ name = list_to_binary(Q)}
+ end, Expected),
+ true = (lists:usort(ExpectedRes) =:= lists:usort(Res))
+ end, List).
+
test_app_management() ->
%% starting, stopping, status
ok = control_action(stop_app, []),