summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJerry Kuch <jerryk@vmware.com>2011-03-02 14:01:20 -0800
committerJerry Kuch <jerryk@vmware.com>2011-03-02 14:01:20 -0800
commitb1c1be5a956c5d655f8ca1c57a1e3463af5ca271 (patch)
treebdc5bfef1491d5a1ab135480c6db748d3b9a9148
parent3f3943fa109d0887b93adbcc09526dbc52aabe5c (diff)
downloadrabbitmq-server-b1c1be5a956c5d655f8ca1c57a1e3463af5ca271.tar.gz
Fix publish and publish_state.
-rw-r--r--src/mysql_helper.erl10
-rw-r--r--src/rabbit_mysql_queue.erl52
2 files changed, 36 insertions, 26 deletions
diff --git a/src/mysql_helper.erl b/src/mysql_helper.erl
index 74004b08..3d979130 100644
--- a/src/mysql_helper.erl
+++ b/src/mysql_helper.erl
@@ -82,7 +82,9 @@ prepare_mysql_statements() ->
{count_q_stmt,
<<"SELECT COUNT(*) FROM q WHERE queue_name = ?">>},
{count_n_stmt,
- <<"SELECT COUNT(*) FROM n WHERE queue_name = ?">>} ],
+ <<"SELECT COUNT(*) FROM n WHERE queue_name = ?">>},
+ {write_msg_to_q_stmt,
+ <<"INSERT INTO q(queue_name, m, is_persistent) VALUES (?,?,?)">>} ],
[ emysql:prepare(StmtAtom, StmtBody) || {StmtAtom, StmtBody} <- Statements ].
begin_mysql_transaction() ->
@@ -164,6 +166,12 @@ count_rows_for_queue(TableType, DbQueueName) ->
{result_packet, _,_,[[Val]],_} = QueryResult,
Val.
+write_message_to_q(DbQueueName, Msg, IsPersistent) ->
+ emysql:execute(?RABBIT_DB_POOL_NAME,
+ write_msg_to_q_stmt,
+ [DbQueueName, term_to_binary(Msg), IsPersistent]),
+ ok.
+
%% This is only for convenience in REPL debugging. Get rid of it later.
wake_up() ->
ensure_connection_pool(),
diff --git a/src/rabbit_mysql_queue.erl b/src/rabbit_mysql_queue.erl
index d3afaeed..b4397798 100644
--- a/src/rabbit_mysql_queue.erl
+++ b/src/rabbit_mysql_queue.erl
@@ -305,15 +305,11 @@ purge(S = #s { queue_name = DbQueueName }) ->
Result.
-%%#############################################################################
-%% THE RUBICON...
-%%#############################################################################
-
%%----------------------------------------------------------------------------
%% publish/3 publishes a msg.
%%
-%% publish/3 creates an Mnesia transaction to run in, and therefore
-%% may not be called from inside another Mnesia transaction.
+%% publish/3 creates a MySQL transaction to run in, and therefore
+%% may not be called from inside another MySQL transaction.
%%
%% -spec(publish/3 ::
%% (rabbit_types:basic_message(),
@@ -322,28 +318,27 @@ purge(S = #s { queue_name = DbQueueName }) ->
%% -> state()).
publish(Msg, Props, S) ->
- % rabbit_log:info("publish(~n ~p,~n ~p,~n ~p) ->", [Msg, Props, S]),
- {atomic, Result} =
- mnesia:transaction(fun () -> RS = publish_state(Msg, Props, false, S),
- save(RS),
- RS
- end),
- % rabbit_log:info("publish ->~n ~p", [Result]),
+ rabbit_log:info("publish(~n ~p,~n ~p,~n ~p) ->", [Msg, Props, S]),
+ mysql_helper:begin_mysql_transaction(),
+ RS = publish_state(Msg, Props, false, S),
+ save(RS),
+ mysql_helper:commit_mysql_transaction(),
+ rabbit_log:info("publish ->~n ~p", [RS]),
callback([{Msg, Props}]),
- Result.
+ RS.
+
%%#############################################################################
-%% OTHER SIDE OF THE RUBICON...
+%% THE RUBICON...
%%#############################################################################
-
%%----------------------------------------------------------------------------
%% publish_delivered/4 is called after a msg has been passed straight
%% out to a client because the queue is empty. We update all state
%% (e.g., next_seq_id) as if we had in fact handled the msg.
%%
-%% publish_delivered/4 creates an Mnesia transaction to run in, and
-%% therefore may not be called from inside another Mnesia transaction.
+%% publish_delivered/4 creates a MySQL transaction to run in, and
+%% therefore may not be called from inside another MySQL transaction.
%%
%% -spec(publish_delivered/4 :: (true, rabbit_types:basic_message(),
%% rabbit_types:message_properties(), state())
@@ -353,16 +348,17 @@ publish(Msg, Props, S) ->
%% -> {undefined, state()}).
publish_delivered(false, Msg, Props, S) ->
- % rabbit_log:info("publish_delivered(false,~n ~p,~n ~p,~n ~p) ->", [Msg, Props, S]),
+ rabbit_log:info("publish_delivered(false,~n ~p,~n ~p,~n ~p) ->", [Msg, Props, S]),
Result = {undefined, S},
- % rabbit_log:info("publish_delivered ->~n ~p", [Result]),
+ rabbit_log:info("publish_delivered ->~n ~p", [Result]),
callback([{Msg, Props}]),
Result;
publish_delivered(true,
Msg,
Props,
S = #s { next_seq_id = SeqId }) ->
- % rabbit_log:info("publish_delivered(true,~n ~p,~n ~p,~n ~p) ->", [Msg, Props, S]),
+ rabbit_log:info("publish_delivered(true,~n ~p,~n ~p,~n ~p) ->",
+ [Msg, Props, S]),
%% {atomic, Result} =
%% mnesia:transaction(
%% fun () ->
@@ -376,6 +372,12 @@ publish_delivered(true,
%% callback([{Msg, Props}]),
%% Result.
yo_mama_bogus_result.
+%%#############################################################################
+%% OTHER SIDE OF THE RUBICON...
+%%#############################################################################
+
+
+
%%----------------------------------------------------------------------------
%% dropwhile/2 drops msgs from the head of the queue while there are
@@ -757,10 +759,10 @@ publish_state(Msg,
IsDelivered,
S = #s { queue_name = DbQueueName,
next_seq_id = SeqId }) ->
- %% M = (m(Msg, SeqId, Props)) #m { is_delivered = IsDelivered },
- %% mnesia:write(QTable, #q_record { out_id = OutId, m = M }, 'write'),
- %% S #s { next_seq_id = SeqId + 1, next_out_id = OutId + 1 }.
- yo_mama_bogus_result.
+ IsPersistent = Msg#'basic_message'.is_persistent,
+ M = (m(Msg, SeqId, Props)) #m { is_delivered = IsDelivered },
+ mysql_helper:write_message_to_q(DbQueueName, M, IsPersistent),
+ S #s { next_seq_id = SeqId + 1}.
-spec(internal_ack/2 :: ([seq_id()], s()) -> s()).