diff options
author | Emile Joubert <emile@rabbitmq.com> | 2013-07-19 13:23:10 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2013-07-19 13:23:10 +0100 |
commit | 5c123c21456a364871e1ef52b626a780ad9f1150 (patch) | |
tree | 7359bc775b7bb5952b737edbdf8fb7d7df081d14 | |
parent | 5424a13ea86566789b3fb72096c9776cc37a1bd5 (diff) | |
download | rabbitmq-server-5c123c21456a364871e1ef52b626a780ad9f1150.tar.gz |
Slaves detect stale master pids on startup
-rw-r--r-- | src/gm.erl | 25 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 7 |
2 files changed, 28 insertions, 4 deletions
@@ -81,6 +81,12 @@ %% Provide the Pid. Returns a proplist with various facts, including %% the group name and the current group members. %% +%% validate_members/2 +%% Check whether a given member list agrees with the chosen member's +%% view. Any differences will be communicated via the members_changed +%% callback. If there are no differences then there will be no reply. +%% Note that members will not necessarily share the same view. +%% %% forget_group/1 %% Provide the group name. Removes its mnesia record. Makes no attempt %% to ensure the group is empty. @@ -377,7 +383,7 @@ -behaviour(gen_server2). -export([create_tables/0, start_link/4, leave/1, broadcast/2, - confirmed_broadcast/2, info/1, forget_group/1]). + confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_info/3]). @@ -438,6 +444,7 @@ -spec(broadcast/2 :: (pid(), any()) -> 'ok'). -spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok'). -spec(info/1 :: (pid()) -> rabbit_types:infos()). +-spec(validate_members/2 :: (pid(), [pid()]) -> 'ok'). -spec(forget_group/1 :: (group_name()) -> 'ok'). %% The joined, members_changed and handle_msg callbacks can all return @@ -524,6 +531,9 @@ confirmed_broadcast(Server, Msg) -> info(Server) -> gen_server2:call(Server, info, infinity). +validate_members(Server, Members) -> + gen_server2:cast(Server, {validate_members, Members}). + forget_group(GroupName) -> {atomic, ok} = mnesia:sync_transaction( fun () -> @@ -659,6 +669,19 @@ handle_cast(join, State = #state { self = Self, handle_callback_result( {Module:joined(Args, get_pids(all_known_members(View))), State1}); +handle_cast({validate_members, OldMembers}, + State = #state { view = View, + module = Module, + callback_args = Args }) -> + NewMembers = get_pids(all_known_members(View)), + Births = NewMembers -- OldMembers, + Deaths = OldMembers -- NewMembers, + case {Births, Deaths} of + {[], []} -> noreply(State); + _ -> Result = Module:members_changed(Args, Births, Deaths), + handle_callback_result({Result, State}) + end; + handle_cast(leave, State) -> {stop, normal, State}. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 294e1ebb..5e83e8a4 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -100,7 +100,7 @@ init(Q = #amqqueue { name = QName }) -> Node = node(), case rabbit_misc:execute_mnesia_transaction( fun() -> init_it(Self, GM, Node, QName) end) of - {new, QPid} -> + {new, QPid, GMPids} -> erlang:monitor(process, QPid), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [Self]), @@ -127,6 +127,7 @@ init(Q = #amqqueue { name = QName }) -> rabbit_event:notify(queue_slave_created, infos(?CREATION_EVENT_KEYS, State)), ok = gm:broadcast(GM, request_depth), + ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]), {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}; @@ -144,7 +145,7 @@ init_it(Self, GM, Node, QName) -> mnesia:read({rabbit_queue, QName}), case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of [] -> add_slave(Q, Self, GM), - {new, QPid}; + {new, QPid, GMPids}; [QPid] -> case rabbit_misc:is_process_alive(QPid) of true -> duplicate_live_master; false -> {stale, QPid} @@ -156,7 +157,7 @@ init_it(Self, GM, Node, QName) -> gm_pids = [T || T = {_, S} <- GMPids, S =/= SPid] }, add_slave(Q1, Self, GM), - {new, QPid} + {new, QPid, GMPids} end end. |