diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-01-19 15:19:35 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-01-19 15:19:35 +0000 |
commit | 975fc014a343af92239ef4a5d170d2506158ceca (patch) | |
tree | d76f8babdb4689f16ff633bab9d274dbf6eea190 | |
parent | f4e9761a3c6a4cb710d949b55ab9612925783d36 (diff) | |
parent | 1b3b5ba1a1081cd86e257dc35bd90615ba40fc0f (diff) | |
download | rabbitmq-server-975fc014a343af92239ef4a5d170d2506158ceca.tar.gz |
Merge bug23938
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 120 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 13 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 8 |
4 files changed, 73 insertions, 70 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index a603886c..d81b82db 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -56,9 +56,11 @@ -record(binding, {source, key, destination, args = []}). -record(reverse_binding, {destination, key, source, args = []}). +-record(topic_trie_node, {trie_node, edge_count, binding_count}). -record(topic_trie_edge, {trie_edge, node_id}). -record(topic_trie_binding, {trie_binding, value = const}). +-record(trie_node, {exchange_name, node_id}). -record(trie_edge, {exchange_name, node_id, word}). -record(trie_binding, {exchange_name, node_id, destination}). diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 348655b1..91c7b5d3 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -52,6 +52,7 @@ validate(_X) -> ok. create(_Tx, _X) -> ok. delete(transaction, #exchange{name = X}, _Bs) -> + trie_remove_all_nodes(X), trie_remove_all_edges(X), trie_remove_all_bindings(X), ok; @@ -63,59 +64,26 @@ add_binding(transaction, _Exchange, Binding) -> add_binding(none, _Exchange, _Binding) -> ok. -remove_bindings(transaction, #exchange{name = X}, Bs) -> - %% The remove process is split into two distinct phases. In the - %% first phase we gather the lists of bindings and edges to - %% delete, then in the second phase we process all the - %% deletions. This is to prevent interleaving of read/write - %% operations in mnesia that can adversely affect performance. - {ToDelete, Paths} = - lists:foldl( - fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) -> - Path = [{FinalNode, _} | _] = - follow_down_get_path(S, split_topic_key(K)), - {[{FinalNode, D} | Acc], - decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))} - end, {[], gb_trees:empty()}, Bs), - - [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete], - [trie_remove_edge(X, Parent, Node, W) || - {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)], +remove_bindings(transaction, _X, Bs) -> + %% See rabbit_binding:lock_route_tables for the rationale for + %% taking table locks. + case Bs of + [_] -> ok; + _ -> [mnesia:lock({table, T}, write) || + T <- [rabbit_topic_trie_node, + rabbit_topic_trie_edge, + rabbit_topic_trie_binding]] + end, + [begin + Path = [{FinalNode, _} | _] = + follow_down_get_path(X, split_topic_key(K)), + trie_remove_binding(X, FinalNode, D), + remove_path_if_empty(X, Path) + end || #binding{source = X, key = K, destination = D} <- Bs], ok; remove_bindings(none, _X, _Bs) -> ok. -maybe_add_path(_X, [{root, none}], PathAcc) -> - PathAcc; -maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) -> - case gb_trees:is_defined(Node, PathAcc) of - true -> PathAcc; - false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node), - trie_child_count(X, Node)}}, - PathAcc) - end. - -decrement_bindings(X, Path, PathAcc) -> - with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end, - Path, PathAcc). - -decrement_edges(X, Path, PathAcc) -> - with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end, - Path, PathAcc). - -with_path_acc(_X, _Fun, [{root, none}], PathAcc) -> - PathAcc; -with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) -> - {Parent, W, Counts} = gb_trees:get(Node, PathAcc), - NewCounts = Fun(Counts), - NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc), - case NewCounts of - {0, 0} -> decrement_edges(X, ParentPath, - maybe_add_path(X, ParentPath, NewPathAcc)); - _ -> NewPathAcc - end. - - assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). @@ -183,6 +151,16 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) -> error -> {error, Acc, Words} end. +remove_path_if_empty(_, [{root, none}]) -> + ok; +remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) -> + case mnesia:read(rabbit_topic_trie_node, + #trie_node{exchange_name = X, node_id = Node}, write) of + [] -> trie_remove_edge(X, Parent, Node, W), + remove_path_if_empty(X, RestPath); + _ -> ok + end. + trie_child(X, Node, Word) -> case mnesia:read({rabbit_topic_trie_edge, #trie_edge{exchange_name = X, @@ -199,10 +177,30 @@ trie_bindings(X, Node) -> destination = '$1'}}, mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). +trie_update_node_counts(X, Node, Field, Delta) -> + E = case mnesia:read(rabbit_topic_trie_node, + #trie_node{exchange_name = X, + node_id = Node}, write) of + [] -> #topic_trie_node{trie_node = #trie_node{ + exchange_name = X, + node_id = Node}, + edge_count = 0, + binding_count = 0}; + [E0] -> E0 + end, + case setelement(Field, E, element(Field, E) + Delta) of + #topic_trie_node{edge_count = 0, binding_count = 0} -> + ok = mnesia:delete_object(rabbit_topic_trie_node, E, write); + EN -> + ok = mnesia:write(rabbit_topic_trie_node, EN, write) + end. + trie_add_edge(X, FromNode, ToNode, W) -> + trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, +1), trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3). trie_remove_edge(X, FromNode, ToNode, W) -> + trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, -1), trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3). trie_edge_op(X, FromNode, ToNode, W, Op) -> @@ -214,9 +212,11 @@ trie_edge_op(X, FromNode, ToNode, W, Op) -> write). trie_add_binding(X, Node, D) -> + trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1), trie_binding_op(X, Node, D, fun mnesia:write/3). trie_remove_binding(X, Node, D) -> + trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1), trie_binding_op(X, Node, D, fun mnesia:delete_object/3). trie_binding_op(X, Node, D, Op) -> @@ -227,23 +227,11 @@ trie_binding_op(X, Node, D, Op) -> destination = D}}, write). -trie_child_count(X, Node) -> - count(rabbit_topic_trie_edge, - #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, - node_id = Node, - _ = '_'}, - _ = '_'}). - -trie_binding_count(X, Node) -> - count(rabbit_topic_trie_binding, - #topic_trie_binding{ - trie_binding = #trie_binding{exchange_name = X, - node_id = Node, - _ = '_'}, - _ = '_'}). - -count(Table, Match) -> - length(mnesia:match_object(Table, Match, read)). +trie_remove_all_nodes(X) -> + remove_all(rabbit_topic_trie_node, + #topic_trie_node{trie_node = #trie_node{exchange_name = X, + _ = '_'}, + _ = '_'}). trie_remove_all_edges(X) -> remove_all(rabbit_topic_trie_edge, diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c8c18843..bf997a6f 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -268,6 +268,11 @@ table_definitions() -> {type, ordered_set}, {match, #reverse_route{reverse_binding = reverse_binding_match(), _='_'}}]}, + {rabbit_topic_trie_node, + [{record_name, topic_trie_node}, + {attributes, record_info(fields, topic_trie_node)}, + {type, ordered_set}, + {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]}, {rabbit_topic_trie_edge, [{record_name, topic_trie_edge}, {attributes, record_info(fields, topic_trie_edge)}, @@ -314,12 +319,12 @@ reverse_binding_match() -> _='_'}. binding_destination_match() -> resource_match('_'). +trie_node_match() -> + #trie_node{ exchange_name = exchange_name_match(), _='_'}. trie_edge_match() -> - #trie_edge{exchange_name = exchange_name_match(), - _='_'}. + #trie_edge{ exchange_name = exchange_name_match(), _='_'}. trie_binding_match() -> - #trie_binding{exchange_name = exchange_name_match(), - _='_'}. + #trie_binding{exchange_name = exchange_name_match(), _='_'}. exchange_name_match() -> resource_match(exchange). queue_name_match() -> diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index e0ca8cbb..f164035e 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -35,6 +35,7 @@ -rabbit_upgrade({gm, mnesia, []}). -rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}). -rabbit_upgrade({mirrored_supervisor, mnesia, []}). +-rabbit_upgrade({topic_trie_node, mnesia, []}). %% ------------------------------------------------------------------- @@ -54,6 +55,7 @@ -spec(gm/0 :: () -> 'ok'). -spec(exchange_scratch/0 :: () -> 'ok'). -spec(mirrored_supervisor/0 :: () -> 'ok'). +-spec(topic_trie_node/0 :: () -> 'ok'). -endif. @@ -177,6 +179,12 @@ mirrored_supervisor() -> [{record_name, mirrored_sup_childspec}, {attributes, [key, mirroring_pid, childspec]}]). +topic_trie_node() -> + create(rabbit_topic_trie_node, + [{record_name, topic_trie_node}, + {attributes, [trie_node, edge_count, binding_count]}, + {type, ordered_set}]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |