diff options
author | Jerry Kuch <jerryk@vmware.com> | 2011-03-02 17:35:02 -0800 |
---|---|---|
committer | Jerry Kuch <jerryk@vmware.com> | 2011-03-02 17:35:02 -0800 |
commit | b04340d804e4f3085b8dcc361bf2a838783acbaf (patch) | |
tree | 6262d9867c48faab8246e7cc93d38f8ce4deb766 | |
parent | 214216ca913bbd5600d1205703e96394cc564e70 (diff) | |
download | rabbitmq-server-b04340d804e4f3085b8dcc361bf2a838783acbaf.tar.gz |
MySQL queue peek.
-rw-r--r-- | src/mysql_helper.erl | 9 | ||||
-rw-r--r-- | src/rabbit_mysql_queue.erl | 15 |
2 files changed, 19 insertions, 5 deletions
diff --git a/src/mysql_helper.erl b/src/mysql_helper.erl index cda968bf..2047304a 100644 --- a/src/mysql_helper.erl +++ b/src/mysql_helper.erl @@ -84,7 +84,9 @@ prepare_mysql_statements() -> {count_n_stmt, <<"SELECT COUNT(*) FROM n WHERE queue_name = ?">>}, {write_msg_to_q_stmt, - <<"INSERT INTO q(queue_name, m, is_persistent) VALUES (?,?,?)">>} ], + <<"INSERT INTO q(queue_name, m, is_persistent) VALUES (?,?,?)">>}, + {q_peek_stmt, + <<"SELECT * FROM q WHERE queue_name = ? ORDER BY id LIMIT 1">>} ], [ emysql:prepare(StmtAtom, StmtBody) || {StmtAtom, StmtBody} <- Statements ]. begin_mysql_transaction() -> @@ -180,6 +182,11 @@ write_message_to_p(DbQueueName, SeqId, Msg) -> [SeqId, DbQueueName, term_to_binary(Msg)]), ok. +q_peek(DbQueueName) -> + emysql:execute(?RABBIT_DB_POOL_NAME, + q_peek_stmt, + [DbQueueName]). + %% This is only for convenience in REPL debugging. Get rid of it later. wake_up() -> ensure_connection_pool(), diff --git a/src/rabbit_mysql_queue.erl b/src/rabbit_mysql_queue.erl index d2a59d2b..93862810 100644 --- a/src/rabbit_mysql_queue.erl +++ b/src/rabbit_mysql_queue.erl @@ -402,7 +402,7 @@ dropwhile(Pred, S) -> %% {Atom, RS} %% end), mysql_helper:begin_mysql_transaction(), - + Result = utterly_bogus_placeholder_result, mysql_helper:commit_mysql_transaction(), rabbit_log:info("dropwhile ->~n ~p", [Result]), Result. @@ -769,8 +769,7 @@ q_pop(#s { queue_name = DbQueueName }) -> %% end. yo_mama_bogus_result. -%% q_peek returns the first msg, if any, from the Q table in -%% Mnesia. +%% q_peek returns the first msg, if any, from the Q table in MySQL. -spec q_peek(s()) -> maybe(m()). @@ -781,7 +780,15 @@ q_peek(#s { queue_name = DbQueueName }) -> %% mnesia:read(QTable, OutId, 'read'), %% {just, M} %% end. - yo_mama_bogus_result. + RecList = mysql_helper:q_peek(DbQueueName), + MList = emysql_util:as_record(RecList, + q_record, + record_info(fields, + q_record)), + case MList of + [] -> nothing; + [M] -> {just, M} + end. %% post_pop operates after q_pop, calling add_p if necessary. |