summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJerry Kuch <jerryk@vmware.com>2011-03-08 12:38:14 -0800
committerJerry Kuch <jerryk@vmware.com>2011-03-08 12:38:14 -0800
commit60bfba637639c29aa158ae52f97afe0ccf5d7817 (patch)
tree34ab8bcb206d64e6c2939ecf865907233e89387d
parent1ff2a7e797a0dfe65112a40daa31ac49dead360b (diff)
downloadrabbitmq-server-60bfba637639c29aa158ae52f97afe0ccf5d7817.tar.gz
Re-organize prepared statements. Add SQL support for internal_ack.
-rw-r--r--src/mysql_helper.erl50
1 files changed, 34 insertions, 16 deletions
diff --git a/src/mysql_helper.erl b/src/mysql_helper.erl
index 15e9e2ec..515b3e08 100644
--- a/src/mysql_helper.erl
+++ b/src/mysql_helper.erl
@@ -60,35 +60,47 @@ ensure_connection_pool() ->
%% 'START TRANSACTION' isn't. Fortunately, most of the things that
%% are parametrizable, and thus prone to injection attacks and the
%% like do seem to be there.
+%%
+%% NOTE: Other note. There are a few other things that seem missing. For
+%% example there doesn't seem to be an easy way to prepare a list of
+%% values into a prepared statement '?' e.g. in something like:
+%% DELETE FROM whatever WHERE id IN (1,2,3,4,5);
prepare_mysql_statements() ->
- Statements = [{insert_q_stmt,<<"INSERT INTO q(queue_name, m) VALUES(?,?)">>},
- {insert_p_stmt,
- <<"INSERT INTO p(seq_id, queue_name, m) VALUES(?,?,?)">>},
- {insert_n_stmt,
- <<"INSERT INTO n(queue_name, next_seq_id) VALUES(?,?)">>},
+ Statements = [ %% ---------- Statements for 'q' table ------------
+ {insert_q_stmt,<<"INSERT INTO q(queue_name, m) VALUES(?,?)">>},
{delete_q_stmt,<<"DELETE FROM q WHERE queue_name = ?">>},
- {delete_p_stmt,<<"DELETE FROM p WHERE queue_name = ?">>},
- {delete_n_stmt,<<"DELETE FROM n WHERE queue_name = ?">>},
{delete_non_persistent_msgs_stmt,
<<"DELETE FROM q WHERE queue_name = ? AND is_persistent = FALSE">>},
- {read_n_stmt, <<"SELECT * FROM n WHERE queue_name = ?">>},
- {put_n_stmt, <<"REPLACE INTO n(queue_name, next_seq_id) VALUES(?,?)">>},
- {clear_p_stmt,
- <<"DELETE FROM p WHERE queue_name = ?">>},
{clear_q_stmt,
<<"DELETE FROM q WHERE queue_name = ?">>},
- {count_p_stmt,
- <<"SELECT COUNT(*) FROM p WHERE queue_name = ?">>},
{count_q_stmt,
<<"SELECT COUNT(*) FROM q WHERE queue_name = ?">>},
- {count_n_stmt,
- <<"SELECT COUNT(*) FROM n WHERE queue_name = ?">>},
{write_msg_to_q_stmt,
<<"INSERT INTO q(queue_name, m, is_persistent) VALUES (?,?,?)">>},
{q_peek_stmt,
<<"SELECT * FROM q WHERE queue_name = ? ORDER BY id LIMIT 1">>},
{q_delete_stmt,
- <<"DELETE FROM q WHERE queue_name = ? and id = ?">>} ],
+ <<"DELETE FROM q WHERE queue_name = ? and id = ?">>},
+
+ %% ---------- Statements for 'p' table ------------
+ {insert_p_stmt,
+ <<"INSERT INTO p(seq_id, queue_name, m) VALUES(?,?,?)">>},
+ {delete_p_stmt,<<"DELETE FROM p WHERE queue_name = ?">>},
+ {delete_p_by_seq_id_stmt,
+ <<"DELETE FROM p WHERE seq_id = ?">>},
+ {clear_p_stmt,
+ <<"DELETE FROM p WHERE queue_name = ?">>},
+ {count_p_stmt,
+ <<"SELECT COUNT(*) FROM p WHERE queue_name = ?">>},
+
+ %% ---------- Statements for 'n' table ------------
+ {insert_n_stmt,
+ <<"INSERT INTO n(queue_name, next_seq_id) VALUES(?,?)">>},
+ {delete_n_stmt,<<"DELETE FROM n WHERE queue_name = ?">>},
+ {read_n_stmt, <<"SELECT * FROM n WHERE queue_name = ?">>},
+ {put_n_stmt, <<"REPLACE INTO n(queue_name, next_seq_id) VALUES(?,?)">>},
+ {count_n_stmt,
+ <<"SELECT COUNT(*) FROM n WHERE queue_name = ?">>} ],
[ emysql:prepare(StmtAtom, StmtBody) || {StmtAtom, StmtBody} <- Statements ].
begin_mysql_transaction() ->
@@ -184,6 +196,12 @@ write_message_to_p(DbQueueName, SeqId, Msg) ->
[SeqId, DbQueueName, term_to_binary(Msg)]),
ok.
+delete_message_from_p(DbQueueName, SeqId) ->
+ emysql:execute(?RABBIT_DB_POOL_NAME,
+ delete_p_stmt,
+ [SeqId]),
+ ok.
+
q_peek(DbQueueName) ->
emysql:execute(?RABBIT_DB_POOL_NAME,
q_peek_stmt,