diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-05-23 12:59:36 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-05-23 12:59:36 +0100 |
commit | 1c83a53cd0a40e69e48a016e18df81efb1803381 (patch) | |
tree | 0b7b714066067971264a0fc19246e9c79be873e4 | |
parent | 4d89531345d95406e8127bef87e2f578307102a4 (diff) | |
parent | 853b30bb9cfd76ef9bf74129e61e1fe6541786fb (diff) | |
download | rabbitmq-server-1c83a53cd0a40e69e48a016e18df81efb1803381.tar.gz |
Merge bug24919
-rw-r--r-- | docs/rabbitmqctl.1.xml | 3 | ||||
-rw-r--r-- | src/mirrored_supervisor.erl | 81 | ||||
-rw-r--r-- | src/mirrored_supervisor_tests.erl | 6 | ||||
-rw-r--r-- | src/rabbit.erl | 8 | ||||
-rw-r--r-- | src/rabbit_direct.erl | 12 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave_sup.erl | 17 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 13 | ||||
-rw-r--r-- | src/rabbit_sup.erl | 38 |
8 files changed, 84 insertions, 94 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 1effd691..3eb83c88 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -978,7 +978,8 @@ </varlistentry> <varlistentry> <term>type</term> - <listitem><para>The exchange type (one of [<command>direct</command>, + <listitem><para>The exchange type (such as + [<command>direct</command>, <command>topic</command>, <command>headers</command>, <command>fanout</command>]).</para></listitem> </varlistentry> diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 4fe93981..4fc488b8 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -225,8 +225,8 @@ which_children(Sup) -> fold(which_children, Sup, fun lists:append/2). count_children(Sup) -> fold(count_children, Sup, fun add_proplists/2). check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs). -call(Sup, Msg) -> - ?GEN_SERVER:call(child(Sup, mirroring), Msg, infinity). +call(Sup, Msg) -> ?GEN_SERVER:call(mirroring(Sup), Msg, infinity). +cast(Sup, Msg) -> ?GEN_SERVER:cast(mirroring(Sup), Msg). find_call(Sup, Id, Msg) -> Group = call(Sup, group), @@ -237,7 +237,7 @@ find_call(Sup, Id, Msg) -> %% immediately after the tx - we can't be 100% here. So we may as %% well dirty_select. case mnesia:dirty_select(?TABLE, [{MatchHead, [], ['$1']}]) of - [Mirror] -> ?GEN_SERVER:call(Mirror, Msg, infinity); + [Mirror] -> call(Mirror, Msg); [] -> {error, not_found} end. @@ -246,13 +246,16 @@ fold(FunAtom, Sup, AggFun) -> lists:foldl(AggFun, [], [apply(?SUPERVISOR, FunAtom, [D]) || M <- ?PG2:get_members(Group), - D <- [?GEN_SERVER:call(M, delegate_supervisor, infinity)]]). + D <- [delegate(M)]]). child(Sup, Id) -> [Pid] = [Pid || {Id1, Pid, _, _} <- ?SUPERVISOR:which_children(Sup), Id1 =:= Id], Pid. +delegate(Sup) -> child(Sup, delegate). +mirroring(Sup) -> child(Sup, mirroring). + %%---------------------------------------------------------------------------- start_internal(Group, ChildSpecs) -> @@ -288,28 +291,29 @@ handle_call({init, Overall}, _From, initial_childspecs = ChildSpecs}) -> process_flag(trap_exit, true), ?PG2:create(Group), - ok = ?PG2:join(Group, self()), - Rest = ?PG2:get_members(Group) -- [self()], + ok = ?PG2:join(Group, Overall), + Rest = ?PG2:get_members(Group) -- [Overall], case Rest of [] -> {atomic, _} = mnesia:transaction(fun() -> delete_all(Group) end); _ -> ok end, [begin - ?GEN_SERVER:cast(Pid, {ensure_monitoring, self()}), + ?GEN_SERVER:cast(mirroring(Pid), {ensure_monitoring, Overall}), erlang:monitor(process, Pid) end || Pid <- Rest], - Delegate = child(Overall, delegate), + Delegate = delegate(Overall), erlang:monitor(process, Delegate), State1 = State#state{overall = Overall, delegate = Delegate}, - case errors([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of + case errors([maybe_start(Group, Overall, Delegate, S) || S <- ChildSpecs]) of [] -> {reply, ok, State1}; Errors -> {stop, {shutdown, Errors}, State1} end; handle_call({start_child, ChildSpec}, _From, - State = #state{delegate = Delegate, + State = #state{overall = Overall, + delegate = Delegate, group = Group}) -> - {reply, case maybe_start(Group, Delegate, ChildSpec) of + {reply, case maybe_start(Group, Overall, Delegate, ChildSpec) of already_in_mnesia -> {error, already_present}; {already_in_mnesia, Pid} -> {error, {already_started, Pid}}; Else -> Else @@ -322,9 +326,6 @@ handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate, handle_call({msg, F, A}, _From, State = #state{delegate = Delegate}) -> {reply, apply(?SUPERVISOR, F, [Delegate | A]), State}; -handle_call(delegate_supervisor, _From, State = #state{delegate = Delegate}) -> - {reply, Delegate, State}; - handle_call(group, _From, State = #state{group = Group}) -> {reply, Group, State}; @@ -343,7 +344,7 @@ handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. handle_info({'DOWN', _Ref, process, Pid, Reason}, - State = #state{delegate = Pid, group = Group}) -> + State = #state{overall = Pid, group = Group}) -> %% Since the delegate is temporary, its death won't cause us to %% die. Since the overall supervisor kills processes in reverse %% order when shutting down "from above" and we started after the @@ -357,15 +358,16 @@ handle_info({'DOWN', _Ref, process, Pid, Reason}, {stop, Reason, State}; handle_info({'DOWN', _Ref, process, Pid, _Reason}, - State = #state{delegate = Delegate, group = Group}) -> + State = #state{delegate = Delegate, group = Group, + overall = O}) -> %% TODO load balance this %% No guarantee pg2 will have received the DOWN before us. - Self = self(), R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of - [Self | _] -> {atomic, ChildSpecs} = - mnesia:transaction(fun() -> update_all(Pid) end), - [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; - _ -> [] + [O | _] -> {atomic, ChildSpecs} = + mnesia:transaction( + fun() -> update_all(O, Pid) end), + [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; + _ -> [] end, case errors(R) of [] -> {noreply, State}; @@ -384,13 +386,11 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- tell_all_peers_to_die(Group, Reason) -> - [?GEN_SERVER:cast(P, {die, Reason}) || - P <- ?PG2:get_members(Group) -- [self()]]. + [cast(P, {die, Reason}) || P <- ?PG2:get_members(Group) -- [self()]]. -maybe_start(Group, Delegate, ChildSpec) -> - case mnesia:transaction(fun() -> - check_start(Group, Delegate, ChildSpec) - end) of +maybe_start(Group, Overall, Delegate, ChildSpec) -> + case mnesia:transaction( + fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of {atomic, start} -> start(Delegate, ChildSpec); {atomic, undefined} -> already_in_mnesia; {atomic, Pid} -> {already_in_mnesia, Pid}; @@ -398,31 +398,29 @@ maybe_start(Group, Delegate, ChildSpec) -> {aborted, E} -> {error, E} end. -check_start(Group, Delegate, ChildSpec) -> +check_start(Group, Overall, Delegate, ChildSpec) -> case mnesia:wread({?TABLE, {Group, id(ChildSpec)}}) of - [] -> write(Group, ChildSpec), + [] -> write(Group, Overall, ChildSpec), start; [S] -> #mirrored_sup_childspec{key = {Group, Id}, mirroring_pid = Pid} = S, - case self() of + case Overall of Pid -> child(Delegate, Id); _ -> case supervisor(Pid) of - dead -> write(Group, ChildSpec), + dead -> write(Group, Overall, ChildSpec), start; Delegate0 -> child(Delegate0, Id) end end end. -supervisor(Pid) -> - with_exit_handler( - fun() -> dead end, - fun() -> gen_server:call(Pid, delegate_supervisor, infinity) end). +supervisor(Pid) -> with_exit_handler(fun() -> dead end, + fun() -> delegate(Pid) end). -write(Group, ChildSpec) -> +write(Group, Overall, ChildSpec) -> ok = mnesia:write( #mirrored_sup_childspec{key = {Group, id(ChildSpec)}, - mirroring_pid = self(), + mirroring_pid = Overall, childspec = ChildSpec}), ChildSpec. @@ -448,12 +446,12 @@ check_stop(Group, Delegate, Id) -> id({Id, _, _, _, _, _}) -> Id. -update_all(OldPid) -> - MatchHead = #mirrored_sup_childspec{mirroring_pid = OldPid, +update_all(Overall, OldOverall) -> + MatchHead = #mirrored_sup_childspec{mirroring_pid = OldOverall, key = '$1', childspec = '$2', _ = '_'}, - [write(Group, C) || + [write(Group, Overall, C) || [{Group, _Id}, C] <- mnesia:select(?TABLE, [{MatchHead, [], ['$$']}])]. delete_all(Group) -> @@ -467,8 +465,7 @@ errors(Results) -> [E || {error, E} <- Results]. %%---------------------------------------------------------------------------- -create_tables() -> - create_tables([?TABLE_DEF]). +create_tables() -> create_tables([?TABLE_DEF]). create_tables([]) -> ok; diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index 60192b34..f8cbd853 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -157,7 +157,7 @@ test_no_migration_on_shutdown() -> with_sups(fun([Evil, _]) -> ?MS:start_child(Evil, childspec(worker)), try - call(worker, ping), + call(worker, ping, 1000, 100), exit(worker_should_not_have_migrated) catch exit:{timeout_waiting_for_server, _, _} -> ok @@ -268,7 +268,7 @@ inc_group() -> get_group(Group) -> {Group, get(counter)}. -call(Id, Msg) -> call(Id, Msg, 1000, 100). +call(Id, Msg) -> call(Id, Msg, 10*1000, 100). call(Id, Msg, 0, _Decr) -> exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()}); @@ -285,7 +285,7 @@ kill(Pid, Wait) when is_pid(Wait) -> kill(Pid, [Wait]); kill(Pid, Waits) -> erlang:monitor(process, Pid), [erlang:monitor(process, P) || P <- Waits], - exit(Pid, kill), + exit(Pid, bang), kill_wait(Pid), [kill_wait(P) || P <- Waits]. diff --git a/src/rabbit.erl b/src/rabbit.erl index ea9731b6..df009529 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -60,7 +60,8 @@ -rabbit_boot_step({worker_pool, [{description, "worker pool"}, - {mfa, {rabbit_sup, start_child, [worker_pool_sup]}}, + {mfa, {rabbit_sup, start_supervisor_child, + [worker_pool_sup]}}, {requires, pre_boot}, {enables, external_infrastructure}]}). @@ -143,7 +144,8 @@ -rabbit_boot_step({mirror_queue_slave_sup, [{description, "mirror queue slave sup"}, - {mfa, {rabbit_mirror_queue_slave_sup, start, []}}, + {mfa, {rabbit_sup, start_supervisor_child, + [rabbit_mirror_queue_slave_sup]}}, {requires, recovery}, {enables, routing_ready}]}). @@ -538,7 +540,7 @@ boot_error(Format, Args) -> boot_delegate() -> {ok, Count} = application:get_env(rabbit, delegate_count), - rabbit_sup:start_child(delegate_sup, [Count]). + rabbit_sup:start_supervisor_child(delegate_sup, [Count]). recover() -> rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()). diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index a471d282..c07ad832 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -47,16 +47,10 @@ %%---------------------------------------------------------------------------- -boot() -> - {ok, _} = - supervisor2:start_child( - rabbit_sup, - {rabbit_direct_client_sup, - {rabbit_client_sup, start_link, +boot() -> rabbit_sup:start_supervisor_child( + rabbit_direct_client_sup, rabbit_client_sup, [{local, rabbit_direct_client_sup}, - {rabbit_channel_sup, start_link, []}]}, - transient, infinity, supervisor, [rabbit_client_sup]}), - ok. + {rabbit_channel_sup, start_link, []}]). force_event_refresh() -> [Pid ! force_event_refresh || Pid<- list()], diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl index 8eacb1f3..a2034876 100644 --- a/src/rabbit_mirror_queue_slave_sup.erl +++ b/src/rabbit_mirror_queue_slave_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor2). --export([start/0, start_link/0, start_child/2]). +-export([start_link/0, start_child/2]). -export([init/1]). @@ -26,20 +26,9 @@ -define(SERVER, ?MODULE). -start() -> - {ok, _} = - supervisor2:start_child( - rabbit_sup, - {rabbit_mirror_queue_slave_sup, - {rabbit_mirror_queue_slave_sup, start_link, []}, - transient, infinity, supervisor, [rabbit_mirror_queue_slave_sup]}), - ok. +start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). -start_link() -> - supervisor2:start_link({local, ?SERVER}, ?MODULE, []). - -start_child(Node, Args) -> - supervisor2:start_child({?SERVER, Node}, Args). +start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args). init([]) -> {ok, {{simple_one_for_one_terminate, 10, 10}, diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index f0c75d23..78deea97 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -136,15 +136,10 @@ boot_ssl() -> ok end. -start() -> - {ok,_} = supervisor2:start_child( - rabbit_sup, - {rabbit_tcp_client_sup, - {rabbit_client_sup, start_link, - [{local, rabbit_tcp_client_sup}, - {rabbit_connection_sup,start_link,[]}]}, - transient, infinity, supervisor, [rabbit_client_sup]}), - ok. +start() -> rabbit_sup:start_supervisor_child( + rabbit_tcp_client_sup, rabbit_client_sup, + [{local, rabbit_tcp_client_sup}, + {rabbit_connection_sup,start_link,[]}]). ensure_ssl() -> ok = rabbit_misc:start_applications([crypto, public_key, ssl]), diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index bf2b4798..f142d233 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -19,6 +19,8 @@ -behaviour(supervisor). -export([start_link/0, start_child/1, start_child/2, start_child/3, + start_supervisor_child/1, start_supervisor_child/2, + start_supervisor_child/3, start_restartable_child/1, start_restartable_child/2, stop_child/1]). -export([init/1]). @@ -33,7 +35,11 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start_child/1 :: (atom()) -> 'ok'). +-spec(start_child/2 :: (atom(), [any()]) -> 'ok'). -spec(start_child/3 :: (atom(), atom(), [any()]) -> 'ok'). +-spec(start_supervisor_child/1 :: (atom()) -> 'ok'). +-spec(start_supervisor_child/2 :: (atom(), [any()]) -> 'ok'). +-spec(start_supervisor_child/3 :: (atom(), atom(), [any()]) -> 'ok'). -spec(start_restartable_child/1 :: (atom()) -> 'ok'). -spec(start_restartable_child/2 :: (atom(), [any()]) -> 'ok'). -spec(stop_child/1 :: (atom()) -> rabbit_types:ok_or_error(any())). @@ -42,22 +48,29 @@ %%---------------------------------------------------------------------------- -start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). +start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). -start_child(Mod) -> - start_child(Mod, []). +start_child(Mod) -> start_child(Mod, []). -start_child(Mod, Args) -> - start_child(Mod, Mod, Args). +start_child(Mod, Args) -> start_child(Mod, Mod, Args). start_child(ChildId, Mod, Args) -> - child_reply(supervisor:start_child(?SERVER, - {ChildId, {Mod, start_link, Args}, - transient, ?MAX_WAIT, worker, [Mod]})). + child_reply(supervisor:start_child( + ?SERVER, + {ChildId, {Mod, start_link, Args}, + transient, ?MAX_WAIT, worker, [Mod]})). + +start_supervisor_child(Mod) -> start_supervisor_child(Mod, []). + +start_supervisor_child(Mod, Args) -> start_supervisor_child(Mod, Mod, Args). + +start_supervisor_child(ChildId, Mod, Args) -> + child_reply(supervisor:start_child( + ?SERVER, + {ChildId, {Mod, start_link, Args}, + transient, infinity, supervisor, [Mod]})). -start_restartable_child(Mod) -> - start_restartable_child(Mod, []). +start_restartable_child(Mod) -> start_restartable_child(Mod, []). start_restartable_child(Mod, Args) -> Name = list_to_atom(atom_to_list(Mod) ++ "_sup"), @@ -73,8 +86,7 @@ stop_child(ChildId) -> E -> E end. -init([]) -> - {ok, {{one_for_all, 0, 1}, []}}. +init([]) -> {ok, {{one_for_all, 0, 1}, []}}. %%---------------------------------------------------------------------------- |