summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJerry Kuch <jerryk@vmware.com>2011-03-01 17:09:32 -0800
committerJerry Kuch <jerryk@vmware.com>2011-03-01 17:09:32 -0800
commitb494ad7209b914cee98cb855c9225d8d9c3b1ce0 (patch)
tree920d3f6e0656fc04eb5388cd7ec7a868679f6ddc
parent5506a0fad18cf5485662231abe77d1cd4ed24d97 (diff)
downloadrabbitmq-server-b494ad7209b914cee98cb855c9225d8d9c3b1ce0.tar.gz
Miscellaneous MySQL wire up for init method.
-rw-r--r--src/mysql_helper.erl53
-rw-r--r--src/rabbit_mysql_queue.erl352
2 files changed, 215 insertions, 190 deletions
diff --git a/src/mysql_helper.erl b/src/mysql_helper.erl
index 733f122b..6c974410 100644
--- a/src/mysql_helper.erl
+++ b/src/mysql_helper.erl
@@ -54,12 +54,13 @@ ensure_connection_pool() ->
exit:pool_already_exists -> ok
end.
+
+%% NOTE: What the MySQL protocol actually supports in prepared statements
+%% seems a bit non-uniform. For example, 'COMMIT' is in, but
+%% 'START TRANSACTION' isn't. Fortunately, most of the things that
+%% are parametrizable, and thus prone to injection attacks and the
+%% like do seem to be there.
prepare_mysql_statements() ->
- %% NOTE: What the MySQL protocol actually supports in prepared statements
- %% seems a bit non-uniform. For example, 'COMMIT' is in, but
- %% 'START TRANSACTION' isn't. Fortunately, most of the things that
- %% are parametrizable, and thus prone to injection attacks and the
- %% like do seem to be there.
Statements = [{insert_q_stmt,<<"INSERT INTO q(queue_name, m) VALUES(?,?)">>},
{insert_p_stmt,
<<"INSERT INTO p(seq_id, queue_name, m) VALUES(?,?,?)">>},
@@ -67,18 +68,52 @@ prepare_mysql_statements() ->
<<"INSERT INTO n(queue_name, next_seq_id) VALUES(?,?)">>},
{delete_q_stmt,<<"DELETE FROM q WHERE queue_name = ?">>},
{delete_p_stmt,<<"DELETE FROM p WHERE queue_name = ?">>},
- {delete_n_stmt,<<"DELETE FROM n WHERE queue_name = ?">>}],
+ {delete_n_stmt,<<"DELETE FROM n WHERE queue_name = ?">>},
+ {read_n_stmt, <<"SELECT * FROM n WHERE queue_name = ?">>},
+ {put_n_stmt, <<"REPLACE INTO n(queue_name, next_seq_id) VALUES(?,?)">>} ],
+
[ emysql:prepare(StmtAtom, StmtBody) || {StmtAtom, StmtBody} <- Statements ].
+begin_mysql_transaction() ->
+ emysql:execute(?RABBIT_DB_POOL_NAME,
+ <<"START TRANSACTION">>).
+
+commit_mysql_transaction() ->
+ emysql:execute(?RABBIT_DB_POOL_NAME,
+ <<"COMMIT">>).
+
delete_queue_data(QueueName) ->
%% TODO: Error checking...
- emysql:execute(?RABBIT_DB_POOL_NAME,
- <<"START TRANSACTION">>),
[ emysql:execute(?RABBIT_DB_POOL_NAME,
Stmt,
[QueueName]) || Stmt <- [delete_q_stmt,
delete_p_stmt,
delete_n_stmt] ],
+ ok.
+
+read_n_record(QueueName) ->
+ %% BUGBUG: Ugly. We really should convert the result to Erlang records
+ %% here and isolate rabbit_mysql_queue from any direct touching
+ %% of the emysql library, but we need to split some records out
+ %% to an include file first...
emysql:execute(?RABBIT_DB_POOL_NAME,
- <<"COMMIT">>),
+ read_n_stmt,
+ [QueueName]).
+
+%% TODO: Consistent error handling...
+write_n_record(DbQueueName, NextSeqId) ->
+ Result = emysql:execute(?RABBIT_DB_POOL_NAME,
+ put_n_stmt,
+ [DbQueueName, NextSeqId]),
+ case Result of
+ #ok_packet{} -> ok;
+ #error_packet{} -> rabbit_log:error("Failed REPLACE on n Table (~p,~p)",
+ [DbQueueName, NextSeqId])
+ end.
+
+%% Delete non-persistent msgs after a restart.
+-spec delete_nonpersistent_msgs(string()) -> ok.
+
+delete_nonpersistent_msgs(DbQueueName) ->
+
ok.
diff --git a/src/rabbit_mysql_queue.erl b/src/rabbit_mysql_queue.erl
index b7c8af3f..bf7c558c 100644
--- a/src/rabbit_mysql_queue.erl
+++ b/src/rabbit_mysql_queue.erl
@@ -71,11 +71,8 @@
%% (which can be dropped on a crash).
-record(s, % The in-RAM queue state
- { q_table, % The MySQL queue table name
- p_table, % The MySQL pending-ack table name
- n_table, % The MySQL next_(seq_id, out_id) table name
+ { queue_name, % Queue name as canonicalized for database
next_seq_id, % The next M's seq_id
- next_out_id, % The next M's out id
txn_dict % In-progress txn->tx map
}).
@@ -110,7 +107,8 @@
%% requeued while keeping the same seq_id.)
-record(q_record, % Q records in MySQL
- { out_id, % The key: The out_id
+ { id, % Analogous to the 'out_id' out in Mnesia queue
+ queue_name, % Queue name
m % The value: The M
}).
@@ -119,7 +117,8 @@
%% accssed by seq_id.
-record(p_record, % P records in MySQL
- { seq_id, % The key: The seq_id
+ { seq_id, % The seq_id
+ queue_name, % Queue name
m % The value: The M
}).
@@ -129,10 +128,9 @@
%% MySQL transaction that updates them in the in-RAM S.
-record(n_record, % next_seq_id & next_out_id record in MySQL
- { key, % The key: the atom 'n'
- next_seq_id, % The MySQL next_seq_id
- next_out_id % The MySQL next_out_id
- }).
+ { queue_name, % Queue name
+ next_seq_id % The MySQL next_seq_id
+ }).
-include("rabbit.hrl").
@@ -147,11 +145,8 @@
-type(seq_id() :: non_neg_integer()).
-type(ack() :: seq_id()).
--type(s() :: #s { q_table :: atom(),
- p_table :: atom(),
- n_table :: atom(),
+-type(s() :: #s { queue_name :: string(),
next_seq_id :: seq_id(),
- next_out_id :: non_neg_integer(),
txn_dict :: dict() }).
-type(state() :: s()).
@@ -164,15 +159,15 @@
rabbit_types:message_properties()}],
to_ack :: [seq_id()] }).
--type(q_record() :: #q_record { out_id :: non_neg_integer(),
+-type(q_record() :: #q_record { id :: non_neg_integer(),
+ queue_name :: string(),
m :: m() }).
-type(p_record() :: #p_record { seq_id :: seq_id(),
m :: m() }).
--type(n_record() :: #n_record { key :: 'n',
- next_seq_id :: seq_id(),
- next_out_id :: non_neg_integer() }).
+-type(n_record() :: #n_record { queue_name:: string(),
+ next_seq_id :: seq_id() }).
-include("rabbit_backing_queue_spec.hrl").
@@ -215,59 +210,46 @@ stop() -> ok.
%% init/3 creates one backing queue, returning its state. Names are
%% local to the vhost, and must be unique.
%%
-%% init/3 creates Mnesia transactions to run in, and therefore may not
-%% be called from inside another Mnesia transaction.
+%% init/3 creates MySQL transactions to run in, and therefore may not
+%% be called from inside another MySQL transaction.
%%
%% -spec(init/3 ::
%% (rabbit_amqqueue:name(), is_durable(), attempt_recovery())
%% -> state()).
-
-%% BUG: We should allow clustering of the Mnesia tables.
-
-%% BUG: It's unfortunate that this can't all be done in a single
-%% Mnesia transaction!
-
+%%
+%% BUGBUG: Error checking...
init(QueueName, IsDurable, Recover) ->
- rabbit_log:info("init(~n ~p,~n ~p,~n ~p) ->", [QueueName, IsDurable, Recover]),
+ rabbit_log:info("init(~n ~p,~n ~p,~n ~p) ->",
+ [QueueName, IsDurable, Recover]),
DbQueueName = canonicalize_queue_name(QueueName),
case Recover of
false -> _ = mysql_helper:delete_queue_data(DbQueueName);
true -> ok
end,
- %% BOOKMARK
- %% create_table(QTable, 'q_record', 'ordered_set', record_info(fields,
- %% q_record)),
- %% create_table(PTable, 'p_record', 'set', record_info(fields, p_record)),
- %% create_table(NTable, 'n_record', 'set', record_info(fields, n_record)),
- %% {atomic, Result} =
- %% mnesia:transaction(
- %% fun () ->
- %% case IsDurable of
- %% false -> clear_table(QTable),
- %% clear_table(PTable),
- %% clear_table(NTable);
- %% true -> delete_nonpersistent_msgs(QTable)
- %% end,
- %% {NextSeqId, NextOutId} =
- %% case mnesia:read(NTable, 'n', 'read') of
- %% [] -> {0, 0};
- %% [#n_record { next_seq_id = NextSeqId0,
- %% next_out_id = NextOutId0 }] ->
- %% {NextSeqId0, NextOutId0}
- %% end,
- %% RS = #s { q_table = QTable,
- %% p_table = PTable,
- %% n_table = NTable,
- %% next_seq_id = NextSeqId,
- %% next_out_id = NextOutId,
- %% txn_dict = dict:new() },
- %% save(RS),
- %% RS
- %% end),
- %% % rabbit_log:info("init ->~n ~p", [Result]),
+
+ mysql_helper:begin_mysql_transaction(),
+ case IsDurable of
+ false -> mysql_helper:delete_queue_data(DbQueueName);
+ true -> mysql_helper:delete_nonpersistent_msgs(DbQueueName)
+ end,
+ NReadResult = mysql_helper:read_n_record(DbQueueName),
+ NRecs = emysql_util:as_record(NReadResult,
+ n_record,
+ record_info(fields, n_record)),
+ NextSeqId = case NRecs of
+ [] -> 0;
+ [#n_record{ next_seq_id = NextSeqId0 }] -> NextSeqId0
+ end,
+ RS = #s { queue_name = DbQueueName,
+ next_seq_id = NextSeqId,
+ txn_dict = dict:new() },
+ save(RS),
+ mysql_helper:commit_mysql_transaction(),
+ Result = RS,
+ rabbit_log:info("init ->~n ~p", [Result]),
callback([]),
- Result = yo_mama_i_am_a_placeholder.
+ Result.
%%#############################################################################
%% OTHER SIDE OF THE RUBICON...
@@ -283,13 +265,14 @@ init(QueueName, IsDurable, Recover) ->
%%
%% -spec(terminate/1 :: (state()) -> state()).
-terminate(S = #s { q_table = QTable, p_table = PTable, n_table = NTable }) ->
- % rabbit_log:info("terminate(~n ~p) ->", [S]),
- {atomic, Result} =
- mnesia:transaction(fun () -> clear_table(PTable), S end),
- mnesia:dump_tables([QTable, PTable, NTable]),
- % rabbit_log:info("terminate ->~n ~p", [Result]),
- Result.
+terminate(S = #s { queue_name = DbQueueName}) ->
+ %% % rabbit_log:info("terminate(~n ~p) ->", [S]),
+ %% {atomic, Result} =
+ %% mnesia:transaction(fun () -> clear_table(PTable), S end),
+ %% mnesia:dump_tables([QTable, PTable, NTable]),
+ %% % rabbit_log:info("terminate ->~n ~p", [Result]),
+ %% Result.
+ yo_mama_bogus_result.
%%----------------------------------------------------------------------------
%% delete_and_terminate/1 deletes all of a queue's enqueued msgs and
@@ -300,18 +283,17 @@ terminate(S = #s { q_table = QTable, p_table = PTable, n_table = NTable }) ->
%%
%% -spec(delete_and_terminate/1 :: (state()) -> state()).
-delete_and_terminate(S = #s { q_table = QTable,
- p_table = PTable,
- n_table = NTable }) ->
+delete_and_terminate(S = #s { queue_name = DbQueueName }) ->
% rabbit_log:info("delete_and_terminate(~n ~p) ->", [S]),
- {atomic, Result} =
- mnesia:transaction(fun () -> clear_table(QTable),
- clear_table(PTable),
- S
- end),
- mnesia:dump_tables([QTable, PTable, NTable]),
- % rabbit_log:info("delete_and_terminate ->~n ~p", [Result]),
- Result.
+ %% {atomic, Result} =
+ %% mnesia:transaction(fun () -> clear_table(QTable),
+ %% clear_table(PTable),
+ %% S
+ %% end),
+ %% mnesia:dump_tables([QTable, PTable, NTable]),
+ %% % rabbit_log:info("delete_and_terminate ->~n ~p", [Result]),
+ %% Result.
+ yo_mama_bogus_result.
%%----------------------------------------------------------------------------
%% purge/1 deletes all of queue's enqueued msgs, generating pending
@@ -322,15 +304,16 @@ delete_and_terminate(S = #s { q_table = QTable,
%%
%% -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
-purge(S = #s { q_table = QTable }) ->
+purge(S = #s { queue_name = DbQueueName }) ->
% rabbit_log:info("purge(~n ~p) ->", [S]),
- {atomic, Result} =
- mnesia:transaction(fun () -> LQ = length(mnesia:all_keys(QTable)),
- clear_table(QTable),
- {LQ, S}
- end),
- % rabbit_log:info("purge ->~n ~p", [Result]),
- Result.
+ %% {atomic, Result} =
+ %% mnesia:transaction(fun () -> LQ = length(mnesia:all_keys(QTable)),
+ %% clear_table(QTable),
+ %% {LQ, S}
+ %% end),
+ %% % rabbit_log:info("purge ->~n ~p", [Result]),
+ %% Result.
+ yo_mama_bogus_result.
%%----------------------------------------------------------------------------
%% publish/3 publishes a msg.
@@ -379,20 +362,21 @@ publish_delivered(false, Msg, Props, S) ->
publish_delivered(true,
Msg,
Props,
- S = #s { next_seq_id = SeqId, next_out_id = OutId }) ->
+ S = #s { next_seq_id = SeqId }) ->
% rabbit_log:info("publish_delivered(true,~n ~p,~n ~p,~n ~p) ->", [Msg, Props, S]),
- {atomic, Result} =
- mnesia:transaction(
- fun () ->
- add_p((m(Msg, SeqId, Props)) #m { is_delivered = true }, S),
- RS = S #s { next_seq_id = SeqId + 1,
- next_out_id = OutId + 1 },
- save(RS),
- {SeqId, RS}
- end),
- % rabbit_log:info("publish_delivered ->~n ~p", [Result]),
- callback([{Msg, Props}]),
- Result.
+ %% {atomic, Result} =
+ %% mnesia:transaction(
+ %% fun () ->
+ %% add_p((m(Msg, SeqId, Props)) #m { is_delivered = true }, S),
+ %% RS = S #s { next_seq_id = SeqId + 1,
+ %% next_out_id = OutId + 1 },
+ %% save(RS),
+ %% {SeqId, RS}
+ %% end),
+ %% % rabbit_log:info("publish_delivered ->~n ~p", [Result]),
+ %% callback([{Msg, Props}]),
+ %% Result.
+ yo_mama_bogus_result.
%%----------------------------------------------------------------------------
%% dropwhile/2 drops msgs from the head of the queue while there are
@@ -610,12 +594,13 @@ requeue(SeqIds, PropsF, S) ->
%%
%% -spec(len/1 :: (state()) -> non_neg_integer()).
-len(#s { q_table = QTable }) ->
- % rabbit_log:info("len(~n ~p) ->", [S]),
- {atomic, Result} =
- mnesia:transaction(fun () -> length(mnesia:all_keys(QTable)) end),
- % rabbit_log:info("len ->~n ~p", [Result]),
- Result.
+len(#s { queue_name = DbQueueName }) ->
+ %% % rabbit_log:info("len(~n ~p) ->", [S]),
+ %% {atomic, Result} =
+ %% mnesia:transaction(fun () -> length(mnesia:all_keys(QTable)) end),
+ %% % rabbit_log:info("len ->~n ~p", [Result]),
+ %% Result.
+ yo_mama_bogus_result.
%%----------------------------------------------------------------------------
%% is_empty/1 returns true iff the queue is empty.
@@ -625,12 +610,13 @@ len(#s { q_table = QTable }) ->
%%
%% -spec(is_empty/1 :: (state()) -> boolean()).
-is_empty(#s { q_table = QTable }) ->
- % rabbit_log:info("is_empty(~n ~p) ->", [S]),
- {atomic, Result} =
- mnesia:transaction(fun () -> 0 == length(mnesia:all_keys(QTable)) end),
- % rabbit_log:info("is_empty ->~n ~p", [Result]),
- Result.
+is_empty(#s { queue_name = DbQueueName }) ->
+ %% % rabbit_log:info("is_empty(~n ~p) ->", [S]),
+ %% {atomic, Result} =
+ %% mnesia:transaction(fun () -> 0 == length(mnesia:all_keys(QTable)) end),
+ %% % rabbit_log:info("is_empty ->~n ~p", [Result]),
+ %% Result.
+ yo_mama_bogus_result.
%%----------------------------------------------------------------------------
%% set_ram_duration_target informs us that the target is to have no
@@ -688,18 +674,18 @@ handle_pre_hibernate(S) -> S.
%%
%% -spec(status/1 :: (state()) -> [{atom(), any()}]).
-status(#s { q_table = QTable,
- p_table = PTable,
+status(#s { queue_name = DbQueueName,
next_seq_id = NextSeqId }) ->
- % rabbit_log:info("status(~n ~p) ->", [S]),
- {atomic, Result} =
- mnesia:transaction(
- fun () -> LQ = length(mnesia:all_keys(QTable)),
- LP = length(mnesia:all_keys(PTable)),
- [{len, LQ}, {next_seq_id, NextSeqId}, {acks, LP}]
- end),
- % rabbit_log:info("status ->~n ~p", [Result]),
- Result.
+ %% % rabbit_log:info("status(~n ~p) ->", [S]),
+ %% {atomic, Result} =
+ %% mnesia:transaction(
+ %% fun () -> LQ = length(mnesia:all_keys(QTable)),
+ %% LP = length(mnesia:all_keys(PTable)),
+ %% [{len, LQ}, {next_seq_id, NextSeqId}, {acks, LP}]
+ %% end),
+ %% % rabbit_log:info("status ->~n ~p", [Result]),
+ %% Result.
+ yo_mama_bogus_result.
%%----------------------------------------------------------------------------
%% Monadic helper functions for inside transactions.
@@ -739,16 +725,17 @@ clear_table(Table) ->
-spec delete_nonpersistent_msgs(atom()) -> ok.
delete_nonpersistent_msgs(QTable) ->
- lists:foreach(
- fun (Key) ->
- [#q_record { out_id = Key, m = M }] =
- mnesia:read(QTable, Key, 'read'),
- case M of
- #m { msg = #basic_message { is_persistent = true }} -> ok;
- _ -> mnesia:delete(QTable, Key, 'write')
- end
- end,
- mnesia:all_keys(QTable)).
+ %% lists:foreach(
+ %% fun (Key) ->
+ %% [#q_record { out_id = Key, m = M }] =
+ %% mnesia:read(QTable, Key, 'read'),
+ %% case M of
+ %% #m { msg = #basic_message { is_persistent = true }} -> ok;
+ %% _ -> mnesia:delete(QTable, Key, 'write')
+ %% end
+ %% end,
+ %% mnesia:all_keys(QTable)).
+ yo_mama_bogus_result.
%% internal_fetch/2 fetches the next msg, if any, inside an Mnesia
%% transaction, generating a pending ack as necessary.
@@ -786,12 +773,12 @@ tx_commit_state(Pubs, SeqIds, PropsF, S) ->
publish_state(Msg,
Props,
IsDelivered,
- S = #s { q_table = QTable,
- next_seq_id = SeqId,
- next_out_id = OutId }) ->
- M = (m(Msg, SeqId, Props)) #m { is_delivered = IsDelivered },
- mnesia:write(QTable, #q_record { out_id = OutId, m = M }, 'write'),
- S #s { next_seq_id = SeqId + 1, next_out_id = OutId + 1 }.
+ S = #s { queue_name = DbQueueName,
+ next_seq_id = SeqId }) ->
+ %% M = (m(Msg, SeqId, Props)) #m { is_delivered = IsDelivered },
+ %% mnesia:write(QTable, #q_record { out_id = OutId, m = M }, 'write'),
+ %% S #s { next_seq_id = SeqId + 1, next_out_id = OutId + 1 }.
+ yo_mama_bogus_result.
-spec(internal_ack/2 :: ([seq_id()], s()) -> s()).
@@ -817,27 +804,29 @@ internal_dropwhile(Pred, S) ->
-spec q_pop(s()) -> maybe(m()).
-q_pop(#s { q_table = QTable }) ->
- case mnesia:first(QTable) of
- '$end_of_table' -> nothing;
- OutId -> [#q_record { out_id = OutId, m = M }] =
- mnesia:read(QTable, OutId, 'read'),
- mnesia:delete(QTable, OutId, 'write'),
- {just, M}
- end.
+q_pop(#s { queue_name = DbQueueName }) ->
+ %% case mnesia:first(QTable) of
+ %% '$end_of_table' -> nothing;
+ %% OutId -> [#q_record { out_id = OutId, m = M }] =
+ %% mnesia:read(QTable, OutId, 'read'),
+ %% mnesia:delete(QTable, OutId, 'write'),
+ %% {just, M}
+ %% end.
+ yo_mama_bogus_result.
%% q_peek returns the first msg, if any, from the Q table in
%% Mnesia.
-spec q_peek(s()) -> maybe(m()).
-q_peek(#s { q_table = QTable }) ->
- case mnesia:first(QTable) of
- '$end_of_table' -> nothing;
- OutId -> [#q_record { out_id = OutId, m = M }] =
- mnesia:read(QTable, OutId, 'read'),
- {just, M}
- end.
+q_peek(#s { queue_name = DbQueueName }) ->
+ %% case mnesia:first(QTable) of
+ %% '$end_of_table' -> nothing;
+ %% OutId -> [#q_record { out_id = OutId, m = M }] =
+ %% mnesia:read(QTable, OutId, 'read'),
+ %% {just, M}
+ %% end.
+ yo_mama_bogus_result.
%% post_pop operates after q_pop, calling add_p if necessary.
@@ -846,23 +835,25 @@ q_peek(#s { q_table = QTable }) ->
post_pop(true,
M = #m { seq_id = SeqId, msg = Msg, is_delivered = IsDelivered },
- S = #s { q_table = QTable }) ->
- LQ = length(mnesia:all_keys(QTable)),
- add_p(M #m { is_delivered = true }, S),
- {Msg, IsDelivered, SeqId, LQ};
-post_pop(false,
- #m { msg = Msg, is_delivered = IsDelivered },
- #s { q_table = QTable }) ->
- LQ = length(mnesia:all_keys(QTable)),
- {Msg, IsDelivered, undefined, LQ}.
+ S = #s { queue_name = DbQueueName }) ->
+%% LQ = length(mnesia:all_keys(QTable)),
+%% add_p(M #m { is_delivered = true }, S),
+%% {Msg, IsDelivered, SeqId, LQ};
+%% post_pop(false,
+%% #m { msg = Msg, is_delivered = IsDelivered },
+%% #s { q_table = QTable }) ->
+%% LQ = length(mnesia:all_keys(QTable)),
+%% {Msg, IsDelivered, undefined, LQ}.
+ yo_mama_bogus_result.
%% add_p adds a pending ack to the P table in Mnesia.
-spec add_p(m(), s()) -> ok.
-add_p(M = #m { seq_id = SeqId }, #s { p_table = PTable }) ->
- mnesia:write(PTable, #p_record { seq_id = SeqId, m = M }, 'write'),
- ok.
+add_p(M = #m { seq_id = SeqId }, #s { queue_name = DbQueueName }) ->
+ %% mnesia:write(PTable, #p_record { seq_id = SeqId, m = M }, 'write'),
+ %% ok.
+ yo_mama_bogus_result.
%% del_ps deletes some number of pending acks from the P table in
%% Mnesia, applying a (Mnesia transactional) function F after each msg
@@ -870,29 +861,28 @@ add_p(M = #m { seq_id = SeqId }, #s { p_table = PTable }) ->
-spec del_ps(fun ((m(), s()) -> s()), [seq_id()], s()) -> s().
-del_ps(F, SeqIds, S = #s { p_table = PTable }) ->
- lists:foldl(
- fun (SeqId, Si) ->
- [#p_record { m = M }] = mnesia:read(PTable, SeqId, 'read'),
- mnesia:delete(PTable, SeqId, 'write'),
- F(M, Si)
- end,
- S,
- SeqIds).
+del_ps(F, SeqIds, S = #s { queue_name = DbQueueName }) ->
+ %% lists:foldl(
+ %% fun (SeqId, Si) ->
+ %% [#p_record { m = M }] = mnesia:read(PTable, SeqId, 'read'),
+ %% mnesia:delete(PTable, SeqId, 'write'),
+ %% F(M, Si)
+ %% end,
+ %% S,
+ %% SeqIds).
+ yo_mama_bogus_result.
%% save copies the volatile part of the state (next_seq_id and
%% next_out_id) to Mnesia.
-spec save(s()) -> ok.
-save(#s { n_table = NTable,
- next_seq_id = NextSeqId,
- next_out_id = NextOutId }) ->
- ok = mnesia:write(NTable,
- #n_record { key = 'n',
- next_seq_id = NextSeqId,
- next_out_id = NextOutId },
- 'write').
+save(#s { queue_name = DbQueueName,
+ next_seq_id = NextSeqId}) ->
+ mysql_helper:begin_mysql_transaction(),
+ Result = mysql_helper:write_n_record(DbQueueName, NextSeqId),
+ mysql_helper:commit_mysql_transaction(),
+ ok = Result.
%%----------------------------------------------------------------------------
%% Pure helper functions.