From 34290816e2bdfe4205bda82c6ae95aafef319d65 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 20 Jan 2011 23:12:42 +0000 Subject: move channel lookups into handle_ methods This avoids double lookup in some paths and unnecessary invocations of run_message_queue. Also inline recorde_current_channel_tx, which eliminates some lookup. And allow rollback to trigger the forgetting of a channel record. --- src/rabbit_amqqueue_process.erl | 72 +++++++++++++++++------------------------ 1 file changed, 29 insertions(+), 43 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 48192dcb..0346ec7d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -332,11 +332,6 @@ ch_record_state_transition(OldCR, NewCR) -> true -> ok end. -record_current_channel_tx(ChPid, Txn) -> - %% as a side effect this also starts monitoring the channel (if - %% that wasn't happening already) - store_ch_record((ch_record(ChPid))#cr{txn = Txn}). - deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, @@ -495,7 +490,7 @@ attempt_delivery(#delivery{txn = Txn, {NeedsConfirming, State = #q{backing_queue = BQ, backing_queue_state = BQS}}) -> - record_current_channel_tx(ChPid, Txn), + store_ch_record((ch_record(ChPid))#cr{txn = Txn}), {true, NeedsConfirming, State#q{backing_queue_state = @@ -591,7 +586,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> true -> {stop, State1}; false -> State2 = case Txn of none -> State1; - _ -> rollback_transaction(Txn, ChPid, + _ -> rollback_transaction(Txn, C, State1) end, {ok, requeue_and_run(sets:to_list(ChAckTags), @@ -627,39 +622,24 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> run_message_queue( confirm_messages(Guids, State#q{backing_queue_state = BQS1})). -commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, - backing_queue_state = BQS, - ttl = TTL}) -> - %% ChPid must be known here because of the participant management - %% by the channel. However, in a cluster, the DOWN can overtake - %% the commit, and so there is a case where handle_ch_down has - %% already been called for ChPid. - case lookup_ch(ChPid) of - not_found -> - gen_server2:reply(From, ok), - State; - C = #cr{acktags = ChAckTags} -> - {AckTags, BQS1} = BQ:tx_commit( - Txn, fun () -> gen_server2:reply(From, ok) end, - reset_msg_expiry_fun(TTL), BQS), - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), - State#q{backing_queue_state = BQS1} - end. - -rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - case lookup_ch(ChPid) of - not_found -> - State; - #cr{} -> - {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), - %% Iff we removed acktags from the channel record on - %% ack+txn then we would add them back in here (would also - %% require ChPid) - record_current_channel_tx(ChPid, none), - State#q{backing_queue_state = BQS1} - end. +commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = TTL}) -> + {AckTags, BQS1} = BQ:tx_commit( + Txn, fun () -> gen_server2:reply(From, ok) end, + reset_msg_expiry_fun(TTL), BQS), + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), + State#q{backing_queue_state = BQS1}. + +rollback_transaction(Txn, C, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), + %% Iff we removed acktags from the channel record on ack+txn then + %% we would add them back in here. + maybe_store_ch_record(C#cr{txn = none}), + State#q{backing_queue_state = BQS1}. subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). @@ -860,8 +840,11 @@ handle_call({deliver, Delivery}, From, State) -> noreply(NewState); handle_call({commit, Txn, ChPid}, From, State) -> - NewState = commit_transaction(Txn, From, ChPid, State), - noreply(run_message_queue(NewState)); + case lookup_ch(ChPid) of + not_found -> reply(ok, State); + C -> noreply(run_message_queue( + commit_transaction(Txn, From, C, State))) + end; handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues @@ -1060,7 +1043,10 @@ handle_cast({reject, AckTags, Requeue, ChPid}, end; handle_cast({rollback, Txn, ChPid}, State) -> - noreply(rollback_transaction(Txn, ChPid, State)); + noreply(case lookup_ch(ChPid) of + not_found -> State; + C -> rollback_transaction(Txn, C, State) + end); handle_cast(delete_immediately, State) -> {stop, normal, State}; -- cgit v1.2.1