diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-04-02 22:49:15 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-04-02 22:49:15 +0100 |
commit | 5beede6bc8fdaec706f23db17e226b91f5180443 (patch) | |
tree | aad025f1f24387a54e7c490bf87d8066a28cbf4b | |
parent | 3d63f46b4f94c777a76e62fe951e4b2c96d0c24e (diff) | |
download | rabbitmq-server-5beede6bc8fdaec706f23db17e226b91f5180443.tar.gz |
refactor queue's confirm handling on publish
I think it's a bit clearer this way
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 58 |
1 files changed, 28 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7c1e4573..0f46f51e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -493,6 +493,11 @@ should_confirm_message(_Delivery, _State) -> needs_confirming({eventually, _, _, _}) -> true; needs_confirming(_) -> false. +confirm_if(false, _Delivery) -> + ok; +confirm_if(true, #delivery{sender = SenderPid, msg_seq_no = MsgSeqNo}) -> + rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]). + maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId}, State = #q{msg_id_to_channel = MTC}) -> State#q{msg_id_to_channel = @@ -509,44 +514,37 @@ run_message_queue(State) -> State2. attempt_delivery(Delivery = #delivery{sender = SenderPid, - message = Message, - msg_seq_no = MsgSeqNo}, + message = Message}, + Confirm, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - Confirm = should_confirm_message(Delivery, State), - case Confirm of - immediately -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]); - _ -> ok - end, + confirm_if(Confirm == immediately, Delivery), case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> - DeliverFun = - fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> - Props = message_properties(Confirm, State1), - {AckTag, BQS3} = BQ:publish_delivered( - AckRequired, Message, Props, - SenderPid, BQS2), - {{Message, false, AckTag}, true, - State1#q{backing_queue_state = BQS3}} - end, - {Delivered, State2} = - deliver_msgs_to_consumers(DeliverFun, false, - State#q{backing_queue_state = BQS1}), - {Delivered, Confirm, State2}; + deliver_msgs_to_consumers( + fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> + Props = message_properties(Confirm, State1), + {AckTag, BQS3} = BQ:publish_delivered( + AckRequired, Message, Props, + SenderPid, BQS2), + {{Message, false, AckTag}, true, + State1#q{backing_queue_state = BQS3}} + end, false, State#q{backing_queue_state = BQS1}); {Duplicate, BQS1} -> %% if the message has previously been seen by the BQ then %% it must have been seen under the same circumstances as %% now: i.e. if it is now a deliver_immediately then it %% must have been before. - Delivered = case Duplicate of - published -> true; - discarded -> false - end, - {Delivered, Confirm, State#q{backing_queue_state = BQS1}} + {case Duplicate of + published -> true; + discarded -> false + end, + State#q{backing_queue_state = BQS1}} end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, State) -> - {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), + Confirm = should_confirm_message(Delivery, State), + {Delivered, State1} = attempt_delivery(Delivery, Confirm, State), State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), case Delivered of @@ -1049,7 +1047,8 @@ handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), + Confirm = should_confirm_message(Delivery, State), + {Delivered, State1} = attempt_delivery(Delivery, Confirm, State), reply(Delivered, case Delivered of true -> maybe_record_confirm_message(Confirm, State1); false -> discard_delivery(Delivery, State1) @@ -1199,8 +1198,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender, - msg_seq_no = MsgSeqNo}, Flow}, +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. case Flow of @@ -1215,7 +1213,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, case already_been_here(Delivery, State) of false -> noreply(deliver_or_enqueue(Delivery, State)); Qs -> log_cycle_once(Qs), - rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), + confirm_if(true, Delivery), noreply(State) end; |