summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Simon MacMullen <simon@rabbitmq.com> 2014-04-17 14:48:41 +0100
committer Simon MacMullen <simon@rabbitmq.com> 2014-04-17 14:48:41 +0100
commit d564c0b221def8caa7c681e63730b00c33f22375 (patch)
tree 67cb185f22fa75f4e4f7fafea265176ddea91b15
parent dfb19cda6c6663ea3ed3cc00bfc9996016ccba69 (diff)
parent a38578e69f2147dc3ba24565139e18d9ace04a20 (diff)
download rabbitmq-server-d564c0b221def8caa7c681e63730b00c33f22375.tar.gz
Merge bug26117
-rw-r--r-- include/gm_specs.hrl 3
-rw-r--r-- src/gm.erl 12
-rw-r--r-- src/gm_soak_test.erl 4
-rw-r--r-- src/gm_speed_test.erl 4
-rw-r--r-- src/gm_tests.erl 4
-rw-r--r-- src/rabbit_mirror_queue_coordinator.erl 12
-rw-r--r-- src/rabbit_mirror_queue_misc.erl 66
-rw-r--r-- src/rabbit_mirror_queue_slave.erl 29
8 files changed, 73 insertions, 61 deletions
diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl
index f4ea0df8..245c23bc 100644
--- a/include/gm_specs.hrl
+++ b/include/gm_specs.hrl
@@ -21,8 +21,7 @@
-type(members() :: [pid()]).
-spec(joined/2 :: (args(), members()) -> callback_result()).
--spec(members_changed/4 :: (args(), members(),
- members(), members()) -> callback_result()).
+-spec(members_changed/3 :: (args(), members(), members()) -> callback_result()).
-spec(handle_msg/3 :: (args(), pid(), any()) -> callback_result()).
-spec(terminate/2 :: (args(), term()) -> any()).
diff --git a/src/gm.erl b/src/gm.erl
index 2ed2fcf1..9a51bfc2 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -476,8 +476,8 @@
%% joined/2 before receiving any messages from it; and (2) we will not
%% see members die that we have not seen born (or supplied in the
%% members to joined/2).
--callback members_changed(Args :: term(), Births :: [pid()],
- Deaths :: [pid()], Live :: [pid()]) ->
+-callback members_changed(Args :: term(),
+ Births :: [pid()], Deaths :: [pid()]) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
%% Supplied with Args provided in start_link, the sender, and the
@@ -496,7 +496,7 @@
-else.
behaviour_info(callbacks) ->
- [{joined, 2}, {members_changed, 4}, {handle_msg, 3}, {terminate, 2}];
+ [{joined, 2}, {members_changed, 3}, {handle_msg, 3}, {terminate, 2}];
behaviour_info(_Other) ->
undefined.
@@ -685,8 +685,7 @@ handle_cast({validate_members, OldMembers},
Deaths = OldMembers -- NewMembers,
case {Births, Deaths} of
{[], []} -> noreply(State);
- _ -> Result = Module:members_changed(
- Args, Births, Deaths, NewMembers),
+ _ -> Result = Module:members_changed(Args, Births, Deaths),
handle_callback_result({Result, State})
end;
@@ -1399,8 +1398,7 @@ callback_view_changed(Args, Module, OldView, NewView) ->
case {Births, Deaths} of
{[], []} -> ok;
_ -> Module:members_changed(
- Args, get_pids(Births), get_pids(Deaths),
- get_pids(NewMembers))
+ Args, get_pids(Births), get_pids(Deaths))
end.
handle_callback_result({Result, State}) ->
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index 4ff1645a..c9a25522 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -17,7 +17,7 @@
-module(gm_soak_test).
-export([test/0]).
--export([joined/2, members_changed/4, handle_msg/3, terminate/2]).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
-behaviour(gm).
@@ -51,7 +51,7 @@ joined([], Members) ->
put(ts, now()),
ok.
-members_changed([], Births, Deaths, _Live) ->
+members_changed([], Births, Deaths) ->
with_state(
fun (State) ->
State1 =
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl
index fa515fa8..41be6dd8 100644
--- a/src/gm_speed_test.erl
+++ b/src/gm_speed_test.erl
@@ -17,7 +17,7 @@
-module(gm_speed_test).
-export([test/3]).
--export([joined/2, members_changed/4, handle_msg/3, terminate/2]).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
-export([wile_e_coyote/2]).
-behaviour(gm).
@@ -30,7 +30,7 @@ joined(Owner, _Members) ->
Owner ! joined,
ok.
-members_changed(_Owner, _Births, _Deaths, _Live) ->
+members_changed(_Owner, _Births, _Deaths) ->
ok.
handle_msg(Owner, _From, ping) ->
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
index 23b8f8cb..cae2164b 100644
--- a/src/gm_tests.erl
+++ b/src/gm_tests.erl
@@ -22,7 +22,7 @@
test_member_death/0,
test_receive_in_order/0,
all_tests/0]).
--export([joined/2, members_changed/4, handle_msg/3, terminate/2]).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
-behaviour(gm).
@@ -40,7 +40,7 @@ joined(Pid, Members) ->
Pid ! {joined, self(), Members},
ok.
-members_changed(Pid, Births, Deaths, _Live) ->
+members_changed(Pid, Births, Deaths) ->
Pid ! {members_changed, self(), Births, Deaths},
ok.
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 2feeea5a..23718da1 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -21,7 +21,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([joined/2, members_changed/4, handle_msg/3]).
+-export([joined/2, members_changed/3, handle_msg/3]).
-behaviour(gen_server2).
-behaviour(gm).
@@ -348,11 +348,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) ->
handle_call(get_gm, _From, State = #state { gm = GM }) ->
reply(GM, State).
-handle_cast({gm_deaths, LiveGMPids},
+handle_cast({gm_deaths, DeadGMPids},
State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
when node(MPid) =:= node() ->
case rabbit_mirror_queue_misc:remove_from_queue(
- QueueName, MPid, LiveGMPids) of
+ QueueName, MPid, DeadGMPids) of
{ok, MPid, DeadPids} ->
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
@@ -401,10 +401,10 @@ joined([CPid], Members) ->
CPid ! {joined, self(), Members},
ok.
-members_changed([_CPid], _Births, [], _Live) ->
+members_changed([_CPid], _Births, []) ->
ok;
-members_changed([CPid], _Births, _Deaths, Live) ->
- ok = gen_server2:cast(CPid, {gm_deaths, Live}).
+members_changed([CPid], _Births, Deaths) ->
+ ok = gen_server2:cast(CPid, {gm_deaths, Deaths}).
handle_msg([CPid], _From, request_depth = Msg) ->
ok = gen_server2:cast(CPid, Msg);
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index ff1d5815..256543de 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -65,15 +65,8 @@
%%----------------------------------------------------------------------------
-%% If the dead pids include the queue pid (i.e. the master has died)
-%% then only remove that if we are about to be promoted. Otherwise we
-%% can have the situation where a slave updates the mnesia record for
-%% a queue, promoting another slave before that slave realises it has
-%% become the new master, which is bad because it could then mean the
-%% slave (now master) receives messages it's not ready for (for
-%% example, new consumers).
%% Returns {ok, NewMPid, DeadPids}
-remove_from_queue(QueueName, Self, LiveGMPids) ->
+remove_from_queue(QueueName, Self, DeadGMPids) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
@@ -83,40 +76,63 @@ remove_from_queue(QueueName, Self, LiveGMPids) ->
[Q = #amqqueue { pid = QPid,
slave_pids = SPids,
gm_pids = GMPids }] ->
- {GMPids1, Dead} = lists:partition(
- fun ({GM, _}) ->
- lists:member(GM, LiveGMPids)
- end, GMPids),
- DeadPids = [Pid || {_GM, Pid} <- Dead],
- Alive = [QPid | SPids] -- DeadPids,
+ {DeadGM, AliveGM} = lists:partition(
+ fun ({GM, _}) ->
+ lists:member(GM, DeadGMPids)
+ end, GMPids),
+ DeadPids = [Pid || {_GM, Pid} <- DeadGM],
+ AlivePids = [Pid || {_GM, Pid} <- AliveGM],
+ Alive = [Pid || Pid <- [QPid | SPids],
+ lists:member(Pid, AlivePids)],
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
- GMPids = GMPids1, %% ASSERTION
- {ok, QPid1, []};
+ ok;
_ when QPid =:= QPid1 orelse QPid1 =:= Self ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
Q1 = Q#amqqueue{pid = QPid1,
slave_pids = SPids1,
- gm_pids = GMPids1},
+ gm_pids = AliveGM},
store_updated_slaves(Q1),
%% If we add and remove nodes at the same time we
%% might tell the old master we need to sync and
%% then shut it down. So let's check if the new
%% master needs to sync.
- maybe_auto_sync(Q1),
- {ok, QPid1, [QPid | SPids] -- Alive};
+ maybe_auto_sync(Q1);
_ ->
- %% Master has changed, and we're not it,
- %% so leave alone to allow the promoted
- %% slave to find it and make its
- %% promotion atomic.
- {ok, QPid1, []}
- end
+ %% Master has changed, and we're not it.
+ %% [1].
+ Q1 = Q#amqqueue{slave_pids = Alive,
+ gm_pids = AliveGM},
+ store_updated_slaves(Q1)
+ end,
+ {ok, QPid1, DeadPids}
end
end).
+%% [1] We still update mnesia here in case the slave that is supposed
+%% to become master dies before it does do so, in which case the dead
+%% old master might otherwise never get removed, which in turn might
+%% prevent promotion of another slave (e.g. us).
+%%
+%% Note however that we do not update the master pid. Otherwise we can
+%% have the situation where a slave updates the mnesia record for a
+%% queue, promoting another slave before that slave realises it has
+%% become the new master, which is bad because it could then mean the
+%% slave (now master) receives messages it's not ready for (for
+%% example, new consumers).
+%%
+%% We set slave_pids to Alive rather than SPids1 since otherwise we'd
+%% be removing the pid of the candidate master, which in turn would
+%% prevent it from promoting itself.
+%%
+%% We maintain gm_pids as our source of truth, i.e. it contains the
+%% most up-to-date information about which GMs and associated
+%% {M,S}Pids are alive. And all pids in slave_pids always have a
+%% corresponding entry in gm_pids. By contrast, due to the
+%% aforementioned restriction on updating the master pid, that pid may
+%% not be present in gm_pids, but only if said master has died.
on_node_up() ->
QNames =
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 8c550d95..f6acd91a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -30,7 +30,7 @@
code_change/3, handle_pre_hibernate/1, prioritise_call/4,
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
--export([joined/2, members_changed/4, handle_msg/3]).
+-export([joined/2, members_changed/3, handle_msg/3]).
-behaviour(gen_server2).
-behaviour(gm).
@@ -191,11 +191,11 @@ handle_call(go, _From, {not_started, Q} = NotStarted) ->
{error, Error} -> {stop, Error, NotStarted}
end;
-handle_call({gm_deaths, LiveGMPids}, From,
+handle_call({gm_deaths, DeadGMPids}, From,
State = #state { gm = GM, q = Q = #amqqueue {
name = QName, pid = MPid }}) ->
Self = self(),
- case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, LiveGMPids) of
+ case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
@@ -368,7 +368,7 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
- {gm_deaths, _Live} -> 5;
+ {gm_deaths, _Dead} -> 5;
_ -> 0
end.
@@ -396,10 +396,17 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
joined([SPid], _Members) -> SPid ! {joined, self()}, ok.
-members_changed([_SPid], _Births, [], _Live) ->
+members_changed([_SPid], _Births, []) ->
ok;
-members_changed([ SPid], _Births, _Deaths, Live) ->
- inform_deaths(SPid, Live).
+members_changed([ SPid], _Births, Deaths) ->
+ case rabbit_misc:with_exit_handler(
+ rabbit_misc:const(ok),
+ fun() ->
+ gen_server2:call(SPid, {gm_deaths, Deaths}, infinity)
+ end) of
+ ok -> ok;
+ {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
+ end.
handle_msg([_SPid], _From, request_depth) ->
%% This is only of value to the master
@@ -424,14 +431,6 @@ handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) ->
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
-inform_deaths(SPid, Live) ->
- case rabbit_misc:with_exit_handler(
- rabbit_misc:const(ok),
- fun() -> gen_server2:call(SPid, {gm_deaths, Live}, infinity) end) of
- ok -> ok;
- {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
- end.
-
%% ---------------------------------------------------------------------------
%% Others
%% ---------------------------------------------------------------------------