diff options
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 116 |
1 files changed, 59 insertions, 57 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 222457c6..69a3be2b 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -28,7 +28,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2]). + prioritise_cast/2, prioritise_info/2, format_message_queue/2]). -export([joined/2, members_changed/3, handle_msg/3]). @@ -37,18 +37,10 @@ -include("rabbit.hrl"). -%%---------------------------------------------------------------------------- - -include("gm_specs.hrl"). --ifdef(use_specs). -%% Shut dialyzer up --spec(promote_me/2 :: (_, _) -> no_return()). --endif. - %%---------------------------------------------------------------------------- - -define(CREATION_EVENT_KEYS, [pid, name, @@ -79,6 +71,8 @@ depth_delta }). +%%---------------------------------------------------------------------------- + start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). set_maximum_since_use(QPid, Age) -> @@ -222,6 +216,31 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow}, end, noreply(maybe_enqueue_message(Delivery, State)); +handle_cast({sync_start, Ref, Syncer}, + State = #state { depth_delta = DD, + backing_queue = BQ, + backing_queue_state = BQS }) -> + State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State), + S = fun({MA, TRefN, BQSN}) -> + State1#state{depth_delta = undefined, + msg_id_ack = dict:from_list(MA), + rate_timer_ref = TRefN, + backing_queue_state = BQSN} + end, + case rabbit_mirror_queue_sync:slave( + DD, Ref, TRef, Syncer, BQ, BQS, + fun (BQN, BQSN) -> + BQSN1 = update_ram_duration(BQN, BQSN), + TRefN = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL, + self(), update_ram_duration), + {TRefN, BQSN1} + end) of + denied -> noreply(State1); + {ok, Res} -> noreply(set_delta(0, S(Res))); + {failed, Res} -> noreply(S(Res)); + {stop, Reason, Res} -> {stop, Reason, S(Res)} + end; + handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); @@ -232,17 +251,13 @@ handle_cast({set_ram_duration_target, Duration}, BQS1 = BQ:set_ram_duration_target(Duration, BQS), noreply(State #state { backing_queue_state = BQS1 }). -handle_info(update_ram_duration, - State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - {RamDuration, BQS1} = BQ:ram_duration(BQS), - DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), +handle_info(update_ram_duration, State = #state{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQS1 = update_ram_duration(BQ, BQS), %% Don't call noreply/1, we don't want to set timers {State1, Timeout} = next_state(State #state { rate_timer_ref = undefined, - backing_queue_state = BQS2 }), + backing_queue_state = BQS1 }), {noreply, State1, Timeout}; handle_info(sync_timeout, State) -> @@ -321,7 +336,6 @@ prioritise_cast(Msg, _State) -> {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; {gm, _Msg} -> 5; - {post_commit, _Txn, _AckTags} -> 4; _ -> 0 end. @@ -332,6 +346,8 @@ prioritise_info(Msg, _State) -> _ -> 0 end. +format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). + %% --------------------------------------------------------------------------- %% GM %% --------------------------------------------------------------------------- @@ -359,6 +375,11 @@ handle_msg([_SPid], _From, process_death) -> handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> ok = gen_server2:cast(CPid, {gm, Msg}), {stop, {shutdown, ring_shutdown}}; +handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) -> + case lists:member(SPid, SPids) of + true -> gen_server2:cast(SPid, {sync_start, Ref, Syncer}); + false -> ok + end; handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). @@ -444,6 +465,9 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> handle_process_result({ok, State}) -> noreply(State); handle_process_result({stop, State}) -> {stop, normal, State}. +-ifdef(use_specs). +-spec(promote_me/2 :: ({pid(), term()}, #state{}) -> no_return()). +-endif. promote_me(From, #state { q = Q = #amqqueue { name = QName }, gm = GM, backing_queue = BQ, @@ -530,7 +554,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, AckTags, SS, MPids), + QName, CPid, BQ, BQS, GM, AckTags, SS, MPids), MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) -> gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); @@ -564,30 +588,18 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> backing_queue_timeout(State = #state { backing_queue = BQ }) -> run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State). -ensure_sync_timer(State = #state { sync_timer_ref = undefined }) -> - TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout), - State #state { sync_timer_ref = TRef }; ensure_sync_timer(State) -> - State. + rabbit_misc:ensure_timer(State, #state.sync_timer_ref, + ?SYNC_INTERVAL, sync_timeout). + +stop_sync_timer(State) -> rabbit_misc:stop_timer(State, #state.sync_timer_ref). -stop_sync_timer(State = #state { sync_timer_ref = undefined }) -> - State; -stop_sync_timer(State = #state { sync_timer_ref = TRef }) -> - erlang:cancel_timer(TRef), - State #state { sync_timer_ref = undefined }. - -ensure_rate_timer(State = #state { rate_timer_ref = undefined }) -> - TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL, - self(), update_ram_duration), - State #state { rate_timer_ref = TRef }; ensure_rate_timer(State) -> - State. + rabbit_misc:ensure_timer(State, #state.rate_timer_ref, + ?RAM_DURATION_UPDATE_INTERVAL, + update_ram_duration). -stop_rate_timer(State = #state { rate_timer_ref = undefined }) -> - State; -stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> - erlang:cancel_timer(TRef), - State #state { rate_timer_ref = undefined }. +stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref). ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> State #state { known_senders = pmon:monitor(ChPid, KS) }. @@ -699,7 +711,7 @@ process_instruction({publish, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(published, ChPid, MsgId, State), - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; process_instruction({publish_delivered, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> @@ -723,8 +735,7 @@ process_instruction({drop, Length, Dropped, AckRequired}, end, State1 = lists:foldl( fun (const, StateN = #state{backing_queue_state = BQSN}) -> - {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} = - BQ:fetch(AckRequired, BQSN), + {{MsgId, AckTag}, BQSN1} = BQ:drop(AckRequired, BQSN), maybe_store_ack( AckRequired, MsgId, AckTag, StateN #state { backing_queue_state = BQSN1 }) @@ -733,21 +744,6 @@ process_instruction({drop, Length, Dropped, AckRequired}, true -> State1; false -> update_delta(ToDrop - Dropped, State1) end}; -process_instruction({fetch, AckRequired, MsgId, Remaining}, - State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - QLen = BQ:len(BQS), - {ok, case QLen - 1 of - Remaining -> - {{#basic_message{id = MsgId}, _IsDelivered, - AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), - maybe_store_ack(AckRequired, MsgId, AckTag, - State #state { backing_queue_state = BQS1 }); - _ when QLen =< Remaining andalso AckRequired -> - State; - _ when QLen =< Remaining -> - update_delta(-1, State) - end}; process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -828,6 +824,12 @@ update_delta( DeltaChange, State = #state { depth_delta = Delta }) -> true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }). +update_ram_duration(BQ, BQS) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQ:set_ram_duration_target(DesiredDuration, BQS1). + record_synchronised(#amqqueue { name = QName }) -> Self = self(), rabbit_misc:execute_mnesia_transaction( |