summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-08-06 16:45:33 +0100
committerEmile Joubert <emile@rabbitmq.com>2013-08-06 16:45:33 +0100
commit734b2d9c2feee3f90cd69f19ba45bfbac065143e (patch)
tree249fd6e5a50242271f25946f31dbb4077824d95a
parenteb4c410d607d7e9cfa6c102038ef9c6f48ad8725 (diff)
downloadrabbitmq-server-734b2d9c2feee3f90cd69f19ba45bfbac065143e.tar.gz
Delay clearing of state in slaves
until sender down notification is received from channel as well as GM in order to avoid messages being enqueued more than once
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl27
-rw-r--r--src/rabbit_mirror_queue_slave.erl78
2 files changed, 63 insertions, 42 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index c9918fed..f54e9bd1 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -222,20 +222,19 @@
%% sender_death message to all the slaves, saying the sender has
%% died. Once the slaves receive the sender_death message, they know
%% that they're not going to receive any more instructions from the gm
-%% regarding that sender, thus they throw away any publications from
-%% the sender pending publication instructions. However, it is
-%% possible that the coordinator receives the DOWN and communicates
-%% that to the master before the master has finished receiving and
-%% processing publishes from the sender. This turns out not to be a
-%% problem: the sender has actually died, and so will not need to
-%% receive confirms or other feedback, and should further messages be
-%% "received" from the sender, the master will ask the coordinator to
-%% set up a new monitor, and will continue to process the messages
-%% normally. Slaves may thus receive publishes via gm from previously
-%% declared "dead" senders, but again, this is fine: should the slave
-%% have just thrown out the message it had received directly from the
-%% sender (due to receiving a sender_death message via gm), it will be
-%% able to cope with the publication purely from the master via gm.
+%% regarding that sender. However, it is possible that the coordinator
+%% receives the DOWN and communicates that to the master before the
+%% master has finished receiving and processing publishes from the
+%% sender. This turns out not to be a problem: the sender has actually
+%% died, and so will not need to receive confirms or other feedback,
+%% and should further messages be "received" from the sender, the
+%% master will ask the coordinator to set up a new monitor, and
+%% will continue to process the messages normally. Slaves may thus
+%% receive publishes via gm from previously declared "dead" senders,
+%% but again, this is fine: should the slave have just thrown out the
+%% message it had received directly from the sender (due to receiving
+%% a sender_death message via gm), it will be able to cope with the
+%% publication purely from the master via gm.
%%
%% When a slave receives a DOWN message for a sender, if it has not
%% received the sender_death message from the master via gm already,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 1996fd0a..6425a855 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -61,7 +61,7 @@
sync_timer_ref,
rate_timer_ref,
- sender_queues, %% :: Pid -> {Q Msg, Set MsgId}
+ sender_queues, %% :: Pid -> {Q Msg, Set MsgId, ChState}
msg_id_ack, %% :: MsgId -> AckTag
msg_id_status,
@@ -275,7 +275,7 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
local_sender_death(ChPid, State),
- noreply(State);
+ noreply(sender_lifetime(ChPid, down_from_ch, State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
@@ -563,10 +563,15 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
(_Msgid, _Status, MTC0) ->
MTC0
end, gb_trees:empty(), MS),
- Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
- Delivery <- queue:to_list(PubQ)],
+ Deliveries = [Delivery ||
+ {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
+ Delivery <- queue:to_list(PubQ)],
+ AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
+ KS1 = lists:foldl(fun (ChPid0, KS0) ->
+ pmon:demonitor(ChPid0, KS0)
+ end, KS, AwaitGmDown),
rabbit_amqqueue_process:init_with_backing_queue_state(
- Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS,
+ Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
noreply(State) ->
@@ -643,12 +648,39 @@ confirm_sender_death(Pid) ->
State
end,
%% Note that we do not remove our knowledge of this ChPid until we
- %% get the sender_death from GM.
+ %% get the sender_death from GM as well as a DOWN notification.
{ok, _TRef} = timer:apply_after(
?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue,
[self(), rabbit_mirror_queue_master, Fun]),
ok.
+forget_sender(running, _) -> false;
+forget_sender(_, running) -> false;
+forget_sender(Down1, Down2) when Down1 =/= Down2 -> true.
+
+%% Record and process lifetime events from channels. Forget all about a channel
+%% only when down notifications are received from both the channel and from gm.
+sender_lifetime(ChPid, ChState, State = #state { sender_queues = SQ,
+ msg_id_status = MS,
+ known_senders = KS }) ->
+ case dict:find(ChPid, SQ) of
+ error ->
+ State;
+ {ok, {MQ, PendCh, ChStateRecord}} ->
+ case forget_sender(ChState, ChStateRecord) of
+ true ->
+ credit_flow:peer_down(ChPid),
+ State #state { sender_queues = dict:erase(ChPid, SQ),
+ msg_id_status = lists:foldl(
+ fun dict:erase/2,
+ MS, sets:to_list(PendCh)),
+ known_senders = pmon:demonitor(ChPid, KS) };
+ false ->
+ SQ1 = dict:store(ChPid, {MQ, PendCh, ChState}, SQ),
+ State #state { sender_queues = SQ1 }
+ end
+ end.
+
maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
sender = ChPid },
@@ -657,9 +689,9 @@ maybe_enqueue_message(
%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, MS) of
error ->
- {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
MQ1 = queue:in(Delivery, MQ),
- SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh, ChState}, SQ),
State1 #state { sender_queues = SQ1 };
{ok, Status} ->
MS1 = send_or_record_confirm(
@@ -671,7 +703,7 @@ maybe_enqueue_message(
get_sender_queue(ChPid, SQ) ->
case dict:find(ChPid, SQ) of
- error -> {queue:new(), sets:new()};
+ error -> {queue:new(), sets:new(), running};
{ok, Val} -> Val
end.
@@ -679,19 +711,20 @@ remove_from_pending_ch(MsgId, ChPid, SQ) ->
case dict:find(ChPid, SQ) of
error ->
SQ;
- {ok, {MQ, PendingCh}} ->
- dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ)
+ {ok, {MQ, PendingCh, ChState}} ->
+ dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
+ SQ)
end.
publish_or_discard(Status, ChPid, MsgId,
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
%% We really are going to do the publish/discard right now, even
%% though we may not have seen it directly from the channel. But
- %% we cannot issues confirms until the latter has happened. So we
+ %% we cannot issue confirms until the latter has happened. So we
%% need to keep track of the MsgId and its confirmation status in
%% the meantime.
State1 = ensure_monitoring(ChPid, State),
- {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
{MQ1, PendingCh1, MS1} =
case queue:out(MQ) of
{empty, _MQ2} ->
@@ -711,7 +744,7 @@ publish_or_discard(Status, ChPid, MsgId,
%% expecting any confirms from us.
{MQ, PendingCh, MS}
end,
- SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh1, ChState}, SQ),
State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
@@ -772,25 +805,14 @@ process_instruction({requeue, MsgIds},
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
process_instruction({sender_death, ChPid},
- State = #state { sender_queues = SQ,
- msg_id_status = MS,
- known_senders = KS }) ->
+ State = #state { known_senders = KS }) ->
%% The channel will be monitored iff we have received a message
%% from it. In this case we just want to avoid doing work if we
%% never got any messages.
{ok, case pmon:is_monitored(ChPid, KS) of
false -> State;
- true -> MS1 = case dict:find(ChPid, SQ) of
- error ->
- MS;
- {ok, {_MQ, PendingCh}} ->
- lists:foldl(fun dict:erase/2, MS,
- sets:to_list(PendingCh))
- end,
- credit_flow:peer_down(ChPid),
- State #state { sender_queues = dict:erase(ChPid, SQ),
- msg_id_status = MS1,
- known_senders = pmon:demonitor(ChPid, KS) }
+ true -> credit_flow:peer_down(ChPid),
+ sender_lifetime(ChPid, down_from_gm, State)
end};
process_instruction({depth, Depth},
State = #state { backing_queue = BQ,