summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_slave.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r--src/rabbit_mirror_queue_slave.erl33
1 files changed, 20 insertions, 13 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 8e541db1..b4b0d4d3 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -150,9 +150,7 @@ init_it(Self, Node, QueueName) ->
[Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
mnesia:read({rabbit_queue, QueueName}),
case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] -> MPids1 = MPids ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{slave_pids = MPids1}),
+ [] -> add_slave(Q1, Self, MPids),
{new, QPid};
[QPid] -> case rabbit_misc:is_process_alive(QPid) of
true -> duplicate_live_master;
@@ -160,13 +158,16 @@ init_it(Self, Node, QueueName) ->
end;
[SPid] -> case rabbit_misc:is_process_alive(SPid) of
true -> existing;
- false -> MPids1 = (MPids -- [SPid]) ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{slave_pids = MPids1}),
+ false -> add_slave(Q1, Self, MPids -- [SPid]),
{new, QPid}
end
end.
+%% Add to the end, so they are in descending order of age, see
+%% rabbit_mirror_queue_misc:promote_slave/1
+add_slave(Q, New, MPids) -> rabbit_mirror_queue_misc:store_updated_slaves(
+ Q#amqqueue{slave_pids = MPids ++ [New]}).
+
handle_call({deliver, Delivery}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
gen_server2:reply(From, ok),
@@ -182,18 +183,25 @@ handle_call({gm_deaths, Deaths}, From,
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
- {ok, Pid, DeadPids} ->
+ {ok, Pid, DeadPids, ExtraNodes} ->
rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName,
DeadPids),
if node(Pid) =:= node(MPid) ->
%% master hasn't changed
- reply(ok, State);
+ gen_server2:reply(From, ok),
+ rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
+ noreply(State);
node(Pid) =:= node() ->
%% we've become master
- promote_me(From, State);
+ QueueState = promote_me(From, State),
+ rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
+ {become, rabbit_amqqueue_process, QueueState, hibernate};
true ->
%% master has changed to not us.
gen_server2:reply(From, ok),
+ %% assertion, we don't need to add_mirrors/2 in this
+ %% branch, see last clause in remove_from_queue/2
+ [] = ExtraNodes,
erlang:monitor(process, Pid),
%% GM is lazy. So we know of the death of the
%% slave since it is a neighbour of ours, but
@@ -533,10 +541,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
- QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
- Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
- AckTags, Deliveries, KS, MTC),
- {become, rabbit_amqqueue_process, QueueState, hibernate}.
+ rabbit_amqqueue_process:init_with_backing_queue_state(
+ Q1, rabbit_mirror_queue_master, MasterState, RateTRef, AckTags,
+ Deliveries, KS, MTC).
noreply(State) ->
{NewState, Timeout} = next_state(State),