summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_master.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_mirror_queue_master.erl')
-rw-r--r--src/rabbit_mirror_queue_master.erl206
1 files changed, 119 insertions, 87 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 6db6ce9d..bcd4861a 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,58 +17,60 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2,
- requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
- dropwhile/3, set_ram_duration_target/2, ram_duration/1,
+ purge/1, purge_acks/1, publish/5, publish_delivered/4,
+ discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
+ len/1, is_empty/1, depth/1, drain_confirmed/1,
+ dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, fold/3]).
+ status/1, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
--export([promote_backing_queue_state/7, sender_death_fun/0, depth_fun/0]).
+-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]).
--export([init_with_existing_bq/3, stop_mirroring/1]).
+-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]).
-behaviour(rabbit_backing_queue).
-include("rabbit.hrl").
--record(state, { gm,
+-record(state, { name,
+ gm,
coordinator,
backing_queue,
backing_queue_state,
- set_delivered,
seen_status,
confirmed,
- ack_msg_id,
known_senders
}).
-ifdef(use_specs).
--export_type([death_fun/0, depth_fun/0]).
+-export_type([death_fun/0, depth_fun/0, stats_fun/0]).
-type(death_fun() :: fun ((pid()) -> 'ok')).
-type(depth_fun() :: fun (() -> 'ok')).
--type(master_state() :: #state { gm :: pid(),
+-type(stats_fun() :: fun ((any()) -> 'ok')).
+-type(master_state() :: #state { name :: rabbit_amqqueue:name(),
+ gm :: pid(),
coordinator :: pid(),
backing_queue :: atom(),
backing_queue_state :: any(),
- set_delivered :: non_neg_integer(),
seen_status :: dict(),
confirmed :: [rabbit_guid:guid()],
- ack_msg_id :: dict(),
known_senders :: set()
}).
--spec(promote_backing_queue_state/7 ::
- (pid(), atom(), any(), pid(), [any()], dict(), [pid()]) ->
- master_state()).
+-spec(promote_backing_queue_state/8 ::
+ (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()], dict(),
+ [pid()]) -> master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
-spec(depth_fun/0 :: () -> depth_fun()).
-spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) ->
master_state()).
-spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}).
+-spec(sync_mirrors/3 :: (stats_fun(), stats_fun(), master_state()) ->
+ {'ok', master_state()} | {stop, any(), master_state()}).
-endif.
@@ -109,14 +111,13 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
end),
{_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
- #state { gm = GM,
+ #state { name = QName,
+ gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = 0,
seen_status = dict:new(),
confirmed = [],
- ack_msg_id = dict:new(),
known_senders = sets:new() }.
stop_mirroring(State = #state { coordinator = CPid,
@@ -126,6 +127,31 @@ stop_mirroring(State = #state { coordinator = CPid,
stop_all_slaves(shutdown, State),
{BQ, BQS}.
+sync_mirrors(HandleInfo, EmitStats,
+ State = #state { name = QName,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Log = fun (Fmt, Params) ->
+ rabbit_log:info("Synchronising ~s: " ++ Fmt ++ "~n",
+ [rabbit_misc:rs(QName) | Params])
+ end,
+ Log("~p messages to synchronise", [BQ:len(BQS)]),
+ {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
+ Ref = make_ref(),
+ Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids),
+ gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
+ S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
+ case rabbit_mirror_queue_sync:master_go(
+ Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) of
+ {shutdown, R, BQS1} -> {stop, R, S(BQS1)};
+ {sync_died, R, BQS1} -> Log("~p", [R]),
+ {ok, S(BQS1)};
+ {already_synced, BQS1} -> {ok, S(BQS1)};
+ {ok, BQS1} -> Log("complete", []),
+ {ok, S(BQS1)}
+ end.
+
terminate({shutdown, dropped} = Reason,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -135,8 +161,8 @@ terminate({shutdown, dropped} = Reason,
%% in without this node being restarted. Thus we must do the full
%% blown delete_and_terminate now, but only locally: we do not
%% broadcast delete_and_terminate.
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 };
+ State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)};
+
terminate(Reason,
State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -147,20 +173,16 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
stop_all_slaves(Reason, State),
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 }.
-
-stop_all_slaves(Reason, #state{gm = GM}) ->
- Info = gm:info(GM),
- Slaves = [Pid || Pid <- proplists:get_value(group_members, Info),
- node(Pid) =/= node()],
- MRefs = [erlang:monitor(process, S) || S <- Slaves],
+ State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
+
+stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
+ {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
+ MRefs = [erlang:monitor(process, SPid) || SPid <- SPids],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
[receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs],
%% Normally when we remove a slave another slave or master will
%% notice and update Mnesia. But we just removed them all, and
%% have stopped listening ourselves. So manually clean up.
- QName = proplists:get_value(group_name, Info),
rabbit_misc:execute_mnesia_transaction(
fun () ->
[Q] = mnesia:read({rabbit_queue, QName}),
@@ -174,30 +196,29 @@ purge(State = #state { gm = GM,
backing_queue_state = BQS }) ->
ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}),
{Count, BQS1} = BQ:purge(BQS),
- {Count, State #state { backing_queue_state = BQS1,
- set_delivered = 0 }}.
+ {Count, State #state { backing_queue_state = BQS1 }}.
+
+purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}).
-publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
+publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
+ backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
- AM1 = maybe_store_acktag(AckTag, MsgId, AM),
- State1 = State #state { backing_queue_state = BQS1, ack_msg_id = AM1 },
+ State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
discard(MsgId, ChPid, State = #state { gm = GM,
@@ -220,22 +241,17 @@ discard(MsgId, ChPid, State = #state { gm = GM,
State
end.
-dropwhile(Pred, AckRequired,
- State = #state{gm = GM,
- backing_queue = BQ,
- set_delivered = SetDelivered,
- backing_queue_state = BQS }) ->
+dropwhile(Pred, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
- Len1 = BQ:len(BQS1),
- Dropped = Len - Len1,
- case Dropped of
- 0 -> ok;
- _ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired})
- end,
- SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- {Next, Msgs, State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 } }.
+ {Next, BQS1} = BQ:dropwhile(Pred, BQS),
+ {Next, drop(Len, false, State #state { backing_queue_state = BQS1 })}.
+
+fetchwhile(Pred, Fun, Acc, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Len = BQ:len(BQS),
+ {Next, Acc1, BQS1} = BQ:fetchwhile(Pred, Fun, Acc, BQS),
+ {Next, Acc1, drop(Len, true, State #state { backing_queue_state = BQS1 })}.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -267,43 +283,33 @@ drain_confirmed(State = #state { backing_queue = BQ,
seen_status = SS1,
confirmed = [] }}.
-fetch(AckRequired, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- set_delivered = SetDelivered,
- ack_msg_id = AM }) ->
+fetch(AckRequired, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
- case Result of
- empty ->
- {Result, State1};
- {#basic_message { id = MsgId } = Message, IsDelivered, AckTag,
- Remaining} ->
- ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}),
- IsDelivered1 = IsDelivered orelse SetDelivered > 0,
- SetDelivered1 = lists:max([0, SetDelivered - 1]),
- AM1 = maybe_store_acktag(AckTag, MsgId, AM),
- {{Message, IsDelivered1, AckTag, Remaining},
- State1 #state { set_delivered = SetDelivered1,
- ack_msg_id = AM1 }}
- end.
+ {Result, case Result of
+ empty -> State1;
+ {_MsgId, _IsDelivered, AckTag} -> drop_one(AckTag, State1)
+ end}.
+
+drop(AckRequired, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:drop(AckRequired, BQS),
+ State1 = State #state { backing_queue_state = BQS1 },
+ {Result, case Result of
+ empty -> State1;
+ {_MsgId, AckTag} -> drop_one(AckTag, State1)
+ end}.
ack(AckTags, State = #state { gm = GM,
backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
+ backing_queue_state = BQS }) ->
{MsgIds, BQS1} = BQ:ack(AckTags, BQS),
case MsgIds of
[] -> ok;
_ -> ok = gm:broadcast(GM, {ack, MsgIds})
end,
- AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
- {MsgIds, State #state { backing_queue_state = BQS1,
- ack_msg_id = AM1 }}.
-
-fold(MsgFun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS }, AckTags) ->
- State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }.
+ {MsgIds, State #state { backing_queue_state = BQS1 }}.
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
@@ -312,6 +318,16 @@ requeue(AckTags, State = #state { gm = GM,
ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
+ackfold(MsgFun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }, AckTags) ->
+ {Acc1, BQS1} = BQ:ackfold(MsgFun, Acc, BQS, AckTags),
+ {Acc1, State #state { backing_queue_state = BQS1 }}.
+
+fold(Fun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:fold(Fun, Acc, BQS),
+ {Result, State #state { backing_queue_state = BQS1 }}.
+
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:len(BQS).
@@ -399,20 +415,19 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% Other exported functions
%% ---------------------------------------------------------------------------
-promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) ->
+promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
Len = BQ:len(BQS1),
Depth = BQ:depth(BQS1),
true = Len == Depth, %% ASSERTION: everything must have been requeued
ok = gm:broadcast(GM, {depth, Depth}),
- #state { gm = GM,
+ #state { name = QName,
+ gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS1,
- set_delivered = Len,
- seen_status = SeenStatus,
+ seen_status = Seen,
confirmed = [],
- ack_msg_id = dict:new(),
known_senders = sets:from_list(KS) }.
sender_death_fun() ->
@@ -440,8 +455,25 @@ depth_fun() ->
end)
end.
-maybe_store_acktag(undefined, _MsgId, AM) -> AM;
-maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM).
+%% ---------------------------------------------------------------------------
+%% Helpers
+%% ---------------------------------------------------------------------------
+
+drop_one(AckTag, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}),
+ State.
+
+drop(PrevLen, AckRequired, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Len = BQ:len(BQS),
+ case PrevLen - Len of
+ 0 -> State;
+ Dropped -> ok = gm:broadcast(GM, {drop, Len, Dropped, AckRequired}),
+ State
+ end.
ensure_monitoring(ChPid, State = #state { coordinator = CPid,
known_senders = KS }) ->