diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-06-17 12:13:42 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-06-17 12:13:42 +0100 |
commit | e58f9b400068753405a308cf1d8269e9cb0331e4 (patch) | |
tree | 8764f418f6d6d20eb9a569933c9cff2ac51dcc78 | |
parent | 4ac62980edaac85249818cb65fad616149c1c38c (diff) | |
download | rabbitmq-server-e58f9b400068753405a308cf1d8269e9cb0331e4.tar.gz |
just removing tabs
-rw-r--r-- | src/rabbit.erl | 4 | ||||
-rw-r--r-- | src/rabbit_db_queue.erl | 290 | ||||
-rw-r--r-- | src/rabbit_disk_queue.erl | 52 | ||||
-rw-r--r-- | src/rabbit_mixed_queue.erl | 92 |
4 files changed, 219 insertions, 219 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 44e4dc7f..2eecac5e 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -136,7 +136,7 @@ start(normal, []) -> ok = rabbit_amqqueue:start(), - {ok, MemoryAlarms} = application:get_env(memory_alarms), + {ok, MemoryAlarms} = application:get_env(memory_alarms), ok = rabbit_alarm:start(MemoryAlarms), ok = start_child(rabbit_queue_mode_manager), @@ -311,7 +311,7 @@ rotate_logs(File, Suffix, OldHandler, NewHandler) -> log_rotation_result({error, MainLogError}, {error, SaslLogError}) -> {error, {{cannot_rotate_main_logs, MainLogError}, - {cannot_rotate_sasl_logs, SaslLogError}}}; + {cannot_rotate_sasl_logs, SaslLogError}}}; log_rotation_result({error, MainLogError}, ok) -> {error, {cannot_rotate_main_logs, MainLogError}}; log_rotation_result(ok, {error, SaslLogError}) -> diff --git a/src/rabbit_db_queue.erl b/src/rabbit_db_queue.erl index 897a4a6f..7530892d 100644 --- a/src/rabbit_db_queue.erl +++ b/src/rabbit_db_queue.erl @@ -60,7 +60,7 @@ terminate/2, code_change/3]). -export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, - tx_commit/3, tx_cancel/1, requeue/2, purge/1]). + tx_commit/3, tx_cancel/1, requeue/2, purge/1]). -export([stop/0, stop_and_obliterate/0]). @@ -75,13 +75,13 @@ -type(seq_id() :: non_neg_integer()). -spec(start_link/1 :: (non_neg_integer()) -> - {'ok', pid()} | 'ignore' | {'error', any()}). + {'ok', pid()} | 'ignore' | {'error', any()}). -spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok'). -spec(deliver/1 :: (queue_name()) -> - {'empty' | {msg_id(), binary(), non_neg_integer(), - bool(), {msg_id(), seq_id()}}}). + {'empty' | {msg_id(), binary(), non_neg_integer(), + bool(), {msg_id(), seq_id()}}}). -spec(phantom_deliver/1 :: (queue_name()) -> - { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}). + { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}). -spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). -spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok'). @@ -97,7 +97,7 @@ start_link(DSN) -> gen_server:start_link({local, ?SERVER}, ?MODULE, - [DSN], []). + [DSN], []). publish(Q, MsgId, Msg) when is_binary(Msg) -> gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}). @@ -139,7 +139,7 @@ init([DSN]) -> process_flag(trap_exit, true), odbc:start(), {ok, Conn} = odbc:connect(DSN, [{auto_commit, off}, {tuple_row, on}, - {scrollable_cursors, off}, {trace_driver, off}]), + {scrollable_cursors, off}, {trace_driver, off}]), State = #dbstate { db_conn = Conn }, compact_already_delivered(State), {ok, State}. @@ -213,12 +213,12 @@ escape_byte(B) when B > 31 andalso B < 127 -> B; escape_byte(B) -> case io_lib:format("~.8B", [B]) of - O1 = [[_]] -> - "\\\\00" ++ O1; - O2 = [[_,_]] -> - "\\\\0" ++ O2; - O3 = [[_,_,_]] -> - "\\\\" ++ O3 + O1 = [[_]] -> + "\\\\00" ++ O1; + O2 = [[_,_]] -> + "\\\\0" ++ O2; + O3 = [[_,_,_]] -> + "\\\\" ++ O3 end. escaped_string_to_binary(Str) when is_list(Str) -> @@ -230,9 +230,9 @@ escaped_string_to_binary([$\\,$\\|Rest], Acc) -> escaped_string_to_binary(Rest, [$\\ | Acc]); escaped_string_to_binary([$\\,A,B,C|Rest], Acc) -> escaped_string_to_binary(Rest, [(list_to_integer([A])*64) + - (list_to_integer([B])*8) + - list_to_integer([C]) - | Acc]); + (list_to_integer([B])*8) + + list_to_integer([C]) + | Acc]); escaped_string_to_binary([C|Rest], Acc) -> escaped_string_to_binary(Rest, [C|Acc]). @@ -250,37 +250,37 @@ hex_string_to_binary([A,B|Rest], Acc) -> internal_deliver(Q, ReadMsg, State = #dbstate { db_conn = Conn }) -> QStr = binary_to_escaped_string(term_to_binary(Q)), case odbc:sql_query(Conn, "select next_read from sequence where queue = " ++ QStr) of - {selected, _, []} -> - odbc:commit(Conn, commit), - {ok, empty, State}; - {selected, _, [{ReadSeqId}]} -> - case odbc:sql_query(Conn, "select is_delivered, msg_id from ledger where queue = " ++ QStr ++ - " and seq_id = " ++ integer_to_list(ReadSeqId)) of - {selected, _, []} -> - {ok, empty, State}; - {selected, _, [{IsDeliveredStr, MsgIdStr}]} -> - IsDelivered = IsDeliveredStr /= "0", - if IsDelivered -> ok; - true -> odbc:sql_query(Conn, "update ledger set is_delivered = true where queue = " ++ - QStr ++ " and seq_id = " ++ integer_to_list(ReadSeqId)) - end, - MsgId = binary_to_term(hex_string_to_binary(MsgIdStr)), - %% yeah, this is really necessary. sigh - MsgIdStr2 = binary_to_escaped_string(term_to_binary(MsgId)), - odbc:sql_query(Conn, "update sequence set next_read = " ++ integer_to_list(ReadSeqId + 1) ++ - " where queue = " ++ QStr), - if ReadMsg -> - {selected, _, [{MsgBodyStr}]} = - odbc:sql_query(Conn, "select msg from message where msg_id = " ++ MsgIdStr2), - odbc:commit(Conn, commit), - MsgBody = hex_string_to_binary(MsgBodyStr), - BodySize = size(MsgBody), - {ok, {MsgId, MsgBody, BodySize, IsDelivered, {MsgId, ReadSeqId}}, State}; - true -> - odbc:commit(Conn, commit), - {ok, {MsgId, IsDelivered, {MsgId, ReadSeqId}}, State} - end - end + {selected, _, []} -> + odbc:commit(Conn, commit), + {ok, empty, State}; + {selected, _, [{ReadSeqId}]} -> + case odbc:sql_query(Conn, "select is_delivered, msg_id from ledger where queue = " ++ QStr ++ + " and seq_id = " ++ integer_to_list(ReadSeqId)) of + {selected, _, []} -> + {ok, empty, State}; + {selected, _, [{IsDeliveredStr, MsgIdStr}]} -> + IsDelivered = IsDeliveredStr /= "0", + if IsDelivered -> ok; + true -> odbc:sql_query(Conn, "update ledger set is_delivered = true where queue = " ++ + QStr ++ " and seq_id = " ++ integer_to_list(ReadSeqId)) + end, + MsgId = binary_to_term(hex_string_to_binary(MsgIdStr)), + %% yeah, this is really necessary. sigh + MsgIdStr2 = binary_to_escaped_string(term_to_binary(MsgId)), + odbc:sql_query(Conn, "update sequence set next_read = " ++ integer_to_list(ReadSeqId + 1) ++ + " where queue = " ++ QStr), + if ReadMsg -> + {selected, _, [{MsgBodyStr}]} = + odbc:sql_query(Conn, "select msg from message where msg_id = " ++ MsgIdStr2), + odbc:commit(Conn, commit), + MsgBody = hex_string_to_binary(MsgBodyStr), + BodySize = size(MsgBody), + {ok, {MsgId, MsgBody, BodySize, IsDelivered, {MsgId, ReadSeqId}}, State}; + true -> + odbc:commit(Conn, commit), + {ok, {MsgId, IsDelivered, {MsgId, ReadSeqId}}, State} + end + end end. internal_ack(Q, MsgSeqIds, State) -> @@ -294,22 +294,22 @@ remove_messages(Q, MsgSeqIds, LedgerDelete, State = #dbstate { db_conn = Conn }) QStr = binary_to_escaped_string(term_to_binary(Q)), lists:foreach( fun ({MsgId, SeqId}) -> - MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)), - {selected, _, [{RefCount}]} = - odbc:sql_query(Conn, "select ref_count from message where msg_id = " ++ - MsgIdStr), - case RefCount of - 1 -> odbc:sql_query(Conn, "delete from message where msg_id = " ++ - MsgIdStr); - _ -> odbc:sql_query(Conn, "update message set ref_count = " ++ - integer_to_list(RefCount - 1) ++ " where msg_id = " ++ - MsgIdStr) - end, - if LedgerDelete -> - odbc:sql_query(Conn, "delete from ledger where queue = " ++ - QStr ++ " and seq_id = " ++ integer_to_list(SeqId)); - true -> ok - end + MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)), + {selected, _, [{RefCount}]} = + odbc:sql_query(Conn, "select ref_count from message where msg_id = " ++ + MsgIdStr), + case RefCount of + 1 -> odbc:sql_query(Conn, "delete from message where msg_id = " ++ + MsgIdStr); + _ -> odbc:sql_query(Conn, "update message set ref_count = " ++ + integer_to_list(RefCount - 1) ++ " where msg_id = " ++ + MsgIdStr) + end, + if LedgerDelete -> + odbc:sql_query(Conn, "delete from ledger where queue = " ++ + QStr ++ " and seq_id = " ++ integer_to_list(SeqId)); + true -> ok + end end, MsgSeqIds), odbc:commit(Conn, commit), {ok, State}. @@ -318,12 +318,12 @@ internal_tx_publish(MsgId, MsgBody, State = #dbstate { db_conn = Conn }) -> MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)), MsgStr = binary_to_escaped_string(MsgBody), case odbc:sql_query(Conn, "select ref_count from message where msg_id = " ++ MsgIdStr) of - {selected, _, []} -> - odbc:sql_query(Conn, "insert into message (msg_id, msg, ref_count) values (" ++ - MsgIdStr ++ ", " ++ MsgStr ++ ", 1)"); - {selected, _, [{RefCount}]} -> - odbc:sql_query(Conn, "update message set ref_count = " ++ - integer_to_list(RefCount + 1) ++ " where msg_id = " ++ MsgIdStr) + {selected, _, []} -> + odbc:sql_query(Conn, "insert into message (msg_id, msg, ref_count) values (" ++ + MsgIdStr ++ ", " ++ MsgStr ++ ", 1)"); + {selected, _, [{RefCount}]} -> + odbc:sql_query(Conn, "update message set ref_count = " ++ + integer_to_list(RefCount + 1) ++ " where msg_id = " ++ MsgIdStr) end, odbc:commit(Conn, commit), {ok, State}. @@ -331,24 +331,24 @@ internal_tx_publish(MsgId, MsgBody, State = #dbstate { db_conn = Conn }) -> internal_tx_commit(Q, PubMsgIds, AckSeqIds, State = #dbstate { db_conn = Conn }) -> QStr = binary_to_escaped_string(term_to_binary(Q)), {InsertOrUpdate, NextWrite} = - case odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr) of - {selected, _, []} -> {insert, 0}; - {selected, _, [{NextWrite2}]} -> {update, NextWrite2} - end, + case odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr) of + {selected, _, []} -> {insert, 0}; + {selected, _, [{NextWrite2}]} -> {update, NextWrite2} + end, NextWrite3 = - lists:foldl(fun (MsgId, WriteSeqInteger) -> - MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)), - odbc:sql_query(Conn, - "insert into ledger (queue, seq_id, is_delivered, msg_id) values (" ++ - QStr ++ ", " ++ integer_to_list(WriteSeqInteger) ++ ", false, " ++ - MsgIdStr ++ ")"), - WriteSeqInteger + 1 - end, NextWrite, PubMsgIds), + lists:foldl(fun (MsgId, WriteSeqInteger) -> + MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)), + odbc:sql_query(Conn, + "insert into ledger (queue, seq_id, is_delivered, msg_id) values (" ++ + QStr ++ ", " ++ integer_to_list(WriteSeqInteger) ++ ", false, " ++ + MsgIdStr ++ ")"), + WriteSeqInteger + 1 + end, NextWrite, PubMsgIds), case InsertOrUpdate of - update -> odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(NextWrite3) ++ - " where queue = " ++ QStr); - insert -> odbc:sql_query(Conn, "insert into sequence (queue, next_read, next_write) values (" ++ - QStr ++ ", 0, " ++ integer_to_list(NextWrite3) ++ ")") + update -> odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(NextWrite3) ++ + " where queue = " ++ QStr); + insert -> odbc:sql_query(Conn, "insert into sequence (queue, next_read, next_write) values (" ++ + QStr ++ ", 0, " ++ integer_to_list(NextWrite3) ++ ")") end, odbc:commit(Conn, commit), remove_messages(Q, AckSeqIds, true, State), @@ -359,19 +359,19 @@ internal_publish(Q, MsgId, MsgBody, State = #dbstate { db_conn = Conn }) -> MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)), QStr = binary_to_escaped_string(term_to_binary(Q)), NextWrite = - case odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr) of - {selected, _, []} -> - odbc:sql_query(Conn, - "insert into sequence (queue, next_read, next_write) values (" ++ - QStr ++ ", 0, 1)"), - 0; - {selected, _, [{NextWrite2}]} -> - odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(1 + NextWrite2) ++ - " where queue = " ++ QStr), - NextWrite2 - end, + case odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr) of + {selected, _, []} -> + odbc:sql_query(Conn, + "insert into sequence (queue, next_read, next_write) values (" ++ + QStr ++ ", 0, 1)"), + 0; + {selected, _, [{NextWrite2}]} -> + odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(1 + NextWrite2) ++ + " where queue = " ++ QStr), + NextWrite2 + end, odbc:sql_query(Conn, "insert into ledger (queue, seq_id, is_delivered, msg_id) values (" ++ - QStr ++ ", " ++ integer_to_list(NextWrite) ++ ", false, " ++ MsgIdStr ++ ")"), + QStr ++ ", " ++ integer_to_list(NextWrite) ++ ", false, " ++ MsgIdStr ++ ")"), odbc:commit(Conn, commit), {ok, State1}. @@ -382,36 +382,36 @@ internal_tx_cancel(MsgIds, State) -> internal_requeue(Q, MsgSeqIds, State = #dbstate { db_conn = Conn }) -> QStr = binary_to_escaped_string(term_to_binary(Q)), {selected, _, [{WriteSeqId}]} = - odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr), + odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr), WriteSeqId2 = - lists:foldl( - fun ({_MsgId, SeqId}, NextWriteSeqId) -> - odbc:sql_query(Conn, "update ledger set seq_id = " ++ integer_to_list(NextWriteSeqId) ++ - " where seq_id = " ++ integer_to_list(SeqId) ++ " and queue = " ++ QStr), - NextWriteSeqId + 1 - end, WriteSeqId, MsgSeqIds), + lists:foldl( + fun ({_MsgId, SeqId}, NextWriteSeqId) -> + odbc:sql_query(Conn, "update ledger set seq_id = " ++ integer_to_list(NextWriteSeqId) ++ + " where seq_id = " ++ integer_to_list(SeqId) ++ " and queue = " ++ QStr), + NextWriteSeqId + 1 + end, WriteSeqId, MsgSeqIds), odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(WriteSeqId2) ++ - " where queue = " ++ QStr), + " where queue = " ++ QStr), odbc:commit(Conn, commit), {ok, State}. - + compact_already_delivered(#dbstate { db_conn = Conn }) -> {selected, _, Seqs} = odbc:sql_query(Conn, "select queue, next_read from sequence"), lists:foreach( fun ({QHexStr, ReadSeqId}) -> - Q = binary_to_term(hex_string_to_binary(QHexStr)), - QStr = binary_to_escaped_string(term_to_binary(Q)), - case odbc:sql_query(Conn, "select min(seq_id) from ledger where queue = " - ++ QStr) of - {selected, _, []} -> ok; - {selected, _, [{null}]} -> ok; %% AGH! - {selected, _, [{Min}]} -> - Gap = shuffle_up(Conn, QStr, Min - 1, ReadSeqId - 1, 0), - odbc:sql_query(Conn, "update sequence set next_read = " ++ - integer_to_list(Min + Gap) ++ - " where queue = " ++ QStr) - end + Q = binary_to_term(hex_string_to_binary(QHexStr)), + QStr = binary_to_escaped_string(term_to_binary(Q)), + case odbc:sql_query(Conn, "select min(seq_id) from ledger where queue = " + ++ QStr) of + {selected, _, []} -> ok; + {selected, _, [{null}]} -> ok; %% AGH! + {selected, _, [{Min}]} -> + Gap = shuffle_up(Conn, QStr, Min - 1, ReadSeqId - 1, 0), + odbc:sql_query(Conn, "update sequence set next_read = " ++ + integer_to_list(Min + Gap) ++ + " where queue = " ++ QStr) + end end, Seqs), odbc:commit(Conn, commit). @@ -419,36 +419,36 @@ shuffle_up(_Conn, _QStr, SeqId, SeqId, Gap) -> Gap; shuffle_up(Conn, QStr, BaseSeqId, SeqId, Gap) -> GapInc = - case odbc:sql_query(Conn, "select count(1) from ledger where queue = " ++ - QStr ++ " and seq_id = " ++ integer_to_list(SeqId)) of - {selected, _, [{"0"}]} -> - 1; - {selected, _, [{"1"}]} -> - if Gap =:= 0 -> ok; - true -> odbc:sql_query(Conn, "update ledger set seq_id = " ++ - integer_to_list(SeqId + Gap) ++ " where seq_id = " ++ - integer_to_list(SeqId) ++ " and queue = " ++ QStr) - end, - 0 - end, + case odbc:sql_query(Conn, "select count(1) from ledger where queue = " ++ + QStr ++ " and seq_id = " ++ integer_to_list(SeqId)) of + {selected, _, [{"0"}]} -> + 1; + {selected, _, [{"1"}]} -> + if Gap =:= 0 -> ok; + true -> odbc:sql_query(Conn, "update ledger set seq_id = " ++ + integer_to_list(SeqId + Gap) ++ " where seq_id = " ++ + integer_to_list(SeqId) ++ " and queue = " ++ QStr) + end, + 0 + end, shuffle_up(Conn, QStr, BaseSeqId, SeqId - 1, Gap + GapInc). internal_purge(Q, State = #dbstate { db_conn = Conn }) -> QStr = binary_to_escaped_string(term_to_binary(Q)), case odbc:sql_query(Conn, "select next_read from sequence where queue = " ++ QStr) of - {selected, _, []} -> - odbc:commit(Conn, commit), - {ok, 0, State}; - {selected, _, [{ReadSeqId}]} -> - odbc:sql_query(Conn, "update sequence set next_read = next_write where queue = " ++ QStr), - {selected, _, MsgSeqIds} = - odbc:sql_query(Conn, "select msg_id, seq_id from ledger where queue = " ++ - QStr ++ " and seq_id >= " ++ ReadSeqId), - MsgSeqIds2 = lists:map( - fun ({MsgIdStr, SeqIdStr}) -> - { binary_to_term(hex_string_to_binary(MsgIdStr)), - list_to_integer(SeqIdStr) } - end, MsgSeqIds), - {ok, State2} = remove_messages(Q, MsgSeqIds2, true, State), - {ok, length(MsgSeqIds2), State2} + {selected, _, []} -> + odbc:commit(Conn, commit), + {ok, 0, State}; + {selected, _, [{ReadSeqId}]} -> + odbc:sql_query(Conn, "update sequence set next_read = next_write where queue = " ++ QStr), + {selected, _, MsgSeqIds} = + odbc:sql_query(Conn, "select msg_id, seq_id from ledger where queue = " ++ + QStr ++ " and seq_id >= " ++ ReadSeqId), + MsgSeqIds2 = lists:map( + fun ({MsgIdStr, SeqIdStr}) -> + { binary_to_term(hex_string_to_binary(MsgIdStr)), + list_to_integer(SeqIdStr) } + end, MsgSeqIds), + {ok, State2} = remove_messages(Q, MsgSeqIds2, true, State), + {ok, length(MsgSeqIds2), State2} end. diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 2b6f7b00..3b30a0da 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -40,7 +40,7 @@ -export([publish/4, publish_with_seq/5, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1, - requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1, + requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1, dump_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1 ]). @@ -68,19 +68,19 @@ -define(FILE_SIZE_LIMIT, (256*1024*1024)). -record(dqstate, {msg_location_dets, %% where are messages? - msg_location_ets, %% as above, but for ets version + msg_location_ets, %% as above, but for ets version operation_mode, %% ram_disk | disk_only - file_summary, %% what's in the files? - sequences, %% next read and write for each q - current_file_num, %% current file name as number - current_file_name, %% current file name - current_file_handle, %% current file handle - current_offset, %% current offset within current file + file_summary, %% what's in the files? + sequences, %% next read and write for each q + current_file_num, %% current file name as number + current_file_name, %% current file name + current_file_handle, %% current file handle + current_offset, %% current offset within current file current_dirty, %% has the current file been written to since the last fsync? - file_size_limit, %% how big can our files get? - read_file_handles, %% file handles for reading (LRU) - read_file_handles_limit %% how many file handles can we open? - }). + file_size_limit, %% how big can our files get? + read_file_handles, %% file handles for reading (LRU) + read_file_handles_limit %% how many file handles can we open? + }). %% The components: %% @@ -92,10 +92,10 @@ %% {Q, ReadSeqId, WriteSeqId, QueueLength} %% rabbit_disk_queue: this is an mnesia table which contains: %% #dq_msg_loc { queue_and_seq_id = {Q, SeqId}, -%% is_delivered = IsDelivered, -%% msg_id = MsgId, +%% is_delivered = IsDelivered, +%% msg_id = MsgId, %% next_seq_id = SeqId -%% } +%% } %% %% The basic idea is that messages are appended to the current file up @@ -190,18 +190,18 @@ %% variable. Judicious use of a mirror is required). %% %% +-------+ +-------+ +-------+ -%% | X | | G | | G | -%% +-------+ +-------+ +-------+ -%% | D | | X | | F | -%% +-------+ +-------+ +-------+ -%% | X | | X | | E | -%% +-------+ +-------+ +-------+ +%% | X | | G | | G | +%% +-------+ +-------+ +-------+ +%% | D | | X | | F | +%% +-------+ +-------+ +-------+ +%% | X | | X | | E | +%% +-------+ +-------+ +-------+ %% | C | | F | ===> | D | -%% +-------+ +-------+ +-------+ -%% | X | | X | | C | -%% +-------+ +-------+ +-------+ -%% | B | | X | | B | -%% +-------+ +-------+ +-------+ +%% +-------+ +-------+ +-------+ +%% | X | | X | | C | +%% +-------+ +-------+ +-------+ +%% | B | | X | | B | +%% +-------+ +-------+ +-------+ %% | A | | E | | A | %% +-------+ +-------+ +-------+ %% left right left diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 31c0fb10..5082fe55 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -52,7 +52,7 @@ start_link(Queue, IsDurable, disk) -> purge_non_persistent_messages( #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue, - is_durable = IsDurable, length = 0 }); + is_durable = IsDurable, length = 0 }); start_link(Queue, IsDurable, mixed) -> {ok, State} = start_link(Queue, IsDurable, disk), to_mixed_mode(State). @@ -111,12 +111,12 @@ to_mixed_mode(State = #mqstate { mode = disk, queue = Q, length = Length }) -> {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1 }}. purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q, - is_durable = IsDurable }) -> + is_durable = IsDurable }) -> %% iterate through the content on disk, ack anything which isn't %% persistent, accumulate everything else that is persistent and %% requeue it {Acks, Requeue, Length} = - deliver_all_messages(Q, IsDurable, [], [], 0), + deliver_all_messages(Q, IsDurable, [], [], 0), ok = if Requeue == [] -> ok; true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)) end, @@ -127,19 +127,19 @@ purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q, deliver_all_messages(Q, IsDurable, Acks, Requeue, Length) -> case rabbit_disk_queue:deliver(Q) of - empty -> {Acks, Requeue, Length}; - {MsgId, MsgBin, _Size, IsDelivered, AckTag, _Remaining} -> - #basic_message { guid = MsgId, is_persistent = IsPersistent } = - bin_to_msg(MsgBin), - OnDisk = IsPersistent andalso IsDurable, - {Acks2, Requeue2, Length2} = - if OnDisk -> {Acks, - [{AckTag, {next, IsDelivered}} | Requeue], - Length + 1 - }; - true -> {[AckTag | Acks], Requeue, Length} - end, - deliver_all_messages(Q, IsDurable, Acks2, Requeue2, Length2) + empty -> {Acks, Requeue, Length}; + {MsgId, MsgBin, _Size, IsDelivered, AckTag, _Remaining} -> + #basic_message { guid = MsgId, is_persistent = IsPersistent } = + bin_to_msg(MsgBin), + OnDisk = IsPersistent andalso IsDurable, + {Acks2, Requeue2, Length2} = + if OnDisk -> {Acks, + [{AckTag, {next, IsDelivered}} | Requeue], + Length + 1 + }; + true -> {[AckTag | Acks], Requeue, Length} + end, + deliver_all_messages(Q, IsDurable, Acks2, Requeue2, Length2) end. msg_to_bin(Msg = #basic_message { content = Content }) -> @@ -167,7 +167,7 @@ publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, %% Assumption here is that the queue is empty already (only called via %% attempt_immediate_delivery). publish_delivered(Msg = - #basic_message { guid = MsgId, is_persistent = IsPersistent}, + #basic_message { guid = MsgId, is_persistent = IsPersistent}, State = #mqstate { mode = Mode, is_durable = IsDurable, queue = Q, length = 0 }) when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> @@ -235,7 +235,7 @@ ack(Acks, State = #mqstate { queue = Q }) -> end. tx_publish(Msg = #basic_message { guid = MsgId }, - State = #mqstate { mode = disk }) -> + State = #mqstate { mode = disk }) -> ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)), {ok, State}; tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, @@ -254,9 +254,9 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q, length = Length }) -> RealAcks = remove_noacks(Acks), ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok; - true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), - RealAcks) - end, + true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), + RealAcks) + end, {ok, State #mqstate { length = Length + erlang:length(Publishes) }}; tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, @@ -266,7 +266,7 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, {PersistentPubs, MsgBuf2} = lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent }, {Acc, MsgBuf3}) -> - OnDisk = IsPersistent andalso IsDurable, + OnDisk = IsPersistent andalso IsDurable, Acc2 = if OnDisk -> [Msg #basic_message.guid | Acc]; @@ -279,32 +279,32 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, %% requirements of rabbit_disk_queue (ascending SeqIds) RealAcks = remove_noacks(Acks), ok = if ([] == PersistentPubs) andalso ([] == RealAcks) -> ok; - true -> - rabbit_disk_queue:tx_commit( - Q, lists:reverse(PersistentPubs), RealAcks) - end, + true -> + rabbit_disk_queue:tx_commit( + Q, lists:reverse(PersistentPubs), RealAcks) + end, {ok, State #mqstate { msg_buf = MsgBuf2, length = Length + erlang:length(Publishes) }}. only_persistent_msg_ids(Pubs) -> lists:reverse( lists:foldl( - fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) -> - if IsPersistent -> [Msg #basic_message.guid | Acc]; - true -> Acc - end - end, [], Pubs)). + fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) -> + if IsPersistent -> [Msg #basic_message.guid | Acc]; + true -> Acc + end + end, [], Pubs)). tx_cancel(Publishes, State = #mqstate { mode = disk }) -> ok = rabbit_disk_queue:tx_cancel(only_msg_ids(Publishes)), {ok, State}; tx_cancel(Publishes, - State = #mqstate { mode = mixed, is_durable = IsDurable }) -> + State = #mqstate { mode = mixed, is_durable = IsDurable }) -> ok = - if IsDurable -> - rabbit_disk_queue:tx_cancel(only_persistent_msg_ids(Publishes)); - true -> ok - end, + if IsDurable -> + rabbit_disk_queue:tx_cancel(only_persistent_msg_ids(Publishes)); + true -> ok + end, {ok, State}. %% [{Msg, AckTag}] @@ -337,16 +337,16 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, }) -> {PersistentPubs, MsgBuf2} = lists:foldl( - fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag}, - {Acc, MsgBuf3}) -> - OnDisk = IsDurable andalso IsPersistent, - Acc2 = - if OnDisk -> [AckTag | Acc]; - true -> Acc - end, - MsgBuf4 = queue:in({Msg, true, OnDisk}, MsgBuf3), - {Acc2, MsgBuf4} - end, {[], MsgBuf}, MessagesWithAckTags), + fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag}, + {Acc, MsgBuf3}) -> + OnDisk = IsDurable andalso IsPersistent, + Acc2 = + if OnDisk -> [AckTag | Acc]; + true -> Acc + end, + MsgBuf4 = queue:in({Msg, true, OnDisk}, MsgBuf3), + {Acc2, MsgBuf4} + end, {[], MsgBuf}, MessagesWithAckTags), ok = if [] == PersistentPubs -> ok; true -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs)) end, |