diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-09-15 16:45:19 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-09-15 16:45:19 +0100 |
commit | aeb39f0c02992d4a965c13ae4ffbf5a915ede20c (patch) | |
tree | 68c3ec77badcf258ecbf7151aa86c17a63c7b205 /src/rabbit_binding.erl | |
parent | 2654a33573dcced989eb45b60b912e62e9d66b0d (diff) | |
download | rabbitmq-server-aeb39f0c02992d4a965c13ae4ffbf5a915ede20c.tar.gz |
Scatter/Gather. Eliminates gaps between txns and thus goes much faster.
Diffstat (limited to 'src/rabbit_binding.erl')
-rw-r--r-- | src/rabbit_binding.erl | 49 |
1 files changed, 30 insertions, 19 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index d8190586..84f3a3ec 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -108,35 +108,46 @@ recover(XNames, QNames) -> SelectSet = fun (#resource{kind = exchange}) -> XNameSet; (#resource{kind = queue}) -> QNameSet end, - [recover_semi_durable_route(R, SelectSet(Dst)) || + {ok, Gatherer} = gatherer:start_link(), + [recover_semi_durable_route(Gatherer, R, SelectSet(Dst)) || R = #route{binding = #binding{destination = Dst}} <- rabbit_misc:dirty_read_all(rabbit_semi_durable_route)], + empty = gatherer:out(Gatherer), + ok = gatherer:stop(Gatherer), ok. -recover_semi_durable_route(R = #route{binding = B}, ToRecover) -> +recover_semi_durable_route(Gatherer, R = #route{binding = B}, ToRecover) -> #binding{source = Src, destination = Dst} = B, case sets:is_element(Dst, ToRecover) of true -> {ok, X} = rabbit_exchange:lookup(Src), - rabbit_misc:execute_mnesia_transaction( - fun () -> - case [] =/= mnesia:match_object( - rabbit_semi_durable_route, R, read) of - false -> no_recover; - true -> ok = sync_transient_route( - R, fun mnesia:write/3), - rabbit_exchange:serial(X) - end - end, - fun (no_recover, _) -> - ok; - (_Serial, true) -> - x_callback(transaction, X, add_binding, B); - (Serial, false) -> - x_callback(Serial, X, add_binding, B) - end); + ok = gatherer:fork(Gatherer), + ok = worker_pool:submit_async( + recover_semi_durable_route_txn_fun(Gatherer, R, X)); false -> ok end. +recover_semi_durable_route_txn_fun(Gatherer, R = #route{binding = B}, X) -> + fun () -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + case [] =/= mnesia:match_object( + rabbit_semi_durable_route, R, read) of + false -> no_recover; + true -> ok = sync_transient_route( + R, fun mnesia:write/3), + rabbit_exchange:serial(X) + end + end, + fun (no_recover, _) -> + ok; + (_Serial, true) -> + x_callback(transaction, X, add_binding, B); + (Serial, false) -> + x_callback(Serial, X, add_binding, B) + end), + ok = gatherer:finish(Gatherer) + end. + exists(Binding) -> binding_action( Binding, fun (_Src, _Dst, B) -> |