diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-11-30 10:27:57 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-11-30 10:27:57 +0000 |
commit | 072ced71211c632e91de067618a521600770c9a0 (patch) | |
tree | 9a08f55f775b9ba899df20ba5e2613416acb199e | |
parent | 11f6e036dff49c982e841462c6b8ba2bc5017656 (diff) | |
download | rabbitmq-server-072ced71211c632e91de067618a521600770c9a0.tar.gz |
reduce time complexity of binding removal on queue deletion
from O(n^2) to O(n).
The solution comes in two parts:
1) a new table, rabbit_topic_trie_node, to explicitly track edge and
binding counts of nodes in the trie. This eliminates expensive
mnesia:match_object operations to determine these counts when needed.
2) table-scope write locks to eliminate row-level locking, which is
expensive when there are many such locks in a single transaction
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 80 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 5 |
3 files changed, 68 insertions, 19 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index a603886c..d81b82db 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -56,9 +56,11 @@ -record(binding, {source, key, destination, args = []}). -record(reverse_binding, {destination, key, source, args = []}). +-record(topic_trie_node, {trie_node, edge_count, binding_count}). -record(topic_trie_edge, {trie_edge, node_id}). -record(topic_trie_binding, {trie_binding, value = const}). +-record(trie_node, {exchange_name, node_id}). -record(trie_edge, {exchange_name, node_id, word}). -record(trie_binding, {exchange_name, node_id, destination}). diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 348655b1..b675cd4d 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -52,6 +52,7 @@ validate(_X) -> ok. create(_Tx, _X) -> ok. delete(transaction, #exchange{name = X}, _Bs) -> + trie_remove_all_nodes(X), trie_remove_all_edges(X), trie_remove_all_bindings(X), ok; @@ -64,6 +65,20 @@ add_binding(none, _Exchange, _Binding) -> ok. remove_bindings(transaction, #exchange{name = X}, Bs) -> + %% We need these locks to reduce the time complexity; without them + %% we end up with K*length(Bs) row-level locks, and inserting each + %% lock takes time proportional to the number of existing locks, + %% thus resulting in O(length(Bs)^2) complexity. And they need to + %% be write locks since ultimately we end up removing all these + %% rows. The downside of all this is that no other binding + %% operations except lookup/routing (which uses dirty ops) on + %% topic exchanges can take place concurrently. However, that is + %% the case already since the removal of bindings from the + %% rabbit_route etc table, which precedes all this, uses + %% match_object with a partial key, which results in a table lock. + [mnesia:lock({table, T}, write) || T <- [rabbit_topic_trie_node, + rabbit_topic_trie_edge, + rabbit_topic_trie_binding]], %% The remove process is split into two distinct phases. In the %% first phase we gather the lists of bindings and edges to %% delete, then in the second phase we process all the @@ -77,7 +92,6 @@ remove_bindings(transaction, #exchange{name = X}, Bs) -> {[{FinalNode, D} | Acc], decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))} end, {[], gb_trees:empty()}, Bs), - [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete], [trie_remove_edge(X, Parent, Node, W) || {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)], @@ -90,8 +104,7 @@ maybe_add_path(_X, [{root, none}], PathAcc) -> maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) -> case gb_trees:is_defined(Node, PathAcc) of true -> PathAcc; - false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node), - trie_child_count(X, Node)}}, + false -> gb_trees:insert(Node, {Parent, W, trie_node_counts(X, Node)}, PathAcc) end. @@ -199,10 +212,36 @@ trie_bindings(X, Node) -> destination = '$1'}}, mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). +trie_update_node(X, Node, Fun) -> + E = case mnesia:read(rabbit_topic_trie_node, + #trie_node{exchange_name = X, + node_id = Node}, write) of + [] -> #topic_trie_node{trie_node = #trie_node{ + exchange_name = X, + node_id = Node}, + edge_count = 0, + binding_count = 0}; + [E0] -> E0 + end, + case Fun(E) of + #topic_trie_node{edge_count = 0, binding_count = 0} -> + ok = mnesia:delete_object(rabbit_topic_trie_node, E, write); + EN -> + ok = mnesia:write(rabbit_topic_trie_node, EN, write) + end. + trie_add_edge(X, FromNode, ToNode, W) -> + trie_update_node(X, FromNode, + fun(E = #topic_trie_node{edge_count = C}) -> + E#topic_trie_node{edge_count = C + 1} + end), trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3). trie_remove_edge(X, FromNode, ToNode, W) -> + trie_update_node(X, FromNode, + fun(E = #topic_trie_node{edge_count = C}) -> + E#topic_trie_node{edge_count = C - 1} + end), trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3). trie_edge_op(X, FromNode, ToNode, W, Op) -> @@ -214,9 +253,17 @@ trie_edge_op(X, FromNode, ToNode, W, Op) -> write). trie_add_binding(X, Node, D) -> + trie_update_node(X, Node, + fun(E = #topic_trie_node{binding_count = C}) -> + E#topic_trie_node{binding_count = C + 1} + end), trie_binding_op(X, Node, D, fun mnesia:write/3). trie_remove_binding(X, Node, D) -> + trie_update_node(X, Node, + fun(E = #topic_trie_node{binding_count = C}) -> + E#topic_trie_node{binding_count = C - 1} + end), trie_binding_op(X, Node, D, fun mnesia:delete_object/3). trie_binding_op(X, Node, D, Op) -> @@ -227,23 +274,18 @@ trie_binding_op(X, Node, D, Op) -> destination = D}}, write). -trie_child_count(X, Node) -> - count(rabbit_topic_trie_edge, - #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, - node_id = Node, - _ = '_'}, - _ = '_'}). - -trie_binding_count(X, Node) -> - count(rabbit_topic_trie_binding, - #topic_trie_binding{ - trie_binding = #trie_binding{exchange_name = X, - node_id = Node, - _ = '_'}, - _ = '_'}). +trie_node_counts(X, Node) -> + [#topic_trie_node{edge_count = EC, binding_count = BC}] = + mnesia:read({rabbit_topic_trie_node, + #trie_node{exchange_name = X, + node_id = Node}}), + {BC, EC}. -count(Table, Match) -> - length(mnesia:match_object(Table, Match, read)). +trie_remove_all_nodes(X) -> + remove_all(rabbit_topic_trie_node, + #topic_trie_node{trie_node = #trie_node{exchange_name = X, + _ = '_'}, + _ = '_'}). trie_remove_all_edges(X) -> remove_all(rabbit_topic_trie_edge, diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c8c18843..2204aec3 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -268,6 +268,11 @@ table_definitions() -> {type, ordered_set}, {match, #reverse_route{reverse_binding = reverse_binding_match(), _='_'}}]}, + {rabbit_topic_trie_node, + [{record_name, topic_trie_node}, + {attributes, record_info(fields, topic_trie_node)}, + {type, ordered_set}, + {match, #topic_trie_node{}}]}, {rabbit_topic_trie_edge, [{record_name, topic_trie_edge}, {attributes, record_info(fields, topic_trie_edge)}, |