summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-01-19 15:19:35 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-01-19 15:19:35 +0000
commit975fc014a343af92239ef4a5d170d2506158ceca (patch)
treed76f8babdb4689f16ff633bab9d274dbf6eea190
parentf4e9761a3c6a4cb710d949b55ab9612925783d36 (diff)
parent1b3b5ba1a1081cd86e257dc35bd90615ba40fc0f (diff)
downloadrabbitmq-server-975fc014a343af92239ef4a5d170d2506158ceca.tar.gz
Merge bug23938
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_exchange_type_topic.erl120
-rw-r--r--src/rabbit_mnesia.erl13
-rw-r--r--src/rabbit_upgrade_functions.erl8
4 files changed, 73 insertions, 70 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index a603886c..d81b82db 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -56,9 +56,11 @@
-record(binding, {source, key, destination, args = []}).
-record(reverse_binding, {destination, key, source, args = []}).
+-record(topic_trie_node, {trie_node, edge_count, binding_count}).
-record(topic_trie_edge, {trie_edge, node_id}).
-record(topic_trie_binding, {trie_binding, value = const}).
+-record(trie_node, {exchange_name, node_id}).
-record(trie_edge, {exchange_name, node_id, word}).
-record(trie_binding, {exchange_name, node_id, destination}).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 348655b1..91c7b5d3 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -52,6 +52,7 @@ validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(transaction, #exchange{name = X}, _Bs) ->
+ trie_remove_all_nodes(X),
trie_remove_all_edges(X),
trie_remove_all_bindings(X),
ok;
@@ -63,59 +64,26 @@ add_binding(transaction, _Exchange, Binding) ->
add_binding(none, _Exchange, _Binding) ->
ok.
-remove_bindings(transaction, #exchange{name = X}, Bs) ->
- %% The remove process is split into two distinct phases. In the
- %% first phase we gather the lists of bindings and edges to
- %% delete, then in the second phase we process all the
- %% deletions. This is to prevent interleaving of read/write
- %% operations in mnesia that can adversely affect performance.
- {ToDelete, Paths} =
- lists:foldl(
- fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) ->
- Path = [{FinalNode, _} | _] =
- follow_down_get_path(S, split_topic_key(K)),
- {[{FinalNode, D} | Acc],
- decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))}
- end, {[], gb_trees:empty()}, Bs),
-
- [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete],
- [trie_remove_edge(X, Parent, Node, W) ||
- {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)],
+remove_bindings(transaction, _X, Bs) ->
+ %% See rabbit_binding:lock_route_tables for the rationale for
+ %% taking table locks.
+ case Bs of
+ [_] -> ok;
+ _ -> [mnesia:lock({table, T}, write) ||
+ T <- [rabbit_topic_trie_node,
+ rabbit_topic_trie_edge,
+ rabbit_topic_trie_binding]]
+ end,
+ [begin
+ Path = [{FinalNode, _} | _] =
+ follow_down_get_path(X, split_topic_key(K)),
+ trie_remove_binding(X, FinalNode, D),
+ remove_path_if_empty(X, Path)
+ end || #binding{source = X, key = K, destination = D} <- Bs],
ok;
remove_bindings(none, _X, _Bs) ->
ok.
-maybe_add_path(_X, [{root, none}], PathAcc) ->
- PathAcc;
-maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) ->
- case gb_trees:is_defined(Node, PathAcc) of
- true -> PathAcc;
- false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node),
- trie_child_count(X, Node)}},
- PathAcc)
- end.
-
-decrement_bindings(X, Path, PathAcc) ->
- with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end,
- Path, PathAcc).
-
-decrement_edges(X, Path, PathAcc) ->
- with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end,
- Path, PathAcc).
-
-with_path_acc(_X, _Fun, [{root, none}], PathAcc) ->
- PathAcc;
-with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) ->
- {Parent, W, Counts} = gb_trees:get(Node, PathAcc),
- NewCounts = Fun(Counts),
- NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc),
- case NewCounts of
- {0, 0} -> decrement_edges(X, ParentPath,
- maybe_add_path(X, ParentPath, NewPathAcc));
- _ -> NewPathAcc
- end.
-
-
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
@@ -183,6 +151,16 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
error -> {error, Acc, Words}
end.
+remove_path_if_empty(_, [{root, none}]) ->
+ ok;
+remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
+ case mnesia:read(rabbit_topic_trie_node,
+ #trie_node{exchange_name = X, node_id = Node}, write) of
+ [] -> trie_remove_edge(X, Parent, Node, W),
+ remove_path_if_empty(X, RestPath);
+ _ -> ok
+ end.
+
trie_child(X, Node, Word) ->
case mnesia:read({rabbit_topic_trie_edge,
#trie_edge{exchange_name = X,
@@ -199,10 +177,30 @@ trie_bindings(X, Node) ->
destination = '$1'}},
mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
+trie_update_node_counts(X, Node, Field, Delta) ->
+ E = case mnesia:read(rabbit_topic_trie_node,
+ #trie_node{exchange_name = X,
+ node_id = Node}, write) of
+ [] -> #topic_trie_node{trie_node = #trie_node{
+ exchange_name = X,
+ node_id = Node},
+ edge_count = 0,
+ binding_count = 0};
+ [E0] -> E0
+ end,
+ case setelement(Field, E, element(Field, E) + Delta) of
+ #topic_trie_node{edge_count = 0, binding_count = 0} ->
+ ok = mnesia:delete_object(rabbit_topic_trie_node, E, write);
+ EN ->
+ ok = mnesia:write(rabbit_topic_trie_node, EN, write)
+ end.
+
trie_add_edge(X, FromNode, ToNode, W) ->
+ trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, +1),
trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3).
trie_remove_edge(X, FromNode, ToNode, W) ->
+ trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, -1),
trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3).
trie_edge_op(X, FromNode, ToNode, W, Op) ->
@@ -214,9 +212,11 @@ trie_edge_op(X, FromNode, ToNode, W, Op) ->
write).
trie_add_binding(X, Node, D) ->
+ trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1),
trie_binding_op(X, Node, D, fun mnesia:write/3).
trie_remove_binding(X, Node, D) ->
+ trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1),
trie_binding_op(X, Node, D, fun mnesia:delete_object/3).
trie_binding_op(X, Node, D, Op) ->
@@ -227,23 +227,11 @@ trie_binding_op(X, Node, D, Op) ->
destination = D}},
write).
-trie_child_count(X, Node) ->
- count(rabbit_topic_trie_edge,
- #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
- node_id = Node,
- _ = '_'},
- _ = '_'}).
-
-trie_binding_count(X, Node) ->
- count(rabbit_topic_trie_binding,
- #topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- _ = '_'},
- _ = '_'}).
-
-count(Table, Match) ->
- length(mnesia:match_object(Table, Match, read)).
+trie_remove_all_nodes(X) ->
+ remove_all(rabbit_topic_trie_node,
+ #topic_trie_node{trie_node = #trie_node{exchange_name = X,
+ _ = '_'},
+ _ = '_'}).
trie_remove_all_edges(X) ->
remove_all(rabbit_topic_trie_edge,
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index c8c18843..bf997a6f 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -268,6 +268,11 @@ table_definitions() ->
{type, ordered_set},
{match, #reverse_route{reverse_binding = reverse_binding_match(),
_='_'}}]},
+ {rabbit_topic_trie_node,
+ [{record_name, topic_trie_node},
+ {attributes, record_info(fields, topic_trie_node)},
+ {type, ordered_set},
+ {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]},
{rabbit_topic_trie_edge,
[{record_name, topic_trie_edge},
{attributes, record_info(fields, topic_trie_edge)},
@@ -314,12 +319,12 @@ reverse_binding_match() ->
_='_'}.
binding_destination_match() ->
resource_match('_').
+trie_node_match() ->
+ #trie_node{ exchange_name = exchange_name_match(), _='_'}.
trie_edge_match() ->
- #trie_edge{exchange_name = exchange_name_match(),
- _='_'}.
+ #trie_edge{ exchange_name = exchange_name_match(), _='_'}.
trie_binding_match() ->
- #trie_binding{exchange_name = exchange_name_match(),
- _='_'}.
+ #trie_binding{exchange_name = exchange_name_match(), _='_'}.
exchange_name_match() ->
resource_match(exchange).
queue_name_match() ->
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index e0ca8cbb..f164035e 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -35,6 +35,7 @@
-rabbit_upgrade({gm, mnesia, []}).
-rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}).
-rabbit_upgrade({mirrored_supervisor, mnesia, []}).
+-rabbit_upgrade({topic_trie_node, mnesia, []}).
%% -------------------------------------------------------------------
@@ -54,6 +55,7 @@
-spec(gm/0 :: () -> 'ok').
-spec(exchange_scratch/0 :: () -> 'ok').
-spec(mirrored_supervisor/0 :: () -> 'ok').
+-spec(topic_trie_node/0 :: () -> 'ok').
-endif.
@@ -177,6 +179,12 @@ mirrored_supervisor() ->
[{record_name, mirrored_sup_childspec},
{attributes, [key, mirroring_pid, childspec]}]).
+topic_trie_node() ->
+ create(rabbit_topic_trie_node,
+ [{record_name, topic_trie_node},
+ {attributes, [trie_node, edge_count, binding_count]},
+ {type, ordered_set}]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->