summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Matthias Radestock <matthias@rabbitmq.com> 2014-04-16 07:42:52 +0100
committer: Matthias Radestock <matthias@rabbitmq.com> 2014-04-16 07:42:52 +0100
commit: 757b4beb0e21dd4401cea9e1dc35aff6b6f6c1e7 (patch)
tree: a91c1d503e516bbdd99b9496355b4626127e730e
parent: 27d16335d0e3025b718d7dc9bad9b3749e2fd43f (diff)
download: rabbitmq-server-757b4beb0e21dd4401cea9e1dc35aff6b6f6c1e7.tar.gz
drive remove_from_queue with DeadGMPids
...instead of LiveGMPids. The latter may be out of date such that it contains fewer pids than are actually alive, due to new GMs having sprung into life recently. We'd then, incorrectly, remove the corresponding entries from the queue's mnesia record, causing havoc. DeadGMPids can be out of date too; it may contain fewer pids than have actually died, due to GMs having died more recently. That is harmless though, since it never leads us to remove an entry that we shouldn't, and we will learn about any new deaths eventually.
-rw-r--r-- src/rabbit_mirror_queue_coordinator.erl 10
-rw-r--r-- src/rabbit_mirror_queue_misc.erl 28
-rw-r--r-- src/rabbit_mirror_queue_slave.erl 27
3 files changed, 37 insertions, 28 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 2feeea5a..71ce512e 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -348,11 +348,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) ->
handle_call(get_gm, _From, State = #state { gm = GM }) ->
reply(GM, State).
-handle_cast({gm_deaths, LiveGMPids},
+handle_cast({gm_deaths, DeadGMPids},
State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
when node(MPid) =:= node() ->
case rabbit_mirror_queue_misc:remove_from_queue(
- QueueName, MPid, LiveGMPids) of
+ QueueName, MPid, DeadGMPids) of
{ok, MPid, DeadPids} ->
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
@@ -401,10 +401,10 @@ joined([CPid], Members) ->
CPid ! {joined, self(), Members},
ok.
-members_changed([_CPid], _Births, [], _Live) ->
+members_changed([_CPid], _Births, [], _Live) ->
ok;
-members_changed([CPid], _Births, _Deaths, Live) ->
- ok = gen_server2:cast(CPid, {gm_deaths, Live}).
+members_changed([CPid], _Births, Deaths, _Live) ->
+ ok = gen_server2:cast(CPid, {gm_deaths, Deaths}).
handle_msg([CPid], _From, request_depth = Msg) ->
ok = gen_server2:cast(CPid, Msg);
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index ff1d5815..2838996c 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -73,7 +73,7 @@
%% slave (now master) receives messages it's not ready for (for
%% example, new consumers).
%% Returns {ok, NewMPid, DeadPids}
-remove_from_queue(QueueName, Self, LiveGMPids) ->
+remove_from_queue(QueueName, Self, DeadGMPids) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
@@ -83,16 +83,15 @@ remove_from_queue(QueueName, Self, LiveGMPids) ->
[Q = #amqqueue { pid = QPid,
slave_pids = SPids,
gm_pids = GMPids }] ->
- {GMPids1, Dead} = lists:partition(
+ {Dead, GMPids1} = lists:partition(
fun ({GM, _}) ->
- lists:member(GM, LiveGMPids)
+ lists:member(GM, DeadGMPids)
end, GMPids),
DeadPids = [Pid || {_GM, Pid} <- Dead],
Alive = [QPid | SPids] -- DeadPids,
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
- GMPids = GMPids1, %% ASSERTION
{ok, QPid1, []};
_ when QPid =:= QPid1 orelse QPid1 =:= Self ->
%% Either master hasn't changed, so
@@ -107,12 +106,23 @@ remove_from_queue(QueueName, Self, LiveGMPids) ->
%% then shut it down. So let's check if the new
%% master needs to sync.
maybe_auto_sync(Q1),
- {ok, QPid1, [QPid | SPids] -- Alive};
+ {ok, QPid1, DeadPids};
_ ->
- %% Master has changed, and we're not it,
- %% so leave alone to allow the promoted
- %% slave to find it and make its
- %% promotion atomic.
+ %% Master has changed, and we're not it.
+ %% We still update mnesia here in case
+ %% the slave that is supposed to become
+ %% master dies before it does do so, in
+ %% which case the dead old master might
+ %% otherwise never get removed, which in
+ %% turn might prevent promotion of
+ %% another slave (e.g. us).
+ %%
+ %% Note however that we do not update
+ %% the master pid, for reasons explained
+ %% at the top of the function.
+ Q1 = Q#amqqueue{slave_pids = SPids1,
+ gm_pids = GMPids1},
+ store_updated_slaves(Q1),
{ok, QPid1, []}
end
end
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index e5c3adac..fbdcb979 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -191,11 +191,11 @@ handle_call(go, _From, {not_started, Q} = NotStarted) ->
{error, Error} -> {stop, Error, NotStarted}
end;
-handle_call({gm_deaths, LiveGMPids}, From,
+handle_call({gm_deaths, DeadGMPids}, From,
State = #state { gm = GM, q = Q = #amqqueue {
name = QName, pid = MPid }}) ->
Self = self(),
- case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, LiveGMPids) of
+ case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
@@ -365,7 +365,7 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
- {gm_deaths, _Live} -> 5;
+ {gm_deaths, _Dead} -> 5;
_ -> 0
end.
@@ -393,10 +393,17 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
joined([SPid], _Members) -> SPid ! {joined, self()}, ok.
-members_changed([_SPid], _Births, [], _Live) ->
+members_changed([_SPid], _Births, [], _Live) ->
ok;
-members_changed([ SPid], _Births, _Deaths, Live) ->
- inform_deaths(SPid, Live).
+members_changed([ SPid], _Births, Deaths, _Live) ->
+ case rabbit_misc:with_exit_handler(
+ rabbit_misc:const(ok),
+ fun() ->
+ gen_server2:call(SPid, {gm_deaths, Deaths}, infinity)
+ end) of
+ ok -> ok;
+ {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
+ end.
handle_msg([_SPid], _From, request_depth) ->
%% This is only of value to the master
@@ -421,14 +428,6 @@ handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) ->
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
-inform_deaths(SPid, Live) ->
- case rabbit_misc:with_exit_handler(
- rabbit_misc:const(ok),
- fun() -> gen_server2:call(SPid, {gm_deaths, Live}, infinity) end) of
- ok -> ok;
- {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
- end.
-
%% ---------------------------------------------------------------------------
%% Others
%% ---------------------------------------------------------------------------