summaryrefslogtreecommitdiff
path: root/src/rabbit_mysql_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_mysql_queue.erl')
-rw-r--r--src/rabbit_mysql_queue.erl86
1 files changed, 34 insertions, 52 deletions
diff --git a/src/rabbit_mysql_queue.erl b/src/rabbit_mysql_queue.erl
index fa01f7e8..44441225 100644
--- a/src/rabbit_mysql_queue.erl
+++ b/src/rabbit_mysql_queue.erl
@@ -453,8 +453,8 @@ ack(SeqIds, S) ->
%% msg and its properties in the to_pub field of the txn, waiting to
%% be committed.
%%
-%% tx_publish/4 creates an Mnesia transaction to run in, and therefore
-%% may not be called from inside another Mnesia transaction.
+%% tx_publish/4 creates a MySQL transaction to run in, and therefore
+%% may not be called from inside another MySQL transaction.
%%
%% -spec(tx_publish/4 ::
%% (rabbit_types:txn(),
@@ -465,17 +465,13 @@ ack(SeqIds, S) ->
tx_publish(Txn, Msg, Props, S) ->
% rabbit_log:info("tx_publish(~n ~p,~n ~p,~n ~p,~n ~p) ->", [Txn, Msg, Props, S]),
- {atomic, Result} =
- mnesia:transaction(
- fun () -> Tx = #tx { to_pub = Pubs } = lookup_tx(Txn, S),
- RS = store_tx(Txn,
- Tx #tx { to_pub = [{Msg, Props} | Pubs] },
- S),
- save(RS),
- RS
- end),
+ Tx = #tx { to_pub = Pubs } = lookup_tx(Txn, S),
+ RS = store_tx(Txn,
+ Tx #tx { to_pub = [{Msg, Props} | Pubs] },
+ S),
+ save(RS),
+ RS.
% rabbit_log:info("tx_publish ->~n ~p", [Result]),
- Result.
%%----------------------------------------------------------------------------
%% tx_ack/3 acks within an AMQP transaction. It stores the seq_id in
@@ -488,18 +484,14 @@ tx_publish(Txn, Msg, Props, S) ->
tx_ack(Txn, SeqIds, S) ->
% rabbit_log:info("tx_ack(~n ~p,~n ~p,~n ~p) ->", [Txn, SeqIds, S]),
- {atomic, Result} =
- mnesia:transaction(
- fun () -> Tx = #tx { to_ack = SeqIds0 } = lookup_tx(Txn, S),
- RS = store_tx(Txn,
- Tx #tx {
- to_ack = lists:append(SeqIds, SeqIds0) },
- S),
- save(RS),
- RS
- end),
- % rabbit_log:info("tx_ack ->~n ~p", [Result]),
- Result.
+ Tx = #tx { to_ack = SeqIds0 } = lookup_tx(Txn, S),
+ RS = store_tx(Txn,
+ Tx #tx {
+ to_ack = lists:append(SeqIds, SeqIds0) },
+ S),
+ save(RS),
+ % rabbit_log:info("tx_ack ->~n ~p", [RS]),
+ RS.
%%----------------------------------------------------------------------------
%% tx_rollback/2 aborts an AMQP transaction.
@@ -511,25 +503,17 @@ tx_ack(Txn, SeqIds, S) ->
tx_rollback(Txn, S) ->
% rabbit_log:info("tx_rollback(~n ~p,~n ~p) ->", [Txn, S]),
- {atomic, Result} =
- mnesia:transaction(fun () ->
- #tx { to_ack = SeqIds } = lookup_tx(Txn, S),
- RS = erase_tx(Txn, S),
- save(RS),
- {SeqIds, RS}
- end),
- % rabbit_log:info("tx_rollback ->~n ~p", [Result]),
- Result.
+ #tx { to_ack = SeqIds } = lookup_tx(Txn, S),
+ RS = erase_tx(Txn, S),
+ save(RS),
+ % rabbit_log:info("tx_rollback ->~n ~p", [RS]),
+ {SeqIds, RS}.
%%----------------------------------------------------------------------------
%% tx_commit/4 commits an AMQP transaction. The F passed in is called
%% once the msgs have really been commited. This CPS permits the
%% possibility of commit coalescing.
%%
-%% tx_commit/4 creates an Mnesia transaction to run in, and therefore
-%% may not be called from inside another Mnesia transaction. However,
-%% the supplied F is called outside the transaction.
-%%
%% -spec(tx_commit/4 ::
%% (rabbit_types:txn(),
%% fun (() -> any()),
@@ -539,15 +523,10 @@ tx_rollback(Txn, S) ->
tx_commit(Txn, F, PropsF, S) ->
% rabbit_log:info("tx_commit(~n ~p,~n ~p,~n ~p,~n ~p) ->", [Txn, F, PropsF, S]),
- {atomic, {Result, Pubs}} =
- mnesia:transaction(
- fun () ->
- #tx { to_ack = SeqIds, to_pub = Pubs } = lookup_tx(Txn, S),
- RS =
- tx_commit_state(Pubs, SeqIds, PropsF, erase_tx(Txn, S)),
- save(RS),
- {{SeqIds, RS}, Pubs}
- end),
+ #tx { to_ack = SeqIds, to_pub = Pubs } = lookup_tx(Txn, S),
+ RS = tx_commit_state(Pubs, SeqIds, PropsF, erase_tx(Txn, S)),
+ save(RS),
+ {Result = {SeqIds, RS}, Pubs},
F(),
% rabbit_log:info("tx_commit ->~n ~p", [Result]),
callback(Pubs),
@@ -811,13 +790,16 @@ del_ps(F, SeqIds, S = #s { queue_name = DbQueueName }) ->
lists:foldl(
fun( SeqId, Si) ->
DbList = mysql_helper:read_p_record(DbQueueName, SeqId),
- [#p_record {m = MBin}] =
- emysql_util:as_record(DbList,
- p_record,
- record_info(fields,
- p_record)),
+ rabbit_log:info(">>>>> DbQueueName = ~p~n", [DbQueueName]),
+ rabbit_log:info(">>>>> DbList = ~p~n", [DbList]),
+ Temp = emysql_util:as_record(DbList,
+ p_record,
+ record_info(fields,
+ p_record)),
+ rabbit_log:info(">>>>> Temp = ~p~n", [Temp]),
+ [#p_record {m = MBin}] = Temp,
M = binary_to_term(MBin),
- mysql_helper:delete_message_from_p_by_seq_id(SeqId),
+ mysql_helper:delete_message_from_p_by_seq_id(SeqId, DbQueueName),
F(M, Si)
end,
S,