diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-07-07 10:34:59 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-07-07 10:34:59 +0100 |
commit | 1e72197c5720adb868498a119edd78a3444e3198 (patch) | |
tree | 320a9c46d188c476e9d0606c29593e573a44a98d | |
parent | 6757adac1ca87455fb9c89ecf1219f5d0ae645ab (diff) | |
parent | bf7a82d355b9b3bed4d16f40ac5a3fe1494a4356 (diff) | |
download | rabbitmq-server-1e72197c5720adb868498a119edd78a3444e3198.tar.gz |
Merging default into bug24130
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 69 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 28 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 44 |
3 files changed, 105 insertions, 36 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 57f6ca8b..48522af6 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -16,7 +16,7 @@ -module(rabbit_mirror_queue_coordinator). --export([start_link/3, get_gm/1, ensure_monitoring/2]). +-export([start_link/4, get_gm/1, ensure_monitoring/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -32,15 +32,17 @@ -record(state, { q, gm, monitors, - death_fun + death_fun, + length_fun }). -define(ONE_SECOND, 1000). -ifdef(use_specs). --spec(start_link/3 :: (rabbit_types:amqqueue(), pid() | 'undefined', - rabbit_mirror_queue_master:death_fun()) -> +-spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined', + rabbit_mirror_queue_master:death_fun(), + rabbit_mirror_queue_master:length_fun()) -> rabbit_types:ok_pid_or_error()). -spec(get_gm/1 :: (pid()) -> pid()). -spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok'). @@ -138,9 +140,28 @@ %% state of the master. The detection of the sync-status of a slave is %% done entirely based on length: if the slave and the master both %% agree on the length of the queue after the fetch of the head of the -%% queue, then the queues must be in sync. The only other possibility -%% is that the slave's queue is shorter, and thus the fetch should be -%% ignored. +%% queue (or a 'set_length' results in a slave having to drop some +%% messages from the head of its queue), then the queues must be in +%% sync. The only other possibility is that the slave's queue is +%% shorter, and thus the fetch should be ignored. In case slaves are +%% joined to an empty queue which only goes on to receive publishes, +%% they start by asking the master to broadcast its length. This is +%% enough for slaves to always be able to work out when their head +%% does not differ from the master (and is much simpler and cheaper +%% than getting the master to hang on to the guid of the msg at the +%% head of its queue). When a slave is promoted to a master, it +%% unilaterally broadcasts its length, in order to solve the problem +%% of length requests from new slaves being unanswered by a dead +%% master. +%% +%% Obviously, due to the async nature of communication across gm, the +%% slaves can fall behind. This does not matter from a sync pov: if +%% they fall behind and the master dies then a) no publishes are lost +%% because all publishes go to all mirrors anyway; b) the worst that +%% happens is that acks get lost and so messages come back to +%% life. This is no worse than normal given you never get confirmation +%% that an ack has been received (not quite true with QoS-prefetch, +%% but close enough for jazz). %% %% Because acktags are issued by the bq independently, and because %% there is no requirement for the master and all slaves to use the @@ -318,8 +339,8 @@ %% %%---------------------------------------------------------------------------- -start_link(Queue, GM, DeathFun) -> - gen_server2:start_link(?MODULE, [Queue, GM, DeathFun], []). +start_link(Queue, GM, DeathFun, LengthFun) -> + gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, LengthFun], []). get_gm(CPid) -> gen_server2:call(CPid, get_gm, infinity). @@ -331,7 +352,7 @@ ensure_monitoring(CPid, Pids) -> %% gen_server %% --------------------------------------------------------------------------- -init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) -> +init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) -> GM1 = case GM of undefined -> {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]), @@ -345,10 +366,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) -> end, {ok, _TRef} = timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]), - {ok, #state { q = Q, - gm = GM1, - monitors = dict:new(), - death_fun = DeathFun }, + {ok, #state { q = Q, + gm = GM1, + monitors = dict:new(), + death_fun = DeathFun, + length_fun = LengthFun }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -368,6 +390,10 @@ handle_cast({gm_deaths, Deaths}, {stop, normal, State} end; +handle_cast(request_length, State = #state { length_fun = LengthFun }) -> + ok = LengthFun(), + noreply(State); + handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Monitors }) -> Monitors1 = @@ -382,13 +408,12 @@ handle_cast({ensure_monitoring, Pids}, handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, State = #state { monitors = Monitors, - death_fun = Fun }) -> - noreply( - case dict:is_key(Pid, Monitors) of - false -> State; - true -> ok = Fun(Pid), - State #state { monitors = dict:erase(Pid, Monitors) } - end); + death_fun = DeathFun }) -> + noreply(case dict:is_key(Pid, Monitors) of + false -> State; + true -> ok = DeathFun(Pid), + State #state { monitors = dict:erase(Pid, Monitors) } + end); handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -418,6 +443,8 @@ members_changed([CPid], _Births, Deaths) -> handle_msg([_CPid], _From, heartbeat) -> ok; +handle_msg([CPid], _From, request_length = Msg) -> + ok = gen_server2:cast(CPid, Msg); handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) -> ok = gen_server2:cast(CPid, Msg); handle_msg([_CPid], _From, _Msg) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index b090ebe8..9578026e 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -26,7 +26,7 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/6, sender_death_fun/0]). +-export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]). -behaviour(rabbit_backing_queue). @@ -45,9 +45,10 @@ -ifdef(use_specs). --export_type([death_fun/0]). +-export_type([death_fun/0, length_fun/0]). -type(death_fun() :: fun ((pid()) -> 'ok')). +-type(length_fun() :: fun (() -> 'ok')). -type(master_state() :: #state { gm :: pid(), coordinator :: pid(), backing_queue :: atom(), @@ -62,6 +63,7 @@ -spec(promote_backing_queue_state/6 :: (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). -spec(sender_death_fun/0 :: () -> death_fun()). +-spec(length_fun/0 :: () -> length_fun()). -endif. @@ -84,7 +86,7 @@ stop() -> init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, AsyncCallback, SyncCallback) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( - Q, undefined, sender_death_fun()), + Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), MNodes1 = (case MNodes of @@ -95,6 +97,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback), + ok = gm:broadcast(GM, {length, BQ:len(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -369,11 +372,13 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, %% --------------------------------------------------------------------------- promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> + Len = BQ:len(BQS), + ok = gm:broadcast(GM, {length, Len}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = BQ:len(BQS), + set_delivered = Len, seen_status = SeenStatus, confirmed = [], ack_msg_id = dict:new(), @@ -391,9 +396,18 @@ sender_death_fun() -> end) end. -%% --------------------------------------------------------------------------- -%% Helpers -%% --------------------------------------------------------------------------- +length_fun() -> + Self = self(), + fun () -> + rabbit_amqqueue:run_backing_queue_async( + Self, ?MODULE, + fun (?MODULE, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {length, BQ:len(BQS)}), + State + end) + end. maybe_store_acktag(undefined, _MsgId, AM) -> AM; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 55d61d41..a4a40a8c 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -64,7 +64,9 @@ ack_num, msg_id_status, - known_senders + known_senders, + + synchronised }). start_link(Q) -> @@ -101,6 +103,9 @@ init([#amqqueue { name = QueueName } = Q]) -> self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), {ok, BQ} = application:get_env(backing_queue_module), BQS = bq_init(BQ, Q, false), + rabbit_event:notify(queue_slave_created, + [{name, QueueName}, {pid, self()}, {master_pid, MPid}]), + ok = gm:broadcast(GM, request_length), {ok, #state { q = Q, gm = GM, master_pid = MPid, @@ -114,7 +119,9 @@ init([#amqqueue { name = QueueName } = Q]) -> ack_num = 0, msg_id_status = dict:new(), - known_senders = dict:new() + known_senders = dict:new(), + + synchronised = false }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -303,6 +310,9 @@ members_changed([SPid], _Births, Deaths) -> handle_msg([_SPid], _From, heartbeat) -> ok; +handle_msg([_SPid], _From, request_length) -> + %% This is only of value to the master + ok; handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) -> %% This is only of value to the master ok; @@ -399,7 +409,7 @@ gb_trees_cons(Key, Value, Tree) -> handle_process_result({ok, State}) -> noreply(State); handle_process_result({stop, State}) -> {stop, normal, State}. -promote_me(From, #state { q = Q, +promote_me(From, #state { q = Q = #amqqueue { name = QName }, gm = GM, backing_queue = BQ, backing_queue_state = BQS, @@ -408,12 +418,13 @@ promote_me(From, #state { q = Q, msg_id_ack = MA, msg_id_status = MS, known_senders = KS }) -> + rabbit_event:notify(queue_slave_promoted, [{name, QName}, {pid, self()}]), rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n", - [rabbit_misc:rs(Q #amqqueue.name), - rabbit_misc:pid_to_string(self())]), + [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]), Q1 = Q #amqqueue { pid = self() }, {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( - Q1, GM, rabbit_mirror_queue_master:sender_death_fun()), + Q1, GM, rabbit_mirror_queue_master:sender_death_fun(), + rabbit_mirror_queue_master:length_fun()), true = unlink(GM), gen_server2:reply(From, {promote, CPid}), ok = gm:confirmed_broadcast(GM, heartbeat), @@ -771,7 +782,7 @@ process_instruction({set_length, Length}, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), ToDrop = QLen - Length, - {ok, case ToDrop > 0 of + {ok, case ToDrop >= 0 of true -> BQS1 = lists:foldl( fun (const, BQSN) -> @@ -779,7 +790,8 @@ process_instruction({set_length, Length}, BQSN1} = BQ:fetch(false, BQSN), BQSN1 end, BQS, lists:duplicate(ToDrop, const)), - State #state { backing_queue_state = BQS1 }; + set_synchronised( + true, State #state { backing_queue_state = BQS1 }); false -> State end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, @@ -792,6 +804,8 @@ process_instruction({fetch, AckRequired, MsgId, Remaining}, AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), maybe_store_ack(AckRequired, MsgId, AckTag, State #state { backing_queue_state = BQS1 }); + Other when Other + 1 =:= Remaining -> + set_synchronised(true, State); Other when Other < Remaining -> %% we must be shorter than the master State @@ -844,6 +858,10 @@ process_instruction({sender_death, ChPid}, msg_id_status = MS1, known_senders = dict:erase(ChPid, KS) } end}; +process_instruction({length, Length}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {ok, set_synchronised(Length =:= BQ:len(BQS), State)}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -871,3 +889,13 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, ack_num = Num }) -> State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. + +%% We intentionally leave out the head where a slave becomes +%% unsynchronised: we assert that can never happen. +set_synchronised(true, State = #state { synchronised = false }) -> + rabbit_event:notify(queue_slave_synchronised, [{pid, self()}]), + State #state { synchronised = true }; +set_synchronised(true, State) -> + State; +set_synchronised(false, State = #state { synchronised = false }) -> + State. |