summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-03-31 16:04:41 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-03-31 16:04:41 +0100
commit54753b4b0b5a803ebe4777bc7e771b8e43d6fa1f (patch)
tree3742d0a17f6a1ad607ecaf93dd9d066920a557b4
parent6fe41f3e724cd65792916b24e50748d0bdc0e4be (diff)
downloadrabbitmq-server-54753b4b0b5a803ebe4777bc7e771b8e43d6fa1f.tar.gz
Unify recovery into one boot step, based binding recovery on the queues that have been recovered.
-rw-r--r--src/rabbit.erl47
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_binding.erl22
-rw-r--r--src/rabbit_exchange.erl41
4 files changed, 55 insertions, 62 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 807e9e7d..86c53ff6 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -27,7 +27,7 @@
%%---------------------------------------------------------------------------
%% Boot steps.
--export([maybe_insert_default_data/0, boot_delegate/0]).
+-export([maybe_insert_default_data/0, boot_delegate/0, recover/0]).
-rabbit_boot_step({codec_correctness_check,
[{description, "codec correctness check"},
@@ -123,15 +123,9 @@
{requires, core_initialized},
{enables, routing_ready}]}).
--rabbit_boot_step({exchange_recovery,
- [{description, "exchange recovery"},
- {mfa, {rabbit_exchange, recover, []}},
- {requires, empty_db_check},
- {enables, routing_ready}]}).
-
--rabbit_boot_step({queue_sup_queue_recovery,
- [{description, "queue supervisor and queue recovery"},
- {mfa, {rabbit_amqqueue, start, []}},
+-rabbit_boot_step({recovery,
+ [{description, "exchange / queue recovery"},
+ {mfa, {rabbit, recover, []}},
{requires, empty_db_check},
{enables, routing_ready}]}).
@@ -186,6 +180,7 @@
-spec(maybe_insert_default_data/0 :: () -> 'ok').
-spec(boot_delegate/0 :: () -> 'ok').
+-spec(recover/0 :: () -> 'ok').
-endif.
@@ -464,6 +459,38 @@ boot_delegate() ->
{ok, Count} = application:get_env(rabbit, delegate_count),
rabbit_sup:start_child(delegate_sup, [Count]).
+recover() ->
+ Xs = rabbit_exchange:recover(),
+ Qs = rabbit_amqqueue:start(),
+ Bs = rabbit_binding:recover(Qs),
+ {RecXBs, NoRecSrcBs} = filter_recovered_exchanges(Xs, Bs),
+ ok = recovery_callbacks(RecXBs, NoRecSrcBs).
+
+filter_recovered_exchanges(Xs, Bs) ->
+ RecXs = dict:from_list([{XName, X} || X = #exchange{name = XName} <- Xs]),
+ lists:foldl(
+ fun (B = #binding{source = Src}, {RecXBs, NoRecXBs}) ->
+ case dict:find(Src, RecXs) of
+ {ok, X} -> {dict:append(X, B, RecXBs), NoRecXBs};
+ error -> {RecXBs, dict:append(Src, B, NoRecXBs)}
+ end
+ end, {dict:new(), dict:new()}, Bs).
+
+recovery_callbacks(RecXBs, NoRecXBs) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> ok end,
+ fun (ok, Tx) ->
+ dict:map(fun (X, Bs) ->
+ rabbit_exchange:callback(X, start, [Tx, X, Bs])
+ end, RecXBs),
+ dict:map(fun (Src, Bs) ->
+ {ok, X} = rabbit_exchange:lookup(Src),
+ rabbit_exchange:callback(X, add_bindings,
+ [Tx, X, Bs])
+ end, NoRecXBs)
+ end),
+ ok.
+
maybe_insert_default_data() ->
case rabbit_mnesia:is_db_empty() of
true -> insert_default_data();
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c7391965..2618c1f5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -57,7 +57,7 @@
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
--spec(start/0 :: () -> 'ok').
+-spec(start/0 :: () -> [rabbit_types:amqqueue()]).
-spec(stop/0 :: () -> 'ok').
-spec(declare/5 ::
(name(), boolean(), boolean(),
@@ -166,8 +166,7 @@ start() ->
{rabbit_amqqueue_sup,
{rabbit_amqqueue_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
- _RealDurableQueues = recover_durable_queues(DurableQueues),
- ok.
+ recover_durable_queues(DurableQueues).
stop() ->
ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup),
@@ -188,7 +187,7 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
[Q || Q <- Qs,
- gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == Q].
+ gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == {new, Q}].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index c9cf0a39..e656cfc7 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -17,7 +17,7 @@
-module(rabbit_binding).
-include("rabbit.hrl").
--export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]).
+-export([recover/1, exists/1, add/1, remove/1, add/2, remove/2, list/1]).
-export([list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
@@ -50,7 +50,7 @@
-opaque(deletions() :: dict()).
--spec(recover/0 :: () -> [rabbit_types:binding()]).
+-spec(recover/1 :: ([rabbit_types:amqqueue()]) -> [rabbit_types:binding()]).
-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()).
-spec(add/1 :: (rabbit_types:binding()) -> add_res()).
-spec(remove/1 :: (rabbit_types:binding()) -> remove_res()).
@@ -93,10 +93,11 @@
destination_name, destination_kind,
routing_key, arguments]).
-recover() ->
+recover(Qs) ->
+ QNames = sets:from_list([Name || #amqqueue{name = Name} <- Qs]),
rabbit_misc:table_fold(
fun (Route = #route{binding = B}, Acc) ->
- case should_recover(B) of
+ case should_recover(B, QNames) of
true -> {_, Rev} = route_with_reverse(Route),
ok = mnesia:write(rabbit_route, Route, write),
ok = mnesia:write(rabbit_reverse_route, Rev, write),
@@ -105,19 +106,12 @@ recover() ->
end
end, [], rabbit_durable_route).
-should_recover(B = #binding{destination = Dest = #resource{ kind = Kind }}) ->
+should_recover(B = #binding{destination = Dest = #resource{ kind = Kind }},
+ QNames) ->
case mnesia:read({rabbit_route, B}) of
[] -> case Kind of
exchange -> true;
- queue -> case mnesia:read({rabbit_durable_queue, Dest}) of
- [Q] -> #amqqueue{pid = Pid} = Q,
- Node = node(),
- case node(Pid) of
- Node -> true;
- _ -> false
- end;
- _ -> false
- end
+ queue -> sets:is_element(Dest, QNames)
end;
_ -> false
end.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 572a0b70..fa837d0c 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -83,41 +83,14 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]).
recover() ->
- Xs = rabbit_misc:table_fold(
- fun (X = #exchange{name = XName}, Acc) ->
- case mnesia:read({rabbit_exchange, XName}) of
- [] -> ok = mnesia:write(rabbit_exchange, X, write),
- [X | Acc];
- [_] -> Acc
- end
- end, [], rabbit_durable_exchange),
- Bs = rabbit_binding:recover(),
- {RecXBs, NoRecXBs} = filter_recovered_exchanges(Xs, Bs),
- ok = recovery_callbacks(RecXBs, NoRecXBs).
-
-filter_recovered_exchanges(Xs, Bs) ->
- RecXs = dict:from_list([{XName, X} || X = #exchange{name = XName} <- Xs]),
- lists:foldl(
- fun (B = #binding{source = Src}, {RecXBs, NoRecXBs}) ->
- case dict:find(Src, RecXs) of
- {ok, X} -> {dict:append(X, B, RecXBs), NoRecXBs};
- error -> {ok, X} = lookup(Src),
- {RecXBs, dict:append(X, B, NoRecXBs)}
+ rabbit_misc:table_fold(
+ fun (X = #exchange{name = XName}, Acc) ->
+ case mnesia:read({rabbit_exchange, XName}) of
+ [] -> ok = mnesia:write(rabbit_exchange, X, write),
+ [X | Acc];
+ [_] -> Acc
end
- end, {dict:new(), dict:new()}, Bs).
-
-recovery_callbacks(RecXBs, NoRecXBs) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () -> ok end,
- fun (ok, Tx) ->
- dict:map(fun (X = #exchange{type = Type}, Bs) ->
- (type_to_module(Type)):start(Tx, X, Bs)
- end, RecXBs),
- dict:map(fun (X = #exchange{type = Type}, Bs) ->
- (type_to_module(Type)):add_bindings(Tx, X, Bs)
- end, NoRecXBs)
- end),
- ok.
+ end, [], rabbit_durable_exchange).
callback(#exchange{type = XType}, Fun, Args) ->
apply(type_to_module(XType), Fun, Args).