diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-03-19 15:43:56 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-03-19 15:43:56 +0000 |
commit | 59b1fe0e022d010df8f3213bc734a59ef963fd6d (patch) | |
tree | 8ba8cd1b4f7785c3b6ca639a2883cd2c2356a015 | |
parent | 421f044f6cb6ca5551c143cef877ad7f4f9ceb81 (diff) | |
download | rabbitmq-server-59b1fe0e022d010df8f3213bc734a59ef963fd6d.tar.gz |
recover exchanges/bindings/queues in per-item transactionsbug20500
Because recovering them in large, single transactions is incredibly
slow, with complexity that is far worse than linear in the number of
entries we recover, presumably due to the way mnesia represents
transaction-local storage.
-rw-r--r-- | src/rabbit_amqqueue.erl | 39 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 25 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 17 |
3 files changed, 52 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2b9abb29..69c97dfe 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -122,19 +122,32 @@ recover() -> recover_durable_queues() -> Node = node(), - %% TODO: use dirty ops instead - R = rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(durable_queues), - node(Pid) == Node])) - end), - Queues = lists:map(fun start_queue_process/1, R), - rabbit_misc:execute_mnesia_transaction( - fun () -> - lists:foreach(fun store_queue/1, Queues), - ok - end). + lists:foreach( + fun (RecoveredQ) -> + Q = start_queue_process(RecoveredQ), + %% We need to catch the case where a client connected to + %% another node has deleted the queue (and possibly + %% re-created it). + case rabbit_misc:execute_mnesia_transaction( + fun () -> case mnesia:match_object( + durable_queues, RecoveredQ, read) of + [_] -> ok = store_queue(Q), + true; + [] -> false + end + end) of + true -> ok; + false -> exit(Q#amqqueue.pid, shutdown) + end + end, + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + <- mnesia:table(durable_queues), + node(Pid) == Node])) + end)), + ok. declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 925c335c..e72669ac 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -102,22 +102,15 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. recover() -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - mnesia:foldl( - fun (Exchange, Acc) -> - ok = mnesia:write(Exchange), - Acc - end, ok, durable_exchanges), - mnesia:foldl( - fun (Route, Acc) -> - {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(Route), - ok = mnesia:write(ReverseRoute), - Acc - end, ok, durable_routes), - ok - end). + ok = rabbit_misc:table_foreach( + fun(Exchange) -> ok = mnesia:write(Exchange) end, + durable_exchanges), + ok = rabbit_misc:table_foreach( + fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(Route), + ok = mnesia:write(ReverseRoute) + end, durable_routes), + ok. declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 053bde54..5730fdc0 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,6 +46,7 @@ -export([ensure_ok/2]). -export([localnode/1, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). +-export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). @@ -97,6 +98,7 @@ -spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). +-spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok'). -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). @@ -295,6 +297,21 @@ map_in_order(F, L) -> lists:reverse( lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)). +%% For each entry in a table, execute a function in a transaction. +%% This is often far more efficient than wrapping a tx around the lot. +%% +%% We ignore entries that have been modified or removed. +table_foreach(F, TableName) -> + lists:foreach( + fun (E) -> execute_mnesia_transaction( + fun () -> case mnesia:match_object(TableName, E, read) of + [] -> ok; + _ -> F(E) + end + end) + end, dirty_read_all(TableName)), + ok. + dirty_read_all(TableName) -> mnesia:dirty_select(TableName, [{'$1',[],['$1']}]). |