diff options
author | Emile Joubert <emile@rabbitmq.com> | 2013-08-06 16:45:33 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2013-08-06 16:45:33 +0100 |
commit | 734b2d9c2feee3f90cd69f19ba45bfbac065143e (patch) | |
tree | 249fd6e5a50242271f25946f31dbb4077824d95a | |
parent | eb4c410d607d7e9cfa6c102038ef9c6f48ad8725 (diff) | |
download | rabbitmq-server-734b2d9c2feee3f90cd69f19ba45bfbac065143e.tar.gz |
Delay clearing of state in slaves
until a sender-down notification has been received from both the channel and GM,
in order to avoid messages being enqueued more than once.
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 27 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 78 |
2 files changed, 63 insertions, 42 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index c9918fed..f54e9bd1 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -222,20 +222,19 @@ %% sender_death message to all the slaves, saying the sender has %% died. Once the slaves receive the sender_death message, they know %% that they're not going to receive any more instructions from the gm -%% regarding that sender, thus they throw away any publications from -%% the sender pending publication instructions. However, it is -%% possible that the coordinator receives the DOWN and communicates -%% that to the master before the master has finished receiving and -%% processing publishes from the sender. This turns out not to be a -%% problem: the sender has actually died, and so will not need to -%% receive confirms or other feedback, and should further messages be -%% "received" from the sender, the master will ask the coordinator to -%% set up a new monitor, and will continue to process the messages -%% normally. Slaves may thus receive publishes via gm from previously -%% declared "dead" senders, but again, this is fine: should the slave -%% have just thrown out the message it had received directly from the -%% sender (due to receiving a sender_death message via gm), it will be -%% able to cope with the publication purely from the master via gm. +%% regarding that sender. However, it is possible that the coordinator +%% receives the DOWN and communicates that to the master before the +%% master has finished receiving and processing publishes from the +%% sender. This turns out not to be a problem: the sender has actually +%% died, and so will not need to receive confirms or other feedback, +%% and should further messages be "received" from the sender, the +%% master will ask the coordinator to set up a new monitor, and +%% will continue to process the messages normally. 
Slaves may thus +%% receive publishes via gm from previously declared "dead" senders, +%% but again, this is fine: should the slave have just thrown out the +%% message it had received directly from the sender (due to receiving +%% a sender_death message via gm), it will be able to cope with the +%% publication purely from the master via gm. %% %% When a slave receives a DOWN message for a sender, if it has not %% received the sender_death message from the master via gm already, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 1996fd0a..6425a855 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -61,7 +61,7 @@ sync_timer_ref, rate_timer_ref, - sender_queues, %% :: Pid -> {Q Msg, Set MsgId} + sender_queues, %% :: Pid -> {Q Msg, Set MsgId, ChState} msg_id_ack, %% :: MsgId -> AckTag msg_id_status, @@ -275,7 +275,7 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> local_sender_death(ChPid, State), - noreply(State); + noreply(sender_lifetime(ChPid, down_from_ch, State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; @@ -563,10 +563,15 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, (_Msgid, _Status, MTC0) -> MTC0 end, gb_trees:empty(), MS), - Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), - Delivery <- queue:to_list(PubQ)], + Deliveries = [Delivery || + {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ), + Delivery <- queue:to_list(PubQ)], + AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)], + KS1 = lists:foldl(fun (ChPid0, KS0) -> + pmon:demonitor(ChPid0, KS0) + end, KS, AwaitGmDown), rabbit_amqqueue_process:init_with_backing_queue_state( - Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS, + Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1, MTC). 
noreply(State) -> @@ -643,12 +648,39 @@ confirm_sender_death(Pid) -> State end, %% Note that we do not remove our knowledge of this ChPid until we - %% get the sender_death from GM. + %% get the sender_death from GM as well as a DOWN notification. {ok, _TRef} = timer:apply_after( ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue, [self(), rabbit_mirror_queue_master, Fun]), ok. +forget_sender(running, _) -> false; +forget_sender(_, running) -> false; +forget_sender(Down1, Down2) when Down1 =/= Down2 -> true. + +%% Record and process lifetime events from channels. Forget all about a channel +%% only when down notifications are received from both the channel and from gm. +sender_lifetime(ChPid, ChState, State = #state { sender_queues = SQ, + msg_id_status = MS, + known_senders = KS }) -> + case dict:find(ChPid, SQ) of + error -> + State; + {ok, {MQ, PendCh, ChStateRecord}} -> + case forget_sender(ChState, ChStateRecord) of + true -> + credit_flow:peer_down(ChPid), + State #state { sender_queues = dict:erase(ChPid, SQ), + msg_id_status = lists:foldl( + fun dict:erase/2, + MS, sets:to_list(PendCh)), + known_senders = pmon:demonitor(ChPid, KS) }; + false -> + SQ1 = dict:store(ChPid, {MQ, PendCh, ChState}, SQ), + State #state { sender_queues = SQ1 } + end + end. + maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, sender = ChPid }, @@ -657,9 +689,9 @@ maybe_enqueue_message( %% We will never see {published, ChPid, MsgSeqNo} here. 
case dict:find(MsgId, MS) of error -> - {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ), MQ1 = queue:in(Delivery, MQ), - SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), + SQ1 = dict:store(ChPid, {MQ1, PendingCh, ChState}, SQ), State1 #state { sender_queues = SQ1 }; {ok, Status} -> MS1 = send_or_record_confirm( @@ -671,7 +703,7 @@ maybe_enqueue_message( get_sender_queue(ChPid, SQ) -> case dict:find(ChPid, SQ) of - error -> {queue:new(), sets:new()}; + error -> {queue:new(), sets:new(), running}; {ok, Val} -> Val end. @@ -679,19 +711,20 @@ remove_from_pending_ch(MsgId, ChPid, SQ) -> case dict:find(ChPid, SQ) of error -> SQ; - {ok, {MQ, PendingCh}} -> - dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ) + {ok, {MQ, PendingCh, ChState}} -> + dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState}, + SQ) end. publish_or_discard(Status, ChPid, MsgId, State = #state { sender_queues = SQ, msg_id_status = MS }) -> %% We really are going to do the publish/discard right now, even %% though we may not have seen it directly from the channel. But - %% we cannot issues confirms until the latter has happened. So we + %% we cannot issue confirms until the latter has happened. So we %% need to keep track of the MsgId and its confirmation status in %% the meantime. State1 = ensure_monitoring(ChPid, State), - {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ), {MQ1, PendingCh1, MS1} = case queue:out(MQ) of {empty, _MQ2} -> @@ -711,7 +744,7 @@ publish_or_discard(Status, ChPid, MsgId, %% expecting any confirms from us. {MQ, PendingCh, MS} end, - SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), + SQ1 = dict:store(ChPid, {MQ1, PendingCh1, ChState}, SQ), State1 #state { sender_queues = SQ1, msg_id_status = MS1 }. 
@@ -772,25 +805,14 @@ process_instruction({requeue, MsgIds}, {ok, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }}; process_instruction({sender_death, ChPid}, - State = #state { sender_queues = SQ, - msg_id_status = MS, - known_senders = KS }) -> + State = #state { known_senders = KS }) -> %% The channel will be monitored iff we have received a message %% from it. In this case we just want to avoid doing work if we %% never got any messages. {ok, case pmon:is_monitored(ChPid, KS) of false -> State; - true -> MS1 = case dict:find(ChPid, SQ) of - error -> - MS; - {ok, {_MQ, PendingCh}} -> - lists:foldl(fun dict:erase/2, MS, - sets:to_list(PendingCh)) - end, - credit_flow:peer_down(ChPid), - State #state { sender_queues = dict:erase(ChPid, SQ), - msg_id_status = MS1, - known_senders = pmon:demonitor(ChPid, KS) } + true -> credit_flow:peer_down(ChPid), + sender_lifetime(ChPid, down_from_gm, State) end}; process_instruction({depth, Depth}, State = #state { backing_queue = BQ, |