diff options
author | Jerry Kuch <jerryk@vmware.com> | 2011-03-02 14:01:20 -0800 |
---|---|---|
committer | Jerry Kuch <jerryk@vmware.com> | 2011-03-02 14:01:20 -0800 |
commit | b1c1be5a956c5d655f8ca1c57a1e3463af5ca271 (patch) | |
tree | bdc5bfef1491d5a1ab135480c6db748d3b9a9148 | |
parent | 3f3943fa109d0887b93adbcc09526dbc52aabe5c (diff) | |
download | rabbitmq-server-b1c1be5a956c5d655f8ca1c57a1e3463af5ca271.tar.gz |
Fix publish and publish_state.
-rw-r--r-- | src/mysql_helper.erl | 10 | ||||
-rw-r--r-- | src/rabbit_mysql_queue.erl | 52 |
2 files changed, 36 insertions, 26 deletions
diff --git a/src/mysql_helper.erl b/src/mysql_helper.erl index 74004b08..3d979130 100644 --- a/src/mysql_helper.erl +++ b/src/mysql_helper.erl @@ -82,7 +82,9 @@ prepare_mysql_statements() -> {count_q_stmt, <<"SELECT COUNT(*) FROM q WHERE queue_name = ?">>}, {count_n_stmt, - <<"SELECT COUNT(*) FROM n WHERE queue_name = ?">>} ], + <<"SELECT COUNT(*) FROM n WHERE queue_name = ?">>}, + {write_msg_to_q_stmt, + <<"INSERT INTO q(queue_name, m, is_persistent) VALUES (?,?,?)">>} ], [ emysql:prepare(StmtAtom, StmtBody) || {StmtAtom, StmtBody} <- Statements ]. begin_mysql_transaction() -> @@ -164,6 +166,12 @@ count_rows_for_queue(TableType, DbQueueName) -> {result_packet, _,_,[[Val]],_} = QueryResult, Val. +write_message_to_q(DbQueueName, Msg, IsPersistent) -> + emysql:execute(?RABBIT_DB_POOL_NAME, + write_msg_to_q_stmt, + [DbQueueName, term_to_binary(Msg), IsPersistent]), + ok. + %% This is only for convenience in REPL debugging. Get rid of it later. wake_up() -> ensure_connection_pool(), diff --git a/src/rabbit_mysql_queue.erl b/src/rabbit_mysql_queue.erl index d3afaeed..b4397798 100644 --- a/src/rabbit_mysql_queue.erl +++ b/src/rabbit_mysql_queue.erl @@ -305,15 +305,11 @@ purge(S = #s { queue_name = DbQueueName }) -> Result. -%%############################################################################# -%% THE RUBICON... -%%############################################################################# - %%---------------------------------------------------------------------------- %% publish/3 publishes a msg. %% -%% publish/3 creates an Mnesia transaction to run in, and therefore -%% may not be called from inside another Mnesia transaction. +%% publish/3 creates a MySQL transaction to run in, and therefore +%% may not be called from inside another MySQL transaction. %% %% -spec(publish/3 :: %% (rabbit_types:basic_message(), @@ -322,28 +318,27 @@ purge(S = #s { queue_name = DbQueueName }) -> %% -> state()). publish(Msg, Props, S) -> - % rabbit_log:info("publish(~n ~p,~n ~p,~n ~p) ->", [Msg, Props, S]), - {atomic, Result} = - mnesia:transaction(fun () -> RS = publish_state(Msg, Props, false, S), - save(RS), - RS - end), - % rabbit_log:info("publish ->~n ~p", [Result]), + rabbit_log:info("publish(~n ~p,~n ~p,~n ~p) ->", [Msg, Props, S]), + mysql_helper:begin_mysql_transaction(), + RS = publish_state(Msg, Props, false, S), + save(RS), + mysql_helper:commit_mysql_transaction(), + rabbit_log:info("publish ->~n ~p", [RS]), callback([{Msg, Props}]), - Result. + RS. + %%############################################################################# -%% OTHER SIDE OF THE RUBICON... +%% THE RUBICON... %%############################################################################# - %%---------------------------------------------------------------------------- %% publish_delivered/4 is called after a msg has been passed straight %% out to a client because the queue is empty. We update all state %% (e.g., next_seq_id) as if we had in fact handled the msg. %% -%% publish_delivered/4 creates an Mnesia transaction to run in, and -%% therefore may not be called from inside another Mnesia transaction. +%% publish_delivered/4 creates a MySQL transaction to run in, and +%% therefore may not be called from inside another MySQL transaction. %% %% -spec(publish_delivered/4 :: (true, rabbit_types:basic_message(), %% rabbit_types:message_properties(), state()) @@ -353,16 +348,17 @@ publish(Msg, Props, S) -> %% -> {undefined, state()}). publish_delivered(false, Msg, Props, S) -> - % rabbit_log:info("publish_delivered(false,~n ~p,~n ~p,~n ~p) ->", [Msg, Props, S]), + rabbit_log:info("publish_delivered(false,~n ~p,~n ~p,~n ~p) ->", [Msg, Props, S]), Result = {undefined, S}, - % rabbit_log:info("publish_delivered ->~n ~p", [Result]), + rabbit_log:info("publish_delivered ->~n ~p", [Result]), callback([{Msg, Props}]), Result; publish_delivered(true, Msg, Props, S = #s { next_seq_id = SeqId }) -> - % rabbit_log:info("publish_delivered(true,~n ~p,~n ~p,~n ~p) ->", [Msg, Props, S]), + rabbit_log:info("publish_delivered(true,~n ~p,~n ~p,~n ~p) ->", + [Msg, Props, S]), %% {atomic, Result} = %% mnesia:transaction( %% fun () -> @@ -376,6 +372,12 @@ publish_delivered(true, %% callback([{Msg, Props}]), %% Result. yo_mama_bogus_result. +%%############################################################################# +%% OTHER SIDE OF THE RUBICON... +%%############################################################################# + + + %%---------------------------------------------------------------------------- %% dropwhile/2 drops msgs from the head of the queue while there are @@ -757,10 +759,10 @@ publish_state(Msg, IsDelivered, S = #s { queue_name = DbQueueName, next_seq_id = SeqId }) -> - %% M = (m(Msg, SeqId, Props)) #m { is_delivered = IsDelivered }, - %% mnesia:write(QTable, #q_record { out_id = OutId, m = M }, 'write'), - %% S #s { next_seq_id = SeqId + 1, next_out_id = OutId + 1 }. - yo_mama_bogus_result. + IsPersistent = Msg#'basic_message'.is_persistent, + M = (m(Msg, SeqId, Props)) #m { is_delivered = IsDelivered }, + mysql_helper:write_message_to_q(DbQueueName, M, IsPersistent), + S #s { next_seq_id = SeqId + 1}. -spec(internal_ack/2 :: ([seq_id()], s()) -> s()). |