summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-07-19 13:23:10 +0100
committerEmile Joubert <emile@rabbitmq.com>2013-07-19 13:23:10 +0100
commit5c123c21456a364871e1ef52b626a780ad9f1150 (patch)
tree7359bc775b7bb5952b737edbdf8fb7d7df081d14
parent5424a13ea86566789b3fb72096c9776cc37a1bd5 (diff)
downloadrabbitmq-server-5c123c21456a364871e1ef52b626a780ad9f1150.tar.gz
Slaves detect stale master pids on startup
-rw-r--r--src/gm.erl25
-rw-r--r--src/rabbit_mirror_queue_slave.erl7
2 files changed, 28 insertions, 4 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 3f0909e8..76e769d9 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -81,6 +81,12 @@
%% Provide the Pid. Returns a proplist with various facts, including
%% the group name and the current group members.
%%
+%% validate_members/2
+%% Check whether a given member list agrees with the chosen member's
+%% view. Any differences will be communicated via the members_changed
+%% callback. If there are no differences then there will be no reply.
+%% Note that members will not necessarily share the same view.
+%%
%% forget_group/1
%% Provide the group name. Removes its mnesia record. Makes no attempt
%% to ensure the group is empty.
@@ -377,7 +383,7 @@
-behaviour(gen_server2).
-export([create_tables/0, start_link/4, leave/1, broadcast/2,
- confirmed_broadcast/2, info/1, forget_group/1]).
+ confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_info/3]).
@@ -438,6 +444,7 @@
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
+-spec(validate_members/2 :: (pid(), [pid()]) -> 'ok').
-spec(forget_group/1 :: (group_name()) -> 'ok').
%% The joined, members_changed and handle_msg callbacks can all return
@@ -524,6 +531,9 @@ confirmed_broadcast(Server, Msg) ->
info(Server) ->
gen_server2:call(Server, info, infinity).
+validate_members(Server, Members) ->
+ gen_server2:cast(Server, {validate_members, Members}).
+
forget_group(GroupName) ->
{atomic, ok} = mnesia:sync_transaction(
fun () ->
@@ -659,6 +669,19 @@ handle_cast(join, State = #state { self = Self,
handle_callback_result(
{Module:joined(Args, get_pids(all_known_members(View))), State1});
+handle_cast({validate_members, OldMembers},
+ State = #state { view = View,
+ module = Module,
+ callback_args = Args }) ->
+ NewMembers = get_pids(all_known_members(View)),
+ Births = NewMembers -- OldMembers,
+ Deaths = OldMembers -- NewMembers,
+ case {Births, Deaths} of
+ {[], []} -> noreply(State);
+ _ -> Result = Module:members_changed(Args, Births, Deaths),
+ handle_callback_result({Result, State})
+ end;
+
handle_cast(leave, State) ->
{stop, normal, State}.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 294e1ebb..5e83e8a4 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -100,7 +100,7 @@ init(Q = #amqqueue { name = QName }) ->
Node = node(),
case rabbit_misc:execute_mnesia_transaction(
fun() -> init_it(Self, GM, Node, QName) end) of
- {new, QPid} ->
+ {new, QPid, GMPids} ->
erlang:monitor(process, QPid),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [Self]),
@@ -127,6 +127,7 @@ init(Q = #amqqueue { name = QName }) ->
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
ok = gm:broadcast(GM, request_depth),
+ ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}};
@@ -144,7 +145,7 @@ init_it(Self, GM, Node, QName) ->
mnesia:read({rabbit_queue, QName}),
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
[] -> add_slave(Q, Self, GM),
- {new, QPid};
+ {new, QPid, GMPids};
[QPid] -> case rabbit_misc:is_process_alive(QPid) of
true -> duplicate_live_master;
false -> {stale, QPid}
@@ -156,7 +157,7 @@ init_it(Self, GM, Node, QName) ->
gm_pids = [T || T = {_, S} <- GMPids,
S =/= SPid] },
add_slave(Q1, Self, GM),
- {new, QPid}
+ {new, QPid, GMPids}
end
end.