diff options
Diffstat (limited to 'src/rabbit_exchange_type_topic.erl')
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 132 |
1 files changed, 77 insertions, 55 deletions
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index c1741b30..74c566b8 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -21,7 +21,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/2, recover/2, delete/3, add_binding/3, +-export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -40,19 +40,15 @@ description() -> %% NB: This may return duplicate results in some situations (that's ok) route(#exchange{name = X}, - #delivery{message = #basic_message{routing_key = Key}}) -> - Words = split_topic_key(Key), - mnesia:async_dirty(fun trie_match/2, [X, Words]). + #delivery{message = #basic_message{routing_keys = Routes}}) -> + lists:append([begin + Words = split_topic_key(RKey), + mnesia:async_dirty(fun trie_match/2, [X, Words]) + end || RKey <- Routes]). validate(_X) -> ok. create(_Tx, _X) -> ok. -recover(_Exchange, Bs) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - lists:foreach(fun (B) -> internal_add_binding(B) end, Bs) - end). - delete(true, #exchange{name = X}, _Bs) -> trie_remove_all_edges(X), trie_remove_all_bindings(X), @@ -65,17 +61,58 @@ add_binding(true, _Exchange, Binding) -> add_binding(false, _Exchange, _Binding) -> ok. -remove_bindings(true, _X, Bs) -> - lists:foreach(fun remove_binding/1, Bs), +remove_bindings(true, #exchange{name = X}, Bs) -> + %% The remove process is split into two distinct phases. In the + %% first phase we gather the lists of bindings and edges to + %% delete, then in the second phase we process all the + %% deletions. This is to prevent interleaving of read/write + %% operations in mnesia that can adversely affect performance. + {ToDelete, Paths} = + lists:foldl( + fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) -> + Path = [{FinalNode, _} | _] = + follow_down_get_path(S, split_topic_key(K)), + {[{FinalNode, D} | Acc], + decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))} + end, {[], gb_trees:empty()}, Bs), + + [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete], + [trie_remove_edge(X, Parent, Node, W) || + {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)], ok; remove_bindings(false, _X, _Bs) -> ok. -remove_binding(#binding{source = X, key = K, destination = D}) -> - Path = [{FinalNode, _} | _] = follow_down_get_path(X, split_topic_key(K)), - trie_remove_binding(X, FinalNode, D), - remove_path_if_empty(X, Path), - ok. +maybe_add_path(_X, [{root, none}], PathAcc) -> + PathAcc; +maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) -> + case gb_trees:is_defined(Node, PathAcc) of + true -> PathAcc; + false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node), + trie_child_count(X, Node)}}, + PathAcc) + end. + +decrement_bindings(X, Path, PathAcc) -> + with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end, + Path, PathAcc). + +decrement_edges(X, Path, PathAcc) -> + with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end, + Path, PathAcc). + +with_path_acc(_X, _Fun, [{root, none}], PathAcc) -> + PathAcc; +with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) -> + {Parent, W, Counts} = gb_trees:get(Node, PathAcc), + NewCounts = Fun(Counts), + NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc), + case NewCounts of + {0, 0} -> decrement_edges(X, ParentPath, + maybe_add_path(X, ParentPath, NewPathAcc)); + _ -> NewPathAcc + end. + assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). @@ -144,29 +181,20 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) -> error -> {error, Acc, Words} end. -remove_path_if_empty(_, [{root, none}]) -> - ok; -remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) -> - case trie_has_any_bindings(X, Node) orelse trie_has_any_children(X, Node) of - true -> ok; - false -> trie_remove_edge(X, Parent, Node, W), - remove_path_if_empty(X, RestPath) - end. - trie_child(X, Node, Word) -> - case mnesia:read(rabbit_topic_trie_edge, - #trie_edge{exchange_name = X, - node_id = Node, - word = Word}) of + case mnesia:read({rabbit_topic_trie_edge, + #trie_edge{exchange_name = X, + node_id = Node, + word = Word}}) of [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode}; [] -> error end. trie_bindings(X, Node) -> MatchHead = #topic_trie_binding{ - trie_binding = #trie_binding{exchange_name = X, - node_id = Node, - destination = '$1'}}, + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + destination = '$1'}}, mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). trie_add_edge(X, FromNode, ToNode, W) -> @@ -192,25 +220,28 @@ trie_remove_binding(X, Node, D) -> trie_binding_op(X, Node, D, Op) -> ok = Op(rabbit_topic_trie_binding, #topic_trie_binding{ - trie_binding = #trie_binding{exchange_name = X, - node_id = Node, - destination = D}}, + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + destination = D}}, write). -trie_has_any_children(X, Node) -> - has_any(rabbit_topic_trie_edge, +trie_child_count(X, Node) -> + count(rabbit_topic_trie_edge, #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, node_id = Node, _ = '_'}, _ = '_'}). -trie_has_any_bindings(X, Node) -> - has_any(rabbit_topic_trie_binding, +trie_binding_count(X, Node) -> + count(rabbit_topic_trie_binding, #topic_trie_binding{ - trie_binding = #trie_binding{exchange_name = X, - node_id = Node, - _ = '_'}, - _ = '_'}). + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + _ = '_'}, + _ = '_'}). + +count(Table, Match) -> + length(mnesia:match_object(Table, Match, read)). trie_remove_all_edges(X) -> remove_all(rabbit_topic_trie_edge, @@ -221,17 +252,8 @@ trie_remove_all_edges(X) -> trie_remove_all_bindings(X) -> remove_all(rabbit_topic_trie_binding, #topic_trie_binding{ - trie_binding = #trie_binding{exchange_name = X, _ = '_'}, - _ = '_'}). - -has_any(Table, MatchHead) -> - Select = mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read), - select_while_no_result(Select) /= '$end_of_table'. - -select_while_no_result({[], Cont}) -> - select_while_no_result(mnesia:select(Cont)); -select_while_no_result(Other) -> - Other. + trie_binding = #trie_binding{exchange_name = X, _ = '_'}, + _ = '_'}). remove_all(Table, Pattern) -> lists:foreach(fun (R) -> mnesia:delete_object(Table, R, write) end, |