summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_slave.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r--src/rabbit_mirror_queue_slave.erl116
1 files changed, 59 insertions, 57 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 222457c6..69a3be2b 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -28,7 +28,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2]).
+ prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
-export([joined/2, members_changed/3, handle_msg/3]).
@@ -37,18 +37,10 @@
-include("rabbit.hrl").
-%%----------------------------------------------------------------------------
-
-include("gm_specs.hrl").
--ifdef(use_specs).
-%% Shut dialyzer up
--spec(promote_me/2 :: (_, _) -> no_return()).
--endif.
-
%%----------------------------------------------------------------------------
-
-define(CREATION_EVENT_KEYS,
[pid,
name,
@@ -79,6 +71,8 @@
depth_delta
}).
+%%----------------------------------------------------------------------------
+
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
set_maximum_since_use(QPid, Age) ->
@@ -222,6 +216,31 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
end,
noreply(maybe_enqueue_message(Delivery, State));
+handle_cast({sync_start, Ref, Syncer},
+ State = #state { depth_delta = DD,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
+ S = fun({MA, TRefN, BQSN}) ->
+ State1#state{depth_delta = undefined,
+ msg_id_ack = dict:from_list(MA),
+ rate_timer_ref = TRefN,
+ backing_queue_state = BQSN}
+ end,
+ case rabbit_mirror_queue_sync:slave(
+ DD, Ref, TRef, Syncer, BQ, BQS,
+ fun (BQN, BQSN) ->
+ BQSN1 = update_ram_duration(BQN, BQSN),
+ TRefN = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
+ self(), update_ram_duration),
+ {TRefN, BQSN1}
+ end) of
+ denied -> noreply(State1);
+ {ok, Res} -> noreply(set_delta(0, S(Res)));
+ {failed, Res} -> noreply(S(Res));
+ {stop, Reason, Res} -> {stop, Reason, S(Res)}
+ end;
+
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
@@ -232,17 +251,13 @@ handle_cast({set_ram_duration_target, Duration},
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
noreply(State #state { backing_queue_state = BQS1 }).
-handle_info(update_ram_duration,
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- {RamDuration, BQS1} = BQ:ram_duration(BQS),
- DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
- BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+handle_info(update_ram_duration, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQS1 = update_ram_duration(BQ, BQS),
%% Don't call noreply/1, we don't want to set timers
{State1, Timeout} = next_state(State #state {
rate_timer_ref = undefined,
- backing_queue_state = BQS2 }),
+ backing_queue_state = BQS1 }),
{noreply, State1, Timeout};
handle_info(sync_timeout, State) ->
@@ -321,7 +336,6 @@ prioritise_cast(Msg, _State) ->
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
{gm, _Msg} -> 5;
- {post_commit, _Txn, _AckTags} -> 4;
_ -> 0
end.
@@ -332,6 +346,8 @@ prioritise_info(Msg, _State) ->
_ -> 0
end.
+format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
%% ---------------------------------------------------------------------------
%% GM
%% ---------------------------------------------------------------------------
@@ -359,6 +375,11 @@ handle_msg([_SPid], _From, process_death) ->
handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
ok = gen_server2:cast(CPid, {gm, Msg}),
{stop, {shutdown, ring_shutdown}};
+handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) ->
+ case lists:member(SPid, SPids) of
+ true -> gen_server2:cast(SPid, {sync_start, Ref, Syncer});
+ false -> ok
+ end;
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
@@ -444,6 +465,9 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
handle_process_result({ok, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
+-ifdef(use_specs).
+-spec(promote_me/2 :: ({pid(), term()}, #state{}) -> no_return()).
+-endif.
promote_me(From, #state { q = Q = #amqqueue { name = QName },
gm = GM,
backing_queue = BQ,
@@ -530,7 +554,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, AckTags, SS, MPids),
+ QName, CPid, BQ, BQS, GM, AckTags, SS, MPids),
MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
@@ -564,30 +588,18 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
backing_queue_timeout(State = #state { backing_queue = BQ }) ->
run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
-ensure_sync_timer(State = #state { sync_timer_ref = undefined }) ->
- TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout),
- State #state { sync_timer_ref = TRef };
ensure_sync_timer(State) ->
- State.
+ rabbit_misc:ensure_timer(State, #state.sync_timer_ref,
+ ?SYNC_INTERVAL, sync_timeout).
+
+stop_sync_timer(State) -> rabbit_misc:stop_timer(State, #state.sync_timer_ref).
-stop_sync_timer(State = #state { sync_timer_ref = undefined }) ->
- State;
-stop_sync_timer(State = #state { sync_timer_ref = TRef }) ->
- erlang:cancel_timer(TRef),
- State #state { sync_timer_ref = undefined }.
-
-ensure_rate_timer(State = #state { rate_timer_ref = undefined }) ->
- TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
- self(), update_ram_duration),
- State #state { rate_timer_ref = TRef };
ensure_rate_timer(State) ->
- State.
+ rabbit_misc:ensure_timer(State, #state.rate_timer_ref,
+ ?RAM_DURATION_UPDATE_INTERVAL,
+ update_ram_duration).
-stop_rate_timer(State = #state { rate_timer_ref = undefined }) ->
- State;
-stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
- erlang:cancel_timer(TRef),
- State #state { rate_timer_ref = undefined }.
+stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref).
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
State #state { known_senders = pmon:monitor(ChPid, KS) }.
@@ -699,7 +711,7 @@ process_instruction({publish, ChPid, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(published, ChPid, MsgId, State),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({publish_delivered, ChPid, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
@@ -723,8 +735,7 @@ process_instruction({drop, Length, Dropped, AckRequired},
end,
State1 = lists:foldl(
fun (const, StateN = #state{backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} =
- BQ:fetch(AckRequired, BQSN),
+ {{MsgId, AckTag}, BQSN1} = BQ:drop(AckRequired, BQSN),
maybe_store_ack(
AckRequired, MsgId, AckTag,
StateN #state { backing_queue_state = BQSN1 })
@@ -733,21 +744,6 @@ process_instruction({drop, Length, Dropped, AckRequired},
true -> State1;
false -> update_delta(ToDrop - Dropped, State1)
end};
-process_instruction({fetch, AckRequired, MsgId, Remaining},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- QLen = BQ:len(BQS),
- {ok, case QLen - 1 of
- Remaining ->
- {{#basic_message{id = MsgId}, _IsDelivered,
- AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State #state { backing_queue_state = BQS1 });
- _ when QLen =< Remaining andalso AckRequired ->
- State;
- _ when QLen =< Remaining ->
- update_delta(-1, State)
- end};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -828,6 +824,12 @@ update_delta( DeltaChange, State = #state { depth_delta = Delta }) ->
true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed
set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }).
+update_ram_duration(BQ, BQS) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQ:set_ram_duration_target(DesiredDuration, BQS1).
+
record_synchronised(#amqqueue { name = QName }) ->
Self = self(),
rabbit_misc:execute_mnesia_transaction(