summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-10-12 20:56:41 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-10-12 20:56:41 +0100
commit88a9592e06dd8721bb3bdf4ea5c47a0f1238669e (patch)
treea731c6bb3c93f2b695e779f622a3550877adb59e
parent1586c9c0528d410848b38bc3c35453357d49f4c9 (diff)
downloadrabbitmq-server-88a9592e06dd8721bb3bdf4ea5c47a0f1238669e.tar.gz
refactor: simplify confirm handling in queue
three functions into one
-rw-r--r--src/rabbit_amqqueue_process.erl50
1 files changed, 19 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a744cde0..f1821b5a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -496,32 +496,21 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
State#q{msg_id_to_channel = MTC1}.
-should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
- never;
-should_confirm_message(#delivery{sender = SenderPid,
+send_or_record_confirm(#delivery{msg_seq_no = undefined}, State) ->
+ {never, State};
+send_or_record_confirm(#delivery{sender = SenderPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
id = MsgId}},
- #q{q = #amqqueue{durable = true}}) ->
- {eventually, SenderPid, MsgSeqNo, MsgId};
-should_confirm_message(#delivery{sender = SenderPid,
- msg_seq_no = MsgSeqNo},
- _State) ->
- {immediately, SenderPid, MsgSeqNo}.
-
-needs_confirming({eventually, _, _, _}) -> true;
-needs_confirming(_) -> false.
-
-maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId},
- State = #q{msg_id_to_channel = MTC}) ->
- State#q{msg_id_to_channel =
- gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)};
-maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) ->
+ State = #q{q = #amqqueue{durable = true},
+ msg_id_to_channel = MTC}) ->
+ MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC),
+ {eventually, State#q{msg_id_to_channel = MTC1}};
+send_or_record_confirm(#delivery{sender = SenderPid,
+ msg_seq_no = MsgSeqNo}, State) ->
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
- State;
-maybe_record_confirm_message(_Confirm, State) ->
- State.
+ {immediately, State}.
run_message_queue(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
@@ -551,23 +540,22 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props,
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State) ->
- Confirm = should_confirm_message(Delivery, State),
+ {Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Confirm, Delivered, State),
- case attempt_delivery(Delivery, Props,
- maybe_record_confirm_message(Confirm, State)) of
- {true, State1} ->
- State1;
+ case attempt_delivery(Delivery, Props, State1) of
+ {true, State2} ->
+ State2;
%% the next one is an optimisations
%% TODO: optimise the Confirm =/= never case too
- {false, State1 = #q{ttl = 0, dlx = undefined,
+ {false, State2 = #q{ttl = 0, dlx = undefined,
backing_queue = BQ, backing_queue_state = BQS}}
when Confirm == never ->
BQS1 = BQ:discard(Message, SenderPid, BQS),
- State1#q{backing_queue_state = BQS1};
- {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
+ State2#q{backing_queue_state = BQS1};
+ {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
- State1#q{backing_queue_state = BQS1})
+ State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
@@ -686,7 +674,7 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
message_properties(Confirm, Delivered, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL),
- needs_confirming = needs_confirming(Confirm),
+ needs_confirming = Confirm == eventually,
delivered = Delivered}.
calculate_msg_expiry(undefined) -> undefined;