Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 576
1 file changed, 252 insertions, 324 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 03fafc3e..1ba1420f 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -19,17 +19,8 @@ %% For general documentation of HA design, see %% rabbit_mirror_queue_coordinator %% -%% We join the GM group before we add ourselves to the amqqueue -%% record. As a result: -%% 1. We can receive msgs from GM that correspond to messages we will -%% never receive from publishers. -%% 2. When we receive a message from publishers, we must receive a -%% message from the GM group for it. -%% 3. However, that instruction from the GM group can arrive either -%% before or after the actual message. We need to be able to -%% distinguish between GM instructions arriving early, and case (1) -%% above. -%% +%% We receive messages from GM and from publishers, and the gm +%% messages can arrive either before or after the 'actual' message. %% All instructions from the GM group must be processed in the order %% in which they're received. @@ -73,63 +64,59 @@ -record(state, { q, gm, - master_pid, backing_queue, backing_queue_state, sync_timer_ref, rate_timer_ref, - sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId} + sender_queues, %% :: Pid -> {Q Msg, Set MsgId} msg_id_ack, %% :: MsgId -> AckTag - ack_num, msg_id_status, known_senders, - synchronised + %% Master depth - local depth + depth_delta }). -start_link(Q) -> - gen_server2:start_link(?MODULE, Q, []). +start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). -info(QPid) -> - gen_server2:call(QPid, info, infinity). - -init(#amqqueue { name = QueueName } = Q) -> +info(QPid) -> gen_server2:call(QPid, info, infinity). + +init(Q = #amqqueue { name = QName }) -> + %% We join the GM group before we add ourselves to the amqqueue + %% record. As a result: + %% 1. We can receive msgs from GM that correspond to messages we will + %% never receive from publishers. + %% 2. When we receive a message from publishers, we must receive a + %% message from the GM group for it. + %% 3. However, that instruction from the GM group can arrive either + %% before or after the actual message. We need to be able to + %% distinguish between GM instructions arriving early, and case (1) + %% above. + %% + process_flag(trap_exit, true), %% amqqueue_process traps exits too. + {ok, GM} = gm:start_link(QName, ?MODULE, [self()], + fun rabbit_misc:execute_mnesia_transaction/1), + receive {joined, GM} -> ok end, Self = self(), Node = node(), case rabbit_misc:execute_mnesia_transaction( - fun () -> - [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = - mnesia:read({rabbit_queue, QueueName}), - case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> MPids1 = MPids ++ [Self], - ok = rabbit_amqqueue:store_queue( - Q1 #amqqueue { slave_pids = MPids1 }), - {new, QPid}; - [SPid] -> true = rabbit_misc:is_process_alive(SPid), - existing - end - end) of - {new, MPid} -> - process_flag(trap_exit, true), %% amqqueue_process traps exits too. 
- {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), - receive {joined, GM} -> - ok - end, - erlang:monitor(process, MPid), + fun() -> init_it(Self, GM, Node, QName) end) of + {new, QPid} -> + erlang:monitor(process, QPid), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [Self]), ok = rabbit_memory_monitor:register( Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), - BQS = bq_init(BQ, Q, false), - State = #state { q = Q, + Q1 = Q #amqqueue { pid = QPid }, + BQS = bq_init(BQ, Q1, false), + State = #state { q = Q1, gm = GM, - master_pid = MPid, backing_queue = BQ, backing_queue_state = BQS, rate_timer_ref = undefined, @@ -137,70 +124,83 @@ init(#amqqueue { name = QueueName } = Q) -> sender_queues = dict:new(), msg_id_ack = dict:new(), - ack_num = 0, msg_id_status = dict:new(), known_senders = pmon:new(), - synchronised = false + depth_delta = undefined }, rabbit_event:notify(queue_slave_created, infos(?CREATION_EVENT_KEYS, State)), - ok = gm:broadcast(GM, request_length), + ok = gm:broadcast(GM, request_depth), {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}; + {stale, StalePid} -> + {stop, {stale_master_pid, StalePid}}; + duplicate_live_master -> + {stop, {duplicate_live_master, Node}}; existing -> + gm:leave(GM), ignore end. -handle_call({deliver, Delivery = #delivery { immediate = true }}, - From, State) -> - %% It is safe to reply 'false' here even if a) we've not seen the - %% msg via gm, or b) the master dies before we receive the msg via - %% gm. In the case of (a), we will eventually receive the msg via - %% gm, and it's only the master's result to the channel that is - %% important. In the case of (b), if the master does die and we do - %% get promoted then at that point we have no consumers, thus - %% 'false' is precisely the correct answer. However, we must be - %% careful to _not_ enqueue the message in this case. - - %% Note this is distinct from the case where we receive the msg - %% via gm first, then we're promoted to master, and only then do - %% we receive the msg from the channel. - gen_server2:reply(From, false), %% master may deliver it, not us - noreply(maybe_enqueue_message(Delivery, false, State)); - -handle_call({deliver, Delivery = #delivery { mandatory = true }}, - From, State) -> - gen_server2:reply(From, true), %% amqqueue throws away the result anyway - noreply(maybe_enqueue_message(Delivery, true, State)); +init_it(Self, GM, Node, QName) -> + [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] = + mnesia:read({rabbit_queue, QName}), + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of + [] -> add_slave(Q, Self, GM), + {new, QPid}; + [QPid] -> case rabbit_misc:is_process_alive(QPid) of + true -> duplicate_live_master; + false -> {stale, QPid} + end; + [SPid] -> case rabbit_misc:is_process_alive(SPid) of + true -> existing; + false -> Q1 = Q#amqqueue { + slave_pids = SPids -- [SPid], + gm_pids = [T || T = {_, S} <- GMPids, + S =/= SPid] }, + add_slave(Q1, Self, GM), + {new, QPid} + end + end. + +%% Add to the end, so they are in descending order of age, see +%% rabbit_mirror_queue_misc:promote_slave/1 +add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) -> + rabbit_mirror_queue_misc:store_updated_slaves( + Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}). 
+ +handle_call({deliver, Delivery, true}, From, State) -> + %% Synchronous, "mandatory" deliver mode. + gen_server2:reply(From, ok), + noreply(maybe_enqueue_message(Delivery, State)); handle_call({gm_deaths, Deaths}, From, - State = #state { q = #amqqueue { name = QueueName }, - gm = GM, - master_pid = MPid }) -> - %% The GM has told us about deaths, which means we're not going to - %% receive any more messages from GM - case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of + State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) -> + Self = self(), + case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, Deaths) of {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; {ok, Pid, DeadPids} -> - rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName, + rabbit_mirror_queue_misc:report_deaths(Self, false, QName, DeadPids), - if node(Pid) =:= node(MPid) -> + case Pid of + MPid -> %% master hasn't changed - reply(ok, State); - node(Pid) =:= node() -> + gen_server2:reply(From, ok), + noreply(State); + Self -> %% we've become master - promote_me(From, State); - true -> - %% master has changed to not us. + QueueState = promote_me(From, State), + {become, rabbit_amqqueue_process, QueueState, hibernate}; + _ -> + %% master has changed to not us gen_server2:reply(From, ok), erlang:monitor(process, Pid), - ok = gm:broadcast(GM, heartbeat), - noreply(State #state { master_pid = Pid }) + noreply(State #state { q = Q #amqqueue { pid = Pid } }) end end; @@ -213,13 +213,14 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> - %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. +handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow}, + State) -> + %% Asynchronous, non-"mandatory", deliver mode. case Flow of flow -> credit_flow:ack(Sender); noflow -> ok end, - noreply(maybe_enqueue_message(Delivery, true, State)); + noreply(maybe_enqueue_message(Delivery, State)); handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -249,8 +250,8 @@ handle_info(timeout, State) -> noreply(backing_queue_timeout(State)); handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, - State = #state { gm = GM, master_pid = MPid }) -> - ok = gm:broadcast(GM, {process_death, MPid}), + State = #state { gm = GM, q = #amqqueue { pid = MPid } }) -> + ok = gm:broadcast(GM, process_death), noreply(State); handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> @@ -286,7 +287,7 @@ terminate(Reason, #state { q = Q, rate_timer_ref = RateTRef }) -> ok = gm:leave(GM), QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()), + Q, BQ, BQS, RateTRef, [], pmon:new(), dict:new()), rabbit_amqqueue_process:terminate(Reason, QueueState); terminate([_SPid], _Reason) -> %% gm case @@ -332,25 +333,26 @@ prioritise_info(Msg, _State) -> %% GM %% --------------------------------------------------------------------------- -joined([SPid], _Members) -> - SPid ! {joined, self()}, - ok. +joined([SPid], _Members) -> SPid ! {joined, self()}, ok. -members_changed([_SPid], _Births, []) -> - ok; -members_changed([SPid], _Births, Deaths) -> - inform_deaths(SPid, Deaths). 
+members_changed([_SPid], _Births, []) -> ok; +members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths). -handle_msg([_SPid], _From, heartbeat) -> - ok; -handle_msg([_SPid], _From, request_length) -> +handle_msg([_SPid], _From, request_depth) -> %% This is only of value to the master ok; handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) -> %% This is only of value to the master ok; -handle_msg([SPid], _From, {process_death, Pid}) -> - inform_deaths(SPid, [Pid]); +handle_msg([_SPid], _From, process_death) -> + %% Since GM is by nature lazy we need to make sure there is some + %% traffic when a master dies, to make sure we get informed of the + %% death. That's all process_death does, create some traffic. We + %% must not take any notice of the master death here since it + %% comes without ordering guarantees - there could still be + %% messages from the master we have yet to receive. When we get + %% members_changed, then there will be no more messages. + ok; handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> ok = gen_server2:cast(CPid, {gm, Msg}), {stop, {shutdown, ring_shutdown}}; @@ -371,8 +373,8 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _State) -> self(); i(name, #state { q = #amqqueue { name = Name } }) -> Name; -i(master_pid, #state { master_pid = MPid }) -> MPid; -i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised; +i(master_pid, #state { q = #amqqueue { pid = MPid } }) -> MPid; +i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0; i(Item, _State) -> throw({bad_argument, Item}). bq_init(BQ, Q, Recover) -> @@ -390,14 +392,20 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. -needs_confirming(#delivery{ msg_seq_no = undefined }, _State) -> - never; -needs_confirming(#delivery { message = #basic_message { - is_persistent = true } }, - #state { q = #amqqueue { durable = true } }) -> - eventually; -needs_confirming(_Delivery, _State) -> - immediately. +send_or_record_confirm(_, #delivery{ msg_seq_no = undefined }, MS, _State) -> + MS; +send_or_record_confirm(published, #delivery { sender = ChPid, + msg_seq_no = MsgSeqNo, + message = #basic_message { + id = MsgId, + is_persistent = true } }, + MS, #state { q = #amqqueue { durable = true } }) -> + dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS); +send_or_record_confirm(_Status, #delivery { sender = ChPid, + msg_seq_no = MsgSeqNo }, + MS, _State) -> + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), + MS. confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> {CMs, MS1} = @@ -409,16 +417,16 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> %% If it needed confirming, it'll have %% already been done. Acc; - {ok, {published, ChPid}} -> + {ok, published} -> %% Still not seen it from the channel, just %% record that it's been confirmed. - {CMsN, dict:store(MsgId, {confirmed, ChPid}, MSN)}; + {CMsN, dict:store(MsgId, confirmed, MSN)}; {ok, {published, ChPid, MsgSeqNo}} -> %% Seen from both GM and Channel. Can now %% confirm. {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN), dict:erase(MsgId, MSN)}; - {ok, {confirmed, _ChPid}} -> + {ok, confirmed} -> %% It's already been confirmed. 
This is %% probably it's been both sync'd to disk %% and then delivered and ack'd before we've @@ -442,17 +450,14 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, msg_id_ack = MA, msg_id_status = MS, known_senders = KS }) -> - rabbit_event:notify(queue_slave_promoted, [{pid, self()}, - {name, QName}]), rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n", [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]), Q1 = Q #amqqueue { pid = self() }, {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q1, GM, rabbit_mirror_queue_master:sender_death_fun(), - rabbit_mirror_queue_master:length_fun()), + rabbit_mirror_queue_master:depth_fun()), true = unlink(GM), gen_server2:reply(From, {promote, CPid}), - ok = gm:confirmed_broadcast(GM, heartbeat), %% Everything that we're monitoring, we need to ensure our new %% coordinator is monitoring. @@ -460,8 +465,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids), %% We find all the messages that we've received from channels but - %% not from gm, and if they're due to be enqueued on promotion - %% then we pass them to the + %% not from gm, and pass them to the %% queue_process:init_with_backing_queue_state to be enqueued. %% %% We also have to requeue messages which are pending acks: the @@ -489,18 +493,18 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% %% MS contains the following three entry types: %% - %% a) {published, ChPid}: + %% a) published: %% published via gm only; pending arrival of publication from %% channel, maybe pending confirm. %% %% b) {published, ChPid, MsgSeqNo}: %% published via gm and channel; pending confirm. %% - %% c) {confirmed, ChPid}: + %% c) confirmed: %% published via gm only, and confirmed; pending publication %% from channel. %% - %% d) discarded + %% d) discarded: %% seen via gm only as discarded. Pending publication from %% channel %% @@ -517,34 +521,24 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% those messages are then requeued. However, as discussed above, %% this does not affect MS, nor which bits go through to SS in %% Master, or MTC in queue_process. - %% - %% Everything that's in MA gets requeued. Consequently the new - %% master should start with a fresh AM as there are no messages - %% pending acks. 
- MSList = dict:to_list(MS), - SS = dict:from_list( - [E || E = {_MsgId, discarded} <- MSList] ++ - [{MsgId, Status} - || {MsgId, {Status, _ChPid}} <- MSList, - Status =:= published orelse Status =:= confirmed]), + St = [published, confirmed, discarded], + SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS), + AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, SS, MPids), - - MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) -> - gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); - (_, MTC0) -> - MTC0 - end, gb_trees:empty(), MSList), - NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], - AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], + CPid, BQ, BQS, GM, AckTags, SS, MPids), + + MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) -> + gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); + (_Msgid, _Status, MTC0) -> + MTC0 + end, gb_trees:empty(), MS), Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), - {Delivery, true} <- queue:to_list(PubQ)], - QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q1, rabbit_mirror_queue_master, MasterState, RateTRef, - AckTags, Deliveries, KS, MTC), - {become, rabbit_amqqueue_process, QueueState, hibernate}. + Delivery <- queue:to_list(PubQ)], + rabbit_amqqueue_process:init_with_backing_queue_state( + Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS, + MTC). noreply(State) -> {NewState, Timeout} = next_state(State), @@ -560,9 +554,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> confirm_messages(MsgIds, State #state { backing_queue_state = BQS1 })), case BQ:needs_timeout(BQS1) of - false -> {stop_sync_timer(State1), hibernate}; - idle -> {stop_sync_timer(State1), 0 }; - timed -> {ensure_sync_timer(State1), 0 } + false -> {stop_sync_timer(State1), hibernate }; + idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL}; + timed -> {ensure_sync_timer(State1), 0 } end. backing_queue_timeout(State = #state { backing_queue = BQ }) -> @@ -638,49 +632,22 @@ confirm_sender_death(Pid) -> ok. maybe_enqueue_message( - Delivery = #delivery { message = #basic_message { id = MsgId }, - msg_seq_no = MsgSeqNo, - sender = ChPid }, - EnqueueOnPromotion, + Delivery = #delivery { message = #basic_message { id = MsgId }, + sender = ChPid }, State = #state { sender_queues = SQ, msg_id_status = MS }) -> State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. case dict:find(MsgId, MS) of error -> {MQ, PendingCh} = get_sender_queue(ChPid, SQ), - MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ), + MQ1 = queue:in(Delivery, MQ), SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), State1 #state { sender_queues = SQ1 }; - {ok, {confirmed, ChPid}} -> - %% BQ has confirmed it but we didn't know what the - %% msg_seq_no was at the time. We do now! - ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), - SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { sender_queues = SQ1, - msg_id_status = dict:erase(MsgId, MS) }; - {ok, {published, ChPid}} -> - %% It was published to the BQ and we didn't know the - %% msg_seq_no so couldn't confirm it at the time. 
- case needs_confirming(Delivery, State1) of - never -> - SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { msg_id_status = dict:erase(MsgId, MS), - sender_queues = SQ1 }; - eventually -> - State1 #state { - msg_id_status = - dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; - immediately -> - ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), - SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { msg_id_status = dict:erase(MsgId, MS), - sender_queues = SQ1 } - end; - {ok, discarded} -> - %% We've already heard from GM that the msg is to be - %% discarded. We won't see this again. + {ok, Status} -> + MS1 = send_or_record_confirm( + Status, Delivery, dict:erase(MsgId, MS), State1), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { msg_id_status = dict:erase(MsgId, MS), + State1 #state { msg_id_status = MS1, sender_queues = SQ1 } end. @@ -698,45 +665,27 @@ remove_from_pending_ch(MsgId, ChPid, SQ) -> dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ) end. -process_instruction( - {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, - State = #state { sender_queues = SQ, - backing_queue = BQ, - backing_queue_state = BQS, - msg_id_status = MS }) -> - - %% We really are going to do the publish right now, even though we - %% may not have seen it directly from the channel. As a result, we - %% may know that it needs confirming without knowing its - %% msg_seq_no, which means that we can see the confirmation come - %% back from the backing queue without knowing the msg_seq_no, - %% which means that we're going to have to hang on to the fact - %% that we've seen the msg_id confirmed until we can associate it - %% with a msg_seq_no. +publish_or_discard(Status, ChPid, MsgId, + State = #state { sender_queues = SQ, msg_id_status = MS }) -> + %% We really are going to do the publish/discard right now, even + %% though we may not have seen it directly from the channel. But + %% we cannot issues confirms until the latter has happened. So we + %% need to keep track of the MsgId and its confirmation status in + %% the meantime. State1 = ensure_monitoring(ChPid, State), {MQ, PendingCh} = get_sender_queue(ChPid, SQ), {MQ1, PendingCh1, MS1} = case queue:out(MQ) of {empty, _MQ2} -> {MQ, sets:add_element(MsgId, PendingCh), - dict:store(MsgId, {published, ChPid}, MS)}; - {{value, {Delivery = #delivery { - msg_seq_no = MsgSeqNo, - message = #basic_message { id = MsgId } }, - _EnqueueOnPromotion}}, MQ2} -> - %% We received the msg from the channel first. Thus we - %% need to deal with confirms here. - case needs_confirming(Delivery, State1) of - never -> - {MQ2, PendingCh, MS}; - eventually -> - {MQ2, PendingCh, - dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; - immediately -> - ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), - {MQ2, PendingCh, MS} - end; - {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + dict:store(MsgId, Status, MS)}; + {{value, Delivery = #delivery { + message = #basic_message { id = MsgId } }}, MQ2} -> + {MQ2, PendingCh, + %% We received the msg from the channel first. Thus + %% we need to deal with confirms here. + send_or_record_confirm(Status, Delivery, MS, State1)}; + {{value, #delivery {}}, _MQ2} -> %% The instruction was sent to us before we were %% within the slave_pids within the #amqqueue{} %% record. We'll never receive the message directly @@ -744,73 +693,48 @@ process_instruction( %% expecting any confirms from us. 
{MQ, PendingCh, MS} end, - SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), - State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 }, - - {ok, - case Deliver of - false -> - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), - State2 #state { backing_queue_state = BQS1 }; - {true, AckRequired} -> - {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, - ChPid, BQS), - maybe_store_ack(AckRequired, MsgId, AckTag, - State2 #state { backing_queue_state = BQS1 }) - end}; -process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, - State = #state { sender_queues = SQ, - backing_queue = BQ, - backing_queue_state = BQS, - msg_id_status = MS }) -> - %% Many of the comments around the publish head above apply here - %% too. - State1 = ensure_monitoring(ChPid, State), - {MQ, PendingCh} = get_sender_queue(ChPid, SQ), - {MQ1, PendingCh1, MS1} = - case queue:out(MQ) of - {empty, _MQ} -> - {MQ, sets:add_element(MsgId, PendingCh), - dict:store(MsgId, discarded, MS)}; - {{value, {#delivery { message = #basic_message { id = MsgId } }, - _EnqueueOnPromotion}}, MQ2} -> - %% We've already seen it from the channel, we're not - %% going to see this again, so don't add it to MS - {MQ2, PendingCh, MS}; - {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> - %% The instruction was sent to us before we were - %% within the slave_pids within the #amqqueue{} - %% record. We'll never receive the message directly - %% from the channel. - {MQ, PendingCh, MS} - end, - SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), - BQS1 = BQ:discard(Msg, ChPid, BQS), - {ok, State1 #state { sender_queues = SQ1, - msg_id_status = MS1, - backing_queue_state = BQS1 }}; -process_instruction({set_length, Length, AckRequired}, + State1 #state { sender_queues = SQ1, msg_id_status = MS1 }. 
+ + +process_instruction({publish, ChPid, MsgProps, + Msg = #basic_message { id = MsgId }}, State) -> + State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = + publish_or_discard(published, ChPid, MsgId, State), + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + {ok, State1 #state { backing_queue_state = BQS1 }}; +process_instruction({publish_delivered, ChPid, MsgProps, + Msg = #basic_message { id = MsgId }}, State) -> + State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = + publish_or_discard(published, ChPid, MsgId, State), + {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), + {ok, maybe_store_ack(true, MsgId, AckTag, + State1 #state { backing_queue_state = BQS1 })}; +process_instruction({discard, ChPid, MsgId}, State) -> + State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = + publish_or_discard(discarded, ChPid, MsgId, State), + BQS1 = BQ:discard(MsgId, ChPid, BQS), + {ok, State1 #state { backing_queue_state = BQS1 }}; +process_instruction({drop, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), - ToDrop = QLen - Length, - {ok, - case ToDrop >= 0 of - true -> - State1 = - lists:foldl( - fun (const, StateN = #state {backing_queue_state = BQSN}) -> - {{#basic_message{id = MsgId}, _IsDelivered, AckTag, - _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN), - maybe_store_ack( - AckRequired, MsgId, AckTag, - StateN #state { backing_queue_state = BQSN1 }) - end, State, lists:duplicate(ToDrop, const)), - set_synchronised(true, State1); - false -> - State - end}; + ToDrop = case QLen - Length of + N when N > 0 -> N; + _ -> 0 + end, + State1 = lists:foldl( + fun (const, StateN = #state{backing_queue_state = BQSN}) -> + {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} = + BQ:fetch(AckRequired, BQSN), + maybe_store_ack( + AckRequired, MsgId, AckTag, + StateN #state { backing_queue_state = BQSN1 }) + end, State, lists:duplicate(ToDrop, const)), + {ok, case AckRequired of + true -> State1; + false -> update_delta(ToDrop - Dropped, State1) + end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -821,11 +745,10 @@ process_instruction({fetch, AckRequired, MsgId, Remaining}, AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), maybe_store_ack(AckRequired, MsgId, AckTag, State #state { backing_queue_state = BQS1 }); - Other when Other + 1 =:= Remaining -> - set_synchronised(true, State); - Other when Other < Remaining -> - %% we must be shorter than the master - State + _ when QLen =< Remaining andalso AckRequired -> + State; + _ when QLen =< Remaining -> + update_delta(-1, State) end}; process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, @@ -834,27 +757,17 @@ process_instruction({ack, MsgIds}, {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION - {ok, State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 }}; + {ok, update_delta(length(MsgIds1) - length(MsgIds), + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 })}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), - {ok, case length(AckTags) =:= length(MsgIds) of - true -> - {MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 }; - false -> - %% The 
only thing we can safely do is nuke out our BQ - %% and MA. The interaction between this and confirms - %% doesn't really bear thinking about... - {_Count, BQS1} = BQ:purge(BQS), - {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1), - State #state { msg_id_ack = dict:new(), - backing_queue_state = BQS2 } - end}; + {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), + {ok, State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 }}; process_instruction({sender_death, ChPid}, State = #state { sender_queues = SQ, msg_id_status = MS, @@ -872,10 +785,11 @@ process_instruction({sender_death, ChPid}, msg_id_status = MS1, known_senders = pmon:demonitor(ChPid, KS) } end}; -process_instruction({length, Length}, - State = #state { backing_queue = BQ, +process_instruction({depth, Depth}, + State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - {ok, set_synchronised(Length =:= BQ:len(BQS), State)}; + {ok, set_delta(Depth - BQ:depth(BQS), State)}; + process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -887,31 +801,45 @@ msg_ids_to_acktags(MsgIds, MA) -> lists:foldl( fun (MsgId, {Acc, MAN}) -> case dict:find(MsgId, MA) of - error -> {Acc, MAN}; - {ok, {_Num, AckTag}} -> {[AckTag | Acc], - dict:erase(MsgId, MAN)} + error -> {Acc, MAN}; + {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)} end end, {[], MA}, MsgIds), {lists:reverse(AckTags), MA1}. -ack_all(BQ, MA, BQS) -> - BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS). - maybe_store_ack(false, _MsgId, _AckTag, State) -> State; -maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, - ack_num = Num }) -> - State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), - ack_num = Num + 1 }. - -%% We intentionally leave out the head where a slave becomes -%% unsynchronised: we assert that can never happen. -set_synchronised(true, State = #state { q = #amqqueue { name = QName }, - synchronised = false }) -> - rabbit_event:notify(queue_slave_synchronised, [{pid, self()}, - {name, QName}]), - State #state { synchronised = true }; -set_synchronised(true, State) -> +maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) -> + State #state { msg_id_ack = dict:store(MsgId, AckTag, MA) }. + +set_delta(0, State = #state { depth_delta = undefined }) -> + ok = record_synchronised(State#state.q), + State #state { depth_delta = 0 }; +set_delta(NewDelta, State = #state { depth_delta = undefined }) -> + true = NewDelta > 0, %% assertion + State #state { depth_delta = NewDelta }; +set_delta(NewDelta, State = #state { depth_delta = Delta }) -> + update_delta(NewDelta - Delta, State). + +update_delta(_DeltaChange, State = #state { depth_delta = undefined }) -> State; -set_synchronised(false, State = #state { synchronised = false }) -> - State. +update_delta( DeltaChange, State = #state { depth_delta = 0 }) -> + 0 = DeltaChange, %% assertion: we cannot become unsync'ed + State; +update_delta( DeltaChange, State = #state { depth_delta = Delta }) -> + true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed + set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }). 
+ +record_synchronised(#amqqueue { name = QName }) -> + Self = self(), + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, QName}) of + [] -> + ok; + [Q = #amqqueue { sync_slave_pids = SSPids }] -> + rabbit_mirror_queue_misc:store_updated_slaves( + Q #amqqueue { sync_slave_pids = [Self | SSPids] }), + ok + end + end). |
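
The central change of this patch, for readers skimming the diff: the boolean `synchronised` flag in the slave state is replaced by `depth_delta`, the master's queue depth minus the local depth, and a slave now counts as synchronised exactly when that delta is zero. Below is a minimal, standalone sketch of that bookkeeping. It is not part of the patch itself: the module name and new/0 helper are invented for illustration, and the mnesia-backed record_synchronised/1 from the real code is reduced to a no-op stub.

-module(depth_delta_sketch).
-export([new/0, set_delta/2, update_delta/2, is_synchronised/1]).

-record(state, { q, depth_delta }).      %% depth_delta = master depth - local depth

new() -> #state { q = undefined, depth_delta = undefined }.

is_synchronised(#state { depth_delta = DD }) -> DD =:= 0.

%% In the real module this writes sync_slave_pids back to mnesia;
%% stubbed out here for illustration.
record_synchronised(_Q) -> ok.

%% Called when a {depth, Depth} instruction arrives from the master,
%% i.e. set_delta(Depth - BQ:depth(BQS), State).
set_delta(0, State = #state { depth_delta = undefined }) ->
    ok = record_synchronised(State#state.q),
    State #state { depth_delta = 0 };
set_delta(NewDelta, State = #state { depth_delta = undefined }) ->
    true = NewDelta > 0,                 %% assertion
    State #state { depth_delta = NewDelta };
set_delta(NewDelta, State = #state { depth_delta = Delta }) ->
    update_delta(NewDelta - Delta, State).

%% Called as the slave catches up, e.g. from the drop, fetch and ack
%% instruction handlers; DeltaChange is never positive.
update_delta(_DeltaChange, State = #state { depth_delta = undefined }) ->
    State;
update_delta(DeltaChange, State = #state { depth_delta = 0 }) ->
    0 = DeltaChange,                     %% assertion: we cannot become unsynchronised
    State;
update_delta(DeltaChange, State = #state { depth_delta = Delta }) ->
    true = DeltaChange =< 0,             %% assertion: we can only get more synchronised
    set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }).

Once the delta is known it can only shrink, so reaching zero is a one-way transition; that is why the real code records synchronisation in mnesia only at that single point.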