diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-29 14:36:28 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-29 14:36:28 +0100 |
commit | ace12dbca873ed44e86ec86ffd4d13d64bf3ad92 (patch) | |
tree | 846b6c8c3949bc3d28ba6bf7210cf968ad8d8595 /src/rabbit_amqqueue_process.erl | |
parent | 283b74d2fe78d042765f4b27602a8800f36ee069 (diff) | |
download | rabbitmq-server-ace12dbca873ed44e86ec86ffd4d13d64bf3ad92.tar.gz |
refactoring
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 73 |
1 files changed, 34 insertions, 39 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0fc7ee35..62ac6992 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -346,7 +346,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {State2, ChAckTags1} = case AckRequired of true -> {State1, sets:add_element(AckTag, ChAckTags)}; - false -> {confirm_message_internal( + false -> {confirm_message( Message#basic_message.guid, State1), ChAckTags} end, @@ -399,12 +399,12 @@ deliver_from_queue_deliver(AckRequired, false, {{Message, IsDelivered, AckTag}, 0 == Remaining, State #q { backing_queue_state = BQS1 }}. -confirm_messages_internal(Guids, State) when is_list(Guids) -> +confirm_messages(Guids, State) when is_list(Guids) -> lists:foldl(fun(Guid, State0) -> - confirm_message_internal(Guid, State0) + confirm_message(Guid, State0) end, State, Guids). -confirm_message_internal(Guid, State = #q{guid_to_channel = GTC}) -> +confirm_message(Guid, State = #q{guid_to_channel = GTC}) -> case dict:find(Guid, GTC) of {ok, {_ , undefined}} -> ok; {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); @@ -412,13 +412,21 @@ confirm_message_internal(Guid, State = #q{guid_to_channel = GTC}) -> end, State #q { guid_to_channel = dict:erase(Guid, GTC) }. -maybe_record_confirm_message(#delivery{msg_seq_no = undefined }, State) -> +record_confirm_message(#delivery{msg_seq_no = undefined }, State) -> State; -maybe_record_confirm_message(#delivery{sender = ChPid, - message = #basic_message{guid = Guid}, - msg_seq_no = MsgSeqNo}, State) -> - State #q { guid_to_channel = - dict:store(Guid, {ChPid, MsgSeqNo}, State#q.guid_to_channel) }. +record_confirm_message(#delivery{sender = ChPid, + message = #basic_message{guid = Guid}, + msg_seq_no = MsgSeqNo}, + State = #q{guid_to_channel = GTC}) -> + State #q { guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC) }. + +ack_by_acktags(AckTags, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + AckdGuids = BQ:seqids_to_guids(AckTags, BQS), + BQS1 = BQ:ack(AckTags, BQS), + confirm_messages( + AckdGuids, + State #q { backing_queue_state = BQS1 }). run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Funs = {fun deliver_from_queue_pred/2, @@ -449,18 +457,17 @@ attempt_delivery(#delivery{txn = Txn, record_current_channel_tx(ChPid, Txn), {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. -deliver_or_enqueue(Delivery = #delivery{txn = Txn, - sender = ChPid, - message = Message, +deliver_or_enqueue(Delivery = #delivery{message = Message, msg_seq_no = MsgSeqNo}, State = #q{backing_queue = BQ}) -> - case attempt_delivery(Delivery, State) of + State1 = record_confirm_message(Delivery, State), + case attempt_delivery(Delivery, State1) of {true, NewState} -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers BQS = BQ:publish(Message, MsgSeqNo =/= undefined, - State #q.backing_queue_state), + State1 #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. @@ -562,8 +569,7 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> case Fun(BQS) of {BQS1, {confirm, Guids}} -> run_message_queue( - confirm_messages_internal(Guids, - State #q { backing_queue_state = BQS1 })); + confirm_messages(Guids, State #q { backing_queue_state = BQS1 })); BQS1 -> run_message_queue(State#q{backing_queue_state = BQS1}) end. @@ -714,15 +720,15 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - State1 = maybe_record_confirm_message(Delivery, State), - {Delivered, State2} = attempt_delivery(Delivery, State1), + {Delivered, State1} = attempt_delivery(Delivery, State), + State2 = confirm_message(Delivery#delivery.message#basic_message.guid, + record_confirm_message(Delivery, State1)), reply(Delivered, State2); handle_call({deliver, Delivery}, _From, State) -> %% Synchronous, "mandatory" delivery mode - State1 = maybe_record_confirm_message(Delivery, State), - {Delivered, State2} = deliver_or_enqueue(Delivery, State1), - reply(Delivered, State2); + {Delivered, State1} = deliver_or_enqueue(Delivery, State), + reply(Delivered, State1); handle_call({commit, Txn, ChPid}, From, State) -> NewState = commit_transaction(Txn, From, ChPid, State), @@ -754,8 +760,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, C#cr{acktags = sets:add_element(AckTag, ChAckTags)}), State1; false -> - confirm_message_internal(Message#basic_message.guid, - State1) + confirm_message(Message#basic_message.guid, State1) end, Msg = {QName, self(), AckTag, IsDelivered, Message}, reply({ok, Remaining, Msg}, State2#q{backing_queue_state = BQS1}) @@ -880,9 +885,8 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - State1 = maybe_record_confirm_message(Delivery, State), - {_Delivered, State2} = deliver_or_enqueue(Delivery, State1), - noreply(State2); + {_Delivered, State1} = deliver_or_enqueue(Delivery, State), + noreply(State1); handle_cast({ack, Txn, AckTags, ChPid}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -895,11 +899,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), NewC = C#cr{acktags = ChAckTags1}, - AckdGuids = BQ:seqids_to_guids(AckTags, BQS), - NewBQS = BQ:ack(AckTags, BQS), - NewState = confirm_messages_internal( - AckdGuids, - State #q { backing_queue_state = NewBQS }), + NewState = ack_by_acktags(AckTags, State), {NewC, NewState}; _ -> {C#cr{txn = Txn}, @@ -909,8 +909,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, noreply(State1) end; -handle_cast({reject, AckTags, Requeue, ChPid}, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +handle_cast({reject, AckTags, Requeue, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> noreply(State); @@ -919,11 +918,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> AckdGuids = BQ:seqids_to_guids(AckTags, BQS), - BQS1 = BQ:ack(AckTags, BQS), - confirm_messages_internal( - AckdGuids, - State #q { backing_queue_state = BQS1 }) + false -> ack_by_acktags(AckTags, State) end) end; |