summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_slave.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r--src/rabbit_mirror_queue_slave.erl15
1 files changed, 13 insertions, 2 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 22edfcb6..964b0eb4 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -605,10 +605,13 @@ ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
State #state { known_senders = pmon:monitor(ChPid, KS) }.
local_sender_death(ChPid, State = #state { known_senders = KS }) ->
+ %% The channel will be monitored iff we have received a delivery
+ %% from it but not heard about its death from the master. So if it
+ %% is monitored we need to point the death out to the master (see
+ %% essay).
ok = case pmon:is_monitored(ChPid, KS) of
false -> ok;
- true -> credit_flow:peer_down(ChPid),
- confirm_sender_death(ChPid)
+ true -> confirm_sender_death(ChPid)
end,
State.
@@ -621,6 +624,10 @@ confirm_sender_death(Pid) ->
fun (?MODULE, State = #state { known_senders = KS,
gm = GM }) ->
%% We're running still as a slave
+ %%
+ %% See comment in local_sender_death/2; we might have
+ %% received a sender_death in the meanwhile so check
+ %% again.
ok = case pmon:is_monitored(Pid, KS) of
false -> ok;
true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}),
@@ -766,6 +773,9 @@ process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
known_senders = KS }) ->
+ %% The channel will be monitored iff we have received a message
+ %% from it. In this case we just want to avoid doing work if we
+ %% never got any messages.
{ok, case pmon:is_monitored(ChPid, KS) of
false -> State;
true -> MS1 = case dict:find(ChPid, SQ) of
@@ -775,6 +785,7 @@ process_instruction({sender_death, ChPid},
lists:foldl(fun dict:erase/2, MS,
sets:to_list(PendingCh))
end,
+ credit_flow:peer_down(ChPid),
State #state { sender_queues = dict:erase(ChPid, SQ),
msg_id_status = MS1,
known_senders = pmon:demonitor(ChPid, KS) }