diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-06-26 18:53:15 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-06-26 18:53:15 +0100 |
commit | 1a996146d8d99d1ebe2396348a1bed8a0ec2ffc5 (patch) | |
tree | 0cce03e9dc133347b72a0fa56da10ba26bd72b3c | |
parent | a07a3fa48f73cb86c2afd246bbeec35177a455c5 (diff) | |
download | rabbitmq-server-1a996146d8d99d1ebe2396348a1bed8a0ec2ffc5.tar.gz |
Master responds to length_requests with length broadcast; slaves can determine their synchronised state correctly.
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 37 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 65 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 31 |
3 files changed, 85 insertions, 48 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index ab0b2362..b70761ea 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -16,7 +16,7 @@ -module(rabbit_mirror_queue_coordinator). --export([start_link/3, get_gm/1, ensure_monitoring/2]). +-export([start_link/4, get_gm/1, ensure_monitoring/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -32,7 +32,8 @@ -record(state, { q, gm, monitors, - death_fun + death_fun, + length_fun }). -define(ONE_SECOND, 1000). @@ -308,8 +309,8 @@ %% %%---------------------------------------------------------------------------- -start_link(Queue, GM, DeathFun) -> - gen_server2:start_link(?MODULE, [Queue, GM, DeathFun], []). +start_link(Queue, GM, DeathFun, LengthFun) -> + gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, LengthFun], []). get_gm(CPid) -> gen_server2:call(CPid, get_gm, infinity). @@ -321,7 +322,7 @@ ensure_monitoring(CPid, Pids) -> %% gen_server %% --------------------------------------------------------------------------- -init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) -> +init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) -> GM1 = case GM of undefined -> {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]), @@ -335,10 +336,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) -> end, {ok, _TRef} = timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]), - {ok, #state { q = Q, - gm = GM1, - monitors = dict:new(), - death_fun = DeathFun }, + {ok, #state { q = Q, + gm = GM1, + monitors = dict:new(), + death_fun = DeathFun, + length_fun = LengthFun }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -358,8 +360,8 @@ handle_cast({gm_deaths, Deaths}, {stop, normal, State} end; -handle_cast(request_length, State) -> - %% TODO: something +handle_cast(request_length, State = #state { length_fun = LengthFun }) -> + ok = LengthFun(), noreply(State); handle_cast({ensure_monitoring, Pids}, @@ -376,13 +378,12 @@ handle_cast({ensure_monitoring, Pids}, handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, State = #state { monitors = Monitors, - death_fun = Fun }) -> - noreply( - case dict:is_key(Pid, Monitors) of - false -> State; - true -> ok = Fun(Pid), - State #state { monitors = dict:erase(Pid, Monitors) } - end); + death_fun = DeathFun }) -> + noreply(case dict:is_key(Pid, Monitors) of + false -> State; + true -> ok = DeathFun(Pid), + State #state { monitors = dict:erase(Pid, Monitors) } + end); handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 463b8cfb..7d2b7e44 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -26,7 +26,7 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/6, sender_death_fun/0]). +-export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]). -behaviour(rabbit_backing_queue). @@ -59,22 +59,10 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). -sender_death_fun() -> - Self = self(), - fun (DeadPid) -> - rabbit_amqqueue:run_backing_queue_async( - Self, ?MODULE, - fun (?MODULE, State = #state { gm = GM, known_senders = KS }) -> - ok = gm:broadcast(GM, {sender_death, DeadPid}), - KS1 = sets:del_element(DeadPid, KS), - State #state { known_senders = KS1 } - end) - end. - init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, AsyncCallback, SyncCallback) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( - Q, undefined, sender_death_fun()), + Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), MNodes1 = (case MNodes of @@ -85,6 +73,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback), + ok = gm:broadcast(GM, {length, BQ:length(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -95,17 +84,6 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, ack_msg_id = dict:new(), known_senders = sets:new() }. -promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> - #state { gm = GM, - coordinator = CPid, - backing_queue = BQ, - backing_queue_state = BQS, - set_delivered = BQ:len(BQS), - seen_status = SeenStatus, - confirmed = [], - ack_msg_id = dict:new(), - known_senders = sets:from_list(KS) }. - terminate({shutdown, dropped} = Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination - this node has been explicitly @@ -365,6 +343,43 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, State end. +promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> + ok = gm:broadcast(GM, {length, BQ:length(BQS)}), + #state { gm = GM, + coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = BQ:len(BQS), + seen_status = SeenStatus, + confirmed = [], + ack_msg_id = dict:new(), + known_senders = sets:from_list(KS) }. + +sender_death_fun() -> + Self = self(), + fun (DeadPid) -> + rabbit_amqqueue:run_backing_queue_async( + Self, ?MODULE, + fun (?MODULE, State = #state { gm = GM, known_senders = KS }) -> + ok = gm:broadcast(GM, {sender_death, DeadPid}), + KS1 = sets:del_element(DeadPid, KS), + State #state { known_senders = KS1 } + end) + end. + +length_fun() -> + Self = self(), + fun () -> + rabbit_amqqueue:run_backing_queue_async( + Self, ?MODULE, + fun (?MODULE, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {length, BQ:length(BQS)}), + State + end) + end. + maybe_store_acktag(undefined, _MsgId, AM) -> AM; maybe_store_acktag(AckTag, MsgId, AM) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 8a365e6a..f4c950a9 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -64,7 +64,9 @@ ack_num, msg_id_status, - known_senders + known_senders, + + synchronised }). start_link(Q) -> @@ -117,7 +119,9 @@ init([#amqqueue { name = QueueName } = Q]) -> ack_num = 0, msg_id_status = dict:new(), - known_senders = dict:new() + known_senders = dict:new(), + + synchronised = false }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -419,7 +423,8 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]), Q1 = Q #amqqueue { pid = self() }, {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( - Q1, GM, rabbit_mirror_queue_master:sender_death_fun()), + Q1, GM, rabbit_mirror_queue_master:sender_death_fun(), + rabbit_mirror_queue_master:length_fun()), true = unlink(GM), gen_server2:reply(From, {promote, CPid}), ok = gm:confirmed_broadcast(GM, heartbeat), @@ -777,7 +782,7 @@ process_instruction({set_length, Length}, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), ToDrop = QLen - Length, - {ok, case ToDrop > 0 of + {ok, case ToDrop >= 0 of true -> BQS1 = lists:foldl( fun (const, BQSN) -> @@ -785,7 +790,8 @@ process_instruction({set_length, Length}, BQSN1} = BQ:fetch(false, BQSN), BQSN1 end, BQS, lists:duplicate(ToDrop, const)), - State #state { backing_queue_state = BQS1 }; + set_synchronised( + true, State #state { backing_queue_state = BQS1 }); false -> State end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, @@ -798,6 +804,8 @@ process_instruction({fetch, AckRequired, MsgId, Remaining}, AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), maybe_store_ack(AckRequired, MsgId, AckTag, State #state { backing_queue_state = BQS1 }); + Other when Other + 1 =:= Remaining -> + set_synchronised(true, State); Other when Other < Remaining -> %% we must be shorter than the master State @@ -850,6 +858,10 @@ process_instruction({sender_death, ChPid}, msg_id_status = MS1, known_senders = dict:erase(ChPid, KS) } end}; +process_instruction({length, Length}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {ok, set_synchronised(Length =:= BQ:len(BQS), State)}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -877,3 +889,12 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, ack_num = Num }) -> State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. + +%% We intentionally leave out the head where a slave becomes +%% unsynchronised: we assert that can never happen. +set_synchronised(true, State = #state { synchronised = false }) -> + State #state { synchronised = true }; +set_synchronised(true, State) -> + State; +set_synchronised(false, State = #state { synchronised = false }) -> + State. |