diff options
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 71 |
1 files changed, 31 insertions, 40 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index ef43d96e..964c3e24 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -80,14 +80,12 @@ synchronised }). -start_link(Q) -> - gen_server2:start_link(?MODULE, Q, []). +start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). -info(QPid) -> - gen_server2:call(QPid, info, infinity). +info(QPid) -> gen_server2:call(QPid, info, infinity). init(#amqqueue { name = QueueName } = Q) -> %% We join the GM group before we add ourselves to the amqqueue @@ -351,14 +349,10 @@ prioritise_info(Msg, _State) -> %% GM %% --------------------------------------------------------------------------- -joined([SPid], _Members) -> - SPid ! {joined, self()}, - ok. +joined([SPid], _Members) -> SPid ! {joined, self()}, ok. -members_changed([_SPid], _Births, []) -> - ok; -members_changed([SPid], _Births, Deaths) -> - inform_deaths(SPid, Deaths). +members_changed([_SPid], _Births, []) -> ok; +members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths). handle_msg([_SPid], _From, master_changed) -> ok; @@ -675,26 +669,24 @@ maybe_enqueue_message( %% msg_seq_no was at the time. We do now! ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { sender_queues = SQ1, - msg_id_status = dict:erase(MsgId, MS) }; + State1 #state { msg_id_status = dict:erase(MsgId, MS), + sender_queues = SQ1 }; {ok, {published, ChPid}} -> %% It was published to the BQ and we didn't know the %% msg_seq_no so couldn't confirm it at the time. - case needs_confirming(Delivery, State1) of - never -> - SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { msg_id_status = dict:erase(MsgId, MS), - sender_queues = SQ1 }; - eventually -> - State1 #state { - msg_id_status = - dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; - immediately -> - ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), - SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { msg_id_status = dict:erase(MsgId, MS), - sender_queues = SQ1 } - end; + {MS1, SQ1} = + case needs_confirming(Delivery, State1) of + never -> {dict:erase(MsgId, MS), + remove_from_pending_ch(MsgId, ChPid, SQ)}; + eventually -> MMS = {published, ChPid, MsgSeqNo}, + {dict:store(MsgId, MMS, MS), SQ}; + immediately -> ok = rabbit_misc:confirm_to_sender( + ChPid, [MsgSeqNo]), + {dict:erase(MsgId, MS), + remove_from_pending_ch(MsgId, ChPid, SQ)} + end, + State1 #state { msg_id_status = MS1, + sender_queues = SQ1 }; {ok, discarded} -> %% We've already heard from GM that the msg is to be %% discarded. We won't see this again. @@ -743,18 +735,17 @@ process_instruction( msg_seq_no = MsgSeqNo, message = #basic_message { id = MsgId } }, _EnqueueOnPromotion}}, MQ2} -> - %% We received the msg from the channel first. Thus we - %% need to deal with confirms here. - case needs_confirming(Delivery, State1) of - never -> - {MQ2, PendingCh, MS}; - eventually -> - {MQ2, PendingCh, - dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; - immediately -> - ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), - {MQ2, PendingCh, MS} - end; + {MQ2, PendingCh, + %% We received the msg from the channel first. Thus + %% we need to deal with confirms here. + case needs_confirming(Delivery, State1) of + never -> MS; + eventually -> MMS = {published, ChPid, MsgSeqNo}, + dict:store(MsgId, MMS , MS); + immediately -> ok = rabbit_misc:confirm_to_sender( + ChPid, [MsgSeqNo]), + MS + end}; {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> %% The instruction was sent to us before we were %% within the slave_pids within the #amqqueue{} |