diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-12 20:56:41 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-12 20:56:41 +0100 |
commit | 88a9592e06dd8721bb3bdf4ea5c47a0f1238669e (patch) | |
tree | a731c6bb3c93f2b695e779f622a3550877adb59e | |
parent | 1586c9c0528d410848b38bc3c35453357d49f4c9 (diff) | |
download | rabbitmq-server-88a9592e06dd8721bb3bdf4ea5c47a0f1238669e.tar.gz |
refactor: simplify confirm handling in queue
three functions into one
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 50 |
1 files changed, 19 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a744cde0..f1821b5a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -496,32 +496,21 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), State#q{msg_id_to_channel = MTC1}. -should_confirm_message(#delivery{msg_seq_no = undefined}, _State) -> - never; -should_confirm_message(#delivery{sender = SenderPid, +send_or_record_confirm(#delivery{msg_seq_no = undefined}, State) -> + {never, State}; +send_or_record_confirm(#delivery{sender = SenderPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, id = MsgId}}, - #q{q = #amqqueue{durable = true}}) -> - {eventually, SenderPid, MsgSeqNo, MsgId}; -should_confirm_message(#delivery{sender = SenderPid, - msg_seq_no = MsgSeqNo}, - _State) -> - {immediately, SenderPid, MsgSeqNo}. - -needs_confirming({eventually, _, _, _}) -> true; -needs_confirming(_) -> false. - -maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId}, - State = #q{msg_id_to_channel = MTC}) -> - State#q{msg_id_to_channel = - gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)}; -maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) -> + State = #q{q = #amqqueue{durable = true}, + msg_id_to_channel = MTC}) -> + MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC), + {eventually, State#q{msg_id_to_channel = MTC1}}; +send_or_record_confirm(#delivery{sender = SenderPid, + msg_seq_no = MsgSeqNo}, State) -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), - State; -maybe_record_confirm_message(_Confirm, State) -> - State. + {immediately, State}. run_message_queue(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = @@ -551,23 +540,22 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props, deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, Delivered, State) -> - Confirm = should_confirm_message(Delivery, State), + {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Confirm, Delivered, State), - case attempt_delivery(Delivery, Props, - maybe_record_confirm_message(Confirm, State)) of - {true, State1} -> - State1; + case attempt_delivery(Delivery, Props, State1) of + {true, State2} -> + State2; %% the next one is an optimisations %% TODO: optimise the Confirm =/= never case too - {false, State1 = #q{ttl = 0, dlx = undefined, + {false, State2 = #q{ttl = 0, dlx = undefined, backing_queue = BQ, backing_queue_state = BQS}} when Confirm == never -> BQS1 = BQ:discard(Message, SenderPid, BQS), - State1#q{backing_queue_state = BQS1}; - {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> + State2#q{backing_queue_state = BQS1}; + {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, - State1#q{backing_queue_state = BQS1}) + State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, @@ -686,7 +674,7 @@ subtract_acks(ChPid, AckTags, State, Fun) -> message_properties(Confirm, Delivered, #q{ttl = TTL}) -> #message_properties{expiry = calculate_msg_expiry(TTL), - needs_confirming = needs_confirming(Confirm), + needs_confirming = Confirm == eventually, delivered = Delivered}. calculate_msg_expiry(undefined) -> undefined; |