Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 90
1 file changed, 53 insertions(+), 37 deletions(-)
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index a5d1f68e..b1a86493 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -41,15 +41,13 @@
 %%----------------------------------------------------------------------------

--define(CREATION_EVENT_KEYS,
+-define(INFO_KEYS,
         [pid,
          name,
          master_pid,
          is_synchronised
         ]).

--define(INFO_KEYS, ?CREATION_EVENT_KEYS).
-
 -define(SYNC_INTERVAL,                 25). %% milliseconds
 -define(RAM_DURATION_UPDATE_INTERVAL,  5000).
 -define(DEATH_TIMEOUT,                 20000). %% 20 seconds

@@ -61,7 +59,7 @@
           sync_timer_ref,
           rate_timer_ref,

-          sender_queues, %% :: Pid -> {Q Msg, Set MsgId}
+          sender_queues, %% :: Pid -> {Q Msg, Set MsgId, ChState}
           msg_id_ack,    %% :: MsgId -> AckTag

           msg_id_status,
@@ -120,12 +118,10 @@ init(Q = #amqqueue { name = QName }) ->
                    msg_id_ack          = dict:new(),
                    msg_id_status       = dict:new(),
-                   known_senders       = pmon:new(),
+                   known_senders       = pmon:new(delegate),

                    depth_delta         = undefined
                  },

-    rabbit_event:notify(queue_slave_created,
-                        infos(?CREATION_EVENT_KEYS, State)),
     ok = gm:broadcast(GM, request_depth),
     ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
     {ok, State, hibernate,
@@ -284,7 +280,8 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
     noreply(State);

 handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
-    noreply(local_sender_death(ChPid, State));
+    local_sender_death(ChPid, State),
+    noreply(maybe_forget_sender(ChPid, down_from_ch, State));

 handle_info({'EXIT', _Pid, Reason}, State) ->
     {stop, Reason, State};
@@ -574,10 +571,15 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
                      (_Msgid, _Status, MTC0) ->
                          MTC0
                  end, gb_trees:empty(), MS),
-    Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
-                              Delivery <- queue:to_list(PubQ)],
+    Deliveries = [Delivery ||
+                     {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
+                     Delivery <- queue:to_list(PubQ)],
+    AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
+    KS1 = lists:foldl(fun (ChPid0, KS0) ->
+                              pmon:demonitor(ChPid0, KS0)
+                      end, KS, AwaitGmDown),
     rabbit_amqqueue_process:init_with_backing_queue_state(
-        Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS,
+        Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
         MTC).

 noreply(State) ->
@@ -617,7 +619,7 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref).
 ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
     State #state { known_senders = pmon:monitor(ChPid, KS) }.

-local_sender_death(ChPid, State = #state { known_senders = KS }) ->
+local_sender_death(ChPid, #state { known_senders = KS }) ->
     %% The channel will be monitored iff we have received a delivery
     %% from it but not heard about its death from the master. So if it
     %% is monitored we need to point the death out to the master (see
@@ -625,8 +627,7 @@ local_sender_death(ChPid, State = #state { known_senders = KS }) ->
     ok = case pmon:is_monitored(ChPid, KS) of
              false -> ok;
              true  -> confirm_sender_death(ChPid)
-         end,
-    State.
+         end.

 confirm_sender_death(Pid) ->
     %% We have to deal with the possibility that we'll be promoted to
@@ -655,12 +656,38 @@ confirm_sender_death(Pid) ->
                 State
         end,
     %% Note that we do not remove our knowledge of this ChPid until we
-    %% get the sender_death from GM.
+    %% get the sender_death from GM as well as a DOWN notification.
     {ok, _TRef} = timer:apply_after(
                     ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue,
                     [self(), rabbit_mirror_queue_master, Fun]),
     ok.

+forget_sender(_, running)                        -> false;
+forget_sender(Down1, Down2) when Down1 =/= Down2 -> true.
+
+%% Record and process lifetime events from channels. Forget all about a channel
+%% only when down notifications are received from both the channel and from gm.
+maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ,
+                                                     msg_id_status = MS,
+                                                     known_senders = KS }) ->
+    case dict:find(ChPid, SQ) of
+        error ->
+            State;
+        {ok, {MQ, PendCh, ChStateRecord}} ->
+            case forget_sender(ChState, ChStateRecord) of
+                true ->
+                    credit_flow:peer_down(ChPid),
+                    State #state { sender_queues = dict:erase(ChPid, SQ),
+                                   msg_id_status = lists:foldl(
+                                                     fun dict:erase/2,
+                                                     MS, sets:to_list(PendCh)),
+                                   known_senders = pmon:demonitor(ChPid, KS) };
+                false ->
+                    SQ1 = dict:store(ChPid, {MQ, PendCh, ChState}, SQ),
+                    State #state { sender_queues = SQ1 }
+            end
+    end.
+
 maybe_enqueue_message(
   Delivery = #delivery { message = #basic_message { id = MsgId },
                          sender = ChPid },
@@ -669,9 +696,9 @@ maybe_enqueue_message(
     %% We will never see {published, ChPid, MsgSeqNo} here.
     case dict:find(MsgId, MS) of
         error ->
-            {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+            {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
             MQ1 = queue:in(Delivery, MQ),
-            SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
+            SQ1 = dict:store(ChPid, {MQ1, PendingCh, ChState}, SQ),
             State1 #state { sender_queues = SQ1 };
         {ok, Status} ->
             MS1 = send_or_record_confirm(
@@ -683,7 +710,7 @@ get_sender_queue(ChPid, SQ) ->
     case dict:find(ChPid, SQ) of
-        error     -> {queue:new(), sets:new()};
+        error     -> {queue:new(), sets:new(), running};
         {ok, Val} -> Val
     end.

@@ -691,19 +718,20 @@ remove_from_pending_ch(MsgId, ChPid, SQ) ->
     case dict:find(ChPid, SQ) of
         error -> SQ;
-        {ok, {MQ, PendingCh}} ->
-            dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ)
+        {ok, {MQ, PendingCh, ChState}} ->
+            dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
+                       SQ)
     end.

 publish_or_discard(Status, ChPid, MsgId,
                    State = #state { sender_queues = SQ, msg_id_status = MS }) ->
     %% We really are going to do the publish/discard right now, even
     %% though we may not have seen it directly from the channel. But
-    %% we cannot issues confirms until the latter has happened. So we
+    %% we cannot issue confirms until the latter has happened. So we
     %% need to keep track of the MsgId and its confirmation status in
     %% the meantime.
     State1 = ensure_monitoring(ChPid, State),
-    {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+    {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
     {MQ1, PendingCh1, MS1} =
         case queue:out(MQ) of
             {empty, _MQ2} ->
@@ -723,7 +751,7 @@ publish_or_discard(Status, ChPid, MsgId,
                 %% expecting any confirms from us.
                 {MQ, PendingCh, MS}
         end,
-    SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
+    SQ1 = dict:store(ChPid, {MQ1, PendingCh1, ChState}, SQ),
     State1 #state { sender_queues = SQ1,
                     msg_id_status = MS1 }.

@@ -784,25 +812,13 @@ process_instruction({requeue, MsgIds},
     {ok, State #state { msg_id_ack          = MA1,
                         backing_queue_state = BQS1 }};
 process_instruction({sender_death, ChPid},
-                    State = #state { sender_queues = SQ,
-                                     msg_id_status = MS,
-                                     known_senders = KS }) ->
+                    State = #state { known_senders = KS }) ->
     %% The channel will be monitored iff we have received a message
     %% from it. In this case we just want to avoid doing work if we
     %% never got any messages.
     {ok, case pmon:is_monitored(ChPid, KS) of
              false -> State;
-             true  -> MS1 = case dict:find(ChPid, SQ) of
-                                error ->
-                                    MS;
-                                {ok, {_MQ, PendingCh}} ->
-                                    lists:foldl(fun dict:erase/2, MS,
-                                                sets:to_list(PendingCh))
-                            end,
-                      credit_flow:peer_down(ChPid),
-                      State #state { sender_queues = dict:erase(ChPid, SQ),
-                                     msg_id_status = MS1,
-                                     known_senders = pmon:demonitor(ChPid, KS) }
+             true  -> maybe_forget_sender(ChPid, down_from_gm, State)
          end};
 process_instruction({depth, Depth},
                     State = #state { backing_queue       = BQ,
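
The central change in this patch: a slave now forgets a sender only after it has heard about the channel's death twice, once as a local 'DOWN' monitor notification (recorded as down_from_ch) and once as the sender_death instruction broadcast over GM (down_from_gm). The sketch below is a minimal, self-contained model of that hand-off, not the module's actual code; it reuses the patch's forget_sender/2 clauses, while note_death/2 and demo/0 are hypothetical names for illustration. Duplicate notifications are assumed not to occur, as in the patch, where forget_sender/2 has no clause for them.

-module(forget_protocol).
-export([demo/0]).

%% Taken from the patch: forget only once both kinds of down
%% notification have been seen, whatever their arrival order.
forget_sender(_, running)                        -> false;
forget_sender(Down1, Down2) when Down1 =/= Down2 -> true.

%% Fold one death notification into a sender's recorded state:
%% either keep the sender (recording the event) or forget it.
note_death(Event, Recorded) ->
    case forget_sender(Event, Recorded) of
        true  -> forget;
        false -> {keep, Event}
    end.

demo() ->
    %% Channel monitor fires first, GM instruction second...
    {keep, down_from_ch} = note_death(down_from_ch, running),
    forget               = note_death(down_from_gm, down_from_ch),
    %% ...or the other way around; both orders converge on forget.
    {keep, down_from_gm} = note_death(down_from_gm, running),
    forget               = note_death(down_from_ch, down_from_gm),
    ok.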
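A quieter change: init/1 now builds known_senders with pmon:new(delegate) rather than pmon:new(). pmon is the process-monitor bookkeeping module from rabbit_common, and its argument names the module through which monitor/demonitor calls are issued; passing delegate routes them through RabbitMQ's delegate framework, which is cheaper for remote pids (and message senders are usually remote from a slave's node). That reading of the argument is my inference from the API, so treat it as an assumption. A usage sketch exercising only pmon calls visible in this diff or its surroundings (new/1, monitor/2, is_monitored/2, demonitor/2); monitor_roundtrip/1 is an illustrative name:

%% Monitor a channel pid through delegate-backed pmon state, check the
%% bookkeeping, then drop the monitor again. Each call returns a new
%% pmon state; is_monitored/2 consults only the local bookkeeping.
monitor_roundtrip(ChPid) ->
    KS0 = pmon:new(delegate),
    KS1 = pmon:monitor(ChPid, KS0),
    true  = pmon:is_monitored(ChPid, KS1),
    KS2 = pmon:demonitor(ChPid, KS1),
    false = pmon:is_monitored(ChPid, KS2),
    ok.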
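The promote_me changes deal with a corner of the same protocol: a channel can be in state down_from_ch at the moment of promotion, meaning its local DOWN was seen but the sender_death never arrived, and never will, since this node is about to become the master that would have broadcast it. Such pids must not be handed to the new master as live senders, so the patch demonitors them first. A condensed restatement of that fold, assuming the same {MsgQueue, PendingChSet, ChState} triples; drop_await_gm_down/2 is an illustrative name:

%% Collect channels that died locally but still await the GM-side
%% notification, and remove their monitors from the pmon state KS.
%% lists:foldl passes (Element, Acc), which matches pmon:demonitor/2
%% exactly, so the patch's explicit fun can be a plain fun reference.
drop_await_gm_down(SQ, KS) ->
    AwaitGmDown = [ChPid || {ChPid, {_MQ, _PendCh, down_from_ch}}
                                <- dict:to_list(SQ)],
    lists:foldl(fun pmon:demonitor/2, KS, AwaitGmDown).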