diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-04-17 14:48:41 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-04-17 14:48:41 +0100 |
commit | d564c0b221def8caa7c681e63730b00c33f22375 (patch) | |
tree | 67cb185f22fa75f4e4f7fafea265176ddea91b15 | |
parent | dfb19cda6c6663ea3ed3cc00bfc9996016ccba69 (diff) | |
parent | a38578e69f2147dc3ba24565139e18d9ace04a20 (diff) | |
download | rabbitmq-server-d564c0b221def8caa7c681e63730b00c33f22375.tar.gz |
Merge bug26117
-rw-r--r-- | include/gm_specs.hrl | 3 | ||||
-rw-r--r-- | src/gm.erl | 12 | ||||
-rw-r--r-- | src/gm_soak_test.erl | 4 | ||||
-rw-r--r-- | src/gm_speed_test.erl | 4 | ||||
-rw-r--r-- | src/gm_tests.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 12 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 66 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 29 |
8 files changed, 73 insertions, 61 deletions
diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl index f4ea0df8..245c23bc 100644 --- a/include/gm_specs.hrl +++ b/include/gm_specs.hrl @@ -21,8 +21,7 @@ -type(members() :: [pid()]). -spec(joined/2 :: (args(), members()) -> callback_result()). --spec(members_changed/4 :: (args(), members(), - members(), members()) -> callback_result()). +-spec(members_changed/3 :: (args(), members(), members()) -> callback_result()). -spec(handle_msg/3 :: (args(), pid(), any()) -> callback_result()). -spec(terminate/2 :: (args(), term()) -> any()). @@ -476,8 +476,8 @@ %% joined/2 before receiving any messages from it; and (2) we will not %% see members die that we have not seen born (or supplied in the %% members to joined/2). --callback members_changed(Args :: term(), Births :: [pid()], - Deaths :: [pid()], Live :: [pid()]) -> +-callback members_changed(Args :: term(), + Births :: [pid()], Deaths :: [pid()]) -> ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. %% Supplied with Args provided in start_link, the sender, and the @@ -496,7 +496,7 @@ -else. behaviour_info(callbacks) -> - [{joined, 2}, {members_changed, 4}, {handle_msg, 3}, {terminate, 2}]; + [{joined, 2}, {members_changed, 3}, {handle_msg, 3}, {terminate, 2}]; behaviour_info(_Other) -> undefined. @@ -685,8 +685,7 @@ handle_cast({validate_members, OldMembers}, Deaths = OldMembers -- NewMembers, case {Births, Deaths} of {[], []} -> noreply(State); - _ -> Result = Module:members_changed( - Args, Births, Deaths, NewMembers), + _ -> Result = Module:members_changed(Args, Births, Deaths), handle_callback_result({Result, State}) end; @@ -1399,8 +1398,7 @@ callback_view_changed(Args, Module, OldView, NewView) -> case {Births, Deaths} of {[], []} -> ok; _ -> Module:members_changed( - Args, get_pids(Births), get_pids(Deaths), - get_pids(NewMembers)) + Args, get_pids(Births), get_pids(Deaths)) end. handle_callback_result({Result, State}) -> diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl index 4ff1645a..c9a25522 100644 --- a/src/gm_soak_test.erl +++ b/src/gm_soak_test.erl @@ -17,7 +17,7 @@ -module(gm_soak_test). -export([test/0]). --export([joined/2, members_changed/4, handle_msg/3, terminate/2]). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). -behaviour(gm). @@ -51,7 +51,7 @@ joined([], Members) -> put(ts, now()), ok. -members_changed([], Births, Deaths, _Live) -> +members_changed([], Births, Deaths) -> with_state( fun (State) -> State1 = diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl index fa515fa8..41be6dd8 100644 --- a/src/gm_speed_test.erl +++ b/src/gm_speed_test.erl @@ -17,7 +17,7 @@ -module(gm_speed_test). -export([test/3]). --export([joined/2, members_changed/4, handle_msg/3, terminate/2]). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). -export([wile_e_coyote/2]). -behaviour(gm). @@ -30,7 +30,7 @@ joined(Owner, _Members) -> Owner ! joined, ok. -members_changed(_Owner, _Births, _Deaths, _Live) -> +members_changed(_Owner, _Births, _Deaths) -> ok. handle_msg(Owner, _From, ping) -> diff --git a/src/gm_tests.erl b/src/gm_tests.erl index 23b8f8cb..cae2164b 100644 --- a/src/gm_tests.erl +++ b/src/gm_tests.erl @@ -22,7 +22,7 @@ test_member_death/0, test_receive_in_order/0, all_tests/0]). --export([joined/2, members_changed/4, handle_msg/3, terminate/2]). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). -behaviour(gm). @@ -40,7 +40,7 @@ joined(Pid, Members) -> Pid ! {joined, self(), Members}, ok. -members_changed(Pid, Births, Deaths, _Live) -> +members_changed(Pid, Births, Deaths) -> Pid ! {members_changed, self(), Births, Deaths}, ok. diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 2feeea5a..23718da1 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -21,7 +21,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([joined/2, members_changed/4, handle_msg/3]). +-export([joined/2, members_changed/3, handle_msg/3]). -behaviour(gen_server2). -behaviour(gm). @@ -348,11 +348,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) -> handle_call(get_gm, _From, State = #state { gm = GM }) -> reply(GM, State). -handle_cast({gm_deaths, LiveGMPids}, +handle_cast({gm_deaths, DeadGMPids}, State = #state { q = #amqqueue { name = QueueName, pid = MPid } }) when node(MPid) =:= node() -> case rabbit_mirror_queue_misc:remove_from_queue( - QueueName, MPid, LiveGMPids) of + QueueName, MPid, DeadGMPids) of {ok, MPid, DeadPids} -> rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, DeadPids), @@ -401,10 +401,10 @@ joined([CPid], Members) -> CPid ! {joined, self(), Members}, ok. -members_changed([_CPid], _Births, [], _Live) -> +members_changed([_CPid], _Births, []) -> ok; -members_changed([CPid], _Births, _Deaths, Live) -> - ok = gen_server2:cast(CPid, {gm_deaths, Live}). +members_changed([CPid], _Births, Deaths) -> + ok = gen_server2:cast(CPid, {gm_deaths, Deaths}). handle_msg([CPid], _From, request_depth = Msg) -> ok = gen_server2:cast(CPid, Msg); diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index ff1d5815..256543de 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -65,15 +65,8 @@ %%---------------------------------------------------------------------------- -%% If the dead pids include the queue pid (i.e. the master has died) -%% then only remove that if we are about to be promoted. Otherwise we -%% can have the situation where a slave updates the mnesia record for -%% a queue, promoting another slave before that slave realises it has -%% become the new master, which is bad because it could then mean the -%% slave (now master) receives messages it's not ready for (for -%% example, new consumers). %% Returns {ok, NewMPid, DeadPids} -remove_from_queue(QueueName, Self, LiveGMPids) -> +remove_from_queue(QueueName, Self, DeadGMPids) -> rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we @@ -83,40 +76,63 @@ remove_from_queue(QueueName, Self, LiveGMPids) -> [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] -> - {GMPids1, Dead} = lists:partition( - fun ({GM, _}) -> - lists:member(GM, LiveGMPids) - end, GMPids), - DeadPids = [Pid || {_GM, Pid} <- Dead], - Alive = [QPid | SPids] -- DeadPids, + {DeadGM, AliveGM} = lists:partition( + fun ({GM, _}) -> + lists:member(GM, DeadGMPids) + end, GMPids), + DeadPids = [Pid || {_GM, Pid} <- DeadGM], + AlivePids = [Pid || {_GM, Pid} <- AliveGM], + Alive = [Pid || Pid <- [QPid | SPids], + lists:member(Pid, AlivePids)], {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> - GMPids = GMPids1, %% ASSERTION - {ok, QPid1, []}; + ok; _ when QPid =:= QPid1 orelse QPid1 =:= Self -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. Q1 = Q#amqqueue{pid = QPid1, slave_pids = SPids1, - gm_pids = GMPids1}, + gm_pids = AliveGM}, store_updated_slaves(Q1), %% If we add and remove nodes at the same time we %% might tell the old master we need to sync and %% then shut it down. So let's check if the new %% master needs to sync. - maybe_auto_sync(Q1), - {ok, QPid1, [QPid | SPids] -- Alive}; + maybe_auto_sync(Q1); _ -> - %% Master has changed, and we're not it, - %% so leave alone to allow the promoted - %% slave to find it and make its - %% promotion atomic. - {ok, QPid1, []} - end + %% Master has changed, and we're not it. + %% [1]. + Q1 = Q#amqqueue{slave_pids = Alive, + gm_pids = AliveGM}, + store_updated_slaves(Q1) + end, + {ok, QPid1, DeadPids} end end). +%% [1] We still update mnesia here in case the slave that is supposed +%% to become master dies before it does do so, in which case the dead +%% old master might otherwise never get removed, which in turn might +%% prevent promotion of another slave (e.g. us). +%% +%% Note however that we do not update the master pid. Otherwise we can +%% have the situation where a slave updates the mnesia record for a +%% queue, promoting another slave before that slave realises it has +%% become the new master, which is bad because it could then mean the +%% slave (now master) receives messages it's not ready for (for +%% example, new consumers). +%% +%% We set slave_pids to Alive rather than SPids1 since otherwise we'd +%% be removing the pid of the candidate master, which in turn would +%% prevent it from promoting itself. +%% +%% We maintain gm_pids as our source of truth, i.e. it contains the +%% most up-to-date information about which GMs and associated +%% {M,S}Pids are alive. And all pids in slave_pids always have a +%% corresponding entry in gm_pids. By contrast, due to the +%% aforementioned restriction on updating the master pid, that pid may +%% not be present in gm_pids, but only if said master has died. on_node_up() -> QNames = diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 8c550d95..f6acd91a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -30,7 +30,7 @@ code_change/3, handle_pre_hibernate/1, prioritise_call/4, prioritise_cast/3, prioritise_info/3, format_message_queue/2]). --export([joined/2, members_changed/4, handle_msg/3]). +-export([joined/2, members_changed/3, handle_msg/3]). -behaviour(gen_server2). -behaviour(gm). @@ -191,11 +191,11 @@ handle_call(go, _From, {not_started, Q} = NotStarted) -> {error, Error} -> {stop, Error, NotStarted} end; -handle_call({gm_deaths, LiveGMPids}, From, +handle_call({gm_deaths, DeadGMPids}, From, State = #state { gm = GM, q = Q = #amqqueue { name = QName, pid = MPid }}) -> Self = self(), - case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, LiveGMPids) of + case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; @@ -368,7 +368,7 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 9; - {gm_deaths, _Live} -> 5; + {gm_deaths, _Dead} -> 5; _ -> 0 end. @@ -396,10 +396,17 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). joined([SPid], _Members) -> SPid ! {joined, self()}, ok. -members_changed([_SPid], _Births, [], _Live) -> +members_changed([_SPid], _Births, []) -> ok; -members_changed([ SPid], _Births, _Deaths, Live) -> - inform_deaths(SPid, Live). +members_changed([ SPid], _Births, Deaths) -> + case rabbit_misc:with_exit_handler( + rabbit_misc:const(ok), + fun() -> + gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) + end) of + ok -> ok; + {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]} + end. handle_msg([_SPid], _From, request_depth) -> %% This is only of value to the master @@ -424,14 +431,6 @@ handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) -> handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). -inform_deaths(SPid, Live) -> - case rabbit_misc:with_exit_handler( - rabbit_misc:const(ok), - fun() -> gen_server2:call(SPid, {gm_deaths, Live}, infinity) end) of - ok -> ok; - {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]} - end. - %% --------------------------------------------------------------------------- %% Others %% --------------------------------------------------------------------------- |