diff options
author | Jerry Kuch <jerryk@vmware.com> | 2011-03-01 17:09:32 -0800 |
---|---|---|
committer | Jerry Kuch <jerryk@vmware.com> | 2011-03-01 17:09:32 -0800 |
commit | b494ad7209b914cee98cb855c9225d8d9c3b1ce0 (patch) | |
tree | 920d3f6e0656fc04eb5388cd7ec7a868679f6ddc | |
parent | 5506a0fad18cf5485662231abe77d1cd4ed24d97 (diff) | |
download | rabbitmq-server-b494ad7209b914cee98cb855c9225d8d9c3b1ce0.tar.gz |
Miscellaneous MySQL wire up for init method.
-rw-r--r-- | src/mysql_helper.erl | 53 | ||||
-rw-r--r-- | src/rabbit_mysql_queue.erl | 352 |
2 files changed, 215 insertions, 190 deletions
diff --git a/src/mysql_helper.erl b/src/mysql_helper.erl index 733f122b..6c974410 100644 --- a/src/mysql_helper.erl +++ b/src/mysql_helper.erl @@ -54,12 +54,13 @@ ensure_connection_pool() -> exit:pool_already_exists -> ok end. + +%% NOTE: What the MySQL protocol actually supports in prepared statements +%% seems a bit non-uniform. For example, 'COMMIT' is in, but +%% 'START TRANSACTION' isn't. Fortunately, most of the things that +%% are parametrizable, and thus prone to injection attacks and the +%% like do seem to be there. prepare_mysql_statements() -> - %% NOTE: What the MySQL protocol actually supports in prepared statements - %% seems a bit non-uniform. For example, 'COMMIT' is in, but - %% 'START TRANSACTION' isn't. Fortunately, most of the things that - %% are parametrizable, and thus prone to injection attacks and the - %% like do seem to be there. Statements = [{insert_q_stmt,<<"INSERT INTO q(queue_name, m) VALUES(?,?)">>}, {insert_p_stmt, <<"INSERT INTO p(seq_id, queue_name, m) VALUES(?,?,?)">>}, @@ -67,18 +68,52 @@ prepare_mysql_statements() -> <<"INSERT INTO n(queue_name, next_seq_id) VALUES(?,?)">>}, {delete_q_stmt,<<"DELETE FROM q WHERE queue_name = ?">>}, {delete_p_stmt,<<"DELETE FROM p WHERE queue_name = ?">>}, - {delete_n_stmt,<<"DELETE FROM n WHERE queue_name = ?">>}], + {delete_n_stmt,<<"DELETE FROM n WHERE queue_name = ?">>}, + {read_n_stmt, <<"SELECT * FROM n WHERE queue_name = ?">>}, + {put_n_stmt, <<"REPLACE INTO n(queue_name, next_seq_id) VALUES(?,?)">>} ], + [ emysql:prepare(StmtAtom, StmtBody) || {StmtAtom, StmtBody} <- Statements ]. +begin_mysql_transaction() -> + emysql:execute(?RABBIT_DB_POOL_NAME, + <<"START TRANSACTION">>). + +commit_mysql_transaction() -> + emysql:execute(?RABBIT_DB_POOL_NAME, + <<"COMMIT">>). + delete_queue_data(QueueName) -> %% TODO: Error checking... - emysql:execute(?RABBIT_DB_POOL_NAME, - <<"START TRANSACTION">>), [ emysql:execute(?RABBIT_DB_POOL_NAME, Stmt, [QueueName]) || Stmt <- [delete_q_stmt, delete_p_stmt, delete_n_stmt] ], + ok. + +read_n_record(QueueName) -> + %% BUGBUG: Ugly. We really should convert the result to Erlang records + %% here and isolate rabbit_mysql_queue from any direct touching + %% of the emysql library, but we need to split some records out + %% to an include file first... emysql:execute(?RABBIT_DB_POOL_NAME, - <<"COMMIT">>), + read_n_stmt, + [QueueName]). + +%% TODO: Consistent error handling... +write_n_record(DbQueueName, NextSeqId) -> + Result = emysql:execute(?RABBIT_DB_POOL_NAME, + put_n_stmt, + [DbQueueName, NextSeqId]), + case Result of + #ok_packet{} -> ok; + #error_packet{} -> rabbit_log:error("Failed REPLACE on n Table (~p,~p)", + [DbQueueName, NextSeqId]) + end. + +%% Delete non-persistent msgs after a restart. +-spec delete_nonpersistent_msgs(string()) -> ok. + +delete_nonpersistent_msgs(DbQueueName) -> + ok. diff --git a/src/rabbit_mysql_queue.erl b/src/rabbit_mysql_queue.erl index b7c8af3f..bf7c558c 100644 --- a/src/rabbit_mysql_queue.erl +++ b/src/rabbit_mysql_queue.erl @@ -71,11 +71,8 @@ %% (which can be dropped on a crash). -record(s, % The in-RAM queue state - { q_table, % The MySQL queue table name - p_table, % The MySQL pending-ack table name - n_table, % The MySQL next_(seq_id, out_id) table name + { queue_name, % Queue name as canonicalized for database next_seq_id, % The next M's seq_id - next_out_id, % The next M's out id txn_dict % In-progress txn->tx map }). @@ -110,7 +107,8 @@ %% requeued while keeping the same seq_id.) -record(q_record, % Q records in MySQL - { out_id, % The key: The out_id + { id, % Analogous to the 'out_id' out in Mnesia queue + queue_name, % Queue name m % The value: The M }). @@ -119,7 +117,8 @@ %% accssed by seq_id. -record(p_record, % P records in MySQL - { seq_id, % The key: The seq_id + { seq_id, % The seq_id + queue_name, % Queue name m % The value: The M }). @@ -129,10 +128,9 @@ %% MySQL transaction that updates them in the in-RAM S. -record(n_record, % next_seq_id & next_out_id record in MySQL - { key, % The key: the atom 'n' - next_seq_id, % The MySQL next_seq_id - next_out_id % The MySQL next_out_id - }). + { queue_name, % Queue name + next_seq_id % The MySQL next_seq_id + }). -include("rabbit.hrl"). @@ -147,11 +145,8 @@ -type(seq_id() :: non_neg_integer()). -type(ack() :: seq_id()). --type(s() :: #s { q_table :: atom(), - p_table :: atom(), - n_table :: atom(), +-type(s() :: #s { queue_name :: string(), next_seq_id :: seq_id(), - next_out_id :: non_neg_integer(), txn_dict :: dict() }). -type(state() :: s()). @@ -164,15 +159,15 @@ rabbit_types:message_properties()}], to_ack :: [seq_id()] }). --type(q_record() :: #q_record { out_id :: non_neg_integer(), +-type(q_record() :: #q_record { id :: non_neg_integer(), + queue_name :: string(), m :: m() }). -type(p_record() :: #p_record { seq_id :: seq_id(), m :: m() }). --type(n_record() :: #n_record { key :: 'n', - next_seq_id :: seq_id(), - next_out_id :: non_neg_integer() }). +-type(n_record() :: #n_record { queue_name:: string(), + next_seq_id :: seq_id() }). -include("rabbit_backing_queue_spec.hrl"). @@ -215,59 +210,46 @@ stop() -> ok. %% init/3 creates one backing queue, returning its state. Names are %% local to the vhost, and must be unique. %% -%% init/3 creates Mnesia transactions to run in, and therefore may not -%% be called from inside another Mnesia transaction. +%% init/3 creates MySQL transactions to run in, and therefore may not +%% be called from inside another MySQL transaction. %% %% -spec(init/3 :: %% (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) %% -> state()). - -%% BUG: We should allow clustering of the Mnesia tables. - -%% BUG: It's unfortunate that this can't all be done in a single -%% Mnesia transaction! - +%% +%% BUGBUG: Error checking... init(QueueName, IsDurable, Recover) -> - rabbit_log:info("init(~n ~p,~n ~p,~n ~p) ->", [QueueName, IsDurable, Recover]), + rabbit_log:info("init(~n ~p,~n ~p,~n ~p) ->", + [QueueName, IsDurable, Recover]), DbQueueName = canonicalize_queue_name(QueueName), case Recover of false -> _ = mysql_helper:delete_queue_data(DbQueueName); true -> ok end, - %% BOOKMARK - %% create_table(QTable, 'q_record', 'ordered_set', record_info(fields, - %% q_record)), - %% create_table(PTable, 'p_record', 'set', record_info(fields, p_record)), - %% create_table(NTable, 'n_record', 'set', record_info(fields, n_record)), - %% {atomic, Result} = - %% mnesia:transaction( - %% fun () -> - %% case IsDurable of - %% false -> clear_table(QTable), - %% clear_table(PTable), - %% clear_table(NTable); - %% true -> delete_nonpersistent_msgs(QTable) - %% end, - %% {NextSeqId, NextOutId} = - %% case mnesia:read(NTable, 'n', 'read') of - %% [] -> {0, 0}; - %% [#n_record { next_seq_id = NextSeqId0, - %% next_out_id = NextOutId0 }] -> - %% {NextSeqId0, NextOutId0} - %% end, - %% RS = #s { q_table = QTable, - %% p_table = PTable, - %% n_table = NTable, - %% next_seq_id = NextSeqId, - %% next_out_id = NextOutId, - %% txn_dict = dict:new() }, - %% save(RS), - %% RS - %% end), - %% % rabbit_log:info("init ->~n ~p", [Result]), + + mysql_helper:begin_mysql_transaction(), + case IsDurable of + false -> mysql_helper:delete_queue_data(DbQueueName); + true -> mysql_helper:delete_nonpersistent_msgs(DbQueueName) + end, + NReadResult = mysql_helper:read_n_record(DbQueueName), + NRecs = emysql_util:as_record(NReadResult, + n_record, + record_info(fields, n_record)), + NextSeqId = case NRecs of + [] -> 0; + [#n_record{ next_seq_id = NextSeqId0 }] -> NextSeqId0 + end, + RS = #s { queue_name = DbQueueName, + next_seq_id = NextSeqId, + txn_dict = dict:new() }, + save(RS), + mysql_helper:commit_mysql_transaction(), + Result = RS, + rabbit_log:info("init ->~n ~p", [Result]), callback([]), - Result = yo_mama_i_am_a_placeholder. + Result. %%############################################################################# %% OTHER SIDE OF THE RUBICON... @@ -283,13 +265,14 @@ init(QueueName, IsDurable, Recover) -> %% %% -spec(terminate/1 :: (state()) -> state()). -terminate(S = #s { q_table = QTable, p_table = PTable, n_table = NTable }) -> - % rabbit_log:info("terminate(~n ~p) ->", [S]), - {atomic, Result} = - mnesia:transaction(fun () -> clear_table(PTable), S end), - mnesia:dump_tables([QTable, PTable, NTable]), - % rabbit_log:info("terminate ->~n ~p", [Result]), - Result. +terminate(S = #s { queue_name = DbQueueName}) -> + %% % rabbit_log:info("terminate(~n ~p) ->", [S]), + %% {atomic, Result} = + %% mnesia:transaction(fun () -> clear_table(PTable), S end), + %% mnesia:dump_tables([QTable, PTable, NTable]), + %% % rabbit_log:info("terminate ->~n ~p", [Result]), + %% Result. + yo_mama_bogus_result. %%---------------------------------------------------------------------------- %% delete_and_terminate/1 deletes all of a queue's enqueued msgs and @@ -300,18 +283,17 @@ terminate(S = #s { q_table = QTable, p_table = PTable, n_table = NTable }) -> %% %% -spec(delete_and_terminate/1 :: (state()) -> state()). -delete_and_terminate(S = #s { q_table = QTable, - p_table = PTable, - n_table = NTable }) -> +delete_and_terminate(S = #s { queue_name = DbQueueName }) -> % rabbit_log:info("delete_and_terminate(~n ~p) ->", [S]), - {atomic, Result} = - mnesia:transaction(fun () -> clear_table(QTable), - clear_table(PTable), - S - end), - mnesia:dump_tables([QTable, PTable, NTable]), - % rabbit_log:info("delete_and_terminate ->~n ~p", [Result]), - Result. + %% {atomic, Result} = + %% mnesia:transaction(fun () -> clear_table(QTable), + %% clear_table(PTable), + %% S + %% end), + %% mnesia:dump_tables([QTable, PTable, NTable]), + %% % rabbit_log:info("delete_and_terminate ->~n ~p", [Result]), + %% Result. + yo_mama_bogus_result. %%---------------------------------------------------------------------------- %% purge/1 deletes all of queue's enqueued msgs, generating pending @@ -322,15 +304,16 @@ delete_and_terminate(S = #s { q_table = QTable, %% %% -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). -purge(S = #s { q_table = QTable }) -> +purge(S = #s { queue_name = DbQueueName }) -> % rabbit_log:info("purge(~n ~p) ->", [S]), - {atomic, Result} = - mnesia:transaction(fun () -> LQ = length(mnesia:all_keys(QTable)), - clear_table(QTable), - {LQ, S} - end), - % rabbit_log:info("purge ->~n ~p", [Result]), - Result. + %% {atomic, Result} = + %% mnesia:transaction(fun () -> LQ = length(mnesia:all_keys(QTable)), + %% clear_table(QTable), + %% {LQ, S} + %% end), + %% % rabbit_log:info("purge ->~n ~p", [Result]), + %% Result. + yo_mama_bogus_result. %%---------------------------------------------------------------------------- %% publish/3 publishes a msg. @@ -379,20 +362,21 @@ publish_delivered(false, Msg, Props, S) -> publish_delivered(true, Msg, Props, - S = #s { next_seq_id = SeqId, next_out_id = OutId }) -> + S = #s { next_seq_id = SeqId }) -> % rabbit_log:info("publish_delivered(true,~n ~p,~n ~p,~n ~p) ->", [Msg, Props, S]), - {atomic, Result} = - mnesia:transaction( - fun () -> - add_p((m(Msg, SeqId, Props)) #m { is_delivered = true }, S), - RS = S #s { next_seq_id = SeqId + 1, - next_out_id = OutId + 1 }, - save(RS), - {SeqId, RS} - end), - % rabbit_log:info("publish_delivered ->~n ~p", [Result]), - callback([{Msg, Props}]), - Result. + %% {atomic, Result} = + %% mnesia:transaction( + %% fun () -> + %% add_p((m(Msg, SeqId, Props)) #m { is_delivered = true }, S), + %% RS = S #s { next_seq_id = SeqId + 1, + %% next_out_id = OutId + 1 }, + %% save(RS), + %% {SeqId, RS} + %% end), + %% % rabbit_log:info("publish_delivered ->~n ~p", [Result]), + %% callback([{Msg, Props}]), + %% Result. + yo_mama_bogus_result. %%---------------------------------------------------------------------------- %% dropwhile/2 drops msgs from the head of the queue while there are @@ -610,12 +594,13 @@ requeue(SeqIds, PropsF, S) -> %% %% -spec(len/1 :: (state()) -> non_neg_integer()). -len(#s { q_table = QTable }) -> - % rabbit_log:info("len(~n ~p) ->", [S]), - {atomic, Result} = - mnesia:transaction(fun () -> length(mnesia:all_keys(QTable)) end), - % rabbit_log:info("len ->~n ~p", [Result]), - Result. +len(#s { queue_name = DbQueueName }) -> + %% % rabbit_log:info("len(~n ~p) ->", [S]), + %% {atomic, Result} = + %% mnesia:transaction(fun () -> length(mnesia:all_keys(QTable)) end), + %% % rabbit_log:info("len ->~n ~p", [Result]), + %% Result. + yo_mama_bogus_result. %%---------------------------------------------------------------------------- %% is_empty/1 returns true iff the queue is empty. @@ -625,12 +610,13 @@ len(#s { q_table = QTable }) -> %% %% -spec(is_empty/1 :: (state()) -> boolean()). -is_empty(#s { q_table = QTable }) -> - % rabbit_log:info("is_empty(~n ~p) ->", [S]), - {atomic, Result} = - mnesia:transaction(fun () -> 0 == length(mnesia:all_keys(QTable)) end), - % rabbit_log:info("is_empty ->~n ~p", [Result]), - Result. +is_empty(#s { queue_name = DbQueueName }) -> + %% % rabbit_log:info("is_empty(~n ~p) ->", [S]), + %% {atomic, Result} = + %% mnesia:transaction(fun () -> 0 == length(mnesia:all_keys(QTable)) end), + %% % rabbit_log:info("is_empty ->~n ~p", [Result]), + %% Result. + yo_mama_bogus_result. %%---------------------------------------------------------------------------- %% set_ram_duration_target informs us that the target is to have no @@ -688,18 +674,18 @@ handle_pre_hibernate(S) -> S. %% %% -spec(status/1 :: (state()) -> [{atom(), any()}]). -status(#s { q_table = QTable, - p_table = PTable, +status(#s { queue_name = DbQueueName, next_seq_id = NextSeqId }) -> - % rabbit_log:info("status(~n ~p) ->", [S]), - {atomic, Result} = - mnesia:transaction( - fun () -> LQ = length(mnesia:all_keys(QTable)), - LP = length(mnesia:all_keys(PTable)), - [{len, LQ}, {next_seq_id, NextSeqId}, {acks, LP}] - end), - % rabbit_log:info("status ->~n ~p", [Result]), - Result. + %% % rabbit_log:info("status(~n ~p) ->", [S]), + %% {atomic, Result} = + %% mnesia:transaction( + %% fun () -> LQ = length(mnesia:all_keys(QTable)), + %% LP = length(mnesia:all_keys(PTable)), + %% [{len, LQ}, {next_seq_id, NextSeqId}, {acks, LP}] + %% end), + %% % rabbit_log:info("status ->~n ~p", [Result]), + %% Result. + yo_mama_bogus_result. %%---------------------------------------------------------------------------- %% Monadic helper functions for inside transactions. @@ -739,16 +725,17 @@ clear_table(Table) -> -spec delete_nonpersistent_msgs(atom()) -> ok. delete_nonpersistent_msgs(QTable) -> - lists:foreach( - fun (Key) -> - [#q_record { out_id = Key, m = M }] = - mnesia:read(QTable, Key, 'read'), - case M of - #m { msg = #basic_message { is_persistent = true }} -> ok; - _ -> mnesia:delete(QTable, Key, 'write') - end - end, - mnesia:all_keys(QTable)). + %% lists:foreach( + %% fun (Key) -> + %% [#q_record { out_id = Key, m = M }] = + %% mnesia:read(QTable, Key, 'read'), + %% case M of + %% #m { msg = #basic_message { is_persistent = true }} -> ok; + %% _ -> mnesia:delete(QTable, Key, 'write') + %% end + %% end, + %% mnesia:all_keys(QTable)). + yo_mama_bogus_result. %% internal_fetch/2 fetches the next msg, if any, inside an Mnesia %% transaction, generating a pending ack as necessary. @@ -786,12 +773,12 @@ tx_commit_state(Pubs, SeqIds, PropsF, S) -> publish_state(Msg, Props, IsDelivered, - S = #s { q_table = QTable, - next_seq_id = SeqId, - next_out_id = OutId }) -> - M = (m(Msg, SeqId, Props)) #m { is_delivered = IsDelivered }, - mnesia:write(QTable, #q_record { out_id = OutId, m = M }, 'write'), - S #s { next_seq_id = SeqId + 1, next_out_id = OutId + 1 }. + S = #s { queue_name = DbQueueName, + next_seq_id = SeqId }) -> + %% M = (m(Msg, SeqId, Props)) #m { is_delivered = IsDelivered }, + %% mnesia:write(QTable, #q_record { out_id = OutId, m = M }, 'write'), + %% S #s { next_seq_id = SeqId + 1, next_out_id = OutId + 1 }. + yo_mama_bogus_result. -spec(internal_ack/2 :: ([seq_id()], s()) -> s()). @@ -817,27 +804,29 @@ internal_dropwhile(Pred, S) -> -spec q_pop(s()) -> maybe(m()). -q_pop(#s { q_table = QTable }) -> - case mnesia:first(QTable) of - '$end_of_table' -> nothing; - OutId -> [#q_record { out_id = OutId, m = M }] = - mnesia:read(QTable, OutId, 'read'), - mnesia:delete(QTable, OutId, 'write'), - {just, M} - end. +q_pop(#s { queue_name = DbQueueName }) -> + %% case mnesia:first(QTable) of + %% '$end_of_table' -> nothing; + %% OutId -> [#q_record { out_id = OutId, m = M }] = + %% mnesia:read(QTable, OutId, 'read'), + %% mnesia:delete(QTable, OutId, 'write'), + %% {just, M} + %% end. + yo_mama_bogus_result. %% q_peek returns the first msg, if any, from the Q table in %% Mnesia. -spec q_peek(s()) -> maybe(m()). -q_peek(#s { q_table = QTable }) -> - case mnesia:first(QTable) of - '$end_of_table' -> nothing; - OutId -> [#q_record { out_id = OutId, m = M }] = - mnesia:read(QTable, OutId, 'read'), - {just, M} - end. +q_peek(#s { queue_name = DbQueueName }) -> + %% case mnesia:first(QTable) of + %% '$end_of_table' -> nothing; + %% OutId -> [#q_record { out_id = OutId, m = M }] = + %% mnesia:read(QTable, OutId, 'read'), + %% {just, M} + %% end. + yo_mama_bogus_result. %% post_pop operates after q_pop, calling add_p if necessary. @@ -846,23 +835,25 @@ q_peek(#s { q_table = QTable }) -> post_pop(true, M = #m { seq_id = SeqId, msg = Msg, is_delivered = IsDelivered }, - S = #s { q_table = QTable }) -> - LQ = length(mnesia:all_keys(QTable)), - add_p(M #m { is_delivered = true }, S), - {Msg, IsDelivered, SeqId, LQ}; -post_pop(false, - #m { msg = Msg, is_delivered = IsDelivered }, - #s { q_table = QTable }) -> - LQ = length(mnesia:all_keys(QTable)), - {Msg, IsDelivered, undefined, LQ}. + S = #s { queue_name = DbQueueName }) -> +%% LQ = length(mnesia:all_keys(QTable)), +%% add_p(M #m { is_delivered = true }, S), +%% {Msg, IsDelivered, SeqId, LQ}; +%% post_pop(false, +%% #m { msg = Msg, is_delivered = IsDelivered }, +%% #s { q_table = QTable }) -> +%% LQ = length(mnesia:all_keys(QTable)), +%% {Msg, IsDelivered, undefined, LQ}. + yo_mama_bogus_result. %% add_p adds a pending ack to the P table in Mnesia. -spec add_p(m(), s()) -> ok. -add_p(M = #m { seq_id = SeqId }, #s { p_table = PTable }) -> - mnesia:write(PTable, #p_record { seq_id = SeqId, m = M }, 'write'), - ok. +add_p(M = #m { seq_id = SeqId }, #s { queue_name = DbQueueName }) -> + %% mnesia:write(PTable, #p_record { seq_id = SeqId, m = M }, 'write'), + %% ok. + yo_mama_bogus_result. %% del_ps deletes some number of pending acks from the P table in %% Mnesia, applying a (Mnesia transactional) function F after each msg @@ -870,29 +861,28 @@ add_p(M = #m { seq_id = SeqId }, #s { p_table = PTable }) -> -spec del_ps(fun ((m(), s()) -> s()), [seq_id()], s()) -> s(). -del_ps(F, SeqIds, S = #s { p_table = PTable }) -> - lists:foldl( - fun (SeqId, Si) -> - [#p_record { m = M }] = mnesia:read(PTable, SeqId, 'read'), - mnesia:delete(PTable, SeqId, 'write'), - F(M, Si) - end, - S, - SeqIds). +del_ps(F, SeqIds, S = #s { queue_name = DbQueueName }) -> + %% lists:foldl( + %% fun (SeqId, Si) -> + %% [#p_record { m = M }] = mnesia:read(PTable, SeqId, 'read'), + %% mnesia:delete(PTable, SeqId, 'write'), + %% F(M, Si) + %% end, + %% S, + %% SeqIds). + yo_mama_bogus_result. %% save copies the volatile part of the state (next_seq_id and %% next_out_id) to Mnesia. -spec save(s()) -> ok. -save(#s { n_table = NTable, - next_seq_id = NextSeqId, - next_out_id = NextOutId }) -> - ok = mnesia:write(NTable, - #n_record { key = 'n', - next_seq_id = NextSeqId, - next_out_id = NextOutId }, - 'write'). +save(#s { queue_name = DbQueueName, + next_seq_id = NextSeqId}) -> + mysql_helper:begin_mysql_transaction(), + Result = mysql_helper:write_n_record(DbQueueName, NextSeqId), + mysql_helper:commit_mysql_transaction(), + ok = Result. %%---------------------------------------------------------------------------- %% Pure helper functions. |