summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-16 13:36:31 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-16 13:36:31 +0000
commit7556d6ae1e71ffc07dab7666216e94bbd91c1dec (patch)
tree5339db8ca5ad6253db1a27473d36a0918e77a339
parent157d3f401c729df060b32327385211b27a0e0105 (diff)
downloadrabbitmq-server-7556d6ae1e71ffc07dab7666216e94bbd91c1dec.tar.gz
refactorings
-rw-r--r--src/rabbit_amqqueue_process.erl56
1 files changed, 23 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 96352c13..4ebdb7a3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -454,11 +454,10 @@ should_confirm_message(_Delivery, _State) ->
needs_confirming({eventually, _, _, _}) -> true;
needs_confirming(_) -> false.
-record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId},
- State = #q{msg_id_to_channel = MTC}) ->
+maybe_record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId},
+ State = #q{msg_id_to_channel = MTC}) ->
State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)};
-record_confirm_message(Confirm, State)
- when Confirm =:= immediately orelse Confirm =:= never ->
+maybe_record_confirm_message(_Confirm, State) ->
State.
run_message_queue(State) ->
@@ -473,9 +472,9 @@ run_message_queue(State) ->
attempt_delivery(#delivery{txn = none,
sender = ChPid,
message = Message,
- msg_seq_no = MsgSeqNo},
- Confirm,
+ msg_seq_no = MsgSeqNo} = Delivery,
State = #q{backing_queue = BQ}) ->
+ Confirm = should_confirm_message(Delivery, State),
case Confirm of
immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
_ -> ok
@@ -500,28 +499,26 @@ attempt_delivery(#delivery{txn = none,
{Delivered, Confirm, State1};
attempt_delivery(#delivery{txn = Txn,
sender = ChPid,
- message = Message},
- Confirm,
+ message = Message} = Delivery,
State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS),
- {true, Confirm, State#q{backing_queue_state = BQS1}}.
-
-deliver_or_enqueue(Delivery, State) ->
- case attempt_delivery(Delivery,
- should_confirm_message(Delivery, State), State) of
- {true, Confirm, State1} ->
- record_confirm_message(Confirm, State1);
- {false, Confirm, State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}} ->
- #delivery{message = Message} = Delivery,
- BQS1 = BQ:publish(Message,
- (message_properties(State)) #message_properties{
- needs_confirming = needs_confirming(Confirm)},
- BQS),
- State2 = record_confirm_message(Confirm, State1),
- ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
+ {true, should_confirm_message(Delivery, State),
+ State#q{backing_queue_state = BQS1}}.
+
+deliver_or_enqueue(Delivery = #delivery{message = Message}, State) ->
+ {Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
+ State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ maybe_record_confirm_message(Confirm, State1),
+ case Delivered of
+ true -> State2;
+ false -> BQS1 =
+ BQ:publish(Message,
+ (message_properties(State)) #message_properties{
+ needs_confirming = needs_confirming(Confirm)},
+ BQS),
+ ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
@@ -840,15 +837,8 @@ handle_call({deliver_immediately, Delivery}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, Confirm, State1} =
- attempt_delivery(Delivery,
- should_confirm_message(Delivery, State),
- State),
- State2 = case Delivered andalso needs_confirming(Confirm) of
- true -> record_confirm_message(Confirm, State);
- false -> State1
- end,
- reply(Delivered, State2);
+ {Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
+ reply(Delivered, maybe_record_confirm_message(Confirm, State1));
handle_call({deliver, Delivery}, From, State) ->
%% Synchronous, "mandatory" delivery mode. Reply asap.