diff options
Diffstat (limited to 'src/rabbit_mysql_queue.erl')
-rw-r--r-- | src/rabbit_mysql_queue.erl | 86 |
1 files changed, 34 insertions, 52 deletions
diff --git a/src/rabbit_mysql_queue.erl b/src/rabbit_mysql_queue.erl index fa01f7e8..44441225 100644 --- a/src/rabbit_mysql_queue.erl +++ b/src/rabbit_mysql_queue.erl @@ -453,8 +453,8 @@ ack(SeqIds, S) -> %% msg and its properties in the to_pub field of the txn, waiting to %% be committed. %% -%% tx_publish/4 creates an Mnesia transaction to run in, and therefore -%% may not be called from inside another Mnesia transaction. +%% tx_publish/4 creates a MySQL transaction to run in, and therefore +%% may not be called from inside another MySQL transaction. %% %% -spec(tx_publish/4 :: %% (rabbit_types:txn(), @@ -465,17 +465,13 @@ ack(SeqIds, S) -> tx_publish(Txn, Msg, Props, S) -> % rabbit_log:info("tx_publish(~n ~p,~n ~p,~n ~p,~n ~p) ->", [Txn, Msg, Props, S]), - {atomic, Result} = - mnesia:transaction( - fun () -> Tx = #tx { to_pub = Pubs } = lookup_tx(Txn, S), - RS = store_tx(Txn, - Tx #tx { to_pub = [{Msg, Props} | Pubs] }, - S), - save(RS), - RS - end), + Tx = #tx { to_pub = Pubs } = lookup_tx(Txn, S), + RS = store_tx(Txn, + Tx #tx { to_pub = [{Msg, Props} | Pubs] }, + S), + save(RS), + RS. % rabbit_log:info("tx_publish ->~n ~p", [Result]), - Result. %%---------------------------------------------------------------------------- %% tx_ack/3 acks within an AMQP transaction. It stores the seq_id in @@ -488,18 +484,14 @@ tx_publish(Txn, Msg, Props, S) -> tx_ack(Txn, SeqIds, S) -> % rabbit_log:info("tx_ack(~n ~p,~n ~p,~n ~p) ->", [Txn, SeqIds, S]), - {atomic, Result} = - mnesia:transaction( - fun () -> Tx = #tx { to_ack = SeqIds0 } = lookup_tx(Txn, S), - RS = store_tx(Txn, - Tx #tx { - to_ack = lists:append(SeqIds, SeqIds0) }, - S), - save(RS), - RS - end), - % rabbit_log:info("tx_ack ->~n ~p", [Result]), - Result. + Tx = #tx { to_ack = SeqIds0 } = lookup_tx(Txn, S), + RS = store_tx(Txn, + Tx #tx { + to_ack = lists:append(SeqIds, SeqIds0) }, + S), + save(RS), + % rabbit_log:info("tx_ack ->~n ~p", [RS]), + RS. %%---------------------------------------------------------------------------- %% tx_rollback/2 aborts an AMQP transaction. @@ -511,25 +503,17 @@ tx_ack(Txn, SeqIds, S) -> tx_rollback(Txn, S) -> % rabbit_log:info("tx_rollback(~n ~p,~n ~p) ->", [Txn, S]), - {atomic, Result} = - mnesia:transaction(fun () -> - #tx { to_ack = SeqIds } = lookup_tx(Txn, S), - RS = erase_tx(Txn, S), - save(RS), - {SeqIds, RS} - end), - % rabbit_log:info("tx_rollback ->~n ~p", [Result]), - Result. + #tx { to_ack = SeqIds } = lookup_tx(Txn, S), + RS = erase_tx(Txn, S), + save(RS), + % rabbit_log:info("tx_rollback ->~n ~p", [RS]), + {SeqIds, RS}. %%---------------------------------------------------------------------------- %% tx_commit/4 commits an AMQP transaction. The F passed in is called %% once the msgs have really been commited. This CPS permits the %% possibility of commit coalescing. %% -%% tx_commit/4 creates an Mnesia transaction to run in, and therefore -%% may not be called from inside another Mnesia transaction. However, -%% the supplied F is called outside the transaction. -%% %% -spec(tx_commit/4 :: %% (rabbit_types:txn(), %% fun (() -> any()), @@ -539,15 +523,10 @@ tx_rollback(Txn, S) -> tx_commit(Txn, F, PropsF, S) -> % rabbit_log:info("tx_commit(~n ~p,~n ~p,~n ~p,~n ~p) ->", [Txn, F, PropsF, S]), - {atomic, {Result, Pubs}} = - mnesia:transaction( - fun () -> - #tx { to_ack = SeqIds, to_pub = Pubs } = lookup_tx(Txn, S), - RS = - tx_commit_state(Pubs, SeqIds, PropsF, erase_tx(Txn, S)), - save(RS), - {{SeqIds, RS}, Pubs} - end), + #tx { to_ack = SeqIds, to_pub = Pubs } = lookup_tx(Txn, S), + RS = tx_commit_state(Pubs, SeqIds, PropsF, erase_tx(Txn, S)), + save(RS), + {Result = {SeqIds, RS}, Pubs}, F(), % rabbit_log:info("tx_commit ->~n ~p", [Result]), callback(Pubs), @@ -811,13 +790,16 @@ del_ps(F, SeqIds, S = #s { queue_name = DbQueueName }) -> lists:foldl( fun( SeqId, Si) -> DbList = mysql_helper:read_p_record(DbQueueName, SeqId), - [#p_record {m = MBin}] = - emysql_util:as_record(DbList, - p_record, - record_info(fields, - p_record)), + rabbit_log:info(">>>>> DbQueueName = ~p~n", [DbQueueName]), + rabbit_log:info(">>>>> DbList = ~p~n", [DbList]), + Temp = emysql_util:as_record(DbList, + p_record, + record_info(fields, + p_record)), + rabbit_log:info(">>>>> Temp = ~p~n", [Temp]), + [#p_record {m = MBin}] = Temp, M = binary_to_term(MBin), - mysql_helper:delete_message_from_p_by_seq_id(SeqId), + mysql_helper:delete_message_from_p_by_seq_id(SeqId, DbQueueName), F(M, Si) end, S, |