From 56917912572d387cbc1522069ac594abd6c7f9a4 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 15 Oct 2012 17:34:43 +0100 Subject: merge slave 'confirm' decision making and action similar to what we did in rabbit_amqqueue_process --- src/rabbit_mirror_queue_slave.erl | 52 ++++++++++++++++----------------------- 1 file changed, 21 insertions(+), 31 deletions(-) diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 6d7bc304..2314ffea 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -387,14 +387,20 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. -needs_confirming(_, #delivery{ msg_seq_no = undefined }, _State) -> - never; -needs_confirming(published, #delivery { message = #basic_message { - is_persistent = true } }, - #state { q = #amqqueue { durable = true } }) -> - eventually; -needs_confirming(_Status, _Delivery, _State) -> - immediately. +send_or_record_confirm(_, #delivery{ msg_seq_no = undefined }, MS, _State) -> + MS; +send_or_record_confirm(published, #delivery { sender = ChPid, + msg_seq_no = MsgSeqNo, + message = #basic_message { + id = MsgId, + is_persistent = true } }, + MS, #state { q = #amqqueue { durable = true } }) -> + dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS); +send_or_record_confirm(_Status, #delivery { sender = ChPid, + msg_seq_no = MsgSeqNo }, + MS, _State) -> + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), + MS. confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> {CMs, MS1} = @@ -621,9 +627,8 @@ confirm_sender_death(Pid) -> ok. maybe_enqueue_message( - Delivery = #delivery { message = #basic_message { id = MsgId }, - msg_seq_no = MsgSeqNo, - sender = ChPid }, + Delivery = #delivery { message = #basic_message { id = MsgId }, + sender = ChPid }, State = #state { sender_queues = SQ, msg_id_status = MS }) -> State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. @@ -634,14 +639,8 @@ maybe_enqueue_message( SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), State1 #state { sender_queues = SQ1 }; {ok, Status} -> - MS1 = case needs_confirming(Status, Delivery, State1) of - never -> dict:erase(MsgId, MS); - eventually -> MMS = {Status, ChPid, MsgSeqNo}, - dict:store(MsgId, MMS, MS); - immediately -> ok = rabbit_misc:confirm_to_sender( - ChPid, [MsgSeqNo]), - dict:erase(MsgId, MS) - end, + MS1 = send_or_record_confirm( + Status, Delivery, dict:erase(MsgId, MS), State1), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { msg_id_status = MS1, sender_queues = SQ1 } @@ -662,8 +661,7 @@ remove_from_pending_ch(MsgId, ChPid, SQ) -> end. publish_or_discard(Status, ChPid, MsgId, - State = #state { sender_queues = SQ, - msg_id_status = MS }) -> + State = #state { sender_queues = SQ, msg_id_status = MS }) -> %% We really are going to do the publish/discard right now, even %% though we may not have seen it directly from the channel. But %% we cannot issues confirms until the latter has happened. So we @@ -677,19 +675,11 @@ publish_or_discard(Status, ChPid, MsgId, {MQ, sets:add_element(MsgId, PendingCh), dict:store(MsgId, Status, MS)}; {{value, Delivery = #delivery { - msg_seq_no = MsgSeqNo, - message = #basic_message { id = MsgId } }}, MQ2} -> + message = #basic_message { id = MsgId } }}, MQ2} -> {MQ2, PendingCh, %% We received the msg from the channel first. Thus %% we need to deal with confirms here. - case needs_confirming(Status, Delivery, State1) of - never -> MS; - eventually -> MMS = {Status, ChPid, MsgSeqNo}, - dict:store(MsgId, MMS , MS); - immediately -> ok = rabbit_misc:confirm_to_sender( - ChPid, [MsgSeqNo]), - MS - end}; + send_or_record_confirm(Status, Delivery, MS, State1)}; {{value, #delivery {}}, _MQ2} -> %% The instruction was sent to us before we were %% within the slave_pids within the #amqqueue{} -- cgit v1.2.1