summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-04-02 22:49:15 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-04-02 22:49:15 +0100
commit5beede6bc8fdaec706f23db17e226b91f5180443 (patch)
treeaad025f1f24387a54e7c490bf87d8066a28cbf4b
parent3d63f46b4f94c777a76e62fe951e4b2c96d0c24e (diff)
downloadrabbitmq-server-5beede6bc8fdaec706f23db17e226b91f5180443.tar.gz
refactor queue's confirm handling on publish
I think it's a bit clearer this way
-rw-r--r--src/rabbit_amqqueue_process.erl58
1 files changed, 28 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7c1e4573..0f46f51e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -493,6 +493,11 @@ should_confirm_message(_Delivery, _State) ->
needs_confirming({eventually, _, _, _}) -> true;
needs_confirming(_) -> false.
+confirm_if(false, _Delivery) ->
+ ok;
+confirm_if(true, #delivery{sender = SenderPid, msg_seq_no = MsgSeqNo}) ->
+ rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]).
+
maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId},
State = #q{msg_id_to_channel = MTC}) ->
State#q{msg_id_to_channel =
@@ -509,44 +514,37 @@ run_message_queue(State) ->
State2.
attempt_delivery(Delivery = #delivery{sender = SenderPid,
- message = Message,
- msg_seq_no = MsgSeqNo},
+ message = Message},
+ Confirm,
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- Confirm = should_confirm_message(Delivery, State),
- case Confirm of
- immediately -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]);
- _ -> ok
- end,
+ confirm_if(Confirm == immediately, Delivery),
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
- DeliverFun =
- fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
- Props = message_properties(Confirm, State1),
- {AckTag, BQS3} = BQ:publish_delivered(
- AckRequired, Message, Props,
- SenderPid, BQS2),
- {{Message, false, AckTag}, true,
- State1#q{backing_queue_state = BQS3}}
- end,
- {Delivered, State2} =
- deliver_msgs_to_consumers(DeliverFun, false,
- State#q{backing_queue_state = BQS1}),
- {Delivered, Confirm, State2};
+ deliver_msgs_to_consumers(
+ fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
+ Props = message_properties(Confirm, State1),
+ {AckTag, BQS3} = BQ:publish_delivered(
+ AckRequired, Message, Props,
+ SenderPid, BQS2),
+ {{Message, false, AckTag}, true,
+ State1#q{backing_queue_state = BQS3}}
+ end, false, State#q{backing_queue_state = BQS1});
{Duplicate, BQS1} ->
%% if the message has previously been seen by the BQ then
%% it must have been seen under the same circumstances as
%% now: i.e. if it is now a deliver_immediately then it
%% must have been before.
- Delivered = case Duplicate of
- published -> true;
- discarded -> false
- end,
- {Delivered, Confirm, State#q{backing_queue_state = BQS1}}
+ {case Duplicate of
+ published -> true;
+ discarded -> false
+ end,
+ State#q{backing_queue_state = BQS1}}
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
sender = SenderPid}, State) ->
- {Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
+ Confirm = should_confirm_message(Delivery, State),
+ {Delivered, State1} = attempt_delivery(Delivery, Confirm, State),
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
case Delivered of
@@ -1049,7 +1047,8 @@ handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
+ Confirm = should_confirm_message(Delivery, State),
+ {Delivered, State1} = attempt_delivery(Delivery, Confirm, State),
reply(Delivered, case Delivered of
true -> maybe_record_confirm_message(Confirm, State1);
false -> discard_delivery(Delivery, State1)
@@ -1199,8 +1198,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender,
- msg_seq_no = MsgSeqNo}, Flow},
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
case Flow of
@@ -1215,7 +1213,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender,
case already_been_here(Delivery, State) of
false -> noreply(deliver_or_enqueue(Delivery, State));
Qs -> log_cycle_once(Qs),
- rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
+ confirm_if(true, Delivery),
noreply(State)
end;