summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJerry Kuch <jerryk@vmware.com>2011-03-15 14:20:12 -0700
committerJerry Kuch <jerryk@vmware.com>2011-03-15 14:20:12 -0700
commit096411d81b3b1e8bcf41fd910050b678eb5d2849 (patch)
tree72497cd577a51e305af1827024a5093cae5b299a
parent22d6bfbe7cd88c0e606c31b7b1ea9a123431709f (diff)
downloadrabbitmq-server-096411d81b3b1e8bcf41fd910050b678eb5d2849.tar.gz
Fix typing blunders to pass more qpid tests.
-rw-r--r--src/mysql_helper.erl8
-rw-r--r--src/rabbit_mysql_queue.erl21
2 files changed, 18 insertions, 11 deletions
diff --git a/src/mysql_helper.erl b/src/mysql_helper.erl
index c3a36231..1fb03d77 100644
--- a/src/mysql_helper.erl
+++ b/src/mysql_helper.erl
@@ -185,18 +185,18 @@ count_rows_for_queue(TableType, DbQueueName) ->
{result_packet, _,_,[[Val]],_} = QueryResult,
Val.
-write_message_to_q(DbQueueName, Msg, IsPersistent) ->
+write_message_to_q(DbQueueName, M, IsPersistent) ->
emysql:execute(?RABBIT_DB_POOL_NAME,
write_msg_to_q_stmt,
- [DbQueueName, term_to_binary(Msg), IsPersistent]),
+ [DbQueueName, term_to_binary(M), IsPersistent]),
ok.
%% BUGBUG: Since the q table shadows is_persistent for convenience, will
%% the pending acks table need to as well?
-write_message_to_p(DbQueueName, SeqId, Msg) ->
+write_message_to_p(DbQueueName, SeqId, M) ->
emysql:execute(?RABBIT_DB_POOL_NAME,
insert_p_stmt,
- [SeqId, DbQueueName, term_to_binary(Msg)]),
+ [SeqId, DbQueueName, term_to_binary(M)]),
ok.
delete_message_from_p_by_seq_id(SeqId) ->
diff --git a/src/rabbit_mysql_queue.erl b/src/rabbit_mysql_queue.erl
index b12ed520..fa01f7e8 100644
--- a/src/rabbit_mysql_queue.erl
+++ b/src/rabbit_mysql_queue.erl
@@ -280,7 +280,8 @@ delete_and_terminate(S = #s { queue_name = DbQueueName }) ->
mysql_helper:clear_table(q, DbQueueName),
mysql_helper:clear_table(p, DbQueueName),
mysql_helper:commit_mysql_transaction(),
- rabbit_log:info("delete_and_terminate ->~n ~p", [S]).
+ rabbit_log:info("delete_and_terminate ->~n ~p", [S]),
+ S.
%%----------------------------------------------------------------------------
@@ -356,7 +357,11 @@ publish_delivered(true,
rabbit_log:info("publish_delivered(true,~n ~p,~n ~p,~n ~p) ->",
[Msg, Props, S]),
mysql_helper:begin_mysql_transaction(),
- mysql_helper:write_message_to_p(DbQueueName, SeqId, Msg),
+ mysql_helper:write_message_to_p(DbQueueName,
+ SeqId,
+ (m(Msg,
+ SeqId,
+ Props))#m{is_delivered = true}),
RS = S #s { next_seq_id = SeqId + 1 },
save(RS),
mysql_helper:commit_mysql_transaction(),
@@ -573,7 +578,7 @@ requeue(SeqIds, PropsF, S) ->
S),
save(RS),
mysql_helper:commit_mysql_transaction(),
- % rabbit_log:info("requeue ->~n ~p", [Result]),
+ % rabbit_log:info("requeue ->~n ~p", [RS]),
callback([]),
RS.
@@ -806,10 +811,12 @@ del_ps(F, SeqIds, S = #s { queue_name = DbQueueName }) ->
lists:foldl(
fun( SeqId, Si) ->
DbList = mysql_helper:read_p_record(DbQueueName, SeqId),
- [#p_record {m = M}] = emysql_util:as_record(DbList,
- p_record,
- record_info(fields,
- p_record)),
+ [#p_record {m = MBin}] =
+ emysql_util:as_record(DbList,
+ p_record,
+ record_info(fields,
+ p_record)),
+ M = binary_to_term(MBin),
mysql_helper:delete_message_from_p_by_seq_id(SeqId),
F(M, Si)
end,