diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-04-16 17:38:01 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-04-16 17:38:01 +0100 |
commit | 39fdd3039cf23a323df9136f94c4266cc602922f (patch) | |
tree | e934ebc9cf130f92d5f54dc11cdcc5ca06715d86 /src | |
parent | f48db4a0bbbe15ab62922b309a2ac89af3e9a2fb (diff) | |
download | rabbitmq-server-39fdd3039cf23a323df9136f94c4266cc602922f.tar.gz |
tweak tx API on amqqueue
commit_all and rollback_all take the channel pid as an additional
arg. This brings these functions in line with deliver and ack, which
also take both a txn and ch_pid.
In the queue process this saves us some book keeping.
Also, in the queue process we need to clear the txn field in the ch
record on commit/rollback, since otherwise a subsequent channel 'DOWN'
results in some suprising and unnecessary, though perfectly safe,
control flow.
Finally, there is no need to check that a commit relates to a channel
the queue process knows about - this is always guaranteed to be the
case.
All these changes were cherry-picked from the bug21673 branch and
bring default closer to that branch.
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue.erl | 14 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 38 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 4 |
3 files changed, 25 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 97b5ce46..f6278836 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -41,7 +41,7 @@ -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). --export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). +-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -import(mnesia). @@ -91,8 +91,8 @@ -spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). --spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). --spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). +-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). @@ -288,16 +288,16 @@ requeue(QPid, MsgIds, ChPid) -> ack(QPid, Txn, MsgIds, ChPid) -> gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). -commit_all(QPids, Txn) -> +commit_all(QPids, Txn, ChPid) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end, + fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end, QPids). -rollback_all(QPids, Txn) -> +rollback_all(QPids, Txn, ChPid) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end, + fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end, QPids). notify_down_all(QPids, ChPid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ba41f550..449e79ea 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -57,7 +57,7 @@ -record(consumer, {tag, ack_required}). --record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}). +-record(tx, {is_persistent, pending_messages, pending_acks}). %% These are held in our process dictionary -record(cr, {consumer_count, @@ -431,8 +431,7 @@ do_if_persistent(F, Txn, QName) -> lookup_tx(Txn) -> case get({txn, Txn}) of - undefined -> #tx{ch_pid = none, - is_persistent = false, + undefined -> #tx{is_persistent = false, pending_messages = [], pending_acks = []}; V -> V @@ -461,26 +460,19 @@ is_tx_persistent(Txn) -> record_pending_message(Txn, ChPid, Message) -> Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending], - ch_pid = ChPid}). + store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}). record_pending_acks(Txn, ChPid, MsgIds) -> Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], - ch_pid = ChPid}). - -process_pending(Txn, State) -> - #tx{ch_pid = ChPid, - pending_messages = PendingMessages, - pending_acks = PendingAcks} = lookup_tx(Txn), - case lookup_ch(ChPid) of - not_found -> ok; - C = #cr{unacked_messages = UAM} -> - {_Acked, Remaining} = - collect_messages(lists:append(PendingAcks), UAM), - store_ch_record(C#cr{unacked_messages = Remaining}) - end, + store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending]}). + +process_pending(Txn, ChPid, State) -> + #tx{pending_messages = PendingMessages, pending_acks = PendingAcks} = + lookup_tx(Txn), + C = #cr{unacked_messages = UAM} = lookup_ch(ChPid), + {_Acked, Remaining} = collect_messages(lists:append(PendingAcks), UAM), + store_ch_record(C#cr{unacked_messages = Remaining}), deliver_or_enqueue_n(lists:reverse(PendingMessages), State). collect_messages(MsgIds, UAM) -> @@ -589,12 +581,13 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) -> {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), reply(Delivered, NewState); -handle_call({commit, Txn}, From, State) -> +handle_call({commit, Txn, ChPid}, From, State) -> ok = commit_work(Txn, qname(State)), %% optimisation: we reply straight away so the sender can continue gen_server2:reply(From, ok), - NewState = process_pending(Txn, State), + NewState = process_pending(Txn, ChPid, State), erase_tx(Txn), + record_current_channel_tx(ChPid, none), noreply(NewState); handle_call({notify_down, ChPid}, _From, State) -> @@ -776,9 +769,10 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> noreply(State) end; -handle_cast({rollback, Txn}, State) -> +handle_cast({rollback, Txn, ChPid}, State) -> ok = rollback_work(Txn, qname(State)), erase_tx(Txn), + record_current_channel_tx(ChPid, none), noreply(State); handle_cast({redeliver, Messages}, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9aeb4623..7d3cd722 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -928,7 +928,7 @@ new_tx(State) -> internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> case rabbit_amqqueue:commit_all(sets:to_list(Participants), - TxnKey) of + TxnKey, self()) of ok -> ok = notify_limiter(State#ch.limiter_pid, State#ch.uncommitted_ack_q), new_tx(State); @@ -945,7 +945,7 @@ internal_rollback(State = #ch{transaction_id = TxnKey, queue:len(UAQ), queue:len(UAMQ)]), case rabbit_amqqueue:rollback_all(sets:to_list(Participants), - TxnKey) of + TxnKey, self()) of ok -> NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}); {error, Errors} -> rabbit_misc:protocol_error( |