diff options
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 850 |
1 files changed, 850 insertions, 0 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl new file mode 100644 index 00000000..b38a8967 --- /dev/null +++ b/src/rabbit_mirror_queue_slave.erl @@ -0,0 +1,850 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_slave). + +%% For general documentation of HA design, see +%% rabbit_mirror_queue_coordinator +%% +%% We join the GM group before we add ourselves to the amqqueue +%% record. As a result: +%% 1. We can receive msgs from GM that correspond to messages we will +%% never receive from publishers. +%% 2. When we receive a message from publishers, we must receive a +%% message from the GM group for it. +%% 3. However, that instruction from the GM group can arrive either +%% before or after the actual message. We need to be able to +%% distinguish between GM instructions arriving early, and case (1) +%% above. +%% +%% All instructions from the GM group must be processed in the order +%% in which they're received. + +-export([start_link/1, set_maximum_since_use/2]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3, handle_pre_hibernate/1, prioritise_call/3, + prioritise_cast/2]). + +-export([joined/2, members_changed/3, handle_msg/3]). + +-behaviour(gen_server2). +-behaviour(gm). + +-include("rabbit.hrl"). +-include("gm_specs.hrl"). + +-define(SYNC_INTERVAL, 25). %% milliseconds +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(DEATH_TIMEOUT, 20000). %% 20 seconds + +-record(state, { q, + gm, + master_pid, + backing_queue, + backing_queue_state, + sync_timer_ref, + rate_timer_ref, + + sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId} + msg_id_ack, %% :: MsgId -> AckTag + ack_num, + + msg_id_status, + known_senders + }). + +start_link(Q) -> + gen_server2:start_link(?MODULE, [Q], []). + +set_maximum_since_use(QPid, Age) -> + gen_server2:cast(QPid, {set_maximum_since_use, Age}). + +init([#amqqueue { name = QueueName } = Q]) -> + process_flag(trap_exit, true), %% amqqueue_process traps exits too. + {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), + receive {joined, GM} -> + ok + end, + Self = self(), + Node = node(), + {ok, MPid} = + rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = + mnesia:read({rabbit_queue, QueueName}), + %% ASSERTION + [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node], + MPids1 = MPids ++ [Self], + mnesia:write(rabbit_queue, + Q1 #amqqueue { slave_pids = MPids1 }, + write), + {ok, QPid} + end), + erlang:monitor(process, MPid), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + {ok, BQ} = application:get_env(backing_queue_module), + BQS = bq_init(BQ, Q, false), + {ok, #state { q = Q, + gm = GM, + master_pid = MPid, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = undefined, + sync_timer_ref = undefined, + + sender_queues = dict:new(), + msg_id_ack = dict:new(), + ack_num = 0, + + msg_id_status = dict:new(), + known_senders = dict:new() + }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> + %% Synchronous, "immediate" delivery mode + + %% It is safe to reply 'false' here even if a) we've not seen the + %% msg via gm, or b) the master dies before we receive the msg via + %% gm. In the case of (a), we will eventually receive the msg via + %% gm, and it's only the master's result to the channel that is + %% important. In the case of (b), if the master does die and we do + %% get promoted then at that point we have no consumers, thus + %% 'false' is precisely the correct answer. However, we must be + %% careful to _not_ enqueue the message in this case. + + %% Note this is distinct from the case where we receive the msg + %% via gm first, then we're promoted to master, and only then do + %% we receive the msg from the channel. + gen_server2:reply(From, false), %% master may deliver it, not us + noreply(maybe_enqueue_message(Delivery, false, State)); + +handle_call({deliver, Delivery = #delivery {}}, From, State) -> + %% Synchronous, "mandatory" delivery mode + gen_server2:reply(From, true), %% amqqueue throws away the result anyway + noreply(maybe_enqueue_message(Delivery, true, State)); + +handle_call({gm_deaths, Deaths}, From, + State = #state { q = #amqqueue { name = QueueName }, + gm = GM, + master_pid = MPid }) -> + rabbit_log:info("Mirrored-queue (~s): Slave ~s saw deaths of mirrors ~s~n", + [rabbit_misc:rs(QueueName), + rabbit_misc:pid_to_string(self()), + [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]), + %% The GM has told us about deaths, which means we're not going to + %% receive any more messages from GM + case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of + {ok, Pid} when node(Pid) =:= node(MPid) -> + %% master hasn't changed + reply(ok, State); + {ok, Pid} when node(Pid) =:= node() -> + %% we've become master + promote_me(From, State); + {ok, Pid} -> + %% master has changed to not us. + gen_server2:reply(From, ok), + erlang:monitor(process, Pid), + ok = gm:broadcast(GM, heartbeat), + noreply(State #state { master_pid = Pid }); + {error, not_found} -> + gen_server2:reply(From, ok), + {stop, normal, State} + end. + +handle_cast({run_backing_queue, Mod, Fun}, State) -> + noreply(run_backing_queue(Mod, Fun, State)); + +handle_cast({gm, Instruction}, State) -> + handle_process_result(process_instruction(Instruction, State)); + +handle_cast({deliver, Delivery = #delivery {}}, State) -> + %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + noreply(maybe_enqueue_message(Delivery, true, State)); + +handle_cast({set_maximum_since_use, Age}, State) -> + ok = file_handle_cache:set_maximum_since_use(Age), + noreply(State); + +handle_cast({set_ram_duration_target, Duration}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQS1 = BQ:set_ram_duration_target(Duration, BQS), + noreply(State #state { backing_queue_state = BQS1 }); + +handle_cast(update_ram_duration, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + noreply(State #state { rate_timer_ref = just_measured, + backing_queue_state = BQS2 }); + +handle_cast(sync_timeout, State) -> + noreply(backing_queue_timeout( + State #state { sync_timer_ref = undefined })). + +handle_info(timeout, State) -> + noreply(backing_queue_timeout(State)); + +handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, + State = #state { gm = GM, master_pid = MPid }) -> + ok = gm:broadcast(GM, {process_death, MPid}), + noreply(State); + +handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> + noreply(local_sender_death(ChPid, State)); + +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +%% If the Reason is shutdown, or {shutdown, _}, it is not the queue +%% being deleted: it's just the node going down. Even though we're a +%% slave, we have no idea whether or not we'll be the only copy coming +%% back up. Thus we must assume we will be, and preserve anything we +%% have on disk. +terminate(_Reason, #state { backing_queue_state = undefined }) -> + %% We've received a delete_and_terminate from gm, thus nothing to + %% do here. + ok; +terminate({shutdown, dropped} = R, #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + %% See rabbit_mirror_queue_master:terminate/2 + BQ:delete_and_terminate(R, BQS); +terminate(Reason, #state { q = Q, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = RateTRef }) -> + ok = gm:leave(GM), + QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( + Q, BQ, BQS, RateTRef, [], [], dict:new()), + rabbit_amqqueue_process:terminate(Reason, QueueState); +terminate([_SPid], _Reason) -> + %% gm case + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_pre_hibernate(State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + BQS3 = BQ:handle_pre_hibernate(BQS2), + {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}. + +prioritise_call(Msg, _From, _State) -> + case Msg of + {gm_deaths, _Deaths} -> 5; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + update_ram_duration -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + {run_backing_queue, _Mod, _Fun} -> 6; + sync_timeout -> 6; + {gm, _Msg} -> 5; + {post_commit, _Txn, _AckTags} -> 4; + _ -> 0 + end. + +%% --------------------------------------------------------------------------- +%% GM +%% --------------------------------------------------------------------------- + +joined([SPid], _Members) -> + SPid ! {joined, self()}, + ok. + +members_changed([_SPid], _Births, []) -> + ok; +members_changed([SPid], _Births, Deaths) -> + inform_deaths(SPid, Deaths). + +handle_msg([_SPid], _From, heartbeat) -> + ok; +handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) -> + %% This is only of value to the master + ok; +handle_msg([SPid], _From, {process_death, Pid}) -> + inform_deaths(SPid, [Pid]); +handle_msg([SPid], _From, Msg) -> + ok = gen_server2:cast(SPid, {gm, Msg}). + +inform_deaths(SPid, Deaths) -> + rabbit_misc:with_exit_handler( + fun () -> {stop, normal} end, + fun () -> + case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of + ok -> + ok; + {promote, CPid} -> + {become, rabbit_mirror_queue_coordinator, [CPid]} + end + end). + +%% --------------------------------------------------------------------------- +%% Others +%% --------------------------------------------------------------------------- + +bq_init(BQ, Q, Recover) -> + Self = self(), + BQ:init(Q, Recover, + fun (Mod, Fun) -> + rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) + end). + +run_backing_queue(rabbit_mirror_queue_master, Fun, State) -> + %% Yes, this might look a little crazy, but see comments in + %% confirm_sender_death/1 + Fun(?MODULE, State); +run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. + +needs_confirming(#delivery{ msg_seq_no = undefined }, _State) -> + never; +needs_confirming(#delivery { message = #basic_message { + is_persistent = true } }, + #state { q = #amqqueue { durable = true } }) -> + eventually; +needs_confirming(_Delivery, _State) -> + immediately. + +confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> + {MS1, CMs} = + lists:foldl( + fun (MsgId, {MSN, CMsN} = Acc) -> + %% We will never see 'discarded' here + case dict:find(MsgId, MSN) of + error -> + %% If it needed confirming, it'll have + %% already been done. + Acc; + {ok, {published, ChPid}} -> + %% Still not seen it from the channel, just + %% record that it's been confirmed. + {dict:store(MsgId, {confirmed, ChPid}, MSN), CMsN}; + {ok, {published, ChPid, MsgSeqNo}} -> + %% Seen from both GM and Channel. Can now + %% confirm. + {dict:erase(MsgId, MSN), + gb_trees_cons(ChPid, MsgSeqNo, CMsN)}; + {ok, {confirmed, _ChPid}} -> + %% It's already been confirmed. This is + %% probably it's been both sync'd to disk + %% and then delivered and ack'd before we've + %% seen the publish from the + %% channel. Nothing to do here. + Acc + end + end, {MS, gb_trees:empty()}, MsgIds), + [ok = rabbit_channel:confirm(ChPid, MsgSeqNos) + || {ChPid, MsgSeqNos} <- gb_trees:to_list(CMs)], + State #state { msg_id_status = MS1 }. + +gb_trees_cons(Key, Value, Tree) -> + case gb_trees:lookup(Key, Tree) of + {value, Values} -> gb_trees:update(Key, [Value | Values], Tree); + none -> gb_trees:insert(Key, [Value], Tree) + end. + +handle_process_result({ok, State}) -> noreply(State); +handle_process_result({stop, State}) -> {stop, normal, State}. + +promote_me(From, #state { q = Q, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = RateTRef, + sender_queues = SQ, + msg_id_ack = MA, + msg_id_status = MS, + known_senders = KS }) -> + rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n", + [rabbit_misc:rs(Q #amqqueue.name), + rabbit_misc:pid_to_string(self())]), + Q1 = Q #amqqueue { pid = self() }, + {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( + Q1, GM, rabbit_mirror_queue_master:sender_death_fun()), + true = unlink(GM), + gen_server2:reply(From, {promote, CPid}), + ok = gm:confirmed_broadcast(GM, heartbeat), + + %% Everything that we're monitoring, we need to ensure our new + %% coordinator is monitoring. + + MonitoringPids = [begin true = erlang:demonitor(MRef), + Pid + end || {Pid, MRef} <- dict:to_list(KS)], + ok = rabbit_mirror_queue_coordinator:ensure_monitoring( + CPid, MonitoringPids), + + %% We find all the messages that we've received from channels but + %% not from gm, and if they're due to be enqueued on promotion + %% then we pass them to the + %% queue_process:init_with_backing_queue_state to be enqueued. + %% + %% We also have to requeue messages which are pending acks: the + %% consumers from the master queue have been lost and so these + %% messages need requeuing. They might also be pending + %% confirmation, and indeed they might also be pending arrival of + %% the publication from the channel itself, if we received both + %% the publication and the fetch via gm first! Requeuing doesn't + %% affect confirmations: if the message was previously pending a + %% confirmation then it still will be, under the same msg_id. So + %% as a master, we need to be prepared to filter out the + %% publication of said messages from the channel (is_duplicate + %% (thus such requeued messages must remain in the msg_id_status + %% (MS) which becomes seen_status (SS) in the master)). + %% + %% Then there are messages we already have in the queue, which are + %% not currently pending acknowledgement: + %% 1. Messages we've only received via gm: + %% Filter out subsequent publication from channel through + %% validate_message. Might have to issue confirms then or + %% later, thus queue_process state will have to know that + %% there's a pending confirm. + %% 2. Messages received via both gm and channel: + %% Queue will have to deal with issuing confirms if necessary. + %% + %% MS contains the following three entry types: + %% + %% a) {published, ChPid}: + %% published via gm only; pending arrival of publication from + %% channel, maybe pending confirm. + %% + %% b) {published, ChPid, MsgSeqNo}: + %% published via gm and channel; pending confirm. + %% + %% c) {confirmed, ChPid}: + %% published via gm only, and confirmed; pending publication + %% from channel. + %% + %% d) discarded + %% seen via gm only as discarded. Pending publication from + %% channel + %% + %% The forms a, c and d only, need to go to the master state + %% seen_status (SS). + %% + %% The form b only, needs to go through to the queue_process + %% state to form the msg_id_to_channel mapping (MTC). + %% + %% No messages that are enqueued from SQ at this point will have + %% entries in MS. + %% + %% Messages that are extracted from MA may have entries in MS, and + %% those messages are then requeued. However, as discussed above, + %% this does not affect MS, nor which bits go through to SS in + %% Master, or MTC in queue_process. + %% + %% Everything that's in MA gets requeued. Consequently the new + %% master should start with a fresh AM as there are no messages + %% pending acks. + + MSList = dict:to_list(MS), + SS = dict:from_list( + [E || E = {_MsgId, discarded} <- MSList] ++ + [{MsgId, Status} + || {MsgId, {Status, _ChPid}} <- MSList, + Status =:= published orelse Status =:= confirmed]), + + MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( + CPid, BQ, BQS, GM, SS, MonitoringPids), + + MTC = dict:from_list( + [{MsgId, {ChPid, MsgSeqNo}} || + {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]), + NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], + AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], + Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), + {Delivery, true} <- queue:to_list(PubQ)], + QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( + Q1, rabbit_mirror_queue_master, MasterState, RateTRef, + AckTags, Deliveries, MTC), + {become, rabbit_amqqueue_process, QueueState, hibernate}. + +noreply(State) -> + {NewState, Timeout} = next_state(State), + {noreply, NewState, Timeout}. + +reply(Reply, State) -> + {NewState, Timeout} = next_state(State), + {reply, Reply, NewState, Timeout}. + +next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> + {MsgIds, BQS1} = BQ:drain_confirmed(BQS), + State1 = ensure_rate_timer( + confirm_messages(MsgIds, State #state { + backing_queue_state = BQS1 })), + case BQ:needs_timeout(BQS1) of + false -> {stop_sync_timer(State1), hibernate}; + idle -> {stop_sync_timer(State1), 0 }; + timed -> {ensure_sync_timer(State1), 0 } + end. + +backing_queue_timeout(State = #state { backing_queue = BQ }) -> + run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State). + +ensure_sync_timer(State = #state { sync_timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after( + ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), + State #state { sync_timer_ref = TRef }; +ensure_sync_timer(State) -> + State. + +stop_sync_timer(State = #state { sync_timer_ref = undefined }) -> + State; +stop_sync_timer(State = #state { sync_timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #state { sync_timer_ref = undefined }. + +ensure_rate_timer(State = #state { rate_timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after( + ?RAM_DURATION_UPDATE_INTERVAL, + rabbit_amqqueue, update_ram_duration, + [self()]), + State #state { rate_timer_ref = TRef }; +ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) -> + State #state { rate_timer_ref = undefined }; +ensure_rate_timer(State) -> + State. + +stop_rate_timer(State = #state { rate_timer_ref = undefined }) -> + State; +stop_rate_timer(State = #state { rate_timer_ref = just_measured }) -> + State #state { rate_timer_ref = undefined }; +stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #state { rate_timer_ref = undefined }. + +ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> + case dict:is_key(ChPid, KS) of + true -> State; + false -> MRef = erlang:monitor(process, ChPid), + State #state { known_senders = dict:store(ChPid, MRef, KS) } + end. + +local_sender_death(ChPid, State = #state { known_senders = KS }) -> + ok = case dict:is_key(ChPid, KS) of + false -> ok; + true -> confirm_sender_death(ChPid) + end, + State. + +confirm_sender_death(Pid) -> + %% We have to deal with the possibility that we'll be promoted to + %% master before this thing gets run. Consequently we set the + %% module to rabbit_mirror_queue_master so that if we do become a + %% rabbit_amqqueue_process before then, sane things will happen. + Fun = + fun (?MODULE, State = #state { known_senders = KS, + gm = GM }) -> + %% We're running still as a slave + ok = case dict:is_key(Pid, KS) of + false -> ok; + true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}), + confirm_sender_death(Pid) + end, + State; + (rabbit_mirror_queue_master, State) -> + %% We've become a master. State is now opaque to + %% us. When we became master, if Pid was still known + %% to us then we'd have set up monitoring of it then, + %% so this is now a noop. + State + end, + %% Note that we do not remove our knowledge of this ChPid until we + %% get the sender_death from GM. + {ok, _TRef} = timer:apply_after( + ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue, + [self(), rabbit_mirror_queue_master, Fun]), + ok. + +maybe_enqueue_message( + Delivery = #delivery { message = #basic_message { id = MsgId }, + msg_seq_no = MsgSeqNo, + sender = ChPid }, + EnqueueOnPromotion, + State = #state { sender_queues = SQ, msg_id_status = MS }) -> + State1 = ensure_monitoring(ChPid, State), + %% We will never see {published, ChPid, MsgSeqNo} here. + case dict:find(MsgId, MS) of + error -> + {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ), + SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), + State1 #state { sender_queues = SQ1 }; + {ok, {confirmed, ChPid}} -> + %% BQ has confirmed it but we didn't know what the + %% msg_seq_no was at the time. We do now! + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), + State1 #state { sender_queues = SQ1, + msg_id_status = dict:erase(MsgId, MS) }; + {ok, {published, ChPid}} -> + %% It was published to the BQ and we didn't know the + %% msg_seq_no so couldn't confirm it at the time. + case needs_confirming(Delivery, State1) of + never -> + SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), + State1 #state { msg_id_status = dict:erase(MsgId, MS), + sender_queues = SQ1 }; + eventually -> + State1 #state { + msg_id_status = + dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; + immediately -> + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), + State1 #state { msg_id_status = dict:erase(MsgId, MS), + sender_queues = SQ1 } + end; + {ok, discarded} -> + %% We've already heard from GM that the msg is to be + %% discarded. We won't see this again. + SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), + State1 #state { msg_id_status = dict:erase(MsgId, MS), + sender_queues = SQ1 } + end. + +get_sender_queue(ChPid, SQ) -> + case dict:find(ChPid, SQ) of + error -> {queue:new(), sets:new()}; + {ok, Val} -> Val + end. + +remove_from_pending_ch(MsgId, ChPid, SQ) -> + case dict:find(ChPid, SQ) of + error -> + SQ; + {ok, {MQ, PendingCh}} -> + dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ) + end. + +process_instruction( + {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, + State = #state { sender_queues = SQ, + backing_queue = BQ, + backing_queue_state = BQS, + msg_id_status = MS }) -> + + %% We really are going to do the publish right now, even though we + %% may not have seen it directly from the channel. As a result, we + %% may know that it needs confirming without knowing its + %% msg_seq_no, which means that we can see the confirmation come + %% back from the backing queue without knowing the msg_seq_no, + %% which means that we're going to have to hang on to the fact + %% that we've seen the msg_id confirmed until we can associate it + %% with a msg_seq_no. + State1 = ensure_monitoring(ChPid, State), + {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + {MQ1, PendingCh1, MS1} = + case queue:out(MQ) of + {empty, _MQ2} -> + {MQ, sets:add_element(MsgId, PendingCh), + dict:store(MsgId, {published, ChPid}, MS)}; + {{value, {Delivery = #delivery { + msg_seq_no = MsgSeqNo, + message = #basic_message { id = MsgId } }, + _EnqueueOnPromotion}}, MQ2} -> + %% We received the msg from the channel first. Thus we + %% need to deal with confirms here. + case needs_confirming(Delivery, State1) of + never -> + {MQ2, PendingCh, MS}; + eventually -> + {MQ2, sets:add_element(MsgId, PendingCh), + dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; + immediately -> + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + {MQ2, PendingCh, MS} + end; + {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + %% The instruction was sent to us before we were + %% within the slave_pids within the #amqqueue{} + %% record. We'll never receive the message directly + %% from the channel. And the channel will not be + %% expecting any confirms from us. + {MQ, PendingCh, MS} + end, + + SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), + State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 }, + + {ok, + case Deliver of + false -> + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + State2 #state { backing_queue_state = BQS1 }; + {true, AckRequired} -> + {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, + ChPid, BQS), + maybe_store_ack(AckRequired, MsgId, AckTag, + State2 #state { backing_queue_state = BQS1 }) + end}; +process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, + State = #state { sender_queues = SQ, + backing_queue = BQ, + backing_queue_state = BQS, + msg_id_status = MS }) -> + %% Many of the comments around the publish head above apply here + %% too. + State1 = ensure_monitoring(ChPid, State), + {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + {MQ1, PendingCh1, MS1} = + case queue:out(MQ) of + {empty, _MQ} -> + {MQ, sets:add_element(MsgId, PendingCh), + dict:store(MsgId, discarded, MS)}; + {{value, {#delivery { message = #basic_message { id = MsgId } }, + _EnqueueOnPromotion}}, MQ2} -> + %% We've already seen it from the channel, we're not + %% going to see this again, so don't add it to MS + {MQ2, PendingCh, MS}; + {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + %% The instruction was sent to us before we were + %% within the slave_pids within the #amqqueue{} + %% record. We'll never receive the message directly + %% from the channel. + {MQ, PendingCh, MS} + end, + SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), + BQS1 = BQ:discard(Msg, ChPid, BQS), + {ok, State1 #state { sender_queues = SQ1, + msg_id_status = MS1, + backing_queue_state = BQS1 }}; +process_instruction({set_length, Length}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + QLen = BQ:len(BQS), + ToDrop = QLen - Length, + {ok, case ToDrop > 0 of + true -> BQS1 = + lists:foldl( + fun (const, BQSN) -> + {{_Msg, _IsDelivered, _AckTag, _Remaining}, + BQSN1} = BQ:fetch(false, BQSN), + BQSN1 + end, BQS, lists:duplicate(ToDrop, const)), + State #state { backing_queue_state = BQS1 }; + false -> State + end}; +process_instruction({fetch, AckRequired, MsgId, Remaining}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + QLen = BQ:len(BQS), + {ok, case QLen - 1 of + Remaining -> + {{#basic_message{id = MsgId}, _IsDelivered, + AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), + maybe_store_ack(AckRequired, MsgId, AckTag, + State #state { backing_queue_state = BQS1 }); + Other when Other < Remaining -> + %% we must be shorter than the master + State + end}; +process_instruction({ack, MsgIds}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + msg_id_ack = MA }) -> + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), + {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), + [] = MsgIds1 -- MsgIds, %% ASSERTION + {ok, State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 }}; +process_instruction({requeue, MsgPropsFun, MsgIds}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + msg_id_ack = MA }) -> + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), + {ok, case length(AckTags) =:= length(MsgIds) of + true -> + {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 }; + false -> + %% The only thing we can safely do is nuke out our BQ + %% and MA. The interaction between this and confirms + %% doesn't really bear thinking about... + {_Count, BQS1} = BQ:purge(BQS), + {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1), + State #state { msg_id_ack = dict:new(), + backing_queue_state = BQS2 } + end}; +process_instruction({sender_death, ChPid}, + State = #state { sender_queues = SQ, + msg_id_status = MS, + known_senders = KS }) -> + {ok, case dict:find(ChPid, KS) of + error -> + State; + {ok, MRef} -> + true = erlang:demonitor(MRef), + MS1 = case dict:find(ChPid, SQ) of + error -> + MS; + {ok, {_MQ, PendingCh}} -> + lists:foldl(fun dict:erase/2, MS, + sets:to_list(PendingCh)) + end, + State #state { sender_queues = dict:erase(ChPid, SQ), + msg_id_status = MS1, + known_senders = dict:erase(ChPid, KS) } + end}; +process_instruction({delete_and_terminate, Reason}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQ:delete_and_terminate(Reason, BQS), + {stop, State #state { backing_queue_state = undefined }}. + +msg_ids_to_acktags(MsgIds, MA) -> + {AckTags, MA1} = + lists:foldl( + fun (MsgId, {Acc, MAN}) -> + case dict:find(MsgId, MA) of + error -> {Acc, MAN}; + {ok, {_Num, AckTag}} -> {[AckTag | Acc], + dict:erase(MsgId, MAN)} + end + end, {[], MA}, MsgIds), + {lists:reverse(AckTags), MA1}. + +ack_all(BQ, MA, BQS) -> + BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS). + +maybe_store_ack(false, _MsgId, _AckTag, State) -> + State; +maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, + ack_num = Num }) -> + State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), + ack_num = Num + 1 }. |