+%% For general documentation of HA design, see
+%% rabbit_mirror_queue_coordinator
+%% We join the GM group before we add ourselves to the amqqueue
+%% record. As a result:
+%% 1. We can receive msgs from GM that correspond to messages we will
+%% never receive from publishers.
+%% 2. When we receive a message from publishers, we must receive a
+%% message from the GM group for it.
+%% 3. However, that instruction from the GM group can arrive either
+%% before or after the actual message. We need to be able to
+%% distinguish between GM instructions arriving early, and case (1)
+%% above.
+%% All instructions from the GM group must be processed in the order
+%% in which they're received.
+-export([start_link/1, set_maximum_since_use/2]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3, handle_pre_hibernate/1, prioritise_call/3,
+ prioritise_cast/2]).
+-export([joined/2, members_changed/3, handle_msg/3]).
+-define(SYNC_INTERVAL, 25). %% milliseconds
+-define(DEATH_TIMEOUT, 20000). %% 20 seconds
+-record(state, { q,
+ gm,
+ master_pid,
+ backing_queue,
+ backing_queue_state,
+ sync_timer_ref,
+ rate_timer_ref,
+ sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId}
+ msg_id_ack, %% :: MsgId -> AckTag
+ ack_num,
+ msg_id_status,
+ known_senders
+ }).
+start_link(Q) ->
+ gen_server2:start_link(?MODULE, [Q], []).
+set_maximum_since_use(QPid, Age) ->
+ gen_server2:cast(QPid, {set_maximum_since_use, Age}).
+init([#amqqueue { name = QueueName } = Q]) ->
+ process_flag(trap_exit, true), %% amqqueue_process traps exits too.
+ {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM} ->
+ ok
+ end,
+ Self = self(),
+ Node = node(),
+ {ok, MPid} =
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
+ mnesia:read({rabbit_queue, QueueName}),
+ [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node],
+ MPids1 = MPids ++ [Self],
+ mnesia:write(rabbit_queue,
+ Q1 #amqqueue { slave_pids = MPids1 },
+ write),
+ {ok, QPid}
+ end),
+ erlang:monitor(process, MPid),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQS = bq_init(BQ, Q, false),
+ {ok, #state { q = Q,
+ gm = GM,
+ master_pid = MPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = undefined,
+ sync_timer_ref = undefined,
+ sender_queues = dict:new(),
+ msg_id_ack = dict:new(),
+ ack_num = 0,
+ msg_id_status = dict:new(),
+ known_senders = dict:new()
+ }, hibernate,
+handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
+ %% Synchronous, "immediate" delivery mode
+ %% It is safe to reply 'false' here even if a) we've not seen the
+ %% msg via gm, or b) the master dies before we receive the msg via
+ %% gm. In the case of (a), we will eventually receive the msg via
+ %% gm, and it's only the master's result to the channel that is
+ %% important. In the case of (b), if the master does die and we do
+ %% get promoted then at that point we have no consumers, thus
+ %% 'false' is precisely the correct answer. However, we must be
+ %% careful to _not_ enqueue the message in this case.
+ %% Note this is distinct from the case where we receive the msg
+ %% via gm first, then we're promoted to master, and only then do
+ %% we receive the msg from the channel.
+ gen_server2:reply(From, false), %% master may deliver it, not us
+ noreply(maybe_enqueue_message(Delivery, false, State));
+handle_call({deliver, Delivery = #delivery {}}, From, State) ->
+ %% Synchronous, "mandatory" delivery mode
+ gen_server2:reply(From, true), %% amqqueue throws away the result anyway
+ noreply(maybe_enqueue_message(Delivery, true, State));
+handle_call({gm_deaths, Deaths}, From,
+ State = #state { q = #amqqueue { name = QueueName },
+ gm = GM,
+ master_pid = MPid }) ->
+ rabbit_log:info("Mirrored-queue (~s): Slave ~s saw deaths of mirrors ~s~n",
+ [rabbit_misc:rs(QueueName),
+ rabbit_misc:pid_to_string(self()),
+ [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]),
+ %% The GM has told us about deaths, which means we're not going to
+ %% receive any more messages from GM
+ case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ {ok, Pid} when node(Pid) =:= node(MPid) ->
+ %% master hasn't changed
+ reply(ok, State);
+ {ok, Pid} when node(Pid) =:= node() ->
+ %% we've become master
+ promote_me(From, State);
+ {ok, Pid} ->
+ %% master has changed to not us.
+ gen_server2:reply(From, ok),
+ erlang:monitor(process, Pid),
+ ok = gm:broadcast(GM, heartbeat),
+ noreply(State #state { master_pid = Pid });
+ {error, not_found} ->
+ gen_server2:reply(From, ok),
+ {stop, normal, State}
+ end.
+handle_cast({run_backing_queue, Mod, Fun}, State) ->
+ noreply(run_backing_queue(Mod, Fun, State));
+handle_cast({gm, Instruction}, State) ->
+ handle_process_result(process_instruction(Instruction, State));
+handle_cast({deliver, Delivery = #delivery {}}, State) ->
+ %% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ noreply(maybe_enqueue_message(Delivery, true, State));
+handle_cast({set_maximum_since_use, Age}, State) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ noreply(State);
+handle_cast({set_ram_duration_target, Duration},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ BQS1 = BQ:set_ram_duration_target(Duration, BQS),
+ noreply(State #state { backing_queue_state = BQS1 });
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ noreply(State #state { rate_timer_ref = just_measured,
+ backing_queue_state = BQS2 });
+handle_cast(sync_timeout, State) ->
+ noreply(backing_queue_timeout(
+ State #state { sync_timer_ref = undefined })).
+handle_info(timeout, State) ->
+ noreply(backing_queue_timeout(State));
+handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
+ State = #state { gm = GM, master_pid = MPid }) ->
+ ok = gm:broadcast(GM, {process_death, MPid}),
+ noreply(State);
+handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
+ noreply(local_sender_death(ChPid, State));
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+%% If the Reason is shutdown, or {shutdown, _}, it is not the queue
+%% being deleted: it's just the node going down. Even though we're a
+%% slave, we have no idea whether or not we'll be the only copy coming
+%% back up. Thus we must assume we will be, and preserve anything we
+%% have on disk.
+terminate(_Reason, #state { backing_queue_state = undefined }) ->
+ %% We've received a delete_and_terminate from gm, thus nothing to
+ %% do here.
+ ok;
+terminate({shutdown, dropped} = R, #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ %% See rabbit_mirror_queue_master:terminate/2
+ BQ:delete_and_terminate(R, BQS);
+terminate(Reason, #state { q = Q,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef }) ->
+ ok = gm:leave(GM),
+ QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
+ Q, BQ, BQS, RateTRef, [], [], dict:new()),
+ rabbit_amqqueue_process:terminate(Reason, QueueState);
+terminate([_SPid], _Reason) ->
+ %% gm case
+ ok.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+handle_pre_hibernate(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ BQS3 = BQ:handle_pre_hibernate(BQS2),
+ {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}.
+prioritise_call(Msg, _From, _State) ->
+ case Msg of
+ {gm_deaths, _Deaths} -> 5;
+ _ -> 0
+ end.
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ update_ram_duration -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ {run_backing_queue, _Mod, _Fun} -> 6;
+ sync_timeout -> 6;
+ {gm, _Msg} -> 5;
+ {post_commit, _Txn, _AckTags} -> 4;
+ _ -> 0
+ end.
+%% ---------------------------------------------------------------------------
+%% GM
+%% ---------------------------------------------------------------------------
+joined([SPid], _Members) ->
+ SPid ! {joined, self()},
+ ok.
+members_changed([_SPid], _Births, []) ->
+ ok;
+members_changed([SPid], _Births, Deaths) ->
+ inform_deaths(SPid, Deaths).
+handle_msg([_SPid], _From, heartbeat) ->
+ ok;
+handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
+ %% This is only of value to the master
+ ok;
+handle_msg([SPid], _From, {process_death, Pid}) ->
+ inform_deaths(SPid, [Pid]);
+handle_msg([SPid], _From, Msg) ->
+ ok = gen_server2:cast(SPid, {gm, Msg}).
+inform_deaths(SPid, Deaths) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> {stop, normal} end,
+ fun () ->
+ case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
+ ok ->
+ ok;
+ {promote, CPid} ->
+ {become, rabbit_mirror_queue_coordinator, [CPid]}
+ end
+ end).
+%% ---------------------------------------------------------------------------
+%% Others
+%% ---------------------------------------------------------------------------
+bq_init(BQ, Q, Recover) ->
+ Self = self(),
+ BQ:init(Q, Recover,
+ fun (Mod, Fun) ->
+ rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
+ end).
+run_backing_queue(rabbit_mirror_queue_master, Fun, State) ->
+ %% Yes, this might look a little crazy, but see comments in
+ %% confirm_sender_death/1
+ Fun(?MODULE, State);
+run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
+needs_confirming(#delivery{ msg_seq_no = undefined }, _State) ->
+ never;
+needs_confirming(#delivery { message = #basic_message {
+ is_persistent = true } },
+ #state { q = #amqqueue { durable = true } }) ->
+ eventually;
+needs_confirming(_Delivery, _State) ->
+ immediately.
+confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
+ {MS1, CMs} =
+ lists:foldl(
+ fun (MsgId, {MSN, CMsN} = Acc) ->
+ %% We will never see 'discarded' here
+ case dict:find(MsgId, MSN) of
+ error ->
+ %% If it needed confirming, it'll have
+ %% already been done.
+ Acc;
+ {ok, {published, ChPid}} ->
+ %% Still not seen it from the channel, just
+ %% record that it's been confirmed.
+ {dict:store(MsgId, {confirmed, ChPid}, MSN), CMsN};
+ {ok, {published, ChPid, MsgSeqNo}} ->
+ %% Seen from both GM and Channel. Can now
+ %% confirm.
+ {dict:erase(MsgId, MSN),
+ gb_trees_cons(ChPid, MsgSeqNo, CMsN)};
+ {ok, {confirmed, _ChPid}} ->
+ %% It's already been confirmed. This is
+ %% probably it's been both sync'd to disk
+ %% and then delivered and ack'd before we've
+ %% seen the publish from the
+ %% channel. Nothing to do here.
+ Acc
+ end
+ end, {MS, gb_trees:empty()}, MsgIds),
+ [ok = rabbit_channel:confirm(ChPid, MsgSeqNos)
+ || {ChPid, MsgSeqNos} <- gb_trees:to_list(CMs)],
+ State #state { msg_id_status = MS1 }.
+gb_trees_cons(Key, Value, Tree) ->
+ case gb_trees:lookup(Key, Tree) of
+ {value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
+ none -> gb_trees:insert(Key, [Value], Tree)
+ end.
+handle_process_result({ok, State}) -> noreply(State);
+handle_process_result({stop, State}) -> {stop, normal, State}.
+promote_me(From, #state { q = Q,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef,
+ sender_queues = SQ,
+ msg_id_ack = MA,
+ msg_id_status = MS,
+ known_senders = KS }) ->
+ rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n",
+ [rabbit_misc:rs(Q,
+ rabbit_misc:pid_to_string(self())]),
+ Q1 = Q #amqqueue { pid = self() },
+ {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
+ Q1, GM, rabbit_mirror_queue_master:sender_death_fun()),
+ true = unlink(GM),
+ gen_server2:reply(From, {promote, CPid}),
+ ok = gm:confirmed_broadcast(GM, heartbeat),
+ %% Everything that we're monitoring, we need to ensure our new
+ %% coordinator is monitoring.
+ MonitoringPids = [begin true = erlang:demonitor(MRef),
+ Pid
+ end || {Pid, MRef} <- dict:to_list(KS)],
+ ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
+ CPid, MonitoringPids),
+ %% We find all the messages that we've received from channels but
+ %% not from gm, and if they're due to be enqueued on promotion
+ %% then we pass them to the
+ %% queue_process:init_with_backing_queue_state to be enqueued.
+ %%
+ %% We also have to requeue messages which are pending acks: the
+ %% consumers from the master queue have been lost and so these
+ %% messages need requeuing. They might also be pending
+ %% confirmation, and indeed they might also be pending arrival of
+ %% the publication from the channel itself, if we received both
+ %% the publication and the fetch via gm first! Requeuing doesn't
+ %% affect confirmations: if the message was previously pending a
+ %% confirmation then it still will be, under the same msg_id. So
+ %% as a master, we need to be prepared to filter out the
+ %% publication of said messages from the channel (is_duplicate
+ %% (thus such requeued messages must remain in the msg_id_status
+ %% (MS) which becomes seen_status (SS) in the master)).
+ %%
+ %% Then there are messages we already have in the queue, which are
+ %% not currently pending acknowledgement:
+ %% 1. Messages we've only received via gm:
+ %% Filter out subsequent publication from channel through
+ %% validate_message. Might have to issue confirms then or
+ %% later, thus queue_process state will have to know that
+ %% there's a pending confirm.
+ %% 2. Messages received via both gm and channel:
+ %% Queue will have to deal with issuing confirms if necessary.
+ %%
+ %% MS contains the following three entry types:
+ %%
+ %% a) {published, ChPid}:
+ %% published via gm only; pending arrival of publication from
+ %% channel, maybe pending confirm.
+ %%
+ %% b) {published, ChPid, MsgSeqNo}:
+ %% published via gm and channel; pending confirm.
+ %%
+ %% c) {confirmed, ChPid}:
+ %% published via gm only, and confirmed; pending publication
+ %% from channel.
+ %%
+ %% d) discarded
+ %% seen via gm only as discarded. Pending publication from
+ %% channel
+ %%
+ %% The forms a, c and d only, need to go to the master state
+ %% seen_status (SS).
+ %%
+ %% The form b only, needs to go through to the queue_process
+ %% state to form the msg_id_to_channel mapping (MTC).
+ %%
+ %% No messages that are enqueued from SQ at this point will have
+ %% entries in MS.
+ %%
+ %% Messages that are extracted from MA may have entries in MS, and
+ %% those messages are then requeued. However, as discussed above,
+ %% this does not affect MS, nor which bits go through to SS in
+ %% Master, or MTC in queue_process.
+ %%
+ %% Everything that's in MA gets requeued. Consequently the new
+ %% master should start with a fresh AM as there are no messages
+ %% pending acks.
+ MSList = dict:to_list(MS),
+ SS = dict:from_list(
+ [E || E = {_MsgId, discarded} <- MSList] ++
+ [{MsgId, Status}
+ || {MsgId, {Status, _ChPid}} <- MSList,
+ Status =:= published orelse Status =:= confirmed]),
+ MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
+ CPid, BQ, BQS, GM, SS, MonitoringPids),
+ MTC = dict:from_list(
+ [{MsgId, {ChPid, MsgSeqNo}} ||
+ {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]),
+ NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
+ AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
+ Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
+ {Delivery, true} <- queue:to_list(PubQ)],
+ QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
+ Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
+ AckTags, Deliveries, MTC),
+ {become, rabbit_amqqueue_process, QueueState, hibernate}.
+noreply(State) ->
+ {NewState, Timeout} = next_state(State),
+ {noreply, NewState, Timeout}.
+reply(Reply, State) ->
+ {NewState, Timeout} = next_state(State),
+ {reply, Reply, NewState, Timeout}.
+next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
+ {MsgIds, BQS1} = BQ:drain_confirmed(BQS),
+ State1 = ensure_rate_timer(
+ confirm_messages(MsgIds, State #state {
+ backing_queue_state = BQS1 })),
+ case BQ:needs_timeout(BQS1) of
+ false -> {stop_sync_timer(State1), hibernate};
+ idle -> {stop_sync_timer(State1), 0 };
+ timed -> {ensure_sync_timer(State1), 0 }
+ end.
+backing_queue_timeout(State = #state { backing_queue = BQ }) ->
+ run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
+ensure_sync_timer(State = #state { sync_timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(
+ ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
+ State #state { sync_timer_ref = TRef };
+ensure_sync_timer(State) ->
+ State.
+stop_sync_timer(State = #state { sync_timer_ref = undefined }) ->
+ State;
+stop_sync_timer(State = #state { sync_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #state { sync_timer_ref = undefined }.
+ensure_rate_timer(State = #state { rate_timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(
+ rabbit_amqqueue, update_ram_duration,
+ [self()]),
+ State #state { rate_timer_ref = TRef };
+ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
+ State #state { rate_timer_ref = undefined };
+ensure_rate_timer(State) ->
+ State.
+stop_rate_timer(State = #state { rate_timer_ref = undefined }) ->
+ State;
+stop_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
+ State #state { rate_timer_ref = undefined };
+stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #state { rate_timer_ref = undefined }.
+ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
+ case dict:is_key(ChPid, KS) of
+ true -> State;
+ false -> MRef = erlang:monitor(process, ChPid),
+ State #state { known_senders = dict:store(ChPid, MRef, KS) }
+ end.
+local_sender_death(ChPid, State = #state { known_senders = KS }) ->
+ ok = case dict:is_key(ChPid, KS) of
+ false -> ok;
+ true -> confirm_sender_death(ChPid)
+ end,
+ State.
+confirm_sender_death(Pid) ->
+ %% We have to deal with the possibility that we'll be promoted to
+ %% master before this thing gets run. Consequently we set the
+ %% module to rabbit_mirror_queue_master so that if we do become a
+ %% rabbit_amqqueue_process before then, sane things will happen.
+ Fun =
+ fun (?MODULE, State = #state { known_senders = KS,
+ gm = GM }) ->
+ %% We're running still as a slave
+ ok = case dict:is_key(Pid, KS) of
+ false -> ok;
+ true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}),
+ confirm_sender_death(Pid)
+ end,
+ State;
+ (rabbit_mirror_queue_master, State) ->
+ %% We've become a master. State is now opaque to
+ %% us. When we became master, if Pid was still known
+ %% to us then we'd have set up monitoring of it then,
+ %% so this is now a noop.
+ State
+ end,
+ %% Note that we do not remove our knowledge of this ChPid until we
+ %% get the sender_death from GM.
+ {ok, _TRef} = timer:apply_after(
+ ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue,
+ [self(), rabbit_mirror_queue_master, Fun]),
+ ok.
+ Delivery = #delivery { message = #basic_message { id = MsgId },
+ msg_seq_no = MsgSeqNo,
+ sender = ChPid },
+ EnqueueOnPromotion,
+ State = #state { sender_queues = SQ, msg_id_status = MS }) ->
+ State1 = ensure_monitoring(ChPid, State),
+ %% We will never see {published, ChPid, MsgSeqNo} here.
+ case dict:find(MsgId, MS) of
+ error ->
+ {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ),
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
+ State1 #state { sender_queues = SQ1 };
+ {ok, {confirmed, ChPid}} ->
+ %% BQ has confirmed it but we didn't know what the
+ %% msg_seq_no was at the time. We do now!
+ ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
+ State1 #state { sender_queues = SQ1,
+ msg_id_status = dict:erase(MsgId, MS) };
+ {ok, {published, ChPid}} ->
+ %% It was published to the BQ and we didn't know the
+ %% msg_seq_no so couldn't confirm it at the time.
+ case needs_confirming(Delivery, State1) of
+ never ->
+ SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
+ State1 #state { msg_id_status = dict:erase(MsgId, MS),
+ sender_queues = SQ1 };
+ eventually ->
+ State1 #state {
+ msg_id_status =
+ dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) };
+ immediately ->
+ ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
+ State1 #state { msg_id_status = dict:erase(MsgId, MS),
+ sender_queues = SQ1 }
+ end;
+ {ok, discarded} ->
+ %% We've already heard from GM that the msg is to be
+ %% discarded. We won't see this again.
+ SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
+ State1 #state { msg_id_status = dict:erase(MsgId, MS),
+ sender_queues = SQ1 }
+ end.
+get_sender_queue(ChPid, SQ) ->
+ case dict:find(ChPid, SQ) of
+ error -> {queue:new(), sets:new()};
+ {ok, Val} -> Val
+ end.
+remove_from_pending_ch(MsgId, ChPid, SQ) ->
+ case dict:find(ChPid, SQ) of
+ error ->
+ SQ;
+ {ok, {MQ, PendingCh}} ->
+ dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ)
+ end.
+ {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }},
+ State = #state { sender_queues = SQ,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_status = MS }) ->
+ %% We really are going to do the publish right now, even though we
+ %% may not have seen it directly from the channel. As a result, we
+ %% may know that it needs confirming without knowing its
+ %% msg_seq_no, which means that we can see the confirmation come
+ %% back from the backing queue without knowing the msg_seq_no,
+ %% which means that we're going to have to hang on to the fact
+ %% that we've seen the msg_id confirmed until we can associate it
+ %% with a msg_seq_no.
+ State1 = ensure_monitoring(ChPid, State),
+ {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ {MQ1, PendingCh1, MS1} =
+ case queue:out(MQ) of
+ {empty, _MQ2} ->
+ {MQ, sets:add_element(MsgId, PendingCh),
+ dict:store(MsgId, {published, ChPid}, MS)};
+ {{value, {Delivery = #delivery {
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message { id = MsgId } },
+ _EnqueueOnPromotion}}, MQ2} ->
+ %% We received the msg from the channel first. Thus we
+ %% need to deal with confirms here.
+ case needs_confirming(Delivery, State1) of
+ never ->
+ {MQ2, PendingCh, MS};
+ eventually ->
+ {MQ2, sets:add_element(MsgId, PendingCh),
+ dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
+ immediately ->
+ ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ {MQ2, PendingCh, MS}
+ end;
+ {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
+ %% The instruction was sent to us before we were
+ %% within the slave_pids within the #amqqueue{}
+ %% record. We'll never receive the message directly
+ %% from the channel. And the channel will not be
+ %% expecting any confirms from us.
+ {MQ, PendingCh, MS}
+ end,
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
+ State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 },
+ {ok,
+ case Deliver of
+ false ->
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ State2 #state { backing_queue_state = BQS1 };
+ {true, AckRequired} ->
+ {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
+ ChPid, BQS),
+ maybe_store_ack(AckRequired, MsgId, AckTag,
+ State2 #state { backing_queue_state = BQS1 })
+ end};
+process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
+ State = #state { sender_queues = SQ,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_status = MS }) ->
+ %% Many of the comments around the publish head above apply here
+ %% too.
+ State1 = ensure_monitoring(ChPid, State),
+ {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ {MQ1, PendingCh1, MS1} =
+ case queue:out(MQ) of
+ {empty, _MQ} ->
+ {MQ, sets:add_element(MsgId, PendingCh),
+ dict:store(MsgId, discarded, MS)};
+ {{value, {#delivery { message = #basic_message { id = MsgId } },
+ _EnqueueOnPromotion}}, MQ2} ->
+ %% We've already seen it from the channel, we're not
+ %% going to see this again, so don't add it to MS
+ {MQ2, PendingCh, MS};
+ {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
+ %% The instruction was sent to us before we were
+ %% within the slave_pids within the #amqqueue{}
+ %% record. We'll never receive the message directly
+ %% from the channel.
+ {MQ, PendingCh, MS}
+ end,
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
+ BQS1 = BQ:discard(Msg, ChPid, BQS),
+ {ok, State1 #state { sender_queues = SQ1,
+ msg_id_status = MS1,
+ backing_queue_state = BQS1 }};
+process_instruction({set_length, Length},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ QLen = BQ:len(BQS),
+ ToDrop = QLen - Length,
+ {ok, case ToDrop > 0 of
+ true -> BQS1 =
+ lists:foldl(
+ fun (const, BQSN) ->
+ {{_Msg, _IsDelivered, _AckTag, _Remaining},
+ BQSN1} = BQ:fetch(false, BQSN),
+ end, BQS, lists:duplicate(ToDrop, const)),
+ State #state { backing_queue_state = BQS1 };
+ false -> State
+ end};
+process_instruction({fetch, AckRequired, MsgId, Remaining},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ QLen = BQ:len(BQS),
+ {ok, case QLen - 1 of
+ Remaining ->
+ {{#basic_message{id = MsgId}, _IsDelivered,
+ AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
+ maybe_store_ack(AckRequired, MsgId, AckTag,
+ State #state { backing_queue_state = BQS1 });
+ Other when Other < Remaining ->
+ %% we must be shorter than the master
+ State
+ end};
+process_instruction({ack, MsgIds},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_ack = MA }) ->
+ {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
+ {MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
+ [] = MsgIds1 -- MsgIds, %% ASSERTION
+ {ok, State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 }};
+process_instruction({requeue, MsgPropsFun, MsgIds},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_ack = MA }) ->
+ {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
+ {ok, case length(AckTags) =:= length(MsgIds) of
+ true ->
+ {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 };
+ false ->
+ %% The only thing we can safely do is nuke out our BQ
+ %% and MA. The interaction between this and confirms
+ %% doesn't really bear thinking about...
+ {_Count, BQS1} = BQ:purge(BQS),
+ {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1),
+ State #state { msg_id_ack = dict:new(),
+ backing_queue_state = BQS2 }
+ end};
+process_instruction({sender_death, ChPid},
+ State = #state { sender_queues = SQ,
+ msg_id_status = MS,
+ known_senders = KS }) ->
+ {ok, case dict:find(ChPid, KS) of
+ error ->
+ State;
+ {ok, MRef} ->
+ true = erlang:demonitor(MRef),
+ MS1 = case dict:find(ChPid, SQ) of
+ error ->
+ MS;
+ {ok, {_MQ, PendingCh}} ->
+ lists:foldl(fun dict:erase/2, MS,
+ sets:to_list(PendingCh))
+ end,
+ State #state { sender_queues = dict:erase(ChPid, SQ),
+ msg_id_status = MS1,
+ known_senders = dict:erase(ChPid, KS) }
+ end};
+process_instruction({delete_and_terminate, Reason},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ BQ:delete_and_terminate(Reason, BQS),
+ {stop, State #state { backing_queue_state = undefined }}.
+msg_ids_to_acktags(MsgIds, MA) ->
+ {AckTags, MA1} =
+ lists:foldl(
+ fun (MsgId, {Acc, MAN}) ->
+ case dict:find(MsgId, MA) of
+ error -> {Acc, MAN};
+ {ok, {_Num, AckTag}} -> {[AckTag | Acc],
+ dict:erase(MsgId, MAN)}
+ end
+ end, {[], MA}, MsgIds),
+ {lists:reverse(AckTags), MA1}.
+ack_all(BQ, MA, BQS) ->
+ BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS).
+maybe_store_ack(false, _MsgId, _AckTag, State) ->
+ State;
+maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
+ ack_num = Num }) ->
+ State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
+ ack_num = Num + 1 }.