summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJerry Kuch <jerryk@vmware.com>2011-03-02 17:35:02 -0800
committerJerry Kuch <jerryk@vmware.com>2011-03-02 17:35:02 -0800
commitb04340d804e4f3085b8dcc361bf2a838783acbaf (patch)
tree6262d9867c48faab8246e7cc93d38f8ce4deb766
parent214216ca913bbd5600d1205703e96394cc564e70 (diff)
downloadrabbitmq-server-b04340d804e4f3085b8dcc361bf2a838783acbaf.tar.gz
MySQL queue peek.
-rw-r--r--src/mysql_helper.erl9
-rw-r--r--src/rabbit_mysql_queue.erl15
2 files changed, 19 insertions, 5 deletions
diff --git a/src/mysql_helper.erl b/src/mysql_helper.erl
index cda968bf..2047304a 100644
--- a/src/mysql_helper.erl
+++ b/src/mysql_helper.erl
@@ -84,7 +84,9 @@ prepare_mysql_statements() ->
{count_n_stmt,
<<"SELECT COUNT(*) FROM n WHERE queue_name = ?">>},
{write_msg_to_q_stmt,
- <<"INSERT INTO q(queue_name, m, is_persistent) VALUES (?,?,?)">>} ],
+ <<"INSERT INTO q(queue_name, m, is_persistent) VALUES (?,?,?)">>},
+ {q_peek_stmt,
+ <<"SELECT * FROM q WHERE queue_name = ? ORDER BY id LIMIT 1">>} ],
[ emysql:prepare(StmtAtom, StmtBody) || {StmtAtom, StmtBody} <- Statements ].
begin_mysql_transaction() ->
@@ -180,6 +182,11 @@ write_message_to_p(DbQueueName, SeqId, Msg) ->
[SeqId, DbQueueName, term_to_binary(Msg)]),
ok.
+q_peek(DbQueueName) ->
+ emysql:execute(?RABBIT_DB_POOL_NAME,
+ q_peek_stmt,
+ [DbQueueName]).
+
%% This is only for convenience in REPL debugging. Get rid of it later.
wake_up() ->
ensure_connection_pool(),
diff --git a/src/rabbit_mysql_queue.erl b/src/rabbit_mysql_queue.erl
index d2a59d2b..93862810 100644
--- a/src/rabbit_mysql_queue.erl
+++ b/src/rabbit_mysql_queue.erl
@@ -402,7 +402,7 @@ dropwhile(Pred, S) ->
%% {Atom, RS}
%% end),
mysql_helper:begin_mysql_transaction(),
-
+ Result = utterly_bogus_placeholder_result,
mysql_helper:commit_mysql_transaction(),
rabbit_log:info("dropwhile ->~n ~p", [Result]),
Result.
@@ -769,8 +769,7 @@ q_pop(#s { queue_name = DbQueueName }) ->
%% end.
yo_mama_bogus_result.
-%% q_peek returns the first msg, if any, from the Q table in
-%% Mnesia.
+%% q_peek returns the first msg, if any, from the Q table in MySQL.
-spec q_peek(s()) -> maybe(m()).
@@ -781,7 +780,15 @@ q_peek(#s { queue_name = DbQueueName }) ->
%% mnesia:read(QTable, OutId, 'read'),
%% {just, M}
%% end.
- yo_mama_bogus_result.
+ RecList = mysql_helper:q_peek(DbQueueName),
+ MList = emysql_util:as_record(RecList,
+ q_record,
+ record_info(fields,
+ q_record)),
+ case MList of
+ [] -> nothing;
+ [M] -> {just, M}
+ end.
%% post_pop operates after q_pop, calling add_p if necessary.