Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r--  src/rabbit_mirror_queue_slave.erl  90
1 file changed, 53 insertions, 37 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index a5d1f68e..b1a86493 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -41,15 +41,13 @@
%%----------------------------------------------------------------------------
--define(CREATION_EVENT_KEYS,
+-define(INFO_KEYS,
[pid,
name,
master_pid,
is_synchronised
]).
--define(INFO_KEYS, ?CREATION_EVENT_KEYS).
-
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(DEATH_TIMEOUT, 20000). %% 20 seconds
@@ -61,7 +59,7 @@
sync_timer_ref,
rate_timer_ref,
- sender_queues, %% :: Pid -> {Q Msg, Set MsgId}
+ sender_queues, %% :: Pid -> {Q Msg, Set MsgId, ChState}
msg_id_ack, %% :: MsgId -> AckTag
msg_id_status,
@@ -120,12 +118,10 @@ init(Q = #amqqueue { name = QName }) ->
msg_id_ack = dict:new(),
msg_id_status = dict:new(),
- known_senders = pmon:new(),
+ known_senders = pmon:new(delegate),
depth_delta = undefined
},
- rabbit_event:notify(queue_slave_created,
- infos(?CREATION_EVENT_KEYS, State)),
ok = gm:broadcast(GM, request_depth),
ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
{ok, State, hibernate,
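
In the init/1 hunk above, known_senders switches from pmon:new() to pmon:new(delegate), so monitors on sender channel pids go through the delegate mechanism rather than direct erlang:monitor/2 calls, and the queue_slave_created event goes away along with the now-pointless ?CREATION_EVENT_KEYS alias. A minimal sketch of the pmon calls the rest of this file relies on, assuming the pmon API from elsewhere in this tree (pmon:new/1, pmon:monitor/2, pmon:is_monitored/2, pmon:demonitor/2):

    KS0 = pmon:new(delegate),         %% empty monitor set, delegate-backed
    KS1 = pmon:monitor(ChPid, KS0),   %% idempotent: ChPid is monitored once
    true = pmon:is_monitored(ChPid, KS1),
    KS2 = pmon:demonitor(ChPid, KS1). %% drops both the monitor and the entry
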
@@ -284,7 +280,8 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
noreply(State);
handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
- noreply(local_sender_death(ChPid, State));
+ local_sender_death(ChPid, State),
+ noreply(maybe_forget_sender(ChPid, down_from_ch, State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
@@ -574,10 +571,15 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
(_Msgid, _Status, MTC0) ->
MTC0
end, gb_trees:empty(), MS),
- Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
- Delivery <- queue:to_list(PubQ)],
+ Deliveries = [Delivery ||
+ {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
+ Delivery <- queue:to_list(PubQ)],
+ AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
+ KS1 = lists:foldl(fun (ChPid0, KS0) ->
+ pmon:demonitor(ChPid0, KS0)
+ end, KS, AwaitGmDown),
rabbit_amqqueue_process:init_with_backing_queue_state(
- Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS,
+ Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
noreply(State) ->
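
In the promote_me hunk above, AwaitGmDown collects the channels whose local 'DOWN' has already been seen (state down_from_ch) but whose sender_death from gm is still pending; their monitors are dropped from KS before it is handed to rabbit_amqqueue_process:init_with_backing_queue_state/7. A sketch of what the comprehension selects, using hypothetical entries:

    SQ = dict:from_list(
           [{pid_a, {queue:new(), sets:new(), running}},
            {pid_b, {queue:new(), sets:new(), down_from_ch}}]),
    [pid_b] = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)].
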
@@ -617,7 +619,7 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref).
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
State #state { known_senders = pmon:monitor(ChPid, KS) }.
-local_sender_death(ChPid, State = #state { known_senders = KS }) ->
+local_sender_death(ChPid, #state { known_senders = KS }) ->
%% The channel will be monitored iff we have received a delivery
%% from it but not heard about its death from the master. So if it
%% is monitored we need to point the death out to the master (see
@@ -625,8 +627,7 @@ local_sender_death(ChPid, State = #state { known_senders = KS }) ->
ok = case pmon:is_monitored(ChPid, KS) of
false -> ok;
true -> confirm_sender_death(ChPid)
- end,
- State.
+ end.
confirm_sender_death(Pid) ->
%% We have to deal with the possibility that we'll be promoted to
@@ -655,12 +656,38 @@ confirm_sender_death(Pid) ->
State
end,
%% Note that we do not remove our knowledge of this ChPid until we
- %% get the sender_death from GM.
+ %% get the sender_death from GM as well as a DOWN notification.
{ok, _TRef} = timer:apply_after(
?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue,
[self(), rabbit_mirror_queue_master, Fun]),
ok.
+forget_sender(_, running) -> false;
+forget_sender(Down1, Down2) when Down1 =/= Down2 -> true.
+
+%% Record and process lifetime events from channels. Forget all about a channel
+%% only when down notifications are received from both the channel and from gm.
+maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ,
+ msg_id_status = MS,
+ known_senders = KS }) ->
+ case dict:find(ChPid, SQ) of
+ error ->
+ State;
+ {ok, {MQ, PendCh, ChStateRecord}} ->
+ case forget_sender(ChState, ChStateRecord) of
+ true ->
+ credit_flow:peer_down(ChPid),
+ State #state { sender_queues = dict:erase(ChPid, SQ),
+ msg_id_status = lists:foldl(
+ fun dict:erase/2,
+ MS, sets:to_list(PendCh)),
+ known_senders = pmon:demonitor(ChPid, KS) };
+ false ->
+ SQ1 = dict:store(ChPid, {MQ, PendCh, ChState}, SQ),
+ State #state { sender_queues = SQ1 }
+ end
+ end.
+
maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
sender = ChPid },
@@ -669,9 +696,9 @@ maybe_enqueue_message(
%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, MS) of
error ->
- {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
MQ1 = queue:in(Delivery, MQ),
- SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh, ChState}, SQ),
State1 #state { sender_queues = SQ1 };
{ok, Status} ->
MS1 = send_or_record_confirm(
@@ -683,7 +710,7 @@ maybe_enqueue_message(
get_sender_queue(ChPid, SQ) ->
case dict:find(ChPid, SQ) of
- error -> {queue:new(), sets:new()};
+ error -> {queue:new(), sets:new(), running};
{ok, Val} -> Val
end.
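
get_sender_queue/2 now defaults a fresh sender to the widened triple with ChState = running; the remaining hunks thread the third element through every reader and writer of sender_queues unchanged. The resulting shape, written as a hypothetical annotation (msg_id() standing in for the actual message-id type):

    %% sender_queues :: ChPid =>
    %%   {MQ        :: queue:queue(#delivery{}), %% deliveries awaiting gm instruction
    %%    PendingCh :: sets:set(msg_id()),       %% msg ids with confirms pending
    %%    ChState   :: running | down_from_ch | down_from_gm}
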
@@ -691,19 +718,20 @@ remove_from_pending_ch(MsgId, ChPid, SQ) ->
case dict:find(ChPid, SQ) of
error ->
SQ;
- {ok, {MQ, PendingCh}} ->
- dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ)
+ {ok, {MQ, PendingCh, ChState}} ->
+ dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
+ SQ)
end.
publish_or_discard(Status, ChPid, MsgId,
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
%% We really are going to do the publish/discard right now, even
%% though we may not have seen it directly from the channel. But
- %% we cannot issues confirms until the latter has happened. So we
+ %% we cannot issue confirms until the latter has happened. So we
%% need to keep track of the MsgId and its confirmation status in
%% the meantime.
State1 = ensure_monitoring(ChPid, State),
- {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
{MQ1, PendingCh1, MS1} =
case queue:out(MQ) of
{empty, _MQ2} ->
@@ -723,7 +751,7 @@ publish_or_discard(Status, ChPid, MsgId,
%% expecting any confirms from us.
{MQ, PendingCh, MS}
end,
- SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh1, ChState}, SQ),
State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
@@ -784,25 +812,13 @@ process_instruction({requeue, MsgIds},
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
process_instruction({sender_death, ChPid},
- State = #state { sender_queues = SQ,
- msg_id_status = MS,
- known_senders = KS }) ->
+ State = #state { known_senders = KS }) ->
%% The channel will be monitored iff we have received a message
%% from it. In this case we just want to avoid doing work if we
%% never got any messages.
{ok, case pmon:is_monitored(ChPid, KS) of
false -> State;
- true -> MS1 = case dict:find(ChPid, SQ) of
- error ->
- MS;
- {ok, {_MQ, PendingCh}} ->
- lists:foldl(fun dict:erase/2, MS,
- sets:to_list(PendingCh))
- end,
- credit_flow:peer_down(ChPid),
- State #state { sender_queues = dict:erase(ChPid, SQ),
- msg_id_status = MS1,
- known_senders = pmon:demonitor(ChPid, KS) }
+ true -> maybe_forget_sender(ChPid, down_from_gm, State)
end};
process_instruction({depth, Depth},
State = #state { backing_queue = BQ,
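
Putting the call sites together: the inline erase logic removed from process_instruction({sender_death, ChPid}, ...) above now lives in maybe_forget_sender/3, which is reached from both death notifications. The intended sequence for a channel that dies, sketched as a trace (either notification may arrive first):

    %% 1. first delivery from ChPid: entry {queue:new(), sets:new(), running}
    %% 2. local 'DOWN': local_sender_death/2 alerts the master if needed;
    %%    maybe_forget_sender(ChPid, down_from_ch, State) records the state
    %% 3. gm sender_death: maybe_forget_sender(ChPid, down_from_gm, State);
    %%    forget_sender(down_from_gm, down_from_ch) returns true, so the
    %%    entry is erased, pending confirms dropped, the monitor released
    %%    and credit_flow:peer_down(ChPid) called.
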