summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-11-30 10:27:57 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-11-30 10:27:57 +0000
commit072ced71211c632e91de067618a521600770c9a0 (patch)
tree9a08f55f775b9ba899df20ba5e2613416acb199e
parent11f6e036dff49c982e841462c6b8ba2bc5017656 (diff)
downloadrabbitmq-server-072ced71211c632e91de067618a521600770c9a0.tar.gz
reduce time complexity of binding removal on queue deletion
from O(n^2) to O(n). The solution comes in two parts: 1) a new table, rabbit_topic_trie_node, to explicitly track edge and binding counts of nodes in the trie. This eliminates expensive mnesia:match_object operations to determine these counts when needed. 2) table-scope write locks to eliminate row-level locking, which is expensive when there are many such locks in a single transaction
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_exchange_type_topic.erl80
-rw-r--r--src/rabbit_mnesia.erl5
3 files changed, 68 insertions, 19 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index a603886c..d81b82db 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -56,9 +56,11 @@
-record(binding, {source, key, destination, args = []}).
-record(reverse_binding, {destination, key, source, args = []}).
+-record(topic_trie_node, {trie_node, edge_count, binding_count}).
-record(topic_trie_edge, {trie_edge, node_id}).
-record(topic_trie_binding, {trie_binding, value = const}).
+-record(trie_node, {exchange_name, node_id}).
-record(trie_edge, {exchange_name, node_id, word}).
-record(trie_binding, {exchange_name, node_id, destination}).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 348655b1..b675cd4d 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -52,6 +52,7 @@ validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(transaction, #exchange{name = X}, _Bs) ->
+ trie_remove_all_nodes(X),
trie_remove_all_edges(X),
trie_remove_all_bindings(X),
ok;
@@ -64,6 +65,20 @@ add_binding(none, _Exchange, _Binding) ->
ok.
remove_bindings(transaction, #exchange{name = X}, Bs) ->
+ %% We need these locks to reduce the time complexity; without them
+ %% we end up with K*length(Bs) row-level locks, and inserting each
+ %% lock takes time proportional to the number of existing locks,
+ %% thus resulting in O(length(Bs)^2) complexity. And they need to
+ %% be write locks since ultimately we end up removing all these
+ %% rows. The downside of all this is that no other binding
+ %% operations except lookup/routing (which uses dirty ops) on
+ %% topic exchanges can take place concurrently. However, that is
+ %% the case already since the removal of bindings from the
+ %% rabbit_route etc table, which precedes all this, uses
+ %% match_object with a partial key, which results in a table lock.
+ [mnesia:lock({table, T}, write) || T <- [rabbit_topic_trie_node,
+ rabbit_topic_trie_edge,
+ rabbit_topic_trie_binding]],
%% The remove process is split into two distinct phases. In the
%% first phase we gather the lists of bindings and edges to
%% delete, then in the second phase we process all the
@@ -77,7 +92,6 @@ remove_bindings(transaction, #exchange{name = X}, Bs) ->
{[{FinalNode, D} | Acc],
decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))}
end, {[], gb_trees:empty()}, Bs),
-
[trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete],
[trie_remove_edge(X, Parent, Node, W) ||
{Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)],
@@ -90,8 +104,7 @@ maybe_add_path(_X, [{root, none}], PathAcc) ->
maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) ->
case gb_trees:is_defined(Node, PathAcc) of
true -> PathAcc;
- false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node),
- trie_child_count(X, Node)}},
+ false -> gb_trees:insert(Node, {Parent, W, trie_node_counts(X, Node)},
PathAcc)
end.
@@ -199,10 +212,36 @@ trie_bindings(X, Node) ->
destination = '$1'}},
mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
+trie_update_node(X, Node, Fun) ->
+ E = case mnesia:read(rabbit_topic_trie_node,
+ #trie_node{exchange_name = X,
+ node_id = Node}, write) of
+ [] -> #topic_trie_node{trie_node = #trie_node{
+ exchange_name = X,
+ node_id = Node},
+ edge_count = 0,
+ binding_count = 0};
+ [E0] -> E0
+ end,
+ case Fun(E) of
+ #topic_trie_node{edge_count = 0, binding_count = 0} ->
+ ok = mnesia:delete_object(rabbit_topic_trie_node, E, write);
+ EN ->
+ ok = mnesia:write(rabbit_topic_trie_node, EN, write)
+ end.
+
trie_add_edge(X, FromNode, ToNode, W) ->
+ trie_update_node(X, FromNode,
+ fun(E = #topic_trie_node{edge_count = C}) ->
+ E#topic_trie_node{edge_count = C + 1}
+ end),
trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3).
trie_remove_edge(X, FromNode, ToNode, W) ->
+ trie_update_node(X, FromNode,
+ fun(E = #topic_trie_node{edge_count = C}) ->
+ E#topic_trie_node{edge_count = C - 1}
+ end),
trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3).
trie_edge_op(X, FromNode, ToNode, W, Op) ->
@@ -214,9 +253,17 @@ trie_edge_op(X, FromNode, ToNode, W, Op) ->
write).
trie_add_binding(X, Node, D) ->
+ trie_update_node(X, Node,
+ fun(E = #topic_trie_node{binding_count = C}) ->
+ E#topic_trie_node{binding_count = C + 1}
+ end),
trie_binding_op(X, Node, D, fun mnesia:write/3).
trie_remove_binding(X, Node, D) ->
+ trie_update_node(X, Node,
+ fun(E = #topic_trie_node{binding_count = C}) ->
+ E#topic_trie_node{binding_count = C - 1}
+ end),
trie_binding_op(X, Node, D, fun mnesia:delete_object/3).
trie_binding_op(X, Node, D, Op) ->
@@ -227,23 +274,18 @@ trie_binding_op(X, Node, D, Op) ->
destination = D}},
write).
-trie_child_count(X, Node) ->
- count(rabbit_topic_trie_edge,
- #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
- node_id = Node,
- _ = '_'},
- _ = '_'}).
-
-trie_binding_count(X, Node) ->
- count(rabbit_topic_trie_binding,
- #topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- _ = '_'},
- _ = '_'}).
+trie_node_counts(X, Node) ->
+ [#topic_trie_node{edge_count = EC, binding_count = BC}] =
+ mnesia:read({rabbit_topic_trie_node,
+ #trie_node{exchange_name = X,
+ node_id = Node}}),
+ {BC, EC}.
-count(Table, Match) ->
- length(mnesia:match_object(Table, Match, read)).
+trie_remove_all_nodes(X) ->
+ remove_all(rabbit_topic_trie_node,
+ #topic_trie_node{trie_node = #trie_node{exchange_name = X,
+ _ = '_'},
+ _ = '_'}).
trie_remove_all_edges(X) ->
remove_all(rabbit_topic_trie_edge,
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index c8c18843..2204aec3 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -268,6 +268,11 @@ table_definitions() ->
{type, ordered_set},
{match, #reverse_route{reverse_binding = reverse_binding_match(),
_='_'}}]},
+ {rabbit_topic_trie_node,
+ [{record_name, topic_trie_node},
+ {attributes, record_info(fields, topic_trie_node)},
+ {type, ordered_set},
+ {match, #topic_trie_node{}}]},
{rabbit_topic_trie_edge,
[{record_name, topic_trie_edge},
{attributes, record_info(fields, topic_trie_edge)},