diff options
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 190 |
1 files changed, 129 insertions, 61 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index b38a8967..43962491 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -33,11 +33,11 @@ %% All instructions from the GM group must be processed in the order %% in which they're received. --export([start_link/1, set_maximum_since_use/2]). +-export([start_link/1, set_maximum_since_use/2, info/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2]). + prioritise_cast/2, prioritise_info/2]). -export([joined/2, members_changed/3, handle_msg/3]). @@ -45,8 +45,28 @@ -behaviour(gm). -include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + -include("gm_specs.hrl"). +-ifdef(use_specs). +%% Shut dialyzer up +-spec(promote_me/2 :: (_, _) -> no_return()). +-endif. + +%%---------------------------------------------------------------------------- + + +-define(CREATION_EVENT_KEYS, + [pid, + name, + master_pid, + is_synchronised + ]). + +-define(INFO_KEYS, ?CREATION_EVENT_KEYS). + -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(DEATH_TIMEOUT, 20000). %% 20 seconds @@ -64,7 +84,9 @@ ack_num, msg_id_status, - known_senders + known_senders, + + synchronised }). start_link(Q) -> @@ -73,6 +95,9 @@ start_link(Q) -> set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). +info(QPid) -> + gen_server2:call(QPid, info, infinity). + init([#amqqueue { name = QueueName } = Q]) -> process_flag(trap_exit, true), %% amqqueue_process traps exits too. {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), @@ -89,33 +114,38 @@ init([#amqqueue { name = QueueName } = Q]) -> %% ASSERTION [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node], MPids1 = MPids ++ [Self], - mnesia:write(rabbit_queue, - Q1 #amqqueue { slave_pids = MPids1 }, - write), + ok = rabbit_amqqueue:store_queue( + Q1 #amqqueue { slave_pids = MPids1 }), {ok, QPid} end), erlang:monitor(process, MPid), ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), + rabbit_amqqueue, set_maximum_since_use, [Self]), ok = rabbit_memory_monitor:register( - self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), BQS = bq_init(BQ, Q, false), - {ok, #state { q = Q, - gm = GM, - master_pid = MPid, - backing_queue = BQ, - backing_queue_state = BQS, - rate_timer_ref = undefined, - sync_timer_ref = undefined, - - sender_queues = dict:new(), - msg_id_ack = dict:new(), - ack_num = 0, - - msg_id_status = dict:new(), - known_senders = dict:new() - }, hibernate, + State = #state { q = Q, + gm = GM, + master_pid = MPid, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = undefined, + sync_timer_ref = undefined, + + sender_queues = dict:new(), + msg_id_ack = dict:new(), + ack_num = 0, + + msg_id_status = dict:new(), + known_senders = dict:new(), + + synchronised = false + }, + rabbit_event:notify(queue_slave_created, + infos(?CREATION_EVENT_KEYS, State)), + ok = gm:broadcast(GM, request_length), + {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> @@ -145,29 +175,32 @@ handle_call({gm_deaths, Deaths}, From, State = #state { q = #amqqueue { name = QueueName }, gm = GM, master_pid = MPid }) -> - rabbit_log:info("Mirrored-queue (~s): Slave ~s saw deaths of mirrors ~s~n", - [rabbit_misc:rs(QueueName), - rabbit_misc:pid_to_string(self()), - [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]), %% The GM has told us about deaths, which means we're not going to %% receive any more messages from GM case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of - {ok, Pid} when node(Pid) =:= node(MPid) -> - %% master hasn't changed - reply(ok, State); - {ok, Pid} when node(Pid) =:= node() -> - %% we've become master - promote_me(From, State); - {ok, Pid} -> - %% master has changed to not us. - gen_server2:reply(From, ok), - erlang:monitor(process, Pid), - ok = gm:broadcast(GM, heartbeat), - noreply(State #state { master_pid = Pid }); {error, not_found} -> gen_server2:reply(From, ok), - {stop, normal, State} - end. + {stop, normal, State}; + {ok, Pid, DeadPids} -> + rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName, + DeadPids), + if node(Pid) =:= node(MPid) -> + %% master hasn't changed + reply(ok, State); + node(Pid) =:= node() -> + %% we've become master + promote_me(From, State); + true -> + %% master has changed to not us. + gen_server2:reply(From, ok), + erlang:monitor(process, Pid), + ok = gm:broadcast(GM, heartbeat), + noreply(State #state { master_pid = Pid }) + end + end; + +handle_call(info, _From, State) -> + reply(infos(?INFO_KEYS, State), State). handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -187,9 +220,9 @@ handle_cast({set_ram_duration_target, Duration}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), - noreply(State #state { backing_queue_state = BQS1 }); + noreply(State #state { backing_queue_state = BQS1 }). -handle_cast(update_ram_duration, +handle_info(update_ram_duration, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {RamDuration, BQS1} = BQ:ram_duration(BQS), @@ -199,9 +232,9 @@ handle_cast(update_ram_duration, noreply(State #state { rate_timer_ref = just_measured, backing_queue_state = BQS2 }); -handle_cast(sync_timeout, State) -> +handle_info(sync_timeout, State) -> noreply(backing_queue_timeout( - State #state { sync_timer_ref = undefined })). + State #state { sync_timer_ref = undefined })); handle_info(timeout, State) -> noreply(backing_queue_timeout(State)); @@ -260,22 +293,28 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, prioritise_call(Msg, _From, _State) -> case Msg of + info -> 9; {gm_deaths, _Deaths} -> 5; _ -> 0 end. prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; - sync_timeout -> 6; {gm, _Msg} -> 5; {post_commit, _Txn, _AckTags} -> 4; _ -> 0 end. +prioritise_info(Msg, _State) -> + case Msg of + update_ram_duration -> 8; + sync_timeout -> 6; + _ -> 0 + end. + %% --------------------------------------------------------------------------- %% GM %% --------------------------------------------------------------------------- @@ -291,6 +330,9 @@ members_changed([SPid], _Births, Deaths) -> handle_msg([_SPid], _From, heartbeat) -> ok; +handle_msg([_SPid], _From, request_length) -> + %% This is only of value to the master + ok; handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) -> %% This is only of value to the master ok; @@ -315,6 +357,14 @@ inform_deaths(SPid, Deaths) -> %% Others %% --------------------------------------------------------------------------- +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(pid, _State) -> self(); +i(name, #state { q = #amqqueue { name = Name } }) -> Name; +i(master_pid, #state { master_pid = MPid }) -> MPid; +i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised; +i(Item, _State) -> throw({bad_argument, Item}). + bq_init(BQ, Q, Recover) -> Self = self(), BQ:init(Q, Recover, @@ -380,7 +430,7 @@ gb_trees_cons(Key, Value, Tree) -> handle_process_result({ok, State}) -> noreply(State); handle_process_result({stop, State}) -> {stop, normal, State}. -promote_me(From, #state { q = Q, +promote_me(From, #state { q = Q = #amqqueue { name = QName }, gm = GM, backing_queue = BQ, backing_queue_state = BQS, @@ -389,12 +439,14 @@ promote_me(From, #state { q = Q, msg_id_ack = MA, msg_id_status = MS, known_senders = KS }) -> + rabbit_event:notify(queue_slave_promoted, [{pid, self()}, + {name, QName}]), rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n", - [rabbit_misc:rs(Q #amqqueue.name), - rabbit_misc:pid_to_string(self())]), + [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]), Q1 = Q #amqqueue { pid = self() }, {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( - Q1, GM, rabbit_mirror_queue_master:sender_death_fun()), + Q1, GM, rabbit_mirror_queue_master:sender_death_fun(), + rabbit_mirror_queue_master:length_fun()), true = unlink(GM), gen_server2:reply(From, {promote, CPid}), ok = gm:confirmed_broadcast(GM, heartbeat), @@ -516,8 +568,7 @@ backing_queue_timeout(State = #state { backing_queue = BQ }) -> run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State). ensure_sync_timer(State = #state { sync_timer_ref = undefined }) -> - {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), + TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout), State #state { sync_timer_ref = TRef }; ensure_sync_timer(State) -> State. @@ -525,14 +576,12 @@ ensure_sync_timer(State) -> stop_sync_timer(State = #state { sync_timer_ref = undefined }) -> State; stop_sync_timer(State = #state { sync_timer_ref = TRef }) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State #state { sync_timer_ref = undefined }. ensure_rate_timer(State = #state { rate_timer_ref = undefined }) -> - {ok, TRef} = timer:apply_after( - ?RAM_DURATION_UPDATE_INTERVAL, - rabbit_amqqueue, update_ram_duration, - [self()]), + TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL, + self(), update_ram_duration), State #state { rate_timer_ref = TRef }; ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) -> State #state { rate_timer_ref = undefined }; @@ -544,7 +593,7 @@ stop_rate_timer(State = #state { rate_timer_ref = undefined }) -> stop_rate_timer(State = #state { rate_timer_ref = just_measured }) -> State #state { rate_timer_ref = undefined }; stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State #state { rate_timer_ref = undefined }. ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> @@ -748,7 +797,7 @@ process_instruction({set_length, Length}, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), ToDrop = QLen - Length, - {ok, case ToDrop > 0 of + {ok, case ToDrop >= 0 of true -> BQS1 = lists:foldl( fun (const, BQSN) -> @@ -756,7 +805,8 @@ process_instruction({set_length, Length}, BQSN1} = BQ:fetch(false, BQSN), BQSN1 end, BQS, lists:duplicate(ToDrop, const)), - State #state { backing_queue_state = BQS1 }; + set_synchronised( + true, State #state { backing_queue_state = BQS1 }); false -> State end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, @@ -769,6 +819,8 @@ process_instruction({fetch, AckRequired, MsgId, Remaining}, AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), maybe_store_ack(AckRequired, MsgId, AckTag, State #state { backing_queue_state = BQS1 }); + Other when Other + 1 =:= Remaining -> + set_synchronised(true, State); Other when Other < Remaining -> %% we must be shorter than the master State @@ -821,6 +873,10 @@ process_instruction({sender_death, ChPid}, msg_id_status = MS1, known_senders = dict:erase(ChPid, KS) } end}; +process_instruction({length, Length}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {ok, set_synchronised(Length =:= BQ:len(BQS), State)}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -848,3 +904,15 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, ack_num = Num }) -> State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. + +%% We intentionally leave out the head where a slave becomes +%% unsynchronised: we assert that can never happen. +set_synchronised(true, State = #state { q = #amqqueue { name = QName }, + synchronised = false }) -> + rabbit_event:notify(queue_slave_synchronised, [{pid, self()}, + {name, QName}]), + State #state { synchronised = true }; +set_synchronised(true, State) -> + State; +set_synchronised(false, State = #state { synchronised = false }) -> + State. |