summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-29 14:36:28 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-29 14:36:28 +0100
commitace12dbca873ed44e86ec86ffd4d13d64bf3ad92 (patch)
tree846b6c8c3949bc3d28ba6bf7210cf968ad8d8595 /src/rabbit_amqqueue_process.erl
parent283b74d2fe78d042765f4b27602a8800f36ee069 (diff)
downloadrabbitmq-server-ace12dbca873ed44e86ec86ffd4d13d64bf3ad92.tar.gz
refactoring
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl73
1 files changed, 34 insertions, 39 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0fc7ee35..62ac6992 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -346,7 +346,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{State2, ChAckTags1} =
case AckRequired of
true -> {State1, sets:add_element(AckTag, ChAckTags)};
- false -> {confirm_message_internal(
+ false -> {confirm_message(
Message#basic_message.guid,
State1), ChAckTags}
end,
@@ -399,12 +399,12 @@ deliver_from_queue_deliver(AckRequired, false,
{{Message, IsDelivered, AckTag}, 0 == Remaining,
State #q { backing_queue_state = BQS1 }}.
-confirm_messages_internal(Guids, State) when is_list(Guids) ->
+confirm_messages(Guids, State) when is_list(Guids) ->
lists:foldl(fun(Guid, State0) ->
- confirm_message_internal(Guid, State0)
+ confirm_message(Guid, State0)
end, State, Guids).
-confirm_message_internal(Guid, State = #q{guid_to_channel = GTC}) ->
+confirm_message(Guid, State = #q{guid_to_channel = GTC}) ->
case dict:find(Guid, GTC) of
{ok, {_ , undefined}} -> ok;
{ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
@@ -412,13 +412,21 @@ confirm_message_internal(Guid, State = #q{guid_to_channel = GTC}) ->
end,
State #q { guid_to_channel = dict:erase(Guid, GTC) }.
-maybe_record_confirm_message(#delivery{msg_seq_no = undefined }, State) ->
+record_confirm_message(#delivery{msg_seq_no = undefined }, State) ->
State;
-maybe_record_confirm_message(#delivery{sender = ChPid,
- message = #basic_message{guid = Guid},
- msg_seq_no = MsgSeqNo}, State) ->
- State #q { guid_to_channel =
- dict:store(Guid, {ChPid, MsgSeqNo}, State#q.guid_to_channel) }.
+record_confirm_message(#delivery{sender = ChPid,
+ message = #basic_message{guid = Guid},
+ msg_seq_no = MsgSeqNo},
+ State = #q{guid_to_channel = GTC}) ->
+ State #q { guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC) }.
+
+ack_by_acktags(AckTags, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ AckdGuids = BQ:seqids_to_guids(AckTags, BQS),
+ BQS1 = BQ:ack(AckTags, BQS),
+ confirm_messages(
+ AckdGuids,
+ State #q { backing_queue_state = BQS1 }).
run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -449,18 +457,17 @@ attempt_delivery(#delivery{txn = Txn,
record_current_channel_tx(ChPid, Txn),
{true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}.
-deliver_or_enqueue(Delivery = #delivery{txn = Txn,
- sender = ChPid,
- message = Message,
+deliver_or_enqueue(Delivery = #delivery{message = Message,
msg_seq_no = MsgSeqNo},
State = #q{backing_queue = BQ}) ->
- case attempt_delivery(Delivery, State) of
+ State1 = record_confirm_message(Delivery, State),
+ case attempt_delivery(Delivery, State1) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
BQS = BQ:publish(Message, MsgSeqNo =/= undefined,
- State #q.backing_queue_state),
+ State1 #q.backing_queue_state),
{false, NewState#q{backing_queue_state = BQS}}
end.
@@ -562,8 +569,7 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
case Fun(BQS) of
{BQS1, {confirm, Guids}} ->
run_message_queue(
- confirm_messages_internal(Guids,
- State #q { backing_queue_state = BQS1 }));
+ confirm_messages(Guids, State #q { backing_queue_state = BQS1 }));
BQS1 ->
run_message_queue(State#q{backing_queue_state = BQS1})
end.
@@ -714,15 +720,15 @@ handle_call({deliver_immediately, Delivery}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- State1 = maybe_record_confirm_message(Delivery, State),
- {Delivered, State2} = attempt_delivery(Delivery, State1),
+ {Delivered, State1} = attempt_delivery(Delivery, State),
+ State2 = confirm_message(Delivery#delivery.message#basic_message.guid,
+ record_confirm_message(Delivery, State1)),
reply(Delivered, State2);
handle_call({deliver, Delivery}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
- State1 = maybe_record_confirm_message(Delivery, State),
- {Delivered, State2} = deliver_or_enqueue(Delivery, State1),
- reply(Delivered, State2);
+ {Delivered, State1} = deliver_or_enqueue(Delivery, State),
+ reply(Delivered, State1);
handle_call({commit, Txn, ChPid}, From, State) ->
NewState = commit_transaction(Txn, From, ChPid, State),
@@ -754,8 +760,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
C#cr{acktags = sets:add_element(AckTag, ChAckTags)}),
State1;
false ->
- confirm_message_internal(Message#basic_message.guid,
- State1)
+ confirm_message(Message#basic_message.guid, State1)
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
reply({ok, Remaining, Msg}, State2#q{backing_queue_state = BQS1})
@@ -880,9 +885,8 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- State1 = maybe_record_confirm_message(Delivery, State),
- {_Delivered, State2} = deliver_or_enqueue(Delivery, State1),
- noreply(State2);
+ {_Delivered, State1} = deliver_or_enqueue(Delivery, State),
+ noreply(State1);
handle_cast({ack, Txn, AckTags, ChPid},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
@@ -895,11 +899,7 @@ handle_cast({ack, Txn, AckTags, ChPid},
none ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
NewC = C#cr{acktags = ChAckTags1},
- AckdGuids = BQ:seqids_to_guids(AckTags, BQS),
- NewBQS = BQ:ack(AckTags, BQS),
- NewState = confirm_messages_internal(
- AckdGuids,
- State #q { backing_queue_state = NewBQS }),
+ NewState = ack_by_acktags(AckTags, State),
{NewC, NewState};
_ ->
{C#cr{txn = Txn},
@@ -909,8 +909,7 @@ handle_cast({ack, Txn, AckTags, ChPid},
noreply(State1)
end;
-handle_cast({reject, AckTags, Requeue, ChPid},
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
@@ -919,11 +918,7 @@ handle_cast({reject, AckTags, Requeue, ChPid},
store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> AckdGuids = BQ:seqids_to_guids(AckTags, BQS),
- BQS1 = BQ:ack(AckTags, BQS),
- confirm_messages_internal(
- AckdGuids,
- State #q { backing_queue_state = BQS1 })
+ false -> ack_by_acktags(AckTags, State)
end)
end;