summaryrefslogtreecommitdiff
path: root/src/rabbit_binding.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-09-15 16:45:19 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-09-15 16:45:19 +0100
commitaeb39f0c02992d4a965c13ae4ffbf5a915ede20c (patch)
tree68c3ec77badcf258ecbf7151aa86c17a63c7b205 /src/rabbit_binding.erl
parent2654a33573dcced989eb45b60b912e62e9d66b0d (diff)
downloadrabbitmq-server-aeb39f0c02992d4a965c13ae4ffbf5a915ede20c.tar.gz
Scatter/Gather. Eliminates gaps between txns and thus goes much faster.
Diffstat (limited to 'src/rabbit_binding.erl')
-rw-r--r--src/rabbit_binding.erl49
1 files changed, 30 insertions, 19 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index d8190586..84f3a3ec 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -108,35 +108,46 @@ recover(XNames, QNames) ->
SelectSet = fun (#resource{kind = exchange}) -> XNameSet;
(#resource{kind = queue}) -> QNameSet
end,
- [recover_semi_durable_route(R, SelectSet(Dst)) ||
+ {ok, Gatherer} = gatherer:start_link(),
+ [recover_semi_durable_route(Gatherer, R, SelectSet(Dst)) ||
R = #route{binding = #binding{destination = Dst}} <-
rabbit_misc:dirty_read_all(rabbit_semi_durable_route)],
+ empty = gatherer:out(Gatherer),
+ ok = gatherer:stop(Gatherer),
ok.
-recover_semi_durable_route(R = #route{binding = B}, ToRecover) ->
+recover_semi_durable_route(Gatherer, R = #route{binding = B}, ToRecover) ->
#binding{source = Src, destination = Dst} = B,
case sets:is_element(Dst, ToRecover) of
true -> {ok, X} = rabbit_exchange:lookup(Src),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case [] =/= mnesia:match_object(
- rabbit_semi_durable_route, R, read) of
- false -> no_recover;
- true -> ok = sync_transient_route(
- R, fun mnesia:write/3),
- rabbit_exchange:serial(X)
- end
- end,
- fun (no_recover, _) ->
- ok;
- (_Serial, true) ->
- x_callback(transaction, X, add_binding, B);
- (Serial, false) ->
- x_callback(Serial, X, add_binding, B)
- end);
+ ok = gatherer:fork(Gatherer),
+ ok = worker_pool:submit_async(
+ recover_semi_durable_route_txn_fun(Gatherer, R, X));
false -> ok
end.
+recover_semi_durable_route_txn_fun(Gatherer, R = #route{binding = B}, X) ->
+ fun () ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case [] =/= mnesia:match_object(
+ rabbit_semi_durable_route, R, read) of
+ false -> no_recover;
+ true -> ok = sync_transient_route(
+ R, fun mnesia:write/3),
+ rabbit_exchange:serial(X)
+ end
+ end,
+ fun (no_recover, _) ->
+ ok;
+ (_Serial, true) ->
+ x_callback(transaction, X, add_binding, B);
+ (Serial, false) ->
+ x_callback(Serial, X, add_binding, B)
+ end),
+ ok = gatherer:finish(Gatherer)
+ end.
+
exists(Binding) ->
binding_action(
Binding, fun (_Src, _Dst, B) ->