diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-03-31 16:04:41 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-03-31 16:04:41 +0100 |
commit | 54753b4b0b5a803ebe4777bc7e771b8e43d6fa1f (patch) | |
tree | 3742d0a17f6a1ad607ecaf93dd9d066920a557b4 | |
parent | 6fe41f3e724cd65792916b24e50748d0bdc0e4be (diff) | |
download | rabbitmq-server-54753b4b0b5a803ebe4777bc7e771b8e43d6fa1f.tar.gz |
Unify recovery into one boot step, based binding recovery on the queues that have been recovered.
-rw-r--r-- | src/rabbit.erl | 47 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 22 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 41 |
4 files changed, 55 insertions, 62 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 807e9e7d..86c53ff6 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -27,7 +27,7 @@ %%--------------------------------------------------------------------------- %% Boot steps. --export([maybe_insert_default_data/0, boot_delegate/0]). +-export([maybe_insert_default_data/0, boot_delegate/0, recover/0]). -rabbit_boot_step({codec_correctness_check, [{description, "codec correctness check"}, @@ -123,15 +123,9 @@ {requires, core_initialized}, {enables, routing_ready}]}). --rabbit_boot_step({exchange_recovery, - [{description, "exchange recovery"}, - {mfa, {rabbit_exchange, recover, []}}, - {requires, empty_db_check}, - {enables, routing_ready}]}). - --rabbit_boot_step({queue_sup_queue_recovery, - [{description, "queue supervisor and queue recovery"}, - {mfa, {rabbit_amqqueue, start, []}}, +-rabbit_boot_step({recovery, + [{description, "exchange / queue recovery"}, + {mfa, {rabbit, recover, []}}, {requires, empty_db_check}, {enables, routing_ready}]}). @@ -186,6 +180,7 @@ -spec(maybe_insert_default_data/0 :: () -> 'ok'). -spec(boot_delegate/0 :: () -> 'ok'). +-spec(recover/0 :: () -> 'ok'). -endif. @@ -464,6 +459,38 @@ boot_delegate() -> {ok, Count} = application:get_env(rabbit, delegate_count), rabbit_sup:start_child(delegate_sup, [Count]). +recover() -> + Xs = rabbit_exchange:recover(), + Qs = rabbit_amqqueue:start(), + Bs = rabbit_binding:recover(Qs), + {RecXBs, NoRecSrcBs} = filter_recovered_exchanges(Xs, Bs), + ok = recovery_callbacks(RecXBs, NoRecSrcBs). + +filter_recovered_exchanges(Xs, Bs) -> + RecXs = dict:from_list([{XName, X} || X = #exchange{name = XName} <- Xs]), + lists:foldl( + fun (B = #binding{source = Src}, {RecXBs, NoRecXBs}) -> + case dict:find(Src, RecXs) of + {ok, X} -> {dict:append(X, B, RecXBs), NoRecXBs}; + error -> {RecXBs, dict:append(Src, B, NoRecXBs)} + end + end, {dict:new(), dict:new()}, Bs). + +recovery_callbacks(RecXBs, NoRecXBs) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> ok end, + fun (ok, Tx) -> + dict:map(fun (X, Bs) -> + rabbit_exchange:callback(X, start, [Tx, X, Bs]) + end, RecXBs), + dict:map(fun (Src, Bs) -> + {ok, X} = rabbit_exchange:lookup(Src), + rabbit_exchange:callback(X, add_bindings, + [Tx, X, Bs]) + end, NoRecXBs) + end), + ok. + maybe_insert_default_data() -> case rabbit_mnesia:is_db_empty() of true -> insert_default_data(); diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c7391965..2618c1f5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -57,7 +57,7 @@ -type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). --spec(start/0 :: () -> 'ok'). +-spec(start/0 :: () -> [rabbit_types:amqqueue()]). -spec(stop/0 :: () -> 'ok'). -spec(declare/5 :: (name(), boolean(), boolean(), @@ -166,8 +166,7 @@ start() -> {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup]}), - _RealDurableQueues = recover_durable_queues(DurableQueues), - ok. + recover_durable_queues(DurableQueues). stop() -> ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup), @@ -188,7 +187,7 @@ find_durable_queues() -> recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], [Q || Q <- Qs, - gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == Q]. + gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == {new, Q}]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index c9cf0a39..e656cfc7 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -17,7 +17,7 @@ -module(rabbit_binding). -include("rabbit.hrl"). --export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]). +-export([recover/1, exists/1, add/1, remove/1, add/2, remove/2, list/1]). -export([list_for_source/1, list_for_destination/1, list_for_source_and_destination/2]). -export([new_deletions/0, combine_deletions/2, add_deletion/3, @@ -50,7 +50,7 @@ -opaque(deletions() :: dict()). --spec(recover/0 :: () -> [rabbit_types:binding()]). +-spec(recover/1 :: ([rabbit_types:amqqueue()]) -> [rabbit_types:binding()]). -spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). -spec(add/1 :: (rabbit_types:binding()) -> add_res()). -spec(remove/1 :: (rabbit_types:binding()) -> remove_res()). @@ -93,10 +93,11 @@ destination_name, destination_kind, routing_key, arguments]). -recover() -> +recover(Qs) -> + QNames = sets:from_list([Name || #amqqueue{name = Name} <- Qs]), rabbit_misc:table_fold( fun (Route = #route{binding = B}, Acc) -> - case should_recover(B) of + case should_recover(B, QNames) of true -> {_, Rev} = route_with_reverse(Route), ok = mnesia:write(rabbit_route, Route, write), ok = mnesia:write(rabbit_reverse_route, Rev, write), @@ -105,19 +106,12 @@ recover() -> end end, [], rabbit_durable_route). -should_recover(B = #binding{destination = Dest = #resource{ kind = Kind }}) -> +should_recover(B = #binding{destination = Dest = #resource{ kind = Kind }}, + QNames) -> case mnesia:read({rabbit_route, B}) of [] -> case Kind of exchange -> true; - queue -> case mnesia:read({rabbit_durable_queue, Dest}) of - [Q] -> #amqqueue{pid = Pid} = Q, - Node = node(), - case node(Pid) of - Node -> true; - _ -> false - end; - _ -> false - end + queue -> sets:is_element(Dest, QNames) end; _ -> false end. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 572a0b70..fa837d0c 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -83,41 +83,14 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]). recover() -> - Xs = rabbit_misc:table_fold( - fun (X = #exchange{name = XName}, Acc) -> - case mnesia:read({rabbit_exchange, XName}) of - [] -> ok = mnesia:write(rabbit_exchange, X, write), - [X | Acc]; - [_] -> Acc - end - end, [], rabbit_durable_exchange), - Bs = rabbit_binding:recover(), - {RecXBs, NoRecXBs} = filter_recovered_exchanges(Xs, Bs), - ok = recovery_callbacks(RecXBs, NoRecXBs). - -filter_recovered_exchanges(Xs, Bs) -> - RecXs = dict:from_list([{XName, X} || X = #exchange{name = XName} <- Xs]), - lists:foldl( - fun (B = #binding{source = Src}, {RecXBs, NoRecXBs}) -> - case dict:find(Src, RecXs) of - {ok, X} -> {dict:append(X, B, RecXBs), NoRecXBs}; - error -> {ok, X} = lookup(Src), - {RecXBs, dict:append(X, B, NoRecXBs)} + rabbit_misc:table_fold( + fun (X = #exchange{name = XName}, Acc) -> + case mnesia:read({rabbit_exchange, XName}) of + [] -> ok = mnesia:write(rabbit_exchange, X, write), + [X | Acc]; + [_] -> Acc end - end, {dict:new(), dict:new()}, Bs). - -recovery_callbacks(RecXBs, NoRecXBs) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> ok end, - fun (ok, Tx) -> - dict:map(fun (X = #exchange{type = Type}, Bs) -> - (type_to_module(Type)):start(Tx, X, Bs) - end, RecXBs), - dict:map(fun (X = #exchange{type = Type}, Bs) -> - (type_to_module(Type)):add_bindings(Tx, X, Bs) - end, NoRecXBs) - end), - ok. + end, [], rabbit_durable_exchange). callback(#exchange{type = XType}, Fun, Args) -> apply(type_to_module(XType), Fun, Args). |