summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-01-20 23:12:42 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-01-20 23:12:42 +0000
commit34290816e2bdfe4205bda82c6ae95aafef319d65 (patch)
tree0545c8eec06a221541ef6b0bffd4f2cb97367d8e
parente2645b89e948168c3c2d9595ec7f7097ad0fb3d0 (diff)
downloadrabbitmq-server-bug23626.tar.gz
move channel lookups into handle_ methodsbug23626
This avoids double lookup in some paths and unnecessary invocations of run_message_queue. Also inline recorde_current_channel_tx, which eliminates some lookup. And allow rollback to trigger the forgetting of a channel record.
-rw-r--r--src/rabbit_amqqueue_process.erl72
1 files changed, 29 insertions, 43 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 48192dcb..0346ec7d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -332,11 +332,6 @@ ch_record_state_transition(OldCR, NewCR) ->
true -> ok
end.
-record_current_channel_tx(ChPid, Txn) ->
- %% as a side effect this also starts monitoring the channel (if
- %% that wasn't happening already)
- store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
@@ -495,7 +490,7 @@ attempt_delivery(#delivery{txn = Txn,
{NeedsConfirming,
State = #q{backing_queue = BQ,
backing_queue_state = BQS}}) ->
- record_current_channel_tx(ChPid, Txn),
+ store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
{true,
NeedsConfirming,
State#q{backing_queue_state =
@@ -591,7 +586,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
true -> {stop, State1};
false -> State2 = case Txn of
none -> State1;
- _ -> rollback_transaction(Txn, ChPid,
+ _ -> rollback_transaction(Txn, C,
State1)
end,
{ok, requeue_and_run(sets:to_list(ChAckTags),
@@ -627,39 +622,24 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
run_message_queue(
confirm_messages(Guids, State#q{backing_queue_state = BQS1})).
-commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL}) ->
- %% ChPid must be known here because of the participant management
- %% by the channel. However, in a cluster, the DOWN can overtake
- %% the commit, and so there is a case where handle_ch_down has
- %% already been called for ChPid.
- case lookup_ch(ChPid) of
- not_found ->
- gen_server2:reply(From, ok),
- State;
- C = #cr{acktags = ChAckTags} ->
- {AckTags, BQS1} = BQ:tx_commit(
- Txn, fun () -> gen_server2:reply(From, ok) end,
- reset_msg_expiry_fun(TTL), BQS),
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
- State#q{backing_queue_state = BQS1}
- end.
-
-rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- case lookup_ch(ChPid) of
- not_found ->
- State;
- #cr{} ->
- {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
- %% Iff we removed acktags from the channel record on
- %% ack+txn then we would add them back in here (would also
- %% require ChPid)
- record_current_channel_tx(ChPid, none),
- State#q{backing_queue_state = BQS1}
- end.
+commit_transaction(Txn, From, C = #cr{acktags = ChAckTags},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ ttl = TTL}) ->
+ {AckTags, BQS1} = BQ:tx_commit(
+ Txn, fun () -> gen_server2:reply(From, ok) end,
+ reset_msg_expiry_fun(TTL), BQS),
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
+ State#q{backing_queue_state = BQS1}.
+
+rollback_transaction(Txn, C, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
+ %% Iff we removed acktags from the channel record on ack+txn then
+ %% we would add them back in here.
+ maybe_store_ch_record(C#cr{txn = none}),
+ State#q{backing_queue_state = BQS1}.
subtract_acks(A, B) when is_list(B) ->
lists:foldl(fun sets:del_element/2, A, B).
@@ -860,8 +840,11 @@ handle_call({deliver, Delivery}, From, State) ->
noreply(NewState);
handle_call({commit, Txn, ChPid}, From, State) ->
- NewState = commit_transaction(Txn, From, ChPid, State),
- noreply(run_message_queue(NewState));
+ case lookup_ch(ChPid) of
+ not_found -> reply(ok, State);
+ C -> noreply(run_message_queue(
+ commit_transaction(Txn, From, C, State)))
+ end;
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
@@ -1060,7 +1043,10 @@ handle_cast({reject, AckTags, Requeue, ChPid},
end;
handle_cast({rollback, Txn, ChPid}, State) ->
- noreply(rollback_transaction(Txn, ChPid, State));
+ noreply(case lookup_ch(ChPid) of
+ not_found -> State;
+ C -> rollback_transaction(Txn, C, State)
+ end);
handle_cast(delete_immediately, State) ->
{stop, normal, State};