diff options
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 33 |
1 files changed, 20 insertions, 13 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 8e541db1..b4b0d4d3 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -150,9 +150,7 @@ init_it(Self, Node, QueueName) -> [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = mnesia:read({rabbit_queue, QueueName}), case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> MPids1 = MPids ++ [Self], - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{slave_pids = MPids1}), + [] -> add_slave(Q1, Self, MPids), {new, QPid}; [QPid] -> case rabbit_misc:is_process_alive(QPid) of true -> duplicate_live_master; @@ -160,13 +158,16 @@ init_it(Self, Node, QueueName) -> end; [SPid] -> case rabbit_misc:is_process_alive(SPid) of true -> existing; - false -> MPids1 = (MPids -- [SPid]) ++ [Self], - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{slave_pids = MPids1}), + false -> add_slave(Q1, Self, MPids -- [SPid]), {new, QPid} end end. +%% Add to the end, so they are in descending order of age, see +%% rabbit_mirror_queue_misc:promote_slave/1 +add_slave(Q, New, MPids) -> rabbit_mirror_queue_misc:store_updated_slaves( + Q#amqqueue{slave_pids = MPids ++ [New]}). + handle_call({deliver, Delivery}, From, State) -> %% Synchronous, "mandatory" deliver mode. gen_server2:reply(From, ok), @@ -182,18 +183,25 @@ handle_call({gm_deaths, Deaths}, From, {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; - {ok, Pid, DeadPids} -> + {ok, Pid, DeadPids, ExtraNodes} -> rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName, DeadPids), if node(Pid) =:= node(MPid) -> %% master hasn't changed - reply(ok, State); + gen_server2:reply(From, ok), + rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), + noreply(State); node(Pid) =:= node() -> %% we've become master - promote_me(From, State); + QueueState = promote_me(From, State), + rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), + {become, rabbit_amqqueue_process, QueueState, hibernate}; true -> %% master has changed to not us. gen_server2:reply(From, ok), + %% assertion, we don't need to add_mirrors/2 in this + %% branch, see last clause in remove_from_queue/2 + [] = ExtraNodes, erlang:monitor(process, Pid), %% GM is lazy. So we know of the death of the %% slave since it is a neighbour of ours, but @@ -533,10 +541,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), Delivery <- queue:to_list(PubQ)], - QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q1, rabbit_mirror_queue_master, MasterState, RateTRef, - AckTags, Deliveries, KS, MTC), - {become, rabbit_amqqueue_process, QueueState, hibernate}. + rabbit_amqqueue_process:init_with_backing_queue_state( + Q1, rabbit_mirror_queue_master, MasterState, RateTRef, AckTags, + Deliveries, KS, MTC). noreply(State) -> {NewState, Timeout} = next_state(State), |