summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-17 12:13:42 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-17 12:13:42 +0100
commite58f9b400068753405a308cf1d8269e9cb0331e4 (patch)
tree8764f418f6d6d20eb9a569933c9cff2ac51dcc78
parent4ac62980edaac85249818cb65fad616149c1c38c (diff)
downloadrabbitmq-server-e58f9b400068753405a308cf1d8269e9cb0331e4.tar.gz
just removing tabs
-rw-r--r--src/rabbit.erl4
-rw-r--r--src/rabbit_db_queue.erl290
-rw-r--r--src/rabbit_disk_queue.erl52
-rw-r--r--src/rabbit_mixed_queue.erl92
4 files changed, 219 insertions, 219 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 44e4dc7f..2eecac5e 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -136,7 +136,7 @@ start(normal, []) ->
ok = rabbit_amqqueue:start(),
- {ok, MemoryAlarms} = application:get_env(memory_alarms),
+ {ok, MemoryAlarms} = application:get_env(memory_alarms),
ok = rabbit_alarm:start(MemoryAlarms),
ok = start_child(rabbit_queue_mode_manager),
@@ -311,7 +311,7 @@ rotate_logs(File, Suffix, OldHandler, NewHandler) ->
log_rotation_result({error, MainLogError}, {error, SaslLogError}) ->
{error, {{cannot_rotate_main_logs, MainLogError},
- {cannot_rotate_sasl_logs, SaslLogError}}};
+ {cannot_rotate_sasl_logs, SaslLogError}}};
log_rotation_result({error, MainLogError}, ok) ->
{error, {cannot_rotate_main_logs, MainLogError}};
log_rotation_result(ok, {error, SaslLogError}) ->
diff --git a/src/rabbit_db_queue.erl b/src/rabbit_db_queue.erl
index 897a4a6f..7530892d 100644
--- a/src/rabbit_db_queue.erl
+++ b/src/rabbit_db_queue.erl
@@ -60,7 +60,7 @@
terminate/2, code_change/3]).
-export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2,
- tx_commit/3, tx_cancel/1, requeue/2, purge/1]).
+ tx_commit/3, tx_cancel/1, requeue/2, purge/1]).
-export([stop/0, stop_and_obliterate/0]).
@@ -75,13 +75,13 @@
-type(seq_id() :: non_neg_integer()).
-spec(start_link/1 :: (non_neg_integer()) ->
- {'ok', pid()} | 'ignore' | {'error', any()}).
+ {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok').
-spec(deliver/1 :: (queue_name()) ->
- {'empty' | {msg_id(), binary(), non_neg_integer(),
- bool(), {msg_id(), seq_id()}}}).
+ {'empty' | {msg_id(), binary(), non_neg_integer(),
+ bool(), {msg_id(), seq_id()}}}).
-spec(phantom_deliver/1 :: (queue_name()) ->
- { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}).
+ { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}).
-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok').
-spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok').
@@ -97,7 +97,7 @@
start_link(DSN) ->
gen_server:start_link({local, ?SERVER}, ?MODULE,
- [DSN], []).
+ [DSN], []).
publish(Q, MsgId, Msg) when is_binary(Msg) ->
gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}).
@@ -139,7 +139,7 @@ init([DSN]) ->
process_flag(trap_exit, true),
odbc:start(),
{ok, Conn} = odbc:connect(DSN, [{auto_commit, off}, {tuple_row, on},
- {scrollable_cursors, off}, {trace_driver, off}]),
+ {scrollable_cursors, off}, {trace_driver, off}]),
State = #dbstate { db_conn = Conn },
compact_already_delivered(State),
{ok, State}.
@@ -213,12 +213,12 @@ escape_byte(B) when B > 31 andalso B < 127 ->
B;
escape_byte(B) ->
case io_lib:format("~.8B", [B]) of
- O1 = [[_]] ->
- "\\\\00" ++ O1;
- O2 = [[_,_]] ->
- "\\\\0" ++ O2;
- O3 = [[_,_,_]] ->
- "\\\\" ++ O3
+ O1 = [[_]] ->
+ "\\\\00" ++ O1;
+ O2 = [[_,_]] ->
+ "\\\\0" ++ O2;
+ O3 = [[_,_,_]] ->
+ "\\\\" ++ O3
end.
escaped_string_to_binary(Str) when is_list(Str) ->
@@ -230,9 +230,9 @@ escaped_string_to_binary([$\\,$\\|Rest], Acc) ->
escaped_string_to_binary(Rest, [$\\ | Acc]);
escaped_string_to_binary([$\\,A,B,C|Rest], Acc) ->
escaped_string_to_binary(Rest, [(list_to_integer([A])*64) +
- (list_to_integer([B])*8) +
- list_to_integer([C])
- | Acc]);
+ (list_to_integer([B])*8) +
+ list_to_integer([C])
+ | Acc]);
escaped_string_to_binary([C|Rest], Acc) ->
escaped_string_to_binary(Rest, [C|Acc]).
@@ -250,37 +250,37 @@ hex_string_to_binary([A,B|Rest], Acc) ->
internal_deliver(Q, ReadMsg, State = #dbstate { db_conn = Conn }) ->
QStr = binary_to_escaped_string(term_to_binary(Q)),
case odbc:sql_query(Conn, "select next_read from sequence where queue = " ++ QStr) of
- {selected, _, []} ->
- odbc:commit(Conn, commit),
- {ok, empty, State};
- {selected, _, [{ReadSeqId}]} ->
- case odbc:sql_query(Conn, "select is_delivered, msg_id from ledger where queue = " ++ QStr ++
- " and seq_id = " ++ integer_to_list(ReadSeqId)) of
- {selected, _, []} ->
- {ok, empty, State};
- {selected, _, [{IsDeliveredStr, MsgIdStr}]} ->
- IsDelivered = IsDeliveredStr /= "0",
- if IsDelivered -> ok;
- true -> odbc:sql_query(Conn, "update ledger set is_delivered = true where queue = " ++
- QStr ++ " and seq_id = " ++ integer_to_list(ReadSeqId))
- end,
- MsgId = binary_to_term(hex_string_to_binary(MsgIdStr)),
- %% yeah, this is really necessary. sigh
- MsgIdStr2 = binary_to_escaped_string(term_to_binary(MsgId)),
- odbc:sql_query(Conn, "update sequence set next_read = " ++ integer_to_list(ReadSeqId + 1) ++
- " where queue = " ++ QStr),
- if ReadMsg ->
- {selected, _, [{MsgBodyStr}]} =
- odbc:sql_query(Conn, "select msg from message where msg_id = " ++ MsgIdStr2),
- odbc:commit(Conn, commit),
- MsgBody = hex_string_to_binary(MsgBodyStr),
- BodySize = size(MsgBody),
- {ok, {MsgId, MsgBody, BodySize, IsDelivered, {MsgId, ReadSeqId}}, State};
- true ->
- odbc:commit(Conn, commit),
- {ok, {MsgId, IsDelivered, {MsgId, ReadSeqId}}, State}
- end
- end
+ {selected, _, []} ->
+ odbc:commit(Conn, commit),
+ {ok, empty, State};
+ {selected, _, [{ReadSeqId}]} ->
+ case odbc:sql_query(Conn, "select is_delivered, msg_id from ledger where queue = " ++ QStr ++
+ " and seq_id = " ++ integer_to_list(ReadSeqId)) of
+ {selected, _, []} ->
+ {ok, empty, State};
+ {selected, _, [{IsDeliveredStr, MsgIdStr}]} ->
+ IsDelivered = IsDeliveredStr /= "0",
+ if IsDelivered -> ok;
+ true -> odbc:sql_query(Conn, "update ledger set is_delivered = true where queue = " ++
+ QStr ++ " and seq_id = " ++ integer_to_list(ReadSeqId))
+ end,
+ MsgId = binary_to_term(hex_string_to_binary(MsgIdStr)),
+ %% yeah, this is really necessary. sigh
+ MsgIdStr2 = binary_to_escaped_string(term_to_binary(MsgId)),
+ odbc:sql_query(Conn, "update sequence set next_read = " ++ integer_to_list(ReadSeqId + 1) ++
+ " where queue = " ++ QStr),
+ if ReadMsg ->
+ {selected, _, [{MsgBodyStr}]} =
+ odbc:sql_query(Conn, "select msg from message where msg_id = " ++ MsgIdStr2),
+ odbc:commit(Conn, commit),
+ MsgBody = hex_string_to_binary(MsgBodyStr),
+ BodySize = size(MsgBody),
+ {ok, {MsgId, MsgBody, BodySize, IsDelivered, {MsgId, ReadSeqId}}, State};
+ true ->
+ odbc:commit(Conn, commit),
+ {ok, {MsgId, IsDelivered, {MsgId, ReadSeqId}}, State}
+ end
+ end
end.
internal_ack(Q, MsgSeqIds, State) ->
@@ -294,22 +294,22 @@ remove_messages(Q, MsgSeqIds, LedgerDelete, State = #dbstate { db_conn = Conn })
QStr = binary_to_escaped_string(term_to_binary(Q)),
lists:foreach(
fun ({MsgId, SeqId}) ->
- MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)),
- {selected, _, [{RefCount}]} =
- odbc:sql_query(Conn, "select ref_count from message where msg_id = " ++
- MsgIdStr),
- case RefCount of
- 1 -> odbc:sql_query(Conn, "delete from message where msg_id = " ++
- MsgIdStr);
- _ -> odbc:sql_query(Conn, "update message set ref_count = " ++
- integer_to_list(RefCount - 1) ++ " where msg_id = " ++
- MsgIdStr)
- end,
- if LedgerDelete ->
- odbc:sql_query(Conn, "delete from ledger where queue = " ++
- QStr ++ " and seq_id = " ++ integer_to_list(SeqId));
- true -> ok
- end
+ MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)),
+ {selected, _, [{RefCount}]} =
+ odbc:sql_query(Conn, "select ref_count from message where msg_id = " ++
+ MsgIdStr),
+ case RefCount of
+ 1 -> odbc:sql_query(Conn, "delete from message where msg_id = " ++
+ MsgIdStr);
+ _ -> odbc:sql_query(Conn, "update message set ref_count = " ++
+ integer_to_list(RefCount - 1) ++ " where msg_id = " ++
+ MsgIdStr)
+ end,
+ if LedgerDelete ->
+ odbc:sql_query(Conn, "delete from ledger where queue = " ++
+ QStr ++ " and seq_id = " ++ integer_to_list(SeqId));
+ true -> ok
+ end
end, MsgSeqIds),
odbc:commit(Conn, commit),
{ok, State}.
@@ -318,12 +318,12 @@ internal_tx_publish(MsgId, MsgBody, State = #dbstate { db_conn = Conn }) ->
MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)),
MsgStr = binary_to_escaped_string(MsgBody),
case odbc:sql_query(Conn, "select ref_count from message where msg_id = " ++ MsgIdStr) of
- {selected, _, []} ->
- odbc:sql_query(Conn, "insert into message (msg_id, msg, ref_count) values (" ++
- MsgIdStr ++ ", " ++ MsgStr ++ ", 1)");
- {selected, _, [{RefCount}]} ->
- odbc:sql_query(Conn, "update message set ref_count = " ++
- integer_to_list(RefCount + 1) ++ " where msg_id = " ++ MsgIdStr)
+ {selected, _, []} ->
+ odbc:sql_query(Conn, "insert into message (msg_id, msg, ref_count) values (" ++
+ MsgIdStr ++ ", " ++ MsgStr ++ ", 1)");
+ {selected, _, [{RefCount}]} ->
+ odbc:sql_query(Conn, "update message set ref_count = " ++
+ integer_to_list(RefCount + 1) ++ " where msg_id = " ++ MsgIdStr)
end,
odbc:commit(Conn, commit),
{ok, State}.
@@ -331,24 +331,24 @@ internal_tx_publish(MsgId, MsgBody, State = #dbstate { db_conn = Conn }) ->
internal_tx_commit(Q, PubMsgIds, AckSeqIds, State = #dbstate { db_conn = Conn }) ->
QStr = binary_to_escaped_string(term_to_binary(Q)),
{InsertOrUpdate, NextWrite} =
- case odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr) of
- {selected, _, []} -> {insert, 0};
- {selected, _, [{NextWrite2}]} -> {update, NextWrite2}
- end,
+ case odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr) of
+ {selected, _, []} -> {insert, 0};
+ {selected, _, [{NextWrite2}]} -> {update, NextWrite2}
+ end,
NextWrite3 =
- lists:foldl(fun (MsgId, WriteSeqInteger) ->
- MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)),
- odbc:sql_query(Conn,
- "insert into ledger (queue, seq_id, is_delivered, msg_id) values (" ++
- QStr ++ ", " ++ integer_to_list(WriteSeqInteger) ++ ", false, " ++
- MsgIdStr ++ ")"),
- WriteSeqInteger + 1
- end, NextWrite, PubMsgIds),
+ lists:foldl(fun (MsgId, WriteSeqInteger) ->
+ MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)),
+ odbc:sql_query(Conn,
+ "insert into ledger (queue, seq_id, is_delivered, msg_id) values (" ++
+ QStr ++ ", " ++ integer_to_list(WriteSeqInteger) ++ ", false, " ++
+ MsgIdStr ++ ")"),
+ WriteSeqInteger + 1
+ end, NextWrite, PubMsgIds),
case InsertOrUpdate of
- update -> odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(NextWrite3) ++
- " where queue = " ++ QStr);
- insert -> odbc:sql_query(Conn, "insert into sequence (queue, next_read, next_write) values (" ++
- QStr ++ ", 0, " ++ integer_to_list(NextWrite3) ++ ")")
+ update -> odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(NextWrite3) ++
+ " where queue = " ++ QStr);
+ insert -> odbc:sql_query(Conn, "insert into sequence (queue, next_read, next_write) values (" ++
+ QStr ++ ", 0, " ++ integer_to_list(NextWrite3) ++ ")")
end,
odbc:commit(Conn, commit),
remove_messages(Q, AckSeqIds, true, State),
@@ -359,19 +359,19 @@ internal_publish(Q, MsgId, MsgBody, State = #dbstate { db_conn = Conn }) ->
MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)),
QStr = binary_to_escaped_string(term_to_binary(Q)),
NextWrite =
- case odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr) of
- {selected, _, []} ->
- odbc:sql_query(Conn,
- "insert into sequence (queue, next_read, next_write) values (" ++
- QStr ++ ", 0, 1)"),
- 0;
- {selected, _, [{NextWrite2}]} ->
- odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(1 + NextWrite2) ++
- " where queue = " ++ QStr),
- NextWrite2
- end,
+ case odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr) of
+ {selected, _, []} ->
+ odbc:sql_query(Conn,
+ "insert into sequence (queue, next_read, next_write) values (" ++
+ QStr ++ ", 0, 1)"),
+ 0;
+ {selected, _, [{NextWrite2}]} ->
+ odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(1 + NextWrite2) ++
+ " where queue = " ++ QStr),
+ NextWrite2
+ end,
odbc:sql_query(Conn, "insert into ledger (queue, seq_id, is_delivered, msg_id) values (" ++
- QStr ++ ", " ++ integer_to_list(NextWrite) ++ ", false, " ++ MsgIdStr ++ ")"),
+ QStr ++ ", " ++ integer_to_list(NextWrite) ++ ", false, " ++ MsgIdStr ++ ")"),
odbc:commit(Conn, commit),
{ok, State1}.
@@ -382,36 +382,36 @@ internal_tx_cancel(MsgIds, State) ->
internal_requeue(Q, MsgSeqIds, State = #dbstate { db_conn = Conn }) ->
QStr = binary_to_escaped_string(term_to_binary(Q)),
{selected, _, [{WriteSeqId}]} =
- odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr),
+ odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr),
WriteSeqId2 =
- lists:foldl(
- fun ({_MsgId, SeqId}, NextWriteSeqId) ->
- odbc:sql_query(Conn, "update ledger set seq_id = " ++ integer_to_list(NextWriteSeqId) ++
- " where seq_id = " ++ integer_to_list(SeqId) ++ " and queue = " ++ QStr),
- NextWriteSeqId + 1
- end, WriteSeqId, MsgSeqIds),
+ lists:foldl(
+ fun ({_MsgId, SeqId}, NextWriteSeqId) ->
+ odbc:sql_query(Conn, "update ledger set seq_id = " ++ integer_to_list(NextWriteSeqId) ++
+ " where seq_id = " ++ integer_to_list(SeqId) ++ " and queue = " ++ QStr),
+ NextWriteSeqId + 1
+ end, WriteSeqId, MsgSeqIds),
odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(WriteSeqId2) ++
- " where queue = " ++ QStr),
+ " where queue = " ++ QStr),
odbc:commit(Conn, commit),
{ok, State}.
-
+
compact_already_delivered(#dbstate { db_conn = Conn }) ->
{selected, _, Seqs} = odbc:sql_query(Conn, "select queue, next_read from sequence"),
lists:foreach(
fun ({QHexStr, ReadSeqId}) ->
- Q = binary_to_term(hex_string_to_binary(QHexStr)),
- QStr = binary_to_escaped_string(term_to_binary(Q)),
- case odbc:sql_query(Conn, "select min(seq_id) from ledger where queue = "
- ++ QStr) of
- {selected, _, []} -> ok;
- {selected, _, [{null}]} -> ok; %% AGH!
- {selected, _, [{Min}]} ->
- Gap = shuffle_up(Conn, QStr, Min - 1, ReadSeqId - 1, 0),
- odbc:sql_query(Conn, "update sequence set next_read = " ++
- integer_to_list(Min + Gap) ++
- " where queue = " ++ QStr)
- end
+ Q = binary_to_term(hex_string_to_binary(QHexStr)),
+ QStr = binary_to_escaped_string(term_to_binary(Q)),
+ case odbc:sql_query(Conn, "select min(seq_id) from ledger where queue = "
+ ++ QStr) of
+ {selected, _, []} -> ok;
+ {selected, _, [{null}]} -> ok; %% AGH!
+ {selected, _, [{Min}]} ->
+ Gap = shuffle_up(Conn, QStr, Min - 1, ReadSeqId - 1, 0),
+ odbc:sql_query(Conn, "update sequence set next_read = " ++
+ integer_to_list(Min + Gap) ++
+ " where queue = " ++ QStr)
+ end
end, Seqs),
odbc:commit(Conn, commit).
@@ -419,36 +419,36 @@ shuffle_up(_Conn, _QStr, SeqId, SeqId, Gap) ->
Gap;
shuffle_up(Conn, QStr, BaseSeqId, SeqId, Gap) ->
GapInc =
- case odbc:sql_query(Conn, "select count(1) from ledger where queue = " ++
- QStr ++ " and seq_id = " ++ integer_to_list(SeqId)) of
- {selected, _, [{"0"}]} ->
- 1;
- {selected, _, [{"1"}]} ->
- if Gap =:= 0 -> ok;
- true -> odbc:sql_query(Conn, "update ledger set seq_id = " ++
- integer_to_list(SeqId + Gap) ++ " where seq_id = " ++
- integer_to_list(SeqId) ++ " and queue = " ++ QStr)
- end,
- 0
- end,
+ case odbc:sql_query(Conn, "select count(1) from ledger where queue = " ++
+ QStr ++ " and seq_id = " ++ integer_to_list(SeqId)) of
+ {selected, _, [{"0"}]} ->
+ 1;
+ {selected, _, [{"1"}]} ->
+ if Gap =:= 0 -> ok;
+ true -> odbc:sql_query(Conn, "update ledger set seq_id = " ++
+ integer_to_list(SeqId + Gap) ++ " where seq_id = " ++
+ integer_to_list(SeqId) ++ " and queue = " ++ QStr)
+ end,
+ 0
+ end,
shuffle_up(Conn, QStr, BaseSeqId, SeqId - 1, Gap + GapInc).
internal_purge(Q, State = #dbstate { db_conn = Conn }) ->
QStr = binary_to_escaped_string(term_to_binary(Q)),
case odbc:sql_query(Conn, "select next_read from sequence where queue = " ++ QStr) of
- {selected, _, []} ->
- odbc:commit(Conn, commit),
- {ok, 0, State};
- {selected, _, [{ReadSeqId}]} ->
- odbc:sql_query(Conn, "update sequence set next_read = next_write where queue = " ++ QStr),
- {selected, _, MsgSeqIds} =
- odbc:sql_query(Conn, "select msg_id, seq_id from ledger where queue = " ++
- QStr ++ " and seq_id >= " ++ ReadSeqId),
- MsgSeqIds2 = lists:map(
- fun ({MsgIdStr, SeqIdStr}) ->
- { binary_to_term(hex_string_to_binary(MsgIdStr)),
- list_to_integer(SeqIdStr) }
- end, MsgSeqIds),
- {ok, State2} = remove_messages(Q, MsgSeqIds2, true, State),
- {ok, length(MsgSeqIds2), State2}
+ {selected, _, []} ->
+ odbc:commit(Conn, commit),
+ {ok, 0, State};
+ {selected, _, [{ReadSeqId}]} ->
+ odbc:sql_query(Conn, "update sequence set next_read = next_write where queue = " ++ QStr),
+ {selected, _, MsgSeqIds} =
+ odbc:sql_query(Conn, "select msg_id, seq_id from ledger where queue = " ++
+ QStr ++ " and seq_id >= " ++ ReadSeqId),
+ MsgSeqIds2 = lists:map(
+ fun ({MsgIdStr, SeqIdStr}) ->
+ { binary_to_term(hex_string_to_binary(MsgIdStr)),
+ list_to_integer(SeqIdStr) }
+ end, MsgSeqIds),
+ {ok, State2} = remove_messages(Q, MsgSeqIds2, true, State),
+ {ok, length(MsgSeqIds2), State2}
end.
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 2b6f7b00..3b30a0da 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -40,7 +40,7 @@
-export([publish/4, publish_with_seq/5, deliver/1, phantom_deliver/1, ack/2,
tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1,
- requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1,
+ requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1,
dump_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1
]).
@@ -68,19 +68,19 @@
-define(FILE_SIZE_LIMIT, (256*1024*1024)).
-record(dqstate, {msg_location_dets, %% where are messages?
- msg_location_ets, %% as above, but for ets version
+ msg_location_ets, %% as above, but for ets version
operation_mode, %% ram_disk | disk_only
- file_summary, %% what's in the files?
- sequences, %% next read and write for each q
- current_file_num, %% current file name as number
- current_file_name, %% current file name
- current_file_handle, %% current file handle
- current_offset, %% current offset within current file
+ file_summary, %% what's in the files?
+ sequences, %% next read and write for each q
+ current_file_num, %% current file name as number
+ current_file_name, %% current file name
+ current_file_handle, %% current file handle
+ current_offset, %% current offset within current file
current_dirty, %% has the current file been written to since the last fsync?
- file_size_limit, %% how big can our files get?
- read_file_handles, %% file handles for reading (LRU)
- read_file_handles_limit %% how many file handles can we open?
- }).
+ file_size_limit, %% how big can our files get?
+ read_file_handles, %% file handles for reading (LRU)
+ read_file_handles_limit %% how many file handles can we open?
+ }).
%% The components:
%%
@@ -92,10 +92,10 @@
%% {Q, ReadSeqId, WriteSeqId, QueueLength}
%% rabbit_disk_queue: this is an mnesia table which contains:
%% #dq_msg_loc { queue_and_seq_id = {Q, SeqId},
-%% is_delivered = IsDelivered,
-%% msg_id = MsgId,
+%% is_delivered = IsDelivered,
+%% msg_id = MsgId,
%% next_seq_id = SeqId
-%% }
+%% }
%%
%% The basic idea is that messages are appended to the current file up
@@ -190,18 +190,18 @@
%% variable. Judicious use of a mirror is required).
%%
%% +-------+ +-------+ +-------+
-%% | X | | G | | G |
-%% +-------+ +-------+ +-------+
-%% | D | | X | | F |
-%% +-------+ +-------+ +-------+
-%% | X | | X | | E |
-%% +-------+ +-------+ +-------+
+%% | X | | G | | G |
+%% +-------+ +-------+ +-------+
+%% | D | | X | | F |
+%% +-------+ +-------+ +-------+
+%% | X | | X | | E |
+%% +-------+ +-------+ +-------+
%% | C | | F | ===> | D |
-%% +-------+ +-------+ +-------+
-%% | X | | X | | C |
-%% +-------+ +-------+ +-------+
-%% | B | | X | | B |
-%% +-------+ +-------+ +-------+
+%% +-------+ +-------+ +-------+
+%% | X | | X | | C |
+%% +-------+ +-------+ +-------+
+%% | B | | X | | B |
+%% +-------+ +-------+ +-------+
%% | A | | E | | A |
%% +-------+ +-------+ +-------+
%% left right left
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 31c0fb10..5082fe55 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -52,7 +52,7 @@
start_link(Queue, IsDurable, disk) ->
purge_non_persistent_messages(
#mqstate { mode = disk, msg_buf = queue:new(), queue = Queue,
- is_durable = IsDurable, length = 0 });
+ is_durable = IsDurable, length = 0 });
start_link(Queue, IsDurable, mixed) ->
{ok, State} = start_link(Queue, IsDurable, disk),
to_mixed_mode(State).
@@ -111,12 +111,12 @@ to_mixed_mode(State = #mqstate { mode = disk, queue = Q, length = Length }) ->
{ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1 }}.
purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
- is_durable = IsDurable }) ->
+ is_durable = IsDurable }) ->
%% iterate through the content on disk, ack anything which isn't
%% persistent, accumulate everything else that is persistent and
%% requeue it
{Acks, Requeue, Length} =
- deliver_all_messages(Q, IsDurable, [], [], 0),
+ deliver_all_messages(Q, IsDurable, [], [], 0),
ok = if Requeue == [] -> ok;
true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))
end,
@@ -127,19 +127,19 @@ purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
deliver_all_messages(Q, IsDurable, Acks, Requeue, Length) ->
case rabbit_disk_queue:deliver(Q) of
- empty -> {Acks, Requeue, Length};
- {MsgId, MsgBin, _Size, IsDelivered, AckTag, _Remaining} ->
- #basic_message { guid = MsgId, is_persistent = IsPersistent } =
- bin_to_msg(MsgBin),
- OnDisk = IsPersistent andalso IsDurable,
- {Acks2, Requeue2, Length2} =
- if OnDisk -> {Acks,
- [{AckTag, {next, IsDelivered}} | Requeue],
- Length + 1
- };
- true -> {[AckTag | Acks], Requeue, Length}
- end,
- deliver_all_messages(Q, IsDurable, Acks2, Requeue2, Length2)
+ empty -> {Acks, Requeue, Length};
+ {MsgId, MsgBin, _Size, IsDelivered, AckTag, _Remaining} ->
+ #basic_message { guid = MsgId, is_persistent = IsPersistent } =
+ bin_to_msg(MsgBin),
+ OnDisk = IsPersistent andalso IsDurable,
+ {Acks2, Requeue2, Length2} =
+ if OnDisk -> {Acks,
+ [{AckTag, {next, IsDelivered}} | Requeue],
+ Length + 1
+ };
+ true -> {[AckTag | Acks], Requeue, Length}
+ end,
+ deliver_all_messages(Q, IsDurable, Acks2, Requeue2, Length2)
end.
msg_to_bin(Msg = #basic_message { content = Content }) ->
@@ -167,7 +167,7 @@ publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
%% Assumption here is that the queue is empty already (only called via
%% attempt_immediate_delivery).
publish_delivered(Msg =
- #basic_message { guid = MsgId, is_persistent = IsPersistent},
+ #basic_message { guid = MsgId, is_persistent = IsPersistent},
State = #mqstate { mode = Mode, is_durable = IsDurable,
queue = Q, length = 0 })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
@@ -235,7 +235,7 @@ ack(Acks, State = #mqstate { queue = Q }) ->
end.
tx_publish(Msg = #basic_message { guid = MsgId },
- State = #mqstate { mode = disk }) ->
+ State = #mqstate { mode = disk }) ->
ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
{ok, State};
tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
@@ -254,9 +254,9 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q,
length = Length }) ->
RealAcks = remove_noacks(Acks),
ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok;
- true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes),
- RealAcks)
- end,
+ true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes),
+ RealAcks)
+ end,
{ok, State #mqstate { length = Length + erlang:length(Publishes) }};
tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
@@ -266,7 +266,7 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
{PersistentPubs, MsgBuf2} =
lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
{Acc, MsgBuf3}) ->
- OnDisk = IsPersistent andalso IsDurable,
+ OnDisk = IsPersistent andalso IsDurable,
Acc2 =
if OnDisk ->
[Msg #basic_message.guid | Acc];
@@ -279,32 +279,32 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
%% requirements of rabbit_disk_queue (ascending SeqIds)
RealAcks = remove_noacks(Acks),
ok = if ([] == PersistentPubs) andalso ([] == RealAcks) -> ok;
- true ->
- rabbit_disk_queue:tx_commit(
- Q, lists:reverse(PersistentPubs), RealAcks)
- end,
+ true ->
+ rabbit_disk_queue:tx_commit(
+ Q, lists:reverse(PersistentPubs), RealAcks)
+ end,
{ok, State #mqstate { msg_buf = MsgBuf2,
length = Length + erlang:length(Publishes) }}.
only_persistent_msg_ids(Pubs) ->
lists:reverse(
lists:foldl(
- fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) ->
- if IsPersistent -> [Msg #basic_message.guid | Acc];
- true -> Acc
- end
- end, [], Pubs)).
+ fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) ->
+ if IsPersistent -> [Msg #basic_message.guid | Acc];
+ true -> Acc
+ end
+ end, [], Pubs)).
tx_cancel(Publishes, State = #mqstate { mode = disk }) ->
ok = rabbit_disk_queue:tx_cancel(only_msg_ids(Publishes)),
{ok, State};
tx_cancel(Publishes,
- State = #mqstate { mode = mixed, is_durable = IsDurable }) ->
+ State = #mqstate { mode = mixed, is_durable = IsDurable }) ->
ok =
- if IsDurable ->
- rabbit_disk_queue:tx_cancel(only_persistent_msg_ids(Publishes));
- true -> ok
- end,
+ if IsDurable ->
+ rabbit_disk_queue:tx_cancel(only_persistent_msg_ids(Publishes));
+ true -> ok
+ end,
{ok, State}.
%% [{Msg, AckTag}]
@@ -337,16 +337,16 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
}) ->
{PersistentPubs, MsgBuf2} =
lists:foldl(
- fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag},
- {Acc, MsgBuf3}) ->
- OnDisk = IsDurable andalso IsPersistent,
- Acc2 =
- if OnDisk -> [AckTag | Acc];
- true -> Acc
- end,
- MsgBuf4 = queue:in({Msg, true, OnDisk}, MsgBuf3),
- {Acc2, MsgBuf4}
- end, {[], MsgBuf}, MessagesWithAckTags),
+ fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag},
+ {Acc, MsgBuf3}) ->
+ OnDisk = IsDurable andalso IsPersistent,
+ Acc2 =
+ if OnDisk -> [AckTag | Acc];
+ true -> Acc
+ end,
+ MsgBuf4 = queue:in({Msg, true, OnDisk}, MsgBuf3),
+ {Acc2, MsgBuf4}
+ end, {[], MsgBuf}, MessagesWithAckTags),
ok = if [] == PersistentPubs -> ok;
true -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs))
end,