diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-15 16:13:50 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-15 16:13:50 +0100 |
commit | 58c1346b1ddede2ea88c0b0e1623c145faa0f319 (patch) | |
tree | e47c7800f78950a64e31adb706471049a0e2d9c4 | |
parent | 93129443584f3e310613e1d752be2cea0a10a013 (diff) | |
download | rabbitmq-server-58c1346b1ddede2ea88c0b0e1623c145faa0f319.tar.gz |
refactor: extract commonality between 'publish' and 'discard'
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 103 |
1 files changed, 30 insertions, 73 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index b1b27c7e..aea3b54e 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -387,13 +387,13 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. -needs_confirming(#delivery{ msg_seq_no = undefined }, _State) -> +needs_confirming(_, #delivery{ msg_seq_no = undefined }, _State) -> never; -needs_confirming(#delivery { message = #basic_message { - is_persistent = true } }, +needs_confirming(published, #delivery { message = #basic_message { + is_persistent = true } }, #state { q = #amqqueue { durable = true } }) -> eventually; -needs_confirming(_Delivery, _State) -> +needs_confirming(_Status, _Delivery, _State) -> immediately. confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> @@ -638,10 +638,10 @@ maybe_enqueue_message( SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { msg_id_status = dict:erase(MsgId, MS), sender_queues = SQ1 }; - {ok, published} -> - MS1 = case needs_confirming(Delivery, State1) of + {ok, Status} when Status =:= published orelse Status =:= discarded -> + MS1 = case needs_confirming(Status, Delivery, State1) of never -> dict:erase(MsgId, MS); - eventually -> MMS = {published, ChPid, MsgSeqNo}, + eventually -> MMS = {Status, ChPid, MsgSeqNo}, dict:store(MsgId, MMS, MS); immediately -> ok = rabbit_misc:confirm_to_sender( ChPid, [MsgSeqNo]), @@ -649,16 +649,6 @@ maybe_enqueue_message( end, SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { msg_id_status = MS1, - sender_queues = SQ1 }; - {ok, discarded} -> - %% We've already heard from GM that the msg is to be - %% discarded. We won't see this again. - case needs_confirming(Delivery, State1) of - never -> ok; - _ -> ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]) - end, - SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { msg_id_status = dict:erase(MsgId, MS), sender_queues = SQ1 } end. @@ -676,34 +666,30 @@ remove_from_pending_ch(MsgId, ChPid, SQ) -> dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ) end. -process_instruction( - {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, - State = #state { sender_queues = SQ, - backing_queue = BQ, - backing_queue_state = BQS, - msg_id_status = MS }) -> - - %% We really are going to do the publish right now, even though we - %% may not have seen it directly from the channel. But we cannot - %% issues confirms until the latter has happened. So we need to - %% keep track of the MsgId and its confirmation status in the - %% meantime. +publish_or_discard(Status, ChPid, MsgId, + State = #state { sender_queues = SQ, + msg_id_status = MS }) -> + %% We really are going to do the publish/discard right now, even + %% though we may not have seen it directly from the channel. But + %% we cannot issues confirms until the latter has happened. So we + %% need to keep track of the MsgId and its confirmation status in + %% the meantime. State1 = ensure_monitoring(ChPid, State), {MQ, PendingCh} = get_sender_queue(ChPid, SQ), {MQ1, PendingCh1, MS1} = case queue:out(MQ) of {empty, _MQ2} -> {MQ, sets:add_element(MsgId, PendingCh), - dict:store(MsgId, published, MS)}; + dict:store(MsgId, Status, MS)}; {{value, Delivery = #delivery { msg_seq_no = MsgSeqNo, message = #basic_message { id = MsgId } }}, MQ2} -> {MQ2, PendingCh, %% We received the msg from the channel first. Thus %% we need to deal with confirms here. - case needs_confirming(Delivery, State1) of + case needs_confirming(Status, Delivery, State1) of never -> MS; - eventually -> MMS = {published, ChPid, MsgSeqNo}, + eventually -> MMS = {Status, ChPid, MsgSeqNo}, dict:store(MsgId, MMS , MS); immediately -> ok = rabbit_misc:confirm_to_sender( ChPid, [MsgSeqNo]), @@ -717,60 +703,31 @@ process_instruction( %% expecting any confirms from us. {MQ, PendingCh, MS} end, - SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), - State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 }, + State1 #state { sender_queues = SQ1, msg_id_status = MS1 }. + +process_instruction({publish, Deliver, ChPid, MsgProps, + Msg = #basic_message { id = MsgId }}, State) -> + State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = + publish_or_discard(published, ChPid, MsgId, State), {ok, case Deliver of false -> BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), - State2 #state { backing_queue_state = BQS1 }; + State1 #state { backing_queue_state = BQS1 }; {true, AckRequired} -> {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), maybe_store_ack(AckRequired, MsgId, AckTag, - State2 #state { backing_queue_state = BQS1 }) + State1 #state { backing_queue_state = BQS1 }) end}; process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, - State = #state { sender_queues = SQ, - backing_queue = BQ, - backing_queue_state = BQS, - msg_id_status = MS }) -> - %% Many of the comments around the publish head above apply here - %% too. - State1 = ensure_monitoring(ChPid, State), - {MQ, PendingCh} = get_sender_queue(ChPid, SQ), - {MQ1, PendingCh1, MS1} = - case queue:out(MQ) of - {empty, _MQ} -> - {MQ, sets:add_element(MsgId, PendingCh), - dict:store(MsgId, discarded, MS)}; - {{value, Delivery = #delivery { - msg_seq_no = MsgSeqNo, - message = #basic_message { id = MsgId } }}, MQ2} -> - %% We received the msg from the channel first. Thus - %% we need to deal with confirms here. - case needs_confirming(Delivery, State1) of - never -> ok; - _ -> ok = rabbit_misc:confirm_to_sender( - ChPid, [MsgSeqNo]) - end, - %% We've already seen it from the channel, we're not - %% going to see this again, so don't add it to MS - {MQ2, PendingCh, MS}; - {{value, #delivery {}}, _MQ2} -> - %% The instruction was sent to us before we were - %% within the slave_pids within the #amqqueue{} - %% record. We'll never receive the message directly - %% from the channel. - {MQ, PendingCh, MS} - end, - SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), + State) -> + State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = + publish_or_discard(discarded, ChPid, MsgId, State), BQS1 = BQ:discard(Msg, ChPid, BQS), - {ok, State1 #state { sender_queues = SQ1, - msg_id_status = MS1, - backing_queue_state = BQS1 }}; + {ok, State1 #state { backing_queue_state = BQS1 }}; process_instruction({drop, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> |